本文基于spark streaming通过direct mode访问kafka的场景,从源码出发分析spark streaming如何实现数据读取的限流和反压。
我们知道,KafkaUtils.createDirectStream方法用于创建direct mode访问kafka的InputDStream.
从源码可以看到,所有KafkaUtils.createDirectStream方法重载最终都会调用下面这个方法创建DirectKafkaInputDStream对象:
1 DirectKafkaInputDStream如何生成KafkaRDD?
先看看DirectKafkaInputDStream类图:
DirectKafkaInputDstream.compute方法用于生成指定时间batch的KafkaRDD,这个RDD会从kafka各个分区抽取指定offset range的数据。
而这个offet range由以下三个因素决定:
1. 已消费到的最新offsets值,即DirectKafkaInputDStream.currentOffsets
2. 当前batch应该消费的最大消息数量,即DirectKafkaInputDStream.maxMessagesPerPartition的返回值
3. 当前kafka分区的最大可用的offset,即DirectKafkaInputDStream.latestOffsets的返回值
DirectKafkaInputDstream.compute会调用DirectKafkaInputDstream.clamp方法计算得到每个分区offset range的end offset,计算的方式就是currentOffsets中各个分区的begin offset加上maxMessagesPerPartition中对应分区的消息数,再和latestOffsets中对应分区的latest offset做比较,取更小值。
其中,clamp的参数offets就是DirectKafkaInputDStream.latestOffsets的返回值。
最后,DirectKafkaInputDstream.compute会将各个分区的end offsets和currentOffets中的begin offsets组成各个分区的offset ranges,用以生成KafkaRDD对象:
2 DirectKafkaInputDStream如何实现限流反压?
2.1 rateController的生成
Spark Streaming限流反压机制主要在DirectKafkaInputDStream.maxMessagesPerPartition方法中实现,其依赖于rateController对象计算最新的数据消费速率(即每秒消费的消息数量),先看看DirectKafkaInputDStream中的rateController对象是如何生成的:
可以看到,DirectKafkaInputDStream使用的是DirectKafkaRateController.
这里的RateController.isBackPressureEnabled会check配置参数spark.streaming.backpressure.enabled, 默认为false.
接着看RateEstimator.create方法:
可以看到,目前(spark 2.4.0)spark只有一种rate estimator选择,即PIDRateEstimator.
2.2 DirectKafkaInputDStream.maxMessagesPerPartition
下面我们看看DirectKafkaInputDStream.maxMessagesPerPartition做了哪些事情:
1. 调用RateController.getLatestRate方法得到最新的数据消费速率:
注意,这里得到的estimatedRateLimit是在一个batch中每秒从所有kafka分区消费的消息总数量。
2. 计算各个分区的消息延迟量(即各个分区在latestOffsets和currentOffsets中对应分区offset的差值)。
3. 以每个分区的延迟量为权重,计算各个分区的数据消费速率backpressureRate.
4. 当maxRateLimitPerPartition>0时,比较backpressureRate和maxRateLimitPerPartition,取二者更小者作为分区的消费速率(这里正是限流功能的体现)。其中,maxRateLimitPerPartition由参数spark.streaming.kafka.maxRatePerPartition决定,默认为0.
5. 最后,将#4中计算得到的消费速率乘以batch的时长(单位转化为秒),并和minRateLimitPerPartition做比较(应该是和secsPerBatch*minRateLimitPerPartition做比较,此处可能是spark的一个bug),取二者更大者作为分区要消费的消息数。其中,minRateLimitPerPartition由参数spark.streaming.kafka.minRatePerPartition决定,默认为1.
到此,我们分析了在DirectKafkaInputDStream中是如何利用RateController得到消费速率,并根据消费速率计算出指定batch中需要从各个分区消费的offset ranges, 进而生成对应KafkaRDD对象的。
下面,我们就详细讨论一下RateController是如何计算出数据消费速率的。
3 RateController & RateEstimator 如何计算和更新数据消费速率?
先看看RateController和RateEstimator的类图:
从上面的类图可以看到,所有RateController的构造函数都是需要两个参数:streamUID和rateEstimator.
streamUID是stream的唯一ID,由StreamingContext.getNewInputStreamId生成。 RateEstimator是用来估计输入流的消费速率的组件。
从上面的类图可以看出,有两种RateEstimator实现:
1. ConstantEstimator:以一个固定值作为输入流的抽取速率,在spark的内部测试类(如DirectKafkaStreamSuite)中使用。
2. PIDRateEstimator: 以proportional-integral-derivative (PID)方法评估输入流的数据抽取速率,下文会详细介绍。
3.1 RateController
从上面的类图可以看到,所有的RateController都是继承自StreamingListener的,也就是说rateController是在每个batch执行完成后作为一个listener被调用以更新数据消费速率的。
先来看看RateController的onBatchCompleted方法:
先从batchInfo中抽取出四个变量:
1. processingEnd: batch执行结束的时间。
2. workDelay:处理batch数据所消耗的时间(单位是毫秒),也就是batch从开始执行到执行结束的时间。
3. waitDelay:batch的等待时间(单位是毫秒,主要是前一个batch未能在新的batch开始之前完成而导致的延迟),也就是从batch被提交到其第一个job开始执行的时间。
4. elems:在batch的时间段中,RateController对应的stream接受的消息数量。
RateController.computeAndPublish方法以这四个变量为参数,计算并更新数据消费速率:
首先,调用reateEstimator.compute方法计算得到新的数据消费速率,并更新到RateController的rateLimit变量中。
接着,调用publish方法发布更新后的数据消费速率,publish方法由RateController的子类实现。
因为DirectKafkaInputDStream.maxMessagesPerPartition直接调用RateController.getLatestRate方法获取最新的数据消费速率,不需要额外的发布动作。
所以,DirectKafkaRateController的publish方法是一个空实现:
3.2 PIDRateEstimator
从3.1的分析可以看出,数据消费速率的计算是在rateEstimator中完成的,上文也有提到,spark中只有一种可选择的RateEstimator实现类,即PIDRateEstimator (ConstantEstimator仅在测试中使用)。
PIDRateEstimator的主要计算源码如下:
1. 基于batch的消息数量elems和处理时间workDelay,计算出batch的消息处理速率:processingRate
2. 用上一个batch的消息处理速率latestRate减去processingRate,得到两个连续batch的消息处理速率差值:error
3. 基于batch的等待时间waitDelay和processingRate,计算出为了弥补等待时间造成的延迟所需要增加的消息处理速率差值:historicalError(顾名思义,就是因为历史batch处理延迟导致的差值)
4. 基于上一个batch的消息处理速率差值latestError和当前batch的消息处理速率差值error,计算出消息处理速率差值的变化率dError
5. 最后,根据PIDRateEstimator的参数proportional,integral和derivative(这三个参数可分别看作是error,historicalError和dError的权重值),用latestRate减去加权后的这三个error值,再和PIDRateEstimator.minRate做比较取更大者作为新的数据消费速率newRate.
其中,proportional默认为1,integral默认为0.2,derivative默认为0 (也就是说dError默认是不起作用的)。从上文RateEstimator.create的源码可以看到,这三个权重值和PIDRateEstimator.minRate均由配置参数spark.streaming.backpressure.pid.*决定。
可以看到,在计算新的数据消费速率时,spark将消息处理速率,累计延迟以及消息处理速率差值的变化都纳入了考量范围,避免新的batch消费超过处理能力的数据量,从而达到反压的功能。
4 总结
本文基于spark streaming direct mode消费kafka数据的场景,从源码角度分析了DirectKafkaInputDStream计算kafka分区的offset ranges并生成KafkaRDD,DirectKafkaInputDStream如何借助RateController计算各个kafka分区在新batch中应处理的最大消息数量,以及RateController和RateEstimator的内部结构,并着重分析了PIDRateEstimator的实现原理。
总的来说,spark streaming在通过direct mode访问kafka数据时,通过配置参数spark.streaming.kafka.maxRatePerPartition和spark.streaming.kafka.minRatePerPartition进行限流;与此同时,通过PIDRateEstimator计算出合适的数据消费速率,从而控制数据处理的workload,进行反压。
5 思考
这里聊一下笔者对spark streaming限流反压机制的思考。spark streaming的限流反压仅仅是依据历史(前一个batch)处理速率和累计延迟去估算新的batch合适的数据处理速率,这种机制在一些场景下确实可以有效地起到反压作用。
比如,当内存充足,数据处理的瓶颈在CPU,CPU来不及计算单个batch的数据导致延迟,这个时候spark streaming的反压机制计算出来的数据处理速率是和数据量线性成比例的,所以估算出来的新的数据处理速率可以保证新batch的数据可以在一个batch时间内处理完毕,并可以比较充分地利用整个batch interval的时间。
然而,当数据处理速率和数据量并非成比例,比如,当内存成为瓶颈,GC或者spill数据到磁盘的时间可能占了整个batch处理时间的一大部分,spark streaming反压机制估算出来的单条记录处理时间可能远大于实际的单条记录处理时间。这个时候新的batch就会读取很少的数据进行处理,可能就不会碰到内存瓶颈,很快就算完了,那么在估算时又会把数据处理速率估计得很大,导致又读取了大量数据进行处理,从而又碰到内存瓶颈,增加数据延迟。
从根本上讲,以上描述的问题是由于spark streaming的限流反压在估算数据处理速率时,没有更细粒度地考虑前一个batch的处理时间消耗在哪,而是笼统地用batch的数据量除以整个batch的处理时间来估算每秒能处理的记录数。
还有一个小问题,spark streaming的反压机制是从第一个batch结束后开始计算处理速率的,那么第一个batch就只能通过限流的方式来控制流入的数据量,而这个限流机制是需要开发人员手动设置并且无法在程序运行期间动态调整。
6 说明
1. spark源码版本:2.4.0
2. 本文的讨论和分析均基于Spark Streaming direct mode访问kafka的场景,其他streaming情况(如spark streaming的receiver-based访问模式)不一定适用
3. 水平有限,如有错误,望读者指出