简介理解
传统的批处理注重结果(例如将一批的数据进行处理, 最终的处理结果才是最终想要的),而流注重变化,每一个输入都会对应输出,这种行为的不同,造成了批处理的相应较慢(或者说是需要等待很长时间),而流处理的相应较快。
Kafka Streams 是一个库。一个基于Kafka的构建流处理程序的库,将流处理变得更为简单,特别是其输入一个Topic,输出是另一个Topic的程序。Kafka Streams不依赖于集群和框架,只是一个库,只需要Kafka和相关的处理代码,Kafka会去协调程序处理代码。
Demo代码
- Main入口
public class KafkaStreamsDemo {
public static void main(String[] args) {
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "hellow_stream_demo");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
// 注册一个stream 并订阅topic
KStream<String, String> stream1 = builder.stream("HelloWorld");
// 转换方法1:flatMapValues
KStream<String, String> stream2 = stream1.
flatMapValues(new ValueMapper<String,Iterable<String>>(){
@Override
public Iterable<String> apply(String s) {
List<String> list = Lists.newArrayList("flatMapValues::"+s);
return list;
}
});
stream2.to("hellow_stream_out");
// 转换方法2:transform 进行转换
KStream<String, String> stream3 = stream1.transform(new KafkaDemoTransformSupplier());
stream3.to("hellow_stream_out_2");
KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
}
}
- Supplier类
public class KafkaDemoTransformSupplier implements TransformerSupplier<String, String, KeyValue<String,String>> {
@Override
public Transformer<String, String, KeyValue<String, String>> get() {
return new KafkaDemoTransformer();
}
}
- transformer类
public class KafkaDemoTransformer implements Transformer<String,String, KeyValue<String, String>> {
private ProcessorContext processorContext;
@Override
public void init(ProcessorContext processorContext) {
this.processorContext = processorContext;
}
@Override
public KeyValue<String, String> transform(String key, String value) {
String str = "transfer::"+value;
// 此处开启,会推送两遍消息
// processorContext.forward(key,str);
return new KeyValue<>(key,str);
}
@Override
public void close() {
}
}