Spark Streaming 对接 Kafka 的那些事儿

1. Background

Kafka 作为 Spark Streaming 数据的最重要的来源,官方为此专门提供了二者整合的 jar 包。这使得我们可以很方便的对接二者,但是其中的细节还是很繁杂的。了解其中的原理对于理解 Spark 和 Kafka 都有很大帮助,也是我们日后进行调优的基石。

2. Basic

2.1 Spark Streaming 如何对接 Kafka?

对于 Kafka 的版本来说,有两个重要节点:0.8.2 和 0.10。也就意味着 Spark 官方提供的整合 jar 包有两个重要节点版本:spark-streaming-kafka-0-8 和 spark-streaming-kafka-0-10。0.8.2 可以通过 receiver based 或者 dirct 的方式进行。0.10 则只支持 dircet 方式的对接。注意在 Spark 2.3 以后已经将 Kafka 0.8 版本的支持标为过时。

2.2 我们谈论 RDD 的时候是在谈论什么?

大家都知道,Spark Streaming 是基于 DStream 展开的,通过 action 操作触发执行,并在设定的时间间隔汇集多个 RDD 组成 DStream,最终封装成 task 到 executor 调度。是一种批处理,伪实时的计算框架。所以 Spark Streaming 底层是 Spark Core,而 Spark Core 的核心又是 RDD。那么这么抽象的 RDD 到底表示了什么?除了源码注释中的五大特性外,RDD 中包含了数据吗?答案显然是不包含的。比如 textfile 生成的 RDD,实际上他是文件位置的描述。KafkaRDD 生成的时候实际上是指定了一定范围的 offset。BlockRDD 实际上存储的是 blockId,即数据在 blockmanager 的标识符。之所以不包含数据,是因为大数据量的数据都是从 driver 端发送的话,那么 driver 的压力会很大,而且存在单点故障问题。刚刚提到的 BlockRDD 和 KafkaRDD 分别对应了 Spark Streaming 以 receiver based 和 direct 对接的方式产生的 RDD。因为 BlockRDD 是基于 receiver 的,所以他的分区数和 Kafka topics 的分区是没有任何关系的,BlockRDD 一个分区对应的是一个 block,而一个 block 是 receiver 每隔 200ms(默认值) 生成的。而 KafkaRDD 则是直接对接 Kafka,他的分区和 Kafka topics 的分区是一一对应的。前面说所有的 RDD 都不包含数据也是有特例的,parallelize 方式从内存中直接生成的 RDD 会将数据从 driver 发送到 executor,可以理解为是包含数据的,而不再是一种描述/标识符。

2.3 High level API & Simple consumer

在 Kafka 中有两种类型的消费者,一种是具有消费者组的概念可以自己管理组内 rebalance,并且管理 offset 的我们称之为 High level API;另一种是需要调用者自己管理 offset 并且不具备消费者组概念的我们称之为 Simple consumer。后面版本又出了 new consumer 整合了二者,这里不做过多介绍。

2.4 WAL(write-ahead logs)

以下截选自官网:

Configuring write-ahead logs - Since Spark 1.2, we have introduced write-ahead logs for achieving strong fault-tolerance guarantees. If enabled, all the data received from a receiver gets written into a write-ahead log in the configuration checkpoint directory. This prevents data loss on driver recovery, thus ensuring zero data loss.It is recommended that the replication of the received data within Spark be disabled when the write-ahead log is enabled as the log is already stored in a replicated storage system.

WAL 即预写日志,通过将数据和计算进程等信息保存在磁盘上,当集群出现故障进行恢复时,可以恢复到故障前的状态。如果 Spark 开启了 WAL,建议将 Spark 中的多副本关闭,因为数据已经通过 WAL 保存在了可靠的存储中。checkpoint 和 WAL 什么关系?初学者可能会混淆二者的概念,checkpoint 根据在不同的语境有不同的含义,比如在 Spark Streaming 中的注释是:

Set the context to periodically checkpoint the DStream operations for driver fault-tolerance.

而 RDD 中的 checkpoint 则要复杂一些,具体可以参考之前的这篇文章。而 WAL 则是依赖 checkpoint 实现的,当然我们可以修改源码通过其他的方式实现 WAL。

