Skip to content

Commit 5b0dd6e

Browse files
shift the max value logic to health monitor
Signed-off-by: Yashvardhan Kukreja <[email protected]>
1 parent 12ffb72 commit 5b0dd6e

File tree

4 files changed

+61
-79
lines changed

4 files changed

+61
-79
lines changed

op-conductor/health/monitor.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,9 @@ func NewSequencerHealthMonitor(log log.Logger, metrics metrics.Metricer, interva
6565
}
6666
}
6767
if rollupBoostToleratePartialHealthinessToleranceLimit != 0 {
68+
hm.rollupBoostPartialHealthinessToleranceLimit = rollupBoostToleratePartialHealthinessToleranceLimit
6869
var err error
69-
hm.timeTolerantRollupBoostPartialHealthinessMgr, err = NewTimeBoundedRotatingCounter(rollupBoostToleratePartialHealthinessToleranceIntervalSeconds, rollupBoostToleratePartialHealthinessToleranceLimit)
70+
hm.rollupBoostPartialHealthinessToleranceCounter, err = NewTimeBoundedRotatingCounter(rollupBoostToleratePartialHealthinessToleranceIntervalSeconds)
7071
if err != nil {
7172
panic(fmt.Errorf("failed to setup health monitor: %w", err))
7273
}
@@ -100,12 +101,13 @@ type SequencerHealthMonitor struct {
100101

101102
timeProviderFn func() uint64
102103

103-
node dial.RollupClientInterface
104-
p2p apis.P2PClient
105-
supervisor SupervisorHealthAPI
106-
rb client.RollupBoostClient
107-
elP2p *ElP2pHealthMonitor
108-
timeTolerantRollupBoostPartialHealthinessMgr *timeBoundedRotatingCounter
104+
node dial.RollupClientInterface
105+
p2p apis.P2PClient
106+
supervisor SupervisorHealthAPI
107+
rb client.RollupBoostClient
108+
elP2p *ElP2pHealthMonitor
109+
rollupBoostPartialHealthinessToleranceLimit uint64
110+
rollupBoostPartialHealthinessToleranceCounter *timeBoundedRotatingCounter
109111
}
110112

111113
var _ HealthMonitor = (*SequencerHealthMonitor)(nil)
@@ -296,11 +298,10 @@ func (hm *SequencerHealthMonitor) checkRollupBoost(ctx context.Context) error {
296298
case client.HealthStatusHealthy:
297299
return nil
298300
case client.HealthStatusPartial:
299-
if hm.timeTolerantRollupBoostPartialHealthinessMgr != nil {
300-
if _, err := hm.timeTolerantRollupBoostPartialHealthinessMgr.Increment(); err == nil {
301-
hm.log.Warn("[Tolerating Failure] Rollup boost is partial failure, builder is down but fallback execution client is up", "err", ErrRollupBoostPartiallyHealthy)
302-
return nil
303-
}
301+
if hm.rollupBoostPartialHealthinessToleranceCounter != nil && hm.rollupBoostPartialHealthinessToleranceCounter.CurrentValue() < hm.rollupBoostPartialHealthinessToleranceLimit {
302+
latestValue := hm.rollupBoostPartialHealthinessToleranceCounter.Increment()
303+
hm.log.Debug("Rollup-boost partial unhealthiness failure tolerated", "currentValue", latestValue, "limit", hm.rollupBoostPartialHealthinessToleranceLimit)
304+
return nil
304305
}
305306
hm.log.Error("Rollup boost is partial failure, builder is down but fallback execution client is up", "err", ErrRollupBoostPartiallyHealthy)
306307
return ErrRollupBoostPartiallyHealthy

op-conductor/health/monitor_test.go

Lines changed: 25 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -473,57 +473,52 @@ func (s *HealthMonitorTestSuite) TestRollupBoostPartialStatusWithTolerance() {
473473
rb.EXPECT().Healthcheck(mock.Anything).Return(client.HealthStatusPartial, nil)
474474

475475
toleranceLimit := uint64(2)
476-
toleranceIntervalSeconds := uint64(3)
476+
toleranceIntervalSeconds := uint64(6)
477477

478-
timeBoundedRotatingCounter, err := NewTimeBoundedRotatingCounter(toleranceIntervalSeconds, toleranceLimit)
478+
timeBoundedRotatingCounter, err := NewTimeBoundedRotatingCounter(toleranceIntervalSeconds)
479479
s.Nil(err)
480480

481+
tp := &timeProvider{now: 1758792282}
482+
481483
// Start monitor with all dependencies as well as tolerance of 2 rollup-boost partial unhealthiness per 3s period
482484
monitor := s.SetupMonitorWithRollupBoost(now, 60, 60, rc, pc, rb, nil, func(shm *SequencerHealthMonitor) {
483-
tp := &timeProvider{now: 1758792282}
484485
timeBoundedRotatingCounter.timeProviderFn = tp.Now
485486

486-
// pollute the cache of timeBoundRotatingCounter with 999 elements so as to later test the lazy cleanup
487-
// note: the 1000th element will be added by the first healthchecl run
487+
// pollute the cache of timeBoundRotatingCounter with 998 elements so as to later test the lazy cleanup
488+
// note: the 999th and 1000th element will be added by the first healthcheck run
488489
for i := 0; i < 999; i++ {
489490
timeBoundedRotatingCounter.temporalCache[int64(i)] = uint64(1)
490491
}
491492

492-
shm.timeTolerantRollupBoostPartialHealthinessMgr = timeBoundedRotatingCounter
493+
shm.rollupBoostPartialHealthinessToleranceCounter = timeBoundedRotatingCounter
494+
shm.rollupBoostPartialHealthinessToleranceLimit = toleranceLimit
493495
})
494496

495497
healthUpdateCh := monitor.Subscribe()
496498

497-
// first error is tolerated (time t+1)
498-
healthStatus := <-healthUpdateCh
499-
s.Nil(healthStatus)
500-
s.Len(timeBoundedRotatingCounter.temporalCache, 1000) // lazy cleanup of the cache not done yet as it's within the bounds
499+
s.Eventually(func() bool {
500+
return len(timeBoundedRotatingCounter.temporalCache) == 1000
501+
}, time.Second*3, time.Second*1)
501502

502-
// second error is tolerated as well (time t+2)
503-
healthStatus = <-healthUpdateCh
504-
s.Nil(healthStatus)
505-
s.Len(timeBoundedRotatingCounter.temporalCache, 1000) // no change of the cache until the next reset
503+
firstHealthStatus := <-healthUpdateCh
504+
secondHealthStatus := <-healthUpdateCh
505+
thirdHealthStatus := <-healthUpdateCh
506506

507-
// third error isn't tolerated (time t+3)
508-
healthFailure := <-healthUpdateCh
509-
s.Equal(ErrRollupBoostPartiallyHealthy, healthFailure)
507+
s.Nil(firstHealthStatus)
508+
s.Nil(secondHealthStatus)
509+
s.Equal(ErrRollupBoostPartiallyHealthy, thirdHealthStatus)
510510

511-
// by now, because of three healthchecks, three seconds have been simulated to pass (by the timeProviderFn)
511+
tp.Now() // simulate another second passing
512+
// by now, because of three healthchecks, six seconds (CurrentValue + Increment + CurrentValue + Increment + CurrentValue + tp.Now()) have been simulated to pass (by the timeProviderFn)
512513
// this should reset the time bound counter, thereby allowing partial unhealthiness failures to be tolerated again
513514

514-
// first error after the reset is tolerated (time t+4)
515-
healthStatus = <-healthUpdateCh
516-
s.Nil(healthStatus)
517-
s.Len(timeBoundedRotatingCounter.temporalCache, 1) // lazy cleanup of the cache done and it's left with only the current value
518-
519-
// second error after the reset is tolerated as well (time t+5)
520-
healthStatus = <-healthUpdateCh
521-
s.Nil(healthStatus)
522-
s.Len(timeBoundedRotatingCounter.temporalCache, 1) // no change to the cache until the next reset
515+
fourthHealthStatus := <-healthUpdateCh
516+
fifthHealthStatus := <-healthUpdateCh
517+
sixthHealthStatus := <-healthUpdateCh
523518

524-
// third error after the reset isn't tolerated (time t+6)
525-
healthFailure = <-healthUpdateCh
526-
s.Equal(ErrRollupBoostPartiallyHealthy, healthFailure)
519+
s.Nil(fourthHealthStatus)
520+
s.Nil(fifthHealthStatus)
521+
s.Equal(ErrRollupBoostPartiallyHealthy, sixthHealthStatus)
527522

528523
s.NoError(monitor.Stop())
529524
}

op-conductor/health/timeboundcounter.go

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,31 +5,29 @@ import (
55
"sync"
66
)
77

8-
// this is a type of counter which keeps on incrementing until its reset interval is hit
8+
// this is a type of counter which keeps on incrementing until its reset interval is hit after which it resets to 0
99
// this can be used to track time-based rate-limit, error counts, etc.
1010
type timeBoundedRotatingCounter struct {
1111
resetIntervalSeconds uint64
12-
maxValue uint64
1312
timeProviderFn func() uint64
1413

1514
mut *sync.RWMutex
1615
temporalCache map[int64]uint64
1716
}
1817

19-
func NewTimeBoundedRotatingCounter(resetIntervalSeconds, maxValue uint64) (*timeBoundedRotatingCounter, error) {
18+
func NewTimeBoundedRotatingCounter(resetIntervalSeconds uint64) (*timeBoundedRotatingCounter, error) {
2019
if resetIntervalSeconds == 0 {
2120
return nil, fmt.Errorf("reset interval seconds must be more than 0")
2221
}
2322
return &timeBoundedRotatingCounter{
2423
resetIntervalSeconds: resetIntervalSeconds,
25-
maxValue: maxValue,
2624
mut: &sync.RWMutex{},
2725
temporalCache: map[int64]uint64{},
2826
timeProviderFn: currentTimeProvider,
2927
}, nil
3028
}
3129

32-
func (t *timeBoundedRotatingCounter) Increment() (uint64, error) {
30+
func (t *timeBoundedRotatingCounter) Increment() uint64 {
3331
// let's take `resetIntervalSeconds` as 60s
3432
// truncatedTimestamp is current timestamp rounded off by 60s (resetIntervalSeconds)
3533
// thereby generating a value which stays same until the next 60s helping track and incrementing the counter corresponding to it for the next 60s
@@ -47,11 +45,8 @@ func (t *timeBoundedRotatingCounter) Increment() (uint64, error) {
4745
}
4846
}()
4947

50-
if t.maxValue == 0 || t.temporalCache[truncatedTimestamp] < t.maxValue {
51-
t.temporalCache[truncatedTimestamp]++
52-
return t.temporalCache[truncatedTimestamp], nil
53-
}
54-
return t.maxValue, fmt.Errorf("counter at its max value, please wait %ds for it to be reset", (t.resetIntervalSeconds - (currentTsSeconds % t.resetIntervalSeconds)))
48+
t.temporalCache[truncatedTimestamp]++
49+
return t.temporalCache[truncatedTimestamp]
5550
}
5651

5752
func (t *timeBoundedRotatingCounter) CurrentValue() uint64 {

op-conductor/health/timeboundcounter_test.go

Lines changed: 18 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,13 @@ import (
1010
func TestTimeBoundedRotatingCounterSetup(t *testing.T) {
1111
t.Parallel()
1212
t.Run("fail with 0 interval seconds value", func(t *testing.T) {
13-
counter, err := NewTimeBoundedRotatingCounter(0, 0)
13+
counter, err := NewTimeBoundedRotatingCounter(0)
1414
require.Error(t, err)
1515
require.Nil(t, counter)
1616
})
1717

18-
t.Run("succeed with 0 max value", func(t *testing.T) {
19-
counter, err := NewTimeBoundedRotatingCounter(2, 0)
18+
t.Run("succeed with non-zero interval seconds value", func(t *testing.T) {
19+
counter, err := NewTimeBoundedRotatingCounter(2)
2020
require.NoError(t, err)
2121
require.NotNil(t, counter)
2222
})
@@ -26,8 +26,8 @@ func TestTimeBoundedRotatingCounterIncrement(t *testing.T) {
2626

2727
mockTimeProvider := &timeProvider{now: 0} // every access to .Now() will increment its value simulating a one-second time passing
2828

29-
maxValue, resetInterval := uint64(2), uint64(6)
30-
counter, err := NewTimeBoundedRotatingCounter(resetInterval, maxValue)
29+
resetInterval := uint64(6)
30+
counter, err := NewTimeBoundedRotatingCounter(resetInterval)
3131
require.NoError(t, err)
3232
require.NotNil(t, counter)
3333
counter.timeProviderFn = mockTimeProvider.Now
@@ -36,44 +36,38 @@ func TestTimeBoundedRotatingCounterIncrement(t *testing.T) {
3636
require.Equal(t, uint64(0), counter.CurrentValue())
3737
require.Equal(t, int(mockTimeProvider.now), 1)
3838

39-
newValue, err := counter.Increment()
40-
require.NoError(t, err)
39+
newValue := counter.Increment()
4140
require.Equal(t, uint64(1), newValue)
4241
require.Equal(t, int(mockTimeProvider.now), 2)
4342
require.Equal(t, uint64(1), counter.CurrentValue())
4443
require.Equal(t, int(mockTimeProvider.now), 3)
4544

46-
newValue, err = counter.Increment()
47-
require.NoError(t, err)
45+
newValue = counter.Increment()
4846
require.Equal(t, uint64(2), newValue)
4947
require.Equal(t, int(mockTimeProvider.now), 4)
5048
require.Equal(t, uint64(2), counter.CurrentValue())
5149
require.Equal(t, int(mockTimeProvider.now), 5)
5250

53-
newValue, err = counter.Increment()
54-
require.Error(t, err)
55-
require.Equal(t, uint64(2), newValue)
51+
newValue = counter.Increment()
52+
require.Equal(t, uint64(3), newValue)
5653
require.Equal(t, int(mockTimeProvider.now), 6)
5754
require.Equal(t, uint64(0), counter.CurrentValue()) // the next second counter rotates returning 0 as the current value
5855
require.Equal(t, int(mockTimeProvider.now), 7)
5956

60-
newValue, err = counter.Increment()
61-
require.NoError(t, err)
57+
newValue = counter.Increment()
6258
require.Equal(t, uint64(1), newValue)
6359
require.Equal(t, int(mockTimeProvider.now), 8)
6460
require.Equal(t, uint64(1), counter.CurrentValue())
6561
require.Equal(t, int(mockTimeProvider.now), 9)
6662

67-
newValue, err = counter.Increment()
68-
require.NoError(t, err)
63+
newValue = counter.Increment()
6964
require.Equal(t, uint64(2), newValue)
7065
require.Equal(t, int(mockTimeProvider.now), 10)
7166
require.Equal(t, uint64(2), counter.CurrentValue())
7267
require.Equal(t, int(mockTimeProvider.now), 11)
7368

74-
newValue, err = counter.Increment()
75-
require.Error(t, err)
76-
require.Equal(t, uint64(2), newValue)
69+
newValue = counter.Increment()
70+
require.Equal(t, uint64(3), newValue)
7771
require.Equal(t, int(mockTimeProvider.now), 12)
7872
require.Equal(t, uint64(0), counter.CurrentValue()) // the next second counter rotates returning 0 as the current value
7973
require.Equal(t, int(mockTimeProvider.now), 13)
@@ -85,7 +79,7 @@ func TestTimeBoundedRotatingCounterIncrement(t *testing.T) {
8579
func TestTimeBoundedRotatingCounterConcurrentAccess(t *testing.T) {
8680
mockTimeProvider := &timeProvider{now: 0}
8781

88-
counter, err := NewTimeBoundedRotatingCounter(1, 9)
82+
counter, err := NewTimeBoundedRotatingCounter(1)
8983
require.NoError(t, err)
9084
require.NotNil(t, counter)
9185
counter.timeProviderFn = mockTimeProvider.Now
@@ -95,8 +89,7 @@ func TestTimeBoundedRotatingCounterConcurrentAccess(t *testing.T) {
9589

9690
write := func() {
9791
defer wg.Done()
98-
_, err := counter.Increment()
99-
require.NoError(t, err) // considering the max value is 9, the increment should never fail
92+
counter.Increment()
10093
}
10194
read := func() {
10295
defer wg.Done()
@@ -116,21 +109,19 @@ func TestTimeBoundedRotatingCounterLazyCleanup(t *testing.T) {
116109

117110
// a counter with a reset interval of 2 ensuring every two-seconds the counter's cache would track a new key:value
118111
// we'll trigger the 2-second increment by calling .Increment() and .CurrentValue() because both under the hood, would call .Now() of the mockTimeProvider
119-
counter, err := NewTimeBoundedRotatingCounter(2, 9)
112+
counter, err := NewTimeBoundedRotatingCounter(2)
120113
require.NoError(t, err)
121114
require.NotNil(t, counter)
122115
counter.timeProviderFn = mockTimeProvider.Now
123116

124117
for i := 0; i < 1000; i++ {
125-
_, err := counter.Increment() // trigger a 1-second time increase
126-
require.NoError(t, err)
118+
counter.Increment() // trigger a 1-second time increase
127119
counter.CurrentValue() // trigger another 1-second time increase, causing the counter interval to reset ensuring next Increment would write a new key in the cache
128120
}
129121

130122
require.Equal(t, 1000, len(counter.temporalCache))
131123

132124
// 1001th increment should trigger the lazy cleanup this time
133-
_, err = counter.Increment()
134-
require.NoError(t, err)
125+
counter.Increment()
135126
require.Equal(t, 1, len(counter.temporalCache))
136127
}

0 commit comments

Comments
 (0)