Skip to content
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 121 additions & 6 deletions src/state/CallViewModel/localMember/Publisher.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,19 @@
type Mock,
vi,
} from "vitest";
import { ConnectionState as LivekitConenctionState } from "livekit-client";
import { type BehaviorSubject } from "rxjs";
import {
ConnectionState as LivekitConenctionState,
LocalParticipant,
type LocalTrack,
type LocalTrackPublication,
} from "livekit-client";
import { BehaviorSubject } from "rxjs";
import { logger } from "matrix-js-sdk/lib/logger";

import { ObservableScope } from "../../ObservableScope";
import { constant } from "../../Behavior";
import {
flushPromises,
mockLivekitRoom,
mockLocalParticipant,
mockMediaDevices,
Expand All @@ -33,8 +39,15 @@
import { type MuteStates } from "../../MuteStates";
import { FailToStartLivekitConnection } from "../../../utils/errors";

let scope: ObservableScope;

beforeEach(() => {
scope = new ObservableScope();
});

afterEach(() => scope.end());
Comment on lines +42 to +48
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You might also find the testScope utility to be convenient


describe("Publisher", () => {
let scope: ObservableScope;
let connection: Connection;
let muteStates: MuteStates;
beforeEach(() => {
Expand All @@ -50,7 +63,6 @@
setHandler: vi.fn(),
},
} as unknown as MuteStates;
scope = new ObservableScope();
connection = {
state$: constant({
state: "ConnectedToLkRoom",
Expand All @@ -62,8 +74,6 @@
} as unknown as Connection;
});

afterEach(() => scope.end());

it("throws if livekit room could not publish", async () => {
const publisher = new Publisher(
scope,
Expand Down Expand Up @@ -100,7 +110,7 @@
connection.livekitRoom.localParticipant.publishTrack as Mock
).mockRejectedValue(Error("testError"));

await expect(publisher.startPublishing()).rejects.toThrow(

Check failure on line 113 in src/state/CallViewModel/localMember/Publisher.test.ts

View workflow job for this annotation

GitHub Actions / Run unit tests

src/state/CallViewModel/localMember/Publisher.test.ts > Publisher > throws if livekit room could not publish

AssertionError: expected a thrown error to be Error: Failed to start Livekit connection { …(4) } - Expected + Received - FailToStartLivekitConnection { - "message": "Failed to start Livekit connection", - "localisedTitle": "Failed to start Livekit connection", - "localisedMessage": "testError", - "category": "NETWORK_CONNECTIVITY", - "code": "FAILED_TO_START_LIVEKIT", + TypeError { + "message": "track.mute is not a function", } ❯ src/state/CallViewModel/localMember/Publisher.test.ts:113:5
new FailToStartLivekitConnection("testError"),
);

Expand Down Expand Up @@ -138,3 +148,108 @@
).toHaveBeenCalledTimes(3);
});
});

describe("Bug fix", () => {
// There is a race condition when creating and publishing tracks while the mute state changes.
// This race condition could cause tracks to be published even though they are muted at the
// beginning of a call coming from lobby.
// This is caused by our stack using manually the low level API to create and publish tracks,
// but also using the higher level setMicrophoneEnabled and setCameraEnabled functions that also create
// and publish tracks, and managing pending publications.
// Race is as follow, on creation of the Publisher we create the tracks then publish them.
// If in the middle of that process the mute state changes:
// - the `setMicrophoneEnabled` will be no-op because it is not aware of our created track and can't see any pending publication
// - If start publication is requested it will publish the track even though there was a mute request.
it("wrongly publish tracks while muted", async () => {
const audioEnabled$ = new BehaviorSubject(true);
const muteStates = {
audio: {
enabled$: audioEnabled$,
unsetHandler: vi.fn(),
setHandler: vi.fn(),
},
video: {
enabled$: constant(false),
unsetHandler: vi.fn(),
setHandler: vi.fn(),
},
} as unknown as MuteStates;

const mockSendDataPacket = vi.fn();
const mockEngine = {
client: {
sendUpdateLocalMetadata: vi.fn(),
},
on: vi.fn().mockReturnThis(),
sendDataPacket: mockSendDataPacket,
};

// cont mockRoomOptions = {} as InternalRoomOptions;

const localParticipant = new LocalParticipant(
"local-sid",
"local-identity",
// @ts-expect-error - for that test we want a real LocalParticipant to have the pending publications logic
mockEngine,
{},
new Map(),
{},
);

const connection = {
state$: constant({
state: "ConnectedToLkRoom",
livekitConnectionState$: constant(LivekitConenctionState.Connected),
}),
livekitRoom: mockLivekitRoom({
localParticipant,
}),
} as unknown as Connection;

const mediaDevices = mockMediaDevices({});

const mockTrack = vi.mocked<LocalTrack>({
kind: "audio",
mute: vi.fn(),
} as Partial<LocalTrack> as LocalTrack);
const createTrackLock = Promise.withResolvers<void>();
const createTrackSpy = vi.spyOn(localParticipant, "createTracks");
createTrackSpy.mockImplementation(async () => {
await createTrackLock.promise;
return [mockTrack];
});

const publishTrackSpy = vi.spyOn(localParticipant, "publishTrack");
publishTrackSpy.mockResolvedValue({} as unknown as LocalTrackPublication);

const publisher = new Publisher(
scope,
connection,
mediaDevices,
muteStates,
constant({ supported: false, processor: undefined }),
logger,
);

// Initially the audio is unmuted, so creating tracks should publish the audio track
const createTracks = publisher.createAndSetupTracks();
publisher.tracks$.subscribe(() => {
void publisher.startPublishing();
});
// now mute the audio before allowing track creation to complete
audioEnabled$.next(false);
// const publishing = publisher.startPublishing();
createTrackLock.resolve();
await createTracks;
// await publishing;

await flushPromises();

// It should not publish or instead call track.mute()
try {
expect(publishTrackSpy).not.toHaveBeenCalled();
} catch {
expect(mockTrack.mute).toHaveBeenCalled();
}
});
});
35 changes: 26 additions & 9 deletions src/state/CallViewModel/localMember/Publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ export class Publisher {
this.logger.error("Failed to create tracks", error);
});
}
// TODO why throw here? should we just do nothing?
throw Error("audio and video is false");
}

