Skip to content

Commit

Permalink
Merge pull request #57 from bwNetFlow/prometheus
Browse files Browse the repository at this point in the history
implement support for multiple prometheus segments
  • Loading branch information
debugloop authored Oct 27, 2022
2 parents e33e706 + ce6e3dd commit 259c1ad
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 13 deletions.
16 changes: 9 additions & 7 deletions segments/export/prometheus/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

// Exporter provides export features to Prometheus
type Exporter struct {
MetaReg *prometheus.Registry
FlowReg *prometheus.Registry

kafkaMessageCount prometheus.Counter
Expand All @@ -35,7 +36,8 @@ func (e *Exporter) Initialize(labels []string) {
Name: "kafka_offset_current",
Help: "Current Kafka Offset of the consumer",
}, []string{"topic", "partition"})
prometheus.MustRegister(e.kafkaMessageCount, e.kafkaOffsets)
e.MetaReg = prometheus.NewRegistry()
e.MetaReg.MustRegister(e.kafkaMessageCount, e.kafkaOffsets)

// Flows are stored in a separate Registry
e.flowBits = prometheus.NewCounterVec(
Expand All @@ -51,9 +53,10 @@ func (e *Exporter) Initialize(labels []string) {

// listen on given endpoint addr with Handler for metricPath and flowdataPath
func (e *Exporter) ServeEndpoints(segment *Prometheus) {
http.Handle(segment.MetricsPath, promhttp.Handler())
http.Handle(segment.FlowdataPath, promhttp.HandlerFor(e.FlowReg, promhttp.HandlerOpts{}))
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
mux := http.NewServeMux()
mux.Handle(segment.MetricsPath, promhttp.HandlerFor(e.MetaReg, promhttp.HandlerOpts{}))
mux.Handle(segment.FlowdataPath, promhttp.HandlerFor(e.FlowReg, promhttp.HandlerOpts{}))
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`<html>
<head><title>Flow Exporter</title></head>
<body>
Expand All @@ -63,11 +66,10 @@ func (e *Exporter) ServeEndpoints(segment *Prometheus) {
</body>
</html>`))
})

go func() {
http.ListenAndServe(segment.Endpoint, nil)
http.ListenAndServe(segment.Endpoint, mux)
}()
log.Printf("Enabled Prometheus %s and %s endpoints.", segment.MetricsPath, segment.FlowdataPath)
log.Printf("Enabled metrics on %s and %s, listening at %s.", segment.MetricsPath, segment.FlowdataPath, segment.Endpoint)
}

func (e *Exporter) Increment(bytes uint64, packets uint64, labelset prometheus.Labels) {
Expand Down
12 changes: 6 additions & 6 deletions segments/export/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,16 @@ func (segment Prometheus) New(config map[string]string) segments.Segment {
endpoint = config["endpoint"]
}
var metricsPath string = "/metrics"
if config["metricsPath"] == "" {
log.Println("[info] prometheus: Missing configuration parameter 'metricsPath'. Using default path \"/metrics\"")
if config["metricspath"] == "" {
log.Println("[info] prometheus: Missing configuration parameter 'metricspath'. Using default path \"/metrics\"")
} else {
metricsPath = config["metricsPath"]
metricsPath = config["metricspath"]
}
var flowdataPath string = "/flowdata"
if config["flowdataPath"] == "" {
log.Println("[info] prometheus: Missing configuration parameter 'flowdataPath'. Using default path \"/flowdata\"")
if config["flowdatapath"] == "" {
log.Println("[info] prometheus: Missing configuration parameter 'flowdatapath'. Using default path \"/flowdata\"")
} else {
flowdataPath = config["flowdataPath"]
flowdataPath = config["flowdatapath"]
}

newsegment := &Prometheus{
Expand Down

0 comments on commit 259c1ad

Please sign in to comment.