Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: introduce circuit breaker for region calls #8856

Merged
merged 11 commits into from
Dec 10, 2024
302 changes: 302 additions & 0 deletions client/circuitbreaker/circuit_breaker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,302 @@
// Copyright 2024 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package circuitbreaker

import (
"fmt"
"strings"
"sync"
"time"

"github.com/tikv/pd/client/errs"

"github.com/prometheus/client_golang/prometheus"
m "github.com/tikv/pd/client/metrics"
"go.uber.org/zap"

"github.com/pingcap/log"
)

// Overloading is a type describing service return value
type Overloading bool

const (
// No means the service is not overloaded
No = false
// Yes means the service is overloaded
Yes = true
)

// Settings describes configuration for Circuit Breaker
type Settings struct {
// Defines the error rate threshold to trip the circuit breaker.
ErrorRateThresholdPct uint32
// Defines the average qps over the `error_rate_window` that must be met before evaluating the error rate threshold.
MinQPSForOpen uint32
// Defines how long to track errors before evaluating error_rate_threshold.
ErrorRateWindow time.Duration
// Defines how long to wait after circuit breaker is open before go to half-open state to send a probe request.
CoolDownInterval time.Duration
// Defines how many subsequent requests to test after cooldown period before fully close the circuit.
HalfOpenSuccessCount uint32
}

// AlwaysClosedSettings is a configuration that never trips the circuit breaker.
var AlwaysClosedSettings = Settings{
ErrorRateThresholdPct: 0, // never trips
ErrorRateWindow: 10 * time.Second, // effectively results in testing for new settings every 10 seconds
MinQPSForOpen: 10,
CoolDownInterval: 10 * time.Second,
HalfOpenSuccessCount: 1,
}

// CircuitBreaker is a state machine to prevent sending requests that are likely to fail.
type CircuitBreaker[T any] struct {
config *Settings
name string

mutex sync.Mutex
state *State[T]

successCounter prometheus.Counter
errorCounter prometheus.Counter
overloadCounter prometheus.Counter
fastFailCounter prometheus.Counter
}

// StateType is a type that represents a state of CircuitBreaker.
type StateType int

// States of CircuitBreaker.
const (
StateClosed StateType = iota
StateOpen
StateHalfOpen
)

// String implements stringer interface.
func (s StateType) String() string {
switch s {
case StateClosed:
return "closed"
case StateOpen:
return "open"
case StateHalfOpen:
return "half-open"
default:
return fmt.Sprintf("unknown state: %d", s)

Check warning on line 98 in client/circuitbreaker/circuit_breaker.go

View check run for this annotation

Codecov / codecov/patch

client/circuitbreaker/circuit_breaker.go#L89-L98

Added lines #L89 - L98 were not covered by tests
}
}

var replacer = strings.NewReplacer(" ", "_", "-", "_")

// NewCircuitBreaker returns a new CircuitBreaker configured with the given Settings.
func NewCircuitBreaker[T any](name string, st Settings) *CircuitBreaker[T] {
cb := new(CircuitBreaker[T])
cb.name = name
cb.config = &st
cb.state = cb.newState(time.Now(), StateClosed)

metricName := replacer.Replace(name)
cb.successCounter = m.CircuitBreakerCounters.WithLabelValues(metricName, "success")
cb.errorCounter = m.CircuitBreakerCounters.WithLabelValues(metricName, "error")
cb.overloadCounter = m.CircuitBreakerCounters.WithLabelValues(metricName, "overload")
cb.fastFailCounter = m.CircuitBreakerCounters.WithLabelValues(metricName, "fast_fail")
return cb
}

// ChangeSettings changes the CircuitBreaker settings.
// The changes will be reflected only in the next evaluation window.
func (cb *CircuitBreaker[T]) ChangeSettings(apply func(config *Settings)) {
cb.mutex.Lock()
defer cb.mutex.Unlock()

apply(cb.config)
}

