Start a single-node Kafka (ZooKeeper first, then the broker, each in its own terminal):
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
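Create two topics, one for input and one for output. The output topic must be named streams-wordcount-output, because that is the topic the demo writes to and the console consumer reads from:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic streams-plaintext-input
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic streams-wordcount-output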
Run the bundled WordCount demo:
bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
The demo reads from streams-plaintext-input, applies the WordCount transformation to every message it reads, and continuously writes the results to streams-wordcount-output.
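The per-message transformation can be sketched in plain Python, without Kafka. This is only a simulation, assuming the demo lowercases each line, splits it on whitespace, and emits one updated count per word; the function name word_count_updates and the sample lines are hypothetical.

```python
from collections import defaultdict


def word_count_updates(lines):
    """Yield (word, running_count) once for every word in every input line."""
    counts = defaultdict(int)  # running total per word, kept across lines
    for line in lines:
        # Assumption: lowercase the line and split it on whitespace.
        for word in line.lower().split():
            counts[word] += 1
            # Each increment is emitted as its own update record.
            yield word, counts[word]


sample_lines = ["hello world", "hello kafka"]  # illustrative input only
updates = list(word_count_updates(sample_lines))
for word, count in updates:
    print(word, count)
```

A word that appears again produces a new record with the higher count rather than modifying an earlier record, which is why the same word can show up several times in the output topic.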
Start a console producer for the input topic:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
Open another terminal and run the console consumer script to check the WordCount results:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
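The value.deserializer setting matters because the counts are stored as 8-byte big-endian signed longs rather than as text. A minimal Python sketch of that encoding follows; the helper names encode_long and decode_long are hypothetical and only illustrate the byte layout.

```python
import struct


def encode_long(value: int) -> bytes:
    """Pack an integer as an 8-byte big-endian signed long."""
    return struct.pack(">q", value)


def decode_long(raw: bytes) -> int:
    """Unpack an 8-byte big-endian signed long."""
    if len(raw) != 8:
        raise ValueError(f"expected 8 bytes, got {len(raw)}")
    return struct.unpack(">q", raw)[0]


raw = encode_long(2)
print(raw)               # the raw bytes a string deserializer would try to print
print(decode_long(raw))  # the count recovered as an integer
```

Reading these bytes with a string deserializer would print unreadable control characters, so the consumer is told to use LongDeserializer for values and StringDeserializer for keys.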
In the producer terminal, send a few messages:
>hello world
>kafka streams example
>kafka topics
>streams word count
>new message
>new record
The consumer terminal prints the running counts, one record per word update:
hello 1
world 1
kafka 1
streams 1
example 1
kafka 2
topics 1
streams 2
word 1
count 1
new 1
message 1
new 2
record 1
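The output above is a stream of updates, so the current count of a word is the last record seen for it. A small Python sketch that folds the printed lines into a final table, assuming each line is a word and a count separated by whitespace (latest_counts is a hypothetical helper):

```python
def latest_counts(update_lines):
    """Keep only the most recent count seen for each word."""
    table = {}
    for line in update_lines:
        word, count = line.split()
        table[word] = int(count)  # a later record overwrites an earlier one
    return table


consumer_output = [
    "hello 1", "world 1", "kafka 1", "streams 1", "example 1",
    "kafka 2", "topics 1", "streams 2", "word 1", "count 1",
    "new 1", "message 1", "new 2", "record 1",
]
final_table = latest_counts(consumer_output)
for word, count in final_table.items():
    print(word, count)
```

Here kafka, streams, and new each end at 2, and every other word stays at 1.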