Skip to content

Commit 359cefa

Browse files
authored
AsyncQueue-based AudioMixer (#567)
* Audio Mixer ported from python * AsyncQueue-based AudioMixer based on the work of @toubatbrian * prettier + changeset * remove console logs and and add some comments * use deque * add test for async_queue, fix close, and pin deque * lint
1 parent 8701072 commit 359cefa

File tree

8 files changed

+921
-2
lines changed

8 files changed

+921
-2
lines changed

.changeset/plenty-garlics-watch.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@livekit/rtc-node': patch
3+
---
4+
5+
added AsyncQueue-based AudioMixer

packages/livekit-rtc/package.json

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,18 +47,19 @@
4747
},
4848
"dependencies": {
4949
"@bufbuild/protobuf": "^1.10.1",
50+
"@datastructures-js/deque": "1.0.8",
5051
"@livekit/mutex": "^1.0.0",
5152
"@livekit/typed-emitter": "^3.0.0",
5253
"pino": "^9.0.0",
5354
"pino-pretty": "^13.0.0"
5455
},
5556
"devDependencies": {
57+
"@bufbuild/protoc-gen-es": "^1.10.1",
5658
"@napi-rs/cli": "^2.18.0",
5759
"@types/node": "^22.13.10",
5860
"prettier": "^3.0.3",
5961
"tsup": "^8.3.5",
60-
"typescript": "5.8.2",
61-
"@bufbuild/protoc-gen-es": "^1.10.1"
62+
"typescript": "5.8.2"
6263
},
6364
"optionalDependencies": {
6465
"@livekit/rtc-node-darwin-arm64": "workspace:*",
Lines changed: 250 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,250 @@
1+
// SPDX-FileCopyrightText: 2025 LiveKit, Inc.
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
import { describe, expect, it } from 'vitest';
5+
import { AsyncQueue } from './async_queue.js';
6+
7+
describe('AsyncQueue', () => {
8+
it('allows basic put and get operations', async () => {
9+
const queue = new AsyncQueue<number>();
10+
11+
await queue.put(1);
12+
await queue.put(2);
13+
await queue.put(3);
14+
15+
expect(queue.get()).toBe(1);
16+
expect(queue.get()).toBe(2);
17+
expect(queue.get()).toBe(3);
18+
expect(queue.get()).toBe(undefined);
19+
});
20+
21+
it('respects capacity limits', async () => {
22+
const queue = new AsyncQueue<number>(2);
23+
24+
// Fill the queue to capacity
25+
await queue.put(1);
26+
await queue.put(2);
27+
28+
// Try to put a third item - this should block
29+
let putCompleted = false;
30+
const putPromise = queue.put(3).then(() => {
31+
putCompleted = true;
32+
});
33+
34+
// Wait a bit to ensure put() is blocked
35+
await new Promise((resolve) => setTimeout(resolve, 10));
36+
expect(putCompleted).toBe(false);
37+
38+
// Get an item to make space
39+
expect(queue.get()).toBe(1);
40+
41+
// Now the put should complete
42+
await putPromise;
43+
expect(putCompleted).toBe(true);
44+
45+
expect(queue.get()).toBe(2);
46+
expect(queue.get()).toBe(3);
47+
});
48+
49+
it('blocks consumers when queue is empty', async () => {
50+
const queue = new AsyncQueue<number>();
51+
52+
// Start waiting for an item
53+
let itemAvailable = false;
54+
const waitPromise = queue.waitForItem().then(() => {
55+
itemAvailable = true;
56+
});
57+
58+
// Wait a bit to ensure waitForItem() is blocked
59+
await new Promise((resolve) => setTimeout(resolve, 10));
60+
expect(itemAvailable).toBe(false);
61+
62+
// Put an item
63+
await queue.put(42);
64+
65+
// Now waitForItem should resolve
66+
await waitPromise;
67+
expect(itemAvailable).toBe(true);
68+
69+
expect(queue.get()).toBe(42);
70+
});
71+
72+
it('returns immediately from waitForItem if items exist', async () => {
73+
const queue = new AsyncQueue<number>();
74+
75+
await queue.put(1);
76+
await queue.put(2);
77+
78+
// Should return immediately since items are available
79+
await queue.waitForItem();
80+
expect(queue.get()).toBe(1);
81+
});
82+
83+
it('handles close correctly', async () => {
84+
const queue = new AsyncQueue<number>();
85+
86+
// Add some items
87+
await queue.put(1);
88+
await queue.put(2);
89+
90+
// Close the queue
91+
queue.close();
92+
93+
// Should be able to get existing items
94+
expect(queue.get()).toBe(1);
95+
expect(queue.get()).toBe(2);
96+
97+
// Trying to put should throw
98+
await expect(queue.put(3)).rejects.toThrow('Queue closed');
99+
100+
// waitForItem should return immediately when closed
101+
await queue.waitForItem();
102+
expect(queue.closed).toBe(true);
103+
});
104+
105+
it('wakes up waiting producers when closed', async () => {
106+
const queue = new AsyncQueue<number>(1);
107+
108+
// Fill the queue
109+
await queue.put(1);
110+
111+
// Try to put another item (will block)
112+
let putRejected = false;
113+
const putPromise = queue.put(2).catch(() => {
114+
putRejected = true;
115+
});
116+
117+
// Wait a bit
118+
await new Promise((resolve) => setTimeout(resolve, 10));
119+
120+
// Close the queue
121+
queue.close();
122+
123+
// The blocked put should reject
124+
await putPromise;
125+
expect(putRejected).toBe(true);
126+
});
127+
128+
it('wakes up waiting consumers when closed', async () => {
129+
const queue = new AsyncQueue<number>();
130+
131+
// Start waiting for an item
132+
const waitPromise = queue.waitForItem();
133+
134+
// Wait a bit
135+
await new Promise((resolve) => setTimeout(resolve, 10));
136+
137+
// Close the queue
138+
queue.close();
139+
140+
// waitForItem should resolve
141+
await waitPromise;
142+
expect(queue.closed).toBe(true);
143+
});
144+
145+
it('handles multiple waiting producers', async () => {
146+
const queue = new AsyncQueue<number>(1);
147+
148+
// Fill the queue
149+
await queue.put(1);
150+
151+
// Start multiple producers waiting
152+
const put2 = queue.put(2);
153+
const put3 = queue.put(3);
154+
155+
// Get items to allow producers to proceed
156+
expect(queue.get()).toBe(1);
157+
await put2;
158+
expect(queue.get()).toBe(2);
159+
await put3;
160+
expect(queue.get()).toBe(3);
161+
});
162+
163+
it('handles multiple waiting consumers', async () => {
164+
const queue = new AsyncQueue<number>();
165+
166+
// Start multiple consumers waiting
167+
const wait1 = queue.waitForItem();
168+
const wait2 = queue.waitForItem();
169+
170+
// Put items
171+
await queue.put(1);
172+
await queue.put(2);
173+
174+
// Both waits should resolve
175+
await Promise.all([wait1, wait2]);
176+
177+
expect(queue.length).toBe(2);
178+
});
179+
180+
it('reports length correctly', async () => {
181+
const queue = new AsyncQueue<number>();
182+
183+
expect(queue.length).toBe(0);
184+
185+
await queue.put(1);
186+
expect(queue.length).toBe(1);
187+
188+
await queue.put(2);
189+
expect(queue.length).toBe(2);
190+
191+
queue.get();
192+
expect(queue.length).toBe(1);
193+
194+
queue.get();
195+
expect(queue.length).toBe(0);
196+
});
197+
198+
it('handles unbounded queue (infinite capacity)', async () => {
199+
const queue = new AsyncQueue<number>(); // No capacity specified
200+
201+
// Should be able to add many items without blocking
202+
for (let i = 0; i < 1000; i++) {
203+
await queue.put(i);
204+
}
205+
206+
expect(queue.length).toBe(1000);
207+
208+
// Get them all back
209+
for (let i = 0; i < 1000; i++) {
210+
expect(queue.get()).toBe(i);
211+
}
212+
213+
expect(queue.length).toBe(0);
214+
});
215+
216+
it('handles concurrent put and get operations', async () => {
217+
const queue = new AsyncQueue<number>(5);
218+
219+
const consumed: number[] = [];
220+
221+
// Start concurrent producers
222+
const producers = Array.from({ length: 10 }, (_, i) =>
223+
(async () => {
224+
await queue.put(i);
225+
})(),
226+
);
227+
228+
// Start concurrent consumers - each consumer tries to get items until queue is empty
229+
const consumers = Array.from({ length: 10 }, () =>
230+
(async () => {
231+
while (true) {
232+
await queue.waitForItem();
233+
const item = queue.get();
234+
if (item !== undefined) {
235+
consumed.push(item);
236+
break; // Each consumer gets one item
237+
}
238+
// If item is undefined, another consumer got it first, try again
239+
}
240+
})(),
241+
);
242+
243+
// Wait for all to complete
244+
await Promise.all([...producers, ...consumers]);
245+
246+
// Should have consumed all items
247+
expect(consumed.length).toBe(10);
248+
expect(consumed.sort((a, b) => a - b)).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
249+
});
250+
});
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// SPDX-FileCopyrightText: 2025 LiveKit, Inc.
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
import { Deque } from '@datastructures-js/deque';
5+
6+
/**
7+
* AsyncQueue is a bounded queue with async support for both producers and consumers.
8+
*
9+
* This queue simplifies the AudioMixer implementation by handling backpressure and
10+
* synchronization automatically:
11+
* - Producers can await put() until the queue has space (when queue is full)
12+
* - Consumers can await waitForItem() until data is available (when queue is empty)
13+
*
14+
* This eliminates the need for manual coordination logic, polling loops, and
15+
* complex state management throughout the rest of the codebase.
16+
*/
17+
export class AsyncQueue<T> {
18+
private items: T[] = [];
19+
private waitingProducers = new Deque<{ resolve: () => void; reject: (err: Error) => void }>();
20+
private waitingConsumers = new Deque<() => void>();
21+
closed = false;
22+
23+
constructor(private capacity: number = Infinity) {}
24+
25+
async put(item: T) {
26+
if (this.closed) throw new Error('Queue closed');
27+
28+
while (this.items.length >= this.capacity) {
29+
await new Promise<void>((resolve, reject) =>
30+
this.waitingProducers.pushBack({ resolve, reject }),
31+
);
32+
// Re-check if closed after waking up
33+
if (this.closed) throw new Error('Queue closed');
34+
}
35+
36+
this.items.push(item);
37+
38+
// Wake up one waiting consumer
39+
if (this.waitingConsumers.size() > 0) {
40+
const resolve = this.waitingConsumers.popFront()!;
41+
resolve();
42+
}
43+
}
44+
45+
get(): T | undefined {
46+
const item = this.items.shift();
47+
if (this.waitingProducers.size() > 0) {
48+
const producer = this.waitingProducers.popFront()!;
49+
producer.resolve(); // wakes up one waiting producer
50+
}
51+
return item;
52+
}
53+
54+
/**
55+
* Wait until an item is available or the queue is closed.
56+
* Returns immediately if items are already available.
57+
*/
58+
async waitForItem(): Promise<void> {
59+
if (this.items.length > 0 || this.closed) {
60+
return;
61+
}
62+
await new Promise<void>((resolve) => this.waitingConsumers.pushBack(resolve));
63+
}
64+
65+
close() {
66+
this.closed = true;
67+
// Reject all waiting producers with an error
68+
this.waitingProducers
69+
.toArray()
70+
.forEach((producer) => producer.reject(new Error('Queue closed')));
71+
// Resolve all waiting consumers so they can see the queue is closed
72+
this.waitingConsumers.toArray().forEach((resolve) => resolve());
73+
this.waitingProducers.clear();
74+
this.waitingConsumers.clear();
75+
}
76+
77+
get length() {
78+
return this.items.length;
79+
}
80+
}

0 commit comments

Comments
 (0)