// Execute calls the given function if the CircuitBreaker is closed and returns the result of execution.
// Execute returns an error instantly if the CircuitBreaker is open.
// https://github.com/tikv/rfcs/blob/master/text/0115-circuit-breaker.md
func (cb *CircuitBreaker[T]) Execute(call func() (T, Overloading, error)) (T, error) {
state, err := cb.onRequest()
if err != nil {
cb.fastFailCounter.Inc()
var defaultValue T
return defaultValue, err
}

defer func() {
e := recover()
if e != nil {
cb.emitMetric(Yes, err)
cb.onResult(state, Yes)
panic(e)

Check warning on line 144 in client/circuitbreaker/circuit_breaker.go

View check run for this annotation

Codecov / codecov/patch

client/circuitbreaker/circuit_breaker.go#L142-L144

Added lines #L142 - L144 were not covered by tests
}
}()

result, overloaded, err := call()
cb.emitMetric(overloaded, err)
cb.onResult(state, overloaded)
return result, err
}

func (cb *CircuitBreaker[T]) onRequest() (*State[T], error) {
cb.mutex.Lock()
defer cb.mutex.Unlock()

state, err := cb.state.onRequest(cb)
cb.state = state
return state, err
}

func (cb *CircuitBreaker[T]) onResult(state *State[T], overloaded Overloading) {
cb.mutex.Lock()
defer cb.mutex.Unlock()

// even if the circuit breaker already moved to a new state while the request was in progress,
// it is still ok to update the old state, but it is not relevant anymore
state.onResult(overloaded)
}

func (cb *CircuitBreaker[T]) emitMetric(overloaded Overloading, err error) {
switch overloaded {
case No:
cb.successCounter.Inc()
case Yes:
cb.overloadCounter.Inc()
default:
panic("unknown state")

Check warning on line 179 in client/circuitbreaker/circuit_breaker.go

View check run for this annotation

Codecov / codecov/patch

client/circuitbreaker/circuit_breaker.go#L178-L179

Added lines #L178 - L179 were not covered by tests
}
if err != nil {
cb.errorCounter.Inc()
}
}

// State represents the state of CircuitBreaker.
type State[T any] struct {
stateType StateType
cb *CircuitBreaker[T]
end time.Time

pendingCount uint32
successCount uint32
failureCount uint32
}

// newState creates a new State with the given configuration and reset all success/failure counters.
func (cb *CircuitBreaker[T]) newState(now time.Time, stateType StateType) *State[T] {
var end time.Time
var pendingCount uint32
switch stateType {
case StateClosed:
end = now.Add(cb.config.ErrorRateWindow)
case StateOpen:
end = now.Add(cb.config.CoolDownInterval)
case StateHalfOpen:
// we transition to HalfOpen state on the first request after the cooldown period,
// so we start with 1 pending request
pendingCount = 1
default:
panic("unknown state")

Check warning on line 211 in client/circuitbreaker/circuit_breaker.go

View check run for this annotation

Codecov / codecov/patch

client/circuitbreaker/circuit_breaker.go#L210-L211

Added lines #L210 - L211 were not covered by tests
}
return &State[T]{
cb: cb,
stateType: stateType,
pendingCount: pendingCount,
end: end,
}
}

