Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions erst.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ max_trace_depth = 50
# and no Sentry DSN is set. Can also be set via ERST_CRASH_ENDPOINT.
# crash_endpoint = "https://crash.erst.dev/v1/report"

# Circuit Breaker Configuration
# Controls automatic failover behavior when using multiple RPC endpoints
circuit_breaker_threshold = 5 # Number of consecutive failures before opening circuit
circuit_breaker_timeout = 60 # Seconds to wait before attempting reset

# Audit HSM Configuration
# ERST_PKCS11_MODULE = "/usr/lib/softhsm/libsofthsm2.so"
# ERST_PKCS11_MAX_RPM = 1000 # Max requests per minute to protect HSM
Expand Down
8 changes: 8 additions & 0 deletions internal/cmd/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,14 @@ Local WASM Replay Mode:
rpc.WithToken(token),
}

// Load circuit breaker configuration from config file
cfg, err := config.Load()
if err == nil {
if cfg.CircuitBreakerThreshold > 0 && cfg.CircuitBreakerTimeout > 0 {
opts = append(opts, rpc.WithCircuitBreaker(cfg.CircuitBreakerThreshold, time.Duration(cfg.CircuitBreakerTimeout)*time.Second))
}
}

if rpcURLFlag != "" {
urls := strings.Split(rpcURLFlag, ",")
for i := range urls {
Expand Down
58 changes: 58 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,24 @@
CrashSentryDSN string `json:"crash_sentry_dsn,omitempty"`
// RequestTimeout is the HTTP request timeout in seconds for all RPC calls.
RequestTimeout int `json:"request_timeout,omitempty"`
// CircuitBreakerThreshold is the number of consecutive failures before opening the circuit breaker.
// Set via circuit_breaker_threshold in config or ERST_CIRCUIT_BREAKER_THRESHOLD.
// Defaults to 5 failures.
CircuitBreakerThreshold int `json:"circuit_breaker_threshold,omitempty"`
// CircuitBreakerTimeout is the duration in seconds the circuit breaker remains open before resetting.
// Set via circuit_breaker_timeout in config or ERST_CIRCUIT_BREAKER_TIMEOUT.
// Defaults to 60 seconds.
CircuitBreakerTimeout int `json:"circuit_breaker_timeout,omitempty"`
// MaxTraceDepth is the maximum depth of the call tree before it is truncated.
MaxTraceDepth int `json:"max_trace_depth,omitempty"`
// CircuitBreakerThreshold is the number of consecutive failures before opening the circuit breaker.
// Set via circuit_breaker_threshold in config or ERST_CIRCUIT_BREAKER_THRESHOLD.
// Defaults to 5 failures.
CircuitBreakerThreshold int `json:"circuit_breaker_threshold,omitempty"`
// CircuitBreakerTimeout is the duration in seconds the circuit breaker remains open before resetting.
// Set via circuit_breaker_timeout in config or ERST_CIRCUIT_BREAKER_TIMEOUT.
// Defaults to 60 seconds.
CircuitBreakerTimeout int `json:"circuit_breaker_timeout,omitempty"`
}

// -- Constants & Defaults --
Expand Down Expand Up @@ -291,6 +307,48 @@
cfg.RequestTimeout = n
}
}

if v := os.Getenv("ERST_CIRCUIT_BREAKER_THRESHOLD"); v != "" {
n, err := strconv.Atoi(v)
if err == nil && n > 0 {
cfg.CircuitBreakerThreshold = n
}
}
if v := os.Getenv("ERST_CIRCUIT_BREAKER_TIMEOUT"); v != "" {
n, err := strconv.Atoi(v)
if err == nil && n > 0 {
cfg.CircuitBreakerTimeout = n
}
}

switch strings.ToLower(os.Getenv("ERST_CRASH_REPORTING")) {
case "1", "true", "yes":
cfg.CrashReporting = true
case "0", "false", "no":
cfg.CrashReporting = false
}

if urlsEnv := os.Getenv("ERST_RPC_URLS"); urlsEnv != "" {
cfg.RpcUrls = splitAndTrim(urlsEnv)
} else if urlsEnv := os.Getenv("STELLAR_RPC_URLS"); urlsEnv != "" {
cfg.RpcUrls = splitAndTrim(urlsEnv)
}

return nil
}

type fileParser struct{}

