Skip to content

Commit

Permalink
Merge pull request #861 from forta-network/kisel/forta-1603-delete-an…
Browse files Browse the repository at this point in the history
…d-stop-storing-batch-receipts-on-s3

Remove receipts from the publisher
  • Loading branch information
dkeysil authored Apr 23, 2024
2 parents c34c465 + 52ec537 commit cdfee8a
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 87 deletions.
12 changes: 4 additions & 8 deletions clients/alertapi/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type client struct {
apiUrl string
}

func (c *client) post(path string, body interface{}, headers map[string]string, target interface{}) error {
func (c *client) post(path string, body interface{}, headers map[string]string) error {
jsonVal, err := json.Marshal(body)
if err != nil {
return err
Expand Down Expand Up @@ -47,20 +47,16 @@ func (c *client) post(path string, body interface{}, headers map[string]string,
}).Error("alert api error")
return fmt.Errorf("%d error: %s", resp.StatusCode, string(b))
}
return json.Unmarshal(b, target)
return nil
}

func (c *client) PostBatch(batch *domain.AlertBatchRequest, token string) (*domain.AlertBatchResponse, error) {
func (c *client) PostBatch(batch *domain.AlertBatchRequest, token string) error {
path := fmt.Sprintf("/batch/%s", batch.Ref)
headers := map[string]string{
"content-type": "application/json",
"Authorization": fmt.Sprintf("Bearer %s", token),
}
var resp domain.AlertBatchResponse
if err := c.post(path, batch, headers, &resp); err != nil {
return nil, err
}
return &resp, nil
return c.post(path, batch, headers)
}

func NewClient(apiUrl string) *client {
Expand Down
2 changes: 1 addition & 1 deletion clients/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type MessageClient interface {

// AlertAPIClient calls an http api on the analyzer to store alerts
type AlertAPIClient interface {
PostBatch(batch *domain.AlertBatchRequest, token string) (*domain.AlertBatchResponse, error)
PostBatch(batch *domain.AlertBatchRequest, token string) error
}

type IPAuthenticator interface {
Expand Down
7 changes: 3 additions & 4 deletions clients/mocks/mock_clients.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ require (
github.com/cenkalti/backoff/v4 v4.1.3
github.com/docker/docker v1.6.2
github.com/docker/go-connections v0.4.0
github.com/forta-network/forta-core-go v0.0.0-20240410112425-80ab9a87259c
github.com/forta-network/forta-core-go v0.0.0-20240423071831-edccde967e5b
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/client_model v0.3.0
github.com/prometheus/common v0.39.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,8 @@ github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6/go.mod h1:1i71OnUq3iUe
github.com/flynn/noise v1.0.0 h1:DlTHqmzmvcEiKj+4RYo/imoswx/4r6iBlCMfVtrMXpQ=
github.com/flynn/noise v1.0.0/go.mod h1:xbMo+0i6+IGbYdJhF31t2eR1BIU0CYc12+BNAKwUTag=
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/forta-network/forta-core-go v0.0.0-20240410112425-80ab9a87259c h1:f+lF42ZkBK6PnAY6yQXXn46xzRxk+e/alsito9AtByA=
github.com/forta-network/forta-core-go v0.0.0-20240410112425-80ab9a87259c/go.mod h1:iNehCWOypwVeO8b1GKmsrEWReHTvO5qw8SsGvZsBINo=
github.com/forta-network/forta-core-go v0.0.0-20240423071831-edccde967e5b h1:YoX67i29YPnXGXXvQYGm9oyFdC2MY10zMJqiZHxUd84=
github.com/forta-network/forta-core-go v0.0.0-20240423071831-edccde967e5b/go.mod h1:iNehCWOypwVeO8b1GKmsrEWReHTvO5qw8SsGvZsBINo=
github.com/forta-network/go-multicall v0.0.0-20230609185354-1436386c6707 h1:f6I7K43i2m6AwHSsDxh0Mf3qFzYt8BKnabSl/zGFmh0=
github.com/forta-network/go-multicall v0.0.0-20230609185354-1436386c6707/go.mod h1:nqTUF1REklpWLZ/M5HfzqhSHNz4dPVKzJvbLziqTZpw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
Expand Down
75 changes: 4 additions & 71 deletions services/publisher/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,10 @@ import (
"github.com/forta-network/forta-node/clients"
"github.com/forta-network/forta-node/clients/alertapi"
"github.com/forta-network/forta-node/clients/messaging"
"github.com/forta-network/forta-node/clients/storagegrpc"
"github.com/forta-network/forta-node/config"
"github.com/forta-network/forta-node/services/components/estimation"
"github.com/forta-network/forta-node/services/components/metrics"
"github.com/forta-network/forta-node/services/publisher/webhooklog"
"github.com/forta-network/forta-node/services/storage"
"github.com/forta-network/forta-node/store"
ipfsapi "github.com/ipfs/go-ipfs-api"
log "github.com/sirupsen/logrus"
Expand All @@ -59,16 +57,14 @@ type Publisher struct {
cfg PublisherConfig
contract AlertsContract
ipfs ipfs.Client
storage protocol.StorageClient
metricsAggregator *AgentMetricsAggregator
messageClient clients.MessageClient
alertClient clients.AlertAPIClient
localAlertClient LocalAlertClient

lifecycleMetrics metrics.Lifecycle

batchRefStore store.StringStore
lastReceiptStore store.StringStore
batchRefStore store.StringStore

blockTimeline estimation.BlockTimeline

Expand Down Expand Up @@ -109,9 +105,6 @@ type Publisher struct {
// LocalAlertClient sends the local alerts.
type LocalAlertClient webhook.AlertWebhookClient

// StorageClient stores content.
type StorageClient protocol.StorageClient

// EthClient interacts with the Ethereum API.
type EthClient interface {
BlockNumber(ctx context.Context) (uint64, error)
Expand Down Expand Up @@ -276,12 +269,6 @@ func (pub *Publisher) publishNextBatch(batch *protocol.AlertBatch) (published bo
},
)

var lastReceipt string
lr, err := pub.lastReceiptStore.Get()
if err == nil {
lastReceipt = lr
}

signedBatchSummary, err := security.SignBatchSummary(
pub.cfg.Key, &protocol.BatchSummary{
Batch: cid,
Expand All @@ -290,7 +277,6 @@ func (pub *Publisher) publishNextBatch(batch *protocol.AlertBatch) (published bo
BlockEnd: batch.BlockEnd,
AlertCount: batch.AlertCount,
ScannerVersion: batch.ScannerVersion,
PreviousReceipt: lastReceipt,
LatestBlockInput: batch.LatestBlockInput,
Timestamp: time.Now().UTC().Format(time.RFC3339),
},
Expand All @@ -302,7 +288,6 @@ func (pub *Publisher) publishNextBatch(batch *protocol.AlertBatch) (published bo

scannerAddr := pub.cfg.Key.Address.Hex()

var resp *domain.AlertBatchResponse
for i := 0; i < defaultBatchSendRetryTimes; i++ {
var scannerJwt string
scannerJwt, err = security.CreateScannerJWT(
Expand All @@ -314,7 +299,7 @@ func (pub *Publisher) publishNextBatch(batch *protocol.AlertBatch) (published bo
logger.WithError(err).Error("failed to create batch jwt")
return false, err
}
resp, err = pub.alertClient.PostBatch(&domain.AlertBatchRequest{
err = pub.alertClient.PostBatch(&domain.AlertBatchRequest{
Scanner: scannerAddr,
ChainID: int64(batch.ChainId),
BlockStart: int64(batch.BlockStart),
Expand All @@ -340,47 +325,6 @@ func (pub *Publisher) publishNextBatch(batch *protocol.AlertBatch) (published bo
return false, fmt.Errorf("failed to send the alert tx: %v", err)
}

if resp.SignedReceipt != nil {
// store off receipt id
if err := pub.lastReceiptStore.Put(resp.ReceiptID); err != nil {
logger.WithError(err).Error("failed to marshal receipt")
return true, err
}
logger = logger.WithFields(
log.Fields{
"receiptId": resp.ReceiptID,
},
)

// if for some reason receipt can't marshal, log and move on
b, err := json.Marshal(resp.SignedReceipt)
if err != nil {
logger.WithError(err).Error("failed to marshal receipt (not saving receipt)")
return true, nil
}
logger = logger.WithFields(log.Fields{
"receipt": string(b),
})

if pub.cfg.Config.AdvancedConfig.IPFSExperiment {
ctx, cancel := context.WithTimeout(pub.ctx, time.Second*10)
defer cancel()
putResp, err := pub.storage.Put(ctx, &protocol.PutRequest{
User: scannerAddr,
Kind: storage.KindBatchReceipt,
Bytes: b,
})
if err != nil {
logger.WithError(err).Warn("failed to store batch receipt")
} else {
logger = logger.WithFields(log.Fields{
"storedReceiptRef": putResp.ContentId,
"storedReceiptPath": putResp.ContentPath,
})
}
}
}

logger.Info("alert batch")

return true, nil
Expand Down Expand Up @@ -864,15 +808,7 @@ func NewPublisher(ctx context.Context, blockTimeline estimation.BlockTimeline, c

apiClient := alertapi.NewClient(cfg.Publish.APIURL)

var storageClient protocol.StorageClient
if !cfg.LocalModeConfig.Enable && cfg.AdvancedConfig.IPFSExperiment {
storageClient, err = storagegrpc.DialContext(ctx, fmt.Sprintf("%s:%s", config.DockerStorageContainerName, config.DefaultStoragePort))
if err != nil {
return nil, fmt.Errorf("failed to dial the storage client: %v", err)
}
}

return initPublisher(ctx, msgClient, lifecycleMetrics, apiClient, storageClient, blockTimeline,
return initPublisher(ctx, msgClient, lifecycleMetrics, apiClient, blockTimeline,
PublisherConfig{
ChainID: cfg.ChainID,
Key: key,
Expand All @@ -886,8 +822,7 @@ func NewPublisher(ctx context.Context, blockTimeline estimation.BlockTimeline, c
func initPublisher(
ctx context.Context, mc clients.MessageClient,
lifecycleMetrics metrics.Lifecycle, alertClient clients.AlertAPIClient,
storageClient StorageClient, blockTimeline estimation.BlockTimeline,
cfg PublisherConfig,
blockTimeline estimation.BlockTimeline, cfg PublisherConfig,
) (*Publisher, error) {
ipfsClient, err := ipfs.NewClient(fmt.Sprintf("http://%s:5001", config.DockerIpfsContainerName))
if err != nil {
Expand Down Expand Up @@ -929,14 +864,12 @@ func initPublisher(
ctx: ctx,
cfg: cfg,
ipfs: ipfsClient,
storage: storageClient,
metricsAggregator: NewMetricsAggregator(time.Duration(*cfg.PublisherConfig.Batch.MetricsBucketIntervalSeconds)*time.Second, int64(cfg.ChainID)),
messageClient: mc,
alertClient: alertClient,
localAlertClient: localAlertClient,
lifecycleMetrics: lifecycleMetrics,
batchRefStore: store.NewFileStringStore(path.Join(cfg.Config.FortaDir, ".last-batch")),
lastReceiptStore: store.NewFileStringStore(path.Join(cfg.Config.FortaDir, ".last-receipt")),
blockTimeline: blockTimeline,

skipEmpty: cfg.PublisherConfig.Batch.SkipEmpty,
Expand Down

0 comments on commit cdfee8a

Please sign in to comment.