在SparkStreaming任务运行的过程中,由于数据流量或者网络的抖动,任务的batch很可能出现delay,所以就出现了一个需求:实时监控任务对kafka消息的消费,及时了解堆积情况。
这个需求应该有很多种解决方案,我这边提供的思路是基于Spark Metrics System的。SparkStreaming任务在运行过程中,会产生很多Metrics信息,不断地推送到Sink上面,我们这里使用到的是MetricsServlet。
打开Spark UI,我们能够很方便地通过RestAPI方式请求任务Metrics信息,接口如下:
返回的Metrics信息如下:
这里应用的方案就是在这些Metrics里面添加一个新Metrics,这个Metrics应该能够向监控应用程序提供任务batch对records的消费情况。
我们知道,SparkStreaming应用消费Kafka数据有两种API:Reciever模式和Direct模式。所以针对使用的不同的API,需要提供不同的Metrics信息,其格式可以如下设置:
- Reciever-Metrics
kafka.consumer.$zkQuorum.$topic.$groupId - Direcct-Metrics: kafka.direct.$kafkaBrokerList.$topic.lastCompletedBatch_sumOffsets
注意其中带“$”号 的为变量,需要根据实际情况赋值的,其它为常量字符串。
上面两个Metrics我们使用registerGauge方法分别向MetricsSystem注册就可以了。
根据上面Metrics的信息可以解读到,对于Reciever-Metrics,只向监控应用提供Kafka集群的连接信息,包括ZK,topics和groupId,注意对于多个topic的情况,要注册多个Metrics,然后需要监控应用自己调kafka的API去获取该consumer的offset和logsize,从而计算出堆积量;而对于Direcct-Metrics,需要Spark计算出每个batch消费的最新offset之和(实际上是计算消费的每个topic下所有partition的最新offset之和)。
针对具体使用来说,首先根据应用创建DStream时传递给API的参数获取到
1)对Reciever模式:zookeeper.connect
,group.id
,topics
;
2)对Direcct模式:metadata.broker.list
或者bootstrap.servers
,topics
;
等信息,并将信息配置在StreamingContext新建的结构里面(以便于StreamingSource获取)。
这样对于Reciever-Metrics来说,使用获取的信息构造对应的Metrics并注册,就可以了,对于value设置为0;对于Direcct-Metrics来说,需要在DirectKafkaInputDStream里面每一次compute计算时,将offsetRanges里面的元数据计算后推送到StreamingJobProgressListener里面(其中配置一个topic->sumOffsets的HashMap结构即可,每次compute向里面更新最新的计算结果)。最后在StreamingSource中registerGauge时根据topic就可以获取到sumOffset。
实现下来需要修改的Spark源码文件可能包括:
1)StreamingJobProgressListener.scala
2)DirectKafkaInputDStream.scala
3)KafkaInputDStream.scala
4)StreamingSource.scala
5)StreamingContext.scala