稳定性: 2 - 不稳定
流是一个抽象接口,在 Node 里被不同的对象实现。例如request to an HTTPserver 是流,stdout 是流。流是可读,可写,或者可读写。所有的流是 EventEmitter 的实例。
你可以通过 require('stream')
加载 Stream 基类。其中包括了 Readable
流、Writable
流、Duplex
流和 Transform
流的基类。
这个文档分为 3 个章节。第一个章节解释了在你的程序中使用流时候需要了解的部分。如果你不用实现流式 API,可以只看这个章节。
如果你想实现你自己的流,第二个章节解释了这部分 API。这些 API 让你的实现更加简单。
第三个部分深入的解释了流是如何工作的,包括一些内部机制和函数,这些内容不要改动,除非你明确知道你要做什么。
流可以是可读(Readable),可写(Writable),或者兼具两者(Duplex,双工)的。
所有的流都是事件分发器(EventEmitters),但是也有自己的方法和属性,这取决于他它们是可读(Readable),可写(Writable),或者兼具两者(Duplex,双工)的。
如果流式可读写的,则它实现了下面的所有方法和事件。因此,这个章节 API 完全阐述了Duplex 或 Transform 流,即便他们的实现有所不同。
没有必要为了消费流而在你的程序里实现流的接口。如果你正在你的程序里实现流接口,请同时参考下面的API for Stream Implementors。
基本所有的 Node 程序,无论多简单,都会使用到流。这有一个使用流的例子。
javascript
var http = require('http');
var server = http.createServer(function (req, res) {
// req is an http.IncomingMessage, which is 可读流(Readable stream)
// res is an http.ServerResponse, which is a Writable Stream
var body = '';
// we want to get the data as utf8 strings
// If you don't set an encoding, then you'll get Buffer objects
req.setEncoding('utf8');
// 可读流(Readable stream) emit 'data' 事件 once a 监听器(listener) is added
req.on('data', function (chunk) {
body += chunk;
});
// the end 事件 tells you that you have entire body
req.on('end', function () {
try {
var data = JSON.parse(body);
} catch (er) {
// uh oh! bad json!
res.statusCode = 400;
return res.end('error: ' + er.message);
}
// write back something interesting to the user:
res.write(typeof data);
res.end();
});
});
server.listen(1337);
// $ curl localhost:1337 -d '{}'
// object
// $ curl localhost:1337 -d '"foo"'
// string
// $ curl localhost:1337 -d 'not json'
// error: Unexpected token o
可读流(Readable stream)接口是对你正在读取的数据的来源的抽象。换句话说,数据来来自
可读流(Readable stream)不会分发数据,直到你表明准备就绪。
可读流(Readable stream) 有2种模式: 流动模式(flowing mode) 和 暂停模式(paused mode). 流动模式(flowing mode)时,尽快的从底层系统读取数据并提供给你的程序。 暂停模式(paused mode)时, 你必须明确的调用 stream.read()
来读取数据。 暂停模式(paused mode) 是默认模式。
注意: 如果没有绑定数据处理函数,并且没有 pipe()
目标,流会切换到流动模式(flowing mode),并且数据会丢失。
可以通过下面几个方法,将流切换到流动模式(flowing mode)。
可以通过以下方法来切换到暂停模式(paused mode):
pause()
方法.'data'
事件][]处理函数, 调用 unpipe()
方法移除所有的 导流(pipe) 目标。注意, 为了向后兼容考虑, 移除 'data' 事件监听器并不会自动暂停流。同样的,当有导流目标时,调用 pause() 并不能保证流在那些目标排空后,请求更多数据时保持暂停状态。
可读流(Readable stream)例子包括:
当一个数据块可以从流中读出,将会触发'readable'
事件.`
某些情况下, 如果没有准备好,监听一个 'readable'
事件将会导致一些数据从底层系统读取到内部缓存。
javascript
var readble = getReadableStreamSomehow();
readable.on('readable', function() {
// there is some data to read now
});
一旦内部缓存排空,一旦有更多数据将会再次触发 readable
事件。
chunk
{Buffer | String} 数据块绑定一个 data
事件的监听器(listener)到一个未明确暂停的流,会将流切换到流动模式。数据会尽额能的传递。
如果你像尽快的从流中获取数据,这是最快的方法。
javascript
var readable = getReadableStreamSomehow();
readable.on('data', function(chunk) {
console.log('got %d bytes of data', chunk.length);
});
如果没有更多的可读数据,将会触发这个事件。
注意,除非数据已经被完全消费, the end
事件才会触发。 可以通过切换到流动模式(flowing mode)来实现,或者通过调用重复调用 read()
获取数据,直到结束。
javascript
var readable = getReadableStreamSomehow();
readable.on('data', function(chunk) {
console.log('got %d bytes of data', chunk.length);
});
readable.on('end', function() {
console.log('there will be no more data.');
});
当底层资源(例如源头的文件描述符)关闭时触发。并不是所有流都会触发这个事件。
当接收数据时发生错误触发。
size
{Number} 可选参数, 需要读入的数据量read()
方法从内部缓存中拉取数据。如果没有可用数据,将会返回null
如果传了 size
参数,将会返回相当字节的数据。如果size
不可用,将会返回 null
如果你没有指定 size
参数。将会返回内部缓存的所有数据。
这个方法仅能再暂停模式(paused mode)里调用. 流动模式(flowing mode)下这个方法会被自动调用直到内存缓存排空。
javascript
var readable = getReadableStreamSomehow();
readable.on('readable', function() {
var chunk;
while (null !== (chunk = readable.read())) {
console.log('got %d bytes of data', chunk.length);
}
});
如果这个方法返回一个数据块, 它同时也会触发['data'
事件][].
encoding
{String} 要使用的编码.this
调用此函数会使得流返回指定编码的字符串,而不是 Buffer 对象。例如,如果你调用readable.setEncoding('utf8')
,输出数据将会是UTF-8 编码,并且返回字符串。如果你调用 readable.setEncoding('hex')
,将会返回2进制编码的数据。
该方法能正确处理多字节字符。如果不想这么做,仅简单的直接拉取缓存并调buf.toString(encoding)
,可能会导致字节错位。因此,如果你想以字符串读取数据,请使用这个方法。
javascript
var readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', function(chunk) {
assert.equal(typeof chunk, 'string');
console.log('got %d characters of string data', chunk.length);
});
this
这个方法让可读流(Readable stream)继续触发 data
事件.
这个方法会将流切换到流动模式(flowing mode). 如果你不想从流中消费数据,而想得到end
事件,可以调用 readable.resume()
来打开数据流。
javascript
var readable = getReadableStreamSomehow();
readable.resume();
readable.on('end', function(chunk) {
console.log('got to the end, but did not read anything');
});
this
这个方法会使得流动模式(flowing mode)的流停止触发 data
事件, 切换到流动模式(flowing mode). 并让后续可用数据留在内部缓冲区中。
javascript
var readable = getReadableStreamSomehow();
readable.on('data', function(chunk) {
console.log('got %d bytes of data', chunk.length);
readable.pause();
console.log('there will be no more data for 1 second');
setTimeout(function() {
console.log('now data will start flowing again');
readable.resume();
}, 1000);
});
Boolean
这个方法返回readable
是否被客户端代码 明确的暂停(调用 readable.pause()
)。
var readable = new stream.Readable
readable.isPaused() // === false
readable.pause()
readable.isPaused() // === true
readable.resume()
readable.isPaused() // === false
destination
{Writable Stream} 写入数据的目标options
{Object} 导流(pipe) 选项end
{Boolean} 读取到结束符时,结束写入者。默认 = true
这个方法从可读流(Readable stream)拉取所有数据, 并将数据写入到提供的目标中。自动管理流量,这样目标不会快速的可读流(Readable stream)淹没。
可以导流到多个目标。
javascript
var readable = getReadableStreamSomehow();
var writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt'
readable.pipe(writable);
这个函数返回目标流, 因此你可以建立导流链:
javascript
var r = fs.createReadStream('file.txt');
var z = zlib.createGzip();
var w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w);
例如, 模拟 Unix 的 cat
命令:
javascript
process.stdin.pipe(process.stdout);
默认情况下,当源数据流触发 end
的时候调用end()
,所以 destination
不可再写。传 { end:false }
作为options
,可以保持目标流打开状态。
这会让 writer
保持打开状态,可以在最后写入"Goodbye" 。
javascript
reader.pipe(writer, { end: false });
reader.on('end', function() {
writer.end('Goodbye\n');
});
注意 process.stderr
和 process.stdout
直到进程结束才会关闭,无论是否指定
destination
{Writable Stream} 可选,指定解除导流的流这个方法会解除之前调用 pipe()
设置的钩子( pipe()
)。
如果没有指定 destination
,所有的 导流(pipe) 都会被移除。
如果指定了 destination
,但是没有建立如果没有指定 destination
,则什么事情都不会发生。
javascript
var readable = getReadableStreamSomehow();
var writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt',
// but only for the first second
readable.pipe(writable);
setTimeout(function() {
console.log('stop writing to file.txt');
readable.unpipe(writable);
console.log('manually close the file stream');
writable.end();
}, 1000);
chunk
{Buffer | String} 数据块插入到读队列中这个方法很有用,当一个流正被一个解析器消费,解析器可能需要将某些刚拉取出的数据“逆消费”,返回到原来的源,以便流能将它传递给其它消费者。
如果你在程序中必须经常调用 stream.unshift(chunk)
,那你可以考虑实现Transform来替换(参见下文API for Stream Implementors)。
javascript
// Pull off a header delimited by \n\n
// use unshift() if we get too much
// Call the callback with (error, header, stream)
var StringDecoder = require('string_decoder').StringDecoder;
function parseHeader(stream, callback) {
stream.on('error', callback);
stream.on('readable', onReadable);
var decoder = new StringDecoder('utf8');
var header = '';
function onReadable() {
var chunk;
while (null !== (chunk = stream.read())) {
var str = decoder.write(chunk);
if (str.match(/\n\n/)) {
// found the header boundary
var split = str.split(/\n\n/);
header += split.shift();
var remaining = split.join('\n\n');
var buf = new Buffer(remaining, 'utf8');
if (buf.length)
stream.unshift(buf);
stream.removeListener('error', callback);
stream.removeListener('readable', onReadable);
// now the body of the message can be read from the stream.
callback(null, header, stream);
} else {
// still reading the header.
header += str;
}
}
}
}
stream
{Stream} 一个旧式的可读流(Readable stream)v0.10 版本之前的 Node 流并未实现现在所有流的API(更多信息详见下文“兼容性”章节)。
如果你使用的是旧的 Node 库,它触发 'data'
事件,并拥有仅做查询用的pause()
方法,那么你能使用wrap()
方法来创建一个Readable 流来使用旧版本的流,作为数据源。
你应该很少需要用到这个函数,但它会留下方便和旧版本的 Node 程序和库交互。
例如:
javascript
var OldReader = require('./old-api-module.js').OldReader;
var oreader = new OldReader;
var Readable = require('stream').Readable;
var myReader = new Readable().wrap(oreader);
myReader.on('readable', function() {
myReader.read(); // etc.
});
可写流(Writable stream )接口是你正把数据写到一个目标的抽象。
可写流(Writable stream )的例子包括:
chunk
{String | Buffer} 准备写的数据encoding
{String} 编码方式(如果chunk
是字符串)callback
{Function} 数据块写入后的回调这个方法向底层系统写入数据,并在数据处理完毕后调用所给的回调。
返回值表示你是否应该继续立即写入。如果数据要缓存在内部,将会返回false
。否则返回 true
。
返回值仅供参考。即使返回 false
,你也可能继续写。但是写会缓存在内存里,所以不要做的太过分。最好的办法是等待drain
事件后,再写入数据。
如果调用 writable.write(chunk)
返回 false, drain
事件会告诉你什么时候将更多的数据写入到流中。
javascript
// Write the data to the supplied 可写流(Writable stream ) 1MM times.
// Be attentive to back-pressure.
function writeOneMillionTimes(writer, data, encoding, callback) {
var i = 1000000;
write();
function write() {
var ok = true;
←上一篇: Node.js 加密
→下一篇:Node.js 网络