Skip to content

Commit

Permalink
Merge pull request #855 from forta-network/kisel/forta-1609-accept-js…
Browse files Browse the repository at this point in the history
…on-rpc-cache-requests-from-bots

JSON-RPC local cache for v2 bots optimizes a few popular json-rpc requests for 8 protocols: `eth_getBlockNumber`, `eth_getBlockByNumber`, `eth_getLogs` (for a specific block), `trace_block`. Blocks data arrives with a delay of 10-60.
  • Loading branch information
dkeysil authored Apr 2, 2024
2 parents f1cd717 + 42e6876 commit d913ff8
Show file tree
Hide file tree
Showing 18 changed files with 1,012 additions and 7 deletions.
134 changes: 134 additions & 0 deletions clients/blocksdata/r2_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package blocksdata

import (
"bytes"
"compress/gzip"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"time"

backoff "github.com/cenkalti/backoff/v4"
"github.com/forta-network/forta-core-go/protocol"
"github.com/forta-network/forta-core-go/utils/httpclient"
"google.golang.org/protobuf/proto"
)

const (
minBackoff = 1 * time.Second
maxBackoff = 10 * time.Second
maxElapsedTime = 5 * time.Minute
)

type blocksDataClient struct {
dispatcherURL *url.URL
}

func NewBlocksDataClient(dispatcherURL string) *blocksDataClient {
u, _ := url.Parse(dispatcherURL)

return &blocksDataClient{
dispatcherURL: u,
}
}

type PresignedURLItem struct {
Bucket int64 `json:"bucket"`
PresignedURL string `json:"presignedURL"`
ExpiresAt int64 `json:"expiresAt"`
}

func (c *blocksDataClient) GetBlocksData(bucket int64) (_ *protocol.BlocksData, err error) {
dispatcherUrl, err := url.JoinPath(c.dispatcherURL.String(), fmt.Sprintf("%d", bucket))
if err != nil {
return nil, err
}

bo := backoff.NewExponentialBackOff()
bo.InitialInterval = minBackoff
bo.MaxInterval = maxBackoff
bo.MaxElapsedTime = maxElapsedTime

var item PresignedURLItem

err = backoff.Retry(func() error {
resp, err := httpclient.Default.Get(dispatcherUrl)
if err != nil {
return err
}

defer resp.Body.Close()

b, err := io.ReadAll(resp.Body)
if err != nil {
return err
}

if resp.StatusCode == http.StatusForbidden {
return backoff.Permanent(fmt.Errorf("forbidden"))
}

if resp.StatusCode == http.StatusNotFound && bytes.Contains(b, []byte("too old")) {
return fmt.Errorf("%s", b)
}

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("unexpected status code: %d, body: %s", resp.StatusCode, b)
}

err = json.Unmarshal(b, &item)
if err != nil {
return err
}

if item.ExpiresAt < time.Now().Unix() {
return backoff.Permanent(fmt.Errorf("presigned URL expired"))
}

return nil
}, bo)

if err != nil {
return nil, err
}

var blocks protocol.BlocksData

err = backoff.Retry(func() error {
resp, err := httpclient.Default.Get(item.PresignedURL)
if err != nil {
return err
}

defer resp.Body.Close()

if resp.StatusCode != 200 {
return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}

gzipReader, err := gzip.NewReader(resp.Body)
if err != nil {
return err
}

b, err := io.ReadAll(gzipReader)
if err != nil {
return err
}

err = proto.Unmarshal(b, &blocks)
if err != nil {
return backoff.Permanent(err)
}

return nil
}, bo)

if err != nil {
return nil, err
}

return &blocks, nil
}
5 changes: 5 additions & 0 deletions clients/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/events"
"github.com/forta-network/forta-core-go/domain"
"github.com/forta-network/forta-core-go/protocol"
"github.com/forta-network/forta-node/clients/docker"
"github.com/forta-network/forta-node/config"
"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -68,3 +69,7 @@ type IPAuthenticator interface {
FindContainerNameFromRemoteAddr(ctx context.Context, hostPort string) (string, error)
FindAgentByContainerName(containerName string) (*config.AgentConfig, error)
}

type BlocksDataClient interface {
GetBlocksData(bucket int64) (*protocol.BlocksData, error)
}
39 changes: 39 additions & 0 deletions clients/mocks/mock_clients.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 23 additions & 0 deletions cmd/json-rpc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,52 @@ import (
"github.com/forta-network/forta-node/config"
"github.com/forta-network/forta-node/healthutils"
"github.com/forta-network/forta-node/services"
"github.com/forta-network/forta-node/services/components/registry"
jrp "github.com/forta-network/forta-node/services/json-rpc"
jrpcache "github.com/forta-network/forta-node/services/json-rpc/cache"
)

func initJsonRpcProxy(ctx context.Context, cfg config.Config) (*jrp.JsonRpcProxy, error) {
return jrp.NewJsonRpcProxy(ctx, cfg)
}

