Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions cmd/lk/perf.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ var (
Name: "simulate-speakers",
Usage: "Fire random speaker events to simulate speaker changes",
},
&cli.BoolFlag{
Name: "disable-publish",
Usage: "Disable publishing permission for participants (canPublish: false)",
},
&cli.BoolFlag{
Name: "run-all",
Usage: "Runs set list of load test cases",
Expand Down Expand Up @@ -170,6 +174,10 @@ var (
Name: "simulate-speakers",
Usage: "Fire random speaker events to simulate speaker changes",
},
&cli.BoolFlag{
Name: "disable-publish",
Usage: "Disable publishing permission for participants (canPublish: false)",
},
&cli.BoolFlag{
Name: "run-all",
Usage: "Runs set list of load test cases",
Expand Down Expand Up @@ -198,13 +206,15 @@ func loadTest(ctx context.Context, cmd *cli.Command) error {
NumPerSecond: cmd.Float("num-per-second"),
Simulcast: !cmd.Bool("no-simulcast"),
SimulateSpeakers: cmd.Bool("simulate-speakers"),
CanPublish: !cmd.Bool("disable-publish"), // if disable-publish is true, CanPublish becomes false
TesterParams: loadtester.TesterParams{
URL: pc.URL,
APIKey: pc.APIKey,
APISecret: pc.APISecret,
Room: cmd.String("room"),
IdentityPrefix: cmd.String("identity-prefix"),
Layout: loadtester.LayoutFromString(cmd.String("layout")),
CanPublish: !cmd.Bool("disable-publish"), // if disable-publish is true, CanPublish becomes false
},
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/loadtester/agentloadtester.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func (r *LoadTestRoom) start(roomName string) error {
return err
}

meetParticipantToken, _ := newAccessToken(r.params.APIKey, r.params.APISecret, roomName, "meet-participant")
meetParticipantToken, _ := newAgentAccessToken(r.params.APIKey, r.params.APISecret, roomName, "meet-participant")
r.stats.meetLink = fmt.Sprintf("https://meet.livekit.io/custom?liveKitUrl=%s&token=%s", r.params.URL, meetParticipantToken)
logger.Debugw("Inspect the room in LiveKit Meet using this url", "room", roomName, "url", r.stats.meetLink)
r.running.Store(true)
Expand Down Expand Up @@ -345,7 +345,7 @@ func (t *AgentLoadTester) printStats() {
fmt.Println(table)
}

func newAccessToken(apiKey, apiSecret, roomName, pID string) (string, error) {
func newAgentAccessToken(apiKey, apiSecret, roomName, pID string) (string, error) {
at := auth.NewAccessToken(apiKey, apiSecret)
grant := &auth.VideoGrant{
RoomJoin: true,
Expand Down
3 changes: 3 additions & 0 deletions pkg/loadtester/loadtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ type Params struct {
NumPerSecond float64
Simulcast bool
SimulateSpeakers bool
// true to allow publishing tracks (default true)
CanPublish bool

TesterParams
}
Expand Down Expand Up @@ -308,6 +310,7 @@ func (t *LoadTest) run(ctx context.Context, params Params) (map[string]*testerSt
testerParams := params.TesterParams
testerParams.Sequence = i
testerParams.expectedTracks = expectedTracks
testerParams.CanPublish = params.CanPublish // Copy CanPublish from parent params
isVideoPublisher := i < params.VideoPublishers
isAudioPublisher := i < params.AudioPublishers
if isVideoPublisher || isAudioPublisher {
Expand Down
39 changes: 33 additions & 6 deletions pkg/loadtester/loadtester.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"go.uber.org/atomic"

provider2 "github.com/livekit/livekit-cli/v2/pkg/provider"
"github.com/livekit/protocol/auth"
"github.com/livekit/protocol/livekit"
lksdk "github.com/livekit/server-sdk-go/v2"
"github.com/livekit/server-sdk-go/v2/pkg/samplebuilder"
Expand Down Expand Up @@ -83,6 +84,8 @@ type TesterParams struct {
Layout Layout
// true to subscribe to all published tracks
Subscribe bool
// true to allow publishing tracks (default true)
CanPublish bool

name string
Sequence int
Expand Down Expand Up @@ -116,12 +119,22 @@ func (t *LoadTester) Start() error {
var err error
// make up to 10 reconnect attempts
for i := 0; i < 10; i++ {
err = t.room.Join(t.params.URL, lksdk.ConnectInfo{
APIKey: t.params.APIKey,
APISecret: t.params.APISecret,
RoomName: t.params.Room,
ParticipantIdentity: identity,
}, lksdk.WithAutoSubscribe(false))
// If CanPublish is explicitly set to false, use custom token
if !t.params.CanPublish {
token, tokenErr := newAccessToken(t.params.APIKey, t.params.APISecret, t.params.Room, identity, t.params.CanPublish)
if tokenErr != nil {
return tokenErr
}
err = t.room.JoinWithToken(t.params.URL, token, lksdk.WithAutoSubscribe(false))
} else {
// Use the original method for backward compatibility
err = t.room.Join(t.params.URL, lksdk.ConnectInfo{
APIKey: t.params.APIKey,
APISecret: t.params.APISecret,
RoomName: t.params.Room,
ParticipantIdentity: identity,
}, lksdk.WithAutoSubscribe(false))
}
if err == nil {
break
}
Expand Down Expand Up @@ -420,3 +433,17 @@ func (t *LoadTester) consumeTrack(track *webrtc.TrackRemote, pub *lksdk.RemoteTr
}
}
}

func newAccessToken(apiKey, apiSecret, roomName, pID string, canPublish bool) (string, error) {
at := auth.NewAccessToken(apiKey, apiSecret)
grant := &auth.VideoGrant{
RoomJoin: true,
Room: roomName,
CanPublish: &canPublish,
}
at.SetVideoGrant(grant).
SetIdentity(pID).
SetName(pID)

return at.ToJWT()
}
Loading