解析NodeJS中的流
前言
作为前端开发同学,相信大家对下面的一段代码应该不会陌生:
gulp.src('client/templates/*.jade')
.pipe(jade())
.pipe(minify())
.pipe(gulp.dest('build/minified_templates'));
这可能是项目构建里的一条命令,将指定目录下面的后缀为jade的文件,首先通过jade去解析,然后压缩,最后输出到指定目录。这一系列操作能够顺序进行,其实就是流在背后驱动。
本文会从代码角度,去剖析nodejs中常见的几种流的内部原理,最后我们自已实现一个类似上面那种pipeline式的调用。
- Readable
- Writable
- Duplex
- Transform
Readable
什么是可读流呢?
以生活中家里吃水打个比方,我们一般不会直接一次性从水井里打上很多水,因为一次用不完,时间放久会变质。我们的做法是,家里建个蓄水池,蓄水池通过水管连接到水泵,从井中抽水出来,然后再接根水管到蓄水池,供我们日常按需取水使用。
为保证水质,蓄水池蓄水不能太多(水位不能太高),够用一两天就行。满足这个条件的水位姑且称之为适量水位。一旦水位低于适量水位时,水泵开始向蓄水池中抽水,反之,停止抽水。
在可读流中,资源也是不会直接流向消费者,而是先被push到内部的的缓存池中,缓存池有一个水位标记highWatermark(简称hwm),当缓存池中的资源量state.length小于hwm时,资源会被不断push到缓存池中,直到达到这个阀值为至。
注:为行文方便,文中标有伪代码的地方是不可运行的,只保留部分片段代码,有地方代码层级有拉平,变量的名称,位置等和node源码也不严格相同。
可读流有流动和暂停两种模式,可读流能够流动起来的驱动力在其resume方法,如:
//伪代码
function flow(stream){
const state=stream._readableState;
state.flowing=true;
while(state.flowing && stream.read() !== null);
}
Readable.prototype.resume=function(){
//...
flow(this);
//...
}
可以看到,该方法将先将流转变成流动模式,然后在该模式下,不断地去消耗资源,直到资源耗尽或流转变成暂停模式为止。
下面来看看,在read方法里,资源是如何被消耗掉,又是如何被补充到缓存池中的:
//伪代码
function fromList(n,state){
var ret;
ret= /*从缓存池state.buffe中取数据*/
return ret;
}
Readable.prototype.read=function(n){
var state=this._readableState;
//...
this._read(state.highWaterMark);
//...
var ret=fromList(n,state);
if(ret===null){
//...
}else{
//标记消耗量
state.length-=n;
}
//触发事件,发送数据
if(ret !==null) this.emit('data',ret);
return data;
};
如果能从缓存池中取到资源,将触发data事件,并发送资源数据。
另外,在消耗资源前,先试着向缓存池中补充资源。 _read是一个虚方法,是每个继承了Readable的类都实现的方法。在该方法里,则要调用push方法向缓存池中注入资源:
//伪代码
Readable.prototype.push=function(chunk){
var stream=this;
var state=stream._readableState;
if(state.flowing&&state.length===0&&!state.sync){
stream.emit('data',chunk);
stream.read(0);
}else{
//...
//省略state.objectMode等情况
state.buffer.push(chunk);
}
}
初始状态下,监听可读流的data事件即可进入流动模式:
//伪代码
Readable.prototype.on=function(ev,fn){
//...
var state=this._readableState;
if(ev==='data'){
if(state.flowing!==false) this.resume();
}else if(ev==='readable'){
state.needReadable=true;
//...
this.emit('readable');
flow(this);
}
//...
}
下面,我们实现一个Readable:
const stream = require('stream');
//模拟底层资源
const resource = ['www', 'jd', 'com'];
readable = stream.Readable({
read: function () {
if (resource.length) this.push(resource.shift());
else this.push(null);
}
});
readable.on('data', function (data) {
console.log(data.toString());
});
/*
www
jd
com
*/
监听data事件,进入流动模式,_read方法取得底层资源,加入到缓存池,从缓存池中取到数据,触发data事件,并发送数据。输出如下:
但是在暂停模式下,流的驱动要readable事件和read方法来配合使用:
const stream = require('stream');
//模拟底层资源
const resource = ['www', 'jd', 'com'];
readable = stream.Readable({
objectMode: true,
read: function () {
if (resource.length) this.push(resource.shift());
else this.push(null);
}
});
//state.flowing=false
readable.pause();
readable.on('data', function (data) {
console.log(data.toString());
});
readable.on('readable', function () {
while (null !== readable.read());
});
通过实现_read方法,可以让可读流自由对接不同的度层数据源,如,fs.createReadStream 中 :
//伪代码
//FileReadStream继承了Readable
FileReadStream.prototye._read=function(n){
//..
fs.read(this.fd,/*文件读取相关参数*/,funtion(err,bytesRead){
if(!err){
var b=/*读取的文件内容*/;
this.push(b);
}
});
}
简单总结下:- _read方法对接底层数据源,然后push数据到缓存池中
- read方法从缓存池中读取数据,或以data事件形式将数据发送给消费者
Writable
可写流可以作为可读流的消耗方,当可读流的生产过快时,会导致可写流"溢出",writable.write()返回false,此时要告知可读流暂停生产:readable.pause(),一旦可写流里的资源被消耗了,又要通知(以drain事件)可读流恢复生产(readable.resume()),其实这就是pipe方法的内部原理。下面看看write方法的基本实现:
//伪代码
function onwriteStateUpdate(state){
//...
state.length-=state.writelen;
state.writelen=0;
}
Writable.prototype.write=function(chunk,encoding,cb){
var stream=this;
var state=this._writableState;
var ret=false;
var len=state.objectMode?1:chunk.length;
state.writelen=len;
state.length+=len;
//false为溢出
ret=state.length<state.highWateMark;
this._write(chunk,encoding,function(){
//...
onwriteStateUpdate(state);
cb();
if(state.length===0&&state.needDrain){
state.needDrain=false;
stream.emit('drain');
}
});
return ret;
}
可以看到,在write内部调用了_write方法,将资源的消耗转交给了这个虚方法,在实现这个虚方法时,切记要调用第三参数标识的回调函数,在这里进行了资源消耗后状态更新等处理。
//伪代码
//FileWriteStream继承了Writeable
FileWriteStream.prototye._write=function(data,encoing,cb){
//..
fs.write(this.fd,/*写文件相关参数*/,funtion(err,bytes){
if(!err){
cb();
}
});
}
上面是fs.createWriteStream对_write的实现。
终于可以讲到pipe方法了,pipe其实是Readable的方法,原理上面已经讲过了,直接上图看代码吧://伪代码
Readable.prototype.pipe=function(dest,pipeOpts){
var src=this;
//...
src.on('data',function(chunk){
if(false===dest.write(chunk)){
//...
src.pause();
}
});
dest.on('drain',function(src){
var state=src._readableState;
//...
// src.resume()
state.flowing=true;
flow(src);
});
dest.emit('pipe',src);
//...
}
好了,结合Readable,Writable,pipe三者写个例子:
const stream = require('stream');
var c = 0;
const readable = stream.Readable({
highWaterMark: 4,
read: function () {
var data = c < 6 ? String.fromCharCode(c + 65) : null;
++c;
//console.log('read: ', ++c, data);
this.push(data);
}
});
const writable = stream.Writable({
highWaterMark: 2,
write: function (chunk, encoding, done) {
process.nextTick(() => {
console.log('write: ', chunk.toString());
done();
});
}
});
readable.pipe(writable);
/*
write: A
write: B
write: C
write: D
write: E
write: F
*/
Duplex
双工流,继承了Readable和Writable的流。该流可同时进行读和写,但是读写是分离的,读写的数据也是不能互通的。._read,_write两虚方法,在继承类中要被实现。
const stream = require('stream');
const resource = ['www', '.jd', '.com', '\n'];
const duplex = stream.Duplex({
write: function (chunk, encoding, done) {
console.log(chunk.toString().toUpperCase());
done();
},
read: function () {
resource.forEach(res => {
res = res.toString();
this.push(res);
if (res === '\n') this.push(null);
})
}
});
process.stdin.pipe(duplex).pipe(process.stdout);
/*
www.jd.com
a
A
...
*/
这个例子中,用pipe把这个双工流分别连接到标准输出、输入流,并自动适配读写端。等同于下面的写法:
process.stdin.pipe(duplex); duplex.pipe(process.stdout);
Transform
转换流,也是一种直通流,就是上面的Duplex流的基础上,将可写读端接受到的数据,在内部经过转换,作为可读端流的数据源。好比自来水流经过滤装置,在装置内部进行过滤处理后,再流出使用一样。
Transfom是Duplex的一种具体实现:
//伪代码
const Duplex = require('_stream_duplex');
const util = require('util');
util.inherits(Transform, Duplex);
function afterTransform(err,data){
var ts=this._transformState;
ts.transforming=false;
//...
ts.writechunk = null;
ts.writecb = null;
if(data != null) this.push(data);
//...
var rs=this._readableState;
if(rs.needReadable || rs.length < rs.highWaterMark) {
this._read(rs.highWaterMark);
}
}
function Transform (options) {
if(!(this instanceof Transform))
return new Transform(options);
this._transformState = {
afterTransform: afterTransform.bind(this),
//...
};
//...
}
Transform.prototype._write=function(chunk,encoding,cb){
var ts=this._transformState;
ts.writecb=cb;
ts.writechunk=chunk;
ts.writeencoding=encoding;
if(!ts.transforming){
var rs=this._readableState;
if(ts.needTransform || ts.needReadable || rs.length<rs.highWaterMark){
this._read(rs.highWaterMark);
}
}
}
Transform.prototype._read=function(n){
var ts=this._transformState;
if(ts.writechunk!==null && ts.writecb && !ts.transforming){
ts.transforming=true;
this._transform(ts.writechunk,ts.writeencoding,ts.afterTransform);
}
}
Transform.prototype._transform=function(chunk,encoding,cb){
//对外曝露的虚方法
//数据在这里被转换,然后在回调cb (afterTransform)里,通过push到可读端的缓存池中
}
Transform.prototype._flush = function() {
//所有可写读的数据被转换到写读端时执行
}
在可写端的_write方法中,先将数据chunk临时保存到_transformState的writechunk上,然后调用可读端的_read方法,在_read方法里,取到_transformState.writechunk,然后将其传递给虚方法_transform,在这个方法里,完成数据的转换,并将转换好的数据传给回调函数cb,这个函数其实是内部预定好的afterTransform,它会完成各种状态处理,并将数据push到可读端的缓存中。
最后,综合前面所讲,实现一个pipeline式调用:
const stream = require('stream');
var resource = ['www', 'jd', 'com'];
var src = function (res) {
return stream.Readable({
read: function () {
if (res.length) this.push(res.shift());
else this.push(null);
}
});
};
var upper = function () {
return stream.Transform({
transform: function (chunk, encoding, done) {
done(null, chunk.toString().toUpperCase());
}
});
};
var join = function () {
var data = [];
var _join = stream.Transform({
transform: function (chunk, encoding, done) {
data.push(chunk.toString());
done();
},
flush: function () {
this.push(data.join('.'));
}
});
return _join;
};
var dest = function () {
//return process.stdout;
return stream.Writable({
write: function (chunk, encoding, done) {
console.log(chunk.toString());
done();
}
});
};
src(resource).pipe(upper())
.pipe(join())
.pipe(dest());
/*
WWW.JD.COM
*/