2.5 Kakfa offset

虽然这是 Kafka 的知识,但是 Spark Streaming 毕竟对接的是 Kakfa,所以一些基本的概念还要清楚的。首先看看什么是 offset:

用来唯一的标识 kafka topic 中分区的每一条记录。

再来看看 committed offset:

记录 consumerGroup 在指定(topic,partition)的消费记录。
每一个(group,topic,partition)确定一个 commited offset

offset 可以理解为书的页码。而 committed offset 可以理解为书签,这个书签一定是针对某一个人(对应 kafka 的消费者组)的,表示了某个人对某本书的阅读进程。

committed offset 可以用持久化的方式保存在任何地方。比如 Zookeeper、Kakfa、甚至直接保存在 HDFS 中。以 Zookeeper 为例,保存的路径是 /consumers/xx-consumer_group/offsets/xx-topic/xx-partition/。例如名字为"console-consumer-57704"的消费者组在 topic 名为 test 上的 0 号分区的 committed offset 可以通过 get /consumers/console-consumer-57704/offsets/test/0 来获取。

我们前面说 receiver based 的对接方式消费 kafka 数据使用的是 High level api,框架帮我们管理 offset( 保存在 Zookeeper),是具有消费者组的概念的。也即是多个 receiver 可以位于同一个消费者组,共同完成一份数据的消费。对于 direct 模式,我们在使用 simple consumer(082版本) or new consumer(010版本) 时引入了一个 OffsetRange 的概念,这个类有四个变量:topic, partition, fromOffset, untilOffset,可以看到其中是没有消费者组的,即单纯的表示了 Kafka 中某个 topic-partition 的一段消息。类比之前书的例子,则单纯的代表了某本书的某几页,和任何读者是无关的。

2.6 metadata.broker.list & bootstrap.servers & zookeeper

在 Spark Streaming 对接 Kafka 时,经常会遇到这三个参数。首先可以简单将 bootstrap.servers 理解为新版本中的 metadata.broker.list,新版本中已经将 metadata.broker.list标记为过时。那么 bootstrap.servers和 zookeeper 参数之间有什么差别呢?首先需要知道的是,二者都是为了保存 Kakfa 中一些重要信息,如元数据信息和消费的 offset 信息。旧版本中这些信息都保存在了 Zookeeper 中,如获取 "first" 的元数据信息kafka-topics.sh --zookeeper hadoop102:2181 --desc --topic first,结果如下:

​ Topic:first PartitionCount:3 ReplicationFactor:3 Configs:
​ Topic: first Partition: 0 Leader: 102 Replicas: 102,103,104 Isr: 102,103,104
​ Topic: first Partition: 1 Leader: 103 Replicas: 103,104,102 Isr: 102,103,104
​ Topic: first Partition: 2 Leader: 104 Replicas: 104,102,103 Isr: 104,102,103

这些信息我们可以通过 Zookeeper 客户端获取:以 2 号分区为例:get /brokers/topics/first/partitions/2/state

{"controller_epoch":26,"leader":104,"version":1,"leader_epoch":54,"isr":[104,102,103]}

而在新版本中,这些信息都存在了 Kakfa broker 中。

2.7 receiver reliability

以下内容来自官网:

Reliable Receiver: A reliable receiver correctly sends acknowledgment to a reliable source when the data has been received and stored in Spark with replication.

Unreliable Receiver:An unreliable receiver does not send acknowledgment to a source. This can be used for sources that do not support acknowledgment, or even for reliable sources when one does not want or need to go into the complexity of acknowledgment.

可以看到,receiver 是否可靠取决于 receiver 是否发送 ACK 给数据源。

3. Deep

3.1 Receiver-based Approach

