Pro
一个createWriteStream
的简单实现,以求能增加对可写流的理解与应用。
参数配置
/**
* createWriteStream
* @param1 path
* @param2 options
*/
let fs = require('fs');
let ws = fs.createWriteStream('./1.txt',{
flags:'w'//文件的打开模式
,mode:0o666//文件的权限设置
,encoding:'utf8'//写入文件的字符的编码
,highWaterMark:3//最高水位线
,start:0 //写入文件的起始索引位置
,autoClose:true//是否自动关闭文档
})
createWriteStream类的实例化
- 实例化一个
createWriteStream
类- 将
path
,options
挂载在createWriteStream
的实例上,除此之外再在实例上挂载以下属性-
self.fd=null
:文件打开后返回的文件描述符 -
self.pos=self.start
:用于表示文件真正写入时的指针位置 -
self.Buffer=[]
:用来表示文件的缓冲区 -
self.len=null
:用来表示缓冲区此时的大小 -
self.isWriting=false
:用来表示是否正在真正写入文件
-
- 调用
open
方法,打开文件(发射open事件)
- 将
实例write方法的执行流程
-
wirte
方法接收三个参数,chunk
要写入的内容,encoding
要进行的,cb
回调函数。 -
write
执行流程:- 判断传入的
chunk
是否为buffer,如果不是,则转换成buffer,用于转化编码依据传入的encoding
参数。 - 更新
Buffer
缓冲区的len
长度,让len加上该次chunk的长度 - 判断
len
是否已经超过highWaterMark
,将值存入flag
- 判断是否处于
isWriting
状态:- 是,则先加
chunk
写入实例对象下的Buffer缓冲区
。 - 否,更新
isWriting
,接将参数传递给实例下的_write
方法写入文件
- 是,则先加
- 返回
flag
- 判断传入的
实例_write方法的执行流程
此方法用于真正写入文件
- 查看实例的
fd
属性是否存在(文件是否打开成功)- 成功,调用
fs
模块的write
方法正式写入数据- 更新实例对象下的
len
以及pos
属性 - 调用
clearBuffer
方法将缓冲区的内容写入 - 调用write方法传入的回调函数
cb
- 更新实例对象下的
- 失败,订阅一个
open事件
(open事件将会在open方法中被发射),在订阅中的回调方法中再次以相同的参数调用_write方法
- 成功,调用
实例clearBuffer方法
- 从缓冲区中取出一个数据
- 如果数据存在,调用
_write
方法 - 如果数据不存在,将
isWriting
更改为false,发射drain
事件
- 如果数据存在,调用
实现源码以及测试文件
let fs = require('fs');
let EventEmitter = require('events');
class WriteStream extends EventEmitter {
constructor(path, options) {
super();
let self = this;
Object.assign(self, options); //还需设置默认值
self.path = path;
self.isWriting = false;
self.Buffer = []; //源码中为链表实现的缓冲区
self.len = null;
self.pos = self.start; //初始化写入位置
self.fd = null;
self.open();
}
open() {
let self = this;
fs.open(self.path, self.flags, self.mode, (err, fd) => {
self.fd = fd;
if (err) return self.destroy(err);
self.emit('open');
});
}
destroy(err) {
fs.close(this.fd, () => {
this.emit('error', err);
});
}
write(chunk, encoding, cb) {
let self = this
, ret = null;
encoding = encoding?encoding:self.encoding; //优先使用write传入的编码方式
chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);
self.len += chunk.length;
ret = self.highWaterMark > self.len; //判断当前最新的缓冲区是否已达到最高水位线
if (self.isWriting) { //说明正在调用底层方法真正写入文件,先写入Buffer
self.Buffer.push({
chunk
, cb
});
} else {
self.isWriting = true;
self._write(chunk, cb, () => self.clearBuffer());
}
return ret;
}
_write(chunk, cb, clear) {
let self = this;
if (!self.fd) return self.once('open', () => {
self._write(chunk, cb, clear)
});
fs.write(self.fd, chunk, 0, chunk.length, self.pos, (err, bytesWritten) => {
if (err) {
if (self.autoClose) {
self.destroy();
self.emit('error', err);
}
}
self.len -= bytesWritten;
self.pos += bytesWritten;
cb && cb();
clear && clear();
});
}
clearBuffer() {
let self = this
, data = null;
data = self.Buffer.shift();
if (data) {
self._write(data.chunk, data.cb, () => self.clearBuffer());
} else { //此时说明缓冲区已无数据
self.isWriting = false;
self.emit('drain');
}
}
}
module.exports = WriteStream;
测试文件:
let WriteStream = require('./practice');
let ws = new WriteStream('./1.txt',{
flags:'w'
,mode:0o666
,start:0
,encoding:'utf8'
,autoClose:true //当流写完之后自动关闭文件
,highWaterMark:3
});
let n = 9;
ws.on('error',(err)=>{
console.log(err)
})
function write(){
let flag = true;
while(flag&&n>0){
flag = ws.write(n+"",'utf8',()=>{
console.log('ok');
});
n--;
console.log('flag=',flag)
}
ws.once('drain',()=>{
console.log('drain');
write();
})
}
// ws.on('drain',()=>{
// console.log('drain');
// write();
// })
write();
参考资料:
https://nodejs.org/dist/latest-v9.x/docs/api/stream.html#stream_writable_streams