From 87668cb2a977918926ad5e927f0bd5710e1d6765 Mon Sep 17 00:00:00 2001 From: "Kamat, Trivikram" <16024985+trivikr@users.noreply.github.com> Date: Fri, 22 May 2026 21:22:43 -0700 Subject: [PATCH] stream: fix overlapping broadcast next calls Preserve the first pending broadcast consumer read when next() is called again before data is available. The overlapping read now closes the consumer without replacing the earlier resolver, allowing the pending read to receive the next chunk. Fixes: https://github.com/nodejs/node/issues/63499 Signed-off-by: Kamat, Trivikram <16024985+trivikr@users.noreply.github.com> Assisted-by: openai:gpt-5.5 --- lib/internal/streams/iter/broadcast.js | 11 ++++++++++ .../test-stream-iter-broadcast-basic.js | 22 +++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/lib/internal/streams/iter/broadcast.js b/lib/internal/streams/iter/broadcast.js index e6a404729d6e0b..e42c9cdce136f6 100644 --- a/lib/internal/streams/iter/broadcast.js +++ b/lib/internal/streams/iter/broadcast.js @@ -207,6 +207,11 @@ class BroadcastImpl { return kDone; } + if (state.resolve) { + state.detached = true; + return kDone; + } + const { promise, resolve, reject } = PromiseWithResolvers(); state.resolve = resolve; state.reject = reject; @@ -312,6 +317,9 @@ class BroadcastImpl { } consumer.resolve = null; consumer.reject = null; + if (consumer.detached && this.#deleteConsumer(consumer)) { + this.#tryTrimBuffer(); + } } } } @@ -396,6 +404,9 @@ class BroadcastImpl { consumer.resolve = null; consumer.reject = null; resolve({ __proto__: null, done: false, value: chunk }); + if (consumer.detached && this.#deleteConsumer(consumer)) { + this.#tryTrimBuffer(); + } } else { // Still waiting -- put back ArrayPrototypePush(this.#waiters, consumer); diff --git a/test/parallel/test-stream-iter-broadcast-basic.js b/test/parallel/test-stream-iter-broadcast-basic.js index ab2c81304ec2ac..9f216b451bc433 100644 --- a/test/parallel/test-stream-iter-broadcast-basic.js +++ b/test/parallel/test-stream-iter-broadcast-basic.js @@ -243,6 +243,27 @@ async function testLateJoinerSeesBufferedData() { assert.strictEqual(result, 'before-join'); } +async function testOverlappingNextKeepsEarlierRead() { + const { writer, broadcast: bc } = broadcast(); + const it = bc.push()[Symbol.asyncIterator](); + + const first = it.next(); + const second = it.next(); + + await writer.write('x'); + + assert.deepStrictEqual(await second, { + __proto__: null, + done: true, + value: undefined, + }); + + const result = await first; + assert.strictEqual(result.done, false); + assert.strictEqual(Buffer.concat(result.value).toString(), 'x'); + assert.strictEqual(bc.consumerCount, 0); +} + Promise.all([ testBasicBroadcast(), testMultipleWrites(), @@ -257,4 +278,5 @@ Promise.all([ testFailDetachesConsumers(), testWriterFailIdempotent(), testLateJoinerSeesBufferedData(), + testOverlappingNextKeepsEarlierRead(), ]).then(common.mustCall());