Spark Streaming与kafka集成有以下两种接收数据的方式:
- 基于Receiver的方式
- 基于Direct的方式
基于Receiver方式
这种方式使用Receiver来接收kafka中的数据,Receiver是基于kafka的高层Consumer API来实现的。Receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的,然后Spark Streaming启动的job会去处理那些数据。
在提交Spark Streaming任务后,Spark集群会划出指定的Receivers来专门、持续不断、异步读取kafka数据,读取时间间隔以及每次读取offsets访问由参数来配置。读取的数据保存在Receiver中,具体StorageLevel方式由用户指定。当Driver触发batch任务的时候,Receivers中的数据会转移到Executors中去执行。在执行完之后,Receivers会相应更新Zookeeper的offsets。
在默认的配置下,这种方式可能会因为底层的失败而丢失数据。如果要启用高可靠机制,让数据零丢失,就必须启用Spark Streaming的预写日志机制(Write Ahead Log, WAL)。该机制会同步地将接收到的kafka数据写入到分布式文件系统(如HDFS)上的预写日志中。所以,即使底层节点出现了失败,也可以通过预写日志中的数据进行恢复。
确保At least once的读取方式,可以设置:
spark.streaming.receiver.writeAheadLog.enable = true
基于Receiver方式读取数据,用户可以专注于所读数据,而不用关注或维护consumer的offsets,这减少了用户的工作以及代码量,而且相对比较简单。
基于Receiver方式存在的问题:
- 启用WAL机制,每次处理之前需要将该batch内的数据备份到checkpoint目录中,这降低了数据处理效率,同时加重了Receiver的压力;另外由于数据备份机制,会收到负载影响,负载一高就会出现延迟的风险,导致应用崩溃。
- 采用
MEMORY_AND_DISK_SER
降低对内存的要求,但是在一定程度上影响了计算的速度。 - 单Receiver内存。由于Receiver是属于Executor的一部分,为了提高吞吐量,提高Receiver的内存。但是在每次batch计算中,参与计算的batch并不会使用这么多内存,导致资源严重浪费。
- 提高并行度,采用多个Receiver来保存kafka的数据。Receiver读取数据是异步的,不会参与计算。如果提高了并行度来平衡吞吐量很不划算。
- Receiver和计算的Executor是异步的,在遇到网络等因素时,会导致计算出现延迟,计算队列一直在增加,而Receiver一直在接收数据,这非常容易导致程序崩溃。
- 在程序失败恢复时,有可能出现数据部分落地,但是程序失败,未更新offsets的情况,这会导致数据重复消费。
基于Direct方式
在Spark1.3版本中引入,替代使用Receiver来接收数据,这种方式会周期性地查询kafka,来获得每个topic+partition的最新的offset,从而定义每个batch的offset的范围,当处理数据的job启动时,就会使用kafka的简单Consumer API来获取kafka中指定offset范围的数据。
基于Direct方式的优势:
- 简化并行读取:如果要读取多个partition,不需要创建多个输入DStream然后对他们进行union操作。Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从kafka中读取数据。所以在kafka partition和RDD partition之间,有一一对应的关系。
- 高性能:如果要保证数据零丢失,在基于Receiver的方式中,需要开启WAL机制。这种方式其实效率很低,因为数据实际被复制了两份,kafka自己本身就有高可靠的机制,会对数据复制一份,而这里又会复制一份到WAL中。而基于Direct的方式,不依赖于Receiver,不需要开启WAL机制,只要kafka中做了数据的复制,那么就可以通过kafka的副本进行恢复。
- 强一致语义:基于Receiver的方式,使用kafka的高阶API来在Zookeeper中保存消费过的offset。这是消费kafka数据的传统方式。这种方式配合WAL机制,可以保证数据零丢失的高可靠性,但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为Spark和Zookeeper之间可能是不同步的。基于Direct的方式,使用kafka的简单api,Spark Streaming自己就负责追踪消费的offset,并保存在checkpoint中。Spark自己一定是同步的,因此可以保证数据时消费一次且仅消费一次。
- 降低资源:Direct不需要Receiver,其申请的Executors全部参与到计算任务中;而Receiver则需要专门的Receivers来读取kafka数据且不参与计算。因此相同的资源申请,Direct能够支持更大的业务。Receiver与其他Executor是异步的,并持续不断接收数据,对于小业务量的场景还好,如果遇到大业务量时,需要提高Receiver的内存,但是参与计算的Executor并不需要那么多的内存,而Direct因为没有Receiver,而是在计算的时候读取数据,然后直接计算,所以对内存的要求很低。
- 鲁棒性更好:基于Receiver方式需要Receiver来异步持续不断的读取数据,因此遇到网络、存储负载等因素,导致实时任务出现堆积,但Receiver却还在持续读取数据,此种情况容易导致计算崩溃。Direct则没有这种顾虑,其Driver在触发batch计算任务时,才会读取数据并计算,队列出现堆积并不不会引起程序的失败。
基于Direct方式的不足
- Direct方式需要采用checkpoint或者第三方存储来维护offset,而不是像Receiver那样,通过Zookeeper来维护offsets,提高了用户的开发成本。
- 基于Receiver方式指定topic指定consumer的消费情况均能够通过Zookeeper来监控,而Direct则没有这么便利,如果想做监控并可视化,则需要投入人力开发。