func initJsonRpcCache(ctx context.Context, cfg config.Config, botRegistry registry.BotRegistry) (*jrpcache.JsonRpcCache, error) {
return jrpcache.NewJsonRpcCache(ctx, cfg.JsonRpcCache, botRegistry)
}

func initServices(ctx context.Context, cfg config.Config) ([]services.Service, error) {
// can't dial localhost - need to dial host gateway from container
cfg.Scan.JsonRpc.Url = utils.ConvertToDockerHostURL(cfg.Scan.JsonRpc.Url)
cfg.JsonRpcProxy.JsonRpc.Url = utils.ConvertToDockerHostURL(cfg.JsonRpcProxy.JsonRpc.Url)
cfg.Registry.JsonRpc.Url = utils.ConvertToDockerHostURL(cfg.Registry.JsonRpc.Url)

proxy, err := initJsonRpcProxy(ctx, cfg)
if err != nil {
return nil, err
}

key, err := config.LoadKeyInContainer(cfg)
if err != nil {
return nil, err
}

botRegistry, err := registry.New(cfg, key.Address)
if err != nil {
return nil, err
}

cache, err := initJsonRpcCache(ctx, cfg, botRegistry)
if err != nil {
return nil, err
}

return []services.Service{
health.NewService(
ctx, "", healthutils.DefaultHealthServerErrHandler,
health.CheckerFrom(summarizeReports, proxy),
),
proxy,
cache,
}, nil
}

Expand Down
6 changes: 6 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ type JsonRpcProxyConfig struct {
RateLimitConfig *RateLimitConfig `yaml:"rateLimit" json:"rateLimit"`
}

type JsonRpcCacheConfig struct {
DispatcherURL string `yaml:"dispatcherUrl" json:"dispatcherUrl" default:"https://dispatcher.forta.network/batch" validate:"omitempty,url"`
CacheExpirePeriodSeconds int `yaml:"cacheExpirePeriodSeconds" json:"cacheExpirePeriodSeconds" default:"300"`
}

