9.Flink实时项目之订单宽表

1.需求分析

订单是统计分析的重要的对象,围绕订单有很多的维度统计需求,比如用户、地区、商品、品类、品牌等等。为了之后统计计算更加方便,减少大表之间的关联,所以在实时计算过程中将围绕订单的相关数据整合成为一张订单的宽表。那究竟哪些数据需要和订单整合在一起?

如上图,由于在之前的操作(BaseDbTask)我们已经把数据分拆成了事实数据和维度数据,事实数据(绿色)进入 kafka 数据流(DWD 层)中,维度数据(蓝色)进入 hbase 中长期保存。那么我们在 DWM 层中要把实时和维度数据进行整合关联在一起,形成宽表。那么这里就要处理有两种关联,事实数据和事实数据关联、事实数据和维度数据关联。

  • 事实数据和事实数据关联,其实就是流与流之间的关联。
  • 事实数据与维度数据关联,其实就是流计算中查询外部数据源。
数据流程走向

2. 创建实体类

import java.math.BigDecimal;
/**
 * @author zhangbao
 * @date 2021/10/25 19:55
 * @desc 订单
 */
@Data
public class OrderInfo {
    Long id;
    Long province_id;
    String order_status;
    Long user_id;
    BigDecimal total_amount;
    BigDecimal activity_reduce_amount;
    BigDecimal coupon_reduce_amount;
    BigDecimal original_total_amount;
    BigDecimal feight_fee;
    String expire_time;
    String create_time;
    String operate_time;
    String create_date; // 把其他字段处理得到
    String create_hour;
    Long create_ts;
}
import java.math.BigDecimal;
/**
 * @author zhangbao
 * @date 2021/10/25 19:55
 * @desc 订单明细
 */
@Data
public class OrderDetail {
    Long id;
    Long order_id ;
    Long sku_id;
    BigDecimal order_price ;
    Long sku_num ;
    String sku_name;
    String create_time;
    BigDecimal split_total_amount;
    BigDecimal split_activity_amount;
    BigDecimal split_coupon_amount;
    Long create_ts;
}

3. 消费kafka事实数据

在dwm包下创建任务OrderWideApp.java,对订单及明细数据做格式转换,在这个阶段可以做一些ETL操作。

import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUnit;
import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSONObject;
import com.zhangbao.gmall.realtime.bean.OrderDetail;
import com.zhangbao.gmall.realtime.bean.OrderInfo;
import com.zhangbao.gmall.realtime.utils.MyKafkaUtil;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

/**
 * @author zhangbao
 * @date 2021/10/25 19:58
 * @desc
 */
