如何使用 Apache Flink 查询 Pulsar 流

原作者: Sijie Guo、Markos Sfikas
翻译:StreamNative-Sijia

在之前的博客中,我们介绍了 Apache Pulsar 及其与其他消息系统的不同之处,并讲解了如何融合 Pulsar 和 Flink 协同工作,为大规模弹性数据处理提供无缝的开发人员体验。本文将介绍 Apache Pulsar 和 Apache Flink 的集成和最新研发进展,并详细说明如何利用 Pulsar 内置 schema,使用 Apache Flink 实时查询 Pulsar 流。

Apache Pulsar 简介

Apache Pulsar 是一个灵活的发布/订阅消息系统,支持持久日志存储。Pulsar 的架构优势包括多租户、统一消息模型、结构化事件流、云原生架构等,这些优势让 Pulsar 能够完美适用于多种用户场景,从计费、支付、交易服务到融合组织中不同的消息架构。更多关于 Pulsar 的信息,点击 Apache Pulsar documentation 或通过 Slack 与 Pulsar 社区联系。

现有 Pulsar & Flink 集成(Apache Flink 1.6+)

在现有的 Pulsar 和 Flink 集成中,Pulsar 作为 Flink 应用程序中的消息队列来使用。Flink 开发人员可以选择特定 Pulsar source,并连接到所需的 Puslar 集群和 topic,将 Pulsar 用作 Flink 的流 source 和流 sink:

// create and configure Pulsar consumer
PulsarSourceBuilder<String>builder = PulsarSourceBuilder  
  .builder(new SimpleStringSchema()) 
  .serviceUrl(serviceUrl)
  .topic(inputTopic)
  .subsciptionName(subscription);
SourceFunction<String> src = builder.build();
// ingest DataStream with Pulsar consumer
DataStream<String> words = env.addSource(src);

然后,Pulsar 流可以连接到 Flink 的处理逻辑。

// perform computation on DataStream (here a simple WordCount)
DataStream<WordWithCount> wc = words
  .flatmap((FlatMapFunction<String, WordWithCount>) (word, collector) -> {
    collector.collect(new WordWithCount(word, 1));
  })
 
  .returns(WordWithCount.class)
  .keyBy("word")
  .timeWindow(Time.seconds(5))
  .reduce((ReduceFunction<WordWithCount>) (c1, c2) ->
    new WordWithCount(c1.word, c1.count + c2.count));

然后通过 sink 将数据写出到 Pulsar。

// emit result via Pulsar producer 
wc.addSink(new FlinkPulsarProducer<>(
  serviceUrl,
  outputTopic,
  new AuthentificationDisabled(),
  wordWithCount -> wordWithCount.toString().getBytes(UTF_8),
  wordWithCount -> wordWithCount.word)
);

对于集成而言,这是重要的第一步,但现有设计还不足以充分利用 Pulsar 的全部功能。Pulsar 与 Flink 1.6.0 的集成中有一些不足,包括:既没有作为持久存储来使用,也没有与 Flink 进行 schema 集成,导致在为应用程序 schema 注册添加描述时,需要手动输入。

Pulsar 与 Flink 1.9 的集成:将 Pulsar 用作 Flink catalog

Flink 1.9.0 与 Pulsar 的最新集成解决了前面提到的问题。阿里巴巴 Blink 对 Flink 仓库的贡献不仅强化了处理架构,还增加了新功能,使得 Flink 与 Pulsar 的集成更强大有效。在新 connector 的实现中引入了 Pulsar schema 集成,增加了对 Table API 的支持,同时提供了 exactly-once 语义的 Pulsar 读与 at-least-once 语义的 Pulsar 写。并且,通过 schema 集成,Pulsar 可以注册为 Flink catalog,只需几个命令就可以在 Pulsar 流上运行 Flink 查询。下面我们将详细介绍新的集成,并举例说明如何使用 Flink SQL 查询 Pulsar 流。

利用 Flink <> Pulsar Schema 集成

在展开集成细节与具体的使用方法之前,我们先来看一下 Pulsar schema 是怎么工作的。Apache Pulsar 内置对 Schema 的支持,无须额外管理 schema。Pulsar 的数据 schema 与每个 topic 相关联,因此,producer 和 consumer 都可以使用预定义 schema 信息发送数据,而 broker 可以验证 schema ,并在兼容性检查中管理 schema 多版本化和 schema 演化。

