首先,附上Github连接
LakeSoul:https://github.com/lakesoul-io/LakeSoul
一、导语
LakeSoul 是由数元灵科技研发的云原生湖仓一体框架,具备高可扩展的元数据管理、ACID 事务、高效灵活的 upsert 操作、Schema 演进和批流一体化处理等特性。LakeSoul Flink CDC Sink 支持从 MySQL 数据源整库同步到 LakeSoul,能够支持自动建表、自动 Schema 变更、Exactly Once 语义等。
本篇文章将介绍Flink CDC的功能优势以及LakeSoul Flink CDC整库同步的使用教程,并完整地演示:将一个 MySQL 库整库同步到 LakeSoul 中,涵盖自动建表、DDL 变更等操作。
二、Flink CDC
1.CDC简介
CDC 是变更数据捕获(Change Data Capture)的简称,它可以监测并捕获源数据库的增量变动记录,同步到一个或多个数据目的(Sink)。可捕获的数据库变动包括数据或数据表的插入(INSERT)、更新(UPDATE)、删除(DELETE)等等,将这些变更按发生的顺序完整记录下来并写入到消息中间件中。
LakeSoul 提供了一套独立的 CDC 语义表达规范,通过表属性设置一个 CDC Op 列,即可表示每条数据的操作类型,在后续 Merge 时会自动根据操作语义进行合并。可以通过 Debezium、Canal、Flink 等将 CDC 数据转换后导入 LakeSoul。
2.为什么使用Flink CDC
最初LakeSoul框架没有使用Flink CDC,而是通过 Debezium将 CDC 数据转换后导入 LakeSoul:
整体上可以分为一下几个流程:
1. 对接 Mysql 和 kafka
2. 创建 Debezium CDC 同步任务
3. 使用 Spark Streaming,消费 Kafka 数据并同步更新至 LakeSoul
可以看出通过Debezium将 CDC 数据转换后导入LakeSoul整体的处理链路较长,需要用到的组件也比较多。Debezium是通过Kafka Streams 实现的 CDC 功能,而LakeSoul现在使用的是Flink CDC 模块,可以跳过 Debezium 和 Kafka 的中转,使用flink-cdc-connectors对上游数据源的变动进行直接的订阅处理。
Flink社区开发的flink-cdc-connectors 组件是一个可以直接从 MySQL、PostgreSQL 等数据库直接读取全量数据和增量变更数据的 source 组件。flink-cdc-connectors可以捕获在一个或多个表中发生的所有变更。该模式通常有一个前记录和一个后记录。flink-cdc-connectors可以直接在Flink中以非约束模式使用,而不需要使用类似kafka的中间件来转数据。
Flink CDC可以直接从数据库获取到binlog供下游进行业务计算分析,从内部实现上讲,flink-cdc-connectors组件内置了一套Debezium和Kafka组件,但这个细节对用户屏蔽。
可以看到通过Flink将 CDC 数据转换后导入 LakeSoul如下图所示,数据不再通过kafka进行同步,简化了整体架构:
LakeSoul Flink 作业启动后初始化阶段,首先会读取配置的 MySQL DB 中的所有表(排除掉不需要同步的表)。对每一个表,首先判断在 LakeSoul 中是否存在,如果不存在则自动创建一个 LakeSoul 表,其 Schema 与 MySQL 对应表一致。完成初始化后,会读取所有表的 CDC Stream,以 Upsert 的方式写入到对应的各个 LakeSoul 表中。在同步期间如果 MySQL 表发生 DDL Schema 变更,则该变更也会同样应用到对应的 LakeSoul 表中。下面将完整地讲解LakeSoul Flink CDC整库同步的使用教程。
三、LakeSoul Flink CDC整库同步
1. 准备环境
1.1 启动一个本地MySQL数据库
推荐使用 MySQL Docker 镜像来快速启动一个 MySQL 数据库实例:
cd docker/lakesoul-docker-compose-env/
docker run--name lakesoul-test-mysql -e MYSQL_ROOT_PASSWORD=root -e MYSQL_DATABASE=test_cdc -p 3306:3306 -d mysql:8
1.2 配置 LakeSoul 元数据库以及 Spark 环境
1.2.1 启动一个 PostgreSQL 数据库并进行初始化
可以通过docker使用下面命令快速搭建一个pg数据库:
docker run -d --name lakesoul-test-pg -p 5432:5432 -e POSTGRES_USER=lakesoul_test -e POSTGRES_PASSWORD=lakesoul_test -e POSTGRES_DB=lakesoul_test -d swr.cn-north-4.myhuaweicloud.com/dmetasoul-repo/postgres:14.5
进行PG数据库初始化,进入docker容器中,将meta_init.sql拷贝到容器,可通过docker ps命令查看容器id:
docker exec -it 容器id /bin/bash
docker cp script/meta_init.sql 容器id:script/meta_init.sql
在docker容器中执行:
PGPASSWORD=lakesoul_test psql -h localhost -p 5432 -U lakesoul_test -f script/meta_init.sql
1.2.2 安装 Spark 环境
由于 Apache Spark 官方的下载安装包不包含 hadoop-cloud 以及 AWS S3 等依赖,我们提供了一个 Spark 安装包,其中包含了 hadoop cloud 、s3 等必要的依赖:https://dmetasoul-bucket.obs.cn-southwest-2.myhuaweicloud.com/releases/spark/spark-3.3.2-bin-lakesoul-8e167b33.tgz
wget https://dmetasoul-bucket.obs.cn-southwest-2.myhuaweicloud.com/releases/spark/spark-3.3.2-bin-hadoop-3.3.5.tgz
tar xf spark-3.3.2-bin-hadoop-3.3.5.tgz
export SPARK_HOME=${PWD}/spark-3.3.2-bin-dmetasoul
如果是生产部署,推荐下载不打包 Hadoop 的 Spark 安装包:
https://dlcdn.apache.org/spark/spark-3.3.2/spark-3.3.2-bin-without-hadoop.tgz
并参考 https://spark.apache.org/docs/latest/hadoop-provided.html这篇文档使用集群环境中的 Hadoop 依赖和配置。
LakeSoul 发布 jar 包可以从 GitHub Releases 页面下载:https://github.com/meta-soul/LakeSoul/releases。
wget https://github.com/meta-soul/LakeSoul/releases/download/v2.2.0/lakesoul-spark-2.2.0-spark-3.3.jar -P $SPARK_HOME/jars
下载后请将 jar 包放到 Spark 安装目录下的 jars 目录中
如果访问 Github 有问题,也可以从如下链接下载:https://dmetasoul-bucket.obs.cn-southwest-2.myhuaweicloud.com/releases/lakesoul/lakesoul-spark-2.2.0-spark-3.3.jar
从 2.1.0 版本起,LakeSoul 自身的依赖已经通过 shade 方式打包到一个 jar 包中。之前的版本是多个 jar 包以 tar.gz 压缩包的形式发布。
1.2.3 为 LakeSoul 增加 PG 数据库配置
默认情况下,pg数据库连接到本地数据库,配置信息如下:
lakesoul.pg.driver=com.lakesoul.shaded.org.postgresql.Driver
lakesoul.pg.url=jdbc:postgresql://127.0.0.1:5432/lakesoul_test?stringtype=unspecified
lakesoul.pg.username=lakesoul_test
lakesoul.pg.password=lakesoul_test
自定义 PG 数据库配置信息,需要在程序启动前增加一个环境变量 lakesoul_home,将配置文件信息引入进来。假如 PG 数据库配置信息文件路径名为:/opt/soft/pg.property,则在程序启动前需要添加这个环境变量:
export lakesoul_home=/opt/soft/pg.property
用户可以在这里自定义数据库配置信息,这样用户自定义 PG DB 的配置信息就会在 Spark 作业中生效。
1.2.4 启动Spark环境
启动一个 spark-sql SQL 交互式查询命令行环境:
$SPARK_HOME/bin/spark-sql --conf spark.sql.extensions=com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension --conf spark.sql.catalog.lakesoul=org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog --conf spark.sql.defaultCatalog=lakesoul --conf spark.sql.warehouse.dir=/tmp/lakesoul --conf spark.dmetasoul.lakesoul.snapshot.cache.expire.seconds=10
这里启动 Spark 本地任务,增加了两个选项:
1. spark.sql.warehouse.dir=/tmp/lakesoul 设置这个参数是因为 Spark SQL 中默认表保存位置,需要和 Flink 作业产出目录设置为同一个目录。
2. spark.dmetasoul.lakesoul.snapshot.cache.expire.seconds=10 设置这个参数是因为 LakeSoul 在 Spark 中缓存了元数据信息,设置一个较小的缓存过期时间方便查询到最新的数据。
启动 Spark SQL 命令行后,可以执行:
SHOW DATABASES;
SHOW TABLES IN default;
可以看到 LakeSoul 中目前只有一个 default database,其中也没有表。
1.3 预先在 MySQL 中创建一张表并写入数据
1. 安装mycli
pip install mycli
2. 启动 mycli 并连接 MySQL 数据库
mycli mysql://root@localhost:3306/test_cdc -p root
3. 创建表并写入数据
CREATE TABLE mysql_test_1 (id INT PRIMARY KEY, name VARCHAR(255), type SMALLINT);
INSERT INTO mysql_test_1 VALUES (1, 'Bob', 10);
SELECT * FROM mysql_test_1;
2. 启动同步作业
2.1 启动一个本地的 Flink Cluster
可以从 Flink 下载页面下载 Flink 1.14.5,也可以从我们的国内镜像地址下载(与Apache官网完全相同)。
解压下载的 Flink 安装包:
tar xf flink-1.14.5-bin-scala_2.12.tgz
export FLINK_HOME=${PWD}/flink-1.14.5
然后启动一个本地的 Flink Cluster:
$FLINK_HOME/bin/start-cluster.sh
可以打开 http://localhost:8081 查看 Flink 本地 cluster 是否已经正常启动:
2.2 提交 LakeSoul Flink CDC Sink 作业
向上面启动的 Flink cluster 提交一个 LakeSoul Flink CDC Sink 作业:
./bin/flink run-ys1-yjm1G-ytm2G \
-corg.apache.flink.lakesoul.entry.MysqlCdc \
lakesoul-flink-2.2.0-flink-1.14.jar \
--source_db.host localhost \
--source_db.port3306\
--source_db.db_name test_cdc \
--source_db.user root \
--source_db.password root \
--source.parallelism1\
--sink.parallelism1\
--warehouse_pathfile:/tmp/lakesoul \
--flink.checkpoint file:/tmp/flink/chk \
--flink.savepoint file:/tmp/flink/svp \
--job.checkpoint_interval10000\
--server_time_zoneUTC
其中 lakesoul-flink 的 jar 包可以从 Github Release 页面下载。如果访问 Github 有问题,也可以通过这个链接下载:https://dmetasoul-bucket.obs.cn-southwest-2.myhuaweicloud.com/releases/lakesoul/lakesoul-flink-2.2.0-flink-1.14.jar
在 http://localhost:8081 Flink 作业页面中,点击 Running Job,进入查看 LakeSoul 作业是否已经处于 Running 状态。
可以点击进入作业页面,此时应该可以看到已经同步了一条数据:
2.3 使用 Spark SQL 读取 LakeSoul 表中已经同步的数据
在 Spark SQL Shell 中执行:
SHOW DATABASES;
SHOW TABLES IN test_cdc;
DESC test_cdc.mysql_test_1;
SELECT * FROM test_cdc.mysql_test_1;
可以看到每条语句的运行结果,即 LakeSoul 中自动新建了一个test_cdcdatabase,其中自动新建了一张mysql_test_1表,表的字段、主键与 MySQL 相同(多了一个 rowKinds 列)
LakeSoul 使用一个额外的操作列(列名可以自定义)来记录 CDC 的操作类型,可以支持从 Debezium, canal 和 Flink CDC 中导入 CDC 数据。
创建 LakeSoul CDC 表,需要添加一个表属性 lakesoul_cdc_change_column 来配置 CDC 变更类型的列名。这一列需要是 string 类型,包含三种取值之一: update, insert, delete.
在 LakeSoul 读数据自动合并时,会保留最新的 insert、update 数据,并自动过滤掉 delete 的行。
2.4 MySQL 中执行 Update 后观察同步情况
在 mycli 中执行更新:
UPDATE mysql_test_1 SET name='Peter' WHERE id=1;
然后在 LakeSoul 中再次读取:
SELECT * FROM test_cdc.mysql_test_1;
可以看到已经读到了最新的数据:
2.5 MySQL 中执行 DDL 后观察同步情况,并读取新、旧数据
在 mycli 中修改表的结构:
ALTER TABLE mysql_test_1 ADD COLUMN new_col FLOAT;
即在最后新增一列,默认为 null。在 mycli 中验证执行结果:
此时,LakeSoul 中已经同步了表结构,我们可以在 spark-sql 中查看表结构:
DESC test_cdc.mysql_test_1;
这时,从 LakeSoul 中读取数据,新增列同样为 null:
SELECT * FROM test_cdc.mysql_test_1;
向 MySQL 中新插入一条数据:
INSERT INTO mysql_test_1 VALUES (2,'Alice',20,9.9);
从 LakeSoul 中再次读取:
从 MySQL 中删除一条数据:
delete from mysql_test_1 where id=1;
从 LakeSoul 中读取:
可以看到 LakeSoul 每次都读取到了同步后的结果,与 MySQL 中完全一致。
2.6 MySQL 中新建表后观察同步情况
在 MySQL 中新建一张表,schema 与之前表不同:
CREATE TABLE mysql_test_2 (name VARCHAR(100) PRIMARY KEY, phone_no VARCHAR(20));
在 LakeSoul 可以看到新表已经自动创建,可以查看表结构:
往 MySQL 新表中插入一条数据:
INSERT INTO mysql_test_2 VALUES ('Bob', '10010');
LakeSoul 中也成功同步并读取到新表的数据:
四、结束语
LakeSoul Flink CDC 支持从 MySQL 数据源整库同步到 LakeSoul;支持 Schema 变更(DDL)自动同步到 LakeSoul;支持运行过程中上游数据库中新建表自动感知,在 LakeSoul 中自动建表;支持严格一次(Exactly Once)语义,即使 Flink 作业发生 Failover,能够保证数据不丢不重。