Flink系列 - 实时数仓之FlinkCDC实现动态分流实战(十)

  自从Flink出了FlinkCDC之后,我们对数据库日志的采集就变得方便了许多了,除去了MaxWell、Cannel、OGG等第三方组件的繁琐配置,目前实现CDC有两种方式:HQL实现 和 DataStreamAPI实现(推荐)。
  想更深入的了解CDC可以通过此链接进行学习:
  1. 文档 -> https://ververica.github.io/flink-cdc-connectors/master/
  2. 项目 -> https://github.com/ververica/flink-cdc-connectors

  废话不多说,今天我们就使用 FlinkCDC 对业务数据进行动态的分流的实现。

一、动态分流

  由于FlinkCDC是把全部数据统一写入一个Topic中, 这样显然不利于日后的数据处理。所以需要把各个表拆开处理。但是由于每个表有不同的特点,有些表是维度表,有些表是事实表。
  在实时计算中一般把维度数据写入存储容器,一般是方便通过主键查询的数据库比如HBase,Redis,MySQL等。一般把事实数据写入流中,进行进一步处理,最终形成宽表。
  这样的配置不适合写在配置文件中,因为这样的话,业务端随着需求变化每增加一张表,就要修改配置重启计算程序。所以这里需要一种动态配置方案,把这种配置长期保存起来,一旦配置有变化,实时计算可以自动感知。

二、实现流程图
企业微信截图_16385248902249.png

从图中我们可以看出,把分好的流保存到对应表、主题中:
1)业务数据保存到Kafka的主题中
2)维度数据保存到HBase的表中

三、代码实现

3.1)引入 pom.xml 主要的依赖

<dependencies>
      
       <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

         <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.0.2</version>
        </dependency>       
    
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.2</version>
        </dependency>  
      
        <!-- 如果保存检查点到 hdfs上,需要引入此依赖 -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <!--phoenix-->
        <dependency>
            <groupId>org.apache.phoenix</groupId>
            <artifactId>phoenix-core</artifactId>
            <version>4.7.0-HBase-1.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

      <dependency>
            <groupId>org.apache.phoenix</groupId>
            <artifactId>phoenix-spark</artifactId>
            <version>5.0.0-HBase-2.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.glassfish</groupId>
                    <artifactId>javax.el</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

</dependencies>

3.2)主要逻辑代码

public static void main(String[] args) throws Exception {

        //TODO 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //1.1 设置CK&状态后端
        //env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/gmall-flink-210325/ck"));
        //env.enableCheckpointing(5000L);
        //env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //env.getCheckpointConfig().setCheckpointTimeout(10000L);
        //env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
        //env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000);

        //env.setRestartStrategy(RestartStrategies.fixedDelayRestart());

        //TODO 2.消费Kafka ods_base_db 主题数据创建流
        String sourceTopic = "ods_base_db";
        String groupId = "base_db_app_211212";
        DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getKafkaConsumer(sourceTopic, groupId));

        //TODO 3.将每行数据转换为JSON对象并过滤(delete) 主流
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSON::parseObject)
                .filter(new FilterFunction<JSONObject>() {
                    @Override
                    public boolean filter(JSONObject value) throws Exception {
                        //取出数据的操作类型
                        String type = value.getString("type");

                        return !"delete".equals(type);
                    }
                });

        //TODO 4.使用FlinkCDC消费配置表并处理成    广播流
        SourceFunction<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("your hostname")
                .port(3306)
                .databaseList("lcy_db") // monitor all tables under inventory database
                .tableList("lcy_db.table_process")
                .username("luchangyin")
                .password("123456")
                .deserializer(new MyStringDeserializationSchema()) // 自定义:converts SourceRecord to JSON String
                .startupOptions(StartupOptions.initial()) // .initial() latest
                .build();

        DataStream<String> tbProcessDStream = env.addSource(mySqlSource).name("source-cdc-table_process");
        MapStateDescriptor<String, TableProcess> mapStateDescriptor = new MapStateDescriptor<>("map-state", String.class, TableProcess.class);
        BroadcastStream<String> broadcastStream = tbProcessDStream.broadcast(mapStateDescriptor);

        //TODO 5.连接主流和广播流
        BroadcastConnectedStream<JSONObject, String> connectedStream = jsonObjDS.connect(broadcastStream);

        //TODO 6.分流  处理数据  广播流数据,主流数据(根据广播流数据进行处理)
        OutputTag<JSONObject> hbaseTag = new OutputTag<JSONObject>("hbase-tag") {};
        SingleOutputStreamOperator<JSONObject> kafka = connectedStream.process(new MyTableProcessFunction(hbaseTag, mapStateDescriptor));

        //TODO 7.提取Kafka流数据和HBase流数据
        DataStream<JSONObject> hbase = kafka.getSideOutput(hbaseTag);

        //TODO 8.将Kafka数据写入Kafka主题,将HBase数据写入Phoenix表
        kafka.print("Kafka >>>>>>>>");
        hbase.print("HBase >>>>>>>");

