详解flink中Look up维表的使用

背景

在流式计算中,维表是一个很常见的概念,一般用于sql的join中,对流式数据进行数据补全,比如我们的source stream是来自日志的订单数据,但是日志中我们只是记录了订单商品的id,并没有其他的信息,但是我们把数据存入数仓进行数据分析的时候,却需要商品名称、价格等等其他的信息,这种问题我们可以在进行流处理的时候通过查询维表的方式对数据进行数据补全。

维表一般存储在外部存储中,比如mysql、hbase、redis等等,今天我们以mysql为例,讲讲flink中维表的使用。

LookupableTableSource

在flink中提供了一个LookupableTableSource,可以用于实现维表,也就是我们可以通过某几个key列去查询外部存储来获取相关的信息用于补全stream的数据。

public interface LookupableTableSource<T> extends TableSource<T> {

    TableFunction<T> getLookupFunction(String[] lookupKeys);

    AsyncTableFunction<T> getAsyncLookupFunction(String[] lookupKeys);

    boolean isAsyncEnabled();
}

我们看到,LookupableTableSource有三个方法

  • getLookupFunction:用于同步查询维表的数据,返回一个TableFunction,所以本质上还是通过用户自定义 UDTF来实现的。
  • getAsyncLookupFunction:用于异步查询维表的数据,该方法返回一个对象
  • isAsyncEnabled:默认情况下是同步查询,如果要开启异步查询,这个方法需要返回true

在flink里,我们看到实现了这个接口的主要有四个类,JdbcTableSource,HBaseTableSource,CsvTableSource,HiveTableSource,今天我们主要以jdbc为例讲讲如何进行维表查询。

实例讲解

接下来我们讲一个小例子,首先定义一下stream source,我们使用flink 1.11提供的datagen来生成数据。

我们来模拟生成用户的数据,这里只生成的用户的id,范围在1-100之间。

CREATE TABLE datagen (
 userid int,
 proctime as PROCTIME()
) WITH (
 'connector' = 'datagen',
 'rows-per-second'='100',
 'fields.userid.kind'='random',
 'fields.userid.min'='1',
 'fields.userid.max'='100'
)

datagen具体的使用方法可以参考:

聊聊flink 1.11 中的随机数据生成器-DataGen connector

然后再创建一个mysql维表信息:

CREATE TABLE dim_mysql (
  id int,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://localhost:3306/test',
   'table-name' = 'userinfo',
   'username' = 'root',
   'password' = 'root'
)

我们这个mysql表中样例数据如下:

image

最后执行sql查询,流表关联维表:

SELECT * FROM datagen LEFT JOIN dim_mysql FOR SYSTEM_TIME AS OF datagen.proctime  ON datagen.userid = dim_mysql.id

结果示例如下:

3> 53,2020-09-03T07:19:34.565,null,null
3> 73,2020-09-03T07:19:34.566,null,null
1> 14,2020-09-03T07:19:34.566,14,aaddda
2> 11,2020-09-03T07:19:34.566,null,null
4> 8,2020-09-03T07:19:34.566,8,name8
1> 61,2020-09-03T07:19:34.567,null,null
3> 12,2020-09-03T07:19:34.567,12,aaa
2> 99,2020-09-03T07:19:34.567,null,null
4> 37,2020-09-03T07:19:34.568,null,null
2> 13,2020-09-03T07:19:34.569,13,aaddda
3> 6,2020-09-03T07:19:34.568,6,name6

我们看到对于维表中存在的数据,已经关联出来了,对于维表中没有的数据,显示为null

完整代码请参考: https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/dimension/JdbcDim.java

源码解析

JdbcTableSource

以jdbc为例,我们来看看flink底层是怎么做的。

JdbcTableSource#isAsyncEnabled方法返回的是false,也就是不支持异步的查询,所以进入JdbcTableSource#getLookupFunction方法。

    @Override
    public TableFunction<Row> getLookupFunction(String[] lookupKeys) {
        final RowTypeInfo rowTypeInfo = (RowTypeInfo) fromDataTypeToLegacyInfo(producedDataType);
        return JdbcLookupFunction.builder()
                .setOptions(options)
                .setLookupOptions(lookupOptions)
                .setFieldTypes(rowTypeInfo.getFieldTypes())
                .setFieldNames(rowTypeInfo.getFieldNames())
                .setKeyNames(lookupKeys)
                .build();
    }

最终是构造了一个JdbcLookupFunction对象,

  • options是连接jdbc的一些参数,比如user、pass、url等。
  • lookupOptions是一些有关维表的参数,主要是缓存的大小、超时时间等。
  • lookupKeys也就是要去关联查询维表的字段。

JdbcLookupFunction

所以我们来看看JdbcLookupFunction类,这个JdbcLookupFunction是一个TableFunction的子类,具体的TableFunction的使用可以参考这个文章:

Flink实战教程-自定义函数之TableFunction

一个TableFunction最核心的就是eval方法,在这个方法里,做的主要的工作就是通过传进来的多个keys拼接成sql去来查询数据,首先查询的是缓存,缓存有数据就直接返回,缓存没有的话再去查询数据库,然后再将查询的结果返回并放入缓存,下次查询的时候直接查询缓存。

为什么要加一个缓存呢?默认情况下是不开启缓存的,每来一个查询,都会给维表发送一个请求去查询,如果数据量比较大的话,势必会给存储维表的系统造成一定的压力,所以flink提供了一个LRU缓存,查询维表的时候,先查询缓存,缓存没有再去查询外部系统,但是如果有一个数据查询频率比较高,一直被命中,就无法获取新数据了。所以缓存还要加一个超时时间,过了这个时间,把这个数据强制删除,去外部系统查询新的数据。

具体的怎么开启缓存呢?我们看下JdbcLookupFunction#open方法

    @Override
    public void open(FunctionContext context) throws Exception {
        try {
            establishConnectionAndStatement();
            this.cache = cacheMaxSize == -1 || cacheExpireMs == -1 ? null : CacheBuilder.newBuilder()
                    .expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
                    .maximumSize(cacheMaxSize)
                    .build();
        } catch (SQLException sqe) {
            throw new IllegalArgumentException("open() failed.", sqe);
        } catch (ClassNotFoundException cnfe) {
            throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
        }
    }

也就是说cacheMaxSize和cacheExpireMs需要同时设置,就会构造一个缓存对象cache来缓存数据.这两个参数对应的DDL的属性就是lookup.cache.max-rows和lookup.cache.ttl

对于具体的缓存的大小和超时时间的设置,用户需要根据自身的情况来自己定义,在数据的准确性和系统的吞吐量之间做一个权衡。

更多干货信息,欢迎关注我的公众号【大数据技术与应用实战】

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,222评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,455评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,720评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,568评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,696评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,879评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,028评论 3 409
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,773评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,220评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,550评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,697评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,360评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,002评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,782评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,010评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,433评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,587评论 2 350