MongoDB实时同步Impala

背景

由于越来越多的Mysql数据以及Mongodb的数据需要做分析.但是大量的数据分析并不适合在Mysql以及Mongodb中进行,所以需要将数据同步指CDH中, 由于2方面原因所以才有了实时同步的需求.

1.数据延后性
由于Sqoop/Mongoexport 数据的延后性,导致CDH只能作为分析平台为分析人员提供数据支持,针对于分析视角而言是没问题的.应用系统基本会从性能的角度考虑,从而限制用户导出数据的条数,如果从基层业务人员需要从应用系统导出大量数据时问题来了.为什么你们提供给我的数据和应用系统的数据不一致呢?为什么不能导出最新的数据给我呢?我需要用最新的数据处理XX业务问题,但是应用系统只能每次导出500条,我有10000条需要导出,难道要导出20次吗?

2.集群性能压力
如果公司有100个数据库,而大家都知道Sqoop/Mongoexport操作 都是会影响集群性能负载,最初有考虑将所有Sqoop/Mongoexport分散在不同的服务器以解决此问题.但是Sqoop同步的时候由于是Yarn管理资源.所以会有一定限制.如果这么多数据库一直不停的同步.可能需要10个小时(数据库数据大小决定)甚至10小时以上,大量的ETL脚本一般都是需要在上班之前呈现给业务方.那么数据库如果是在晚上同步的话,那么早上上班前可能只有1~2个小时的时间去做数据的ETL工作.导致了大量的数据脚本集中在这1~2个小时.大量的读写以及查询导致集群负载飙升.写本文的这个早上我们的集群机器负载达到了1000,这个非常恐怖甚至不敢想象今后越来越多的数据处理工作如何进行,所以才有了实时同步的需求.

3.实时数仓建设
目前正在建设做大数据数据仓库的建设工作,每天全表刷新数仓所有数据非常耗时,且浪费资源.实时同步也为实时数仓建设打开了大门.之后对接 Flink 等实时处理工作也有极大的支持.

同步工具

在2018年中的时候就在关注实时同步,但是国内很多都是使用阿里DataX,不可否认是个好工具.但是我更希望能找到可以集成在Cloudera生态之内的同步工具,StreamSets让我眼前一亮,这不就是我需要的工具吗?由于一些原因.直到今年年末才有时间继续研究这一块.

话不多说,进入正题

1.Impala中创建KUDU表.

CREATE TABLE test.mgtest
(`_id` STRING NOT NULL, 
userid STRING, name string, 
age string, 
PRIMARY KEY (`_id`)) 
PARTITION BY HASH (`_id`) 
PARTITIONS 16 STORED AS KUDU 
TBLPROPERTIES ('kudu.master_addresses'='cdh2:7051', 'kudu.table_name'='test.mgtest')
image.png

2.配置流


image.png

2.1.Mongo数据源配置.


image.png

2.2.数据库选择.


image.png

2.3.数据处理.


image.png

代码如下:

for record in records:
  newRecord = sdcFunctions.createRecord(record.sourceId + ':newRecordId')
  try:      
    if record.value['op'] == 'i':
      newRecord.attributes['sdc.operation.type']='1'
      newRecord.value = record.value['o']
    if record.value['op'] == 'd':
      newRecord.attributes['sdc.operation.type']='2'
      newRecord.value = record.value['o']
    if record.value['op'] == 'u':
      newRecord.attributes['sdc.operation.type']='3'
      newRecord.value = record.value['o']['$set'] 
      newRecord.value['_id'] = record.value['o2']['_id']
    # Write record to processor output
    #newRecord.value['Type'] = record.value['Type']
    newRecord.value['ns'] = record.value['ns']
    newRecord.value['op'] = record.value['op']
    output.write(newRecord)
  except Exception as e:
    # Send record to error
    error.write(newRecord, str(e))

2.4.子文档处理


image.png

代码如下:

import array
import json

def convert_json(item):
  return json.dumps(item,ensure_ascii=False,encoding="utf-8")
def convert_array(item):
  return json.dumps(item,ensure_ascii=False,encoding="utf-8")
for record in records:
  try:
    for colName,value in record.value.items():
      temp = record.value[colName]
      record.value[colName] = None
      if sdcFunctions.getFieldNull(record,'/'+colName) is NULL_MAP:
        temp = convert_json(temp)
      elif sdcFunctions.getFieldNull(record,'/'+colName) is NULL_LIST:
        temp = convert_array(temp)
      record.value[colName] = temp
    output.write(record)
  except Exception as e:
    error.write(record, str(e))

2.5.CRUD选择器.


image.png

2.6.插入数据


image.png

更新和删除数据更改Default Operation 的值即可.


image.png

2.7 测试MongoDB 插入数据
2.7.1 Mongo插入数据


image.png

2.7.2 Impala查询


image.png

2.7.3 Streamsets流
image.png

2.8 Mongo更新数据
2.8.1 Mongo更新数据


image.png

2.8.2 Impala查询


image.png

2.8.3 StreamSets流
image.png

2.9 MongoDB删除数据
2.9.1 MongoDB删除数据


image.png

2.9.2 Impala查询


image.png

最后在impala使用的过程中,由于mongoDB子文档问题.需要升级到Impala版本3.1以上以支持get_json_object 这个function, 后续需要考虑的问题是Hive如何使用.

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,406评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,732评论 3 393
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,711评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,380评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,432评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,301评论 1 301
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,145评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,008评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,443评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,649评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,795评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,501评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,119评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,731评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,865评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,899评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,724评论 2 354