维表关联系列目录:
一、维表服务与Flink异步IO
二、Mysql维表关联:全量加载
三、Hbase维表关联:LRU策略
四、Redis维表关联:实时查询
五、kafka维表关联:广播方式
六、自定义异步查询
一、维表服务
维度或者是维表概念熟知应该从数据仓库维度建模开始了解的,区别于事实表业务真实发生的数据,通常用来表示业务属性,比喻订单业务中,商品属性、商家属性都可以称之为维度表。在flink 流处理实时分析中或者实时数仓中,同样需要使用维表来完成一些数据过滤或者字段补齐操作,但是我们所需要的维度数据通常存储在Mysql/Redis/Hbase/Es这样的外部数据库中,并且可能是会随时变动的,根据业务要求数据的时效性,需要不同程度的感知维表数据的变化,在实际使用中常常会有以下几种方案可供选择:
在维度数据量比较小并且业务要求的时效性不高,可以定时全量加载维度数据到内存中,直接从内存中查询维度数据;
在维度数据量比较大并且业务要求的时效性不高,这时候全量加载就会撑爆内存,可以使用LRU的缓存策略,当缓存的维度数据达到一定大小,采用淘汰最近最少使用的数据,同时还可以设置数据的过期时间;
业务要求数据时效性比较高,那么就需要flink实时查询,这个时候需要注意外部存储所能承受的QPS;
最后一种方案直接将维度数据发送到kafka中,flink任务消费kafka的维度数据,然后使用广播方式将维度数据广播到每一个处理task中,这种方式同样要求数据量比较小
二、Flink 异步IO
flink异步IO用于对外部访问的一种优化手段,可参考http://wuchong.me/blog/2017/05/17/flink-internals-async-io 阿里云邪大牛对flink 异步IO的介绍,里面详细介绍了异步IO相对于同步处理的性能优化与有序、无序原理实现,在这里分析一些源码帮助理解。
1. `@Override`
2. `public void processElement(StreamRecord<IN> element) throws Exception {`
3. `final StreamRecordQueueEntry<OUT> streamRecordBufferEntry = new StreamRecordQueueEntry<>(element);`
4. `if (timeout > 0L) {`
5. `// register a timeout for this AsyncStreamRecordBufferEntry`
6. `long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime();`
8. `final ScheduledFuture<?> timerFuture = getProcessingTimeService().registerTimer(`
9. `timeoutTimestamp,`
10. `new ProcessingTimeCallback() {`
11. `@Override`
12. `public void onProcessingTime(long timestamp) throws Exception {`
13. `userFunction.timeout(element.getValue(), streamRecordBufferEntry);`
14. `}`
15. `});`
16. `// Cancel the timer once we've completed the stream record buffer entry. This will remove`
17. `// the register trigger task`
18. `streamRecordBufferEntry.onComplete(`
19. `(StreamElementQueueEntry<Collection<OUT>> value) -> {`
20. `timerFuture.cancel(true);`
21. `},`
22. `executor);`
23. `}`
24. `addAsyncBufferEntry(streamRecordBufferEntry);`
25. `userFunction.asyncInvoke(element.getValue(), streamRecordBufferEntry);`
26. `}`
代码入口是AsyncWaitOperator算子processElement 方法,表示处理元素方法,每个处理的元素都会被封装成为StreamRecordQueueEntry对象,该对象会被放入内部有序或者无序的队列中,Emitter则负责从队列里面取数据,那么如何判断已经进入的元素已经完成异步IO操作了呢?答案就在StreamRecordQueueEntry里面:
StreamRecordQueueEntry持有CompletableFuture对象,CompletableFuture是java8 提供了一个更强大的异步调用处理类,提供了异步获取结果无需阻塞、多阶段关联异步调用。具体用法可参考https://www.cnblogs.com/cjsblog/p/9267163.html
StreamRecordQueueEntry对象添加到队列的同时执行其onComplete方法,内部调用的是CompletableFuture的onComplete,表示在完成异步IO的回调方法,回调方法是一个信号灯释放操作,会通知Emitter可以从队列中读取数据了
StreamRecordQueueEntry对象会被作为AsyncFunction函数的asyncInvoke方法的入参,在这个方法里面需要使用外部存储异步客户端或者使用线程池中执行作为异步客户端去查询数据并且调用其complete方法,实际上也就是调用StreamRecordQueueEntry对象中complete方法,那么就会触发之前注册的onComplete回调方法完成后续操作
在AsyncFunction函数中还有一个timeout方法,在异步调用超时的情况下会被触发。接下来看下其实现原理:
在processElement方法里面timeout>0的逻辑里面,通过flink提供的定时机制注册了一个ProcessingTimeCallback的回调,那么在超过timout时间就会调用其onProcessingTime方法,在onProcessingTime方法中会调用AsyncFunction中timeout方法
AsyncFunction中timeout方法中调用了ResultFuture对象(实际上就是StreamRecordQueueEntry对象)中CompletableFuture的completeExceptionally方法,那么检测到该CompletableFuture还是处于uncomplete的状态就会抛出异常
在timeout>0的逻辑里面还有一个调用StreamRecordQueueEntry对象的onComplete回调方法,在其CompletableFuture完成时会调用cancel 取消超时回调。
在AsyncFunction函数中默认timeout方法仅仅是会抛出Async function call has timed out.异常,我们也可以重写该方法,获取更多的信息。