解析NodeJs中的流

解析NodeJS中的流

前言

作为前端开发同学,相信大家对下面的一段代码应该不会陌生:

gulp.src('client/templates/*.jade')
    .pipe(jade())
    .pipe(minify())
    .pipe(gulp.dest('build/minified_templates'));

这可能是项目构建里的一条命令,将指定目录下面的后缀为jade的文件,首先通过jade去解析,然后压缩,最后输出到指定目录。这一系列操作能够顺序进行,其实就是流在背后驱动。

本文会从代码角度,去剖析nodejs中常见的几种流的内部原理,最后我们自已实现一个类似上面那种pipeline式的调用。

  1. Readable
  2. Writable
  3. Duplex
  4. Transform

Readable

什么是可读流呢?

以生活中家里吃水打个比方,我们一般不会直接一次性从水井里打上很多水,因为一次用不完,时间放久会变质。我们的做法是,家里建个蓄水池,蓄水池通过水管连接到水泵,从井中抽水出来,然后再接根水管到蓄水池,供我们日常按需取水使用。

为保证水质,蓄水池蓄水不能太多(水位不能太高),够用一两天就行。满足这个条件的水位姑且称之为适量水位。一旦水位低于适量水位时,水泵开始向蓄水池中抽水,反之,停止抽水。

在可读流中,资源也是不会直接流向消费者,而是先被push到内部的的缓存池中,缓存池有一个水位标记highWatermark(简称hwm),当缓存池中的资源量state.length小于hwm时,资源会被不断push到缓存池中,直到达到这个阀值为至。

注:为行文方便,文中标有伪代码的地方是不可运行的,只保留部分片段代码,有地方代码层级有拉平,变量的名称,位置等和node源码也不严格相同。

可读流有流动和暂停两种模式,可读流能够流动起来的驱动力在其resume方法,如:

//伪代码

function flow(stream){
    const state=stream._readableState;
    state.flowing=true;
    while(state.flowing && stream.read() !== null);
}
Readable.prototype.resume=function(){
    //...
    flow(this);
    //...
}

可以看到,该方法将先将流转变成流动模式,然后在该模式下,不断地去消耗资源,直到资源耗尽或流转变成暂停模式为止。

下面来看看,在read方法里,资源是如何被消耗掉,又是如何被补充到缓存池中的:

//伪代码

function fromList(n,state){
   var ret;
   ret= /*从缓存池state.buffe中取数据*/
   return ret;
}
Readable.prototype.read=function(n){
    var state=this._readableState;
    //...
    this._read(state.highWaterMark);
    //...

    var ret=fromList(n,state);

    if(ret===null){
        //...
    }else{
        //标记消耗量
        state.length-=n;
    }

    //触发事件,发送数据
    if(ret !==null) this.emit('data',ret);

    return data;

};

如果能从缓存池中取到资源,将触发data事件,并发送资源数据。

另外,在消耗资源前,先试着向缓存池中补充资源。 _read是一个虚方法,是每个继承了Readable的类都实现的方法。在该方法里,则要调用push方法向缓存池中注入资源:

//伪代码

Readable.prototype.push=function(chunk){
    var stream=this;

    var  state=stream._readableState;

    if(state.flowing&&state.length===0&&!state.sync){
        stream.emit('data',chunk);
        stream.read(0);
    }else{
        //...
        //省略state.objectMode等情况
        state.buffer.push(chunk);
    }
}

初始状态下,监听可读流的data事件即可进入流动模式:

//伪代码
Readable.prototype.on=function(ev,fn){
    //...
    var state=this._readableState;
    if(ev==='data'){
        if(state.flowing!==false) this.resume();
    }else if(ev==='readable'){
        state.needReadable=true;

        //...
        this.emit('readable');
        flow(this);
    }
    //...
}

下面,我们实现一个Readable:

const stream = require('stream');

//模拟底层资源
const resource = ['www', 'jd', 'com'];

readable = stream.Readable({
    read: function () {
        if (resource.length) this.push(resource.shift());
        else this.push(null);
    }
});

readable.on('data', function (data) {
    console.log(data.toString());
});

/*
www
jd
com
*/

