Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/hot-taxis-judge.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@livekit/agents': patch
---

Add utility to play local audio file to livekit
6 changes: 6 additions & 0 deletions REUSE.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,9 @@ SPDX-License-Identifier = "CC-BY-NC-SA-4.0"
path = ["**/.gitattributes", "**.wav", "**/__snapshots__/**"]
SPDX-FileCopyrightText = "2024 LiveKit, Inc."
SPDX-License-Identifier = "Apache-2.0"

# audio resources
[[annotations]]
path = ["agents/resources/*.ogg", "agents/resources/NOTICE"]
SPDX-FileCopyrightText = "2024 LiveKit, Inc."
SPDX-License-Identifier = "Apache-2.0"
7 changes: 5 additions & 2 deletions agents/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,14 @@
"api:update": "api-extractor run --local --typescript-compiler-folder ../node_modules/typescript --verbose"
},
"devDependencies": {
"@livekit/rtc-node": "^0.13.12",
"@ffmpeg-installer/ffmpeg": "^1.1.0",
"@livekit/rtc-node": "^0.13.13",
"@microsoft/api-extractor": "^7.35.0",
"@types/fluent-ffmpeg": "^2.1.28",
"@types/json-schema": "^7.0.15",
"@types/node": "^22.5.5",
"@types/ws": "^8.5.10",
"fluent-ffmpeg": "^2.1.3",
"tsup": "^8.4.0",
"typescript": "^5.0.0"
},
Expand All @@ -50,8 +53,8 @@
"commander": "^12.0.0",
"heap-js": "^2.6.0",
"json-schema": "^0.4.0",
"openai": "^4.91.1",
"livekit-server-sdk": "^2.13.3",
"openai": "^4.91.1",
"pidusage": "^4.0.1",
"pino": "^8.19.0",
"pino-pretty": "^11.0.0",
Expand Down
2 changes: 2 additions & 0 deletions agents/resources/NOTICE
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
keyboard-typing.ogg by Anton -- https://freesound.org/s/137/ -- License: Attribution 4.0
keyboard-typing2.ogg by Anton -- https://freesound.org/s/137/ -- License: Attribution 4.0
Binary file added agents/resources/keyboard-typing.ogg
Binary file not shown.
Binary file added agents/resources/keyboard-typing2.ogg
Binary file not shown.
Binary file added agents/resources/office-ambience.ogg
Binary file not shown.
6 changes: 6 additions & 0 deletions agents/src/codecs/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// SPDX-FileCopyrightText: 2025 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import { audioFramesFromFile, loopAudioFramesFromFile } from './utils.js';

export { audioFramesFromFile, loopAudioFramesFromFile };
137 changes: 137 additions & 0 deletions agents/src/codecs/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// SPDX-FileCopyrightText: 2025 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import ffmpegInstaller from '@ffmpeg-installer/ffmpeg';
import type { AudioFrame } from '@livekit/rtc-node';
import ffmpeg from 'fluent-ffmpeg';
import type { ReadableStream } from 'node:stream/web';
import { AudioByteStream } from '../audio.js';
import { log } from '../log.js';
import { createStreamChannel } from '../stream/stream_channel.js';

ffmpeg.setFfmpegPath(ffmpegInstaller.path);

/**
 * Options accepted by the audio file decoding helpers in this module.
 */
export interface AudioStreamDecoderOptions {
/** Output sample rate in Hz; `audioFramesFromFile` falls back to 48000 when omitted. */
sampleRate?: number;
/** Number of output channels; `audioFramesFromFile` falls back to 1 when omitted. */
numChannels?: number;
/**
* Audio format hint (e.g., 'mp3', 'ogg', 'wav', 'opus')
* If not provided, FFmpeg will auto-detect
*
* NOTE(review): `audioFramesFromFile` does not read this field, so the hint is
* currently ignored and the input format is always auto-detected.
*/
format?: string;
/**
 * When this signal is aborted, `audioFramesFromFile` closes its output stream and
 * kills the FFmpeg process; `loopAudioFramesFromFile` also stops replaying frames.
 */
abortSignal?: AbortSignal;
}

/**
 * Decode an audio file into a stream of AudioFrame instances using FFmpeg.
 *
 * FFmpeg converts the file to raw signed 16-bit little-endian PCM at the requested
 * sample rate and channel count. The PCM bytes are re-chunked into frames by
 * `AudioByteStream` and written to the returned stream.
 *
 * NOTE: `options.format` is not forwarded to FFmpeg by this function; the input
 * format is always auto-detected.
 *
 * @param filePath - Path to the audio file
 * @param options - Decoding options (`sampleRate` defaults to 48000, `numChannels` to 1)
 * @returns ReadableStream of AudioFrame objects; the underlying channel is closed when
 * decoding ends, FFmpeg fails, or `options.abortSignal` is aborted
 *
 * @example
 * ```typescript
 * for await (const frame of audioFramesFromFile('audio.ogg', { sampleRate: 48000 })) {
 *   console.log('Frame:', frame.samplesPerChannel, 'samples');
 * }
 * ```
 */
