WordCount
WordCount 堪称大数据界的HelloWorld,相信不管是Hadoop还是Spark等大数据工具的上手实例,第一个十有八九是WordCount。
Kafka Stream也不例外。作为集成在Kafka消息系统上的数据实时处理接口,WordCount也可以作为一个很好的入门实例。
实际上,Kafka官方已经提供了WordCount的Demo,org.apache.kafka.streams.examples.wordcount.WordCountDemo
,但亲手实现一遍可以帮助我们快速入门。
逻辑流程
需要记住的是,Kafka中的数据都以<key, value>的形式存在。
假设我们的Kafka中,已经存在一个topic,其中的数据来自于一个文本文件。我们希望编写一个Kafka Streams Application对此topic中的数据进行WordCount计算,大概步骤可以分解为:
Stream 从源topic中取出每一行数据记录 (<key, value>格式) ---- <null, "Hello World hello">
MapValue 将value中所有文本转换成小写形式 ---- <null, "hello world hello">
FlatMapValues 按空格分解成单词 ---- <null, “hello”>,<null, “world”>, <null, “hello”>
SelectKey 将value的值赋给key ---- <"hello", “hello”>,<“world”, “world”>, <“hello”, “hello”>
GroupByKey 按相同的Key分组 ---- (<"hello", “hello”>, <"hello, “hello”>),(<"world", “world”>)
Count 计算每个组中元素个数 ---- <"hello", 2>,<"world", 1>
To 将结果返回Kafka
逻辑代码
首先进行配置,包括Kafka Streams Application的ID,Kafka集群位置等:
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
这里使用Kafka Streams DSL。DSL提供的各种算子大部分情况下都可以满足需求。
使用DSL:
StreamsBuilder builder = new StreamsBuilder();
从Kafka源topic获取数据流:
KStream<String, String> textLines = builder.stream("streams-plaintext-input");
KStream即代表了由各个数据记录组成的数据流。
KStream可以从一或更多topic中的数据得来。
KStream可以进行对数据记录的逐条转换,和其它KStream,KTable进行join操作,或aggregate成KTable。
对得到的KStream进行transformation和aggregation:
将数据记录中的大写全部替换成小写:
.mapValues(textLine -> textLine.toLowerCase())
将各行数据按空格拆分:
.flatMapValues(textLine -> Arrays.asList(textLine.split(" ")))
将value作为新的key:
.selectKey((key, word) -> word)
aggregation操作前group by key:
.groupByKey()
计算每个组中的元素个数:
.count(Materialized.as("Counts"));
得到结果后将其存储为KTable:
KTable<String, Long> wordCounts = textLines
.mapValues ...
最后导入目标topic,其中key为String,value为Long。
wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();