Node.js Stream(流) 简单易懂全解析

一.node.js中的流是什么

       stream(流)是Node.js提供的又一个仅在服务区端可用的模块,流是一种抽象的数据结构。Stream 是一个抽象接口,Node 中有很多对象实现了这个接口。例如,对http 服务器发起请求的request 对象就是一个 Stream,还有stdout(标准输出流)。
       顾名思义,流的意思就是数据的流动,就好比停水了,楼上的人存了一些水,楼下的人发请求想借楼上的人一桶水,直接搬费力又麻烦,直接往下倒容易倒到地上,于是可以用一根管子连接两个桶(A和B),楼上的人直接把A桶的里水通过管子流到楼下的B桶里,这就类似于request 对象向服务器发请求要资源,在这request 请求资源的传播方式通过流来实现。
       再举个例子,可以把数据看成是数据流,如果我们在键盘上打字,电脑程序把字符一个一个输出到屏幕上,这也可以看成是一个流,这个可以叫做标准输出(stdout)。

二.为什么要在node中使用流

看了前面稍微了解node的同学可能就要问了,流的作用不就是传递数据麽,也就是把一个地方数据拷贝到另一个地方,不用流也可以这样实现:

var water = fs.readFileSync('a.txt', {encoding: 'utf8'});
fs.writeFileSync('b.txt', water);

是的,只要使用node的读写文件的功能就能实现上面借水的效果,但这样做有个致命问题:

  • 处理数据量较大的文件时不能分块处理,导致速度慢,内存容易爆满。

使用读写方式是把文件内容全部读入内存,然后再写入文件,对于小型的文本文件问题不大,但是遇到较大的比如音频、视频文件,动辄几个GB大小实在承受不住,而流可以把文件资源拆分成小块,一块一块的运输,资源就像水流一样进行传输,使用流的话上述功能可以这样写:

var fs = require('fs');
var readStream = fs.createReadStream('a.mp4'); // 创建可读流
var writeStream = fs.createWriteStream('b.mp4'); // 创建可写流

readStream.on('data', function(chunk) { // 当有数据流出时,写入数据
    writeStream.write(chunk);
});

readStream.on('end', function() { // 当没有数据时,关闭数据流
    writeStream.end();
});
  • 但这样写还是有一些问题的,如果说写入的速度跟不上读取的速度,有可能导致数据丢失。正常的情况应该是,写完一段,再读取下一段,如果没有写完的话,就让读取流先暂停,等写完再继续,所以为了让可读流和可写流速度一致,就要用到流中必不可少的属性pipe了,pipe翻译过来意思是管道,顾名思义,就想上面的倒水一样,如果不用一根管子相连,A桶倒进B桶的水不会均速传输,可能会导致水的浪费,用pipe可以这样解决上述问题:
