diff --git a/.changeset/tidy-lions-repair.md b/.changeset/tidy-lions-repair.md new file mode 100644 index 00000000..60af82fb --- /dev/null +++ b/.changeset/tidy-lions-repair.md @@ -0,0 +1,5 @@ +--- +'@livekit/rtc-node': patch +--- + +Fix AudioSource.waitForPlayout resolving immediately with audio still queued. The internal playout promise could be left resolved ("latched") by the drain timer firing during a gap between captures, or by clearQueue()/pause; a later waitForPlayout() then consumed the stale resolution and reported playout complete ~queueSizeMs early, clipping the tail of agent speech on every turn in downstream consumers. The waiter is now discarded when released and lazily re-created on the next captureFrame, mirroring python-sdks. As part of this, waitForPlayout() now resolves immediately when no audio is queued instead of blocking until the next release. diff --git a/packages/livekit-rtc/src/audio_source.test.ts b/packages/livekit-rtc/src/audio_source.test.ts new file mode 100644 index 00000000..33f9b19a --- /dev/null +++ b/packages/livekit-rtc/src/audio_source.test.ts @@ -0,0 +1,92 @@ +// SPDX-FileCopyrightText: 2026 LiveKit, Inc. 
+// +// SPDX-License-Identifier: Apache-2.0 +import { describe, expect, it } from 'vitest'; +import { AudioFrame } from './audio_frame.js'; +import { AudioSource } from './audio_source.js'; + +const SAMPLE_RATE = 24000; +const FRAME_MS = 20; +const SAMPLES = (SAMPLE_RATE * FRAME_MS) / 1000; +const QUEUE_MS = 200; + +const makeFrame = () => new AudioFrame(new Int16Array(SAMPLES).fill(1000), SAMPLE_RATE, 1, SAMPLES); +const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); + +async function pushAudio(source: AudioSource, durationMs: number) { + for (let i = 0; i < durationMs / FRAME_MS; i++) { + await source.captureFrame(makeFrame()); + } +} + +describe('AudioSource', () => { + it('waitForPlayout waits for the queued audio to drain', async () => { + const source = new AudioSource(SAMPLE_RATE, 1, QUEUE_MS); + await pushAudio(source, 600); + const pushedAt = Date.now(); + await source.waitForPlayout(); + // ~QUEUE_MS of audio is still buffered when the last capture returns + expect(Date.now() - pushedAt).toBeGreaterThanOrEqual(QUEUE_MS - 50); + await source.close(); + }); + + it('waitForPlayout waits for drain after a capture gap fired the drain timer', async () => { + const source = new AudioSource(SAMPLE_RATE, 1, QUEUE_MS); + // a single frame followed by a gap longer than its duration: the internal + // drain timer fires and resolves the playout promise while the segment is + // still streaming + await source.captureFrame(makeFrame()); + await sleep(FRAME_MS * 3); + await pushAudio(source, 600); + const pushedAt = Date.now(); + await source.waitForPlayout(); + expect(Date.now() - pushedAt).toBeGreaterThanOrEqual(QUEUE_MS - 50); + await source.close(); + }); + + it('waitForPlayout waits for drain after a previous clearQueue', async () => { + const source = new AudioSource(SAMPLE_RATE, 1, QUEUE_MS); + // e.g. 
an interrupted turn: buffered audio is dropped, releasing the + // playout promise + await source.captureFrame(makeFrame()); + source.clearQueue(); + // the next turn must not consume the stale resolution + await pushAudio(source, 600); + const pushedAt = Date.now(); + await source.waitForPlayout(); + expect(Date.now() - pushedAt).toBeGreaterThanOrEqual(QUEUE_MS - 50); + await source.close(); + }); + + it('waitForPlayout resolves promptly when interrupted by clearQueue', async () => { + const source = new AudioSource(SAMPLE_RATE, 1, QUEUE_MS); + await pushAudio(source, 600); + const playout = source.waitForPlayout(); + source.clearQueue(); + const clearedAt = Date.now(); + await playout; + expect(Date.now() - clearedAt).toBeLessThan(50); + await source.close(); + }); + + it('waitForPlayout resolves immediately when no audio is queued', async () => { + const source = new AudioSource(SAMPLE_RATE, 1, QUEUE_MS); + // nothing ever captured + const before = Date.now(); + await source.waitForPlayout(); + // fully drained (drain timer fired and released the waiter) + await pushAudio(source, 100); + await sleep(QUEUE_MS + 100); + await source.waitForPlayout(); + expect(Date.now() - before).toBeLessThan(QUEUE_MS + 400); + await source.close(); + }); + + it('close resolves a pending waitForPlayout', async () => { + const source = new AudioSource(SAMPLE_RATE, 1, QUEUE_MS); + await pushAudio(source, 600); + const playout = source.waitForPlayout(); + await source.close(); + await playout; + }); +}); diff --git a/packages/livekit-rtc/src/audio_source.ts b/packages/livekit-rtc/src/audio_source.ts index 7ffe9dbc..42d3d279 100644 --- a/packages/livekit-rtc/src/audio_source.ts +++ b/packages/livekit-rtc/src/audio_source.ts @@ -28,8 +28,9 @@ export class AudioSource { /** @internal */ currentQueueSize: number; /** @internal */ - release = () => {}; - promise = this.newPromise(); + promise?: Promise<void> = undefined; + /** @internal */ + resolvePromise?: () => void = undefined; /**
@internal */ timeout?: ReturnType<typeof setTimeout> = undefined; /** @internal */ @@ -84,24 +85,41 @@ export class AudioSource { }, }); - this.currentQueueSize = 0; - this.release(); + this.releaseWaiter(); } - /** @internal */ - async newPromise() { - return new Promise((resolve) => { - this.release = resolve; - }); - } + /** + * Resolve the pending waitForPlayout() promise (if any) and reset the queue + * bookkeeping. Mirrors python-sdks' AudioSource._release_waiter: the promise is + * discarded here and lazily re-created by the next captureFrame, so a later + * waitForPlayout() can never consume a stale resolution and report playout + * complete while audio is still queued. + * @internal + */ + releaseWaiter = () => { + if (!this.promise) { + return; + } - async waitForPlayout() { - return this.promise.then(() => { - this.lastCapture = 0; - this.currentQueueSize = 0; - this.promise = this.newPromise(); + this.resolvePromise?.(); + this.lastCapture = 0; + this.currentQueueSize = 0; + this.promise = undefined; + this.resolvePromise = undefined; + // cancel the drain timer (e.g. when released early by clearQueue), otherwise + // it would fire later and release the waiter of a subsequent segment + if (this.timeout) { + clearTimeout(this.timeout); this.timeout = undefined; - }); + } + }; + + async waitForPlayout() { + if (!this.promise) { + return; + } + + await this.promise; } async captureFrame(frame: AudioFrame) { @@ -124,7 +142,13 @@ export class AudioSource { clearTimeout(this.timeout); } - this.timeout = setTimeout(this.release, this.currentQueueSize); + if (!this.promise) { + this.promise = new Promise((resolve) => { + this.resolvePromise = resolve; + }); + } + + this.timeout = setTimeout(this.releaseWaiter, this.currentQueueSize); const req = new CaptureAudioFrameRequest({ sourceHandle: this.ffiHandle.handle, @@ -152,7 +176,7 @@ export class AudioSource { this.timeout = undefined; } // Resolve any pending waitForPlayout() promise so callers don't hang.
- this.release(); + this.releaseWaiter(); this.ffiHandle.dispose(); this.closed = true; }