Node.js Streams(流)

流的概念

  • 流是一组有序的、有起点和终点的字节数据传输手段
  • 流不关心文件的整体内容,只关注是否从文件中读到了数据,以及读到数据之后的处理
  • 流是一个抽象接口,被 Node 中的很多对象所实现。比如 HTTP 服务器 request 和 response 对象都是流
  • 流 是 Node.js 的核心模块,基本上都是 stream的实例,比如 process.stdout、http.clientRequest

流的好处

  • 流是基于事件的 API,用于管理和处理数据,而且有不错的效率
  • 借助事件和非阻塞 I/O 库,流模块允许在其可用的时候动态处理,在其不需要的时候释放掉

流中的数据有两种模式,二进制模式和对象模式

  • 二进制模式, 每个分块都是 buffer 或者 string 对象
  • 对象模式, 流内部处理的是一系列普通对象

所有使用 Node.js API 创建的流对象都只能操作 strings 和 Buffer对象。但是,通过一些第三方流的实现,你依然能够处理其它类型的 JavaScript 值 (除了 null,它在流处理中有特殊意义)。 这些流被认为是工作在 “对象模式”(object mode)。 在创建流的实例时,可以通过 objectMode 选项使流的实例切换到对象模式。试图将已经存在的流切换到对象模式是不安全的。

Node.js 中有四种基本的流类型

  1. Readable-可读流 (例如 fs.createReadStream() )
  2. Writable-可写的流(例如 fs.createWriteStreame() )
  3. Duplex-可读写的流(例如 net.Socket )
  4. Transform-在读写过程中可以修改和变换数据的 Duplex 流 (例如
    zlib.createDeflate() )

第一种类型:可读流 createReadStream

创建一个可读流
// 引入 fs(读取文件) 模块
let fs = require('fs');
// 创建一个可读流
let rs = fs.createReadStream('./1.txt',{
    flags:'r',
    encoding:'utf8',
    start:0,
    autoClose:true,
    end: 3,
    highWaterMark:3 
});

API:createReadStream(path, [options]);

  1. path 是读取文件的路径
  2. options 里面有
    • flags:打开文件要做的操作,默认为 'r'
    • encoding:默认是null,null 代表的是 buffer
    • start:开始读取的索引位置
    • autoClose:读取完毕后自动关闭
    • end:结束读取的索引位置(包括结束位置)
    • highWaterMark:读取缓存区默认的默认的大小 64kb (64*1024b)

    如果指定 encoding 为 utf8 编码, highWaterMark 要大于 3 个字节

可读流的一些监听事件
  1. data 事件
  2. end 事件
  3. error 事件
  4. open 事件
  5. close 事件

各个写法如下:

// 流切换到流动模式,数据会被尽可能快的读出
rs.on('data',function(data){ // 暂停模式 -> 流动模式
    console.log(data);
});

// 该事件会在读完数据后被触发
rs.on('end', function () {
    console.log('读取完成');
});

// 读文件失败后被触发
rs.on('error', function (err) {
    console.log(err);
});

// 文件打开后被触发
rs.on('open', function () {
    console.log('文件打开了');
});

// 文件关闭后被触发
rs.on('close', function () {
    console.log('关闭');
});
设置编码

与指定 {encoding:'utf8'} 效果相同,设置编码

rs.setEncoding('utf8');
暂停和恢复触发 data

通过 pause() 方法和 resume() 方法

rs.on('data', function (data) {
    console.log(data);
    rs.pause(); // 暂停方法 表示暂停读取,暂停data事件触发
});
setTimeout(function () {
    rs.resume(); // 恢复方法
},2000);

第二种类型:可写流 createWriteStream

创建一个可写流
// 引入 fs(读取文件) 模块
let fs = require('fs');
// 创建一个可写流
let ws = fs.createWriteStream('./1.txt',{
    flags:'w',
    encoding:'utf8',
    highWaterMark:3 
});

API:createWriteStream(path, [options]);

  1. path 是读取文件的路径
  2. options 里面有
    • flags:打开文件要做的操作,默认为 'w'
    • encoding:默认是 utf8
    • highWaterMark:写入缓存区的,默认大小 16kb
