Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)

原文链接

翻译者:倪辰皓

最后翻译时间:2017/06/18

目前状态:已完成

转载请附上本文链接: http://www.jianshu.com/p/669294ea8dbd


The Spark Streaming integration for Kafka 0.10 is similar in design to the 0.8 Direct Stream approach. It provides simple parallelism, 1:1 correspondence between Kafka partitions and Spark partitions, and access to offsets and metadata. However, because the newer integration uses the new Kafka consumer API instead of the simple API, there are notable differences in usage. This version of the integration is marked as experimental, so the API is potentially subject to change.

对于kafka 0.10的Spark Streaming集成和0.8版本的Direct Stream approach在设计上是相似的。它提供简单的并行化(parallelism),kafka partitions与Spark partitions1:1的对应,以及获取offsets和元数据(metadata)。但是,因为新的集成使用new Kafka consumer API替换了简单API,所以在使用上有着显著的一些不同。这个集成的版本被标记为实验性的,所以这个API可能会调整。

Linking

For Scala/Java applications using SBT/Maven project definitions, link your streaming application with the following artifact (see Linking section in the main programming guide for further information).

对于使用SBT/Maven project definitions的Java/Scala程序,使用以下artifact 来连接到你的streaming应用(see Linking section in the main programming guide for further information).

groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-10_2.11
version = 2.1.1

Creating a Direct Stream

Note that the namespace for the import includes the version, org.apache.spark.streaming.kafka010

注意,import的命名空间包括了版本,org.apache.spark.streaming.kafka010

import java.util.*;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import scala.Tuple2;

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);

Collection<String> topics = Arrays.asList("topicA", "topicB");

final JavaInputDStream<ConsumerRecord<String, String>> stream =
  KafkaUtils.createDirectStream(
    streamingContext,
    LocationStrategies.PreferConsistent(),
    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
  );

stream.mapToPair(
  new PairFunction<ConsumerRecord<String, String>, String, String>() {
    @Override
    public Tuple2<String, String> call(ConsumerRecord<String, String> record) {
      return new Tuple2<>(record.key(), record.value());
    }
  })

For possible kafkaParams, see Kafka consumer config docs. If your Spark batch duration is larger than the default Kafka heartbeat session timeout (30 seconds), increase heartbeat.interval.ms and session.timeout.ms appropriately. For batches larger than 5 minutes, this will require changing group.max.session.timeout.ms on the broker. Note that the example sets enable.auto.commit to false, for discussion see Storing Offsets below.

如果需要了解更多的kafka参数,请查看Kafka consumer config docs.如果你的Spark batch duration比默认的 Kafka heartbeat session timeout (30 seconds)更大,需要合适地增加heartbeat.interval.ms以及session.timeout.ms.对于batches大于5分钟的,这将需要改变broker上的group.max.session.timeout.ms.注意,例子中设置了enable.auto.commit为false,关于这里的讨论,请参见下面的 Storing Offsets.

LocationStrategies

The new Kafka consumer API will pre-fetch messages into buffers. Therefore it is important for performance reasons that the Spark integration keep cached consumers on executors (rather than recreating them for each batch), and prefer to schedule partitions on the host locations that have the appropriate consumers.

In most cases, you should use LocationStrategies.PreferConsistent as shown above. This will distribute partitions evenly across available executors. If your executors are on the same hosts as your Kafka brokers, use PreferBrokers, which will prefer to schedule partitions on the Kafka leader for that partition. Finally, if you have a significant skew in load among partitions, use PreferFixed. This allows you to specify an explicit mapping of partitions to hosts (any unspecified partitions will use a consistent location).

The cache for consumers has a default maximum size of 64. If you expect to be handling more than (64 * number of executors) Kafka partitions, you can change this setting via spark.streaming.kafka.consumer.cache.maxCapacity

The cache is keyed by topicpartition and group.id, so use a separate group.id for each call to createDirectStream.

新的Kafka consumer API 将会预提取消息到缓存中。因此,为了性能的缘故,很重要的是Spark集成将缓存的消费者保留在了executors(而不是为每一个batch重复创建它们),并且推荐在那些有合适的消费者的host locations调度partition.

在大多数的情况下,你应该使用LocationStrategies.PreferConsistent,正如上面显示的那样。这将会均匀的分布partition给可获取的executors.如果你的executors和你的kafka brokers在相同节点,使用PreferBrokers,这将更优先在kafka leader上调度partitions.最后,如果你在partitions的load有一个有意义的倾斜,使用PreferFixed. 这将允许你制定一个显示的,partitions到hosts的映射关系(所以没有指定的分区将会使用一个一致的位置)。

默认的最大的消费者的缓存的大小是64。如果你希望去处理更多(64 * number of executors)kafka分区,通过设置spark.streaming.kafka.consumer.cache.maxCapacity, 你可以改变这个配置。

这个缓存是被topic,partition以及group.id来标记的,所以为每一个调用createDirectStream使用单独的group.id

ConsumerStrategies

The new Kafka consumer API has a number of different ways to specify topics, some of which require considerable post-object-instantiation setup. ConsumerStrategies provides an abstraction that allows Spark to obtain properly configured consumers even after restart from checkpoint.

