在
tms-mongodb-web
(MongoDB的Web客户端,可对数据库进行可视化管理)项目中,通过replica set
实现应用级集合复制能力。本文介绍了项目中使用的一些关键方法:用docker搭建MongoDB复制集;使用ChangeStream
监听集合更新;使用子进程执行实时复制;MongoDB命令行执行js脚本等。
项目地址:https://github.com/jasony62/tms-mongodb-web
目标
允许用户在系统运行时建立或删除集合间的复制关系,基于这个关系,用户可手动执行全量复制(通过API),系统可自动执行实时增量复制(独立线程)。典型应用场景包括:1、将多个分散集合中的数据集中到同一个集合中,便于实现全局查找和处理;2、跟踪集合的数据变化,用异步线程对数据进行加工,减少单段代码的复杂度,例如:记录业务级日志。
本文主要介绍了多集合数据汇聚场景的实现方法。
MongoDB复制集
MongoDB自带replica set
功能,用于实现多个MongoDB实例间数据的实时复制,从而实现数据备份和高可用。
复制集中的实例分为3种类型:Primary(主)
,Secondaries(从)
和Arbiter(仲裁)
。客户端所有写操作都要通过Primary节点,默认情况下读操作也都通过主节点(客户端可以设置为支持多读);Secondaries节点从主节点实时接收变更数据;Arbiter节点不复制数据也不能成为主节点,只是参与投票。复制集最小配置通常采用一主两从的结构(PSS),但是如果资源有限,可以采取一主一从一仲裁的结构(PSA)。主节点执行写操作时会生成oplog
,从节点通过执行oplog
复制数据。当主节点不可用时,从节点可以通过选举机制成为从主节点。
建立复制集
下面说明如何在本地开发环境中通过docker建立复制集,从而简化开发过程。
开发环境只是为了进行功能验证,所以可以将复制集设置为PSA
结构,这样占用的资源量最小。配置复制集前,需要先启动3个MongoDB的实例,我们通过docker-compose执行(下述命令行用于示例,实际执行请参照项目中的说明文档)。
docker-compose up mongodb mongodb-s mongodb-a
在docker-compose文件中需要将MongoDB的启动方式设置为复制集方式
entrypoint: [ "mongod", "--bind_ip_all", "--port", "37017", "--replSet", "tmw-rs" ]
为实现复制集的自动初始化,新建init_replicate.js
文件,内容如下:
rs.initiate({
_id: 'tmw-rs', // 复制集名称
members: [
{
_id: 0,
host: 'host.docker.internal:37017', // 用host.docker.internal实现容器外部可以访问
priority: 1,
},
{
_id: 1,
host: 'host.docker.internal:37018',
priority: 0, // 不能作为主节点
},
{
_id: 2,
host: 'host.docker.internal:37019',
arbiterOnly: true, // 作为仲裁节点
},
],
})
注意:第二个节点设置priority: 0
,使从节点不能升级主节点,保证每次重启后主节点都是固定的,便于调试。第三个节点设置为仲裁节点。
为了能够在容器外连接复制集,需要在主机上的hosts
文件中添加host.docker.internal
。
新建文件init_replicate.sh
,执行复制集初始化:
#!/bin/sh
# 复制配置文件到容器中
docker cp ./init_replicate.js tms-mongodb-mongo:/home/int_replicate.js
# 按照配置文件初始化复制集
docker exec -it tms-mongodb-mongo mongo --port 37017 /home/int_replicate.js
/bin/sh init_replicate.sh
测试复制集
连接主节点
mongo --port 37017
在主节点中添加数据
devrs:PRIMARY> db.cl01.insertOne({x: 1})
连接从节点查看数据
mongo --port 37018
允许从节点读
rs.secondaryOk()
查看是否已有复制的数据
devrs:SECONDARY> db.cl01.find()
连接到仲裁节点
show dbs
可以看到里面只有local
数据库,并没有同步的数据库db01
。
客户端连接
MongoClient.connect('mongodb://host.docker.internal:37017,host.docker.internal:37018,host.docker.internal:37019/?replicaSet=tmw-rs')
监听集合变化
MongoDB 3.6版本开始提供ChangeStream
,它可以订阅单个集合,单个数据库或者整个部署实例的数据变化。
const cl = mongoClient.db('tms_admin').collection('replica_map')
ReplicaMapWatcher = cl.watch([], { fullDocument: 'updateLookup' })
上面的代码是订阅集合tms_admin.replica_map
的数据变化,集合的watch
方法创建ChangeStream
实例。
ReplicaMapWatcher.on('change', async (csEvent) => {
......
})
当数据发生变化时,会产生如下事件:
{
operationType: 'insert',
clusterTime: Timestamp { _bsontype: 'Timestamp', low_: 1, high_: 1606960043 },
fullDocument: { _id: 5fc843abdf2330d645686e0f, x: 5 },
ns: { db: 'db01', coll: 'cl01' },
documentKey: { _id: 5fc843abdf2330d645686e0f }
}
{
operationType: 'delete',
clusterTime: Timestamp { _bsontype: 'Timestamp', low_: 1, high_: 1606960379 },
ns: { db: 'db01', coll: 'cl01' },
documentKey: { _id: 5fc843abdf2330d645686e0f }
}
注意:deleteMany
方法会产生多个单条事件。
{
operationType: 'update',
clusterTime: Timestamp { _bsontype: 'Timestamp', low_: 1, high_: 1606960612 },
ns: { db: 'db01', coll: 'cl01' },
documentKey: { _id: 5fc84245df2330d645686e0c },
updateDescription: { updatedFields: [Object], removedFields: [] }
}
创建ChangeStream
时可以设置选项fullDocument
参数,这样在update
事件中就可以返回document
的完整信息。
{
operationType: 'replace',
clusterTime: Timestamp { _bsontype: 'Timestamp', low_: 1, high_: 1606960924 },
fullDocument: { _id: 5fc84245df2330d645686e0c, x: 9 },
ns: { db: 'db01', coll: 'cl01' },
documentKey: { _id: 5fc84245df2330d645686e0c }
}
实现
记录数据复制关系
首先,建立集合tms_admin.replica_map
记录复制关系。用户可以自己建立或删除集合间的复制关系。系统利用ChangeStream
订阅tms_admin.replica_map
的变化情况,自动建立或关闭集合间的实时复制。
字段 | 说明 | 类型 | 必填 |
---|---|---|---|
_id |
mongodb 内部id
|
ObjectId | 是 |
primary | 主集合。 | object | 是 |
primary.db | 主集合数据库名称。 | string | 是 |
primary.cl | 主集合名称。 | string | 是 |
secondary | 从集合。 | object | 是 |
secondary.db | 从集合数据库名称。 | string | 是 |
secondary.cl | 从集合名称。 | string | 是 |
进行数据复制时,为了追踪数据来源,系统在从集合中添加记录数据来源的字段__pri
。
内置字段 | 说明 | 必填 | 类型 |
---|---|---|---|
__pri | 是 | object | |
__pri.db | 文档所属数据库名称(sysname)。 | 是 | string |
__pri.cl | 文档所属集合名称(sysname)。 | 是 | string |
__pri.id | 文档原始 id(_id)。 | 是 | ObjectId |
__pri.time | 最近一次复制时间。 | 是 | 13 位整型 |
全量复制
因为在从集合中记录了文档来源__pri
,复制时,先检查主集合的数据是否已经存在,存在就替换,否则插入新数据。
secSysCl.replaceOne({ '__pri.id': _id }, doc, { upsert: true })
从集合数据来源包含了文档最后1次更新的时间戳time
,更新时间早于这个时间戳的文档,是主集合中已经删除的文档,应该在从集合中删除。
secSysCl
.deleteMany({
'__pri.db': pri.db,
'__pri.cl': pri.cl,
'__pri.time': { $not: { $eq: syncAt } },
})
代码:/back/models/mgdb/replicaMap.js
实时复制
Nodejs是单线程的,虽然它的执行效率很高,但是如果有频繁的数据更新操作要处理,多少还是会影响处理效率,因此使用child_process
将实时复制放到独立的线程中执行。
实时复制功能要求MongoDB必须部署为复制集方式,TMW设置了环境变量TMW_REALTIME_REPLICA
,控制是否支持实时复制功能。TMW启动时,检查环境变量TMW_REALTIME_REPLICA
是否有效,如果有就启动实时复制子线程。
const cp = require('child_process')
Replica_Child_Process = cp.spawn('node', ['./replica/watch.js'], {
detached: true,
stdio: 'ignore',
})
Replica_Child_Process.unref()
这里设置为子线程独立于主线程的方式,复制线程中的日志无法发送的主线程,因此,为了便于查看日志输出,应该通过配置文件/back/config/log4js.js
将日志输出到文件。
module.exports = {
appenders: {
consoleout: { type: 'console' },
fileout: {
type: 'file',
filename: './logs/back-logs.log',
maxLogSize: 1024 * 1024,
},
},
categories: {
default: {
appenders: ['consoleout', 'fileout'],
level: process.env.TMS_APP_LOG4JS_LEVEL || 'debug',
},
},
}
代码:/back/replica/watch.js
命令行脚本复制
可以通过命令行在程序之外执行对MongoDB的操作,例如:初始化,生成测试数据等。
项目中自带了执行全量复制的js脚本,便于系统已经初始化操作。
mongo --port 37017 ./back/replica/synchronize.js
代码:/back/replica/synchronize.js