node中的精髓Stream(流)

在前端工程化中产生了很多工具,例如grunt,gulp,webpack,babel...等等,这些工具都是通过node中的stream实现。
在node中stream也是非常非常非常重要的模块,比如我们常用的console就是基于stream的实例,还有net,http等核心模块都是基于stream来实现的,可见stream是多么的重要。

1.什么是stream?

是一种数据传输手段,从一个地方传输到另一个地方。

在写node的时候会存在读取文件,比如现在我们有一个非常大的文件,50G吧

    const fs = require('fs');
    // test文件50个G
    fs.readFileSync('./test.text');

这个时候需要消耗大量的时候去读取这个文件,然而我们可能关心的并不是文件所有内容,还会存在直接读取失败。stream就是为了解决这些问题而产生,我们读一些数据处理一些数据,当读到所关心数据的时候,则可以不再继续读取。

stream翻译成中文‘流’,就像水一样,从水龙头流向水杯。

2. Stream模块

stream继承于EventEmitter,拥有事件触发和事件监听功能。主要分为4种基本流类型:

  1. Readable (可读流)
  2. Writable (可写流)
  3. Duplex (读写流)
  4. Transform (转换流)

    在流中默认可操作的类型string和Buffer,如果需要处理其他类型的js值需要传入参数objectMode: true(默认为false)

在流中存在一个重要的概念,缓存区,就像拿水杯去接水,水杯就是缓存区,当水杯满,则会关闭水龙头,等把水杯里面的水消耗完毕,再打开水龙头去接水。

stream默认缓存区大小为16384(16kb),可以通过highWaterMark参数设置缓存区大小,但设置encoding后,以设置的字符编码为单位衡量。

3. Readable

首先创建一个可读流,可接收5个参数:

  • highWaterMark 缓存区字节大小,默认16384
  • encoding 字符编码,默认为null,就是buffer
  • objectMode 是否操作js其他类型 默认false
  • read 对内部的_read()方式实现 子类实现,父类调用
  • destroy 对内部的_ destroy()方法实现 子类实现,父类调用

可读流中分为2种模式流动模式暂停模式

监听data事件,触发流动模式,会源源不断生产数据触发data事件:

    const { Readable } = require('stream');
    
    let i = 0;
        
    const rs = Readable({
        encoding: 'utf8',
        // 这里传入的read方法,会被写入_read()
        read: (size) => {
            // size 为highWaterMark大小
            // 在这个方法里面实现获取数据,读取到数据调用rs.push([data]),如果没有数据了,push(null)结束流
            if (i < 10) {
                rs.push(`当前读取数据: ${i++}`);
            } else {
                rs.push(null);
            }
        },
        // 源代码,可覆盖
        destroy(err, cb) {
            rs.push(null);
            cb(err);
        }
    });
        
    rs.on('data', (data) => {
        console.log(data);
        // 每次push数据则触发data事件
        // 当前读取数据: 0
        // 当前读取数据: 1
        // 当前读取数据: 2
        // 当前读取数据: 3
        // 当前读取数据: 4
        // 当前读取数据: 5
        // 当前读取数据: 6
        // 当前读取数据: 7
        // 当前读取数据: 8
        // 当前读取数据: 9
    })

监听readable事件,触发暂停模式,当流有了新数据或到了流结束之前触发readable事件,需要显示调用read([size])读取数据:

    const { Readable } = require('stream');
        
    let i = 0;
        
    const rs = Readable({
        encoding: 'utf8',
        highWaterMark: 9,
        // 这里传入的read方法,会被写入_read()
        read: (size) => {
            // size 为highWaterMark大小
            // 在这个方法里面实现获取数据,读取到数据调用rs.push([data]),如果没有数据了,push(null)结束流
            if (i < 10) {
              // push其实是把数据放入缓存区
              rs.push(`当前读取数据: ${i++}`);
            } else {
                rs.push(null);
            }
        }
    });
    
    rs.on('readable', () => {
        const data = rs.read(9);
        console.log(data);
        // 
    })

