receiver 方式
/* 输入的四个参数分别代表着
* 1. zkQuorum :zookeeper地址
* 2. group:消费者所在的组
* 3. topics:该消费者所消费的topics
* 4. numThreads:开启消费topic线程的个数
*/
val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(1))
ssc.checkpoint(".") //这里表示把检查点文件写入分布式文件系统HDFS,所以要启动Hadoop
// 将topics转换成topic-->numThreads的哈稀表
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
// 创建连接 Kafka 的消费者链接
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
val words = lines.flatMap(_.split(" ")) //将输入的每行用空格分割成一个个word
注意点:
在 Receiver 的方式中,Spark中的 partition 和 kafka 中的 partition 并不是相关的,
所以如果我们加大每个 topic 的 partition 数量,仅仅是增加线程来处理由单一Receiver消费的 Topic。
但是这并没有增加Spark在处理数据上的并行度.
对于不同的 Group 和 topic 我们可以使用多个Receiver创建不同的 Dstream 来并行接收数据,
之后可以利用 union 来统一成一个 Dstream
防止数据丢失:
可能因为 execture 、driver 死掉而丢失
增加(spark.streaming.receiver.writeAheadLog.enable=true)
KafkaUtils.createStream(…, StorageLevel.MEMORY_AND_DISK_SER)
减少数据丢失
参考:
Spark Streaming + Kafka整合
Spark Streaming 实时处理数据案例
Direct 方式
为了WAL的性能损失和exactly-once,spark streaming1.3中使用Kafka direct API。非常巧妙,Spark driver计算下个batch的offsets,指导executor消费对应的topics和partitions。消费Kafka消息,就像消费文件系统文件一样。
上面的过程直接看翻译,很清晰的~(periodically:周期的,query:查询,calculate:计算,offset:偏移量,ranges:范围,batch:批处理,在spark streaming 中表示一个个切片,consume:消费)
- 不再需要kafka receivers,executor直接通过Kafka API消费数据
- WAL不再需要,如果从失败恢复,可以重新消费
- exactly-once得到了保证,不会再从WAL中重复读取数据
创建DStream,返回接收到的输入数据
/* 输入的四个参数分别代表着
* topicSeq:topic列表
* kafkaParams:consumer 配置项
* fromOffsets:topic + partition 起始 offset,fromOffsets是可选的。
*/
var stream = KafkaUtils.createDirectStream(ssc,
PreferConsistent, Subscribe[String, String](topicSeq, kafkaParams, fromOffsets))
参考
Spark Streaming和Kafka集成深入浅出。(一些概念的介绍很好,还有:反压)
Spark Streaming 使用 Kafka 保证数据零丢失。(太吊了!强烈推荐仔细看!)
反压
如果在一个 batch 内收到的消息比较多,这就需要为 executor 分配更多内存,可能会导致其他spark streaming应用程序资源分配不足,甚至有OOM的风险。特别是第一次启动应用程序,从earliest offset消费数据时,kafka保留的历史消息越多,数据处理时间也就越长。反压可以限制每个batch接收到的消息量,降低数据倾斜的风险,开启反压:
SparkConf.set("spark.streaming.backpressure.enabled", "true")
设置每个kafka partition读取消息的最大速率:
SparkConf.set("spark.streaming.kafka.maxRatePerPartition",
"spark.streaming.kafka.maxRatePerPartition")
这个值要结合 spark Streaming 处理消息的速率和 batch Duration,尽量保证读取的每个 partition 数据在 batch Duration 时间内处理完,这个参数需要不断调整,以做到尽可能高的吞吐量.
两种方式优缺点:
receiver 保证数据不丢失,但是可能数据会重复消费
Direct 相比 Receiver 模式而言能够确保机制更加健壮. 区别于使用 Receiver 来被动接收数据, Direct 模式会周期性地主动查询 Kafka, 来获得每个 topic+partition 的最新的 offset, 从而定义每个 batch 的 offset 的范围. 当处理数据的 job 启动时, 就会使用 Kafka 的简单 consumer api 来获取 Kafka 指定 offset范围的数据。
receiver 对于多个分区需要增加多个 recevier 然后 union 使用
Direct 不需要创建多个输入 DStream 然后对它们进行 union 操作. Spark会创建跟Kafka partition一样多的RDD partition, 并且会并行从Kafka中读取数据. 所以在Kafka partition 和 RDD partition 之间, 有一个一对一的映射关系