Flink Sql教程(5)

Flink 双流Join

概述

  • 在之前的Flink教程03里面给大家讲过了维表Join,今天来和大家分享一下双流Join
  • 目前Flink双流Join分成两类:UnBounded Join 和 Time Interval Join
  • 在有些场景下,用哪个都行,不过后者的性能会优于前者,而且如果在双流Join之后想要再进行窗口计算,那么只能使用Time Interval Join,目前的UnBounded Join后面是没有办法再进行Event Time的窗口计算

UnBounded Join

  • 分为两种Join,一种是Inner Join,另一种是Outer Join
    • Inner Join:双流Join最大的问题是两边的数据量是不一样的,会存在一条流中的数据已经到达,而另一条流中与之匹配的数据还未到达的情况,那么Flink是如何解决这个问题的呢?举个例子:假设左表先来了3条数据,Join 的key分别是1、2、3,右表中尚未有数据到达,那么Flink 会将左表的那三条数据缓存在Join节点的state中,同时不会有数据下发。此时,右表来了一条key是4的数据,未能与左表中的key关联上,那么这条数据同样也会被缓存在Join节点的state中。而当右表来了一条key为1的数据时,与左表中key为1的数据成功关联,那么此时,会将这两条数据Join起来之后的数据下发,而其他尚未匹配上的数据会在state中继续等待,直到他们的有缘人出现,才能够继续前进。
    • Outer Join:支持LEFT JOINRIGHT JOINFULL OUTER JOIN三种语法,此处我们以LEFT JOIN为例。还是左表先来三条数据,key分别是1、2、3,不过此时的结果会和上面的不一样,他们三个虽然还会在Join节点的state中缓存,但是会将数据下发,那么大家会问了,右边的数据怎么办,此时并没有Join成功啊,如果下发数据不就存在异常吗?答:Flink会将右边的数据补上NULL,当右表中key为1、2、3的数据出现时,会将刚才下发的三条数据撤回,将右表中的数据重新填充到下发的三条数据中,之后,再将这三条数据下发;而如果右表先到了,左表尚未到达的话,会一直等待,不会先行下发再撤回。RIGHT JOIN与之相似,只是一个下发左边,一个下发右边;FULL OUTER JOIN是两边都会下发和撤回。
    • 缺点:
      • 因为要存放大量的数据在state中,如果左右表的数据一直无法匹配,那么久而久之,内存很容易就被打爆。解决办法有加机器和使用RocksDBStateBackend,同时需要配上合理的状态清理配置,具体的写法可以自行翻看官网文档
      • Join之前最好先根据主键去重,不然会缓存大量无用数据在Join节点的state节点中。举个栗子:key为1的数据因为各种原因出现了三条,而这三条实际上是同一条数据。那么,在Join时,如果右表只有一条key为1的数据,那么只会有一条数据下发(Inner Join)另外两条一直在死等;或者下发一条有右边数据的和两条右边数据为NULL的数据(Left Outer Join),同时,这两条数据也会在Join节点的state中缓存,等待右表的数据到达。同样也会打爆我们的内存。去重可以很好的减少Join节点内存的压力
      • 假设现在有A、B、C三条流要进行JOIN,SQL写法为:A LEFT JOIN B ON A.KEY1 = B.KEY1 LEFT JOIN C ON B.KEY2 = C.KEY2,如果A与B Join的结果产生了大量B.KEY2为NULL的数据,那么在与C Join时,必然会出现热点问题。那么如何解决呢?我们可以交换Join的顺序,让B、C先行Join,产生的结果再与A流进行Join,这样就能很好的解决热点问题
    • 下面我们通过代码和运行结果,来看看UnBounded Join的写法和产生结果
    package FlinkSql;

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    
    
    public class FlinkSql05 {
    
        public static final String KAFKA_TABLE_SOURCE_DDL_01 = ""+
                "CREATE TABLE t1 (\n" +
                "    user_id BIGINT,\n" +
                "    order_id BIGINT,\n" +
                "    ts BIGINT\n" +
                ") WITH (\n" +
                "    'connector.type' = 'kafka',  -- 指定连接类型是kafka\n" +
                "    'connector.version' = '0.11',  -- 与我们之前Docker安装的kafka版本要一致\n" +
                "    'connector.topic' = 'unBoundedJoin01_t1', -- 之前创建的topic \n" +
                "    'connector.properties.group.id' = 'flink-test-0', -- 消费者组,相关概念可自行百度\n" +
                "    'connector.startup-mode' = 'latest-offset',  --指定从最早消费\n" +
                "    'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zk地址\n" +
                "    'connector.properties.bootstrap.servers' = 'localhost:9092',  -- broker地址\n" +
                "    'format.type' = 'csv'  -- csv格式,和topic中的消息格式保持一致\n" +
                ")";
    
        public static final String KAFKA_TABLE_SOURCE_DDL_02 = ""+
                "CREATE TABLE t2 (\n" +
                "    order_id BIGINT,\n" +
                "    item_id BIGINT,\n" +
                "    ts BIGINT\n" +
                ") WITH (\n" +
                "    'connector.type' = 'kafka',  -- 指定连接类型是kafka\n" +
                "    'connector.version' = '0.11',  -- 与我们之前Docker安装的kafka版本要一致\n" +
                "    'connector.topic' = 'unBoundedJoin01_t2', -- 之前创建的topic \n" +
                "    'connector.properties.group.id' = 'flink-test-0', -- 消费者组,相关概念可自行百度\n" +
                "    'connector.startup-mode' = 'latest-offset',  --指定从最早消费\n" +
                "    'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zk地址\n" +
                "    'connector.properties.bootstrap.servers' = 'localhost:9092',  -- broker地址\n" +
                "    'format.type' = 'csv'  -- csv格式,和topic中的消息格式保持一致\n" +
                ")";
        public static final String KAFKA_TABLE_SOURCE_DDL_03 = ""+
                "CREATE TABLE t3 (\n" +
                "    user_id BIGINT,\n" +
                "    order_id BIGINT,\n" +
                "    ts BIGINT,\n" +
                "    r_t AS TO_TIMESTAMP(FROM_UNIXTIME(ts,'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd HH:mm:ss'),-- 计算列,因为ts是bigint,没法作为水印,所以用UDF转成TimeStamp\n"+
                "    WATERMARK FOR r_t AS r_t - INTERVAL '5' SECOND -- 指定水印生成方式\n"+
                ") WITH (\n" +
                "    'connector.type' = 'kafka',  -- 指定连接类型是kafka\n" +
                "    'connector.version' = '0.11',  -- 与我们之前Docker安装的kafka版本要一致\n" +
                "    'connector.topic' = 'timeIntervalJoin_01', -- 之前创建的topic \n" +
                "    'connector.properties.group.id' = 'flink-test-0', -- 消费者组,相关概念可自行百度\n" +
                "    'connector.startup-mode' = 'latest-offset',  --指定从最早消费\n" +
                "    'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zk地址\n" +
                "    'connector.properties.bootstrap.servers' = 'localhost:9092',  -- broker地址\n" +
                "    'format.type' = 'csv'  -- csv格式,和topic中的消息格式保持一致\n" +
                ")";
    
        public static final String KAFKA_TABLE_SOURCE_DDL_04 = ""+
                "CREATE TABLE t4 (\n" +
                "    order_id BIGINT,\n" +
                "    item_id BIGINT,\n" +
                "    ts BIGINT,\n" +
                "    r_t AS TO_TIMESTAMP(FROM_UNIXTIME(ts,'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd HH:mm:ss'),-- 计算列,因为ts是bigint,没法作为水印,所以用UDF转成TimeStamp\n"+
                "    WATERMARK FOR r_t AS r_t - INTERVAL '5' SECOND -- 指定水印生成方式\n"+
                ") WITH (\n" +
                "    'connector.type' = 'kafka',  -- 指定连接类型是kafka\n" +
                "    'connector.version' = '0.11',  -- 与我们之前Docker安装的kafka版本要一致\n" +
                "    'connector.topic' = 'timeIntervalJoin_02', -- 之前创建的topic \n" +
                "    'connector.properties.group.id' = 'flink-test-0', -- 消费者组,相关概念可自行百度\n" +
                "    'connector.startup-mode' = 'latest-offset',  --指定从最早消费\n" +
                "    'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zk地址\n" +
                "    'connector.properties.bootstrap.servers' = 'localhost:9092',  -- broker地址\n" +
                "    'format.type' = 'csv'  -- csv格式,和topic中的消息格式保持一致\n" +
                ")";
    
    
    //    public static final String MYSQL_TABLE_SINK = "";
    
        public static void main(String argv[]) throws Exception {
    
            //构建StreamExecutionEnvironment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            //构建EnvironmentSettings 并指定Blink Planner
            EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    
            //构建StreamTableEnvironment
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
    
            //注册kafka 数据源表
            tEnv.sqlUpdate(KAFKA_TABLE_SOURCE_DDL_01);
    
            tEnv.sqlUpdate(KAFKA_TABLE_SOURCE_DDL_02);
    
            //左表数据  543462,1001,1511658000
            //右表数据  1001,4238,1511658001
            //不用一开始就给kafka灌入数据,可以等任务正常启动没有数据后再输入数据,方便观察现象
    
            //UnBounded 双流Join 之 Inner Join
            Table unBoundedJoin_inner_join = tEnv.sqlQuery("select a.*,b.* from t1 a inner join t2 b on a.order_id = b.order_id");
    
            DataStream<Tuple2<Boolean, Row>> unBoundedJoin_inner_join_DS = tEnv.toRetractStream(unBoundedJoin_inner_join, Row.class);
    
            //在一开始没有数据时,没有输出;当我们往左表的kafka中输入543462,1001,1511658000时,依旧没有数据下发,符合我们之前所说的言论
            //之后再往右表灌入数据,此时会有数据输出
            //(true,543462,1001,1511658000,1001,4238,1511658001)
    //        unBoundedJoin_inner_join_DS.print().setParallelism(1).name("unBoundedJoin_inner_join");
    
            //UnBounded 双流Join 之 Left Join
            //再准备几条kafka数据
            //左表    223813,2042400,1511658002
            //右表    2042400,4104826,1511658001
            //同样也是先别灌入
    
            Table unBoundedJoin_left_join = tEnv.sqlQuery("select a.*,b.* from t1 a left join t2 b on a.order_id = b.order_id");
    
            DataStream<Tuple2<Boolean, Row>> unBoundedJoin_left_join_DS = tEnv.toRetractStream(unBoundedJoin_left_join, Row.class);
    
    //        unBoundedJoin_left_join_DS.print().setParallelism(1).name("unBoundedJoin_left_join");
            //此时左表输入223813,2042400,1511658002,发现数据下发,右边都为NULL
            //输出:(true,223813,2042400,1511658002,null,null,null)
            //然后再将2042400,4104826,1511658001插入右表中
            //(false,223813,2042400,1511658002,null,null,null)
            //(true,223813,2042400,1511658002,2042400,4104826,1511658001)
            //与我们前面所说一致!先是输出右边补齐为NULL的数据,等能够Join上了,再撤回刚才的数据,重新将Join之后的数据下发
            //我们测试的都是左表先到,而右表在等待的情况,那么如果右表先到,左表后到,数据结果又是什么样呢?大家自行尝试吧
        
            //执行任务,必不可少一句话!
            env.execute("双流join");
        }
    }

