Flink教程

1. Flink运行框架

1.1 运行组件:
  • 作业管理器(JobManager):请求slot
  • 资源管理器(ResourceManager):管理slot
  • 任务管理器(TaskManager):提供slot,执行任务
  • 分发器(Dispatcher):跨作业运行。将应用分配给JM
1.2 任务提交流程
image.png

yarn模式


image.png
1.3 任务调度原理
image.png
1.4 并行度与slot

一个特定算子的 子任务(subtask)的个数被称之为其并行度(parallelism)
一个 stream 的并行度,可以认为就是其所有算子中最大的并行度
在设置Slot时,在所有设置中的最大设置的并行度大小则就是所需要设置的Slot的数量

1.5 程序

Source 、Transformation 和 Sink。

2. 流处理API

2.1 Environment

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

2.2 Source
  • 从集合:env.fromCollection(List1)
  • 从文件:env.readTextFile(path)
  • 从kafka:
    (1)引入kafka依赖
    (2)Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    (3)DataStream<String> dataStream = env.addSource(new FlinkKafkaConsumer<String>("sensor", new SimpleStringSchema(),properties));
    (4)自定义source:
    DataStream<SensorReading> dataStream = env.addSource(new MySensorSource());
2.3 Tranform:
  • 基本转换算子:map,flatMap,filter
    map:一对一,return
    flatmap:一对多,out.collect(field);
    filter:筛选:return 布尔值
  • 聚合操作算子:keyby,滚动聚合算子(sum(),min(),max(),minBy(),maxBy()),reduce
    Flink设计中,所有数据必须先分组才能做聚合操作。
    先keyBy得到KeyedStream,然后调用其reduce、sum等聚合操作方法。(先分组后聚合)
  • 多流转换算子
    Connect和CoMap
    Union
  • 算子转换


    image.png
  • 数据类型
    基础数据类型:Int, Double, Long, String, …
    元祖:Tuple0~Tuple25 例:Tuple2<String, Integer>
    POJO : bean对象
    JAVA里面的类型:Arrays, Lists, Maps, Enums等
  • 函数类
    实现udf函数:implements MapFunction, FilterFunction, ProcessFunction
    匿名函数: 例 tweets.filter( tweet -> tweet.contains("flink") );
    富函数:RichMapFunction,RichFlatMapFunction,RichFilterFunction。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。方法:open(),close(),getRuntimeContext()
2.4 sink:

stream.addSink(new MySink(xxxx))

  • kafka: dataStream.addSink( new FlinkKafkaProducer<String>("localhost:9092", "sinktest", new SimpleStringSchema()));
  • redis: dataStream.addSink(new RedisSink<>(config, new MyRedisMapper()));
  • 自定义RichSinkFunction

3. Flink的window

3.1 概念

window是一种切割无限数据为有限块进行处理的手段。Window是无限数据流处理的核心,Window将一个无限的stream拆分成有限大小的”buckets”桶,我们可以在这些桶上做计算操作。

3.2 Window类型

时间窗口(Time Window):滚动(固定的窗口,没重叠),滑动(固定的窗口长度和滑动间隔,可以有重叠),会话
计数窗口(Count Window):滚动,滑动

3.3 Window Api
创建窗口:在keyby后面
  • 滚动时间窗口(tumbling time window)
    .timeWindow(Time.seconds(15))
  • 滑动时间窗口(sliding time window)
    .timeWindow(Time.seconds(15),Time.seconds(5))
  • 会话窗口(session window)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
  • 滚动计数窗口(tumbling count window)
    .countWindow(5)
  • 滑动计数窗口(sliding count window)
    .countWindow(10,2)
    Time.milliseconds(x),Time.seconds(x),Time.minutes(x)
