diff --git a/common/errors.go b/common/errors.go index 530fb888..56440279 100644 --- a/common/errors.go +++ b/common/errors.go @@ -1,6 +1,7 @@ package common import ( + "context" "errors" "fmt" "net/http" @@ -40,15 +41,17 @@ func ErrorSummary(err interface{}) string { return s } -var ethAddr = regexp.MustCompile(`0x[a-fA-F0-9]+`) -var txHashErr = regexp.MustCompile(`transaction [a-fA-F0-9]+`) +var longHash = regexp.MustCompile(`0x[a-fA-F0-9][a-fA-F0-9]+`) +var txHashErr = regexp.MustCompile(`transaction [a-fA-F0-9][a-fA-F0-9][a-fA-F0-9]+`) +var trieNodeErr = regexp.MustCompile(`trie node [a-fA-F0-9][a-fA-F0-9][a-fA-F0-9]+`) var revertAddr = regexp.MustCompile(`.*execution reverted.*`) var ipAddr = regexp.MustCompile(`\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}`) var ddg = regexp.MustCompile(`\d\d+`) func cleanUpMessage(s string) string { - s = ethAddr.ReplaceAllString(s, "0xREDACTED") + s = longHash.ReplaceAllString(s, "0xREDACTED") s = txHashErr.ReplaceAllString(s, "transaction 0xREDACTED") + s = trieNodeErr.ReplaceAllString(s, "trie node 0xREDACTED") s = revertAddr.ReplaceAllString(s, "execution reverted") s = ipAddr.ReplaceAllString(s, "X.X.X.X") s = ddg.ReplaceAllString(s, "XX") @@ -79,6 +82,7 @@ type StandardError interface { DeepestMessage() string GetCause() error ErrorStatusCode() int + Base() *BaseError } func (e *BaseError) GetCode() ErrorCode { @@ -144,31 +148,30 @@ func (e *BaseError) GetCause() error { func (e BaseError) MarshalJSON() ([]byte, error) { type Alias BaseError cause := e.Cause - if cc, ok := cause.(StandardError); ok { - if je, ok := cc.(interface{ Errors() []error }); ok { - // Handle joined errors - causes := make([]interface{}, 0) - for _, err := range je.Errors() { - if se, ok := err.(StandardError); ok { - causes = append(causes, se) - } else { - causes = append(causes, err.Error()) - } + if cs, ok := cause.(interface{ Unwrap() []error }); ok { + // Handle joined errors + causes := make([]interface{}, 0) + for _, err := range cs.Unwrap() { + if se, ok := err.(StandardError); ok { + causes = append(causes, se) + } else { + causes = append(causes, err.Error()) } - return sonic.Marshal(&struct { - Alias - Cause []interface{} `json:"cause"` - }{ - Alias: (Alias)(e), - Cause: causes, - }) } return sonic.Marshal(&struct { Alias - Cause interface{} `json:"cause"` + Cause []interface{} `json:"cause"` + }{ + Alias: (Alias)(e), + Cause: causes, + }) + } else if cs, ok := cause.(StandardError); ok { + return sonic.Marshal(&struct { + Alias + Cause StandardError `json:"cause"` }{ Alias: (Alias)(e), - Cause: cc, + Cause: cs, }) } else if cause != nil { return sonic.Marshal(&struct { @@ -240,6 +243,10 @@ func (e *BaseError) ErrorStatusCode() int { return http.StatusInternalServerError } +func (e *BaseError) Base() *BaseError { + return e +} + // // Server // @@ -296,7 +303,7 @@ var NewErrRequestTimeout = func(timeout time.Duration) error { } func (e *ErrRequestTimeout) ErrorStatusCode() int { - return http.StatusRequestTimeout + return http.StatusGatewayTimeout } type ErrInternalServerError struct{ BaseError } @@ -505,7 +512,7 @@ var NewErrUpstreamClientInitialization = func(cause error, upstreamId string) er type ErrUpstreamRequest struct{ BaseError } -var NewErrUpstreamRequest = func(cause error, prjId, netId, upsId, method string, duration time.Duration, attempts, retries, hedges int) error { +var NewErrUpstreamRequest = func(cause error, upsId string, duration time.Duration, attempts, retries, hedges int) error { return &ErrUpstreamRequest{ BaseError{ Code: "ErrUpstreamRequest", @@ -513,10 +520,7 @@ var NewErrUpstreamRequest = func(cause error, prjId, netId, upsId, method string Cause: cause, Details: map[string]interface{}{ "durationMs": duration.Milliseconds(), - "projectId": prjId, - "networkId": netId, "upstreamId": upsId, - "method": method, "attempts": attempts, "retries": retries, "hedges": hedges, @@ -594,7 +598,7 @@ const ErrCodeUpstreamsExhausted ErrorCode = "ErrUpstreamsExhausted" var NewErrUpstreamsExhausted = func( req *NormalizedRequest, - ers []error, + ersObj map[string]error, prjId, netId string, duration time.Duration, attempts, retries, hedges int, @@ -606,10 +610,15 @@ var NewErrUpstreamsExhausted = func( } else if s != nil { reqStr = string(s) } - return &ErrUpstreamsExhausted{ + // TODO create a new error type that holds a map to avoid creating a new array + ers := []error{} + for _, err := range ersObj { + ers = append(ers, err) + } + e := &ErrUpstreamsExhausted{ BaseError{ Code: ErrCodeUpstreamsExhausted, - Message: "all available upstreams have been exhausted", + Message: "all upstream attempts failed", Cause: errors.Join(ers...), Details: map[string]interface{}{ "request": reqStr, @@ -622,9 +631,34 @@ var NewErrUpstreamsExhausted = func( }, }, } + + sm := e.SummarizeCauses() + if sm != "" { + e.Message += " (" + sm + ")" + } + + return e } func (e *ErrUpstreamsExhausted) ErrorStatusCode() int { + if e.Cause != nil { + if be, ok := e.Cause.(StandardError); ok { + return be.ErrorStatusCode() + } + // if it's an array of errors (Unwrap) + if joinedErr, ok := e.Cause.(interface{ Unwrap() []error }); ok { + fsc := 503 + for _, e := range joinedErr.Unwrap() { + if be, ok := e.(StandardError); ok { + sc := be.ErrorStatusCode() + if sc != 503 { + fsc = sc + } + } + } + return fsc + } + } return 503 } @@ -634,18 +668,90 @@ func (e *ErrUpstreamsExhausted) CodeChain() string { if e.Cause == nil { return codeChain } - causesChains := []string{} + + s := e.SummarizeCauses() + if s != "" { + return codeChain + " (" + s + ")" + } + + return codeChain +} + +func (e *ErrUpstreamsExhausted) SummarizeCauses() string { if joinedErr, ok := e.Cause.(interface{ Unwrap() []error }); ok { + unsupported := 0 + missing := 0 + timeout := 0 + serverError := 0 + rateLimit := 0 + cbOpen := 0 + billing := 0 + other := 0 + cancelled := 0 + for _, e := range joinedErr.Unwrap() { - if se, ok := e.(StandardError); ok { - causesChains = append(causesChains, se.CodeChain()) + if HasErrorCode(e, ErrCodeEndpointUnsupported) { + unsupported++ + continue + } else if HasErrorCode(e, ErrCodeEndpointMissingData) { + missing++ + continue + } else if HasErrorCode(e, ErrCodeEndpointCapacityExceeded) { + rateLimit++ + continue + } else if HasErrorCode(e, ErrCodeEndpointBillingIssue) { + billing++ + continue + } else if HasErrorCode(e, ErrCodeFailsafeCircuitBreakerOpen) { + cbOpen++ + continue + } else if errors.Is(e, context.DeadlineExceeded) || HasErrorCode(e, ErrCodeEndpointRequestTimeout) { + timeout++ + continue + } else if HasErrorCode(e, ErrCodeEndpointServerSideException) { + serverError++ + continue + } else if HasErrorCode(e, ErrCodeUpstreamHedgeCancelled) { + cancelled++ + continue + } else if !HasErrorCode(e, ErrCodeUpstreamMethodIgnored) { + other++ } } - codeChain += " <= (" + strings.Join(causesChains, " + ") + ")" + reasons := []string{} + if unsupported > 0 { + reasons = append(reasons, fmt.Sprintf("%d unsupported method", unsupported)) + } + if missing > 0 { + reasons = append(reasons, fmt.Sprintf("%d missing data", missing)) + } + if timeout > 0 { + reasons = append(reasons, fmt.Sprintf("%d timeout", timeout)) + } + if serverError > 0 { + reasons = append(reasons, fmt.Sprintf("%d server errors", serverError)) + } + if rateLimit > 0 { + reasons = append(reasons, fmt.Sprintf("%d rate limited", rateLimit)) + } + if cbOpen > 0 { + reasons = append(reasons, fmt.Sprintf("%d circuit breaker open", cbOpen)) + } + if billing > 0 { + reasons = append(reasons, fmt.Sprintf("%d billing issues", billing)) + } + if cancelled > 0 { + reasons = append(reasons, fmt.Sprintf("%d hedges cancelled", cancelled)) + } + if other > 0 { + reasons = append(reasons, fmt.Sprintf("%d other errors", other)) + } + + return strings.Join(reasons, ", ") } - return codeChain + return "" } func (e *ErrUpstreamsExhausted) Errors() []error { @@ -666,17 +772,9 @@ func (e *ErrUpstreamsExhausted) DeepestMessage() string { return e.Message } - causesDeepestMsgs := []string{} - if joinedErr, ok := e.Cause.(interface{ Unwrap() []error }); ok { - for _, e := range joinedErr.Unwrap() { - if se, ok := e.(StandardError); ok { - causesDeepestMsgs = append(causesDeepestMsgs, se.DeepestMessage()) - } - } - } - - if len(causesDeepestMsgs) > 0 { - return strings.Join(causesDeepestMsgs, " + ") + s := e.SummarizeCauses() + if s != "" { + return s } return e.Message @@ -785,8 +883,7 @@ type ErrUpstreamRequestSkipped struct{ BaseError } const ErrCodeUpstreamRequestSkipped ErrorCode = "ErrUpstreamRequestSkipped" -var NewErrUpstreamRequestSkipped = func(reason error, upstreamId string, req *NormalizedRequest) error { - m, _ := req.Method() +var NewErrUpstreamRequestSkipped = func(reason error, upstreamId string) error { return &ErrUpstreamRequestSkipped{ BaseError{ Code: ErrCodeUpstreamRequestSkipped, @@ -794,7 +891,6 @@ var NewErrUpstreamRequestSkipped = func(reason error, upstreamId string, req *No Cause: reason, Details: map[string]interface{}{ "upstreamId": upstreamId, - "method": m, }, }, } @@ -802,10 +898,12 @@ var NewErrUpstreamRequestSkipped = func(reason error, upstreamId string, req *No type ErrUpstreamMethodIgnored struct{ BaseError } +const ErrCodeUpstreamMethodIgnored ErrorCode = "ErrUpstreamMethodIgnored" + var NewErrUpstreamMethodIgnored = func(method string, upstreamId string) error { return &ErrUpstreamMethodIgnored{ BaseError{ - Code: "ErrUpstreamMethodIgnored", + Code: ErrCodeUpstreamMethodIgnored, Message: "method ignored by upstream configuration", Details: map[string]interface{}{ "method": method, @@ -815,6 +913,22 @@ var NewErrUpstreamMethodIgnored = func(method string, upstreamId string) error { } } +type ErrUpstreamHedgeCancelled struct{ BaseError } + +const ErrCodeUpstreamHedgeCancelled ErrorCode = "ErrUpstreamHedgeCancelled" + +var NewErrUpstreamHedgeCancelled = func(upstreamId string) error { + return &ErrUpstreamHedgeCancelled{ + BaseError{ + Code: ErrCodeUpstreamHedgeCancelled, + Message: "hedged request cancelled in favor another response", + Details: map[string]interface{}{ + "upstreamId": upstreamId, + }, + }, + } +} + type ErrResponseWriteLock struct{ BaseError } var NewErrResponseWriteLock = func(writerId string) error { @@ -1067,7 +1181,7 @@ var NewErrNetworkRequestTimeout = func(duration time.Duration) error { } func (e *ErrNetworkRequestTimeout) ErrorStatusCode() int { - return http.StatusRequestTimeout + return http.StatusGatewayTimeout } type ErrUpstreamRateLimitRuleExceeded struct{ BaseError } @@ -1130,7 +1244,7 @@ var NewErrEndpointUnsupported = func(cause error) error { } func (e *ErrEndpointUnsupported) ErrorStatusCode() int { - return 415 + return http.StatusUnsupportedMediaType } type ErrEndpointClientSideException struct{ BaseError } @@ -1263,6 +1377,10 @@ var NewErrEndpointEvmLargeRange = func(cause error) error { } } +func (e *ErrEndpointEvmLargeRange) ErrorStatusCode() int { + return http.StatusRequestEntityTooLarge +} + // // JSON-RPC // diff --git a/common/request.go b/common/request.go index aa51f714..070b725f 100644 --- a/common/request.go +++ b/common/request.go @@ -18,8 +18,11 @@ type RequestDirectives struct { type NormalizedRequest struct { Attempt int - network Network - body []byte + network Network + body []byte + + uid string + method string directives *RequestDirectives jsonRpcRequest *JsonRpcRequest @@ -44,6 +47,8 @@ func NewNormalizedRequest(body []byte) *NormalizedRequest { } func (r *NormalizedRequest) SetLastUpstream(upstream Upstream) *NormalizedRequest { + r.Mu.Lock() + defer r.Mu.Unlock() r.lastUpstream = upstream return r } @@ -74,6 +79,47 @@ func (r *NormalizedRequest) Network() Network { return r.network } +func (r *NormalizedRequest) Id() string { + if r == nil { + return "" + } + + r.Mu.Lock() + defer r.Mu.Unlock() + + if r.uid != "" { + return r.uid + } + + if r.jsonRpcRequest != nil { + if id, ok := r.jsonRpcRequest.ID.(string); ok { + r.uid = id + return id + } else if id, ok := r.jsonRpcRequest.ID.(float64); ok { + r.uid = fmt.Sprintf("%d", int64(id)) + return r.uid + } else { + r.uid = fmt.Sprintf("%v", r.jsonRpcRequest.ID) + return r.uid + } + } + + if len(r.body) > 0 { + idn, err := sonic.Get(r.body, "id") + if err == nil { + id, err := idn.String() + if err == nil { + r.uid = "n/a" + } else { + r.uid = id + } + return id + } + } + + return "" +} + func (r *NormalizedRequest) NetworkId() string { if r == nil || r.network == nil { // For certain requests such as internal eth_chainId requests, network might not be available yet. @@ -135,12 +181,27 @@ func (r *NormalizedRequest) JsonRpcRequest() (*JsonRpcRequest, error) { } func (r *NormalizedRequest) Method() (string, error) { - rpcReq, err := r.JsonRpcRequest() - if err != nil { - return "", err + if r.method != "" { + return r.method, nil + } + + if r.jsonRpcRequest != nil { + r.method = r.jsonRpcRequest.Method + return r.jsonRpcRequest.Method, nil + } + + if len(r.body) > 0 { + method, err := sonic.Get(r.body, "method") + if err != nil { + r.method = "n/a" + return r.method, err + } + m, err := method.String() + r.method = m + return m, err } - return rpcReq.Method, nil + return "", NewErrJsonRpcRequestUnresolvableMethod(r.body) } func (r *NormalizedRequest) Body() []byte { diff --git a/common/response.go b/common/response.go index e0983c0c..da615131 100644 --- a/common/response.go +++ b/common/response.go @@ -1,8 +1,6 @@ package common import ( - "fmt" - "github.com/bytedance/sonic" ) @@ -180,7 +178,7 @@ func (r *NormalizedResponse) JsonRpcResponse() (*JsonRpcResponse, error) { } else { jrr.Error = NewErrJsonRpcExceptionExternal( int(JsonRpcErrorServerSideException), - fmt.Sprintf("%s -> %s", err, string(r.body)), + string(r.body), "", ) } diff --git a/data/postgresql.go b/data/postgresql.go index 9922957b..88c765fa 100644 --- a/data/postgresql.go +++ b/data/postgresql.go @@ -94,7 +94,7 @@ func (p *PostgreSQLConnector) Get(ctx context.Context, index, partitionKey, rang args = []interface{}{partitionKey, rangeKey} } - p.logger.Debug().Msgf("getting item from PostgreSQL with query: %s", query) + p.logger.Debug().Msgf("getting item from PostgreSQL with query: %s args: %v", query, args) var value string err := p.conn.QueryRow(ctx, query, args...).Scan(&value) @@ -134,7 +134,7 @@ func (p *PostgreSQLConnector) getWithWildcard(ctx context.Context, index, partit } } - p.logger.Debug().Msgf("getting item from PostgreSQL with wildcard query: %s", query) + p.logger.Debug().Msgf("getting item from PostgreSQL with wildcard query: %s args: %v", query, args) var value string err := p.conn.QueryRow(ctx, query, args...).Scan(&value) diff --git a/docs/pages/config/projects/upstreams.mdx b/docs/pages/config/projects/upstreams.mdx index 11d01890..f97e948b 100644 --- a/docs/pages/config/projects/upstreams.mdx +++ b/docs/pages/config/projects/upstreams.mdx @@ -25,7 +25,7 @@ eRPC records various metrics about each upstream, and uses them to decide which - **P90 latency of requests** gives higher priority to upstreams with lower latency. - **Total requests served** gives higher priority to upstreams with least served requests so they have a chance to prove themselves. -These metrics amount to a certain **Score** per upstream, within a defined `windowSize` (default 30 minutes), which can be configured as: +These metrics amount to a certain **Score** per upstream (alchemy, infura, etc) and per method (eth_blockNumber, eth_getLogs, etc), within a defined `windowSize` (default 30 minutes), which can be configured as: ```yaml filename="erpc.yaml" projects: # ... diff --git a/erpc/dual_stack.go b/erpc/dual_stack.go new file mode 100644 index 00000000..8fb86aa7 --- /dev/null +++ b/erpc/dual_stack.go @@ -0,0 +1,50 @@ +package erpc + +import "net" + +type dualStackListener struct { + ln4, ln6 net.Listener +} + +func (dl *dualStackListener) Accept() (net.Conn, error) { + // Use a channel to communicate the result of Accept + type result struct { + conn net.Conn + err error + } + ch := make(chan result, 2) + + // Try to accept from both listeners + go func() { + conn, err := dl.ln4.Accept() + ch <- result{conn, err} + }() + go func() { + conn, err := dl.ln6.Accept() + ch <- result{conn, err} + }() + + // Return the first successful connection, or the last error + var lastErr error + for i := 0; i < 2; i++ { + res := <-ch + if res.err == nil { + return res.conn, nil + } + lastErr = res.err + } + return nil, lastErr +} + +func (dl *dualStackListener) Close() error { + err4 := dl.ln4.Close() + err6 := dl.ln6.Close() + if err4 != nil { + return err4 + } + return err6 +} + +func (dl *dualStackListener) Addr() net.Addr { + return dl.ln4.Addr() +} diff --git a/erpc/erpc_test.go b/erpc/erpc_test.go index 94824d9e..ff90657a 100644 --- a/erpc/erpc_test.go +++ b/erpc/erpc_test.go @@ -5,16 +5,22 @@ import ( "encoding/json" "fmt" "math/rand" + "os" "sync" "testing" "time" "github.com/erpc/erpc/common" "github.com/h2non/gock" + "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/stretchr/testify/assert" ) +func init() { + log.Logger = log.Level(zerolog.ErrorLevel).Output(zerolog.ConsoleWriter{Out: os.Stderr}) +} + var erpcMu sync.Mutex func TestErpc_UpstreamsRegistryCorrectPriorityChange(t *testing.T) { diff --git a/erpc/evm_json_rpc_cache.go b/erpc/evm_json_rpc_cache.go index a7b3eeab..ac358989 100644 --- a/erpc/evm_json_rpc_cache.go +++ b/erpc/evm_json_rpc_cache.go @@ -85,6 +85,10 @@ func (c *EvmJsonRpcCache) Get(ctx context.Context, req *common.NormalizedRequest return nil, err } + if resultString == `""` || resultString == "null" || resultString == "[]" || resultString == "{}" { + return nil, nil + } + jrr := &common.JsonRpcResponse{ JSONRPC: rpcReq.JSONRPC, ID: rpcReq.ID, @@ -110,7 +114,7 @@ func (c *EvmJsonRpcCache) Set(ctx context.Context, req *common.NormalizedRequest lg := c.logger.With().Str("network", req.NetworkId()).Str("method", rpcReq.Method).Logger() - if rpcResp == nil || rpcResp.Result == nil || rpcResp.Error != nil { + if resp == nil || resp.IsObjectNull() || resp.IsResultEmptyish() || rpcResp == nil || rpcResp.Result == nil || rpcResp.Error != nil { lg.Debug().Msg("not caching response because it has no result or has error") return nil } @@ -155,14 +159,14 @@ func (c *EvmJsonRpcCache) Set(ctx context.Context, req *common.NormalizedRequest Interface("result", rpcResp.Result). Msg("caching the response") - resultStr, err := sonic.Marshal(rpcResp.Result) + resultBytes, err := sonic.Marshal(rpcResp.Result) if err != nil { return err } - ctx, cancel := context.WithTimeoutCause(ctx, 10*time.Second, errors.New("cache driver timeout during set")) + ctx, cancel := context.WithTimeoutCause(ctx, 5*time.Second, errors.New("evm json-rpc cache driver timeout during set")) defer cancel() - return c.conn.Set(ctx, pk, rk, string(resultStr)) + return c.conn.Set(ctx, pk, rk, string(resultBytes)) } func (c *EvmJsonRpcCache) DeleteByGroupKey(ctx context.Context, groupKeys ...string) error { diff --git a/erpc/http_server.go b/erpc/http_server.go index 81a01a00..19dfde0d 100644 --- a/erpc/http_server.go +++ b/erpc/http_server.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "net" + "runtime/debug" "strings" "sync" "time" @@ -49,9 +50,9 @@ func NewHttpServer(ctx context.Context, logger *zerolog.Logger, cfg *common.Serv srv.server = &fasthttp.Server{ Handler: fasthttp.TimeoutHandler( - srv.createRequestHandler(reqMaxTimeout), + srv.createRequestHandler(ctx, reqMaxTimeout), // This is the last resort timeout if nothing could be done in time - reqMaxTimeout + 1 * time.Second, + reqMaxTimeout+1*time.Second, `{"jsonrpc":"2.0","error":{"code":-32603,"message":"request timeout before any upstream responded"}}`, ), ReadTimeout: 5 * time.Second, @@ -70,11 +71,16 @@ func NewHttpServer(ctx context.Context, logger *zerolog.Logger, cfg *common.Serv return srv } -func (s *HttpServer) createRequestHandler(reqMaxTimeout time.Duration) fasthttp.RequestHandler { +func (s *HttpServer) createRequestHandler(mainCtx context.Context, reqMaxTimeout time.Duration) fasthttp.RequestHandler { return func(fastCtx *fasthttp.RequestCtx) { defer func() { + defer func() { recover() }() if r := recover(); r != nil { - s.logger.Error().Msgf("unexpected server panic: %v", r) + msg := fmt.Sprintf("unexpected server panic on top-level handler: %v -> %s", r, string(debug.Stack())) + s.logger.Error().Msgf(msg) + fastCtx.SetStatusCode(fasthttp.StatusInternalServerError) + fastCtx.Response.Header.Set("Content-Type", "application/json") + fastCtx.SetBodyString(fmt.Sprintf(`{"jsonrpc":"2.0","error":{"code":-32603,"message":"%s"}}`, msg)) } }() @@ -85,7 +91,7 @@ func (s *HttpServer) createRequestHandler(reqMaxTimeout time.Duration) fasthttp. segments := strings.Split(string(fastCtx.Path()), "/") if len(segments) != 2 && len(segments) != 3 && len(segments) != 4 { - handleErrorResponse(s.logger, nil, common.NewErrInvalidUrlPath(string(fastCtx.Path())), fastCtx) + handleErrorResponse(s.logger, nil, common.NewErrInvalidUrlPath(string(fastCtx.Path())), fastCtx, encoder, buf) return } @@ -100,14 +106,16 @@ func (s *HttpServer) createRequestHandler(reqMaxTimeout time.Duration) fasthttp. if segments[2] == "admin" { isAdmin = true } else { - handleErrorResponse(s.logger, nil, common.NewErrInvalidUrlPath(string(fastCtx.Path())), fastCtx) + handleErrorResponse(s.logger, nil, common.NewErrInvalidUrlPath(string(fastCtx.Path())), fastCtx, encoder, buf) return } } + lg := s.logger.With().Str("projectId", projectId).Str("architecture", architecture).Str("chainId", chainId).Logger() + project, err := s.erpc.GetProject(projectId) if err != nil { - handleErrorResponse(s.logger, nil, err, fastCtx) + handleErrorResponse(s.logger, nil, err, fastCtx, encoder, buf) return } @@ -123,7 +131,7 @@ func (s *HttpServer) createRequestHandler(reqMaxTimeout time.Duration) fasthttp. body := fastCtx.PostBody() - s.logger.Debug().Msgf("received request for projectId: %s, architecture: %s with body: %s", projectId, architecture, body) + lg.Debug().Msgf("received request with body: %s", body) var requests []json.RawMessage err = sonic.Unmarshal(body, &requests) @@ -140,38 +148,46 @@ func (s *HttpServer) createRequestHandler(reqMaxTimeout time.Duration) fasthttp. var argsCopy fasthttp.Args fastCtx.Request.Header.CopyTo(&headersCopy) fastCtx.QueryArgs().CopyTo(&argsCopy) - + for i, reqBody := range requests { wg.Add(1) go func(index int, rawReq json.RawMessage, headersCopy *fasthttp.RequestHeader, argsCopy *fasthttp.Args) { defer func() { + defer func() { recover() }() if r := recover(); r != nil { - s.logger.Error().Msgf("unexpected server panic: %v", r) + msg := fmt.Sprintf("unexpected server panic on per-request handler: %v -> %s", r, string(debug.Stack())) + lg.Error().Msgf(msg) + fastCtx.SetStatusCode(fasthttp.StatusInternalServerError) + fastCtx.Response.Header.Set("Content-Type", "application/json") + fastCtx.SetBodyString(fmt.Sprintf(`{"jsonrpc":"2.0","error":{"code":-32603,"message":"%s"}}`, msg)) } }() - + defer wg.Done() - requestCtx, cancel := context.WithTimeoutCause(fastCtx, reqMaxTimeout, common.NewErrRequestTimeout(reqMaxTimeout)) + requestCtx, cancel := context.WithTimeoutCause(mainCtx, reqMaxTimeout, common.NewErrRequestTimeout(reqMaxTimeout)) defer cancel() nq := common.NewNormalizedRequest(rawReq) nq.ApplyDirectivesFromHttpHeaders(headersCopy) + m, _ := nq.Method() + rlg := lg.With().Str("method", m).Logger() + ap, err := auth.NewPayloadFromHttp(project.Config.Id, nq, headersCopy, argsCopy) if err != nil { - responses[index] = processErrorBody(s.logger, nq, err) + responses[index] = processErrorBody(&rlg, nq, err) return } if isAdmin { if err := project.AuthenticateAdmin(requestCtx, nq, ap); err != nil { - responses[index] = processErrorBody(s.logger, nq, err) + responses[index] = processErrorBody(&rlg, nq, err) return } } else { if err := project.AuthenticateConsumer(requestCtx, nq, ap); err != nil { - responses[index] = processErrorBody(s.logger, nq, err) + responses[index] = processErrorBody(&rlg, nq, err) return } } @@ -180,14 +196,14 @@ func (s *HttpServer) createRequestHandler(reqMaxTimeout time.Duration) fasthttp. if project.Config.Admin != nil { resp, err := project.HandleAdminRequest(requestCtx, nq) if err != nil { - responses[index] = processErrorBody(s.logger, nq, err) + responses[index] = processErrorBody(&rlg, nq, err) return } responses[index] = resp return } else { responses[index] = processErrorBody( - s.logger, + &rlg, nq, common.NewErrAuthUnauthorized( "", @@ -203,14 +219,14 @@ func (s *HttpServer) createRequestHandler(reqMaxTimeout time.Duration) fasthttp. if architecture == "" || chainId == "" { var req map[string]interface{} if err := sonic.Unmarshal(rawReq, &req); err != nil { - responses[index] = processErrorBody(s.logger, nq, common.NewErrInvalidRequest(err)) + responses[index] = processErrorBody(&rlg, nq, common.NewErrInvalidRequest(err)) return } if networkIdFromBody, ok := req["networkId"].(string); ok { networkId = networkIdFromBody parts := strings.Split(networkId, ":") if len(parts) != 2 { - responses[index] = processErrorBody(s.logger, nq, common.NewErrInvalidRequest(fmt.Errorf( + responses[index] = processErrorBody(&rlg, nq, common.NewErrInvalidRequest(fmt.Errorf( "networkId must follow this format: 'architecture:chainId' for example 'evm:42161'", ))) return @@ -218,7 +234,7 @@ func (s *HttpServer) createRequestHandler(reqMaxTimeout time.Duration) fasthttp. architecture = parts[0] chainId = parts[1] } else { - responses[index] = processErrorBody(s.logger, nq, common.NewErrInvalidRequest(fmt.Errorf( + responses[index] = processErrorBody(&rlg, nq, common.NewErrInvalidRequest(fmt.Errorf( "networkId must follow this format: 'architecture:chainId' for example 'evm:42161'", ))) return @@ -229,14 +245,14 @@ func (s *HttpServer) createRequestHandler(reqMaxTimeout time.Duration) fasthttp. nw, err := project.GetNetwork(networkId) if err != nil { - responses[index] = processErrorBody(s.logger, nq, err) + responses[index] = processErrorBody(&rlg, nq, err) return } nq.SetNetwork(nw) resp, err := project.Forward(requestCtx, networkId, nq) if err != nil { - responses[index] = processErrorBody(s.logger, nq, err) + responses[index] = processErrorBody(&rlg, nq, err) return } @@ -253,58 +269,8 @@ func (s *HttpServer) createRequestHandler(reqMaxTimeout time.Duration) fasthttp. encoder.Encode(responses) } else { res := responses[0] - - var rm common.ResponseMetadata - var ok bool - rm, ok = res.(common.ResponseMetadata) - if !ok { - var jrsp, errObj map[string]interface{} - if jrsp, ok = res.(map[string]interface{}); ok { - if errObj, ok = jrsp["error"].(map[string]interface{}); ok { - if err, ok = errObj["cause"].(error); ok { - uer := &common.ErrUpstreamsExhausted{} - if ok = errors.As(err, &uer); ok { - rm = uer - } else { - uer := &common.ErrUpstreamRequest{} - if ok = errors.As(err, &uer); ok { - rm = uer - } - } - } - } - } - } - - if ok { - if rm.FromCache() { - fastCtx.Response.Header.Set("X-ERPC-Cache", "HIT") - } else { - fastCtx.Response.Header.Set("X-ERPC-Cache", "MISS") - } - if rm.UpstreamId() != "" { - fastCtx.Response.Header.Set("X-ERPC-Upstream", rm.UpstreamId()) - } - fastCtx.Response.Header.Set("X-ERPC-Attempts", fmt.Sprintf("%d", rm.Attempts())) - fastCtx.Response.Header.Set("X-ERPC-Retries", fmt.Sprintf("%d", rm.Retries())) - fastCtx.Response.Header.Set("X-ERPC-Hedges", fmt.Sprintf("%d", rm.Hedges())) - } - - if err, ok := res.(error); ok { - fastCtx.SetStatusCode(processErrorStatusCode(err)) - } else if resp, ok := res.(map[string]interface{}); ok { - if errObj, ok := resp["error"].(map[string]interface{}); ok { - if cause, ok := errObj["cause"].(error); ok { - fastCtx.SetStatusCode(processErrorStatusCode(cause)) - } else { - fastCtx.SetStatusCode(fasthttp.StatusOK) - } - } else { - fastCtx.SetStatusCode(fasthttp.StatusOK) - } - } else { - fastCtx.SetStatusCode(fasthttp.StatusOK) - } + setResponseHeaders(res, fastCtx) + setResponseStatusCode(res, fastCtx) encoder.Encode(res) } @@ -362,12 +328,71 @@ func (s *HttpServer) handleCORS(ctx *fasthttp.RequestCtx, corsConfig *common.COR return true } +func setResponseHeaders(res interface{}, fastCtx *fasthttp.RequestCtx) { + var rm common.ResponseMetadata + var ok bool + rm, ok = res.(common.ResponseMetadata) + if !ok { + var jrsp, errObj map[string]interface{} + if jrsp, ok = res.(map[string]interface{}); ok { + if errObj, ok = jrsp["error"].(map[string]interface{}); ok { + if err, ok := errObj["cause"].(error); ok { + uer := &common.ErrUpstreamsExhausted{} + if ok = errors.As(err, &uer); ok { + rm = uer + } else { + uer := &common.ErrUpstreamRequest{} + if ok = errors.As(err, &uer); ok { + rm = uer + } + } + } + } + } + } + if ok && rm != nil { + if rm.FromCache() { + fastCtx.Response.Header.Set("X-ERPC-Cache", "HIT") + } else { + fastCtx.Response.Header.Set("X-ERPC-Cache", "MISS") + } + if rm.UpstreamId() != "" { + fastCtx.Response.Header.Set("X-ERPC-Upstream", rm.UpstreamId()) + } + fastCtx.Response.Header.Set("X-ERPC-Attempts", fmt.Sprintf("%d", rm.Attempts())) + fastCtx.Response.Header.Set("X-ERPC-Retries", fmt.Sprintf("%d", rm.Retries())) + fastCtx.Response.Header.Set("X-ERPC-Hedges", fmt.Sprintf("%d", rm.Hedges())) + } +} + +func setResponseStatusCode(respOrErr interface{}, fastCtx *fasthttp.RequestCtx) { + if err, ok := respOrErr.(error); ok { + fastCtx.SetStatusCode(decideErrorStatusCode(err)) + } else if resp, ok := respOrErr.(map[string]interface{}); ok { + if errObj, ok := resp["error"].(map[string]interface{}); ok { + if cause, ok := errObj["cause"].(error); ok { + fastCtx.SetStatusCode(decideErrorStatusCode(cause)) + } else { + fastCtx.SetStatusCode(fasthttp.StatusOK) + } + } else { + fastCtx.SetStatusCode(fasthttp.StatusOK) + } + } else { + fastCtx.SetStatusCode(fasthttp.StatusOK) + } +} + func processErrorBody(logger *zerolog.Logger, nq *common.NormalizedRequest, err error) interface{} { if !common.IsNull(err) { if common.HasErrorCode(err, common.ErrCodeEndpointClientSideException) { logger.Debug().Err(err).Msgf("forward request errored with client-side exception") } else { - logger.Error().Err(err).Msgf("failed to forward request") + if e, ok := err.(common.StandardError); ok { + logger.Error().Err(err).Msgf("failed to forward request: %s", e.DeepestMessage()) + } else { + logger.Error().Err(err).Msgf("failed to forward request: %s", err.Error()) + } } } @@ -409,80 +434,19 @@ func processErrorBody(logger *zerolog.Logger, nq *common.NormalizedRequest, err } } -func processErrorStatusCode(err error) int { +func decideErrorStatusCode(err error) int { if e, ok := err.(common.StandardError); ok { return e.ErrorStatusCode() } return fasthttp.StatusInternalServerError } -func handleErrorResponse(logger *zerolog.Logger, nq *common.NormalizedRequest, err error, ctx *fasthttp.RequestCtx) { - if !common.IsNull(err) { - if common.HasErrorCode(err, common.ErrCodeEndpointClientSideException) { - logger.Debug().Err(err).Msgf("forward request errored with client-side exception") - } else { - logger.Error().Err(err).Msgf("failed to forward request") - } - } - - ctx.Response.Header.SetContentType("application/json") - if e, ok := err.(common.StandardError); ok { - ctx.SetStatusCode(e.ErrorStatusCode()) - } else { - ctx.SetStatusCode(fasthttp.StatusInternalServerError) - } - - err = common.TranslateToJsonRpcException(err) - - var jsonrpcVersion string = "2.0" - var reqId interface{} = nil - if nq != nil { - jrr, _ := nq.JsonRpcRequest() - if jrr != nil { - jsonrpcVersion = jrr.JSONRPC - reqId = jrr.ID - } - } - - jre := &common.ErrJsonRpcExceptionInternal{} - if errors.As(err, &jre) { - writeErr := json.NewEncoder(ctx).Encode(map[string]interface{}{ - "jsonrpc": jsonrpcVersion, - "id": reqId, - "error": map[string]interface{}{ - "code": jre.NormalizedCode(), - "message": jre.Message, - "data": jre.Details["data"], - "cause": err, - }, - }) - if writeErr != nil { - logger.Error().Err(writeErr).Msgf("failed to encode error response body") - } - return - } - - var writeErr error - - if _, ok := err.(*common.BaseError); ok { - writeErr = json.NewEncoder(ctx).Encode(err) - } else { - if serr, ok := err.(common.StandardError); ok { - writeErr = json.NewEncoder(ctx).Encode(serr) - } else { - writeErr = json.NewEncoder(ctx).Encode( - common.BaseError{ - Code: "ErrUnknown", - Message: "unexpected server error", - Cause: err, - }, - ) - } - } - - if writeErr != nil { - logger.Error().Err(writeErr).Msgf("failed to encode error response body") - } +func handleErrorResponse(logger *zerolog.Logger, nq *common.NormalizedRequest, err error, ctx *fasthttp.RequestCtx, encoder sonic.Encoder, buf *bytes.Buffer) { + resp := processErrorBody(logger, nq, err) + setResponseStatusCode(err, ctx) + encoder.Encode(resp) + ctx.Response.Header.Set("Content-Type", "application/json") + ctx.SetBody(buf.Bytes()) } func (s *HttpServer) Start(logger *zerolog.Logger) error { @@ -531,50 +495,3 @@ func (s *HttpServer) Shutdown(logger *zerolog.Logger) error { logger.Info().Msg("stopping http server...") return s.server.Shutdown() } - -type dualStackListener struct { - ln4, ln6 net.Listener -} - -func (dl *dualStackListener) Accept() (net.Conn, error) { - // Use a channel to communicate the result of Accept - type result struct { - conn net.Conn - err error - } - ch := make(chan result, 2) - - // Try to accept from both listeners - go func() { - conn, err := dl.ln4.Accept() - ch <- result{conn, err} - }() - go func() { - conn, err := dl.ln6.Accept() - ch <- result{conn, err} - }() - - // Return the first successful connection, or the last error - var lastErr error - for i := 0; i < 2; i++ { - res := <-ch - if res.err == nil { - return res.conn, nil - } - lastErr = res.err - } - return nil, lastErr -} - -func (dl *dualStackListener) Close() error { - err4 := dl.ln4.Close() - err6 := dl.ln6.Close() - if err4 != nil { - return err4 - } - return err6 -} - -func (dl *dualStackListener) Addr() net.Addr { - return dl.ln4.Addr() -} diff --git a/erpc/http_server_test.go b/erpc/http_server_test.go index 0927c81f..7a21fef6 100644 --- a/erpc/http_server_test.go +++ b/erpc/http_server_test.go @@ -118,7 +118,7 @@ func TestHttpServer_RaceTimeouts(t *testing.T) { JSON(map[string]interface{}{ "jsonrpc": "2.0", "id": 1, - "result": "0x1234", + "result": "0x222222", }) const concurrentRequests = 5 @@ -139,7 +139,7 @@ func TestHttpServer_RaceTimeouts(t *testing.T) { wg.Wait() for _, result := range results { - if result.statusCode != http.StatusGatewayTimeout && result.statusCode != http.StatusRequestTimeout { + if result.statusCode != http.StatusGatewayTimeout { t.Errorf("unexpected status code: %d", result.statusCode) } assert.Contains(t, result.body, "Timeout") @@ -155,7 +155,7 @@ func TestHttpServer_RaceTimeouts(t *testing.T) { JSON(map[string]interface{}{ "jsonrpc": "2.0", "id": 1, - "result": "0x1234", + "result": "0x333333", }) for i := 0; i < 10; i++ { @@ -170,7 +170,7 @@ func TestHttpServer_RaceTimeouts(t *testing.T) { for i := 0; i < totalReqs; i++ { var delay time.Duration - if i % 2 == 0 { + if i%2 == 0 { delay = 1 * time.Millisecond // shorter than the server timeout } else { delay = 200 * time.Millisecond // longer than the server timeout @@ -183,7 +183,7 @@ func TestHttpServer_RaceTimeouts(t *testing.T) { r.JSON(map[string]interface{}{ "jsonrpc": "2.0", "id": rand.Intn(100000000), - "result": map[string]interface{}{ + "result": map[string]interface{}{ "blockNumber": rand.Intn(100000000), }, }) @@ -210,7 +210,7 @@ func TestHttpServer_RaceTimeouts(t *testing.T) { timeouts := 0 successes := 0 for _, result := range results { - if result.statusCode == http.StatusGatewayTimeout || result.statusCode == http.StatusRequestTimeout { + if result.statusCode == http.StatusGatewayTimeout { timeouts++ assert.Contains(t, result.body, "Timeout") } else { @@ -260,6 +260,7 @@ func TestHttpServer_HandleRequest_EthGetBlockNumber(t *testing.T) { Evm: &common.EvmUpstreamConfig{ ChainId: 1, }, + VendorName: "llama", }, }, }, @@ -312,6 +313,7 @@ func TestHttpServer_HandleRequest_EthGetBlockNumber(t *testing.T) { } t.Run("ConcurrentEthGetBlockNumber", func(t *testing.T) { + defer gock.Off() const concurrentRequests = 10 gock.New("http://rpc1.localhost"). @@ -321,7 +323,7 @@ func TestHttpServer_HandleRequest_EthGetBlockNumber(t *testing.T) { JSON(map[string]interface{}{ "jsonrpc": "2.0", "id": 1, - "result": "0x1234", + "result": "0x444444", }) var wg sync.WaitGroup @@ -334,10 +336,9 @@ func TestHttpServer_HandleRequest_EthGetBlockNumber(t *testing.T) { wg.Add(1) go func(index int) { defer wg.Done() - body := `{"jsonrpc":"2.0","method":"eth_getBlockNumber","params":[],"id":1}` + body := fmt.Sprintf(`{"jsonrpc":"2.0","method":"eth_getBlockNumber","params":[%d],"id":1}`, index) results[index].statusCode, results[index].body = sendRequest(body) }(i) - time.Sleep(50 * time.Millisecond) } wg.Wait() @@ -348,7 +349,7 @@ func TestHttpServer_HandleRequest_EthGetBlockNumber(t *testing.T) { var response map[string]interface{} err := json.Unmarshal([]byte(result.body), &response) assert.NoError(t, err, "Should be able to decode response for request %d", i) - assert.Equal(t, "0x1234", response["result"], "Unexpected result for request %d", i) + assert.Equal(t, "0x444444", response["result"], "Unexpected result for request %d", i) } assert.True(t, gock.IsDone(), "All mocks should have been called") @@ -372,6 +373,8 @@ func TestHttpServer_HandleRequest_EthGetBlockNumber(t *testing.T) { }) t.Run("UnsupportedMethod", func(t *testing.T) { + defer gock.Off() + gock.New("http://rpc1.localhost"). Post("/"). Reply(200). @@ -386,7 +389,7 @@ func TestHttpServer_HandleRequest_EthGetBlockNumber(t *testing.T) { statusCode, body := sendRequest(`{"jsonrpc":"2.0","method":"unsupported_method","params":[],"id":1}`) - assert.Equal(t, http.StatusServiceUnavailable, statusCode) + assert.Equal(t, http.StatusUnsupportedMediaType, statusCode) var errorResponse map[string]interface{} err := json.Unmarshal([]byte(body), &errorResponse) @@ -435,7 +438,7 @@ func TestHttpServer_HandleRequest_EthGetBlockNumber(t *testing.T) { JSON(map[string]interface{}{ "jsonrpc": "2.0", "id": 1, - "result": "0x1234", + "result": "0x1111111", }) statusCode, body := sendRequest(`{"jsonrpc":"2.0","method":"eth_getBlockNumber","params":[],"id":1}`) @@ -453,4 +456,35 @@ func TestHttpServer_HandleRequest_EthGetBlockNumber(t *testing.T) { assert.True(t, gock.IsDone(), "All mocks should have been called") }) -} \ No newline at end of file + + t.Run("UnexpectedPlainErrorResponseFromUpstream", func(t *testing.T) { + gock.New("http://rpc1.localhost"). + Post("/"). + Times(1). + Reply(200). + BodyString("error code: 1015") + + statusCode, body := sendRequest(`{"jsonrpc":"2.0","method":"eth_getBlockNumber","params":[],"id":1}`) + + assert.Equal(t, http.StatusTooManyRequests, statusCode) + assert.Contains(t, body, "error code: 1015") + + assert.True(t, gock.IsDone(), "All mocks should have been called") + }) + + t.Run("UnexpectedServerErrorResponseFromUpstream", func(t *testing.T) { + gock.New("http://rpc1.localhost"). + Post("/"). + Times(1). + Reply(500). + BodyString(`{"error":{"code":-39999,"message":"my funky error"}}`) + + statusCode, body := sendRequest(`{"jsonrpc":"2.0","method":"eth_getBlockNumber","params":[],"id":1}`) + + assert.Equal(t, http.StatusInternalServerError, statusCode) + assert.Contains(t, body, "-32603") + assert.Contains(t, body, "my funky error") + + assert.True(t, gock.IsDone(), "All mocks should have been called") + }) +} diff --git a/erpc/networks.go b/erpc/networks.go index 65db72fc..e4c23405 100644 --- a/erpc/networks.go +++ b/erpc/networks.go @@ -65,21 +65,18 @@ func (n *Network) Architecture() common.NetworkArchitecture { func (n *Network) Forward(ctx context.Context, req *common.NormalizedRequest) (*common.NormalizedResponse, error) { startTime := time.Now() - n.Logger.Debug().Object("req", req).Msgf("forwarding request") + n.Logger.Trace().Object("req", req).Msgf("forwarding request for network") req.SetNetwork(n) - method, err := req.Method() - if err != nil { - return nil, err - } - lg := n.Logger.With().Str("method", method).Logger() + method, _ := req.Method() + lg := n.Logger.With().Str("method", method).Str("id", req.Id()).Str("ptr", fmt.Sprintf("%p", req)).Logger() // 1) In-flight multiplexing mlxHash, _ := req.CacheHash() n.inFlightMutex.Lock() if inf, exists := n.inFlightRequests[mlxHash]; exists { n.inFlightMutex.Unlock() - lg.Debug().Object("req", req).Msgf("found similar in-flight request, waiting for result") + lg.Debug().Msgf("found similar in-flight request, waiting for result") health.MetricNetworkMultiplexedRequests.WithLabelValues(n.ProjectId, n.NetworkId, method).Inc() inf.mu.RLock() @@ -119,9 +116,9 @@ func (n *Network) Forward(ctx context.Context, req *common.NormalizedRequest) (* if err != nil { lg.Debug().Err(err).Msgf("could not find response in cache") health.MetricNetworkCacheMisses.WithLabelValues(n.ProjectId, n.NetworkId, method).Inc() - } else if resp != nil { + } else if resp != nil && !resp.IsObjectNull() && !resp.IsResultEmptyish() { resp.SetFromCache(true) - lg.Info().Object("req", req).Err(err).Msgf("response served from cache") + lg.Info().Msgf("response served from cache") health.MetricNetworkCacheHits.WithLabelValues(n.ProjectId, n.NetworkId, method).Inc() inf.Close(resp, err) return resp, err @@ -138,29 +135,30 @@ func (n *Network) Forward(ctx context.Context, req *common.NormalizedRequest) (* tryForward := func( u *upstream.Upstream, ctx context.Context, - ) (resp *common.NormalizedResponse, skipped bool, err error) { - lg := u.Logger.With().Str("upstreamId", u.Config().Id).Logger() + ) (resp *common.NormalizedResponse, err error) { + lg := u.Logger.With().Str("upstreamId", u.Config().Id).Str("method", method).Str("id", req.Id()).Str("ptr", fmt.Sprintf("%p", req)).Logger() + + lg.Debug().Msgf("trying to forward request to upstream") - lg.Debug().Str("method", method).Str("rid", fmt.Sprintf("%p", req)).Msgf("trying to forward request to upstream") + resp, err = u.Forward(ctx, req) - resp, skipped, err = u.Forward(ctx, req) if !common.IsNull(err) { // If upstream complains that the method is not supported let's dynamically add it ignoreMethods config if common.HasErrorCode(err, common.ErrCodeEndpointUnsupported) { - lg.Warn().Err(err).Str("method", method).Msgf("upstream does not support method, dynamically adding to ignoreMethods") + lg.Warn().Err(err).Msgf("upstream does not support method, dynamically adding to ignoreMethods") u.IgnoreMethod(method) } - return nil, skipped, err + return nil, err } - if skipped { - lg.Debug().Err(err).Msgf("skipped forwarding request to upstream") + if err != nil { + lg.Debug().Err(err).Msgf("finished forwarding request to upstream with error") } else { - lg.Info().Msgf("finished forwarding request to upstream") + lg.Info().Msgf("finished forwarding request to upstream with success") } - return resp, skipped, err + return resp, err } upsList, err := n.upstreamsRegistry.GetSortedUpstreams(n.NetworkId, method) @@ -170,15 +168,16 @@ func (n *Network) Forward(ctx context.Context, req *common.NormalizedRequest) (* } var execution failsafe.Execution[*common.NormalizedResponse] - var errorsByUpstream = []error{} - var errorsMutex sync.Mutex - imtx := sync.Mutex{} + var errorsByUpstream = map[string]error{} + + coordMu := sync.Mutex{} i := 0 resp, execErr := n.failsafeExecutor. WithContext(ctx). GetWithExecution(func(exec failsafe.Execution[*common.NormalizedResponse]) (*common.NormalizedResponse, error) { + coordMu.Lock() execution = exec - isHedged := exec.Hedges() > 0 + coordMu.Unlock() // We should try all upstreams at least once, but using "i" we make sure // across different executions of the failsafe we pick up next upstream vs retrying the same upstream. @@ -186,47 +185,54 @@ func (n *Network) Forward(ctx context.Context, req *common.NormalizedRequest) (* // Upstream-level retry is handled by the upstream itself (and its own failsafe policies). ln := len(upsList) for count := 0; count < ln; count++ { - imtx.Lock() - n.upstreamsRegistry.RLockUpstreams() + coordMu.Lock() u := upsList[i] - n.upstreamsRegistry.RUnlockUpstreams() i++ if i >= ln { i = 0 } - if isHedged { - lg.Debug(). - Str("upstreamId", u.Config().Id). - Int("index", i). - Msgf("executing hedged forward to upstream") - } else { - lg.Debug(). - Str("upstreamId", u.Config().Id). - Int("index", i). - Msgf("executing forward to upstream") + upsId := u.Config().Id + if prevErr, exists := errorsByUpstream[upsId]; exists { + if !common.IsRetryableTowardsUpstream(prevErr) { + // Do not even try this upstream if we already know + // the previous error was not retryable. e.g. Billing issues + coordMu.Unlock() + continue + } } - imtx.Unlock() + coordMu.Unlock() - resp, skipped, err := n.processResponse( + resp, err := n.normalizeResponse( tryForward(u, exec.Context()), ) + isClientErr := err != nil && common.HasErrorCode(err, common.ErrCodeEndpointClientSideException) + isHedged := exec.Hedges() > 0 + if isHedged && err != nil && errors.Is(err, context.Canceled) { - lg.Debug().Err(err).Msgf("discarding hedged request to upstream %s: %v", u.Config().Id, skipped) - return nil, err + lg.Debug().Err(err).Msgf("discarding hedged request to upstream %s", u.Config().Id) + return nil, common.NewErrUpstreamHedgeCancelled(u.Config().Id) } - if isHedged { - lg.Debug().Err(err).Msgf("forwarded hedged request to upstream %s skipped: %v err: %v", u.Config().Id, skipped, err) + lg.Debug().Msgf("forwarded hedged request to upstream %s", u.Config().Id) } else { - lg.Debug().Err(err).Msgf("forwarded request to upstream %s skipped: %v err: %v", u.Config().Id, skipped, err) + lg.Debug().Msgf("forwarded request to upstream %s", u.Config().Id) } + if err != nil { - errorsMutex.Lock() - errorsByUpstream = append(errorsByUpstream, err) - errorsMutex.Unlock() + coordMu.Lock() + if ser, ok := err.(common.StandardError); ok { + ber := ser.Base() + if ber.Details == nil { + ber.Details = map[string]interface{}{} + } + ber.Details["timestampMs"] = time.Now().UnixMilli() + } + errorsByUpstream[upsId] = err + coordMu.Unlock() } - if !skipped { + + if err == nil || isClientErr { if resp != nil { resp.SetUpstream(u) } @@ -249,7 +255,7 @@ func (n *Network) Forward(ctx context.Context, req *common.NormalizedRequest) (* }) if execErr != nil { - err := upstream.TranslateFailsafeError(execErr) + err := upstream.TranslateFailsafeError("", method, execErr) // If error is due to empty response be generous and accept it, // because this means after many retries still no data is available. if common.HasErrorCode(err, common.ErrCodeFailsafeRetryExceeded) { @@ -303,6 +309,23 @@ func (n *Network) Forward(ctx context.Context, req *common.NormalizedRequest) (* return resp, nil } +// func (n *Network) shouldJumpUpstream(err error) bool { +// if err == nil { +// return false +// } + +// // When it is not retryable towards upstream, continue on other upstreams, +// // and skip on network-level retries. +// if !common.IsRetryableTowardsUpstream(err) { +// return true +// } + +// // By default skip all errors. +// // If there are errors that does not make sense to continue forwarding to other upstreams, +// // it must be added to above conditions. +// return false +// } + func (n *Network) EvmIsBlockFinalized(blockNumber int64) (bool, error) { if n.evmBlockTracker == nil { return false, nil @@ -369,23 +392,17 @@ func (n *Network) EvmChainId() (int64, error) { return n.Config.Evm.ChainId, nil } -func (n *Network) processResponse(resp *common.NormalizedResponse, skipped bool, err error) (*common.NormalizedResponse, bool, error) { +func (n *Network) normalizeResponse(resp *common.NormalizedResponse, err error) (*common.NormalizedResponse, error) { if err == nil { - return resp, skipped, nil + return resp, nil } switch n.Architecture() { case common.ArchitectureEvm: - if common.HasErrorCode(err, common.ErrCodeFailsafeCircuitBreakerOpen) { - // Explicitly skip when CB is open to not count the failed request towards network "retries" - return resp, true, err - } else if common.HasErrorCode(err, common.ErrCodeEndpointUnsupported) || common.HasErrorCode(err, common.ErrCodeUpstreamRequestSkipped) { - // Explicitly skip when method is not supported so it is not counted towards retries - return resp, true, err - } else if common.HasErrorCode(err, common.ErrCodeJsonRpcExceptionInternal) { - return resp, skipped, err + if common.HasErrorCode(err, common.ErrCodeJsonRpcExceptionInternal) { + return resp, err } else if common.HasErrorCode(err, common.ErrCodeJsonRpcRequestUnmarshal) { - return resp, skipped, common.NewErrJsonRpcExceptionInternal( + return resp, common.NewErrJsonRpcExceptionInternal( 0, common.JsonRpcErrorParseException, "failed to parse json-rpc request", @@ -394,7 +411,7 @@ func (n *Network) processResponse(resp *common.NormalizedResponse, skipped bool, ) } - return resp, skipped, common.NewErrJsonRpcExceptionInternal( + return resp, common.NewErrJsonRpcExceptionInternal( 0, common.JsonRpcErrorServerSideException, fmt.Sprintf("failed request on evm network %s", n.NetworkId), @@ -402,7 +419,7 @@ func (n *Network) processResponse(resp *common.NormalizedResponse, skipped bool, nil, ) default: - return resp, skipped, err + return resp, err } } diff --git a/erpc/networks_test.go b/erpc/networks_test.go index 9c922542..8f504063 100644 --- a/erpc/networks_test.go +++ b/erpc/networks_test.go @@ -29,7 +29,7 @@ import ( var TRUE = true func init() { - log.Logger = log.Level(zerolog.ErrorLevel).Output(zerolog.ConsoleWriter{Out: os.Stderr}) + log.Logger = log.Level(zerolog.TraceLevel).Output(zerolog.ConsoleWriter{Out: os.Stderr}) } func TestNetwork_Forward(t *testing.T) { @@ -170,10 +170,8 @@ func TestNetwork_Forward(t *testing.T) { } }) - t.Run("ForwardRetryFailuresWithoutSuccessNoCode", func(t *testing.T) { + t.Run("ForwardUpstreamRetryIntermittentFailuresWithoutSuccessAndNoErrCode", func(t *testing.T) { defer gock.Off() - defer gock.Clean() - defer gock.CleanUnmatchedRequest() var requestBytes = json.RawMessage(`{"jsonrpc":"2.0","id":1,"method":"eth_traceTransaction","params":["0x1273c18",false]}`) @@ -208,6 +206,7 @@ func TestNetwork_Forward(t *testing.T) { Evm: &common.EvmUpstreamConfig{ ChainId: 123, }, + Failsafe: fsCfg, } upr := upstream.NewUpstreamsRegistry( &log.Logger, @@ -248,7 +247,6 @@ func TestNetwork_Forward(t *testing.T) { Evm: &common.EvmNetworkConfig{ ChainId: 123, }, - Failsafe: fsCfg, }, rlr, upr, @@ -353,7 +351,6 @@ func TestNetwork_Forward(t *testing.T) { Evm: &common.EvmNetworkConfig{ ChainId: 123, }, - Failsafe: fsCfg, }, rlr, upr, @@ -381,6 +378,294 @@ func TestNetwork_Forward(t *testing.T) { } }) + t.Run("ForwardSkipsNonRetryableFailuresFromUpstreams", func(t *testing.T) { + defer gock.Off() + + var requestBytes = json.RawMessage(`{"jsonrpc":"2.0","id":1,"method":"eth_traceTransaction","params":["0x1273c18",false]}`) + + gock.New("http://rpc1.localhost"). + Times(1). + Post(""). + Reply(401). + JSON(json.RawMessage(`{"error":{"code":-32016,"message":"unauthorized rpc1"}}`)) + + gock.New("http://rpc2.localhost"). + Times(2). + Post(""). + Reply(503). + JSON(json.RawMessage(`{"error":"random rpc2 unavailable"}`)) + + gock.New("http://rpc2.localhost"). + Times(1). + Post(""). + Reply(200). + JSON(json.RawMessage(`{"result":"0x1234567"}`)) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + clr := upstream.NewClientRegistry(&log.Logger) + + upsFsCfg := &common.FailsafeConfig{ + Retry: &common.RetryPolicyConfig{ + MaxAttempts: 2, + }, + } + ntwFsCfg := &common.FailsafeConfig{ + Retry: &common.RetryPolicyConfig{ + MaxAttempts: 2, + }, + } + rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + Budgets: []*common.RateLimitBudgetConfig{}, + }, &log.Logger) + if err != nil { + t.Fatal(err) + } + vndr := vendors.NewVendorsRegistry() + mt := health.NewTracker("prjA", 2*time.Second) + up1 := &common.UpstreamConfig{ + Type: common.UpstreamTypeEvm, + Id: "rpc1", + Endpoint: "http://rpc1.localhost", + Evm: &common.EvmUpstreamConfig{ + ChainId: 123, + }, + Failsafe: upsFsCfg, + } + up2 := &common.UpstreamConfig{ + Type: common.UpstreamTypeEvm, + Id: "rpc2", + Endpoint: "http://rpc2.localhost", + Evm: &common.EvmUpstreamConfig{ + ChainId: 123, + }, + Failsafe: upsFsCfg, + } + upr := upstream.NewUpstreamsRegistry( + &log.Logger, + "prjA", + []*common.UpstreamConfig{ + up1, + up2, + }, + rlr, + vndr, mt, 1*time.Second, + ) + err = upr.Bootstrap(ctx) + if err != nil { + t.Fatal(err) + } + err = upr.PrepareUpstreamsForNetwork(util.EvmNetworkId(123)) + if err != nil { + t.Fatal(err) + } + pup1, err := upr.NewUpstream( + "prjA", + up1, + &log.Logger, + mt, + ) + if err != nil { + t.Fatal(err) + } + cl, err := clr.GetOrCreateClient(pup1) + if err != nil { + t.Fatal(err) + } + pup1.Client = cl + + pup2, err := upr.NewUpstream( + "prjA", + up2, + &log.Logger, + mt, + ) + if err != nil { + t.Fatal(err) + } + cl2, err := clr.GetOrCreateClient(pup2) + if err != nil { + t.Fatal(err) + } + pup2.Client = cl2 + + ntw, err := NewNetwork( + &log.Logger, + "prjA", + &common.NetworkConfig{ + Architecture: common.ArchitectureEvm, + Evm: &common.EvmNetworkConfig{ + ChainId: 123, + }, + Failsafe: ntwFsCfg, + }, + rlr, + upr, + health.NewTracker("prjA", 2*time.Second), + ) + if err != nil { + t.Fatal(err) + } + fakeReq := common.NewNormalizedRequest(requestBytes) + _, err = ntw.Forward(ctx, fakeReq) + + if len(gock.Pending()) > 0 { + t.Errorf("Expected all mocks to be consumed, got %v left", len(gock.Pending())) + for _, pending := range gock.Pending() { + t.Errorf("Pending mock: %v", pending) + } + } + + if err != nil { + t.Errorf("Expected an nil, got error %v", err) + } + }) + + t.Run("ForwardNotSkipsRetryableFailuresFromUpstreams", func(t *testing.T) { + defer gock.Off() + + var requestBytes = json.RawMessage(`{"jsonrpc":"2.0","id":1,"method":"eth_traceTransaction","params":["0x1273c18",false]}`) + + gock.New("http://rpc1.localhost"). + Times(3). + Post(""). + Reply(503). + JSON(json.RawMessage(`{"error":"random rpc1 unavailable"}`)) + + gock.New("http://rpc2.localhost"). + Times(3). + Post(""). + Reply(503). + JSON(json.RawMessage(`{"error":"random rpc2 unavailable"}`)) + + gock.New("http://rpc2.localhost"). + Times(1). + Post(""). + Reply(200). + JSON(json.RawMessage(`{"result":"0x1234567"}`)) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + clr := upstream.NewClientRegistry(&log.Logger) + + upsFsCfg := &common.FailsafeConfig{ + Retry: &common.RetryPolicyConfig{ + MaxAttempts: 3, + }, + } + ntwFsCfg := &common.FailsafeConfig{ + Retry: &common.RetryPolicyConfig{ + MaxAttempts: 2, + }, + } + rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + Budgets: []*common.RateLimitBudgetConfig{}, + }, &log.Logger) + if err != nil { + t.Fatal(err) + } + vndr := vendors.NewVendorsRegistry() + mt := health.NewTracker("prjA", 2*time.Second) + up1 := &common.UpstreamConfig{ + Type: common.UpstreamTypeEvm, + Id: "rpc1", + Endpoint: "http://rpc1.localhost", + Evm: &common.EvmUpstreamConfig{ + ChainId: 123, + }, + Failsafe: upsFsCfg, + } + up2 := &common.UpstreamConfig{ + Type: common.UpstreamTypeEvm, + Id: "rpc2", + Endpoint: "http://rpc2.localhost", + Evm: &common.EvmUpstreamConfig{ + ChainId: 123, + }, + Failsafe: upsFsCfg, + } + upr := upstream.NewUpstreamsRegistry( + &log.Logger, + "prjA", + []*common.UpstreamConfig{ + up1, + up2, + }, + rlr, + vndr, mt, 1*time.Second, + ) + err = upr.Bootstrap(ctx) + if err != nil { + t.Fatal(err) + } + err = upr.PrepareUpstreamsForNetwork(util.EvmNetworkId(123)) + if err != nil { + t.Fatal(err) + } + pup1, err := upr.NewUpstream( + "prjA", + up1, + &log.Logger, + mt, + ) + if err != nil { + t.Fatal(err) + } + cl, err := clr.GetOrCreateClient(pup1) + if err != nil { + t.Fatal(err) + } + pup1.Client = cl + + pup2, err := upr.NewUpstream( + "prjA", + up2, + &log.Logger, + mt, + ) + if err != nil { + t.Fatal(err) + } + cl2, err := clr.GetOrCreateClient(pup2) + if err != nil { + t.Fatal(err) + } + pup2.Client = cl2 + + ntw, err := NewNetwork( + &log.Logger, + "prjA", + &common.NetworkConfig{ + Architecture: common.ArchitectureEvm, + Evm: &common.EvmNetworkConfig{ + ChainId: 123, + }, + Failsafe: ntwFsCfg, + }, + rlr, + upr, + health.NewTracker("prjA", 2*time.Second), + ) + if err != nil { + t.Fatal(err) + } + fakeReq := common.NewNormalizedRequest(requestBytes) + _, err = ntw.Forward(ctx, fakeReq) + + if len(gock.Pending()) > 0 { + t.Errorf("Expected all mocks to be consumed, got %v left", len(gock.Pending())) + for _, pending := range gock.Pending() { + t.Errorf("Pending mock: %s => status %d, body %s", pending.Request().URLStruct, pending.Response().StatusCode, string(pending.Response().BodyBuffer)) + } + } + + if err != nil { + t.Errorf("Expected an nil, got error %v", err) + } + }) + t.Run("NotRetryWhenBlockIsFinalized", func(t *testing.T) { // Clean up any gock mocks after the test runs defer gock.Off() @@ -2703,7 +2988,7 @@ func TestNetwork_Forward(t *testing.T) { gock.New("http://rpc1.localhost"). Post(""). Reply(500). - JSON(json.RawMessage(`{"error":{"code":-39999,"message":"Internal error"}}`)) + JSON(json.RawMessage(`{"error":{"code":-39999,"message":"my funky random error"}}`)) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -2783,8 +3068,17 @@ func TestNetwork_Forward(t *testing.T) { t.Fatalf("Expected non-nil error, got nil") } - if !strings.Contains(common.ErrorSummary(err), "-32603") { - t.Fatalf("Expected error code -32603, got %v", common.ErrorSummary(err)) + ser, ok := err.(common.StandardError) + if !ok { + t.Fatalf("Expected error to be StandardError, got %T", err) + } + sum := common.ErrorSummary(ser.GetCause()) + + if !strings.Contains(sum, "ErrEndpointServerSideException") { + t.Fatalf("Expected error code ErrEndpointServerSideException, got %v", sum) + } + if !strings.Contains(sum, "my funky random error") { + t.Fatalf("Expected error text 'my funky random error', but was missing %v", sum) } }) @@ -3334,7 +3628,7 @@ func TestNetwork_Forward(t *testing.T) { } }) - t.Run("ForwardLlamaRPCEndpointRateLimitResponse", func(t *testing.T) { + t.Run("ForwardLlamaRPCEndpointRateLimitResponseSingle", func(t *testing.T) { defer gock.Off() defer gock.Clean() defer gock.CleanUnmatchedRequest() @@ -3360,7 +3654,6 @@ func TestNetwork_Forward(t *testing.T) { } vndr := vendors.NewVendorsRegistry() mt := health.NewTracker("prjA", 2*time.Second) - FALSE := false up1 := &common.UpstreamConfig{ Type: common.UpstreamTypeEvm, Id: "rpc1", @@ -3369,7 +3662,104 @@ func TestNetwork_Forward(t *testing.T) { ChainId: 123, }, JsonRpc: &common.JsonRpcUpstreamConfig{ - SupportsBatch: &FALSE, + SupportsBatch: &common.FALSE, + }, + VendorName: "llama", + } + upr := upstream.NewUpstreamsRegistry( + &log.Logger, + "prjA", + []*common.UpstreamConfig{up1}, + rlr, + vndr, mt, 1*time.Second, + ) + err = upr.Bootstrap(ctx) + if err != nil { + t.Fatalf("Failed to bootstrap upstreams registry: %v", err) + } + err = upr.PrepareUpstreamsForNetwork(util.EvmNetworkId(123)) + if err != nil { + t.Fatalf("Failed to prepare upstreams for network: %v", err) + } + + ntw, err := NewNetwork( + &log.Logger, + "prjA", + &common.NetworkConfig{ + Architecture: common.ArchitectureEvm, + Evm: &common.EvmNetworkConfig{ + ChainId: 123, + }, + Failsafe: fsCfg, + }, + rlr, + upr, + mt, + ) + if err != nil { + t.Fatal(err) + } + + fakeReq := common.NewNormalizedRequest(requestBytes) + resp, err := ntw.Forward(ctx, fakeReq) + + if len(gock.Pending()) > 0 { + t.Errorf("Expected all mocks to be consumed, got %v left", len(gock.Pending())) + for _, pending := range gock.Pending() { + t.Errorf("Pending mock: %v", pending) + } + } + + if err == nil { + t.Errorf("Expected non-nil error, got nil") + return + } + + if resp != nil { + t.Errorf("Expected nil response, got %v", resp) + return + } + + if !common.HasErrorCode(err, common.ErrCodeEndpointCapacityExceeded) { + t.Errorf("Expected error code %v, got %+v", common.ErrCodeEndpointCapacityExceeded, err) + } + }) + + t.Run("ForwardLlamaRPCEndpointRateLimitResponseBatch", func(t *testing.T) { + defer gock.Off() + defer gock.Clean() + defer gock.CleanUnmatchedRequest() + + var requestBytes = json.RawMessage(`{"jsonrpc":"2.0","id":1,"method":"eth_getBlockByNumber","params":["0x1273c18",false]}`) + + gock.New("http://rpc1.localhost"). + Post(""). + Reply(200). + BodyString(`error code: 1015`) + + log.Logger.Info().Msgf("Mocks registered: %d", len(gock.Pending())) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + fsCfg := &common.FailsafeConfig{} + rlr, err := upstream.NewRateLimitersRegistry(&common.RateLimiterConfig{ + Budgets: []*common.RateLimitBudgetConfig{}, + }, &log.Logger) + if err != nil { + t.Fatal(err) + } + vndr := vendors.NewVendorsRegistry() + mt := health.NewTracker("prjA", 2*time.Second) + up1 := &common.UpstreamConfig{ + Type: common.UpstreamTypeEvm, + Id: "rpc1", + Endpoint: "http://rpc1.localhost", + Evm: &common.EvmUpstreamConfig{ + ChainId: 123, + }, + JsonRpc: &common.JsonRpcUpstreamConfig{ + SupportsBatch: &common.TRUE, }, VendorName: "llama", } @@ -3482,7 +3872,7 @@ func TestNetwork_Forward(t *testing.T) { rateLimitersRegistry, ) - network, err := networksRegistry.RegisterNetwork( + ntw, err := networksRegistry.RegisterNetwork( &logger, &common.ProjectConfig{Id: projectID}, &common.NetworkConfig{ @@ -3492,7 +3882,7 @@ func TestNetwork_Forward(t *testing.T) { ) assert.NoError(t, err) - simulateRequests := func(method string, upstreamId string, latency time.Duration) { + mockRequests := func(method string, upstreamId string, latency time.Duration) { gock.New("http://" + upstreamId + ".localhost"). Persist(). Post("/"). @@ -3507,15 +3897,15 @@ func TestNetwork_Forward(t *testing.T) { } // Upstream A is faster for eth_call, Upstream B is faster for eth_traceTransaction, Upstream C is faster for eth_getLogs - simulateRequests("eth_getLogs", "upstream-a", 200*time.Millisecond) - simulateRequests("eth_getLogs", "upstream-b", 100*time.Millisecond) - simulateRequests("eth_getLogs", "upstream-c", 50*time.Millisecond) - simulateRequests("eth_traceTransaction", "upstream-a", 100*time.Millisecond) - simulateRequests("eth_traceTransaction", "upstream-b", 50*time.Millisecond) - simulateRequests("eth_traceTransaction", "upstream-c", 200*time.Millisecond) - simulateRequests("eth_call", "upstream-a", 50*time.Millisecond) - simulateRequests("eth_call", "upstream-b", 200*time.Millisecond) - simulateRequests("eth_call", "upstream-c", 100*time.Millisecond) + mockRequests("eth_getLogs", "upstream-a", 200*time.Millisecond) + mockRequests("eth_getLogs", "upstream-b", 100*time.Millisecond) + mockRequests("eth_getLogs", "upstream-c", 50*time.Millisecond) + mockRequests("eth_traceTransaction", "upstream-a", 100*time.Millisecond) + mockRequests("eth_traceTransaction", "upstream-b", 50*time.Millisecond) + mockRequests("eth_traceTransaction", "upstream-c", 200*time.Millisecond) + mockRequests("eth_call", "upstream-a", 50*time.Millisecond) + mockRequests("eth_call", "upstream-b", 200*time.Millisecond) + mockRequests("eth_call", "upstream-c", 100*time.Millisecond) allMethods := []string{"eth_getLogs", "eth_traceTransaction", "eth_call"} @@ -3533,10 +3923,19 @@ func TestNetwork_Forward(t *testing.T) { defer wg.Done() upstreamsRegistry.RefreshUpstreamNetworkMethodScores() req := common.NewNormalizedRequest([]byte(fmt.Sprintf(`{"jsonrpc":"2.0","method":"%s","params":[],"id":1}`, method))) - _, err := network.Forward(ctx, req) + req.SetNetwork(ntw) + oups, err := upstreamsRegistry.GetSortedUpstreams(networkID, method) + upstreamsRegistry.RLockUpstreams() + ups := []*upstream.Upstream{} + ups = append(ups, oups...) + upstreamsRegistry.RUnlockUpstreams() assert.NoError(t, err) + for _, up := range ups { + _, err = up.Forward(ctx, req) + assert.NoError(t, err) + } }(method) - time.Sleep(10 * time.Millisecond) + // time.Sleep(1 * time.Millisecond) } } wg.Wait() diff --git a/erpc/projects.go b/erpc/projects.go index 81e04014..735477cc 100644 --- a/erpc/projects.go +++ b/erpc/projects.go @@ -90,19 +90,20 @@ func (p *PreparedProject) Forward(ctx context.Context, networkId string, nq *com defer timer.ObserveDuration() health.MetricNetworkRequestsReceived.WithLabelValues(p.Config.Id, network.NetworkId, method).Inc() - p.Logger.Debug().Str("method", method).Msgf("forwarding request to network") + lg := p.Logger.With().Str("method", method).Str("id", nq.Id()).Str("ptr", fmt.Sprintf("%p", nq)).Logger() + lg.Debug().Msgf("forwarding request to network") resp, err := network.Forward(ctx, nq) if err == nil || common.HasErrorCode(err, common.ErrCodeEndpointClientSideException) { if err != nil { - p.Logger.Info().Err(err).Msgf("finished forwarding request for network with some client-side exception") + lg.Info().Err(err).Msgf("finished forwarding request for network with some client-side exception") } else { - p.Logger.Info().Msgf("successfully forward request for network") + lg.Info().Msgf("successfully forwarded request for network") } health.MetricNetworkSuccessfulRequests.WithLabelValues(p.Config.Id, network.NetworkId, method).Inc() return resp, err } else { - p.Logger.Warn().Err(err).Str("method", method).Msgf("failed to forward request for network") + lg.Warn().Err(err).Msgf("failed to forward request for network") health.MetricNetworkFailedRequests.WithLabelValues(network.ProjectId, network.NetworkId, method, common.ErrorSummary(err)).Inc() } diff --git a/health/metrics.go b/health/metrics.go index 0fad7534..af89b9a1 100644 --- a/health/metrics.go +++ b/health/metrics.go @@ -17,11 +17,11 @@ var ( Name: "upstream_request_duration_seconds", Help: "Duration of actual requests towards upstreams.", Buckets: []float64{ - 0.1, // 100ms - 0.5, // 500ms - 1, // 1s - 5, // 5s - 10, // 10s + 0.1, // 100ms + 0.5, // 500ms + 1, // 1s + 5, // 5s + 10, // 10s }, }, []string{"project", "network", "upstream", "category"}) @@ -108,11 +108,11 @@ var ( Name: "network_request_duration_seconds", Help: "Duration of requests for a network.", Buckets: []float64{ - 0.1, // 100ms - 0.5, // 500ms - 1, // 1s - 5, // 5s - 10, // 10s + 0.1, // 100ms + 0.5, // 500ms + 1, // 1s + 5, // 5s + 10, // 10s }, }, []string{"project", "network", "category"}) diff --git a/upstream/failsafe.go b/upstream/failsafe.go index 9338d1e0..bde81d38 100644 --- a/upstream/failsafe.go +++ b/upstream/failsafe.go @@ -1,6 +1,7 @@ package upstream import ( + "context" "errors" "fmt" "time" @@ -323,8 +324,9 @@ func createTimeoutPolicy(component string, cfg *common.TimeoutPolicyConfig) (fai return builder.Build(), nil } -func TranslateFailsafeError(execErr error) error { - var retryExceededErr *retrypolicy.ExceededError +func TranslateFailsafeError(upstreamId, method string, execErr error) error { + var err error + var retryExceededErr retrypolicy.ExceededError if errors.As(execErr, &retryExceededErr) { ler := retryExceededErr.LastError if common.IsNull(ler) { @@ -334,17 +336,35 @@ func TranslateFailsafeError(execErr error) error { } var translatedCause error if ler != nil { - translatedCause = TranslateFailsafeError(ler) + translatedCause = TranslateFailsafeError("", "", ler) } - return common.NewErrFailsafeRetryExceeded(translatedCause) + err = common.NewErrFailsafeRetryExceeded(translatedCause) + } else if errors.Is(execErr, timeout.ErrExceeded) || + errors.Is(execErr, context.DeadlineExceeded) { + err = common.NewErrFailsafeTimeoutExceeded(execErr) + } else if errors.Is(execErr, circuitbreaker.ErrOpen) { + err = common.NewErrFailsafeCircuitBreakerOpen(execErr) } - if errors.Is(execErr, timeout.ErrExceeded) { - return common.NewErrFailsafeTimeoutExceeded(execErr) - } - - if errors.Is(execErr, circuitbreaker.ErrOpen) { - return common.NewErrFailsafeCircuitBreakerOpen(execErr) + if err != nil { + if method != "" { + if ser, ok := execErr.(common.StandardError); ok { + be := ser.Base() + if be != nil { + if upstreamId != "" { + be.Details = map[string]interface{}{ + "upstreamId": upstreamId, + "method": method, + } + } else { + be.Details = map[string]interface{}{ + "method": method, + } + } + } + } + } + return err } return execErr diff --git a/upstream/http_json_json_client_test.go b/upstream/http_json_json_client_test.go index 9efdfde8..c9a6b6e3 100644 --- a/upstream/http_json_json_client_test.go +++ b/upstream/http_json_json_client_test.go @@ -370,7 +370,7 @@ func TestHttpJsonRpcClient_BatchRequests(t *testing.T) { gock.New("http://rpc1.localhost:8545"). Post("/"). Reply(200). - BodyString("Internal Server Error") + BodyString("my random something error") var wg sync.WaitGroup for i := 0; i < 3; i++ { @@ -380,7 +380,7 @@ func TestHttpJsonRpcClient_BatchRequests(t *testing.T) { req := common.NewNormalizedRequest([]byte(fmt.Sprintf(`{"jsonrpc":"2.0","id":%d,"method":"eth_blockNumber","params":[]}`, id))) _, err := client.SendRequest(context.Background(), req) assert.Error(t, err) - assert.Contains(t, err.Error(), "failed to parse upstream response") + assert.Contains(t, err.Error(), "my random something error") }(i + 1) } wg.Wait() diff --git a/upstream/http_json_rpc_client.go b/upstream/http_json_rpc_client.go index 3a0f485d..74b1d85f 100644 --- a/upstream/http_json_rpc_client.go +++ b/upstream/http_json_rpc_client.go @@ -86,8 +86,8 @@ func NewGenericHttpJsonRpcClient(logger *zerolog.Logger, pu *Upstream, parsedUrl client.httpClient = &http.Client{ Timeout: 60 * time.Second, Transport: &http.Transport{ - MaxIdleConns: 400, - MaxIdleConnsPerHost: 400, + MaxIdleConns: 1024, + MaxIdleConnsPerHost: 256, IdleConnTimeout: 90 * time.Second, }, } @@ -119,13 +119,9 @@ func (c *GenericHttpJsonRpcClient) SendRequest(ctx context.Context, req *common. startedAt := time.Now() jrReq, err := req.JsonRpcRequest() if err != nil { - m, _ := req.Method() return nil, common.NewErrUpstreamRequest( err, - c.upstream.ProjectId, - req.NetworkId(), c.upstream.Config().Id, - m, 0, 0, 0, 0, ) } @@ -213,13 +209,9 @@ func (c *GenericHttpJsonRpcClient) processBatch() { for _, req := range requests { jrReq, err := req.request.JsonRpcRequest() if err != nil { - m, _ := req.request.Method() req.err <- common.NewErrUpstreamRequest( err, - c.upstream.ProjectId, - req.request.NetworkId(), c.upstream.Config().Id, - m, 0, 0, 0, 0, ) continue @@ -322,43 +314,14 @@ func (c *GenericHttpJsonRpcClient) processBatchResponse(requests map[interface{} // Try parsing as single json-rpc object, // some providers return a single object on some errors even when request is batch. // this is a workaround to handle those cases. - var singleResp common.JsonRpcResponse - errsg := sonic.Unmarshal(respBody, &singleResp) - if errsg != nil { - for _, req := range requests { - req.err <- common.NewErrEndpointServerSideException( - fmt.Errorf("failed to parse upstream response, batch: %w single: %w", err, errsg), - map[string]interface{}{ - "statusCode": resp.StatusCode, - "headers": resp.Header, - "body": string(respBody), - }, - ) - } - } else { - if singleResp.JSONRPC != "" || singleResp.Error != nil { - // This case happens when upstreams a single valid json-rpc object as response - // to a batch request (e.g. BlastAPI). - for _, req := range requests { - nr := common.NewNormalizedResponse().WithRequest(req.request).WithBody(respBody) - err := c.normalizeJsonRpcError(resp, nr) - if err != nil { - req.err <- err - } else { - req.response <- nr - } - } + nr := common.NewNormalizedResponse().WithBody(respBody) + for _, br := range requests { + nr.WithRequest(br.request) + err := c.normalizeJsonRpcError(resp, nr) + if err != nil { + br.err <- err } else { - for _, req := range requests { - req.err <- common.NewErrEndpointServerSideException( - fmt.Errorf("failed to parse upstream response: %w", err), - map[string]interface{}{ - "statusCode": resp.StatusCode, - "headers": resp.Header, - "body": string(respBody), - }, - ) - } + br.response <- nr } } return @@ -398,13 +361,9 @@ func (c *GenericHttpJsonRpcClient) processBatchResponse(requests map[interface{} func (c *GenericHttpJsonRpcClient) sendSingleRequest(ctx context.Context, req *common.NormalizedRequest) (*common.NormalizedResponse, error) { jrReq, err := req.JsonRpcRequest() if err != nil { - m, _ := req.Method() return nil, common.NewErrUpstreamRequest( err, - c.upstream.ProjectId, - req.NetworkId(), c.upstream.Config().Id, - m, 0, 0, 0, 0, ) } @@ -544,12 +503,32 @@ func extractJsonRpcError(r *http.Response, nr *common.NormalizedResponse, jr *co details, ), ) + } else if strings.Contains(err.Message, "block range") || + strings.Contains(err.Message, "exceeds the range") || + strings.Contains(err.Message, "Max range") || + strings.Contains(err.Message, "limited to") || + strings.Contains(err.Message, "response size should not") || + strings.Contains(err.Message, "returned more than") || + strings.Contains(err.Message, "exceeds max results") || + strings.Contains(err.Message, "response too large") || + strings.Contains(err.Message, "query exceeds limit") || + strings.Contains(err.Message, "exceeds the range") || + strings.Contains(err.Message, "range limit exceeded") { + return common.NewErrEndpointEvmLargeRange( + common.NewErrJsonRpcExceptionInternal( + int(code), + common.JsonRpcErrorCapacityExceeded, + err.Message, + nil, + details, + ), + ) } else if r.StatusCode == 429 || - r.StatusCode == 408 || code == -32005 || strings.Contains(err.Message, "has exceeded") || strings.Contains(err.Message, "Exceeded the quota") || strings.Contains(err.Message, "under too much load") { + return common.NewErrEndpointCapacityExceeded( common.NewErrJsonRpcExceptionInternal( int(code), @@ -560,7 +539,8 @@ func extractJsonRpcError(r *http.Response, nr *common.NormalizedResponse, jr *co ), ) } else if strings.Contains(err.Message, "transaction not found") || - strings.Contains(err.Message, "cannot find transaction") { + strings.Contains(err.Message, "cannot find transaction") || + strings.Contains(err.Message, "after last accepted block") { return common.NewErrEndpointMissingData( common.NewErrJsonRpcExceptionInternal( int(code), diff --git a/upstream/registry.go b/upstream/registry.go index e4095f96..eedc7fcc 100644 --- a/upstream/registry.go +++ b/upstream/registry.go @@ -3,6 +3,7 @@ package upstream import ( "context" "fmt" + "math" "math/rand" "sort" "sync" @@ -29,13 +30,13 @@ type UpstreamsRegistry struct { // map of network -> method (or *) => upstreams sortedUpstreams map[string]map[string][]*Upstream // map of upstream -> network (or *) -> method (or *) => score - upstreamScores map[string]map[string]map[string]int + upstreamScores map[string]map[string]map[string]float64 } type UpstreamsHealth struct { - Upstreams []*Upstream `json:"upstreams"` - SortedUpstreams map[string]map[string][]string `json:"sortedUpstreams"` - UpstreamScores map[string]map[string]map[string]int `json:"upstreamScores"` + Upstreams []*Upstream `json:"upstreams"` + SortedUpstreams map[string]map[string][]string `json:"sortedUpstreams"` + UpstreamScores map[string]map[string]map[string]float64 `json:"upstreamScores"` } func NewUpstreamsRegistry( @@ -57,7 +58,7 @@ func NewUpstreamsRegistry( metricsTracker: mt, upsCfg: upsCfg, sortedUpstreams: make(map[string]map[string][]*Upstream), - upstreamScores: make(map[string]map[string]map[string]int), + upstreamScores: make(map[string]map[string]map[string]float64), upstreamsMu: &sync.RWMutex{}, } } @@ -119,18 +120,18 @@ func (u *UpstreamsRegistry) PrepareUpstreamsForNetwork(networkId string) error { // Initialize score for this or any network and any method for each upstream for _, ups := range upstreams { if _, ok := u.upstreamScores[ups.Config().Id]; !ok { - u.upstreamScores[ups.Config().Id] = make(map[string]map[string]int) + u.upstreamScores[ups.Config().Id] = make(map[string]map[string]float64) } if _, ok := u.upstreamScores[ups.Config().Id][networkId]; !ok { - u.upstreamScores[ups.Config().Id][networkId] = make(map[string]int) + u.upstreamScores[ups.Config().Id][networkId] = make(map[string]float64) } if _, ok := u.upstreamScores[ups.Config().Id][networkId]["*"]; !ok { u.upstreamScores[ups.Config().Id][networkId]["*"] = 0 } if _, ok := u.upstreamScores[ups.Config().Id]["*"]; !ok { - u.upstreamScores[ups.Config().Id]["*"] = make(map[string]int) + u.upstreamScores[ups.Config().Id]["*"] = make(map[string]float64) } if _, ok := u.upstreamScores[ups.Config().Id]["*"]["*"]; !ok { u.upstreamScores[ups.Config().Id]["*"]["*"] = 0 @@ -192,7 +193,7 @@ func (u *UpstreamsRegistry) RUnlockUpstreams() { func (u *UpstreamsRegistry) sortUpstreams(networkId, method string, upstreams []*Upstream) { // Calculate total score - totalScore := 0 + totalScore := 0.0 for _, ups := range upstreams { score := u.upstreamScores[ups.Config().Id][networkId][method] if score < 0 { @@ -294,7 +295,7 @@ func (u *UpstreamsRegistry) updateScoresAndSort(networkId, method string, upsLis for _, ups := range upsList { metrics := u.metricsTracker.GetUpstreamMethodMetrics(ups.Config().Id, networkId, method) metrics.Mutex.RLock() - u.logger.Debug(). + u.logger.Trace(). Str("projectId", u.prjId). Str("networkId", networkId). Str("method", method). @@ -315,19 +316,19 @@ func (u *UpstreamsRegistry) updateScoresAndSort(networkId, method string, upsLis metrics.Mutex.RUnlock() } - normP90Latencies := normalizeIntValues(p90Latencies, 100) - normErrorRates := normalizeIntValues(errorRates, 100) - normThrottledRates := normalizeIntValues(throttledRates, 100) - normTotalRequests := normalizeIntValues(totalRequests, 100) + normP90Latencies := normalizeValues(p90Latencies) + normErrorRates := normalizeValues(errorRates) + normThrottledRates := normalizeValues(throttledRates) + normTotalRequests := normalizeValues(totalRequests) for i, ups := range upsList { score := u.calculateScore(normTotalRequests[i], normP90Latencies[i], normErrorRates[i], normThrottledRates[i]) u.upstreamScores[ups.Config().Id][networkId][method] = score - u.logger.Debug().Str("projectId", u.prjId). + u.logger.Trace().Str("projectId", u.prjId). Str("upstream", ups.Config().Id). Str("networkId", networkId). Str("method", method). - Int("score", score). + Float64("score", score). Msgf("refreshed score") } @@ -342,39 +343,48 @@ func (u *UpstreamsRegistry) updateScoresAndSort(networkId, method string, upsLis u.logger.Trace().Str("projectId", u.prjId).Str("networkId", networkId).Str("method", method).Str("newSort", newSortStr).Msgf("sorted upstreams") } -func (u *UpstreamsRegistry) calculateScore(normTotalRequests, normP90Latency, normErrorRate, normThrottledRate int) int { - score := 0 +func (u *UpstreamsRegistry) calculateScore(normTotalRequests, normP90Latency, normErrorRate, normThrottledRate float64) float64 { + score := 0.0 // Higher score for lower total requests (to balance the load) - score += 100 - normTotalRequests + score += expCurve(1 - normTotalRequests) // Higher score for lower p90 latency - score += 100 - normP90Latency*4 + score += expCurve(1-normP90Latency) * 4 // Higher score for lower error rate - score += (100 - normErrorRate) * 8 + score += expCurve(1-normErrorRate) * 8 // Higher score for lower throttled rate - score += (100 - normThrottledRate) * 3 + score += expCurve(1-normThrottledRate) * 3 return score } -func normalizeIntValues(values []float64, scale int) []int { +func expCurve(x float64) float64 { + return math.Pow(x, 2.0) +} + +func normalizeValues(values []float64) []float64 { if len(values) == 0 { - return []int{} + return []float64{} } - var min float64 = 0 + var min float64 = values[0] max := values[0] + // Find min and max for _, value := range values { if value > max { max = value } + if value < min { + min = value + } } - normalized := make([]int, len(values)) + + normalized := make([]float64, len(values)) for i, value := range values { if (max - min) > 0 { - normalized[i] = int((value - min) / (max - min) * float64(scale)) + normalized[i] = (value - min) / (max - min) } else { normalized[i] = 0 } @@ -384,7 +394,7 @@ func normalizeIntValues(values []float64, scale int) []int { func (u *UpstreamsRegistry) GetUpstreamsHealth() (*UpstreamsHealth, error) { sortedUpstreams := make(map[string]map[string][]string) - upstreamScores := make(map[string]map[string]map[string]int) + upstreamScores := make(map[string]map[string]map[string]float64) for nw, methods := range u.sortedUpstreams { for method, ups := range methods { @@ -403,10 +413,10 @@ func (u *UpstreamsRegistry) GetUpstreamsHealth() (*UpstreamsHealth, error) { for nw, methods := range nwMethods { for method, score := range methods { if _, ok := upstreamScores[upsId]; !ok { - upstreamScores[upsId] = make(map[string]map[string]int) + upstreamScores[upsId] = make(map[string]map[string]float64) } if _, ok := upstreamScores[upsId][nw]; !ok { - upstreamScores[upsId][nw] = make(map[string]int) + upstreamScores[upsId][nw] = make(map[string]float64) } upstreamScores[upsId][nw][method] = score } diff --git a/upstream/registry_test.go b/upstream/registry_test.go index bd5f453f..e2193ca8 100644 --- a/upstream/registry_test.go +++ b/upstream/registry_test.go @@ -11,6 +11,7 @@ import ( "github.com/erpc/erpc/health" "github.com/erpc/erpc/vendors" "github.com/rs/zerolog" + "github.com/rs/zerolog/log" "github.com/stretchr/testify/assert" ) @@ -119,8 +120,8 @@ func TestUpstreamsRegistry(t *testing.T) { method := "eth_call" _, _ = registry.GetSortedUpstreams(networkID, method) - simulateRequests(metricsTracker, networkID, "upstream-a", method, 100, 0) - simulateRequests(metricsTracker, networkID, "upstream-b", method, 200, 0) + simulateRequests(metricsTracker, networkID, "upstream-a", method, 1000, 0) + simulateRequests(metricsTracker, networkID, "upstream-b", method, 20000, 0) simulateRequests(metricsTracker, networkID, "upstream-c", method, 10, 0) expectedOrder := []string{"upstream-c", "upstream-a", "upstream-b"} @@ -198,29 +199,165 @@ func TestUpstreamsRegistry(t *testing.T) { expectedOrderMethod2Phase2 := []string{"upstream-a", "upstream-c", "upstream-b"} checkUpstreamScoreOrder(t, registry, networkID, method2, expectedOrderMethod2Phase2) }) +} - t.Run("CorrectOrderForLatencyVsReliability", func(t *testing.T) { - largerWindowSize := 6 * time.Second - registry, metricsTracker := createTestRegistry(projectID, &logger, largerWindowSize) +func TestUpstreamScoring(t *testing.T) { + projectID := "test-project" + networkID := "evm:123" + method := "eth_call" - method := "eth_call" - _, _ = registry.GetSortedUpstreams(networkID, method) + type upstreamMetrics struct { + id string + latency float64 + successRate float64 + requestCount int + } - // Simulate upstream A: 500ms latency, 20% failure rate - simulateRequestsWithLatency(metricsTracker, networkID, "upstream-a", method, 80, 0.5) - simulateFailedRequests(metricsTracker, networkID, "upstream-a", method, 20) + scenarios := []struct { + name string + windowSize time.Duration + upstreamConfig []upstreamMetrics + expectedOrder []string + }{ + { + name: "MixedLatencyAndFailureRate", + windowSize: 6 * time.Second, + upstreamConfig: []upstreamMetrics{ + {"upstream-a", 0.5, 0.8, 100}, + {"upstream-b", 1.0, 0.99, 100}, + {"upstream-c", 0.75, 0.9, 100}, + }, + expectedOrder: []string{"upstream-b", "upstream-a", "upstream-c"}, + }, + { + name: "ExtremeFailureRate", + windowSize: 6 * time.Second, + upstreamConfig: []upstreamMetrics{ + {"upstream-a", 1, 0.05, 100}, + {"upstream-b", 1, 0.1, 100}, + {"upstream-c", 1, 0.01, 100}, + }, + expectedOrder: []string{"upstream-b", "upstream-a", "upstream-c"}, + }, + } - // Simulate upstream B: 1000ms latency, 1% failure rate - simulateRequestsWithLatency(metricsTracker, networkID, "upstream-b", method, 99, 1.0) - simulateFailedRequests(metricsTracker, networkID, "upstream-b", method, 1) + for _, scenario := range scenarios { + t.Run(scenario.name, func(t *testing.T) { + registry, metricsTracker := createTestRegistry(projectID, &log.Logger, scenario.windowSize) + _, _ = registry.GetSortedUpstreams(networkID, method) - // Simulate upstream C: 750ms latency, 10% failure rate - simulateRequestsWithLatency(metricsTracker, networkID, "upstream-c", method, 90, 0.75) - simulateFailedRequests(metricsTracker, networkID, "upstream-c", method, 10) + for _, upstream := range scenario.upstreamConfig { + successfulRequests := int(float64(upstream.requestCount) * upstream.successRate) + failedRequests := upstream.requestCount - successfulRequests - expectedOrder := []string{"upstream-b", "upstream-c", "upstream-a"} - checkUpstreamScoreOrder(t, registry, networkID, method, expectedOrder) - }) + simulateRequestsWithLatency(metricsTracker, networkID, upstream.id, method, successfulRequests, upstream.latency) + simulateFailedRequests(metricsTracker, networkID, upstream.id, method, failedRequests) + } + + checkUpstreamScoreOrder(t, registry, networkID, method, scenario.expectedOrder) + }) + } +} + +func TestCalculateScoreDynamicScenarios(t *testing.T) { + registry := &UpstreamsRegistry{ + scoreRefreshInterval: time.Second, + logger: &log.Logger, + } + + type upstreamMetrics struct { + totalRequests float64 + p90Latency float64 + errorRate float64 + throttledRate float64 + } + + type percentRange struct { + min float64 + max float64 + } + + type testScenario struct { + name string + upstreams []upstreamMetrics + expectedPercents []percentRange + } + + scenarios := []testScenario{ + { + name: "Two upstreams with significant difference", + upstreams: []upstreamMetrics{ + {1, 0.1, 0.01, 0.02}, + {0.8, 0.8, 0.4, 0.1}, + }, + expectedPercents: []percentRange{ + {0.65, 0.75}, + {0.25, 0.35}, + }, + }, + { + name: "Three upstreams with varying performance", + upstreams: []upstreamMetrics{ + {1, 0.2, 0.02, 0.01}, + {0.7, 0.5, 0.1, 0.05}, + {0.3, 1.0, 0.3, 0.2}, + }, + expectedPercents: []percentRange{ + {0.40, 0.55}, + {0.30, 0.40}, + {0.10, 0.30}, + }, + }, + { + name: "Four upstreams with similar performance", + upstreams: []upstreamMetrics{ + {0.9, 0.3, 0.05, 0.03}, + {0.8, 0.4, 0.06, 0.04}, + {1.0, 0.2, 0.04, 0.02}, + {0.7, 0.5, 0.07, 0.05}, + }, + expectedPercents: []percentRange{ + {0.20, 0.30}, + {0.20, 0.30}, + {0.25, 0.35}, + {0.15, 0.25}, + }, + }, + { + name: "Two upstreams with extreme differences", + upstreams: []upstreamMetrics{ + {1.0, 0.05, 0.001, 0.001}, + {1.0, 1.0, 0.5, 0.5}, + }, + expectedPercents: []percentRange{ + {0.80, 1.00}, + {0.00, 0.2}, + }, + }, + } + + for _, scenario := range scenarios { + t.Run(scenario.name, func(t *testing.T) { + scores := make([]float64, len(scenario.upstreams)) + totalScore := 0.0 + + for i, ups := range scenario.upstreams { + score := registry.calculateScore(ups.totalRequests, ups.p90Latency, ups.errorRate, ups.throttledRate) + scores[i] = float64(score) + totalScore += float64(score) + } + + for i, score := range scores { + percent := score / totalScore + t.Logf("Upstream %d: Score: %f, Percent: %f", i+1, score, percent) + + assert.GreaterOrEqual(t, percent, scenario.expectedPercents[i].min, + "Upstream %d percent should be greater than or equal to %f", i+1, scenario.expectedPercents[i].min) + assert.LessOrEqual(t, percent, scenario.expectedPercents[i].max, + "Upstream %d percent should be less than or equal to %f", i+1, scenario.expectedPercents[i].max) + } + }) + } } func createTestRegistry(projectID string, logger *zerolog.Logger, windowSize time.Duration) (*UpstreamsRegistry, *health.Tracker) { diff --git a/upstream/upstream.go b/upstream/upstream.go index e5f30105..39e2a4b8 100644 --- a/upstream/upstream.go +++ b/upstream/upstream.go @@ -178,12 +178,12 @@ func (u *Upstream) prepareRequest(normalizedReq *common.NormalizedRequest) error } // Forward is used during lifecycle of a proxied request, it uses writers and readers for better performance -func (u *Upstream) Forward(ctx context.Context, req *common.NormalizedRequest) (*common.NormalizedResponse, bool, error) { +func (u *Upstream) Forward(ctx context.Context, req *common.NormalizedRequest) (*common.NormalizedResponse, error) { startTime := time.Now() cfg := u.Config() if reason, skip := u.shouldSkip(req); skip { - return nil, true, common.NewErrUpstreamRequestSkipped(reason, cfg.Id, req) + return nil, common.NewErrUpstreamRequestSkipped(reason, cfg.Id) } clientType := u.Client.GetType() @@ -196,19 +196,16 @@ func (u *Upstream) Forward(ctx context.Context, req *common.NormalizedRequest) ( var errLimiters error limitersBudget, errLimiters = u.rateLimitersRegistry.GetBudget(cfg.RateLimitBudget) if errLimiters != nil { - return nil, false, errLimiters + return nil, errLimiters } } netId := req.NetworkId() method, err := req.Method() if err != nil { - return nil, false, common.NewErrUpstreamRequest( + return nil, common.NewErrUpstreamRequest( err, - u.ProjectId, - netId, cfg.Id, - method, time.Since(startTime), 0, 0, @@ -230,7 +227,7 @@ func (u *Upstream) Forward(ctx context.Context, req *common.NormalizedRequest) ( cfg.Id, method, ) - return nil, true, common.NewErrUpstreamRateLimitRuleExceeded( + return nil, common.NewErrUpstreamRateLimitRuleExceeded( cfg.Id, cfg.RateLimitBudget, fmt.Sprintf("%+v", rule.Config), @@ -248,7 +245,7 @@ func (u *Upstream) Forward(ctx context.Context, req *common.NormalizedRequest) ( req.SetLastUpstream(u) err = u.prepareRequest(req) if err != nil { - return nil, false, err + return nil, err } // @@ -265,7 +262,7 @@ func (u *Upstream) Forward(ctx context.Context, req *common.NormalizedRequest) ( ClientTypePimlicoHttpJsonRpc: jsonRpcClient, okClient := u.Client.(HttpJsonRpcClient) if !okClient { - return nil, false, common.NewErrJsonRpcExceptionInternal( + return nil, common.NewErrJsonRpcExceptionInternal( 0, common.JsonRpcErrorServerSideException, fmt.Sprintf("failed to initialize client for upstream %s", cfg.Id), @@ -319,10 +316,7 @@ func (u *Upstream) Forward(ctx context.Context, req *common.NormalizedRequest) ( if exec != nil { return nil, common.NewErrUpstreamRequest( errCall, - u.ProjectId, - netId, cfg.Id, - method, time.Since(startTime), exec.Attempts(), exec.Retries(), @@ -331,10 +325,7 @@ func (u *Upstream) Forward(ctx context.Context, req *common.NormalizedRequest) ( } else { return nil, common.NewErrUpstreamRequest( errCall, - u.ProjectId, - netId, cfg.Id, - method, time.Since(startTime), 1, 0, @@ -343,7 +334,7 @@ func (u *Upstream) Forward(ctx context.Context, req *common.NormalizedRequest) ( } } else { if resp.IsResultEmptyish() { - health.MetricUpstreamEmptyResponseTotal.WithLabelValues(u.ProjectId, cfg.Id, netId, method).Inc() + health.MetricUpstreamEmptyResponseTotal.WithLabelValues(u.ProjectId, netId, cfg.Id, method).Inc() } } @@ -361,16 +352,15 @@ func (u *Upstream) Forward(ctx context.Context, req *common.NormalizedRequest) ( }) if execErr != nil { - return nil, false, TranslateFailsafeError(execErr) + return nil, TranslateFailsafeError(u.config.Id, method, execErr) } - return resp, false, nil + return resp, nil } else { - r, e := tryForward(ctx, nil) - return r, false, e + return tryForward(ctx, nil) } default: - return nil, false, common.NewErrUpstreamClientInitialization( + return nil, common.NewErrUpstreamClientInitialization( fmt.Errorf("unsupported client type during forward: %s", clientType), cfg.Id, ) @@ -383,7 +373,7 @@ func (u *Upstream) Executor() failsafe.Executor[*common.NormalizedResponse] { func (u *Upstream) EvmGetChainId(ctx context.Context) (string, error) { pr := common.NewNormalizedRequest([]byte(`{"jsonrpc":"2.0","id":75412,"method":"eth_chainId","params":[]}`)) - resp, _, err := u.Forward(ctx, pr) + resp, err := u.Forward(ctx, pr) if err != nil { return "", err } diff --git a/util/redact.go b/util/redact.go index ee1ed460..440fb924 100644 --- a/util/redact.go +++ b/util/redact.go @@ -17,17 +17,17 @@ func RedactEndpoint(endpoint string) string { parsedURL, err := url.Parse(endpoint) if err != nil || parsedURL.Scheme == "" || parsedURL.Host == "" { // If parsing fails or the URL is incomplete, return just the hash - return "redacted=" + hash + return "redacted=" + hash[:5] } // Construct the redacted endpoint var redactedEndpoint string if parsedURL.Scheme == "http" || parsedURL.Scheme == "https" || parsedURL.Scheme == "ws" || parsedURL.Scheme == "wss" { - redactedEndpoint = parsedURL.Scheme + "://" + parsedURL.Host + "#redacted=" + hash + redactedEndpoint = parsedURL.Scheme + "://" + parsedURL.Host + "#redacted=" + hash[:5] } else if strings.HasSuffix(parsedURL.Scheme, "envio") { redactedEndpoint = parsedURL.Scheme + "://" + parsedURL.Host } else { - redactedEndpoint = parsedURL.Scheme + "#redacted=" + hash + redactedEndpoint = parsedURL.Scheme + "#redacted=" + hash[:5] } return redactedEndpoint