//  ------- 入 HBASE 和 Kafka 的自定义函数自个去实现,这里不做讲述 --------- 
//        hbase.addSink(new DimSinkFunction());
//        kafka.addSink(MyKafkaUtil.getKafkaProducer(new KafkaSerializationSchema<JSONObject>() {
//            @Override
//            public ProducerRecord<byte[], byte[]> serialize(JSONObject jsonObject, @Nullable Long ts) {
//                return new ProducerRecord<>(jsonObject.getString("sinkTable"),jsonObject.getString("after").getBytes());
//            }
//        }));


        //TODO 9.启动任务
        env.execute("BaseDBApp");
    }

3.3)自定义反序列化类(这里接触过FlinkCDC都知道,接受过来的日志数据格式非常多,因此我们需要自定义获取我们需要的具体通用数据),在 utils 包下:

public class MyStringDeserializationSchema implements DebeziumDeserializationSchema<String> {

    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {

        //构建结果对象
        JSONObject result = new JSONObject();

        //获取数据库名称&表名称
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        String database = fields[1];
        String tableName = fields[2];

        //获取数据
        Struct value = (Struct) sourceRecord.value();

        //After
        Struct after = value.getStruct("after");
        JSONObject data = new JSONObject();
        if (after != null) { //delete数据,则after为null
            Schema schema = after.schema();
            List<Field> fieldList = schema.fields();

            for (int i = 0; i < fieldList.size(); i++) {
                Field field = fieldList.get(i);
                Object fieldValue = after.get(field);
                data.put(field.name(), fieldValue);
            }
        }

        //Before
        Struct before = value.getStruct("before");
        JSONObject beforeData = new JSONObject();
        if (before != null) { //delete数据,则after为null
            Schema schema = before.schema();
            List<Field> fieldList = schema.fields();

            for (int i = 0; i < fieldList.size(); i++) {
                Field field = fieldList.get(i);
                Object fieldValue = before.get(field);
                beforeData.put(field.name(), fieldValue);
            }
        }

        //获取时间
//        long ts = Long.parseLong(value.get("ts_ms").toString());
//        String dt = MyDateUtils.getTimestamp2Fm(ts);

        //获取操作类型 CREATE UPDATE DELETE
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toLowerCase();
        if ("create".equals(type)) {
            type = "insert";
        }

        //封装数据
        result.put("database", database);
        result.put("tableName", tableName);
        result.put("after", data);
        result.put("before", beforeData);
        result.put("type", type);
//        result.put("ts", ts);
//        result.put("dt", dt);

        //输出封装好的数据
        collector.collect(result.toJSONString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }

}

3.4)创建实体类 TableProcess (对应我们的通用配置表):

@Data
public class TableProcess {
    //动态分流Sink常量
    public static final String SINK_TYPE_HBASE = "hbase";
    public static final String SINK_TYPE_KAFKA = "kafka";
    public static final String SINK_TYPE_CK = "clickhouse";
    //来源表
    String sourceTable;
    //操作类型 insert,update,delete
    String operateType;
    //输出类型 hbase kafka
    String sinkType;
    //输出表(主题)
    String sinkTable;
    //输出字段
    String sinkColumns;
    //主键字段
    String sinkPk;
    //建表扩展
    String sinkExtend;
}

3.5)创建合并流的处理类 MyTableProcessFunction :

public class MyTableProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {

    private OutputTag<JSONObject> objectOutputTag;
    private MapStateDescriptor<String, TableProcess> mapStateDescriptor;
    private Connection connection;

    public MyTableProcessFunction(OutputTag<JSONObject> objectOutputTag, MapStateDescriptor<String, TableProcess> mapStateDescriptor) {
        this.objectOutputTag = objectOutputTag;
        this.mapStateDescriptor = mapStateDescriptor;
    }

    @Override
    public void open(Configuration parameters) {
        try {
//            System.out.println("凤凰驱动:"+ ShoppingConfig.PHOENIX_DRIVER +" , 凤凰服务:"+ ShoppingConfig.PHOENIX_SERVER);
            Class.forName(ShoppingConfig.PHOENIX_DRIVER);
            connection = DriverManager.getConnection(ShoppingConfig.PHOENIX_SERVER);
//            System.out.println("链接凤凰的对象为 -> "+ connection);
        } catch (Exception e) {
            System.out.println("链接凤凰失败-> "+ e.toString());
            e.printStackTrace();
        }
    }

