Mongodb实现应用级集合数据同步(Nodejs版)

tms-mongodb-web(MongoDB的Web客户端,可对数据库进行可视化管理)项目中,通过replica set实现应用级集合复制能力。本文介绍了项目中使用的一些关键方法:用docker搭建MongoDB复制集;使用ChangeStream监听集合更新;使用子进程执行实时复制;MongoDB命令行执行js脚本等。

项目地址:https://github.com/jasony62/tms-mongodb-web

目标

允许用户在系统运行时建立或删除集合间的复制关系,基于这个关系,用户可手动执行全量复制(通过API),系统可自动执行实时增量复制(独立线程)。典型应用场景包括:1、将多个分散集合中的数据集中到同一个集合中,便于实现全局查找和处理;2、跟踪集合的数据变化,用异步线程对数据进行加工,减少单段代码的复杂度,例如:记录业务级日志。

本文主要介绍了多集合数据汇聚场景的实现方法。

MongoDB复制集

MongoDB自带replica set功能,用于实现多个MongoDB实例间数据的实时复制,从而实现数据备份和高可用。

复制集中的实例分为3种类型:Primary(主)Secondaries(从)Arbiter(仲裁)。客户端所有写操作都要通过Primary节点,默认情况下读操作也都通过主节点(客户端可以设置为支持多读);Secondaries节点从主节点实时接收变更数据;Arbiter节点不复制数据也不能成为主节点,只是参与投票。复制集最小配置通常采用一主两从的结构(PSS),但是如果资源有限,可以采取一主一从一仲裁的结构(PSA)。主节点执行写操作时会生成oplog,从节点通过执行oplog复制数据。当主节点不可用时,从节点可以通过选举机制成为从主节点。

建立复制集

下面说明如何在本地开发环境中通过docker建立复制集,从而简化开发过程。

开发环境只是为了进行功能验证,所以可以将复制集设置为PSA结构,这样占用的资源量最小。配置复制集前,需要先启动3个MongoDB的实例,我们通过docker-compose执行(下述命令行用于示例,实际执行请参照项目中的说明文档)。

docker-compose up mongodb mongodb-s mongodb-a

在docker-compose文件中需要将MongoDB的启动方式设置为复制集方式

entrypoint: [ "mongod", "--bind_ip_all", "--port", "37017", "--replSet", "tmw-rs" ]

为实现复制集的自动初始化,新建init_replicate.js文件,内容如下:

 rs.initiate({
  _id: 'tmw-rs', // 复制集名称
  members: [
    {
      _id: 0,
      host: 'host.docker.internal:37017', // 用host.docker.internal实现容器外部可以访问
      priority: 1,
    },
    {
      _id: 1,
      host: 'host.docker.internal:37018',
      priority: 0, // 不能作为主节点
    },
    {
      _id: 2,
      host: 'host.docker.internal:37019',
      arbiterOnly: true, // 作为仲裁节点
    },
  ],
})

注意:第二个节点设置priority: 0,使从节点不能升级主节点,保证每次重启后主节点都是固定的,便于调试。第三个节点设置为仲裁节点。

为了能够在容器外连接复制集,需要在主机上的hosts文件中添加host.docker.internal

新建文件init_replicate.sh,执行复制集初始化:

#!/bin/sh

# 复制配置文件到容器中
docker cp ./init_replicate.js tms-mongodb-mongo:/home/int_replicate.js

# 按照配置文件初始化复制集
docker exec -it tms-mongodb-mongo mongo --port 37017 /home/int_replicate.js

/bin/sh init_replicate.sh

测试复制集

连接主节点

mongo --port 37017

在主节点中添加数据

devrs:PRIMARY> db.cl01.insertOne({x: 1})

连接从节点查看数据

mongo --port 37018

允许从节点读

rs.secondaryOk()

查看是否已有复制的数据

devrs:SECONDARY> db.cl01.find()

连接到仲裁节点

show dbs

可以看到里面只有local数据库,并没有同步的数据库db01

客户端连接

MongoClient.connect('mongodb://host.docker.internal:37017,host.docker.internal:37018,host.docker.internal:37019/?replicaSet=tmw-rs')

监听集合变化

MongoDB 3.6版本开始提供ChangeStream,它可以订阅单个集合,单个数据库或者整个部署实例的数据变化。

const cl = mongoClient.db('tms_admin').collection('replica_map')
ReplicaMapWatcher = cl.watch([], { fullDocument: 'updateLookup' })

上面的代码是订阅集合tms_admin.replica_map的数据变化,集合的watch方法创建ChangeStream实例。

ReplicaMapWatcher.on('change', async (csEvent) => {
  ......
})

当数据发生变化时,会产生如下事件:

{
  operationType: 'insert',
  clusterTime: Timestamp { _bsontype: 'Timestamp', low_: 1, high_: 1606960043 },
  fullDocument: { _id: 5fc843abdf2330d645686e0f, x: 5 },
  ns: { db: 'db01', coll: 'cl01' },
  documentKey: { _id: 5fc843abdf2330d645686e0f }
}
{
  operationType: 'delete',
  clusterTime: Timestamp { _bsontype: 'Timestamp', low_: 1, high_: 1606960379 },
  ns: { db: 'db01', coll: 'cl01' },
  documentKey: { _id: 5fc843abdf2330d645686e0f }
}

注意deleteMany方法会产生多个单条事件。

{
  operationType: 'update',
  clusterTime: Timestamp { _bsontype: 'Timestamp', low_: 1, high_: 1606960612 },
  ns: { db: 'db01', coll: 'cl01' },
  documentKey: { _id: 5fc84245df2330d645686e0c },
  updateDescription: { updatedFields: [Object], removedFields: [] }
}

创建ChangeStream时可以设置选项fullDocument参数,这样在update事件中就可以返回document的完整信息。

{
  operationType: 'replace',
  clusterTime: Timestamp { _bsontype: 'Timestamp', low_: 1, high_: 1606960924 },
  fullDocument: { _id: 5fc84245df2330d645686e0c, x: 9 },
  ns: { db: 'db01', coll: 'cl01' },
  documentKey: { _id: 5fc84245df2330d645686e0c }
}

实现

记录数据复制关系

首先,建立集合tms_admin.replica_map记录复制关系。用户可以自己建立或删除集合间的复制关系。系统利用ChangeStream订阅tms_admin.replica_map的变化情况,自动建立或关闭集合间的实时复制。

字段 说明 类型 必填
_id mongodb内部id ObjectId
primary 主集合。 object
primary.db 主集合数据库名称。 string
primary.cl 主集合名称。 string
secondary 从集合。 object
secondary.db 从集合数据库名称。 string
secondary.cl 从集合名称。 string

进行数据复制时,为了追踪数据来源,系统在从集合中添加记录数据来源的字段__pri

内置字段 说明 必填 类型
__pri object
__pri.db 文档所属数据库名称(sysname)。 string
__pri.cl 文档所属集合名称(sysname)。 string
__pri.id 文档原始 id(_id)。 ObjectId
__pri.time 最近一次复制时间。 13 位整型

全量复制

因为在从集合中记录了文档来源__pri,复制时,先检查主集合的数据是否已经存在,存在就替换,否则插入新数据。

secSysCl.replaceOne({ '__pri.id': _id }, doc, { upsert: true })

从集合数据来源包含了文档最后1次更新的时间戳time,更新时间早于这个时间戳的文档,是主集合中已经删除的文档,应该在从集合中删除。

secSysCl
      .deleteMany({
        '__pri.db': pri.db,
        '__pri.cl': pri.cl,
        '__pri.time': { $not: { $eq: syncAt } },
      })

代码:/back/models/mgdb/replicaMap.js

实时复制

Nodejs是单线程的,虽然它的执行效率很高,但是如果有频繁的数据更新操作要处理,多少还是会影响处理效率,因此使用child_process将实时复制放到独立的线程中执行。

实时复制功能要求MongoDB必须部署为复制集方式,TMW设置了环境变量TMW_REALTIME_REPLICA,控制是否支持实时复制功能。TMW启动时,检查环境变量TMW_REALTIME_REPLICA是否有效,如果有就启动实时复制子线程。

const cp = require('child_process')
Replica_Child_Process = cp.spawn('node', ['./replica/watch.js'], {
  detached: true,
  stdio: 'ignore',
})
Replica_Child_Process.unref()

这里设置为子线程独立于主线程的方式,复制线程中的日志无法发送的主线程,因此,为了便于查看日志输出,应该通过配置文件/back/config/log4js.js将日志输出到文件。

module.exports = {
  appenders: {
    consoleout: { type: 'console' },
    fileout: {
      type: 'file',
      filename: './logs/back-logs.log',
      maxLogSize: 1024 * 1024,
    },
  },
  categories: {
    default: {
      appenders: ['consoleout', 'fileout'],
      level: process.env.TMS_APP_LOG4JS_LEVEL || 'debug',
    },
  },
}

代码:/back/replica/watch.js

命令行脚本复制

可以通过命令行在程序之外执行对MongoDB的操作,例如:初始化,生成测试数据等。

项目中自带了执行全量复制的js脚本,便于系统已经初始化操作。

mongo --port 37017 ./back/replica/synchronize.js

代码:/back/replica/synchronize.js

参考

https://docs.mongodb.com/manual/changeStreams/

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容