Spark Structured Streaming 与Kafka的整合

Structured Streaming 与0.10及以上版本的Kafka整合来对Kafka中的读书进行读取和写入操作。

Linking

对于使用SBT/Maven定义的Scala/Java应用程序,请将你的应用程序与如下的artifact相连接:

groupId = org.apache.spark
artifactId = spark-sql-kafka-0-10_2.11
version = 2.2.0

对于Python引用程序,你需要在发布应用程序时添加上述的库及其依赖,详情请参考下面的发布模块介绍。

从Kafka中读取数据

为流式查询创建一个Kafka Source

Scala 代码:

// 订阅一个topic
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// 订阅多个topic
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// 订阅满足一定正则式的topic
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

Java代码:

// 订阅一个topic
DataFrame<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

// 订阅多个topic
DataFrame<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

// 订阅满足一定正则式的topic
DataFrame<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

Python代码:

# 订阅一个topic
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# 订阅多个topic
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1,topic2") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# 订阅满足一定正则式的topic
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribePattern", "topic.*") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

为批量查询定义一个Kafak Source

如果你的用例更适用于批处理的话,你可以根据既定的offset范围来创建一个DataSet/DataFrame
Scala代码:

// 订阅一个topic,默认从topic最早的offset到最近的offset
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// 订阅多个topic,并指定每个topic的订阅范围
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// 订阅满足一定正则式的topic,默认从topic最早的offset到最近的offset
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

Java代码:

// 订阅一个topic,默认从topic最早的offset到最近的offset
DataFrame<Row> df = spark
  .read()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

// 订阅多个topic,并指定每个topic的订阅范围
DataFrame<Row> df = spark
  .read()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .option("startingOffsets", "{\"topic1\":{\"0\":23,\"1\":-2},\"topic2\":{\"0\":-2}}")
  .option("endingOffsets", "{\"topic1\":{\"0\":50,\"1\":-1},\"topic2\":{\"0\":-1}}")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

// 订阅满足一定正则式的topic,默认从topic最早的offset到最近的offset
DataFrame<Row> df = spark
  .read()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

Python代码:

# 订阅一个topic,默认从topic最早的offset到最近的offset
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# 订阅多个topic,并指定每个topic的订阅范围
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1,topic2") \
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") \
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# 订阅满足一定正则式的topic,默认从topic最早的offset到最近的offset
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribePattern", "topic.*") \
  .option("startingOffsets", "earliest") \
  .option("endingOffsets", "latest") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

Source中的每一行都遵循下列模式:

Column Type
key binary
value binary
topic string
partition int
offset long
timestamp long
timestampType int

不论是批处理还是流式处理都必须为Kafka Source设置如下选项:

选项 意义
assign json值{"topicA":[0,1],"topicB":[2,4]} 指定消费的TopicPartition,Kafka Source只能指定"assign","subscribe","subscribePattern"选项中的一个
subscribe 一个以逗号隔开的topic列表 订阅的topic列表,Kafka Source只能指定"assign","subscribe","subscribePattern"选项中的一个
subscribePattern Java正则表达式 订阅的topic列表的正则式,Kafka Source只能指定"assign","subscribe","subscribePattern"选项中的一个
kafka.bootstrap.servers 以逗号隔开的host:port列表 Kafka的"bootstrap.servers"配置

下面的配置是可选的:

选项 默认值 支持的查询类型 意义
startingOffsets "earliest","lates"(仅streaming支持);或者json 字符"""{"topicA":{"0":23,"1":-1},"TopicB":{"0":-2}}""" 对于流式处理来说是"latest",对于批处理来说是"earliest" streaming和batch 查询开始的位置可以是"earliest"(从最早的位置开始),"latest"(从最新的位置开始),或者通过一个json为每个TopicPartition指定开始的offset。通过Json指定的话,json中-2可以用于表示earliest,-1可以用于表示latest。注意:对于批处理而言,latest值不允许使用的。
endingOffsets latest or json string{"topicA":{"0":23,"1":-1},"topicB":{"0":-1}} latest batch 一个批查询的结束位置,可以是"latest",即最近的offset,或者通过json来为每个TopicPartition指定一个结束位置,在json中,-1表示latest,而-2是不允许使用的
failOnDataLoss true or false true streaming query 数据丢失之后(topic被删除,或者offset不在可用范围内时)查询是否失败,这可能会引起一个告警,如果你觉得不适用于你的应用程序时,你可以禁用掉。批查询如果在读出数据的时候发现数据丢失了总会失败。
kafkaConsumer.pollTimeoutMs long 512 streaming 和batch executor轮询kafka中的数据的超时时间
fetchOffset.numRetries int 3 streaming 和batch 获取Kafka offset的重试次数
fetchOffset.retryIntervalMs long 10 获取Kafka offset的时间间隔
maxOffsetPerTrigger long none streaming 和batch 不想翻译了。。。

写数据到Kafka

