I. Background
As the volume of data in MongoDB kept growing, a full sync into the data warehouse became impractical, so we turned to incremental syncing. Our approach kept evolving as we explored it; this post records the history of how our MongoDB incremental sync changed.
II. Approach 1: map the BSON file to a staging table, then INSERT OVERWRITE into the final table
The idea: for the existing full data set, run mongodump to produce a complete BSON file, put it onto HDFS, create a staging table mapped to that BSON file, and then copy the data into the final table with INSERT OVERWRITE TABLE final_table SELECT * FROM origin_table, building partitions along the way. Sample scripts:
1. First, dump a BSON file from MongoDB to the local disk
mongodump --host $host --port $port --username=$username --password=$password --collection $coll --db $db --out ${localPath}/ --authenticationDatabase=$db
2. Put the file from the local disk onto HDFS
hadoop fs -put ${localPath}/$db/$coll.bson $hdfsPath/$db/$coll/
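Steps 1 and 2 are usually wrapped in one script. A minimal sketch that assembles and prints the two commands (host, credentials, and paths here are hypothetical; for the incremental case mentioned later, mongodump's --query flag can restrict the dump to recent documents):

```shell
#!/usr/bin/env bash
set -euo pipefail

# Hypothetical connection parameters -- replace with your own.
host="mongo01"; port="27017"
db="sdk_pro"; coll="events"
localPath="/data/mongodump"
hdfsPath="/user/hive/warehouse/mongodb/data"

# Full dump; for an incremental dump, add a condition such as:
#   --query '{"createtime": {"$gte": {"$date": "2020-01-01T00:00:00Z"}}}'
dump_cmd="mongodump --host $host --port $port --db $db --collection $coll --out $localPath/"
# -f overwrites any file left over from the previous run
put_cmd="hadoop fs -put -f $localPath/$db/$coll.bson $hdfsPath/$db/$coll/"

echo "$dump_cmd"
echo "$put_cmd"
```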
3. Create a table in Hive mapped to the BSON file on HDFS
CREATE external TABLE if not exists table_origin(
`_id` string,
`batch` string,
`content` string,
`createtime` timestamp,
`mobile` string,
`type` string,
`updatetime` timestamp
)
comment 'table comment'
row format serde 'com.mongodb.hadoop.hive.BSONSerDe'
stored as inputformat 'com.mongodb.hadoop.mapred.BSONFileInputFormat'
outputformat 'com.mongodb.hadoop.hive.output.HiveBSONFileOutputFormat'
location 'the HDFS directory that contains the BSON file';
4. Create the final table, specifying its storage directory
CREATE TABLE if not exists table(
`_id` string,
`batch` string,
`content` string,
`createtime` timestamp,
`mobile` string,
`type` string,
`updatetime` timestamp
)
comment 'table comment'
partitioned by (pyear int,pmonth int,pday int)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION '/user/hive/warehouse/mongodb/data/sdk_pro/table'
TBLPROPERTIES ('parquet.compression'='SNAPPY'); -- the table is stored as Parquet, so use parquet.compression; orc.compress would be ignored
5. INSERT OVERWRITE into the final table
-- a dynamic-partition insert with no static partition value requires these settings
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
INSERT OVERWRITE TABLE table
PARTITION (pyear,pmonth,pday)
SELECT
t.`_id` ,
t.`batch` ,
t.`content` ,
t.`createtime` ,
t.`mobile` ,
t.`type` ,
t.`updatetime` ,
year(t.`createtime`) pyear,month(t.`createtime`) pmonth,day(t.`createtime`) pday from table_origin t;
6. For incremental data, dump only the delta with a query condition, put it onto HDFS, and create a staging table (same scripts as above). The key difference is merging the delta into the full table; the merge script is as follows:
-- merge the delta into the full table
with t_delta as (SELECT t.*,year(t.createAt) pyear,month(t.createAt) pmonth,day(t.createAt) pday from ${table_name} t),
t_base as (select b.* from sdk_call_nxcloud_voice_sms b where b.pyear =${pt_year} and b.pmonth = ${pt_month} and b.pday = ${pt_day})
INSERT OVERWRITE TABLE sdk_call_nxcloud_voice_sms
PARTITION (pyear,pmonth,pday)
select
coalesce(base.id, delta.id) id,
if(delta.id is NULL, base.countryCode,delta.countryCode) countryCode,
if(delta.id is NULL, base.voiceType,delta.voiceType) voiceType,
if(delta.id is NULL, base.messageid,delta.messageid) messageid,
if(delta.id is NULL, base.thirdNotifyState,delta.thirdNotifyState) thirdNotifyState,
if(delta.id is NULL, base.firstData,delta.firstData) firstData,
if(delta.id is NULL, base.secondData,delta.secondData) secondData,
if(delta.id is NULL, base.pyear,delta.pyear) pyear,
if(delta.id is NULL, base.pmonth,delta.pmonth) pmonth,
if(delta.id is NULL, base.pday,delta.pday) pday
from t_base base
full outer join t_delta delta on base.pyear = delta.pyear and base.pmonth = delta.pmonth and base.pday = delta.pday and base.id = delta.id;
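The FULL OUTER JOIN above implements "delta wins" upsert semantics: for each (partition, id) key, the delta row replaces the base row if one exists, otherwise the base row is kept, and brand-new delta keys are added. A minimal sketch of the same logic in Python (the field names here are illustrative, not the real schema):

```python
def merge(base_rows, delta_rows):
    """Merge delta into base: a delta row replaces the base row with the
    same key, and new delta keys are appended -- the same effect as the
    FULL OUTER JOIN + if(delta.id IS NULL, base.x, delta.x) pattern."""
    merged = {}
    for row in base_rows:
        merged[(row["pyear"], row["pmonth"], row["pday"], row["id"])] = row
    for row in delta_rows:  # delta wins on a key collision
        merged[(row["pyear"], row["pmonth"], row["pday"], row["id"])] = row
    return list(merged.values())

base = [{"id": 1, "pyear": 2020, "pmonth": 1, "pday": 1, "state": "sent"}]
delta = [
    {"id": 1, "pyear": 2020, "pmonth": 1, "pday": 1, "state": "delivered"},  # updated row
    {"id": 2, "pyear": 2020, "pmonth": 1, "pday": 1, "state": "sent"},       # new row
]
rows = merge(base, delta)
print(sorted((r["id"], r["state"]) for r in rows))  # [(1, 'delivered'), (2, 'sent')]
```

Note that because the join key includes the partition columns, a document whose createAt moved to a different day would appear as a new row rather than an update; the SQL script shares this property.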
III. Remaining problems
Although this approach is simple to understand and easy to adopt, the process is complex and repeatedly consumes a large amount of storage, so it leaves room for improvement.