关于 receiver based 方式需要知道的事儿:

  • 工作流程:driver 调度 receiver 到 executor 端启动,receiver 会持续不断从 kafka 中接收数据并存放到 BlockManager 中。生成 job 时会将该批次的所有 block 组装起来生成 BlockRDD。一个 block 对应一个 task。block/task 的个数有两个参数决定,一个是 Spark Streaming 程序一个批次的间隔(batchDuration),另一个是 receiver 生成一个 block 的时间周期(spark.streaming.blockInterval默认值是 200ms )。比如批次间隔是 2s,block 周期默认 200ms,那么一个批次将产生 10 个 block/task。即生成的一个 BlockRDD 具有 10 个分区,分区数量决定了处理数据的并行度/效率:如果通过设置参数导致分区数多小,将无法充分利用集群资源。但分区数也不能过多,生成 block 的周期不应小于 50 ms,任务调度所占用的时间比重将过大。

  • Kafka topic 的分区和 Spark Streaming 生成的 BlockRDD 的分区不是一一对应的关系。通过 KafkaUtils.createStream()创建的 ReceiverInputDStream中有一个参数topics: Map[String, Int],key 表示 topic_name,value 表示 numPartitions。通过增加 numPartitions 的数量,只是单纯增加了消费者组中消费者的个数,并不能增加 Spark 接收/处理数据的并行度。若想增加接收数据的并行度应该增加 receiver 的个数,通过调用多次KafkaUtils.createStream()来创建多个 receiver 和 DStream,然后用 StreamingContext.union(streams: Seq[DStream[T]])来合并多个 DStream,以此来增加接收数据的并行度。

  • 每个 receiver 需要占用一个 cpu,所以在本地模式下,不要使用 local[1] 这种方式;在集群模式下,总核数要大于总receiver 的个数

  • 如果开启了 WAL 机制,那么创建的 DStream 的存储级别应该设置为单副本:KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)

  • receiver 接收的 block 会放入blockmananger,每个 executor 都会有一个 blockmanager 实例,由于数据的本地性,receiver 所在的 executor 会被调度执行更多的 task,就会导致其他某些 executor 比较空闲。可以通过1.增加 receiver 2.repartition 增加分区 3.调小参数spark.locality.wait(How long to wait to launch a data-local task before giving up and launching it on a less-local node) 来缓解。

创建入口:

def createStream(
      ssc: StreamingContext,
      zkQuorum: String,
      groupId: String,
      topics: Map[String, Int],
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
    ): ReceiverInputDStream[(String, String)] = {
    val kafkaParams = Map[String, String](
      "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
      "zookeeper.connection.timeout.ms" -> "10000")
    createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics, storageLevel)
  }

def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag](
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      topics: Map[String, Int],
      storageLevel: StorageLevel
    ): ReceiverInputDStream[(K, V)] = {
    val walEnabled = WriteAheadLogUtils.enableReceiverLog(ssc.conf)
    new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel)
  }


KafkaUtils 提供了两个入口供我们创建 ReceiverInputDstream ,第一个简易版的默认消息的 KV 都是 String 类型,并且持久化级别是 MEMORY_AND_DISK_SER_2,不支持传递 Kakfa 配置参数。第二个则支持配置所有参数。下面的代码演示中 No WAL 因为要设置auto.commit.interval.ms参数,所以使用了第二种方式创建。 With WAL 则使用了第一种简易版的入口。

3.1.1 No WAL

object ReceiverWithoutWAL {

  def main(args: Array[String]): Unit = {

    val sparkConf: SparkConf = new SparkConf()
      .setAppName("ReceiverWithoutWAL")
      .setMaster("local[*]")

    val sc = new SparkContext(sparkConf)

    val kafkaParams: Map[String, String] = Map[String, String](
      "zookeeper.connect" -> "hadoop102:2181",
      "group.id" -> "consumer-group_receiver_no_wal",
      "zookeeper.connection.timeout.ms" -> "10000",
      "auto.commit.interval.ms" -> "1000")

    val ssc = new StreamingContext(sc, Seconds(10))
    val topicMap = Map("topic_receiver_no_wal" -> 1)

    val lines = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2).map(_._2)

    lines.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

这里将 offset 自动保存在 Zookeeper 中,并且自动提交 offset 时间间隔设置为 1s,Spark Streaming 批次间隔为 10s。通过测试发现,提交 offset 后没有处理数据(没到下一个批次的时间间隔),此时 driver 挂掉 --> receiver 底层存储依赖的BlockManager 挂掉 --> 存储的数据丢失。重启后去按照最新 offset 去 kafka 拉数据造成上一批数据丢失未处理。p.s. 在 Idea 中直接点击停止终止程序时,无论是否到达 offset 提交周期,都会自动提交 offset 再关闭程序;而在控制台中直接 kill 掉 jvm 进程则不会自动提交 offset。

3.1.2 With WAL

object ReceiverWithWAL {

