Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/tidy-lions-repair.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@livekit/rtc-node': patch
---

Fix AudioSource.waitForPlayout resolving immediately with audio still queued. The internal playout promise could be left resolved ("latched") by the drain timer firing during a gap between captures, or by clearQueue()/pause; a later waitForPlayout() then consumed the stale resolution and reported playout complete ~queueSizeMs early, clipping the tail of agent speech on every turn in downstream consumers. The waiter is now discarded when released and lazily re-created on the next captureFrame, mirroring python-sdks. As part of this, waitForPlayout() now resolves immediately when no audio is queued instead of blocking until the next release.
92 changes: 92 additions & 0 deletions packages/livekit-rtc/src/audio_source.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// SPDX-FileCopyrightText: 2026 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import { describe, expect, it } from 'vitest';
import { AudioFrame } from './audio_frame.js';
import { AudioSource } from './audio_source.js';

const SAMPLE_RATE = 24000;
const FRAME_MS = 20;
const SAMPLES = (SAMPLE_RATE * FRAME_MS) / 1000;
const QUEUE_MS = 200;

const makeFrame = () => new AudioFrame(new Int16Array(SAMPLES).fill(1000), SAMPLE_RATE, 1, SAMPLES);
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

async function pushAudio(source: AudioSource, durationMs: number) {
for (let i = 0; i < durationMs / FRAME_MS; i++) {
await source.captureFrame(makeFrame());
}
}

describe('AudioSource', () => {
it('waitForPlayout waits for the queued audio to drain', async () => {
const source = new AudioSource(SAMPLE_RATE, 1, QUEUE_MS);
await pushAudio(source, 600);
const pushedAt = Date.now();
await source.waitForPlayout();
// ~QUEUE_MS of audio is still buffered when the last capture returns
expect(Date.now() - pushedAt).toBeGreaterThanOrEqual(QUEUE_MS - 50);
await source.close();
});

it('waitForPlayout waits for drain after a capture gap fired the drain timer', async () => {
const source = new AudioSource(SAMPLE_RATE, 1, QUEUE_MS);
// a single frame followed by a gap longer than its duration: the internal
// drain timer fires and resolves the playout promise while the segment is
// still streaming
await source.captureFrame(makeFrame());
await sleep(FRAME_MS * 3);
await pushAudio(source, 600);
const pushedAt = Date.now();
await source.waitForPlayout();
expect(Date.now() - pushedAt).toBeGreaterThanOrEqual(QUEUE_MS - 50);
await source.close();
});

it('waitForPlayout waits for drain after a previous clearQueue', async () => {
const source = new AudioSource(SAMPLE_RATE, 1, QUEUE_MS);
// e.g. an interrupted turn: buffered audio is dropped, releasing the
// playout promise
await source.captureFrame(makeFrame());
source.clearQueue();
// the next turn must not consume the stale resolution
await pushAudio(source, 600);
const pushedAt = Date.now();
await source.waitForPlayout();
expect(Date.now() - pushedAt).toBeGreaterThanOrEqual(QUEUE_MS - 50);
await source.close();
});

it('waitForPlayout resolves promptly when interrupted by clearQueue', async () => {
const source = new AudioSource(SAMPLE_RATE, 1, QUEUE_MS);
await pushAudio(source, 600);
const playout = source.waitForPlayout();
source.clearQueue();
const clearedAt = Date.now();
await playout;
expect(Date.now() - clearedAt).toBeLessThan(50);
await source.close();
});

it('waitForPlayout resolves immediately when no audio is queued', async () => {
const source = new AudioSource(SAMPLE_RATE, 1, QUEUE_MS);
// nothing ever captured
const before = Date.now();
await source.waitForPlayout();
// fully drained (drain timer fired and released the waiter)
await pushAudio(source, 100);
await sleep(QUEUE_MS + 100);
await source.waitForPlayout();
expect(Date.now() - before).toBeLessThan(QUEUE_MS + 400);
await source.close();
});

it('close resolves a pending waitForPlayout', async () => {
const source = new AudioSource(SAMPLE_RATE, 1, QUEUE_MS);
await pushAudio(source, 600);
const playout = source.waitForPlayout();
await source.close();
await playout;
});
});
60 changes: 42 additions & 18 deletions packages/livekit-rtc/src/audio_source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ export class AudioSource {
/** @internal */
currentQueueSize: number;
/** @internal */
release = () => {};
promise = this.newPromise();
promise?: Promise<void> = undefined;
/** @internal */
resolvePromise?: () => void = undefined;
/** @internal */
timeout?: ReturnType<typeof setTimeout> = undefined;
/** @internal */
Expand Down Expand Up @@ -84,24 +85,41 @@ export class AudioSource {
},
});

this.currentQueueSize = 0;
this.release();
this.releaseWaiter();
}

/** @internal */
async newPromise() {
return new Promise<void>((resolve) => {
this.release = resolve;
});
}
/**
* Resolve the pending waitForPlayout() promise (if any) and reset the queue
* bookkeeping. Mirrors python-sdks' AudioSource._release_waiter: the promise is
* discarded here and lazily re-created by the next captureFrame, so a later
* waitForPlayout() can never consume a stale resolution and report playout
* complete while audio is still queued.
* @internal
*/
releaseWaiter = () => {
if (!this.promise) {
return;
}

async waitForPlayout() {
return this.promise.then(() => {
this.lastCapture = 0;
this.currentQueueSize = 0;
this.promise = this.newPromise();
this.resolvePromise?.();
this.lastCapture = 0;
this.currentQueueSize = 0;
this.promise = undefined;
this.resolvePromise = undefined;
// cancel the drain timer (e.g. when released early by clearQueue), otherwise
// it would fire later and release the waiter of a subsequent segment
if (this.timeout) {
clearTimeout(this.timeout);
this.timeout = undefined;
});
}
};

async waitForPlayout() {
if (!this.promise) {
return;
}

await this.promise;
}

async captureFrame(frame: AudioFrame) {
Expand All @@ -124,7 +142,13 @@ export class AudioSource {
clearTimeout(this.timeout);
}

this.timeout = setTimeout(this.release, this.currentQueueSize);
if (!this.promise) {
this.promise = new Promise<void>((resolve) => {
this.resolvePromise = resolve;
});
}

this.timeout = setTimeout(this.releaseWaiter, this.currentQueueSize);

const req = new CaptureAudioFrameRequest({
sourceHandle: this.ffiHandle.handle,
Expand Down Expand Up @@ -152,7 +176,7 @@ export class AudioSource {
this.timeout = undefined;
}
// Resolve any pending waitForPlayout() promise so callers don't hang.
this.release();
this.releaseWaiter();
this.ffiHandle.dispose();
this.closed = true;
}
Expand Down
Loading