可写流的一些方法
1. write 方法
ws.write(chunk, [encoding], [callback]);
  • chunk 写入的数据 buffer/string
  • encoding 编码格式,chunk 为字符串时有用,是个可选参数
  • callback 写入成功后的回调

返回值为布尔值,系统缓存区满时为 false,未满时为 true

2. end 方法
ws.end(chunk, [encoding], [callback]);

表明接下来没有数据要被写入 Writable 通过传入可选的 chunk 和 encoding 参数,可以在关闭流之前再写入一段数据 如果传入了可选的 callback 函数,它将作为 'finish' 事件的回调函数

3. drain 方法
ws.on('drain',function(){
    console.log('drain')
});
  • 当一个流不处在 drain 的状态, 对 write() 的调用会缓存数据块, 并且返回 false
  • 当前所有缓存的数据块满了,满了之后情况才会出发 drain
  • 一旦 write() 返回 false, 在 'drain' 事件触发前, 不能写入任何数据块
4. finish 方法
ws.end('结束');
ws.on('finish',function(){
    console.log('drain')
});
  • 在调用 end 方法,且缓冲区数据都已经传给底层系统之后, 'finish' 事件将被触发

第三种类型:可读写的流,也叫双工流(Duplex)

双工流,可以在同一个对象上同时实现可读、可写,就好像同时继承这两个接口。而且读取可以没关系(互不干扰)

// 引入双工流模块
let {Duplex} =  require('stream');
let d = Duplex({
    read(){
        this.push('hello');
        this.push(null)
    },
    write(chunk,encoding,callback){
        console.log(chunk);
        callback();
    }
});
d.on('data',function(data){
    console.log(data);
});
d.write('hello');

第四种类型:转换流(Transform)

  • 转换流输出是从输入中计算出来的
  • 转换流中,不需要实现 read 和 write 方法,只需要实现一个 transform 方法,就可以结合两者。
// 引入转换流
let {Transform} =  require('stream');
// 转换流的参数和可写流一样
let tranform1 = Transform({
    transform(chunk,encoding,callback){
        this.push(chunk.toString().toUpperCase()); 
        callback();
    }
});
let tranform2 = Transform({
    transform(chunk,encoding,callback){
        console.log(chunk.toString());
        callback();
    }
});
process.stdin.pipe(tranform1).pipe(tranform2);

pipe 方法

大家都知道,想把 Readable 的数据 写到 Writable,需要手动将数据读入内存中,然后在写入 Writable。也就是每次传递数据的时候,都需要写一下的代码:

readable.on('readable', (err) => {
 if(err) throw err
 writable.write(readable.read())
})

为了方便使用,Node.js 提供了 pipe() 方法

readable.pipe(writable)
pipe 方法的原理
var fs = require('fs');
var ws = fs.createWriteStream('./2.txt');
var rs = fs.createReadStream('./1.txt');
rs.on('data', function (data) {
    var flag = ws.write(data);
    if(!flag)
    rs.pause();
});
ws.on('drain', function () {
    rs.resume();
});
rs.on('end', function () {
    ws.end();
});
unpipe 用法
  • readable.unpipe() 方法将之前通过 stream.pipe() 方法绑定的流分离
  • 如果 destination 没有传入, 则所有绑定的流都会被分离
let fs = require('fs');
var from = fs.createReadStream('./1.txt');
var to = fs.createWriteStream('./2.txt');
from.pipe(to);
setTimeout(() => {
console.log('关闭向2.txt的写入');
from.unpipe(writable);
console.log('手工关闭文件流');
to.end();
}, 1000);
cork & uncork
  • 调用 writable.cork() 方法将强制所有写入数据都存到内存中的缓冲区里。 直到调用 stream.uncork() 或 stream.end() 方法时,缓冲区里的数据才会被输出
  • writable.uncork() 将输出在 stream.cork() 方法被调用之后缓冲在内存中的所有数据
stream.cork();
stream.write('1');
stream.write('2');
process.nextTick(() => stream.uncork());

readable

'readable' 事件将在流中有数据可供读取时才触发。在某些情况下,为 'readable' 事件添加回调将会导致一些数据被读取到内部缓存中

