背景
由于越来越多的Mysql数据以及Mongodb的数据需要做分析.但是大量的数据分析并不适合在Mysql以及Mongodb中进行,所以需要将数据同步指CDH中, 由于2方面原因所以才有了实时同步的需求.
1.数据延后性
由于Sqoop/Mongoexport 数据的延后性,导致CDH只能作为分析平台为分析人员提供数据支持,针对于分析视角而言是没问题的.应用系统基本会从性能的角度考虑,从而限制用户导出数据的条数,如果从基层业务人员需要从应用系统导出大量数据时问题来了.为什么你们提供给我的数据和应用系统的数据不一致呢?为什么不能导出最新的数据给我呢?我需要用最新的数据处理XX业务问题,但是应用系统只能每次导出500条,我有10000条需要导出,难道要导出20次吗?
2.集群性能压力
如果公司有100个数据库,而大家都知道Sqoop/Mongoexport操作 都是会影响集群性能负载,最初有考虑将所有Sqoop/Mongoexport分散在不同的服务器以解决此问题.但是Sqoop同步的时候由于是Yarn管理资源.所以会有一定限制.如果这么多数据库一直不停的同步.可能需要10个小时(数据库数据大小决定)甚至10小时以上,大量的ETL脚本一般都是需要在上班之前呈现给业务方.那么数据库如果是在晚上同步的话,那么早上上班前可能只有1~
2个小时的时间去做数据的ETL工作.导致了大量的数据脚本集中在这1~
2个小时.大量的读写以及查询导致集群负载飙升.写本文的这个早上我们的集群机器负载达到了1000,这个非常恐怖甚至不敢想象今后越来越多的数据处理工作如何进行,所以才有了实时同步的需求.
3.实时数仓建设
目前正在建设做大数据数据仓库的建设工作,每天全表刷新数仓所有数据非常耗时,且浪费资源.实时同步也为实时数仓建设打开了大门.之后对接 Flink 等实时处理工作也有极大的支持.
同步工具
在2018年中的时候就在关注实时同步,但是国内很多都是使用阿里DataX,不可否认是个好工具.但是我更希望能找到可以集成在Cloudera生态之内的同步工具,StreamSets让我眼前一亮,这不就是我需要的工具吗?由于一些原因.直到今年年末才有时间继续研究这一块.
话不多说,进入正题
1.Impala中创建KUDU表.
CREATE TABLE test.mgtest
(`_id` STRING NOT NULL,
userid STRING, name string,
age string,
PRIMARY KEY (`_id`))
PARTITION BY HASH (`_id`)
PARTITIONS 16 STORED AS KUDU
TBLPROPERTIES ('kudu.master_addresses'='cdh2:7051', 'kudu.table_name'='test.mgtest')
2.配置流
2.1.Mongo数据源配置.
2.2.数据库选择.
2.3.数据处理.
代码如下:
for record in records:
newRecord = sdcFunctions.createRecord(record.sourceId + ':newRecordId')
try:
if record.value['op'] == 'i':
newRecord.attributes['sdc.operation.type']='1'
newRecord.value = record.value['o']
if record.value['op'] == 'd':
newRecord.attributes['sdc.operation.type']='2'
newRecord.value = record.value['o']
if record.value['op'] == 'u':
newRecord.attributes['sdc.operation.type']='3'
newRecord.value = record.value['o']['$set']
newRecord.value['_id'] = record.value['o2']['_id']
# Write record to processor output
#newRecord.value['Type'] = record.value['Type']
newRecord.value['ns'] = record.value['ns']
newRecord.value['op'] = record.value['op']
output.write(newRecord)
except Exception as e:
# Send record to error
error.write(newRecord, str(e))
2.4.子文档处理
代码如下:
import array
import json
def convert_json(item):
return json.dumps(item,ensure_ascii=False,encoding="utf-8")
def convert_array(item):
return json.dumps(item,ensure_ascii=False,encoding="utf-8")
for record in records:
try:
for colName,value in record.value.items():
temp = record.value[colName]
record.value[colName] = None
if sdcFunctions.getFieldNull(record,'/'+colName) is NULL_MAP:
temp = convert_json(temp)
elif sdcFunctions.getFieldNull(record,'/'+colName) is NULL_LIST:
temp = convert_array(temp)
record.value[colName] = temp
output.write(record)
except Exception as e:
error.write(record, str(e))
2.5.CRUD选择器.
2.6.插入数据
更新和删除数据更改Default Operation 的值即可.
2.7 测试MongoDB 插入数据
2.7.1 Mongo插入数据
2.7.2 Impala查询
2.7.3 Streamsets流
2.8 Mongo更新数据
2.8.1 Mongo更新数据
2.8.2 Impala查询
2.8.3 StreamSets流
2.9 MongoDB删除数据
2.9.1 MongoDB删除数据
2.9.2 Impala查询
最后在impala使用的过程中,由于mongoDB子文档问题.需要升级到Impala版本3.1以上以支持get_json_object 这个function, 后续需要考虑的问题是Hive如何使用.