Node进阶 ---- Stream(二)

1. Node.js 中有四种基本的流类型

  • Readable - 可读的流 (例如 fs.createReadStream()).
  • Writable - 可写的流 (例如 fs.createWriteStream()).
  • Duplex - 可读写的流 (例如 net.Socket).
  • Transform - 在读写过程中可以修改和变换数据的 Duplex 流 (例如 zlib.createDeflate()).

2. 流中的数据有两种模式,二进制模式和对象模式

  • 二进制模式, 每个分块都是buffer或者string对象.

  • 对象模式, 流内部处理的是一系列普通对象.

    所有使用 Node.js API 创建的流对象都只能操作 strings 和 Buffer对象。但是,通过一些第三方流的实现,你依然能够处理其它类型的 JavaScript 值 (除了 null,它在流处理中有特殊意义)。 这些流被认为是工作在 “对象模式”(object mode)。 在创建流的实例时,可以通过 objectMode 选项使流的实例切换到对象模式。试图将已经存在的流切换到对象模式是不安全的。

3. 可读流的两种模式

  • 可读流事实上工作在下面两种模式之一:flowingpaused
  • 在 flowing 模式下, 可读流自动从系统底层读取数据,并通过 EventEmitter 接口的事件尽快将数据提供给应用。
  • 在 paused 模式下,必须显式调用 stream.read() 方法来从流中读取数据片段。
  • 所有初始工作模式为 paused 的 Readable 流,可以通过下面三种途径切换到 flowing 模式:
    • 监听 'data' 事件
    • 调用 stream.resume() 方法
    • 调用 stream.pipe() 方法将数据发送到 Writable
  • 可读流可以通过下面途径切换到 paused 模式:
    • 如果不存在管道目标(pipe destination),可以通过调用 stream.pause() 方法实现。
    • 如果存在管道目标,可以通过取消 'data' 事件监听,并调用 stream.unpipe() 方法移除所有管道目标来实现。

如果 Readable 切换到 flowing 模式,且没有消费者处理流中的数据,这些数据将会丢失。 比如, 调用了 readable.resume() 方法却没有监听 'data' 事件,或是取消了 'data' 事件监听,就有可能出现这种情况。

4.缓存区

  • Writable 和 Readable 流都会将数据存储到内部的缓冲器(buffer)中。这些缓冲器可以 通过相应的 writable._writableState.getBuffer() 或 readable._readableState.buffer 来获取。

  • 缓冲器的大小取决于传递给流构造函数的 highWaterMark 选项。 对于普通的流, highWaterMark 选项指定了总共的字节数。对于工作在对象模式的流, highWaterMark 指定了对象的总数。

  • 当可读流的实现调用stream.push(chunk)方法时,数据被放到缓冲器中。如果流的消费者没有调用stream.read()方法, 这些数据会始终存在于内部队列中,直到被消费。

  • 当内部可读缓冲器的大小达到 highWaterMark 指定的阈值时,流会暂停从底层资源读取数据,直到当前 缓冲器的数据被消费 (也就是说, 流会在内部停止调用 readable._read() 来填充可读缓冲器)。

  • 可写流通过反复调用 writable.write(chunk) 方法将数据放到缓冲器。 当内部可写缓冲器的总大小小于 highWaterMark 指定的阈值时, 调用 writable.write() 将返回true。 一旦内部缓冲器的大小达到或超过 highWaterMark ,调用 writable.write() 将返回 false 。

  • stream API 的关键目标, 尤其对于 stream.pipe() 方法, 就是限制缓冲器数据大小,以达到可接受的程度。这样,对于读写速度不匹配的源头和目标,就不会超出可用的内存大小。

  • Duplex 和 Transform 都是可读写的。 在内部,它们都维护了 两个 相互独立的缓冲器用于读和写。 在维持了合理高效的数据流的同时,也使得对于读和写可以独立进行而互不影响。

5. 可读流的三种状态

在任意时刻,任意可读流应确切处于下面三种状态之一:

  • readable._readableState.flowing = null

  • readable._readableState.flowing = false

  • readable._readableState.flowing = true

  • 若 readable._readableState.flowing 为 null,由于不存在数据消费者,可读流将不会产生数据。 在这个状态下,监听 'data' 事件,调用 readable.pipe() 方法,或者调用 readable.resume() 方法, readable._readableState.flowing 的值将会变为 true 。这时,随着数据生成,可读流开始频繁触发事件。

  • 调用 readable.pause() 方法, readable.unpipe() 方法, 或者接收 “背压”(back pressure), 将导致 readable._readableState.flowing 值变为 false。 这将暂停事件流,但 不会 暂停数据生成。 在这种情况下,为 'data' 事件设置监听函数不会导致 readable._readableState.flowing 变为 true。

  • 当 readable._readableState.flowing 值为 false 时, 数据可能堆积到流的内部缓存中。

