flink - cdc - kafka

描述
  • 通过operation信息(insert、update、delete),实现了对上游数据的变更识别能力,这些信息会记录在于上游数据连接的flink table中
  • 通过cdc接入数据并且转换到其他flink table中时,该flink table需要设置primary key(该key从kafka消息的key或value中获取),以让flink识别出对应的update和delete操作应作用于该flink table的哪一条数据
  • sink外部表也需要设置对等的primary key,否则update fink table时将无法在原有数据上修改而是增加数据,以及delete flink table时将无法删除原有数据
  • value.fields-include选项指定为ALL时,表示key从value中获取,指定为EXCEPT_KEY时,表示从key中获取,默认为ALL
  • flink会先读topic的全量数据,再做增量的upsert
原理
  1. flink table A通过cdc对接上游数据,对接收到的数据进行primary key判断,如果primary key不存在于flink table A中,则判断为insert,如果primary key存在于flink table A中且数据的value不为空,则判断为update,否则判断为delete。
  2. flink table A把数据转换到flink table B时,flink table B会根据数据中附带的操作类型做所需的update和delete操作,因此需要flink table B指定primary key,以实现update和delete操作的数据定位
  3. flink table B把数据输出到外部系统时,flink table B会把操作类型解码成外部系统所能识别的操作命令,并且通过外部系统的唯一字段与flink table B的sink数据做比对,如果发现一致则进行对应的update/delete,否则做insert
依赖
1.0.0 ~ 1.3.0,适用于flink 1.12.x以下
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>1.3.0</version>
</dependency>
1.4.0 ~ 2.0.0,适用于flink 1.13.x以上
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>2.0.0</version>
</dependency>
使用
String sourceTableSQL = "create table score(" +
        "id int," +
        "student_no string," +
        "class_name string," +
        "score double," +
        "primary key(id) not enforced" + // 多个联合主键用逗号分隔
        ") with (" +
        "'connector'='upsert-kafka'," +
        "'topic' = 'ods'," +
        "'properties.bootstrap.servers'='localhost:9092'," +
        "'key.format'='raw'," +
        "'value.format'='json'" +
        ")";
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容