Skip to content

Commit 32947ed

Browse files
committed
streamd: follow prometheus naming conv and srt labels
1 parent bed6387 commit 32947ed

File tree

2 files changed

+131
-102
lines changed

2 files changed

+131
-102
lines changed

streamd/http.go

+119-97
Original file line numberDiff line numberDiff line change
@@ -11,155 +11,177 @@ type httpServer struct {
1111
daemonController
1212
}
1313

14-
// Minimalist prometheus exporter
15-
func (h *httpServer) metrics(w http.ResponseWriter, r *http.Request) {
16-
m := h.metricsSnapshot()
14+
func writeSRTStatsMeta(w http.ResponseWriter) {
15+
fmt.Fprintln(w, "# HELP srt_callers Current number of subscribers to the SRT stream")
16+
fmt.Fprintln(w, "# TYPE srt_callers gauge")
1717

18-
/* CPU */
18+
fmt.Fprintln(w, "# HELP srt_send_bytes_total Total bytes sent across all callers")
19+
fmt.Fprintln(w, "# TYPE srt_send_bytes_total counter")
1920

20-
cpuTime := m.cpu.Time.UnixMilli()
21-
fmt.Fprintf(w, "# HELP linux_proc_user Time spent in user mode, in ticks\n")
22-
fmt.Fprintf(w, "# TYPE linux_proc_user gauge\n")
23-
fmt.Fprintf(w, "linux_proc_user %d %d\n", m.cpu.User, cpuTime)
21+
fmt.Fprintln(w, "# HELP srt_send_rate Send rate in Mbps")
22+
fmt.Fprintln(w, "# TYPE srt_send_rate gauge")
2423

25-
fmt.Fprintf(w, "# HELP linux_proc_system Time spent in system mode, in ticks\n")
26-
fmt.Fprintf(w, "# TYPE linux_proc_system gauge\n")
27-
fmt.Fprintf(w, "linux_proc_system %d %d\n", m.cpu.System, cpuTime)
24+
fmt.Fprintln(w, "# HELP srt_bandwidth Bandwidth in Mbps")
25+
fmt.Fprintln(w, "# TYPE srt_bandwidth gauge")
2826

29-
fmt.Fprintf(w, "# HELP linux_proc_iowait Time spent waiting for I/O to complete, in ticks\n")
30-
fmt.Fprintf(w, "# TYPE linux_proc_iowait gauge\n")
31-
fmt.Fprintf(w, "linux_proc_iowait %d %d\n", m.cpu.Iowait, cpuTime)
27+
fmt.Fprintln(w, "# HELP srt_rtt_seconds RTT in s")
28+
fmt.Fprintln(w, "# TYPE srt_rtt_seconds gauge")
3229

33-
fmt.Fprintf(w, "# HELP linux_proc_irq Time spent servicing interrupts, in ticks\n")
34-
fmt.Fprintf(w, "# TYPE linux_proc_irq gauge\n")
35-
fmt.Fprintf(w, "linux_proc_irq %d %d\n", m.cpu.Irq, cpuTime)
30+
fmt.Fprintln(w, "# HELP srt_negotiated_latency_seconds Negotiated latency in s")
31+
fmt.Fprintln(w, "# TYPE srt_negotiated_latency_seconds gauge")
3632

37-
fmt.Fprintf(w, "# HELP linux_proc_softirq Time spent servicing soft interrupts, in ticks\n")
38-
fmt.Fprintf(w, "# TYPE linux_proc_softirq gauge\n")
39-
fmt.Fprintf(w, "linux_proc_softirq %d %d\n", m.cpu.SoftIrq, cpuTime)
33+
fmt.Fprintln(w, "# HELP srt_sent_bytes_total Total bytes sent")
34+
fmt.Fprintln(w, "# TYPE srt_sent_bytes_total counter")
4035

41-
/* Memory */
36+
fmt.Fprintln(w, "# HELP srt_retransmitted_bytes_total Total bytes retransmitted")
37+
fmt.Fprintln(w, "# TYPE srt_retransmitted_bytes_total counter")
4238

43-
memTime := m.mem.Time.UnixMilli()
44-
fmt.Fprintf(w, "# HELP linux_mem_used Amount of memory used, in kB\n")
45-
fmt.Fprintf(w, "# TYPE linux_mem_used gauge\n")
46-
fmt.Fprintf(w, "linux_mem_used %d %d\n", m.mem.MemUsed, memTime)
39+
fmt.Fprintln(w, "# HELP srt_sent_dropped_bytes_total Total bytes retransmitted")
40+
fmt.Fprintln(w, "# TYPE srt_sent_dropped_bytes_total counter")
4741

48-
fmt.Fprintf(w, "# HELP linux_mem_free Amount of free memory, in kB\n")
49-
fmt.Fprintf(w, "# TYPE linux_mem_free gauge\n")
50-
fmt.Fprintf(w, "linux_mem_free %d %d\n", m.mem.MemFree, memTime)
42+
fmt.Fprintln(w, "# HELP srt_packets_sent_total Total packets sent")
43+
fmt.Fprintln(w, "# TYPE srt_packets_sent_total counter")
5144

52-
/* TODO(hugo) I/O Status metrics via `iostat` when recording is implemented */
45+
fmt.Fprintln(w, "# HELP srt_packets_sent_lost_total Total packets lost")
46+
fmt.Fprintln(w, "# TYPE srt_packets_sent_lost_total counter")
5347

54-
/* Load Average */
55-
56-
// The timestamp is an int64 (milliseconds since epoch, i.e. 1970-01-01
57-
// 00:00:00 UTC, excluding leap seconds), represented as required by Go's
58-
// ParseInt() function.
59-
loadAvgTime := m.loadAvg.Time.UnixMilli()
60-
fmt.Fprintf(w, "# HELP load_avg_one Load average over one minute\n")
61-
fmt.Fprintf(w, "# TYPE load_avg_one gauge\n")
62-
fmt.Fprintf(w, "load_avg_one %f %d\n", m.loadAvg.One, loadAvgTime)
48+
fmt.Fprintln(w, "# HELP srt_packets_sent_dropped_total Total packets dropped")
49+
fmt.Fprintln(w, "# TYPE srt_packets_sent_dropped_total counter")
6350

64-
fmt.Fprintf(w, "# HELP load_avg_five Load average over five minutes\n")
65-
fmt.Fprintf(w, "# TYPE load_avg_five gauge\n")
66-
fmt.Fprintf(w, "load_avg_five %f %d\n", m.loadAvg.Five, loadAvgTime)
51+
fmt.Fprintln(w, "# HELP srt_packets_retransmitted_total Total packets retransmitted")
52+
fmt.Fprintln(w, "# TYPE srt_packets_retransmitted_total counter")
6753

68-
fmt.Fprintf(w, "# HELP load_avg_fifteen Load average over fifteen minutes\n")
69-
fmt.Fprintf(w, "# TYPE load_avg_fifteen gauge\n")
70-
fmt.Fprintf(w, "load_avg_fifteen %f %d\n", m.loadAvg.Fifteen, loadAvgTime)
54+
fmt.Fprintln(w, "# HELP srt_packets_ack_received_total Number of acks received")
55+
fmt.Fprintln(w, "# TYPE srt_packets_ack_received_total counter")
7156

72-
/* SRT Statistics */
57+
fmt.Fprintln(w, "# HELP srt_packets_nack_received_total Number of nacks received")
58+
fmt.Fprintln(w, "# TYPE srt_packets_nack_received_total counter")
59+
}
7360

74-
srtTime := m.compSinkStats.time.UnixMilli()
75-
fmt.Fprintf(w, "# HELP srt_callers Current number of subscribers to the SRT stream\n")
76-
fmt.Fprintf(w, "# TYPE srt_callers gauge\n")
77-
fmt.Fprintf(w, "srt_callers %d %d\n", len(m.compSinkStats.callers), srtTime)
61+
func writeSRTStats(w http.ResponseWriter, s *srtStats, sink string) {
62+
srtTime := s.time.UnixMilli()
63+
fmt.Fprintf(w, "srt_callers{sink=\"%s\"} %d %d\n", sink, len(s.callers), srtTime)
7864

7965
// Total bytes sent
80-
fmt.Fprintf(w, "# HELP srt_bytes_send_total Total bytes sent across all callers\n")
81-
fmt.Fprintf(w, "# TYPE srt_bytes_send_total counter\n")
82-
fmt.Fprintf(w, "srt_bytes_send_total %d %d\n", m.compSinkStats.bytesSendTotal, srtTime)
66+
fmt.Fprintf(w, "srt_send_bytes_total{sink=\"%s\"} %d %d\n", sink, s.bytesSendTotal, srtTime)
8367

8468
// Send rate per caller
85-
// TODO: caller should be identified by IP, but that field is NULL in the srt stats structure
86-
for i, caller := range m.compSinkStats.callers {
87-
common := fmt.Sprintf("caller=\"%d\"", i)
69+
for _, caller := range s.callers {
70+
common := fmt.Sprintf("address=\"%s\", port=\"%d\", sink=\"%s\"", caller.callerAddress.String(), caller.callerPort, sink)
8871

8972
// Send Rate
90-
fmt.Fprintf(w, "# HELP srt_send_rate Send rate in Mbps\n")
91-
fmt.Fprintf(w, "# TYPE srt_send_rate gauge\n")
9273
fmt.Fprintf(w, "srt_send_rate{%s} %f %d\n", common, caller.sendRateMbps, srtTime)
9374

9475
// Bandwidth
95-
fmt.Fprintf(w, "# HELP srt_bandwidth Bandwidth in Mbps\n")
96-
fmt.Fprintf(w, "# TYPE srt_bandwidth gauge\n")
9776
fmt.Fprintf(w, "srt_bandwidth{%s} %f %d\n", common, caller.bandwidthMbps, srtTime)
9877

9978
// Round-trip time (RTT)
100-
fmt.Fprintf(w, "# HELP srt_rtt RTT in ms\n")
101-
fmt.Fprintf(w, "# TYPE srt_rtt gauge\n")
102-
fmt.Fprintf(w, "srt_rtt{%s} %f %d\n", common, caller.rttMS, srtTime)
79+
fmt.Fprintf(w, "srt_rtt_seconds{%s} %f %d\n", common, caller.rttMS/1000, srtTime)
10380

10481
// Negotiated Latency
105-
fmt.Fprintf(w, "# HELP srt_negotiated_latency Negotiated latency in ms\n")
106-
fmt.Fprintf(w, "# TYPE srt_negotiated_latency gauge\n")
107-
fmt.Fprintf(w, "srt_negotiated_latency{%s} %d %d\n", common, caller.negotiatedLatencyMS, srtTime)
82+
fmt.Fprintf(w, "srt_negotiated_latency_seconds{%s} %d %d\n", common, caller.negotiatedLatencyMS/1000, srtTime)
10883

10984
// Bytes sent
110-
fmt.Fprintf(w, "# HELP srt_bytes_sent Total bytes sent\n")
111-
fmt.Fprintf(w, "# TYPE srt_bytes_sent gauge\n")
112-
fmt.Fprintf(w, "srt_bytes_sent{%s} %d %d\n", common, caller.bytesSent, srtTime)
85+
fmt.Fprintf(w, "srt_sent_bytes_total{%s} %d %d\n", common, caller.bytesSent, srtTime)
11386

11487
// Bytes Retransmitted
115-
fmt.Fprintf(w, "# HELP srt_bytes_retransmitted Total bytes retransmitted\n")
116-
fmt.Fprintf(w, "# TYPE srt_bytes_retransmitted gauge\n")
117-
fmt.Fprintf(w, "srt_bytes_retransmitted{%s} %d %d\n", common, caller.bytesRetransmitted, srtTime)
88+
fmt.Fprintf(w, "srt_retransmitted_bytes_total{%s} %d %d\n", common, caller.bytesRetransmitted, srtTime)
11889

11990
// Bytes Sent Dropped
120-
fmt.Fprintf(w, "# HELP srt_bytes_send_dropped Total bytes retransmitted\n")
121-
fmt.Fprintf(w, "# TYPE srt_bytes_send_dropped gauge\n")
122-
fmt.Fprintf(w, "srt_bytes_send_dropped{%s} %d %d\n", common, caller.bytesSentDropped, srtTime)
91+
fmt.Fprintf(w, "srt_sent_dropped_bytes_total{%s} %d %d\n", common, caller.bytesSentDropped, srtTime)
12392

12493
// Packets sent
125-
fmt.Fprintf(w, "# HELP srt_packets_sent Total packets sent\n")
126-
fmt.Fprintf(w, "# TYPE srt_packets_sent gauge\n")
127-
fmt.Fprintf(w, "srt_packets_sent{%s} %d %d\n", common, caller.packetsSent, srtTime)
94+
fmt.Fprintf(w, "srt_packets_sent_total{%s} %d %d\n", common, caller.packetsSent, srtTime)
12895

12996
// Packets Sent Lost
130-
fmt.Fprintf(w, "# HELP srt_packets_sent_lost Total packets lost\n")
131-
fmt.Fprintf(w, "# TYPE srt_packets_sent_lost gauge\n")
132-
fmt.Fprintf(w, "srt_packets_sent_lost{%s} %d %d\n", common, caller.packetsSentLost, srtTime)
97+
fmt.Fprintf(w, "srt_packets_sent_lost_total{%s} %d %d\n", common, caller.packetsSentLost, srtTime)
13398

13499
// Packets Sent Dropped
135-
fmt.Fprintf(w, "# HELP srt_packets_sent_dropped Total packets dropped\n")
136-
fmt.Fprintf(w, "# TYPE srt_packets_sent_dropped gauge\n")
137-
fmt.Fprintf(w, "srt_packets_sent_dropped{%s} %d %d\n", common, caller.packetsSentDropped, srtTime)
100+
fmt.Fprintf(w, "srt_packets_sent_dropped_total{%s} %d %d\n", common, caller.packetsSentDropped, srtTime)
138101

139102
// Packets Retransmitted
140-
fmt.Fprintf(w, "# HELP srt_packets_retransmitted Total packets retransmitted\n")
141-
fmt.Fprintf(w, "# TYPE srt_packets_retransmitted gauge\n")
142-
fmt.Fprintf(w, "srt_packets_retransmitted{%s} %d %d\n", common, caller.packetsRetransmitted, srtTime)
103+
fmt.Fprintf(w, "srt_packets_retransmitted_total{%s} %d %d\n", common, caller.packetsRetransmitted, srtTime)
143104

144105
// Packets Ack Received
145-
fmt.Fprintf(w, "# HELP srt_packets_ack_received Number of acks received\n")
146-
fmt.Fprintf(w, "# TYPE srt_packets_ack_received gauge\n")
147-
fmt.Fprintf(w, "srt_packets_ack_received{%s} %d %d\n", common, caller.packetAckReceived, srtTime)
106+
fmt.Fprintf(w, "srt_packets_ack_received_total{%s} %d %d\n", common, caller.packetAckReceived, srtTime)
148107

149108
// Packets Nack Received
150-
fmt.Fprintf(w, "# HELP srt_packets_nack_received Number of nacks received\n")
151-
fmt.Fprintf(w, "# TYPE srt_packets_nack_received gauge\n")
152-
fmt.Fprintf(w, "srt_packets_nack_received{%s} %d %d\n", common, caller.packetNackReceived, srtTime)
153-
154-
// TODO(hugo): Add receive metrics from 'srtCallerStats'?
109+
fmt.Fprintf(w, "srt_packets_nack_received_total{%s} %d %d\n", common, caller.packetNackReceived, srtTime)
155110
}
156111

112+
}
113+
114+
// Minimalist prometheus exporter
115+
func (h *httpServer) metrics(w http.ResponseWriter, r *http.Request) {
116+
m := h.metricsSnapshot()
117+
118+
/* CPU */
119+
120+
cpuTime := m.cpu.Time.UnixMilli()
121+
fmt.Fprintf(w, "# HELP linux_proc_user_total Time spent in user mode, in ticks\n")
122+
fmt.Fprintf(w, "# TYPE linux_proc_user_total counter\n")
123+
fmt.Fprintf(w, "linux_proc_user_total %d %d\n", m.cpu.User, cpuTime)
124+
125+
fmt.Fprintf(w, "# HELP linux_proc_system_total Time spent in system mode, in ticks\n")
126+
fmt.Fprintf(w, "# TYPE linux_proc_system_total counter\n")
127+
fmt.Fprintf(w, "linux_proc_system_total %d %d\n", m.cpu.System, cpuTime)
128+
129+
fmt.Fprintf(w, "# HELP linux_proc_iowait_total Time spent waiting for I/O to complete, in ticks\n")
130+
fmt.Fprintf(w, "# TYPE linux_proc_iowait_total counter\n")
131+
fmt.Fprintf(w, "linux_proc_iowait_total %d %d\n", m.cpu.Iowait, cpuTime)
132+
133+
fmt.Fprintf(w, "# HELP linux_proc_irq_total Time spent servicing interrupts, in ticks\n")
134+
fmt.Fprintf(w, "# TYPE linux_proc_irq_total counter\n")
135+
fmt.Fprintf(w, "linux_proc_irq_total %d %d\n", m.cpu.Irq, cpuTime)
136+
137+
fmt.Fprintf(w, "# HELP linux_proc_softirq_total Time spent servicing soft interrupts, in ticks\n")
138+
fmt.Fprintf(w, "# TYPE linux_proc_softirq_total counter\n")
139+
fmt.Fprintf(w, "linux_proc_softirq_total %d %d\n", m.cpu.SoftIrq, cpuTime)
140+
141+
/* Memory */
142+
143+
memTime := m.mem.Time.UnixMilli()
144+
fmt.Fprintf(w, "# HELP linux_mem_used_bytes Amount of memory used, in bytes\n")
145+
fmt.Fprintf(w, "# TYPE linux_mem_used_bytes gauge\n")
146+
fmt.Fprintf(w, "linux_mem_used_bytes %d %d\n", m.mem.MemUsed*1024, memTime)
147+
148+
fmt.Fprintf(w, "# HELP linux_mem_free_bytes Amount of free memory, in bytes\n")
149+
fmt.Fprintf(w, "# TYPE linux_mem_free_bytes gauge\n")
150+
fmt.Fprintf(w, "linux_mem_free_bytes %d %d\n", m.mem.MemFree*1024, memTime)
151+
152+
/* TODO(hugo) I/O Status metrics via `iostat` when recording is implemented */
153+
154+
/* Load Average */
155+
156+
// The timestamp is an int64 (milliseconds since epoch, i.e. 1970-01-01
157+
// 00:00:00 UTC, excluding leap seconds), represented as required by Go's
158+
// ParseInt() function.
159+
loadAvgTime := m.loadAvg.Time.UnixMilli()
160+
fmt.Fprintf(w, "# HELP load_avg_one Load average over one minute\n")
161+
fmt.Fprintf(w, "# TYPE load_avg_one gauge\n")
162+
fmt.Fprintf(w, "load_avg_one %f %d\n", m.loadAvg.One, loadAvgTime)
163+
164+
fmt.Fprintf(w, "# HELP load_avg_five Load average over five minutes\n")
165+
fmt.Fprintf(w, "# TYPE load_avg_five gauge\n")
166+
fmt.Fprintf(w, "load_avg_five %f %d\n", m.loadAvg.Five, loadAvgTime)
167+
168+
fmt.Fprintf(w, "# HELP load_avg_fifteen Load average over fifteen minutes\n")
169+
fmt.Fprintf(w, "# TYPE load_avg_fifteen gauge\n")
170+
fmt.Fprintf(w, "load_avg_fifteen %f %d\n", m.loadAvg.Fifteen, loadAvgTime)
171+
172+
/* SRT Statistics */
173+
174+
writeSRTStatsMeta(w)
175+
writeSRTStats(w, &m.compSinkStats, "combined")
176+
writeSRTStats(w, &m.presentSinkStats, "present")
177+
writeSRTStats(w, &m.camSinkStats, "camera")
178+
157179
/* GStreamer Statistics */
158180

159181
for k, v := range m.pipelineStats.qosEvents {
160-
fmt.Fprintf(w, "# HELP gst_qos_events Number of qos events\n")
161-
fmt.Fprintf(w, "# TYPE gst_qos_events gauge\n")
162-
fmt.Fprintf(w, "gst_qos_events{source=\"%s\"} %d\n", k, v)
182+
fmt.Fprintf(w, "# HELP gst_qos_events_total Number of qos events\n")
183+
fmt.Fprintf(w, "# TYPE gst_qos_events_total gauge\n")
184+
fmt.Fprintf(w, "gst_qos_events_total{source=\"%s\"} %d\n", k, v)
163185
}
164186
}
165187

streamd/metrics.go

+12-5
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,13 @@ import (
99
)
1010

1111
type metrics struct {
12-
compSinkStats srtStats
13-
pipelineStats pipelineStats // Updated by bus watch on main thread
14-
cpu systemstat.CPUSample
15-
mem systemstat.MemSample
16-
loadAvg systemstat.LoadAvgSample
12+
compSinkStats srtStats
13+
presentSinkStats srtStats
14+
camSinkStats srtStats
15+
pipelineStats pipelineStats // Updated by bus watch on main thread
16+
cpu systemstat.CPUSample
17+
mem systemstat.MemSample
18+
loadAvg systemstat.LoadAvgSample
1719
}
1820

1921
func (d *daemon) metricsProcess(ctx context.Context) {
@@ -26,19 +28,24 @@ func (d *daemon) metricsProcess(ctx context.Context) {
2628
mem := systemstat.GetMemSample()
2729
loadAvg := systemstat.GetLoadAvgSample()
2830

31+
// []*srtStats{combStats, presentStats, camStats}
2932
srtStats, err := d.srtStatistics()
3033
if err != nil {
3134
klog.Warningf("failed to retrieve statistics from srtsinks: %v", err)
3235
continue
3336
}
3437

3538
srtCompStats := srtStats[0]
39+
srtPresentStats := srtStats[1]
40+
srtCamStats := srtStats[2]
3641

3742
d.mu.Lock()
3843
d.metrics.cpu = cpu
3944
d.metrics.mem = mem
4045
d.metrics.loadAvg = loadAvg
4146
d.metrics.compSinkStats = *srtCompStats
47+
d.metrics.presentSinkStats = *srtPresentStats
48+
d.metrics.camSinkStats = *srtCamStats
4249
d.mu.Unlock()
4350

4451
time.Sleep(time.Second * 1)

0 commit comments

Comments
 (0)