'use strict';require('../common');const assert = require('assert');
const stream = require('stream');const Readable = stream.Readable;const Writable = stream.Writable;
const totalChunks = 100;const chunkSize = 99;const expectTotalData = totalChunks * chunkSize;let expectEndingData = expectTotalData;
const r = new Readable({ highWaterMark: 1000 });let chunks = totalChunks;r._read = function(n) { console.log('_read called', chunks); if (!(chunks % 2)) setImmediate(push); else if (!(chunks % 3)) process.nextTick(push); else push();};
let totalPushed = 0;function push() { const chunk = chunks-- > 0 ? Buffer.alloc(chunkSize, 'x') : null; if (chunk) { totalPushed += chunk.length; } console.log('chunks', chunks); r.push(chunk);}
read100();
function read100() { readn(100, onData);}
function readn(n, then) { console.error(`read ${n}`); expectEndingData -= n; (function read() { const c = r.read(n); console.error('c', c); if (!c) r.once('readable', read); else { assert.strictEqual(c.length, n); assert(!r.readableFlowing); then(); } })();}
function onData() { expectEndingData -= 100; console.error('onData'); let seen = 0; r.on('data', function od(c) { seen += c.length; if (seen >= 100) { r.removeListener('data', od); r.pause(); if (seen > 100) { const diff = seen - 100; r.unshift(c.slice(c.length - diff)); console.error('seen too much', seen, diff); }
setImmediate(pipeLittle); } });}
function pipeLittle() { expectEndingData -= 200; console.error('pipe a little'); const w = new Writable(); let written = 0; w.on('finish', () => { assert.strictEqual(written, 200); setImmediate(read1234); }); w._write = function(chunk, encoding, cb) { written += chunk.length; if (written >= 200) { r.unpipe(w); w.end(); cb(); if (written > 200) { const diff = written - 200; written -= diff; r.unshift(chunk.slice(chunk.length - diff)); } } else { setImmediate(cb); } }; r.pipe(w);}
function read1234() { readn(1234, resumePause);}
function resumePause() { console.error('resumePause'); r.resume(); r.pause(); r.resume(); r.pause(); r.resume(); r.pause(); r.resume(); r.pause(); r.resume(); r.pause(); setImmediate(pipe);}
function pipe() { console.error('pipe the rest'); const w = new Writable(); let written = 0; w._write = function(chunk, encoding, cb) { written += chunk.length; cb(); }; w.on('finish', () => { console.error('written', written, totalPushed); assert.strictEqual(written, expectEndingData); assert.strictEqual(totalPushed, expectTotalData); console.log('ok'); }); r.pipe(w);}