Expand Down Expand Up @@ -184,16 +185,32 @@ export class Publisher {

for (const track of this.tracks$.value) {
this.logger.info("publish ", this.tracks$.value.length, "tracks");
// TODO: handle errors? Needs the signaling connection to be up, but it has some retries internally
// with a timeout.
await lkRoom.localParticipant.publishTrack(track).catch((error) => {
this.logger.error("Failed to publish track", error);
throw new FailToStartLivekitConnection(
error instanceof Error ? error.message : error,
);
});
this.logger.info("published track ", track.kind, track.id);

// XXXX: Patch: Check if the track has been muted manually before publishing
// This is only a patch, the proper way would be to use livekit high-level enabled/disabled APIs
// or only use the low level create/publish APIs and have our own pending publication protection.
// Maybe we could change the livekit api to pre-load tracks without publishing them yet?
// Are we sure this is needed at all? What are the gains?
const isEnabled =
track.kind === Track.Kind.Audio
? this.muteStates.audio.enabled$.value
: this.muteStates.video.enabled$.value;

if (!isEnabled) {
// TODO should we also drop it?
// I believe the high-level LiveKit APIs will recreate a track?
await track.mute();
} else {
// TODO: handle errors? Needs the signaling connection to be up, but it has some retries internally
// with a timeout.
await lkRoom.localParticipant.publishTrack(track).catch((error) => {
this.logger.error("Failed to publish track", error);
throw new FailToStartLivekitConnection(
error instanceof Error ? error.message : error,
);
});
this.logger.info("published track ", track.kind, track.id);
}
// TODO: check if the connection is still active? and break the loop if not?
}
this._publishing$.next(true);
Expand Down
Loading