水位线
是数据流中插入的一个标记,用来表示事件时间的进展,它会随着数据一起在任务间传递。
可以实现WatermarkStrategy接口自定义生成水位线
为数据流中的元素分配时间戳并生成水印以表示事件时间进度。 给定的 WatermarkStrategy 用于创建 TimestampAssigner 和 WatermarkGenerator。对于数据流中的每个事件,调用 TimestampAssigner.extractTimestamp(Object, long) 方法来分配事件时间戳。对于数据流中的每个事件,将调用 WatermarkGenerator.onEvent(Object, long, WatermarkOutput)。定期(由 ExecutionConfig.getAutoWatermarkInterval() 定义)将调用 WatermarkGenerator.onPeriodicEmit(WatermarkOutput) 方法来发射水位线。也可以不在onPeriodicEmit里面,在onEvent中判断满足某一条件才发射。
常见的水印生成模式可以作为 WatermarkStrategy 类中的静态方法找到。
flink内置:
forMonotonousTimestamps有序流的水位线
forBoundedOutOfOrderness乱序流的水位线
WatermarkStrategy.forMonotonousTimestamps() ==WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(0))
自定义时还可以指定周期性生成或者是断点式生成
周期性生成比如200ms。断点式生成在onevent中满足一定条件才发射水位线
多个并行子任务时,下游可能会收到多个上游发来的水位线,木桶原理,小的为准。因为水位线的本质是当前时间之前的数据,都已经到齐了。
窗口
按键分区和非按键分区数据流
区别在于调用窗口算子之前是否有keyby操作。
经过按键分区 keyBy 操作后,数据流会按照 key 被分为多条逻辑流(logical streams),就是 KeyedStream。基于KeyedStream 进行窗口操作时, 窗口计算会在多个并行子任务上同时执行。相同 key 的数据会被发送到同一个并行子任务,而窗口操作会基于每个 key 进行单独的处理。所以可以认为,每个 key 上都定义了一组窗口,各自独立地进行统计计算。
如果没有进行 keyBy,那么原始的 DataStream 就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了 1。所以在实际应用中一般不推荐使用这种方式。对于非按键分区的窗口操作,手动调大窗口算子的并行度也是无效的,windowAll 本身就是一个非并行的操作
达到窗口关闭时间已经触发计算然后销毁了(窗口默认被销毁),所以无法再进入到窗口中,自然也就无法更新计算结果了
增量聚合函数
ReduceFunction和AggregateFunction
.keyBy(r -> r.f0)
.reduce(new ReduceFunction<Tuple2<String, Long>>() {
public Tuple2<String, Long> reduce(Tuple2<String, Long> value1,
Tuple2<String, Long> value2) throws Exception {
// 定义累加规则,窗口闭合时,向下游发送累加结果
return Tuple2.of(value1.f0, value1.f1 + value2.f1);
}
})
ReduceFunction 可以解决大多数归约聚合的问题,但是这个接口有一个限制,就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样。这就迫使我们必须在聚合前,先将数据转换(map)成预期结果类型;
AggregateFunction 可以看作是 ReduceFunction 的通用版本,这里有三种类型:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。输入类型 IN 就是输入流中元素的数据类型;累加器类型 ACC 则是我们进行聚合的中间状态类型;而输出类型当然就是最终计算结果的类型。
createAccumulator():创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。
⚫ add():将输入的元素添加到累加器中。这就是基于聚合状态,对新来的数据进行进一步聚合的过程。方法传入两个参数:当前新到的数据 value,和当前的累加器accumulator;返回一个新的累加器值,也就是对聚合状态进行更新。每条数据到来之后都会调用这个方法。
⚫ getResult():从累加器中提取聚合的输出结果。也就是说,我们可以定义多个状态,然后再基于这些聚合的状态计算出一个结果进行输出。比如之前我们提到的计算平均值,就可以把 sum 和 count 作为状态放入累加器,而在调用这个方法时相除得到最终结果。这个方法只在窗口要输出结果时调用。
⚫ merge():合并两个累加器,并将合并后的状态作为一个累加器返回。这个方法只在需要合并窗口的场景下才会被调用;最常见的合并窗口(Merging Window)的场景就是会话窗口(Session Windows)。
全窗口函数
窗口操作中的另一大类就是全窗口函数。与增量聚合函数不同,全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算;
ProcessWindowFunction
增量聚合函数处理计算会更高效。举一个最简单的例子,对一组数据求和。大量的数据连续不断到来,全窗口函数只是把它们收集缓存起来,并没有处理;到了窗口要关闭、输出结果的时候,再遍历所有数据依次叠加,得到最终结果。而如果我们采用增量聚合的方式,那么只需要保存一个当前和的状态,每个数据到来时就会做一次加法,更新状态;到了要输出结果的时候,只要将当前状态直接拿出来就可以了。增量聚合相当于把计算量“均摊”到了窗口收集数据的过程中,自然就会比全窗口聚合更加高效、输出更加实时。而全窗口函数的优势在于提供了更多的信息,可以认为是更加“通用”的窗口操作。它只负责收集数据、提供上下文相关信息,把所有的原材料都准备好,至于拿来做什么我们完全可以任意发挥。这就使得窗口计算更加灵活,功能更加强大。
也可以增量聚合和全窗口函数的结合起来使用
这样调用的处理机制是:基于第一个参数(增量聚合函数)来处理窗口数据,每来一个数据就做一次聚合;等到窗口需要触发计算时,则调用第二个参数(全窗口函数)的处理逻辑输出结果。需要注意的是,这里的全窗口函数就不再缓存所有数据了,而是直接将增量聚合函数的结果拿来当作了 Iterable 类型的输入。一般情况下,这时的可迭代集合中就只有一个元素了。
窗口处理的主体还是增量聚合,而引入全窗口函数又可以获取到更多的信息包装输出,这样的结合兼具了两种窗口函数的优势,在保证处理性能和实时性的同时支持了更加丰富的应用场景。
Trigger
Trigger 是一个抽象类,自定义时必须实现下面四个抽象方法;
⚫ onElement():窗口中每到来一个元素,都会调用这个方法。
⚫ onEventTime():当注册的事件时间定时器触发时,将调用这个方法。
⚫ onProcessingTime ():当注册的处理时间定时器触发时,将调用这个方法。
⚫ clear():当窗口关闭销毁时,调用这个方法。一般用来清除自定义的状态。
前三个方法返回类型都是 TriggerResult,这是一个枚举类型(enum),其中定义了对窗口进行操作的四种类型。
⚫ CONTINUE(继续):什么都不做
⚫ FIRE(触发):触发计算,输出结果
⚫ PURGE(清除):清空窗口中的所有数据,销毁窗口
⚫ FIRE_AND_PURGE(触发并清除):触发计算输出结果,并清除窗口
移除器(Evictor)
Evictor 接口定义了两个方法:
⚫ evictBefore():定义执行窗口函数之前的移除数据操作
⚫ evictAfter():定义执行窗口函数之后的以处数据操作
默认情况下,预实现的移除器都是在执行窗口函数(window fucntions)之前移除数据的
窗口的销毁
一般情况下,当时间达到了结束点,就会直接触发计算输出结果、进而清除状态销毁窗口。这时窗口的销毁可以认为和触发计算是同一时刻。这里需要注意,Flink 中只对时间窗口(TimeWindow)有销毁机制;由于计数窗口(CountWindow)是基于全局窗口(GlobalWindw)实现的,而全局窗口不会清除状态,所以就不会被销毁。
ProcessFunction
内部单独定义了两个方法:一个是必须要实现的抽象方法.processElement();另一个是非抽象方法.onTimer()
八类处理函数:
(1)ProcessFunction
最基本的处理函数,基于 DataStream 直接调用.process()时作为参数传入。
(2)KeyedProcessFunction
对流按键分区后的处理函数,基于 KeyedStream 调用.process()时作为参数传入。要想使用
定时器,比如基于 KeyedStream。
(3)ProcessWindowFunction
开窗之后的处理函数,也是全窗口函数的代表。基于 WindowedStream 调用.process()时作
为参数传入。
(4)ProcessAllWindowFunction
同样是开窗之后的处理函数,基于 AllWindowedStream 调用.process()时作为参数传入。
(5)CoProcessFunction
合并(connect)两条流之后的处理函数,基于 ConnectedStreams 调用.process()时作为参数传入。
(6)ProcessJoinFunction
间隔连接(interval join)两条流之后的处理函数,基于 IntervalJoined 调用.process()时作为参数传入。
(7)BroadcastProcessFunction
广播连接流处理函数,基于 BroadcastConnectedStream 调用.process()时作为参数传入。这里的“广播连接流”BroadcastConnectedStream,是一个未 keyBy 的普通 DataStream 与一个广播流(BroadcastStream)做连接(conncet)之后的产物。
(8)KeyedBroadcastProcessFunction
按键分区的广播连接流处理函数,同样是基于 BroadcastConnectedStream 调用.process()时作为参数传入。与 BroadcastProcessFunction 不同的是,这时的广播连接流,是一个 KeyedStream与广播流(BroadcastStream)做连接之后的产物。
多流转换
分流
使用侧输出流、定义多个侧输出流。process处理时加到对应的侧输出流中
合流
Union
最简单的合流操作,就是直接将多条流合在一起,叫作流的“联合”(union);联合操作要求必须流中的数据类型必须相同,合并之后的新流会包括所有流中的元素,数据类型不变。不限制流的个数。
合流的时候数据顺序?
Connect
流的联合虽然简单,不过受限于数据类型不能改变。connect限制只能2条流。合并后返回的是ConnectedStreams,相应的处理方法也是CoMapFunction之类的。
基于时间的合流
Windows Join
stream1.join(stream2).where(<KeySelector>).equalTo(<KeySelector>).window(<WindowAssigner>).apply(<JoinFunction>)
window join类似于 inner join。也就是说,最后处理输出的,只有两条流中数据按 key 配对成功的那些
Interval Join
间隔联结的思路就是针对一条流的每个数据,开辟出其时间戳前后的一段时间间隔,看这期间是否有来自另一条流的数据匹配,匹配的规则也是key相同
stream1.keyBy(<KeySelector>).intervalJoin(stream2.keyBy(<KeySelector>)).between(Time.milliseconds(-2), Time.milliseconds(1)).process (new ProcessJoinFunction<Integer, Integer, String(){@Override
public void processElement(Integer left, Integer right, Context ctx,
Collector<String> out) {
out.collect(left + "," + right);}});
Windows CoGroup
用法跟 window join 非常类似,也是将两条流合并之后开窗处理匹配的元素,调用时只需要将.join()换为.coGroup()
public interface CoGroupFunction<IN1, IN2, O> extends Function, Serializable {
void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out) throws Exception; }
coGroup 操作比窗口的 join 更加通用,不仅可以实现类似 SQL 中的“内连接”(inner join),也可以实现左外连接(left outer join)、右外连接(right outer join)和全外连接(full outer join)
状态编程
一个算子任务会按照并行度分为多个并行子任务执行,而不同的子任务会占据不同的任务槽(task slot)。由于不同的 slot 在计算资源上是物理隔离的,所以 Flink能管理的状态在并行任务间是无法共享的,每个状态只能针对当前子任务的实例有效。而很多有状态的操作(比如聚合、窗口)都是要先做 keyBy 进行按键分区的。按键分区之后,任务所进行的所有计算都应该只针对当前 key 有效,所以状态也应该按照 key 彼此隔离。在这种情况下,状态的访问方式又会有所不同。基于这样的想法,我们又可以将托管状态分为两类:算子状态和按键分区状态。
我们需要先声明一个状态的 StateDescriptor,这个 Descriptor 中包含了 StateName 和 State 的类型,在同一个 Operator 里,StateName 会作为这个 State 的唯一标识
算子状态作用于当前并行子任务、按键分区状态作用于输入流的key级别。所以按键分区状态只能在keyby后使用
按键分区状态(Keyed State)
对于 keyedState,我们需要 OperatorID、StateName,Key、Namespace 才能定位到 State 。
1. 值状态(ValueState)
2. 列表状态(ListState)
3. 映射状态(MapState)
4. 归约状态(ReducingState)
5. 聚合状态(AggregatingState)
状态TTL
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(10))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
依次:设定的状态生存时间、设置更新类型、设置状态的可见性。(所谓的“状态可见性”,是指因为清除操作并不是实时的,所以当状态过期之后还有可能基于存在,这时如果对它进行访问,能否正常读取到就是一个问题了。这里设置的 NeverReturnExpired 是默认行为,表示从不返回过期值,也就是只要过期就认为它已经被清除了,应用不能继续读取;这在处理会话或者隐私数据时比较重要。对应的另一种配置是 ReturnExpireDefNotCleanedUp,就是如果过期状态还存在,就返回它的值)
目前的 TTL 设置只支持处理时间。另外,所有集合类型的状态(例如ListState、MapState)在设置 TTL 时,都是针对每一项(per-entry)元素的。也就是说,一个列表状态中的每一个元素,都会以自己的失效时间来进行清理,而不是整个列表一起清理。
算子状态(Operator State)
作用于并行子任务,对于 OperatorState,我们只需要 OperatorID,StateName,即可定位到唯一的 OperatorState 值。
1. 列表状态(ListState)
每个并行子任务维护一个列表状态,在当算子并行度进行缩放调整时,算子的列表状态中的所有元素项会被统一收集起来,相当于把多个分区的列表合并成了一个“大列表”,然后再均匀地分配给所有并行任务。这种“均匀分配”的具体方法就是“轮询”(round-robin),与之前介绍的 rebanlance 数据传输方式类似,是通过逐一“发牌”的方式将状态项平均分配的。这种方式也叫作“平均分割重组”(even-splitredistribution)。
2. 联合列表状态(UnionListState)
与列表状态类似,仅仅是进行缩放调整时对于状态的分配方式不同。
3. 广播状态(BroadcastState)
有时我们希望算子并行子任务都保持同一份“全局”状态,用来做统一的配置和规则设定。
状态持久化和状态后端
Flink 会定期保存检查点,在检查点中会记录每个算子的 id 和状态;如果发生故障,Flink 就会用最近一次成功保存的检查点来恢复应用的状态,重新启动处理流程,就如同“读档”一样。
状态后端的分类:
1. 哈希表状态后端
即放内存,具体实现上,哈希表状态后端在内部会直接把状态当作对象(objects),保存在 Taskmanager 的 JVM 堆(heap)上。普通的状态,以及窗口中收集的数据和触发器(triggers),都会以键值对(key-value)的形式存储起来,所以底层是一个哈希表(HashMap)
2. RocksDB
RocksDB 是一种内嵌的 key-value 存储介质,可以把数据持久化到本地硬盘。配置EmbeddedRocksDBStateBackend 后,会将处理中的数据全部放入 RocksDB 数据库中,RocksDB默认存储在 TaskManager 的本地数据目录里。与 HashMapStateBackend 直接在堆内存中存储对象不同,这种方式下状态主要是放在RocksDB 中的。数据被存储为序列化的字节数组(Byte Arrays),读写操作需要序列化/反序列化,因此状态的访问性能要差一些。
为每个Job配置状态后端:env.setStateBackend(new HashMapStateBackend());
指定checkpoint的目录:state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
3.FsStateBackend
检查点
所有任务状态在同一时间点的一个“快照”(snapshot),它的触发是周期性的。当所有任务都恰好处理完一个相同的输入数据的时候,将它们的状态保存下来。首先,这样避免了除状态之外其他额外信息的存储,提高了检查点保存的效率。其次,一个数据要么就是被所有任务完整地处理完,状态得到了保存;要么就是没处理完,状态全部没保存:这就相当于构建了一个“事务”(transaction)。
检查点的保存
JobManagerCheckpointStorage(默认)
FileSystemCheckpointStorage
也可以设置定期去保存检查点作用于恢复
/user-defined-checkpoint-dir /{job-id} | + --shared/ + --taskowned/ + --chk-1/ + --chk-2/ + --chk-3/
barrier概念
barrier注入数据流中,根据数据流一起流转。每个barrier携带一个快照ID。来自不同快照的多个障碍可以同时在流中,这意味着各种快照可能同时发生。一旦Sink操作符(流式 DAG 的末端)从其所有输入流中接收到屏障 n,它就会向检查点协调器确认快照 n。 在所有Sink都确认快照后,则完成了此次快照。
aligned Checkpointing
每当接收到 Barrier,算子进行本地的 Checkpoint 快照,并在完成后异步上传本地快照,同时将 Barrier 以广播方式发送至下游。当某个 Checkpoint 的所有 Barrier 到达 DAG 末端且所有算子完成快照,则标志着全局快照的成功。在有多个输入 Channel 的情况下,为了数据准确性,算子会等待所有流的 Barrier 都到达之后才会开始本地的快照,这种机制被称为 Barrier 对齐。在对齐的过程中,算子只会继续处理的来自未出现 Barrier Channel 的数据,而其余 Channel 的数据会被写入输入队列,直至在队列满后被阻塞。对齐的策略下,如果某个并行子任务收到的barrire更快,则需要将数据先缓存起来,直到其他子任务也收到barrier。
当算子包含任何形式的状态时,这些状态也必须是快照的一部分。算子在收到上游所有的snapshot barriers后,发送snapshot barriers到下游之前完成自身状态的snapshot。因此状态可能是很大的,所以需要保存到状态后端。而在checkpoint中仅仅保存了一个指向该状态的指针
Unaligned Checkpointing
检查点屏障实际上不再嵌入到数据流中。可以将未处理的缓冲数据(in-flight data)也保存进检查点。这样,当我们遇到一个分区barrier时就不需等待对齐,而是可以直接启动状态的保存了。但是这种情况下对state backend的IO压力很大,因为需要将其他子任务落后的数据一并保存到状态后端中。
Exactly Once vs. At Least Once
对齐步骤可能会增加流式传输程序的等待时间。 通常,这种额外的延迟大约为几毫秒,但我们已经看到一些异常值的延迟显着增加的情况。 对于所有记录都需要始终超低延迟(几毫秒)的应用程序,Flink 有一个开关可以在检查点期间跳过流对齐。 一旦操作员从每个输入中看到检查点障碍,检查点快照仍然会被绘制。
当跳过对齐时,操作员会继续处理所有输入,即使在检查点 n 的一些检查点屏障到达之后也是如此。 这样,运算符还会在获取检查点 n 的状态快照之前处理属于检查点 n+1 的元素。 在恢复时,这些记录将作为副本出现,因为它们都包含在检查点 n 的状态快照中,并且将作为检查点 n 之后的数据的一部分重放。
两者区别就在于对齐方式,如果是不强制对齐的话,数据会继续发到下游处理,并且最终输出到第三方储存。假设checkpoint n已经完成,但是这个checkpoint中包括了缓冲数据,如果此时发生故障重启,这些缓冲数据就会进行重放,而实际上某些数据已经输出到第三方存储,所以导致了重复数据。
enableUnalignedCheckpoints:只能用于exactly once,是不是两者组合起来就等于Al least once? 这块后续再看看
savepoint
与此相反,保存点是由用户创建、拥有和删除的。保存点的使用是手动的备份和恢复。 例如,这可能是更新Flink 版本、更改作业图、更改并行度等等。 可以认为保存点是Operator ID -> State for each stateful operator
todo:检查点保存点保存数据的形式,k应当就是uid+状态名称+key,v是状态
SQL
将table转换为流
(1)调用 toDataStream()方法:针对只会插入、不会更新的表
(2)调用 toChangelogStream()方法
flink sql指定时间属性
事件时间
1. SQL中直接指定,这里我们把 ts 字段定义为事件时间属性,而且基于 ts 设置了 5 秒的水位线延迟
2. 在数据流转换为表时定义,
处理时间
1. 在sql中直接定义
2. 在数据流转换为表时定义
在 Flink 1.12 之前的版本中,Table API 和 SQL 提供了一组“分组窗口”(Group Window)函数,常用的时间窗口如滚动窗口、滑动窗口、会话窗口都有对应的实现;具体在 SQL 中就是调用 TUMBLE()、HOP()、SESSION(),传入时间属性字段、窗口大小等参数就可以了。以滚动窗口为例:TUMBLE(ts, INTERVAL '1' HOUR)
从1.13开始开始使用窗口表值函数(TVF)来定义窗口,在窗口 TVF 的返回值中,除去原始表中的所有列,还增加了用来描述窗口的额外 3 个列:“窗口起始点”(window_start)、“窗口结束点”(window_end)、“窗口时间”(window_time)。起始点和结束点比较好理解,这里的“窗口时间”指的是窗口中的时间属性,它的值等于window_end - 1ms,所以相当于是窗口中能够包含数据的最大时间戳。
(1)滚动窗口(TUMBLE)
TUMBLE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOUR)这里基于时间字段 ts,对表 EventTable 中的数据开了大小为 1 小时的滚动窗口。窗口会将表中的每一行数据,按照它们 ts 的值分配到一个指定的窗口中。
(2)滑动窗口(HOP)
HOP(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '5' MINUTES, INTERVAL '1' HOURS));这里我们基于时间属性 ts,在表 EventTable 上创建了大小为 1 小时的滑动窗口,每 5 分钟滑动一次。需要注意的是,紧跟在时间属性字段后面的第三个参数是步长(slide),第四个参数才是窗口大小(size)。
(3)累积窗口(CUMULATE)
CUMULATE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOURS, INTERVAL '1' DAYS))这里我们基于时间属性 ts,在表 EventTable 上定义了一个统计周期为 1 天、累积步长为 1小时的累积窗口。注意第三个参数为步长 step,第四个参数则是最大窗口长度。
为了防止状态无限增长耗尽资源,Flink Table API 和 SQL 可以在表环境中配置状态的生存时间(TTL)
TableConfig tableConfig = tableEnv.getConfig();
tableConfig.setIdleStateRetention(Duration.ofMinutes(60)); 或者
configuration.setString("table.exec.state.ttl", "60 min");
Join查询
regular join
常规联结(Regular Join)是 SQL 中原生定义的 Join 方式,联结用 INNER JOIN 来定义,这块也是innerjoin、left join、right join、full join等常规知识。
函数
系统函数
Flink SQL中的系统函数又主要可以分为两大类:标量函数(Scalar Functions)和聚合函数(Aggregate Functions)。
标量函数
比较函数、逻辑函数、算术函数、字符串函数等
聚合函数
COUNT(*)、RANK()
自定义函数
Flink 的 Table API 和 SQL 提供了多种自定义函数的接口,以抽象类的形式定义。当前 UDF主要有以下几类:
⚫ 标量函数(Scalar Functions):将输入的标量值转换成一个新的标量值;
⚫ 表函数(Table Functions):将标量值转换成一个或多个新的行数据,也就是扩展成一个表;
⚫ 聚合函数(Aggregate Functions):将多行数据里的标量值转换成一个新的标量值;
聚合函数的概念我们之前已经接触过多次,如 SUM()、MAX()、MIN()、AVG()、COUNT()都是常见的系统内置聚合函数。
⚫ 表聚合函数(Table Aggregate Functions):将多行数据里的标量值转换成一个或多个新的行数据。
目前 SQL 中没有直接使用表聚合函数的方式,所以需要使用 Table API 的方式来调用
Flink CEP
维表Join
Row和Rowdata
Row为RowData的低阶数据类型,常常用于DataStream中的使用;而RowData是通过不同的Conveter转换Row得到的高阶数据,用于TableAPI和Sql中使用 ;两者中的存储内容基本是一致的。