Flink系列 - 实时数仓之ETL实战(二)

一、概述

  上一篇实战中我们已经使用ogg实现了mysql数据以json的格式同步到了kafka里边去了,也就是说我们的源端的埋点的数据已经处理好咯;那么接下来我们就可以使用 Flink 开始对数据源进行处理计算,当然这里值得一提的是:ogg 同步过来的json数据格式是嵌套型的,而且我们的数据不是想普通的网站日志那么简单,因为的源数据是从数据库中过来的-会涉及到增、删、改,因此我们要对刚从源库中过来的数据进行简单的 ETL 处理。废话不多说,先看下数据格式是长啥样的:

# 添加
{"table":"bms_st.employees","op_type":"I","op_ts":"2012-04-12 14:23:13.177344","current_ts":"2012-04-12T14:23:19.796000","pos":"00000000010000036968","after":{"EID":"101","ENAME":"changyin","ESAL":6666.66}}
{"table":"bms_st.employees","op_type":"I","op_ts":"2012-04-12 14:23:13.177344","current_ts":"2012-04-12T14:23:19.797000","pos":"00000000010000037147","after":{"EID":"102","ENAME":"siling","ESAL":1234.12}}
# 修改
{"table":"bms_st.employees","op_type":"U","op_ts":"2012-04-12 14:24:37.789810","current_ts":"2012-04-12T14:24:44.247000","pos":"00000000010000037501","before":{"EID":"102","ENAME":"siling","ESAL":1234.12},"after":{"EID":"102","ENAME":"sunsiling","ESAL":1000.00}}
# 删除
{"table":"bms_st.employees","op_type":"D","op_ts":"2012-04-12 14:24:37.789810","current_ts":"2012-04-12T14:24:44.248000","pos":"00000000010000037636","before":{"EID":"101","ENAME":"changyin","ESAL":6666.66}}

  从数据格式中可以看得出:op_type 是我们对数据源的增删改的标志,真正的数据是在 after 或者 before 的值里边的。接下来我们将用 Flink 对这些数据进行 ETL处理 并发往 kafka 供下一层数仓计算使用:

二、项目结构
image.png

mmain: 程序入口
utils:工具类
entity:实体类
commonbase:抽象父类
achieve:实现类

三、项目的实现
3.1 静态的资源文件,用于配置信息 application.properties:
# source kafka config
PJbtServers1: cdh101:9092,cdh102:9092,cdh103:9092
PJgroupId1: test
PJoffsetReset1: latest
PJtopicStr1: piaoju-topic

# sink kafka config
pj-BtServers2: cdh101:9092,cdh102:9092,cdh103:9092
pj-ZkStr2: cdh101:2181,cdh102:2181,cdh102:2181
pj-GroupId2: test
pj-OffsetReset2: latest
pj-TopicStr2: piaoju-to-kafka-topic

# ---------------------------------------------------------------------------------------------------------

# 员工日增薪资
employee_tb_name: bms_st.employees
employee_job_name: EmployeeSource
#employee_create_table: employee_money
#employee_row_col: tb_name VARCHAR, op_type VARCHAR, ts VARCHAR, eId VARCHAR, eName VARCHAR, eSal VARCHAR

3.2 在 utils目录 下创建获取以上文件信息值的类 LoadPropertiesFile.java:
import java.io.InputStream;
import java.util.Properties;
/**
 * @author feiniu
 * @create 2020-03-26 9:37
 */
public class LoadPropertiesFile {

    public static String getPropertyFileValues(String proKey){
        String proStr = "";
        try {
            //读取配置文件
            InputStream is = LoadPropertiesFile.class.getClassLoader().getResourceAsStream("application.properties");
            Properties properties = new Properties();
            properties.load(is);
            proStr = properties.getProperty(proKey);
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
        return proStr;
    }

}

3.3 commonbase 目录下创建抽象类 对接kafka的数据,并解析关键字段,代码架构如下:
package com.nfdwsyy.commonbase;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.nfdwsyy.utils.LoadPropertiesFile;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import java.text.ParseException;
import java.util.Properties;
/**
 * @author feiniu
 * @create 2020-04-03 20:47
 */
public abstract class SourceCommonBase {

    public void getDataStream(String jobName) throws Exception {

        // 1. 环境的设置

       // 2.资源配置文件信息的获取
      
       // 3.消费者接收数据并做json的简要解析
       
       // 4.抽象方法的设置

}

  1. 环境的设置:
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 开启 Checkpoint,每 1000毫秒进行一次 Checkpoint
        env.enableCheckpointing(1000);
        // Checkpoint 语义设置为 EXACTLY_ONCE
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // CheckPoint 的超时时间
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // 同一时间,只允许 有 1 个 Checkpoint 在发生
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // 两次 Checkpoint 之间的最小时间间隔为 500 毫秒
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // 当 Flink 任务取消时,保留外部保存的 CheckPoint 信息
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 当有较新的 Savepoint 时,作业也会从 Checkpoint 处恢复
        env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
        // 作业最多允许 Checkpoint 失败 1 次(flink 1.9 开始支持)
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);

2.资源配置文件信息的获取:

        // 获取资源配置文件信息
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", LoadPropertiesFile.getPropertyFileValues("PJbtServers1"));
        properties.setProperty("group.id", LoadPropertiesFile.getPropertyFileValues("PJgroupId1"));
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  //key 反序列化
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", LoadPropertiesFile.getPropertyFileValues("PJoffsetReset1")); //value 反序列化

3.消费者接收数据并做json的简要解析:

        FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<>(
                LoadPropertiesFile.getPropertyFileValues("PJtopicStr1"),
                new SimpleStringSchema(),
                properties);

        DataStream<String> stream = env.addSource(myConsumer).setParallelism(1);

        // prase json
        DataStream<String> mStream = stream.map(new MapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
                JSONObject jsonObject = JSON.parseObject(s);
                String table = jsonObject.getString("table");
                String op_type = jsonObject.getString("op_type");
                String op_ts = jsonObject.getString("op_ts");
                String before = jsonObject.getString("before");
                String after = jsonObject.getString("after");

                String resultStr = parseSourceKafkaJson(table,op_type,op_ts,before,after);
                return resultStr;
            }
        });

        // let chirld etl to kafka
        sendToSinkKafka(mStream);

        env.execute(jobName);

4.抽象方法的设置:

    // let chirld class do it
    public abstract String parseSourceKafkaJson(String table, String op_type, String op_ts, String before, String after) throws ParseException;

    // sink to kafka
    public abstract void sendToSinkKafka(DataStream<String> mStream);

3.4 achieve下创建实现类,用于对数据进行 ETL 处理,类的架构设计如下:
package com.nfdwsyy.achieve;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.nfdwsyy.commonbase.SourceCommonBase;
import com.nfdwsyy.entity.Employee;
import com.nfdwsyy.utils.LoadPropertiesFile;
import com.nfdwsyy.utils.MySinkKafka;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.shaded.netty4.io.netty.util.internal.StringUtil;
import org.apache.flink.streaming.api.datastream.DataStream;
import java.io.Serializable;
import java.text.ParseException;

/**
 * @author feiniu
 * @create 2020-07-23 10:12
 */
public class EmpSourceAchi extends SourceCommonBase implements Serializable {

    @Override
    public String parseSourceKafkaJson(String table, String op_type, String op_ts, String before, String after) throws ParseException {

        // 1.数据的 ETL 处理   (这里根据实际情况而定)
   
    }

    @Override
    public void sendToSinkKafka(DataStream<String> mStream) {
        
           // 2.将处理完之后的数据发往 kafka 队列 供下游计算使用
        
    }

    // 3. 调用父类的处理方法,供主类调用
    
}

1.数据的 ETL 处理:

        String eId = "";
        String eName = "";
        double eSal = 0;
        double after_money = 0;
        double before_money = 0;

        JSONObject jObjBefore = JSON.parseObject(before);
        JSONObject jObjAfter = JSON.parseObject(after);

        System.out.println("在 parseSourceKafkaJson 方法中,table -> "+ table +" , op_type -> "+ op_type +" , op_ts -> "+ op_ts +" , before -> "+ before + " , after -> "+ after);

        String tb_name = LoadPropertiesFile.getPropertyFileValues("employee_tb_name");
        Employee employee = null;
        if (StringUtil.isNullOrEmpty(op_type) || StringUtil.isNullOrEmpty(table)){
            System.out.println("获取的类型为空哦-> "+ op_type);
        }else if (table.equals(tb_name)){
            switch (op_type){
                case "I":
                    eId = jObjAfter.getString("EID");
                    eName = jObjAfter.getString("ENAME");
                    eSal = Double.parseDouble(jObjAfter.getString("ESAL"));
                    break;
                case "U":
                    eId = jObjAfter.getString("EID");
                    eName = jObjAfter.getString("ENAME");
                    after_money = Double.valueOf(jObjAfter.getString("ESAL"));
                    before_money = Double.valueOf(jObjBefore.getString("ESAL"));
                    eSal = after_money - before_money;
                    break;
                case "D":
                    eId = jObjBefore.getString("EID");
                    eName = jObjBefore.getString("ENAME");
                    eSal = Double.parseDouble("-"+ jObjBefore.getString("ESAL"));
                    break;
            }

            employee = new Employee(tb_name, op_type, op_ts, eId, eName, eSal);
        }

        // the entity must have tb_name
        return JSONObject.toJSONString(employee);

