Flink 使用之 MySQL CDC

Flink 使用介绍相关文档目录

Flink 使用介绍相关文档目录

CDC 简介

CDC即Change Data Capture 变更数据捕获,为Flink 1.11中一个新增功能。我们可以通过CDC得知数据源表的更新内容(包含Insert Update和Delete),并将这些更新内容作为数据流发送到下游系统。捕获到的数据操作具有一个标识符,分别对应数据的增加,修改和删除。

  • +I:新增数据。
  • -U:一条数据的修改会产生两个U标识符数据。其中-U含义为修改前数据。
  • +U:修改之后的数据。
  • -D:删除的数据。

MySQL 启用binlog

接下来以MySQL CDC为例,和大家一起配置Flink MySQL CDC。

在使用CDC之前务必要开启MySQL的binlog。下面以MySQL 5.7版本为例说明。

修改my.cnf文件,在[mysqld]一节增加:

server_id=1
log_bin=mysql-bin
binlog_format=ROW
expire_logs_days=30
binlog_do_db=db_a
binlog_do_db=db_b

配置项的解释如下:

  • server_id:MySQL5.7及以上版本开启binlog必须要配置这个选项。对于MySQL集群,不同节点的server_id必须不同。对于单实例部署则没有要求。
  • log_bin:指定binlog文件名和储存位置。如果不指定路径,默认位置为/var/lib/mysql/
  • binlog_format:binlog格式。有3个值可以选择:ROW:记录哪条数据被修改和修改之后的数据,会产生大量日志。STATEMENT:记录修改数据的SQL,日志量较小。MIXED:混合使用上述两个模式。CDC要求必须配置为ROW。
  • expire_logs_days:bin_log过期时间,超过该时间的log会自动删除。
  • binlog_do_db:binlog记录哪些数据库。如果需要配置多个库,如例子中配置多项。切勿使用逗号分隔。

配置文件修改完毕后保存并重启MySQL。然后进入MySQL命令行,验证是否已启用binlog:

mysql> show variables like '%bin%';
+--------------------------------------------+--------------------------------+
| Variable_name                              | Value                          |
+--------------------------------------------+--------------------------------+
| bind_address                               | *                              |
| binlog_cache_size                          | 32768                          |
| binlog_checksum                            | CRC32                          |
| binlog_direct_non_transactional_updates    | OFF                            |
| binlog_error_action                        | ABORT_SERVER                   |
| binlog_format                              | ROW                            |
| binlog_group_commit_sync_delay             | 0                              |
| binlog_group_commit_sync_no_delay_count    | 0                              |
| binlog_gtid_simple_recovery                | ON                             |
| binlog_max_flush_queue_time                | 0                              |
| binlog_order_commits                       | ON                             |
| binlog_row_image                           | FULL                           |
| binlog_rows_query_log_events               | OFF                            |
| binlog_stmt_cache_size                     | 32768                          |
| binlog_transaction_dependency_history_size | 25000                          |
| binlog_transaction_dependency_tracking     | COMMIT_ORDER                   |
| innodb_api_enable_binlog                   | OFF                            |
| innodb_locks_unsafe_for_binlog             | OFF                            |
| log_bin                                    | ON                             |
| log_bin_basename                           | /var/lib/mysql/mysql-bin       |
| log_bin_index                              | /var/lib/mysql/mysql-bin.index |
| log_bin_trust_function_creators            | OFF                            |
| log_bin_use_v1_row_events                  | OFF                            |
| log_statements_unsafe_for_binlog           | ON                             |
| max_binlog_cache_size                      | 18446744073709547520           |
| max_binlog_size                            | 1073741824                     |
| max_binlog_stmt_cache_size                 | 18446744073709547520           |
| sql_log_bin                                | ON                             |
| sync_binlog                                | 1                              |
+--------------------------------------------+--------------------------------+
29 rows in set (0.00 sec)

发现log_bin的值为ON。binlog配置已生效。

初始化MySQL 源数据表

到这里MySQL环境已经配置完毕。接下来开始准备测试表和数据。

create database demo character set utf8mb4;
use demo;

create table student(`id` int primary key, `name` varchar(128), `age` int);

这里创建了演示数据库demo和一张student表。

使用Java代码读取CDC数据流

到这一步我们开始使用Flink程序来获取CDC数据流。

使用传统MySQL 数据源方式

首先需要引入Flink Connector MySQL CDC依赖。

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>1.3.0</version>
</dependency>

然后使用Table API编写程序。这里我们仅仅将CDC数据流配置为数据源,然后将CDC数据流的内容打印出来。

val env = StreamExecutionEnvironment.getExecutionEnvironment

