Flink-CDC 1.2 介绍及使用

CDC简介

CDC 是 Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。

CDC 的种类

CDC 主要分为基于查询和基于 Binlog 两种方式,我们主要了解一下这两种之间的区别:

基于查询的CDC 基于Binlog的CDC
开源产品 Sqoop、Kafka JDBC Source Canal、Maxwell、Debezium
执行模式 Batch Streaming
是否可以捕获所有数据变化
延迟性 高延迟 低延迟
是否增加数据库压力

Sqoop 导入数据的方式为通过SQL语句进行查询得到数据

  • 全量导入:where 1=1
  • 增量导入:where 创建时间=当天
  • 新增及变化:where 创建时间=当天 or 修改时间=当天
  • 特殊:只导入一次

基于查询的 和 基于binlog的,查询方式为了不影响日常数据库性能,所以一般会在凌晨等时间进行批量操作,这种方式会导致,数据的中间状态没办法被记录,如订单,今天下单,支付,发货,最终只能记录到发货,下单和支付状态无法记录。

Flink-CDC 简介

Flink 社区开发了 flink-cdc-connectors 组件,这是一个可以直接从 MySQL、PostgreSQL 等数据库直接读取全量数据和增量变更数据的 source 组件。目前也已开源,开源地址:https://github.com/ververica/flink-cdc-connectors官方文档地址 会告诉你 Flink-CDC-Connector 的版本与 Flink 的版本关系。

为什么选择 Flink-CDC

Maxwell 是读取 MySQL 二进制日志并将行更新作为 JSON 写入 Kafka、Kinesis 或其他流媒体平台的工具。

canal 是把自己伪装成MySQL slave,模拟MySQL slave的交互协议向MySQL Mater发送 dump协议,MySQL mater收到canal发送过来的dump请求,开始推送binary log给canal,然后canal解析binary log,再发送到存储目的地,比如MySQL,Kafka,Elastic Search等等。

Debezium 是用于捕获变更数据的开源分布式平台。可以响应数据库的所有插入,更新和删除操作。

Flink-CDC 基于 Debezium,可以直读取到数据并进行定制化的流处理方案,如果使用其他方案,如 Canal、Maxwell等,就需要多一个中间件,来进行数据读取并把读取的数据在发送到kafka等步骤,即时性肯定没有 Flink-CDC,包括还要考虑中间层的高可用等。

FlinkCDC 的使用

把Mysql binlog 打开

# bin log日志的名称
log-bin=mysql-bin
# 以行的方式
binlog_format=row
# 定具体要同步的数据库
binlog-do-db=test_jdbc1
# 重启 MYSQL
systemctl restart mysqld

编写Flink Stream

依赖
<properties>
    <java.version>1.8</java.version>
    <flink-version>1.12.2</flink-version>
    <logback.version>1.2.3</logback.version>
</properties>
<dependencies>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink-version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink-version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink-version}</version>
    </dependency>

    <!-- 做断点续传(checkpoint)需要 -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.19</version>
    </dependency>

    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>1.2.0</version>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.47</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-shaded-hadoop-2-uber</artifactId>
        <version>2.8.3-10.0</version>
    </dependency>

</dependencies>

<build>
    <plugins>
        <!-- 一般项目打包是不会含有依赖的,使用这个可以帮你把依赖带上,不带的话提交到job是无法运行的 -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorEef>jar-with-dependencies</descriptorEef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
代码

flink 主类

package com.example.flinkcdcmysql;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class FlinkStreamCdc {

    public static void main(String[] args) throws Exception {

        // 1. 获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1.1 开启CK并指定状态后端为FS,test-jdbc1-checkpoint-211216 目录会自己创建
        env.setStateBackend(new FsStateBackend("hdfs://node103:9000/test-jdbc1/ck"));

        // 5s 做一次 CK
        env.enableCheckpointing(5000L);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // ck 的超时时间 10s
        env.getCheckpointConfig().setCheckpointTimeout(10000L);
        // 允许检查点最大并发,当前一个检查点延时,允许在规定的时间点在开启一个检查点,如5s开启的检查点,延时到14s,那10s的时候允许在启动一个检查点,这就2个检查点了
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
        // 最小间隔时间,如5s开启的检查点,延时到14s,最小间隔时间为2s,也就是16s开启另一个检查点,不会存在 并发检查点的问题。
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2000);

        // 老版本中 需要设置重启策略,新版本重启策略比较合理,老版本重启次数是Int的最大值
        // RestartStrategies.fixedDelayRestart(),重启一次间隔一定时间重启第二次,直到重启次数以参数限制为准
//        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,5000));


        // 2. 通过FlinkCDC构建SourceFunction并读取数据
        DebeziumSourceFunction<String> build = MySQLSource.<String>builder()
                .hostname("192.168.81.104")
                .port(3306)
                .username("dev_fqr")
                .password("Dev@fqr2021")
                // 读取哪个库,可以读取多个库,默认监控库下所有表
                .databaseList("test_jdbc1")
                // 监控库下的某些表 test_jdbc1.table,test_jdbc1.table1
                .tableList("test_jdbc1.test_standard_1")
                // 反序列化  用的是 Debezium 的 StringDebeziumDeserializationSchema() 格式不方便,所以要自定义
                .deserializer(new CustomerStringDebeziumDeserializationSchema())
                // 启动参数 提供了如下几个静态方法
                // StartupOptions.initial() 第一次启动的时候,会把历史数据读过来(全量)做快照,后续读取binlog加载新的数据,如果不做 chackpoint 会存在重启又全量一遍。
                // StartupOptions.earliest() 只从binlog开始的位置读(源头),这里注意,如果binlog开启的时间比你建库时间晚,可能会读不到建库语句会报错,earliest要求能读到建表语句
                // StartupOptions.latest() 只从binlog最新的位置开始读
                // StartupOptions.specificOffset() 自指定从binlog的什么位置开始读
                // StartupOptions.timestamp() 自指定binlog的开始时间戳
                .startupOptions(StartupOptions.initial())
                .build();
        DataStreamSource<String> stringDataStreamSource = env.addSource(build);
        // 3. 打印数据
        stringDataStreamSource.print();

        // 4. 启动任务
        env.execute("flink-cdc");
    }
}

