-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathmuxer.go
152 lines (131 loc) · 3.91 KB
/
muxer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package astilibav
import (
"context"
"fmt"
"sync"
"sync/atomic"
"unsafe"
"github.com/asticode/go-astiencoder"
"github.com/asticode/go-astitools/stat"
"github.com/asticode/go-astitools/sync"
"github.com/asticode/go-astitools/worker"
"github.com/asticode/goav/avcodec"
"github.com/asticode/goav/avformat"
"github.com/pkg/errors"
)
// countMuxer is the number of muxers created so far. NewMuxer increments it
// atomically and uses the new value to build each muxer's label and name.
var countMuxer uint64
// Muxer represents an object capable of muxing packets into an output
type Muxer struct {
	*astiencoder.BaseNode
	// c is the closer on which Start registers the trailer write
	c *astiencoder.Closer
	// ctxFormat is the format context the header, the packets and the trailer are written to
	ctxFormat *avformat.Context
	// e is used to emit errors that occur while writing
	e astiencoder.EmitEventFunc
	// o guards the header write in Start
	o *sync.Once
	// q is the queue packet handlers send to and Start consumes from
	q *astisync.CtxQueue
	// statIncomingRate is incremented for every item taken off the queue
	statIncomingRate *astistat.IncrementStat
	// statWorkRatio is notified around every packet write
	statWorkRatio *astistat.DurationRatioStat
}
// NewMuxer creates a new muxer writing to ctxFormat.
//
// e is used both by the underlying base node and to emit the muxer's own
// errors, and c is the closer the muxer registers its cleanup on. Every muxer
// gets a unique index, taken from a package-level counter, that appears in its
// label and name.
func NewMuxer(ctxFormat *avformat.Context, e astiencoder.EmitEventFunc, c *astiencoder.Closer) (m *Muxer) {
	// Reserve this muxer's index
	idx := atomic.AddUint64(&countMuxer, 1)

	// Describe the node
	metadata := astiencoder.NodeMetadata{
		Description: fmt.Sprintf("Muxes to %s", ctxFormat.Filename()),
		Label:       fmt.Sprintf("Muxer #%d", idx),
		Name:        fmt.Sprintf("muxer_%d", idx),
	}

	// Create the muxer
	m = &Muxer{
		BaseNode:         astiencoder.NewBaseNode(e, metadata),
		c:                c,
		ctxFormat:        ctxFormat,
		e:                e,
		o:                &sync.Once{},
		q:                astisync.NewCtxQueue(),
		statIncomingRate: astistat.NewIncrementStat(),
		statWorkRatio:    astistat.NewDurationRatioStat(),
	}

	// Register its stats
	m.addStats()
	return m
}
// addStats registers the muxer's stats (incoming rate, work ratio and the
// queue's own stats) on the node's stater.
func (m *Muxer) addStats() {
	incomingRate := astistat.StatMetadata{
		Description: "Number of packets coming in the muxer per second",
		Label:       "Incoming rate",
		Unit:        "pps",
	}
	workRatio := astistat.StatMetadata{
		Description: "Percentage of time spent doing some actual work",
		Label:       "Work ratio",
		Unit:        "%",
	}

	// Registration order: incoming rate, work ratio, then queue stats
	m.Stater().AddStat(incomingRate, m.statIncomingRate)
	m.Stater().AddStat(workRatio, m.statWorkRatio)
	m.q.AddStats(m.Stater())
}
// Start starts the muxer.
//
// It delegates to BaseNode.Start with a task that writes the output header,
// registers the trailer write on the closer, and then starts the queue with a
// callback writing every received packet to the output.
//
// The header write and the trailer registration are both guarded by the same
// sync.Once: if Start is called again, the header is not rewritten and no
// additional trailer write is registered, so the trailer is written at most
// once when the closer runs.
//
// NOTE(review): if the very first header write fails, the sync.Once is still
// consumed, so a later Start skips the header and goes straight to writing
// packets — confirm whether restarting after such a failure is supported.
func (m *Muxer) Start(ctx context.Context, t astiencoder.CreateTaskFunc) {
	m.BaseNode.Start(ctx, t, func(_ *astiworker.Task) {
		// Handle context
		go m.q.HandleCtx(m.Context())

		// Make sure to write the header and to register the trailer only once
		var ret int
		m.o.Do(func() {
			// Without a header there is nothing to finalize: don't register the trailer
			if ret = m.ctxFormat.AvformatWriteHeader(nil); ret < 0 {
				return
			}

			// Write trailer once everything is done
			m.c.Add(func() error {
				if ret := m.ctxFormat.AvWriteTrailer(); ret < 0 {
					return errors.Wrapf(newAvError(ret), "m.ctxFormat.AvWriteTrailer on %s failed", m.ctxFormat.Filename())
				}
				return nil
			})
		})
		if ret < 0 {
			emitAvError(m.e, ret, "m.ctxFormat.AvformatWriteHeader on %s failed", m.ctxFormat.Filename())
			return
		}

		// Make sure to stop the queue properly
		defer m.q.Stop()

		// Start queue
		m.q.Start(func(p interface{}) {
			// Handle pause
			defer m.HandlePause()

			// Assert payload: the retriever prepares the packet for the output stream
			pkt := p.(pktRetriever)()

			// Increment incoming rate
			m.statIncomingRate.Add(1)

			// Write frame. The work ratio stat is closed right after the write,
			// before any error is emitted, so that it only covers the write itself.
			m.statWorkRatio.Add(true)
			ret := m.ctxFormat.AvInterleavedWriteFrame((*avformat.Packet)(unsafe.Pointer(pkt)))
			m.statWorkRatio.Done(true)
			if ret < 0 {
				emitAvError(m.e, ret, "m.ctxFormat.AvInterleavedWriteFrame on %+v failed", pkt)
			}
		})
	})
}
// MuxerPktHandler is an object that can handle a pkt for the muxer
type MuxerPktHandler struct {
	*Muxer
	// o is the output stream whose time base and index are applied to packets.
	// It shadows the o field of the embedded Muxer.
	o *avformat.Stream
	// prev provides the time base packets are rescaled from
	prev Descriptor
}
// NewPktHandler creates a pkt handler bound to this muxer. Packets it handles
// are rescaled from prev's time base to the time base of the output stream o,
// and are assigned o's index.
func (m *Muxer) NewPktHandler(o *avformat.Stream, prev Descriptor) *MuxerPktHandler {
	h := &MuxerPktHandler{Muxer: m}
	h.o = o
	h.prev = prev
	return h
}
// HandlePkt implements the PktHandler interface. The pkt is not sent as is:
// it is wrapped in a retriever that is sent to the muxer's queue instead.
func (h *MuxerPktHandler) HandlePkt(pkt *avcodec.Packet) {
	retriever := h.pktRetriever(pkt)
	h.q.Send(retriever)
}
// pktRetriever returns a function that, when called, rescales the pkt's
// timestamps from the previous descriptor's time base to the output stream's
// time base, sets the pkt's stream index to the output stream's index and
// returns that same pkt. Nothing is done to the pkt until the returned
// function is called.
func (h *MuxerPktHandler) pktRetriever(pkt *avcodec.Packet) pktRetriever {
	var retrieve pktRetriever = func() *avcodec.Packet {
		// Rescale timestamps
		from, to := h.prev.TimeBase(), h.o.TimeBase()
		pkt.AvPacketRescaleTs(from, to)

		// Set stream index
		pkt.SetStreamIndex(h.o.Index())
		return pkt
	}
	return retrieve
}