Flink in practice: syncing MySQL data to Doris, and the problems encountered along the way

Flink official documentation: Apache Flink Documentation | Apache Flink

I. Environment setup

MySQL 8.0.x — Install MySQL
Doris 2.1.x — Manually deploy Doris
Flink 1.18 — Flink local deployment
JDK

II. Initialize the tables

Flink SQL does not create the MySQL or Doris tables automatically, so you need to create them yourself ahead of time.
MySQL: omitted in the original; a hypothetical sketch is given below.
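Purely as an illustration of what the omitted MySQL DDL might look like, here is a hypothetical sketch matching the columns used throughout this post (mysql-cdc also requires the binlog to be enabled with binlog_format = ROW, which is the default in MySQL 8.0):

CREATE TABLE IF NOT EXISTS testdb.m_table_a
(
    id      BIGINT NOT NULL COMMENT 'primary key id',
    uid     BIGINT COMMENT 'user ID',
    address VARCHAR(512) COMMENT 'address',
    PRIMARY KEY (id)
);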
Doris:

DROP TABLE IF EXISTS testdb.d_table_a;
CREATE TABLE IF NOT EXISTS testdb.d_table_a
(
    `id`                  BIGINT COMMENT 'primary key id',
    `uid`                 BIGINT COMMENT 'user ID',
    `address`             VARCHAR(512) COMMENT 'address'
)
UNIQUE KEY(id)
COMMENT "user address table"
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES (
    "replication_allocation" = "tag.location.default: 1",
    "enable_unique_key_merge_on_write" = "true"
);

#Parameter notes:
#UNIQUE KEY(id)
#Data model of the table. Doris currently has three: DUPLICATE KEY, UNIQUE KEY, AGGREGATE KEY.
#DISTRIBUTED BY HASH(id) BUCKETS 1
#Distribute the data by the hash of the id column, with the bucket count set to 1.
#PROPERTIES ("replication_allocation" = "tag.location.default: 1")
#Replica allocation policy: one replica at the default location.

#"enable_unique_key_merge_on_write" = "true"
#Enables merge-on-write, which partial-column updates depend on. For example, if one job syncs columns a,b,c and a second job syncs columns x,y,z, then without partial updates the second job would overwrite a,b,c.

III. Implementation approaches

(1) Flink SQL

1. Run sql-client from the Flink bin directory to enter the Flink SQL client:

./sql-client.sh
[Screenshot: Flink SQL client]

2. Set the checkpoint interval to 10 s. (Recent SQL client versions document the quoted form: SET 'execution.checkpointing.interval' = '10s';)

SET execution.checkpointing.interval = 10s;

3. Create the source table

CREATE TABLE mysqlSource (
    id BIGINT,
    uid BIGINT,
    address VARCHAR(512),
    PRIMARY KEY (id) NOT ENFORCED
)
WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '${mysql.hostname}',
    'port' = '${mysql.port}',
    'username' = '${mysql.username}',
    'password' = '${mysql.password}',
    'database-name' = 'testdb',
    'table-name' = 'm_table_a'
);
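Note that every connector option value in Flink SQL must be a quoted string literal; the ${...} tokens above are placeholders to substitute before running the statement. Filled in with purely illustrative values (the host, port, and credentials below are assumptions, not the original setup), the WITH clause would read:

WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'flink_user',
    'password' = '******',
    'database-name' = 'testdb',
    'table-name' = 'm_table_a'
)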

4. Create the sink table

CREATE TABLE dorisSink (
    id BIGINT,
    uid BIGINT,
    address VARCHAR(512),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'doris',
  'fenodes' = '${doris.fenodes}',
  'table.identifier' = 'testdb.d_table_a',
  'username' = '${doris.username}',
  'password' = '${doris.password}',
  'sink.properties.partial_columns' = 'true',
  'sink.properties.column_separator' = '$^&@#$',
  'sink.properties.line_delimiter' = '!^*^!'
);
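Here 'fenodes' expects the Doris FE address as host:http_port (the FE HTTP port defaults to 8030), so an illustrative value for a local FE would be:

  'fenodes' = '127.0.0.1:8030',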

Configuration notes:

#The data Flink SQL reads looks roughly like this:
# 1,张三,湖南
# 2,李四,湖北
# 3,王五,北京
# 西城区
# 4,赵六,广东

#The default column separator is ',' and the default row delimiter is '\n'. The address in row 3 contains a line break, so '西城区' is treated as a standalone record that no longer matches the Doris table's columns, and the write fails. Set both delimiters to combinations that are unlikely to appear in your data:
#column separator
# 'sink.properties.column_separator' = '$^&@#$',
#row delimiter
# 'sink.properties.line_delimiter' = '!^*^!'
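For intuition, with these delimiters row 3 above is serialized roughly as follows (illustrative, not a verbatim Stream Load payload); the embedded newline stays inside the record because only '!^*^!' terminates a record:

3$^&@#$王五$^&@#$北京
西城区!^*^!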

#Partial-column updates keyed by id are enabled with
'sink.properties.partial_columns' = 'true'
#and additionally require the table to have been created with
"enable_unique_key_merge_on_write" = "true"

5. Sync the source data to the sink

INSERT INTO dorisSink SELECT id, uid, address FROM mysqlSource;
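Once the job is running, a quick sanity check on the Doris side (a trivial illustrative query) should show the row count converging to that of the MySQL table:

SELECT COUNT(*) FROM testdb.d_table_a;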

6. Open the local Flink Web UI (http://localhost:8081 by default) to check how the job is running.

(2) Table API / DataStream

1. Create a Maven project and import the relevant dependencies:

 <properties>
        <flink.cdc.version>3.2.1</flink.cdc.version>
        <doris.version>24.0.1</doris.version>
        <mysql.version>8.0.26</mysql.version>
        <flink.version>1.18.0</flink.version>
    </properties>

    <dependencies>
        <!-- 连接器 -->
        <!-- flink-CDC -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-mysql-cdc</artifactId>
            <version>${flink.cdc.version}</version>
        </dependency>
        <!-- doris-connector -->
        <dependency>
            <groupId>org.apache.doris</groupId>
            <artifactId>flink-doris-connector-1.18</artifactId>
            <version>${doris.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>1.16.1</version>
        </dependency>
        <!-- mysql-connector -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>


        <!-- Flink 运行环境所需依赖包 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>30.1-jre</version>
        </dependency>
    </dependencies>

2. Java code

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class ExecuteHandler {
    public static void main(String[] args) {
        // Receive args parameters if needed (e.g. to fill in the ${...} placeholders used below)
        // ParameterTool parameterTool = ParameterTool.fromArgs(args);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Enable checkpointing with a 10000 ms (10 s) interval: Flink automatically triggers a checkpoint every 10 s.
        env.enableCheckpointing(10000);
        // EXACTLY_ONCE mode: after a failure, every record takes effect exactly once, guaranteeing consistency.
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Require at least 500 ms between two checkpoints, so frequent checkpointing does not hurt throughput.
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // Checkpoint timeout of 60000 ms (60 s): a checkpoint that does not finish within 60 s is treated as failed.
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // At most 1 concurrent checkpoint: a new one does not start until the previous one has finished.
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // Tolerate up to 5 checkpoint failures before the job itself is failed.
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);
        // Local checkpoint storage; after a failure you can locate the checkpoint and restart the job from it.
        env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-checkpoints");
        // Failure-rate restart strategy: allow at most 3 failures within 5 minutes, with a 10 s delay between restarts.
        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(5), Time.seconds(10)));
        // Parallelism of 1
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Create the source table. The ${...} tokens are placeholders; substitute real values
        // (e.g. via ParameterTool) before executing.
        tableEnv.executeSql("CREATE TABLE mysqlSource (\n" +
                "    id BIGINT,\n" +
                "    uid BIGINT,\n" +
                "    address VARCHAR(512),\n" +
                "    PRIMARY KEY (id) NOT ENFORCED\n" +
                ")\n" +
                "WITH (\n" +
                "    'connector' = 'mysql-cdc',\n" +
                "    'hostname' = '${mysql.hostname}',\n" +
                "    'port' = '${mysql.port}',\n" +
                "    'username' = '${mysql.username}',\n" +
                "    'password' = '${mysql.password}',\n" +
                "    'database-name' = 'testdb',\n" +
                "    'table-name' = 'm_table_a'\n" +
                ")");

        // Create the sink table
        tableEnv.executeSql("CREATE TABLE dorisSink (\n" +
                "    id BIGINT,\n" +
                "    uid BIGINT,\n" +
                "    address VARCHAR(512),\n" +
                "    PRIMARY KEY (id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'doris',\n" +
                "  'fenodes' = '${doris.fenodes}',\n" +
                "  'table.identifier' = 'testdb.d_table_a',\n" +
                "  'username' = '${doris.username}',\n" +
                "  'password' = '${doris.password}',\n" +
                "  'sink.properties.column_separator' = '$^&@#$',\n" +
                "  'sink.properties.line_delimiter' = '!^*^!'\n" +
                ")");
        // Sync the source data to the sink
        tableEnv.executeSql("INSERT INTO dorisSink SELECT id, uid, address FROM mysqlSource");
    }
}

3. Package the jar and upload it to Flink


[Screenshot: uploading the job JAR to Flink]

The entry class is required; the other three parameters depend on your setup. Click Submit and you should see the corresponding job start successfully.

IV. Problems encountered

The job ran without errors in a local standalone deployment, but when deployed in application mode via the Flink Kubernetes Operator it failed with the messages below, even though the Flink startup logs showed the corresponding connector information:

Cannot discover a connector using option: 'connector'='mysql-cdc'
Cannot discover a connector using option: 'connector'='doris'

Cause:
Flink discovers table connector/format factories by identifier through Java's Service Provider Interface (SPI). Every connector/format jar ships its factory list in a resource file with the same path, META-INF/services/org.apache.flink.table.factories.Factory. When you build an uber jar for a project that bundles several table connectors/formats, these resource files overwrite one another, and Flink can no longer discover some of the factories.
Take flink-sql-connector-mysql-cdc-2.4.1 as an example:


[Screenshot: flink-sql-connector-mysql-cdc-2.4.1]

Official reference: Overview | Apache Flink


Solution:
Package the jar the way the official docs describe, using the maven-shade-plugin's ServicesResourceTransformer to merge the META-INF/services files (you can verify the fix by checking that META-INF/services/org.apache.flink.table.factories.Factory inside the uber jar lists the factories of all bundled connectors):

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <id>shade</id>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers combine.children="append">
                                <!-- The service transformer is needed to merge META-INF/services files -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <!-- ... -->
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>