flink sql 知其所以然(十三):流 join问题解决

1.序篇

本节是 flink sql 流 join 系列的下篇,上篇的链接如下:

废话不多说,咱们先直接上本文的目录和结论,小伙伴可以先看结论快速了解博主期望本文能给小伙伴们带来什么帮助:

  1. 背景及应用场景介绍:博主期望你能了解到,flink sql 提供的丰富的 join 方式(总结 6 种:regular join,维表 join,快照 join,interval join,array 拍平,table function)对我们满足需求提供了强大的后盾, 这 6 种 join 中涉及到流与流的 join 最常用的是 regular join 以及 interval join,本节主要介绍 interval join
  2. 来一个实战案例:博主以上节说到的曝光日志流点击日志流为案例展开,主要是想告诉小伙伴 flink sql left join 数据不会互相等待,存在 retract 问题,会导致写入 kafka 的数据量变大, 然后转变思路为使用 flink sql interval join 的方式可以使得数据互相等待一段时间进行 join,这种方式不会存在 retract 问题
  3. flink sql interval join 的解决方案以及原理的介绍:主要介绍 interval join 的在上述实战案例的运行结果及分析源码机制,博主期望你能了解到,interval join 的执行机制是会在你设置的 interval 区间之内互相等待一段时间,一旦时间推进(事件时间由 watermark 推进)到区间之外(即当前这条数据再也不可能被另一条流的数据 join 到时),outer join 会输出没有 join 到的数据,inner join 会从 state 中删除这条数据
  4. 总结及展望

2.背景及应用场景介绍

书接上文,上文介绍了曝光流在关联点击流时,使用 flink sql regular join 存在的 retract 问题。

本文介绍怎么使用 flink sql interval join 解决这些问题。

3.来一个实战案例

flink sql 知其所以然(十二):流 join 很难嘛???(上)

看看上节的实际案例,来看看在具体输入值的场景下,输出值应该长啥样。

场景:即常见的曝光日志流(show_log)通过 log_id 关联点击日志流(click_log),将数据的关联结果进行下发。

来一波输入数据:

曝光数据:

log_id timestamp show_params
1 2021-11-01 00:01:03 show_params
2 2021-11-01 00:03:00 show_params2
3 2021-11-01 00:05:00 show_params3

点击数据:

log_id timestamp click_params
1 2021-11-01 00:01:53 click_params
2 2021-11-01 00:02:01 click_params2

预期输出数据如下:

log_id timestamp show_params click_params
1 2021-11-01 00:01:00 show_params click_params
2 2021-11-01 00:01:00 show_params2 click_params2
3 2021-11-01 00:02:00 show_params3 null

上节的 flink sql regular join 解决方案如下:

INSERT INTO sink_tableSELECT    show_log.log_id as log_id,    show_log.timestamp as timestamp,    show_log.show_params as show_params,    click_log.click_params as click_paramsFROM show_logLEFT JOIN click_log ON show_log.log_id = click_log.log_id;

上节说道,flink sql left join 在流数据到达时,如果左表流(show_log)join 不到右表流(click_log) ,则不会等待右流直接输出(show_log,null),在后续右表流数据代打时,会将(show_log,null)撤回,发送(show_log,click_log)。这就是为什么产生了 retract 流,从而导致重复写入 kafka。

对此,我们也是提出了对应的解决思路,既然 left join 中左流不会等待右流,那么能不能让左流强行等待右流一段时间,实在等不到在数据关联不到的数据即可。

当当当!!!

本文的 flink sql interval join 登场,它就能等。

4.flink sql interval join

4.1.interval join 定义

大家先通过下面这句话和图简单了解一下 interval join 的作用(熟悉 DataStream 的小伙伴萌可能已经使用过了),后续会详细介绍原理。

interval join 就是用一个流的数据去关联另一个流的一段时间区间内的数据。关联到就下发关联到的数据,关联不到且在超时后就根据是否是 outer join(left join,right join,full join)下发没关联到的数据。

图片

<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">interval join</figcaption>

4.2.案例解决方案

来看看上述案例的 flink sql interval join sql 怎么写:

INSERT INTO sink_tableSELECT   
 show_log.log_id as log_id,    
show_log.timestamp as timestamp,    
show_log.show_params as show_params,   
 click_log.click_params as click_params
FROM show_log
 LEFT JOIN click_log 
ON show_log.log_id = click_log.log_id
AND show_log.row_time     
BETWEEN click_log.row_time - INTERVAL '10' MINUTE     
AND click_log.row_time + INTERVAL '10' MINUTE;

这里设置了 show_log.row_time BETWEEN click_log.row_time - INTERVAL '10' MINUTE AND click_log.row_time + INTERVAL '10' MINUTE代表 show_log 表中的数据会和 click_log 表中的 row_time 在前后 10 分钟之内的数据进行关联。

运行结果如下:

+[1 | 2021-11-01 00:01:03 | show_params | click_params]
+[2 | 2021-11-01 00:03:00 | show_params | click_params]
+[3 | 2021-11-01 00:05:00 | show_params | null]

如上就是我们期望的正确结果了。

flink web ui 算子图如下:

图片

<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">flink web ui</figcaption>

那么此时你可能有一个问题,结果中的前两条数据 join 到了输出我是理解的,那当 show_log join 不到 click_log 时为啥也输出了?原理是啥?

博主带你们来定位到具体的实现源码。先看一下 transformations。

图片

<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">transformations</figcaption>

可以看到事件时间下 interval join 的具体 operator 是 org.apache.flink.table.runtime.operators.join.KeyedCoProcessOperatorWithWatermarkDelay

其核心逻辑就集中在 processElement1processElement2 中,在 processElement1processElement2 中使用 org.apache.flink.table.runtime.operators.join.interval.RowTimeIntervalJoin 来处理具体 join 逻辑。RowTimeIntervalJoin 重要方法如下图所示。

图片

TimeIntervalJoin

下面详细给大家解释一下。

4.3.TimeIntervalJoin 简版说明

join 时,左流和右流会在 interval 时间之内相互等待,如果等到了则输出数据[+(show_log,click_log)],如果等不到,并且另一条流的时间已经推进到当前这条数据在也不可能 join 到另一条流的数据时,则直接输出[+(show_log,null)],[+(null,click_log)]。

举个例子,show_log.row_time BETWEEN click_log.row_time - INTERVAL '10' MINUTE AND click_log.row_time + INTERVAL '10' MINUTE, 当 click_log 的时间推进到 2021-11-01 11:00:00 时,这时 show_log 来一条 2021-11-01 02:00:00 的数据, 那这条 show_log 必然不可能和 click_log 中的数据 join 到了,因为 click_log 中 2021-11-01 01:50:002021-11-01 02:10:00 之间的数据以及过期删除了。则 show_log 直接输出 [+(show_log,null)]

Notes:

如果你设置了 allowLateness,join 不到的数据的输出和 state 的清理会多保留 allowLateness 时间

4.4.TimeIntervalJoin 详细实现说明