下面分别是 Pulsar schema 用于 producer 和 consumer 的示例。在 producer 端,可以指定使用 schema,并且 Pulsar 无需执行序列化/反序列化,就可以发送一个 POJO 类。类似地,在 consumer 端,也可以指定数据 schema,并且在接收到数据后,Pulsar 会立即自动验证 schema 信息,获取给定版本的 schema,然后将数据反序列化到 POJO 结构。Pulsar 在 topic 的元数据中存储 schema 信息。

// Create producer with Struct schema and send messages
Producer<User> producer = client.newProducer(Schema.AVRO(User.class)).create();
producer.newMessage()
  .value(User.builder()
    .userName(“pulsar-user”)
    .userId(1L)
    .build())
  .send();
// Create consumer with Struct schema and receive messages
Consumer<User> consumer = client.newCOnsumer(Schema.AVRO(User.class)).create();
consumer.receive();

假设一个应用程序对 producer 和/或 consumer 指定 schema。在接收到 schema 信息时,连接到 broker 的 producer(或 consumer)传输此类信息,以便 broker 在返回或拒绝该 schema 前注册 schema、验证 schema,并检查 schema 兼容性,如下图所示:

image

Pulsar 不仅可以处理并存储 schema 信息,还可以在必要时处理 schema 演化(schema evolution)。Pulsar 能够有效管理 broker 中的 schema 演化,在必要的兼容性检查中,追踪 schema 的所有版本。

另外,当消息发布在 producer 端时,Pulsar 会在消息的元数据中标记 schema 版本;当 consumer 接收到消息,并完成反序列化元数据时,Pulsar 将会检查与此消息相关联的 schema 版本,并从 broker 中获取 schema 信息。因此,当 Pulsar 与 Flink 应用集成时,Pulsar 使用预先存在的 schema 信息,并将带有 schema 信息的单个消息映射到 Flink 类型系统的不同行中。

当 Flink 用户不直接与 schema 交互或不使用原始 schema(primitive schema)时(例如,用 topic 来存储字符串或长数值),Pulsar 会转换消息到 Flink 行,即“值”;或者在结构化的 schema 类型(例如,JSON 和 AVRO)中,Pulsar 从 schema 信息中提取单个字段信息,并将字段映射到 Flink 的类型系统。最后,所有与消息相关的元数据信息(例如,消息密钥、topic、发布时间、事件时间等)都会转换到 Flink 行中的元数据字段。以下是使用原始 schema 和结构化 schema 的两个示例,解释了如何将数据从 Pulsar topic 转换到 Flink 类型系统。

原始 schema(Primitive Schema):

root
|-- value: DOUBLE
|-- __key: BYTES
|-- __topic: STRING
|-- __messageId: BYTES
|-- __publishTime: TIMESTAMP(3)
|-- __eventTime: TIMESTAMP(3)

结构化 schema(Avor Schema):

@Data
@AllArgsConstructor
@NoArgsConstructor
public static class Foo {
    public int i;
    public float f;
    public Bar bar;
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class Bar {
    public boolean b;
    public String s;
}
Schema s = Schema.AVRO(Foo.getClass());
root
 |-- i: INT
 |-- f: FLOAT
 |-- bar: ROW<`b` BOOLEAN, `s` STRING>
 |-- __key: BYTES
 |-- __topic: STRING
 |-- __messageId: BYTES
 |-- __publishTime: TIMESTAMP(3)
 |-- __eventTime: TIMESTAMP(3)

当所有 schema 信息都映射到 Flink 类型系统时,就可以在 Flink 中根据指定 schema 信息构建 Pulsar source、sink 或 catalog,如下所示:

Flink & Pulsar: 从 Pulsar 读取数据

