kafka
Data format of the `order` topic:
{"order_id":"order_01","order_no":"0001","date_create":"1584497993601","date_update":"1584497993601"}
{"order_id":"order_02","order_no":"0002","date_create":"1584497994602","date_update":"1584497994602"}
{"order_id":"order_03","order_no":"0003","date_create":"1584497995603","date_update":"1584497995603"}
{"order_id":"order_04","order_no":"0004","date_create":"1584497997604","date_update":"1584497997604"}
{"order_id":"order_05","order_no":"0005","date_create":"1584497998605","date_update":"1584497998605"}
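For reference, a minimal Python sketch of how one such message could be serialized before being sent to the `order` topic (this is an assumed producer-side helper, not part of the original setup; the Kafka client itself is omitted):

```python
import json

def serialize_order(order_id, order_no, ts_millis):
    # Build the exact JSON shape shown above; the timestamps are
    # millisecond epoch values carried as strings.
    record = {
        "order_id": order_id,
        "order_no": order_no,
        "date_create": str(ts_millis),
        "date_update": str(ts_millis),
    }
    # separators=(",", ":") matches the compact form in the topic
    return json.dumps(record, separators=(",", ":"))

print(serialize_order("order_01", "0001", 1584497993601))
```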
mysql
DDL for the `order_detail` table:
CREATE TABLE `order_detail` (
`id` varchar(11) NOT NULL,
`pay` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Data in the `order_detail` table:
mysql> select * from order_detail;
+------+------+
| id | pay |
+------+------+
| 0001 | 10 |
| 0002 | 20 |
| 0003 | 30 |
| 0004 | 40 |
| 0005 | 50 |
+------+------+
5 rows in set (0.00 sec)
join
Stream table: create a data source that consumes the Kafka topic (a view is also created over the stream to simplify the join SQL later; the SQL syntax is exactly the same as Alibaba Cloud's).
The Kafka address here is a virtual machine cluster (three nodes).
-- Kafka data stream
CREATE TABLE `order` ( -- `order` is a reserved word, so it must be quoted
messageKey varbinary,
message varbinary,
topic varchar,
`partition` int,
`offset` bigint
) WITH (
type = 'kafka010',
topic = 'order',
bootstrap.servers = 'xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092',
`group.id` = 'local',
startupMode = 'TIMESTAMP',
starttime = '2020-03-27 00:00:00'
);
-- View over the data stream
CREATE VIEW view_order AS
SELECT JSON_VALUE(CAST(t1.`message` AS VARCHAR), '$.order_id') AS `order_id`
, JSON_VALUE(CAST(t1.`message` AS VARCHAR), '$.order_no') AS `order_no`
, JSON_VALUE(CAST(t1.`message` AS VARCHAR), '$.date_create') AS `date_create`
, JSON_VALUE(CAST(t1.`message` AS VARCHAR), '$.date_update') AS `date_update`
FROM `order` t1;
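JSON_VALUE extracts a single field from the raw message bytes. A rough Python equivalent of what the view computes for one message (an illustrative stand-in only; field names are taken from the topic format above):

```python
import json

def extract_order_fields(message: bytes) -> dict:
    # Mirrors CAST(message AS VARCHAR) followed by
    # JSON_VALUE('$.field') for each column of the view.
    payload = json.loads(message.decode("utf-8"))
    return {
        "order_id": payload["order_id"],
        "order_no": payload["order_no"],
        "date_create": payload["date_create"],
        "date_update": payload["date_update"],
    }

row = extract_order_fields(
    b'{"order_id":"order_01","order_no":"0001",'
    b'"date_create":"1584497993601","date_update":"1584497993601"}'
)
print(row["order_no"])  # 0001
```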
Dimension table: create the MySQL dimension table
CREATE TABLE order_detail (
id VARCHAR,
pay INT,
PRIMARY KEY(id)
) WITH (
type = 'mysql',
dburl = 'jdbc:mysql://xxx.xxx.xxx.xxx:3306/big_data?useUnicode=true&characterEncoding=UTF-8',
drivername = 'com.mysql.jdbc.Driver',
tablename = 'order_detail',
username = 'root',
password = '123456'
);
Join the stream table with the dimension table and print the result:
-- stream table JOIN dimension table
SELECT t.`order_no`
, w.`pay`
, t.`date_create`
, t.`date_update`
FROM view_order AS t
JOIN order_detail FOR SYSTEM_TIME AS OF PROCTIME() AS w -- this declaration is mandatory when joining a dimension table
ON t.`order_no` = w.`id`; -- the join condition must include the dimension table's primary key
[Figure: out_put.png, the join output]
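Given the sample topic records and the `order_detail` rows above, every order finds a matching payment. A small Python sketch of the expected pairing (an in-memory stand-in, not an actual Flink run):

```python
# Sample topic records: (order_no, date_create/date_update timestamp).
orders = [("0001", "1584497993601"), ("0002", "1584497994602"),
          ("0003", "1584497995603"), ("0004", "1584497997604"),
          ("0005", "1584497998605")]
# The order_detail rows: id -> pay.
order_detail = {"0001": 10, "0002": 20, "0003": 30, "0004": 40, "0005": 50}

# Inner join on order_no = id, mirroring the SQL above:
# (order_no, pay, date_create, date_update)
joined = [(no, order_detail[no], ts, ts)
          for no, ts in orders if no in order_detail]
for row in joined:
    print(row)
```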
Notes:
- When a stream table joins a dimension table, the stream table is the driving side, and each stream record can only join the dimension table's data as of the current moment. For example, at 10:00 the stream can only see the dimension table's 10:00 snapshot ("snapshot" is my own term, used for ease of understanding). If at 10:00 the stream has record A but the dimension table has no matching row a, then A and a are not joined; even when row a appears in the dimension table at 10:01, the join still cannot happen, because record A was already processed at 10:00, with the result being simply "no match".
- The MySQL server must allow remote access; otherwise the Flink job cannot read the MySQL table to build the dimension table, and reports the error below:
[Figure: mysql_remote_access_error.png, the MySQL remote-access error]
- When declaring a dimension table, a primary key must be specified, and the ON clause of the JOIN must contain equality conditions on all primary key columns.
1. Dimension tables support INNER JOIN and LEFT JOIN; RIGHT JOIN and FULL JOIN are not supported.
2. FOR SYSTEM_TIME AS OF PROCTIME() is mandatory; it means each record joins the dimension table as seen at the current moment.
3. The join happens only at processing time: even if rows in the dimension table are later inserted, updated, or deleted, records that have already been joined are not changed or retracted.
4. The ON clause must contain equality conditions on the dimension table's PRIMARY KEY (matching the actual table definition), but it may also contain other equality conditions.
A dimension table cannot be JOINed with another dimension table.
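The processing-time semantics in notes 2 and 3 can be illustrated with a toy simulation (a hypothetical Python stand-in, assuming the dimension table is a plain dict whose state varies with time; none of these names come from Flink):

```python
def process(stream_events, dim_snapshots):
    """Join each stream event against the dimension table as it looks
    at that event's processing time; results are never retroactively
    updated once emitted."""
    results = []
    for t, order_no in stream_events:
        dim = dim_snapshots(t)      # dimension state at time t
        pay = dim.get(order_no)     # None => no match at this moment
        results.append((t, order_no, pay))
    return results

# Dimension table: row "0006" only exists from 10:01 onwards.
def dim_at(t):
    base = {"0001": 10}
    return {**base, "0006": 60} if t >= "10:01" else base

out = process([("10:00", "0006"), ("10:01", "0006")], dim_at)
print(out)  # the 10:00 event misses; the 10:01 event matches
```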
Problems and solutions
1. When a Flink job joins against a dimension table, it loads the dimension table's rows into Flink (memory or disk). If the dimension table is very large, loading its full contents can make the job infeasible or the join extremely slow. (Solution: use an LRU, least-recently-used, cache eviction policy. It keeps only the most recently and most frequently used dimension rows inside Flink and evicts old or unused rows, greatly reducing cache pressure.)
More solutions: https://blog.csdn.net/aA518189/article/details/102833324
CREATE TABLE tableName (
colName colType,
...
PRIMARY KEY(keyInfo),
PERIOD FOR SYSTEM_TIME
) WITH (
type = 'mysql',
url = 'jdbcUrl',
userName = 'dbUserName',
password = 'dbPwd',
tableName = 'tableName',
-- LRU (least-recently-used) cache eviction settings
cache = 'LRU',
cacheSize = '10000',
cacheTTLMs = '60000',
parallelism = '1',
partitionedJoin = 'false'
);
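The cache settings above (cache='LRU', cacheSize, cacheTTLMs) can be sketched as a small LRU-with-TTL structure. This is an illustrative Python model only; Flink's actual connector cache is not this code:

```python
import time
from collections import OrderedDict

class LruTtlCache:
    """Keeps at most `size` dimension rows; evicts least-recently-used
    rows and treats rows older than `ttl_ms` as expired (roughly what
    cacheSize and cacheTTLMs control)."""
    def __init__(self, size=10000, ttl_ms=60000):
        self.size, self.ttl = size, ttl_ms / 1000.0
        self.data = OrderedDict()          # key -> (value, stored_at)

    def get(self, key):
        item = self.data.get(key)
        if item is None:
            return None
        value, stored_at = item
        if time.monotonic() - stored_at > self.ttl:
            del self.data[key]             # expired: force a reload
            return None
        self.data.move_to_end(key)         # mark as recently used
        return value

    def put(self, key, value):
        self.data[key] = (value, time.monotonic())
        self.data.move_to_end(key)
        if len(self.data) > self.size:
            self.data.popitem(last=False)  # evict the LRU entry

cache = LruTtlCache(size=2, ttl_ms=60000)
cache.put("0001", 10); cache.put("0002", 20)
cache.get("0001")                          # touch "0001"
cache.put("0003", 30)                      # evicts "0002", the LRU key
print(cache.get("0002"), cache.get("0001"))  # None 10
```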