Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ require (
github.com/livekit/protocol v1.41.1-0.20250909050443-48ed04737846
github.com/livekit/psrpc v0.6.1-0.20250828235857-3fafdbbcbe55
github.com/livekit/server-sdk-go/v2 v2.4.2
github.com/pion/dtls/v3 v3.0.6
github.com/pion/dtls/v3 v3.0.7
github.com/pion/interceptor v0.1.40
github.com/pion/rtcp v1.2.15
github.com/pion/rtp v1.8.20
github.com/pion/sdp/v3 v3.0.14
github.com/pion/webrtc/v4 v4.1.2
github.com/pion/rtp v1.8.21
github.com/pion/sdp/v3 v3.0.15
github.com/pion/webrtc/v4 v4.1.4
github.com/prometheus/client_golang v1.22.0
github.com/sirupsen/logrus v1.9.3
github.com/stretchr/testify v1.10.0
Expand Down Expand Up @@ -84,10 +84,10 @@ require (
github.com/pion/mdns/v2 v2.0.7 // indirect
github.com/pion/randutil v0.1.0 // indirect
github.com/pion/sctp v1.8.39 // indirect
github.com/pion/srtp/v3 v3.0.6 // indirect
github.com/pion/srtp/v3 v3.0.7 // indirect
github.com/pion/stun/v3 v3.0.0 // indirect
github.com/pion/transport/v3 v3.0.7 // indirect
github.com/pion/turn/v4 v4.0.2 // indirect
github.com/pion/turn/v4 v4.1.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
Expand Down
24 changes: 12 additions & 12 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ github.com/ory/dockertest/v3 v3.11.0 h1:OiHcxKAvSDUwsEVh2BjxQQc/5EHz9n0va9awCtNG
github.com/ory/dockertest/v3 v3.11.0/go.mod h1:VIPxS1gwT9NpPOrfD3rACs8Y9Z7yhzO4SB194iUDnUI=
github.com/pion/datachannel v1.5.10 h1:ly0Q26K1i6ZkGf42W7D4hQYR90pZwzFOjTq5AuCKk4o=
github.com/pion/datachannel v1.5.10/go.mod h1:p/jJfC9arb29W7WrxyKbepTU20CFgyx5oLo8Rs4Py/M=
github.com/pion/dtls/v3 v3.0.6 h1:7Hkd8WhAJNbRgq9RgdNh1aaWlZlGpYTzdqjy9x9sK2E=
github.com/pion/dtls/v3 v3.0.6/go.mod h1:iJxNQ3Uhn1NZWOMWlLxEEHAN5yX7GyPvvKw04v9bzYU=
github.com/pion/dtls/v3 v3.0.7 h1:bItXtTYYhZwkPFk4t1n3Kkf5TDrfj6+4wG+CZR8uI9Q=
github.com/pion/dtls/v3 v3.0.7/go.mod h1:uDlH5VPrgOQIw59irKYkMudSFprY9IEFCqz/eTz16f8=
github.com/pion/ice/v4 v4.0.10 h1:P59w1iauC/wPk9PdY8Vjl4fOFL5B+USq1+xbDcN6gT4=
github.com/pion/ice/v4 v4.0.10/go.mod h1:y3M18aPhIxLlcO/4dn9X8LzLLSma84cx6emMSu14FGw=
github.com/pion/interceptor v0.1.40 h1:e0BjnPcGpr2CFQgKhrQisBU7V3GXK6wrfYrGYaU6Jq4=
Expand All @@ -165,22 +165,22 @@ github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA=
github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8=
github.com/pion/rtcp v1.2.15 h1:LZQi2JbdipLOj4eBjK4wlVoQWfrZbh3Q6eHtWtJBZBo=
github.com/pion/rtcp v1.2.15/go.mod h1:jlGuAjHMEXwMUHK78RgX0UmEJFV4zUKOFHR7OP+D3D0=
github.com/pion/rtp v1.8.20 h1:8zcyqohadZE8FCBeGdyEvHiclPIezcwRQH9zfapFyYI=
github.com/pion/rtp v1.8.20/go.mod h1:bAu2UFKScgzyFqvUKmbvzSdPr+NGbZtv6UB2hesqXBk=
github.com/pion/rtp v1.8.21 h1:3yrOwmZFyUpcIosNcWRpQaU+UXIJ6yxLuJ8Bx0mw37Y=
github.com/pion/rtp v1.8.21/go.mod h1:bAu2UFKScgzyFqvUKmbvzSdPr+NGbZtv6UB2hesqXBk=
github.com/pion/sctp v1.8.39 h1:PJma40vRHa3UTO3C4MyeJDQ+KIobVYRZQZ0Nt7SjQnE=
github.com/pion/sctp v1.8.39/go.mod h1:cNiLdchXra8fHQwmIoqw0MbLLMs+f7uQ+dGMG2gWebE=
github.com/pion/sdp/v3 v3.0.14 h1:1h7gBr9FhOWH5GjWWY5lcw/U85MtdcibTyt/o6RxRUI=
github.com/pion/sdp/v3 v3.0.14/go.mod h1:88GMahN5xnScv1hIMTqLdu/cOcUkj6a9ytbncwMCq2E=
github.com/pion/srtp/v3 v3.0.6 h1:E2gyj1f5X10sB/qILUGIkL4C2CqK269Xq167PbGCc/4=
github.com/pion/srtp/v3 v3.0.6/go.mod h1:BxvziG3v/armJHAaJ87euvkhHqWe9I7iiOy50K2QkhY=
github.com/pion/sdp/v3 v3.0.15 h1:F0I1zds+K/+37ZrzdADmx2Q44OFDOPRLhPnNTaUX9hk=
github.com/pion/sdp/v3 v3.0.15/go.mod h1:88GMahN5xnScv1hIMTqLdu/cOcUkj6a9ytbncwMCq2E=
github.com/pion/srtp/v3 v3.0.7 h1:QUElw0A/FUg3MP8/KNMZB3i0m8F9XeMnTum86F7S4bs=
github.com/pion/srtp/v3 v3.0.7/go.mod h1:qvnHeqbhT7kDdB+OGB05KA/P067G3mm7XBfLaLiaNF0=
github.com/pion/stun/v3 v3.0.0 h1:4h1gwhWLWuZWOJIJR9s2ferRO+W3zA/b6ijOI6mKzUw=
github.com/pion/stun/v3 v3.0.0/go.mod h1:HvCN8txt8mwi4FBvS3EmDghW6aQJ24T+y+1TKjB5jyU=
github.com/pion/transport/v3 v3.0.7 h1:iRbMH05BzSNwhILHoBoAPxoB9xQgOaJk+591KC9P1o0=
github.com/pion/transport/v3 v3.0.7/go.mod h1:YleKiTZ4vqNxVwh77Z0zytYi7rXHl7j6uPLGhhz9rwo=
github.com/pion/turn/v4 v4.0.2 h1:ZqgQ3+MjP32ug30xAbD6Mn+/K4Sxi3SdNOTFf+7mpps=
github.com/pion/turn/v4 v4.0.2/go.mod h1:pMMKP/ieNAG/fN5cZiN4SDuyKsXtNTr0ccN7IToA1zs=
github.com/pion/webrtc/v4 v4.1.2 h1:mpuUo/EJ1zMNKGE79fAdYNFZBX790KE7kQQpLMjjR54=
github.com/pion/webrtc/v4 v4.1.2/go.mod h1:xsCXiNAmMEjIdFxAYU0MbB3RwRieJsegSB2JZsGN+8U=
github.com/pion/turn/v4 v4.1.1 h1:9UnY2HB99tpDyz3cVVZguSxcqkJ1DsTSZ+8TGruh4fc=
github.com/pion/turn/v4 v4.1.1/go.mod h1:2123tHk1O++vmjI5VSD0awT50NywDAq5A2NNNU4Jjs8=
github.com/pion/webrtc/v4 v4.1.4 h1:/gK1ACGHXQmtyVVbJFQDxNoODg4eSRiFLB7t9r9pg8M=
github.com/pion/webrtc/v4 v4.1.4/go.mod h1:Oab9npu1iZtQRMic3K3toYq5zFPvToe/QBw7dMI2ok4=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down
20 changes: 17 additions & 3 deletions pkg/media/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"strings"
"sync"
"time"

"github.com/frostbyte73/core"
"github.com/go-gst/go-gst/gst"
Expand All @@ -37,7 +38,7 @@ import (
type Source interface {
GetSources() []*gst.Element
ValidateCaps(*gst.Caps) error
Start(ctx context.Context) error
Start(ctx context.Context, onClose func()) error
Close() error
}

Expand Down Expand Up @@ -122,8 +123,21 @@ func (i *Input) OnOutputReady(f OutputReadyFunc) {
i.onOutputReady = f
}

func (i *Input) Start(ctx context.Context) error {
return i.source.Start(ctx)
func (i *Input) Start(ctx context.Context, onCloseTimeout func(ctx context.Context)) error {
return i.source.Start(ctx, func() {
go func() {
t := time.NewTimer(5 * time.Second)
select {
case <-t.C:
logger.Infow("timeout while waiting for source closure to trigger pipeline stop. Pipeline frozen")
if onCloseTimeout != nil {
onCloseTimeout(context.Background())
}
case <-i.closeFuse.Watch():
t.Stop()
}
}()
})
}

func (i *Input) Close() error {
Expand Down
32 changes: 30 additions & 2 deletions pkg/media/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,9 @@ func (p *Pipeline) Run(ctx context.Context) error {
return err
}

err = p.input.Start(ctx)
err = p.input.Start(ctx, func(ctx context.Context) {
p.SendEOS(ctx)
})
if err != nil {
span.RecordError(err)
logger.Infow("failed to start input", err)
Expand Down Expand Up @@ -241,7 +243,22 @@ func (p *Pipeline) messageWatch(msg *gst.Message) bool {
case gst.MessageStreamCollection:
p.handleStreamCollectionMessage(msg)

case gst.MessageTag, gst.MessageStateChanged, gst.MessageLatency, gst.MessageAsyncDone, gst.MessageStreamStatus, gst.MessageElement:
case gst.MessageStateChanged:
p.logPipelineStateChange(msg)

case gst.MessageAsyncStart:
src := msg.Source()
if src == p.pipeline.GetName() {
logger.Infow("GST ASYNC_START (pipeline)")
}

case gst.MessageAsyncDone:
src := msg.Source()
if src == p.pipeline.GetName() {
logger.Debugw("GST ASYNC_DONE (pipeline)")
}

case gst.MessageTag, gst.MessageLatency, gst.MessageStreamStatus, gst.MessageElement:
// ignore

default:
Expand Down Expand Up @@ -279,6 +296,17 @@ func (p *Pipeline) handleStreamCollectionMessage(msg *gst.Message) {
}
}

func (p *Pipeline) logPipelineStateChange(msg *gst.Message) {
old, new := msg.ParseStateChanged()
src := msg.Source()
isPipeline := (src == p.pipeline.GetName())

if isPipeline && new != old {
logger.Infow("GST pipeline state changed",
"old", old, "new", new)
}
}

func (p *Pipeline) SendEOS(ctx context.Context) {
ctx, span := tracer.Start(ctx, "Pipeline.SendEOS")
defer span.End()
Expand Down
10 changes: 8 additions & 2 deletions pkg/media/rtmp/appsrc.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func NewRTMPRelaySource(ctx context.Context, p *params.Params) (*RTMPRelaySource
return s, nil
}

func (s *RTMPRelaySource) Start(ctx context.Context) error {
func (s *RTMPRelaySource) Start(ctx context.Context, onClose func()) error {
ctx, span := tracer.Start(ctx, "RTMPRelaySource.Start")
defer span.End()

Expand Down Expand Up @@ -104,8 +104,14 @@ func (s *RTMPRelaySource) Start(ctx context.Context) error {
logger.Debugw("flv http relay reached end of stream")

s.flvSrc.EndStream()
if onClose != nil {
onClose()
}

s.result <- err
select {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

trying to understand this part better - I assume this is done to avoid blocking - and since the channel is already buffered (1) - where else we have writing to the channel? Is whip different in that regard as try send isn't used there?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change should not be needed with the code as it stands. However, without this the code is not idempotent, and we have a risk of getting a deadlock if we ever (potentially after a code change somewhere else) have a case where we write to the channel more than once.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 - updating whip appsrc with the same pattern sounds good then

case s.result <- err:
default:
}
close(s.result)
}()

Expand Down
2 changes: 1 addition & 1 deletion pkg/media/urlpull/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (s *URLSource) ValidateCaps(caps *gst.Caps) error {
return errors.ErrUnsupportedDecodeMimeType(str.Name())
}

func (u *URLSource) Start(ctx context.Context) error {
func (u *URLSource) Start(ctx context.Context, onClose func()) error {
if u.printStats == nil {
return nil
}
Expand Down
13 changes: 10 additions & 3 deletions pkg/media/whip/appsrc.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ func NewWHIPAppSource(ctx context.Context, resourceId string, trackKind types.St
return w, nil
}

func (w *whipAppSource) Start(ctx context.Context, getCorrectedTs func(time.Duration) time.Duration) error {
ctx, span := tracer.Start(ctx, "RTMPRelaySource.Start")
func (w *whipAppSource) Start(ctx context.Context, getCorrectedTs func(time.Duration) time.Duration, onClose func()) error {
ctx, span := tracer.Start(ctx, "WHIPAppSource.Start")
defer span.End()

logger.Debugw("starting WHIP app source", "resourceID", w.resourceId, "kind", w.trackKind)
Expand All @@ -104,7 +104,14 @@ func (w *whipAppSource) Start(ctx context.Context, getCorrectedTs func(time.Dura

w.appSrc.EndStream()

w.result <- err
if onClose != nil {
onClose()
}

select {
case w.result <- err:
default:
}
close(w.result)
}()

Expand Down
4 changes: 2 additions & 2 deletions pkg/media/whip/whipsrc.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ func NewWHIPRelaySource(ctx context.Context, p *params.Params) (*WHIPSource, err
return s, nil
}

func (s *WHIPSource) Start(ctx context.Context) error {
func (s *WHIPSource) Start(ctx context.Context, onClose func()) error {
for _, t := range s.trackSrc {
err := t.Start(ctx, s.getCorrctedTimestamp)
err := t.Start(ctx, s.getCorrctedTimestamp, onClose)
if err != nil {
return err
}
Expand Down