diff --git a/.changeset/sour-lemons-relate.md b/.changeset/sour-lemons-relate.md new file mode 100644 index 00000000..8d73ef76 --- /dev/null +++ b/.changeset/sour-lemons-relate.md @@ -0,0 +1,5 @@ +--- +"@livekit/rtc-node": patch +--- + +Move RPC handler functions to room diff --git a/examples/rpc/index.ts b/examples/rpc/index.ts index c70652b8..2939d18f 100644 --- a/examples/rpc/index.ts +++ b/examples/rpc/index.ts @@ -61,7 +61,7 @@ async function main() { } const registerReceiverMethods = (greetersRoom: Room, mathGeniusRoom: Room) => { - greetersRoom.localParticipant?.registerRpcMethod( + greetersRoom.registerRpcMethod( 'arrival', async (data: RpcInvocationData) => { console.log(`[Greeter] Oh ${data.callerIdentity} arrived and said "${data.payload}"`); @@ -70,7 +70,7 @@ const registerReceiverMethods = (greetersRoom: Room, mathGeniusRoom: Room) => { }, ); - mathGeniusRoom.localParticipant?.registerRpcMethod( + mathGeniusRoom.registerRpcMethod( 'square-root', async (data: RpcInvocationData) => { const jsonData = JSON.parse(data.payload); @@ -88,7 +88,7 @@ const registerReceiverMethods = (greetersRoom: Room, mathGeniusRoom: Room) => { }, ); - mathGeniusRoom.localParticipant?.registerRpcMethod( + mathGeniusRoom.registerRpcMethod( 'divide', async (data: RpcInvocationData) => { const jsonData = JSON.parse(data.payload); diff --git a/packages/livekit-rtc/rust-sdks b/packages/livekit-rtc/rust-sdks index cbf07356..1f3d9a3e 160000 --- a/packages/livekit-rtc/rust-sdks +++ b/packages/livekit-rtc/rust-sdks @@ -1 +1 @@ -Subproject commit cbf07356fb987db03259f0549c0869679a0e41e8 +Subproject commit 1f3d9a3e5b88daabc3c0a48ebd0fbac8d18578b6 diff --git a/packages/livekit-rtc/src/participant.ts b/packages/livekit-rtc/src/participant.ts index 76a5b09a..55781b2d 100644 --- a/packages/livekit-rtc/src/participant.ts +++ b/packages/livekit-rtc/src/participant.ts @@ -89,6 +89,7 @@ import { LocalTrackPublication } from './track_publication.js'; import type { Transcription } from './transcription.js'; import type { ChatMessage } from './types.js'; import { numberToBigInt, splitUtf8 } from './utils.js'; +import type { Room } from './room.js'; const STREAM_CHUNK_SIZE = 15_000; @@ -155,10 +156,15 @@ export type DataPublishOptions = { }; export class LocalParticipant extends Participant { - private rpcHandlers: Map Promise> = new Map(); + private room?: Room; trackPublications: Map = new Map(); + constructor(owned_info: OwnedParticipant, room: Room) { + super(owned_info); + this.room = room; + } + async publishData(data: Uint8Array, options: DataPublishOptions) { const req = new PublishDataRequest({ localParticipantHandle: this.ffi_handle.handle, @@ -739,6 +745,8 @@ export class LocalParticipant extends Participant { } /** + * @deprecated Use `room.registerRpcMethod` instead + * * Establishes the participant as a receiver for calls of the specified RPC method. * Will overwrite any existing callback for the same method. * @@ -767,82 +775,18 @@ export class LocalParticipant extends Participant { * Other errors thrown in your handler will not be transmitted as-is, and will instead arrive to the caller as `1500` ("Application Error"). */ registerRpcMethod(method: string, handler: (data: RpcInvocationData) => Promise) { - this.rpcHandlers.set(method, handler); - - const req = new RegisterRpcMethodRequest({ - localParticipantHandle: this.ffi_handle.handle, - method, - }); - - FfiClient.instance.request({ - message: { case: 'registerRpcMethod', value: req }, - }); + this.room?.registerRpcMethod(method, handler); } /** + * @deprecated Use `room.unregisterRpcMethod` instead + * * Unregisters a previously registered RPC method. * * @param method - The name of the RPC method to unregister */ unregisterRpcMethod(method: string) { - this.rpcHandlers.delete(method); - - const req = new UnregisterRpcMethodRequest({ - localParticipantHandle: this.ffi_handle.handle, - method, - }); - - FfiClient.instance.request({ - message: { case: 'unregisterRpcMethod', value: req }, - }); - } - - /** @internal */ - async handleRpcMethodInvocation( - invocationId: bigint, - method: string, - requestId: string, - callerIdentity: string, - payload: string, - responseTimeout: number, - ) { - let responseError: RpcError | null = null; - let responsePayload: string | null = null; - - const handler = this.rpcHandlers.get(method); - - if (!handler) { - responseError = RpcError.builtIn('UNSUPPORTED_METHOD'); - } else { - try { - responsePayload = await handler({ requestId, callerIdentity, payload, responseTimeout }); - } catch (error) { - if (error instanceof RpcError) { - responseError = error; - } else { - console.warn( - `Uncaught error returned by RPC handler for ${method}. Returning APPLICATION_ERROR instead.`, - error, - ); - responseError = RpcError.builtIn('APPLICATION_ERROR'); - } - } - } - - const req = new RpcMethodInvocationResponseRequest({ - localParticipantHandle: this.ffi_handle.handle, - invocationId, - error: responseError ? responseError.toProto() : undefined, - payload: responsePayload ?? undefined, - }); - - const res = FfiClient.instance.request({ - message: { case: 'rpcMethodInvocationResponse', value: req }, - }); - - if (res.error) { - console.warn(`error sending rpc method invocation response: ${res.error}`); - } + this.room?.unregisterRpcMethod(method); } } diff --git a/packages/livekit-rtc/src/proto/audio_frame_pb.ts b/packages/livekit-rtc/src/proto/audio_frame_pb.ts index c917f6b0..b6a8b8aa 100644 --- a/packages/livekit-rtc/src/proto/audio_frame_pb.ts +++ b/packages/livekit-rtc/src/proto/audio_frame_pb.ts @@ -199,6 +199,18 @@ export class NewAudioStreamRequest extends Message { */ numChannels?: number; + /** + * Unique identifier passed in LoadAudioFilterPluginRequest + * + * @generated from field: optional string audio_filter_module_id = 5; + */ + audioFilterModuleId?: string; + + /** + * @generated from field: optional string audio_filter_options = 6; + */ + audioFilterOptions?: string; + constructor(data?: PartialMessage) { super(); proto2.util.initPartial(data, this); @@ -211,6 +223,8 @@ export class NewAudioStreamRequest extends Message { { no: 2, name: "type", kind: "enum", T: proto2.getEnumType(AudioStreamType), req: true }, { no: 3, name: "sample_rate", kind: "scalar", T: 13 /* ScalarType.UINT32 */, opt: true }, { no: 4, name: "num_channels", kind: "scalar", T: 13 /* ScalarType.UINT32 */, opt: true }, + { no: 5, name: "audio_filter_module_id", kind: "scalar", T: 9 /* ScalarType.STRING */, opt: true }, + { no: 6, name: "audio_filter_options", kind: "scalar", T: 9 /* ScalarType.STRING */, opt: true }, ]); static fromBinary(bytes: Uint8Array, options?: Partial): NewAudioStreamRequest { @@ -296,6 +310,16 @@ export class AudioStreamFromParticipantRequest extends Message) { super(); proto2.util.initPartial(data, this); @@ -309,6 +333,8 @@ export class AudioStreamFromParticipantRequest extends Message): AudioStreamFromParticipantRequest { @@ -823,6 +849,302 @@ export class RemixAndResampleResponse extends Message } } +/** + * @generated from message livekit.proto.NewApmRequest + */ +export class NewApmRequest extends Message { + /** + * @generated from field: required bool echo_canceller_enabled = 1; + */ + echoCancellerEnabled?: boolean; + + /** + * @generated from field: required bool gain_controller_enabled = 2; + */ + gainControllerEnabled?: boolean; + + /** + * @generated from field: required bool high_pass_filter_enabled = 3; + */ + highPassFilterEnabled?: boolean; + + /** + * @generated from field: required bool noise_suppression_enabled = 4; + */ + noiseSuppressionEnabled?: boolean; + + constructor(data?: PartialMessage) { + super(); + proto2.util.initPartial(data, this); + } + + static readonly runtime: typeof proto2 = proto2; + static readonly typeName = "livekit.proto.NewApmRequest"; + static readonly fields: FieldList = proto2.util.newFieldList(() => [ + { no: 1, name: "echo_canceller_enabled", kind: "scalar", T: 8 /* ScalarType.BOOL */, req: true }, + { no: 2, name: "gain_controller_enabled", kind: "scalar", T: 8 /* ScalarType.BOOL */, req: true }, + { no: 3, name: "high_pass_filter_enabled", kind: "scalar", T: 8 /* ScalarType.BOOL */, req: true }, + { no: 4, name: "noise_suppression_enabled", kind: "scalar", T: 8 /* ScalarType.BOOL */, req: true }, + ]); + + static fromBinary(bytes: Uint8Array, options?: Partial): NewApmRequest { + return new NewApmRequest().fromBinary(bytes, options); + } + + static fromJson(jsonValue: JsonValue, options?: Partial): NewApmRequest { + return new NewApmRequest().fromJson(jsonValue, options); + } + + static fromJsonString(jsonString: string, options?: Partial): NewApmRequest { + return new NewApmRequest().fromJsonString(jsonString, options); + } + + static equals(a: NewApmRequest | PlainMessage | undefined, b: NewApmRequest | PlainMessage | undefined): boolean { + return proto2.util.equals(NewApmRequest, a, b); + } +} + +/** + * @generated from message livekit.proto.NewApmResponse + */ +export class NewApmResponse extends Message { + /** + * @generated from field: required livekit.proto.OwnedApm apm = 1; + */ + apm?: OwnedApm; + + constructor(data?: PartialMessage) { + super(); + proto2.util.initPartial(data, this); + } + + static readonly runtime: typeof proto2 = proto2; + static readonly typeName = "livekit.proto.NewApmResponse"; + static readonly fields: FieldList = proto2.util.newFieldList(() => [ + { no: 1, name: "apm", kind: "message", T: OwnedApm, req: true }, + ]); + + static fromBinary(bytes: Uint8Array, options?: Partial): NewApmResponse { + return new NewApmResponse().fromBinary(bytes, options); + } + + static fromJson(jsonValue: JsonValue, options?: Partial): NewApmResponse { + return new NewApmResponse().fromJson(jsonValue, options); + } + + static fromJsonString(jsonString: string, options?: Partial): NewApmResponse { + return new NewApmResponse().fromJsonString(jsonString, options); + } + + static equals(a: NewApmResponse | PlainMessage | undefined, b: NewApmResponse | PlainMessage | undefined): boolean { + return proto2.util.equals(NewApmResponse, a, b); + } +} + +/** + * @generated from message livekit.proto.ApmProcessStreamRequest + */ +export class ApmProcessStreamRequest extends Message { + /** + * @generated from field: required uint64 apm_handle = 1; + */ + apmHandle?: bigint; + + /** + * *mut i16 + * + * @generated from field: required uint64 data_ptr = 2; + */ + dataPtr?: bigint; + + /** + * in bytes + * + * @generated from field: required uint32 size = 3; + */ + size?: number; + + /** + * @generated from field: required uint32 sample_rate = 4; + */ + sampleRate?: number; + + /** + * @generated from field: required uint32 num_channels = 5; + */ + numChannels?: number; + + constructor(data?: PartialMessage) { + super(); + proto2.util.initPartial(data, this); + } + + static readonly runtime: typeof proto2 = proto2; + static readonly typeName = "livekit.proto.ApmProcessStreamRequest"; + static readonly fields: FieldList = proto2.util.newFieldList(() => [ + { no: 1, name: "apm_handle", kind: "scalar", T: 4 /* ScalarType.UINT64 */, req: true }, + { no: 2, name: "data_ptr", kind: "scalar", T: 4 /* ScalarType.UINT64 */, req: true }, + { no: 3, name: "size", kind: "scalar", T: 13 /* ScalarType.UINT32 */, req: true }, + { no: 4, name: "sample_rate", kind: "scalar", T: 13 /* ScalarType.UINT32 */, req: true }, + { no: 5, name: "num_channels", kind: "scalar", T: 13 /* ScalarType.UINT32 */, req: true }, + ]); + + static fromBinary(bytes: Uint8Array, options?: Partial): ApmProcessStreamRequest { + return new ApmProcessStreamRequest().fromBinary(bytes, options); + } + + static fromJson(jsonValue: JsonValue, options?: Partial): ApmProcessStreamRequest { + return new ApmProcessStreamRequest().fromJson(jsonValue, options); + } + + static fromJsonString(jsonString: string, options?: Partial): ApmProcessStreamRequest { + return new ApmProcessStreamRequest().fromJsonString(jsonString, options); + } + + static equals(a: ApmProcessStreamRequest | PlainMessage | undefined, b: ApmProcessStreamRequest | PlainMessage | undefined): boolean { + return proto2.util.equals(ApmProcessStreamRequest, a, b); + } +} + +/** + * @generated from message livekit.proto.ApmProcessStreamResponse + */ +export class ApmProcessStreamResponse extends Message { + /** + * @generated from field: optional string error = 1; + */ + error?: string; + + constructor(data?: PartialMessage) { + super(); + proto2.util.initPartial(data, this); + } + + static readonly runtime: typeof proto2 = proto2; + static readonly typeName = "livekit.proto.ApmProcessStreamResponse"; + static readonly fields: FieldList = proto2.util.newFieldList(() => [ + { no: 1, name: "error", kind: "scalar", T: 9 /* ScalarType.STRING */, opt: true }, + ]); + + static fromBinary(bytes: Uint8Array, options?: Partial): ApmProcessStreamResponse { + return new ApmProcessStreamResponse().fromBinary(bytes, options); + } + + static fromJson(jsonValue: JsonValue, options?: Partial): ApmProcessStreamResponse { + return new ApmProcessStreamResponse().fromJson(jsonValue, options); + } + + static fromJsonString(jsonString: string, options?: Partial): ApmProcessStreamResponse { + return new ApmProcessStreamResponse().fromJsonString(jsonString, options); + } + + static equals(a: ApmProcessStreamResponse | PlainMessage | undefined, b: ApmProcessStreamResponse | PlainMessage | undefined): boolean { + return proto2.util.equals(ApmProcessStreamResponse, a, b); + } +} + +/** + * @generated from message livekit.proto.ApmProcessReverseStreamRequest + */ +export class ApmProcessReverseStreamRequest extends Message { + /** + * @generated from field: required uint64 apm_handle = 1; + */ + apmHandle?: bigint; + + /** + * *mut i16 + * + * @generated from field: required uint64 data_ptr = 2; + */ + dataPtr?: bigint; + + /** + * in bytes + * + * @generated from field: required uint32 size = 3; + */ + size?: number; + + /** + * @generated from field: required uint32 sample_rate = 4; + */ + sampleRate?: number; + + /** + * @generated from field: required uint32 num_channels = 5; + */ + numChannels?: number; + + constructor(data?: PartialMessage) { + super(); + proto2.util.initPartial(data, this); + } + + static readonly runtime: typeof proto2 = proto2; + static readonly typeName = "livekit.proto.ApmProcessReverseStreamRequest"; + static readonly fields: FieldList = proto2.util.newFieldList(() => [ + { no: 1, name: "apm_handle", kind: "scalar", T: 4 /* ScalarType.UINT64 */, req: true }, + { no: 2, name: "data_ptr", kind: "scalar", T: 4 /* ScalarType.UINT64 */, req: true }, + { no: 3, name: "size", kind: "scalar", T: 13 /* ScalarType.UINT32 */, req: true }, + { no: 4, name: "sample_rate", kind: "scalar", T: 13 /* ScalarType.UINT32 */, req: true }, + { no: 5, name: "num_channels", kind: "scalar", T: 13 /* ScalarType.UINT32 */, req: true }, + ]); + + static fromBinary(bytes: Uint8Array, options?: Partial): ApmProcessReverseStreamRequest { + return new ApmProcessReverseStreamRequest().fromBinary(bytes, options); + } + + static fromJson(jsonValue: JsonValue, options?: Partial): ApmProcessReverseStreamRequest { + return new ApmProcessReverseStreamRequest().fromJson(jsonValue, options); + } + + static fromJsonString(jsonString: string, options?: Partial): ApmProcessReverseStreamRequest { + return new ApmProcessReverseStreamRequest().fromJsonString(jsonString, options); + } + + static equals(a: ApmProcessReverseStreamRequest | PlainMessage | undefined, b: ApmProcessReverseStreamRequest | PlainMessage | undefined): boolean { + return proto2.util.equals(ApmProcessReverseStreamRequest, a, b); + } +} + +/** + * @generated from message livekit.proto.ApmProcessReverseStreamResponse + */ +export class ApmProcessReverseStreamResponse extends Message { + /** + * @generated from field: optional string error = 1; + */ + error?: string; + + constructor(data?: PartialMessage) { + super(); + proto2.util.initPartial(data, this); + } + + static readonly runtime: typeof proto2 = proto2; + static readonly typeName = "livekit.proto.ApmProcessReverseStreamResponse"; + static readonly fields: FieldList = proto2.util.newFieldList(() => [ + { no: 1, name: "error", kind: "scalar", T: 9 /* ScalarType.STRING */, opt: true }, + ]); + + static fromBinary(bytes: Uint8Array, options?: Partial): ApmProcessReverseStreamResponse { + return new ApmProcessReverseStreamResponse().fromBinary(bytes, options); + } + + static fromJson(jsonValue: JsonValue, options?: Partial): ApmProcessReverseStreamResponse { + return new ApmProcessReverseStreamResponse().fromJson(jsonValue, options); + } + + static fromJsonString(jsonString: string, options?: Partial): ApmProcessReverseStreamResponse { + return new ApmProcessReverseStreamResponse().fromJsonString(jsonString, options); + } + + static equals(a: ApmProcessReverseStreamResponse | PlainMessage | undefined, b: ApmProcessReverseStreamResponse | PlainMessage | undefined): boolean { + return proto2.util.equals(ApmProcessReverseStreamResponse, a, b); + } +} + /** * @generated from message livekit.proto.NewSoxResamplerRequest */ @@ -1649,6 +1971,43 @@ export class OwnedAudioResampler extends Message { } } +/** + * @generated from message livekit.proto.OwnedApm + */ +export class OwnedApm extends Message { + /** + * @generated from field: required livekit.proto.FfiOwnedHandle handle = 1; + */ + handle?: FfiOwnedHandle; + + constructor(data?: PartialMessage) { + super(); + proto2.util.initPartial(data, this); + } + + static readonly runtime: typeof proto2 = proto2; + static readonly typeName = "livekit.proto.OwnedApm"; + static readonly fields: FieldList = proto2.util.newFieldList(() => [ + { no: 1, name: "handle", kind: "message", T: FfiOwnedHandle, req: true }, + ]); + + static fromBinary(bytes: Uint8Array, options?: Partial): OwnedApm { + return new OwnedApm().fromBinary(bytes, options); + } + + static fromJson(jsonValue: JsonValue, options?: Partial): OwnedApm { + return new OwnedApm().fromJson(jsonValue, options); + } + + static fromJsonString(jsonString: string, options?: Partial): OwnedApm { + return new OwnedApm().fromJsonString(jsonString, options); + } + + static equals(a: OwnedApm | PlainMessage | undefined, b: OwnedApm | PlainMessage | undefined): boolean { + return proto2.util.equals(OwnedApm, a, b); + } +} + /** * @generated from message livekit.proto.SoxResamplerInfo */ @@ -1723,3 +2082,97 @@ export class OwnedSoxResampler extends Message { } } +/** + * Audio Filter Plugin + * + * @generated from message livekit.proto.LoadAudioFilterPluginRequest + */ +export class LoadAudioFilterPluginRequest extends Message { + /** + * path for ffi audio filter plugin + * + * @generated from field: required string plugin_path = 1; + */ + pluginPath?: string; + + /** + * Optional: paths for dependency dylibs + * + * @generated from field: repeated string dependencies = 2; + */ + dependencies: string[] = []; + + /** + * Unique identifier of the plugin + * + * @generated from field: required string module_id = 3; + */ + moduleId?: string; + + constructor(data?: PartialMessage) { + super(); + proto2.util.initPartial(data, this); + } + + static readonly runtime: typeof proto2 = proto2; + static readonly typeName = "livekit.proto.LoadAudioFilterPluginRequest"; + static readonly fields: FieldList = proto2.util.newFieldList(() => [ + { no: 1, name: "plugin_path", kind: "scalar", T: 9 /* ScalarType.STRING */, req: true }, + { no: 2, name: "dependencies", kind: "scalar", T: 9 /* ScalarType.STRING */, repeated: true }, + { no: 3, name: "module_id", kind: "scalar", T: 9 /* ScalarType.STRING */, req: true }, + ]); + + static fromBinary(bytes: Uint8Array, options?: Partial): LoadAudioFilterPluginRequest { + return new LoadAudioFilterPluginRequest().fromBinary(bytes, options); + } + + static fromJson(jsonValue: JsonValue, options?: Partial): LoadAudioFilterPluginRequest { + return new LoadAudioFilterPluginRequest().fromJson(jsonValue, options); + } + + static fromJsonString(jsonString: string, options?: Partial): LoadAudioFilterPluginRequest { + return new LoadAudioFilterPluginRequest().fromJsonString(jsonString, options); + } + + static equals(a: LoadAudioFilterPluginRequest | PlainMessage | undefined, b: LoadAudioFilterPluginRequest | PlainMessage | undefined): boolean { + return proto2.util.equals(LoadAudioFilterPluginRequest, a, b); + } +} + +/** + * @generated from message livekit.proto.LoadAudioFilterPluginResponse + */ +export class LoadAudioFilterPluginResponse extends Message { + /** + * @generated from field: optional string error = 1; + */ + error?: string; + + constructor(data?: PartialMessage) { + super(); + proto2.util.initPartial(data, this); + } + + static readonly runtime: typeof proto2 = proto2; + static readonly typeName = "livekit.proto.LoadAudioFilterPluginResponse"; + static readonly fields: FieldList = proto2.util.newFieldList(() => [ + { no: 1, name: "error", kind: "scalar", T: 9 /* ScalarType.STRING */, opt: true }, + ]); + + static fromBinary(bytes: Uint8Array, options?: Partial): LoadAudioFilterPluginResponse { + return new LoadAudioFilterPluginResponse().fromBinary(bytes, options); + } + + static fromJson(jsonValue: JsonValue, options?: Partial): LoadAudioFilterPluginResponse { + return new LoadAudioFilterPluginResponse().fromJson(jsonValue, options); + } + + static fromJsonString(jsonString: string, options?: Partial): LoadAudioFilterPluginResponse { + return new LoadAudioFilterPluginResponse().fromJsonString(jsonString, options); + } + + static equals(a: LoadAudioFilterPluginResponse | PlainMessage | undefined, b: LoadAudioFilterPluginResponse | PlainMessage | undefined): boolean { + return proto2.util.equals(LoadAudioFilterPluginResponse, a, b); + } +} + diff --git a/packages/livekit-rtc/src/proto/ffi_pb.ts b/packages/livekit-rtc/src/proto/ffi_pb.ts index 2e9c21e4..5d6e964b 100644 --- a/packages/livekit-rtc/src/proto/ffi_pb.ts +++ b/packages/livekit-rtc/src/proto/ffi_pb.ts @@ -22,7 +22,7 @@ import { Message, proto2 } from "@bufbuild/protobuf"; import { ConnectCallback, ConnectRequest, ConnectResponse, DisconnectCallback, DisconnectRequest, DisconnectResponse, EditChatMessageRequest, GetSessionStatsCallback, GetSessionStatsRequest, GetSessionStatsResponse, PublishDataCallback, PublishDataRequest, PublishDataResponse, PublishSipDtmfCallback, PublishSipDtmfRequest, PublishSipDtmfResponse, PublishTrackCallback, PublishTrackRequest, PublishTrackResponse, PublishTranscriptionCallback, PublishTranscriptionRequest, PublishTranscriptionResponse, RoomEvent, SendChatMessageCallback, SendChatMessageRequest, SendChatMessageResponse, SendStreamChunkCallback, SendStreamChunkRequest, SendStreamChunkResponse, SendStreamHeaderCallback, SendStreamHeaderRequest, SendStreamHeaderResponse, SendStreamTrailerCallback, SendStreamTrailerRequest, SendStreamTrailerResponse, SetDataChannelBufferedAmountLowThresholdRequest, SetDataChannelBufferedAmountLowThresholdResponse, SetLocalAttributesCallback, SetLocalAttributesRequest, SetLocalAttributesResponse, SetLocalMetadataCallback, SetLocalMetadataRequest, SetLocalMetadataResponse, SetLocalNameCallback, SetLocalNameRequest, SetLocalNameResponse, SetSubscribedRequest, SetSubscribedResponse, UnpublishTrackCallback, UnpublishTrackRequest, UnpublishTrackResponse } from "./room_pb.js"; import { CreateAudioTrackRequest, CreateAudioTrackResponse, CreateVideoTrackRequest, CreateVideoTrackResponse, EnableRemoteTrackRequest, EnableRemoteTrackResponse, GetStatsCallback, GetStatsRequest, GetStatsResponse, LocalTrackMuteRequest, LocalTrackMuteResponse, SetTrackSubscriptionPermissionsRequest, SetTrackSubscriptionPermissionsResponse, TrackEvent } from "./track_pb.js"; import { CaptureVideoFrameRequest, CaptureVideoFrameResponse, NewVideoSourceRequest, NewVideoSourceResponse, NewVideoStreamRequest, NewVideoStreamResponse, VideoConvertRequest, VideoConvertResponse, VideoStreamEvent, VideoStreamFromParticipantRequest, VideoStreamFromParticipantResponse } from "./video_frame_pb.js"; -import { AudioStreamEvent, AudioStreamFromParticipantRequest, AudioStreamFromParticipantResponse, CaptureAudioFrameCallback, CaptureAudioFrameRequest, CaptureAudioFrameResponse, ClearAudioBufferRequest, ClearAudioBufferResponse, FlushSoxResamplerRequest, FlushSoxResamplerResponse, NewAudioResamplerRequest, NewAudioResamplerResponse, NewAudioSourceRequest, NewAudioSourceResponse, NewAudioStreamRequest, NewAudioStreamResponse, NewSoxResamplerRequest, NewSoxResamplerResponse, PushSoxResamplerRequest, PushSoxResamplerResponse, RemixAndResampleRequest, RemixAndResampleResponse } from "./audio_frame_pb.js"; +import { ApmProcessReverseStreamRequest, ApmProcessReverseStreamResponse, ApmProcessStreamRequest, ApmProcessStreamResponse, AudioStreamEvent, AudioStreamFromParticipantRequest, AudioStreamFromParticipantResponse, CaptureAudioFrameCallback, CaptureAudioFrameRequest, CaptureAudioFrameResponse, ClearAudioBufferRequest, ClearAudioBufferResponse, FlushSoxResamplerRequest, FlushSoxResamplerResponse, LoadAudioFilterPluginRequest, LoadAudioFilterPluginResponse, NewApmRequest, NewApmResponse, NewAudioResamplerRequest, NewAudioResamplerResponse, NewAudioSourceRequest, NewAudioSourceResponse, NewAudioStreamRequest, NewAudioStreamResponse, NewSoxResamplerRequest, NewSoxResamplerResponse, PushSoxResamplerRequest, PushSoxResamplerResponse, RemixAndResampleRequest, RemixAndResampleResponse } from "./audio_frame_pb.js"; import { E2eeRequest, E2eeResponse } from "./e2ee_pb.js"; import { PerformRpcCallback, PerformRpcRequest, PerformRpcResponse, RegisterRpcMethodRequest, RegisterRpcMethodResponse, RpcMethodInvocationEvent, RpcMethodInvocationResponseRequest, RpcMethodInvocationResponseResponse, UnregisterRpcMethodRequest, UnregisterRpcMethodResponse } from "./rpc_pb.js"; import { EnableRemoteTrackPublicationRequest, EnableRemoteTrackPublicationResponse, UpdateRemoteTrackPublicationDimensionRequest, UpdateRemoteTrackPublicationDimensionResponse } from "./track_publication_pb.js"; @@ -373,6 +373,32 @@ export class FfiRequest extends Message { */ value: SetDataChannelBufferedAmountLowThresholdRequest; case: "setDataChannelBufferedAmountLowThreshold"; + } | { + /** + * Audio Filter Plugin + * + * @generated from field: livekit.proto.LoadAudioFilterPluginRequest load_audio_filter_plugin = 49; + */ + value: LoadAudioFilterPluginRequest; + case: "loadAudioFilterPlugin"; + } | { + /** + * @generated from field: livekit.proto.NewApmRequest new_apm = 50; + */ + value: NewApmRequest; + case: "newApm"; + } | { + /** + * @generated from field: livekit.proto.ApmProcessStreamRequest apm_process_stream = 51; + */ + value: ApmProcessStreamRequest; + case: "apmProcessStream"; + } | { + /** + * @generated from field: livekit.proto.ApmProcessReverseStreamRequest apm_process_reverse_stream = 52; + */ + value: ApmProcessReverseStreamRequest; + case: "apmProcessReverseStream"; } | { case: undefined; value?: undefined } = { case: undefined }; constructor(data?: PartialMessage) { @@ -430,6 +456,10 @@ export class FfiRequest extends Message { { no: 45, name: "send_stream_chunk", kind: "message", T: SendStreamChunkRequest, oneof: "message" }, { no: 46, name: "send_stream_trailer", kind: "message", T: SendStreamTrailerRequest, oneof: "message" }, { no: 47, name: "set_data_channel_buffered_amount_low_threshold", kind: "message", T: SetDataChannelBufferedAmountLowThresholdRequest, oneof: "message" }, + { no: 49, name: "load_audio_filter_plugin", kind: "message", T: LoadAudioFilterPluginRequest, oneof: "message" }, + { no: 50, name: "new_apm", kind: "message", T: NewApmRequest, oneof: "message" }, + { no: 51, name: "apm_process_stream", kind: "message", T: ApmProcessStreamRequest, oneof: "message" }, + { no: 52, name: "apm_process_reverse_stream", kind: "message", T: ApmProcessReverseStreamRequest, oneof: "message" }, ]); static fromBinary(bytes: Uint8Array, options?: Partial): FfiRequest { @@ -750,6 +780,32 @@ export class FfiResponse extends Message { */ value: SetDataChannelBufferedAmountLowThresholdResponse; case: "setDataChannelBufferedAmountLowThreshold"; + } | { + /** + * Audio Filter Plugin + * + * @generated from field: livekit.proto.LoadAudioFilterPluginResponse load_audio_filter_plugin = 48; + */ + value: LoadAudioFilterPluginResponse; + case: "loadAudioFilterPlugin"; + } | { + /** + * @generated from field: livekit.proto.NewApmResponse new_apm = 49; + */ + value: NewApmResponse; + case: "newApm"; + } | { + /** + * @generated from field: livekit.proto.ApmProcessStreamResponse apm_process_stream = 50; + */ + value: ApmProcessStreamResponse; + case: "apmProcessStream"; + } | { + /** + * @generated from field: livekit.proto.ApmProcessReverseStreamResponse apm_process_reverse_stream = 51; + */ + value: ApmProcessReverseStreamResponse; + case: "apmProcessReverseStream"; } | { case: undefined; value?: undefined } = { case: undefined }; constructor(data?: PartialMessage) { @@ -806,6 +862,10 @@ export class FfiResponse extends Message { { no: 44, name: "send_stream_chunk", kind: "message", T: SendStreamChunkResponse, oneof: "message" }, { no: 45, name: "send_stream_trailer", kind: "message", T: SendStreamTrailerResponse, oneof: "message" }, { no: 46, name: "set_data_channel_buffered_amount_low_threshold", kind: "message", T: SetDataChannelBufferedAmountLowThresholdResponse, oneof: "message" }, + { no: 48, name: "load_audio_filter_plugin", kind: "message", T: LoadAudioFilterPluginResponse, oneof: "message" }, + { no: 49, name: "new_apm", kind: "message", T: NewApmResponse, oneof: "message" }, + { no: 50, name: "apm_process_stream", kind: "message", T: ApmProcessStreamResponse, oneof: "message" }, + { no: 51, name: "apm_process_reverse_stream", kind: "message", T: ApmProcessReverseStreamResponse, oneof: "message" }, ]); static fromBinary(bytes: Uint8Array, options?: Partial): FfiResponse { diff --git a/packages/livekit-rtc/src/proto/rpc_pb.ts b/packages/livekit-rtc/src/proto/rpc_pb.ts index eb303614..c5ee7590 100644 --- a/packages/livekit-rtc/src/proto/rpc_pb.ts +++ b/packages/livekit-rtc/src/proto/rpc_pb.ts @@ -137,9 +137,9 @@ export class PerformRpcRequest extends Message { */ export class RegisterRpcMethodRequest extends Message { /** - * @generated from field: required uint64 local_participant_handle = 1; + * @generated from field: required uint64 room_handle = 1; */ - localParticipantHandle?: bigint; + roomHandle?: bigint; /** * @generated from field: required string method = 2; @@ -154,7 +154,7 @@ export class RegisterRpcMethodRequest extends Message static readonly runtime: typeof proto2 = proto2; static readonly typeName = "livekit.proto.RegisterRpcMethodRequest"; static readonly fields: FieldList = proto2.util.newFieldList(() => [ - { no: 1, name: "local_participant_handle", kind: "scalar", T: 4 /* ScalarType.UINT64 */, req: true }, + { no: 1, name: "room_handle", kind: "scalar", T: 4 /* ScalarType.UINT64 */, req: true }, { no: 2, name: "method", kind: "scalar", T: 9 /* ScalarType.STRING */, req: true }, ]); @@ -180,9 +180,9 @@ export class RegisterRpcMethodRequest extends Message */ export class UnregisterRpcMethodRequest extends Message { /** - * @generated from field: required uint64 local_participant_handle = 1; + * @generated from field: required uint64 room_handle = 1; */ - localParticipantHandle?: bigint; + roomHandle?: bigint; /** * @generated from field: required string method = 2; @@ -197,7 +197,7 @@ export class UnregisterRpcMethodRequest extends Message [ - { no: 1, name: "local_participant_handle", kind: "scalar", T: 4 /* ScalarType.UINT64 */, req: true }, + { no: 1, name: "room_handle", kind: "scalar", T: 4 /* ScalarType.UINT64 */, req: true }, { no: 2, name: "method", kind: "scalar", T: 9 /* ScalarType.STRING */, req: true }, ]); diff --git a/packages/livekit-rtc/src/room.ts b/packages/livekit-rtc/src/room.ts index 78f02edf..c4eb36b6 100644 --- a/packages/livekit-rtc/src/room.ts +++ b/packages/livekit-rtc/src/room.ts @@ -45,6 +45,8 @@ import type { LocalTrackPublication, TrackPublication } from './track_publicatio import { RemoteTrackPublication } from './track_publication.js'; import type { ChatMessage } from './types.js'; import { bigIntToNumber } from './utils.js'; +import { RpcError, type RpcInvocationData } from './rpc.js'; +import { RegisterRpcMethodRequest, RegisterRpcMethodResponse, RpcMethodInvocationResponseRequest, RpcMethodInvocationResponseResponse, UnregisterRpcMethodRequest, UnregisterRpcMethodResponse } from './proto/rpc_pb.js'; export interface RtcConfiguration { iceTransportType: IceTransportType; @@ -83,6 +85,8 @@ export class Room extends (EventEmitter as new () => TypedEmitter private byteStreamHandlers = new Map(); private textStreamHandlers = new Map(); + private rpcHandlers: Map Promise> = new Map(); + e2eeManager?: E2EEManager; connectionState: ConnectionState = ConnectionState.CONN_DISCONNECTED; @@ -155,7 +159,7 @@ export class Room extends (EventEmitter as new () => TypedEmitter this.info = cb.message.value.room!.info; this.connectionState = ConnectionState.CONN_CONNECTED; - this.localParticipant = new LocalParticipant(cb.message.value.localParticipant!); + this.localParticipant = new LocalParticipant(cb.message.value.localParticipant!, this); for (const pt of cb.message.value.participants) { const rp = this.createRemoteParticipant(pt.participant!); @@ -214,6 +218,73 @@ export class Room extends (EventEmitter as new () => TypedEmitter this.byteStreamHandlers.delete(topic); } + /** + * Establishes the participant as a receiver for calls of the specified RPC method. + * Will overwrite any existing callback for the same method. + * + * @param method - The name of the indicated RPC method + * @param handler - Will be invoked when an RPC request for this method is received + * @returns A promise that resolves when the method is successfully registered + * + * @example + * ```typescript + * room.registerRpcMethod( + * 'greet', + * async (data: RpcInvocationData) => { + * console.log(`Received greeting from ${data.callerIdentity}: ${data.payload}`); + * return `Hello, ${data.callerIdentity}!`; + * } + * ); + * ``` + * + * See {@link RpcInvocationData} for more details on invocation params. + * + * The handler should return a Promise that resolves to a string. + * If unable to respond within `responseTimeout`, the request will result in an error on the caller's side. + * + * You may throw errors of type `RpcError` with a string `message` in the handler, + * and they will be received on the caller's side with the message intact. + * Other errors thrown in your handler will not be transmitted as-is, and will instead arrive to the caller as `1500` ("Application Error"). + */ + registerRpcMethod(method: string, handler: (data: RpcInvocationData) => Promise) { + if (this.ffiHandle == null) { + throw new Error(`Cannot register RPC method before room is connected`); + } + + this.rpcHandlers.set(method, handler); + + const req = new RegisterRpcMethodRequest({ + roomHandle: this.ffiHandle?.handle, + method, + }); + + FfiClient.instance.request({ + message: { case: 'registerRpcMethod', value: req }, + }); + } + + /** + * Unregisters a previously registered RPC method. + * + * @param method - The name of the RPC method to unregister + */ + unregisterRpcMethod(method: string) { + if (this.ffiHandle == null) { + throw new Error(`Cannot unregister RPC method before room is connected`); + } + + this.rpcHandlers.delete(method); + + const req = new UnregisterRpcMethodRequest({ + roomHandle: this.ffiHandle?.handle, + method, + }); + + FfiClient.instance.request({ + message: { case: 'unregisterRpcMethod', value: req }, + }); + } + private onFfiEvent = (ffiEvent: FfiEvent) => { if (!this.localParticipant || !this.ffiHandle || !this.info) { throw TypeError('cannot handle ffi events before connectCallback'); @@ -223,7 +294,7 @@ export class Room extends (EventEmitter as new () => TypedEmitter if ( ffiEvent.message.value.localParticipantHandle == this.localParticipant.ffi_handle.handle ) { - this.localParticipant.handleRpcMethodInvocation( + this.handleRpcMethodInvocation( ffiEvent.message.value.invocationId!, ffiEvent.message.value.method!, ffiEvent.message.value.requestId!, @@ -569,6 +640,53 @@ export class Room extends (EventEmitter as new () => TypedEmitter this.byteStreamControllers.delete(streamId); } } + + private async handleRpcMethodInvocation( + invocationId: bigint, + method: string, + requestId: string, + callerIdentity: string, + payload: string, + responseTimeout: number, + ) { + let responseError: RpcError | null = null; + let responsePayload: string | null = null; + + const handler = this.rpcHandlers.get(method); + + if (!handler) { + responseError = RpcError.builtIn('UNSUPPORTED_METHOD'); + } else { + try { + responsePayload = await handler({ requestId, callerIdentity, payload, responseTimeout }); + } catch (error) { + if (error instanceof RpcError) { + responseError = error; + } else { + console.warn( + `Uncaught error returned by RPC handler for ${method}. Returning APPLICATION_ERROR instead.`, + error, + ); + responseError = RpcError.builtIn('APPLICATION_ERROR'); + } + } + } + + const req = new RpcMethodInvocationResponseRequest({ + localParticipantHandle: this.localParticipant?.ffi_handle.handle, + invocationId, + error: responseError ? responseError.toProto() : undefined, + payload: responsePayload ?? undefined, + }); + + const res = FfiClient.instance.request({ + message: { case: 'rpcMethodInvocationResponse', value: req }, + }); + + if (res.error) { + console.warn(`error sending rpc method invocation response: ${res.error}`); + } + } } export class ConnectError extends Error {