export function audioFramesFromFile(
  filePath: string,
  options: AudioStreamDecoderOptions = {},
): ReadableStream<AudioFrame> {
  const sampleRate = options.sampleRate ?? 48000;
  const numChannels = options.numChannels ?? 1;
  const abortSignal = options.abortSignal;

  const audioStream = new AudioByteStream(sampleRate, numChannels);
  const channel = createStreamChannel<AudioFrame>();
  const logger = log();

  // An 'abort' listener never fires for a signal that is already aborted, so handle
  // that case up front instead of spawning an FFmpeg process nobody can stop.
  if (abortSignal?.aborted) {
    logger.debug('Audio file playback aborted');
    channel.close();
    return channel.stream();
  }

  // TODO (Brian): decode WAV using a custom decoder instead of FFmpeg
  const command = ffmpeg(filePath)
    // Same set of low-latency input flags as in the Python implementation
    .inputOptions([
      '-probesize',
      '32',
      '-analyzeduration',
      '0',
      '-fflags',
      '+nobuffer+flush_packets',
      '-flags',
      'low_delay',
    ])
    .format('s16le') // signed 16-bit little-endian PCM to be consistent cross-platform
    .audioChannels(numChannels)
    .audioFrequency(sampleRate);

  let commandRunning = true;
  let channelClosed = false;

  // Close the output channel exactly once and detach from the abort signal so a
  // long-lived signal does not keep this decoder's closure alive.
  function closeChannel(): void {
    if (channelClosed) return;
    channelClosed = true;
    abortSignal?.removeEventListener('abort', onAbort);
    channel.close();
  }

  // Abort requested by the caller: stop producing frames and kill FFmpeg.
  function onAbort(): void {
    logger.debug('Audio file playback aborted');
    closeChannel();
    if (commandRunning) {
      commandRunning = false;
      command.kill('SIGKILL');
    }
  }

  // fluent-ffmpeg reports failures (including being killed) through the command's
  // 'error' event. An 'error' event without a listener is thrown by EventEmitter,
  // so a listener is required to keep a failed or aborted decode from crashing.
  command.on('error', (err: Error) => {
    commandRunning = false;
    // After an abort the error is the expected result of our own SIGKILL.
    if (!abortSignal?.aborted) {
      logger.error(err);
    }
    closeChannel();
  });

  const outputStream = command.pipe();
  abortSignal?.addEventListener('abort', onAbort, { once: true });

  outputStream.on('data', (chunk: Buffer) => {
    // Data may still arrive after an abort; never write to a closed channel.
    if (channelClosed) return;

    // Copy exactly this chunk's bytes: a Buffer may be a view into a larger pool.
    const arrayBuffer = chunk.buffer.slice(
      chunk.byteOffset,
      chunk.byteOffset + chunk.byteLength,
    ) as ArrayBuffer;

    const frames = audioStream.write(arrayBuffer);
    for (const frame of frames) {
      channel.write(frame);
    }
  });

  outputStream.on('end', () => {
    commandRunning = false;
    // 'end' can also follow an abort or error; the channel is already closed then.
    if (channelClosed) return;

    // Emit whatever partial frame is still buffered before closing.
    const frames = audioStream.flush();
    for (const frame of frames) {
      channel.write(frame);
    }
    closeChannel();
  });

  outputStream.on('error', (err: Error) => {
    logger.error(err);
    commandRunning = false;
    closeChannel();
  });

  return channel.stream();
}

/**
 * Loop audio frames from a file until the abort signal fires.
 *
 * The file is decoded once via `audioFramesFromFile`; every decoded frame is yielded
 * and kept in memory, then the cached frames are replayed over and over.
 *
 * If decoding produced no frames at all, the generator finishes instead of looping,
 * since replaying an empty list would spin forever without ever yielding.
 *
 * @param filePath - Path to the audio file
 * @param options - Decoding options; `abortSignal` stops the loop (checked before every
 * replayed frame). Without a signal the generator never finishes on its own.
 * @returns AsyncGenerator that yields AudioFrame objects in an infinite loop
 */
export async function* loopAudioFramesFromFile(
  filePath: string,
  options: AudioStreamDecoderOptions = {},
): AsyncGenerator<AudioFrame, void, unknown> {
  const frames: AudioFrame[] = [];
  const logger = log();
  const isAborted = (): boolean => options.abortSignal?.aborted ?? false;

  // First pass: stream frames as they are decoded while caching them for replay.
  for await (const frame of audioFramesFromFile(filePath, options)) {
    frames.push(frame);
    yield frame;
  }

  // Replay passes: skipped entirely when nothing was decoded, which avoids a busy loop.
  while (frames.length > 0 && !isAborted()) {
    for (const frame of frames) {
      // Stop mid-pass so an abort takes effect on the very next frame.
      if (isAborted()) break;
      yield frame;
    }
  }

  logger.debug('Audio file playback loop finished');
}
3 changes: 2 additions & 1 deletion agents/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* @packageDocumentation
*/
import * as cli from './cli.js';
import * as codecs from './codecs/utils.js';
import * as inference from './inference/index.js';
import * as ipc from './ipc/index.js';
import * as llm from './llm/index.js';
Expand All @@ -34,4 +35,4 @@ export * from './vad.js';
export * from './version.js';
export * from './worker.js';

