我们先从最近手里的一个项目说起,其实这个项目的需求很简单:从多数据源抓取数据,进行数据整合以后分别存入到不同的数据库中。当然不同数据库存储的数据结构是不一样的,但是也只是对数据源 attribute 的重新过滤组合而已。
当每一份数据的体量以及整个数据的体量变得很大的时候,系统的实时性大大降低(因为在数据库的另一端是有客户端等着将数据进行可视化展示的),这个时候我就不得不好好审视一下我的代码结构了。早先在设计的时候,因为没有考虑到数据这么大的体量,并且为了增强日志文件的可读性和一些客观限制,我没有选择大量的异步并发。
为了后面描述比较方便,我先罗列一下这些客观限制:
Azure Cosmos DB
是对并发数量有限制的,大概在20次/s
的时候会发出警告。Azure Cosmos DB
是对Stored Procedure
的request body
以及response body
有限制的。Node
的异步数量是存在限制的,数据库连接数也是存在限制的。
初步模型
单进程单线程同步跑任务,这个速度肯定是我们接受不了的。那我们开始优化我们的架构。对于这种多数据源多任务的场景,生产者消费者 的行为模式作为基础应该是最合适不过了。这样我们就可以把业务逻辑完全解耦为:
- 生产者从数据源抓数据整理为 数据单元 放到缓存队列。
- 消费者从缓存队列拿出 数据单元 进行处理。
解耦完成后,我们对这两部分一一的审视。这样单一性的任务我们能很好的进行多线程、多进程的处理,这里我选择在生产者的 单一数据源 ,消费者的 文件-数据库(一个数据源会产生多个文件,每份文件要存储到多个数据库,这里的意思是一个文件存储到一个数据库) 粒度上选择多进程处理。
- 生产者
const cluster = require('cluster');
const cpuNum = require('os').cpus().length;
const redis = require('redis');
const sourceList = [...];
const sourceIndex = 0;
if (cluster.isMaster) {
for (let i = 0; i < cpuNum; i++) {
cluster.fork();
}
} else {
if (sourceIndex < sourceList.length) {
// 选择数据源
let source = sourceList[++sourceIndex];
// 抓数据并过滤成对象,存入redis
let dataObj = catchAndParser(source);
...
}
}
- 消费者
const cluster = require('cluster');
const cpuNum = require('os').cpus().length;
const redis = require('redis');
const worker = require('./worker');
let workers = {
mssql: worker.mssql,
documentdb: worker.documentdb,
...
}
if (cluster.isMaster) {
for (let i = 0; i < cpuNum; i++) {
cluster.fork();
}
} else {
// 从redis里取任务
let task = getTaskFromRedis();
// 进行数据重组以及存储
worker[task.type](task);
}
这样我们会发现我们起的进程数达到 CUP
的两倍了,其实这边可以自己根据任务是否会吃满 CUP
,会吃多少,并结合进程切换的时间以及内存问题来调整进程数量。到现在我们完成了我们的初步模型,但是有一个很麻烦的问题暴露在了我们面前,我们现在的逻辑没有去维护连接池,也就是说我们会在 Worker
进程中让它自己去进行连接,完成任务再断开连接。这样显然也是我们不希望去看到的。
连接池
最初我有两种设想:
- 在主进程建立连接池,然后所有
Worker
需要连接的时候就去这个地方取来用。 - 在每一个进程当中去维护自己的连接池。
思来想去,咨询前辈后发现第一种想法有点太傻了。先不说父子进程间监听 socket
的问题,光是子进程对取回来的连接进行复用这一点上也是对性能的浪费。所以当然是每个进程维护自己的连接池比较好,这样不论是多线程还是异步的情况下,都能不让连接成为性能的瓶颈。
在实现上,我们需要放在全局声明连接池,这样因为子进程是 fork
出去的,也会在自己的进程当中声明连接池。完成了我们每个进程一个连接池的目的。
参考:Nodejs Cluster with MySQL connections
var cluster = require('cluster');
var http = require('http');
var numCPUs = require('os').cpus().length;
var mysql = require('mysql');
var pool = mysql.createPool({
connectionLimit : 10,
host : 'example.org',
user : 'bob',
password : 'secret'
});
if (cluster.isMaster) {
...
} else {
...
}
数据库层面
其实我们在数据库层面的操作也比较简单:
- 根据某个字段获取数据库里的内容(避免设计子增主键的部分重复存储)
- 批量存储
批量存储
可以看到性能瓶颈的地方就在第二点,这里不同的数据库就会有不同的支持。对于 Documentdb
,可以将批量数据封装成数组全部丢给 Stored procedure
,但其实 Stored procedure
中也是一个一个数据存入数据库中。(这里与数据库是多线程还是单线程有关,后面会去研究一下 Documentdb
的底层再来补坑)。对于前面提到的客观条件1、2,其实是可以把数据包分包再异步,如果并发数量超过限制再使用队列管理异步。参考:Azure Cosmos DB server-side programming: Stored procedures
而对于 Sql server
,就可以选择用 bulk
还是 insert
的 sql
语句中 OPENROWSET(BULK...)
选项都是可行的,但是从代码组织来看用 bulk
是更好的选择。 参考:使用 BULK INSERT 或 OPENROWSET(BULK...) 导入批量数据
批量查找
数量较少时,我觉得放在服务器端异步并发比较好。数量比较多时就把任务交给服务器端的 Stored procedure
处理。比较麻烦的是, Sql server
的 SP 是不接受数组的,可以通过字符串操作分隔符来模拟数组。参考:sql server 模拟数组
2017年9月21日更新
上一次的文章当中有一些地方有错误的地方和一些需要完善的地方,这边进行指出并更新。
错误
生产者的伪代码中直接使用了数组来分发数据源的方法是完全错误的,其实每个进程都会拷贝一份代码去执行,这种方法需要让每个进程中去共享 sourceIndex
才能够实现。而我们这边的子进程是通过父进程 fork
所得,所以需要在父进程来维护 sourceIndex
,分发给子进程。或者也直接使用消息队列来实现这一部分的功能。
const cluster = require('cluster');
const cpuNum = require('os').cpus().length;
const redis = require('redis');
const sourceList = [...];
const sourceIndex = 0;
if (cluster.isMaster) {
for (let i = 0; i < cpuNum; i++) {
let worker = cluster.fork();
worker.send(sourceList[sourceIndex]);
sourceindex++;
}
} else {
process.on('message', (source) => {
let dataObj = catchAndParser(source);
})
}
完善
rsmq
是一个基于 redis
封装好的消息队列的库,使用起来也比较方便。唯一不太好的地方是没有封装 循环队列 ,这使得场景下处理起来比较麻烦。比如我现在的任务是循环不变的,做完了又重头做。
const cluster = require('cluster');
const cpuNum = require('os').cpus().length;
const rsmq = require('rsmq');
let worker = async (worker, process) => {
// 封装好的promise对象
let task = await rsmq.receiveMessage('qName');
if (!task) {
worker.send('finished');
process.kill();
} else {
// 执行任务
...
process.kill();
}
}
if (cluster.isMaster) {
for (let i = 0; i < cpuNum; i++) {
cluster.fork();
}
cluster.on('exit', (worker, code, signal) => {
console.log('Worker ' + worker.process.pid + ' died with code: ' + code + ', and signal: ' + signal);
console.log('Starting a new worker');
// 退出后重启进程
cluster.fork();
});
cluster.on('message', (msg) => {
if (msg === 'finished') {
// 关闭退出重启cluster.removeAllListener('exit');
// 重新填充数据源
...
// 重新启动所有进程
for (let i = 0; i < cpuNum; i++) {
cluster.fork();
}
cluster.on('exit', (worker, code, signal) => {
console.log('Worker ' + worker.process.pid + ' died with code: ' + code + ', and signal: ' + signal);
console.log('Starting a new worker');
// 退出后重启进程
cluster.fork();
});
}
})
} else {
worker(cluster.worker, process);
}
当然这种实现只是一个思路,而且不是特别好,因为每次做完任务之后都需要重启一个 worker
进程,可以长期保持几个固定的进程,主进程通过通信来分发任务。不用浪费资源去重启进程。
补充
- 在一个场景下,我希望使用前台的开关来控制我后台的进程。这种时候因为
fork
进程的句柄在打开开关的那个进程中,我无法在后面关闭的进程当中获取。所以其实可以在启动进程的时候,将进程号发送到消息队列中,关闭时进行关闭。(我这里描述的场景是一个开关同时控制多个任务,如果一个开关控制一个还是直接以任务名做key
, 进程号作为value
存入redis
中比较好) - 通过
fork
出来的子进程,stdin
,stdout
,stderr
三个流都会复制父进程,如果需要重定向,需要在fork
时进行配置。
const cp = require('child_process');
const fs = require('fs');
let main = () => {
let worker = cp.fork('./test.js', { silent: true });
let fileStream = fs.createWriteStream('./output');
worker.stdout.pipe(fileStream);
};
main();