Skip to content

Commit ea9a191

Browse files
committed
Implement RTPDump Reader/Writer
RTPDump is a widely-implemented file format for saving RTP packet dumps without the overhead of UDP and IP headers found in pcap dumps. libWebRTC, Wireshark, and RTPTools all have an implementation. For more information see: https://www.cs.columbia.edu/irt/software/rtptools Relates to #549
1 parent 5ee8b1a commit ea9a191

File tree

6 files changed

+820
-0
lines changed

6 files changed

+820
-0
lines changed

pkg/media/rtpdump/reader.go

+103
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package rtpdump
2+
3+
import (
4+
"bufio"
5+
"io"
6+
"regexp"
7+
"sync"
8+
)
9+
10+
// The file starts with #!rtpplay1.0 address/port\n
11+
var preambleRegexp = regexp.MustCompile(`#\!rtpplay1\.0 \d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\/\d{1,5}\n`)
12+
13+
// Reader reads the RTPDump file format
14+
type Reader struct {
15+
readerMu sync.Mutex
16+
reader io.Reader
17+
}
18+
19+
// NewReader opens a new Reader and immediately reads the Header from the start
20+
// of the input stream.
21+
func NewReader(r io.Reader) (*Reader, Header, error) {
22+
var hdr Header
23+
24+
bio := bufio.NewReader(r)
25+
26+
// Look ahead to see if there's a valid preamble
27+
peek, err := bio.Peek(preambleLen)
28+
if err == io.EOF {
29+
return nil, hdr, errMalformed
30+
}
31+
if err != nil {
32+
return nil, hdr, err
33+
}
34+
if !preambleRegexp.Match(peek) {
35+
return nil, hdr, errMalformed
36+
}
37+
38+
// consume the preamble
39+
_, _, err = bio.ReadLine()
40+
if err == io.EOF {
41+
return nil, hdr, errMalformed
42+
}
43+
if err != nil {
44+
return nil, hdr, err
45+
}
46+
47+
hBuf := make([]byte, headerLen)
48+
_, err = io.ReadFull(bio, hBuf)
49+
if err == io.ErrUnexpectedEOF || err == io.EOF {
50+
return nil, hdr, errMalformed
51+
}
52+
if err != nil {
53+
return nil, hdr, err
54+
}
55+
56+
if err := hdr.Unmarshal(hBuf); err != nil {
57+
return nil, hdr, err
58+
}
59+
60+
return &Reader{
61+
reader: bio,
62+
}, hdr, nil
63+
}
64+
65+
// Next returns the next Packet in the Reader input stream
66+
func (r *Reader) Next() (Packet, error) {
67+
r.readerMu.Lock()
68+
defer r.readerMu.Unlock()
69+
70+
hBuf := make([]byte, pktHeaderLen)
71+
72+
_, err := io.ReadFull(r.reader, hBuf)
73+
if err == io.ErrUnexpectedEOF {
74+
return Packet{}, errMalformed
75+
}
76+
if err != nil {
77+
return Packet{}, err
78+
}
79+
80+
var h packetHeader
81+
if err = h.Unmarshal(hBuf); err != nil {
82+
return Packet{}, err
83+
}
84+
85+
if h.Length == 0 {
86+
return Packet{}, errMalformed
87+
}
88+
89+
payload := make([]byte, h.Length-pktHeaderLen)
90+
_, err = io.ReadFull(r.reader, payload)
91+
if err == io.ErrUnexpectedEOF {
92+
return Packet{}, errMalformed
93+
}
94+
if err != nil {
95+
return Packet{}, err
96+
}
97+
98+
return Packet{
99+
Offset: h.Offset,
100+
IsRTCP: h.PacketLength == 0,
101+
Payload: payload,
102+
}, nil
103+
}

pkg/media/rtpdump/reader_test.go

