flink cdc 使用

flink cdc 使用

目前 cdc 产品 非常多 ,目前我使用canal ,flink cdc (集成 debezium) 二者 对比相对来说 flink cdc 更加强大,功能很多 但是 有很多 坑,迭代速度很快,借助flink 分布式计算框架,分布式处理 数据。

1. canal

装个服务端,客户端自己写,当然也提供了一些适配器,我之前是定制 客户端写的组件。

https://github.com/alibaba/canal

2. flink cdc

官方文档

https://ververica.github.io/flink-cdc-connectors/master/

https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/tableapi/

cdc 开发主要就是写 sql (flink sql),借助 flink 各种连接器 ,快速同步数据 sink 到各个地方,确实使用 方便 借助 checkpoint 可以 保证 事务操作 的精确 一次 操作(这个叼)。

flinksql 上手很容易 但是有个大坑,就是 如果 job 多了 很耗 数据库连接 和多次 重复读 bin日志 。

社区文章,阿里的内部解决 也是 合源 合 整库同步。(阿里云 flink cdc 合源,整库同步这种技术未开源,因为他有专门 数据同步服务)

image.png

https://flink-learning.org.cn/article/detail/da710dd3cdfb9b430af405725ad27784

腾讯 云 通过 和 source 解决

目前使用 Flink CDC Connector 做数据同步时,每个表都需要建立一个数据库连接,在多表、整库同步等场景下,对数据库实例的压力非常大,Oceanus 引入了多 source 复用的优化来解决这种问题。

https://cloud.tencent.com/document/product/849/76650

3. 不买 云服务 合源 解决方案

flink 1.45 ,flink cdc 2.1

思路 参考

http://www.dlink.top/docs/extend/practice_guide/cdc_kafka_multi_source_merge

flink table kafka

https://nightlies.apache.org/flink/flink-docs-master/zh/docs/connectors/table/formats/debezium/

json 解析 参考 debezium 官网

https://debezium.io/documentation/reference/1.3/connectors/mysql.html#mysql-connector-events_debezium

4 .代码 实现

  • datastream
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

/**
 * 
 * @Date 2022/8/26 16:04
 */
public class TestCdcKafka {

    public static void main(String[] args) throws Exception {

        //1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // enable checkpoint
        String checkpointDir = "file:///D:/checkpoint";

        env.enableCheckpointing(3000);
        env.getCheckpointConfig().setCheckpointStorage(checkpointDir);
        //1.1 设置 CK&状态后端
        //略

        MySqlSource<String> mySqlSource =  MySqlSource.<String>builder()
                .hostname("")
                .port(3306)
                .username("")
                .password("")
                .databaseList("XXSX").tableList()
                //.databaseList()
                //.tableList() //这个注释,就是多库同步
         
                .deserializer(new CustomerDeserialization()) //这里需要自定义序列化格式
                //.deserializer(new StringDebeziumDeserializationSchema()) //默认是这个序列化格式
                .startupOptions(StartupOptions.latest())
                .build();

        //2.通过 FlinkCDC 构建 SourceFunction 并读取数据

        DataStreamSource<String> streamSource = env.fromSource(mySqlSource,WatermarkStrategy.noWatermarks(), "MySQL Source");

        //3.打印数据并将数据写入 Kafka
        streamSource.print();
        String sinkTopic = "testcdc";
        streamSource.addSink(getKafkaProducer("XXX:9092",sinkTopic));

        //4.启动任务
        env.execute("FlinkCDC");



    }

    //kafka 生产者
    public static FlinkKafkaProducer<String> getKafkaProducer(String brokers,String topic) {
        return new FlinkKafkaProducer<String>(brokers,
                topic,
                new SimpleStringSchema());
    }
}
  • 序列化
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Field;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Schema;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;
/**
 * @Author 
 * @Date 2022/8/26 16:14
 */