// 使用MySQLSource创建数据源
// 同时指定StringDebeziumDeserializationSchema,将CDC转换为String类型输出
val sourceFunction = MySQLSource.builder().hostname("your-ip").port(3306)
    .databaseList("demo").username("root").password("123456")
    .deserializer(new StringDebeziumDeserializationSchema).build();

// 单并行度打印,避免输出乱序
env.addSource(sourceFunction).print.setParallelism(1)

env.execute()

此时我们插入一条数据:

insert into student values(2, 'kate', 28);

可以看到程序有如下输出:

SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1618390979, file=mysql-bin.000003, pos=885, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.demo.student', kafkaPartition=null, key=Struct{id=2}, keySchema=Schema{mysql_binlog_source.demo.student.Key:STRUCT}, value=Struct{after=Struct{id=2,name=kate,age=28},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1618390979000,db=demo,table=student,server_id=1,file=mysql-bin.000003,pos=1011,row=0,thread=2},op=c,ts_ms=1618391175254}, valueSchema=Schema{mysql_binlog_source.demo.student.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}

使用SQL

接下来我们使用更为简洁的SQL方式。

首先引入Flink SQL必须的依赖。需要注意的是,这里使用blink planner。本例子中使用Scala语言编写,所以引入了Scala相关依赖。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>1.3.0</version>
</dependency>

编写如下所示的程序代码:

// 创建Blink Streaming的TableEnvironment
val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tableEnvironment = TableEnvironment.create(bsSettings)

// 创建表,connector使用mysql-cdc
tableEnvironment.executeSql("CREATE TABLE mysql_binlog (id INT NOT NULL, name STRING, age INT) WITH ('connector' = 'mysql-cdc', 'hostname' = '10.180.210.135', 'port' = '3306', 'username' = 'root', 'password' = '123456', 'database-name' = 'demo', 'table-name' = 'student')")

// 创建下游数据表,这里使用print类型的connector,将数据直接打印出来
tableEnvironment.executeSql("CREATE TABLE sink_table (id INT NOT NULL, name STRING, age INT) WITH ('connector' = 'print')")

// 将CDC数据源和下游数据表对接起来
tableEnvironment.executeSql("INSERT INTO sink_table SELECT id, name, age FROM mysql_binlog")

接下来可以执行insert语句插入数据,控制台会打印出数据的变化。

例如我们依次执行:

insert into student values(1,'paul',20);
update student set age=30 where id=1;
delete from student where id=1;

在控制台可以得到如下输出:

+I(1,paul,20)
-U(1,paul,20)
+U(1,paul,30)
-D(1,paul,30)

使用SQL Client读取CDC

相比较创建一个Java项目以jar包的方式创建作业,Fllink提供了一个更为简单的方式:使用 SQL Client。接下来我们开始配置SQL Client环境。

配置Flink环境

在Flink SQL Client使用CDC功能之前,我们需要将相关依赖放入Flink目录。

访问https://mvnrepository.com/artifact/com.alibaba.ververica/flink-connector-mysql-cdc/,下载flink-connector-mysql-cdcjar包,复制到flink安装位置的lib目录中。

启动Flink SQL Client

这里SQL Client在standalone集群上运行。

官网配置方式链接:https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sqlclient/#getting-started,简单来说是执行Flink安装目录如下两个命令:

./bin/start-cluster.sh
./bin/sql-client.sh embedded

如果没有问题,此时可以进入SQL Client。

执行如下SQL(和上一章"使用SQL"使用的语句相同):

CREATE TABLE mysql_binlog (
 id INT NOT NULL,
 name STRING,
 age INT
) WITH (
 'connector' = 'mysql-cdc',
 'hostname' = 'localhost',
 'port' = '3306',
 'username' = 'root',
 'password' = '123456',
 'database-name' = 'demo',
 'table-name' = 'student'
);

CREATE TABLE sink_table (
 id INT NOT NULL,
 name STRING,
 age INT
) WITH (
    'connector' = 'print'
);

INSERT INTO sink_table SELECT id, name, age FROM mysql_binlog;

然后在MySQL命令行执行些insert语句插入数据。需要注意的是sink_table的输出是无法在SQL client上面查看的。需要打开Flink Web UI的Task Managers页面的stdout标签。可以找到类似如下输出:

+I(1,paul,20)
-U(1,paul,20)
+U(1,paul,30)
-D(1,paul,30)

Flink 已经成功捕获到MySQL的数据变更。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,362评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,330评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,247评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,560评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,580评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,569评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,929评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,587评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,840评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,596评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,678评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,366评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,945评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,929评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,165评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,271评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,403评论 2 342

推荐阅读更多精彩内容