Skip to content

Commit

Permalink
fix(sidecar): updated to use fixed counters
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec committed Jun 4, 2021
1 parent 6bef420 commit 3dd39f1
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 16 deletions.
26 changes: 17 additions & 9 deletions api/v1alpha1/sink_statuses.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,30 @@ func (in SinkStatuses) IncErrors(name string, replica int, err error) {
in[name] = x
}

func (in SinkStatuses) AnySunk() bool {
func (in SinkStatuses) GetTotal() uint64 {
var x uint64
for _, s := range in {
for _, m := range s.Metrics {
if m.Total > 0 {
return true
}
x += m.Total
}
}
return false
return x
}

func (in SinkStatuses) AnyErrors() bool {
func (in SinkStatuses) GetErrors() uint64 {
var x uint64
for _, s := range in {
if s.AnyErrors() {
return true
for _, m := range s.Metrics {
x += m.Errors
}
}
return false
return x
}

func (in SinkStatuses) AnySunk() bool {
return in.GetTotal() > 0
}

func (in SinkStatuses) AnyErrors() bool {
return in.GetErrors() > 0
}
18 changes: 18 additions & 0 deletions api/v1alpha1/sink_statuses_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,24 @@ func TestSinkStatuses_Set(t *testing.T) {
}
}

func TestSinkStatuses_GetTotal(t *testing.T) {
assert.Equal(t, uint64(0), SinkStatuses{}.GetTotal())
assert.Equal(t, uint64(1), SinkStatuses{
"0": SinkStatus{
Metrics: map[string]Metrics{"0": {Total: 1}},
},
}.GetTotal())
}

func TestSinkStatuses_GetErrors(t *testing.T) {
assert.Equal(t, uint64(0), SinkStatuses{}.GetErrors())
assert.Equal(t, uint64(1), SinkStatuses{
"0": SinkStatus{
Metrics: map[string]Metrics{"0": {Errors: 1}},
},
}.GetErrors())
}

func TestSinkStatuses_AnyErrors(t *testing.T) {
assert.False(t, SinkStatuses{}.AnyErrors())
assert.True(t, SinkStatuses{"0": {
Expand Down
14 changes: 7 additions & 7 deletions runner/sidecar/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,30 +290,30 @@ func connectSources(ctx context.Context, toMain func([]byte) error) error {
sources[sourceName] = true

rateCounter := ratecounter.NewRateCounter(updateInterval)
totalMetric := promauto.NewCounter(prometheus.CounterOpts{

promauto.NewCounterFunc(prometheus.CounterOpts{
Name: "total",
Subsystem: "sources",
Help: "Total number of messages",
ConstLabels: map[string]string{"sourceName": source.Name},
})
errorMetric := promauto.NewCounter(prometheus.CounterOpts{
Name: "errors",
}, func() float64 { return float64(status.SinkStatues.GetTotal()) })

promauto.NewCounterFunc(prometheus.CounterOpts{
Subsystem: "sources",
Name: "errors",
Help: "Total number of errors",
ConstLabels: map[string]string{"sourceName": source.Name},
})
}, func() float64 { return float64(status.SinkStatues.GetErrors()) })

f := func(msg []byte) error {
rateCounter.Incr(1)
totalMetric.Inc()
withLock(func() {
rate := float64(rateCounter.Rate()) / updateInterval.Seconds()
status.SourceStatuses.Set(sourceName, replica, printable(msg), resource.MustParse(fmt.Sprintf("%.3f", rate)))
})
if err := toMain(msg); err != nil {
logger.Error(err, "⚠ →", "source", sourceName)
withLock(func() { status.SourceStatuses.IncErrors(sourceName, replica, err) })
errorMetric.Inc()
return err
}
return nil
Expand Down

0 comments on commit 3dd39f1

Please sign in to comment.