Flink official documentation: Apache Flink Documentation | Apache Flink
I. Environment preparation
MySQL 8.0.x — install MySQL
Doris 2.1.x — deploy Doris manually
Flink 1.18 — deploy Flink locally
JDK
II. Initialize the tables
Flink SQL does not create the MySQL or Doris tables automatically, so the tables have to be created in advance.
MySQL: the DDL is omitted here; a possible schema is sketched below for reference.
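For reference, a minimal MySQL table matching the (id, uid, address) schema used by the Flink job below could look like this; the exact types and comments are an assumption, not the original article's DDL:

-- Hypothetical MySQL DDL; adjust types and comments to your real table.
CREATE TABLE IF NOT EXISTS testdb.m_table_a (
    id      BIGINT       NOT NULL COMMENT 'primary key id',
    uid     BIGINT                COMMENT 'user id',
    address VARCHAR(512)          COMMENT 'address',
    PRIMARY KEY (id)
) ENGINE = InnoDB;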
Doris:
DROP TABLE IF EXISTS testdb.d_table_a;
CREATE TABLE IF NOT EXISTS testdb.d_table_a
(
    `id` BIGINT COMMENT 'primary key id',
    `uid` BIGINT COMMENT 'user id',
    `address` VARCHAR(512) COMMENT 'address'
)
UNIQUE KEY(id)
COMMENT "user address table"
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES (
    "replication_allocation" = "tag.location.default: 1",
    "enable_unique_key_merge_on_write" = "true"
);
# Parameter notes:
# UNIQUE KEY(id)
#   Doris currently offers three table models: DUPLICATE KEY, UNIQUE KEY and AGGREGATE KEY; this table uses the UNIQUE KEY model keyed on id.
# DISTRIBUTED BY HASH(id) BUCKETS 1
#   Distributes the data by the hash of the id column, using a single bucket.
# PROPERTIES ("replication_allocation" = "tag.location.default: 1")
#   Replica allocation policy: one replica in the default location.
# "enable_unique_key_merge_on_write" = "true"
#   Required for partial-column updates. For example, if one job syncs columns a, b, c and a second job syncs columns x, y, z, the second job would overwrite a, b, c unless this is enabled (together with the sink's partial_columns option shown later).
III. Implementation approaches
(1) Flink SQL
1. Start sql-client from the Flink bin directory; this opens the Flink SQL client:
./sql-client.sh
2. Set the checkpointing interval to 10 s:
SET 'execution.checkpointing.interval' = '10s';
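Optionally, two other SQL-client conveniences (standard client options, shown here only as a suggestion): switch the result display mode and list the options you have set in this session.

-- Print query results inline in the terminal instead of the interactive table view.
SET 'sql-client.execution.result-mode' = 'tableau';
-- Running SET with no arguments lists the options configured in this session.
SET;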
3. Create the source table (replace the ${...} placeholders with your actual connection settings):
CREATE TABLE mysqlSource (
    id BIGINT,
    uid BIGINT,
    address VARCHAR(512),
    PRIMARY KEY (id) NOT ENFORCED
)
WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '${mysql.hostname}',
    'port' = '${mysql.port}',
    'username' = '${mysql.username}',
    'password' = '${mysql.password}',
    'database-name' = 'testdb',
    'table-name' = 'm_table_a'
);
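Every option value in Flink SQL must be a single-quoted string literal, including the port. A filled-in WITH clause might look like this (the host and credentials are made-up examples, not values from the original article):

WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.1.10',
    'port' = '3306',
    'username' = 'flink_user',
    'password' = 'flink_pwd',
    'database-name' = 'testdb',
    'table-name' = 'm_table_a'
)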
4. Create the sink table (again, replace the ${...} placeholders with your own values):
CREATE TABLE dorisSink (
    id BIGINT,
    uid BIGINT,
    address VARCHAR(512),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'doris',
    'fenodes' = '${doris.fenodes}',
    'table.identifier' = 'testdb.d_table_a',
    'username' = '${doris.username}',
    'password' = '${doris.password}',
    'sink.properties.partial_columns' = 'true',
    'sink.properties.column_separator' = '$^&@#$',
    'sink.properties.line_delimiter' = '!^*^!'
);
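Before wiring up the sync you can sanity-check the definitions and preview the CDC stream directly in the SQL client (the SELECT starts a streaming job; cancel it when you are done looking at the output):

SHOW TABLES;
-- Streams the current MySQL snapshot plus ongoing changes.
SELECT * FROM mysqlSource;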
Configuration notes:
# The rows Flink reads look roughly like this:
# 1,Zhang San,Hunan
# 2,Li Si,Hubei
# 3,Wang Wu,Beijing
# Xicheng District
# 4,Zhao Liu,Guangdong
# The column separator here is ',' and the line delimiter is '\n'. The address of row 3 itself contains a newline, so 'Xicheng District' ends up treated as a separate record that does not match the Doris table columns, and the write fails. Set both delimiters to combinations that are unlikely to appear in your data:
# column separator
#   'sink.properties.column_separator' = '$^&@#$',
# line delimiter
#   'sink.properties.line_delimiter' = '!^*^!'
# Partial updates keyed on id are enabled with
#   'sink.properties.partial_columns' = 'true'
# which also requires the Doris table to be created with
#   "enable_unique_key_merge_on_write" = "true"
# A second job that writes only a subset of the columns is sketched below.
5. Sync data from the source to the sink:
INSERT INTO dorisSink SELECT id, uid, address FROM mysqlSource;
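Once the INSERT job is running, you can confirm on the Doris side (in a MySQL client connected to Doris, not in the Flink SQL client) that rows are arriving:

SELECT COUNT(*) FROM testdb.d_table_a;
SELECT * FROM testdb.d_table_a ORDER BY id LIMIT 10;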
6. Open the local Flink Web UI to check how the job is running.
(omitted)
(2) Table API / DataStream
1. Create a Maven project
Add the required dependencies:
<properties>
<flink.cdc.version>3.2.1</flink.cdc.version>
<doris.version>24.0.1</doris.version>
<mysql.version>8.0.26</mysql.version>
<flink.version>1.18.0</flink.version>
</properties>
<dependencies>
<!-- 连接器 -->
<!-- flink-CDC -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-mysql-cdc</artifactId>
<version>${flink.cdc.version}</version>
</dependency>
<!-- doris-connector -->
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>flink-doris-connector-1.18</artifactId>
<version>${doris.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>1.16.1</version>
</dependency>
<!-- mysql-connector -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<!-- Flink 运行环境所需依赖包 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-loader</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.1-jre</version>
</dependency>
</dependencies>
2. Java code
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class ExecuteHandler {
public static void main(String[] args) {
// Read command-line arguments, if any are needed
// ParameterTool parameterTool = ParameterTool.fromArgs(args);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Enable checkpointing with an interval of 10000 ms (10 s): Flink triggers a checkpoint every 10 seconds.
env.enableCheckpointing(10000);
// Use EXACTLY_ONCE checkpointing so every record is reflected exactly once after a failure, keeping the data consistent.
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Require at least 500 ms between the end of one checkpoint and the start of the next, so checkpointing does not hurt throughput.
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// A checkpoint that does not complete within 60000 ms (60 s) is considered failed.
env.getCheckpointConfig().setCheckpointTimeout(60000);
// Allow at most one checkpoint in flight: a new checkpoint only starts after the previous one has finished.
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// Tolerate up to 5 consecutive checkpoint failures before the job itself is failed.
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);
// Store checkpoints on the local filesystem, so after a failure the job can be restarted from the corresponding checkpoint.
env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-checkpoints");
// Failure-rate restart strategy: allow at most 3 failures within a 5-minute window, waiting 10 seconds between restarts.
env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(5), Time.seconds(10)));
// Set the parallelism to 1
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// Create the source table
tableEnv.executeSql("CREATE TABLE mysqlSource (\n" +
" id BIGINT,\n" +
" uid BIGINT,\n" +
" address VARCHAR(512),\n" +
" PRIMARY KEY (id) NOT ENFORCED\n" +
")\n" +
"WITH (\n" +
" 'connector' = 'mysql-cdc',\n" +
// Replace the ${...} placeholders with your actual connection settings
" 'hostname' = '${mysql.hostname}',\n" +
" 'port' = '${mysql.port}',\n" +
" 'username' = '${mysql.username}',\n" +
" 'password' = '${mysql.password}',\n" +
" 'database-name' = 'testdb',\n" +
" 'table-name' = 'm_table_a'\n" +
")");
// Create the sink table
tableEnv.executeSql("CREATE TABLE dorisSink (\n" +
" id BIGINT,\n" +
" uid BIGINT,\n" +
" address VARCHAR(512),\n" +
" PRIMARY KEY (id) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = 'doris',\n" +
" 'fenodes' = '${doris.fenodes}',\n" +
" 'table.identifier' = 'testdb.d_table_a',\n" +
" 'username' = '${doris.username}',\n" +
" 'password' = '${doris.password}',\n" +
" 'sink.properties.column_separator' = '$^&@#$',\n" +
" 'sink.properties.line_delimiter' = '!^*^!'\n" +
")");
// Sync data from the source to the sink
tableEnv.executeSql("INSERT INTO dorisSink SELECT id, uid, address FROM mysqlSource");
}
}
3. Package the jar and upload it to Flink
The entry class is required; the other three parameters depend on your setup. Click Submit and you should see the corresponding job start.
IV. Problems encountered
The job ran without errors on the local standalone deployment, but when deployed in application mode through the Flink Kubernetes Operator it failed with the messages below, even though the Flink startup logs listed the corresponding connectors:
Cannot discover a connector using option: 'connector'='mysql-cdc'
Cannot discover a connector using option: 'connector'='doris'
Cause:
Flink uses Java's Service Provider Interface (SPI) to load table connector/format factories by identifier. The SPI resource file of every table connector/format factory has the same name, org.apache.flink.table.factories.Factory, and lives in the same META-INF/services directory, so when you build an uber jar containing several connectors/formats these files overwrite each other and Flink can no longer discover the factories.
Take flink-sql-connector-mysql-cdc-2.4.1 as an example.
Official docs reference: Overview | Apache Flink
Solution:
Package the jar the way the official docs recommend, merging the META-INF/services files with the maven-shade-plugin:
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<id>shade</id>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers combine.children="append">
<!-- The service transformer is needed to merge META-INF/services files -->
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<!-- ... -->
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>