流的概念
- 流是一组有序的、有起点和终点的字节数据传输手段
- 流不关心文件的整体内容,只关注是否从文件中读到了数据,以及读到数据之后的处理
- 流是一个抽象接口,被 Node 中的很多对象所实现。比如 HTTP 服务器 request 和 response 对象都是流
- 流 是 Node.js 的核心模块,基本上都是 stream的实例,比如 process.stdout、http.clientRequest
流的好处
- 流是基于事件的 API,用于管理和处理数据,而且有不错的效率
- 借助事件和非阻塞 I/O 库,流模块允许在其可用的时候动态处理,在其不需要的时候释放掉
流中的数据有两种模式,二进制模式和对象模式
- 二进制模式, 每个分块都是 buffer 或者 string 对象
- 对象模式, 流内部处理的是一系列普通对象
所有使用 Node.js API 创建的流对象都只能操作 strings 和 Buffer对象。但是,通过一些第三方流的实现,你依然能够处理其它类型的 JavaScript 值 (除了 null,它在流处理中有特殊意义)。 这些流被认为是工作在 “对象模式”(object mode)。 在创建流的实例时,可以通过 objectMode 选项使流的实例切换到对象模式。试图将已经存在的流切换到对象模式是不安全的。
Node.js 中有四种基本的流类型
- Readable-可读流 (例如 fs.createReadStream() )
- Writable-可写的流(例如 fs.createWriteStreame() )
- Duplex-可读写的流(例如 net.Socket )
- Transform-在读写过程中可以修改和变换数据的 Duplex 流 (例如
zlib.createDeflate() )
第一种类型:可读流 createReadStream
创建一个可读流
// 引入 fs(读取文件) 模块
let fs = require('fs');
// 创建一个可读流
let rs = fs.createReadStream('./1.txt',{
flags:'r',
encoding:'utf8',
start:0,
autoClose:true,
end: 3,
highWaterMark:3
});
API:createReadStream(path, [options]);
- path 是读取文件的路径
- options 里面有
- flags:打开文件要做的操作,默认为 'r'
- encoding:默认是null,null 代表的是 buffer
- start:开始读取的索引位置
- autoClose:读取完毕后自动关闭
- end:结束读取的索引位置(包括结束位置)
- highWaterMark:读取缓存区默认的默认的大小 64kb (64*1024b)
如果指定 encoding 为 utf8 编码, highWaterMark 要大于 3 个字节
可读流的一些监听事件
- data 事件
- end 事件
- error 事件
- open 事件
- close 事件
各个写法如下:
// 流切换到流动模式,数据会被尽可能快的读出
rs.on('data',function(data){ // 暂停模式 -> 流动模式
console.log(data);
});
// 该事件会在读完数据后被触发
rs.on('end', function () {
console.log('读取完成');
});
// 读文件失败后被触发
rs.on('error', function (err) {
console.log(err);
});
// 文件打开后被触发
rs.on('open', function () {
console.log('文件打开了');
});
// 文件关闭后被触发
rs.on('close', function () {
console.log('关闭');
});
设置编码
与指定 {encoding:'utf8'} 效果相同,设置编码
rs.setEncoding('utf8');
暂停和恢复触发 data
通过 pause() 方法和 resume() 方法
rs.on('data', function (data) {
console.log(data);
rs.pause(); // 暂停方法 表示暂停读取,暂停data事件触发
});
setTimeout(function () {
rs.resume(); // 恢复方法
},2000);
第二种类型:可写流 createWriteStream
创建一个可写流
// 引入 fs(读取文件) 模块
let fs = require('fs');
// 创建一个可写流
let ws = fs.createWriteStream('./1.txt',{
flags:'w',
encoding:'utf8',
highWaterMark:3
});
API:createWriteStream(path, [options]);
- path 是读取文件的路径
- options 里面有
- flags:打开文件要做的操作,默认为 'w'
- encoding:默认是 utf8
- highWaterMark:写入缓存区的,默认大小 16kb
可写流的一些方法
1. write 方法
ws.write(chunk, [encoding], [callback]);
- chunk 写入的数据 buffer/string
- encoding 编码格式,chunk 为字符串时有用,是个可选参数
- callback 写入成功后的回调
返回值为布尔值,系统缓存区满时为 false,未满时为 true
2. end 方法
ws.end(chunk, [encoding], [callback]);
表明接下来没有数据要被写入 Writable 通过传入可选的 chunk 和 encoding 参数,可以在关闭流之前再写入一段数据 如果传入了可选的 callback 函数,它将作为 'finish' 事件的回调函数
3. drain 方法
ws.on('drain',function(){
console.log('drain')
});
- 当一个流不处在 drain 的状态, 对 write() 的调用会缓存数据块, 并且返回 false
- 当前所有缓存的数据块满了,满了之后情况才会出发 drain
- 一旦 write() 返回 false, 在 'drain' 事件触发前, 不能写入任何数据块
4. finish 方法
ws.end('结束');
ws.on('finish',function(){
console.log('drain')
});
- 在调用 end 方法,且缓冲区数据都已经传给底层系统之后, 'finish' 事件将被触发
第三种类型:可读写的流,也叫双工流(Duplex)
双工流,可以在同一个对象上同时实现可读、可写,就好像同时继承这两个接口。而且读取可以没关系(互不干扰)
// 引入双工流模块
let {Duplex} = require('stream');
let d = Duplex({
read(){
this.push('hello');
this.push(null)
},
write(chunk,encoding,callback){
console.log(chunk);
callback();
}
});
d.on('data',function(data){
console.log(data);
});
d.write('hello');
第四种类型:转换流(Transform)
- 转换流输出是从输入中计算出来的
- 转换流中,不需要实现 read 和 write 方法,只需要实现一个 transform 方法,就可以结合两者。
// 引入转换流
let {Transform} = require('stream');
// 转换流的参数和可写流一样
let tranform1 = Transform({
transform(chunk,encoding,callback){
this.push(chunk.toString().toUpperCase());
callback();
}
});
let tranform2 = Transform({
transform(chunk,encoding,callback){
console.log(chunk.toString());
callback();
}
});
process.stdin.pipe(tranform1).pipe(tranform2);
pipe 方法
大家都知道,想把 Readable 的数据 写到 Writable,需要手动将数据读入内存中,然后在写入 Writable。也就是每次传递数据的时候,都需要写一下的代码:
readable.on('readable', (err) => {
if(err) throw err
writable.write(readable.read())
})
为了方便使用,Node.js 提供了 pipe() 方法
readable.pipe(writable)
pipe 方法的原理
var fs = require('fs');
var ws = fs.createWriteStream('./2.txt');
var rs = fs.createReadStream('./1.txt');
rs.on('data', function (data) {
var flag = ws.write(data);
if(!flag)
rs.pause();
});
ws.on('drain', function () {
rs.resume();
});
rs.on('end', function () {
ws.end();
});
unpipe 用法
- readable.unpipe() 方法将之前通过 stream.pipe() 方法绑定的流分离
- 如果 destination 没有传入, 则所有绑定的流都会被分离
let fs = require('fs');
var from = fs.createReadStream('./1.txt');
var to = fs.createWriteStream('./2.txt');
from.pipe(to);
setTimeout(() => {
console.log('关闭向2.txt的写入');
from.unpipe(writable);
console.log('手工关闭文件流');
to.end();
}, 1000);
cork & uncork
- 调用 writable.cork() 方法将强制所有写入数据都存到内存中的缓冲区里。 直到调用 stream.uncork() 或 stream.end() 方法时,缓冲区里的数据才会被输出
- writable.uncork() 将输出在 stream.cork() 方法被调用之后缓冲在内存中的所有数据
stream.cork();
stream.write('1');
stream.write('2');
process.nextTick(() => stream.uncork());
readable
'readable' 事件将在流中有数据可供读取时才触发。在某些情况下,为 'readable' 事件添加回调将会导致一些数据被读取到内部缓存中
const readable = getReadableStreamSomehow();
readable.on('readable', () => {
// 某些数据可读
});
let fs = require('fs');
let rs = fs.createReadStream('./1.txt',{
start:3,
end:8,
encoding:'utf8',
highWaterMark:3
});
rs.on('readable',function () {
console.log('readable');
console.log('rs._readableState.buffer.length',rs._readableState.length);
let d = rs.read(1);
console.log('rs._readableState.buffer.length',rs._readableState.length);
console.log(d);
setTimeout(()=>{
console.log('rs._readableState.buffer.length',rs._readableState.length);
},500)
});
- 当流数据到达尾部时, 'readable' 事件会触发。触发顺序在 'end' 事件之前
- 事实上, 'readable' 事件表明流有了新的动态:要么是有了新的数据,要么是到了流的尾部。 对于前者, stream.read() 将返回可用的数据。而对于后者, stream.read() 将返回 null。
可读流的两种模式
- 可读流的两种工作模式:flowing 和 paused
- flowing 模式下,可读流自动从系统底层读取数据,通过 EventEmitter 接口的事件尽快将数据提供给应用
- paused 模式下,调用 stream.read() 方法来从流中读取数据片段
- 所有初始工作模式为 paused 的 Readable 流,可以通过下面三种途径切换到 flowing 模式
- 监听 'data' 事件
- 调用 stream.resume() 方法
- 调用 stream.pipe() 方法将数据发送到 Writable
- 可读流可以通过下面途径切换到 paused 模式:
- 如果不存在管道目标(pipe destination),可以通过调用 stream.pause() 方法实现。
- 如果存在管道目标,可以通过取消 'data' 事件监听,并调用 stream.unpipe() 方法移除所有管道目标来实现。
如果 Readable 切换到 flowing 模式,且没有消费者处理流中的数据,这些数据将会丢失。 比如, 调用了 readable.resume() 方法却没有监听 'data' 事件,或是取消了 'data' 事件监听,就有可能出现这种情况。
可读流的三种状态
在任意时刻,任意可读流应确切处于下面三种状态之一:
- readable._readableState.flowing = null
- readable._readableState.flowing = false
- readable._readableState.flowing = true
若 readable._readableState.flowing 为 null,由于不存在数据消费者,可读流将不会产生数据。 在这个状态下,监听 'data' 事件,调用 readable.pipe() 方法,或者调用 readable.resume() 方法, readable._readableState.flowing 的值将会变为 true 。这时,随着数据生成,可读流开始频繁触发事件。
调用 readable.pause() 方法, readable.unpipe() 方法, 或者接收 “背压”(back pressure), 将导致 readable._readableState.flowing 值变为 false。 这将暂停事件流,但 不会 暂停数据生成。 在这种情况下,为 'data' 事件设置监听函数不会导致 readable._readableState.flowing 变为 true。
当 readable._readableState.flowing 值为 false 时, 数据可能堆积到流的内部缓存中。