DataEngine Data Processing Flow
The DataEngine data processing flow consists of the following steps:
- change
- validate
- push&relation
- stat
- info
- order
change
change reads data from HDFS and from HBase tables, compares the old and new records field by field to extract change information, and saves the changes to HBase. It currently covers three types of gs (工商, business registration) data and four types of non-gs data.
No prerequisites.
1. Execution flow
Title: change sequence diagram - gs
hdfs->HDFSDataProvider: 1. Read the compare\nfile from HDFS
HDFSDataProvider->DataModel: 2. Compare each row and\nsave the changes into\na DataModel object
DataModel->DataModel: 3. Filter the DataModel for\nempty values and date anomalies\non the specified fields
DataModel-->HDFSDataProvider:
HDFSDataProvider->rb_gs_change: 4. Save to the\nrb_gs_change table in HBase
Title: change sequence diagram - nongs
hdfs->HDFSDataProvider: 1. Read the compare\nfile from HDFS
HDFSDataProvider->DataModel: 2. Compare each row and\nsave the changes into a DataModel object
DataModel->DataModel: 3. Filter the DataModel for\nempty values and date anomalies\non the specified fields
DataModel-->HDFSDataProvider:
HDFSDataProvider->rb_non_gs_change: 4. Save to the\nrb_non_gs_change table in HBase
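A minimal sketch of steps 2-4 (compare each row field by field, filter anomalies, save to HBase) is given below, assuming a simplified record layout. The column family "f", the "altdate" field name, the date format and the row-key scheme are illustrative assumptions; this is not the actual HDFSDataProvider/DataModel code.

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

/** Sketch of the compare-and-save step for one enterprise record (not the real implementation). */
public class ChangeExtractSketch {

    private static final byte[] CF = Bytes.toBytes("f");    // assumed column family

    /** Compare an old and a new record field by field and persist the changed fields to HBase. */
    public static void compareAndSave(Connection conn, String entId,
                                      Map<String, String> oldRow,
                                      Map<String, String> newRow,
                                      String batchDate) throws Exception {
        // Step 2: keep only the fields whose value actually changed.
        Map<String, String> changes = new HashMap<>();
        for (Map.Entry<String, String> e : newRow.entrySet()) {
            String oldVal = oldRow.get(e.getKey());
            String newVal = e.getValue();
            if (newVal != null && !newVal.equals(oldVal)) {
                changes.put(e.getKey(), newVal);
            }
        }
        // Step 3: drop empty values and skip records whose date field is malformed.
        changes.values().removeIf(v -> v.trim().isEmpty());
        String altDate = newRow.get("altdate");              // assumed name of the date field
        if (altDate != null && !isValidDate(altDate)) {
            return;                                           // date anomaly: skip this record
        }
        if (changes.isEmpty()) {
            return;
        }
        // Step 4: write one change row per enterprise and batch into rb_gs_change.
        try (Table table = conn.getTable(TableName.valueOf("rb_gs_change"))) {
            Put put = new Put(Bytes.toBytes(entId + "_" + batchDate));
            for (Map.Entry<String, String> c : changes.entrySet()) {
                put.addColumn(CF, Bytes.toBytes(c.getKey()), Bytes.toBytes(c.getValue()));
            }
            table.put(put);
        }
    }

    private static boolean isValidDate(String s) {
        try {
            new SimpleDateFormat("yyyyMMddHHmmss").parse(s);
            return true;
        } catch (ParseException ex) {
            return false;
        }
    }
}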
2. Execution script
/opt/data-engine/current/bin/data_loader.sh E_ENT_BASEINFO 20160314000000 hdfs://chinadaas11:8020/hive1/user/hive/warehouse/enterprisebaseinfocollect_20160314_compare/ &
/opt/data-engine/current/bin/data_loader.sh E_INV_INVESTMENT 20160314000000 hdfs://chinadaas11:8020/hive1/user/hive/warehouse/e_inv_investment_20160314_compare/ &
/opt/data-engine/current/bin/data_loader.sh E_PRI_PERSON 20160314000000 hdfs://chinadaas11:8020/hive1/user/hive/warehouse/e_pri_person_20160314_compare/ &
/opt/data-engine/current/bin/data_loader.sh DIS_SXBZXR 20160314000000 dis_sxbzxr_new_name &
/opt/data-engine/current/bin/data_loader.sh FROST 20160314000000 frost_pripid &
/opt/data-engine/current/bin/data_loader.sh IMPAWN 20160314000000 impawn_pripid &
/opt/data-engine/current/bin/data_loader.sh XZCF 20160314000000 xzcf_pripid &
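Judging from the example commands above, data_loader.sh appears to take three positional arguments for the change step: the type of data to load (E_ENT_BASEINFO, FROST, ...), the batch timestamp (20160314000000), and the data source, which is an HDFS compare-file directory for the gs types and an HBase-side identifier (e.g. frost_pripid) for the non-gs types. This reading is inferred from the examples only, not from the script itself.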
validate
validate filters the change data of the enterprises covered by user orders.
Prerequisites: change, order
1. Execution flow
Title: validate sequence diagram
rb_..._change->ValidateDataProvider: 1. Read the change data from HBase
ValidateDataProvider->DataModel: 2. Build DataModel objects
DataModel->DataModel: 3. Filter out duplicate and\nback-and-forth changes
DataModel-->ValidateDataProvider:
ValidateDataProvider->rb_validated_change: 4. Save the data to HBase
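The filtering in step 3 is only named here ("duplicate and back-and-forth data"); the sketch below shows one plausible reading of that rule. The ChangeRecord shape and the exact back-and-forth definition are assumptions, not the real ValidateDataProvider logic.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Sketch of the validate filter; not the real ValidateDataProvider. */
public class ValidateSketch {

    /** Simplified change record as produced by the change step. */
    public static class ChangeRecord {
        public String entId;
        public String field;
        public String oldValue;
        public String newValue;
    }

    /**
     * Drop exact duplicates and back-and-forth changes (A -> B later followed by B -> A
     * on the same field of the same enterprise); keep everything else.
     */
    public static List<ChangeRecord> filter(List<ChangeRecord> changes) {
        Set<String> seen = new HashSet<>();
        Map<String, ChangeRecord> lastKeptByField = new HashMap<>();
        List<ChangeRecord> kept = new ArrayList<>();
        for (ChangeRecord c : changes) {
            String dupKey = c.entId + "|" + c.field + "|" + c.oldValue + "|" + c.newValue;
            if (!seen.add(dupKey)) {
                continue;                                // exact duplicate of an earlier change
            }
            String fieldKey = c.entId + "|" + c.field;
            ChangeRecord prev = lastKeptByField.get(fieldKey);
            if (prev != null && c.oldValue != null && c.oldValue.equals(prev.newValue)
                    && c.newValue.equals(prev.oldValue)) {
                kept.remove(prev);                       // the value flipped back: drop both changes
                lastKeptByField.remove(fieldKey);
                continue;
            }
            lastKeptByField.put(fieldKey, c);
            kept.add(c);
        }
        return kept;
    }
}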
2. Execution script
/opt/data-engine/current/bin/data_loader.sh VALIDATE 20160314000000 rb_gs_change SCAN_BY_DATE &
/opt/data-engine/current/bin/data_loader.sh VALIDATE 20160314000000 rb_non_gs_change SCAN_BY_DATE &
push&relation
push&relation extracts enterprise change information per user. The results are saved to HBase and ES.
Prerequisite: validate
1. Processing flow
order_index -> changeEntInfo.txt : 1. Generate the user-monitored\nenterprises from the order index
rb_validated_change -> ChangePushDataProvider : 2. Read from rb_validated_change\nby date
changeEntInfo.txt -> ChangePushDataProvider : 3. Read the user-monitored enterprises
ChangePushDataProvider -> ChangePushDataProvider : 4. Look up the users\nmonitoring each enterprise
ChangePushDataProvider -> column.txt : 5. Read the fields monitored by each user
ChangePushDataProvider -> rb_push: 6. Write the changes of the\nmonitored fields to HBase
ChangePushDataProvider -> push&relation_index: 7. Write the changes of the\nmonitored fields to ES
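A minimal sketch of the matching in steps 3-5, assuming simple in-memory maps stand in for changeEntInfo.txt and column.txt. The PushRecord shape is hypothetical, and the writes to rb_push and the push&relation ES index (steps 6-7) are omitted.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Sketch of the per-user matching done by ChangePushDataProvider (not the real code). */
public class ChangePushSketch {

    /** One push record: a change of a monitored field, addressed to one user. */
    public static class PushRecord {
        public String userId;
        public String entId;
        public String field;
        public String newValue;
    }

    /**
     * entWatchers : enterprise id -> users monitoring it (built from order_index / changeEntInfo.txt)
     * userColumns : user id -> field names the user monitors (from column.txt)
     * Returns the records that would then be written to rb_push and to the ES index.
     */
    public static List<PushRecord> match(String entId,
                                         Map<String, String> changedFields,
                                         Map<String, Set<String>> entWatchers,
                                         Map<String, Set<String>> userColumns) {
        List<PushRecord> out = new ArrayList<>();
        Set<String> users = entWatchers.get(entId);
        if (users == null) {
            return out;                                   // nobody monitors this enterprise
        }
        for (String user : users) {
            Set<String> monitored = userColumns.getOrDefault(user, Collections.emptySet());
            for (Map.Entry<String, String> ch : changedFields.entrySet()) {
                if (monitored.contains(ch.getKey())) {    // the user monitors this changed field
                    PushRecord r = new PushRecord();
                    r.userId = user;
                    r.entId = entId;
                    r.field = ch.getKey();
                    r.newValue = ch.getValue();
                    out.add(r);
                }
            }
        }
        return out;
    }
}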
2. Execution script
/opt/data-engine/current/bin/data_loader.sh CHANGE_PUSH 20160314000000 rb_validated_change SCAN_BY_DATE &
stat
stat integrates the relation information with the enterprise basic information.
Prerequisites: relation, info
1. Execution script
/opt/data-engine/current/bin/data_loader.sh STATISTICS 20160314000000 p_change &
info
info is the index of the gs basic information.
No prerequisites.
1. Execution script
/opt/data-engine/current/bin/data_loader.sh INDEX_ENT_INFO 20160314000000 ENTERPRISEBASEINFOCOLLECT_20160314 &
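As a rough illustration of building such an index, the sketch below writes one basic-info document into Elasticsearch with the 7.x REST high-level client. The client version, the index name ent_info and every document field other than pripid are assumptions and are not taken from data-engine.

import java.util.HashMap;
import java.util.Map;

import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

/** Sketch of indexing one enterprise basic-info record into ES (not the data-engine code). */
public class EntInfoIndexSketch {

    public static void main(String[] args) throws Exception {
        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")))) {
            Map<String, Object> doc = new HashMap<>();
            doc.put("pripid", "1100001234567");           // assumed field names and sample values
            doc.put("entname", "example company");
            doc.put("regno", "110000000000000");

            // One document per enterprise, keyed by its id, into an assumed "ent_info" index.
            IndexRequest request = new IndexRequest("ent_info")
                    .id((String) doc.get("pripid"))
                    .source(doc);
            client.index(request, RequestOptions.DEFAULT);
        }
    }
}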
order
The order index is synchronized from MySQL.