简明writeStream实现

Pro

一个createWriteStream的简单实现,以求能增加对可写流的理解与应用。

参数配置

/**
 * createWriteStream
 * @param1 path
 * @param2 options
 */
let fs = require('fs');
let ws = fs.createWriteStream('./1.txt',{
  flags:'w'//文件的打开模式
  ,mode:0o666//文件的权限设置
  ,encoding:'utf8'//写入文件的字符的编码
  ,highWaterMark:3//最高水位线
  ,start:0 //写入文件的起始索引位置        
  ,autoClose:true//是否自动关闭文档
})

createWriteStream类的实例化

  • 实例化一个createWriteStream
    • pathoptions挂载在createWriteStream的实例上,除此之外再在实例上挂载以下属性
      • self.fd=null:文件打开后返回的文件描述符
      • self.pos=self.start:用于表示文件真正写入时的指针位置
      • self.Buffer=[]:用来表示文件的缓冲区
      • self.len=null:用来表示缓冲区此时的大小
      • self.isWriting=false:用来表示是否正在真正写入文件
    • 调用open方法,打开文件(发射open事件)

实例write方法的执行流程

  • wirte方法接收三个参数,chunk要写入的内容,encoding要进行的,cb回调函数。
  • write执行流程:
    • 判断传入的chunk是否为buffer,如果不是,则转换成buffer,用于转化编码依据传入的encoding参数。
    • 更新Buffer缓冲区的len长度,让len加上该次chunk的长度
    • 判断len是否已经超过highWaterMark,将值存入flag
    • 判断是否处于isWriting状态:
      • 是,则先加chunk写入实例对象下的Buffer缓冲区
      • 否,更新isWriting,接将参数传递给实例下的_write方法写入文件
    • 返回flag

实例_write方法的执行流程

此方法用于真正写入文件

  • 查看实例的fd属性是否存在(文件是否打开成功)
    • 成功,调用fs模块的write方法正式写入数据
      • 更新实例对象下的len以及pos属性
      • 调用clearBuffer方法将缓冲区的内容写入
      • 调用write方法传入的回调函数cb
    • 失败,订阅一个open事件(open事件将会在open方法中被发射),在订阅中的回调方法中再次以相同的参数调用_write方法

实例clearBuffer方法

  • 从缓冲区中取出一个数据
    • 如果数据存在,调用_write方法
    • 如果数据不存在,将isWriting更改为false,发射drain事件

实现源码以及测试文件

let fs = require('fs');
let EventEmitter = require('events');

class WriteStream extends EventEmitter {
  constructor(path, options) {
    super();
    let self = this;
    Object.assign(self, options); //还需设置默认值
    self.path = path;
    self.isWriting = false;
    self.Buffer = []; //源码中为链表实现的缓冲区
    self.len = null;
    self.pos = self.start; //初始化写入位置
    self.fd = null;
    self.open();
  }

  open() {
    let self = this;
    fs.open(self.path, self.flags, self.mode, (err, fd) => {
      self.fd = fd;
      if (err) return self.destroy(err);
      self.emit('open');
    });
  }

  destroy(err) {
    fs.close(this.fd, () => {
      this.emit('error', err);
    });
  }

  write(chunk, encoding, cb) {
    let self = this
      , ret = null;
    encoding = encoding?encoding:self.encoding; //优先使用write传入的编码方式
    chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);
    self.len += chunk.length;
    ret = self.highWaterMark > self.len; //判断当前最新的缓冲区是否已达到最高水位线

    if (self.isWriting) { //说明正在调用底层方法真正写入文件,先写入Buffer
      self.Buffer.push({
        chunk
        , cb
      });
    } else {
      self.isWriting = true;
      self._write(chunk, cb, () => self.clearBuffer());
    }

    return ret;
  }

  _write(chunk, cb, clear) {
    let self = this;
    if (!self.fd) return self.once('open', () => {
      self._write(chunk, cb, clear)
    });
    fs.write(self.fd, chunk, 0, chunk.length, self.pos, (err, bytesWritten) => {
      if (err) {
        if (self.autoClose) {
          self.destroy();
          self.emit('error', err);
        }
      }
      self.len -= bytesWritten;
      self.pos += bytesWritten;
      cb && cb();
      clear && clear();
    });
  }

  clearBuffer() {
    let self = this
      , data = null;
    data = self.Buffer.shift();
    if (data) {
      self._write(data.chunk, data.cb, () => self.clearBuffer());
    } else { //此时说明缓冲区已无数据
      self.isWriting = false;
      self.emit('drain');
    }
  }
}

module.exports = WriteStream;

测试文件:

let WriteStream = require('./practice');
let ws = new WriteStream('./1.txt',{
  flags:'w'
  ,mode:0o666
  ,start:0
  ,encoding:'utf8'
  ,autoClose:true //当流写完之后自动关闭文件
  ,highWaterMark:3
});
let n = 9;
ws.on('error',(err)=>{
  console.log(err)
})
function write(){
  let flag = true;
  while(flag&&n>0){
    flag = ws.write(n+"",'utf8',()=>{
      console.log('ok');
    });
    n--;
    console.log('flag=',flag)
  }
  ws.once('drain',()=>{
    console.log('drain');
    write();
  })
}
// ws.on('drain',()=>{
//   console.log('drain');
//   write();
// })
write();


参考资料:
https://nodejs.org/dist/latest-v9.x/docs/api/stream.html#stream_writable_streams

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,362评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,330评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,247评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,560评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,580评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,569评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,929评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,587评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,840评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,596评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,678评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,366评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,945评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,929评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,165评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,271评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,403评论 2 342

推荐阅读更多精彩内容