Time Interval Join

  • 写法:
    • ltime = rtime
    • ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
    • ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND
  • 目前只支持Inner Join,如果想让Join不上的数据最终也下发,只能使用UnBounded Join
  • 要么都是Event Time 要么都是Process Time,不能混用
  • 同样,我们也通过代码来学习如何使用
    //将下面代码嵌入上面的 env.execute("双流join") 前面
    //Time Interval 双流JOIN

    tEnv.sqlUpdate(KAFKA_TABLE_SOURCE_DDL_03);

    tEnv.sqlUpdate(KAFKA_TABLE_SOURCE_DDL_04);

    //左表数据  543462,1001,1511658000
    //右表数据  1001,4238,1511658011

    //使用time interval join,并且指定时间范围为t3.r_t的上下10秒内
    Table timeIntervalJoin = tEnv.sqlQuery(""+
            "select t3.*,t4.item_id,t4.ts from t3 join t4 on t3.order_id = t4.order_id " +
            "and t4.r_t between t3.r_t - interval '10' second and t3.r_t + interval '10' second ");

    //因为是time interval join,所以不会有撤回事件发生,所以使用append流
    DataStream<Row> tiemIntervalJoinDs = tEnv.toAppendStream(timeIntervalJoin, Row.class);

    tiemIntervalJoinDs.print().setParallelism(1).name("timeIntervalJoin");
    //当我们将数据输入各自的kafka topic中后,发现并没有数据输出,因为t3.r_t - t4.r_t = -11,已经超过了我们指定的时间范围
    //右表再输入1001,4238,1511658010
    //输出:543462,1001,1511658000,2017-11-26T09:00,4238,1511658010
    //time interval join之后可以再接窗口计算,这里就不给大家实际演示了,大家自行操作吧

附录

  • 因为这次使用的是csv格式的数据,所以大家记得在pom.xml里面加上依赖
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>1.10.0</version>
</dependency>
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,099评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,828评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,540评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,848评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,971评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,132评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,193评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,934评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,376评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,687评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,846评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,537评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,175评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,887评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,134评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,674评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,741评论 2 351