  def createSSC(): StreamingContext = {
    val sparkConf: SparkConf = new SparkConf()
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
      .setAppName("ReceiverWithWAL")
      .setMaster("local[*]")

    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(5))
    ssc.checkpoint("/ch")
    val topicMap = Map("topic_receiver_with_wal" -> 1)
    val zkQuorum = "hadoop102:2181"
    val groupId = "consumer-group_receiver_with_wal"
    val lines: DStream[String] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topicMap, StorageLevel.MEMORY_AND_DISK_SER).map(_._2)

    lines.print()
    ssc
  }


  def main(args: Array[String]): Unit = {
    val ssc = StreamingContext.getActiveOrCreate("file:///Users/tianciyu/Desktop/ch", () => createSSC())

    ssc.start()
    ssc.awaitTermination()
  }

}

这里通过配置spark.streaming.receiver.writeAheadLog.enable参数开启了 WAL,并通过 getActiveOrCreate都方式获取 StreamingContext,这样就可以在发生故障后,通过 WAL 从设置的 checkpoint 目录恢复计算和数据。注意因为开启了 WAL,所以将持久化级别的副本数设置为了1个。

以下截取自源码:

val walEnabled = WriteAheadLogUtils.enableReceiverLog(ssc.conf)

def getReceiver(): Receiver[(K, V)] = {
    if (!useReliableReceiver) {
      new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
    } else {
      new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
    }
}

其中 useReliableReceiver 是通过 walEnabled 赋值的。可以看到源码中通过读取spark.streaming.receiver.writeAheadLog.enable参数来决定是否使用 ReliableReceiver。而在 ReliableReceiver 中会将auto.commit.enable设置为 false。目的是写入 WAL 预写日志后再更新 offset。此时没有处理数据 driver 挂掉,重启后从 checkpoint 中恢复状态并从 WAL 拉取数据,成功处理上一批数据防止了数据丢失(没有 WAL 则 receiver 接收的数据只存储在 BlockManager 中)。WAL 的本质是先处理数据再更新 offset,这里的处理数据对框架来说便是将数据持久化保存在可靠存储。

3.2 Direct Approach

关于 direct 方式对接 Kafka 需要知道的事儿:

  • 没有 receiver,少占用一个cpu核

  • 不再需要 receiver 接收数据,写入 blockManager,运行时再通过 blockId 取数据。没有多余的网络传输、磁盘读取来获取数据的过程。而是采用 simple consumer(082版本) / new consumer(010版本) 的方式通过每个批次对应的 OffsetRange 直接从 Kafka 中读取数据,提高了效率。

  • DirectKafkaInputDStream 生成的 RDD 不再是 BlockRDD,而是KafkaRDD 。KafkaRDD 的分区和 Kafka topic 的分区一一对应,更便于并行度的调优。对比基于 receiver 的方式,则需要多次创建 ReceiverInputDStream 然后进行 union。

  • 无需 WAL。因为直接从 Kakfa 中取数据,所以 driver 挂了并不会造成数据丢失。对比 receiver based 方式,因为 receiver 的存在,虽然数据最初来源自可靠存储 Kafka 中,但是加了一层 receiver ,想保证数据 0 丢失仍然需要 WAL 配合。

  • 不再使用 Zookeeper 存储 offsets。

    对于 082 版本来说:

    Hence, in this second approach, we use simple Kafka API that does not use Zookeeper. Offsets are tracked by Spark Streaming within its checkpoints.

    offset 默认是保存在 checkpoint 中,所以必须配置 checkpoint,否则每次重启将触发 auto.offset.reset 逻辑。

    对于 010 版本来说:

    Kafka has an offset commit API that stores offsets in a special Kafka topic. By default, the new consumer will periodically auto-commit offsets.

    offset 默认是保存在 Kafka 一个特殊的 topic 中(__consumer_offsets)。这个无需配置。

  • 可以通过手动维护 offset,实现精准一次消费语义(Exactly-once semantics)