    @Override
    public void processBroadcastElement(String s, Context context, Collector<JSONObject> collector) throws Exception {
        //TODO 1.获取并解析数据
        JSONObject jsonObject = JSON.parseObject(s);
        String data = jsonObject.getString("after");
        TableProcess tableProcess = JSON.parseObject(data, TableProcess.class);

        //TODO 2.建表
        if(TableProcess.SINK_TYPE_HBASE.equals(tableProcess.getSinkType())){
            checkTable(tableProcess.getSinkTable(),
                    tableProcess.getSinkColumns(),
                    tableProcess.getSinkPk(),
                    tableProcess.getSinkExtend());
        }

        //TODO 3.写入状态,广播出去
        BroadcastState<String,TableProcess> broadcastState = context.getBroadcastState(mapStateDescriptor);
        String key = tableProcess.getSourceTable() +"-"+ tableProcess.getOperateType();
        broadcastState.put(key, tableProcess);
    }

    @Override
    public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
        //TODO 1.获取状态数据
        ReadOnlyBroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
        String key = value.getString("tableName") +"-"+ value.getString("type");
        TableProcess tableProcess = broadcastState.get(key);

        if (tableProcess != null){
            //TODO 2.过滤字段
            filterColumn(value.getJSONObject("after"), tableProcess.getSinkColumns());

            //TODO 3.分流
            //将输出表/主题信息写入Value
            value.put("sinkTable",tableProcess.getSinkTable());
            String sinkType = tableProcess.getSinkType();
            if(TableProcess.SINK_TYPE_KAFKA.equals(sinkType)){
                //Kafka数据,写入主流
                out.collect(value);
            }else if(TableProcess.SINK_TYPE_HBASE.equals(sinkType)){
                //HBase数据,写入侧输出流
                ctx.output(objectOutputTag, value);
            }
        }else {
            System.out.println("该组合key:"+ key +"不存在!");
        }

    }

    private void filterColumn(JSONObject data, String sinkColumns) {

        String[] fields = sinkColumns.split(",");
        List<String> columns = Arrays.asList(fields);
        data.entrySet().removeIf(next -> !columns.contains(next.getKey()));
    }

    //建表语句 : create table if not exists db.tn(id varchar primary key,tm_name varchar) xxx;
    private void checkTable(String sinkTable, String sinkColumns, String sinkPk, String sinkExtend) {

        PreparedStatement preparedStatement = null;

        try{
            if (sinkPk == null){
                sinkPk = "id";
            }

            if(sinkExtend == null){
                sinkExtend = "";
            }

            StringBuffer createTableSQL = new StringBuffer("create table if not exists ")
                    .append(ShoppingConfig.HBASE_SCHEMA)
                    .append(".")
                    .append(sinkTable)
                    .append("(");

            String[] fields = sinkColumns.split(",");
            for(int i = 0; i < fields.length; i++){
                String field = fields[i];
                //判断是否为主键
                if (sinkPk.equals(field)){
                    createTableSQL.append(field).append(" varchar primary key ");
                }else{
                    createTableSQL.append(field).append(" varchar ");
                }

                //判断是否为最后一个字段,如果不是,则添加","
                if (i < fields.length - 1){
                    createTableSQL.append(",");
                }
            }

            createTableSQL.append(")").append(sinkExtend);

            //打印建表语句
            System.out.println(createTableSQL);

            //预编译
            preparedStatement = connection.prepareStatement(createTableSQL.toString());
            //执行
            preparedStatement.execute();
        }catch (SQLException e){
            throw new RuntimeException("Phoenix表"+ sinkTable +"建表失败!");
        }finally {
            if (preparedStatement != null){
                try{
                    preparedStatement.close();
                }catch (SQLException e){
                    e.printStackTrace();
                }
            }
        }

    }

}

3.6)在MySQL中创建我们的配置表

