Spark Streaming限流反压机制源码剖析

本文基于spark streaming通过direct mode访问kafka的场景,从源码出发分析spark streaming如何实现数据读取的限流和反压。

我们知道,KafkaUtils.createDirectStream方法用于创建direct mode访问kafka的InputDStream.

从源码可以看到,所有KafkaUtils.createDirectStream方法重载最终都会调用下面这个方法创建DirectKafkaInputDStream对象:

KafkaUtils.createDirectStream

1 DirectKafkaInputDStream如何生成KafkaRDD?

先看看DirectKafkaInputDStream类图:

DirectKafkaInputDStream类图

DirectKafkaInputDstream.compute方法用于生成指定时间batch的KafkaRDD,这个RDD会从kafka各个分区抽取指定offset range的数据。

而这个offet range由以下三个因素决定:

1. 已消费到的最新offsets值,即DirectKafkaInputDStream.currentOffsets

2. 当前batch应该消费的最大消息数量,即DirectKafkaInputDStream.maxMessagesPerPartition的返回值

3. 当前kafka分区的最大可用的offset,即DirectKafkaInputDStream.latestOffsets的返回值

DirectKafkaInputDstream.compute会调用DirectKafkaInputDstream.clamp方法计算得到每个分区offset range的end offset,计算的方式就是currentOffsets中各个分区的begin offset加上maxMessagesPerPartition中对应分区的消息数,再和latestOffsets中对应分区的latest offset做比较,取更小值。

DirectKafkaInputDstream.clamp

其中,clamp的参数offets就是DirectKafkaInputDStream.latestOffsets的返回值。

最后,DirectKafkaInputDstream.compute会将各个分区的end offsets和currentOffets中的begin offsets组成各个分区的offset ranges,用以生成KafkaRDD对象:


DirectKafkaInputDstream.compute

2 DirectKafkaInputDStream如何实现限流反压?

2.1 rateController的生成

Spark Streaming限流反压机制主要在DirectKafkaInputDStream.maxMessagesPerPartition方法中实现,其依赖于rateController对象计算最新的数据消费速率(即每秒消费的消息数量),先看看DirectKafkaInputDStream中的rateController对象是如何生成的:


DirectKafkaInputDStream.rateController

可以看到,DirectKafkaInputDStream使用的是DirectKafkaRateController.

这里的RateController.isBackPressureEnabled会check配置参数spark.streaming.backpressure.enabled, 默认为false.

接着看RateEstimator.create方法:

RateEstimator.create

可以看到,目前(spark 2.4.0)spark只有一种rate estimator选择,即PIDRateEstimator.

2.2 DirectKafkaInputDStream.maxMessagesPerPartition

下面我们看看DirectKafkaInputDStream.maxMessagesPerPartition做了哪些事情:

    1. 调用RateController.getLatestRate方法得到最新的数据消费速率:


DirectKafkaInputDStream.maxMessagesPerPartition

          注意,这里得到的estimatedRateLimit是在一个batch中每秒从所有kafka分区消费的消息总数量。

    2. 计算各个分区的消息延迟量(即各个分区在latestOffsets和currentOffsets中对应分区offset的差值)。

    3. 以每个分区的延迟量为权重,计算各个分区的数据消费速率backpressureRate.

    4. 当maxRateLimitPerPartition>0时,比较backpressureRate和maxRateLimitPerPartition,取二者更小者作为分区的消费速率(这里正是限流功能的体现)。其中,maxRateLimitPerPartition由参数spark.streaming.kafka.maxRatePerPartition决定,默认为0.

DirectKafkaInputDStream.maxMessagesPerPartition

    5. 最后,将#4中计算得到的消费速率乘以batch的时长(单位转化为秒),并和minRateLimitPerPartition做比较(应该是和secsPerBatch*minRateLimitPerPartition做比较,此处可能是spark的一个bug),取二者更大者作为分区要消费的消息数。其中,minRateLimitPerPartition由参数spark.streaming.kafka.minRatePerPartition决定,默认为1.

DirectKafkaInputDStream.maxMessagesPerPartition

到此,我们分析了在DirectKafkaInputDStream中是如何利用RateController得到消费速率,并根据消费速率计算出指定batch中需要从各个分区消费的offset ranges, 进而生成对应KafkaRDD对象的。

下面,我们就详细讨论一下RateController是如何计算出数据消费速率的。

3 RateController & RateEstimator 如何计算和更新数据消费速率?

先看看RateController和RateEstimator的类图:

RateController和RateEstimator的类图

从上面的类图可以看到,所有RateController的构造函数都是需要两个参数:streamUID和rateEstimator.

streamUID是stream的唯一ID,由StreamingContext.getNewInputStreamId生成。 RateEstimator是用来估计输入流的消费速率的组件。

从上面的类图可以看出,有两种RateEstimator实现:

    1. ConstantEstimator:以一个固定值作为输入流的抽取速率,在spark的内部测试类(如DirectKafkaStreamSuite)中使用。

    2. PIDRateEstimator: 以proportional-integral-derivative (PID)方法评估输入流的数据抽取速率,下文会详细介绍。

3.1 RateController

从上面的类图可以看到,所有的RateController都是继承自StreamingListener的,也就是说rateController是在每个batch执行完成后作为一个listener被调用以更新数据消费速率的。

先来看看RateController的onBatchCompleted方法:

RateController.onBatchCompleted

