node.js中的流是一种数据传输手段,流是有顺序的。流不关心整体流程,只管取出数据,获取数据后的操作
流有四种基本的类型
Readable - 可读的流 (例如 fs.createReadStream()).
Writable - 可写的流 (例如 fs.createWriteStream()).
Duplex - 可读写的流 (例如 net.Socket).
Transform - 在读写过程中可以修改和变换数据的 Duplex 流 (例如 zlib.createDeflate()).
可读流有两种模式
flowing 流动模式
paused 暂停模式
流动模式:flowing 没有缓存区。读一点数据,发射一点数据。当数据全部读完了触发一个end事件。例如:pipe(),resume()方法不走缓存.
data事件,当你一旦开始监听data事件的时候,流就可以读文件的内容并且发射data 。默认请况下,当你监听data事件之后,会不停的读数据,然后触发data事件,触发完data事件后再次读数据。
let rs=fs.createReadStream('./11.txt',{
highWaterMark:3
});
rs.setEncoding('utf8');rs.on('data',function(data){
//data获取到的是个buffer,要想获取字符需要设置编码
console.log(data);
});
rs.on('end',function(){
console.log('文件读完了');
});
pipe是可读流 的方法
ReadStream.prototype.pipe = function (dest) {
this.on('data', (data)=>{
let flag = dest.write(data);//写入数据,返回true,说明缓存区没满还可以继续写。返回
false暂停一下。监听drain事件,等到触发drain事件说明数据消化完了,再继续读取数据
if(!flag){
this.pause();
}
});
dest.on('drain', ()=>{
this.resume();
});
this.on('end', ()=>{
dest.end();
});
}
ReadStream.prototype.pause = function(){
this.flowing = false;
}
ReadStream.prototype.resume = function(){
this.flowing = true;
this.read();
}
dest 数据写入目标
可以在单个可读流上绑定多个可写流。
const r = fs.createReadStream('file.txt');
const z = zlib.createGzip();
const w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w);
暂停模式:paused (初始化模式) 内部设置一个缓存区,缓存区默认大小64kb.实际大小以highWaterMark的值为准。当你监听 readable事件的时候,会进入暂停模式。读取highWaterMark的值放入缓存区,触发readable事件。
let fs = require('fs');
let rs = fs.createReadStream('./1.txt',{
highWaterMark:3
});
rs.on('readable',()=>{
onsole.log(rs._readableState.length);//3
let ch = rs.read(1);
//当你读了一个字节后,发现只剩下2个字节,不够highWaterMark,会再次读取highWaterMark个字节并填到
缓存区内
console.log(rs._readableState.length);//2
let ch = rs.read(3);
setTimeout(()=>{
console.log(rs._readableState.length);//5
},200)
});
自定义流
let {Writable,Readable,Duplex,Transform} = require('stream');
自定义可读流
为了实现可读流,引用Readable接口并用它构造新对象。
我们可以直接把供使用的数据push出去。
当push一个null对象就意味着我们想发出信号——这个流没有更多数据了。
var stream = require('stream');
var util = require('util');
util.inherits(Counter, stream.Readable);
function Counter(options) {
stream.Readable.call(this, options);
this._index = 0;
}
Counter.prototype._read = function() {
if(this._index++<3){
this.push(this._index+'');
}else{
this.push(null);
}
};
var counter = new Counter();
counter.on('data', function(data){
console.log("读到数据: " + data.toString());//no maybe
});
counter.on('end', function(data){
console.log("读完了");
});
自定义可写流
为了实现可写流,我们需要使用流模块中的Writable构造函数。 我们只需给Writable构造函数传递一些选项并创建一个对象。唯一需要的选项是write函数,该函数揭露数据块要往哪里写。
1.chunk通常是一个buffer,除非我们配置不同的流。
2.encoding是在特定情况下需要的参数,通常我们可以忽略它。
3.callback是在完成处理数据块后需要调用的函数。这是写数据成功与否的标志。若要发出故障信号,请用错误对象调用回调函数
var stream=require('stream');
var util=require('util');
util.inherits(Writer,stream.Writable);
letstock=[];
function Writer(opt) {
stream.Writable.call(this,opt);
}
Writer.prototype._write=function(chunk,encoding,callback) {
setTimeout(()=>{
stock.push(chunk.toString('utf8'));
console.log("增加: "+chunk);
callback();
},500)
};
var w=newWriter();
for(vari=1;i<=5;i++){
w.write("项目:"+i,'utf8');
}
w.end("结束写入",function(){
console.log(stock);
});
双工流
双工流(可读可写流)是可读流和可写流的实现。例如:net.Socket
let {Duplex} = require('stream');
let index = 0;
let s = Duplex({
read(){
if(index++<3)
this.push('a');
else
this.push(null);
},
write(chunk,encoding,cb){
console.log(chunk.toString().toUpperCase());
cb();
}
});
//process.stdin 标准输入流
//proces.stdout标准输出流
process.stdin.pipe(s).pipe(process.stdout);
Transform转换流
let {Transform} = require('stream');
let t = Transform({
transform(chunk,encoding,cb){
this.push(chunk.toString().toUpperCase());
cb();
}
});
process.stdin.pipe(t).pipe(process.stdout);