stream: allow stream to stay open after take#47023
stream: allow stream to stay open after take#47023rluvaton wants to merge 18 commits intonodejs:mainfrom
Conversation
|
Review requested:
|
| if (options?.destroyStream != null) { | ||
| validateBoolean(options.destroyStream, 'options.destroyStream'); | ||
| } |
There was a problem hiding this comment.
Maybe another name would be better? Not sure what though... @benjamingr
lib/internal/streams/operators.js
Outdated
| for await (const val of this.iterator({ destroyOnReturn: options?.destroyStream ?? true })) { | ||
| if (options?.signal?.aborted) { | ||
| throw new AbortError(); | ||
| } |
There was a problem hiding this comment.
I think that if the stream fails we should close the stream, WDYT @ronag ?
There was a problem hiding this comment.
after thinking I think it should be closed on an error as in the iterator helpers proposal spec the underlying iterator should be closed when it failed
mcollina
left a comment
There was a problem hiding this comment.
Thanks for opening a PR! Can you please add a unit test?
|
Docs are also missing |
|
Hey @mcollina I've added tests and docs :) |
|
@mcollina is something holding this back from merging? Or we waiting for the TC39? |
|
we have 48 hours grace period to allow for folks across the globe to review |
Failed to start CI- Validating Jenkins credentials ✔ Jenkins credentials valid - Starting PR CI job ✘ Failed to start PR CI: 403 Forbiddenhttps://github.com/nodejs/node/actions/runs/4384604326 |
Failed to start CI- Validating Jenkins credentials ✔ Jenkins credentials valid - Starting PR CI job ✘ Failed to start PR CI: 403 Forbiddenhttps://github.com/nodejs/node/actions/runs/4385139704 |
Co-authored-by: Debadree Chatterjee <debadree333@gmail.com>
…fter-take-46980' into feat/allow_stream-to-stay-open-after-take-46980
aren't we using squash and merge? |
Ref: #46910 (comment) |
benjamingr
left a comment
There was a problem hiding this comment.
I don't think discussion has exhausted itself in the spec issue and I don't want to deviate from it until we proved we must
Should be add the blocked label for now then? |
benjamingr
left a comment
There was a problem hiding this comment.
Yeah sorry I meant to explicitly block and hit the wrong button
michaelficarra
left a comment
There was a problem hiding this comment.
I think what you really want is a chunk (which will be proposed for inclusion in standard iterators in the future) or a nextN helper like
function nextN(iterator, n) {
const result = [];
const next = iterator.next;
for (; n > 0; --n) {
cont { done, value } = next.call(iterator);
if (done) break;
result.push(value);
}
return result;
}|
@michaelficarra no problem but maybe could you update the proposal first so we won't have this back and forth FYI I think nextN is better name than chunk |
|
@rluvaton |
Oh, after looking at chunk what I really want is Do you think that non-closing
|
|
Having
Example of usages: having it as an operator: const responses = await topVisitedUrlsIterator
.filter(noEmptyLine)
.drop(1) // Header
.nextN(8)
.map(parseLine)
.map(value => apiCall(value), { concurrency: 4 })
.toArray();having it as a static function: const values = await nextN(
topVisitedUrlsIterator
.filter(noEmptyLine)
.drop(1),
8)
.map(parseLine);
// How would I use the concurrency that we would have in the map?
const responses = concurrentMap(values, (value) => apiCall(value), 4); |
# Conflicts: # lib/internal/streams/operators.js
fix #46980
TODO