-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathdemuxer.go
109 lines (93 loc) · 2.6 KB
/
demuxer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package astilibav
import (
"context"
"fmt"
"sync/atomic"
"github.com/asticode/go-astiencoder"
"github.com/asticode/go-astitools/stat"
"github.com/asticode/go-astitools/worker"
"github.com/asticode/goav/avformat"
"github.com/asticode/goav/avutil"
)
var countDemuxer uint64
// Demuxer represents an object capable of demuxing packets out of an input
type Demuxer struct {
*astiencoder.BaseNode
ctxFormat *avformat.Context
d *pktDispatcher
e astiencoder.EmitEventFunc
statWorkRatio *astistat.DurationRatioStat
}
// NewDemuxer creates a new demuxer
func NewDemuxer(ctxFormat *avformat.Context, e astiencoder.EmitEventFunc, c *astiencoder.Closer) (d *Demuxer) {
count := atomic.AddUint64(&countDemuxer, uint64(1))
d = &Demuxer{
BaseNode: astiencoder.NewBaseNode(e, astiencoder.NodeMetadata{
Description: fmt.Sprintf("Demuxes %s", ctxFormat.Filename()),
Label: fmt.Sprintf("Demuxer #%d", count),
Name: fmt.Sprintf("demuxer_%d", count),
}),
ctxFormat: ctxFormat,
d: newPktDispatcher(c),
e: e,
statWorkRatio: astistat.NewDurationRatioStat(),
}
d.addStats()
return
}
func (d *Demuxer) addStats() {
// Add work ratio
d.Stater().AddStat(astistat.StatMetadata{
Description: "Percentage of time spent doing some actual work",
Label: "Work ratio",
Unit: "%",
}, d.statWorkRatio)
// Add dispatcher stats
d.d.addStats(d.Stater())
}
// Connect connects the demuxer to a PktHandler for a specific stream index
func (d *Demuxer) Connect(i *avformat.Stream, h PktHandler) {
// Add handler
d.d.addHandler(newPktCond(i, h))
// Connect nodes
astiencoder.ConnectNodes(d, h.(astiencoder.Node))
}
// Start starts the demuxer
func (d *Demuxer) Start(ctx context.Context, t astiencoder.CreateTaskFunc) {
d.BaseNode.Start(ctx, t, func(t *astiworker.Task) {
// Make sure to wait for all dispatcher subprocesses to be done so that they are properly closed
defer d.d.wait()
// Loop
for {
// Read frame
if stop := d.readFrame(); stop {
return
}
// Handle pause
d.HandlePause()
// Check context
if d.Context().Err() != nil {
return
}
}
})
}
func (d *Demuxer) readFrame() (stop bool) {
// Get pkt from pool
pkt := d.d.getPkt()
defer d.d.putPkt(pkt)
// Read frame
d.statWorkRatio.Add(true)
if ret := d.ctxFormat.AvReadFrame(pkt); ret < 0 {
d.statWorkRatio.Done(true)
if ret != avutil.AVERROR_EOF {
emitAvError(d.e, ret, "ctxFormat.AvReadFrame on %s failed", d.ctxFormat.Filename())
}
stop = true
return
}
d.statWorkRatio.Done(true)
// Dispatch pkt
d.d.dispatch(pkt)
return
}