diff --git a/cmd/lk/perf.go b/cmd/lk/perf.go index 35828f28..4431036f 100644 --- a/cmd/lk/perf.go +++ b/cmd/lk/perf.go @@ -77,6 +77,10 @@ var ( Name: "simulate-speakers", Usage: "Fire random speaker events to simulate speaker changes", }, + &cli.BoolFlag{ + Name: "disable-publish", + Usage: "Disable publishing permission for participants (canPublish: false)", + }, &cli.BoolFlag{ Name: "run-all", Usage: "Runs set list of load test cases", @@ -170,6 +174,10 @@ var ( Name: "simulate-speakers", Usage: "Fire random speaker events to simulate speaker changes", }, + &cli.BoolFlag{ + Name: "disable-publish", + Usage: "Disable publishing permission for participants (canPublish: false)", + }, &cli.BoolFlag{ Name: "run-all", Usage: "Runs set list of load test cases", @@ -198,6 +206,7 @@ func loadTest(ctx context.Context, cmd *cli.Command) error { NumPerSecond: cmd.Float("num-per-second"), Simulcast: !cmd.Bool("no-simulcast"), SimulateSpeakers: cmd.Bool("simulate-speakers"), + CanPublish: !cmd.Bool("disable-publish"), // if disable-publish is true, CanPublish becomes false TesterParams: loadtester.TesterParams{ URL: pc.URL, APIKey: pc.APIKey, @@ -205,6 +214,7 @@ func loadTest(ctx context.Context, cmd *cli.Command) error { Room: cmd.String("room"), IdentityPrefix: cmd.String("identity-prefix"), Layout: loadtester.LayoutFromString(cmd.String("layout")), + CanPublish: !cmd.Bool("disable-publish"), // if disable-publish is true, CanPublish becomes false }, } diff --git a/pkg/loadtester/agentloadtester.go b/pkg/loadtester/agentloadtester.go index 78e87008..a3172b8b 100644 --- a/pkg/loadtester/agentloadtester.go +++ b/pkg/loadtester/agentloadtester.go @@ -187,7 +187,7 @@ func (r *LoadTestRoom) start(roomName string) error { return err } - meetParticipantToken, _ := newAccessToken(r.params.APIKey, r.params.APISecret, roomName, "meet-participant") + meetParticipantToken, _ := newAgentAccessToken(r.params.APIKey, r.params.APISecret, roomName, "meet-participant") r.stats.meetLink = fmt.Sprintf("https://meet.livekit.io/custom?liveKitUrl=%s&token=%s", r.params.URL, meetParticipantToken) logger.Debugw("Inspect the room in LiveKit Meet using this url", "room", roomName, "url", r.stats.meetLink) r.running.Store(true) @@ -345,7 +345,7 @@ func (t *AgentLoadTester) printStats() { fmt.Println(table) } -func newAccessToken(apiKey, apiSecret, roomName, pID string) (string, error) { +func newAgentAccessToken(apiKey, apiSecret, roomName, pID string) (string, error) { at := auth.NewAccessToken(apiKey, apiSecret) grant := &auth.VideoGrant{ RoomJoin: true, diff --git a/pkg/loadtester/loadtest.go b/pkg/loadtester/loadtest.go index a17ae98a..fcfe3b81 100644 --- a/pkg/loadtester/loadtest.go +++ b/pkg/loadtester/loadtest.go @@ -51,6 +51,8 @@ type Params struct { NumPerSecond float64 Simulcast bool SimulateSpeakers bool + // true to allow publishing tracks (default true) + CanPublish bool TesterParams } @@ -308,6 +310,7 @@ func (t *LoadTest) run(ctx context.Context, params Params) (map[string]*testerSt testerParams := params.TesterParams testerParams.Sequence = i testerParams.expectedTracks = expectedTracks + testerParams.CanPublish = params.CanPublish // Copy CanPublish from parent params isVideoPublisher := i < params.VideoPublishers isAudioPublisher := i < params.AudioPublishers if isVideoPublisher || isAudioPublisher { diff --git a/pkg/loadtester/loadtester.go b/pkg/loadtester/loadtester.go index 95fdaa06..6188b432 100644 --- a/pkg/loadtester/loadtester.go +++ b/pkg/loadtester/loadtester.go @@ -25,6 +25,7 @@ import ( "go.uber.org/atomic" provider2 "github.com/livekit/livekit-cli/v2/pkg/provider" + "github.com/livekit/protocol/auth" "github.com/livekit/protocol/livekit" lksdk "github.com/livekit/server-sdk-go/v2" "github.com/livekit/server-sdk-go/v2/pkg/samplebuilder" @@ -83,6 +84,8 @@ type TesterParams struct { Layout Layout // true to subscribe to all published tracks Subscribe bool + // true to allow publishing tracks (default true) + CanPublish bool name string Sequence int @@ -116,12 +119,22 @@ func (t *LoadTester) Start() error { var err error // make up to 10 reconnect attempts for i := 0; i < 10; i++ { - err = t.room.Join(t.params.URL, lksdk.ConnectInfo{ - APIKey: t.params.APIKey, - APISecret: t.params.APISecret, - RoomName: t.params.Room, - ParticipantIdentity: identity, - }, lksdk.WithAutoSubscribe(false)) + // If CanPublish is explicitly set to false, use custom token + if !t.params.CanPublish { + token, tokenErr := newAccessToken(t.params.APIKey, t.params.APISecret, t.params.Room, identity, t.params.CanPublish) + if tokenErr != nil { + return tokenErr + } + err = t.room.JoinWithToken(t.params.URL, token, lksdk.WithAutoSubscribe(false)) + } else { + // Use the original method for backward compatibility + err = t.room.Join(t.params.URL, lksdk.ConnectInfo{ + APIKey: t.params.APIKey, + APISecret: t.params.APISecret, + RoomName: t.params.Room, + ParticipantIdentity: identity, + }, lksdk.WithAutoSubscribe(false)) + } if err == nil { break } @@ -420,3 +433,17 @@ func (t *LoadTester) consumeTrack(track *webrtc.TrackRemote, pub *lksdk.RemoteTr } } } + +func newAccessToken(apiKey, apiSecret, roomName, pID string, canPublish bool) (string, error) { + at := auth.NewAccessToken(apiKey, apiSecret) + grant := &auth.VideoGrant{ + RoomJoin: true, + Room: roomName, + CanPublish: &canPublish, + } + at.SetVideoGrant(grant). + SetIdentity(pID). + SetName(pID) + + return at.ToJWT() +}