窗口函数:
  • 增量聚合函数:增量聚合函数,特点即每次数据过来都处理,但是到了窗口临界才输出结果。ReduceFunction, AggregateFunction
    .aggregate(new AggregateFunction<SensorReading, Integer, Integer>()
  • 全窗口函数:全窗口函数,特点即数据过来先不处理,等到窗口临界再遍历、计算、输出结果。ProcessWindowFunction,WindowFunction
    .process(new ProcessWindowFunction<SensorReading, Object, Tuple, TimeWindow>()
    .apply(new WindowFunction<SensorReading, Tuple3<String, Long, Integer>, String, TimeWindow>()
  • 其他API:
    .trigger() ——触发器
    .evitor() ——移除器
    .allowedLateness() ——允许处理迟到的数据
    .sideOutputLateData() ——将迟到的数据放入侧输出流
    .getSideOutput() ——获取侧输出流

4. 时间语义和Watermark

4.1 flink中的时间语义

Event Time:事件创建时间;
Ingestion Time:数据进入Flink的时间;
Processing Time:执行操作算子的本地系统时间,与机器相关;

4.2 引入事件时间

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

4.3 Watermark
4.3.1 概念
  • Flink对于迟到数据有三层保障,先来后到的保障顺序是:
    WaterMark => 约等于放宽窗口标准
    allowedLateness => 允许迟到(ProcessingTime超时,但是EventTime没超时)
    sideOutputLateData => 超过迟到时间,另外捕获,之后可以自己批处理合并先前的数据
    必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了,这个特别的机制,就是Watermark。 Watermark 就是触发前一窗口的“关窗时间”,一旦触发关门那么以当前时刻为准在窗口范围内的所有所有数据都会收入窗中。
  • 怎样避免乱序数据带来的计算不正确?
    (1)遇到一个时间戳达到了窗口关闭时间,不应该立即触发窗口计算,而是等待一段时间,等迟到的数据来了再关闭窗口
    (2)Watermark是一种衡量Event Time进展的机制,可以设定延迟触发
    Watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用Watermark机制结合window来实现
    (3)数据流中的Watermark用于表示”timestamp小于Watermark的数据,都已经到达了“,因此,window的执行也是由Watermark触发的。
    (4)Watermark可以理解成一个延迟触发机制,我们可以设置Watermark的延时时长t,每次系统会校验已经到达的数据中最大的maxEventTime,然后认定eventTime小于maxEventTime - t的所有数据都已经到达,如果有窗口的停止时间等于maxEventTime – t,那么这个窗口被触发执行。Watermark = maxEventTime-延迟时间t
    watermark 用来让程序自己平衡延迟和结果正确性
4.3.2 Watermark的引入
//乱序数据
dataStream.assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.milliseconds(1000)) {
  @Override
  public long extractTimestamp(element: SensorReading): Long = { 
    return element.getTimestamp() * 1000L;
  } 
});

5. Flink状态管理

算子状态(Operator State)
键控状态(Keyed State):值状态(value state),列表状态(List state),映射状态(Map state),聚合状态(Reducing state & Aggregating State)
状态后端(State Backends)

6. ProcessFunction API(底层API)

DataStream API提供了一系列的Low-Level转换算子。可以访问时间戳、watermark以及注册定时事件。还可以输出特定的一些事件,例如超时事件等。

6.1 Flink提供了8个Process Function:

ProcessFunction
KeyedProcessFunction
CoProcessFunction
ProcessJoinFunction
BroadcastProcessFunction
KeyedBroadcastProcessFunction
ProcessWindowFunction
ProcessAllWindowFunction

6.2 KeyedProcessFunction:

用在keyedStream上
processElement(I value, Context ctx, Collector<O> out):Context可以访问元素的时间戳,元素的 key ,以及TimerService 时间服务
onTimer(long timestamp, OnTimerContext ctx, Collector<O> out):是一个回调函数。当之前注册的定时器触发时调用

6.3 TimerService和定时器(Timers)

Context 和OnTimerContext 所持有的TimerService 对象拥有以下方法:

  • long currentProcessingTime() 返回当前处理时间
  • long currentWatermark() 返回当前watermark 的时间戳
  • void registerProcessingTimeTimer( long timestamp) 会注册当前key的processing time的定时器。当processing time 到达定时时间时,触发timer。
  • void registerEventTimeTimer(long timestamp) 会注册当前key 的event time 定时器。当Watermark水位线大于等于定时器注册的时间时,触发定时器执行回调函数。
  • void deleteProcessingTimeTimer(long timestamp) 删除之前注册处理时间定时器。如果没有这个时间戳的定时器,则不执行。
  • void deleteEventTimeTimer(long timestamp) 删除之前注册的事件时间定时器,如果没有此时间戳的定时器,则不执行。
    当定时器timer 触发时,会执行回调函数onTimer()。注意定时器timer 只能在keyed streams 上面使用。
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,907评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,987评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,298评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,586评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,633评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,488评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,275评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,176评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,619评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,819评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,932评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,655评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,265评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,871评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,994评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,095评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,884评论 2 354

推荐阅读更多精彩内容