From 9db083ea353f7546af941c574cbb84acb845f6ec Mon Sep 17 00:00:00 2001 From: Chenghao Mou Date: Wed, 1 Jul 2026 21:37:10 +0100 Subject: [PATCH 1/2] fix: AudioSource.waitForPlayout resolving early with audio still queued The internal playout promise could be left resolved (latched) by the drain timer firing during a gap between captures, or by clearQueue(). A later waitForPlayout() then consumed the stale resolution and reported playout complete with up to queueSizeMs of audio still buffered, clipping the tail of agent speech on every turn downstream. Re-arm the promise on the next captureFrame when it was already released, and skip the post-playout bookkeeping reset when the promise was re-armed while waiting. Co-Authored-By: Claude Fable 5 --- .changeset/tidy-lions-repair.md | 5 ++ packages/livekit-rtc/src/audio_source.test.ts | 79 +++++++++++++++++++ packages/livekit-rtc/src/audio_source.ts | 24 +++++- 3 files changed, 105 insertions(+), 3 deletions(-) create mode 100644 .changeset/tidy-lions-repair.md create mode 100644 packages/livekit-rtc/src/audio_source.test.ts diff --git a/.changeset/tidy-lions-repair.md b/.changeset/tidy-lions-repair.md new file mode 100644 index 00000000..d42ad9a7 --- /dev/null +++ b/.changeset/tidy-lions-repair.md @@ -0,0 +1,5 @@ +--- +'@livekit/rtc-node': patch +--- + +Fix AudioSource.waitForPlayout resolving immediately with audio still queued. The internal playout promise could be left resolved ("latched") by the drain timer firing during a gap between captures, or by clearQueue()/pause; a later waitForPlayout() then consumed the stale resolution and reported playout complete ~queueSizeMs early, clipping the tail of agent speech on every turn in downstream consumers. diff --git a/packages/livekit-rtc/src/audio_source.test.ts b/packages/livekit-rtc/src/audio_source.test.ts new file mode 100644 index 00000000..7d3bee7f --- /dev/null +++ b/packages/livekit-rtc/src/audio_source.test.ts @@ -0,0 +1,79 @@ +// SPDX-FileCopyrightText: 2026 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { describe, expect, it } from 'vitest'; +import { AudioFrame } from './audio_frame.js'; +import { AudioSource } from './audio_source.js'; + +const SAMPLE_RATE = 24000; +const FRAME_MS = 20; +const SAMPLES = (SAMPLE_RATE * FRAME_MS) / 1000; +const QUEUE_MS = 200; + +const makeFrame = () => new AudioFrame(new Int16Array(SAMPLES).fill(1000), SAMPLE_RATE, 1, SAMPLES); +const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); + +async function pushAudio(source: AudioSource, durationMs: number) { + for (let i = 0; i < durationMs / FRAME_MS; i++) { + await source.captureFrame(makeFrame()); + } +} + +describe('AudioSource', () => { + it('waitForPlayout waits for the queued audio to drain', async () => { + const source = new AudioSource(SAMPLE_RATE, 1, QUEUE_MS); + await pushAudio(source, 600); + const pushedAt = Date.now(); + await source.waitForPlayout(); + // ~QUEUE_MS of audio is still buffered when the last capture returns + expect(Date.now() - pushedAt).toBeGreaterThanOrEqual(QUEUE_MS - 50); + await source.close(); + }); + + it('waitForPlayout waits for drain after a capture gap fired the drain timer', async () => { + const source = new AudioSource(SAMPLE_RATE, 1, QUEUE_MS); + // a single frame followed by a gap longer than its duration: the internal + // drain timer fires and resolves the playout promise while the segment is + // still streaming + await source.captureFrame(makeFrame()); + await sleep(FRAME_MS * 3); + await pushAudio(source, 600); + const pushedAt = Date.now(); + await source.waitForPlayout(); + expect(Date.now() - pushedAt).toBeGreaterThanOrEqual(QUEUE_MS - 50); + await source.close(); + }); + + it('waitForPlayout waits for drain after a previous clearQueue', async () => { + const source = new AudioSource(SAMPLE_RATE, 1, QUEUE_MS); + // e.g. an interrupted turn: buffered audio is dropped, releasing the + // playout promise + await source.captureFrame(makeFrame()); + source.clearQueue(); + // the next turn must not consume the stale resolution + await pushAudio(source, 600); + const pushedAt = Date.now(); + await source.waitForPlayout(); + expect(Date.now() - pushedAt).toBeGreaterThanOrEqual(QUEUE_MS - 50); + await source.close(); + }); + + it('waitForPlayout resolves promptly when interrupted by clearQueue', async () => { + const source = new AudioSource(SAMPLE_RATE, 1, QUEUE_MS); + await pushAudio(source, 600); + const playout = source.waitForPlayout(); + source.clearQueue(); + const clearedAt = Date.now(); + await playout; + expect(Date.now() - clearedAt).toBeLessThan(50); + await source.close(); + }); + + it('close resolves a pending waitForPlayout', async () => { + const source = new AudioSource(SAMPLE_RATE, 1, QUEUE_MS); + await pushAudio(source, 600); + const playout = source.waitForPlayout(); + await source.close(); + await playout; + }); +}); diff --git a/packages/livekit-rtc/src/audio_source.ts b/packages/livekit-rtc/src/audio_source.ts index 7ffe9dbc..4b7963e3 100644 --- a/packages/livekit-rtc/src/audio_source.ts +++ b/packages/livekit-rtc/src/audio_source.ts @@ -29,6 +29,8 @@ export class AudioSource { currentQueueSize: number; /** @internal */ release = () => {}; + /** @internal */ + released = false; promise = this.newPromise(); /** @internal */ timeout?: ReturnType = undefined; @@ -90,18 +92,26 @@ export class AudioSource { /** @internal */ async newPromise() { + this.released = false; return new Promise((resolve) => { - this.release = resolve; + this.release = () => { + this.released = true; + resolve(); + }; }); } async waitForPlayout() { - return this.promise.then(() => { + const promise = this.promise; + await promise; + // Skip the reset if captureFrame re-armed the promise while we were waiting: + // the bookkeeping now belongs to audio captured after this playout completed. + if (this.promise === promise) { this.lastCapture = 0; this.currentQueueSize = 0; this.promise = this.newPromise(); this.timeout = undefined; - }); + } } async captureFrame(frame: AudioFrame) { @@ -124,6 +134,14 @@ export class AudioSource { clearTimeout(this.timeout); } + if (this.released) { + // The playout promise was already resolved — the drain timer fired during a + // gap between captures, or clearQueue() released it. Re-arm it so a later + // waitForPlayout() waits for this new audio instead of consuming the stale + // resolution and reporting playout complete while audio is still queued. + this.promise = this.newPromise(); + } + this.timeout = setTimeout(this.release, this.currentQueueSize); const req = new CaptureAudioFrameRequest({ From 427bb88f9eef33d7aba060b9b24c460d046b5f1d Mon Sep 17 00:00:00 2001 From: Chenghao Mou Date: Wed, 1 Jul 2026 22:19:35 +0100 Subject: [PATCH 2/2] refactor: mirror python-sdks _release_waiter structure Discard the waiter when released (timer, clearQueue, close) and lazily re-create it on the next captureFrame, matching python-sdks' fix for the same bug (livekit/python-sdks#270). Releasing also cancels the drain timer so an orphaned timer can't release a later segment's waiter. waitForPlayout() now resolves immediately when no audio is queued, matching Python semantics. Co-Authored-By: Claude Fable 5 --- .changeset/tidy-lions-repair.md | 2 +- packages/livekit-rtc/src/audio_source.test.ts | 13 ++++ packages/livekit-rtc/src/audio_source.ts | 70 ++++++++++--------- 3 files changed, 52 insertions(+), 33 deletions(-) diff --git a/.changeset/tidy-lions-repair.md b/.changeset/tidy-lions-repair.md index d42ad9a7..60af82fb 100644 --- a/.changeset/tidy-lions-repair.md +++ b/.changeset/tidy-lions-repair.md @@ -2,4 +2,4 @@ '@livekit/rtc-node': patch --- -Fix AudioSource.waitForPlayout resolving immediately with audio still queued. The internal playout promise could be left resolved ("latched") by the drain timer firing during a gap between captures, or by clearQueue()/pause; a later waitForPlayout() then consumed the stale resolution and reported playout complete ~queueSizeMs early, clipping the tail of agent speech on every turn in downstream consumers. +Fix AudioSource.waitForPlayout resolving immediately with audio still queued. The internal playout promise could be left resolved ("latched") by the drain timer firing during a gap between captures, or by clearQueue()/pause; a later waitForPlayout() then consumed the stale resolution and reported playout complete ~queueSizeMs early, clipping the tail of agent speech on every turn in downstream consumers. The waiter is now discarded when released and lazily re-created on the next captureFrame, mirroring python-sdks. As part of this, waitForPlayout() now resolves immediately when no audio is queued instead of blocking until the next release. diff --git a/packages/livekit-rtc/src/audio_source.test.ts b/packages/livekit-rtc/src/audio_source.test.ts index 7d3bee7f..33f9b19a 100644 --- a/packages/livekit-rtc/src/audio_source.test.ts +++ b/packages/livekit-rtc/src/audio_source.test.ts @@ -69,6 +69,19 @@ describe('AudioSource', () => { await source.close(); }); + it('waitForPlayout resolves immediately when no audio is queued', async () => { + const source = new AudioSource(SAMPLE_RATE, 1, QUEUE_MS); + // nothing ever captured + const before = Date.now(); + await source.waitForPlayout(); + // fully drained (drain timer fired and released the waiter) + await pushAudio(source, 100); + await sleep(QUEUE_MS + 100); + await source.waitForPlayout(); + expect(Date.now() - before).toBeLessThan(QUEUE_MS + 400); + await source.close(); + }); + it('close resolves a pending waitForPlayout', async () => { const source = new AudioSource(SAMPLE_RATE, 1, QUEUE_MS); await pushAudio(source, 600); diff --git a/packages/livekit-rtc/src/audio_source.ts b/packages/livekit-rtc/src/audio_source.ts index 4b7963e3..42d3d279 100644 --- a/packages/livekit-rtc/src/audio_source.ts +++ b/packages/livekit-rtc/src/audio_source.ts @@ -28,10 +28,9 @@ export class AudioSource { /** @internal */ currentQueueSize: number; /** @internal */ - release = () => {}; + promise?: Promise = undefined; /** @internal */ - released = false; - promise = this.newPromise(); + resolvePromise?: () => void = undefined; /** @internal */ timeout?: ReturnType = undefined; /** @internal */ @@ -86,32 +85,41 @@ export class AudioSource { }, }); - this.currentQueueSize = 0; - this.release(); + this.releaseWaiter(); } - /** @internal */ - async newPromise() { - this.released = false; - return new Promise((resolve) => { - this.release = () => { - this.released = true; - resolve(); - }; - }); - } + /** + * Resolve the pending waitForPlayout() promise (if any) and reset the queue + * bookkeeping. Mirrors python-sdks' AudioSource._release_waiter: the promise is + * discarded here and lazily re-created by the next captureFrame, so a later + * waitForPlayout() can never consume a stale resolution and report playout + * complete while audio is still queued. + * @internal + */ + releaseWaiter = () => { + if (!this.promise) { + return; + } - async waitForPlayout() { - const promise = this.promise; - await promise; - // Skip the reset if captureFrame re-armed the promise while we were waiting: - // the bookkeeping now belongs to audio captured after this playout completed. - if (this.promise === promise) { - this.lastCapture = 0; - this.currentQueueSize = 0; - this.promise = this.newPromise(); + this.resolvePromise?.(); + this.lastCapture = 0; + this.currentQueueSize = 0; + this.promise = undefined; + this.resolvePromise = undefined; + // cancel the drain timer (e.g. when released early by clearQueue), otherwise + // it would fire later and release the waiter of a subsequent segment + if (this.timeout) { + clearTimeout(this.timeout); this.timeout = undefined; } + }; + + async waitForPlayout() { + if (!this.promise) { + return; + } + + await this.promise; } async captureFrame(frame: AudioFrame) { @@ -134,15 +142,13 @@ export class AudioSource { clearTimeout(this.timeout); } - if (this.released) { - // The playout promise was already resolved — the drain timer fired during a - // gap between captures, or clearQueue() released it. Re-arm it so a later - // waitForPlayout() waits for this new audio instead of consuming the stale - // resolution and reporting playout complete while audio is still queued. - this.promise = this.newPromise(); + if (!this.promise) { + this.promise = new Promise((resolve) => { + this.resolvePromise = resolve; + }); } - this.timeout = setTimeout(this.release, this.currentQueueSize); + this.timeout = setTimeout(this.releaseWaiter, this.currentQueueSize); const req = new CaptureAudioFrameRequest({ sourceHandle: this.ffiHandle.handle, @@ -170,7 +176,7 @@ export class AudioSource { this.timeout = undefined; } // Resolve any pending waitForPlayout() promise so callers don't hang. - this.release(); + this.releaseWaiter(); this.ffiHandle.dispose(); this.closed = true; }