开源湖仓框架LakeSoul支持Flink CDC整库同步

首先,附上Github连接

LakeSoul:https://github.com/lakesoul-io/LakeSoul

一、导语

LakeSoul 是由数元灵科技研发的云原生湖仓一体框架,具备高可扩展的元数据管理、ACID 事务、高效灵活的 upsert 操作、Schema 演进和批流一体化处理等特性。LakeSoul Flink CDC Sink 支持从 MySQL 数据源整库同步到 LakeSoul,能够支持自动建表、自动 Schema 变更、Exactly Once 语义等。

本篇文章将介绍Flink CDC的功能优势以及LakeSoul Flink CDC整库同步的使用教程,并完整地演示:将一个 MySQL 库整库同步到 LakeSoul 中,涵盖自动建表、DDL 变更等操作。

二、Flink CDC

1.CDC简介

CDC 是变更数据捕获(Change Data Capture)的简称,它可以监测并捕获源数据库的增量变动记录,同步到一个或多个数据目的(Sink)。可捕获的数据库变动包括数据或数据表的插入(INSERT)、更新(UPDATE)、删除(DELETE)等等,将这些变更按发生的顺序完整记录下来并写入到消息中间件中。

LakeSoul 提供了一套独立的 CDC 语义表达规范,通过表属性设置一个 CDC Op 列,即可表示每条数据的操作类型,在后续 Merge 时会自动根据操作语义进行合并。可以通过 Debezium、Canal、Flink 等将 CDC 数据转换后导入 LakeSoul。

2.为什么使用Flink CDC

最初LakeSoul框架没有使用Flink CDC,而是通过 Debezium将 CDC 数据转换后导入 LakeSoul:

整体上可以分为一下几个流程:

1. 对接 Mysql 和 kafka

2. 创建 Debezium CDC 同步任务

3. 使用 Spark Streaming,消费 Kafka 数据并同步更新至 LakeSoul

可以看出通过Debezium将 CDC 数据转换后导入LakeSoul整体的处理链路较长,需要用到的组件也比较多。Debezium是通过Kafka Streams 实现的 CDC 功能,而LakeSoul现在使用的是Flink CDC 模块,可以跳过 Debezium 和 Kafka 的中转,使用flink-cdc-connectors对上游数据源的变动进行直接的订阅处理。

Flink社区开发的flink-cdc-connectors 组件是一个可以直接从 MySQL、PostgreSQL 等数据库直接读取全量数据和增量变更数据的 source 组件。flink-cdc-connectors可以捕获在一个或多个表中发生的所有变更。该模式通常有一个前记录和一个后记录。flink-cdc-connectors可以直接在Flink中以非约束模式使用,而不需要使用类似kafka的中间件来转数据。

Flink CDC可以直接从数据库获取到binlog供下游进行业务计算分析,从内部实现上讲,flink-cdc-connectors组件内置了一套Debezium和Kafka组件,但这个细节对用户屏蔽。

可以看到通过Flink将 CDC 数据转换后导入 LakeSoul如下图所示,数据不再通过kafka进行同步,简化了整体架构:

LakeSoul Flink 作业启动后初始化阶段,首先会读取配置的 MySQL DB 中的所有表(排除掉不需要同步的表)。对每一个表,首先判断在 LakeSoul 中是否存在,如果不存在则自动创建一个 LakeSoul 表,其 Schema 与 MySQL 对应表一致。完成初始化后,会读取所有表的 CDC Stream,以 Upsert 的方式写入到对应的各个 LakeSoul 表中。在同步期间如果 MySQL 表发生 DDL Schema 变更,则该变更也会同样应用到对应的 LakeSoul 表中。下面将完整地讲解LakeSoul Flink CDC整库同步的使用教程。

三、LakeSoul Flink CDC整库同步

1. 准备环境

1.1 启动一个本地MySQL数据库

推荐使用 MySQL Docker 镜像来快速启动一个 MySQL 数据库实例:

cd docker/lakesoul-docker-compose-env/

docker run--name lakesoul-test-mysql -e MYSQL_ROOT_PASSWORD=root -e MYSQL_DATABASE=test_cdc -p 3306:3306 -d mysql:8

1.2 配置 LakeSoul 元数据库以及 Spark 环境

1.2.1 启动一个 PostgreSQL 数据库并进行初始化

可以通过docker使用下面命令快速搭建一个pg数据库:

docker run -d --name lakesoul-test-pg -p 5432:5432 -e POSTGRES_USER=lakesoul_test -e POSTGRES_PASSWORD=lakesoul_test -e POSTGRES_DB=lakesoul_test -d swr.cn-north-4.myhuaweicloud.com/dmetasoul-repo/postgres:14.5

进行PG数据库初始化,进入docker容器中,将meta_init.sql拷贝到容器,可通过docker ps命令查看容器id:

docker exec -it 容器id /bin/bash

docker cp script/meta_init.sql 容器id:script/meta_init.sql

在docker容器中执行:

PGPASSWORD=lakesoul_test psql -h localhost -p 5432 -U lakesoul_test -f script/meta_init.sql

1.2.2 安装 Spark 环境

由于 Apache Spark 官方的下载安装包不包含 hadoop-cloud 以及 AWS S3 等依赖,我们提供了一个 Spark 安装包,其中包含了 hadoop cloud 、s3 等必要的依赖:https://dmetasoul-bucket.obs.cn-southwest-2.myhuaweicloud.com/releases/spark/spark-3.3.2-bin-lakesoul-8e167b33.tgz

wget https://dmetasoul-bucket.obs.cn-southwest-2.myhuaweicloud.com/releases/spark/spark-3.3.2-bin-hadoop-3.3.5.tgz

tar xf spark-3.3.2-bin-hadoop-3.3.5.tgz

export SPARK_HOME=${PWD}/spark-3.3.2-bin-dmetasoul

如果是生产部署,推荐下载不打包 Hadoop 的 Spark 安装包:

https://dlcdn.apache.org/spark/spark-3.3.2/spark-3.3.2-bin-without-hadoop.tgz

并参考 https://spark.apache.org/docs/latest/hadoop-provided.html这篇文档使用集群环境中的 Hadoop 依赖和配置。

LakeSoul 发布 jar 包可以从 GitHub Releases 页面下载:https://github.com/meta-soul/LakeSoul/releases

wget https://github.com/meta-soul/LakeSoul/releases/download/v2.2.0/lakesoul-spark-2.2.0-spark-3.3.jar -P $SPARK_HOME/jars

下载后请将 jar 包放到 Spark 安装目录下的 jars 目录中

如果访问 Github 有问题,也可以从如下链接下载:https://dmetasoul-bucket.obs.cn-southwest-2.myhuaweicloud.com/releases/lakesoul/lakesoul-spark-2.2.0-spark-3.3.jar

从 2.1.0 版本起,LakeSoul 自身的依赖已经通过 shade 方式打包到一个 jar 包中。之前的版本是多个 jar 包以 tar.gz 压缩包的形式发布。

1.2.3 为 LakeSoul 增加 PG 数据库配置

默认情况下,pg数据库连接到本地数据库,配置信息如下:

lakesoul.pg.driver=com.lakesoul.shaded.org.postgresql.Driver

lakesoul.pg.url=jdbc:postgresql://127.0.0.1:5432/lakesoul_test?stringtype=unspecified

lakesoul.pg.username=lakesoul_test

lakesoul.pg.password=lakesoul_test

自定义 PG 数据库配置信息,需要在程序启动前增加一个环境变量 lakesoul_home,将配置文件信息引入进来。假如 PG 数据库配置信息文件路径名为:/opt/soft/pg.property,则在程序启动前需要添加这个环境变量:

export lakesoul_home=/opt/soft/pg.property

用户可以在这里自定义数据库配置信息,这样用户自定义 PG DB 的配置信息就会在 Spark 作业中生效。

1.2.4 启动Spark环境

启动一个 spark-sql SQL 交互式查询命令行环境:

$SPARK_HOME/bin/spark-sql --conf spark.sql.extensions=com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension --conf spark.sql.catalog.lakesoul=org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog --conf spark.sql.defaultCatalog=lakesoul --conf spark.sql.warehouse.dir=/tmp/lakesoul --conf spark.dmetasoul.lakesoul.snapshot.cache.expire.seconds=10

这里启动 Spark 本地任务,增加了两个选项:

1. spark.sql.warehouse.dir=/tmp/lakesoul 设置这个参数是因为 Spark SQL 中默认表保存位置,需要和 Flink 作业产出目录设置为同一个目录。

2. spark.dmetasoul.lakesoul.snapshot.cache.expire.seconds=10 设置这个参数是因为 LakeSoul 在 Spark 中缓存了元数据信息,设置一个较小的缓存过期时间方便查询到最新的数据。

启动 Spark SQL 命令行后,可以执行:

SHOW DATABASES;

SHOW TABLES IN default;

可以看到 LakeSoul 中目前只有一个 default database,其中也没有表。

1.3 预先在 MySQL 中创建一张表并写入数据

1. 安装mycli

pip install mycli

2. 启动 mycli 并连接 MySQL 数据库

mycli mysql://root@localhost:3306/test_cdc -p root

3. 创建表并写入数据

CREATE TABLE mysql_test_1 (id INT PRIMARY KEY, name VARCHAR(255), type SMALLINT);

INSERT INTO mysql_test_1 VALUES (1, 'Bob', 10);

SELECT * FROM mysql_test_1;

2. 启动同步作业

2.1 启动一个本地的 Flink Cluster

可以从 Flink 下载页面下载 Flink 1.14.5,也可以从我们的国内镜像地址下载(与Apache官网完全相同)。

解压下载的 Flink 安装包:

tar xf flink-1.14.5-bin-scala_2.12.tgz

export FLINK_HOME=${PWD}/flink-1.14.5

