Skip to content

Commit d8b8edc

Browse files
committed
potential bug fixes
1 parent ecfdb18 commit d8b8edc

File tree

3 files changed

+183
-41
lines changed

3 files changed

+183
-41
lines changed

hitless/circuit_breaker.go

Lines changed: 124 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -80,24 +80,7 @@ func newCircuitBreaker(endpoint string, config *Config) *CircuitBreaker {
8080
// IsOpen returns true if the circuit breaker is open (rejecting requests)
8181
func (cb *CircuitBreaker) IsOpen() bool {
8282
state := CircuitBreakerState(cb.state.Load())
83-
84-
if state == CircuitBreakerOpen {
85-
// Check if we should transition to half-open
86-
if cb.shouldAttemptReset() {
87-
if cb.state.CompareAndSwap(int32(CircuitBreakerOpen), int32(CircuitBreakerHalfOpen)) {
88-
cb.requests.Store(0)
89-
cb.successes.Store(0)
90-
if cb.config != nil && cb.config.LogLevel.InfoOrAbove() {
91-
internal.Logger.Printf(context.Background(),
92-
"hitless: circuit breaker for %s transitioning to half-open", cb.endpoint)
93-
}
94-
return false // Now in half-open state, allow requests
95-
}
96-
}
97-
return true // Still open
98-
}
99-
100-
return false
83+
return state == CircuitBreakerOpen
10184
}
10285

10386
// shouldAttemptReset checks if enough time has passed to attempt reset
@@ -108,23 +91,36 @@ func (cb *CircuitBreaker) shouldAttemptReset() bool {
10891

10992
// Execute runs the given function with circuit breaker protection
11093
func (cb *CircuitBreaker) Execute(fn func() error) error {
111-
// Fast path: if circuit is open, fail immediately
112-
if cb.IsOpen() {
113-
return ErrCircuitBreakerOpen
114-
}
115-
94+
// Single atomic state load for consistency
11695
state := CircuitBreakerState(cb.state.Load())
11796

118-
// In half-open state, limit the number of requests
119-
if state == CircuitBreakerHalfOpen {
97+
switch state {
98+
case CircuitBreakerOpen:
99+
if cb.shouldAttemptReset() {
100+
// Attempt transition to half-open
101+
if cb.state.CompareAndSwap(int32(CircuitBreakerOpen), int32(CircuitBreakerHalfOpen)) {
102+
cb.requests.Store(0)
103+
cb.successes.Store(0)
104+
state = CircuitBreakerHalfOpen // Update local state
105+
if cb.config != nil && cb.config.LogLevel.InfoOrAbove() {
106+
internal.Logger.Printf(context.Background(),
107+
"hitless: circuit breaker for %s transitioning to half-open", cb.endpoint)
108+
}
109+
} else {
110+
return ErrCircuitBreakerOpen
111+
}
112+
} else {
113+
return ErrCircuitBreakerOpen
114+
}
115+
case CircuitBreakerHalfOpen:
120116
requests := cb.requests.Add(1)
121117
if requests > int64(cb.maxRequests) {
122118
cb.requests.Add(-1) // Revert the increment
123119
return ErrCircuitBreakerOpen
124120
}
125121
}
126122

127-
// Execute the function
123+
// Execute the function with consistent state
128124
err := fn()
129125

130126
if err != nil {
@@ -221,46 +217,136 @@ type CircuitBreakerStats struct {
221217
LastSuccessTime time.Time
222218
}
223219

220+
// CircuitBreakerEntry wraps a circuit breaker with access tracking
221+
type CircuitBreakerEntry struct {
222+
breaker *CircuitBreaker
223+
lastAccess atomic.Int64 // Unix timestamp
224+
created time.Time
225+
}
226+
224227
// CircuitBreakerManager manages circuit breakers for multiple endpoints
225228
type CircuitBreakerManager struct {
226-
breakers sync.Map // map[string]*CircuitBreaker
227-
config *Config
229+
breakers sync.Map // map[string]*CircuitBreakerEntry
230+
config *Config
231+
cleanupStop chan struct{}
232+
cleanupMu sync.Mutex
233+
lastCleanup atomic.Int64 // Unix timestamp
228234
}
229235

230236
// newCircuitBreakerManager creates a new circuit breaker manager
231237
func newCircuitBreakerManager(config *Config) *CircuitBreakerManager {
232-
return &CircuitBreakerManager{
233-
config: config,
238+
cbm := &CircuitBreakerManager{
239+
config: config,
240+
cleanupStop: make(chan struct{}),
234241
}
242+
cbm.lastCleanup.Store(time.Now().Unix())
243+
244+
// Start background cleanup goroutine
245+
go cbm.cleanupLoop()
246+
247+
return cbm
235248
}
236249

237250
// GetCircuitBreaker returns the circuit breaker for an endpoint, creating it if necessary
238251
func (cbm *CircuitBreakerManager) GetCircuitBreaker(endpoint string) *CircuitBreaker {
239-
if breaker, ok := cbm.breakers.Load(endpoint); ok {
240-
return breaker.(*CircuitBreaker)
252+
now := time.Now().Unix()
253+
254+
if entry, ok := cbm.breakers.Load(endpoint); ok {
255+
cbEntry := entry.(*CircuitBreakerEntry)
256+
cbEntry.lastAccess.Store(now)
257+
return cbEntry.breaker
241258
}
242259

243-
// Create new circuit breaker
260+
// Create new circuit breaker with metadata
244261
newBreaker := newCircuitBreaker(endpoint, cbm.config)
245-
actual, _ := cbm.breakers.LoadOrStore(endpoint, newBreaker)
246-
return actual.(*CircuitBreaker)
262+
newEntry := &CircuitBreakerEntry{
263+
breaker: newBreaker,
264+
created: time.Now(),
265+
}
266+
newEntry.lastAccess.Store(now)
267+
268+
actual, _ := cbm.breakers.LoadOrStore(endpoint, newEntry)
269+
return actual.(*CircuitBreakerEntry).breaker
247270
}
248271

249272
// GetAllStats returns statistics for all circuit breakers
250273
func (cbm *CircuitBreakerManager) GetAllStats() []CircuitBreakerStats {
251274
var stats []CircuitBreakerStats
252275
cbm.breakers.Range(func(key, value interface{}) bool {
253-
breaker := value.(*CircuitBreaker)
254-
stats = append(stats, breaker.GetStats())
276+
entry := value.(*CircuitBreakerEntry)
277+
stats = append(stats, entry.breaker.GetStats())
255278
return true
256279
})
257280
return stats
258281
}
259282

283+
// cleanupLoop runs background cleanup of unused circuit breakers
284+
func (cbm *CircuitBreakerManager) cleanupLoop() {
285+
ticker := time.NewTicker(5 * time.Minute) // Cleanup every 5 minutes
286+
defer ticker.Stop()
287+
288+
for {
289+
select {
290+
case <-ticker.C:
291+
cbm.cleanup()
292+
case <-cbm.cleanupStop:
293+
return
294+
}
295+
}
296+
}
297+
298+
// cleanup removes circuit breakers that haven't been accessed recently
299+
func (cbm *CircuitBreakerManager) cleanup() {
300+
// Prevent concurrent cleanups
301+
if !cbm.cleanupMu.TryLock() {
302+
return
303+
}
304+
defer cbm.cleanupMu.Unlock()
305+
306+
now := time.Now()
307+
cutoff := now.Add(-30 * time.Minute).Unix() // 30 minute TTL
308+
309+
var toDelete []string
310+
count := 0
311+
312+
cbm.breakers.Range(func(key, value interface{}) bool {
313+
endpoint := key.(string)
314+
entry := value.(*CircuitBreakerEntry)
315+
316+
count++
317+
318+
// Remove if not accessed recently
319+
if entry.lastAccess.Load() < cutoff {
320+
toDelete = append(toDelete, endpoint)
321+
}
322+
323+
return true
324+
})
325+
326+
// Delete expired entries
327+
for _, endpoint := range toDelete {
328+
cbm.breakers.Delete(endpoint)
329+
}
330+
331+
// Log cleanup results
332+
if len(toDelete) > 0 && cbm.config != nil && cbm.config.LogLevel.InfoOrAbove() {
333+
internal.Logger.Printf(context.Background(),
334+
"hitless: circuit breaker cleanup removed %d/%d entries", len(toDelete), count)
335+
}
336+
337+
cbm.lastCleanup.Store(now.Unix())
338+
}
339+
340+
// Shutdown stops the cleanup goroutine
341+
func (cbm *CircuitBreakerManager) Shutdown() {
342+
close(cbm.cleanupStop)
343+
}
344+
260345
// Reset resets all circuit breakers (useful for testing)
261346
func (cbm *CircuitBreakerManager) Reset() {
262347
cbm.breakers.Range(func(key, value interface{}) bool {
263-
breaker := value.(*CircuitBreaker)
348+
entry := value.(*CircuitBreakerEntry)
349+
breaker := entry.breaker
264350
breaker.state.Store(int32(CircuitBreakerClosed))
265351
breaker.failures.Store(0)
266352
breaker.successes.Store(0)

hitless/errors.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package hitless
22

33
import (
44
"errors"
5+
"fmt"
6+
"time"
57
)
68

79
// Configuration errors
@@ -54,6 +56,47 @@ var (
5456
ErrCircuitBreakerOpen = errors.New("hitless: circuit breaker is open, failing fast")
5557
)
5658

59+
// CircuitBreakerError provides detailed context for circuit breaker failures
60+
type CircuitBreakerError struct {
61+
Endpoint string
62+
State string
63+
Failures int64
64+
LastFailure time.Time
65+
NextAttempt time.Time
66+
Message string
67+
}
68+
69+
func (e *CircuitBreakerError) Error() string {
70+
if e.NextAttempt.IsZero() {
71+
return fmt.Sprintf("hitless: circuit breaker %s for %s (failures: %d, last: %v): %s",
72+
e.State, e.Endpoint, e.Failures, e.LastFailure, e.Message)
73+
}
74+
return fmt.Sprintf("hitless: circuit breaker %s for %s (failures: %d, last: %v, next attempt: %v): %s",
75+
e.State, e.Endpoint, e.Failures, e.LastFailure, e.NextAttempt, e.Message)
76+
}
77+
78+
// HandoffError provides detailed context for connection handoff failures
79+
type HandoffError struct {
80+
ConnectionID uint64
81+
SourceEndpoint string
82+
TargetEndpoint string
83+
Attempt int
84+
MaxAttempts int
85+
Duration time.Duration
86+
FinalError error
87+
Message string
88+
}
89+
90+
func (e *HandoffError) Error() string {
91+
return fmt.Sprintf("hitless: handoff failed for conn[%d] %s→%s (attempt %d/%d, duration: %v): %s",
92+
e.ConnectionID, e.SourceEndpoint, e.TargetEndpoint,
93+
e.Attempt, e.MaxAttempts, e.Duration, e.Message)
94+
}
95+
96+
func (e *HandoffError) Unwrap() error {
97+
return e.FinalError
98+
}
99+
57100
// circuit breaker configuration errors
58101
var (
59102
ErrInvalidCircuitBreakerFailureThreshold = errors.New("hitless: circuit breaker failure threshold must be >= 1")

hitless/handoff_worker.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ func (hwm *handoffWorkerManager) ensureWorkerAvailable() {
100100
// Check if we need a new worker
101101
currentWorkers := hwm.activeWorkers.Load()
102102
workersWas := currentWorkers
103-
for currentWorkers <= int32(hwm.maxWorkers) {
103+
for currentWorkers < int32(hwm.maxWorkers) {
104104
hwm.workerWg.Add(1)
105105
go hwm.onDemandWorker()
106106
currentWorkers++
@@ -122,13 +122,26 @@ func (hwm *handoffWorkerManager) onDemandWorker() {
122122
hwm.workerWg.Done()
123123
}()
124124

125+
// Create reusable timer to prevent timer leaks
126+
timer := time.NewTimer(hwm.workerTimeout)
127+
defer timer.Stop()
128+
125129
for {
130+
// Reset timer for next iteration
131+
if !timer.Stop() {
132+
select {
133+
case <-timer.C:
134+
default:
135+
}
136+
}
137+
timer.Reset(hwm.workerTimeout)
138+
126139
select {
127140
case <-hwm.shutdown:
128141
return
129-
case <-time.After(hwm.workerTimeout):
142+
case <-timer.C:
130143
// Worker has been idle for too long, exit to save resources
131-
if hwm.config != nil && hwm.config.LogLevel.InfoOrAbove() { // Debug level
144+
if hwm.config != nil && hwm.config.LogLevel.InfoOrAbove() {
132145
internal.Logger.Printf(context.Background(),
133146
"hitless: worker exiting due to inactivity timeout (%v)", hwm.workerTimeout)
134147
}

0 commit comments

Comments
 (0)