1. Flink运行框架
1.1 运行组件:
- 作业管理器(JobManager):请求slot
- 资源管理器(ResourceManager):管理slot
- 任务管理器(TaskManager):提供slot,执行任务
- 分发器(Dispatcher):跨作业运行。将应用分配给JM
1.2 任务提交流程
yarn模式
1.3 任务调度原理
1.4 并行度与slot
一个特定算子的 子任务(subtask)的个数被称之为其并行度(parallelism)
一个 stream 的并行度,可以认为就是其所有算子中最大的并行度
在设置Slot时,在所有设置中的最大设置的并行度大小则就是所需要设置的Slot的数量
1.5 程序
Source 、Transformation 和 Sink。
2. 流处理API
2.1 Environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
2.2 Source
- 从集合:
env.fromCollection(List1)
- 从文件:
env.readTextFile(path)
- 从kafka:
(1)引入kafka依赖
(2)Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
(3)DataStream<String> dataStream = env.addSource(new FlinkKafkaConsumer<String>("sensor", new SimpleStringSchema(),properties));
(4)自定义source:
DataStream<SensorReading> dataStream = env.addSource(new MySensorSource());
2.3 Tranform:
- 基本转换算子:map,flatMap,filter
map:一对一,return
flatmap:一对多,out.collect(field);
filter:筛选:return 布尔值 - 聚合操作算子:keyby,滚动聚合算子(sum(),min(),max(),minBy(),maxBy()),reduce
Flink设计中,所有数据必须先分组才能做聚合操作。
先keyBy得到KeyedStream,然后调用其reduce、sum等聚合操作方法。(先分组后聚合) - 多流转换算子
Connect和CoMap
Union -
算子转换
image.png - 数据类型
基础数据类型:Int, Double, Long, String, …
元祖:Tuple0~Tuple25 例:Tuple2<String, Integer>
POJO : bean对象
JAVA里面的类型:Arrays, Lists, Maps, Enums等 - 函数类
实现udf函数:implements MapFunction, FilterFunction, ProcessFunction
匿名函数: 例 tweets.filter( tweet -> tweet.contains("flink") );
富函数:RichMapFunction,RichFlatMapFunction,RichFilterFunction。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。方法:open(),close(),getRuntimeContext()
2.4 sink:
stream.addSink(new MySink(xxxx))
- kafka:
dataStream.addSink( new FlinkKafkaProducer<String>("localhost:9092", "sinktest", new SimpleStringSchema()));
- redis:
dataStream.addSink(new RedisSink<>(config, new MyRedisMapper()));
- 自定义RichSinkFunction
3. Flink的window
3.1 概念
window是一种切割无限数据为有限块进行处理的手段。Window是无限数据流处理的核心,Window将一个无限的stream拆分成有限大小的”buckets”桶,我们可以在这些桶上做计算操作。
3.2 Window类型
时间窗口(Time Window):滚动(固定的窗口,没重叠),滑动(固定的窗口长度和滑动间隔,可以有重叠),会话
计数窗口(Count Window):滚动,滑动
3.3 Window Api
创建窗口:在keyby后面
- 滚动时间窗口(tumbling time window)
.timeWindow(Time.seconds(15))
- 滑动时间窗口(sliding time window)
.timeWindow(Time.seconds(15),Time.seconds(5))
- 会话窗口(session window)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
- 滚动计数窗口(tumbling count window)
.countWindow(5)
- 滑动计数窗口(sliding count window)
.countWindow(10,2)
Time.milliseconds(x),Time.seconds(x),Time.minutes(x)
窗口函数:
- 增量聚合函数:增量聚合函数,特点即每次数据过来都处理,但是到了窗口临界才输出结果。ReduceFunction, AggregateFunction
.aggregate(new AggregateFunction<SensorReading, Integer, Integer>()
- 全窗口函数:全窗口函数,特点即数据过来先不处理,等到窗口临界再遍历、计算、输出结果。ProcessWindowFunction,WindowFunction
.process(new ProcessWindowFunction<SensorReading, Object, Tuple, TimeWindow>()
.apply(new WindowFunction<SensorReading, Tuple3<String, Long, Integer>, String, TimeWindow>()
- 其他API:
.trigger() ——触发器
.evitor() ——移除器
.allowedLateness() ——允许处理迟到的数据
.sideOutputLateData() ——将迟到的数据放入侧输出流
.getSideOutput() ——获取侧输出流
4. 时间语义和Watermark
4.1 flink中的时间语义
Event Time:事件创建时间;
Ingestion Time:数据进入Flink的时间;
Processing Time:执行操作算子的本地系统时间,与机器相关;
4.2 引入事件时间
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
4.3 Watermark
4.3.1 概念
- Flink对于迟到数据有三层保障,先来后到的保障顺序是:
WaterMark => 约等于放宽窗口标准
allowedLateness => 允许迟到(ProcessingTime超时,但是EventTime没超时)
sideOutputLateData => 超过迟到时间,另外捕获,之后可以自己批处理合并先前的数据
必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了,这个特别的机制,就是Watermark。 Watermark 就是触发前一窗口的“关窗时间”,一旦触发关门那么以当前时刻为准在窗口范围内的所有所有数据都会收入窗中。 - 怎样避免乱序数据带来的计算不正确?
(1)遇到一个时间戳达到了窗口关闭时间,不应该立即触发窗口计算,而是等待一段时间,等迟到的数据来了再关闭窗口
(2)Watermark是一种衡量Event Time进展的机制,可以设定延迟触发
Watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用Watermark机制结合window来实现
(3)数据流中的Watermark用于表示”timestamp小于Watermark的数据,都已经到达了“,因此,window的执行也是由Watermark触发的。
(4)Watermark可以理解成一个延迟触发机制,我们可以设置Watermark的延时时长t,每次系统会校验已经到达的数据中最大的maxEventTime,然后认定eventTime小于maxEventTime - t的所有数据都已经到达,如果有窗口的停止时间等于maxEventTime – t,那么这个窗口被触发执行。Watermark = maxEventTime-延迟时间t
watermark 用来让程序自己平衡延迟和结果正确性
4.3.2 Watermark的引入
//乱序数据
dataStream.assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.milliseconds(1000)) {
@Override
public long extractTimestamp(element: SensorReading): Long = {
return element.getTimestamp() * 1000L;
}
});
5. Flink状态管理
算子状态(Operator State)
键控状态(Keyed State):值状态(value state),列表状态(List state),映射状态(Map state),聚合状态(Reducing state & Aggregating State)
状态后端(State Backends)
6. ProcessFunction API(底层API)
DataStream API提供了一系列的Low-Level转换算子。可以访问时间戳、watermark以及注册定时事件。还可以输出特定的一些事件,例如超时事件等。
6.1 Flink提供了8个Process Function:
ProcessFunction
KeyedProcessFunction
CoProcessFunction
ProcessJoinFunction
BroadcastProcessFunction
KeyedBroadcastProcessFunction
ProcessWindowFunction
ProcessAllWindowFunction
6.2 KeyedProcessFunction:
用在keyedStream上
processElement(I value, Context ctx, Collector<O> out)
:Context可以访问元素的时间戳,元素的 key ,以及TimerService 时间服务
onTimer(long timestamp, OnTimerContext ctx, Collector<O> out)
:是一个回调函数。当之前注册的定时器触发时调用
6.3 TimerService和定时器(Timers)
Context 和OnTimerContext 所持有的TimerService 对象拥有以下方法:
-
long currentProcessingTime()
返回当前处理时间 -
long currentWatermark()
返回当前watermark 的时间戳 -
void registerProcessingTimeTimer( long timestamp)
会注册当前key的processing time的定时器。当processing time 到达定时时间时,触发timer。 -
void registerEventTimeTimer(long timestamp)
会注册当前key 的event time 定时器。当Watermark水位线大于等于定时器注册的时间时,触发定时器执行回调函数。 -
void deleteProcessingTimeTimer(long timestamp)
删除之前注册处理时间定时器。如果没有这个时间戳的定时器,则不执行。 -
void deleteEventTimeTimer(long timestamp)
删除之前注册的事件时间定时器,如果没有此时间戳的定时器,则不执行。
当定时器timer 触发时,会执行回调函数onTimer()。注意定时器timer 只能在keyed streams 上面使用。