Skip to content

Commit

Permalink
fix: remove upstream read lock during forward loop
Browse files Browse the repository at this point in the history
  • Loading branch information
aramalipoor committed Aug 29, 2024
1 parent e834476 commit c6922da
Show file tree
Hide file tree
Showing 9 changed files with 69 additions and 74 deletions.
29 changes: 12 additions & 17 deletions common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ var NewErrRequestTimeout = func(timeout time.Duration) error {
}

func (e *ErrRequestTimeout) ErrorStatusCode() int {
return http.StatusRequestTimeout
return http.StatusGatewayTimeout
}

type ErrInternalServerError struct{ BaseError }
Expand Down Expand Up @@ -512,18 +512,15 @@ var NewErrUpstreamClientInitialization = func(cause error, upstreamId string) er

type ErrUpstreamRequest struct{ BaseError }

var NewErrUpstreamRequest = func(cause error, prjId, netId, upsId, method string, duration time.Duration, attempts, retries, hedges int) error {
var NewErrUpstreamRequest = func(cause error, upsId string, duration time.Duration, attempts, retries, hedges int) error {
return &ErrUpstreamRequest{
BaseError{
Code: "ErrUpstreamRequest",
Message: "failed to make request to upstream",
Cause: cause,
Details: map[string]interface{}{
"durationMs": duration.Milliseconds(),
"projectId": prjId,
"networkId": netId,
"upstreamId": upsId,
"method": method,
"attempts": attempts,
"retries": retries,
"hedges": hedges,
Expand Down Expand Up @@ -687,7 +684,7 @@ func (e *ErrUpstreamsExhausted) SummarizeCauses() string {
timeout := 0
serverError := 0
rateLimit := 0
down := 0
cbOpen := 0
billing := 0
other := 0
cancelled := 0
Expand All @@ -706,7 +703,7 @@ func (e *ErrUpstreamsExhausted) SummarizeCauses() string {
billing++
continue
} else if HasErrorCode(e, ErrCodeFailsafeCircuitBreakerOpen) {
down++
cbOpen++
continue
} else if errors.Is(e, context.DeadlineExceeded) || HasErrorCode(e, ErrCodeEndpointRequestTimeout) {
timeout++
Expand Down Expand Up @@ -738,20 +735,20 @@ func (e *ErrUpstreamsExhausted) SummarizeCauses() string {
if rateLimit > 0 {
reasons = append(reasons, fmt.Sprintf("%d rate limited", rateLimit))
}
if down > 0 {
reasons = append(reasons, fmt.Sprintf("%d down", down))
if cbOpen > 0 {
reasons = append(reasons, fmt.Sprintf("%d circuit breaker open", cbOpen))
}
if billing > 0 {
reasons = append(reasons, fmt.Sprintf("%d billing issues", billing))
}
if other > 0 {
reasons = append(reasons, fmt.Sprintf("%d other errors", other))
}
if cancelled > 0 {
reasons = append(reasons, fmt.Sprintf("%d hedges cancelled", cancelled))
}
if other > 0 {
reasons = append(reasons, fmt.Sprintf("%d other errors", other))
}

return strings.Join(reasons, " + ")
return strings.Join(reasons, ", ")
}

return ""
Expand Down Expand Up @@ -886,16 +883,14 @@ type ErrUpstreamRequestSkipped struct{ BaseError }

const ErrCodeUpstreamRequestSkipped ErrorCode = "ErrUpstreamRequestSkipped"

var NewErrUpstreamRequestSkipped = func(reason error, upstreamId string, req *NormalizedRequest) error {
m, _ := req.Method()
var NewErrUpstreamRequestSkipped = func(reason error, upstreamId string) error {
return &ErrUpstreamRequestSkipped{
BaseError{
Code: ErrCodeUpstreamRequestSkipped,
Message: "skipped forwarding request to upstream",
Cause: reason,
Details: map[string]interface{}{
"upstreamId": upstreamId,
"method": m,
},
},
}
Expand Down Expand Up @@ -1186,7 +1181,7 @@ var NewErrNetworkRequestTimeout = func(duration time.Duration) error {
}

func (e *ErrNetworkRequestTimeout) ErrorStatusCode() int {
return http.StatusRequestTimeout
return http.StatusGatewayTimeout
}

type ErrUpstreamRateLimitRuleExceeded struct{ BaseError }
Expand Down
4 changes: 2 additions & 2 deletions erpc/http_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func TestHttpServer_RaceTimeouts(t *testing.T) {
wg.Wait()

for _, result := range results {
if result.statusCode != http.StatusGatewayTimeout && result.statusCode != http.StatusRequestTimeout {
if result.statusCode != http.StatusGatewayTimeout {
t.Errorf("unexpected status code: %d", result.statusCode)
}
assert.Contains(t, result.body, "Timeout")
Expand Down Expand Up @@ -210,7 +210,7 @@ func TestHttpServer_RaceTimeouts(t *testing.T) {
timeouts := 0
successes := 0
for _, result := range results {
if result.statusCode == http.StatusGatewayTimeout || result.statusCode == http.StatusRequestTimeout {
if result.statusCode == http.StatusGatewayTimeout {
timeouts++
assert.Contains(t, result.body, "Timeout")
} else {
Expand Down
4 changes: 1 addition & 3 deletions erpc/networks.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,6 @@ func (n *Network) Forward(ctx context.Context, req *common.NormalizedRequest) (*
}

upsList, err := n.upstreamsRegistry.GetSortedUpstreams(n.NetworkId, method)
n.upstreamsRegistry.RLockUpstreams()
defer n.upstreamsRegistry.RUnlockUpstreams()
if err != nil {
inf.Close(nil, err)
return nil, err
Expand Down Expand Up @@ -257,7 +255,7 @@ func (n *Network) Forward(ctx context.Context, req *common.NormalizedRequest) (*
})

if execErr != nil {
err := upstream.TranslateFailsafeError(execErr)
err := upstream.TranslateFailsafeError("", method, execErr)
// If the error is due to an empty response, be generous and accept it:
// it means that even after many retries no data is available.
if common.HasErrorCode(err, common.ErrCodeFailsafeRetryExceeded) {
Expand Down
30 changes: 17 additions & 13 deletions erpc/networks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3872,7 +3872,7 @@ func TestNetwork_Forward(t *testing.T) {
rateLimitersRegistry,
)

network, err := networksRegistry.RegisterNetwork(
_, err = networksRegistry.RegisterNetwork(
&logger,
&common.ProjectConfig{Id: projectID},
&common.NetworkConfig{
Expand All @@ -3882,7 +3882,7 @@ func TestNetwork_Forward(t *testing.T) {
)
assert.NoError(t, err)

simulateRequests := func(method string, upstreamId string, latency time.Duration) {
mockRequests := func(method string, upstreamId string, latency time.Duration) {
gock.New("http://" + upstreamId + ".localhost").
Persist().
Post("/").
Expand All @@ -3897,15 +3897,15 @@ func TestNetwork_Forward(t *testing.T) {
}

// Upstream A is fastest for eth_call, Upstream B is fastest for eth_traceTransaction, and Upstream C is fastest for eth_getLogs
simulateRequests("eth_getLogs", "upstream-a", 200*time.Millisecond)
simulateRequests("eth_getLogs", "upstream-b", 100*time.Millisecond)
simulateRequests("eth_getLogs", "upstream-c", 50*time.Millisecond)
simulateRequests("eth_traceTransaction", "upstream-a", 100*time.Millisecond)
simulateRequests("eth_traceTransaction", "upstream-b", 50*time.Millisecond)
simulateRequests("eth_traceTransaction", "upstream-c", 200*time.Millisecond)
simulateRequests("eth_call", "upstream-a", 50*time.Millisecond)
simulateRequests("eth_call", "upstream-b", 200*time.Millisecond)
simulateRequests("eth_call", "upstream-c", 100*time.Millisecond)
mockRequests("eth_getLogs", "upstream-a", 200*time.Millisecond)
mockRequests("eth_getLogs", "upstream-b", 100*time.Millisecond)
mockRequests("eth_getLogs", "upstream-c", 50*time.Millisecond)
mockRequests("eth_traceTransaction", "upstream-a", 100*time.Millisecond)
mockRequests("eth_traceTransaction", "upstream-b", 50*time.Millisecond)
mockRequests("eth_traceTransaction", "upstream-c", 200*time.Millisecond)
mockRequests("eth_call", "upstream-a", 50*time.Millisecond)
mockRequests("eth_call", "upstream-b", 200*time.Millisecond)
mockRequests("eth_call", "upstream-c", 100*time.Millisecond)

allMethods := []string{"eth_getLogs", "eth_traceTransaction", "eth_call"}

Expand All @@ -3917,14 +3917,18 @@ func TestNetwork_Forward(t *testing.T) {

wg := sync.WaitGroup{}
for _, method := range allMethods {
for i := 0; i < 500; i++ {
for i := 0; i < 1; i++ {
wg.Add(1)
go func(method string) {
defer wg.Done()
upstreamsRegistry.RefreshUpstreamNetworkMethodScores()
req := common.NewNormalizedRequest([]byte(fmt.Sprintf(`{"jsonrpc":"2.0","method":"%s","params":[],"id":1}`, method)))
_, err := network.Forward(ctx, req)
ups, err := upstreamsRegistry.GetSortedUpstreams(networkID, method)
assert.NoError(t, err)
for _, up := range ups {
_, err = up.Forward(ctx, req)
assert.NoError(t, err)
}
}(method)
time.Sleep(1 * time.Millisecond)
}
Expand Down
38 changes: 29 additions & 9 deletions upstream/failsafe.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package upstream

import (
"context"
"errors"
"fmt"
"time"
Expand Down Expand Up @@ -323,7 +324,8 @@ func createTimeoutPolicy(component string, cfg *common.TimeoutPolicyConfig) (fai
return builder.Build(), nil
}

func TranslateFailsafeError(execErr error) error {
func TranslateFailsafeError(upstreamId, method string, execErr error) error {
var err error
var retryExceededErr retrypolicy.ExceededError
if errors.As(execErr, &retryExceededErr) {
ler := retryExceededErr.LastError
Expand All @@ -334,17 +336,35 @@ func TranslateFailsafeError(execErr error) error {
}
var translatedCause error
if ler != nil {
translatedCause = TranslateFailsafeError(ler)
translatedCause = TranslateFailsafeError("", "", ler)
}
return common.NewErrFailsafeRetryExceeded(translatedCause)
err = common.NewErrFailsafeRetryExceeded(translatedCause)
} else if errors.Is(execErr, timeout.ErrExceeded) ||
errors.Is(execErr, context.DeadlineExceeded) {
err = common.NewErrFailsafeTimeoutExceeded(execErr)
} else if errors.Is(execErr, circuitbreaker.ErrOpen) {
err = common.NewErrFailsafeCircuitBreakerOpen(execErr)
}

if errors.Is(execErr, timeout.ErrExceeded) {
return common.NewErrFailsafeTimeoutExceeded(execErr)
}

if errors.Is(execErr, circuitbreaker.ErrOpen) {
return common.NewErrFailsafeCircuitBreakerOpen(execErr)
if err != nil {
if method != "" {
if ser, ok := execErr.(common.StandardError); ok {
be := ser.Base()
if be != nil {
if upstreamId != "" {
be.Details = map[string]interface{}{
"upstreamId": upstreamId,
"method": method,
}
} else {
be.Details = map[string]interface{}{
"method": method,
}
}
}
}
}
return err
}

return execErr
Expand Down
13 changes: 0 additions & 13 deletions upstream/http_json_rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,9 @@ func (c *GenericHttpJsonRpcClient) SendRequest(ctx context.Context, req *common.
startedAt := time.Now()
jrReq, err := req.JsonRpcRequest()
if err != nil {
m, _ := req.Method()
return nil, common.NewErrUpstreamRequest(
err,
c.upstream.ProjectId,
req.NetworkId(),
c.upstream.Config().Id,
m,
0, 0, 0, 0,
)
}
Expand Down Expand Up @@ -213,13 +209,9 @@ func (c *GenericHttpJsonRpcClient) processBatch() {
for _, req := range requests {
jrReq, err := req.request.JsonRpcRequest()
if err != nil {
m, _ := req.request.Method()
req.err <- common.NewErrUpstreamRequest(
err,
c.upstream.ProjectId,
req.request.NetworkId(),
c.upstream.Config().Id,
m,
0, 0, 0, 0,
)
continue
Expand Down Expand Up @@ -369,13 +361,9 @@ func (c *GenericHttpJsonRpcClient) processBatchResponse(requests map[interface{}
func (c *GenericHttpJsonRpcClient) sendSingleRequest(ctx context.Context, req *common.NormalizedRequest) (*common.NormalizedResponse, error) {
jrReq, err := req.JsonRpcRequest()
if err != nil {
m, _ := req.Method()
return nil, common.NewErrUpstreamRequest(
err,
c.upstream.ProjectId,
req.NetworkId(),
c.upstream.Config().Id,
m,
0, 0, 0, 0,
)
}
Expand Down Expand Up @@ -536,7 +524,6 @@ func extractJsonRpcError(r *http.Response, nr *common.NormalizedResponse, jr *co
),
)
} else if r.StatusCode == 429 ||
r.StatusCode == 408 ||
code == -32005 ||
strings.Contains(err.Message, "has exceeded") ||
strings.Contains(err.Message, "Exceeded the quota") ||
Expand Down
10 changes: 5 additions & 5 deletions upstream/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ type UpstreamsRegistry struct {
}

type UpstreamsHealth struct {
Upstreams []*Upstream `json:"upstreams"`
SortedUpstreams map[string]map[string][]string `json:"sortedUpstreams"`
Upstreams []*Upstream `json:"upstreams"`
SortedUpstreams map[string]map[string][]string `json:"sortedUpstreams"`
UpstreamScores map[string]map[string]map[string]float64 `json:"upstreamScores"`
}

Expand Down Expand Up @@ -350,13 +350,13 @@ func (u *UpstreamsRegistry) calculateScore(normTotalRequests, normP90Latency, no
score += expCurve(1 - normTotalRequests)

// Higher score for lower p90 latency
score += expCurve(1 - normP90Latency) * 4
score += expCurve(1-normP90Latency) * 4

// Higher score for lower error rate
score += expCurve(1 - normErrorRate) * 8
score += expCurve(1-normErrorRate) * 8

// Higher score for lower throttled rate
score += expCurve(1 - normThrottledRate) * 3
score += expCurve(1-normThrottledRate) * 3

return score
}
Expand Down
2 changes: 1 addition & 1 deletion upstream/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func TestUpstreamScoring(t *testing.T) {
name string
windowSize time.Duration
upstreamConfig []upstreamMetrics
expectedOrder []string
expectedOrder []string
}{
{
name: "MixedLatencyAndFailureRate",
Expand Down
Loading

0 comments on commit c6922da

Please sign in to comment.