3.2.1 kafka version: 0.8.2

/** 
*       Requires "metadata.broker.list" or "bootstrap.servers"
*   to be set with Kafka broker(s) (NOT zookeeper servers), specified in
*   host1:port1,host2:port2 form.
*   If not starting from a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest"
*   to determine where the stream starts (defaults to "largest")
*/

def createDirectStream[
    K: ClassTag,
    V: ClassTag,
    KD <: Decoder[K]: ClassTag,
    VD <: Decoder[V]: ClassTag] (
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      topics: Set[String]
  )

首先看一下入口函数,注释中有两点需要注意:1. 需要配置metadata.broker.list 或者 bootstrap.servers。 2.如果没有配置 checkpoint,则由auto.offset.reset参数决定每次启动时从哪里读取数据。

这里以配置了 checkpoint 为例,写一个 quick start demo:

object DirectApproach082 {


  def createSSC(): StreamingContext = {
    val conf: SparkConf = new SparkConf().setAppName("KafkaStreamingApp").setMaster("local[*]")
    val ssc = new StreamingContext(conf,Seconds(3))
    ssc.checkpoint("file:///Users/tianciyu/Desktop/ch")
    val bootstrapServer:String = "hadoop102:9092,hadoop103:9092,hadoop104:9092"
    val group:String = "consumer-group_direct_082"
    val deserializer:String = "org.apache.kafka.common.serialization.StringDeserializer"
    val topic:String = "topic_direct_082"

    val paramsMap: Map[String, String] = Map[String,String] (
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServer,
      ConsumerConfig.GROUP_ID_CONFIG -> group,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> deserializer,
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> deserializer
    )
    val lines: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc,
      paramsMap,
      Set(topic))

    lines.print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    val ssc: StreamingContext = StreamingContext.getActiveOrCreate("file:///Users/tianciyu/Desktop/ch", ()=>createSSC())
    ssc.start()
    ssc.awaitTermination()
  }

}

我们也可以在处理每个批次时,获取当前消费的 offset:

...

lines.transform { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }.foreachRDD { rdd =>
      for (o <- offsetRanges) {
        println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
      }
    }

...

这里有两点需要注意:1. rdd.asInstanceOf[HasOffsetRanges]操作只在 Dstream 的第一个方法调用中有效,如果需要的话一般将transform()作为第一个调用。 2. KafkaRDD 的分区和 Kakfa topic 中的分区一一对应是在初始阶段,如果后续使用了 shuffle 算子如 reduceByKey(),repartition()等操作将不再一一对应。

3.2.2 kafak version: 0.10

对于 0.10 版本而言,多了两个参数。

@param locationStrategy In most cases, pass in LocationStrategies.preferConsistent,
@param consumerStrategy In most cases, pass in ConsumerStrategies.subscribe

我们分别传入最常用的 preferConsistent 和 subscribe 即可。其他和 0.82 版本类似:

...

val topicsSet = "topic_direct_082".split(",").toSet
val kafkaParams = Map[String, Object]("bootstrap.servers" -> "hadoop102:9092",
                                      "key.deserializer"->classOf[StringDeserializer],
                                      "value.deserializer"-> classOf[StringDeserializer],
                                      "group.id"->"consumer-group_direct_010")

val messages = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

...

还有一点与 0.8.2 不同的是 offset 的保存方式。0.8.2 保存在 checkpoint 中(如不设置每次都会触发auto.offset.reset逻辑)。而在 0.10 中,offset 是保存在 kafka 的 topic:__consumer_offsets 里面,定期的自动提交。

3.3 Offset Management

部分内容已经在前面提及,这里做一下汇总和补充。

3.3.1 自动提交 offset

通过配置 enable.auto.commit 为 true 并结合 auto.commit.interval.ms 提交间隔进行自动提交的设置。

其中 receiver based 方式会将 offset 自动提交到 Zookeeper。direct 方式根据版本不同提交的位置不同,0.8.2 版本是自动保存在 checkpoint 中,所以需要手动配置 checkpoint ,否则无效。 0.10 版本自动提交到 Kakfa broker 中的名为 __consumer_offsets 的 topic 里。这种方式无需另外配置。

