一次多进程优化实践

我们先从最近手里的一个项目说起,其实这个项目的需求很简单:从多数据源抓取数据,进行数据整合以后分别存入到不同的数据库中。当然不同数据库存储的数据结构是不一样的,但是也只是对数据源 attribute 的重新过滤组合而已。

当每一份数据的体量以及整个数据的体量变得很大的时候,系统的实时性大大降低(因为在数据库的另一端是有客户端等着将数据进行可视化展示的),这个时候我就不得不好好审视一下我的代码结构了。早先在设计的时候,因为没有考虑到数据这么大的体量,并且为了增强日志文件的可读性和一些客观限制,我没有选择大量的异步并发。

为了后面描述比较方便,我先罗列一下这些客观限制:

  • Azure Cosmos DB 是对并发数量有限制的,大概在 20次/s 的时候会发出警告。
  • Azure Cosmos DB 是对 Stored Procedurerequest body 以及 response body 有限制的。
  • Node 的异步数量是存在限制的,数据库连接数也是存在限制的。

初步模型

单进程单线程同步跑任务,这个速度肯定是我们接受不了的。那我们开始优化我们的架构。对于这种多数据源多任务的场景,生产者消费者 的行为模式作为基础应该是最合适不过了。这样我们就可以把业务逻辑完全解耦为:

  • 生产者从数据源抓数据整理为 数据单元 放到缓存队列。
  • 消费者从缓存队列拿出 数据单元 进行处理。

解耦完成后,我们对这两部分一一的审视。这样单一性的任务我们能很好的进行多线程、多进程的处理,这里我选择在生产者的 单一数据源 ,消费者的 文件-数据库(一个数据源会产生多个文件,每份文件要存储到多个数据库,这里的意思是一个文件存储到一个数据库) 粒度上选择多进程处理。

  • 生产者
const cluster = require('cluster');
const cpuNum = require('os').cpus().length;
const redis = require('redis');
const sourceList = [...];
const sourceIndex = 0;

if (cluster.isMaster) {
    for (let i = 0; i < cpuNum; i++) {
        cluster.fork();
    }
} else {
    if (sourceIndex < sourceList.length) {
        // 选择数据源
        let source = sourceList[++sourceIndex];
        // 抓数据并过滤成对象,存入redis
        let dataObj = catchAndParser(source);
        ...
    }
}
  • 消费者
const cluster = require('cluster');
const cpuNum = require('os').cpus().length;
const redis = require('redis');
const worker = require('./worker');

let workers = {
    mssql: worker.mssql,
    documentdb: worker.documentdb,
    ...
}

if (cluster.isMaster) {
    for (let i = 0; i < cpuNum; i++) {
        cluster.fork();
    }
} else {
    // 从redis里取任务
    let task = getTaskFromRedis();
    // 进行数据重组以及存储
    worker[task.type](task);
}

这样我们会发现我们起的进程数达到 CUP 的两倍了,其实这边可以自己根据任务是否会吃满 CUP ,会吃多少,并结合进程切换的时间以及内存问题来调整进程数量。到现在我们完成了我们的初步模型,但是有一个很麻烦的问题暴露在了我们面前,我们现在的逻辑没有去维护连接池,也就是说我们会在 Worker 进程中让它自己去进行连接,完成任务再断开连接。这样显然也是我们不希望去看到的。

连接池

最初我有两种设想:

  • 在主进程建立连接池,然后所有 Worker 需要连接的时候就去这个地方取来用。
  • 在每一个进程当中去维护自己的连接池。

思来想去,咨询前辈后发现第一种想法有点太傻了。先不说父子进程间监听 socket 的问题,光是子进程对取回来的连接进行复用这一点上也是对性能的浪费。所以当然是每个进程维护自己的连接池比较好,这样不论是多线程还是异步的情况下,都能不让连接成为性能的瓶颈。

在实现上,我们需要放在全局声明连接池,这样因为子进程是 fork 出去的,也会在自己的进程当中声明连接池。完成了我们每个进程一个连接池的目的。

参考:Nodejs Cluster with MySQL connections

var cluster = require('cluster');
var http = require('http');
var numCPUs = require('os').cpus().length;

var mysql = require('mysql');
var pool  = mysql.createPool({
   connectionLimit : 10,
   host            : 'example.org',
   user            : 'bob',
   password        : 'secret'
});

if (cluster.isMaster) {
    ...
} else {
    ...
}

数据库层面

其实我们在数据库层面的操作也比较简单:

  • 根据某个字段获取数据库里的内容(避免设计子增主键的部分重复存储)
  • 批量存储

批量存储

可以看到性能瓶颈的地方就在第二点,这里不同的数据库就会有不同的支持。对于 Documentdb,可以将批量数据封装成数组全部丢给 Stored procedure ,但其实 Stored procedure 中也是一个一个数据存入数据库中。(这里与数据库是多线程还是单线程有关,后面会去研究一下 Documentdb 的底层再来补坑)。对于前面提到的客观条件1、2,其实是可以把数据包分包再异步,如果并发数量超过限制再使用队列管理异步。参考:Azure Cosmos DB server-side programming: Stored procedures

