Incremental Sync of MongoDB Data to Hive (Option 1: Mapping BSON Files)

1. Background

As the volume of data in MongoDB keeps growing, full synchronization to the data warehouse is no longer realistic, so we turned to incremental synchronization. Our approach kept improving as we explored it; this series records the evolution of our MongoDB incremental sync.

2. Option 1: map the BSON file to a staging table, then INSERT OVERWRITE into the final table

The idea: for the existing (full) data, use mongodump to export a complete BSON file and put it to HDFS; create a staging table mapped onto that BSON file; then copy the data into the final table with `insert overwrite table final_table select * from origin_table`, building partitions along the way. Sample scripts follow:
1. First, dump a BSON file from MongoDB to local disk

 mongodump --host $host --port $port --username=$username  --password=$password --collection $coll --db $db --out ${localPath}/ --authenticationDatabase=$db
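Step 6 below dumps the delta "according to a filter condition" but never shows that command. A minimal sketch, assuming each document carries an `updatetime` field and the watermark of the last successful run is kept in a local file (both assumptions, not part of the original scripts); the command is only assembled and printed here, so the sketch is safe to run anywhere:

```shell
#!/usr/bin/env bash
# Incremental dump sketch. Assumptions (not from the original scripts):
# - each document has an `updatetime` field,
# - the timestamp of the last successful sync is stored in ./last_sync_time.
last_sync=$(cat ./last_sync_time 2>/dev/null || echo "1970-01-01T00:00:00Z")

# mongodump's --query takes an extended-JSON filter; only documents
# updated after the watermark are exported.
query="{\"updatetime\": {\"\$gte\": {\"\$date\": \"${last_sync}\"}}}"

cmd="mongodump --host $host --port $port --username=$username --password=$password \
--db $db --collection $coll --query '$query' --out ${localPath}/ --authenticationDatabase=$db"

# Printed instead of executed so the sketch has no side effects;
# in a real job, call mongodump directly with these arguments.
echo "$cmd"
```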

2. Then put it from local disk to HDFS

hadoop fs -put ${localPath}/$db/$coll.bson  $hdfsPath/$db/$coll/
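If the target directory does not exist yet, or a previous dump is being replaced, the standard `hadoop fs` flags cover both cases (a small variation on the command above):

```shell
# -mkdir -p creates the directory tree if missing; -put -f overwrites
# an existing file from an earlier dump.
hadoop fs -mkdir -p $hdfsPath/$db/$coll/
hadoop fs -put -f ${localPath}/$db/$coll.bson $hdfsPath/$db/$coll/
```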

3. Next, create a table in Hive mapped onto the BSON file on HDFS

CREATE EXTERNAL TABLE IF NOT EXISTS table_origin(
`_id` string, 
`batch` string, 
`content` string, 
`createtime` timestamp, 
`mobile` string, 
`type` string, 
`updatetime` timestamp
)
COMMENT '<comment>'
ROW FORMAT SERDE 'com.mongodb.hadoop.hive.BSONSerDe'
STORED AS INPUTFORMAT 'com.mongodb.hadoop.mapred.BSONFileInputFormat'
OUTPUTFORMAT 'com.mongodb.hadoop.hive.output.HiveBSONFileOutputFormat'
LOCATION '<HDFS directory containing the BSON file>';
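For the BSON mapping to work, the mongo-hadoop connector must be on Hive's classpath. The jar names and versions below are assumptions; use whatever matches your installation (or install the jars permanently via Hive's aux-jars path instead of per-session `ADD JAR`):

```sql
-- Paths and versions are placeholders for your installation.
ADD JAR /path/to/mongo-hadoop-core-2.0.2.jar;
ADD JAR /path/to/mongo-hadoop-hive-2.0.2.jar;
ADD JAR /path/to/mongo-java-driver-3.12.11.jar;
```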

4. Create the final table, specifying its storage directory

CREATE TABLE IF NOT EXISTS table(
`_id` string, 
`batch` string, 
`content` string, 
`createtime` timestamp, 
`mobile` string,
`type` string, 
`updatetime` timestamp
)
COMMENT '<comment>'
PARTITIONED BY (pyear int, pmonth int, pday int)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION '/user/hive/warehouse/mongodb/data/sdk_pro/table'
TBLPROPERTIES ('parquet.compression'='SNAPPY');

5. INSERT OVERWRITE into the final table

INSERT OVERWRITE TABLE table
PARTITION (pyear,pmonth,pday)
SELECT 
t.`_id`, 
t.`batch`, 
t.`content`, 
t.`createtime`, 
t.`mobile`,
t.`type`, 
t.`updatetime`,
year(t.createtime) pyear, month(t.createtime) pmonth, day(t.createtime) pday FROM table_origin t;
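The INSERT above uses dynamic partitioning (no static values in the PARTITION clause), which Hive rejects by default; these standard session settings must be in place first:

```sql
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
```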

6. For incremental data, dump the delta according to a filter condition, put it to HDFS, and create a staging table (same scripts as above). The key difference is merging the delta into the full table: rows present in the delta replace the matching base rows, while base-only rows are kept. (Note this example comes from a different collection, `sdk_call_nxcloud_voice_sms`, so its columns differ from the table above.) The merge script:

-- merge the delta into the full table
with t_delta as (SELECT t.*,year(t.createAt) pyear,month(t.createAt) pmonth,day(t.createAt) pday from ${table_name} t),
t_base as (select b.* from sdk_call_nxcloud_voice_sms b where b.pyear =${pt_year} and b.pmonth = ${pt_month} and b.pday = ${pt_day})
INSERT OVERWRITE TABLE sdk_call_nxcloud_voice_sms
PARTITION (pyear,pmonth,pday)
select  
coalesce(base.id, delta.id) id,
if(delta.id is NULL, base.countryCode,delta.countryCode) countryCode,
if(delta.id is NULL, base.voiceType,delta.voiceType) voiceType,
if(delta.id is NULL, base.messageid,delta.messageid) messageid,
if(delta.id is NULL, base.thirdNotifyState,delta.thirdNotifyState) thirdNotifyState,
if(delta.id is NULL, base.firstData,delta.firstData) firstData,
if(delta.id is NULL, base.secondData,delta.secondData) secondData,
if(delta.id is NULL, base.pyear,delta.pyear) pyear,
if(delta.id is NULL, base.pmonth,delta.pmonth) pmonth,
if(delta.id is NULL, base.pday,delta.pday) pday 
from t_base base 
full outer join t_delta delta on base.pyear = delta.pyear and base.pmonth = delta.pmonth and base.pday = delta.pday and base.id = delta.id;

3. Remaining problems

Although this approach is simple and easy to get started with, the process is convoluted and the same data repeatedly consumes large amounts of storage, so there is room for improvement.
