第一个stream的例子:
const fs = require('fs')
const stream = fs.createWriteStream('./big_file.txt')
for(let i=0;i<1000000;i++) {
stream.write(`this is ${i} \n`)
}
stream.end() //关闭stream
console.log('done')
- 创建流,多次往里面填充内容,关闭流
- 最终得到一个100MB的文件
stream-流
stream 是水流,但默认没有水
stream.write可以让水流中有水(数据)
每次写的数据叫chunk(块)
产生数据的一段叫source(源头)
得到数据的一段叫sink(水池)
第二个stream的例子
1.直接读取文件:
const fs = require('fs')
const http = require('http')
const server = http.createServer()
server.on('request',(request,response)=>{
fs.readFile('./big_file.txt',(error,data)=>{
if(error) throw error
response.end(data)
console.log('done')
})
})
server.listen(8888)
console.log('started-----8888')
此时我们创建一个http服务,当我们试图读取刚刚创建的文件时,发现node.js的服务内存瞬间增加了100MB,此时如果存在多个请求,将会对机器造成很大的压力.
2. 通过createReadStream读取文件;
const fs = require('fs')
const http = require('http')
const server = http.createServer()
server.on('request',(request,response)=>{
const stream = fs.createReadStream('./big_file.txt')
stream.pipe(response)
})
server.listen(8888)
console.log('started at 8888')
此时我们可以看到node.js服务的内存只增加了20多MB,虽然读取的速度不如比之前fs.readFile慢一些。
管道pipe
两个流可以用一个管道相连,stream1的末尾连上stream2开头
stream1.pipe(stream2)
//链式操作
a.pipe(b).pipe(c)
Stream对象的原型链
const s = fs.createReadStream(path)
s的对象层级为:
自身属性(由fs.ReadStream构造)
原型:stream.Readable.prototype
二级原型:stream.Stream.prototype
三级原型:events.EventEmitter.prototype
四级原型:Object.prototype
Stream 分类
- Readable 可读
- Writable 可写
- Duplex 可读可写(双向,默认读写分离,互相不干扰)
- Transform 可读可写(变化,相当于一个转换器,比如babel写入es6,然后读取的是es5)
Readable stream
- 默认处于paused态
- 添加data事件,变成flowing态
- 删掉data事件,变为paused态
- 调用pause()可以变为paused态
- 调用resume()可以变为flowing态
const fs = require('fs')
const http = require('http')
const server = http.createServer()
server.on('request',(request,response)=>{
const stream = fs.createReadStream('./big_file.txt')
stream.pipe(response)
stream.pause()
})
server.listen(8888)
console.log('started-----8888')
此时我们发送请求,无法获得任何的响应,因为调用了pause()
stream.pipe(response)
stream.pause()
setTimeout(()=>{
console.log('3s 后恢复')
stream.resume()
},3000)
我们在3秒后调用resume(),此时响应恢复
创建一个readable stream
const {Readable} = require('stream')
const inStream = new Readable()
inStream.push('ABCDEFG')
inStream.push('EFGHI')
inStream.push(null)
inStream.pipe(process.stdout)
目前的方法不是按需供给的
const { Readable } = require("stream");
const inStream = new Readable({
read(size) {
const char = String.fromCharCode(this.currentCharCode++)
this.push(char);
console.log(`push ${char}`)
if (this.currentCharCode > 90) {
this.push(null);
}
}
})
inStream.currentCharCode = 65
inStream.pipe(process.stdout)
用户调用read才会调用
Writable stream
1.drain事件
我们在调用stream.write(chunk)的时候可能会得到false,表示写的太快,数据积压,要监听到drain事件后才能继续write
2. finish事件
调用stream.end()后,且缓冲区数据都传递给底层系统后,触发finish事件
创建一个writable stream
const {Writable} = require('stream')
const outStream = new Writable({
write(chunk, encoding, callback) {
console.log('get user input:')
console.log(chunk.toString())
callback()
}
})
process.stdin.pipe(outStream)
这时我们输入一个值,终端会出现你输入的值
Duplex stream
const { Duplex } = require("stream");
const inoutStream = new Duplex({
write(chunk, encoding, callback) {
console.log(chunk.toString());
callback();
},
read(size) {
this.push(String.fromCharCode(this.currentCharCode++));
if (this.currentCharCode > 90) {
this.push(null);
}
}
});
inoutStream.currentCharCode = 65;
process.stdin.pipe(inoutStream).pipe(process.stdout);
Transform stream
const { Transform } = require("stream");
const upperCaseTr = new Transform({
transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
}
});
process.stdin.pipe(upperCaseTr).pipe(process.stdout);
创建一个gzip流
const fs = require("fs");
const zlib = require("zlib");
const file = process.argv[2];
fs.createReadStream(file)
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream(file + ".gz"));