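Scenario:
Data flow: kafka --> spark --> es (Elasticsearch).
The Spark processing stage contains steps such as filtering and ETL, and which steps are combined depends on the requirement. For example, scenario A needs filter + ETL; scenario B needs filter + ETL1 + ETL2; scenario C needs ETL2 + ETL3. Each combination provides a different function.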
Implementation: use the Composite pattern.
Definitions:
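Interface ChildProcess, the common interface shared by every component of the composite. Each component receives the upstream dataset and returns the processed one:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public interface ChildProcess {
    Dataset<Row> doChildProcess(Dataset<Row> dataset) throws Exception;
}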
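Composite class:
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class CompositeChildProcess implements ChildProcess, Serializable {
    // Children run in insertion order; a child may be a leaf or another composite.
    private final List<ChildProcess> childProcesses = new ArrayList<>();

    public void add(ChildProcess childProcess) {
        childProcesses.add(childProcess);
    }

    public void remove(ChildProcess childProcess) {
        childProcesses.remove(childProcess);
    }

    @Override
    public Dataset<Row> doChildProcess(Dataset<Row> dataset) throws Exception {
        // The output of each child becomes the input of the next one.
        for (ChildProcess childProcess : childProcesses) {
            dataset = childProcess.doChildProcess(dataset);
        }
        return dataset;
    }
}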
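To see the chaining behavior without a Spark cluster, here is a minimal, self-contained sketch of the same composite, assuming a List<String> of records stands in for Dataset<Row>. The class CompositeSketch and the two steps APPEND_A and APPEND_B are hypothetical, chosen only so that the execution order shows up in the output.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class CompositeSketch {

    // Assumption: List<String> stands in for Spark's Dataset<Row> so the sketch runs without Spark.
    interface ChildProcess {
        List<String> doChildProcess(List<String> dataset) throws Exception;
    }

    // Same shape as CompositeChildProcess: each child's output is the next child's input.
    static class CompositeChildProcess implements ChildProcess {
        private final List<ChildProcess> childProcesses = new ArrayList<>();

        void add(ChildProcess childProcess) { childProcesses.add(childProcess); }

        void remove(ChildProcess childProcess) { childProcesses.remove(childProcess); }

        @Override
        public List<String> doChildProcess(List<String> dataset) throws Exception {
            for (ChildProcess childProcess : childProcesses) {
                dataset = childProcess.doChildProcess(dataset);
            }
            return dataset;
        }
    }

    // Hypothetical steps that append a marker, so the order of execution is visible.
    static final ChildProcess APPEND_A = ds -> ds.stream().map(s -> s + "A").collect(Collectors.toList());
    static final ChildProcess APPEND_B = ds -> ds.stream().map(s -> s + "B").collect(Collectors.toList());

    // Builds a composite from the given steps and runs it on the input.
    static List<String> run(List<String> input, ChildProcess... steps) throws Exception {
        CompositeChildProcess composite = new CompositeChildProcess();
        for (ChildProcess step : steps) {
            composite.add(step);
        }
        return composite.doChildProcess(input);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run(List.of("x"), APPEND_A, APPEND_B)); // [xAB]
        System.out.println(run(List.of("x"), APPEND_B, APPEND_A)); // [xBA]
    }
}
```

Swapping the order of the add calls changes the result, so the order in which children are added is part of the pipeline definition. An empty composite simply returns its input unchanged.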
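ETL child component:
import java.io.Serializable;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class CustomEtlProcess implements ChildProcess, Serializable {
    @Override
    public Dataset<Row> doChildProcess(Dataset<Row> dataset) throws Exception {
        // .... (ETL transformations elided in the original; the result is `etl`)
        return etl;
    }
}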
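Filter child component:
import java.io.Serializable;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class CustomkafkaFilterProcess implements ChildProcess, Serializable {
    @Override
    public Dataset<Row> doChildProcess(Dataset<Row> dataset) throws Exception {
        // .... (filter logic elided in the original; the result is `filter`)
        return filter;
    }
}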
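The bodies of both leaves are elided above. As a hedged illustration only, the sketch below fills in two hypothetical leaves (BlankRecordFilter and NormalizeEtl, names not from the original) over a List<String> stand-in for Dataset<Row>. It also hints at why a filter usually runs first: NormalizeEtl would throw a NullPointerException on a null record that the filter would otherwise have removed.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

public class LeafSketch {

    // Assumption: List<String> stands in for Spark's Dataset<Row>.
    interface ChildProcess {
        List<String> doChildProcess(List<String> dataset) throws Exception;
    }

    // Hypothetical filter leaf: drops null and blank records.
    static class BlankRecordFilter implements ChildProcess {
        @Override
        public List<String> doChildProcess(List<String> dataset) {
            return dataset.stream()
                    .filter(r -> r != null && !r.isBlank())
                    .collect(Collectors.toList());
        }
    }

    // Hypothetical ETL leaf: trims and lower-cases each record (assumes no nulls remain).
    static class NormalizeEtl implements ChildProcess {
        @Override
        public List<String> doChildProcess(List<String> dataset) {
            return dataset.stream()
                    .map(r -> r.trim().toLowerCase(Locale.ROOT))
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) {
        List<String> raw = Arrays.asList(" Foo ", "  ", null, "BAR");
        // Filter first, then ETL, matching the kafka-filter-etl-es flow.
        List<String> cleaned = new NormalizeEtl().doChildProcess(new BlankRecordFilter().doChildProcess(raw));
        System.out.println(cleaned); // [foo, bar]
    }
}
```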
Client side:
For example, the kafka-filter-etl-es flow:
CompositeChildProcess compositeChildProcess = new CompositeChildProcess();
CustomkafkaFilterProcess customkafkaFilterProcess = new CustomkafkaFilterProcess();
compositeChildProcess.add(customkafkaFilterProcess);
CustomEtlProcess customEtlProcess = new CustomEtlProcess();
compositeChildProcess.add(customEtlProcess);
// kafkaDataset: hypothetical name for the Dataset<Row> consumed from Kafka (not shown in the original)
Dataset<Row> result = compositeChildProcess.doChildProcess(kafkaDataset);
// result is then written to es
For example, the kafka-etl-filter-etl-es flow:
CompositeChildProcess compositeChildProcess = new CompositeChildProcess();
CustomEtlProcess customEtlProcess = new CustomEtlProcess();
compositeChildProcess.add(customEtlProcess);
CustomkafkaFilterProcess customkafkaFilterProcess = new CustomkafkaFilterProcess();
compositeChildProcess.add(customkafkaFilterProcess);
// Second ETL step, held in its own variable
CustomEtlProcess customEtlProcess2 = new CustomEtlProcess();
compositeChildProcess.add(customEtlProcess2);
// kafkaDataset: hypothetical name for the Dataset<Row> consumed from Kafka (not shown in the original)
Dataset<Row> result = compositeChildProcess.doChildProcess(kafkaDataset);
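The three scenarios from the beginning (A: filter + ETL, B: filter + ETL1 + ETL2, C: ETL2 + ETL3) can be sketched the same way. This is a minimal sketch assuming a List<String> stand-in for Dataset<Row>; FILTER, ETL1, ETL2 and ETL3 are hypothetical lambdas that drop empty records or append a tag (ETL1 plays the role of scenario A's ETL), which works because ChildProcess has a single abstract method. The fluent add that returns this is a convenience not present in the original class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class ScenarioSketch {

    // Assumption: List<String> stands in for Spark's Dataset<Row>.
    interface ChildProcess {
        List<String> doChildProcess(List<String> dataset) throws Exception;
    }

    static class CompositeChildProcess implements ChildProcess {
        private final List<ChildProcess> childProcesses = new ArrayList<>();

        // Returns this so scenarios can be assembled in one expression.
        CompositeChildProcess add(ChildProcess childProcess) {
            childProcesses.add(childProcess);
            return this;
        }

        @Override
        public List<String> doChildProcess(List<String> dataset) throws Exception {
            for (ChildProcess child : childProcesses) {
                dataset = child.doChildProcess(dataset);
            }
            return dataset;
        }
    }

    // Hypothetical leaves: FILTER drops empty records, each ETL appends its own tag.
    static final ChildProcess FILTER = ds -> ds.stream().filter(r -> !r.isEmpty()).collect(Collectors.toList());
    static final ChildProcess ETL1 = ds -> ds.stream().map(r -> r + "|etl1").collect(Collectors.toList());
    static final ChildProcess ETL2 = ds -> ds.stream().map(r -> r + "|etl2").collect(Collectors.toList());
    static final ChildProcess ETL3 = ds -> ds.stream().map(r -> r + "|etl3").collect(Collectors.toList());

    static CompositeChildProcess scenarioA() { return new CompositeChildProcess().add(FILTER).add(ETL1); }
    static CompositeChildProcess scenarioB() { return new CompositeChildProcess().add(FILTER).add(ETL1).add(ETL2); }
    static CompositeChildProcess scenarioC() { return new CompositeChildProcess().add(ETL2).add(ETL3); }

    public static void main(String[] args) throws Exception {
        List<String> input = List.of("r1", "");
        System.out.println(scenarioA().doChildProcess(input)); // [r1|etl1]
        System.out.println(scenarioB().doChildProcess(input)); // [r1|etl1|etl2]
        System.out.println(scenarioC().doChildProcess(input)); // [r1|etl2|etl3, |etl2|etl3]
    }
}
```

Scenario C has no filter, so the empty record survives and is tagged as well; the difference between the scenarios comes only from which children were added and in what order.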
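Because every child, including the composite class itself, implements the ChildProcess interface, components can be combined in any order and nested to any depth, which gives flexible control over the processing flow.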
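To illustrate the multi-level case, the sketch below nests one composite inside another, again over a hypothetical List<String> stand-in for Dataset<Row>. A reusable cleanse sub-pipeline (a hypothetical filter plus a hypothetical etl1 step) is added to the outer pipeline as a single child; the outer loop treats it like any leaf.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class NestedCompositeSketch {

    // Assumption: List<String> stands in for Spark's Dataset<Row>.
    interface ChildProcess {
        List<String> doChildProcess(List<String> dataset) throws Exception;
    }

    static class CompositeChildProcess implements ChildProcess {
        private final List<ChildProcess> childProcesses = new ArrayList<>();

        void add(ChildProcess childProcess) { childProcesses.add(childProcess); }

        @Override
        public List<String> doChildProcess(List<String> dataset) throws Exception {
            for (ChildProcess child : childProcesses) {
                // A child may itself be a composite; the call looks the same either way.
                dataset = child.doChildProcess(dataset);
            }
            return dataset;
        }
    }

    // Hypothetical ETL step that appends a tag to every record.
    static ChildProcess tag(String label) {
        return ds -> ds.stream().map(r -> r + "|" + label).collect(Collectors.toList());
    }

    // Two-level tree: pipeline = [cleanse = [filter, etl1], etl2]
    static CompositeChildProcess buildTree() {
        CompositeChildProcess cleanse = new CompositeChildProcess();
        cleanse.add(ds -> ds.stream().filter(r -> !r.isEmpty()).collect(Collectors.toList()));
        cleanse.add(tag("etl1"));

        CompositeChildProcess pipeline = new CompositeChildProcess();
        pipeline.add(cleanse); // the whole sub-pipeline counts as one child
        pipeline.add(tag("etl2"));
        return pipeline;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(buildTree().doChildProcess(List.of("r1", ""))); // [r1|etl1|etl2]
    }
}
```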