+282
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,282 @@
1+
package rtpdump
2+
3+
import (
4+
"bytes"
5+
"io"
6+
"net"
7+
"reflect"
8+
"testing"
9+
"time"
10+
)
11+
12+
var validPreamble = []byte("#!rtpplay1.0 224.2.0.1/3456\n")
13+
14+
func TestReader(t *testing.T) {
15+
for _, test := range []struct {
16+
Name string
17+
Data []byte
18+
WantHeader Header
19+
WantPackets []Packet
20+
WantErr error
21+
}{
22+
{
23+
Name: "empty",
24+
Data: nil,
25+
WantErr: errMalformed,
26+
},
27+
{
28+
Name: "hashbang missing ip/port",
29+
Data: append(
30+
[]byte("#!rtpplay1.0 \n"),
31+
0x00, 0x00, 0x00, 0x00,
32+
0x00, 0x00, 0x00, 0x00,
33+
0x00, 0x00, 0x00, 0x00,
34+
0x00, 0x00, 0x00, 0x00,
35+
),
36+
WantErr: errMalformed,
37+
},
38+
{
39+
Name: "hashbang missing port",
40+
Data: append(
41+
[]byte("#!rtpplay1.0 0.0.0.0\n"),
42+
0x00, 0x00, 0x00, 0x00,
43+
0x00, 0x00, 0x00, 0x00,
44+
0x00, 0x00, 0x00, 0x00,
45+
0x00, 0x00, 0x00, 0x00,
46+
),
47+
WantErr: errMalformed,
48+
},
49+
{
50+
Name: "valid empty file",
51+
Data: append(
52+
validPreamble,
53+
0x00, 0x00, 0x00, 0x01,
54+
0x00, 0x00, 0x00, 0x00,
55+
0x01, 0x01, 0x01, 0x01,
56+
0x22, 0xB8, 0x00, 0x00,
57+
),
58+
WantHeader: Header{
59+
Start: time.Unix(1, 0).UTC(),
60+
Source: net.IPv4(1, 1, 1, 1),
61+
Port: 8888,
62+
},
63+
},
64+
{
65+
Name: "malformed packet header",
66+
Data: append(
67+
validPreamble,
68+
// header
69+
0x00, 0x00, 0x00, 0x00,
70+
0x00, 0x00, 0x00, 0x00,
71+
0x00, 0x00, 0x00, 0x00,
72+
0x00, 0x00, 0x00, 0x00,
73+
// packet header
74+
0x00,
75+
),
76+
WantHeader: Header{
77+
Start: time.Unix(0, 0).UTC(),
78+
Source: net.IPv4(0, 0, 0, 0),
79+
Port: 0,
80+
},
81+
WantErr: errMalformed,
82+
},
83+
{
84+
Name: "short packet payload",
85+
Data: append(
86+
validPreamble,
87+
// header
88+
0x00, 0x00, 0x00, 0x00,
89+
0x00, 0x00, 0x00, 0x00,
90+
0x00, 0x00, 0x00, 0x00,
91+
0x00, 0x00, 0x00, 0x00,
92+
// packet header len=1048575
93+
0xFF, 0xFF, 0x00, 0x00,
94+
0x00, 0x00, 0x00, 0x00,
95+
// packet paylaod
96+
0x00,
97+
),
98+
WantHeader: Header{
99+
Start: time.Unix(0, 0).UTC(),
100+
Source: net.IPv4(0, 0, 0, 0),
101+
Port: 0,
102+
},
103+
WantErr: errMalformed,
104+
},
105+
{
106+
Name: "empty packet payload",
107+
Data: append(
108+
validPreamble,
109+
// header
110+
0x00, 0x00, 0x00, 0x00,
111+
0x00, 0x00, 0x00, 0x00,
112+
0x00, 0x00, 0x00, 0x00,
113+
0x00, 0x00, 0x00, 0x00,
114+
// packet header len=0
115+
0x00, 0x00, 0x00, 0x00,
116+
0x00, 0x00, 0x00, 0x00,
117+
),
118+
WantHeader: Header{
119+
Start: time.Unix(0, 0).UTC(),
120+
Source: net.IPv4(0, 0, 0, 0),
121+
Port: 0,
122+
},
123+
WantErr: errMalformed,
124+
},
125+
{
126+
Name: "valid rtcp packet",
127+
Data: append(
128+
validPreamble,
129+
// header
130+
0x00, 0x00, 0x00, 0x00,
131+
0x00, 0x00, 0x00, 0x00,
132+
0x00, 0x00, 0x00, 0x00,
133+
0x00, 0x00, 0x00, 0x00,
134+
// packet header len=20, pLen=0, off=1
135+
0x00, 0x14, 0x00, 0x00,
136+
0x00, 0x00, 0x00, 0x01,
137+
// packet payload (BYE)
138+
0x81, 0xcb, 0x00, 0x0c,
139+
0x90, 0x2f, 0x9e, 0x2e,
140+
0x03, 0x46, 0x4f, 0x4f,
141+
),
142+
WantHeader: Header{
143+
Start: time.Unix(0, 0).UTC(),
144+
Source: net.IPv4(0, 0, 0, 0),
145+
Port: 0,
146+
},
147+
WantPackets: []Packet{
148+
{
149+
Offset: 1,
150+
IsRTCP: true,
151+
Payload: []byte{
152+
0x81, 0xcb, 0x00, 0x0c,
153+
0x90, 0x2f, 0x9e, 0x2e,
154+
0x03, 0x46, 0x4f, 0x4f,
155+
},
156+
},
157+
},
158+
WantErr: nil,
159+
},
160+
{
161+
Name: "truncated rtcp packet",
162+
Data: append(
163+
validPreamble,
164+
// header
165+
0x00, 0x00, 0x00, 0x00,
166+
0x00, 0x00, 0x00, 0x00,
167+
0x00, 0x00, 0x00, 0x00,
168+
0x00, 0x00, 0x00, 0x00,
169+
// packet header len=9, pLen=0, off=1
170+
0x00, 0x09, 0x00, 0x00,
171+
0x00, 0x00, 0x00, 0x01,
172+
// invalid payload
173+
0x81,
174+
),
175+
WantHeader: Header{
176+
Start: time.Unix(0, 0).UTC(),
177+
Source: net.IPv4(0, 0, 0, 0),
178+
Port: 0,
179+
},
180+
WantPackets: []Packet{
181+
{
182+
Offset: 1,
183+
IsRTCP: true,
184+
Payload: []byte{0x81},
185+
},
186+
},
187+
},
188+
{
189+
Name: "two valid packets",
190+
Data: append(
191+
validPreamble,
192+
// header
193+
0x00, 0x00, 0x00, 0x00,
194+
0x00, 0x00, 0x00, 0x00,
195+
0x00, 0x00, 0x00, 0x00,
196+
0x00, 0x00, 0x00, 0x00,
197+
// packet header len=20, pLen=0, off=1
198+
0x00, 0x14, 0x00, 0x00,
199+
0x00, 0x00, 0x00, 0x01,
200+
// packet payload (BYE)
201+
0x81, 0xcb, 0x00, 0x0c,
202+
0x90, 0x2f, 0x9e, 0x2e,
203+
0x03, 0x46, 0x4f, 0x4f,
204+
// packet header len=33, pLen=25, off=2
205+
0x00, 0x21, 0x19, 0x00,
206+
0x00, 0x00, 0x00, 0x02,
207+
// packet payload (RTP)
208+
0x90, 0x60, 0x69, 0x8f,
209+
0xd9, 0xc2, 0x93, 0xda,
210+
0x1c, 0x64, 0x27, 0x82,
211+
0x00, 0x01, 0x00, 0x01,
212+
0xFF, 0xFF, 0xFF, 0xFF,
213+
0x98, 0x36, 0xbe, 0x88,
214+
0x9e,
215+
),
216+
WantHeader: Header{
217+
Start: time.Unix(0, 0).UTC(),
218+
Source: net.IPv4(0, 0, 0, 0),
219+
Port: 0,
220+
},
221+
WantPackets: []Packet{
222+
{
223+
Offset: 1,
224+
IsRTCP: true,
225+
Payload: []byte{
226+
0x81, 0xcb, 0x00, 0x0c,
227+
0x90, 0x2f, 0x9e, 0x2e,
228+
0x03, 0x46, 0x4f, 0x4f,
229+
},
230+
},
231+
{
232+
Offset: 2,
233+
IsRTCP: false,
234+
Payload: []byte{
235+
0x90, 0x60, 0x69, 0x8f,
236+
0xd9, 0xc2, 0x93, 0xda,
237+
0x1c, 0x64, 0x27, 0x82,
238+
0x00, 0x01, 0x00, 0x01,
239+
0xFF, 0xFF, 0xFF, 0xFF,
240+
0x98, 0x36, 0xbe, 0x88,
241+
0x9e,
242+
},
243+
},
244+
},
245+
WantErr: nil,
246+
},
247+
} {
248+
r, hdr, err := NewReader(bytes.NewReader(test.Data))
249+
if err != nil {
250+
if got, want := err, test.WantErr; got != want {
251+
t.Fatalf("NewReader(%s) err=%v want %v", test.Name, got, want)
252+
}
253+
continue
254+
}
255+
256+
if got, want := hdr, test.WantHeader; !reflect.DeepEqual(got, want) {
257+
t.Fatalf("%q Header = %#v, want %#v", test.Name, got, want)
258+
}
259+
260+
var nextErr error
261+
var packets []Packet
262+
for {
263+
pkt, err := r.Next()
264+
if err == io.EOF {
265+
break
266+
}
267+
if err != nil {
268+
nextErr = err
269+
break
270+
}
271+
272+
packets = append(packets, pkt)
273+
}
274+
275+
if got, want := nextErr, test.WantErr; got != want {
276+
t.Fatalf("%s err=%v want %v", test.Name, got, want)
277+
}
278+
if got, want := packets, test.WantPackets; !reflect.DeepEqual(got, want) {
279+
t.Fatalf("%q packets=%#v, want %#v", test.Name, got, want)
280+
}
281+
}
282+
}

0 commit comments

Comments
 (0)