import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.ververica.cdc.connectors.oceanbase.table.OceanBaseDeserializationSchema;
import com.ververica.cdc.connectors.oceanbase.table.OceanBaseRecord;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.shaded.netty4.io.netty.util.internal.ObjectUtil;
import org.apache.flink.util.Collector;
import org.apache.flink.util.StringUtils;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.Map;
import java.util.Objects;
/**
* @author majiajue
* @Title:
* @Description:
* @date 2023/8/915:16
*/
public class OceanBaseDeserializer implements OceanBaseDeserializationSchema<String> {
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
@Override
public void deserialize(OceanBaseRecord oceanBaseRecord, Collector<String> collector) throws Exception {
JSONObject jsonObject = new JSONObject();
//提取数据库名
String database = oceanBaseRecord.getSourceInfo().getDatabase();;
//提取表名
String tableName = oceanBaseRecord.getSourceInfo().getTable();
//获取after数据
// Struct afterStruct = oceanBaseRecord.getLogMessageFieldsAfter();
JSONObject afterJson = new JSONObject();
//判断是否有after
if (oceanBaseRecord.getLogMessageFieldsAfter() != null) {
//遍历oceanBaseRecord.getLogMessageFieldsAfter()
oceanBaseRecord.getLogMessageFieldsAfter().forEach((k,v)->{
afterJson.put(k,v);
});
}
JSONObject beforeJson = new JSONObject();
if(oceanBaseRecord.getLogMessageFieldsBefore() != null){
oceanBaseRecord.getLogMessageFieldsBefore().forEach((k,v)->{
beforeJson.put(k,v);
});
}
//获取before数据
// Struct beforeStruct = value.getStruct("before");
// JSONObject beforeJson = new JSONObject();
// //判断是否有before
// if (beforeStruct != null) {
// for (Field field : beforeStruct.schema().fields()) {
// beforeJson.put(field.name(), beforeStruct.get(field));
// }
// }
//获得操作类型 DELETE UPDATE CREATE
String type ="" ;
if(Objects.isNull(oceanBaseRecord.getOpt())){
//适配initial模式
type = "insert".toUpperCase();
if(oceanBaseRecord.getJdbcFields()!=null&oceanBaseRecord.getLogMessageFieldsAfter()==null){
oceanBaseRecord.getJdbcFields().forEach((k,v)->{
afterJson.put(k,v);
});
}
}else{
type = oceanBaseRecord.getOpt().name();
}
// if ("create".toUpperCase().equals(type)) {
// type = "insert".toUpperCase();
// }
//封装数据到JSONObject
jsonObject.put("database", database);
jsonObject.put("tableName", tableName);
jsonObject.put("after", afterJson);
jsonObject.put("before", beforeJson);
jsonObject.put("type", type);
jsonObject.put("ts",oceanBaseRecord.getSourceInfo().getTimestampS());
collector.collect(jsonObject.toJSONString());
}
}
在.deserializer(new OceanBaseDeserializer()) 这样使用,这样就会输出json方便后续逻辑处理