read([size]) size参数:

  • 不传代表读取缓存区所有数据。
  • 传入0 填充缓存区, 但返回null
  • size < 当前缓存区数据 返回所需数据
  • size > 当前缓存区数据 返回null 并改变highWaterMark值

这里的缓存区数据不是指highWaterMark,获取缓存区数据大小rs._readableState.length。

流的模式可以自由切换: 通过rs._readableState.flowing的值获取当前状态

  • null 初始状态
  • false 暂停模式
  • true 流动模式

rs.pause()切换到暂停模式 rs.resume()切换到流动模式

在可读流里面还可以监听其他事件:

    
    rs.on('close', () => {
        // 流关闭时或文件关闭时触发
    })
    
    rs.on('end', () => {
        // 在流中没有数据可供消费时触发
    })
    
    rs.on('error', (err) => {
        // 发生错误时候
    })

4. Writable

可写流可接受参数:

  • highWaterMark 缓存区字节大小,默认16384
  • decodeStrings 是否将字符编码传入缓冲区
  • objectMode 是否操作js其他类型 默认false
  • write 子类实现,供父类调用 实现写入底层数据
  • writev 子类实现,供父类调用 一次处理多个chunk写入底层数据
  • destroy 可以覆盖父类方法,不能直接调用,销毁流时,父类调用
  • final 完成写入所有数据时父类触发

在实现流除了用上面直接传入参数的方式,还可以用继承类

class WS extends stream.Writable {
    constructor() {
        super({
            highWaterMark: 1
        });
    }

    _write(chunk, encoding, cb) {
        console.log(this._writableState.length);
        // chunk 为需要写入的数据
        // encoding 字符编码
        // cb 回调函数, 如果写入成功需要调用cb去执行下一次写入,如果发生错误,可以cb(new Error([错误信息]))
        if (chunk.length < 4) {
            fs.writeFileSync('./2.text', chunk, {
                flag: 'a'
            });
            cb();
        } else{
            cb(new Error('超出4个字节'));
        }
    }
}

const ws = new WS();

let i = 0;
function next() {
    let flag = true;

    // write() 会返回boolean false -> 缓存区没满 true —> 已满,需要暂停写入数据
    while(i < 10 && flag) {
        flag = ws.write(`${i++}`);
        console.log('flag', flag);
    }
}

next();

// 当所有缓存区数据已经成功写入底层数据,缓存区没有数据了,触发drain事件
ws.on('drain', () => {
    console.log('drain');
    // 继续写入缓存区数据
    next();
})

可写流的end事件,一旦触发end事件,后续不能再写入数据.

    ws.write('start');
    ws.end('end');
    ws.wrtie('test'); // 报错 write after end

finish事件:

    ws.write('start');
    ws.end('end');
    ws.on('finish', () => {
        console.log('调用end方法后,并且所有数据已经写入底层')
    })

cork()与uncork(),强制所有数据先写入缓存区,直到调用uncork()或end(),这时一并写入底层:

    const ws = stream.Writable({
        writev(chunks, encoding, cb) {
            // 这时chunks为一个数组,包含所有的chunk
        
            // 现在length为10
            console.log(chunk.length);
        }
    });
    
    // 写入数据之前,强制写入数据放入缓存区
    ws.cork();
    
    // 写入数据
    for (let i = 0; i < 10; i++) {
        ws.write(i.toString());
    }
    
    // 写入完毕,可以触发写入底层
    ws.uncork();

5. Duplex

读写流,该方法继承了可写流和可读流,但相互之间没有关系,各自独立缓存区,拥有Writable和Readable所有方法和事件,同时实现_read()和_write()方法。

    const fs = require('fs');
    const stream = require('stream');
    
    const duplex = stream.Duplex({
        write(chunk, encoding, cb) {
            console.log(chunk.toString('utf8')); // 写入
        },
        read() {
            this.push('读取');
            this.push(null);
        }
    });
    
    console.log(duplex.read(6).toString('utf8')); // 读取
    
    duplex.write('写入');