6.readable

'readable' 事件将在流中有数据可供读取时触发。在某些情况下,为 'readable' 事件添加回调将会导致一些数据被读取到内部缓存中。

const readable = getReadableStreamSomehow();
readable.on('readable', () => {
  // 有一些数据可读了
});
  • 当到达流数据尾部时, 'readable' 事件也会触发。触发顺序在 'end' 事件之前。

  • 事实上, 'readable' 事件表明流有了新的动态:要么是有了新的数据,要么是到了流的尾部。 对于前者, stream.read() 将返回可用的数据。而对于后者, stream.read() 将返回 null。

    let fs =require('fs');
    let rs = fs.createReadStream('./1.txt',{
      start:3,
      end:8,
      encoding:'utf8',
      highWaterMark:3
    });
    rs.on('readable',function () {
      console.log('readable');
      console.log('rs._readableState.buffer.length',rs._readableState.length);
      let d = rs.read(1);
      console.log('rs._readableState.buffer.length',rs._readableState.length);
      console.log(d);
      setTimeout(()=>{
          console.log('rs._readableState.buffer.length',rs._readableState.length);
      },500)
    });
    

7.流的经典应用

7.1 行读取器

7.1.1 换行和回车

  • 以前的打印要每秒可以打印10个字符,换行城要0.2秒,正要可以打印2个字符。
  • 研制人员就是在每行后面加两个表示结束的字符。一个叫做"回车",告诉打字机把打印头定位在左边界;另一个叫做"换行",告诉打字机把纸向下移一行。
  • Unix系统里,每行结尾只有换行"(line feed)",即"\n",
  • Windows系统里面,每行结尾是"<回车><换行>",即"\r\n"
  • Mac系统里,每行结尾是"回车"(carriage return),即"\r"
  • 在ASCII码里
    • 换行 \n 10 0A
    • 回车 \r 13 0D

ASCII

7.1.2 代码

let fs = require('fs');
let EventEmitter = require('events');
let util = require('util');
util.inherits(LineReader, EventEmitter)
fs.readFile('./1.txt',function (err,data) {
    console.log(data);
})
function LineReader(path) {
    EventEmitter.call(this);
    this._rs = fs.createReadStream(path);
    this.RETURN = 0x0D;// \r 13
    this.NEW_LINE = 0x0A;// \n 10
    this.on('newListener', function (type, listener) {
        if (type == 'newLine') {
            let buffer = [];
            this._rs.on('readable', () => {
                let bytes;
                while (null != (bytes = this._rs.read(1))) {
                    let ch = bytes[0];
                    switch (ch) {
                        case this.RETURN:
                            this.emit('newLine', Buffer.from(buffer));
                            buffer.length = 0;
                            let nByte = this._rs.read(1);
                            if (nByte && nByte[0] != this.NEW_LINE) {
                                buffer.push(nByte[0]);
                            }
                            break;
                        case this.NEW_LINE:
                            this.emit('newLine', Buffer.from(buffer));
                            buffer.length = 0;
                            break;
                        default:
                            buffer.push(bytes[0]);
                            break;
                    }
                }
            });
            this._rs.on('end', () => {
                if (buffer.length > 0) {
                    this.emit('newLine', Buffer.from(buffer));
                    buffer.length = 0;
                    this.emit('end');
                }
            })
        }
    });
}

var lineReader = new LineReader('./1.txt');
lineReader.on('newLine', function (data) {
    console.log(data.toString());
}).on('end', function () {
    console.log("end");
})
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,658评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,482评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,213评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,395评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,487评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,523评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,525评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,300评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,753评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,048评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,223评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,905评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,541评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,168评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,417评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,094评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,088评论 2 352

推荐阅读更多精彩内容

  • stream 流是一个抽象接口,在 Node 里被不同的对象实现。例如 request to an HTTP se...
    明明三省阅读 3,402评论 1 10
  • 流是Node中最重要的组件和模式之一。在社区里有一句格言说:让一切事务流动起来。这已经足够来描述在Node中流...
    宫若石阅读 550评论 0 0
  • 流的基本概念及理解 流是一种数据传输手段,是有顺序的,有起点和终点,比如你要把数据从一个地方传到另外一个地方流非常...
    October_yang阅读 7,682评论 3 9
  • 春天天气多变、气候潮湿,人体内很容易积聚湿气,特别回南天的时候,感觉浑身上下不舒服。湿气不仅让人乏困,整天感觉累累...
    姜艾进行到底阅读 285评论 0 0
  • ​ 一年之前朋友约我一起弄ETC/ETH,就研究了一下以太坊和它的智能合约,以及史上最逆天的区块链众筹项目Th...
    繁星若尘啊阅读 523评论 0 1