Skip to content

Commit 7a98d72

Browse files
Add support for announcing out of network splice events ad participant attributes (#326)
Out of network events are only received from SCTE 35 tables in MPEG TS streams (eg SRT) for now. This is implemented my monitoring SCTE 35 related gstreamer events pushed down the pipeline. Relying on messages sent on the gstreamer message bus would be a less intrusive solution, but SCTE 35 related events sent on the bus do not contain timing information based on the pipeline timabase, and the MPEG TS demuxer doesn't expose needed timing information to regenerate these timestamps. Events do contain extra timing metadata. This relies on a forked version of go-gst for now.
1 parent bfe72cb commit 7a98d72

File tree

7 files changed

+245
-20
lines changed

7 files changed

+245
-20
lines changed

go.mod

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
module github.com/livekit/ingress
22

3-
go 1.23.1
3+
go 1.23.2
4+
5+
replace github.com/go-gst/go-gst => github.com/livekit/go-gst v0.2.34-0.20250205234446-03e3e98e7c6f
46

57
toolchain go1.23.6
68

79
require (
810
github.com/Eyevinn/mp4ff v0.47.0
911
github.com/aclements/go-moremath v0.0.0-20241023150245-c8bbc672ef66
1012
github.com/frostbyte73/core v0.1.0
11-
github.com/go-gst/go-glib v1.4.0
13+
github.com/go-gst/go-glib v1.4.1-0.20241209142714-f53cebf18559
1214
github.com/go-gst/go-gst v1.4.0
1315
github.com/gorilla/mux v1.8.1
1416
github.com/livekit/go-rtmp v0.0.0-20230829211117-1c4f5a5c81ed
@@ -51,6 +53,7 @@ require (
5153
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
5254
github.com/fsnotify/fsnotify v1.8.0 // indirect
5355
github.com/gammazero/deque v1.0.0 // indirect
56+
github.com/go-gst/go-pointer v0.0.0-20241127163939-ba766f075b4c // indirect
5457
github.com/go-jose/go-jose/v3 v3.0.3 // indirect
5558
github.com/go-logr/logr v1.4.2 // indirect
5659
github.com/go-logr/stdr v1.2.2 // indirect
@@ -65,7 +68,6 @@ require (
6568
github.com/lithammer/shortuuid/v4 v4.2.0 // indirect
6669
github.com/mackerelio/go-osstat v0.2.5 // indirect
6770
github.com/magefile/mage v1.15.0 // indirect
68-
github.com/mattn/go-pointer v0.0.1 // indirect
6971
github.com/mitchellh/mapstructure v1.5.0 // indirect
7072
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
7173
github.com/nats-io/nats.go v1.38.0 // indirect

go.sum

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -63,10 +63,10 @@ github.com/fsnotify/fsnotify v1.8.0 h1:dAwr6QBTBZIkG8roQaJjGof0pp0EeF+tNV7YBP3F/
6363
github.com/fsnotify/fsnotify v1.8.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
6464
github.com/gammazero/deque v1.0.0 h1:LTmimT8H7bXkkCy6gZX7zNLtkbz4NdS2z8LZuor3j34=
6565
github.com/gammazero/deque v1.0.0/go.mod h1:iflpYvtGfM3U8S8j+sZEKIak3SAKYpA5/SQewgfXDKo=
66-
github.com/go-gst/go-glib v1.4.0 h1:FB2uVfB0uqz7/M6EaDdWWlBZRQpvFAbWfL7drdw8lAE=
67-
github.com/go-gst/go-glib v1.4.0/go.mod h1:GUIpWmkxQ1/eL+FYSjKpLDyTZx6Vgd9nNXt8dA31d5M=
68-
github.com/go-gst/go-gst v1.4.0 h1:EikB43u4c3wc8d2RzlFRSfIGIXYzDy6Zls2vJqrG2BU=
69-
github.com/go-gst/go-gst v1.4.0/go.mod h1:p8TLGtOxJLcrp6PCkTPdnanwWBxPZvYiHDbuSuwgO3c=
66+
github.com/go-gst/go-glib v1.4.1-0.20241209142714-f53cebf18559 h1:AK60n6W3FLZTp9H1KU5VOa8XefNO0w0R3pfszphwX14=
67+
github.com/go-gst/go-glib v1.4.1-0.20241209142714-f53cebf18559/go.mod h1:ZWT4LXOO2PH8lSNu/dR5O2yoNQJKEgmijNa2d7nByK8=
68+
github.com/go-gst/go-pointer v0.0.0-20241127163939-ba766f075b4c h1:x8kKRVDmz5BRlolmDZGcsuZ1l+js6TRL3QWBJjGVctM=
69+
github.com/go-gst/go-pointer v0.0.0-20241127163939-ba766f075b4c/go.mod h1:qKw5ZZ0U58W6PU/7F/Lopv+14nKYmdXlOd7VnAZ17Mk=
7070
github.com/go-jose/go-jose/v3 v3.0.3 h1:fFKWeig/irsp7XD2zBxvnmA/XaRWp5V3CBsZXJF7G7k=
7171
github.com/go-jose/go-jose/v3 v3.0.3/go.mod h1:5b+7YgP7ZICgJDBdfjZaIt+H/9L9T/YQrVfLAMboGkQ=
7272
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
@@ -112,6 +112,8 @@ github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0
112112
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
113113
github.com/lithammer/shortuuid/v4 v4.2.0 h1:LMFOzVB3996a7b8aBuEXxqOBflbfPQAiVzkIcHO0h8c=
114114
github.com/lithammer/shortuuid/v4 v4.2.0/go.mod h1:D5noHZ2oFw/YaKCfGy0YxyE7M0wMbezmMjPdhyEFe6Y=
115+
github.com/livekit/go-gst v0.2.34-0.20250205234446-03e3e98e7c6f h1:d5QHoW/YCF9atbqX6WhLFjtyrWOhlq/qssr46s2Pk+E=
116+
github.com/livekit/go-gst v0.2.34-0.20250205234446-03e3e98e7c6f/go.mod h1:pyCgY9XFSG0CAnJzoJ84R5XWn8rEj849EYJOwnAdB8k=
115117
github.com/livekit/go-rtmp v0.0.0-20230829211117-1c4f5a5c81ed h1:w4c3K0j/I2qG+6PUF/ep4tf93HRibNs9QOWWD1SID50=
116118
github.com/livekit/go-rtmp v0.0.0-20230829211117-1c4f5a5c81ed/go.mod h1:X+CliWDrjhm5C+NgmxVt2ncdO3MnKDlbZHTwkuf0808=
117119
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkDaKb5iXdynYrzB84ErPPO4LbRASk58=
@@ -128,8 +130,6 @@ github.com/mackerelio/go-osstat v0.2.5 h1:+MqTbZUhoIt4m8qzkVoXUJg1EuifwlAJSk4Yl2
128130
github.com/mackerelio/go-osstat v0.2.5/go.mod h1:atxwWF+POUZcdtR1wnsUcQxTytoHG4uhl2AKKzrOajY=
129131
github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg=
130132
github.com/magefile/mage v1.15.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A=
131-
github.com/mattn/go-pointer v0.0.1 h1:n+XhsuGeVO6MEAp7xyEukFINEa+Quek5psIR/ylA6o0=
132-
github.com/mattn/go-pointer v0.0.1/go.mod h1:2zXcozF6qYGgmsG+SeTZz3oAbFLdD3OWqnUbNvJZAlc=
133133
github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
134134
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
135135
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=

pkg/lksdk_output/lksdk_output.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,10 @@ func (s *LKSDKOutput) WriteRTCP(pkts []rtcp.Packet) error {
332332
return pc.WriteRTCP(pkts)
333333
}
334334

335+
func (s *LKSDKOutput) UpdateLocalParticipantAttributes(attributes map[string]string) {
336+
s.room.LocalParticipant.SetAttributes(attributes)
337+
}
338+
335339
func (s *LKSDKOutput) Close() error {
336340
s.closeOutput()
337341

pkg/media/input.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,15 @@ func (i *Input) onPadAdded(_ *gst.Element, pad *gst.Pad) {
158158
}
159159
}
160160

161+
// Make sure we emit scte35 markers if available
162+
tsparser, _ := i.bin.GetElementByName("tsdemux0")
163+
if tsparser != nil {
164+
err := tsparser.SetProperty("send-scte35-events", true)
165+
if err != nil {
166+
logger.Errorw("failed setting `send-scte35-events` property", err)
167+
}
168+
}
169+
161170
// surface callback for first audio and video pads, plug in fakesink on the rest
162171
i.lock.Lock()
163172
newPad := false

pkg/media/splice_processor.go

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
// Copyright 2025 LiveKit, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package media
16+
17+
import (
18+
"context"
19+
"fmt"
20+
"time"
21+
22+
"github.com/go-gst/go-gst/gst"
23+
"github.com/livekit/ingress/pkg/lksdk_output"
24+
"github.com/livekit/ingress/pkg/utils"
25+
"github.com/livekit/protocol/livekit"
26+
"github.com/livekit/protocol/logger"
27+
)
28+
29+
type Splice struct {
30+
Immediate bool
31+
OutOfNetworkIndicator bool
32+
EventId uint32
33+
EventCancelIndicator bool
34+
RunningTime time.Duration
35+
}
36+
37+
type SpliceProcessor struct {
38+
outputSync *utils.OutputSynchronizer
39+
sdkOut *lksdk_output.LKSDKOutput
40+
41+
ctx context.Context
42+
cancel context.CancelFunc
43+
}
44+
45+
func NewSpliceProcessor(sdkOut *lksdk_output.LKSDKOutput, outputSync *utils.OutputSynchronizer) *SpliceProcessor {
46+
su := &SpliceProcessor{
47+
sdkOut: sdkOut,
48+
outputSync: outputSync,
49+
}
50+
51+
su.ctx, su.cancel = context.WithCancel(context.Background())
52+
53+
return su
54+
}
55+
56+
func (su *SpliceProcessor) Close() {
57+
su.cancel()
58+
}
59+
60+
func (su *SpliceProcessor) ProcessSpliceEvent(ev *gst.Event) error {
61+
ts := ev.ParseMpegtsSection()
62+
if ts == nil {
63+
return nil
64+
}
65+
if ts.SectionType() != gst.MpegtsSectionSCTESIT {
66+
return nil
67+
}
68+
69+
scteSit := ts.GetSCTESIT()
70+
if scteSit == nil {
71+
return nil
72+
}
73+
74+
if scteSit.SpliceCommandType() != gst.MpegtsSCTESpliceCommandInsert {
75+
return nil
76+
}
77+
78+
str, _ := ev.GetStructure().GetValue("running-time-map")
79+
rMap, _ := str.(*gst.Structure)
80+
81+
splices := scteSit.Splices()
82+
for _, sp := range splices {
83+
var rTime time.Duration
84+
85+
if !sp.SpliceImmediateFlag() {
86+
if rMap != nil {
87+
val, _ := rMap.GetValue(fmt.Sprintf("event-%d-splice-time", sp.SpliceEventId()))
88+
if val != nil {
89+
rTime = time.Duration(val.(uint64))
90+
}
91+
}
92+
if rTime == 0 && scteSit.SpliceTimeSpecified() {
93+
val, _ := rMap.GetValue("splice-time")
94+
if val != nil {
95+
rTime = time.Duration(val.(uint64))
96+
}
97+
}
98+
}
99+
100+
sp := &Splice{
101+
Immediate: sp.SpliceImmediateFlag(),
102+
OutOfNetworkIndicator: sp.OutOfNetworkIndicator(),
103+
EventId: sp.SpliceEventId(),
104+
EventCancelIndicator: sp.SpliceEventCancelIndicator(),
105+
RunningTime: time.Duration(rTime),
106+
}
107+
108+
err := su.processSplice(sp)
109+
if err != nil {
110+
return err
111+
}
112+
}
113+
114+
return nil
115+
}
116+
117+
func (su *SpliceProcessor) processSplice(sp *Splice) error {
118+
var attr string
119+
120+
if sp.OutOfNetworkIndicator {
121+
attr = fmt.Sprintf("%d", sp.EventId)
122+
}
123+
124+
logger.Debugw("spice event", "eventID", sp.EventId, "outOfNetworkIndicator", sp.OutOfNetworkIndicator, "immediate", sp.Immediate, "runningTime", sp.RunningTime)
125+
126+
event := func() {
127+
su.sdkOut.UpdateLocalParticipantAttributes(map[string]string{livekit.AttrIngressOutOfNetworkEventID: attr})
128+
}
129+
130+
if sp.Immediate {
131+
event()
132+
} else {
133+
err := su.outputSync.ScheduleEvent(su.ctx, sp.RunningTime, event)
134+
if err != nil {
135+
return err
136+
}
137+
}
138+
139+
return nil
140+
}

pkg/media/webrtc_sink.go

Lines changed: 48 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,16 @@ type WebRTCSink struct {
3838
params *params.Params
3939
onFailure func()
4040

41-
lock sync.Mutex
42-
sdkReady core.Fuse
43-
closed core.Fuse
44-
errChan chan error
45-
46-
sdkOut *lksdk_output.LKSDKOutput
47-
outputSync *utils.OutputSynchronizer
48-
statsGatherer *stats.LocalMediaStatsGatherer
41+
lock sync.Mutex
42+
sdkReady core.Fuse
43+
closed core.Fuse
44+
errChan chan error
45+
spliceProbeAdded bool
46+
47+
sdkOut *lksdk_output.LKSDKOutput
48+
outputSync *utils.OutputSynchronizer
49+
spliceProcessor *SpliceProcessor
50+
statsGatherer *stats.LocalMediaStatsGatherer
4951
}
5052

5153
func NewWebRTCSink(ctx context.Context, p *params.Params, onFailure func(), statsGatherer *stats.LocalMediaStatsGatherer) (*WebRTCSink, error) {
@@ -83,6 +85,7 @@ func NewWebRTCSink(ctx context.Context, p *params.Params, onFailure func(), stat
8385

8486
s.lock.Lock()
8587
s.sdkOut = sdkOut
88+
s.spliceProcessor = NewSpliceProcessor(sdkOut, s.outputSync)
8689
s.lock.Unlock()
8790
}()
8891

@@ -234,6 +237,9 @@ func (s *WebRTCSink) AddTrack(kind types.StreamKind, caps *gst.Caps) (*gst.Bin,
234237
bin = pp.GetBin()
235238
}
236239

240+
if !s.spliceProbeAdded {
241+
s.addSpliceProbe(bin)
242+
}
237243
return bin, nil
238244
}
239245

@@ -244,6 +250,10 @@ func (s *WebRTCSink) Close() error {
244250

245251
var err error
246252
s.lock.Lock()
253+
if s.spliceProcessor != nil {
254+
s.spliceProcessor.Close()
255+
}
256+
247257
if s.sdkOut != nil {
248258
err = s.sdkOut.Close()
249259
}
@@ -279,6 +289,36 @@ func getResolution(caps *gst.Caps) (w int, h int, err error) {
279289
return wObj.(int), hObj.(int), nil
280290
}
281291

292+
func (s *WebRTCSink) addSpliceProbe(bin *gst.Bin) {
293+
pad := bin.GetStaticPad("sink")
294+
if pad == nil {
295+
logger.Infow("No sink pad on output bin")
296+
return
297+
}
298+
299+
pad.SetEventFunction(func(self *gst.Pad, parent *gst.Object, event *gst.Event) bool {
300+
if event.HasName("scte-sit") {
301+
s.lock.Lock()
302+
p := s.spliceProcessor
303+
s.lock.Unlock()
304+
305+
if p != nil {
306+
err := p.ProcessSpliceEvent(event)
307+
if err != nil {
308+
logger.Infow("failed processing splice event", "error", err)
309+
}
310+
} else {
311+
// TODO store events and process them after connection
312+
logger.Infow("unable to process media splice before room is connected")
313+
}
314+
}
315+
316+
return pad.EventDefault(parent, event)
317+
})
318+
319+
s.spliceProbeAdded = true
320+
}
321+
282322
func filterAndSortLayersByQuality(layers []*livekit.VideoLayer, sourceW, sourceH int) []*livekit.VideoLayer {
283323
layersByQuality := make(map[livekit.VideoQuality]*livekit.VideoLayer)
284324

pkg/utils/output_synchronizer.go

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,13 @@
1515
package utils
1616

1717
import (
18+
"context"
1819
"sync"
1920
"time"
2021

2122
"github.com/frostbyte73/core"
2223
"github.com/livekit/ingress/pkg/errors"
24+
"github.com/livekit/psrpc"
2325
)
2426

2527
const (
@@ -42,6 +44,30 @@ func NewOutputSynchronizer() *OutputSynchronizer {
4244
return &OutputSynchronizer{}
4345
}
4446

47+
func (os *OutputSynchronizer) ScheduleEvent(ctx context.Context, pts time.Duration, event func()) error {
48+
os.lock.Lock()
49+
50+
if os.zeroTime.IsZero() {
51+
os.lock.Unlock()
52+
return psrpc.NewErrorf(psrpc.OutOfRange, "trying to schedule an event before output synchtonizer timeline was set")
53+
}
54+
55+
waitTime := computeWaitDuration(pts, os.zeroTime, time.Now())
56+
57+
os.lock.Unlock()
58+
59+
go func() {
60+
select {
61+
case <-time.After(waitTime):
62+
event()
63+
case <-ctx.Done():
64+
return
65+
}
66+
}()
67+
68+
return nil
69+
}
70+
4571
func (os *OutputSynchronizer) AddTrack() *TrackOutputSynchronizer {
4672
return newTrackOutputSynchronizer(os)
4773
}
@@ -50,10 +76,8 @@ func (os *OutputSynchronizer) getWaitDuration(pts time.Duration, firstSampleSent
5076
os.lock.Lock()
5177
defer os.lock.Unlock()
5278

53-
mediaTime := os.zeroTime.Add(pts)
5479
now := time.Now()
55-
56-
waitTime := mediaTime.Sub(now)
80+
waitTime := computeWaitDuration(pts, os.zeroTime, now)
5781

5882
if os.zeroTime.IsZero() || (waitTime < -leeway && firstSampleSent) {
5983
// Reset zeroTime if the earliest track is late
@@ -90,3 +114,9 @@ func (ost *TrackOutputSynchronizer) WaitForMediaTime(pts time.Duration) (bool, e
90114
return false, errors.ErrIngressClosing
91115
}
92116
}
117+
118+
func computeWaitDuration(pts time.Duration, zeroTime time.Time, now time.Time) time.Duration {
119+
mediaTime := zeroTime.Add(pts)
120+
121+
return mediaTime.Sub(now)
122+
}

0 commit comments

Comments
 (0)