详解Flink-CDC

CDC介绍

CDC 是 Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。

CDC种类

1.基于查询的CDC

例如:Sqoop、Kafka JDBC source等产品。
特点:基于批处理,不能捕获到所有数据的变化、高延迟、需要查询数据库,会增加数据库压力

2.基于binlog的CDC

例如:Maxwell、Canal、Debezium
特点:基于streaming模式、能捕捉所有数据的变化、低延迟、不会增加数据库压力。

Flink-CDC

Flink 社区开发了 flink-cdc-connectors 组件,这是一个可以直接从 MySQL、PostgreSQL
等数据库直接读取全量数据和增量变更数据的 source 组件。目前也已开源。
开源地址:https://github.com/ververica/flink-cdc-connectors

代码实现:

注:flink1.12不支持sql模式,支持stream模式,flink1.13支持

<dependencies>
 <dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-java</artifactId>
 <version>1.12.0</version>
 </dependency>
 <dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-streaming-java_2.12</artifactId>
 <version>1.12.0</version>
 </dependency>
 <dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-clients_2.12</artifactId>
 <version>1.12.0</version>
 </dependency>
 <dependency>
 <groupId>org.apache.hadoop</groupId>
 <artifactId>hadoop-client</artifactId>
 <version>3.1.3</version>
 </dependency>
 <dependency>
 <groupId>mysql</groupId>
 <artifactId>mysql-connector-java</artifactId>
 <version>5.1.49</version>
</dependency>
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-table-planner-blink_2.12</artifactId>
 <version>1.12.0</version>
</dependency>
<dependency>
 <groupId>com.ververica</groupId>
 <artifactId>flink-connector-mysql-cdc</artifactId>
 <version>2.0.0</version>
</dependency>
 <dependency>
 <groupId>com.alibaba</groupId>
 <artifactId>fastjson</artifactId>
 <version>1.2.75</version>
 </dependency>
</dependencies>
<build>
 <plugins>
 <plugin>
 <groupId>org.apache.maven.plugins</groupId>
 <artifactId>maven-assembly-plugin</artifactId>
 <version>3.0.0</version>
 <configuration>
 <descriptorRefs>
 <descriptorRef>jar-with-dependencies</descriptorRef>
 </descriptorRefs>
 </configuration>
 <executions>
 <execution>
 <id>make-assembly</id>
 <phase>package</phase>
 <goals>
 <goal>single</goal>
 </goals>
 </execution>
 </executions>
 </plugin>
 </plugins>
</build>

Flink stream模式

public class FlinkCDC {
 public static void main(String[] args) throws Exception {
 //1.创建执行环境
 StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 env.setParallelism(1);
 //2.Flink-CDC 将读取 binlog 的位置信息以状态的方式保存在 CK,如果想要做到断点
续传,需要从 Checkpoint 或者 Savepoint 启动程序
 //2.1 开启 Checkpoint,每隔 5 秒钟做一次 CK
 env.enableCheckpointing(5000L);
 //2.2 指定 CK 的一致性语义
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
 //2.3 设置任务关闭的时候保留最后一次 CK 数据
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckp
ointCleanup.RETAIN_ON_CANCELLATION);
 //2.4 指定从 CK 自动重启策略
 env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
 //2.5 设置状态后端
 env.setStateBackend(new FsStateBackend("hdfs://hadoop-101:8020/flinkCDC"));
 //2.6 设置访问 HDFS 的用户名
 System.setProperty("HADOOP_USER_NAME", "hadoop");

 //3.创建 Flink-MySQL-CDC 的 Source
 //initial (default): Performs an initial snapshot on the monitored database tables upon first startup, and continue to read the latest binlog.
 //latest-offset: Never to perform snapshot on the monitored database tables upon first startup, just read from the end of the binlog which means only have the changes since the connector was started.
 //timestamp: Never to perform snapshot on the monitored database tables upon first startup, and directly read binlog from the specified timestamp. The consumer will traverse the binlog from the beginning and ignore change events whose timestamp is smaller than the specified timestamp.
 //specific-offset: Never to perform snapshot on the monitored database tables upon first startup, and directly read binlog from the specified offset.
 
 DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder()
 .hostname("hadoop102")
 .port(3306)
 .username("root")
 .password("root")
 .databaseList("flink_test")
 .tableList("flink.user_info") //可选配置项,如果不指定该参数,则会读取上一个配置下的所有表的数据,注意:指定的时候需要使用"db.table"的方式
 .startupOptions(StartupOptions.initial())
 .deserializer(new StringDebeziumDeserializationSchema())
 .build();
 //4.使用 CDC Source 从 MySQL 读取数据
 DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);
 //5.打印数据
 mysqlDS.print();
 //6.执行任务
 env.execute();
 } }

FlinkSQL 方式

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class FlinkSQL_CDC {
 public static void main(String[] args) throws Exception {
 StreamExecutionEnvironment env = 
 StreamExecutionEnvironment.getExecutionEnvironment();
 env.setParallelism(1);
 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
 tableEnv.executeSql("CREATE TABLE user_info (" +
 " id INT," +
 " name STRING," +
 " phone_num STRING" +
 ") WITH (" +
 " 'connector' = 'mysql-cdc'," +
 " 'hostname' = 'hadoop-101'," +
 " 'port' = '3306'," +
 " 'username' = 'root'," +
 " 'password' = 'root'," +
 " 'database-name' = 'flink_test'," +
 " 'table-name' = 'user_info'" +
 ")");
 tableEnv.executeSql("select * from user_info").print();
 env.execute();
 } }