public class OrderWideApp {
    public static void main(String[] args) {
        //webui模式,需要添加pom依赖
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
//        StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createLocalEnvironment();
        //设置并行度
        env.setParallelism(4);
        //设置检查点
//        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
//        env.getCheckpointConfig().setCheckpointTimeout(60000);
//        env.setStateBackend(new FsStateBackend("hdfs://hadoop101:9000/gmall/flink/checkpoint/uniqueVisit"));
//        //指定哪个用户读取hdfs文件
//        System.setProperty("HADOOP_USER_NAME","zhangbao");

        //从kafka的dwd主题获取订单和订单详情
        String orderInfoTopic = "dwd_order_info";
        String orderDetailTopic = "dwd_order_detail";
        String orderWideTopic = "dwm_order_wide";
        String orderWideGroup = "order_wide_group";

        //订单数据
        FlinkKafkaConsumer<String> orderInfoSource = MyKafkaUtil.getKafkaSource(orderInfoTopic, orderWideGroup);
        DataStreamSource<String> orderInfoDs = env.addSource(orderInfoSource);

        //订单详情数据
        FlinkKafkaConsumer<String> orderDetailSource = MyKafkaUtil.getKafkaSource(orderDetailTopic, orderWideGroup);
        DataStreamSource<String> orderDetailDs = env.addSource(orderDetailSource);

        //对订单数据进行转换
        SingleOutputStreamOperator<OrderInfo> orderInfoObjDs = orderInfoDs.map(new RichMapFunction<String, OrderInfo>() {
            @Override
            public OrderInfo map(String jsonStr) throws Exception {
                System.out.println("order info str >>> "+jsonStr);
                OrderInfo orderInfo = JSONObject.parseObject(jsonStr, OrderInfo.class);
                DateTime createTime = DateUtil.parse(orderInfo.getCreate_time(), "yyyy-MM-dd HH:mm:ss");
                orderInfo.setCreate_ts(createTime.getTime());
                return orderInfo;
            }
        });

        //对订单明细数据进行转换
        SingleOutputStreamOperator<OrderDetail> orderDetailObjDs = orderDetailDs.map(new RichMapFunction<String, OrderDetail>() {
            @Override
            public OrderDetail map(String jsonStr) throws Exception {
                System.out.println("order detail str >>> "+jsonStr);
                OrderDetail orderDetail = JSONObject.parseObject(jsonStr, OrderDetail.class);
                DateTime createTime = DateUtil.parse(orderDetail.getCreate_time(), "yyyy-MM-dd HH:mm:ss");
                orderDetail.setCreate_ts(createTime.getTime());
                return orderDetail;
            }
        });

        orderInfoDs.print("order info >>>");
        orderDetailDs.print("order detail >>>");

        try {
            env.execute("order wide task");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

4. 双流join准备

在 flink 中的流 join 大体分为两种,一种是基于时间窗口的 join(Time Windowed Join),比如 join、coGroup 等。另一种是基于状态缓存的 join(Temporal Table Join),比如 intervalJoin。这里选用 intervalJoin,因为相比较窗口 join,intervalJoin 使用更简单,而且避免了应匹配的数据处于不同窗口的问题。

intervalJoin 目前只有一个问题,就是还不支持 left join。但是我们这里是订单主表与订单从表之间的关联不需要 left join,所以 intervalJoin 是较好的选择。

官方文档:interval-join

先设置时间水位线,然后在分组

//指定事件时间字段
//订单事件时间字段
SingleOutputStreamOperator<OrderInfo> orderInfoWithTsDs = orderInfoObjDs.assignTimestampsAndWatermarks(
        WatermarkStrategy
                .<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner(new SerializableTimestampAssigner<OrderInfo>() {
                    @Override
                    public long extractTimestamp(OrderInfo orderInfo, long l) {
                        return orderInfo.getCreate_ts();
                    }
                })
);
//订单明细指定事件事件字段
SingleOutputStreamOperator<OrderDetail> orderDetailWithTsDs = orderDetailObjDs.assignTimestampsAndWatermarks(
        WatermarkStrategy.<OrderDetail>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner(new SerializableTimestampAssigner<OrderDetail>() {
                    @Override
                    public long extractTimestamp(OrderDetail orderDetail, long l) {
                        return orderDetail.getCreate_ts();
                    }
                })
);

//分组
KeyedStream<OrderInfo, Long> orderInfoKeysDs = orderInfoWithTsDs.keyBy(OrderInfo::getId);
KeyedStream<OrderDetail, Long> orderDetailKeysDs = orderDetailWithTsDs.keyBy(OrderDetail::getId);

5. 建立订单宽表

import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.commons.lang3.ObjectUtils;

import java.math.BigDecimal;

/**
 * @author zhangbaohpu
 * @date  2021/11/13 11:10
 * @desc 订单宽表
 */
@Data
@AllArgsConstructor
public class OrderWide {
    Long detail_id;
    Long order_id ;
    Long sku_id;
    BigDecimal order_price ;
    Long sku_num ;
    String sku_name;
    Long province_id;
    String order_status;
    Long user_id;
    BigDecimal total_amount;
    BigDecimal activity_reduce_amount;
    BigDecimal coupon_reduce_amount;
    BigDecimal original_total_amount;
    BigDecimal feight_fee;
    BigDecimal split_feight_fee;
    BigDecimal split_activity_amount;
    BigDecimal split_coupon_amount;
    BigDecimal split_total_amount;
    String expire_time;
    String create_time;
    String operate_time;
    String create_date; // 把其他字段处理得到
    String create_hour;
    String province_name;//查询维表得到
    String province_area_code;
    String province_iso_code;
    String province_3166_2_code;
    Integer user_age ;
    String user_gender;
    Long spu_id; //作为维度数据 要关联进来
    Long tm_id;
    Long category3_id;
    String spu_name;
    String tm_name;
    String category3_name;
    public OrderWide(OrderInfo orderInfo, OrderDetail orderDetail){
        mergeOrderInfo(orderInfo);
        mergeOrderDetail(orderDetail);
    }
    public void mergeOrderInfo(OrderInfo orderInfo ) {
        if (orderInfo != null) {
            this.order_id = orderInfo.id;
            this.order_status = orderInfo.order_status;
            this.create_time = orderInfo.create_time;
            this.create_date = orderInfo.create_date;
            this.activity_reduce_amount = orderInfo.activity_reduce_amount;
            this.coupon_reduce_amount = orderInfo.coupon_reduce_amount;
            this.original_total_amount = orderInfo.original_total_amount;
            this.feight_fee = orderInfo.feight_fee;
            this.total_amount = orderInfo.total_amount;
            this.province_id = orderInfo.province_id;
            this.user_id = orderInfo.user_id;
        }
    }
    public void mergeOrderDetail(OrderDetail orderDetail ) {
        if (orderDetail != null) {
            this.detail_id = orderDetail.id;
            this.sku_id = orderDetail.sku_id;
            this.sku_name = orderDetail.sku_name;
            this.order_price = orderDetail.order_price;
            this.sku_num = orderDetail.sku_num;
            this.split_activity_amount=orderDetail.split_activity_amount;
            this.split_coupon_amount=orderDetail.split_coupon_amount;
            this.split_total_amount=orderDetail.split_total_amount;
        }
    }
    public void mergeOtherOrderWide(OrderWide otherOrderWide){
        this.order_status =
                ObjectUtils.firstNonNull( this.order_status ,otherOrderWide.order_status);
        this.create_time =
                ObjectUtils.firstNonNull(this.create_time,otherOrderWide.create_time);
        this.create_date =
                ObjectUtils.firstNonNull(this.create_date,otherOrderWide.create_date);
        this.coupon_reduce_amount =
                ObjectUtils.firstNonNull(this.coupon_reduce_amount,otherOrderWide.coupon_reduce_amount);
        this.activity_reduce_amount =
                ObjectUtils.firstNonNull(this.activity_reduce_amount,otherOrderWide.activity_reduce_amount);
        this.original_total_amount =
                ObjectUtils.firstNonNull(this.original_total_amount,otherOrderWide.original_total_amount);
        this.feight_fee = ObjectUtils.firstNonNull( this.feight_fee,otherOrderWide.feight_fee);
        this.total_amount =
                ObjectUtils.firstNonNull( this.total_amount,otherOrderWide.total_amount);
        this.user_id = ObjectUtils.<Long>firstNonNull(this.user_id,otherOrderWide.user_id);
        this.sku_id = ObjectUtils.firstNonNull( this.sku_id,otherOrderWide.sku_id);
        this.sku_name = ObjectUtils.firstNonNull(this.sku_name,otherOrderWide.sku_name);
        this.order_price =
                ObjectUtils.firstNonNull(this.order_price,otherOrderWide.order_price);
        this.sku_num = ObjectUtils.firstNonNull( this.sku_num,otherOrderWide.sku_num);
        this.split_activity_amount=ObjectUtils.firstNonNull(this.split_activity_amount);
        this.split_coupon_amount=ObjectUtils.firstNonNull(this.split_coupon_amount);
        this.split_total_amount=ObjectUtils.firstNonNull(this.split_total_amount);
    } }

6. 双流join

在做好数据封装,并标记时间水位线,我们可以做订单和订单明细表的双流join操作了。

import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSONObject;
import com.zhangbao.gmall.realtime.bean.OrderDetail;
import com.zhangbao.gmall.realtime.bean.OrderInfo;
import com.zhangbao.gmall.realtime.bean.OrderWide;
import com.zhangbao.gmall.realtime.utils.MyKafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @author zhangbao
 * @date 2021/10/25 19:58
 * @desc
 * 启动服务
 *      zk > kf > maxwell > hdfs > hbase > baseDbTask > OrderWideApp > mysql配置表
 * 业务流程
 *      模拟生成数据
 *      maxwell监控mysql数据
 *      kafka接收maxwell发送的数据,放入ODS层(ods_base_db_m)
 *      baseDbTask消费kafka的主题数据并进行分流
 *          从mysql读取配置表
 *          将配置缓存到map集合中
 *          检查phoenix(hbase的皮肤)是否存在表
 *          对数据表进行分流发送到不同dwd层主题
 */
public class OrderWideApp {
    public static void main(String[] args) {
        //webui模式,需要添加pom依赖
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
//        StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createLocalEnvironment();
        //设置并行度
        env.setParallelism(4);
        //设置检查点
//        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
//        env.getCheckpointConfig().setCheckpointTimeout(60000);
//        env.setStateBackend(new FsStateBackend("hdfs://hadoop101:9000/gmall/flink/checkpoint/uniqueVisit"));
//        //指定哪个用户读取hdfs文件
//        System.setProperty("HADOOP_USER_NAME","zhangbao");


        //从kafka的dwd主题获取订单和订单详情
        String orderInfoTopic = "dwd_order_info";
        String orderDetailTopic = "dwd_order_detail";
        String orderWideTopic = "dwm_order_wide";
        String orderWideGroup = "order_wide_group";

        //订单数据
        FlinkKafkaConsumer<String> orderInfoSource = MyKafkaUtil.getKafkaSource(orderInfoTopic, orderWideGroup);
        DataStreamSource<String> orderInfoDs = env.addSource(orderInfoSource);

        //订单详情数据
        FlinkKafkaConsumer<String> orderDetailSource = MyKafkaUtil.getKafkaSource(orderDetailTopic, orderWideGroup);
        DataStreamSource<String> orderDetailDs = env.addSource(orderDetailSource);

        //对订单数据进行转换
        SingleOutputStreamOperator<OrderInfo> orderInfoObjDs = orderInfoDs.map(new RichMapFunction<String, OrderInfo>() {
            @Override
            public OrderInfo map(String jsonStr) throws Exception {
                System.out.println("order info str >>> "+jsonStr);
                OrderInfo orderInfo = JSONObject.parseObject(jsonStr, OrderInfo.class);
                DateTime createTime = DateUtil.parse(orderInfo.getCreate_time(), "yyyy-MM-dd HH:mm:ss");
                orderInfo.setCreate_ts(createTime.getTime());
                return orderInfo;
            }
        });

        //对订单明细数据进行转换
        SingleOutputStreamOperator<OrderDetail> orderDetailObjDs = orderDetailDs.map(new RichMapFunction<String, OrderDetail>() {
            @Override
            public OrderDetail map(String jsonStr) throws Exception {
                System.out.println("order detail str >>> "+jsonStr);
                OrderDetail orderDetail = JSONObject.parseObject(jsonStr, OrderDetail.class);
                DateTime createTime = DateUtil.parse(orderDetail.getCreate_time(), "yyyy-MM-dd HH:mm:ss");
                orderDetail.setCreate_ts(createTime.getTime());
                return orderDetail;
            }
        });

        orderInfoObjDs.print("order info >>>");
        orderDetailObjDs.print("order detail >>>");

        //指定事件时间字段
        //订单事件时间字段
        SingleOutputStreamOperator<OrderInfo> orderInfoWithTsDs = orderInfoObjDs.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner(new SerializableTimestampAssigner<OrderInfo>() {
                            @Override
                            public long extractTimestamp(OrderInfo orderInfo, long l) {
                                return orderInfo.getCreate_ts();
                            }
                        })
        );
        //订单明细指定事件事件字段
        SingleOutputStreamOperator<OrderDetail> orderDetailWithTsDs = orderDetailObjDs.assignTimestampsAndWatermarks(
                WatermarkStrategy.<OrderDetail>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner(new SerializableTimestampAssigner<OrderDetail>() {
                            @Override
                            public long extractTimestamp(OrderDetail orderDetail, long l) {
                                return orderDetail.getCreate_ts();
                            }
                        })
        );

        //分组
        KeyedStream<OrderInfo, Long> orderInfoKeysDs = orderInfoWithTsDs.keyBy(OrderInfo::getId);
        KeyedStream<OrderDetail, Long> orderDetailKeysDs = orderDetailWithTsDs.keyBy(OrderDetail::getOrder_id);

        /**
         * interval-join
         * https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/joining/#interval-join
         */
        SingleOutputStreamOperator<OrderWide> orderWideDs = orderInfoKeysDs.intervalJoin(orderDetailKeysDs)
                .between(Time.milliseconds(-5), Time.milliseconds(5))
                .process(new ProcessJoinFunction<OrderInfo, OrderDetail, OrderWide>() {
                    @Override
                    public void processElement(OrderInfo orderInfo, OrderDetail orderDetail, ProcessJoinFunction<OrderInfo, OrderDetail, OrderWide>.Context context, Collector<OrderWide> out) throws Exception {
                        out.collect(new OrderWide(orderInfo, orderDetail));
                    }
                });
        orderWideDs.print("order wide ds >>>");

        try {
            env.execute("order wide task");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

到这里我们就把订单的部分宽表数据给做出来了,下一节,我们再把一些维度数据给关联进来,就形成了一个完整订单宽表。

更多请在某公号平台搜索:选手一号位,本文编号:1009,回复即可获取。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,634评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,951评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,427评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,770评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,835评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,799评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,768评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,544评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,979评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,271评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,427评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,121评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,756评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,375评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,579评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,410评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,315评论 2 352