flink维表关联系列之维表服务与Flink异步IO

维表关联系列目录:
一、维表服务与Flink异步IO
二、Mysql维表关联:全量加载
三、Hbase维表关联:LRU策略
四、Redis维表关联:实时查询
五、kafka维表关联:广播方式
六、自定义异步查询

一、维表服务

维度或者是维表概念熟知应该从数据仓库维度建模开始了解的,区别于事实表业务真实发生的数据,通常用来表示业务属性,比喻订单业务中,商品属性、商家属性都可以称之为维度表。在flink 流处理实时分析中或者实时数仓中,同样需要使用维表来完成一些数据过滤或者字段补齐操作,但是我们所需要的维度数据通常存储在Mysql/Redis/Hbase/Es这样的外部数据库中,并且可能是会随时变动的,根据业务要求数据的时效性,需要不同程度的感知维表数据的变化,在实际使用中常常会有以下几种方案可供选择:

  1. 在维度数据量比较小并且业务要求的时效性不高,可以定时全量加载维度数据到内存中,直接从内存中查询维度数据;

  2. 在维度数据量比较大并且业务要求的时效性不高,这时候全量加载就会撑爆内存,可以使用LRU的缓存策略,当缓存的维度数据达到一定大小,采用淘汰最近最少使用的数据,同时还可以设置数据的过期时间;

  3. 业务要求数据时效性比较高,那么就需要flink实时查询,这个时候需要注意外部存储所能承受的QPS;

  4. 最后一种方案直接将维度数据发送到kafka中,flink任务消费kafka的维度数据,然后使用广播方式将维度数据广播到每一个处理task中,这种方式同样要求数据量比较小

二、Flink 异步IO

flink异步IO用于对外部访问的一种优化手段,可参考http://wuchong.me/blog/2017/05/17/flink-internals-async-io 阿里云邪大牛对flink 异步IO的介绍,里面详细介绍了异步IO相对于同步处理的性能优化与有序、无序原理实现,在这里分析一些源码帮助理解。

1.  `@Override`

2.  `public  void processElement(StreamRecord<IN> element)  throws  Exception  {`

3.  `final  StreamRecordQueueEntry<OUT> streamRecordBufferEntry =  new  StreamRecordQueueEntry<>(element);`

4.  `if  (timeout >  0L)  {`

5.  `// register a timeout for this AsyncStreamRecordBufferEntry`

6.  `long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime();`

8.  `final  ScheduledFuture<?> timerFuture = getProcessingTimeService().registerTimer(`

9.  `timeoutTimestamp,`

10.  `new  ProcessingTimeCallback()  {`

11.  `@Override`

12.  `public  void onProcessingTime(long timestamp)  throws  Exception  {`

13.  `userFunction.timeout(element.getValue(), streamRecordBufferEntry);`

14.  `}`

15.  `});`

16.  `// Cancel the timer once we've completed the stream record buffer entry. This will remove`

17.  `// the register trigger task`

18.  `streamRecordBufferEntry.onComplete(`

19.  `(StreamElementQueueEntry<Collection<OUT>> value)  ->  {`

20.  `timerFuture.cancel(true);`

21.  `},`

22.  `executor);`

23.  `}`

24.  `addAsyncBufferEntry(streamRecordBufferEntry);`

25.  `userFunction.asyncInvoke(element.getValue(), streamRecordBufferEntry);`

26.  `}`

代码入口是AsyncWaitOperator算子processElement 方法,表示处理元素方法,每个处理的元素都会被封装成为StreamRecordQueueEntry对象,该对象会被放入内部有序或者无序的队列中,Emitter则负责从队列里面取数据,那么如何判断已经进入的元素已经完成异步IO操作了呢?答案就在StreamRecordQueueEntry里面:

  1. StreamRecordQueueEntry持有CompletableFuture对象,CompletableFuture是java8 提供了一个更强大的异步调用处理类,提供了异步获取结果无需阻塞、多阶段关联异步调用。具体用法可参考https://www.cnblogs.com/cjsblog/p/9267163.html

  2. StreamRecordQueueEntry对象添加到队列的同时执行其onComplete方法,内部调用的是CompletableFuture的onComplete,表示在完成异步IO的回调方法,回调方法是一个信号灯释放操作,会通知Emitter可以从队列中读取数据了

  3. StreamRecordQueueEntry对象会被作为AsyncFunction函数的asyncInvoke方法的入参,在这个方法里面需要使用外部存储异步客户端或者使用线程池中执行作为异步客户端去查询数据并且调用其complete方法,实际上也就是调用StreamRecordQueueEntry对象中complete方法,那么就会触发之前注册的onComplete回调方法完成后续操作

在AsyncFunction函数中还有一个timeout方法,在异步调用超时的情况下会被触发。接下来看下其实现原理:

  1. 在processElement方法里面timeout>0的逻辑里面,通过flink提供的定时机制注册了一个ProcessingTimeCallback的回调,那么在超过timout时间就会调用其onProcessingTime方法,在onProcessingTime方法中会调用AsyncFunction中timeout方法

  2. AsyncFunction中timeout方法中调用了ResultFuture对象(实际上就是StreamRecordQueueEntry对象)中CompletableFuture的completeExceptionally方法,那么检测到该CompletableFuture还是处于uncomplete的状态就会抛出异常

  3. 在timeout>0的逻辑里面还有一个调用StreamRecordQueueEntry对象的onComplete回调方法,在其CompletableFuture完成时会调用cancel 取消超时回调。

在AsyncFunction函数中默认timeout方法仅仅是会抛出Async function call has timed out.异常,我们也可以重写该方法,获取更多的信息。

image
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • title: 论事件驱动与异步IOtag: 事件驱动 异步IOcategories: notes 转载自人云思云 ...
    cmustard阅读 978评论 0 0
  • 架构 Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。Flink设计为在...
    盗梦者_56f2阅读 37,863评论 0 6
  • 前言 前端工程师因为需要操纵Ajax(Ajax的A就是Asynchronous的意思),因此,是最了解异步IO的人...
    白昔月阅读 4,022评论 1 8
  • 花海 彼岸路 满眼血红 奈何桥 孟婆处 “为什么还是忘不掉,请再给我一碗汤吧。” 面巾下一声叹息,又是一碗清汤。 ...
    Han夜阅读 169评论 0 0
  • 打从记事起,就觉得我对物的揣摩,强于对人性的参透。比如,我很小就听得出生完蛋的鸡与没生蛋的鸡叫声不同,却怎么也猜不...
    鹤舞松泉阅读 475评论 0 4