const readable = getReadableStreamSomehow();
readable.on('readable', () => {
  // 某些数据可读
});
let fs = require('fs');
let rs = fs.createReadStream('./1.txt',{
  start:3,
  end:8,
  encoding:'utf8',
  highWaterMark:3
});
rs.on('readable',function () {
  console.log('readable');
  console.log('rs._readableState.buffer.length',rs._readableState.length);
  let d = rs.read(1);
  console.log('rs._readableState.buffer.length',rs._readableState.length);
  console.log(d);
  setTimeout(()=>{
      console.log('rs._readableState.buffer.length',rs._readableState.length);
  },500)
});
  • 当流数据到达尾部时, 'readable' 事件会触发。触发顺序在 'end' 事件之前
  • 事实上, 'readable' 事件表明流有了新的动态:要么是有了新的数据,要么是到了流的尾部。 对于前者, stream.read() 将返回可用的数据。而对于后者, stream.read() 将返回 null。

可读流的两种模式

  1. 可读流的两种工作模式:flowing 和 paused
  2. flowing 模式下,可读流自动从系统底层读取数据,通过 EventEmitter 接口的事件尽快将数据提供给应用
  3. paused 模式下,调用 stream.read() 方法来从流中读取数据片段
  4. 所有初始工作模式为 paused 的 Readable 流,可以通过下面三种途径切换到 flowing 模式
    • 监听 'data' 事件
    • 调用 stream.resume() 方法
    • 调用 stream.pipe() 方法将数据发送到 Writable
  5. 可读流可以通过下面途径切换到 paused 模式:
    • 如果不存在管道目标(pipe destination),可以通过调用 stream.pause() 方法实现。
    • 如果存在管道目标,可以通过取消 'data' 事件监听,并调用 stream.unpipe() 方法移除所有管道目标来实现。

如果 Readable 切换到 flowing 模式,且没有消费者处理流中的数据,这些数据将会丢失。 比如, 调用了 readable.resume() 方法却没有监听 'data' 事件,或是取消了 'data' 事件监听,就有可能出现这种情况。

可读流的三种状态

在任意时刻,任意可读流应确切处于下面三种状态之一:

  1. readable._readableState.flowing = null
  2. readable._readableState.flowing = false
  3. readable._readableState.flowing = true
  • 若 readable._readableState.flowing 为 null,由于不存在数据消费者,可读流将不会产生数据。 在这个状态下,监听 'data' 事件,调用 readable.pipe() 方法,或者调用 readable.resume() 方法, readable._readableState.flowing 的值将会变为 true 。这时,随着数据生成,可读流开始频繁触发事件。

  • 调用 readable.pause() 方法, readable.unpipe() 方法, 或者接收 “背压”(back pressure), 将导致 readable._readableState.flowing 值变为 false。 这将暂停事件流,但 不会 暂停数据生成。 在这种情况下,为 'data' 事件设置监听函数不会导致 readable._readableState.flowing 变为 true。

  • 当 readable._readableState.flowing 值为 false 时, 数据可能堆积到流的内部缓存中。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,732评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,496评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,264评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,807评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,806评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,675评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,029评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,683评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,704评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,666评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,773评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,413评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,016评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,204评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,083评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,503评论 2 343

推荐阅读更多精彩内容

  • stream 流是一个抽象接口,在 Node 里被不同的对象实现。例如 request to an HTTP se...
    明明三省阅读 3,392评论 1 10
  • 流是Node中最重要的组件和模式之一。在社区里有一句格言说:让一切事务流动起来。这已经足够来描述在Node中流...
    宫若石阅读 539评论 0 0
  • 流的基本概念及理解 流是一种数据传输手段,是有顺序的,有起点和终点,比如你要把数据从一个地方传到另外一个地方流非常...
    October_yang阅读 7,665评论 3 9
  • Buffer Buffer的构成 Buffer对象类似数组,它的元素位16进制的两位数,即0到255的数值。主要是...
    人失格阅读 1,799评论 0 0
  • 记得2016年毕业季,每天焦灼地找工作,家乡就业环境不好,还好最终凭自己的努力入职国企,但里面工作氛围压的人喘不过...
    xxx33阅读 660评论 2 2