Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Revert "stream: simpler and faster Readable async iterator"
This reverts commit 4bb4007.
  • Loading branch information
richardlau committed Sep 9, 2020
commit 0f94c6b4e4dd4b3f0d30d40b2063db6a0ccf0e0d
38 changes: 0 additions & 38 deletions benchmark/streams/readable-async-iterator.js

This file was deleted.

66 changes: 5 additions & 61 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ const {
NumberIsNaN,
ObjectDefineProperties,
ObjectSetPrototypeOf,
Promise,
Set,
SymbolAsyncIterator,
Symbol
Expand Down Expand Up @@ -60,11 +59,11 @@ const kPaused = Symbol('kPaused');

// Lazy loaded to improve the startup performance.
let StringDecoder;
let createReadableStreamAsyncIterator;
let from;

ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
ObjectSetPrototypeOf(Readable, Stream);
function nop() {}

const { errorOrDestroy } = destroyImpl;

Expand Down Expand Up @@ -1076,68 +1075,13 @@ Readable.prototype.wrap = function(stream) {
};

Readable.prototype[SymbolAsyncIterator] = function() {
let stream = this;

if (typeof stream.read !== 'function') {
// v1 stream
const src = stream;
stream = new Readable({
objectMode: true,
destroy(err, callback) {
destroyImpl.destroyer(src, err);
callback();
}
}).wrap(src);
if (createReadableStreamAsyncIterator === undefined) {
createReadableStreamAsyncIterator =
require('internal/streams/async_iterator');
}

const iter = createAsyncIterator(stream);
iter.stream = stream;
return iter;
return createReadableStreamAsyncIterator(this);
};

async function* createAsyncIterator(stream) {
let callback = nop;

function next(resolve) {
if (this === stream) {
callback();
callback = nop;
} else {
callback = resolve;
}
}

stream
.on('readable', next)
.on('error', next)
.on('end', next)
.on('close', next);

try {
const state = stream._readableState;
while (true) {
const chunk = stream.read();
if (chunk !== null) {
yield chunk;
} else if (state.errored) {
throw state.errored;
} else if (state.ended) {
break;
} else if (state.closed) {
// TODO(ronag): ERR_PREMATURE_CLOSE?
break;
} else {
await new Promise(next);
}
}
} catch (err) {
destroyImpl.destroyer(stream, err);
throw err;
} finally {
destroyImpl.destroyer(stream, null);
}
}

// Making it explicit these properties are not enumerable
// because otherwise some prototype manipulation in
// userland will fail.
Expand Down
221 changes: 221 additions & 0 deletions lib/internal/streams/async_iterator.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
'use strict';

const {
ObjectCreate,
ObjectGetPrototypeOf,
ObjectSetPrototypeOf,
Promise,
PromiseReject,
PromiseResolve,
Symbol,
} = primordials;

const finished = require('internal/streams/end-of-stream');
const destroyImpl = require('internal/streams/destroy');

const kLastResolve = Symbol('lastResolve');
const kLastReject = Symbol('lastReject');
const kError = Symbol('error');
const kEnded = Symbol('ended');
const kLastPromise = Symbol('lastPromise');
const kHandlePromise = Symbol('handlePromise');
const kStream = Symbol('stream');

let Readable;

function createIterResult(value, done) {
return { value, done };
}

function readAndResolve(iter) {
const resolve = iter[kLastResolve];
if (resolve !== null) {
const data = iter[kStream].read();
// We defer if data is null. We can be expecting either 'end' or 'error'.
if (data !== null) {
iter[kLastPromise] = null;
iter[kLastResolve] = null;
iter[kLastReject] = null;
resolve(createIterResult(data, false));
}
}
}

function onReadable(iter) {
// We wait for the next tick, because it might
// emit an error with `process.nextTick()`.
process.nextTick(readAndResolve, iter);
}

function wrapForNext(lastPromise, iter) {
return (resolve, reject) => {
lastPromise.then(() => {
if (iter[kEnded]) {
resolve(createIterResult(undefined, true));
return;
}

iter[kHandlePromise](resolve, reject);
}, reject);
};
}

const AsyncIteratorPrototype = ObjectGetPrototypeOf(
ObjectGetPrototypeOf(async function* () {}).prototype);

function finish(self, err) {
return new Promise((resolve, reject) => {
const stream = self[kStream];

finished(stream, (err) => {
if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
reject(err);
} else {
resolve(createIterResult(undefined, true));
}
});
destroyImpl.destroyer(stream, err);
});
}

const ReadableStreamAsyncIteratorPrototype = ObjectSetPrototypeOf({
get stream() {
return this[kStream];
},

next() {
// If we have detected an error in the meanwhile
// reject straight away.
const error = this[kError];
if (error !== null) {
return PromiseReject(error);
}

if (this[kEnded]) {
return PromiseResolve(createIterResult(undefined, true));
}

if (this[kStream].destroyed) {
return new Promise((resolve, reject) => {
if (this[kError]) {
reject(this[kError]);
} else if (this[kEnded]) {
resolve(createIterResult(undefined, true));
} else {
finished(this[kStream], (err) => {
if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
reject(err);
} else {
resolve(createIterResult(undefined, true));
}
});
}
});
}

// If we have multiple next() calls we will wait for the previous Promise to
// finish. This logic is optimized to support for await loops, where next()
// is only called once at a time.
const lastPromise = this[kLastPromise];
let promise;

if (lastPromise) {
promise = new Promise(wrapForNext(lastPromise, this));
} else {
// Fast path needed to support multiple this.push()
// without triggering the next() queue.
const data = this[kStream].read();
if (data !== null) {
return PromiseResolve(createIterResult(data, false));
}

promise = new Promise(this[kHandlePromise]);
}

this[kLastPromise] = promise;

return promise;
},

return() {
return finish(this);
},

throw(err) {
return finish(this, err);
},
}, AsyncIteratorPrototype);

