Spark 您所在的位置:网站首页 kafka环境变量 Spark

Spark

2023-03-07 10:38| 来源: 网络整理| 查看: 265

前段时间学习了spark streaming采用kafka作为数据源时,数据接收并行度这一部分的源代码。本文主要将学习的体会记录一下,有理解不对的地方请多多指教。

Streaming从kafka接收数据有Receiver和direct两种方式。下面我们看一下这两种方式的源码。

Direct approach

这种方式是使用kafka的低阶API从kafka消费数据。一般如果需要自行维护partition的offset,实现自定义checkpoint文件,或者exactlyOnce场景下就会用到这一方式。

首先需要看一下DirectKafkaInputDStream这个类,他是我们调用KafkaUtil.

createDirectStream方法生成的用来从kafka端接收数据的。

compute方法定义了InputDStream是如何根据指定的batchTime生成RDD的。

latestLeaderOffsets方法是获取当前InputDStream所包含的topic下所有的partition的最新offset的。Clamp方法是根据spark.streaming.kafka.maxRatePerPartition和backpressure这两个参数来设置当前block可以消费到的offset的(即untilOffset)。这个数值需要跟partition最新的offset取最小值。

maxMessagesPerPartition方法实现了获取某个partition能消费到的message的数量。该方法首先会计算一个每分区每秒钟消费的消息数上线effectiveRateLimitPerPartition,他的value如下图红框中,是在spark.streaming.kafka.maxRatePerPartition和batckpressure中取一个最小值,如果只配置了一个则以配置的为准,都没配置则返回None,返回None时直接取leader最新的offset。然后再根据batchTime计算出某partition在batchTime内能消费的消息数上限。

其中backpressure是spark1.5版本之后增加的参数,能够根据上一个batch的执行效率,动态估算出当前batch能处理的最大消息数。这个参数在每个batch计算完成后,会通过StreamingListenerBus监听StreamingListenerBatchCompleted事件,然后由org.apache.spark.streaming.scheduler.

onBatchCompleted方法来重新计算,如下:

Backpressure的具体实现思路先不展开了(计算公式在PIDRateEstimator.compute方法中)。我们回到DirectKafkaInputDStream.compute方法。当计算完每个partition的untilOffset之后,会根据当前InputDStream所消费的topic的每个partition的currentOffset和untilOffset构建KafkaRDD。

在kafkaRDD中我们可以看到他重写的一些RDD的方法,

在getPartitions方法中可以看到,KafkaRDD的partition个数就是topic的partition个数之和。

在getPreferredLocations方法中可以看到,partition的首选location就是该topic的某个partition的leader所在的host。这是很合理的,因为leader上的数据正常情况下是最新的而且是最准确的。而follower的数据往往还需要从leader上做同步,并且一旦同步出现较大的落后,还会从in-sync列表中移除。而且kafka的读写都是通过leader进行的。

关于方法中part.host可以一路反推回去,会跟踪到KafkaCluster.getLeaderOffsets方法中调用的findLeaders方法,即part.host就是leader的host。

compute方法是RDD用来构建一个partition的数据的。

我们看一下用来从partition中获取数据的KafkaRDDIterator类。在类体中会发现

val consumer = connectLeader

的代码,这说明一点,spark streaming的kafka低阶API是每一个partition起一个consumer来消费数据的。

然后我们看一下fetchBatch方法。该方法中是我们很熟悉的一段根据起止offset消费kafka某topic某partition数据的代码。

通过kafkaRDD这个类的阅读我们可以看出,接收数据是以partition的leader为维度做分布式的,这样做可以保证这个host上是有我要消费的数据的,能够实现数据本地化。

Receiver

这种方式是采用kafka的高阶API来消费数据的。

建立InputDStream的代码如下:

从KafkaUtils.createStream开始跟到KafkaInputDStream类,

getReceiver()方法中的变量useReliableReceiver是判断是否配置了WAL机制。如下:

我们看一下KafkaReceiver的实现代码:

在他的onStart()方法中可以看到他是创建了一个线程池executorPool来消费消息的。而这个线程池的线程数,就是我们在KafkaUtils.createStream时的入参onlineStaffTopicMap的values的和。也就是说入参onlineStaffTopicMap的value指的是某个topic在这个InputDStream中会有多少个consumer去消费数据。

再看一下MessageHandler中消费及保存数据的逻辑:

这段代码中streamIterator是被我们所喜闻乐见的使用高阶API从kafka消费数据的代码。在代码中消费完数据之后,调用了store方法将message进行了保存。

Store方法最终会将这条消息addData到BlockGenerator类中的currentBuffer:

ArrayBuffer中。

该类中的updateCurrentBuffer方法值得我们关注一下,他是用来将已经收集到的消息封装成一个Block的。

那么这个方法什么情况下会被调用呢,需要看一下blockIntervalTimer的实现类RecurringTimer。

RecurringTimer是一个定时重复执行高阶函数callback的执行器,他是通过Thread反复执行loop方法实现的,loop方法中只要定时器不被终止,就会反复调用triggerActionForNextInterval方法,而triggerActionForNextInterval会在特定的时刻(即nextTime)执行callback函数(即入参updateCurrentBuffer函数)。执行完成之后会在nextTime上增加period作为下一次执行的时刻。

而period方法是什么呢,他就是我们在构建blockIntervalTimer时的入参blockIntervalMs,也就是streaming性能的一个优化点spark.streaming.blockInterval。也就是说,这段代码的逻辑是每间隔blockInterval将由consumer消费到的数据切分成一个block。由此我们可以看到,这个参数是用来将Batch中所接受到的数据以它为时间间隔切分为block,而在streaming处理数据时,会将block作为一个partition来进行分布式计算,也就是说我们在指定的batchTime中,根据blockInterval能切出多少个block,就能分成多少个partition,从而决定了streaming处理时的分布式程度。这一段代码如下:

具体为什么我们说一个block会作为一个partition来进行计算,这一点可以看一下ReceiverInputDStream类的compute方法,该方法调用了createBlockRDD方法来创建基于Receiver模式的RDD。在该方法中可以看到最终封装的RDD为BlockRDD或者WriteAheadLogBackedBlockRDD。

BlockRDD类中getPartitions方法是说将这个batch的blocks作为partitions。Compute方法则按照入参BlockRDDPartition的blockId,从blockManager中获取该block作为partition的数据。getPreferredLocations则是将BlockRDDPartition所在的host作为partition的首选位置。

总结

通过阅读源码我们可以看出,direct的方式是从kafka消费完数据之后直接封装成partition的数据提供给作业使用,而receiver是将消费到数据按照blockInterval切分成block,保存到blockManager中,在使用时会根据blockId获取该数据。

另外direct的方式rdd的partition与topic的partition是一一对应的,如果某个topic只有一个partition就不好了。而receiver的partition是根据blockInterval切分出来的,blockInterval的默认值是200ms,不存在这个问题。

这两种方式在生产环境上用的都比较多,我们一开始采用的是receiver的方式。后来为了实现自定义checkpoint,改为了direct的方式。



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有