-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathdecoder.go
190 lines (161 loc) · 4.84 KB
/
decoder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
package astilibav
import (
"context"
"fmt"
"sync/atomic"
"github.com/asticode/go-astiencoder"
"github.com/asticode/go-astitools/stat"
"github.com/asticode/go-astitools/sync"
"github.com/asticode/go-astitools/worker"
"github.com/asticode/goav/avcodec"
"github.com/asticode/goav/avutil"
"github.com/pkg/errors"
)
// countDecoder is a package-wide counter of created decoders. NewDecoder
// increments it atomically and uses the resulting value in the node label and name.
var countDecoder uint64
// Decoder represents an object capable of decoding packets.
// Packets received through HandlePkt are pushed to an internal queue; the task
// started by Start sends each of them to the codec context and hands every
// frame it receives back to the frame dispatcher.
type Decoder struct {
*astiencoder.BaseNode
// ctxCodec is the codec context packets are sent to and frames are received from
ctxCodec *avcodec.Context
// d provides frames to decode into and dispatches decoded frames to the connected handlers
d *frameDispatcher
// e is used to emit events when a libav call fails
e astiencoder.EmitEventFunc
// q is the queue of incoming packets, fed by HandlePkt and consumed in Start
q *astisync.CtxQueue
// statIncomingRate is incremented once per packet taken from the queue
statIncomingRate *astistat.IncrementStat
// statWorkRatio brackets the calls to the codec (send packet / receive frame)
statWorkRatio *astistat.DurationRatioStat
}
// NewDecoder creates a new decoder around an existing codec context.
// The event func e is used both by the underlying base node and by the decoder
// itself, while the closer c is handed over to the frame dispatcher.
func NewDecoder(ctxCodec *avcodec.Context, e astiencoder.EmitEventFunc, c *astiencoder.Closer) (d *Decoder) {
	// Every decoder gets its own index, used in its label and name
	idx := atomic.AddUint64(&countDecoder, 1)
	metadata := astiencoder.NodeMetadata{
		Description: "Decodes",
		Label:       fmt.Sprintf("Decoder #%d", idx),
		Name:        fmt.Sprintf("decoder_%d", idx),
	}

	// Assemble the decoder
	d = &Decoder{
		BaseNode:         astiencoder.NewBaseNode(e, metadata),
		ctxCodec:         ctxCodec,
		d:                newFrameDispatcher(e, c),
		e:                e,
		q:                astisync.NewCtxQueue(),
		statIncomingRate: astistat.NewIncrementStat(),
		statWorkRatio:    astistat.NewDurationRatioStat(),
	}

	// Register its stats
	d.addStats()
	return d
}
// NewDecoderFromCodecParams creates a new decoder from codec params.
//
// It finds the decoder matching codecParams.CodecId(), allocates a codec
// context for it, copies codecParams into that context and opens the codec.
// On success a callback closing the codec is added to the closer c and the
// returned decoder wraps the opened context. On failure d is nil, err
// describes the step that failed and the allocated context, if any, is freed.
func NewDecoderFromCodecParams(codecParams *avcodec.CodecParameters, e astiencoder.EmitEventFunc, c *astiencoder.Closer) (d *Decoder, err error) {
	// Find decoder
	cdc := avcodec.AvcodecFindDecoder(codecParams.CodecId())
	if cdc == nil {
		return nil, fmt.Errorf("astilibav: no decoder found for codec id %+v", codecParams.CodecId())
	}

	// Alloc context
	ctxCodec := cdc.AvcodecAllocContext3()
	if ctxCodec == nil {
		return nil, fmt.Errorf("astilibav: no context allocated for codec %+v", cdc)
	}

	// Until the context has been handed over to the closer and the decoder,
	// nothing else references it: free it here if a later step fails so that it
	// doesn't leak
	defer func() {
		if err != nil {
			ctxCodec.AvcodecFreeContext()
		}
	}()

	// Copy codec parameters
	if ret := avcodec.AvcodecParametersToContext(ctxCodec, codecParams); ret < 0 {
		return nil, errors.Wrap(newAvError(ret), "astilibav: avcodec.AvcodecParametersToContext failed")
	}

	// Open codec
	if ret := ctxCodec.AvcodecOpen2(cdc, nil); ret < 0 {
		return nil, errors.Wrap(newAvError(ret), "astilibav: d.ctxCodec.AvcodecOpen2 failed")
	}

	// Make sure the codec is closed
	// A failure to close is only emitted as an event, it never fails the closer
	c.Add(func() error {
		if ret := ctxCodec.AvcodecClose(); ret < 0 {
			emitAvError(e, ret, "d.ctxCodec.AvcodecClose failed")
		}
		return nil
	})

	// Create decoder
	return NewDecoder(ctxCodec, e, c), nil
}
// addStats registers the decoder's own stats (incoming rate and work ratio)
// as well as the stats of its frame dispatcher and its queue on the node's stater
func (d *Decoder) addStats() {
	// Incoming rate
	incomingRate := astistat.StatMetadata{
		Description: "Number of packets coming in the decoder per second",
		Label:       "Incoming rate",
		Unit:        "pps",
	}
	d.Stater().AddStat(incomingRate, d.statIncomingRate)

	// Work ratio
	workRatio := astistat.StatMetadata{
		Description: "Percentage of time spent doing some actual work",
		Label:       "Work ratio",
		Unit:        "%",
	}
	d.Stater().AddStat(workRatio, d.statWorkRatio)

	// Stats owned by the dispatcher and the queue
	d.d.addStats(d.Stater())
	d.q.AddStats(d.Stater())
}
// Connect connects the decoder to a FrameHandler: the handler is added to the
// frame dispatcher and linked to the decoder as a node.
// h must also implement astiencoder.Node, otherwise the type assertion panics.
func (d *Decoder) Connect(h FrameHandler) {
	// Register the handler with the dispatcher first
	d.d.addHandler(h)

	// Then link both nodes
	child := h.(astiencoder.Node)
	astiencoder.ConnectNodes(d, child)
}
// Start starts the decoder.
// The task run by the base node consumes the packet queue: each packet is sent
// to the codec, then frames are received and dispatched until receiveFrame
// asks to stop.
func (d *Decoder) Start(ctx context.Context, t astiencoder.CreateTaskFunc) {
	d.BaseNode.Start(ctx, t, func(_ *astiworker.Task) {
		// Let the queue handle the node's context in the background
		go d.q.HandleCtx(d.Context())

		// Deferred calls run in reverse order: the queue is stopped first, then
		// we wait for all dispatcher subprocesses to be done so that they are
		// properly closed
		defer d.d.wait()
		defer d.q.Stop()

		// Consume the queue
		d.q.Start(func(p interface{}) {
			// Pause, if requested, is handled once the packet has been processed
			defer d.HandlePause()

			// The queue only carries packets
			pkt := p.(*avcodec.Packet)
			d.statIncomingRate.Add(1)

			// Send the packet to the codec, measuring the time spent doing so
			d.statWorkRatio.Add(true)
			ret := avcodec.AvcodecSendPacket(d.ctxCodec, pkt)
			d.statWorkRatio.Done(true)
			if ret < 0 {
				emitAvError(d.e, ret, "avcodec.AvcodecSendPacket failed")
				return
			}

			// Receive frames until told to stop
			for !d.receiveFrame() {
			}
		})
	})
}
// receiveFrame tries to receive one frame from the codec and dispatches it.
// It returns true when the caller should stop receiving, i.e. as soon as the
// codec returns a negative value. AVERROR_EOF and AVERROR_EAGAIN are not
// reported as errors (presumably they only mean that no frame is available
// right now), any other negative value is emitted as an error event.
func (d *Decoder) receiveFrame() (stop bool) {
	// Borrow a frame from the dispatcher and give it back when done
	frame := d.d.getFrame()
	defer d.d.putFrame(frame)

	// Ask the codec for a frame, measuring the time spent doing so
	d.statWorkRatio.Add(true)
	ret := avcodec.AvcodecReceiveFrame(d.ctxCodec, frame)
	d.statWorkRatio.Done(true)

	// Nothing was received
	if ret < 0 {
		if expected := ret == avutil.AVERROR_EOF || ret == avutil.AVERROR_EAGAIN; !expected {
			emitAvError(d.e, ret, "avcodec.AvcodecReceiveFrame failed")
		}
		return true
	}

	// Dispatch frame
	d.d.dispatch(frame)
	return false
}
// HandlePkt implements the PktHandler interface.
// The packet is not decoded here: it is sent to the decoder's queue, which is
// consumed by the task started in Start.
func (d *Decoder) HandlePkt(pkt *avcodec.Packet) {
d.q.Send(pkt)
}