现象:
当期的操作流程如下图:
这样的处理流程会造成如下问题:
1、当binlog解析出的批次数据中,数据包含了对同一条数据的删除和修改操作时,无法保证操作执行的顺序。
解决方案(针对kudu的Destination):
Kudu的Destination中有个设置Default Operation ,这个设置的说明是:
default operation to perform if sdc.operation.type is not set in record header.
所以我们可以通过Record的Header Attribute 中的sdc.opeation.type来直接控制数据在kudu的Destination中执行的操作。
对数据的操作不进行分离,通过sdc.operation.type加入的Record的Header Attributes中进行控制。
sdc.operation.type的值的说明如下:
- INSERT_CODE = 1;
- DELETE_CODE = 2;
- UPDATE_CODE = 3;
- UPSERT_CODE = 4;
在Javascript Evaluator 中的JS中增加如下的代码:
for(var i = 0; i < records.length; i++) {
try {
var newRecord = sdcFunctions.createRecord(true);
var attributes = records[i].attributes
if(records[i].value['Type'] =='DELETE'){
newRecord.attributes['sdc.operation.type']='2';
newRecord.value = records[i].value['OldData'];
}else{
newRecord.attributes['sdc.operation.type']='4'
newRecord.value = records[i].value['Data'];
}
newRecord.value.Type = records[i].value['Type'];
newRecord.value.Database = records[i].value['Database'];
newRecord.value.Table = records[i].value['Table'];
output.write(newRecord);
} catch (e) {
// Send record to error
error.write(records[i], e);
}
}
``