Spark Streaming+Kakfa细节剖析
本文基于 Kafka 1.1.0
和 Spark 2.3.0
版本源代码进行分析
Kafka 消费接口
Kafka消费接口包含:低级和高级API,这个区分主要针对broker版本 <0.9.0,在0.9.0中引入了新消费者APIorg.apache.kafka.clients.consumer.KafkaConsumer
将两者结合起来了。
低级API(<0.9.0)
低级API的入口类为 kafka.consumer.SimpleConsumer
,已经被标记为废弃Deprecated,它的替代者是KafkaConsumer。
低级api的好处是可以更多样的控制读取数据,如:
- 同一条消息读取多次
- 只读取一个Topic中的部分Partition
- 管理读取会话,使一条消息只被读取和处理一次(Exactly Once)
使用该API的5个步骤为(引用):
- Find an active Broker and find out which Broker is the leader for your topic and partition
- Determine who the replica Brokers are for your topic and partition
- Build the request defining what data you are interested in
- Fetch the data
- Identify and recover from leader changes
高级API(<0.9.0)
更多的时候,我们只关心从kafka中不断的消费新数据,而对上面提到的细节不关心,这些细节包括:find Leader Broker、partition增加减少、consumer增加减少(引起Rebalance)、leader变化、异常处理等。
这时就可以使用高级API了,它的入口是 kafka.consumer.ZookeeperConsumerConnector
要用高级API应该使用多线程开发,每一个线程对应一个Consumer,Consumer和Partition的数量关系为:
- 如果consumer比partition多,是浪费,因为kafka的设计是在一个partition上是不允许并发的,所以consumer数不要大于partition数
- 如果consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀
- 如果consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同
- 增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的partition会发生变化
更详细内容见引用
新消费者API(>=0.9.0)
新API是消费API的重新优化,它利用到kafka内置的组协调协议(group coordination protocol),它有以下几个优点:
- 重新设计API,结合了低级API和高级API的功能于一体
- 使用pure java开发,且不再依赖于zookeeper和scala了, 老API依赖zookeeper来管理topic的消费组信息
- 新增安全扩展 https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=51809888
详细使用示例见引用
关于Kafka offset
的管理
- 提交至ZK,但频繁操作zk是效率比较低的,早期高级消费API offset存在ZK中
- 提交至Kafka集群内部,Kafka 0.9版本后使用高级消费API offset是存在kafka topic:
__consumer_offsets
- 自己控制,存储在外部系统中如redis、hdfs等
Spark Streaming + Kafka
spark源码中有两个不同版本的代码:spark-streaming-kafka-0-8 和spark-streaming-kafka-0-10, 对比如下:
这里以 spark-streaming-kafka-0-8 的进行介绍
Kafka Receiver模式(2.3.0已废弃)
示意图:
Receiver使用的是高级消费者API,Receiver模式使用代码如下:
import org.apache.spark.streaming.kafka._
val kafkaStream = KafkaUtils.createStream(streamingContext,
[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
关键点为:
- Topic分区数和Spark RDD分区数是没有关系的,在参数中指定的Topic分区只是增加了,消费者的数量,对应到消费者处理线程数。不会影响spark处理数据的并行度
- 当有多个Topic或者Group时,可以通过创建多个DStream来增加Receiver的数量,从而增加接收数据的并行度,一个Receiver只在一个worker上运行,详细见 level-of-parallelism-in-data-receiving
- 如果启用了WAF,配置为:
spark.streaming.receiver.writeAheadLog.enable
,则需要设定存储的级别如:StorageLevel.MEMORY_AND_DISK_SER
,如果没有启用WAF,Receiver模式不能保证至少一次的语义。(TODO:代码细节)
Receiver模式关联的Spark Conf为spark.streaming.receiver.*
.
Direct模式(推荐)
示意图:
使用新消费API中的低级接口来实现,相比Receiver有以下几个特点:
- 简化并行:不需要通过创建多个DStream并将他们Union的方式,来达到并行读取数据,Direct方式Kafka的Partition和RDD Partition是一致的。
- 更高效:Receiver模式需要开启WAL才可以不丢数据,而Direct没有Receiver,它使用Spark本身的checkout来报错offset(TODO 另外由于Receiver是个单独的Executor异步读取数据,某些情况下内存需求会很大,因此导致整个App的Execotor都需要多申请写内存)
- Exactly-once语义(TODO)
Direct模式没有更新offset到zookeeper中,将导致基于zookeeper的kafka监控工具无法显示数据消费情况,不过这可以通过手动提交offset来实现。
相关配置项为:spark.streaming.kafka.*
. 其中关键配置项为spark.streaming.kafka.maxRatePerPartition
用于控制最高速率。
Direct模式默认关闭kafka offset自动提交如:
private[kafka010] def fixKafkaParams(kafkaParams: ju.HashMap[String, Object]): Unit = {
logWarning(s"overriding ${ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG} to false for executor")
kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false: java.lang.Boolean)
logWarning(s"overriding ${ConsumerConfig.AUTO_OFFSET_RESET_CONFIG} to none for executor")
kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
SparkSteaming+Kafka处理数据Exactly Once介绍
流式处理包含三个部分:
- Receive Data:这里主要依赖于数据源,kafka本身是基于offset的可以保证数据仅读取一次。
- Transformming Data:spark借助rdd的特性保证
- Output Operation:重点要考虑
以下介绍采用Direct模式时保证Exactly Once的三种方式:
- checkpoint+幂等输出操作: 这种方法有缺陷,当代码变更是,checkpoint的数据无法恢复
- Kafka+幂等输出操作:关闭自动提交offset,你需要在确保处理完输出后,再执行提交commitAsync
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// some time later, after outputs have completed
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
注:这里类型强转CanCommitOffsets的前提是stream是createDirectStream的结果,没有经过其他转换
- 外部支持事务的存储:将offset和结果数据在一个事务中完成存储
其他问题:
- back pressure的实现方式?
- Spark Checkpoint?
结语
本文只记部分内容,更多内容和细节还是需要看官方文档和代码。