CDC简介
CDC 是 Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。
CDC 的种类
CDC 主要分为基于查询和基于 Binlog 两种方式,我们主要了解一下这两种之间的区别:
基于查询的CDC | 基于Binlog的CDC | |
---|---|---|
开源产品 | Sqoop、Kafka JDBC Source | Canal、Maxwell、Debezium |
执行模式 | Batch | Streaming |
是否可以捕获所有数据变化 | 否 | 是 |
延迟性 | 高延迟 | 低延迟 |
是否增加数据库压力 | 是 | 否 |
Sqoop 导入数据的方式为通过SQL语句进行查询得到数据
- 全量导入:where 1=1
- 增量导入:where 创建时间=当天
- 新增及变化:where 创建时间=当天 or 修改时间=当天
- 特殊:只导入一次
基于查询的 和 基于binlog的,查询方式为了不影响日常数据库性能,所以一般会在凌晨等时间进行批量操作,这种方式会导致,数据的中间状态没办法被记录,如订单,今天下单,支付,发货,最终只能记录到发货,下单和支付状态无法记录。
Flink-CDC 简介
Flink 社区开发了 flink-cdc-connectors 组件,这是一个可以直接从 MySQL、PostgreSQL 等数据库直接读取全量数据和增量变更数据的 source 组件。目前也已开源,开源地址:https://github.com/ververica/flink-cdc-connectors,官方文档地址 会告诉你 Flink-CDC-Connector 的版本与 Flink 的版本关系。
为什么选择 Flink-CDC
Maxwell 是读取 MySQL 二进制日志并将行更新作为 JSON 写入 Kafka、Kinesis 或其他流媒体平台的工具。
canal 是把自己伪装成MySQL slave,模拟MySQL slave的交互协议向MySQL Mater发送 dump协议,MySQL mater收到canal发送过来的dump请求,开始推送binary log给canal,然后canal解析binary log,再发送到存储目的地,比如MySQL,Kafka,Elastic Search等等。
Debezium 是用于捕获变更数据的开源分布式平台。可以响应数据库的所有插入,更新和删除操作。
Flink-CDC 基于 Debezium,可以直读取到数据并进行定制化的流处理方案,如果使用其他方案,如 Canal、Maxwell等,就需要多一个中间件,来进行数据读取并把读取的数据在发送到kafka等步骤,即时性肯定没有 Flink-CDC,包括还要考虑中间层的高可用等。
FlinkCDC 的使用
把Mysql binlog 打开
# bin log日志的名称
log-bin=mysql-bin
# 以行的方式
binlog_format=row
# 定具体要同步的数据库
binlog-do-db=test_jdbc1
# 重启 MYSQL
systemctl restart mysqld
编写Flink Stream
依赖
<properties>
<java.version>1.8</java.version>
<flink-version>1.12.2</flink-version>
<logback.version>1.2.3</logback.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink-version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink-version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink-version}</version>
</dependency>
<!-- 做断点续传(checkpoint)需要 -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.19</version>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2-uber</artifactId>
<version>2.8.3-10.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- 一般项目打包是不会含有依赖的,使用这个可以帮你把依赖带上,不带的话提交到job是无法运行的 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptorRefs>
<descriptorEef>jar-with-dependencies</descriptorEef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
代码
flink 主类
package com.example.flinkcdcmysql;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class FlinkStreamCdc {
public static void main(String[] args) throws Exception {
// 1. 获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 1.1 开启CK并指定状态后端为FS,test-jdbc1-checkpoint-211216 目录会自己创建
env.setStateBackend(new FsStateBackend("hdfs://node103:9000/test-jdbc1/ck"));
// 5s 做一次 CK
env.enableCheckpointing(5000L);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// ck 的超时时间 10s
env.getCheckpointConfig().setCheckpointTimeout(10000L);
// 允许检查点最大并发,当前一个检查点延时,允许在规定的时间点在开启一个检查点,如5s开启的检查点,延时到14s,那10s的时候允许在启动一个检查点,这就2个检查点了
env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
// 最小间隔时间,如5s开启的检查点,延时到14s,最小间隔时间为2s,也就是16s开启另一个检查点,不会存在 并发检查点的问题。
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2000);
// 老版本中 需要设置重启策略,新版本重启策略比较合理,老版本重启次数是Int的最大值
// RestartStrategies.fixedDelayRestart(),重启一次间隔一定时间重启第二次,直到重启次数以参数限制为准
// env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,5000));
// 2. 通过FlinkCDC构建SourceFunction并读取数据
DebeziumSourceFunction<String> build = MySQLSource.<String>builder()
.hostname("192.168.81.104")
.port(3306)
.username("dev_fqr")
.password("Dev@fqr2021")
// 读取哪个库,可以读取多个库,默认监控库下所有表
.databaseList("test_jdbc1")
// 监控库下的某些表 test_jdbc1.table,test_jdbc1.table1
.tableList("test_jdbc1.test_standard_1")
// 反序列化 用的是 Debezium 的 StringDebeziumDeserializationSchema() 格式不方便,所以要自定义
.deserializer(new CustomerStringDebeziumDeserializationSchema())
// 启动参数 提供了如下几个静态方法
// StartupOptions.initial() 第一次启动的时候,会把历史数据读过来(全量)做快照,后续读取binlog加载新的数据,如果不做 chackpoint 会存在重启又全量一遍。
// StartupOptions.earliest() 只从binlog开始的位置读(源头),这里注意,如果binlog开启的时间比你建库时间晚,可能会读不到建库语句会报错,earliest要求能读到建表语句
// StartupOptions.latest() 只从binlog最新的位置开始读
// StartupOptions.specificOffset() 自指定从binlog的什么位置开始读
// StartupOptions.timestamp() 自指定binlog的开始时间戳
.startupOptions(StartupOptions.initial())
.build();
DataStreamSource<String> stringDataStreamSource = env.addSource(build);
// 3. 打印数据
stringDataStreamSource.print();
// 4. 启动任务
env.execute("flink-cdc");
}
}
CustomerStringDebeziumDeserializationSchema 类
package com.example.flinkcdcmysql;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.Locale;
public class CustomerStringDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {
/**
* 变为一个JSON格式
* {
* "database":"",
* "tableName":"",
* "operate":"",
* // 修改之前的数据
* "before":{
*
* },
* // 修改之后的数据
* "after":{
*
* }
* }
**/
@Override
public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
// 1. 创建JSON对象
JSONObject result = new JSONObject();
// 2. 获取库名&表名
String topic = sourceRecord.topic();
String[] fields = topic.split("\\.");
String database = fields[1];
String tableName = fields[2];
Struct struct = (Struct) sourceRecord.value();
// 3. 获取before数据
Struct before = struct.getStruct("before");
JSONObject beforeJson = new JSONObject();
if(before != null) {
Schema beforeSchema = before.schema();
for (Field field : beforeSchema.fields()) {
Object beforeValue = before.get(field);
beforeJson.put(field.name(), beforeValue);
}
}
// 4. 获取after数据
Struct after = struct.getStruct("after");
JSONObject afterJson = new JSONObject();
if(after != null) {
Schema afterSchema = after.schema();
for (Field field : afterSchema.fields()) {
Object afterValue = after.get(field);
afterJson.put(field.name(), afterValue);
}
}
// 5. 获取操作类型
Envelope.Operation operation = Envelope.operationFor(sourceRecord);
String opName = operation.toString().toLowerCase();
// 为后续方便转一下
if("create".equals(opName)){
opName = "insert";
}
// 6. 将字段写入JSON对象
result.put("database",database);
result.put("tableName",tableName);
result.put("before",beforeJson);
result.put("after",afterJson);
result.put("operate",opName);
// 7. 输出数据
collector.collect(result.toJSONString());
}
// 和 StringDebeziumDeserializationSchema 保持一致
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
}
提交到 flink web 中会提示 :
ould not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
需要给每个做为 Task 的服务器中的 flink 加入如下依赖,把该jar 下载下来,然后丢到 flink/lib 目录下。
需要注意的是看jar名是 hadoop2,但实际我的 hadoop 版本为 3.1.3 依然可以使用
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2-uber</artifactId>
<version>2.8.3-10.0</version>
</dependency>
flink 在同步的时候,会先读取默认的binlog文件,把原始数据载入,后从配置文件中指定的库去读取。如果对已有大量数据或表的同步的时候,会有假死的现象,其实是 flink 在读取binlog从中扫描库、表已经配置信息以及解析数据。同步过程中不建议并行度 > 1 可能会导致数据重复
通过 op 字段,可以解析出对数据的操作行为,是删除(d),修改(u)还是新增(c),
断点续传
在执行过程中我们可以看到 flink 会根据我们 checkpoint 的策略进行检查点报错到 hadoop
但如果你手动 cancel job 之后,会发现 在 hadoop 中并不会保留这个 chk-xx 这个文件,必须使用 save point 的命令触发保存才有效。
cd /flink/bin
# 手动触发 387bbf770086336de78819d9fee38579 为 运行的任务 ID
./flink savepoint 387bbf770086336de78819d9fee38579 hdfs://node103:9000/test-jdbc1/sv
执行完命令后就可以取消job,并重启job。
命令方式的重启
./flink run -s hdfs://node103:9000/test-jdbc1/sv/savepoint-387bbf-16028c26460e -c com.example.flinkcdcmysql.FlinkStreamCdc xxxxx.jar
Flink SQL
依赖
<!-- flink sql 需要的依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>${flink-version}</version>
</dependency>
代码
package com.example.flinkcdcmysql;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
public class FlinkSqlCdc {
public static void main(String[] args) throws Exception {
// 1.获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 2.DDL方式建表
tableEnv.executeSql("CREATE TABLE mysql_binlog ( " +
" id bigint NOT NULL, " +
" name varchar(200) " +
") WITH ( " +
" 'connector' = 'mysql-cdc', " + // 连接器的名称
" 'hostname' = 'node103', " +
" 'port' = '3306', " +
" 'username' = 'dev_fqr', " +
" 'password' = 'Dev@fqr2021!', " +
" 'database-name' = 'test_jdbc1', " +
" 'table-name' = 'test_standard_1' " +
")");
// 3.查询数据
Table table = tableEnv.sqlQuery("select * from mysql_binlog");
// 4.将动态表转换为流
DataStream<Tuple2<Boolean, Row>> tuple2DataStream = tableEnv.toRetractStream(table, Row.class);
tuple2DataStream.print();
// 5.启动任务
env.execute("Flink-SQL-CDC");
}
}