Skip to content

Commit

Permalink
Merge pull request #858 from forta-network/jsonrpc-cache-improvements
Browse files Browse the repository at this point in the history
JSON-RPC Cache improvements (authorization in dispatcher, poll error metric fix)
  • Loading branch information
dkeysil authored Apr 4, 2024
2 parents d913ff8 + f54c3d1 commit a329087
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 10 deletions.
26 changes: 21 additions & 5 deletions clients/blocksdata/r2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import (
"time"

backoff "github.com/cenkalti/backoff/v4"
"github.com/ethereum/go-ethereum/accounts/keystore"
"github.com/forta-network/forta-core-go/protocol"
"github.com/forta-network/forta-core-go/security"
"github.com/forta-network/forta-core-go/utils/httpclient"
"google.golang.org/protobuf/proto"
)
Expand All @@ -24,13 +26,15 @@ const (

type blocksDataClient struct {
dispatcherURL *url.URL
key *keystore.Key
}

func NewBlocksDataClient(dispatcherURL string) *blocksDataClient {
func NewBlocksDataClient(dispatcherURL string, key *keystore.Key) *blocksDataClient {
u, _ := url.Parse(dispatcherURL)

return &blocksDataClient{
dispatcherURL: u,
key: key,
}
}

Expand All @@ -54,22 +58,34 @@ func (c *blocksDataClient) GetBlocksData(bucket int64) (_ *protocol.BlocksData,
var item PresignedURLItem

err = backoff.Retry(func() error {
resp, err := httpclient.Default.Get(dispatcherUrl)
req, err := http.NewRequest(http.MethodGet, dispatcherUrl, nil)
if err != nil {
return err
return backoff.Permanent(err)
}

defer resp.Body.Close()
scannerJwt, err := security.CreateScannerJWT(c.key, nil)
if err != nil {
return backoff.Permanent(err)
}

b, err := io.ReadAll(resp.Body)
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", scannerJwt))

resp, err := httpclient.Default.Get(dispatcherUrl)
if err != nil {
return err
}

defer resp.Body.Close()

if resp.StatusCode == http.StatusForbidden {
return backoff.Permanent(fmt.Errorf("forbidden"))
}

b, err := io.ReadAll(resp.Body)
if err != nil {
return err
}

if resp.StatusCode == http.StatusNotFound && bytes.Contains(b, []byte("too old")) {
return fmt.Errorf("%s", b)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/json-rpc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func initJsonRpcProxy(ctx context.Context, cfg config.Config) (*jrp.JsonRpcProxy
}

func initJsonRpcCache(ctx context.Context, cfg config.Config, botRegistry registry.BotRegistry) (*jrpcache.JsonRpcCache, error) {
return jrpcache.NewJsonRpcCache(ctx, cfg.JsonRpcCache, botRegistry)
return jrpcache.NewJsonRpcCache(ctx, cfg, botRegistry)
}

func initServices(ctx context.Context, cfg config.Config) ([]services.Service, error) {
Expand Down
16 changes: 12 additions & 4 deletions services/json-rpc/cache/json_rpc_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strconv"
"time"

"github.com/ethereum/go-ethereum/accounts/keystore"
"github.com/forta-network/forta-core-go/domain"
"github.com/forta-network/forta-core-go/protocol"
"github.com/forta-network/forta-core-go/utils"
Expand Down Expand Up @@ -38,6 +39,7 @@ type JsonRpcCache struct {
botAuthenticator clients.IPAuthenticator
botRegistry registry.BotRegistry
msgClient clients.MessageClient
key *keystore.Key

server *http.Server

Expand All @@ -46,15 +48,21 @@ type JsonRpcCache struct {
blocksDataClient clients.BlocksDataClient
}

func NewJsonRpcCache(ctx context.Context, cfg config.JsonRpcCacheConfig, botRegistry registry.BotRegistry) (*JsonRpcCache, error) {
func NewJsonRpcCache(ctx context.Context, cfg config.Config, botRegistry registry.BotRegistry) (*JsonRpcCache, error) {
botAuthenticator, err := clients.NewBotAuthenticator(ctx)
if err != nil {
return nil, err
}

key, err := config.LoadKeyInContainer(cfg)
if err != nil {
return nil, err
}

return &JsonRpcCache{
ctx: ctx,
cfg: cfg,
key: key,
cfg: cfg.JsonRpcCache,
botAuthenticator: botAuthenticator,
botRegistry: botRegistry,
msgClient: messaging.NewClient("json-rpc-cache", fmt.Sprintf("%s:%s", config.DockerNatsContainerName, config.DefaultNatsPort)),
Expand All @@ -73,7 +81,7 @@ func (c *JsonRpcCache) Start() error {
Handler: r,
}

c.blocksDataClient = blocksdata.NewBlocksDataClient(c.cfg.DispatcherURL)
c.blocksDataClient = blocksdata.NewBlocksDataClient(c.cfg.DispatcherURL, c.key)

utils.GoListenAndServe(c.server)

Expand Down Expand Up @@ -221,7 +229,7 @@ func (c *JsonRpcCache) pollBlocksData() {
blocksData, err := c.blocksDataClient.GetBlocksData(b)
if err != nil {
c.msgClient.PublishProto(messaging.SubjectMetricAgent,
metrics.CreateEventMetric(time.Now(), "system", domain.MetricJSONRPCCachePollError, err.Error()))
metrics.CreateSystemMetric(domain.MetricJSONRPCCachePollError, 1, err.Error()))
log.WithError(err).Errorf("Failed to get BlocksData from dispatcher. bucket: %d", b)
return
}
Expand Down

0 comments on commit a329087

Please sign in to comment.