随着智能设备和传感器系统的不断增加,现在可以获得真实世界中任何元素的数据和上下文信息。物联网(IoT)是这一趋势的代名词,它也成为一种社会意义,因为它影响到日常生活的各个领域。当然,这种趋势为改善我们的生活和公司提供了巨大的可能性。例如,对业务流程的实时洞察近年来变得越来越重要。有了正确的信息,在某些事件之间只有短暂的延迟,运营层面的决策者才能迅速做出反应,并根据流程调整变化。因此,对流程有深入了解的公司可以选择优化业务,为客户提供新的服务级别并增加收益。
几年前人们在物流行业就已经看到了这种必要性。多年以来,大多数物流供应商都会监控货物的位置状态。物流客户想知道货物在哪里,还想知道货物发生了什么。例如,对于制造商来说,易碎品或敏感品损坏的代价可能会非常昂贵,并且因为没有库存而停产。由于这个原因,物流供应商必须收集更多关于货物的更详细的信息。收集卡车、飞机或轮船的GPS坐标是不够的。它需要收集数据包级别上数据,因为关于每批货物的环境条件或移动的信息都是独立的。
这大量的信息必须首先得到处理。一种选择是使用数据仓库和商业智能系统来保存和处理数据。但这种方法反应速度较慢。根据所使用的技术,可能需要数小时才能检测到流程中的事件和差异。另一种分析实时数据流的技术是复杂事件处理(CEP)。基于数据库系统的ECA(event-condition-action)规则,它首先被开发用来检测证券交易中的事件。由于出现了新的数据源,它们也被用来处理RFID或其他传感器数据。但是传统的CEP引擎,如Esper、Drools或Tibco,不能有效地扩展到每秒处理数百万个事件并同时检测这些数据中的复杂事件模式。因此,随着apachestorm、Apache Spark Streaming和Apache Flink等可伸缩流处理框架的兴起,为可伸缩复杂事件处理奠定了基础。虽然基于Spark流的CEP引擎由于Apache Spark的微批处理(例如Stratio Decision[2])保留了相同的性能限制,但基于Storm的框架需要涉及整个系统堆栈(例如WSO2 CEP[3])。现在,Apache Flink框架提供了一个CEP库,因为“[…]它真正的流特性及其低延迟和高吞吐量流处理能力自然适合CEP工作负载。”
莱比锡大学和一些商业伙伴在德国联邦教育和研究部资助的LogiLEIT[5]项目中探讨了这一研究课题。与ScaDS能力中心合作,开发了一种基于大数据技术的可扩展方法,以增强当前的能力。
数据
底层数据通过安装在包裹上的智能手机传感器进行测量。移动设备每15秒发送一次当前状态信息“回家”。传感器事件包含环境信息(例如位置、压力、亮度、加速度、温度、湿度、方位)或传感器本身(例如分辨率、功耗、电池百分比和温度)。
数据通过套接字连接发送,格式如下:
{"_id":"00c4b0-4671359aab5c","latitude":51.39712,"longitude":12.3595,"orientation_l_r":116.648438,"accuracy":9,"bearing":0,"provider":"gps","speed":0,"shipment_number":"BEAVER"}
{"_id":"00c891-1d6dccb2f195","latitude":51.38093,"longitude":12.3762,"orientation_l_r":17.11562,"accuracy":5,"bearing":0,"provider":"gps","speed":0.5,"shipment_number":"BEAVER"}
{"_id":"00cadd-161ae86fbfd1","latitude":51.33808,"longitude":12.3765,"orientation_l_r":167,"accuracy":13,"bearing":16.2,"provider":"gps","speed":0.5,"shipment_number":"BEAVER"}
{"_id":"010359-6521124406ca","latitude":53.54148,"longitude":9.9921,"orientation_l_r":52,"accuracy":16,"bearing":0,"provider":"gps","speed":0,"shipment_number":"EAGLE-1"}
由于Flink提供了到不同消息传递系统(例如Kafka、RabbitMQ)的连接器,因此也可以处理在这些系统上发送的数据流。
连接到套接字流
在开始数据处理和监视之前,必须建立到数据流的连接。在本例中,它是端口9999上的TCP套接字连接。就像在每一个Flink应用程序中一样,执行环境也必须初始化。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
env.enableCheckpointing(15000, CheckpointingMode.EXACTLY_ONCE);
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
SingleOutputStreamOperator<SensorEvent> eventStream = env.socketTextStream("localhost", 9999, '\n').flatMap(new splitMap());
在本例中,并行度设置为4,这意味着所有操作符、数据源和数据接收器都分布在集群中的四个节点上。在第3行中,每隔一秒就会启用和创建检查点。该模式保证每个数据元素只处理一次。第4行定义了内部事件时间的特征。这个配置参数需要给每个传入的数据流元素一个时间戳,例如用于窗口计算。由于在数据流中没有提供时间戳,因此TimeCharacteristic被设置为ProcessingTime,因此每个数据元素都获得第一个处理元素的节点的系统时间。此外,与摄取时间或事件时间模式相比,这种模式在性能和延迟方面是最有效的。在第5行中,最后建立到本地主机上的流服务器的连接,端口9999,并调用一个flatMap函数,该函数将传入的JSON事件转换为SensorEvent的pojo对象。
查找被丢弃的包裹
对于物流供应商以及重要和易碎货物的接收者来说,检测包裹在运输过程中是否被扔是非常重要的。如果一个不可缺少的货物的损坏很快被发现,这个零件可以重新安排订购或生产,并且可以减少停工时间。
为了观察特定模式上的数据流,可能需要定义一个适当的分区策略。这可以通过使用用户定义的策略在底层对数据进行物理分区,或者使用高级keyBy运算符将数据逻辑分离为不相交的数据元素组。结果是一个KeyedDataStream,它也是构建数据窗口(见下文)或使用聚合函数所必需的。为了检测一个包裹内的事件,装运编号属性用于分区,因为该属性用作每个包裹的唯一标识符。
模式API用于描述必须观察数据流的事件模式。每个事件模式由不同的阶段组成;每个阶段都有自己的条件。阶段的顺序定义了处理顺序。每个阶段都必须有一个唯一的名称来标识。检测丢弃包的模式如下所示。首先,定义了一个名为orientationEvent的阶段,它还描述了事件模式的开始(第2行)。下一行的where子句确定完成模式的第一个状态所需的事件条件。为了简化起见,假设丢弃的包的特征是方向属性4的值大于100。对于CEP应用来说,非常重要的是事件序列的定义,例如,为了描述事件A必须紧跟着事件B。followedBy函数声明模式的第一阶段必须紧跟着第二阶段,并附加了一个称为accelerationEvent的阶段。followedBy运算符具有其他事件可以在事件之间发生的特性。还有一个next操作符,它意味着事件B必须直接继承前一个事件a,并且它们之间没有其他事件。模式的第二部分声明丢弃包的第二个特征是加速。所以这部分的事件条件是加速度大于10。模式定义的内部部分确定事件必须在其中发生的时间窗口。
KeyedStream< SensorEvent, Tuple> keyedData = eventData.keyBy("shipment_number");
Pattern< SensorEvent, ?> dropPattern = Pattern.< SensorEvent >begin("orientationEvent")
.where(new FilterFunction< SensorEvent >() {
public boolean filter(SensorEvent orientationEvent) throws Exception {
return orientationEvent.getOrientation_l_r() > 100.0;
}
})
.followedBy("accelerationEvent")
.where(new FilterFunction< SensorEvent >() {
public boolean filter(SensorEvent accelerationEvent) throws Exception {
return accelerationEvent.getAcceleration() > 10.0;
}
})
.within(Time.seconds(3));
在这个简单的CEP应用程序的最后一部分,观察数据流和描述的事件模式被放在一起。然后,如果事件模式发生在数据流中,则使用PatternSelectFunction提取触发事件的值,并抛出警告或启动另一个操作,例如启动另一个应用程序或向系统发送消息。
PatternStream<SensorEvent> FlowFirstPatternStream = CEP.pattern(keyedData, dropPattern);
DataStream<String> warning = FlowFirstPatternStream.select(new PatternSelectFunction<SensorEvent, String>() {
public String select(Map<String, SensorEvent > pattern) throws Exception {
ShipmentEvent orientEvent = pattern.get("orientationEvent");
ShipmentEvent accelEvent = pattern.get("accelerationEvent");
if (orientEvent.getShipment_number().equals(accelEvent.getShipment_number())) {
return "Shipment : " + orientEvent.getShipment_number() + " was dropped!";
}
}
});
warning.print();
扩展程序
事件模式的另一个重要部分是观察传感器值与某些阈值的偏差。为此,必须应用时间窗口上的聚合。由于并非批处理API的所有聚合函数都可以在流式API中使用,所以使用fold运算符计算时间窗口内传感器值的平均值。fold函数来自函数编程的高阶函数。它从初始值开始,并对流中的每个新数据元素应用相应的处理步骤。在这种情况下,它用于汇总值并计算时间窗口中方向事件的数量。然后,这两个值用于计算方向属性的平均值,然后可用于识别事件,例如,方向值在过去10秒内变化了20%。折叠流包含装运编号的元素(用于标识),然后是当前方向值和计算值。函数返回一个新的SensorEvent,其中包含前面提到的属性。
private static class AvgFold implements FoldFunction<SensorEvent, Tuple5<String, Double, Double, Double, Double>> {
public Tuple5<String, Double, Double, Double, Double> fold(Tuple5<String, Double, Double, Double, Double> total, SensorEvent current) throws Exception {
double count = total.f4 + 1.0d;
double sum = total.f3 + Double.parseDouble(current. orientation);
double avg = sum / count;
return new Tuple5<String, Double, Double, Double, Double> (current.shipment_number, Double.parseDouble(current.orientation), sum, count, avg);
}
}
折叠应用于键控窗口,以一起处理同一包的流元素。使用10秒的滑动窗口,使用元素的处理时间作为时间戳。fold的一个特点是它可以用自愿性值初始化。在代码示例中,上面的事件模式通过fold函数进行调整,事件模式被改变以触发事件,其中当前方向值比时间窗口的聚合平均值高20%以上。
SingleOutputStreamOperator< Tuple5<String, Double, Double, Double, Double >> aggregatedEventStream = eventData
.keyBy("shipment_number")
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.fold(new Tuple5<String, Double, Double, Double, Double >("", 0.0d, 0.0d, 0.0d, 0.0d), new AvgFold());
Pattern< Tuple5<String, Double, Double, Double, Double >, ?> examplePattern = Pattern.< Tuple5<String, Double, Double, Double, Double >>begin("orientationEvent")
.where(new FilterFunction< Tuple5<String, Double, Double, Double, Double >>(){
public boolean filter(Tuple5<String, Double, Double, Double, Double > event) throws Exception {
return event.f1 > event.f4*1.20;
}
})
. followedBy("accelerationEvent")
.where(new FilterFunction< SensorEvent >() {
public boolean filter(SensorEvent accelerationEvent) throws Exception {
return accelerationEvent.getAcceleration() > 10.0;
}
})
.within(Time.seconds(20));
结论
首次在CEP复杂数据处理中引入了可伸缩的事件处理框架。由于其年轻的历史(第一个版本于2016年4月发布),一些功能方面仍然是开放的。因此,与现有的CEP软件如Esper或Drools进行比较有点不公平。将来,Flink CEP库将得到扩展,初步结果已经可用。自从1.1.0版发布了一个用于CEP的scala api,它允许开发人员友好地描述事件模式。
欲了解更多信息,请联系Martin Roth或Norman Spangenberg。
参考
[1] http://blogs-images.forbes.com/jacobmorgan/files/2014/05/libelium_smart_world_infographic_big.png
[2] https://stratio.atlassian.net/wiki/display/DECISION0x9/Home
[3] http://wso2.com/products/complex-event-processor
[4] http://flink.apache.org/news/2016/04/06/cep-monitoring.html