CustomerStringDebeziumDeserializationSchema 类

package com.example.flinkcdcmysql;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.Locale;

public class CustomerStringDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {

    /**
     * 变为一个JSON格式
     * {
     *     "database":"",
     *     "tableName":"",
     *     "operate":"",
     *     // 修改之前的数据
     *     "before":{
     *
     *     },
     *     // 修改之后的数据
     *     "after":{
     *
     *     }
     * }
    **/
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        // 1. 创建JSON对象
        JSONObject result = new JSONObject();
        // 2. 获取库名&表名
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        String database = fields[1];
        String tableName = fields[2];

        Struct struct = (Struct) sourceRecord.value();
        // 3. 获取before数据
        Struct before = struct.getStruct("before");
        JSONObject beforeJson = new JSONObject();
        if(before != null) {
            Schema beforeSchema = before.schema();
            for (Field field : beforeSchema.fields()) {
                Object beforeValue = before.get(field);
                beforeJson.put(field.name(), beforeValue);
            }
        }
        // 4. 获取after数据
        Struct after = struct.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if(after != null) {
            Schema afterSchema = after.schema();
            for (Field field : afterSchema.fields()) {
                Object afterValue = after.get(field);
                afterJson.put(field.name(), afterValue);
            }
        }
        // 5. 获取操作类型
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String opName = operation.toString().toLowerCase();
        // 为后续方便转一下
        if("create".equals(opName)){
            opName = "insert";
        }
        // 6. 将字段写入JSON对象
        result.put("database",database);
        result.put("tableName",tableName);
        result.put("before",beforeJson);
        result.put("after",afterJson);
        result.put("operate",opName);
        // 7. 输出数据
        collector.collect(result.toJSONString());
    }

    // 和 StringDebeziumDeserializationSchema 保持一致
    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }

}

提交到 flink web 中会提示 :
ould not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.

需要给每个做为 Task 的服务器中的 flink 加入如下依赖,把该jar 下载下来,然后丢到 flink/lib 目录下。
需要注意的是看jar名是 hadoop2,但实际我的 hadoop 版本为 3.1.3 依然可以使用
<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-shaded-hadoop-2-uber</artifactId>
      <version>2.8.3-10.0</version>
</dependency>

flink 在同步的时候,会先读取默认的binlog文件,把原始数据载入,后从配置文件中指定的库去读取。如果对已有大量数据或表的同步的时候,会有假死的现象,其实是 flink 在读取binlog从中扫描库、表已经配置信息以及解析数据。同步过程中不建议并行度 > 1 可能会导致数据重复

通过 op 字段,可以解析出对数据的操作行为,是删除(d),修改(u)还是新增(c),

断点续传

在执行过程中我们可以看到 flink 会根据我们 checkpoint 的策略进行检查点报错到 hadoop

但如果你手动 cancel job 之后,会发现 在 hadoop 中并不会保留这个 chk-xx 这个文件,必须使用 save point 的命令触发保存才有效。

cd /flink/bin
# 手动触发 387bbf770086336de78819d9fee38579 为 运行的任务 ID
./flink savepoint 387bbf770086336de78819d9fee38579 hdfs://node103:9000/test-jdbc1/sv

执行完命令后就可以取消job,并重启job。

命令方式的重启
./flink run -s  hdfs://node103:9000/test-jdbc1/sv/savepoint-387bbf-16028c26460e -c com.example.flinkcdcmysql.FlinkStreamCdc xxxxx.jar

Flink SQL

依赖
    <!-- flink sql 需要的依赖 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>${flink-version}</version>
    </dependency>
代码
package com.example.flinkcdcmysql;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class FlinkSqlCdc {

    public static void main(String[] args) throws Exception {

        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2.DDL方式建表
        tableEnv.executeSql("CREATE TABLE mysql_binlog ( " +
                " id bigint NOT NULL, " +
                " name varchar(200) " +
                ") WITH ( " +
                " 'connector' = 'mysql-cdc', " + // 连接器的名称
                " 'hostname' = 'node103', " +
                " 'port' = '3306', " +
                " 'username' = 'dev_fqr', " +
                " 'password' = 'Dev@fqr2021!', " +
                " 'database-name' = 'test_jdbc1', " +
                " 'table-name' = 'test_standard_1' " +
                ")");

        // 3.查询数据
        Table table = tableEnv.sqlQuery("select * from mysql_binlog");
        // 4.将动态表转换为流
        DataStream<Tuple2<Boolean, Row>> tuple2DataStream = tableEnv.toRetractStream(table, Row.class);
        tuple2DataStream.print();
        // 5.启动任务
        env.execute("Flink-SQL-CDC");

    }

}

demo 地址,以及MaxWell 和 canal 等都有

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,864评论 6 494
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,175评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,401评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,170评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,276评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,364评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,401评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,179评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,604评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,902评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,070评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,751评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,380评论 3 319
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,077评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,312评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,924评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,957评论 2 351

推荐阅读更多精彩内容