ConsumerStrategies.Subscribe, as shown above, allows you to subscribe to a fixed collection of topics. SubscribePattern allows you to use a regex to specify topics of interest. Note that unlike the 0.8 integration, using Subscribe or SubscribePattern should respond to adding partitions during a running stream. Finally, Assign allows you to specify a fixed collection of partitions. All three strategies have overloaded constructors that allow you to specify the starting offset for a particular partition.

If you have specific consumer setup needs that are not met by the options above, ConsumerStrategy is a public class that you can extend.

新的kafka消费者API有着大量的不同方法来指定一个topic,其中一些需要大量的post-object-instantiation步骤。ConsumerStrategies提供了一个抽象,即允许Spark来获取正确配置的消费者,即使在从检查点重启以后

ConsumerStrategies.Subscribe,正如上面显示的,允许你去订阅一个固定的topics的集合。SubscribePattern允许你使用一个正则表达式来指定感兴趣的topic。注意,不像0.8集成,使用SubscribeSubscribePattern应该在运行的流中对添加分区做出响应。最后,Assign允许你去指定一个固定的分区的集合。这所有的三条策略已经重载了构造方法即允许你去指定一个特定的分区开始的offset。

如果你有着特定的消费者的启动需求没有被上述选项满足的,ConsumerStrategy是一个public的类,你可以扩展它。

Creating an RDD

If you have a use case that is better suited to batch processing, you can create an RDD for a defined range of offsets.

如果你有一个用例更适合批处理,你可以为一个定位范围内的offset创建一个RDD

 // Import dependencies and create kafka params as in Create Direct Stream above
 
 OffsetRange[] offsetRanges = {
   // topic, partition, inclusive starting offset, exclusive ending offset
   OffsetRange.create("test", 0, 0, 100),
   OffsetRange.create("test", 1, 0, 100)
 };
 
 JavaRDD<ConsumerRecord<String, String>> rdd = KafkaUtils.createRDD(
   sparkContext,
   kafkaParams,
   offsetRanges,
   LocationStrategies.PreferConsistent()
 );

Note that you cannot use PreferBrokers, because without the stream there is not a driver-side consumer to automatically look up broker metadata for you. Use PreferFixed with your own metadata lookups if necessary.

注意,你不能使用 PreferBrokers,因为如果没有流,将没有一个driver-side的消费者来自动地为你查找broker元数据。如果有需要的话,使用 PreferFixed 和你自己的元数据查找。

Obtaining Offsets

 stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
   @Override
   public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
     final OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
     rdd.foreachPartition(new VoidFunction<Iterator<ConsumerRecord<String, String>>>() {
       @Override
       public void call(Iterator<ConsumerRecord<String, String>> consumerRecords) {
         OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
         System.out.println(
           o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
       }
     });
   }
 });

Note that the typecast to HasOffsetRanges will only succeed if it is done in the first method called on the result of createDirectStream, not later down a chain of methods. Be aware that the one-to-one mapping between RDD partition and Kafka partition does not remain after any methods that shuffle or repartition, e.g. reduceByKey() or window().

注意,只有在第一个方法调用createDirectStream的结果完成,不晚于一连串的方法(?翻译有问题),HasOffsetRanges的强制转换才会成功。要知道在任何重组或重新分区的方法执行以后,比如reduceByKey() or window(),RDD partition和kafka partition的一对一映射将不会保持。

Storing Offsets

Kafka delivery semantics in the case of failure depend on how and when offsets are stored. Spark output operations are at-least-once. So if you want the equivalent of exactly-once semantics, you must either store offsets after an idempotent output, or store offsets in an atomic transaction alongside output. With this integration, you have 3 options, in order of increasing reliablity (and code complexity), for how to store offsets.

在失败的情况下,kafka传递语义依赖于如何以及何时存储offset.Spark输出操作是至少一次at-least-once.所以如果你希望相当于exactly-once的语义,你必须要么在一次幂等输出后存储offsets,或者和输出实现在同一个原子事务中。在这份集成中,你有三个选项来为了增加可靠性(以及代码灵活性)而存储offsets.

Checkpoints

If you enable Spark checkpointing, offsets will be stored in the checkpoint. This is easy to enable, but there are drawbacks. Your output operation must be idempotent, since you will get repeated outputs; transactions are not an option. Furthermore, you cannot recover from a checkpoint if your application code has changed. For planned upgrades, you can mitigate this by running the new code at the same time as the old code (since outputs need to be idempotent anyway, they should not clash). But for unplanned failures that require code changes, you will lose data unless you have another way to identify known good starting offsets.

如果你启用了Spark检查点,offsets将会被存储在检查点中。这很容易启动,但是存在着缺陷。考虑到你将会产生重复的操作,你的输出操作必须是幂等的。此外,如果你的程序代码改变了,你将不能从检查点恢复。为了计划中的一些升级,通过同时运行新的代码以及旧的代码(既然输出无论如何需要保证幂等,它们不会产生冲突),你可以缓和这一情况。但是对于计划之外的失败导致的需要更改代码,你将失去你的数据,除非你有其他的方式来确定offset的开始位置。