3.3.2 手动提交 offset

从上面自动提交可以发现,offset 存放的地方至少有三种:Zookeeper、checkpoint、Kafka。所以手动提交一样可以将 offset 提交到这三个地方,其中 checkpoint 并不推荐,原因是 checkpoint 自身具有缺陷,比如无法更新代码等原因。这里以 Zookeeper 的方式为例进行演示:

package kafka2ss.direct

import kafka.common.TopicAndPartition
import kafka.consumer
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaCluster.Err
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaCluster, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.immutable.HashMap


object ManualOffsetZK {

  def getKafkaStream(kafkaParams: Map[String, String], group: String, ssc: StreamingContext, topics: String*): InputDStream[(String, String)] = {
    val kafkaCluster = new KafkaCluster(kafkaParams)
    var kafkaStream: InputDStream[(String, String)] = null
    val partitionsE: Either[Err, Set[TopicAndPartition]] = kafkaCluster.getPartitions(topics.toSet)
    if (partitionsE.isLeft) throw new SparkException("get kafka partition failed:")

    val partitions: Set[TopicAndPartition] = partitionsE.right.get
    //从zookeeper中获取offset信息
    val offsetsE: Either[Err, Map[TopicAndPartition, Long]] = kafkaCluster.getConsumerOffsets(group, partitions)
    //如果在zookeeper中没有记录,就从最小的offset开始消费
    if (offsetsE.isLeft) {
      kafkaParams + ("auto.offset.reset" -> "smallest")
      kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics.toSet)
    } else {
      val offsets: Map[TopicAndPartition, Long] = offsetsE.right.get

      val earliestOffsets: Map[TopicAndPartition, KafkaCluster.LeaderOffset] = kafkaCluster.getEarliestLeaderOffsets(partitions).right.get
      var newConsumerOffsets: HashMap[TopicAndPartition, Long] = HashMap()

      offsets.foreach((f: (TopicAndPartition, Long)) => {
        val min: Long = earliestOffsets(f._1).offset
        //如果zookeeper中记录的offset在kafka中不存在(已经过期),就指定其现有kafka的最小offset位置开始消费
        newConsumerOffsets += (f._1 -> Math.max(f._2,min))
      })

      kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
        ssc, kafkaParams, newConsumerOffsets, (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))

    }

    kafkaStream
  }

  def saveOffset(kafkaCluster: KafkaCluster, group: String, kafkaDStream: InputDStream[(String,String)]): Unit = {

    kafkaDStream.map(_._2).foreachRDD {
      rdd: RDD[String] => {
        val offsetRanges: HasOffsetRanges = rdd.asInstanceOf[HasOffsetRanges]
        val ranges: Array[OffsetRange] = offsetRanges.offsetRanges
        var topicAndPartitionToOffset = new HashMap[TopicAndPartition, Long]()
        for (range <- ranges) {
          topicAndPartitionToOffset += (range.topicAndPartition() -> range.untilOffset)
        }
        kafkaCluster.setConsumerOffsets(group, topicAndPartitionToOffset)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("KafkaLowStreamingApp").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(3))
    val bootstrapServer: String = "hadoop102:9092"
    val group: String = "consumer_group_offset_management_zk"
    val deserializer: String = "org.apache.kafka.common.serialization.StringDeserializer"
    val topic: String = "topic_offset_management_zk"

    val paramsMap: Map[String, String] = Map[String, String](
      "zookeeper.connect" -> "hadoop102:2181,hadoop103:2181,hadoop104:2181",
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServer,
      ConsumerConfig.GROUP_ID_CONFIG -> group,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> deserializer,
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> deserializer
    )

    val kafkaCluster = new KafkaCluster(paramsMap)
    val kafkaDStream: InputDStream[(String, String)] = getKafkaStream(paramsMap,group,ssc,topic)

    //消费
    kafkaDStream.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print
    saveOffset(kafkaCluster, group, kafkaDStream)

    ssc.start()
    ssc.awaitTermination()

  }

}

3.4 Exactly-once semantics

在讲解如何实现 Exactly-once 语义前,先了解以流处理系统中语义的概念。