const createReadableStreamAsyncIterator = (stream) => {
if (typeof stream.read !== 'function') {
// v1 stream

if (!Readable) {
Readable = require('_stream_readable');
}

const src = stream;
stream = new Readable({ objectMode: true }).wrap(src);
finished(stream, (err) => destroyImpl.destroyer(src, err));
}

const iterator = ObjectCreate(ReadableStreamAsyncIteratorPrototype, {
[kStream]: { value: stream, writable: true },
[kLastResolve]: { value: null, writable: true },
[kLastReject]: { value: null, writable: true },
[kError]: { value: null, writable: true },
[kEnded]: {
value: stream.readableEnded || stream._readableState.endEmitted,
writable: true
},
// The function passed to new Promise is cached so we avoid allocating a new
// closure at every run.
[kHandlePromise]: {
value: (resolve, reject) => {
const data = iterator[kStream].read();
if (data) {
iterator[kLastPromise] = null;
iterator[kLastResolve] = null;
iterator[kLastReject] = null;
resolve(createIterResult(data, false));
} else {
iterator[kLastResolve] = resolve;
iterator[kLastReject] = reject;
}
},
writable: true,
},
});
iterator[kLastPromise] = null;

finished(stream, { writable: false }, (err) => {
if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
const reject = iterator[kLastReject];
// Reject if we are waiting for data in the Promise returned by next() and
// store the error.
if (reject !== null) {
iterator[kLastPromise] = null;
iterator[kLastResolve] = null;
iterator[kLastReject] = null;
reject(err);
}
iterator[kError] = err;
return;
}

const resolve = iterator[kLastResolve];
if (resolve !== null) {
iterator[kLastPromise] = null;
iterator[kLastResolve] = null;
iterator[kLastReject] = null;
resolve(createIterResult(undefined, true));
}
iterator[kEnded] = true;
});

stream.on('readable', onReadable.bind(null, iterator));

return iterator;
};

module.exports = createReadableStreamAsyncIterator;
10 changes: 5 additions & 5 deletions lib/internal/streams/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ const {

let EE;
let PassThrough;
let Readable;
let createReadableStreamAsyncIterator;

function destroyer(stream, reading, writing, callback) {
callback = once(callback);
Expand Down Expand Up @@ -113,11 +113,11 @@ function makeAsyncIterable(val) {
}

async function* fromReadable(val) {
if (!Readable) {
Readable = require('_stream_readable');
if (!createReadableStreamAsyncIterator) {
createReadableStreamAsyncIterator =
require('internal/streams/async_iterator');
}

yield* Readable.prototype[SymbolAsyncIterator].call(val);
yield* createReadableStreamAsyncIterator(val);
}

async function pump(iterable, writable, finish) {
Expand Down
1 change: 1 addition & 0 deletions node.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@
'lib/internal/worker/js_transferable.js',
'lib/internal/watchdog.js',
'lib/internal/streams/lazy_transform.js',
'lib/internal/streams/async_iterator.js',
'lib/internal/streams/buffer_list.js',
'lib/internal/streams/duplexpair.js',
'lib/internal/streams/from.js',
Expand Down
1 change: 0 additions & 1 deletion test/parallel/test-readline-async-iterators-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ async function testMutualDestroy() {
break;
}
assert.deepStrictEqual(iteratedLines, expectedLines);
break;
}

assert.deepStrictEqual(iteratedLines, expectedLines);
Expand Down
Loading