Kafka itself

Kafka has an offset commit API that stores offsets in a special Kafka topic. By default, the new consumer will periodically auto-commit offsets. This is almost certainly not what you want, because messages successfully polled by the consumer may not yet have resulted in a Spark output operation, resulting in undefined semantics. This is why the stream example above sets “enable.auto.commit” to false. However, you can commit offsets to Kafka after you know your output has been stored, using the commitAsync API. The benefit as compared to checkpoints is that Kafka is a durable store regardless of changes to your application code. However, Kafka is not transactional, so your outputs must still be idempotent.

kafka有一个offset提交的API来在一个特殊的kafka topic中存储offsets.默认情况下,新的消费者将会周期性的自动提交offsets.这几乎肯定不是你所想要的,因为消息成功的被消费者拉取,由于未定义的语义,可能在Spark的输出操作没有产生结果。这就是为什么上面的stream例子里将enable.auto.commit设置为false.但是,你可以在你确信你的输出被存储了以后使用 commitAsync API 提交offset到kafka.这么做的优点,相比于检查点,就是kafka是可靠的存储而不管你的程序代码是否改变。但是,kafka是非事务的,所以你的输出必须仍然是幂等的。

stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
  @Override
  public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

    // some time later, after outputs have completed
    ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
  }
});

Your own data store

For data stores that support transactions, saving offsets in the same transaction as the results can keep the two in sync, even in failure situations. If you’re careful about detecting repeated or skipped offset ranges, rolling back the transaction prevents duplicated or lost messages from affecting results. This gives the equivalent of exactly-once semantics. It is also possible to use this tactic even for outputs that result from aggregations, which are typically hard to make idempotent.

对于支持事务的数据存储,在同一个事务中保存offset作为结果可以保证两者是同步的,即使在失败的情况下。如果你担心获取重复的或是跳过offset范围,回滚事务来防止受影响的结果造成的复制或者丢失信息。这给予了相当于 exactly-once的语义。即使是对于那些典型的难以实现幂等的聚合操作,使用这一策略仍然是可以的。

// The details depend on your data store, but the general idea looks like this

// begin from the the offsets committed to the database
Map<TopicPartition, Long> fromOffsets = new HashMap<>();
for (resultSet : selectOffsetsFromYourDatabase)
  fromOffsets.put(new TopicPartition(resultSet.string("topic"), resultSet.int("partition")), resultSet.long("offset"));
}

JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
  streamingContext,
  LocationStrategies.PreferConsistent(),
  ConsumerStrategies.<String, String>Assign(fromOffsets.keySet(), kafkaParams, fromOffsets)
);

stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
  @Override
  public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    
    Object results = yourCalculation(rdd);

    // begin your transaction

    // update results
    // update offsets where the end of existing offsets matches the beginning of this batch of offsets
    // assert that offsets were updated correctly

    // end your transaction
  }
});

SSL / TLS

The new Kafka consumer supports SSL. To enable it, set kafkaParams appropriately before passing to createDirectStream / createRDD. Note that this only applies to communication between Spark and Kafka brokers; you are still responsible for separately securing Spark inter-node communication.

新的kafka消费者支持SSL。为了启用它,在传递到createDirectStream / createRDD之前合适地配置kafka参数。注意,这只是应用于Spark和Kafka brokers之间的通信,你仍然需要负责单独地保证Spart节点间的通信的安全。

Map<String, Object> kafkaParams = new HashMap<String, Object>();
// the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS
kafkaParams.put("security.protocol", "SSL");
kafkaParams.put("ssl.truststore.location", "/some-directory/kafka.client.truststore.jks");
kafkaParams.put("ssl.truststore.password", "test1234");
kafkaParams.put("ssl.keystore.location", "/some-directory/kafka.client.keystore.jks");
kafkaParams.put("ssl.keystore.password", "test1234");
kafkaParams.put("ssl.key.password", "test1234");

Deploying

As with any Spark applications, spark-submit is used to launch your application.

正如同所有的Spark应用, spark-submit被用来启动你的应用。

For Scala and Java applications, if you are using SBT or Maven for project management, then package spark-streaming-kafka-0-10_2.11 and its dependencies into the application JAR. Make sure spark-core_2.11 and spark-streaming_2.11 are marked as provided dependencies as those are already present in a Spark installation. Then use spark-submit to launch your application (see Deploying section in the main programming guide).

对于Scala和Java应用,如果你使用SBT或是Maven来管理项目,那么将spark-streaming-kafka-0-10_2.11以及它的以来打包进应用的jar文件。请确保spark-core_2.11以及spark-streaming_2.11被标记为provided的依赖,因为这些在Spark的安装中已经提供了。然后,使用spark-submit来启动你的程序(可以在主编程指南中参见部署章节

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,884评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,755评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,369评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,799评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,910评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,096评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,159评论 3 411
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,917评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,360评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,673评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,814评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,509评论 4 334
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,156评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,882评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,123评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,641评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,728评论 2 351

推荐阅读更多精彩内容