diff --git a/runner/sidecar/monitor/impl.go b/runner/sidecar/monitor/impl.go index c4ef429f..7fde5fd1 100644 --- a/runner/sidecar/monitor/impl.go +++ b/runner/sidecar/monitor/impl.go @@ -85,9 +85,8 @@ func (i *impl) Accept(ctx context.Context, sourceName, sourceURN string, partiti } if offset > expectedOffset { missingCounter.WithLabelValues(sourceName).Inc() - } else { - i.db[key] = offset } + i.db[key] = offset return true, nil } diff --git a/runner/sidecar/monitor/impl_test.go b/runner/sidecar/monitor/impl_test.go index 0e16f998..54cfa75b 100644 --- a/runner/sidecar/monitor/impl_test.go +++ b/runner/sidecar/monitor/impl_test.go @@ -55,7 +55,7 @@ func Test_impl_Accept(t *testing.T) { }) thirtyDays := time.Hour * 24 * 30 rdb.On("Set", ctx, "my-pl/my-step/my-urn/1/offset", int64(1), thirtyDays).Return(nil) - rdb.On("Set", ctx, "my-pl/my-step/my-urn/2/offset", int64(2), thirtyDays).Return(nil) + rdb.On("Set", ctx, "my-pl/my-step/my-urn/2/offset", int64(4), thirtyDays).Return(nil) t.Run("CommitOffsets", func(t *testing.T) { i.commitOffsets(ctx) })