public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {


    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {

        //1.创建 JSON 对象用于存储最终数据
        JSONObject result = new JSONObject();

        //2.获取库名&表名放入 source
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        String database = fields[1];
        String tableName = fields[2];
        JSONObject source = new JSONObject();
        source.put("db",database);
        source.put("table",tableName);

        Struct value = (Struct) sourceRecord.value();
        //3.获取"before"数据
        Struct before = value.getStruct("before");
        JSONObject beforeJson = new JSONObject();
        if (before != null) {
            Schema beforeSchema = before.schema();
            List<Field> beforeFields = beforeSchema.fields();
            for (Field field : beforeFields) {
                Object beforeValue = before.get(field);
                beforeJson.put(field.name(), beforeValue);
            }
        }

        //4.获取"after"数据
        Struct after = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if (after != null) {
            Schema afterSchema = after.schema();
            List<Field> afterFields = afterSchema.fields();
            for (Field field : afterFields) {
                Object afterValue = after.get(field);
                afterJson.put(field.name(), afterValue);
            }
        }

        //5.获取操作类型  CREATE UPDATE DELETE 进行符合 Debezium-op 的字母
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toLowerCase();
        if ("insert".equals(type)) {
            type = "c";
        }
        if ("update".equals(type)) {
            type = "u";
        }
        if ("delete".equals(type)) {
            type = "d";
        }
        if ("create".equals(type)) {
            type = "c";
        }

        //6.将字段写入 JSON 对象
        result.put("source", source);
        result.put("before", beforeJson);
        result.put("after", afterJson);
        result.put("op", type);

        //7.输出数据
        collector.collect(result.toJSONString());

    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
  • flink sql
- |
  CREATE TABLE KafkaTable (
   origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
  origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,
  origin_schema STRING METADATA FROM 'value.source.schema' VIRTUAL,
  origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,
  `id` BIGINT,
  `agent_id` BIGINT,
  `usable_amount` decimal(20,4)
  ) WITH (
  'connector' = 'kafka',
  'topic' = 'testcdc',
  'properties.bootstrap.servers' = 'XXXXX:9092',
  'properties.group.id' = 'testGroup11',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'debezium-json'
  )
- |
  CREATE TABLE t_user_copy(
  id BIGINT,
  agent_id BIGINT,
  usable_amount decimal(20,4),
  PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://XXXXX:9200',
  'index' = 'user_cdc1'
  )
- select * FROM KafkaTable where origin_database='XXXX' and origin_table = 'XXXX'
彩蛋

实时数仓

flink cdc + kafka + doris 成本不高,相比 Hadoop 生态 那一套下来

doris 官网,像 使用 mysql 一样

https://doris.apache.org/zh-CN/docs/data-operate/import/import-way/binlog-load-manual

数仓文章 关于 doris

https://blog.csdn.net/weixin_46141936/article/details/121846412

https://blog.csdn.net/weixin_43320999/article/details/111599512

https://blog.csdn.net/dajiangtai007/article/details/123501210

新东方

https://blog.csdn.net/m0_54252387/article/details/125739846

https://cloud.tencent.com/developer/article/1925453?from=article.detail.1807913

数仓

https://cloud.tencent.com/developer/article/1938194

https://segmentfault.com/a/1190000040686141

https://www.daimajiaoliu.com/daima/7b7448559360801

https://blog.csdn.net/qq_37067752/article/details/107474369

https://tech.meituan.com/2020/04/09/doris-in-meituan-waimai.html

https://developer.aliyun.com/article/985042

https://yangshibiao.blog.csdn.net/article/details/118687344?spm=1001.2101.3001.6650.1&utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7ECTRLIST%7Edefault-1-118687344-blog-124237872.pc_relevant_multi_platform_whitelistv1_exp2&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7ECTRLIST%7Edefault-1-118687344-blog-124237872.pc_relevant_multi_platform_whitelistv1_e

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,919评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,567评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,316评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,294评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,318评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,245评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,120评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,964评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,376评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,592评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,764评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,460评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,070评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,697评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,846评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,819评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,665评论 2 354

推荐阅读更多精彩内容