Flink实战双流join之Window Join

Window Join将流中两个key相同的元素联结在一起。这种联结方式看起来非常像inner join,两个元素必须都存在,才会出现在结果中。
在Flink中,分为有三种不同类型的典型窗口:滚动窗口、滑动窗口、会话窗口。我们以窗口的类型分开讲解。
在执行窗口join时,会将所有key能够匹配上、且处在同一个滚动窗口的事件进行join,join之后传递到JoinFunction或者FlatJoinFunction。这种join看起来就像是INNER JOIN,滚动窗口operator不会将一个在某个流中,而在另一个流中不存在的元素发送到下游。


image.png

上述图,表示两个流进行滚动窗口join,我们发现,只要是两个流中都有的元素,才发生了join操作。
来做个案例:
使用两个指定Source模拟数据,一个Source是订单明细,一个Source是商品数据。我们通过window join,将数据关联到一起。
输出结果如下:


image.png

1、先将Flink的依赖导入进来
 <repositories>
        <repository>
            <id>aliyunmaven</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
    </repositories>

    <properties>
        <flink-version>1.12.0</flink-version>
        <scala-version>2.12</scala-version>
        <mysql-version>5.1.47</mysql-version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala-version}</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala-version}</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>
    </dependencies>
image.png
package com.istudy.bean;

import com.alibaba.fastjson.JSON;

import java.math.BigDecimal;

/**
 * @projectname: HaiStream
 * @description:
 * @author: Mr.Zhang
 * @create: 2021-03-13 17:06
 **/
public class FactOrderItem {
    private String goodsId;
    private String goodsName;
    private BigDecimal count;
    private BigDecimal totalMoney;

    @Override
    public String toString() {
        return JSON.toJSONString(this);
    }

    public String getGoodsId() {
        return goodsId;
    }

    public void setGoodsId(String goodsId) {
        this.goodsId = goodsId;
    }

    public String getGoodsName() {
        return goodsName;
    }

    public void setGoodsName(String goodsName) {
        this.goodsName = goodsName;
    }

    public BigDecimal getCount() {
        return count;
    }

    public void setCount(BigDecimal count) {
        this.count = count;
    }

    public BigDecimal getTotalMoney() {
        return totalMoney;
    }

    public void setTotalMoney(BigDecimal totalMoney) {
        this.totalMoney = totalMoney;
    }

}
package com.istudy.bean;

import com.alibaba.fastjson.JSON;


import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/**
 * @projectname: HaiStream
 * @description:先为本次的测试构建两个实体类,一个是Goods(商品类)、另一个OrderItem(订单明细)
 * @author: Mr.Zhang
 * @create: 2021-03-13 17:03
 **/
public class Goods {
    private String goodsId;
    private String goodsName;
    private BigDecimal goodsPrice;

    public static List<Goods> GOODS_LIST;
    public static Random r;

    static  {
        r = new Random();

        GOODS_LIST = new ArrayList<>();

        GOODS_LIST.add(new Goods("1", "小米12", new BigDecimal(4890)));
        GOODS_LIST.add(new Goods("2", "iphone12", new BigDecimal(12000)));
        GOODS_LIST.add(new Goods("3", "MacBookPro", new BigDecimal(15000)));
        GOODS_LIST.add(new Goods("4", "Thinkpad X1", new BigDecimal(9800)));
        GOODS_LIST.add(new Goods("5", "MeiZu One", new BigDecimal(3200)));
        GOODS_LIST.add(new Goods("6", "Mate 40", new BigDecimal(6500)));
    }

    public static Goods randomGoods() {
        int rIndex = r.nextInt(GOODS_LIST.size());

        return GOODS_LIST.get(rIndex);
    }

    public Goods() {
    }

    public Goods(String goodsId, String goodsName, BigDecimal goodsPrice) {
        this.goodsId = goodsId;
        this.goodsName = goodsName;
        this.goodsPrice = goodsPrice;
    }

    public String getGoodsId() {
        return goodsId;
    }

    public void setGoodsId(String goodsId) {
        this.goodsId = goodsId;
    }

    public String getGoodsName() {
        return goodsName;
    }

    public void setGoodsName(String goodsName) {
        this.goodsName = goodsName;
    }

