Flink 双流Join
概述
- 在之前的Flink教程03里面给大家讲过了维表Join,今天来和大家分享一下双流Join
- 目前Flink双流Join分成两类:UnBounded Join 和 Time Interval Join
- 在有些场景下,用哪个都行,不过后者的性能会优于前者,而且如果在双流Join之后想要再进行窗口计算,那么只能使用Time Interval Join,目前的UnBounded Join后面是没有办法再进行Event Time的窗口计算
UnBounded Join
- 分为两种Join,一种是Inner Join,另一种是Outer Join
- Inner Join:双流Join最大的问题是两边的数据量是不一样的,会存在一条流中的数据已经到达,而另一条流中与之匹配的数据还未到达的情况,那么Flink是如何解决这个问题的呢?举个例子:假设左表先来了3条数据,Join 的key分别是1、2、3,右表中尚未有数据到达,那么Flink 会将左表的那三条数据缓存在Join节点的state中,同时不会有数据下发。此时,右表来了一条key是4的数据,未能与左表中的key关联上,那么这条数据同样也会被缓存在Join节点的state中。而当右表来了一条key为1的数据时,与左表中key为1的数据成功关联,那么此时,会将这两条数据Join起来之后的数据下发,而其他尚未匹配上的数据会在state中继续等待,直到他们的有缘人出现,才能够继续前进。
- Outer Join:支持
LEFT JOIN
、RIGHT JOIN
、FULL OUTER JOIN
三种语法,此处我们以LEFT JOIN
为例。还是左表先来三条数据,key分别是1、2、3,不过此时的结果会和上面的不一样,他们三个虽然还会在Join节点的state中缓存,但是会将数据下发,那么大家会问了,右边的数据怎么办,此时并没有Join成功啊,如果下发数据不就存在异常吗?答:Flink会将右边的数据补上NULL,当右表中key为1、2、3的数据出现时,会将刚才下发的三条数据撤回,将右表中的数据重新填充到下发的三条数据中,之后,再将这三条数据下发;而如果右表先到了,左表尚未到达的话,会一直等待,不会先行下发再撤回。RIGHT JOIN
与之相似,只是一个下发左边,一个下发右边;FULL OUTER JOIN
是两边都会下发和撤回。
- 缺点:
- 因为要存放大量的数据在state中,如果左右表的数据一直无法匹配,那么久而久之,内存很容易就被打爆。解决办法有加机器和使用RocksDBStateBackend,同时需要配上合理的状态清理配置,具体的写法可以自行翻看官网文档
- Join之前最好先根据主键去重,不然会缓存大量无用数据在Join节点的state节点中。举个栗子:key为1的数据因为各种原因出现了三条,而这三条实际上是同一条数据。那么,在Join时,如果右表只有一条key为1的数据,那么只会有一条数据下发(Inner Join)另外两条一直在死等;或者下发一条有右边数据的和两条右边数据为NULL的数据(Left Outer Join),同时,这两条数据也会在Join节点的state中缓存,等待右表的数据到达。同样也会打爆我们的内存。去重可以很好的减少Join节点内存的压力
- 假设现在有A、B、C三条流要进行JOIN,SQL写法为:
A LEFT JOIN B ON A.KEY1 = B.KEY1 LEFT JOIN C ON B.KEY2 = C.KEY2
,如果A与B Join的结果产生了大量B.KEY2为NULL的数据,那么在与C Join时,必然会出现热点问题。那么如何解决呢?我们可以交换Join的顺序,让B、C先行Join,产生的结果再与A流进行Join,这样就能很好的解决热点问题
- 下面我们通过代码和运行结果,来看看UnBounded Join的写法和产生结果
package FlinkSql;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
public class FlinkSql05 {
public static final String KAFKA_TABLE_SOURCE_DDL_01 = ""+
"CREATE TABLE t1 (\n" +
" user_id BIGINT,\n" +
" order_id BIGINT,\n" +
" ts BIGINT\n" +
") WITH (\n" +
" 'connector.type' = 'kafka', -- 指定连接类型是kafka\n" +
" 'connector.version' = '0.11', -- 与我们之前Docker安装的kafka版本要一致\n" +
" 'connector.topic' = 'unBoundedJoin01_t1', -- 之前创建的topic \n" +
" 'connector.properties.group.id' = 'flink-test-0', -- 消费者组,相关概念可自行百度\n" +
" 'connector.startup-mode' = 'latest-offset', --指定从最早消费\n" +
" 'connector.properties.zookeeper.connect' = 'localhost:2181', -- zk地址\n" +
" 'connector.properties.bootstrap.servers' = 'localhost:9092', -- broker地址\n" +
" 'format.type' = 'csv' -- csv格式,和topic中的消息格式保持一致\n" +
")";
public static final String KAFKA_TABLE_SOURCE_DDL_02 = ""+
"CREATE TABLE t2 (\n" +
" order_id BIGINT,\n" +
" item_id BIGINT,\n" +
" ts BIGINT\n" +
") WITH (\n" +
" 'connector.type' = 'kafka', -- 指定连接类型是kafka\n" +
" 'connector.version' = '0.11', -- 与我们之前Docker安装的kafka版本要一致\n" +
" 'connector.topic' = 'unBoundedJoin01_t2', -- 之前创建的topic \n" +
" 'connector.properties.group.id' = 'flink-test-0', -- 消费者组,相关概念可自行百度\n" +
" 'connector.startup-mode' = 'latest-offset', --指定从最早消费\n" +
" 'connector.properties.zookeeper.connect' = 'localhost:2181', -- zk地址\n" +
" 'connector.properties.bootstrap.servers' = 'localhost:9092', -- broker地址\n" +
" 'format.type' = 'csv' -- csv格式,和topic中的消息格式保持一致\n" +
")";
public static final String KAFKA_TABLE_SOURCE_DDL_03 = ""+
"CREATE TABLE t3 (\n" +
" user_id BIGINT,\n" +
" order_id BIGINT,\n" +
" ts BIGINT,\n" +
" r_t AS TO_TIMESTAMP(FROM_UNIXTIME(ts,'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd HH:mm:ss'),-- 计算列,因为ts是bigint,没法作为水印,所以用UDF转成TimeStamp\n"+
" WATERMARK FOR r_t AS r_t - INTERVAL '5' SECOND -- 指定水印生成方式\n"+
") WITH (\n" +
" 'connector.type' = 'kafka', -- 指定连接类型是kafka\n" +
" 'connector.version' = '0.11', -- 与我们之前Docker安装的kafka版本要一致\n" +
" 'connector.topic' = 'timeIntervalJoin_01', -- 之前创建的topic \n" +
" 'connector.properties.group.id' = 'flink-test-0', -- 消费者组,相关概念可自行百度\n" +
" 'connector.startup-mode' = 'latest-offset', --指定从最早消费\n" +
" 'connector.properties.zookeeper.connect' = 'localhost:2181', -- zk地址\n" +
" 'connector.properties.bootstrap.servers' = 'localhost:9092', -- broker地址\n" +
" 'format.type' = 'csv' -- csv格式,和topic中的消息格式保持一致\n" +
")";
public static final String KAFKA_TABLE_SOURCE_DDL_04 = ""+
"CREATE TABLE t4 (\n" +
" order_id BIGINT,\n" +
" item_id BIGINT,\n" +
" ts BIGINT,\n" +
" r_t AS TO_TIMESTAMP(FROM_UNIXTIME(ts,'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd HH:mm:ss'),-- 计算列,因为ts是bigint,没法作为水印,所以用UDF转成TimeStamp\n"+
" WATERMARK FOR r_t AS r_t - INTERVAL '5' SECOND -- 指定水印生成方式\n"+
") WITH (\n" +
" 'connector.type' = 'kafka', -- 指定连接类型是kafka\n" +
" 'connector.version' = '0.11', -- 与我们之前Docker安装的kafka版本要一致\n" +
" 'connector.topic' = 'timeIntervalJoin_02', -- 之前创建的topic \n" +
" 'connector.properties.group.id' = 'flink-test-0', -- 消费者组,相关概念可自行百度\n" +
" 'connector.startup-mode' = 'latest-offset', --指定从最早消费\n" +
" 'connector.properties.zookeeper.connect' = 'localhost:2181', -- zk地址\n" +
" 'connector.properties.bootstrap.servers' = 'localhost:9092', -- broker地址\n" +
" 'format.type' = 'csv' -- csv格式,和topic中的消息格式保持一致\n" +
")";
// public static final String MYSQL_TABLE_SINK = "";
public static void main(String argv[]) throws Exception {
//构建StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//构建EnvironmentSettings 并指定Blink Planner
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
//构建StreamTableEnvironment
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
//注册kafka 数据源表
tEnv.sqlUpdate(KAFKA_TABLE_SOURCE_DDL_01);
tEnv.sqlUpdate(KAFKA_TABLE_SOURCE_DDL_02);
//左表数据 543462,1001,1511658000
//右表数据 1001,4238,1511658001
//不用一开始就给kafka灌入数据,可以等任务正常启动没有数据后再输入数据,方便观察现象
//UnBounded 双流Join 之 Inner Join
Table unBoundedJoin_inner_join = tEnv.sqlQuery("select a.*,b.* from t1 a inner join t2 b on a.order_id = b.order_id");
DataStream<Tuple2<Boolean, Row>> unBoundedJoin_inner_join_DS = tEnv.toRetractStream(unBoundedJoin_inner_join, Row.class);
//在一开始没有数据时,没有输出;当我们往左表的kafka中输入543462,1001,1511658000时,依旧没有数据下发,符合我们之前所说的言论
//之后再往右表灌入数据,此时会有数据输出
//(true,543462,1001,1511658000,1001,4238,1511658001)
// unBoundedJoin_inner_join_DS.print().setParallelism(1).name("unBoundedJoin_inner_join");
//UnBounded 双流Join 之 Left Join
//再准备几条kafka数据
//左表 223813,2042400,1511658002
//右表 2042400,4104826,1511658001
//同样也是先别灌入
Table unBoundedJoin_left_join = tEnv.sqlQuery("select a.*,b.* from t1 a left join t2 b on a.order_id = b.order_id");
DataStream<Tuple2<Boolean, Row>> unBoundedJoin_left_join_DS = tEnv.toRetractStream(unBoundedJoin_left_join, Row.class);
// unBoundedJoin_left_join_DS.print().setParallelism(1).name("unBoundedJoin_left_join");
//此时左表输入223813,2042400,1511658002,发现数据下发,右边都为NULL
//输出:(true,223813,2042400,1511658002,null,null,null)
//然后再将2042400,4104826,1511658001插入右表中
//(false,223813,2042400,1511658002,null,null,null)
//(true,223813,2042400,1511658002,2042400,4104826,1511658001)
//与我们前面所说一致!先是输出右边补齐为NULL的数据,等能够Join上了,再撤回刚才的数据,重新将Join之后的数据下发
//我们测试的都是左表先到,而右表在等待的情况,那么如果右表先到,左表后到,数据结果又是什么样呢?大家自行尝试吧
//执行任务,必不可少一句话!
env.execute("双流join");
}
}
Time Interval Join
- 写法:
ltime = rtime
ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND
- 目前只支持Inner Join,如果想让Join不上的数据最终也下发,只能使用UnBounded Join
- 要么都是Event Time 要么都是Process Time,不能混用
- 同样,我们也通过代码来学习如何使用
//将下面代码嵌入上面的 env.execute("双流join") 前面
//Time Interval 双流JOIN
tEnv.sqlUpdate(KAFKA_TABLE_SOURCE_DDL_03);
tEnv.sqlUpdate(KAFKA_TABLE_SOURCE_DDL_04);
//左表数据 543462,1001,1511658000
//右表数据 1001,4238,1511658011
//使用time interval join,并且指定时间范围为t3.r_t的上下10秒内
Table timeIntervalJoin = tEnv.sqlQuery(""+
"select t3.*,t4.item_id,t4.ts from t3 join t4 on t3.order_id = t4.order_id " +
"and t4.r_t between t3.r_t - interval '10' second and t3.r_t + interval '10' second ");
//因为是time interval join,所以不会有撤回事件发生,所以使用append流
DataStream<Row> tiemIntervalJoinDs = tEnv.toAppendStream(timeIntervalJoin, Row.class);
tiemIntervalJoinDs.print().setParallelism(1).name("timeIntervalJoin");
//当我们将数据输入各自的kafka topic中后,发现并没有数据输出,因为t3.r_t - t4.r_t = -11,已经超过了我们指定的时间范围
//右表再输入1001,4238,1511658010
//输出:543462,1001,1511658000,2017-11-26T09:00,4238,1511658010
//time interval join之后可以再接窗口计算,这里就不给大家实际演示了,大家自行操作吧
附录
- 因为这次使用的是csv格式的数据,所以大家记得在
pom.xml
里面加上依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>1.10.0</version>
</dependency>