以上面案例的 show_log(左表) interval join click_log(右表) 为例(不管是 inner interval join,left interval join,right interval join 还是 full interval join,都会按照下面的流程执行):

  1. 第一步,首先如果 join xxx on 中的条件是等式则代表 join 是在相同 key 下进行的(上述案例中 join 的 key 即 show_log.log_id,click_log.log_id),相同 key 的数据会被发送到一个并发中进行处理。如果 join xxx on 中的条件是不等式,则两个流的 source 算子向 join 算子下发数据是按照 global 的 partition 策略进行下发的,并且 join 算子并发会被设置为 1,所有的数据会被发送到这一个并发中处理。
  2. 第二步,相同 key 下,一条 show_log 的数据先到达,首先会计算出下面要使用的最重要的三类时间戳:
  • 根据 show_log 的时间戳(l_time)计算出能关联到的右流的时间区间下限(r_lower)、上限(r_upper)
  • 根据 show_log 目前的 watermark 计算出目前右流的数据能够过期做过期处理的时间的最小值(r_expire)
  • 获取左流的 l_watermark,右流的 r_watermark,这两个时间戳在事件语义的任务中都是 watermark
  1. 第三步,遍历所有同 key 下的 click_log 来做 join
  • 对于遍历的每一条 click_log,走如下步骤
  • 经过判断,如果 on 中的条件为 true,则和 click_log 关联,输出[+(show_log,click_log)]数据;如果 on 中的条件为 false,则啥也不干
  • 接着判断当前这条 click_log 的数据时间(r_time)是否小于右流的数据过期时间的最小值(r_expire)(即判断这条 click_log 是否永远不会再被 show_log join 到了)。如果小于,并且当前 click_log 这一侧是 outer join,则不用等直接输出[+(null,click_log)]),从状态删除这条 click_log;如果 click_log 这一侧不是 outer join,则直接从状态里删除这条 click_log。
  1. 第四步,判断右流的时间戳(r_watermark)是否小于能关联到的右流的时间区间上限(r_upper):
  • 如果是,则说明这条 show_log 还有可能被 click_log join 到,则 show_log 放到 state 中,并注册后面用于状态清除的 timer。
  • 如果否,则说明关联不到了,则输出[+(show_log,null)]
  1. 第五步,timer 触发时:
  • timer 触发时,根据当前 l_watermark,r_watermark 以及 state 中存储的 show_log,click_log 的 l_time,r_time 判断是否再也不会被对方 join 到,如果是,则根据是否为 outer join 对应输出[+(show_log,null)],[+(null,click_log)],并从状态中删除对应的 show_log,click_log。

上面只是左流 show_log 数据到达时的执行流程(即 ProcessElement1),当右流 click_log 到达时也是完全类似的执行流程(即 ProcessElement2)。

4.5.使用注意事项

小伙伴萌在使用 interval join 需要注意的两点事项:

  1. interval join 的时间区间取决于日志的真实情况:设置大了容易造成任务的 state 太大,并且时效性也会变差。设置小了,join 不到,下发的数据在后续使用时,数据质量会存在问题。所以小伙伴萌在使用时建议先使用离线数据做一遍两条流的时间戳 diff 比较,来确定真实情况下的时间戳 diff 的分布是怎样的。举例:你通过离线数据 join 并做时间戳 diff 后发现 99% 的数据都能在时间戳相差 5min 以内 join 到,那么你就有依据去设置 interval 时间差为 5min。
  2. interval join 中的时间区间条件即支持事件时间,也支持处理时间。事件时间由 watermark 推进。

5.总结与展望

本文主要介绍了 flink sql interval 是怎么避免出现 flink regular join 存在的 retract 问题的,并通过解析其实现说明了运行原理,博主期望你读完本文之后能了解到:

  1. 背景及应用场景介绍:博主期望你能了解到,flink sql 提供的丰富的 join 方式(总结 6 种:regular join,维表 join,快照 join,interval join,array 拍平,table function)对我们满足需求提供了强大的后盾, 这 6 种 join 中涉及到流与流的 join 最常用的是 regular join 以及 interval join,本节主要介绍 interval join
  2. 来一个实战案例:博主以上节说到的曝光日志流点击日志流为案例展开,主要是想告诉小伙伴 flink sql left join 数据不会互相等待,存在 retract 问题,会导致写入 kafka 的数据量变大, 然后转变思路为使用 flink sql interval join 的方式可以使得数据互相等待一段时间进行 join,这种方式不会存在 retract 问题
  3. flink sql interval join 的解决方案以及原理的介绍:主要介绍 interval join 的在上述实战案例的运行结果及分析源码机制,博主期望你能了解到,interval join 的执行机制是会在你设置的 interval 区间之内互相等待一段时间,一旦时间推进(事件时间由 watermark 推进)到区间之外(即当前这条数据再也不可能被另一条流的数据 join 到时),outer join 会输出没有 join 到的数据,inner join 会从 state 中删除这条数据
  4. 总结及展望
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,080评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,422评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,630评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,554评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,662评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,856评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,014评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,752评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,212评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,541评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,687评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,347评论 4 331
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,973评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,777评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,006评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,406评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,576评论 2 349

推荐阅读更多精彩内容