先从batchInfo中抽取出四个变量:

    1. processingEnd: batch执行结束的时间。

    2. workDelay:处理batch数据所消耗的时间(单位是毫秒),也就是batch从开始执行到执行结束的时间。

    3. waitDelay:batch的等待时间(单位是毫秒,主要是前一个batch未能在新的batch开始之前完成而导致的延迟),也就是从batch被提交到其第一个job开始执行的时间。

    4. elems:在batch的时间段中,RateController对应的stream接受的消息数量。

RateController.computeAndPublish方法以这四个变量为参数,计算并更新数据消费速率:

RateController.computeAndPublish

首先,调用reateEstimator.compute方法计算得到新的数据消费速率,并更新到RateController的rateLimit变量中。

接着,调用publish方法发布更新后的数据消费速率,publish方法由RateController的子类实现。

因为DirectKafkaInputDStream.maxMessagesPerPartition直接调用RateController.getLatestRate方法获取最新的数据消费速率,不需要额外的发布动作。

所以,DirectKafkaRateController的publish方法是一个空实现:

DirectKafkaRateController

3.2 PIDRateEstimator

从3.1的分析可以看出,数据消费速率的计算是在rateEstimator中完成的,上文也有提到,spark中只有一种可选择的RateEstimator实现类,即PIDRateEstimator (ConstantEstimator仅在测试中使用)。

PIDRateEstimator的主要计算源码如下:

PIDRateEstimator.compute

1. 基于batch的消息数量elems和处理时间workDelay,计算出batch的消息处理速率:processingRate

2. 用上一个batch的消息处理速率latestRate减去processingRate,得到两个连续batch的消息处理速率差值:error

3. 基于batch的等待时间waitDelay和processingRate,计算出为了弥补等待时间造成的延迟所需要增加的消息处理速率差值:historicalError(顾名思义,就是因为历史batch处理延迟导致的差值)

4. 基于上一个batch的消息处理速率差值latestError和当前batch的消息处理速率差值error,计算出消息处理速率差值的变化率dError

5. 最后,根据PIDRateEstimator的参数proportional,integral和derivative(这三个参数可分别看作是error,historicalError和dError的权重值),用latestRate减去加权后的这三个error值,再和PIDRateEstimator.minRate做比较取更大者作为新的数据消费速率newRate.

其中,proportional默认为1,integral默认为0.2,derivative默认为0 (也就是说dError默认是不起作用的)。从上文RateEstimator.create的源码可以看到,这三个权重值和PIDRateEstimator.minRate均由配置参数spark.streaming.backpressure.pid.*决定。

可以看到,在计算新的数据消费速率时,spark将消息处理速率,累计延迟以及消息处理速率差值的变化都纳入了考量范围,避免新的batch消费超过处理能力的数据量,从而达到反压的功能。

4 总结

本文基于spark streaming direct mode消费kafka数据的场景,从源码角度分析了DirectKafkaInputDStream计算kafka分区的offset ranges并生成KafkaRDD,DirectKafkaInputDStream如何借助RateController计算各个kafka分区在新batch中应处理的最大消息数量,以及RateController和RateEstimator的内部结构,并着重分析了PIDRateEstimator的实现原理。

总的来说,spark streaming在通过direct mode访问kafka数据时,通过配置参数spark.streaming.kafka.maxRatePerPartition和spark.streaming.kafka.minRatePerPartition进行限流;与此同时,通过PIDRateEstimator计算出合适的数据消费速率,从而控制数据处理的workload,进行反压。

5 思考

这里聊一下笔者对spark streaming限流反压机制的思考。spark streaming的限流反压仅仅是依据历史(前一个batch)处理速率和累计延迟去估算新的batch合适的数据处理速率,这种机制在一些场景下确实可以有效地起到反压作用。

比如,当内存充足,数据处理的瓶颈在CPU,CPU来不及计算单个batch的数据导致延迟,这个时候spark streaming的反压机制计算出来的数据处理速率是和数据量线性成比例的,所以估算出来的新的数据处理速率可以保证新batch的数据可以在一个batch时间内处理完毕,并可以比较充分地利用整个batch interval的时间。

然而,当数据处理速率和数据量并非成比例,比如,当内存成为瓶颈,GC或者spill数据到磁盘的时间可能占了整个batch处理时间的一大部分,spark streaming反压机制估算出来的单条记录处理时间可能远大于实际的单条记录处理时间。这个时候新的batch就会读取很少的数据进行处理,可能就不会碰到内存瓶颈,很快就算完了,那么在估算时又会把数据处理速率估计得很大,导致又读取了大量数据进行处理,从而又碰到内存瓶颈,增加数据延迟。

从根本上讲,以上描述的问题是由于spark streaming的限流反压在估算数据处理速率时,没有更细粒度地考虑前一个batch的处理时间消耗在哪,而是笼统地用batch的数据量除以整个batch的处理时间来估算每秒能处理的记录数。

还有一个小问题,spark streaming的反压机制是从第一个batch结束后开始计算处理速率的,那么第一个batch就只能通过限流的方式来控制流入的数据量,而这个限流机制是需要开发人员手动设置并且无法在程序运行期间动态调整。

6 说明

1. spark源码版本:2.4.0

2. 本文的讨论和分析均基于Spark Streaming direct mode访问kafka的场景,其他streaming情况(如spark streaming的receiver-based访问模式)不一定适用

3. 水平有限,如有错误,望读者指出

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,718评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,683评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,207评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,755评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,862评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,050评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,136评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,882评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,330评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,651评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,789评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,477评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,135评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,864评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,099评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,598评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,697评论 2 351

推荐阅读更多精彩内容