    public BigDecimal getGoodsPrice() {
        return goodsPrice;
    }

    public void setGoodsPrice(BigDecimal goodsPrice) {
        this.goodsPrice = goodsPrice;
    }

    @Override
    public String toString() {
        return JSON.toJSONString(this);
    }
    public static void main(String[] args) {
        randomGoods();
    }
}
package com.istudy.bean;

import com.alibaba.fastjson.JSON;

/**
 * @projectname: HaiStream
 * @description:先为本次的测试构建两个实体类,一个是Goods(商品类)、另一个OrderItem(订单明细)
 * @author: Mr.Zhang
 * @create: 2021-03-13 17:05
 **/
public class OrderItem {
    private String itemId;
    private String goodsId;
    private Integer count;

    @Override
    public String toString() {
        return JSON.toJSONString(this);
    }

    public String getItemId() {
        return itemId;
    }

    public void setItemId(String itemId) {
        this.itemId = itemId;
    }

    public String getGoodsId() {
        return goodsId;
    }

    public void setGoodsId(String goodsId) {
        this.goodsId = goodsId;
    }

    public Integer getCount() {
        return count;
    }

    public void setCount(Integer count) {
        this.count = count;
    }
}
package com.istudy.streamsource;

import com.istudy.bean.Goods;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.util.concurrent.TimeUnit;

/**
 * @projectname: HaiStream
 * @description:构建一个商品Stream源(这个好比就是维表)
 * @author: Mr.Zhang
 * @create: 2021-03-13 17:07
 **/
public class GoodsSource extends RichSourceFunction {

        private Boolean isCancel;

        @Override
        public void open(Configuration parameters) throws Exception {
            isCancel = false;
        }

        @Override
        public void run(SourceContext sourceContext) throws Exception {
            while(!isCancel) {
                Goods.GOODS_LIST.stream().forEach(goods -> sourceContext.collect(goods));
                TimeUnit.SECONDS.sleep(1);
            }
        }

        @Override
        public void cancel() {
            isCancel = true;
        }
    }

package com.istudy.streamsource;


import com.istudy.bean.Goods;
import com.istudy.bean.OrderItem;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * @projectname: HaiStream
 * @description:构建订单明细Stream源
 * @author: Mr.Zhang
 * @create: 2021-03-13 17:19
 **/
public class OrderItemSource extends RichSourceFunction {

    private Boolean isCancel;
    private Random r;

    @Override
    public void open(Configuration parameters) throws Exception {
        isCancel = false;
        r = new Random();
    }

    @Override
    public void run(SourceContext sourceContext) throws Exception {
        while(!isCancel) {
            Goods goods = Goods.randomGoods();
            OrderItem orderItem = new OrderItem();
            orderItem.setGoodsId(goods.getGoodsId());
            orderItem.setCount(r.nextInt(10) + 1);
            orderItem.setItemId(UUID.randomUUID().toString());

            sourceContext.collect(orderItem);

            orderItem.setGoodsId("111");
            sourceContext.collect(orderItem);

            TimeUnit.SECONDS.sleep(1);
        }
    }

    @Override
    public void cancel() {
        isCancel = true;
    }
}
package com.istudy.watermark;

import com.istudy.bean.Goods;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.functions.TimestampAssigner;

/**
 * @projectname: HaiStream
 * @description:构建水印分配器(此处为了简单),直接使用系统时间了
 * @author: Mr.Zhang
 * @create: 2021-03-13 17:18
 **/
public class GoodsWatermark implements WatermarkStrategy<Goods> {

    @Override
    public TimestampAssigner<Goods> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
        return (element, recordTimestamp) -> System.currentTimeMillis();
    }

    @Override
    public WatermarkGenerator<Goods> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
        return new WatermarkGenerator<Goods>() {
            @Override
            public void onEvent(Goods event, long eventTimestamp, WatermarkOutput output) {
                output.emitWatermark(new Watermark(System.currentTimeMillis()));
            }

            @Override
            public void onPeriodicEmit(WatermarkOutput output) {
                output.emitWatermark(new Watermark(System.currentTimeMillis()));
            }
        };
    }
}
package com.istudy.watermark;

import com.istudy.bean.OrderItem;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.functions.TimestampAssigner;