而对于 Sql server,就可以选择用 bulk 还是 insertsql 语句中 OPENROWSET(BULK...) 选项都是可行的,但是从代码组织来看用 bulk 是更好的选择。 参考:使用 BULK INSERT 或 OPENROWSET(BULK...) 导入批量数据

批量查找

数量较少时,我觉得放在服务器端异步并发比较好。数量比较多时就把任务交给服务器端的 Stored procedure 处理。比较麻烦的是, Sql serverSP 是不接受数组的,可以通过字符串操作分隔符来模拟数组。参考:sql server 模拟数组


2017年9月21日更新

上一次的文章当中有一些地方有错误的地方和一些需要完善的地方,这边进行指出并更新。

错误

生产者的伪代码中直接使用了数组来分发数据源的方法是完全错误的,其实每个进程都会拷贝一份代码去执行,这种方法需要让每个进程中去共享 sourceIndex 才能够实现。而我们这边的子进程是通过父进程 fork 所得,所以需要在父进程来维护 sourceIndex,分发给子进程。或者也直接使用消息队列来实现这一部分的功能。

const cluster = require('cluster');
const cpuNum = require('os').cpus().length;
const redis = require('redis');
const sourceList = [...];
const sourceIndex = 0;

if (cluster.isMaster) {
    for (let i = 0; i < cpuNum; i++) {
        let worker = cluster.fork();
        worker.send(sourceList[sourceIndex]);
        sourceindex++;
    }
} else {
    process.on('message', (source) => {
        let dataObj = catchAndParser(source);
    })
}

完善

rsmq 是一个基于 redis 封装好的消息队列的库,使用起来也比较方便。唯一不太好的地方是没有封装 循环队列 ,这使得场景下处理起来比较麻烦。比如我现在的任务是循环不变的,做完了又重头做。

const cluster = require('cluster');
const cpuNum = require('os').cpus().length;
const rsmq = require('rsmq');

let worker = async (worker, process) => {
    // 封装好的promise对象
    let task = await rsmq.receiveMessage('qName');
    if (!task) {
        worker.send('finished');
        process.kill();
    } else {
        // 执行任务
        ...
        process.kill();
    }
}

if (cluster.isMaster) {
    for (let i = 0; i < cpuNum; i++) {
        cluster.fork();
    }
    
    cluster.on('exit', (worker, code, signal) => {
        console.log('Worker ' + worker.process.pid + ' died with code: ' + code + ', and signal: ' + signal);
        console.log('Starting a new worker');
        // 退出后重启进程
        cluster.fork();
    });
    
    cluster.on('message', (msg) => {
        if (msg === 'finished') {
            // 关闭退出重启cluster.removeAllListener('exit');
            // 重新填充数据源
            ...
            // 重新启动所有进程
            for (let i = 0; i < cpuNum; i++) {
                cluster.fork();
            }
            
            cluster.on('exit', (worker, code, signal) => {
                console.log('Worker ' + worker.process.pid + ' died with code: ' + code + ', and signal: ' + signal);
                console.log('Starting a new worker');
                // 退出后重启进程
                cluster.fork();
            });
        }
        
    })
} else {
    worker(cluster.worker, process);
}

当然这种实现只是一个思路,而且不是特别好,因为每次做完任务之后都需要重启一个 worker 进程,可以长期保持几个固定的进程,主进程通过通信来分发任务。不用浪费资源去重启进程。

补充

  • 在一个场景下,我希望使用前台的开关来控制我后台的进程。这种时候因为 fork 进程的句柄在打开开关的那个进程中,我无法在后面关闭的进程当中获取。所以其实可以在启动进程的时候,将进程号发送到消息队列中,关闭时进行关闭。(我这里描述的场景是一个开关同时控制多个任务,如果一个开关控制一个还是直接以任务名做 key, 进程号作为 value 存入 redis 中比较好)
  • 通过 fork 出来的子进程,stdin, stdout, stderr 三个流都会复制父进程,如果需要重定向,需要在 fork 时进行配置。
const cp = require('child_process');
const fs = require('fs');

let main = () => {
  let worker = cp.fork('./test.js', { silent: true });
  let fileStream = fs.createWriteStream('./output');

  worker.stdout.pipe(fileStream);
};

main();
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,293评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,604评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,958评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,729评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,719评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,630评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,000评论 3 397
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,665评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,909评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,646评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,726评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,400评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,986评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,959评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,996评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,481评论 2 342

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,594评论 18 139
  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 171,428评论 25 707
  • 看了大触的教程就以为自己能上天,能和太阳肩并肩了。 事实上,咸鱼还是咸鱼,翻身也会粘锅的
    废柴玮阅读 761评论 15 13
  • 从土地里抽出的草锋利异常割破天空的旭日流血,染红我高高举起的右手燃烧啊!像古树般从火焰中重生一颗心脏石头般火热原始...
    竹桦阅读 374评论 14 12
  • 思维指合乎逻辑的客观认识事物的能力;情感指站在对方立场上给予理解和支持的能力;感觉指运用五官来认识和把握现实世界的...
    持横执矩阅读 392评论 0 0