1.readableStream.js
const readableStream = require('./readableStream');
readableStream.on('data', (data) => {
console.log(data);
readableStream.pause();
});
readableStream.js
const Readable = require('./Readable');
const readableIterator = (function (count) {
return {
next() {
count++;
if (count <= 5) {
return { done: false, value: count + '' };
} else {
return { done: true, value: null }
}
}
}
})(0)
const readableStream = new Readable({
read() {
let { done, value } = readableIterator.next();
if (done) {
this.push(null);
} else {
this.push(value);
}
}
});
module.exports = readableStream;
Readable.js
const Stream = require('./Stream');
var { inherits } = require('util');
function Readable(options) {
Stream.call(this, options);
this._readableState = { ended: false, buffer: [], flowing: false };
if (options.read) this._read = options.read;
}
inherits(Readable, Stream);
Readable.prototype.on = function (event, fn) {
Stream.prototype.on.call(this, event, fn);
if (event === 'data') {
this.resume();
}
}
Readable.prototype.resume = function () {
this._readableState.flowing = true;
while (this.read());
}
Readable.prototype.pause = function () {
this._readableState.flowing = false;
}
Readable.prototype.read = function () {
if (!this._readableState.ended && this._readableState.flowing) {
this._read();
}
let data = this._readableState.buffer.shift();
if (data) {
this.emit('data', data);
}
return data;
}
Readable.prototype.push = function (chunk) {
if (chunk === null) {
this._readableState.ended = true;
} else {
this._readableState.buffer.push(chunk);
}
}
module.exports = Readable;
Stream.js
const EventEmitter = require('events');
var { inherits } = require('util');
function Stream(options) {
this.options = options;
EventEmitter.call(this);
}
inherits(Stream, EventEmitter);
module.exports = Stream;
2.writableStream.js
let writableStream = require('./writableStream');
writableStream.write('1');
writableStream.write('2');
writableStream.write('3');
writableStream.write('4');
writableStream.write('5');
writableStream.end();
writableStream.js
const Writable = require('./Writable');
const writableStream = new Writable({
write(data, encoding, next) {
console.log(data.toString(encoding));
setTimeout(next, 1000);
}
});
module.exports = writableStream;
Writable.js
const Stream = require('./Stream');
var { inherits } = require('util');
function Writable(options) {
Stream.call(this, options);
this._writableState = {
ended: false,
writing: false,
buffer: []
};
if (options.write) this._write = options.write;
}
inherits(Writable, Stream);
Writable.prototype.write = function (chunk) {
if (this._writableState.ended) {
return;
}
if (this._writableState.writing) {
this._writableState.buffer.push(chunk);
} else {
this._writableState.writing = true;
this._write(chunk, 'utf8', () => this.next());
}
}
Writable.prototype.next = function () {
this._writableState.writing = false;
if (this._writableState.buffer.length > 0) {
this._write(this._writableState.buffer.shift(), 'utf8', () => this.next());
}
}
Writable.prototype.end = function () {
this._writableState.ended = true;
}
module.exports = Writable;
3.highWaterMark.js
//const { Writable } = require('stream');
const Writable = require('./Writable');
class WritableStream extends Writable {
_write = (data, encoding, next) => {
console.log(data.toString());
setTimeout(next, 1000);
}
}
const writableStream = new WritableStream({
highWaterMark: 1
});
writableStream.on('finish', () => {
console.log('finish');
});
let canWrite = writableStream.write('1');
console.log('canWrite:1', canWrite);
canWrite = writableStream.write('2');
console.log('canWrite:2', canWrite);
canWrite = writableStream.write('3');
console.log('canWrite:3', canWrite);
writableStream.once('drain', () => {
console.log('drain');
let canWrite = writableStream.write('4');
console.log('canWrite:4', canWrite);
canWrite = writableStream.write('5');
console.log('canWrite:5', canWrite);
canWrite = writableStream.write('6');
console.log('canWrite:6', canWrite);
});
/**
1
canWrite:1 false
canWrite:2 false
canWrite:3 false
2
3
drain
4
canWrite:4 false
canWrite:5 false
canWrite:6 false
5
6
*/
Writable.js
const Stream = require('./Stream');
var { inherits } = require('util');
function Writable(options) {
Stream.call(this, options);
this._writableState = {
ended: false,
writing: false,
buffer: [],
+ bufferSize: 0
};
if (options.write) this._write = options.write;
}
inherits(Writable, Stream);
Writable.prototype.write = function (chunk) {
if (this._writableState.ended) {
return;
}
chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, 'utf8');
this._writableState.bufferSize += chunk.length;
+ let canWrite = this.options.highWaterMark > this._writableState.bufferSize;
+ if (this._writableState.writing) {
+ this._writableState.buffer.push(chunk);
+ } else {
+ this._writableState.writing = true;
+ this._write(chunk, 'utf8', () => this.next());
+ }
+ return canWrite;
}
Writable.prototype.next = function () {
this._writableState.writing = false;
+ if (this._writableState.buffer.length > 0) {
+ let chunk = this._writableState.buffer.shift();
+ this._write(chunk, 'utf8', () => {
+ this._writableState.bufferSize -= chunk.length;
+ this.next();
+ })
+ } else {
+ this.emit('drain');
+ }
}
Writable.prototype.end = function () {
this._writableState.ended = true;
}
module.exports = Writable;
3.pipe.js
const readableStream = require('./readableStream');
const writableStream = require('./writableStream');
readableStream.pipe(writableStream);
Readable.js
const Stream = require('./Stream');
var { inherits } = require('util');
function Readable(options) {
Stream.call(this, options);
this._readableState = { ended: false, buffer: [], flowing: false };
if (options.read) this._read = options.read;
}
inherits(Readable, Stream);
Readable.prototype.on = function (event, fn) {
Stream.prototype.on.call(this, event, fn);
if (event === 'data') {
this.resume();
}
}
Readable.prototype.resume = function () {
this._readableState.flowing = true;
while (this.read());
}
Readable.prototype.pause = function () {
this._readableState.flowing = false;
}
Readable.prototype.read = function () {
if (!this._readableState.ended && this._readableState.flowing) {
this._read();
}
let data = this._readableState.buffer.shift();
if (data) {
this.emit('data', data);
}
return data;
}
Readable.prototype.push = function (chunk) {
if (chunk === null) {
this._readableState.ended = true;
} else {
this._readableState.buffer.push(chunk);
}
}
+Readable.prototype.pipe = function (dest) {
+ this.on('data', (chunk) => {
+ dest.write(chunk);
+ })
+ this.on('end', () => {
+ dest.end();
+ });
+}
module.exports = Readable;
4.duplexStream.js
const duplexStream = require('./duplexStream');
duplexStream.pipe(duplexStream);
duplexStream.js
const Duplex = require('./Duplex');
const readableIterator = (function (count) {
return {
next() {
count++;
if (count <= 5) {
return { done: false, value: count + '' };
} else {
return { done: true, value: null }
}
}
}
})(0)
const duplexStream = new Duplex({
read() {
let { done, value } = readableIterator.next();
if (done) {
this.push(null);
} else {
this.push(value);
}
},
write(data, encoding, next) {
console.log(data.toString(encoding));
setTimeout(next, 1000);
}
});
module.exports = duplexStream;
Duplex.js
const Readable = require('./Readable');
const Writable = require('./Writable');
var { inherits } = require('util');
inherits(Duplex, Readable);
const keys = Object.keys(Writable.prototype);
for (let v = 0; v < keys.length; v++) {
const method = keys[v];
if (!Duplex.prototype[method]) {
Duplex.prototype[method] = Writable.prototype[method];
}
}
function Duplex(options) {
Readable.call(this, options);
Writable.call(this, options);
}
module.exports = Duplex;
5.transformStream.js
const readableStream = require('./readableStream');
const transformStream = require('./transformStream');
const writableStream = require('./writableStream');
readableStream.pipe(transformStream).pipe(writableStream);
transformStream.js
const Transform = require('./Transform');
const transformStream = new Transform({
transform(buffer, encoding, next) {
let transformed = buffer.toString(encoding) + '$';
next(null, transformed);
}
});
module.exports = transformStream;
Transform.js
const Duplex = require('./Duplex');
var { inherits } = require('util');
inherits(Transform, Duplex);
function Transform(options) {
Duplex.call(this, options);
if (options.transform) this._transform = options.transform;
}
Transform.prototype._write = function (chunk, encoding, next) {
this._transform(chunk, encoding, (err, data) => {
if (data) {
this.push(data);
}
next(err);
});
}
Transform.prototype._read = function () {
}
module.exports = Transform;
Buffer/String
类型的值objectMode
标志,我们可以设置它让流可以接受任何JavaScript
对象6.objectMode.js
const { Readable, Writable } = require('stream');
const readableIterator = (function (count) {
return {
next() {
count++;
if (count <= 5) {
return { done: false, value: { id: count + '' } };
} else {
return { done: true, value: null }
}
}
}
})(0)
const readableStream = new Readable({
objectMode: true,
read() {
let { done, value } = readableIterator.next();
if (done) {
this.push(null);
} else {
this.push(value);
}
}
});
const writableStream = new Writable({
objectMode: true,
write(data, encoding, next) {
console.log(data);
setTimeout(next, 1000);
}
});
readableStream.pipe(writableStream);
const fs = require('fs');
const through2 = require('./through2');
const readableStream = require('./readableStream');
const writableStream = require('./writableStream');
const transformStream = through2(function (chunk, encoding, next) {
let transformed = chunk.toString(encoding) + '$';
next(null, transformed);
});
readableStream.pipe(transformStream).pipe(writableStream);
through2.js
const fs = require('fs');
const through2 = require('./through2');
const readableStream = require('./readableStream');
const writableStream = require('./writableStream');
const transformStream = through2(function (chunk, encoding, next) {
let transformed = chunk.toString(encoding) + '$';
next(null, transformed);
});
readableStream.pipe(transformStream).pipe(writableStream);
8.through2.js
const fs = require('fs');
const through2 = require('through2');
const fileStream = fs.createReadStream('data.txt', { highWaterMark: 10 });
const all = [];
fileStream.pipe(
through2.obj(function (chunk, encoding, next) {
this.push(JSON.parse(chunk))
next();
})).on('data', (data) => {
all.push(data)
}).on('end', () => {
console.log(all);
})
through2.js
const Transform = require('./Transform');
const { Transform } = require('stream');
function through2(transform) {
return new Transform({
transform
});
}
through2.obj = function (transform) {
return new Transform({
objectMode: true,
transform
});
}
module.exports = through2;
data.txt
{"id":1}
{"id":2}
{"id":3}