flink维表关联系列之维表服务与Flink异步IO

维表关联系列目录:
一、维表服务与Flink异步IO
二、Mysql维表关联:全量加载
三、Hbase维表关联:LRU策略
四、Redis维表关联:实时查询
五、kafka维表关联:广播方式
六、自定义异步查询

一、维表服务

维度或者是维表概念熟知应该从数据仓库维度建模开始了解的,区别于事实表业务真实发生的数据,通常用来表示业务属性,比喻订单业务中,商品属性、商家属性都可以称之为维度表。在flink 流处理实时分析中或者实时数仓中,同样需要使用维表来完成一些数据过滤或者字段补齐操作,但是我们所需要的维度数据通常存储在Mysql/Redis/Hbase/Es这样的外部数据库中,并且可能是会随时变动的,根据业务要求数据的时效性,需要不同程度的感知维表数据的变化,在实际使用中常常会有以下几种方案可供选择:

  1. 在维度数据量比较小并且业务要求的时效性不高,可以定时全量加载维度数据到内存中,直接从内存中查询维度数据;

  2. 在维度数据量比较大并且业务要求的时效性不高,这时候全量加载就会撑爆内存,可以使用LRU的缓存策略,当缓存的维度数据达到一定大小,采用淘汰最近最少使用的数据,同时还可以设置数据的过期时间;

  3. 业务要求数据时效性比较高,那么就需要flink实时查询,这个时候需要注意外部存储所能承受的QPS;

  4. 最后一种方案直接将维度数据发送到kafka中,flink任务消费kafka的维度数据,然后使用广播方式将维度数据广播到每一个处理task中,这种方式同样要求数据量比较小

二、Flink 异步IO

flink异步IO用于对外部访问的一种优化手段,可参考http://wuchong.me/blog/2017/05/17/flink-internals-async-io 阿里云邪大牛对flink 异步IO的介绍,里面详细介绍了异步IO相对于同步处理的性能优化与有序、无序原理实现,在这里分析一些源码帮助理解。

1.  `@Override`

2.  `public  void processElement(StreamRecord<IN> element)  throws  Exception  {`

3.  `final  StreamRecordQueueEntry<OUT> streamRecordBufferEntry =  new  StreamRecordQueueEntry<>(element);`

4.  `if  (timeout >  0L)  {`

5.  `// register a timeout for this AsyncStreamRecordBufferEntry`

6.  `long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime();`

8.  `final  ScheduledFuture<?> timerFuture = getProcessingTimeService().registerTimer(`

9.  `timeoutTimestamp,`

10.  `new  ProcessingTimeCallback()  {`

11.  `@Override`

12.  `public  void onProcessingTime(long timestamp)  throws  Exception  {`

13.  `userFunction.timeout(element.getValue(), streamRecordBufferEntry);`

14.  `}`

15.  `});`

16.  `// Cancel the timer once we've completed the stream record buffer entry. This will remove`

17.  `// the register trigger task`

18.  `streamRecordBufferEntry.onComplete(`

19.  `(StreamElementQueueEntry<Collection<OUT>> value)  ->  {`

20.  `timerFuture.cancel(true);`

21.  `},`

22.  `executor);`

23.  `}`

24.  `addAsyncBufferEntry(streamRecordBufferEntry);`

25.  `userFunction.asyncInvoke(element.getValue(), streamRecordBufferEntry);`

26.  `}`

代码入口是AsyncWaitOperator算子processElement 方法,表示处理元素方法,每个处理的元素都会被封装成为StreamRecordQueueEntry对象,该对象会被放入内部有序或者无序的队列中,Emitter则负责从队列里面取数据,那么如何判断已经进入的元素已经完成异步IO操作了呢?答案就在StreamRecordQueueEntry里面:

  1. StreamRecordQueueEntry持有CompletableFuture对象,CompletableFuture是java8 提供了一个更强大的异步调用处理类,提供了异步获取结果无需阻塞、多阶段关联异步调用。具体用法可参考https://www.cnblogs.com/cjsblog/p/9267163.html

  2. StreamRecordQueueEntry对象添加到队列的同时执行其onComplete方法,内部调用的是CompletableFuture的onComplete,表示在完成异步IO的回调方法,回调方法是一个信号灯释放操作,会通知Emitter可以从队列中读取数据了

  3. StreamRecordQueueEntry对象会被作为AsyncFunction函数的asyncInvoke方法的入参,在这个方法里面需要使用外部存储异步客户端或者使用线程池中执行作为异步客户端去查询数据并且调用其complete方法,实际上也就是调用StreamRecordQueueEntry对象中complete方法,那么就会触发之前注册的onComplete回调方法完成后续操作

在AsyncFunction函数中还有一个timeout方法,在异步调用超时的情况下会被触发。接下来看下其实现原理:

  1. 在processElement方法里面timeout>0的逻辑里面,通过flink提供的定时机制注册了一个ProcessingTimeCallback的回调,那么在超过timout时间就会调用其onProcessingTime方法,在onProcessingTime方法中会调用AsyncFunction中timeout方法

  2. AsyncFunction中timeout方法中调用了ResultFuture对象(实际上就是StreamRecordQueueEntry对象)中CompletableFuture的completeExceptionally方法,那么检测到该CompletableFuture还是处于uncomplete的状态就会抛出异常

  3. 在timeout>0的逻辑里面还有一个调用StreamRecordQueueEntry对象的onComplete回调方法,在其CompletableFuture完成时会调用cancel 取消超时回调。

在AsyncFunction函数中默认timeout方法仅仅是会抛出Async function call has timed out.异常,我们也可以重写该方法,获取更多的信息。

image
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,185评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,445评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,684评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,564评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,681评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,874评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,025评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,761评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,217评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,545评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,694评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,351评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,988评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,778评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,007评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,427评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,580评论 2 349

推荐阅读更多精彩内容

  • title: 论事件驱动与异步IOtag: 事件驱动 异步IOcategories: notes 转载自人云思云 ...
    cmustard阅读 959评论 0 0
  • 架构 Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。Flink设计为在...
    盗梦者_56f2阅读 37,789评论 0 6
  • 前言 前端工程师因为需要操纵Ajax(Ajax的A就是Asynchronous的意思),因此,是最了解异步IO的人...
    白昔月阅读 3,986评论 1 8
  • 花海 彼岸路 满眼血红 奈何桥 孟婆处 “为什么还是忘不掉,请再给我一碗汤吧。” 面巾下一声叹息,又是一碗清汤。 ...
    Han夜阅读 160评论 0 0
  • 打从记事起,就觉得我对物的揣摩,强于对人性的参透。比如,我很小就听得出生完蛋的鸡与没生蛋的鸡叫声不同,却怎么也猜不...
    鹤舞松泉阅读 465评论 0 4