-- 配置表
CREATE TABLE `table_process` (
     `source_table` varchar(200) NOT NULL COMMENT '来源表 ',
     `operate_type` varchar(200) NOT NULL COMMENT '操作类型 insert,update,delete',
     `sink_type` varchar(200) DEFAULT NULL COMMENT '输出类型 hbase kafka',
     `sink_table` varchar(200) DEFAULT NULL COMMENT '输出表 (主题 )',
     `sink_columns` varchar(2000) DEFAULT NULL COMMENT '输出字段 ',
     `sink_pk` varchar(200) DEFAULT NULL COMMENT '主键字段 ',
     `sink_extend` varchar(200) DEFAULT NULL COMMENT '建表扩展 ',
     PRIMARY KEY (`source_table`,`operate_type`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

insert into table_process(source_table,operate_type,sink_type,sink_table,sink_columns) values ('order_detail','insert','hbase','order_detail','id,order_id,sku_name,sku_num,create_time');
insert into table_process(source_table,operate_type,sink_type,sink_table,sink_columns) values ('user_info','read','hbase','user_info','id,name,phone_num,user_level,create_time');
insert into table_process(source_table,operate_type,sink_type,sink_table,sink_columns) values ('base_attr_info','read','kafka','base_attr_info','id,attr_name,category_id');
select * from table_process;

四、联调测试

4.1)部分kafka源数据:

kafka-source 的数据 :> {"database":"lcy_db","before":{},"after":{"name":"钟表","id":10},"type":"read","tableName":"base_category1"}
kafka-source 的数据 :> {"database":"lcy_db","before":{},"after":{"name":"鞋靴","id":11},"type":"read","tableName":"base_category1"}
kafka-source 的数据 :> {"database":"lcy_db","before":{},"after":{"name":"母婴","id":12},"type":"read","tableName":"base_category1"}
kafka-source 的数据 :> {"database":"lcy_db","before":{},"after":{"name":"礼品箱包","id":13},"type":"read","tableName":"base_category1"}
kafka-source 的数据 :> {"database":"lcy_db","before":{},"after":{"name":"食品饮料、保健食品","id":14},"type":"read","tableName":"base_category1"}
kafka-source 的数据 :> {"database":"lcy_db","before":{},"after":{"name":"珠宝","id":15},"type":"read","tableName":"base_category1"}

4.2)配置表部分数据:

配置表的数据 : > {"database":"lcy_db","before":{},"after":{"operate_type":"read","sink_type":"kafka","sink_table":"base_attr_info","source_table":"base_attr_info","sink_columns":"id,attr_name,category_id"},"type":"read","tableName":"table_process"}
配置表的数据 : > {"database":"lcy_db","before":{},"after":{"operate_type":"insert","sink_type":"hbase","sink_table":"order_detail","source_table":"order_detail","sink_columns":"id,order_id,sku_name,sku_num,create_time"},"type":"read","tableName":"table_process"}
配置表的数据 : > {"database":"lcy_db","before":{},"after":{"operate_type":"read","sink_type":"hbase","sink_table":"user_info","source_table":"user_info","sink_columns":"id,name,phone_num,user_level,create_time"},"type":"read","tableName":"table_process"}

4.3)分流结果数据:

# 入hbase的维表数据
HBase >>>>>>>> {"sinkTable":"user_info","database":"lcy_db","before":{},"after":{"create_time":"2020-12-04 23:28:45","name":"时心邦","user_level":"1","phone_num":"13794339138","id":3998},"type":"read","tableName":"user_info"}
HBase >>>>>>>> {"sinkTable":"user_info","database":"lcy_db","before":{},"after":{"create_time":"2020-12-04 23:28:45","name":"宋厚庆","user_level":"1","phone_num":"13274778653","id":3999},"type":"read","tableName":"user_info"}
HBase >>>>>>>> {"sinkTable":"user_info","database":"lcy_db","before":{},"after":{"create_time":"2020-12-04 23:28:45","name":"康素云","user_level":"1","phone_num":"13739911376","id":4000},"type":"read","tableName":"user_info"}
# 入kafka的事实表数据
Kafka >>>>>>>>> {"sinkTable":"base_attr_info","database":"lcy_db","before":{},"after":{"category_id":477,"attr_name":"香水彩妆","id":112},"type":"read","tableName":"base_attr_info"}
Kafka >>>>>>>>> {"sinkTable":"base_attr_info","database":"lcy_db","before":{},"after":{"category_id":477,"attr_name":"面部护肤","id":113},"type":"read","tableName":"base_attr_info"}
Kafka >>>>>>>>> {"sinkTable":"base_attr_info","database":"lcy_db","before":{},"after":{"category_id":473,"attr_name":"香调","id":114},"type":"read","tableName":"base_attr_info"}

五、总结

  到这里我们已经完成了动态分流的主题实现,之前要是没接触过,说容易也不容易,但是说难也不是很难;之所以总结这个动态分流主要是在项目中还是挺重要的,毕竟原业务系统日志数据过来之后会统一放在同一个topic中,即使你在代码中使用判断有多少个业务表然后在发不作业也行,不过这样的弊端是如果源业务系统有新增表的话必须要添加判断然后再重新发布作业,这样是不利于我们在生产上的操作的,那么我们的动态分流技术就可以很好的避免了此类的弊端,如果使用了动态分流,那么如果业务表中有新增数据,我们只需要在配置表中添加新表的信息即可,即我们只需要维护这个配置表即可,程序不需要动,这样大大的提升了开发成本和维护效率。over了。。。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342

推荐阅读更多精彩内容