receiver模式
原理图:
executor中会有receiver tasks接收kafka推送过来的数据,数据会被持久化,默认级别为memory_and_disk_ser_2,这个级别也可以修改
receiver task对接收过来的数据进行存储和备份
备份完成后去zookeeper中更新消费偏移量,
然后向driver中的receiver tracker汇报数据位置,最后driver根据数据本地化将task分发到不同节点上执行。
存在的问题:
当driver进程挂掉了,driver下的executor都会被杀死,当更新完zookeeper消费偏移量的时候,这个时候driver挂了,就会存在找不到数据的问题,数据丢失
解决办法:
开启wal(write ahead log)预写日志机制,在接受过来数据备份到其他节点的时候,同时备份到HDFS上一份(我们将接收来的数据持久化级别降到memory_and_disk),这样能保证数据的安全性,不过,因为写HDFS比较消耗性能,要在备份完数据之后才能进行更新zookeeper以及汇报位置等,这样会增加job的执行时间,这样对于任务的执行提高了延迟度
但是这样也会存在数据重复消费的问题
数据存储到HDFS上,提交偏移量太慢,driver挂了,任务重启后,先去HDFS上恢复数据
receiver的并行度设置:
receiver的并行度是由spark.streaming.blockInterval来决定的,默认为200ms,假设batchInterval为5s,那么每隔blockInterval就会产生一个block,这里就对应每批次产生RDD的partition,这样5秒产生的这个Dstream中的这个RDD的partition为25个,并行度就是25。如果想提高并行度可以减少blockInterval的数值,但是最好不要低于50ms。
Direct模式理解
SparkStreaming+kafka 的Driect模式就是将kafka看成存数据的一方,不是被动接收数据,而是主动去取数据。消费者偏移量也不是用zookeeper来管理,而是SparkStreaming内部对消费者偏移量自动来维护,默认消费偏移量是在内存中,当然如果设置了checkpoint目录,那么消费偏移量也会保存在checkpoint中。当然也可以实现用zookeeper来管理。
Direct模式并行度设置
Direct模式的并行度是由读取的kafka中topic的partition数决定的。
能保证有且只有一次,但是注意在逻辑里有输出到数据库等操作成功后,系统出错。spark里的逻辑会重新消费,但是数据会有重复数据