feat(internal): add sync tee util #7560
warner
left a comment
I'd rely upon @kriskowal for a proper review, but the fact that this passes the unit tests below gives me confidence that it's working as intended.
| ]); | ||
| t.deepEqual(output1, sourceData.slice(0, 2)); | ||
| t.deepEqual(output2, sourceData.slice(0, 2)); |
Should this also test that the original stream still has one item left? Or is that not a property we care about?
Modified the last test to be explicit about this.
| * @param {AsyncIterator<T, void, void>} stream | ||
| * @param {number} quantity | ||
| */ | ||
| export const syncTee = (stream, quantity) => { |
Given that this is very much about async iterators, maybe name it synchronizedTee or lockstepTee?
Also, maybe numReaders or readerCount instead of quantity, to make the docstring a bit more instructive.
| const rejections = []; | ||
| /** @type {Array<(value: PromiseLike<IteratorResult<T>>) => void>} */ | ||
| const resolvers = []; | ||
| let done = false; |
By "done" we mean "one of the readers stopped reading early", right? Maybe use readerDone or readerStopped or something similar? Just trying to capture the fruits of my ignorance while I still have it :)
done is really the term used in IteratorResults which have the shape of {done: boolean, value: any}.
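To make the `{done, value}` shape concrete, here is a minimal sketch that drives an async generator by hand and records each IteratorResult. The names `twoItems` and `collectResults` are hypothetical and not part of this PR.

```javascript
// Hypothetical two-item source, used only to illustrate IteratorResult.
async function* twoItems() {
  yield 'a';
  yield 'b';
}

// Call next() manually and keep every IteratorResult, including the final one.
const collectResults = async () => {
  const iterator = twoItems();
  const results = [];
  for (;;) {
    const result = await iterator.next();
    results.push(result);
    if (result.done) {
      break;
    }
  }
  return results;
};

// Logs two results with done: false, then one with done: true.
collectResults().then(results => console.log(results));
```

The final result carries `done: true` with an `undefined` value, which is the sense in which the flag is named in the tee.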
| const error = assert.error(assert.details`Teed stream threw`); | ||
| assert.note(error, assert.details`Teed rejections: ${rejections}`); | ||
| result = | ||
| stream.throw?.(error) || |
I don't know the semantics of stream.throw, but could it throw an error? And would that cause the chain of promises to break without telling the remaining readers? And, could it return a non-promise? In which case the result.then below would fail.
I see that any rejected promise by stream.return will still proceed to the notify-readers at the end of this function, so I think we're defensive against that method.
> I don't know the semantics of stream.throw, but could it throw an error? And, could it return a non-promise?
.throw usually rejects, yes, which is why we capture the rejection. Unless it unleashes Zalgo, it should not throw synchronously. This definitely expects a well-behaved source.
> without telling the remaining readers
We do feed the result (likely a rejection) to all the readers. There is a test for that.
> stream.return... I think we're defensive against that method.
We're not any more defensive than for throw, but we make sure that if return does exist and returns a promise, we end up rejecting instead, like a generator source would have. These edge cases are not covered by tests, however.
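For readers unfamiliar with these methods, the following sketch shows how a plain async generator (one example of a well-behaved source) responds to `throw` and `return`. The names `source`, `demoThrow`, and `demoReturn` are hypothetical; other iterator implementations may behave differently, since both methods are optional in the protocol.

```javascript
// Hypothetical source with no try/catch around its yields.
async function* source() {
  yield 1;
  yield 2;
}

// throw() on a suspended async generator returns a promise that rejects with
// the injected error; it does not throw synchronously.
const demoThrow = async () => {
  const iterator = source();
  await iterator.next(); // advance to the first yield
  const pending = iterator.throw(Error('reader failed'));
  try {
    await pending;
    return 'fulfilled';
  } catch (err) {
    return `rejected: ${err.message}`;
  }
};

// return() on a suspended async generator fulfills with a done result.
const demoReturn = async () => {
  const iterator = source();
  await iterator.next();
  return iterator.return();
};

demoThrow().then(outcome => console.log(outcome));
demoReturn().then(result => console.log(result));
```

Because `return` fulfills for a generator source, a tee that wants its readers to observe a failure has to convert that fulfillment into a rejection itself, which is the case described in the reply above.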
| * @template T | ||
| * @param {AsyncIterable<T>} stream | ||
| * @param {Array<T>} output | ||
| * @param {number} [breakAfter] |
This is like maxItems, right? It's counting units of whatever the iterator is iterating over.
that test failure looked spurious (some web thing), so I kicked it
Review feedback
Test producer side of interruption
This is inspired by his work, and I shared pretty much this implementation with him earlier. I'd love a proper review, but this is blocking an upcoming PR.
| * @param {AsyncIterator<T, void, void>} sourceStream | ||
| * @param {number} readerCount | ||
| */ | ||
| export const synchronizedTee = (sourceStream, readerCount) => { |
synchronized is an unnecessary qualifier. All streams are synchronized by default and can be made resilient to bursts by priming a buffer to each individual consumer. In the context of the @endo/stream library, makePipe returns a synchronized reader/writer which can be buffered by pre-acking a certain number of iterations, effectively allowing the producer to get exactly that far ahead of the consumer. Imagine:
AsyncIterator.prototype.tee(quantity: number): Array<AsyncIterator>;
AsyncIterator.prototype.buffer(length: number): AsyncIterator;
I think there can be non-synchronized teeing. In fact, I have a similar tee helper in another branch where the source is consumed at the pace of the fastest branch (allowing for unbounded memory growth).
Buffering is not equivalent because, by definition, it only allows for a certain bounded amount of deviation.
I’d name these to encourage the use of bounded memory and probably wouldn’t even suggest standardizing an unbounded variant with the exception of terminal collectors like Array.fromAsync or response.json().
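To illustrate what lockstep consumption means in this discussion, here is a deliberately simplified sketch. It is not the implementation in this PR: `lockstepTee`, `count`, and `drain` are hypothetical names, there is no handling of readers that stop early, and it assumes each reader waits for its previous `next()` to settle before calling it again (as `for await` does).

```javascript
// Simplified lockstep tee: the source advances only once every reader has
// asked for its next item, so no item is ever buffered.
const lockstepTee = (source, readerCount) => {
  let pending = []; // resolvers for readers waiting on the current round
  const pull = () => {
    const waiting = pending;
    pending = [];
    // Every waiting reader receives the same promise, fulfilled or rejected.
    const result = source.next();
    for (const resolve of waiting) {
      resolve(result);
    }
  };
  return Array.from({ length: readerCount }, () => ({
    next: () =>
      new Promise(resolve => {
        pending.push(resolve);
        if (pending.length === readerCount) {
          pull();
        }
      }),
    [Symbol.asyncIterator]() {
      return this;
    },
  }));
};

// Hypothetical source and consumer for the demo.
async function* count(limit) {
  for (let i = 0; i < limit; i += 1) {
    yield i;
  }
}

const drain = async (reader, delayMs) => {
  const seen = [];
  for await (const item of reader) {
    seen.push(item);
    await new Promise(resolve => setTimeout(resolve, delayMs));
  }
  return seen;
};

const demoTee = () => {
  const [fast, slow] = lockstepTee(count(3), 2);
  return Promise.all([drain(fast, 0), drain(slow, 5)]);
};

demoTee().then(outputs => console.log(outputs));
```

The fast reader is held back to the pace of the slow one, which is what keeps memory bounded; an unsynchronized variant would instead have to queue items for the slow reader.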
| ); | ||
| } else if (done) { | ||
| result = | ||
| sourceStream.return?.() || |
Once you have an array of promises that you know have settled, some of which may be rejected, it might be sufficient to lean on Promise.all to consolidate them into a rejected promise with an AggregateError instead of rolling that by hand.
But, it would suffice here to leave a comment with the reason you did not do that, which appears to be to connect with SES causal tracing.
Taming Promise.all is being tracked in endojs/endo#1568, which is worth referencing too.
Promise.all short-circuits on rejection, which means aggregation is not possible. In fact, I have a PromiseAllOrRejections helper that does not short-circuit, but alas it's not standard (yet?).
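The short-circuiting can be seen in a small sketch. The helper below, `settleAll`, is a hypothetical illustration built on the standard `Promise.allSettled` and `AggregateError`; it is not the PromiseAllOrRejections helper mentioned above, whose behavior is not shown in this thread.

```javascript
// Wait for every promise to settle, then either return all values or throw
// a single AggregateError carrying every rejection reason.
const settleAll = async promises => {
  const outcomes = await Promise.allSettled(promises);
  const reasons = outcomes
    .filter(outcome => outcome.status === 'rejected')
    .map(outcome => outcome.reason);
  if (reasons.length > 0) {
    throw new AggregateError(reasons, 'Some promises rejected');
  }
  return outcomes.map(outcome => outcome.value);
};

// Fresh inputs for each call: two rejections and one fulfillment.
const makeInputs = () => [
  Promise.reject(Error('first')),
  Promise.resolve(1),
  Promise.reject(Error('second')),
];

const demoAggregation = async () => {
  // Promise.all rejects with the first rejection only.
  const allMessage = await Promise.all(makeInputs()).catch(err => err.message);
  // settleAll reports every rejection.
  const aggregateMessages = await settleAll(makeInputs()).catch(err =>
    err.errors.map(reason => reason.message),
  );
  return { allMessage, aggregateMessages };
};

demoAggregation().then(summary => console.log(summary));
```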
| } | ||
| }, | ||
| () => { | ||
| doneResult = { done: true, value: undefined }; |
Consider hardening the result.
| return pullNext(); | ||
| }; | ||
| const readers = Array.from({ length: readerCount }).map(() => { |
TIL Array.from({length})! I’ve been reluctantly writing new Array(length).fill(undefined).
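A quick comparison of the two idioms, along with the sparse-array pitfall that makes the `fill` step necessary. The variable names and the `reader-N` labels are hypothetical.

```javascript
const readerCount = 3;

// Array.from treats {length} as array-like and visits every index.
const viaFrom = Array.from({ length: readerCount }, (_, index) => `reader-${index}`);

// new Array(n) creates holes, so it must be filled before map will visit them.
const viaFill = new Array(readerCount)
  .fill(undefined)
  .map((_, index) => `reader-${index}`);

// Without fill, map skips the holes and the result stays empty of elements.
const sparse = new Array(readerCount).map((_, index) => `reader-${index}`);

console.log(viaFrom, viaFill, Object.keys(sparse).length);
```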
| return reader; | ||
| }); | ||
| void pullNext(); |
We need to be extra certain that no rejections fall through to this promise. Ideally we could fall through here to .catch(reportError) but that is not present in Node.js yet.
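One way to picture the concern is the sketch below, which attaches a terminal handler before the promise is discarded with `void`. Everything here is a hypothetical stand-in: `report` only records errors in place of `reportError`, and this `pullNext` is a stub rather than the function in the diff.

```javascript
// Hypothetical stand-in for reportError; it only records the error.
const reported = [];
const report = err => {
  reported.push(err);
};

// Stub for the tee's internal pump, able to fail on demand.
const pullNext = async shouldFail => {
  if (shouldFail) {
    throw Error('source failed');
  }
};

// Attach the terminal handler first, so the detached chain can never
// surface as an unhandled rejection.
const startDetached = shouldFail => pullNext(shouldFail).catch(report);

void startDetached(true);
```

The promise returned by `startDetached` always fulfills, so discarding it is safe; without the final handler, any rejection escaping `pullNext` would become an unhandled rejection.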
refs: #6943
Description
This adds a util to perform synchronous tee-ing of streams to the internal package. This really should be an @endo/stream tool, but I really need this now.
Security Considerations
None
Scaling Considerations
Avoid unbounded memory usage by synchronously consuming.
Documentation Considerations
Should move to endo
Testing Considerations
Added unit tests