MongoDB实时同步Impala

背景

由于越来越多的Mysql数据以及Mongodb的数据需要做分析.但是大量的数据分析并不适合在Mysql以及Mongodb中进行,所以需要将数据同步指CDH中, 由于2方面原因所以才有了实时同步的需求.

1.数据延后性
由于Sqoop/Mongoexport 数据的延后性,导致CDH只能作为分析平台为分析人员提供数据支持,针对于分析视角而言是没问题的.应用系统基本会从性能的角度考虑,从而限制用户导出数据的条数,如果从基层业务人员需要从应用系统导出大量数据时问题来了.为什么你们提供给我的数据和应用系统的数据不一致呢?为什么不能导出最新的数据给我呢?我需要用最新的数据处理XX业务问题,但是应用系统只能每次导出500条,我有10000条需要导出,难道要导出20次吗?

2.集群性能压力
如果公司有100个数据库,而大家都知道Sqoop/Mongoexport操作 都是会影响集群性能负载,最初有考虑将所有Sqoop/Mongoexport分散在不同的服务器以解决此问题.但是Sqoop同步的时候由于是Yarn管理资源.所以会有一定限制.如果这么多数据库一直不停的同步.可能需要10个小时(数据库数据大小决定)甚至10小时以上,大量的ETL脚本一般都是需要在上班之前呈现给业务方.那么数据库如果是在晚上同步的话,那么早上上班前可能只有1~2个小时的时间去做数据的ETL工作.导致了大量的数据脚本集中在这1~2个小时.大量的读写以及查询导致集群负载飙升.写本文的这个早上我们的集群机器负载达到了1000,这个非常恐怖甚至不敢想象今后越来越多的数据处理工作如何进行,所以才有了实时同步的需求.

3.实时数仓建设
目前正在建设做大数据数据仓库的建设工作,每天全表刷新数仓所有数据非常耗时,且浪费资源.实时同步也为实时数仓建设打开了大门.之后对接 Flink 等实时处理工作也有极大的支持.

同步工具

在2018年中的时候就在关注实时同步,但是国内很多都是使用阿里DataX,不可否认是个好工具.但是我更希望能找到可以集成在Cloudera生态之内的同步工具,StreamSets让我眼前一亮,这不就是我需要的工具吗?由于一些原因.直到今年年末才有时间继续研究这一块.

话不多说,进入正题

1.Impala中创建KUDU表.

CREATE TABLE test.mgtest
(`_id` STRING NOT NULL, 
userid STRING, name string, 
age string, 
PRIMARY KEY (`_id`)) 
PARTITION BY HASH (`_id`) 
PARTITIONS 16 STORED AS KUDU 
TBLPROPERTIES ('kudu.master_addresses'='cdh2:7051', 'kudu.table_name'='test.mgtest')
image.png

2.配置流


image.png

2.1.Mongo数据源配置.


image.png

2.2.数据库选择.


image.png

2.3.数据处理.


image.png

代码如下:

for record in records:
  newRecord = sdcFunctions.createRecord(record.sourceId + ':newRecordId')
  try:      
    if record.value['op'] == 'i':
      newRecord.attributes['sdc.operation.type']='1'
      newRecord.value = record.value['o']
    if record.value['op'] == 'd':
      newRecord.attributes['sdc.operation.type']='2'
      newRecord.value = record.value['o']
    if record.value['op'] == 'u':
      newRecord.attributes['sdc.operation.type']='3'
      newRecord.value = record.value['o']['$set'] 
      newRecord.value['_id'] = record.value['o2']['_id']
    # Write record to processor output
    #newRecord.value['Type'] = record.value['Type']
    newRecord.value['ns'] = record.value['ns']
    newRecord.value['op'] = record.value['op']
    output.write(newRecord)
  except Exception as e:
    # Send record to error
    error.write(newRecord, str(e))

2.4.子文档处理


image.png

代码如下:

import array
import json

def convert_json(item):
  return json.dumps(item,ensure_ascii=False,encoding="utf-8")
def convert_array(item):
  return json.dumps(item,ensure_ascii=False,encoding="utf-8")
for record in records:
  try:
    for colName,value in record.value.items():
      temp = record.value[colName]
      record.value[colName] = None
      if sdcFunctions.getFieldNull(record,'/'+colName) is NULL_MAP:
        temp = convert_json(temp)
      elif sdcFunctions.getFieldNull(record,'/'+colName) is NULL_LIST:
        temp = convert_array(temp)
      record.value[colName] = temp
    output.write(record)
  except Exception as e:
    error.write(record, str(e))

2.5.CRUD选择器.


image.png

2.6.插入数据


image.png

更新和删除数据更改Default Operation 的值即可.


image.png

2.7 测试MongoDB 插入数据
2.7.1 Mongo插入数据


image.png

2.7.2 Impala查询


image.png

2.7.3 Streamsets流
image.png

2.8 Mongo更新数据
2.8.1 Mongo更新数据


image.png

2.8.2 Impala查询


image.png

2.8.3 StreamSets流
image.png

2.9 MongoDB删除数据
2.9.1 MongoDB删除数据


image.png

2.9.2 Impala查询


image.png

最后在impala使用的过程中,由于mongoDB子文档问题.需要升级到Impala版本3.1以上以支持get_json_object 这个function, 后续需要考虑的问题是Hive如何使用.

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。