流(stream)在 Node.js 中是处理流数据的抽象接口(abstract interface)。
Node.js 提供了多种流对象。 例如, HTTP 请求 和 process.stdout
就都是流的实例。
流可以是可读的、可写的,或是可读写的。所有的流都是 EventEmitter
的实例。
Stream流分为三类(官方文档将最后一种分为两类,即四类)
- 可读数据流接口,用于对外提供数据。
- 可写数据流接口,用于写入数据。
- 双向数据流接口,用于读取和写入数据,比如Node的tcp sockets、zlib、crypto都部署了这个接口。
最后一类又分为(Duplex:可读写的流,Transform:在读写过程中可以修改和变换数据的Duplex流)
Writable 和Readable流都会将数据存储到内部的缓存(buffer)中。这些缓存可以通过相应的 writable._writableState.getBuffer() 或 readable._readableState.buffer来获取。缓存的大小取决于传递给流构造函数的 highWaterMark选项。 对于普通的流, highWaterMark选项指定了总共的字节数。对于工作在对象模式的流, highWaterMark 指定了对象的总数。当可读流的实现调用 stream.push(chunk)
方法时,数据被放到缓存中。如果流的消费者 没有调用 stream.read()
方法, 这些数据会始终存在于内部队列中,直到被消费。当内部可读缓存的大小达到 highWaterMark指定的阈值时,流会暂停从底层资源读取数据,直到当前 缓存的数据被消费 (也就是说, 流会在内部停止调用 readable._read() 来填充可读缓存)。可写流通过反复调用 writable.write(chunk)
方法将数据放到缓存。 当内部可写缓存的总大小小于 highWaterMark指定的阈值时, 调用 writable.write() 将返回true。 一旦内部缓存的大小达到或超过 highWaterMark,调用 writable.write()将返回 false 。stream API 的关键目标, 尤其对于 stream.pipe()
方法, 就是限制缓存数据大小,以达到可接受的程度。这样,对于读写速度不匹配的源头和目标,就不会超出可用的内存大小。
Duplex 和 Transform 都是可读写的。 在内部,它们都维护了 两个 相互独立的缓存用于读和写。 在维持了合理高效的数据流的同时,也使得对于读和写可以独立进行而互不影响。 例如, net.Socket就是 Duplex 的实例,它的可读端可以消费从套接字(socket)中接收的数据, 可写端则可以将数据写入到套接字。 由于数据写入到套接字中的速度可能比从套接字接收数据的速度快或者慢, 在读写两端使用独立缓存,并进行独立操作就显得很重要了。--NodeJS -v8.13 API
用流实现的简单的HTTP服务器:
const http = require('http');
const server = http.createServer((req, res) => {
// req 是 http.IncomingMessage 的实例,这是一个 Readable Stream
// res 是 http.ServerResponse 的实例,这是一个 Writable Stream
let body = '';
// 接收数据为 utf8 字符串,
// 如果没有设置字符编码,将接收到 Buffer 对象。
req.setEncoding('utf8');
// 如果监听了 'data' 事件,Readable streams 触发 'data' 事件
req.on('data', (chunk) => {
body += chunk;
});
// end 事件表明整个 body 都接收完毕了
req.on('end', () => {
try {
const data = JSON.parse(body);
// 发送一些信息给用户
res.write(typeof data);
res.end();
} catch (er) {
// json 数据解析失败
res.statusCode = 400;
return res.end(`error: ${er.message}`);
}
});
});
server.listen(1337);
// $ curl localhost:1337 -d "{}"
// object
// $ curl localhost:1337 -d "\"foo\""
// string
// $ curl localhost:1337 -d "not json"
// error: Unexpected token o in JSON at position 1
另一个实例:
var fs = require('fs');
var rs = fs.createReadStream('../resource/readable.txt','utf-8');
var ws = fs.createWriteStream('../resource/writable.txt','utf-8');
ws.write(new Buffer('使用Stream写入二进制数据...\n'),'utf-8');
ws.write(new Buffer('end'),'utf-8');
ws.end();
// rs.pipe(ws);
rs.on('data',function(chunk) {
console.log('DATA:');
console.log(chunk);//this is a readableStream 这是一个可读流
});
rs.on('end',function() {
console.log('end');//使用Stream写入二进制数据... end
});
rs.on('error',function(err) {
console.log('ERROR:' + err);
});
pipe
管道技术,可读流的方法:
rs.pipe(ws);
如此,readable.txt中的数据流入了writable.txt中;
参考:
http://javascript.ruanyifeng.com/nodejs/stream.html
http://www.liaoxuefeng.com/wiki/001434446689867b27157e896e74d51a89c25cc8b43bdb3000/001434501515527e6fce6d5ec4b4fd9b572122cd1ec8ded000#0
没有实际运用过,暂时不是很懂;后面再更新;