  • 创建用于流查询的 Pulsar source
val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("service.url", "pulsar://...")
props.setProperty("admin.url", "http://...")
props.setProperty("partitionDiscoveryIntervalMillis", "5000")
props.setProperty("startingOffsets", "earliest")
props.setProperty("topic", "test-source-topic")
val source = new FlinkPulsarSource(props)
// you don't need to provide a type information to addSource since FlinkPulsarSource is ResultTypeQueryable
val dataStream = env.addSource(source)(null)
// chain operations on dataStream of Row and sink the output
// end method chaining
env.execute()
  • 将 Pusar 中的 topic 注册为 streaming tables
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
val prop = new Properties()
prop.setProperty("service.url", serviceUrl)
prop.setProperty("admin.url", adminUrl)
prop.setProperty("flushOnCheckpoint", "true")
prop.setProperty("failOnWrite", "true")
props.setProperty("topic", "test-sink-topic")
tEnv
  .connect(new Pulsar().properties(props))
  .inAppendMode()
  .registerTableSource("sink-table")
val sql = "INSERT INTO sink-table ....."
tEnv.sqlUpdate(sql)
env.execute()

Flink & Pulsar:向 Pulsar 写入数据

  • 创建用于流查询的 Pulsar sink
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = .....
val prop = new Properties()
prop.setProperty("service.url", serviceUrl)
prop.setProperty("admin.url", adminUrl)
prop.setProperty("flushOnCheckpoint", "true")
prop.setProperty("failOnWrite", "true")
props.setProperty("topic", "test-sink-topic")
stream.addSink(new FlinkPulsarSink(prop, DummyTopicKeyExtractor))
env.execute()
  • 向 Pulsar 写入 streaming table
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
val prop = new Properties()
prop.setProperty("service.url", serviceUrl)
prop.setProperty("admin.url", adminUrl)
prop.setProperty("flushOnCheckpoint", "true")
prop.setProperty("failOnWrite", "true")
props.setProperty("topic", "test-sink-topic")
tEnv
  .connect(new Pulsar().properties(props))
  .inAppendMode()
  .registerTableSource("sink-table")
val sql = "INSERT INTO sink-table ....."
tEnv.sqlUpdate(sql)
env.execute()

在以上示例中,Flink 开发人员都无需担心 schema 注册、序列化/反序列化,并将 Pulsar 集群注册为 Flink 中的 source、sink 或 streaming table。当这三个要素同时存在时,Pulsar 会被注册为 Flink 中的 catalog,这可以极大简化数据处理与查询,例如,编写程序从 Pulsar 查询数据,使用 Table API 和 SQL 查询 Pulsar 数据流等。

未来计划

Pulsar 与 Flink 集成的目标在于简化如何同时使用这两个框架来构建统一的数据处理堆栈,为开发人员提供便利。相比经典的 Lamda 架构(在线的高速层与离线的批处理层相结合,共同运行数据计算),Flink 和 Pulsar 的组合提供了真正统一的数据处理堆栈。Flink 作为统一的计算引擎,处理在线(流)和离线(批)工作负载,而 Pulsar 作为统一数据处理堆栈的统一数据存储层,简化了开发人员的工作。

在改进集成的道路上仍有很多工作要做,例如,能够利用 Pulsar connector 对 Flink 社区的贡献的新 source API(FLIP-27),Pulsar 中允许有效 source 并行扩展的 Key_Shared 订阅类型等。除此之外,可以改进的方向还包括:端到端的 exactly-once 保证(目前只能在 Pulsar source 中使用,不能在 Pulsar sink 中使用),以及将 Pulsar/BookKeeper 用作 Flink 状态后端等。

点击此处观看关于 Flink 和 Pulsar 集成发展的详细介绍,此视频来自 Flink Forward Europe 2019,也可以订阅 Flink 开发邮件列表,获取关于 Flink 和 Pulsar 贡献与集成工作的最新消息。
想要随时掌握 Pulsar 的研发进展、用户案例和热点话题吗?快来关注 Apache Pulsar 和 StreamNative 微信公众号,我们会在第一时间和您分享 Pulsar 的一切。

原文链接:
https://flink.apache.org/news/2019/11/25/query-pulsar-streams-using-apache-flink.html

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,254评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,875评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,682评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,896评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,015评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,152评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,208评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,962评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,388评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,700评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,867评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,551评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,186评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,901评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,142评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,689评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,757评论 2 351

推荐阅读更多精彩内容