6. Transform

转换流,这个流在前端工程化中用到最多,从一个地方读取数据,转换数据后输出到一个地方,该流继承于Duplex。

    const fs = require('fs');
    const stream = require('stream');
    
    const transform = stream.Transform({
        transform(chunk, encoding, cb){
            // 把数据转换成大写字母,然后push到缓存区
            this.push(chunk.toString().toUpperCase());
            cb();
        }
    });
    
    transform.write('a');
    
    console.log(transform.read(1).toString()); // A

7. fs快速创建可读/可写流

可读流和可写流都需要我们去实现父类的方法,那么fs这个模块帮我们做了这件事情,fs里面实现了高效并且可靠的可读/可写流,提供快速创建流,不再去实现父类_write()或_read()。下面我们来看看如何使用:

    const fs = require('fs');
    
    /**
     * 创建可读流
     *  
     *  第一个参数文件路径
     * 
     *  第二个参数为options
     * 
        flags?: string;      
        encoding?: string;   字符编码
        fd?: number;     文件打开后的标识符
        mode?: number;    文件的权限
        autoClose?: boolean;   读取完毕后,是否自动关闭文件
        start?: number;  从哪个位置开始读取
        end?: number;    读到什么时候结束
        highWaterMark?: number;   最高水位线
     */
    const rs = fs.createReadStream('1.text');
    
    rs.on('data', data => {
        console.log(data);
    })
    
    /**
     * 创建可写流
     *  
     *  第一个参数文件路径
     * 
     *  第二个参数为options
     * 
        flags?: string;  
        encoding?: string; 字符编码
        fd?: number;    文件打开后的标识符
        mode?: number;   文件的权限
        autoClose?: boolean;  写入完毕后,是否自动关闭文件
        start?: number;  从什么位置开始写入
     */
    const ws = fs.createWriteStream('2.text');
    
    ws.write('123');

8. pipe

在流中搭建一条管道,从可读流中到可写流。

可读流中有pipe()方法,在可写流中可以监听pipe事件,下面实现了从可读流中通过管道到可写流:

    const fs = require('fs');
    const stream = require('stream');
    
    const rs = stream.Readable({
        read() {
            this.push(fs.readFileSync('./1.text')); // 文件内容 test
            this.push(null);
        }
    });
    
    const ws = stream.Writable({
        write(chunk, encoding, cb) {
            // chunk为test buffer
            fs.writeFileSync('./2.text', chunk.toString());
            cb();
        }
    });
    
    ws.on('pipe', data => {
        // 触发pipe事件
        console.log(data);
    });
    
    rs.pipe(ws);

9. 总结

流分为四种基本类型,两种模式。流中的数据不是直接写入或读取,有缓存区的概念。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,686评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,668评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,160评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,736评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,847评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,043评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,129评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,872评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,318评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,645评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,777评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,861评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,589评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,687评论 2 351

推荐阅读更多精彩内容

  • stream 流是一个抽象接口,在 Node 里被不同的对象实现。例如 request to an HTTP se...
    明明三省阅读 3,398评论 1 10
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,637评论 18 139
  • 流是Node中最重要的组件和模式之一。在社区里有一句格言说:让一切事务流动起来。这已经足够来描述在Node中流...
    宫若石阅读 547评论 0 0
  • 写在最前 本次试图浅析探索Nodejs的Stream模块中对于Readable类的一部分实现(可写流也差不多)。其...
    Annnnnn阅读 811评论 0 0
  • 文/四月丽人 当鸟鸣穿透纱帘 我仿佛听见你欢快的呼唤 浅浅一个微笑 足以融化一冬的寂寒 有一种陪伴 穿越红尘的暖 ...
    四月丽人阅读 100评论 0 4