[译]Spark Streaming + Kafka集成指南

本文适用于Kafka broker 0.8.2.1及更高版本。

这里会说明如何配置Spark Streaming接收Kafka的数据。有两种方法 - 老方法使用Receiver和Kafka的高层API,新方法不适用Receiver。两种方法具有不同的编程模型,性能特点和语义保证,下面具体介绍。两种方法对于当前版本的Spark(2.1.1)都有稳定的API。

方法1:基于Receiver的方法

这个方法使用Receiver接收数据。Receiver使用Kafka的高层消费者API实现。和所有receiver一样,通过Receiver从Kafka接收的数据存储到Spark executor中,然后由Spark Streaming启动的作业处理这些数据。

但是,在默认配置下,这种方法会在出错时出现数据丢失(具体参见receiver reliability。为了保证零数据丢失,必须在Spark Streaming中额外启用Write Ahead Logs)。这样会同步保存所有接收到的Kafka数据到分布式文件系统(如HDFS)中,所有数据都可以从出错中进行恢复。

下面,讨论如何使用这种方法编写streaming应用程序。

  1. 链接:对于使用SBT/Maven工程定义的Scala/Java应用程序,需要将你的streaming应用程序链接到下面的artifact。
 groupId = org.apache.spark
 artifactId = spark-streaming-kafka-0-8_2.11
 version = 2.1.1

对于Python应用程序,必须字部署用用程序时添加上面的库及其依赖。参见下面的部署章节。

  1. 编程:在streaming应用程序代码中,引入KafkaUtils并创建一个输入DStream,如下。
 import org.apache.spark.streaming.kafka._

 val kafkaStream = KafkaUtils.createStream(streamingContext,
     [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])

需要记住的几点:

  • Kafka中的Topic分区和Spark Streaming中的RDD分区是不相关的。所以在KafkaUtils.createStream()增加指定topic分区数量只会增加单个receiver中消费topic的线程数量。不会增加Spark处理数据的并行性。
  • 对于不同group和topic可以创建多个Kafka输入DStream,使用多个receiver并行接收数据。
  • 如果已经启用了Write Ahead Logs,接收的数据会被复制到日志中。因此,需要将输入流的存储级别设置为StorageLevel.MEMORY_AND_DISK_SER(即KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER))。
  1. 部署:对于任何Spark应用程序,spark-submit用于启动应用程序。但是,对于Scala/Java应用程序和Python应用程序有些不同。

对于Scala和Java应用程序,如果使用了SBT或者Maven管理项目,则会将spark-streaming-kafka-0-8_2.11及其依赖打包到应用程序JAR包中。确保spark-core_2.11spark-streaming_2.11标记为provided,它们在Spark安装包中已经存在了。然后使用spark-submit启动应用程序。

对于Python应用程序缺少了SBT/Maven项目管理,需要将spark-streaming-kafka-0-8_2.11及其依赖直接添加到spark-submit,使用--packages(具体参见Application Submission Guide)。如下:

./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.1 ...

另外,也可以从Maven repository下载spark-streaming-kafka-0-8-assembly的JAR包,然后使用--jars添加到spark-submit

方法2:直接方法(不适用Receiver)
这种新方法从Spark 1.3开始支持,具有更强的端到端保证。和使用receiver接收数据不同,这种方法会周期性地查询Kafka每个topic+partition最新的偏移量,然后根据定义的偏移量范围在每个批次中处理数据。当处理数据的作业启动后,Kafka的简单消费者API会被用来读取定义偏移量范围的数据(和从文件系统中读取文件类似)。注意,这个特性是从Spark 1.3开始支持Scala和Java API,从Spark 1.4开始支持Python API。

