hudi做ETL大数据量情况下聚合数据不正确问题分享

1.问题:17W明细数据批量导入hudi,经过路径mysql->hudi ods表->hudi dws表->mysql 的运算后,发现sink到mysql表的聚合结果有误,详细过程如下:


Case:按天计算报表查询最大耗时指标:

 经过以上数据流转后,mysql结果表如右图,无线外购报表的最大耗时计算错误


2.使用软件版本:

   hudi:0.10.0;

   flink: flink-1.13.1

   flink-cdc:2.0

3.原因分析:

   step1:经过测试,发现只有在大并发数据插入情况下,会出现数据计算错误问题,且出问题的数据比较随机,小数据量情况下的增删改均无问题,考虑是并发场景下数据流转方面出现问题;

  step2:查询Hudi-DWS表,发现聚合结果是正确的,排除了Hudi聚合计算流程问题,将目标定位到从Hudi-DWS表 sink 数据到mysql结果表任务,该任务对应的SQL为:

INSERT INTO item_access_groupby_jdbc SELECT `item_name` ,`group_name`  ,`report_name` ,`report_id`,`avg_delay`,`max_delay`,`min_delay`  ,`access_type` ,`createdate` ,CAST(`access_count` AS int) ,CAST(`access_user_count`  AS int) FROM dws_item_access_groupby_hudi;


任务Job对应的执行图如下:

通过Job图和源码分析,整个读取Hudi-DWS表,并Sink到mysql结果表的过程分为如下几步:

(1)split_monitor:每间隔3s(可设置)去监听Hudi TimeLine 上是否存在新提交的Instance,有的话则读取新提交的Instance,并获取对应的数据FileSlice(内包含数据log文件和parquet文件),将信息封装,下发给后面的split_reader进行处理,这里要注意其分发模式是Rebanlance;

(2)split_reader:接收split_monitor传递的需要处理的文件信息,对文件中的数据进行处理,完毕后sink到mysql中,注意这里的并发度为4;

   所以猜测split_monitor在两次监控下,对同一数据文件进行了Rebanlance模式的分发,并分发给了两个不同的task进行处理,task对数据文件处理速度不一致,导致了老的回撤流覆盖了新的回撤流,流程如下:

数据文件结构:

数据导出流程:

4.解决方法:

    方案一:将split_reader并行度指定为1,此时只有一个task处理log数据文件,保证处理顺序性,具体改动是在定义Hudi-DWS表的时候指定参数'read.tasks' = '1',但该方案会影响sink处理速度;

    方案二:修改源码:在分发log文件时候,按照分区值进行keyBy,保证同一分区下数据文件都给一个Task进行处理,从而保证数据处理的有序性,主要修改如下三个类:

      1.HoodieTableSource类:

   2.IncrementalInputSplits类的inputSplits方法:

3.MergeOnReadInputSplit类,增加变量realPartition(表示分区)

   修改完成后,重新进行hudi打包,问题消失,mysql结果表数据计算正确,job图如下,log分发从之前的Rebanlance变成了hash,此时同一个log文件被相同task处理,保障了处理的顺序性。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容