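I. Introduction
With Kafka Streams, the computation logic is written as one or more processor topologies. A processor topology is a graph of stream processors (the nodes) connected by streams (the edges). Different requirements call for different topologies, but hand-building a separate topology for every feature quickly makes the code redundant. What we need instead is a tool that creates the topology dynamically from the processors and requirements we give it.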
II. Building a fixed topology
1. First, load the Kafka Streams configuration
private static Map<String, Object> props = new HashMap<>();
// Load the configuration file
static {
    PropertyReaderUtil reader = new PropertyReaderUtil();
    Map<String, String> serverMap = reader.readPropertyFile("kafkaServer.properties");
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, serverMap.get("bootstrap.servers"));
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
}
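PropertyReaderUtil is not shown in this post. As a rough idea of what such a helper might do, here is a minimal sketch built on java.util.Properties. The class name PropertyReaderSketch and the method readProperties are hypothetical, the broker address is an example value, and the sketch reads from a Reader so that it runs without a file on disk.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical stand-in for PropertyReaderUtil, which the post does not show.
public class PropertyReaderSketch {

    // Parse key=value pairs in .properties format into a plain Map.
    public static Map<String, String> readProperties(Reader source) {
        Properties properties = new Properties();
        try {
            properties.load(source);
        } catch (IOException e) {
            // Surface read failures as an unchecked exception so callers fail fast
            throw new UncheckedIOException("Cannot read properties", e);
        }
        Map<String, String> result = new HashMap<>();
        for (String name : properties.stringPropertyNames()) {
            result.put(name, properties.getProperty(name));
        }
        return result;
    }

    public static void main(String[] args) {
        // Inline content standing in for kafkaServer.properties (example value)
        String content = "bootstrap.servers=localhost:9092\n";
        Map<String, String> serverMap = readProperties(new StringReader(content));
        System.out.println(serverMap.get("bootstrap.servers"));
    }
}
```

A real helper would presumably open the file from the classpath or the file system and pass the resulting reader to the same parsing step.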
2. Build the fixed processor topology
public static void topology() {
    TopologyBuilder builder = new TopologyBuilder();
    String parentName = "source";
    // Source node "source" reads from the topic "data"
    builder.addSource("source", "data");
    // KeywordProcessor and ClassifitionProcessor are the concrete processors
    builder.addProcessor("keyword", KeywordProcessor::new, parentName);
    builder.addProcessor("classifition", ClassifitionProcessor::new, "keyword");
    // Sink node "sink" writes to the topic "output"
    builder.addSink("sink", "output", "classifition");
    StreamsConfig streamsConfig = new StreamsConfig(props);
    KafkaStreams kafkaStreams = new KafkaStreams(builder, streamsConfig);
    kafkaStreams.start();
}
III. Building a dynamic topology
1. Create a class that implements the ProcessorSupplier interface
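import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProcessorSupplierFactory implements ProcessorSupplier {
    public static final Logger logger = LoggerFactory.getLogger(ProcessorSupplierFactory.class);
    // Fully qualified class name of the Processor to instantiate
    private final String processorName;

    public ProcessorSupplierFactory(String processorName) {
        this.processorName = processorName;
    }

    @Override
    public Processor get() {
        try {
            // Create a new instance by reflection; the class needs an accessible no-arg constructor
            return (Processor) Class.forName(processorName).getDeclaredConstructor().newInstance();
        } catch (ClassNotFoundException e) {
            logger.error("Class not found: " + processorName, e);
            throw new IllegalStateException("Class not found: " + processorName, e);
        } catch (ReflectiveOperationException e) {
            logger.error("Failed to instantiate " + processorName, e);
            // Fail fast instead of returning null, which would only surface later as a NullPointerException
            throw new IllegalStateException("Failed to instantiate " + processorName, e);
        }
    }
}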
The class name passed to the constructor is instantiated by reflection, so processor instances are created dynamically.
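To look at the reflection step in isolation, without a Kafka dependency, here is a minimal sketch. The Step interface and the UpperCaseStep class are hypothetical stand-ins for Processor and a concrete processor; only the Class.forName lookup mirrors what ProcessorSupplierFactory does.

```java
import java.util.Locale;
import java.util.function.Supplier;

public class ReflectiveSupplierSketch {

    // Hypothetical stand-in for Kafka's Processor interface.
    public interface Step {
        String apply(String value);
    }

    // Hypothetical concrete step; reflection relies on its public no-arg constructor.
    public static class UpperCaseStep implements Step {
        @Override
        public String apply(String value) {
            return value.toUpperCase(Locale.ROOT);
        }
    }

    // Returns a supplier that creates a new Step from the class name on every get() call.
    public static Supplier<Step> supplierFor(String className) {
        return () -> {
            try {
                return (Step) Class.forName(className).getDeclaredConstructor().newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot instantiate " + className, e);
            }
        };
    }

    public static void main(String[] args) {
        // getName() yields the binary name that Class.forName expects
        Supplier<Step> supplier = supplierFor(UpperCaseStep.class.getName());
        System.out.println(supplier.get().apply("kafka"));    // prints "KAFKA"
        System.out.println(supplier.get() != supplier.get()); // prints "true": a new instance per call
    }
}
```

Returning a new instance on every call matters because a supplier is expected to hand out a separate processor each time it is asked, rather than sharing one object.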
2. Load the Kafka Streams configuration
private static Map<String, Object> props = new HashMap<>();
// Load the configuration file
static {
    PropertyReaderUtil reader = new PropertyReaderUtil();
    Map<String, String> serverMap = reader.readPropertyFile("kafkaServer.properties");
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, serverMap.get("bootstrap.servers"));
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
}
3. Build the processor topology dynamically
/**
 * Builds the topology dynamically.
 * @param processors fully qualified class names of the processors, in processing order
 */
public static void dynamicTopology(List<String> processors) {
    TopologyBuilder builder = new TopologyBuilder();
    String parentName = "source";
    builder.addSource("source", "data");
    for (String process : processors) {
        // The class name doubles as the node name; the factory creates the Processor instance dynamically
        builder.addProcessor(process, new ProcessorSupplierFactory(process), parentName);
        // The next node is chained after this one
        parentName = process;
    }
    builder.addSink("sink", "output", parentName);
    StreamsConfig streamsConfig = new StreamsConfig(props);
    KafkaStreams kafkaStreams = new KafkaStreams(builder, streamsConfig);
    kafkaStreams.start();
}
With that, we have a way to build the processor topology dynamically.
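The wiring rule used in dynamicTopology, where each processor's parent is the node added just before it, can be checked without a broker. The sketch below only computes the node-to-parent pairs. ChainWiringSketch and wire are hypothetical names, the class names in main are made-up examples, and the duplicate check is an addition of ours, based on the fact that node names within one topology must be unique.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ChainWiringSketch {

    // Maps each node name to its parent, in insertion order: processors first, then the sink.
    public static Map<String, String> wire(List<String> processors) {
        Map<String, String> parents = new LinkedHashMap<>();
        String parentName = "source";
        for (String process : processors) {
            // Reject names that would clash with an existing node
            if (parents.containsKey(process) || process.equals("source") || process.equals("sink")) {
                throw new IllegalArgumentException("Duplicate node name: " + process);
            }
            parents.put(process, parentName);
            parentName = process;
        }
        // The sink hangs off the last processor, or off the source if the list is empty
        parents.put("sink", parentName);
        return parents;
    }

    public static void main(String[] args) {
        List<String> processors = Arrays.asList(
                "com.example.KeywordProcessor",      // made-up class names
                "com.example.ClassifitionProcessor");
        for (Map.Entry<String, String> edge : wire(processors).entrySet()) {
            System.out.println(edge.getValue() + " -> " + edge.getKey());
        }
    }
}
```

Running a check like this before calling dynamicTopology is one way to catch a repeated class name early, since the list order alone decides the shape of the chain.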