场景
我们在写CEP规则的时候动态的逻辑大致可以分为三个部分:
1、通过条件过滤数据
2、事件链(Pattern)
3、每条事件匹配的条件(Condition)
所以可以构想开发一个通用的Job,然后通过一个配置文件来达到规则的触发。
设计
1、首先为了方便处理数据,我们先把数据都转成JSONObject,方便后续直接使用
2、所有涉及到条件判断的部分,我们都可以使用Jexl来实现(关于jexl的使用我已经有过介绍了)
3、整个事件链的描述需要定义一套DSL语言然后来解析(前端可以实现可视化的配置)
实现
1、条件的过滤
public class RuleFilter extends RichFilterFunction<JSONObject> {
private Jexl jexl;
/**
* 规则表达式
*/
private String express;
public RuleFilter(String express) {
this.express = express;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
jexl = new Jexl();
}
@Override
public boolean filter(JSONObject jsonObj) throws Exception {
if(jsonObj == null){
return false;
}
return jexl.evaluateBoolean(express, jsonObj);
}
}
这里的Jexl是我对jexl的封装
2、事件条件的判断
public class RuleCondition extends SimpleCondition<JSONObject> {
private String express;
private Jexl jexl;
public RuleCondition(String express) {
this.express = express;
this.jexl = new Jexl();
}
@Override
public boolean filter(JSONObject jsonObject) throws Exception {
if(jsonObject == null){
return false;
}
return jexl.evaluateBoolean(express, jsonObject);
}
}
提示:
功能FLINK CEP SQL本事是实现的,这里主要是探讨Flink SQL大致实现的过程