// onRequest transitions the state to the next state based on the current state and the previous requests results
// The implementation represents a state machine for CircuitBreaker
// All state transitions happens at the request evaluation time only
// Circuit breaker start with a closed state, allows all requests to pass through and always lasts for a fixed duration of `Settings.ErrorRateWindow`.
// If `Settings.ErrorRateThresholdPct` is breached at the end of the window, then it moves to Open state, otherwise it moves to a new Closed state with a new window.
// Open state fails all request, it has a fixed duration of `Settings.CoolDownInterval` and always moves to HalfOpen state at the end of the interval.
// HalfOpen state does not have a fixed duration and lasts till `Settings.HalfOpenSuccessCount` are evaluated.
// If any of `Settings.HalfOpenSuccessCount` fails then it moves back to Open state, otherwise it moves to Closed state.
func (s *State[T]) onRequest(cb *CircuitBreaker[T]) (*State[T], error) {
var now = time.Now()
switch s.stateType {
case StateClosed:
if now.After(s.end) {
// ErrorRateWindow is over, let's evaluate the error rate
if s.cb.config.ErrorRateThresholdPct > 0 { // otherwise circuit breaker is disabled
total := s.failureCount + s.successCount
if total > 0 {
observedErrorRatePct := s.failureCount * 100 / total
if total >= uint32(s.cb.config.ErrorRateWindow.Seconds())*s.cb.config.MinQPSForOpen && observedErrorRatePct >= s.cb.config.ErrorRateThresholdPct {
// the error threshold is breached, let's move to open state and start failing all requests
log.Error("Circuit breaker tripped. Starting to fail all requests",
zap.String("name", cb.name),
zap.Uint32("observedErrorRatePct", observedErrorRatePct),
zap.String("config", fmt.Sprintf("%+v", cb.config)))
return cb.newState(now, StateOpen), errs.ErrCircuitBreakerOpen
}
}
}
// the error threshold is not breached or there were not enough requests to evaluate it,
// continue in the closed state and allow all requests
return cb.newState(now, StateClosed), nil
Tema marked this conversation as resolved.
Show resolved Hide resolved
}
// continue in closed state till ErrorRateWindow is over
return s, nil
case StateOpen:
if now.After(s.end) {
// CoolDownInterval is over, it is time to transition to half-open state
log.Info("Circuit breaker cooldown period is over. Transitioning to half-open state to test the service",
zap.String("name", cb.name),
zap.String("config", fmt.Sprintf("%+v", cb.config)))
return cb.newState(now, StateHalfOpen), nil
} else {
// continue in the open state till CoolDownInterval is over
return s, errs.ErrCircuitBreakerOpen
}

Check warning on line 265 in client/circuitbreaker/circuit_breaker.go

View check run for this annotation

Codecov / codecov/patch

client/circuitbreaker/circuit_breaker.go#L263-L265

Added lines #L263 - L265 were not covered by tests
case StateHalfOpen:
// do we need some expire time here in case of one of pending requests is stuck forever?
if s.failureCount > 0 {
// there were some failures during half-open state, let's go back to open state to wait a bit longer
log.Error("Circuit breaker goes from half-open to open again as errors persist and continue to fail all requests",
zap.String("name", cb.name),
zap.String("config", fmt.Sprintf("%+v", cb.config)))
return cb.newState(now, StateOpen), errs.ErrCircuitBreakerOpen
} else if s.successCount == s.cb.config.HalfOpenSuccessCount {
// all probe requests are succeeded, we can move to closed state and allow all requests
log.Info("Circuit breaker is closed. Start allowing all requests",
zap.String("name", cb.name),
zap.String("config", fmt.Sprintf("%+v", cb.config)))
return cb.newState(now, StateClosed), nil
} else if s.pendingCount < s.cb.config.HalfOpenSuccessCount {
// allow more probe requests and continue in half-open state
s.pendingCount++
return s, nil
} else {
// continue in half-open state till all probe requests are done and fail all other requests for now
return s, errs.ErrCircuitBreakerOpen
}
default:
panic("unknown state")

Check warning on line 289 in client/circuitbreaker/circuit_breaker.go

View check run for this annotation

Codecov / codecov/patch

client/circuitbreaker/circuit_breaker.go#L288-L289

Added lines #L288 - L289 were not covered by tests
}
}

func (s *State[T]) onResult(overloaded Overloading) {
switch overloaded {
case No:
s.successCount++
case Yes:
s.failureCount++
default:
panic("unknown state")

Check warning on line 300 in client/circuitbreaker/circuit_breaker.go

View check run for this annotation

Codecov / codecov/patch

client/circuitbreaker/circuit_breaker.go#L299-L300

Added lines #L299 - L300 were not covered by tests
}
}
Loading
Loading