Node.js介绍2-流

曾经学C++的STL中的IOStream,输入输出流,看个代码

using namespace std;
cout<<x;

眼角湿润了,这是大学的记忆啊,大学时我们幸苦的学习C++,为了指针的释放和指针的指针搞的焦头烂额,更何况记忆中不仅有代码,还有我的青春和她。算了,搬砖的腰又酸了,还是回到现实,看看node.js中的流吧。

啥是流啊。。。

流顾名思义就是流水的意思,stream英文也是溪流。如果把二进制数据从一个地方很源源不断的送到另一个地方,像水流一样的功能,就叫流。

A stream is an abstract interface implemented by various objects in Node.js. For example a request to an HTTP server is a stream, as is process.stdout
.Streams are readable, writable, or both. All streams are instances of EventEmitter.

愿意啃文档的兄弟可以看stream

stream的例子

因为node.js非常善于处理数据(这里数据的可能是服务器的网页,或者返回的json数据,或者任何东西),所以我们来看看一些例子,说明stream对服务器的重要作用。node.js里面很多类都是继承了流的接口。

创建一个echo服务

echo是回声的意思,我们对着大山喊话,回听到自己喊的声音,这边我们做个服务器干这个无聊的事情吧。

var http = require('http');
http.createServer(function(request, response) {
    response.writeHead(200);
    request.pipe(response);
}).listen(8080);

运行以后调用curl -d 'hello' http://localhost:8080。简直不敢相信服务器这么简单就写好了,这就是node.js的魅力吧。
上面的pipe就是管道的意思,和linux的命令行|一个意思,大家应该熟悉命令行的管道吧,概念都是相通的。大家应该知道这个

gulp

就是基于stream来做的。

上传文件

我们在看一个上传文件的例子。

var http = require('http');
var fs = require('fs');
http.createServer(function(request, response) {
    var newFile = fs.createWriteStream("copy" + new Date() + ".md");
    var fileBytes = request.headers['content-length'];
    var uploadedBytes = 0;

    response.write("server receive request\n");
    request.pipe(newFile);

    request.on('readable', function() {
        var chunk = null;
        response.write("progress: start\n");
        while (null !== (chunk = request.read())) {
            uploadedBytes += chunk.length;
            var progress = (uploadedBytes / fileBytes) * 100;
            response.write("progress: " + parseInt(progress, 10) + "%\n");
        }
    });


    request.on('end', function() {
        response.end('uploaded!\n');
    });

}).listen(8080);
//curl --upload-file uploadFiles.js http://localhost:8080
上传文件例子

