前言
通过前边的学习,大家应该已经充分理解了node的单线程只不过是js层面的单线程,是基于V8引擎的单线程,因为,V8的缘故,前后端的js执行模型基本上是类似的,但是node的内核机制依然是通过libuv调用epoll或者IOCP的多线程机制。换句话说,node从严格意义上讲,并非是真正的单线程架构,node内核自身有一定的IO线程和IO线程池,通过libuv的调度,直接使用了操作系统层面的多线程。node的开发者,可以通过扩展c/c++模块来直接操纵多线程来提高效率。不过,单线程带来的好处是程序状态单一,没有锁、线程同步、线程上下文切换等问题。但是单线程的程序,并非是完美的。现在的服务器很多都是多cpu,多cpu核心的,一个node实例只能利用一个cpu核心,那么其他的cpu核心不就浪费了吗?并且,单线程的容错也很弱,一旦抛出了没有捕获的异常,必将引起整个程序的崩溃,那这样的程序必然是非常脆弱的,这样的服务器端语言又有什么价值呢?
朴灵在第九章的开篇就提出了两个问题:
1.如何让node充分利用多核cpu服务器?
2.如何保证node进程的健壮性和稳定性?
服务模型的变迁
经历了同步(qps为1/n)、复制进程(预先赋值一定数量的进程,prefork,但是,一旦用超了,还是跟同步的服务器一样,qps为m/n)、多线程(qps为m*l/n,这种模型,当并发上万后,内存耗用的问题将会暴露出来也就是C10k问题,apache就是采用了这样的多线程、多进程架构)和事件驱动等几个不同的模型。
多进程架构
面对单进程单线对多核使用不足的问题,前人的经验是启动多个进程,理想状态下,每个进程各自利用一个cpu,以此实现多核cpu的利用。node提供了child_process模块,并提供了child_process.fork()函数来实现进程的复制。我们来看一下代码:
var http = require('http');
http.createServer(function (req, res) {
res.writeHead(200, {'Content-Type': 'text/plain'});
res.end('Hello World\n');
}).listen(Math.round((1 + Math.random()) * 1000), '127.0.0.1');
这段代码,是node启动web服务的经典代码,然后我们根据master-workers的架构,来添加master.js模块。
var fork = require('child_process').fork;
var cpus = require('os').cpus();
for (var i = 0; i < cpus.length; i++) {
fork('./worker.js');
}
//node master.js
这两段代码会根据当前机器上的cpu数量,复制出对应node进程数,在*nix下,可以通过ps aux | grep worker.js查看到进程的数量。
$ ps aux | grep worker.js
jacksontian 1475 0.0 0.0 2432768 600 s003 S+ 3:27AM 0:00.00 grep worker.js
jacksontian 1440 0.0 0.2 3022452 12680 s003 S 3:25AM 0:00.14 /usr/local/bin/node ./worker.js
jacksontian 1439 0.0 0.2 3023476 12716 s003 S 3:25AM 0:00.14 /usr/local/bin/node ./worker.js
jacksontian 1438 0.0 0.2 3022452 12704 s003 S 3:25AM 0:00.14 /usr/local/bin/node ./worker.js
jacksontian 1437 0.0 0.2 3031668 12696 s003 S 3:25AM 0:00.15 /usr/local/bin/node ./worker.js
这就是主从架构了,在这里存在两个进程,master是主进程、worker是工作进程。这是典型的分布式架构用于并行业务处理的模式,具有较好的可伸缩性和稳定性。主进程不负责具体业务处理,只负责调度和管理工作进程,因此主进程是相对于稳定和简单的,工作进程负责具体的业务处理,因为,业务多种多样,所以,工作进程的稳定性,是我们需要考虑的。
通过fork复制的进程都是独立的,每个进程都有着独立而全新的v8实例,因此,需要至少30毫秒的启动时间和10mb左右的内存,但是,我们要记得fork进程是昂贵的,好在node在事件驱动的方式上,实现了单线程解决大并发的问题,这里启动多个进程只是为了充分将cpu资源利用起来,而不是为了解决并发的问题。
创建子进程
child_process模块给予了node随意创建子进程(child_process)的能力,它提供了4个方法用于创建子进程。
1.spawn():启动一个子进程来执行命令
2.exec():启动一个子进程来执行命令,与spawn()不同的是使用了不同的接口,它有一个回调函数获知子进程的状况。
3.execFile():启动一个子进程来执行可执行文件
4.fork():与spawn()类似,不同点在于,它创建node的子进程只需要指定要执行的js文件模块即可。
spawn()与exec()、execFile()不同的是,后两者创建时可指定timeout属性,设置超时时间,一旦创建的进程运行超过设定的时间进程将会被杀死。
exec()与execFile()不同的是,exec()适合执行已有的命令,execFile()适合执行文件。这里我们一node worker.js为例,来分别实现上述的4中方法。
var cp = require('child_process');
cp.spawn('node', ['worker.js']);
cp.exec('node worker.js', function (err, stdout, stderr) {
// some code
});
cp.execFile('worker.js', function (err, stdout, stderr) {
// some code
});
cp.fork('./worker.js');
以上四个方法在创建子进程后,均会返回子进程对象,他们的差别如下:
这里的可执行文件是指直接可以执行的,也就是*.exe或者.sh,如果是js文件,通过execFile()运行,那么这个文件的首行必须添加环境变量:#!/usr/bin/env node,尽管4种创建子进程的方式存在差别,但是事实上后面3种方法都是spawn()的延伸应用。
进程间通信
在master-worker模式中,要实现主进程管理和调度工作进程的功能,需要主进程和工作进程之间的通信,对于child_process模块,创建好了子进程,然后与父子进程间的通信是十分容易的。
在前端浏览器中,js主线程与ui渲染共用同一个线程,执行js的时候,ui渲染是停滞的渲染ui时,js是停滞的,两者互相阻塞。长时间执行js将会造成ui停滞不响应,为了解决这个问题,h5提出了webworker api,webworker允许创建工作线程并在后台运行,使得一些阻塞较为严重的计算不影响主线程上的ui渲染。
var worker = new Worker('worker.js');
worker.onmessage = function (event) {
document.getElementById('result').textContent = event.data;
};
//worker.js的内容
var n = 1;
search: while (true) {
n += 1;
for (var i = 2; i <= Math.sqrt(n); i += 1)
if (n i == 0) %
continue search;
// found a prime
postMessage(n);
}
主线程与工作线程之间通过onmessage()和postMessage()进程通信,子进程对象则由send()方法实现主进程向子进程发送数据,message事件实现收听子进程发来的数据,与api在一定程度上相似。通过消息传递,而不是共享或直接操纵相关资源,这是较为轻量和无依赖的做法。
我们看一下node当中来实现类似功能的例子
// parent.js
var cp = require('child_process');
var n = cp.fork(__dirname + '/sub.js');
n.on('message', function (m) {
console.log('PARENT got message:', m);
});
n.send({ hello: 'world' });
// sub.js
process.on('message', function (m) {
console.log('CHILD got message:', m);
});
process.send({ foo: 'bar' });
通过fork()或其他api创建子进程后,为了实现父子进程之间的通信,父进程与子进程之间将会创建IPC通道,通过IPC通道,父子进程之间才能通过message和send()传递消息。
进程间通信原理
IPC的全称是Inter-Process Communication,即进程间通信。进程间通信的目的是为了让不同的进程能够互相访问资源,并进程协调工作。实现进程间通信的技术有很多,如命名管道、匿名管道、socket、信号量、共享内存、消息队列、Domain Socket等,node中实现IPC通道的是管道技术(pipe)。
在node中管道是个抽象层面的称呼,具体细节实现由libuv提供,在win下是命名管道(named pipe)实现,在*nix下,采用unix Domain Socket来实现。
但是,具体在应用层面只是简单的message事件和send()方法,接口十分简洁和消息化。
父进程在实际创建子进程前,会创建IPC通道并监听它,然后才真正创建出子进程,并通过环境变量(NODE_CHANNEL_FD)告诉子进程这个IPC通信的文件描述符。子进程在启动的过程中,根据文件描述符去连接这个已存在的IPC通道,从而完成父子进程之间的连接。
建立连接之后的父子进程就可以自由的通信了,由于IPC通道是用命名管道或者Domain Socket创建的,他们与网络socket的行为比较类似,属于双向通道。不同的是他们在系统内核中就完了进程间的通信,而不经过实际的网络层,非常高效。在node中,IPC通道被抽象为stream对象,在调用send()时发送数据(类似于write()),接收到的消息会通过message事件(类似于data)触发给应用层。
注意:只有启动的子进程是node进程是,子进程才会根据环境变量去连接IPC通道,对于其他类型的子进程则无法自动实现进程间通信,需要让其他进程也按照约定去连接这个已经创建好的IPC通道才行。
句柄传递
建立好进程之间的IPC后,如果仅仅只用来发送一些简单的数据,显然不够我们的实际应用使用,还记得本章第一部分代码需要将启动的服务器分别监听各自的端口么,如果让服务都监听到相同的端口,将会有什么样的结果呢?
var http = require('http');
http.createServer(function (req, res) {
res.writeHead(200, {'Content-Type': 'text/plain'});
res.end('Hello World\n');
}).listen(8888, '127.0.0.1');
我们再次启动master.js,会报错:
events.js:72
throw er; // Unhandled 'error' event
^
Error: listen EADDRINUSE
at errnoException (net.js:884:11)
这时,只有一个工作进程能够监听该端口,其余进程都抛出了端口占用的错误。这个问题,破坏了我们将多个进程监听同一端口的想法,要解决这个问题,通常的做法是让每个进程监听不同的端口,其中主程序监听80端口,主进程对外接收请求,在将请求分别代理到不同的端口进程上。
这个其实就类似于负载均衡了,只要在代理上做好负载均衡的程序即可。但是,这种情况会大量浪费操作系统的文件描述符,因此,node0.5.9之后引入了进程间发送句柄的功能,send()方法除了能够通过IPC发送数据外还能发送句柄,第二个可选参数就是句柄:
child.send(message, [sendHandle])
句柄是一种可以用来标识资源的引用,它的内部包含了指向对象的文件描述符。因此,句柄可以用来标识一个服务端的socket对象、一个客户端的socket对象、一个udp套接字、一个管道等
这个句柄就解决了一个问题,我们可以去掉代理方案,在主进程接收到socket请求后,将这个socket直接发送给工作进程,而不重新与工作进程之间建立新的socket连接转发数据。我们来看一下代码实现:
//主进程代码
var child = require('child_process').fork('child.js');
// Open up the server object and send the handle
var server = require('net').createServer();
server.on('connection', function (socket) {
socket.end('handled by parent\n');
});
server.listen(1337, function () {
child.send('server', server);
});
//子进程代码
process.on('message', function (m, server) {
if (m === 'server') {
server.on('connection', function (socket) {
socket.end('handled by child\n');
});
}
});
这个示例中直接将一个tcp服务器发送给子进程,这是看起来不可思议的事情,我们来看一下效果:
// 先启动服务器
$ node parent.js
//然后,打开一个命令行窗口,用curl工具访问
$ curl "http://127.0.0.1:1337/"
handled by parent
$ curl "http://127.0.0.1:1337/"
handled by child
$ curl "http://127.0.0.1:1337/"
handled by child
$ curl "http://127.0.0.1:1337/"
handled by parent
通过结果可以看出,子进程和父进程都可以处理我们的客户端发起的请求。试试将服务发送到多个子进程上:
// parent.js
var cp = require('child_process');
var child1 = cp.fork('child.js');
var child2 = cp.fork('child.js');
// Open up the server object and send the handle
var server = require('net').createServer();
server.on('connection', function (socket) {
socket.end('handled by parent\n');
});
server.listen(1337, function () {
child1.send('server', server);
child2.send('server', server);
});
//然后打印出来
// child.js
process.on('message', function (m, server) {
if (m === 'server') {
server.on('connection', function (socket) {
socket.end('handled by child, pid is ' + process.pid + '\n');
});
}
});
再用curl测试一下:
$ curl "http://127.0.0.1:1337/"
handled by child, pid is 24673
$ curl "http://127.0.0.1:1337/"
handled by parent
$ curl "http://127.0.0.1:1337/"
handled by child, pid is 24672
这个可以在父进程和子进程之间来回处理了。现在这个是tcp层面的转化,我们之后选择用http层面来再次试试。
// parent.js
var cp = require('child_process');
var child1 = cp.fork('child.js');
var child2 = cp.fork('child.js');
// Open up the server object and send the handle
var server = require('net').createServer();
server.listen(1337, function () {
child1.send('server', server);
child2.send('server', server);
// 关掉
server.close();
});
//修改一下子进程
// child.js
var http = require('http');
var server = http.createServer(function (req, res) {
res.writeHead(200, { 'Content-Type': 'text/plain' });
res.end('handled by child, pid is ' + process.pid + '\n');
});
process.on('message', function (m, tcp) {
if (m === 'server') {
tcp.on('connection', function (socket) {
server.emit('connection', socket);
});
}
});
然后重新启动parent.js,测试一下效果:
$ curl "http://127.0.0.1:1337/"
handled by child, pid is 24852
$ curl "http://127.0.0.1:1337/"
handled by child, pid is 24851
这样一来,请求都是由子进程处理,看一下整个过程中,服务的过程发送了一次改变。
我们看一下图
主进程发送完句柄,并关闭监听之后,就变成了如下结构:
这样,就可以实现多个子进程可以同时监听相同端口,再没有EADDRINUSE的异常发生。
句柄发送与还原
看过了上边这个例子之后,我们会存在如下疑问,句柄发送跟我们直接将服务器对象发送给子进程有没有差别?它是否真的将服务器对象发送给了子进程?为什么它可以发送到多个子进程中?发送给子进程为什么父进程中还存在这个对象?接下来,我们将要细细讲解。
目前,子进程对象send()方法可以发送的句柄类型包括如下几种:
1.net.socket,tcp套接字
2.net.Server,tcp服务器,任意建立在tcp服务上的应用层服务都可以享受到它带来的好处。
3.net.Native,c++层面的tcp套接字或IPC管道。
4.dgram.socket,UDP套接字
5.dgram.Native,C++层面的UDP套接字
send()方法在将消息发送到IPC管道前,将消息组装成两个对象,一个参数是handle,另一个是message
{
cmd: 'NODE_HANDLE',
type: 'net.Server',
msg: message
}
发送到IPC管道中的实际上是我们要发送的句柄文件描述符,文件描述符实际上是一个整数值,这个message对象在写入到IPC通道时,也会通过JSON.stringify()进行序列化,所以最终发送到IPC通道中的信息都是字符串,send()方法能发送消息和句柄并不意味着它能发送任意对象。
连接了IPC通道的子进程可以读取到父进程发来的消息,将字符串通过JSON.parse()解析还原为对象后,才出发message事件将消息体传递给应用层使用,在这个过程中,消息对象还要被进行过滤处理,message.cmd的值如果以NODE_为前缀,它将响应一个内部事件internalMessage
如果message.cmd值为NODE_HANDLE,它将取出message.type的值和得到的文件描述符一起还原出一个对应的对象。这个过程的示意图如下:
以发送的tcp服务器句柄为例,子进程收到消息后的还原过程如下:
function(message, handle, emit) {
var self = this;
var server = new net.Server();
server.listen(handle, function() {
emit(server);
});
}
子进程根据message.typs创建对应tcp服务器对象,然后监听到文件描述符上,由于底层细节不被应用层感知,所以在子进程中,开发者会有一种服务器就是从父进程中直接传递过来的错觉。
值得注意的是,node进程之间只有消息传递,不会真正的传递对象,这种错觉是抽象封装的结果
端口共同监听
在node句柄发送的过程中,多个进程可以监听到相同的端口,而不引起EADDRINUSE异常,这是因为,我们独立启动的进程中,tcp服务端套接字socket的文件描述符并不相同,导致监听相同的端口时会抛出异常,但是node底层对每个端口监听都设置了SO_REUSEADDR选项,这个选项的涵义是不同进程可以就相同的网卡和端口进行监听,这个服务器端套接字可以被不同的进程复用:
setsockopt(tcp->io_watcher.fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on))
由于独立启动的进程互相之间并不知道文件描述符,所以监听相同端口就会失败,但对于send()发送的句柄还原出来的服务而言,他们的文件描述符是相同的,所以监听相同端口不会引起异常。
多个应用监听相同端口时,文件描述符同一时间只能被某个进程所用,换言之就是网络请求向服务器端发送时,只有一个幸运的进程能够抢到连接,也就是说只有他能为这个请求进行服务。这些进程也都是抢占式的。
小结
本节我们学习了可以帮助child_process模块在单机上搭建集群的技术,下一节,我们要看看是否可以扩展到多个机器上来搭建更大的集群。
集群稳定之路
在我们正在服务用户之前,我们还需要为集群考虑如下问题:
1.性能问题
2.多个工作进程的存活状态管理
3.工作进程的平滑重启
4.配置或者静态数据的动态重新载入
5.其他细节
进程事件
事件名 | 说明 |
---|---|
error | 当子进程无法被复制创建、无法被杀死、无法发送消息时会触发该事件 |
exit | 子进程退出时触发该事件,子进程如果是正常退出,这个事件的第一个参数为退出码,否则为null,如果进程是通过kill()方法被杀死的,会得到第二个参数,它表示杀死进程时的信号 |
close | 在子进程的标准输入输出流中止时触发该事件,参数与exit相同 |
disconnect | 在父进程或子进程中调用disconnect()方法时触发该事件,在调用该方法时将关闭监听IPC通道 |
除了send()外,还可以使用kill()向子进程发送消息,kill()并不能真正的将通过IPC相连的子进程杀死,它只是给子进程发送一个系统信号,默认情况下,父进程将通过kill()方法给子进程发送一个SIGTERM信号。它与进程默认的kill()方法类似:
// 子进程
child.kill([signal]);
// 当前进程
process.kill(pid, [signal]);
这个信号,一个发给子进程,一个发给目标进程,在POSIX标准中,有一套完备的信号系统,在命令行中执行kill -l可以看到详细的信号列表:
$ kill -l
1) SIGHUP 2) SIGINT 3) SIGQUIT 4) SIGILL
5) SIGTRAP 6) SIGABRT 7) SIGEMT 8) SIGFPE
9) SIGKILL 10) SIGBUS 11) SIGSEGV 12) SIGSYS
13) SIGPIPE 14) SIGALRM 15) SIGTERM 16) SIGURG
17) SIGSTOP 18) SIGTSTP 19) SIGCONT 20) SIGCHLD
21) SIGTTIN 22) SIGTTOU 23) SIGIO 24) SIGXCPU
25) SIGXFSZ 26) SIGVTALRM 27) SIGPROF 28) SIGWINCH
29) SIGINFO 30) SIGUSR1 31) SIGUSR2
node提供了这些信号的对应事件,每个进程都可以通过监听这些信号事件。这些信号事件是用来通知进程的,每个信号事件有不同的含义,进程在收到响应信号时,应当做出约定的行为,如SIGTERM是软件终止信号,进程收到该信号时应当退出:
process.on('SIGTERM', function() {
console.log('Got a SIGTERM, exiting...');
process.exit(1);
});
console.log('server running with PID:', process.pid);
process.kill(process.pid, 'SIGTERM');
自动重启
我们在主进程上添加自动重启子进程的功能代码:
// master.js
var fork = require('child_process').fork;
var cpus = require('os').cpus();
var server = require('net').createServer();
server.listen(1337);
var workers = {};
var createWorker = function () {
var worker = fork(__dirname + '/worker.js');
//退出时重新启动新的进程
worker.on('exit', function () {
console.log('Worker ' + worker.pid + ' exited.');
delete workers[worker.pid];
createWorker();
});
// 句柄转发
worker.send('server', server);
workers[worker.pid] = worker;
console.log('Create worker. pid: ' + worker.pid);
};
for (var i = 0; i < cpus.length; i++) {
createWorker();
}
// 进程自己退出时,让所有工作进程退出
process.on('exit', function () {
for (var pid in workers) {
workers[pid].kill();
}
});
//测试
$ node master.js
Create worker. pid: 30504
Create worker. pid: 30505
Create worker. pid: 30506
Create worker. pid: 30507
$ kill 30506
Worker 30506 exited.
Create worker. pid: 30518
...
然后我们增加代码来捕获和处理异常:
// worker.js
var http = require('http');
var server = http.createServer(function (req, res) {
res.writeHead(200, {'Content-Type': 'text/plain'});
res.end('handled by child, pid is ' + process.pid + '\n');
});
var worker;
process.on('message', function (m, tcp) {
if (m === 'server') {
worker = tcp;
worker.on('connection', function (socket) {
server.emit('connection', socket);
});
}
});
process.on('uncaughtException', function () {
// 停止接收新的连接
worker.close(function () {
// 所有已有连接断开后,退出进程
process.exit(1);
});
});
上述代码一旦有未捕获的异常出现,工作进程就会停止接收新的连接,当所以连接断开后,退出进程,主进程在监听到工作进程的exit后,将立即启动新的进程服务,以此保证整个集群中总是有进程再为用户服务。
自杀信号(suicide)
工作进程在得知要退出时,向主进程发送一个自杀信号,然后才停止接收新的连接,当所有连接断开后再退出。主进程在接收到自杀信号后,立即创建新的工作进程服务:
// worker.js
process.on('uncaughtException', function (err) {
process.send({act: 'suicide'});
// ཕኹ接收ႎ的接
worker.close(function () {
// 有ᅙ有接开ࢫLjཽ出进程
process.exit(1);
});
});
主进程将重庆工作进程的任务,从exit事件的处理函数中转移到message事件的处理函数中:
var createWorker = function () {
var worker = fork(__dirname + '/worker.js');
// 启动新的进程
worker.on('message', function (message) {
if (message.act === 'suicide') {
createWorker();
}
});
worker.on('exit', function () {
console.log('Worker ' + worker.pid + ' exited.');
delete workers[worker.pid];
});
worker.send('server', server);
workers[worker.pid] = worker;
console.log('Create worker. pid: ' + worker.pid);
};
我们来测试一下:
var server = http.createServer(function (req, res) {
res.writeHead(200, {'Content-Type': 'text/plain'});
res.end('handled by child, pid is ' + process.pid + '\n');
throw new Error('throw exception');
});
$ node master.js
Create worker. pid: 48595
Create worker. pid: 48596
Create worker. pid: 48597
Create worker. pid: 48598
$ curl http://127.0.0.1:1337/
handled by child, pid is 48598
Create worker. pid: 48602
Worker 48598 exited.
限量重启
// 重启次数
var limit = 10;
// 时间单位
var during = 60000;
var restart = [];
var isTooFrequently = function () {
// 记录重启时间
var time = Date.now();
var length = restart.push(time);
if (length > limit) {
// 取出最后10个记录
restart = restart.slice(limit * -1);
}
//最后一次重启到前10次重启之间的时间间隔
return restart.length >= limit && restart[restart.length - 1] - restart[0] < during;
};
var workers = {};
var createWorker = function () {
// 检查是否太过频繁
if (isTooFrequently()) {
// 触发giveup事件后,不再重启
process.emit('giveup', length, during);
return;
}
var worker = fork(__dirname + '/worker.js');
worker.on('exit', function () {
console.log('Worker ' + worker.pid + ' exited.');
delete workers[worker.pid];
});
// 重新启动新的进程
worker.on('message', function (message) {
if (message.act === 'suicide') {
createWorker();
}
});
// 句柄转发
worker.send('server', server);
workers[worker.pid] = worker;
console.log('Create worker. pid: ' + worker.pid);
};
负载均衡
node默认提供的机制是采用操作系统的抢占式策略,就是在一堆工作进程中,闲着的进程对到来的请求进行抢占,谁抢到,谁服务。但是,node的抢占策略是根据cpu的繁忙程度而定的,因此会出现IO繁忙,但是cpu空闲的情况。因此,node v0.11提供了一种新的策略,Round-Robin(轮叫调度)。轮叫调度由主进程接受连接,将其依次分发给工作进程,分发的策略是在N个工作进程中,每次选择第i=(i+1)modn个进程来发送连接。在cluster模块中启用它的方式如下:
// 启用Round-Robin
cluster.schedulingPolicy = cluster.SCHED_RR
// 不启用Round-Robin
cluster.schedulingPolicy = cluster.SCHED_NONE
//或者
export NODE_CLUSTER_SCHED_POLICY=rr
export NODE_CLUSTER_SCHED_POLICY=none
rr策略也可以用代理服务器来实现,但是会消耗正常两倍的文件描述符
状态共享
进程间的数据是不能共享的,但是,配置文件、session之类的数据应该是一致的。因此,一般采用第三方数据存储的方案进行功能扩展。也就利用db、文件、缓存来共享状态和数据。我们可以使用子进程定时轮询的方式来同步状态,这是用资源换功能的一种方式,会有大量的资源浪费、并发、数据延时等情况的出现。
另外一种就是主动通知,也就是减少轮询,让轮询只在消息队列层面出现,其他功能都基于事件的调度和触发来实现。我们将这种用来发送通知和查询状态是否更改的进程叫做通知进程,这个进程应该设计为,只进行轮询和通知,不处理任何业务逻辑。
cluster模块
node0.8后,在内核中增加了cluster模块,这是因为child_process要做单机集群需要处理的事情太多了,因此,才会给cluster这个核心模块。cluster可以更方便的解决多cpu的利用问题,同时也提供了较完善的api,用以处理进程的健壮性问题
我们看看如何用cluster来实现child_process做的事情(例如创建node集群),使用cluster做起来就非常轻松了。
// cluster.js
var cluster = require('cluster');
cluster.setupMaster({
exec: "worker.js"
});
var cpus = require('os').cpus();
for (var i = 0; i < cpus.length; i++) {
cluster.fork();
}
//执行 node cluster.js 就可以创建集群了
就官方文档而言,我们还可以这样写:
var cluster = require('cluster');
var http = require('http');
var numCPUs = require('os').cpus().length;
if (cluster.isMaster) {
// Fork workers
for (var i = 0; i < numCPUs; i++) {
cluster.fork();
}
cluster.on('exit', function (worker, code, signal) {
console.log('worker ' + worker.process.pid + ' died');
});
} else {
// Workers can share any TCP connection
// In this case its a HTTP server
http.createServer(function (req, res) {
res.writeHead(200);
res.end("hello world\n");
}).listen(8000);
}
在进程中判断是主进程还是工作进程,主要取决于环境变量中是否有NODE_UNIQUE_ID
cluster.isWorker = ('NODE_UNIQUE_ID' in process.env);
cluster.isMaster = (cluster.isWorker === false);
用于官方文档没有一致的判断 cluster.isWorker,因此,建议使用cluster.setupMaster(),将主进程和工作进程从代码上完全剥离。之后,通过cluster.setupMaster()来创建子进程,而不是使用cluster.fork()。
Cluster工作原理
cluster模块是child_process和net模块组合起来的一个功能封装,cluster启动时,会在内部启动TCP服务器(只能启动一个tcp服务),在cluster.fork()子进程时,将这个tcp服务器端socket的文件描述符发送给工作进程,如果进程是通过cluster.fork()复制出来的,那么他的环境变量里就存在NODE_UNIQUE_ID,如果工作进程中存在listen()监听网络端口的调用,它将拿到文件描述符,通过SO_REUSEADDR端口重用,从而实现多个子进程共享端口。对于,普通方式启动的进程,则不存在文件描述符传递共享等事情。在cluster模块中,一个主进程只能管理一组工作进程:
对比与child_process,自行通过child_process来操作进程的场景下,程序可以同时控制多组工作进程,因为,我们可以创建多组tcp服务,使得子进程可以共享多个服务器端的socket。
Cluster事件
也可以看出是child_process模块的事件封装
事件 | 说明 |
---|---|
fork | 复制一个工作进程后,触发该事件 |
online | 复制好一个工作进程后,工作进程主动发送一条online消息给主进程,主进程收到消息后,触发该事件 |
listening | 工作进程中调用listen()后,也就是共享了服务端的socket后,发送一条listening消息给主进程,主进程收到消息后,触发该事件 |
disconnect | 主进程和工作进程退出时触发该事件 |
exit | 有工作进程退出时触发该事件 |
setup | cluster.setupMaster()执行后触发该事件 |
总结
我们希望每个node进程只做好一件事就可以了,这样将复杂分解为简单,然后再通过许多的简单功能的组合完成复杂的功能。
虽然,我们学习了这些知识,但是在生产环境中,建议使用pm2这样的成熟工具来管理进程。另外,在node的进程管理之外,还需要用监听进程数量或监听日志的方式确保整个系统的稳定性,即使主进程出错退出,也能即使得到监控警报,使得开发者可以及时处理故障。