然后启动一个本地的 Flink Cluster:

$FLINK_HOME/bin/start-cluster.sh

可以打开 http://localhost:8081 查看 Flink 本地 cluster 是否已经正常启动:

2.2 提交 LakeSoul Flink CDC Sink 作业

向上面启动的 Flink cluster 提交一个 LakeSoul Flink CDC Sink 作业:

./bin/flink run-ys1-yjm1G-ytm2G \

-corg.apache.flink.lakesoul.entry.MysqlCdc \

  lakesoul-flink-2.2.0-flink-1.14.jar \

--source_db.host localhost \

--source_db.port3306\

--source_db.db_name test_cdc \

--source_db.user root \

--source_db.password root \

--source.parallelism1\

--sink.parallelism1\

--warehouse_pathfile:/tmp/lakesoul \

--flink.checkpoint file:/tmp/flink/chk \

--flink.savepoint file:/tmp/flink/svp \

--job.checkpoint_interval10000\

--server_time_zoneUTC

其中 lakesoul-flink 的 jar 包可以从 Github Release 页面下载。如果访问 Github 有问题,也可以通过这个链接下载:https://dmetasoul-bucket.obs.cn-southwest-2.myhuaweicloud.com/releases/lakesoul/lakesoul-flink-2.2.0-flink-1.14.jar

http://localhost:8081 Flink 作业页面中,点击 Running Job,进入查看 LakeSoul 作业是否已经处于 Running 状态。

可以点击进入作业页面,此时应该可以看到已经同步了一条数据:

2.3 使用 Spark SQL 读取 LakeSoul 表中已经同步的数据

在 Spark SQL Shell 中执行:

SHOW DATABASES;

SHOW TABLES IN test_cdc;

DESC test_cdc.mysql_test_1;

SELECT * FROM test_cdc.mysql_test_1;

可以看到每条语句的运行结果,即 LakeSoul 中自动新建了一个test_cdcdatabase,其中自动新建了一张mysql_test_1表,表的字段、主键与 MySQL 相同(多了一个 rowKinds 列)

LakeSoul 使用一个额外的操作列(列名可以自定义)来记录 CDC 的操作类型,可以支持从 Debezium, canalFlink CDC 中导入 CDC 数据。

创建 LakeSoul CDC 表,需要添加一个表属性 lakesoul_cdc_change_column 来配置 CDC 变更类型的列名。这一列需要是 string 类型,包含三种取值之一: update, insert, delete.

在 LakeSoul 读数据自动合并时,会保留最新的 insert、update 数据,并自动过滤掉 delete 的行。

2.4 MySQL 中执行 Update 后观察同步情况

在 mycli 中执行更新:

UPDATE mysql_test_1 SET name='Peter' WHERE id=1;

然后在 LakeSoul 中再次读取:

SELECT * FROM test_cdc.mysql_test_1;

可以看到已经读到了最新的数据:

2.5 MySQL 中执行 DDL 后观察同步情况,并读取新、旧数据

在 mycli 中修改表的结构:

ALTER TABLE mysql_test_1 ADD COLUMN new_col FLOAT;

即在最后新增一列,默认为 null。在 mycli 中验证执行结果:

此时,LakeSoul 中已经同步了表结构,我们可以在 spark-sql 中查看表结构:

DESC test_cdc.mysql_test_1;

这时,从 LakeSoul 中读取数据,新增列同样为 null:

SELECT * FROM test_cdc.mysql_test_1;

向 MySQL 中新插入一条数据:

INSERT INTO mysql_test_1 VALUES (2,'Alice',20,9.9);

从 LakeSoul 中再次读取:

从 MySQL 中删除一条数据:

delete from mysql_test_1 where id=1;

从 LakeSoul 中读取:

可以看到 LakeSoul 每次都读取到了同步后的结果,与 MySQL 中完全一致。

2.6 MySQL 中新建表后观察同步情况

在 MySQL 中新建一张表,schema 与之前表不同:

CREATE TABLE mysql_test_2 (name VARCHAR(100) PRIMARY KEY, phone_no VARCHAR(20));

在 LakeSoul 可以看到新表已经自动创建,可以查看表结构:

往 MySQL 新表中插入一条数据:

INSERT INTO mysql_test_2 VALUES ('Bob', '10010');

LakeSoul 中也成功同步并读取到新表的数据:

四、结束语

LakeSoul Flink CDC 支持从 MySQL 数据源整库同步到 LakeSoul;支持 Schema 变更(DDL)自动同步到 LakeSoul;支持运行过程中上游数据库中新建表自动感知,在 LakeSoul 中自动建表;支持严格一次(Exactly Once)语义,即使 Flink 作业发生 Failover,能够保证数据不丢不重。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,100评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,308评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,718评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,275评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,376评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,454评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,464评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,248评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,686评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,974评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,150评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,817评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,484评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,140评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,374评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,012评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,041评论 2 351

推荐阅读更多精彩内容