Skip to content

Commit 94e1339

Browse files
committed
fix(snapshot): async decode for main stream, remove debug logging
Move OpenH264 decode to a dedicated goroutine in SnapshotConsumer to avoid blocking RTP fan-out (was causing ~110ms delays per packet). NAL streams are queued via channel with non-blocking send to drop frames if the decoder is busy. Remove temporary debug log statements.
1 parent d583758 commit 94e1339

1 file changed

Lines changed: 54 additions & 27 deletions

File tree

internal/media/snapshot_consumer.go

Lines changed: 54 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,20 @@ import (
1616

1717
// SnapshotConsumer decodes IDR frames from the main (high-res) stream
1818
// and caches the latest decoded frame for event snapshots.
19-
// It only decodes keyframes (~0.5-1 FPS) to keep CPU overhead minimal.
19+
// Decoding runs in a dedicated goroutine to avoid blocking RTP fan-out.
2020
type SnapshotConsumer struct {
2121
camera string
2222

2323
h264Decoder *rtph264.Decoder
24-
h264Dec *H264Decoder
2524
sps []byte
2625
pps []byte
2726

2827
mu sync.RWMutex
2928
lastFrame *image.RGBA
3029
lastTime time.Time
30+
31+
decodeCh chan []byte
32+
done chan struct{}
3133
}
3234

3335
// NewSnapshotConsumer creates a consumer that caches the latest full-resolution
@@ -38,10 +40,10 @@ func NewSnapshotConsumer(camera string, track *rtsp.TrackInfo) *SnapshotConsumer
3840
return nil
3941
}
4042

41-
sc := &SnapshotConsumer{
42-
camera: camera,
43-
sps: track.SPS,
44-
pps: track.PPS,
43+
// Verify OpenH264 is available before allocating
44+
if !ensureOpenH264() {
45+
slog.Warn("snapshot consumer: OpenH264 unavailable", "camera", camera)
46+
return nil
4547
}
4648

4749
h264Format := &format.H264{
@@ -55,18 +57,52 @@ func NewSnapshotConsumer(camera string, track *rtsp.TrackInfo) *SnapshotConsumer
5557
slog.Warn("snapshot consumer: failed to create H264 RTP decoder", "camera", camera, "error", err)
5658
return nil
5759
}
58-
sc.h264Decoder = dec
5960

60-
sc.h264Dec = NewH264Decoder()
61-
if sc.h264Dec == nil {
62-
slog.Warn("snapshot consumer: OpenH264 unavailable", "camera", camera)
63-
return nil
61+
sc := &SnapshotConsumer{
62+
camera: camera,
63+
h264Decoder: dec,
64+
sps: track.SPS,
65+
pps: track.PPS,
66+
decodeCh: make(chan []byte, 1),
67+
done: make(chan struct{}),
6468
}
6569

70+
go sc.decodeLoop()
71+
6672
slog.Info("snapshot consumer enabled for main stream", "camera", camera)
6773
return sc
6874
}
6975

76+
// decodeLoop runs in a dedicated goroutine, decoding NAL streams without
77+
// blocking the RTP fan-out callback.
78+
func (sc *SnapshotConsumer) decodeLoop() {
79+
h264Dec := NewH264Decoder()
80+
if h264Dec == nil {
81+
return
82+
}
83+
defer h264Dec.Close()
84+
85+
for {
86+
select {
87+
case nalStream, ok := <-sc.decodeCh:
88+
if !ok {
89+
return
90+
}
91+
ycbcr := h264Dec.Decode(nalStream)
92+
if ycbcr == nil {
93+
continue
94+
}
95+
rgba := ycbcrToRGBA(ycbcr)
96+
sc.mu.Lock()
97+
sc.lastFrame = rgba
98+
sc.lastTime = time.Now()
99+
sc.mu.Unlock()
100+
case <-sc.done:
101+
return
102+
}
103+
}
104+
}
105+
70106
// LastFrame returns the most recently decoded full-resolution frame, or nil.
71107
func (sc *SnapshotConsumer) LastFrame() *image.RGBA {
72108
sc.mu.RLock()
@@ -76,15 +112,12 @@ func (sc *SnapshotConsumer) LastFrame() *image.RGBA {
76112

77113
// Close releases decoder resources.
78114
func (sc *SnapshotConsumer) Close() {
79-
if sc.h264Dec != nil {
80-
sc.h264Dec.Close()
81-
sc.h264Dec = nil
82-
}
115+
close(sc.done)
83116
}
84117

85-
// OnVideoRTP decodes IDR frames and caches the result.
118+
// OnVideoRTP reassembles NAL units and queues IDR frames for async decode.
86119
func (sc *SnapshotConsumer) OnVideoRTP(pkt *rtp.Packet) {
87-
if sc.h264Decoder == nil || sc.h264Dec == nil {
120+
if sc.h264Decoder == nil {
88121
return
89122
}
90123

@@ -151,17 +184,11 @@ func (sc *SnapshotConsumer) OnVideoRTP(pkt *rtp.Packet) {
151184
nalStream = append(nalStream, nalu...)
152185
}
153186

154-
ycbcr := sc.h264Dec.Decode(nalStream)
155-
if ycbcr == nil {
156-
return
187+
// Non-blocking send — drop frame if decode goroutine is busy
188+
select {
189+
case sc.decodeCh <- nalStream:
190+
default:
157191
}
158-
159-
rgba := ycbcrToRGBA(ycbcr)
160-
161-
sc.mu.Lock()
162-
sc.lastFrame = rgba
163-
sc.lastTime = time.Now()
164-
sc.mu.Unlock()
165192
}
166193

167194
// OnAudioRTP is a no-op.

0 commit comments

Comments
 (0)