2.将处理完之后的数据发往 kafka :

        DataStream<String> mS = mStream.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String s) throws Exception {
                if (StringUtil.isNullOrEmpty(s)){
                    return false;
                } else {
                    return true;

                }
            }
        });

        String broker_list = LoadPropertiesFile.getPropertyFileValues("pj-BtServers2");
        String topic = LoadPropertiesFile.getPropertyFileValues("pj-TopicStr2");
        String groupId = LoadPropertiesFile.getPropertyFileValues("pj-GroupId2");
        String offsetReset = LoadPropertiesFile.getPropertyFileValues("pj-OffsetReset2");
        // the entity must have tb_name
        String tb_name = LoadPropertiesFile.getPropertyFileValues("employee_tb_name");
       // 发往 Kafka 的自定义类
        mS.addSink(new MySinkKafka(broker_list, topic, groupId, offsetReset, tb_name)).name("employee_tb_name");

  1. 调用父类的处理方法,供主类调用 :
    // transfer the parent method
    public void successKafka2KafkaMethod(){

        try {
            String jobName = LoadPropertiesFile.getPropertyFileValues("employee_job_name");
            getDataStream(jobName +" Source");
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

  到这里整体上算是弄完了,但是要注意的一点是数据发往 kafka 的类是需要我们去自定义的,接下来我们再去创建一个数据发往 kafka 的工具类:

package com.nfdwsyy.utils;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

/**
 * @author feiniu
 * @create 2020-04-04 10:29
 */
public class MySinkKafka extends RichSinkFunction<String> {

    private Properties props = null;
    private KafkaProducer producer = null;
    private ProducerRecord record = null;

    private String broker_list;
    private String topic;
    private String groupId;
    private String offsetReset;
    private String sourceTbName;

    public MySinkKafka(String broker_list, String topic, String groupId, String offsetReset, String sourceTbName) {
        this.broker_list = broker_list;
        this.topic = topic;
        this.groupId = groupId;
        this.offsetReset = offsetReset;
        this.sourceTbName = sourceTbName;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        props = new Properties();
        props.put("bootstrap.servers", broker_list);
        props.put("group.id", groupId);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //key 序列化
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //value 序列化
        props.put("auto.offset.reset", offsetReset); //value 反序列化

        producer = new KafkaProducer<String, String>(props);

    }

    @Override
    public void invoke(String value, Context context) {

        if(value.equals("") || value.equals("null")) {
            System.out.println("Sink 中 invoke 方法过来的字符串值-> "+ value);
        } else {
            JSONObject jObjNew = JSON.parseObject(value);
            String tb_name = jObjNew.getString("tb_name");

            System.out.println("表明对比 -> " + tb_name + " --- " + sourceTbName);

            if (tb_name.equals(sourceTbName)) {
                record = new ProducerRecord<String, String>(topic, null, null, value);
                producer.send(record);
                System.out.println("发送数据: " + value);
                producer.flush();
            }
        }

    }

    @Override
    public void close() throws Exception {
        super.close();
    }

}

  1. 创建主类,调用ETL方法:
package com.nfdwsyy.mmain;

import com.nfdwsyy.achieve.EmpSourceAchi;

/**
 * @author feiniu
 * @create 2020-07-23 10:55
 */
public class EmployeeMain01 {

    public static void main(String[] args){

        EmpSourceAchi empAchi = new EmpSourceAchi();
        empAchi.successKafka2KafkaMethod();

    }

}

  好了,全部代码都写完了,接下来我们可以去测试使用咯。

四、本地测试 并 打包部署上 yarn
4.1 本地测试

  运行程序之后对数据库的源表进行增删改,即可在控制台看到发往kafka的数据,这里不做本地测试。

4.2 部署上 yarn 服务器

打包并上传至服务器的指定目录,然后执行如下命令部署应用:


bin/flink run -m yarn-cluster -ynm oggsyncflinkjob -d -c com.nfdwsyy.mmain.EmployeeMain01 /opt/mycdhflink/myjar/Kafka2FlinkETL2Kafka.jar

这时候我们可以在页面上部署情况了:
image.png

image.png

接下来我们再启动接收ETL之后的消费者:

bin/kafka-console-consumer.sh --bootstrap-server cdh101:9092,cdh102:9092,cdh103:9092 --topic piaoju-to-kafka-topic --from-beginning

源库中对表数据操作:
image.png

处理过后的数据如下图:
image.png

  从处理结果的数据看来,其实它已经变成是一个处理过增删改操作之后最简单的 json串了,那么至于如果对这些处理过后的数据进行计算如聚合等那都是小菜一碟了;原创不易,转载必须注明出处;欲知如何计算,请看下回分晓,哈哈哈哈。。。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,186评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,858评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,620评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,888评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,009评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,149评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,204评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,956评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,385评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,698评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,863评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,544评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,185评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,899评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,141评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,684评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,750评论 2 351