这种方法相比于基于receiver的方法具有以下优势:

  • 简化并行:不需要创建多个输入Kafka流,然后合并它们。使用directStream,Spark Streaming会创建和Kafka分区一样多的RDD分区进行消费,会并行读取Kafka的数据。所以Kafka分区和RDD分区会有一一对应,更容易理解和使用。
  • 效率:方法1中实现零数据丢失需要将数据存储到Write Ahead Log,这会复制一遍数据。这实际上是低效的,因为数据复制了两次,一次是Kafka,一次是Write Ahead Log。方法2解决了这个问题,因为没有receiver,也就不需要Write Ahead Logs。只要有足够的Kafka缓冲,可以从Kafka恢复消息。
  • 只有一次语义:方法1使用Kafka的高层API在Zookeeper中存储消费的偏移量。这是从Kafka消费数据的传统方法。虽然这种方法(结合write ahead logs)可以保证零数据丢失(即至少一次语义),但是还是会有一些情况会在出错时导致一些记录被消费两次。这是因为Spark Streaming接收数据和Zookeeper跟踪的偏移量不一致导致的。因此,在方法2中,使用了简单Kafka API不适用Zookeeper。偏移量是在Spark Streaming的检查点中跟踪的。这就消除了Spark Streaming和Zookeeper/Kafka的不一致,每条记录都只会被Spark Streaming接收一次,即便在出错的情况下。为了实现结果输出的只有一次语义,数据存到外部存储的输出操作必须是幂等的,或是保存结果和偏移量的原子事务。

注意,这种方法的一个劣势是不在Zookeeper中更新偏移量,因此基于Zookeeper的Kafka监控工具就无法显示进度。但是,可以在每个批次中访问偏移量,然后自己更新到Zookeeper中。

下面,讨论如何使用这种方法编程。

  1. 链接:这种方法只有Scala/Java应用程序支持。SBT/Maven工程链接下面的artifact。
 groupId = org.apache.spark
 artifactId = spark-streaming-kafka-0-8_2.11
 version = 2.1.1
  1. 编程:在streaming应用程序代码中,引入KafkaUtils然后创建一个输入DStream,如下。
 import org.apache.spark.streaming.kafka._

 val directKafkaStream = KafkaUtils.createDirectStream[
     [key class], [value class], [key decoder class], [value decoder class] ](
     streamingContext, [map of Kafka parameters], [set of topics to consume])

可传递messageHandlercreateDirectStream,用于访问包含当前消息元数据的MessageAndMetadata并转为想要的格式。可参见API docsexample

在Kafka参数中,必须指定metadata.broker.listbootstrap.servers。默认地,会从每个Kafka分区的最近偏移量开始消费。如果设置了配置auto.offset.resetsmallest,则会从最小的偏移量开始。

也可以从任意偏移量开始消费,使用其它KafkaUtils.createDirectStream变量。另外,如果想要访问每个批次范根的Kafka偏移量,方法如下。

 // Hold a reference to the current offset ranges, so it can be used downstream
 var offsetRanges = Array.empty[OffsetRange]

 directKafkaStream.transform { rdd =>
   offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
   rdd
 }.map {
           ...
 }.foreachRDD { rdd =>
   for (o <- offsetRanges) {
     println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
   }
   ...
 }

如果想要使用基于Zookeeper的Kafka监控工具显示streaming应用程序的过程,可使用上述代码自己更新偏移量的信息到Zookeeper。

注意,HasOffsetRanges只在directKafkaStream调用的第一个方法中可以成功获取。可以使用transform()不用foreachRDD()作为第一个方法调用以便访问偏移量,然后再调用更多Spark方法。但是,需要意识到RDD分区和Kafka分区的一一对应关系在调用了shuffle或者repartition方法(如reduceByKey()或window())后就不存在了。

另外需要注意的是,由于这种方法不使用Receiver,标准receiver(spark.streaming.receiver.*配置相关)不能应用于这里的输入DStream。相反,使用spark.streaming.kafka.*配置。非常重要的一个是spark.streaming.kafka.maxRatePerPartition设置每个Kafka分区通过直接API读取的最大速率(每秒钟的记录数)。

  1. 部署:和方法1一样的。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,634评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,951评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,427评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,770评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,835评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,799评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,768评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,544评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,979评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,271评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,427评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,121评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,756评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,375评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,579评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,410评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,315评论 2 352

推荐阅读更多精彩内容