/**
 * @projectname: HaiStream
 * @description:
 * @author: Mr.Zhang
 * @create: 2021-03-13 17:17
 **/
public class OrderItemWatermark implements WatermarkStrategy<OrderItem> {

    @Override
    public TimestampAssigner<OrderItem> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
        return (element, recordTimestamp) -> System.currentTimeMillis();
    }

    @Override
    public WatermarkGenerator<OrderItem> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
        return new WatermarkGenerator<OrderItem>() {
            @Override
            public void onEvent(OrderItem event, long eventTimestamp, WatermarkOutput output) {
                output.emitWatermark(new Watermark(System.currentTimeMillis()));
            }

            @Override
            public void onPeriodicEmit(WatermarkOutput output) {
                output.emitWatermark(new Watermark(System.currentTimeMillis()));
            }
        };
    }
}
package com.istudy.work;

import com.istudy.bean.FactOrderItem;
import com.istudy.bean.Goods;
import com.istudy.bean.OrderItem;
import com.istudy.streamsource.GoodsSource;
import com.istudy.streamsource.OrderItemSource;
import com.istudy.watermark.GoodsWatermark;
import com.istudy.watermark.OrderItemWatermark;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.math.BigDecimal;

/**
 * @projectname: HaiStream
 * @description:
 * @author: Mr.Zhang
 * @create: 2021-03-13 17:16
 **/
public class TumbleWindowJoin {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 构建商品数据流
        SingleOutputStreamOperator<Goods> goodsDS = env.addSource(new GoodsSource(), TypeInformation.of(Goods.class))
                .assignTimestampsAndWatermarks(new GoodsWatermark() {
                });
        // 构建订单明细数据流
         SingleOutputStreamOperator<OrderItem> orderItemDS = env.addSource(new OrderItemSource(), TypeInformation.of(OrderItem.class))
                .assignTimestampsAndWatermarks(new OrderItemWatermark());
            // 进行关联查询
            DataStream<FactOrderItem> factOrderItemDS = orderItemDS.join(goodsDS)
            //todo 1、Window Join首先需要使用where和equalTo指定使用哪个key来进行关联,此处我们通过应用方法,基于GoodsId来关联两个流中的元素。
            // 第一个流orderItemDS
            .where(OrderItem::getGoodsId)
            // 第二流goodsDS
            .equalTo(Goods::getGoodsId)
            //todo 2、设置了5秒的滚动窗口,流的元素关联都会在这个5秒的窗口中进行关联。
            .window(TumblingEventTimeWindows.of(Time.seconds(5)))
            //todo 3、apply方法中实现了,将两个不同类型的元素关联并生成一个新类型的元素。
            .apply((OrderItem item, Goods goods) -> {
            FactOrderItem factOrderItem = new FactOrderItem();
            factOrderItem.setGoodsId(goods.getGoodsId());
            factOrderItem.setGoodsName(goods.getGoodsName());
            factOrderItem.setCount(new BigDecimal(item.getCount()));
            factOrderItem.setTotalMoney(goods.getGoodsPrice().multiply(new BigDecimal(item.getCount())));
                    return factOrderItem;
                });

        factOrderItemDS.print();

        env.execute("滚动窗口JOIN");
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,997评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,603评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,359评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,309评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,346评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,258评论 1 300
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,122评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,970评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,403评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,596评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,769评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,464评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,075评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,705评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,848评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,831评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,678评论 2 354

推荐阅读更多精彩内容

  • Window JoinTumbling Window JoinSliding Window JoinSession...
    专职掏大粪阅读 1,298评论 0 0
  • Window Join(窗口连接) Window Join 连接共享公共 Key 并位于同一窗口中的两个流的数据元...
    Alex90阅读 1,057评论 0 0
  • Window Join 基于窗口的Join是将具有相同key并位于同一个窗口中的事件进行联结。 用法: 官方案例:...
    fodder阅读 8,699评论 1 4
  • 前言 本文是基础中的基础,看官可以放心食用。 在数据库中的静态表上做OLAP分析时,两表join是非常常见的操作。...
    LittleMagic阅读 7,453评论 19 18
  • 推荐指数: 6.0 书籍主旨关键词:特权、焦点、注意力、语言联想、情景联想 观点: 1.统计学现在叫数据分析,社会...
    Jenaral阅读 5,719评论 0 5