这里的看点是

  1. 如何返回进度的:request.on('readable', function() {,有没有觉得这种异步I/O方式的优点。
  2. 如何保存文件request.pipe(newFile);,是不是很方便。

流的实现

上面我们看到流的结构的简单易用,现在我们看看node.js的流是怎么设计的。

To implement any sort of stream, the pattern is the same:

  1. Extend the appropriate parent class in your own subclass. (Theutil.inherits() method is particularly helpful for this.)
  2. Call the appropriate parent class constructor in your constructor, to be sure that the internal mechanisms are set up properly.
  3. Implement one or more specific methods, as detailed below.

The class to extend and the method(s) to implement depend on the sort of stream class you are writing:


流的基础

翻译一下流实现的过程:

  1. 继承合适的class
  2. 不要忘记调用基类构造函数
  3. 重写基类方法

数数的可读流

看一个例子就清楚了,下面这段程序就是数数,1数到1000000。

const Readable = require('stream').Readable;
const util = require('util');
util.inherits(Counter, Readable);

function Counter(opt) {
    Readable.call(this, opt);
    this._max = 1000000;
    this._index = 1;
}

Counter.prototype._read = function() {
    var i = this._index++;
    if (i > this._max)
        this.push(null);
    else {
        var str = '' + i;
        var buf = new Buffer(str, 'ascii');
        this.push(buf);
    }
};

///////////////////////////////////////////////////////////
//test 
var fs = require('fs');
var newFile = fs.createWriteStream("test_counter.txt");
var myCounter = new Counter();
myCounter.pipe(newFile);

上面的Counter完成了三部曲,测试程序把这个conter输出到文件。如果我们想自己实现一个流,这样就可以了。如果上面例子太简单了,我们看一下复杂点的例子,比如transform

啥是transform流

Transform streams are Duplex streams where the output is in some way computed from the input. They implement both the Readable and Writable interfaces.
Examples of Transform streams include:
zlib streams
crypto streams
翻译一下就是用来把输入流变化一下,再输出。比如压缩,加密等。

const gzip = zlib.createGzip();
const fs = require('fs');
const inp = fs.createReadStream('input.txt');
const out = fs.createWriteStream('input.txt.gz');

inp.pipe(gzip).pipe(out);

实现transform流

这个例子解析一个数据,产生一个readable stream,这个stream是经过变换的哦。

  1. 解析的格式:有两个换行符的数据流,换行符前面是头,后面是内容


    格式
  2. 解析的过程中发出一个事件header,用来显示头部信息
  3. 最后去掉头部,保留内容信息
    现在来看一下代码吧。
const util = require('util');
const Transform = require('stream').Transform;
util.inherits(SimpleProtocol, Transform);

function SimpleProtocol(options) {
  if (!(this instanceof SimpleProtocol))
    return new SimpleProtocol(options);

  Transform.call(this, options);
  this._inBody = false;
  this._sawFirstCr = false;
  this._rawHeader = [];
  this.header = null;
}

SimpleProtocol.prototype._transform = function(chunk, encoding, done) {
  if (!this._inBody) {
    // check if the chunk has a \n\n
    var split = -1;
    for (var i = 0; i < chunk.length; i++) {
      if (chunk[i] === 10) { // '\n'
        if (this._sawFirstCr) {
          split = i;
          break;
        } else {
          this._sawFirstCr = true;
        }
      } else {
        this._sawFirstCr = false;
      }
    }

    if (split === -1) {
      // still waiting for the \n\n
      // stash the chunk, and try again.
      this._rawHeader.push(chunk);
    } else {
      this._inBody = true;
      var h = chunk.slice(0, split);
      this._rawHeader.push(h);
      var header = Buffer.concat(this._rawHeader).toString();
      try {
        this.header = JSON.parse(header);
      } catch (er) {
        this.emit('error', new Error('invalid simple protocol data'));
        return;
      }
      // and let them know that we are done parsing the header.
      this.emit('header', this.header);

      // now, because we got some extra data, emit this first.
      this.push(chunk.slice(split));
    }
  } else {
    // from there on, just provide the data to our consumer as-is.
    this.push(chunk);
  }
  done();
};

// Usage:
var fs = require('fs');
const source = fs.createReadStream('input.txt');
const out = fs.createWriteStream('output.txt');

var parser = new SimpleProtocol();

// Now parser is a readable stream that will emit 'header'
// with the parsed header data.
source.pipe(parser).pipe(out);
parser.on('header',function(header){
  console.log(header);
});

虽然代码长了点,但是有注释,我就不解释了,注意最后如何使用的哦。看看运行的结果吧。


运行结果

流就介绍到这里了,如果还意犹未尽,可以看看node的源码node in github或者文档stream

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,686评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,668评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,160评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,736评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,847评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,043评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,129评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,872评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,318评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,645评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,777评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,861评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,589评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,687评论 2 351

推荐阅读更多精彩内容

  • stream 流是一个抽象接口,在 Node 里被不同的对象实现。例如 request to an HTTP se...
    明明三省阅读 3,398评论 1 10
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,637评论 18 139
  • 流是Node中最重要的组件和模式之一。在社区里有一句格言说:让一切事务流动起来。这已经足够来描述在Node中流...
    宫若石阅读 547评论 0 0
  • 小人物 从一生下就没有光环, 生活到现在还是平凡。 羡慕别人的光彩, 一味模仿多无奈。 简单...
    生来彷徨ii阅读 267评论 0 4
  • 转身回眸,两两相望,只是一瞬,我看到他的眼睛,深邃而温柔,仿如许久未见的故人,没有早一步,也没有晚一步,刚巧,这就...
    臆想的行路梦阅读 303评论 2 2