Stream - 流
stream是一个比较抽象的模型,类似于没有水的水流,stream.write可以让水流中有水(数据),每次一点一点写的小数据叫做chunk(块),产生数据的一段叫做source(源头),得到数据的一段叫做sink(水池)。
Stream 对象的原型链
s = fs.createReadStream(path)
- 代码中对象的层级为
- 自身属性(由fs.ReadStream构造)
- 原型:stream.Readable.prototype
- 二级原型:stream.Stream.prototype
- 三级原型:events.EventEmitter.prototype
- 四级原型:Object.prototype
- Stream 对象都继承了EventEmitter
Stream 支持的事件和方法
Readable Stream | Writable Stream | |
---|---|---|
事件 | data,end,error,close,readable | drain,finish,error,close,pipe,unpipe |
方法 | pipe(),unpipe() warp() destory() read() unshift() resume(),pause() isPaused() setEncoding() |
wriet() destory() end() cork(), uncork() setDefaultEncoding() |
- drain事件:某部分写完,stream干涸不再拥堵(stream中空闲下来了),通知可以继续进行写入
-
finish事件:整个全部写完,不再写入了
stream流方法事件详细介绍:http://nodejs.cn/api-v16/stream.html
Readable 和 Writable
Readable Stream
- 静止态 paused 和流动态 flowing
- 默认处于paused 态
- 添加 data 事件监听,他就会变成 flowing 态
- 删除 data 事件监听,他就会变成 paused 态
- pause() 可以将它变成 paused
- resume() 可以将它变成 flowing
Writable Stream
- drain 流干了事件
- 调用
stream.write(chunk)
时,可能会得到 false - 造成 false 的原因是写的太快,数据积压了
- 这时候不能再 write,要监听 drain
- 当 drain 事件触发时,才能继续write
- finish事件
- 调用
stream.end()
之后 - 且缓冲区数据都已经传给底层系统之后
- 触发 finish 事件
Duplex 和 Transform 的区别
- Duplex 读和写是两条,边读边写,不会交汇,内容不同,在没有其他设置的情况下,自己无法读到写的内容
- Transform 读写在同一条,中间存在转换器,可将写入的流转换为读取流需要的格式,从而进行读取
fs.readFile() 和 fs.createReadStream()
// fs.readFile() 读的速度快,但由于是一次性大量传输,因此会占用大量内存
server.on('request', (request, response) => {
fs.readFile('./big_file.txt', (err, data) => {
if (err) throw err
response.end(data)
console.log('done')
})
})
// fs.createReadStream() 读的速度略慢,但由于是一点一点传输,因此占用内存较小,基本控制在30M以内
server.on('request', (request, response) => {
const stream = fs.createReadStream('./big_file.txt')
stream.pipe(response)
console.log('done')
})
管道 pipe
释义
两个stream流可以用一个管道相连,stream1 的末尾接上 stream2 的开端,只要 stream1 有数据,就会流到 stream2,可以降低内存占用
代码示例
// 两个流可以连起来,stream1 一有数据,就通过管道 pipe 传送给 stream2,使用流可以降低内存占用
stream1.pipe(stream2)
a.pipe(b).pipe(c)
// 等价于
a.pipe
b.pipe
管道可以通过事件实现
// stream1一有数据chunk就塞给stream2
stream1.on('data', (chunk)=>{
stream2.write(chunk)
})
// stream1听了,就停掉stream2
stream1.on('end', ()=>{
stream2.end()
})
创建自己的流
Writable Stream
实现功能:输入什么就输出什么
const {Writable} = require("stream")
// 创建自己的可写的流
const outStream = new Writable({
write(chunk, encoding, callback) {
console.log(chunk.toString());
callback();
}
});
// process.stdin 标注输入
process.stdin.pipe(outStream)
Readable Stream
实现功能:根据控制,读取并输出范围内的值
const {Readable} = require("stream")
// 创建自己的可读的流
const inStream = new Readable({
read(size) {
this.push(String.fromCharCode(this.currentCharCode++));
// 控制只能读取 currentCharCode 大于90的值
// Z
if (this.currentCharCode > 90) {
this.push(null)
}
}
})
// 通过currentCharCode控制读取范围
// A,读取从65-90,即从A-Z
inStream.currentCharCode = 65
inStream.pipe(process.stdout)
Duplex Stream
实现功能:兼具Writable Stream和Readable Stream功能
const {Duplex} = require("stream");
// 读写分两条流,因此需要定义write()和read()分别负责读写
const inoutStream = new Duplex({
write(chunk, encoding, callback) {
console.log(chunk.toString());
callback();
}, read(size) {
this.push(String.fromCharCode(this.currentCharCode++));
if (this.currentCharCode > 90) {
this.push(null);
}
}
});
inoutStream.currentCharCode = 65;
process.stdin.pipe(inoutStream).pipe(process.stdout);
Transform Stream
实现功能:将输入的小写字母转换为大写字母并输出
const {Transform} = require("stream");
// transform 中间有转换器,因此可以在同一条流内处理读写
const upperCaseTr = new Transform({
transform(chunk, encoding, callback) {
// 将输入的小写字母转化为大写字母
this.push(chunk.toString().toUpperCase());
callback();
}
});
// 程序输入流中的值流入upperCaseTr操作,转换为大写字母
// 将大写字母 push 而后又流入process.stdout输出
process.stdin.pipe(upperCaseTr).pipe(process.stdout);