监听data事件,进入流动模式,_read方法取得底层资源,加入到缓存池,从缓存池中取到数据,触发data事件,并发送数据。输出如下:

但是在暂停模式下,流的驱动要readable事件和read方法来配合使用:

const stream = require('stream');

//模拟底层资源
const resource = ['www', 'jd', 'com'];

readable = stream.Readable({
    objectMode: true,
    read: function () {
        if (resource.length) this.push(resource.shift());
        else this.push(null);
    }
});

//state.flowing=false
readable.pause();
readable.on('data', function (data) {
    console.log(data.toString());
});

readable.on('readable', function () {
    while (null !== readable.read());
});

通过实现_read方法,可以让可读流自由对接不同的度层数据源,如,fs.createReadStream 中 :

//伪代码

//FileReadStream继承了Readable
FileReadStream.prototye._read=function(n){
    //..
    fs.read(this.fd,/*文件读取相关参数*/,funtion(err,bytesRead){
        if(!err){
            var b=/*读取的文件内容*/;
            this.push(b);
        }
    });
}

简单总结下:
  1. _read方法对接底层数据源,然后push数据到缓存池中
  2. read方法从缓存池中读取数据,或以data事件形式将数据发送给消费者

Writable

可写流可以作为可读流的消耗方,当可读流的生产过快时,会导致可写流"溢出",writable.write()返回false,此时要告知可读流暂停生产:readable.pause(),一旦可写流里的资源被消耗了,又要通知(以drain事件)可读流恢复生产(readable.resume()),其实这就是pipe方法的内部原理。

下面看看write方法的基本实现:

//伪代码

function onwriteStateUpdate(state){
    //...
    state.length-=state.writelen;
    state.writelen=0;
}
Writable.prototype.write=function(chunk,encoding,cb){
    var stream=this;
    var state=this._writableState;
    var ret=false;
    var len=state.objectMode?1:chunk.length;
    state.writelen=len;

    state.length+=len;
    
    //false为溢出
    ret=state.length<state.highWateMark;

    this._write(chunk,encoding,function(){
        //...

        onwriteStateUpdate(state);
        cb();
        if(state.length===0&&state.needDrain){
            state.needDrain=false;
            stream.emit('drain');
        }

    });


    return ret;
}

可以看到,在write内部调用了_write方法,将资源的消耗转交给了这个虚方法,在实现这个虚方法时,切记要调用第三参数标识的回调函数,在这里进行了资源消耗后状态更新等处理。

//伪代码

//FileWriteStream继承了Writeable
FileWriteStream.prototye._write=function(data,encoing,cb){
    //..
    fs.write(this.fd,/*写文件相关参数*/,funtion(err,bytes){
        if(!err){
            cb();
        }
    });
}

上面是fs.createWriteStream对_write的实现。

终于可以讲到pipe方法了,pipe其实是Readable的方法,原理上面已经讲过了,直接上图看代码吧:
//伪代码

Readable.prototype.pipe=function(dest,pipeOpts){
    var src=this;
    //...

    src.on('data',function(chunk){
        if(false===dest.write(chunk)){
            //...
            src.pause();
        }
    });

    dest.on('drain',function(src){
        var state=src._readableState;
        //...
        // src.resume()
        state.flowing=true;
        flow(src);
    });

    dest.emit('pipe',src);

    //...
}

好了,结合Readable,Writable,pipe三者写个例子:

const stream = require('stream');

var c = 0;
const readable = stream.Readable({
    highWaterMark: 4,
    read: function () {
        var data = c < 6 ? String.fromCharCode(c + 65) : null;
        ++c;
        //console.log('read: ', ++c, data);
        this.push(data);
    }
});

const writable = stream.Writable({
    highWaterMark: 2,
    write: function (chunk, encoding, done) {
        process.nextTick(() => {
            console.log('write: ', chunk.toString());
            done();
        });
    }
});

readable.pipe(writable);

/*
write: A
write: B
write: C
write: D
write: E
write: F
*/

Duplex

双工流,继承了Readable和Writable的流。该流可同时进行读和写,但是读写是分离的,读写的数据也是不能互通的。._read,_write两虚方法,在继承类中要被实现。

const stream = require('stream');