这里我们讨论对鞋流式查询或者批查询数据到Apache Kafka的支持,请注意Apache Kafka只支持至少一次语义,所以当写流式查询数据或者批查询数据到Kafka时,有些记录可能会重复,这是有可能发生的,例如,kafka需要重新获取还未被一个Broker识别的消息记录,即使这条消息已经被Broker接收并将消息写入记录中了。由于Kafka本身的这些写语义,Structured Streaming无法避免写入的重复记录的发生。如果一个查询的写入成功,你就可以认为是至少写入了一次,一个删除写入重复记录的解决方案就是引入一个唯一主键,这就可以在读取时执行去重操作了。

Column Type
key(可选) string或者binary
value(必选) string或者binary
topic(*可选) string

写入Kafka中的DataFrame必须遵循如下格式:

Column Type
key(可选) string或者binary
value(必选) string或者binary
topic(*可选) string

注意:如果topic配置项没有指定的话,topic列是需要指定的
value列是唯一必选的选项,如果key列没有指定的话,系统会指定为null。如果topic列指定的话,会将给定的数据写入到Kafak中对应的topic中,除非"topic"配置项已经指定,如果配置项已经指定的话,配置项中的配置会覆盖掉topic列中的配置。

选项 意义
kafka.bootstrap.server 逗号分隔的host:port列表 Kafka的"bootstrap.servers"配置

无论是批查询还是流式查询,下面的选项必须得为Kafka sink指定;

选项 意义
kafka.bootstrap.server 逗号分隔的host:port列表 Kafka的"bootstrap.servers"配置
选项 默认值 支持的查询类型 意义
topic string none streaming和batch 设置允许写入所有行到Kafka中的topic列表,这个配置会覆盖数据中存在的topic列

下面的配置是可选的:

选项 默认值 支持的查询类型 意义
topic string none streaming和batch 设置允许写入所有行到Kafka中的topic列表,这个配置会覆盖数据中存在的topic列

为Streaming查询创建一个Kafka Sink

Scala代码:

// 写DataFrame中的key-value数据到option指定的kafka topic中
val ds = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .start()

// 写DataFrame中的key-value数据到数据中指定的Kafka topic中
val ds = df
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .start()

Java代码:

// 写DataFrame中的key-value数据到option指定的kafka topic中
StreamingQuery ds = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .start()

// 写DataFrame中的key-value数据到数据中指定的Kafka topic中
StreamingQuery ds = df
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .start()

Python代码:

# 写DataFrame中的key-value数据到option指定的kafka topic中
ds = df \
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "topic1") \
  .start()

# 写DataFrame中的key-value数据到数据中指定的Kafka topic中
ds = df \
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .start()

写Batch查询的结果到Kafka中

Scala代码:

//写DataFrame中的key-value数据到option指定的kafka topic中
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .save()

// 写DataFrame中的key-value数据到数据中指定的Kafka topic中
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .save()

Java代码:

// 写DataFrame中的key-value数据到option指定的kafka topic中
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .save()

// 写DataFrame中的key-value数据到数据中指定的Kafka topic中
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .write()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .save()

Python代码:

# 写DataFrame中的key-value数据到option指定的kafka topic中
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .write \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "topic1") \
  .save()

# 写DataFrame中的key-value数据到数据中指定的Kafka topic中
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
  .write \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .save()

Kafka特殊配置

Kafka自身的配置可以通过DataStreamReader.option中以kafka.为前缀来指定,如:stream.option("kafka.bootstrap.servers","host:port"),有关Kafka的可配参数,请参阅Kafka Consumer COnfig文档中关于读数据的参数以及Kafka Producer COnfig文档中关于写数据的参数。
注意:下面的Kafka参数是不能设置的,如果设置的话Kafka的Source或者Sink会抛出异常:
group.id: Kafka source will create a unique group id for each query automatically.
auto.offset.reset: Set the source option startingOffsets to specify where to start instead. Structured Streaming manages which offsets are consumed internally, rather than rely on the kafka Consumer to do it. This will ensure that no data is missed when new topics/partitions are dynamically subscribed. Note that startingOffsets only applies when a new streaming query is started, and that resuming will always pick up from where the query left off.
key.deserializer: Keys are always deserialized as byte arrays with ByteArrayDeserializer. Use DataFrame operations to explicitly deserialize the keys.
value.deserializer: Values are always deserialized as byte arrays with ByteArrayDeserializer. Use DataFrame operations to explicitly deserialize the values.
key.serializer: Keys are always serialized with ByteArraySerializer or StringSerializer. Use DataFrame operations to explicitly serialize the keys into either strings or byte arrays.
value.serializer: values are always serialized with ByteArraySerializer or StringSerializer. Use DataFrame oeprations to explicitly serialize the values into either strings or byte arrays.
enable.auto.commit: Kafka source doesn’t commit any offset.
interceptor.classes: Kafka source always read keys and values as byte arrays. It’s not safe to use ConsumerInterceptor as it may break the query.

发布

作为Spark应用程序,通过spark-submit来启动你的应用程序。spark-sql-kafka-0-10_2.11以及它的依赖可以使用--packages添加到'spark-submit'中,如:
./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 ...
请参考应用程序提交指南来获取更多关于提交带有外部依赖的应用程序的信息。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,864评论 6 494
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,175评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,401评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,170评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,276评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,364评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,401评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,179评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,604评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,902评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,070评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,751评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,380评论 3 319
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,077评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,312评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,924评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,957评论 2 351

推荐阅读更多精彩内容