早期SparkStreaming和Kafka集成,使用Write Ahead Logs (WALs)日志来实现,如下图。因为要同步保存所有收到的Kafka数据,写入分布式文件系统(例如HDFS)上,所以性能并不好。

Spark 1.3 中引入了这种新的receiver-less"direct"方法,以确保更强的端到端连接。此方法不使用接收器接收数据,而是定期查询 Kafka 在每个主题分区中的最新偏移量( offsets),并相应地定义要在每个批处理中处理的偏移范围(offset ranges)。启动处理数据的作业时,Kafka 的 simple consumer API用于读取 Kafka 定义的偏移范围(类似于文件系统中的读取文件)。

SparkStreaming与Kafka0.8版本整合,参考官方文档
Approach 2: Direct Approach (No Receivers)
This new receiver-less “direct” approach has been introduced in Spark 1.3 to ensure stronger end-to-end guarantees. Instead of using receivers to receive data, this approach periodically queries Kafka for the latest offsets in each topic+partition, and accordingly defines the offset ranges to process in each batch. When the jobs to process the data are launched, Kafka’s simple consumer API is used to read the defined ranges of offsets from Kafka (similar to read files from a file system). Note that this feature was introduced in Spark 1.3 for the Scala and Java API, in Spark 1.4 for the Python API.
Spark 1.3 中引入了这种新的receiver-less"direct"方法,以确保更强的端到端连接。此方法不使用接收器接收数据,而是定期查询 Kafka 在每个主题分区中的最新偏移量( offsets),并相应地定义要在每个批处理中处理的偏移范围(offset ranges)。启动处理数据的作业时,Kafka 的 simple consumer API用于读取 Kafka 定义的偏移范围(类似于文件系统中的读取文件)。请注意,此功能在 Spark 1.3 中引入了 Scala 和 Java API,在 Spark 1.4 中为 Python API。
This approach has the following advantages over the receiver-based approach (i.e. Approach 1).
与receiver-based approach的方法比较,这种 receiver-less “direct” approach方法具有以下优点。
-
Simplified Parallelism: No need to create multiple input Kafka streams and union them. With
directStream, Spark Streaming will create as many RDD partitions as there are Kafka partitions to consume, which will all read data from Kafka in parallel. So there is a one-to-one mapping between Kafka and RDD partitions, which is easier to understand and tune.简化并行性:无需创建多个输入 Kafka 流并将其联合。使用
directStream,Spark Streaming 将创建尽可能多的 RDD 分区,就像需要使用 Kafka 分区一样,这些分区将并行读取来自Kafka的数据。因此,Kafka 和 RDD 分区之间存在一一对映,更易于理解和调整。 -
Efficiency: Achieving zero-data loss in the first approach required the data to be stored in a Write-Ahead Log, which further replicated the data. This is actually inefficient as the data effectively gets replicated twice - once by Kafka, and a second time by the Write-Ahead Log. This second approach eliminates the problem as there is no receiver, and hence no need for Write-Ahead Logs. As long as you have sufficient Kafka retention, messages can be recovered from Kafka.
效率:在第一种方法中实现零数据丢失需要将数据存储在写入Write-Ahead Log中,从而进一步复制数据。这实际上效率低下,因为数据有效地被复制两次 - 一次由 Kafka 复制,第二次由Write-Ahead Log日志复制。第二种“direct” approach方法消除了问题,因为没有接收器,因此无需提前写入日志。只要你有足够的Kafka存储空间和合适的存储级别设置,消息可以从Kafka中恢复。
-
Exactly-once semantics: The first approach uses Kafka’s high-level API to store consumed offsets in Zookeeper. This is traditionally the way to consume data from Kafka. While this approach (in combination with-write-ahead logs) can ensure zero data loss (i.e. at-least once semantics), there is a small chance some records may get consumed twice under some failures. This occurs because of inconsistencies between data reliably received by Spark Streaming and offsets tracked by Zookeeper. Hence, in this second approach, we use simple Kafka API that does not use Zookeeper. Offsets are tracked by Spark Streaming within its checkpoints. This eliminates inconsistencies between Spark Streaming and Zookeeper/Kafka, and so each record is received by Spark Streaming effectively exactly once despite failures. In order to achieve exactly-once semantics for output of your results, your output operation that saves the data to an external data store must be either idempotent, or an atomic transaction that saves results and offsets (see Semantics of output operations in the main programming guide for further information).
有且只有一次语义(Exactly-once semantics):第一种方法使用 Kafka 的高级 API 在 Zookeeper 中存储消耗的偏移量。这是传统上使用卡夫卡数据的方法。虽然此方法(与预写日志结合使用)可以确保零数据丢失(即至少一次语义),但在某些小概率的情况下,某些记录可能消费两次。这是因为Spark Streaming可靠接收的数据与 Zookeeper 跟踪的偏移量之间的不一致。因此,在第二种方法中,我们使用不使用 Zookeeper 的简单 Kafka API。偏移量由 Spark Streaming在其checkpoints跟踪。这消除了 Spark 流和 Zookeeper/Kafka 之间的不一致,因此,尽管出现故障, Spark Streaming仍有效接收每个记录一次。为了实现结果输出的Exactly-once 语义,将数据存储保存到外部数据存储的输出操作必须是幂等的,
幂等(idempotent、idempotence)是一个数学与计算机学概念。在编程中一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同。
或者保存结果和偏移的原子事务(请参阅输出操作的语义(see Semantics of output operations ))。
Note that one disadvantage of this approach is that it does not update offsets in Zookeeper, hence Zookeeper-based Kafka monitoring tools will not show progress. However, you can access the offsets processed by this approach in each batch and update Zookeeper yourself (see below).
请注意,这种方法的一个缺点是它不更新存储在Zookeeper中的偏移量(offsets),因此基于 Zookeeper 的 Kafka 监控工具不会显示进度。但是,您可以访问每个批次中此方法处理的偏移量(offsets),并自行更新 Zookeeper(见下文)。
Next, we discuss how to use this approach in your streaming application.
接下来,我们将讨论如何在你的Spark Streaming程序中使用Direct Approach。
-
Linking: This approach is supported only in Scala/Java application. Link your SBT/Maven project with the following artifact (see Linking section in the main programming guide for further information).
链接:此方法仅在 Scala/Java 应用程序中支持。将 SBT/Maven 项目与以下工件链接(请参阅主编程指南中的链接部分( 见 Linking section有关详细信息)。
groupId = org.apache.spark artifactId = spark-streaming-kafka-0-8_2.12 version = 2.4.5Programming: In the streaming application code, import
KafkaUtilsand create an input DStream as follows.编程:在流式处理应用程序代码中,导入 KafkaUtils 并创建输入 DStream,如下所示。
import org.apache.spark.streaming.kafka._ val directKafkaStream = KafkaUtils.createDirectStream[ [key class], [value class], [key decoder class], [value decoder class] ]( streamingContext, [map of Kafka parameters], [set of topics to consume])
You can also pass a messageHandler to createDirectStream to access MessageAndMetadata that contains metadata about the current message and transform it to any desired type. See the API docs.
您还可以传递messageHandler 给 createDirectStream来连接 MessageAndMetadata,它包了含当前消息元数据的消息和元数据,并将其转换为任何所需的类型。请参阅 API docs文档。
In the Kafka parameters, you must specify either metadata.broker.list or bootstrap.servers. By default, it will start consuming from the latest offset of each Kafka partition. If you set configuration auto.offset.reset in Kafka parameters to smallest, then it will start consuming from the smallest offset.
在 Kafka parameters中,必须指定 metadata.broker.list 或 bootstrap.servers. 默认情况下,它将从每个 Kafka 分区的最新偏移量(latest offset)开始使用。如果将 Kafka 参数中的配置 auto.offset.reset 设置为最小 smallest,则它将开始从最小偏移量开始使用。
You can also start consuming from any arbitrary offset using other variations of KafkaUtils.createDirectStream. Furthermore, if you want to access the Kafka offsets consumed in each batch, you can do the following.
您还可以使用 KafkaUtils.createDirectStream 的其他变体从任意偏移开始消耗。此外,如果要访问每个批次中消耗的 Kafka 偏移量,可以执行以下操作。
// Hold a reference to the current offset ranges, so it can be used downstream
var offsetRanges = Array.empty[OffsetRange]
directKafkaStream.transform { rdd =>
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}.map {
...
}.foreachRDD { rdd =>
for (o <- offsetRanges) {
println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
}
...
}
You can use this to update Zookeeper yourself if you want Zookeeper-based Kafka monitoring tools to show progress of the streaming application.
如果您希望使用基于 Zookeeper 的 Kafka 监视工具来显示流应用程序的进度,你可以额外使用offsetRanges来更新 Zookeeper。
Note that the typecast to HasOffsetRanges will only succeed if it is done in the first method called on the directKafkaStream, not later down a chain of methods. You can use transform() instead of foreachRDD() as your first method call in order to access offsets, then call further Spark methods. However, be aware that the one-to-one mapping between RDD partition and Kafka partition does not remain after any methods that shuffle or repartition, e.g. reduceByKey() or window().
请注意,对 HasOffsetRanges 的类型转换,只有在第一个调用 directKafkaStream 的方法中完成,而不是稍后在方法链下执行,才会成功。您可以使用 transform() 而不是 foreachRDD() 作为第一个方法调用来访问偏移,然后进一步调用Spark的方法。但是,请注意,RDD 分区和 Kafka 分区之间的一一对应关系不会保留,如果使用了任何shuffle或 repartition的方法。例如 reduceByKey() 或window()之后保留。
Another thing to note is that since this approach does not use Receivers, the standard receiver-related (that is, configurations of the form spark.streaming.receiver.* ) will not apply to the input DStreams created by this approach (will apply to other input DStreams though). Instead, use the configurations spark.streaming.kafka.*. An important one is spark.streaming.kafka.maxRatePerPartition which is the maximum rate (in messages per second) at which each Kafka partition will be read by this direct API.
需要注意的另一点是,由于此方法不使用接收器,因此与接收器相关的配置参数spark.streaming.receiver.* 将不适用于此方法所创建的输入 DStreams。而应该使用配置 configurations spark.streaming.kafka.*. 其中特别重要的一个是spark.streaming.kafka.maxRatePerPartition 这是direct API从每个 Kafka 分区读取速率(以每秒的消息表示)的最大值。
-
Deploying: As with any Spark applications,
spark-submitis used to launch your application. However, the details are slightly different for Scala/Java applications and Python applications.部署:与任何 Spark 应用程序一样,Spark 提交用于启动应用程序。但是,Scala/Java 应用程序和 Python 应用程序的详细信息略有不同。
For Scala and Java applications, if you are using SBT or Maven for project management, then package
spark-streaming-kafka-0-8_2.12and its dependencies into the application JAR. Make surespark-core_2.12andspark-streaming_2.12are marked asprovideddependencies as those are already present in a Spark installation. Then usespark-submitto launch your application (see Deploying section in the main programming guide).对于 Scala 和 Java 应用程序,如果您使用 SBT 或 Maven 进行项目管理,则
spark-streaming-kafka-0-8_2.12及其依赖项打包到应用程序 JAR 中。确保spark-core_2.12和spark-streaming_2.12标记provided依赖项,因为这些依赖项已存在于 Spark 安装中。然后使用spark-submit来启动应用程序(请参阅主编程指南中的部署部分(see Deploying section)。
For Python applications which lack SBT/Maven project management,
spark-streaming-kafka-0-8_2.12and its dependencies can be directly added tospark-submitusing--packages(see Application Submission Guide). That is,对于缺乏 SBT/Maven 项目管理的 Python 应用程序,可以直接将
spark-streaming-kafka-0-8_2.12及其依赖项用--packages添加到spark-submit(see Application Submission Guide)。如下./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.12:2.4.5 ...Alternatively, you can also download the JAR of the Maven artifact
spark-streaming-kafka-0-8-assemblyfrom the [Maven repository](https://search.maven.org/#search|ga|1|a%3A"spark-streaming-kafka-0-8-assembly_2.12" AND v%3A"2.4.5") and add it tospark-submitwith--jars.