fs.createReadStream('a.mp4').pipe(fs.createWriteStream('b.mp4));
// pipe自动调用了data,end等事件
  • 需要特别注意的是,pipe()只是可读流的方法,也就是说只能从可读流中通过pipe方法拷贝数据到可写流,反之则不行,写的时候要注意顺序。

三.流的四种类型

Stream提供了以下四种类型的流:

  1. Readable 可读流
  2. Writable 可写流
  3. Duplex 可读可写流
  4. Transform 在读写过程中可以修改和变换数据的Duplex流

1.Readable
可读流有五个参数:

  • highWaterMark 缓存区字节大小,默认16384
  • encoding 字符编码,默认为null,就是buffer
  • objectMode 是否操作js其他类型 默认false
  • read 对内部的_read()方式实现 子类实现,父类调用
  • destroy 对内部的_ destroy()方法实现 子类实现,父类调用

可读流中分为2种模式流动模式和暂停模式。

1、流动模式:可读流自动读取数据,通过EventEmitter接口的事件尽快将数据提供给应用。
2、暂停模式:必须显式调用stream.read()方法来从流中读取数据片段。

暂停模式切换到流动模式i:

1、监听“data”事件
2、调用 stream.resume()方法
3、调用 stream.pipe()方法将数据发送到可写流

流动模式切换到暂停模式:

1、如果不存在管道目标,调用stream.pause()方法
2、如果存在管道目标,调用 stream.unpipe()并取消'data'事件监听
可读流事件:'data','readable','error','close','end'

监听data事件,触发流动模式

const { Readable } = require('stream');
   
    let i = 0;
       
    const rs = Readable({
        encoding: 'utf8',
        // 这里传入的read方法,会被写入_read()
        read: (size) => {
            // size 为highWaterMark大小
            // 在这个方法里面实现获取数据,读取到数据调用rs.push([data]),如果没有数据了,push(null)结束流
            if (i < 6) {
                rs.push(`当前读取数据: ${i++}`);
            } else {
                rs.push(null);
            }
        },
        // 源代码,可覆盖
        destroy(err, cb) {
            rs.push(null);
            cb(err);
        }
    });
        
    rs.on('data', (data) => {
        console.log(data);
        // 每次push数据则触发data事件

监听readable事件,触发暂停模式,当流有了新数据或到了流结束之前触发readable事件,需要显示调用read([size])读取数据:

    const { Readable } = require('stream');
        
    let i = 0;
        
    const rs = Readable({
        encoding: 'utf8',
        highWaterMark: 9,
        // 这里传入的read方法,会被写入_read()
        read: (size) => {
            // size 为highWaterMark大小
            // 在这个方法里面实现获取数据,读取到数据调用rs.push([data]),如果没有数据了,push(null)结束流
            if (i < 10) {
              // push其实是把数据放入缓存区
              rs.push(`当前读取数据: ${i++}`);
            } else {
                rs.push(null);
            }
        }
    });
    
    rs.on('readable', () => {
        const data = rs.read(9);
        console.log(data);
        // 
    })

2. Writable
可写流有以下参数:

highWaterMark 缓存区字节大小,默认16384
decodeStrings 是否将字符编码传入缓冲区
objectMode 是否操作js其他类型 默认false
write 子类实现,供父类调用 实现写入底层数据
writev 子类实现,供父类调用 一次处理多个chunk写入底层数据
destroy 可以覆盖父类方法,不能直接调用,销毁流时,父类调用
final 完成写入所有数据时父类触发

const Writable = require('stream').Writable

const writable = Writable()
// 实现`_write`方法
// 这是将数据写入底层的逻辑
writable._write = function (data, enc, next) {
  // 将流中的数据写入底层
  process.stdout.write(data.toString().toUpperCase())
  // 写入完成时,调用`next()`方法通知流传入下一个数据
  process.nextTick(next)
}

// 所有数据均已写入底层
writable.on('finish', () => process.stdout.write('DONE'))

// 将一个数据写入流中
writable.write('a' + '\n')
writable.write('b' + '\n')
writable.write('c' + '\n')

// 再无数据写入流时,需要调用`end`方法
writable.end()

3. Duplex
Duplex为读写流,既可当成可读流来使用,也可当成可写流来使用,实际上就是继承了Readable和Writable的一类流。所以,Duplex拥有Writable和Readable所有方法和事件,但各自独立缓存区,一个Duplex对象可以同时实现_read()和_write()方法。

var Duplex = require('stream').Duplex

var duplex = Duplex()

// 可读端底层读取逻辑
duplex._read = function () {
  this._readNum = this._readNum || 0
  if (this._readNum > 1) {
    this.push(null)
  } else {
    this.push('' + (this._readNum++))
  }
}

// 可写端底层写逻辑
duplex._write = function (buf, enc, next) {
  // a, b
  process.stdout.write('_write ' + buf.toString() + '\n')
  next()
}

// 0, 1
duplex.on('data', data => console.log('ondata', data.toString()))

duplex.write('a')
duplex.write('b')

duplex.end()

4. Transform
Tranform为转换流,它继承自Duplex,并已经实现了_read和_write方法,同时要求用户实现一个_transform方法,从一个地方读取数据,转换数据后输出到一个地方。

    const stream = require('stream');
    
    const transform = stream.Transform({
        transform(chunk, encoding, cb){
            // 把数据转换成小写字母,然后push到缓存区
            this.push(chunk.toString().toLowerCase());
            cb();
        }
    });
    
    transform.write('D');
    
    console.log(transform.read(1).toString()); // d

四.stream中的pipe

前面已经说过,pipe的作用是在流中搭建一条管道,从可读流中到可写流,目的是实现读取和写入步调一致,边读边写。

   const stream = require('stream');
    
    const readStream = stream.Readable({
        read() {
            this.push(fs.readFileSync('a.txt')); // 文件内容 test
            this.push(null);
        }
    });
    
    const writeStream = stream.Writable({
        write(chunk, encoding, cb) {
            // chunk为test buffer
            fs.writeFileSync('b.txt', chunk.toString());
            cb();
        }
    });
    
    writeStream.on('pipe', data => {
        // 触发pipe事件
        console.log(data);
    });
    
    readStream.pipe(writeStream);
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,186评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,858评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,620评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,888评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,009评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,149评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,204评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,956评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,385评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,698评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,863评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,544评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,185评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,899评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,141评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,684评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,750评论 2 351

推荐阅读更多精彩内容

  • stream 流是一个抽象接口,在 Node 里被不同的对象实现。例如 request to an HTTP se...
    明明三省阅读 3,400评论 1 10
  • 一、什么是Stream(流) 流(stream)在 Node.js 中是处理流数据的抽象接口(abstract i...
    Brolly阅读 5,388评论 0 0
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,642评论 18 139
  • 流是Node中最重要的组件和模式之一。在社区里有一句格言说:让一切事务流动起来。这已经足够来描述在Node中流...
    宫若石阅读 550评论 0 0
  • 编译地址:https://github.com/substack/stream-handbook译者:jabez1...
    IT程序狮阅读 1,294评论 0 4