曾经学C++的STL中的IOStream,输入输出流,看个代码
using namespace std;
cout<<x;
眼角湿润了,这是大学的记忆啊,大学时我们幸苦的学习C++,为了指针的释放和指针的指针搞的焦头烂额,更何况记忆中不仅有代码,还有我的青春和她。算了,搬砖的腰又酸了,还是回到现实,看看node.js中的流吧。
啥是流啊。。。
流顾名思义就是流水的意思,stream英文也是溪流。如果把二进制数据从一个地方很源源不断的送到另一个地方,像水流一样的功能,就叫流。
A stream is an abstract interface implemented by various objects in Node.js. For example a request to an HTTP server is a stream, as is process.stdout
.Streams are readable, writable, or both. All streams are instances of EventEmitter.
愿意啃文档的兄弟可以看stream
stream的例子
因为node.js非常善于处理数据(这里数据的可能是服务器的网页,或者返回的json数据,或者任何东西),所以我们来看看一些例子,说明stream对服务器的重要作用。node.js里面很多类都是继承了流的接口。
创建一个echo服务
echo是回声的意思,我们对着大山喊话,回听到自己喊的声音,这边我们做个服务器干这个无聊的事情吧。
var http = require('http');
http.createServer(function(request, response) {
response.writeHead(200);
request.pipe(response);
}).listen(8080);
运行以后调用curl -d 'hello' http://localhost:8080
。简直不敢相信服务器这么简单就写好了,这就是node.js的魅力吧。
上面的pipe就是管道的意思,和linux的命令行|
一个意思,大家应该熟悉命令行的管道吧,概念都是相通的。大家应该知道这个
就是基于stream来做的。
上传文件
我们在看一个上传文件的例子。
var http = require('http');
var fs = require('fs');
http.createServer(function(request, response) {
var newFile = fs.createWriteStream("copy" + new Date() + ".md");
var fileBytes = request.headers['content-length'];
var uploadedBytes = 0;
response.write("server receive request\n");
request.pipe(newFile);
request.on('readable', function() {
var chunk = null;
response.write("progress: start\n");
while (null !== (chunk = request.read())) {
uploadedBytes += chunk.length;
var progress = (uploadedBytes / fileBytes) * 100;
response.write("progress: " + parseInt(progress, 10) + "%\n");
}
});
request.on('end', function() {
response.end('uploaded!\n');
});
}).listen(8080);
//curl --upload-file uploadFiles.js http://localhost:8080
这里的看点是
- 如何返回进度的:
request.on('readable', function() {
,有没有觉得这种异步I/O方式的优点。 - 如何保存文件
request.pipe(newFile);
,是不是很方便。
流的实现
上面我们看到流的结构的简单易用,现在我们看看node.js的流是怎么设计的。
To implement any sort of stream, the pattern is the same:
- Extend the appropriate parent class in your own subclass. (Theutil.inherits() method is particularly helpful for this.)
- Call the appropriate parent class constructor in your constructor, to be sure that the internal mechanisms are set up properly.
- Implement one or more specific methods, as detailed below.
The class to extend and the method(s) to implement depend on the sort of stream class you are writing:
翻译一下流实现的过程:
- 继承合适的class
- 不要忘记调用基类构造函数
- 重写基类方法
数数的可读流
看一个例子就清楚了,下面这段程序就是数数,1数到1000000。
const Readable = require('stream').Readable;
const util = require('util');
util.inherits(Counter, Readable);
function Counter(opt) {
Readable.call(this, opt);
this._max = 1000000;
this._index = 1;
}
Counter.prototype._read = function() {
var i = this._index++;
if (i > this._max)
this.push(null);
else {
var str = '' + i;
var buf = new Buffer(str, 'ascii');
this.push(buf);
}
};
///////////////////////////////////////////////////////////
//test
var fs = require('fs');
var newFile = fs.createWriteStream("test_counter.txt");
var myCounter = new Counter();
myCounter.pipe(newFile);
上面的Counter完成了三部曲,测试程序把这个conter输出到文件。如果我们想自己实现一个流,这样就可以了。如果上面例子太简单了,我们看一下复杂点的例子,比如transform
啥是transform流
Transform streams are Duplex streams where the output is in some way computed from the input. They implement both the Readable and Writable interfaces.
Examples of Transform streams include:
zlib streams
crypto streams
翻译一下就是用来把输入流变化一下,再输出。比如压缩,加密等。
const gzip = zlib.createGzip();
const fs = require('fs');
const inp = fs.createReadStream('input.txt');
const out = fs.createWriteStream('input.txt.gz');
inp.pipe(gzip).pipe(out);
实现transform流
这个例子解析一个数据,产生一个readable stream,这个stream是经过变换的哦。
-
解析的格式:有两个换行符的数据流,换行符前面是头,后面是内容
- 解析的过程中发出一个事件header,用来显示头部信息
- 最后去掉头部,保留内容信息
现在来看一下代码吧。
const util = require('util');
const Transform = require('stream').Transform;
util.inherits(SimpleProtocol, Transform);
function SimpleProtocol(options) {
if (!(this instanceof SimpleProtocol))
return new SimpleProtocol(options);
Transform.call(this, options);
this._inBody = false;
this._sawFirstCr = false;
this._rawHeader = [];
this.header = null;
}
SimpleProtocol.prototype._transform = function(chunk, encoding, done) {
if (!this._inBody) {
// check if the chunk has a \n\n
var split = -1;
for (var i = 0; i < chunk.length; i++) {
if (chunk[i] === 10) { // '\n'
if (this._sawFirstCr) {
split = i;
break;
} else {
this._sawFirstCr = true;
}
} else {
this._sawFirstCr = false;
}
}
if (split === -1) {
// still waiting for the \n\n
// stash the chunk, and try again.
this._rawHeader.push(chunk);
} else {
this._inBody = true;
var h = chunk.slice(0, split);
this._rawHeader.push(h);
var header = Buffer.concat(this._rawHeader).toString();
try {
this.header = JSON.parse(header);
} catch (er) {
this.emit('error', new Error('invalid simple protocol data'));
return;
}
// and let them know that we are done parsing the header.
this.emit('header', this.header);
// now, because we got some extra data, emit this first.
this.push(chunk.slice(split));
}
} else {
// from there on, just provide the data to our consumer as-is.
this.push(chunk);
}
done();
};
// Usage:
var fs = require('fs');
const source = fs.createReadStream('input.txt');
const out = fs.createWriteStream('output.txt');
var parser = new SimpleProtocol();
// Now parser is a readable stream that will emit 'header'
// with the parsed header data.
source.pipe(parser).pipe(out);
parser.on('header',function(header){
console.log(header);
});
虽然代码长了点,但是有注释,我就不解释了,注意最后如何使用的哦。看看运行的结果吧。
流就介绍到这里了,如果还意犹未尽,可以看看node的源码node in github或者文档stream