type LogConfig struct {
Level string `yaml:"level" json:"level" default:"info" `
MaxLogSize string `yaml:"maxLogSize" json:"maxLogSize" default:"50m" `
Expand Down Expand Up @@ -227,6 +232,7 @@ type Config struct {
Registry RegistryConfig `yaml:"registry" json:"registry"`
Publish PublisherConfig `yaml:"publish" json:"publish"`
JsonRpcProxy JsonRpcProxyConfig `yaml:"jsonRpcProxy" json:"jsonRpcProxy"`
JsonRpcCache JsonRpcCacheConfig `yaml:"jsonRpcCache" json:"jsonRpcCache"`
PublicAPIProxy PublicAPIProxyConfig `yaml:"publicApiProxy" json:"publicApiProxy"`
Log LogConfig `yaml:"log" json:"log"`
ResourcesConfig ResourcesConfig `yaml:"resources" json:"resources"`
Expand Down
1 change: 1 addition & 0 deletions config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@ const (
DefaultPublicAPIProxyPort = "8535"
DefaultJSONRPCProxyPort = "8545"
DefaultBotHealthCheckPort = "8565"
DefaultBotJSONRPCCachePort = "8575"
DefaultFortaNodeBinaryPath = "/forta-node" // the path for the common binary in the container image
)
9 changes: 7 additions & 2 deletions config/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ const (
EnvReleaseInfo = "FORTA_RELEASE_INFO"

// Agent env vars
EnvJsonRpcHost = "JSON_RPC_HOST"
EnvJsonRpcPort = "JSON_RPC_PORT"
EnvJWTProviderHost = "FORTA_JWT_PROVIDER_HOST"
EnvJWTProviderPort = "FORTA_JWT_PROVIDER_PORT"
EnvPublicAPIProxyHost = "FORTA_PUBLIC_API_PROXY_HOST"
Expand All @@ -20,4 +18,11 @@ const (
EnvFortaShardID = "FORTA_SHARD_ID"
EnvFortaShardCount = "FORTA_SHARD_COUNT"
EnvFortaTokenExchangeURL = "FORTA_TOKEN_EXCHANGE_URL"

EnvJsonRpcHost = "JSON_RPC_HOST"
EnvJsonRpcPort = "JSON_RPC_PORT"
EnvCacheJsonRpcCachePort = "JSON_RPC_CACHE_PORT"
EnvCacheRequestTimeout = "JSON_RPC_CACHE_TIMEOUT"
EnvCacheRequestInterval = "JSON_RPC_CACHE_INTERVAL"
EnvCacheSupportedChains = "JSON_RPC_CACHE_SUPPORTED_CHAINS"
)
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,14 @@ require (
replace github.com/docker/docker => github.com/moby/moby v20.10.25+incompatible

require (
github.com/cenkalti/backoff/v4 v4.1.3
github.com/docker/docker v1.6.2
github.com/docker/go-connections v0.4.0
github.com/forta-network/forta-core-go v0.0.0-20240306085049-a1ac54ae90f5
github.com/forta-network/forta-core-go v0.0.0-20240401084734-5e73299ce04c
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/client_model v0.3.0
github.com/prometheus/common v0.39.0
google.golang.org/protobuf v1.28.1
)

require (
Expand All @@ -60,7 +62,6 @@ require (
github.com/blang/semver/v4 v4.0.0 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect
github.com/cenkalti/backoff v2.2.1+incompatible // indirect
github.com/cenkalti/backoff/v4 v4.1.3 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/containerd/cgroups v1.0.4 // indirect
github.com/coreos/go-systemd/v22 v22.4.0 // indirect
Expand Down Expand Up @@ -286,7 +287,6 @@ require (
golang.org/x/tools v0.2.0 // indirect
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect
google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,8 @@ github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6/go.mod h1:1i71OnUq3iUe
github.com/flynn/noise v1.0.0 h1:DlTHqmzmvcEiKj+4RYo/imoswx/4r6iBlCMfVtrMXpQ=
github.com/flynn/noise v1.0.0/go.mod h1:xbMo+0i6+IGbYdJhF31t2eR1BIU0CYc12+BNAKwUTag=
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/forta-network/forta-core-go v0.0.0-20240306085049-a1ac54ae90f5 h1:bj2OqjhoCRKUYlSKySH3kWanC77QnlWZsQrSCaw7FDg=
github.com/forta-network/forta-core-go v0.0.0-20240306085049-a1ac54ae90f5/go.mod h1:iNehCWOypwVeO8b1GKmsrEWReHTvO5qw8SsGvZsBINo=
github.com/forta-network/forta-core-go v0.0.0-20240401084734-5e73299ce04c h1:nlbe6zfWzQD/V4iEeL6ggB2qWr1SoOZQZLV5uCJIN5U=
github.com/forta-network/forta-core-go v0.0.0-20240401084734-5e73299ce04c/go.mod h1:iNehCWOypwVeO8b1GKmsrEWReHTvO5qw8SsGvZsBINo=
github.com/forta-network/go-multicall v0.0.0-20230609185354-1436386c6707 h1:f6I7K43i2m6AwHSsDxh0Mf3qFzYt8BKnabSl/zGFmh0=
github.com/forta-network/go-multicall v0.0.0-20230609185354-1436386c6707/go.mod h1:nqTUF1REklpWLZ/M5HfzqhSHNz4dPVKzJvbLziqTZpw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
Expand Down
4 changes: 4 additions & 0 deletions services/components/containers/definitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/forta-network/forta-node/clients/docker"
"github.com/forta-network/forta-node/config"
jrpcache "github.com/forta-network/forta-node/services/json-rpc/cache"
)

// Label values
Expand Down Expand Up @@ -44,6 +45,9 @@ func NewBotContainerConfig(
config.EnvFortaBotOwner: botConfig.Owner,
config.EnvFortaHealthCheckPort: config.DefaultBotHealthCheckPort,
config.EnvFortaTokenExchangeURL: tokenExchangeURL,
config.EnvCacheJsonRpcCachePort: config.DefaultBotJSONRPCCachePort,
config.EnvCacheRequestTimeout: jrpcache.BotCacheRequestTimeoutSeconds,
config.EnvCacheRequestInterval: jrpcache.BotCacheRequestIntervalSeconds,
}
if botConfig.ChainID > 0 {
env[config.EnvFortaChainID] = fmt.Sprintf("%d", botConfig.ChainID)
Expand Down
22 changes: 22 additions & 0 deletions services/components/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,18 @@ func CreateAgentMetricV2(agt config.AgentConfig, metric string, value float64, c
}
}

func CreateDetailedAgentMetricV2(agt config.AgentConfig, metric string, value float64, details string, chainID int64) *protocol.AgentMetric {
return &protocol.AgentMetric{
AgentId: agt.ID,
Timestamp: time.Now().Format(time.RFC3339),
Name: metric,
Value: value,
ShardId: agt.ShardID(),
ChainId: chainID,
Details: details,
}
}

func CreateEventMetric(t time.Time, id string, metric string, details string) *protocol.AgentMetric {
return &protocol.AgentMetric{
AgentId: id,
Expand All @@ -52,6 +64,16 @@ func CreateEventMetric(t time.Time, id string, metric string, details string) *p
}
}

func CreateSystemMetric(metric string, value float64, details string) *protocol.AgentMetric {
return &protocol.AgentMetric{
AgentId: "system",
Timestamp: time.Now().Format(time.RFC3339),
Name: metric,
Value: value,
Details: details,
}
}

func CreateAgentResourcesMetric(agt config.AgentConfig, t time.Time, metric string, value float64) *protocol.AgentMetric {
return &protocol.AgentMetric{
AgentId: agt.ID,
Expand Down
Loading

0 comments on commit d913ff8

Please sign in to comment.