func (fileParser) Parse(cfg *Config) error {
paths := []string{
".erst.toml",
filepath.Join(os.ExpandEnv("$HOME"), ".erst.toml"),
"/etc/erst/config.toml",
}

for _, path := range paths {
if err := cfg.loadTOML(path); err == nil {
return nil
if v := os.Getenv("ERST_MAX_TRACE_DEPTH"); v != "" {
if n, err := strconv.Atoi(v); err == nil && n > 0 {
cfg.MaxTraceDepth = n
Expand All @@ -315,7 +373,7 @@

type fileParser struct{}

func (fileParser) Parse(cfg *Config) error {

Check failure on line 376 in internal/config/config.go

View workflow job for this annotation

GitHub Actions / Go Strict Linting

expected ';', found error

Check failure on line 376 in internal/config/config.go

View workflow job for this annotation

GitHub Actions / Validate CI scripts from non-root cwd

expected ';', found error
return cfg.loadFromFile()
}

Expand All @@ -325,7 +383,7 @@

type configDefaultsAssigner struct{}

func (configDefaultsAssigner) Apply(cfg *Config) {

Check failure on line 386 in internal/config/config.go

View workflow job for this annotation

GitHub Actions / Go Strict Linting

expected ';', found '{'

Check failure on line 386 in internal/config/config.go

View workflow job for this annotation

GitHub Actions / Validate CI scripts from non-root cwd

expected ';', found '{'
if cfg.RpcUrl == "" {
cfg.RpcUrl = defaultConfig.RpcUrl
}
Expand Down
40 changes: 40 additions & 0 deletions internal/config/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,22 @@ func loadFromEnv(cfg *Config) error {
cfg.MaxTraceDepth = n
}

if v := os.Getenv("ERST_CIRCUIT_BREAKER_THRESHOLD"); v != "" {
n, err := strconv.Atoi(v)
if err != nil {
return errors.WrapValidationError("ERST_CIRCUIT_BREAKER_THRESHOLD must be an integer")
}
cfg.CircuitBreakerThreshold = n
}

if v := os.Getenv("ERST_CIRCUIT_BREAKER_TIMEOUT"); v != "" {
n, err := strconv.Atoi(v)
if err != nil {
return errors.WrapValidationError("ERST_CIRCUIT_BREAKER_TIMEOUT must be an integer")
}
cfg.CircuitBreakerTimeout = n
}

switch strings.ToLower(os.Getenv("ERST_CRASH_REPORTING")) {
case "":
case "1", "true", "yes":
Expand Down Expand Up @@ -183,6 +199,18 @@ func (c *Config) parseTOML(content string) error {
return errors.WrapValidationError("request_timeout must be an integer")
}
c.RequestTimeout = n
case "circuit_breaker_threshold":
n, err := strconv.Atoi(value)
if err != nil {
return errors.WrapValidationError("circuit_breaker_threshold must be an integer")
}
c.CircuitBreakerThreshold = n
case "circuit_breaker_timeout":
n, err := strconv.Atoi(value)
if err != nil {
return errors.WrapValidationError("circuit_breaker_timeout must be an integer")
}
c.CircuitBreakerTimeout = n
case "max_trace_depth":
n, err := strconv.Atoi(value)
if err != nil {
Expand All @@ -195,6 +223,18 @@ func (c *Config) parseTOML(content string) error {
return errors.WrapValidationError("max_cache_size must be a valid size (e.g., 500MB)")
}
c.MaxCacheSize = n
case "circuit_breaker_threshold":
n, err := strconv.Atoi(value)
if err != nil {
return errors.WrapValidationError("circuit_breaker_threshold must be an integer")
}
c.CircuitBreakerThreshold = n
case "circuit_breaker_timeout":
n, err := strconv.Atoi(value)
if err != nil {
return errors.WrapValidationError("circuit_breaker_timeout must be an integer")
}
c.CircuitBreakerTimeout = n
}
}

Expand Down
71 changes: 68 additions & 3 deletions internal/rpc/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,26 @@ import (
"time"

"github.com/dotandev/hintents/internal/errors"
"github.com/dotandev/hintents/internal/telemetry/methods"
"github.com/stellar/go-stellar-sdk/clients/horizonclient"
)

type ClientOption func(*clientBuilder) error

type clientBuilder struct {
network Network
token string
horizonURL string
sorobanURL string
altURLs []string
cacheEnabled bool
methodTelemetry methods.MethodTelemetry
config *NetworkConfig
httpClient *http.Client
requestTimeout time.Duration
circuitBreakerThreshold int
circuitBreakerTimeout time.Duration
middlewares []Middleware
network Network
token string
horizonURL string
Expand All @@ -36,7 +50,7 @@ func newBuilder() *clientBuilder {
return &clientBuilder{
network: Mainnet,
cacheEnabled: true,
methodTelemetry: defaultMethodTelemetry(),
methodTelemetry: methods.DefaultMethodTelemetry(),
requestTimeout: defaultHTTPTimeout,
}
}
Expand Down Expand Up @@ -137,16 +151,34 @@ func WithHTTPClient(client HTTPClient) ClientOption {

// WithMethodTelemetry injects an optional telemetry hook for SDK method timings.
// If nil is provided, a no-op implementation is used.
func WithMethodTelemetry(telemetry MethodTelemetry) ClientOption {
func WithMethodTelemetry(telemetry methods.MethodTelemetry) ClientOption {
return func(b *clientBuilder) error {
if telemetry == nil {
telemetry = defaultMethodTelemetry()
telemetry = methods.DefaultMethodTelemetry()
}
b.methodTelemetry = telemetry
return nil
}
}

// WithCircuitBreaker configures the circuit breaker thresholds for RPC client failover.
// failureThreshold is the number of consecutive failures before opening the circuit.
// timeout is the duration to wait before attempting to reset the circuit breaker.
// Both values must be positive, otherwise an error is returned.
func WithCircuitBreaker(failureThreshold int, timeout time.Duration) ClientOption {
return func(b *clientBuilder) error {
if failureThreshold <= 0 {
return fmt.Errorf("circuit breaker failure threshold must be positive")
}
if timeout <= 0 {
return fmt.Errorf("circuit breaker timeout must be positive")
}
b.circuitBreakerThreshold = failureThreshold
b.circuitBreakerTimeout = timeout
return nil
}
}

func WithMiddleware(middlewares ...Middleware) ClientOption {
return func(b *clientBuilder) error {
b.middlewares = append(b.middlewares, middlewares...)
Expand Down Expand Up @@ -258,12 +290,45 @@ func (b *clientBuilder) build() (*Client, error) {
b.httpClient = createHTTPClient(b.token, b.requestTimeout, mws...)
}

if len(b.altURLs) == 0 && b.horizonURL != "" {
b.altURLs = []string{b.horizonURL}
}

if b.horizonURL == "" {
b.horizonURL = b.config.HorizonURL
}

if len(b.altURLs) == 0 {
b.altURLs = []string{b.horizonURL}
}

// Set default circuit breaker values if not configured
if b.circuitBreakerThreshold == 0 {
b.circuitBreakerThreshold = 5 // Default: 5 consecutive failures
}
if b.circuitBreakerTimeout == 0 {
b.circuitBreakerTimeout = 60 * time.Second // Default: 60 seconds
}

return &Client{
HorizonURL: b.horizonURL,
Horizon: &horizonclient.Client{
HorizonURL: b.horizonURL,
HTTP: b.httpClient,
},
Network: b.network,
SorobanURL: b.sorobanURL,
AltURLs: b.altURLs,
httpClient: b.httpClient,
token: b.token,
Config: *b.config,
CacheEnabled: b.cacheEnabled,
methodTelemetry: b.methodTelemetry,
failures: make(map[string]int),
lastFailure: make(map[string]time.Time),
middlewares: b.middlewares,
circuitBreakerThreshold: b.circuitBreakerThreshold,
circuitBreakerTimeout: b.circuitBreakerTimeout,
Network: b.network,
SorobanURL: b.sorobanURL,
AltURLs: b.altURLs,
Expand Down
50 changes: 50 additions & 0 deletions internal/rpc/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package rpc
import (
"net/http"
"testing"
"time"

"github.com/stellar/go-stellar-sdk/clients/horizonclient"
)
Expand Down Expand Up @@ -49,6 +50,55 @@ func TestWithInvalidHorizonURL(t *testing.T) {
}
}

func TestWithCircuitBreaker(t *testing.T) {
client, err := NewClient(
WithNetwork(Testnet),
WithCircuitBreaker(10, 120*time.Second),
)
if err != nil {
t.Fatalf("expected no error, got %v", err)
}
if client.circuitBreakerThreshold != 10 {
t.Errorf("expected circuit breaker threshold 10, got %d", client.circuitBreakerThreshold)
}
if client.circuitBreakerTimeout != 120*time.Second {
t.Errorf("expected circuit breaker timeout 120s, got %v", client.circuitBreakerTimeout)
}
}

func TestWithCircuitBreaker_InvalidThreshold(t *testing.T) {
_, err := NewClient(WithCircuitBreaker(0, 60*time.Second))
if err == nil {
t.Fatal("expected error for zero threshold")
}
if err.Error() != "circuit breaker failure threshold must be positive" {
t.Errorf("unexpected error message: %v", err)
}
}

func TestWithCircuitBreaker_InvalidTimeout(t *testing.T) {
_, err := NewClient(WithCircuitBreaker(5, 0))
if err == nil {
t.Fatal("expected error for zero timeout")
}
if err.Error() != "circuit breaker timeout must be positive" {
t.Errorf("unexpected error message: %v", err)
}
}

func TestWithCircuitBreaker_Defaults(t *testing.T) {
client, err := NewClient(WithNetwork(Testnet))
if err != nil {
t.Fatalf("expected no error, got %v", err)
}
if client.circuitBreakerThreshold != 5 {
t.Errorf("expected default circuit breaker threshold 5, got %d", client.circuitBreakerThreshold)
}
if client.circuitBreakerTimeout != 60*time.Second {
t.Errorf("expected default circuit breaker timeout 60s, got %v", client.circuitBreakerTimeout)
}
}

func TestWithAltURLs(t *testing.T) {
urls := []string{"https://horizon-testnet.stellar.org/", "https://horizon-futurenet.stellar.org/"}
client, err := NewClient(WithAltURLs(urls))
Expand Down
Loading
Loading