The semantics of streaming systems are often captured in terms of how many times each record can be processed by the system. There are three types of guarantees that a system can provide under all possible operating conditions (despite failures, etc.)

At most once: Each record will be either processed once or not processed at all.
At least once: Each record will be processed one or more times. This is stronger than at-most once as it ensure that no data will be lost. But there may be duplicates.
Exactly once: Each record will be processed exactly once - no data will be lost and no data will be processed multiple times. This is obviously the strongest guarantee of the three.

首先要明确的是,所谓语义一定是和故障恢复关联的,如果不考虑故障的话,全都是 exactly-once。那么如何能做到所谓的精确一次消费呢?

If a streaming application has to achieve end-to-end exactly-once guarantees, then each step has to provide an exactly-once guarantee. That is, each record must be received exactly once, transformed exactly once, and pushed to downstream systems exactly once. Let’s understand the semantics of these steps in the context of Spark Streaming.

如果保证端到端的精确一次消费,需要满足三个过程都是精确一次消费。结合语义和故障恢复的强关联,换句话说,所谓 Exactly once 就是三个过程的容错性保证

  1. 接收数据:

If all of the input data is already present in a fault-tolerant file system like HDFS, Spark Streaming can always recover from any failure and process all of the data.This gives exactly-once semantics, meaning all of the data will be processed exactly once no matter what fails.

  • 如果数据源来自 HDFS 这种支持容错的存储系统的文件,那么该过程可以保证精确一次语义。官网这段话指出了保证接收数据时精确一次消费的要点:容错性。需要注意这里的容错性隐性包含了:原数据端必须支持随机读取。比如通过控制台 socket 作为原数据端无法保证 exactly-once,最核心的问题是在故障恢复的时候没办法读取之前的数据。
  • receiver based 对接 Kafka 的方式,只能保证最少一次语义
  • direct 模式对接 Kafka 可以保证精确一次

关于 receiver based 方式只能保证一次语义,direct 可以保证精确一次的官网解释如下:

There is a small chance some records may get consumed twice under some failures. This occurs because of inconsistencies between data reliably received by Spark Streaming and offsets tracked by Zookeeper. Hence, in this second approach, we use simple Kafka API that does not use Zookeeper. Offsets are tracked by Spark Streaming within its checkpoints. This eliminates inconsistencies between Spark Streaming and Zookeeper/Kafka, and so each record is received by Spark Streaming effectively exactly once despite failures.

这段话理解起来有一定难度,难点在于理解什么是 Spark Streaming 数据的接收与 Zookeeper 维护 offset 的不一致性。因为要保证最少一次或者精确一次的前提是不能丢数据,所以 receiver based 肯定是开启了 WAL 机制的。设想这样一种场景:receiver 从 Kakfa 拉数据并写入 WAL 后,没来得及更新 offset 时,receiver 挂掉。恢复后,因为 WAL 的机制,Spark Streaming 会从 WAL 中恢复计算和数据,消费了之前的数据,但是 offset 是没有更新的,所以拉取数据时会重复拉取上一批数据并二次消费。而 direct 模式消除了这种不一致性,数据的接收和 offset 的维护都是 Spark Streaming 自己负责。

  1. 处理数据:因为底层依赖的 RDD 的各种容错机制,可以在处理数据过程中即使发生错误也能保证精确一次消费。具体可以参考这篇文章:spark RDD 容错

  2. 输出结果

  • offset 和结果放在不同位置,根据二者执行的先后顺序有 at least once 和 at most once
  • offset 和结果放在一个(repartition(1))事务中 --> exactly-once

4. Ref

  1. http://spark.apache.org/docs/2.4.2/streaming-programming-guide.html

  2. http://spark.apache.org/docs/2.4.2/streaming-kafka-0-8-integration.html

  3. http://spark.apache.org/docs/2.4.2/streaming-kafka-0-10-integration.html

  4. What is the difference between simple consumer and high level consumer?

  5. https://www.slideshare.net/QuentinAmbard/exactly-once-with-spark-streaming

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,362评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,330评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,247评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,560评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,580评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,569评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,929评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,587评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,840评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,596评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,678评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,366评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,945评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,929评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,165评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,271评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,403评论 2 342