维表join

kafka

topic为order中数据格式

{"order_id":"order_01","order_no":"0001","date_create":"1584497993601","date_update":"1584497993601"}
{"order_id":"order_02","order_no":"0002","date_create":"1584497994602","date_update":"1584497994602"}
{"order_id":"order_03","order_no":"0003","date_create":"1584497995603","date_update":"1584497995603"}
{"order_id":"order_04","order_no":"0004","date_create":"1584497997604","date_update":"1584497997604"}
{"order_id":"order_05","order_no":"0005","date_create":"1584497998605","date_update":"1584497998605"}

mysql

order_detail表ddl

CREATE TABLE `order_detail` (
  `id` varchar(11) NOT NULL,
  `pay` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

order_detail表数据

mysql> select * from order_detail;
+------+------+
| id   | pay  |
+------+------+
| 0001 |   10 |
| 0002 |   20 |
| 0003 |   30 |
| 0004 |   40 |
| 0005 |   50 |
+------+------+
5 rows in set (0.00 sec)

join

流表:创建data source去消费kafka中topic数据(此处创建了流对应的视图,简化后边join操作的sql书写。sql写法和阿里云的写法完全一致)
此处kafka地址为虚拟机集群地址(三台节点的集群)

-- kafka数据流
CREATE TABLE order (
    messageKey varbinary,
    message varbinary,
    topic varchar,
    `partition` int,
    `offset` bigint
) WITH (
    type = 'kafka010',
    topic = 'order',
    bootstrap.servers = 'xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092',
    `group.id` = 'local',
    startupMode = 'TIMESTAMP',
    starttime = '2020-03-27 00:00:00'
);

-- 数据流对应的视图
CREATE VIEW view_order AS
SELECT JSON_VALUE(CAST(t1.`message` AS VARCHAR), '$.order_id')      AS `order_id`
     , JSON_VALUE(CAST(t1.`message` AS VARCHAR), '$.order_no')      AS `order_no`
     , JSON_VALUE(CAST(t1.`message` AS VARCHAR), '$.date_create')   AS `date_create`
     , JSON_VALUE(CAST(t1.`message` AS VARCHAR), '$.date_update')   AS `date_update`
FROM order t1;

维表:创建mysql维表

CREATE TABLE order_detail (
    id VARCHAR,
    pay INT,
    PRIMARY KEY(id)
) WITH (
    type = 'mysql',
    dburl = 'jdbc:mysql://xxx.xxx.xxx.xxx:3306/big_data?useUnicode=true&characterEncoding=UTF-8',
    drivername = 'com.mysql.jdbc.Driver',
    tablename = 'order_detail',
    username = 'root',
    password = '123456'
);

流表join维表,并打印输出信息

-- 流join维表
SELECT t.`order_no`
     , w.`pay`
     , t.`date_create`
     , t.`date_update`
FROM view_order AS t
JOIN order_detail FOR SYSTEM_TIME AS OF PROCTIME() AS w -- 维表JOIN时必须指定此声明
    ON t.`order_no` = w.`id`;                           -- join的条件必须包含维表的主键
out_put.png

注意事项:

  1. 流表join维表时,流表是驱动表,并且只能够join上当前维表的数据。如,当前时刻为10:00,流表只能够join上维表在10:00时刻的快照数据(快照数据是我自己这么定义的,方便理解):10:00这个时刻,流表有数据A,但是维表无对应数据a,则数据A,a不会被join到;在10:01这个时刻,维表中有了对应数据a,但此时也无法join到,因为流表数据A早就在10:00被处理只是处理结果为未join到
  2. mysql服务需要开启允许远程服务,否则flink程序无法消费到mysql表中数据创建成一张维表。并报下边错误
mysql_remote_access_error.png
  1. 声明一个维表时,必须要指名主键。维表JOIN的时候,ON的条件必须包含所有主键的等值条件
1.维表支持INNER JOIN和LEFT JOIN,不支持RIGHT JOIN或FULL JOIN
2.必须加上FOR SYSTEM_TIME AS OF PROCTIME(),表示JOIN维表当前时刻所看到的每条数据。
3.JOIN行为只发生在处理时间(processing time),即使维表中的数据新增、更新或删除,已关联的数据也不会被改变或撤回。
4.ON条件必须包含维表PRIMARY KEY的等值条件(且要求与真实表定义一致),但除此之外,ON条件中也可以有其他等值条件。
维表和维表不能进行JOIN。

问题和解决方案

1.flink任务进行维表join操作时,会将当前维表的数据都加载到flink内部(内存或磁盘)。那么当维表的数据十分庞大时,维表全量数据都加载到flink内部可能导致flink任务无法进行或者join操作十分缓慢。(解决方案:使用LRU最少使用缓存淘汰算法。能够保证flink内部保存的维表数据是最新和使用最多的数据,旧数据或者未被使用到的数据都会被淘汰,大大减少flink缓存压力)
更多解决方案见 https://blog.csdn.net/aA518189/article/details/102833324

 CREATE TABLE tableName(
     colName cloType,
     ...
     PRIMARY KEY(keyInfo),
     PERIOD FOR SYSTEM_TIME
  )WITH(
     type='mysql',
     url='jdbcUrl',
     userName='dbUserName',
     password='dbPwd',
     tableName='tableName',
    --------------LRU最少使用缓存淘汰算法----------------
     cache ='LRU',
     cacheSize ='10000',
     cacheTTLMs ='60000',
    -------------------------------------------------
     parallelism ='1',
     partitionedJoin='false'
  );
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容