export { cli, inference, ipc, llm, metrics, stream, stt, tokenize, tts, voice };
export { cli, codecs, inference, ipc, llm, metrics, stream, stt, tokenize, tts, voice };
37 changes: 37 additions & 0 deletions agents/src/stream/stream_channel.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,4 +126,41 @@ describe('StreamChannel', () => {
const nextResult = await reader.read();
expect(nextResult.done).toBe(true);
});

it('should gracefully handle close while read is pending', async () => {
  const stringChannel = createStreamChannel<string>();
  const streamReader = stringChannel.stream().getReader();

  // Issue the read before anything is written so it is still pending at close time.
  const pendingRead = streamReader.read();

  await stringChannel.close();

  // The pending read must settle as "done" with no value rather than hang or reject.
  const { done, value } = await pendingRead;
  expect(done).toBe(true);
  expect(value).toBeUndefined();
});
Comment on lines +130 to +141
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add tests to make sure buffered read after close still reads correct values


it('should complete all pending reads when closed', async () => {
  const numberChannel = createStreamChannel<number>();
  const streamReader = numberChannel.stream().getReader();

  // Queue three reads up front; only two values will ever be written.
  const firstRead = streamReader.read();
  const secondRead = streamReader.read();
  const thirdRead = streamReader.read();

  await numberChannel.write(42);
  await numberChannel.write(43);
  await numberChannel.close();

  // The first two reads receive the written values in order.
  const first = await firstRead;
  expect(first.done).toBe(false);
  expect(first.value).toBe(42);

  const second = await secondRead;
  expect(second.done).toBe(false);
  expect(second.value).toBe(43);

  // The remaining read is completed by the close.
  const third = await thirdRead;
  expect(third.done).toBe(true);
});
});
60 changes: 60 additions & 0 deletions examples/src/play_local_audio_file.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// SPDX-FileCopyrightText: 2025 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import { type JobContext, WorkerOptions, cli, codecs, defineAgent, log } from '@livekit/agents';
import { AudioSource, LocalAudioTrack, TrackPublishOptions, TrackSource } from '@livekit/rtc-node';
import { dirname, join } from 'node:path';
import { fileURLToPath } from 'node:url';

/**
 * Example of directly playing an audio file on the server side and sending it to
 * LiveKit: the agent publishes an audio track, then loops a bundled audio file into
 * it until the job shuts down.
 */
export default defineAgent({
  entry: async (ctx: JobContext) => {
    const logger = log();

    await ctx.connect();

    logger.info('Playing audio file to LiveKit track...');

    // Fail with a clear message instead of relying on a non-null assertion.
    const localParticipant = ctx.room.localParticipant;
    if (!localParticipant) {
      throw new Error('Local participant is not available after connecting');
    }

    const audioSource = new AudioSource(48000, 1);

    const track = LocalAudioTrack.createAudioTrack('background_audio', audioSource);

    const publication = await localParticipant.publishTrack(
      track,
      new TrackPublishOptions({
        source: TrackSource.SOURCE_MICROPHONE,
      }),
    );

    await publication.waitForSubscription();

    logger.info(`Audio track published: ${publication.sid}`);

    // Resolve the bundled resource relative to this file rather than the working directory.
    const currentDir = dirname(fileURLToPath(import.meta.url));
    const resourcesPath = join(currentDir, '../../agents/resources');
    const audioFile = join(resourcesPath, 'office-ambience.ogg');

    logger.info(`Playing: ${audioFile}`);

    const abortController = new AbortController();

    // Stop the playback loop when the job shuts down.
    ctx.addShutdownCallback(async () => {
      abortController.abort();
    });

    let frameCount = 0;
    // Sample rate and channel count must match the AudioSource created above.
    for await (const frame of codecs.loopAudioFramesFromFile(audioFile, {
      sampleRate: 48000,
      numChannels: 1,
      abortSignal: abortController.signal,
    })) {
      await audioSource.captureFrame(frame);
      frameCount++;

      if (frameCount % 100 === 0) {
        // NOTE(review): the elapsed-time estimate assumes each frame is 100 ms long;
        // confirm against the frame size produced by the decoder.
        logger.info(`Played ${frameCount} frames (${(frameCount * 0.1).toFixed(1)}s)`);
      }
    }
  },
});

cli.runApp(new WorkerOptions({ agent: fileURLToPath(import.meta.url) }));
Loading