const resource = ['www', '.jd', '.com', '\n'];
const duplex = stream.Duplex({
    write: function (chunk, encoding, done) {
        console.log(chunk.toString().toUpperCase());
        done();
    },
    read: function () {
        resource.forEach(res => {
            res = res.toString();
            this.push(res);
            if (res === '\n') this.push(null);
        })
    }
});

process.stdin.pipe(duplex).pipe(process.stdout);

/*
www.jd.com
a
A

... 

*/

这个例子中,用pipe把这个双工流分别连接到标准输出、输入流,并自动适配读写端。等同于下面的写法:

process.stdin.pipe(duplex); duplex.pipe(process.stdout);

Transform

转换流,也是一种直通流,就是上面的Duplex流的基础上,将可写读端接受到的数据,在内部经过转换,作为可读端流的数据源。好比自来水流经过滤装置,在装置内部进行过滤处理后,再流出使用一样。

Transfom是Duplex的一种具体实现:

//伪代码

const Duplex = require('_stream_duplex');
const util = require('util');
util.inherits(Transform, Duplex);


function afterTransform(err,data){
    var ts=this._transformState;
    ts.transforming=false;
    //...

    ts.writechunk = null;
    ts.writecb = null;

    if(data != null) this.push(data);
    //...
    var rs=this._readableState;
    
    if(rs.needReadable || rs.length < rs.highWaterMark) {
        this._read(rs.highWaterMark);
    }
}
function Transform (options) {
    if(!(this instanceof Transform))
       return new Transform(options);
    
    this._transformState = {
        afterTransform: afterTransform.bind(this),
        //...
    };
    //...
}
Transform.prototype._write=function(chunk,encoding,cb){
    var ts=this._transformState;

    ts.writecb=cb;
    ts.writechunk=chunk;
    ts.writeencoding=encoding;
    
    if(!ts.transforming){
        var rs=this._readableState;
        if(ts.needTransform || ts.needReadable || rs.length<rs.highWaterMark){  
           this._read(rs.highWaterMark);
        }
    }

}

Transform.prototype._read=function(n){
    var ts=this._transformState;

    if(ts.writechunk!==null && ts.writecb && !ts.transforming){
        ts.transforming=true;
        this._transform(ts.writechunk,ts.writeencoding,ts.afterTransform);
    }

}

Transform.prototype._transform=function(chunk,encoding,cb){
    //对外曝露的虚方法
    //数据在这里被转换,然后在回调cb (afterTransform)里,通过push到可读端的缓存池中
}

Transform.prototype._flush = function() {
    //所有可写读的数据被转换到写读端时执行
}

在可写端的_write方法中,先将数据chunk临时保存到_transformState的writechunk上,然后调用可读端的_read方法,在_read方法里,取到_transformState.writechunk,然后将其传递给虚方法_transform,在这个方法里,完成数据的转换,并将转换好的数据传给回调函数cb,这个函数其实是内部预定好的afterTransform,它会完成各种状态处理,并将数据push到可读端的缓存中。

最后,综合前面所讲,实现一个pipeline式调用:

const stream = require('stream');

var resource = ['www', 'jd', 'com'];
var src = function (res) {
    return stream.Readable({
        read: function () {
            if (res.length) this.push(res.shift());
            else this.push(null);
        }
    });

};

var upper = function () {
    return stream.Transform({
        transform: function (chunk, encoding, done) {
            done(null, chunk.toString().toUpperCase());
        }
    });
};


var join = function () {
    var data = [];
    var _join = stream.Transform({
        transform: function (chunk, encoding, done) {
            data.push(chunk.toString());
            done();
        },
        flush: function () {
            this.push(data.join('.'));
        }
    });


    return _join;
};

var dest = function () {
    //return process.stdout;
    return stream.Writable({
        write: function (chunk, encoding, done) {
            console.log(chunk.toString());
            done();
        }
    });
};

src(resource).pipe(upper())
             .pipe(join())
             .pipe(dest());


/*
WWW.JD.COM
*/
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,686评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,668评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,160评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,736评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,847评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,043评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,129评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,872评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,318评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,645评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,777评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,861评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,589评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,687评论 2 351

推荐阅读更多精彩内容