Kafka核心API——Stream API

Kafka Stream概念及初识高层架构图

Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature,它提供了对存储于Kafka内的数据进行流式处理和分析的功能。简而言之,Kafka Stream就是一个用来做流计算的类库,与Storm、Spark Streaming、Flink的作用类似,但要轻量得多。

Kafka Stream的基本概念:

  • Kafka Stream是处理分析存储在Kafka数据的客户端程序库(lib)
  • 由于Kafka Streams是Kafka的一个lib,所以实现的程序不依赖单独的环境
  • Kafka Stream通过state store可以实现高效的状态操作
  • 支持原语Processor和高层抽象DSL

Kafka Stream的高层架构图:

image.png

  • Partition的数据会分发到不同的Task上,Task主要是用来做流式的并行处理
  • 每个Task都会有自己的state store去记录状态
  • 每个Thread里会有多个Task

Kafka Stream 核心概念

Kafka Stream关键词:

  • 流和流处理器:流指的是数据流,流处理器指的是数据流到某个节点时对其进行处理的单元
  • 流处理拓扑:一个拓扑图,该拓扑图展示了数据流的走向,以及流处理器的节点位置
  • 源处理器及Sink处理器:源处理器指的是数据的源头,即第一个处理器,Sink处理器则反之,是最终产出结果的一个处理器

如下图所示:


image.png

Kafka Stream使用演示

下图是Kafka Stream完整的高层架构图:

image.png

从上图中可以看到,Consumer对一组Partition进行消费,这组Partition可以在一个Topic中或多个Topic中。然后形成数据流,经过各个流处理器后最终通过Producer输出到一组Partition中,同样这组Partition也可以在一个Topic中或多个Topic中。这个过程就是数据流的输入和输出。

因此,我们在使用Stream API前需要先创建两个Topic,一个作为输入,一个作为输出。到服务器上使用命令行创建两个Topic:

[root@txy-server2 ~]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic input-topic
[root@txy-server2 ~]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic output-topic

由于之前依赖的kafka-clients包中没有Stream API,所以需要另外引入Stream的依赖包。在项目中添加如下依赖:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.5.0</version>
</dependency>

接下来以一个经典的词频统计为例,演示一下Stream API的使用。代码示例:

package com.zj.study.kafka.stream;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

import java.util.List;
import java.util.Properties;

public class StreamSample {

    private static final String INPUT_TOPIC = "input-topic";
    private static final String OUTPUT_TOPIC = "output-topic";

    /**
     * 构建配置属性
     */
    public static Properties getProperties() {
        Properties properties = new Properties();
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "49.232.153.84:9092");
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-app");
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        return properties;
    }

    public static KafkaStreams createKafkaStreams() {
        Properties properties = getProperties();

        // 构建流结构拓扑
        StreamsBuilder builder = new StreamsBuilder();
        // 构建wordCount这个Processor
        wordCountStream(builder);
        Topology topology = builder.build();

        // 构建KafkaStreams
        return new KafkaStreams(topology, properties);
    }

    /**
     * 定义流计算过程
     * 例子为词频统计
     */
    public static void wordCountStream(StreamsBuilder builder) {
        // 不断的从INPUT_TOPIC上获取新的数据,并追加到流上的一个抽象对象
        KStream<String, String> source = builder.stream(INPUT_TOPIC);
        // KTable是数据集的抽象对象
        KTable<String, Long> count = source.flatMapValues(
                // 以空格为分隔符将字符串进行拆分
                v -> List.of(v.toLowerCase().split(" "))
                // 按value进行分组统计
        ).groupBy((k, v) -> v).count();

        KStream<String, Long> sink = count.toStream();
        // 将统计结果输出到OUTPUT_TOPIC
        sink.to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
    }

    public static void main(String[] args) {
        KafkaStreams streams = createKafkaStreams();
        // 启动该Stream
        streams.start();
    }
}

KTableKStream的关系与区别,如下图:

image.png

  • KTable类似于一个时间片段,在一个时间片段内输入的数据就会update进去,以这样的形式来维护这张表
  • KStream则没有update这个概念,而是不断的追加

运行以上代码,然后到服务器中使用kafka-console-producer.sh脚本命令向input-topic生产一些数据,如下:

[root@txy-server2 ~]# kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic input-topic
>Hello World Java
>Hello World Kafka
>Hello Java Kafka
>Hello Java

然后再运行kafka-console-consumer.sh脚本命令从output-topic中消费数据,并进行打印。具体如下:

[root@txy-server2 ~]# kafka-console-consumer.sh --bootstrap-server 172.21.0.10:9092 --topic output-topic --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --from-beginning

控制台输出的结果:

world   2
hello   3
java    2
kafka   2
hello   4
java    3

从输出结果中可以看到,Kafka Stream首先是对前三行语句进行了一次词频统计,所以前半段是:

world   2
hello   3
java    2
kafka   2

当最后一行输入之后,又再做了一次词频统计,并针对新的统计结果进行输出,其他没有变化的则不作输出,所以最后打印了:

hello   4
java    3

这也是KTableKStream的一个体现,从测试的结果可以看出Kafka Stream是实时进行流计算的,并且每次只会针对有变化的内容进行输出。


foreach方法

在之前的例子中,我们是从某个Topic读取数据进行流处理后再输出到另一个Topic里。但在一些场景下,我们可能不希望将结果数据输出到Topic,而是写入到一些存储服务中,例如ElasticSearch、MongoDB、MySQL等。

在这种场景下,就可以利用到foreach方法,该方法用于迭代流中的元素。我们可以在foreach中将数据存入例如Map、List等容器,然后再批量写入到数据库或其他存储中间件即可。

foreach方法使用示例:

public static void foreachStream(StreamsBuilder builder) {
    KStream<String, String> source = builder.stream(INPUT_TOPIC);
    source.flatMapValues(
            v -> List.of(v.toLowerCase().split(" "))
    ).foreach((k, v) -> System.out.println(k + " : " + v));
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,492评论 6 513
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,048评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,927评论 0 358
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,293评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,309评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,024评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,638评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,546评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,073评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,188评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,321评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,998评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,678评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,186评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,303评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,663评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,330评论 2 358