所有使用 Node.js API 创建的流对象都只能操作 strings 和 Buffer对象。但是,通过一些第三方流的实现,你依然能够处理其它类型的 JavaScript 值 (除了 null,它在流处理中有特殊意义)。 这些流被认为是工作在 “对象模式”(object mode)。 在创建流的实例时,可以通过 objectMode 选项使流的实例切换到对象模式。试图将已经存在的流切换到对象模式是不安全的。
flowing
和 paused
如果 Readable 切换到 flowing 模式,且没有消费者处理流中的数据,这些数据将会丢失。 比如, 调用了 readable.resume() 方法却没有监听 'data' 事件,或是取消了 'data' 事件监听,就有可能出现这种情况。
Writable 和 Readable 流都会将数据存储到内部的缓冲器(buffer)中。这些缓冲器可以 通过相应的 writable._writableState.getBuffer() 或 readable._readableState.buffer 来获取。
缓冲器的大小取决于传递给流构造函数的 highWaterMark 选项。 对于普通的流, highWaterMark 选项指定了总共的字节数。对于工作在对象模式的流, highWaterMark 指定了对象的总数。
当可读流的实现调用stream.push(chunk)
方法时,数据被放到缓冲器中。如果流的消费者没有调用stream.read()
方法, 这些数据会始终存在于内部队列中,直到被消费。
当内部可读缓冲器的大小达到 highWaterMark 指定的阈值时,流会暂停从底层资源读取数据,直到当前 缓冲器的数据被消费 (也就是说, 流会在内部停止调用 readable._read() 来填充可读缓冲器)。
可写流通过反复调用 writable.write(chunk) 方法将数据放到缓冲器。 当内部可写缓冲器的总大小小于 highWaterMark 指定的阈值时, 调用 writable.write() 将返回true。 一旦内部缓冲器的大小达到或超过 highWaterMark ,调用 writable.write() 将返回 false 。
stream API 的关键目标, 尤其对于 stream.pipe() 方法, 就是限制缓冲器数据大小,以达到可接受的程度。这样,对于读写速度不匹配的源头和目标,就不会超出可用的内存大小。
Duplex 和 Transform 都是可读写的。 在内部,它们都维护了 两个 相互独立的缓冲器用于读和写。 在维持了合理高效的数据流的同时,也使得对于读和写可以独立进行而互不影响。
在任意时刻,任意可读流应确切处于下面三种状态之一:
readable._readableState.flowing = true
若 readable._readableState.flowing 为 null,由于不存在数据消费者,可读流将不会产生数据。 在这个状态下,监听 'data' 事件,调用 readable.pipe() 方法,或者调用 readable.resume() 方法, readable._readableState.flowing 的值将会变为 true 。这时,随着数据生成,可读流开始频繁触发事件。
调用 readable.pause() 方法, readable.unpipe() 方法, 或者接收 “背压”(back pressure), 将导致 readable._readableState.flowing 值变为 false。 这将暂停事件流,但 不会 暂停数据生成。 在这种情况下,为 'data' 事件设置监听函数不会导致 readable._readableState.flowing 变为 true。
当 readable._readableState.flowing 值为 false 时, 数据可能堆积到流的内部缓存中。
'readable' 事件将在流中有数据可供读取时触发。在某些情况下,为 'readable' 事件添加回调将会导致一些数据被读取到内部缓存中。
const readable = getReadableStreamSomehow();
readable.on('readable', () => {
// 有一些数据可读了
});
let fs =require('fs');
let rs = fs.createReadStream('./1.txt',{
start:3,
end:8,
encoding:'utf8',
highWaterMark:3
});
rs.on('readable',function () {
console.log('readable');
console.log('rs._readableState.buffer.length',rs._readableState.length);
let d = rs.read(1);
console.log('rs._readableState.buffer.length',rs._readableState.length);
console.log(d);
setTimeout(()=>{
console.log('rs._readableState.buffer.length',rs._readableState.length);
},500)
});
let fs = require('fs');
let EventEmitter = require('events');
let util = require('util');
util.inherits(LineReader, EventEmitter)
fs.readFile('./1.txt',function (err,data) {
console.log(data);
})
function LineReader(path) {
EventEmitter.call(this);
this._rs = fs.createReadStream(path);
this.RETURN = 0x0D;// \r 13
this.NEW_LINE = 0x0A;// \n 10
this.on('newListener', function (type, listener) {
if (type == 'newLine') {
let buffer = [];
this._rs.on('readable', () => {
let bytes;
while (null != (bytes = this._rs.read(1))) {
let ch = bytes[0];
switch (ch) {
case this.RETURN:
this.emit('newLine', Buffer.from(buffer));
buffer.length = 0;
let nByte = this._rs.read(1);
if (nByte && nByte[0] != this.NEW_LINE) {
buffer.push(nByte[0]);
}
break;
case this.NEW_LINE:
this.emit('newLine', Buffer.from(buffer));
buffer.length = 0;
break;
default:
buffer.push(bytes[0]);
break;
}
}
});
this._rs.on('end', () => {
if (buffer.length > 0) {
this.emit('newLine', Buffer.from(buffer));
buffer.length = 0;
this.emit('end');
}
})
}
});
}
var lineReader = new LineReader('./1.txt');
lineReader.on('newLine', function (data) {
console.log(data.toString());
}).on('end', function () {
console.log("end");
})