自定义反序列化器

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.Properties;
public class Flink_CDCWithCustomerSchema {
 public static void main(String[] args) throws Exception {
 //1.创建执行环境
 StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 env.setParallelism(1);
 //2.创建 Flink-MySQL-CDC 的 Source
 DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder()
 .hostname("hadoop102")
 .port(3306)
 .username("root")
 .password("000000")
 .databaseList("gmall-flink")
 .tableList("gmall-flink.z_user_info") //可选配置项,如果不指定该参数,则会读取上一个配置下的所有表的数据,注意:指定的时候需要使用"db.table"的方式
.startupOptions(StartupOptions.initial())
 .deserializer(new DebeziumDeserializationSchema<String>() { //自定义数
据解析器
 @Override
 public void deserialize(SourceRecord sourceRecord, Collector<String> 
collector) throws Exception {
 //获取主题信息,包含着数据库和表名 
mysql_binlog_source.gmall-flink.z_user_info
 String topic = sourceRecord.topic();
 String[] arr = topic.split("\\.");
 String db = arr[1];
 String tableName = arr[2];
 //获取操作类型 READ DELETE UPDATE CREATE
 Envelope.Operation operation = 
Envelope.operationFor(sourceRecord);
 //获取值信息并转换为 Struct 类型
 Struct value = (Struct) sourceRecord.value();
 //获取变化后的数据
 Struct after = value.getStruct("after");
 //创建 JSON 对象用于存储数据信息
 JSONObject data = new JSONObject();
 for (Field field : after.schema().fields()) {
 Object o = after.get(field);
 data.put(field.name(), o);
 }
 //创建 JSON 对象用于封装最终返回值数据信息
 JSONObject result = new JSONObject();
 result.put("operation", operation.toString().toLowerCase());
 result.put("data", data);
 result.put("database", db);
 result.put("table", tableName);
 //发送数据至下游
 collector.collect(result.toJSONString());
 }
 @Override
 public TypeInformation<String> getProducedType() {
 return TypeInformation.of(String.class);
 }
 })
 .build();
 //3.使用 CDC Source 从 MySQL 读取数据
 DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);
 //4.打印数据
 mysqlDS.print();
 //5.执行任务
 env.execute();
 } }

Flink CDC2.0

Flink-CDC1.0存在的问题

1.一致性通过加锁保证

Debesium在保证数据一致性时,需要对读取的库或者表加锁。

2.不支持水平拓展

Flink-CDC只支持单并发,全量读取数据阶段,如果表数据据量级大,读取效率在小时级别。

3.全量读取阶段不支持ckeckpoint

解决办法

在对于有主键的表做初始化模式,整体的流程主要分为 5 个阶段:
1.Chunk 切分;
2.Chunk 分配;(实现并行读取数据&CheckPoint)
3.Chunk 读取;(实现无锁读取)
4.Chunk 汇报;
5.Chunk 分配。

Chunk 切分

根据 Netflix DBlog 的论文中的无锁算法原理,对于目标表按照主键进行数据分片,设置每个切片的区间为左闭右开或者左开右闭来保证数据的连续性。

Chunk 分配

将划分好的 Chunk 分发给多个 SourceReader,每个 SourceReader 读取表中的一部分数据,实现了并行读取的目标。
同时在每个 Chunk 读取的时候可以单独做 CheckPoint,某个 Chunk 读取失败只需要单独执行该 Chunk 的任务,而不需要像 1.x 中失败了只能从头读取。
若每个 SourceReader 保证了数据一致性,则全表就保证了数据一致性。

Chunk 读取

读取可以分为 5 个阶段
1)SourceReader 读取表数据之前先记录当前的 Binlog 位置信息记为低位点;
2)SourceReader 将自身区间内的数据查询出来并放置在 buffer 中;
3)查询完成之后记录当前的 Binlog 位置信息记为高位点;
4)在增量部分消费从低位点到高位点的 Binlog; 5)根据主键,对 buffer 中的数据进行修正并输出。
通过以上5个阶段可以保证每个Chunk最终的输出就是在高位点时该Chunk中最新的数据,但是目前只是做到了保证单个 Chunk 中的数据一致性。

Chunk 汇报

在 Snapshot Chunk 读取完成之后,有一个汇报的流程,如上图所示,即 SourceReader 需要将 Snapshot Chunk 完成信息汇报给 SourceEnumerator。

Chunk 分配

FlinkCDC 是支持全量+增量数据同步的,在 SourceEnumerator 接收到所有的 Snapshot。Chunk 完成信息之后,还有一个消费增量数据(Binlog)的任务,此时是通过下发 Binlog Chunk给任意一个 SourceReader 进行单并发读取来实现的。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,099评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,828评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,540评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,848评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,971评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,132评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,193评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,934评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,376评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,687评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,846评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,537评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,175评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,887评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,134评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,674评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,741评论 2 351

推荐阅读更多精彩内容