搭建flinkCDC采集mysql到doris环境
1. 搭建环境
第1步首先搭建平台包扩flink、flinkCDC、mysql、doris;
1.1 准备环境
- 实验平台:VMware虚拟机CentOS8;
虚拟机需要最小6G内存+30G存储;关于linux系统存储扩容,可以参考LVM(logic volumn manager)系统
Docker: Docker version 24.0.1, build 6802122
jkd:orale的Java11;
ssh工具:MobaXterm;
准备Java
flink1.8依赖Java11,需要卸载linux centOS8自带的openJDK1.8;
使用yum remove命令卸载即可。yum list|grep jdk 列出Java安装目录,删除即可。
- 到oracle官网下载java11的.gz包(jdk-11.0.20_linux-x64_bin.tar.gz),上传解压,配置/etc/profile环境变量,如下追加到文件末尾即可。
JAVA_HOME=/root/java/jdk-11.0.20 # 本次项目Java安装目录,具体安装时需要根据实际情况修改。
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$PATH:$JAVA_HOME/bin</pre>
配置好上述文件保存退出执行如下命令,让配置生效
source /etc/profile
在执行查询Java版本
java -version
出现版本号,验证安装jdk成功。
1.2 安装flink:
准备 Flink Standalone 集群:到Fink官网下载最新版flink-1.18.0-bin-scala_2.12.tgz
上传到 Iinux虚拟机,解压之后配置环境变量,同jdk环境变量配置流程
配置好上述文件保存退出执行如下命令,让配置生效
source /etc/profile
进入到flink安装目录,执行./bin/start-cluster.sh
启动flink,最后可以从web-ui页面上 查看flink作业情况:
地址:http://192.168.55.133:8081/
如果web页面无法访问,需要检查 flink的配置,如下0.0.0.0代表任意访问地址。
其他需要测试保存点,打开页面作业取消、提交入口等配置,也可以进入flink的配置目录flink-conf.yaml 进行配置。如下:
185 #==============================================================================
186 # Rest & web frontend
187 #==============================================================================
188
189 # The port to which the REST client connects to. If rest.bind-port has
190 # not been specified, then the server will bind to this port as well.
191 #
192 rest.port: 8081
193
194 # The address to which the REST client will connect to
195 #
196 rest.address: 0.0.0.0
197
198 # Port range for the REST and web server to bind to.
199 #
200 #rest.bind-port: 8080-8090
201
202 # The address that the REST & web server binds to
203 # By default, this is localhost, which prevents the REST & web server from
204 # being able to communicate outside of the machine/container it is running on.
205 #
206 # To enable this, set the bind address to one that has access to outside-facing
207 # network interface, such as 0.0.0.0.
208 #
209 rest.bind-address: 0.0.0.0
210
211 # Flag to specify whether job submission is enabled from the web-based
212 # runtime monitor. Uncomment to disable.
213
214 #web.submit.enable: false
215 web.submit.enable: true
216
217 # Flag to specify whether job cancellation is enabled from the web-based
218 # runtime monitor. Uncomment to disable.
219
220 web.cancel.enable: true
flink-web-ui访问不到
flink-web Ui访问问题异常
- 当我使用 Docker compose部署flink的时候,可以直接通过外部浏览器访问到服务器的 flink-webUI,但是当我使用本地部署,启动flink之后也无法通过外部浏览器访问到 ui界面。
- 最后查看防火墙状态
systemctl status firewalld
发现是由于防火墙没有关闭,之后关闭防火墙再次访问可以正常打开systemctl stop firewalld
,或者防火墙开着,开放对应端口。具体操作见下面附件《防火墙操作》章节
2. Docker部署mysql和doris
docker的安装和使用这里省略。。。
这里给出运行mysql和doris的dockerfile
2.1 docker部署mysql5.7
docker run -id -p 3306:3306 \
-v /root/mysql/conf:/etc/mysql/mysql.conf.d/ \
-v /root/mysql/data:/var/lib/mysql \
-v /root/mysql/logs:/logs \
-e MYSQL_ROOT_PASSWORD=root \
--name mysql \
mysql:5.7
systemctl start docker
启动docker
启动mysql的binlog
创建mysql配置文件
vi /mydata/mysql/conf/my.cnf
下的my.cnf文件
注意 :每个用于读取 binlog 的 MySQL 数据库客户端都应该有一个唯一的 id,称为 Server id。
docker restart mysql
#重启mysql docker exec -it mysql bin/bash
#docker启动mysql客户端
mysql -u root -proot
show variables like '%log_bin%;
#查看是否生效
外部客户端navicate连接
正常连接即可,若不能正常连接,先开通防火墙端口3306,再执行如下操作
使用navicate连接报错解决,在服务器连接上mysql执行如下命令:
select user,plugin from user where user='root';
alter user 'root'@'%' identified with mysql_native_password by 'root';
flush privileges; </pre>
use mysql;
update user set host = '%' where user = 'root';
select host, user from user; </pre>
再次连接,成功
2.2 使用docker部署doris
最小实验环境(1Fe+1Be)
运行doris容器前,做宿主机配置 由于 Doris 的运行需要内存映射支持,需在宿主机执行如下命令
运行doris容器
--name=fe \
--env FE_SERVERS="fe1:192.168.55.133:9010" \
--env FE_ID=1 \
-p 8030:8030 \
-p 9030:9030 \
-v /data/fe/doris-meta:/opt/apache-doris/fe/doris-meta \
-v /data/fe/log:/opt/apache-doris/fe/log \
-v /data/fe/conf:/opt/apache-doris/be/conf \
--net=host \
apache/doris:2.0.0_alpha-fe-x86_64
# -------------------------------------------------- #
docker run -itd \
--name=be \
--env FE_SERVERS="fe1:192.168.55.133:9010" \
--env BE_ADDR="192.168.55.133:9050" \
-p 8040:8040 \
-v /data/be/storage:/opt/apache-doris/be/storage \
-v /data/be/log:/opt/apache-doris/be/log \
-v /data/fe/conf:/opt/apache-doris/be/conf \
--net=host \
apache/doris:2.0.0_alpha-be-x86_64
通过mysql客户端连接到doris的fe后,将后端be添加到集群
(docker部署host网络模式,无需添加)
ALTER SYSTEM ADD BACKEND "be_host_ip:heartbeat_service_port";
ALTER SYSTEM ADD BACKEND "192.168.55.133:9050";
查看fe、be情况:
show frontends\G;
SHOW BACKENDS\G;
3. 安装flinkCDC
参考flinkCDC官网文档: 基于 Flink CDC 3.0 构建 MySQL 到 Doris 的 Streaming ELT — CDC Connectors for Apache Flink® documentation (ververica.github.io)
通过 FlinkCDC cli 提交任务
下载下面列出的二进制压缩包,并解压得到目录
flink-cdc-3.0.0
: flink-cdc-3.0.0-bin.tar.gz flink-cdc-3.0.0 下会包含 bin、lib、log、conf 四个目录。-
下载下面列出的 connector 包,并且移动到 lib 目录下 下载链接只对已发布的版本有效, SNAPSHOT 版本需要本地基于 master 或 release- 分支编译
编写任务配置 yaml 文件 下面给出了一个整库同步的示例文件 mysql-to-doris.yaml:
################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:
type: mysql
hostname: localhost
port: 3306
username: root
password: 123456
tables: app_db.\.*
server-id: 5400-5404
server-time-zone: UTC
sink:
type: doris
fenodes: 127.0.0.1:8030
username: root
password: ""
table.create.properties.light_schema_change: true
table.create.properties.replication_num: 1
pipeline:
name: Sync MySQL Database to Doris
parallelism: 2
已经启动了flink任务之后(flink-web页面可以看到有 Available Task Slots ),配置好mysql-to-doris.yaml后启动flinkCDC,之后可以看到已经启动
bash bin/flink-cdc.sh mysql-to-doris.yaml
启动之后看到有一个正在运行的job两个task(启动的task数量取决于mysql-to-doris.yaml中配置的并行度,两者一致)
- mysql-to-doris.yaml讲解:
其中: source 中的 tables: app_db.\.*
通过正则匹配同步 app_db
下的所有表。 sink 添加 table.create.properties.replication_num
参数是由于 Docker 镜像中只有一个 Doris BE 节点。
- 最后,通过命令行提交任务到 Flink Standalone cluster
bash bin/flink-cdc.sh mysql-to-doris.yaml
提交成功后,返回信息如:
Pipeline has been submitted to cluster.
Job ID: ae30f4580f1918bebf16752d4963dc54
Job Description: Sync MySQL Database to Doris
在 Flink Web UI,可以看到一个名为 Sync MySQL Database to Doris
的任务正在运行。
更多高级用法
MySQL CDC 连接器 — CDC Connectors for Apache Flink® documentation (ververica.github.io)
附件一:防火墙操作
不关闭防火墙,但是在防火墙上永久开放端口号
- 启动防火墙:
sudo systemctl start firewalld
- 永久开放8081flink-web端口和8030Doris-web端口和数据库连接端口9030:
sudo firewall-cmd --zone=public --add-port=8081/tcp --permanent
sudo firewall-cmd --zone=public --add-port=8030/tcp --permanent
sudo firewall-cmd --zone=public --add-port=9030/tcp --permanent
重新加载防火墙配置:
sudo firewall-cmd --reload
这些命令将启动防火墙并允许8081和8030端口的流量通过防火墙。
查看已经在防火墙中开放了哪些端口,可以使用以下命令:
sudo firewall-cmd --list-ports
这个命令将列出防火墙中已经开放的所有端口。如果你想查看特定服务的端口是否已经开放,可以使用以下命令:
sudo firewall-cmd --list-ports | grep <service_name>
将 `` 替换为你想要查找的服务的名称,例如
ssh
、http
等。另外,如果你想查看所有已经开放的服务,可以使用以下命令:
sudo firewall-cmd --list-services
这个命令将列出防火墙中已经开放的所有服务。
疑问
以上实现了基础核心采集功能
-
断点续传,flink服务挂掉重启:如何保障?
- 应该需要检查点和保存点
如果mysql增加了新表,如何通过flinkCDC同步新增表到doris?
检查点的类型选择如何定?state.backend.type: hashmap、rocksdb、其他
检查点的操作说明:
- 确定作业ID:首先,你需要确定你想要创建 Savepoint 的 Flink 作业的作业ID。你可以在 Flink 的 Web UI 或者命令行中找到作业的作业ID。
- 使用命令行工具创建 Savepoint:使用
flink savepoint
命令来创建 Savepoint。命令的基本格式如下:
./bin/flink savepoint <jobID> [targetDirectory]
其中,`` 是你要创建 Savepoint 的作业的作业ID,
[targetDirectory]
是可选的目标目录,用于指定 Savepoint 的保存路径。如果不指定目标目录,Flink 将会自动选择一个默认的保存路径。例如,如果你的作业ID是
e2b5e0c7f3b7d94d9c5c1a7e60d7e0a1
,你可以使用以下命令来创建 Savepoint:
./bin/flink savepoint e2b5e0c7f3b7d94d9c5c1a7e60d7e0a1 /path/to/savepoint/directory
- 等待 Savepoint 创建完成:执行创建 Savepoint 的命令后,Flink 会开始创建 Savepoint。你需要等待一段时间,直到 Savepoint 创建完成。你可以在 Flink 的 Web UI 中或者命令行中查看 Savepoint 的创建进度和状态。
- 确认 Savepoint 创建完成:一旦 Savepoint 创建完成,你可以在指定的目标目录中找到 Savepoint 文件,确认 Savepoint 创建成功。
通过上述步骤,你就可以手动创建一个 Flink Savepoint。创建成功后,你可以使用这个 Savepoint 来恢复应用程序的状态,或者在升级、迁移应用程序时使用。
希望这个步骤能够帮助你手动创建 Flink 的 Savepoint。如果你有其他问题,欢迎随时提问。
指定saveponit启动作业:
在 Flink 中,你可以在启动作业时指定要使用的 Savepoint。这样,Flink 会从指定的 Savepoint 中恢复应用程序的状态,并从上次保存的状态开始处理数据。下面是如何在启动作业时指定 Savepoint 的步骤:
- 确定要使用的 Savepoint:首先,你需要确定你要使用的 Savepoint 文件的路径。你可以选择之前手动创建的 Savepoint,或者使用 Flink 自动保存的 Savepoint。
- 使用命令行工具启动作业:使用
flink run
命令来启动作业,并在命令中指定要使用的 Savepoint。命令的基本格式如下:
./bin/flink run -s :savepointPath :jobFile
其中,
:savepointPath
是你要使用的 Savepoint 文件的路径,:jobFile
是你要运行的作业文件。例如,如果你想要从
/path/to/savepoint/directory/savepoint-1234
这个 Savepoint 启动作业,你可以使用以下命令:
./bin/flink run -s file:///path/to/savepoint/directory/savepoint-1234 /path/to/your/job.jar
- 等待作业启动:执行启动作业的命令后,Flink 会从指定的 Savepoint 中启动作业。你需要等待一段时间,直到作业成功启动。
通过上述步骤,你就可以在启动作业时指定使用的 Savepoint。这样,Flink 将会从指定的 Savepoint 中恢复应用程序的状态,并继续处理数据。