生产上出现了问题:
datax 从hdfs 读 orc 数据导入 mongodb,
有时会产生数据的丢失
针对这个问题在 github上提了个问题。
然后两个月后,大神们修复了这个问题。
问题见
https://github.com/alibaba/DataX/issues/239
下面的是问题代码部分
InputSplit[] splits = in.getSplits(conf, 1);
RecordReader reader = in.getRecordReader(splits[0], conf, Reporter.NULL);
Object key = reader.createKey();
Object value = reader.createValue();
// 获取列信息
List<? extends StructField> fields = inspector.getAllStructFieldRefs();
List<Object> recordFields;
while (reader.next(key, value)) {
recordFields = new ArrayList<Object>();
for (int i = 0; i <= columnIndexMax; i++) {
Object field = inspector.getStructFieldData(value, fields.get(i));
recordFields.add(field);
// OrcInputFormat getSplits params numSplits not used, splits size = block numbers
InputSplit[] splits = in.getSplits(conf, -1);
for (InputSplit split : splits) {
{
RecordReader reader = in.getRecordReader(split, conf, Reporter.NULL);
Object key = reader.createKey();
Object value = reader.createValue();
// 获取列信息
List<? extends StructField> fields = inspector.getAllStructFieldRefs();
List<Object> recordFields;
while (reader.next(key, value)) {
recordFields = new ArrayList<Object>();
for (int i = 0; i <= columnIndexMax; i++) {
Object field = inspector.getStructFieldData(value, fields.get(i));
recordFields.add(field);
}
transportOneRecord(column, recordFields, recordSender,
taskPluginCollector, isReadAllColumns, nullFormat);
}
reader.close();
https://github.com/alibaba/DataX/pull/262/commits/e2dcbcd532bebdb46992ab58b53ae37f1477cec0
贴下有问题的部分
大致看了下应该是 读文件时 由于hdfs文件存储 是block 形式的。
当单个文件 大于 单个block 的size时,出现一个文件 多个block 存储。
之前bug 部分时 仅读取了第一个block
造成了数据的部分丢失
验证下是否是对应的问题
生成对应的倾斜数据
set mapred.reduce.tasks =1 ;
insert into table inner_ods_ctis_txn_tif_bas partition (day=19880105 )
select `(day)?+.+` from inner_ods_ctis_txn_tif_bas
distribute by rand() ;
上面的语句是一个用 正则完成分区表数据插入的好的方式,可以参考。
利用distribute by rand() 重分配会产生reduce作业,并设置reduce个数为1 多次循环后,就可讲一个数据变成一个倾斜的大文件。
该过程因为倾斜。。。造一个 300M 不到的文件 ,开发环境跑了 40分钟。。。
操作时需要有一定时间的预估。
数据情况
hive> select count(1) from inner_ods_ctis_txn_tif_bas where day=19880105 limit 10 ;
10956384
hive> select count(1) from inner_ods_ctis_txn_tif_bas where day=19880103 limit 10 ;
913032
=============================================
hive> dfs -du -h /project/BDP/data/inner/ODS/78/TXN_TIF_BAS/day=19880105 ;
16.8 M /project/BDP/data/inner/ODS/78/TXN_TIF_BAS/day=19880105/000000_0
275.4 M /project/BDP/data/inner/ODS/78/TXN_TIF_BAS/day=19880105/000000_0_copy_1
12.4 M /project/BDP/data/inner/ODS/78/TXN_TIF_BAS/day=19880105/000001_0
16.8 M /project/BDP/data/inner/ODS/78/TXN_TIF_BAS/day=19880105/000002_0
8.5 M /project/BDP/data/inner/ODS/78/TXN_TIF_BAS/day=19880105/000003_0
16.8 M /project/BDP/data/inner/ODS/78/TXN_TIF_BAS/day=19880105/000004_0
16.8 M /project/BDP/data/inner/ODS/78/TXN_TIF_BAS/day=19880105/000005_0
8.4 M /project/BDP/data/inner/ODS/78/TXN_TIF_BAS/day=19880105/000006_0
8.4 M /project/BDP/data/inner/ODS/78/TXN_TIF_BAS/day=19880105/000007_0
8.5 M /project/BDP/data/inner/ODS/78/TXN_TIF_BAS/day=19880105/000008_0
8.3 M /project/BDP/data/inner/ODS/78/TXN_TIF_BAS/day=19880105/000009_0
8.2 M /project/BDP/data/inner/ODS/78/TXN_TIF_BAS/day=19880105/000010_0
3.9 M /project/BDP/data/inner/ODS/78/TXN_TIF_BAS/day=19880105/000011_0
hive> dfs -du -h /project/BDP/data/inner/ODS/78/TXN_TIF_BAS/day=19880103 ;
8.5 M /project/BDP/data/inner/ODS/78/TXN_TIF_BAS/day=19880103/000000_0
16.8 M /project/BDP/data/inner/ODS/78/TXN_TIF_BAS/day=19880103/000001_0
8.2 M /project/BDP/data/inner/ODS/78/TXN_TIF_BAS/day=19880103/000002_0
===========================================
{
"job": {
"content": [
{
"reader": {
"name": "hdfsreader",
"parameter": {
"column": [
{
"index":0,
"type":"string"
},
{
"index":41,
"type":"string"
},
{
"index":46,
"type":"string"
}
],
"defaultFS": "hdfs://hdfsCluster",
"encoding": "UTF-8",
"fileType": "orc",
"haveTbds": "true",
"path": "/project/BDP/data/inner/ODS/78/TXN_TIF_BAS/day=19880105/*",
"hadoopConfig": {
"dfs.nameservices":"hdfsCluster",
"dfs.ha.namenodes.hdfsCluster":"nn1,nn2",
"dfs.namenode.rpc-address.hdfsCluster.nn1":"22.188.9.108:8020",
"dfs.namenode.rpc-address.hdfsCluster.nn2":"22.188.9.100:8020",
"dfs.client.failover.proxy.provider.hdfsCluster":"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
}
}
},
"writer": {
"name": "mongodbwriter",
"parameter": {
"address": ["22.188.9.104:6000","22.188.9.105:6000","22.188.9.106:6000"],
"collectionName": "TEST",
"column": [
{
"name":"md5",
"type":"string"
},
{
"name":"status",
"type":"string"
},
{
"name":"batTime",
"type":"string"
}
],
"dbName": "HDSS",
"writeMode": {
"isReplace":"false",
"replaceKey":"tranKey"
},
"userName": "hdss",
"userPassword": "hdss123"
}
}
}
],
"setting": {
"speed": {
"channel": "5"
}
}
}
}
================================
日志明细
2019-02-12 14:44:58.348 [job-0] INFO StandAloneJobContainerCommunicator - Total 10447128 records, 74481116 bytes | Speed 123.28KB/s, 17706 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 90.651s | All Task WaitReaderTime 507.924s | Percentage 100.00%
2019-02-12 14:44:58.350 [job-0] INFO JobContainer -
任务启动时刻 : 2019-02-12 14:34:59
任务结束时刻 : 2019-02-12 14:44:58
任务总计耗时 : 598s
任务平均流量 : 123.28KB/s
记录写入速度 : 17706rec/s
读出记录总数 : 10447128
读写失败总数 : 0
这里看到数据发生了丢失
对比验证
2019-02-12 14:50:06.698 [job-0] INFO StandAloneJobContainerCommunicator - Total 913032 records, 6507408 bytes | Speed 105.91KB/s, 15217 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 7.747s | All Task WaitReaderTime 56.215s | Percentage 100.00%
2019-02-12 14:50:06.706 [job-0] INFO JobContainer -
任务启动时刻 : 2019-02-12 14:48:57
任务结束时刻 : 2019-02-12 14:50:06
任务总计耗时 : 69s
任务平均流量 : 105.91KB/s
记录写入速度 : 15217rec/s
读出记录总数 : 913032
读写失败总数 : 0
单个文件数据量不大时无数据丢失
============================
因为 娟娟 同学加了对应 权限认证的部分。
所以这次把娟娟的代码都提了出来,
重新编译了下,直接将修改的bug部分覆盖掉,再编译下,拿到对应的class文件完成对应的替换。
开发环境上 将原有 maven 编出的 snapshot.jar 重命名了下
新建了 snapshot.jar 的目录
将编译好的 class文件放了进去。
替换后
对应 19880105 日的 datax 导数日志
2019-02-12 15:19:38.747 [job-0] INFO StandAloneJobContainerCommunicator - Total 10956384 records, 78088896 bytes | Speed 117.32KB/s, 16855 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 86.431s | All Task WaitReaderTime 568.876s | Percentage 100.00%
2019-02-12 15:19:38.750 [job-0] INFO JobContainer -
任务启动时刻 : 2019-02-12 15:08:38
任务结束时刻 : 2019-02-12 15:19:38
任务总计耗时 : 660s
任务平均流量 : 117.32KB/s
记录写入速度 : 16855rec/s
读出记录总数 : 10956384
读写失败总数 : 0
问题得到了解决