Translator: 倪辰皓
Last updated: 2017/06/18
Status: Complete
Please include a link to this article when reposting: http://www.jianshu.com/p/669294ea8dbd
The Spark Streaming integration for Kafka 0.10 is similar in design to the 0.8 Direct Stream approach. It provides simple parallelism, 1:1 correspondence between Kafka partitions and Spark partitions, and access to offsets and metadata. However, because the newer integration uses the new Kafka consumer API instead of the simple API, there are notable differences in usage. This version of the integration is marked as experimental, so the API is potentially subject to change.
Linking
For Scala/Java applications using SBT/Maven project definitions, link your streaming application with the following artifact (see Linking section in the main programming guide for further information).
groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-10_2.11
version = 2.1.1
Creating a Direct Stream
Note that the namespace for the import includes the version, org.apache.spark.streaming.kafka010
import java.util.*;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import scala.Tuple2;
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList("topicA", "topicB");
final JavaInputDStream<ConsumerRecord<String, String>> stream =
  KafkaUtils.createDirectStream(
    streamingContext,
    LocationStrategies.PreferConsistent(),
    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
  );

stream.mapToPair(
  new PairFunction<ConsumerRecord<String, String>, String, String>() {
    @Override
    public Tuple2<String, String> call(ConsumerRecord<String, String> record) {
      return new Tuple2<>(record.key(), record.value());
    }
  })
For possible kafkaParams, see Kafka consumer config docs. If your Spark batch duration is larger than the default Kafka heartbeat session timeout (30 seconds), increase heartbeat.interval.ms and session.timeout.ms appropriately. For batches larger than 5 minutes, this will require changing group.max.session.timeout.ms on the broker. Note that the example sets enable.auto.commit to false, for discussion see Storing Offsets below.
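For example, if the batch duration were two minutes, the extra timeout settings could be added to the same kafkaParams map roughly as follows. The specific values here are illustrative placeholders, not recommendations from the guide.

// Illustrative only: timeouts for a hypothetical 2-minute batch duration.
// heartbeat.interval.ms should stay well below session.timeout.ms, and
// session.timeout.ms must not exceed the broker's group.max.session.timeout.ms.
kafkaParams.put("heartbeat.interval.ms", 40000);
kafkaParams.put("session.timeout.ms", 150000);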
LocationStrategies
The new Kafka consumer API will pre-fetch messages into buffers. Therefore it is important for performance reasons that the Spark integration keep cached consumers on executors (rather than recreating them for each batch), and prefer to schedule partitions on the host locations that have the appropriate consumers.
In most cases, you should use LocationStrategies.PreferConsistent as shown above. This will distribute partitions evenly across available executors. If your executors are on the same hosts as your Kafka brokers, use PreferBrokers, which will prefer to schedule partitions on the Kafka leader for that partition. Finally, if you have a significant skew in load among partitions, use PreferFixed. This allows you to specify an explicit mapping of partitions to hosts (any unspecified partitions will use a consistent location).
The cache for consumers has a default maximum size of 64. If you expect to be handling more than (64 * number of executors) Kafka partitions, you can change this setting via spark.streaming.kafka.consumer.cache.maxCapacity
The cache is keyed by topicpartition and group.id, so use a separate group.id for each call to createDirectStream.
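As a rough sketch of the last two points, the snippet below raises the consumer cache capacity through SparkConf and pins two heavily loaded partitions to specific hosts with PreferFixed. The host names, topic, and capacity value are placeholders, and streamingContext, topics and kafkaParams are assumed to be set up as in the Direct Stream example above.

// illustrative value; the default maximum is 64 cached consumers per executor
SparkConf conf = new SparkConf()
  .set("spark.streaming.kafka.consumer.cache.maxCapacity", "128");
// conf is then used when constructing the StreamingContext

// explicit partition-to-host mapping; unlisted partitions fall back to a consistent location
Map<TopicPartition, String> hostMap = new HashMap<>();
hostMap.put(new TopicPartition("topicA", 0), "executor-host-1.example.com");
hostMap.put(new TopicPartition("topicA", 1), "executor-host-2.example.com");

JavaInputDStream<ConsumerRecord<String, String>> skewedStream =
  KafkaUtils.createDirectStream(
    streamingContext,
    LocationStrategies.PreferFixed(hostMap),
    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
  );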
ConsumerStrategies
The new Kafka consumer API has a number of different ways to specify topics, some of which require considerable post-object-instantiation setup. ConsumerStrategies provides an abstraction that allows Spark to obtain properly configured consumers even after restart from checkpoint.
ConsumerStrategies.Subscribe, as shown above, allows you to subscribe to a fixed collection of topics. SubscribePattern allows you to use a regex to specify topics of interest. Note that unlike the 0.8 integration, using Subscribe or SubscribePattern should respond to adding partitions during a running stream. Finally, Assign allows you to specify a fixed collection of partitions. All three strategies have overloaded constructors that allow you to specify the starting offset for a particular partition.
If you have specific consumer setup needs that are not met by the options above, ConsumerStrategy is a public class that you can extend.
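The sketch below shows the other two strategies in Java, assuming kafkaParams is the map from the Direct Stream example; the regex, topic name, and starting offset are placeholders.

// subscribe to every topic matching a regex; matching topics and partitions added
// while the stream is running will be picked up
ConsumerStrategy<String, String> byPattern =
  ConsumerStrategies.<String, String>SubscribePattern(
    java.util.regex.Pattern.compile("events-.*"), kafkaParams);

// pin the stream to an explicit set of partitions, each with a known starting offset
Map<TopicPartition, Long> startOffsets = new HashMap<>();
startOffsets.put(new TopicPartition("topicA", 0), 1234L);
ConsumerStrategy<String, String> byAssign =
  ConsumerStrategies.<String, String>Assign(startOffsets.keySet(), kafkaParams, startOffsets);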
Creating an RDD
If you have a use case that is better suited to batch processing, you can create an RDD for a defined range of offsets.
// Import dependencies and create kafka params as in Create Direct Stream above
OffsetRange[] offsetRanges = {
  // topic, partition, inclusive starting offset, exclusive ending offset
  OffsetRange.create("test", 0, 0, 100),
  OffsetRange.create("test", 1, 0, 100)
};

JavaRDD<ConsumerRecord<String, String>> rdd = KafkaUtils.createRDD(
  sparkContext,
  kafkaParams,
  offsetRanges,
  LocationStrategies.PreferConsistent()
);
Note that you cannot use PreferBrokers, because without the stream there is not a driver-side consumer to automatically look up broker metadata for you. Use PreferFixed with your own metadata lookups if necessary.
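If you do need broker-aware placement for an RDD, a sketch along these lines could work. Here lookupLeaderHost is a hypothetical stand-in for whatever metadata lookup you maintain yourself, and offsetRanges, sparkContext and kafkaParams are as in the example above.

// map each topic-partition to the host you believe holds its leader
Map<TopicPartition, String> leaderHosts = new HashMap<>();
for (OffsetRange range : offsetRanges) {
  // lookupLeaderHost is hypothetical; substitute your own metadata lookup
  leaderHosts.put(
    new TopicPartition(range.topic(), range.partition()),
    lookupLeaderHost(range.topic(), range.partition()));
}

JavaRDD<ConsumerRecord<String, String>> fixedLocationRdd = KafkaUtils.createRDD(
  sparkContext,
  kafkaParams,
  offsetRanges,
  LocationStrategies.PreferFixed(leaderHosts)
);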
Obtaining Offsets
stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
  @Override
  public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
    final OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    rdd.foreachPartition(new VoidFunction<Iterator<ConsumerRecord<String, String>>>() {
      @Override
      public void call(Iterator<ConsumerRecord<String, String>> consumerRecords) {
        OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
        System.out.println(
          o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
      }
    });
  }
});
Note that the typecast to HasOffsetRanges will only succeed if it is done in the first method called on the result of createDirectStream, not later down a chain of methods. Be aware that the one-to-one mapping between RDD partition and Kafka partition does not remain after any methods that shuffle or repartition, e.g. reduceByKey() or window().
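One way to respect both constraints is to capture the offset ranges in a transform() as the very first operation on the stream, before any shuffling transformations. The word-count style pipeline below is only a sketch; the AtomicReference is just one convenient way to hold the ranges in driver code, and the imports from the Direct Stream example are assumed.

import java.util.concurrent.atomic.AtomicReference;

final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();

stream.transform(
  new Function<JavaRDD<ConsumerRecord<String, String>>, JavaRDD<ConsumerRecord<String, String>>>() {
    @Override
    public JavaRDD<ConsumerRecord<String, String>> call(JavaRDD<ConsumerRecord<String, String>> rdd) {
      // first method called on the stream, so the cast to HasOffsetRanges still succeeds
      offsetRanges.set(((HasOffsetRanges) rdd.rdd()).offsetRanges());
      return rdd;
    }
  })
  // shuffling transformations are safe from here on
  .mapToPair(new PairFunction<ConsumerRecord<String, String>, String, Long>() {
    @Override
    public Tuple2<String, Long> call(ConsumerRecord<String, String> record) {
      return new Tuple2<>(record.value(), 1L);
    }
  })
  .reduceByKey(new Function2<Long, Long, Long>() {
    @Override
    public Long call(Long a, Long b) {
      return a + b;
    }
  });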
Storing Offsets
Kafka delivery semantics in the case of failure depend on how and when offsets are stored. Spark output operations are at-least-once. So if you want the equivalent of exactly-once semantics, you must either store offsets after an idempotent output, or store offsets in an atomic transaction alongside output. With this integration, you have 3 options, in order of increasing reliability (and code complexity), for how to store offsets.
Checkpoints
If you enable Spark checkpointing, offsets will be stored in the checkpoint. This is easy to enable, but there are drawbacks. Your output operation must be idempotent, since you will get repeated outputs; transactions are not an option. Furthermore, you cannot recover from a checkpoint if your application code has changed. For planned upgrades, you can mitigate this by running the new code at the same time as the old code (since outputs need to be idempotent anyway, they should not clash). But for unplanned failures that require code changes, you will lose data unless you have another way to identify known good starting offsets.
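A minimal sketch of checkpoint-based recovery follows, assuming conf is a SparkConf defined elsewhere; the checkpoint directory and batch interval are placeholders.

JavaStreamingContext streamingContext = JavaStreamingContext.getOrCreate(
  "hdfs:///path/to/checkpoint-dir",
  new Function0<JavaStreamingContext>() {
    @Override
    public JavaStreamingContext call() {
      JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
      jssc.checkpoint("hdfs:///path/to/checkpoint-dir");
      // build the direct stream and its idempotent output operations here,
      // exactly as in the Creating a Direct Stream example above
      return jssc;
    }
  });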
Kafka itself
Kafka has an offset commit API that stores offsets in a special Kafka topic. By default, the new consumer will periodically auto-commit offsets. This is almost certainly not what you want, because messages successfully polled by the consumer may not yet have resulted in a Spark output operation, resulting in undefined semantics. This is why the stream example above sets “enable.auto.commit” to false. However, you can commit offsets to Kafka after you know your output has been stored, using the commitAsync API. The benefit as compared to checkpoints is that Kafka is a durable store regardless of changes to your application code. However, Kafka is not transactional, so your outputs must still be idempotent.
stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
  @Override
  public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

    // some time later, after outputs have completed
    ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
  }
});
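If you want feedback on whether an asynchronous commit actually succeeded, commitAsync also accepts a Kafka OffsetCommitCallback. The fragment below is a sketch of that variant and would replace the commitAsync call inside the foreachRDD above; it additionally needs imports for org.apache.kafka.clients.consumer.OffsetCommitCallback and OffsetAndMetadata.

((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges, new OffsetCommitCallback() {
  @Override
  public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
    if (exception != null) {
      System.err.println("offset commit failed: " + exception);
    } else {
      System.out.println("offsets committed: " + offsets);
    }
  }
});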
Your own data store
For data stores that support transactions, saving offsets in the same transaction as the results can keep the two in sync, even in failure situations. If you’re careful about detecting repeated or skipped offset ranges, rolling back the transaction prevents duplicated or lost messages from affecting results. This gives the equivalent of exactly-once semantics. It is also possible to use this tactic even for outputs that result from aggregations, which are typically hard to make idempotent.
// The details depend on your data store, but the general idea looks like this
// begin from the offsets committed to the database
Map<TopicPartition, Long> fromOffsets = new HashMap<>();
for (resultSet : selectOffsetsFromYourDatabase) {
  fromOffsets.put(new TopicPartition(resultSet.string("topic"), resultSet.int("partition")), resultSet.long("offset"));
}

JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
  streamingContext,
  LocationStrategies.PreferConsistent(),
  ConsumerStrategies.<String, String>Assign(fromOffsets.keySet(), kafkaParams, fromOffsets)
);

stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
  @Override
  public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

    Object results = yourCalculation(rdd);

    // begin your transaction
    // update results
    // update offsets where the end of existing offsets matches the beginning of this batch of offsets
    // assert that offsets were updated correctly
    // end your transaction
  }
});
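Purely as an illustration of those placeholder comments, the fragment below sketches the body of call() against a hypothetical JDBC store with a kafka_offsets table (topic, kafka_partition, until_offset). The connection URL, table, and column names are invented for the example, and error handling and connection pooling are omitted.

try (java.sql.Connection conn =
       java.sql.DriverManager.getConnection("jdbc:postgresql://db.example.com/mydb")) {
  conn.setAutoCommit(false);  // begin your transaction
  try {
    // update results: write whatever yourCalculation(rdd) produced

    // update offsets where the end of existing offsets matches the beginning of this batch
    try (java.sql.PreparedStatement ps = conn.prepareStatement(
        "UPDATE kafka_offsets SET until_offset = ? "
          + "WHERE topic = ? AND kafka_partition = ? AND until_offset = ?")) {
      for (OffsetRange o : offsetRanges) {
        ps.setLong(1, o.untilOffset());
        ps.setString(2, o.topic());
        ps.setInt(3, o.partition());
        ps.setLong(4, o.fromOffset());
        // assert that offsets were updated correctly
        if (ps.executeUpdate() != 1) {
          throw new IllegalStateException("offset mismatch for " + o.topic() + "-" + o.partition());
        }
      }
    }
    conn.commit();  // end your transaction
  } catch (Exception e) {
    conn.rollback();  // neither results nor offsets are stored for this batch
    throw e;
  }
}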
SSL / TLS
The new Kafka consumer supports SSL. To enable it, set kafkaParams appropriately before passing to createDirectStream / createRDD. Note that this only applies to communication between Spark and Kafka brokers; you are still responsible for separately securing Spark inter-node communication.
Map<String, Object> kafkaParams = new HashMap<String, Object>();
// the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS
kafkaParams.put("security.protocol", "SSL");
kafkaParams.put("ssl.truststore.location", "/some-directory/kafka.client.truststore.jks");
kafkaParams.put("ssl.truststore.password", "test1234");
kafkaParams.put("ssl.keystore.location", "/some-directory/kafka.client.keystore.jks");
kafkaParams.put("ssl.keystore.password", "test1234");
kafkaParams.put("ssl.key.password", "test1234");
Deploying
As with any Spark applications, spark-submit is used to launch your application.
For Scala and Java applications, if you are using SBT or Maven for project management, then package spark-streaming-kafka-0-10_2.11 and its dependencies into the application JAR. Make sure spark-core_2.11 and spark-streaming_2.11 are marked as provided dependencies as those are already present in a Spark installation. Then use spark-submit to launch your application (see Deploying section in the main programming guide).