描述
- 通过operation信息(insert、update、delete),实现了对上游数据的变更识别能力,这些信息会记录在于上游数据连接的flink table中
- 通过cdc接入数据并且转换到其他flink table中时,该flink table需要设置primary key(该key从kafka消息的key或value中获取),以让flink识别出对应的update和delete操作应作用于该flink table的哪一条数据
- sink外部表也需要设置对等的primary key,否则update fink table时将无法在原有数据上修改而是增加数据,以及delete flink table时将无法删除原有数据
- value.fields-include选项指定为ALL时,表示key从value中获取,指定为EXCEPT_KEY时,表示从key中获取,默认为ALL
- flink会先读topic的全量数据,再做增量的upsert
原理
- flink table A通过cdc对接上游数据,对接收到的数据进行primary key判断,如果primary key不存在于flink table A中,则判断为insert,如果primary key存在于flink table A中且数据的value不为空,则判断为update,否则判断为delete。
- flink table A把数据转换到flink table B时,flink table B会根据数据中附带的操作类型做所需的update和delete操作,因此需要flink table B指定primary key,以实现update和delete操作的数据定位
- flink table B把数据输出到外部系统时,flink table B会把操作类型解码成外部系统所能识别的操作命令,并且通过外部系统的唯一字段与flink table B的sink数据做比对,如果发现一致则进行对应的update/delete,否则做insert
依赖
1.0.0 ~ 1.3.0,适用于flink 1.12.x以下
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>1.3.0</version>
</dependency>
1.4.0 ~ 2.0.0,适用于flink 1.13.x以上
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.0.0</version>
</dependency>
使用
String sourceTableSQL = "create table score(" +
"id int," +
"student_no string," +
"class_name string," +
"score double," +
"primary key(id) not enforced" + // 多个联合主键用逗号分隔
") with (" +
"'connector'='upsert-kafka'," +
"'topic' = 'ods'," +
"'properties.bootstrap.servers'='localhost:9092'," +
"'key.format'='raw'," +
"'value.format'='json'" +
")";