1.问题:17W明细数据批量导入hudi,经过路径mysql->hudi ods表->hudi dws表->mysql 的运算后,发现sink到mysql表的聚合结果有误,详细过程如下:
Case:按天计算报表查询最大耗时指标:
经过以上数据流转后,mysql结果表如右图,无线外购报表的最大耗时计算错误
2.使用软件版本:
hudi:0.10.0;
flink: flink-1.13.1
flink-cdc:2.0
3.原因分析:
step1:经过测试,发现只有在大并发数据插入情况下,会出现数据计算错误问题,且出问题的数据比较随机,小数据量情况下的增删改均无问题,考虑是并发场景下数据流转方面出现问题;
step2:查询Hudi-DWS表,发现聚合结果是正确的,排除了Hudi聚合计算流程问题,将目标定位到从Hudi-DWS表 sink 数据到mysql结果表任务,该任务对应的SQL为:
INSERT INTO item_access_groupby_jdbc SELECT `item_name` ,`group_name` ,`report_name` ,`report_id`,`avg_delay`,`max_delay`,`min_delay` ,`access_type` ,`createdate` ,CAST(`access_count` AS int) ,CAST(`access_user_count` AS int) FROM dws_item_access_groupby_hudi;
任务Job对应的执行图如下:
通过Job图和源码分析,整个读取Hudi-DWS表,并Sink到mysql结果表的过程分为如下几步:
(1)split_monitor:每间隔3s(可设置)去监听Hudi TimeLine 上是否存在新提交的Instance,有的话则读取新提交的Instance,并获取对应的数据FileSlice(内包含数据log文件和parquet文件),将信息封装,下发给后面的split_reader进行处理,这里要注意其分发模式是Rebanlance;
(2)split_reader:接收split_monitor传递的需要处理的文件信息,对文件中的数据进行处理,完毕后sink到mysql中,注意这里的并发度为4;
所以猜测split_monitor在两次监控下,对同一数据文件进行了Rebanlance模式的分发,并分发给了两个不同的task进行处理,task对数据文件处理速度不一致,导致了老的回撤流覆盖了新的回撤流,流程如下:
数据文件结构:
数据导出流程:
4.解决方法:
方案一:将split_reader并行度指定为1,此时只有一个task处理log数据文件,保证处理顺序性,具体改动是在定义Hudi-DWS表的时候指定参数'read.tasks' = '1',但该方案会影响sink处理速度;
方案二:修改源码:在分发log文件时候,按照分区值进行keyBy,保证同一分区下数据文件都给一个Task进行处理,从而保证数据处理的有序性,主要修改如下三个类:
1.HoodieTableSource类:
2.IncrementalInputSplits类的inputSplits方法:
3.MergeOnReadInputSplit类,增加变量realPartition(表示分区)
修改完成后,重新进行hudi打包,问题消失,mysql结果表数据计算正确,job图如下,log分发从之前的Rebanlance变成了hash,此时同一个log文件被相同task处理,保障了处理的顺序性。