From cbe28973cc3cc17d0b942fb5221c6d762c576d4d Mon Sep 17 00:00:00 2001 From: philip-morlier Date: Tue, 6 Feb 2024 12:05:49 -0800 Subject: [PATCH 01/19] Added heavy check for connectivity and gap range This involved moving the other startup check, certaintyCheck, along with the new check from above into their own utility function at the bottom of main.go. --- api/block.go | 7 ++--- main.go | 72 +++++++++++++++++++++++++++++++++++++--------------- 2 files changed, 56 insertions(+), 23 deletions(-) diff --git a/api/block.go b/api/block.go index 6338f7f..490ce12 100644 --- a/api/block.go +++ b/api/block.go @@ -45,16 +45,17 @@ func (api *BlockAPI) ChainId(ctx context.Context) hexutil.Uint64 { return hexutil.Uint64(api.cfg.Chainid) } -func (api *BlockAPI) BlockNumber(ctx context.Context) (hexutil.Uint64, error) { +func (api *BlockAPI) BlockNumber(ctx context.Context) hexutil.Uint64 { log.Debug("eth_blockNumber served from flume light by default") hitMeter.Mark(1) blockNo, err := getLatestBlock(ctx, api.db) if err != nil { - return 0, err + log.Error("Error returned from getLatestBlock", "err", err) + return 0 } - return hexutil.Uint64(blockNo), nil + return hexutil.Uint64(blockNo) } var ( diff --git a/main.go b/main.go index e75de1f..38a043f 100644 --- a/main.go +++ b/main.go @@ -15,10 +15,12 @@ import ( log "github.com/inconshreveable/log15" rpcTransports "github.com/openrelayxyz/cardinal-rpc/transports" + "github.com/openrelayxyz/cardinal-types/hexutil" "github.com/openrelayxyz/cardinal-types/metrics" "github.com/openrelayxyz/cardinal-types/metrics/publishers" "github.com/openrelayxyz/cardinal-flume/api" "github.com/openrelayxyz/cardinal-flume/config" + "github.com/openrelayxyz/cardinal-flume/heavy" "github.com/openrelayxyz/cardinal-flume/indexer" "github.com/openrelayxyz/cardinal-flume/migrations" "github.com/openrelayxyz/cardinal-flume/plugins" @@ -33,6 +35,7 @@ func main() { lightSeed := flag.Int64("lightSeed", 0, "set light service starting block") blockRollback := flag.Int64("block.rollback", 0, "Rollback to block N before syncing. If N < 0, rolls back from head before starting or syncing.") runCertaintyCheck := flag.Bool("certaintyCheck", false, "run database uncertainty check") + runHeavyCheck := flag.Bool("heavyCheck", false, "run heavy instance connectivity and overlap check") flag.CommandLine.Parse(os.Args[1:]) @@ -101,26 +104,7 @@ func main() { } _, hasBlocks := cfg.Databases["blocks"] if hasBlocks { - if *runCertaintyCheck { - var highestNumber uint64 - err := logsdb.QueryRowContext(context.Background(), "SELECT max(number) FROM blocks.blocks;").Scan(&highestNumber) - if err != nil { - log.Error("Error running certainty check on blocks database, highestNumber", "err", err) - } - rows, err := logsdb.QueryContext(context.Background(), "SELECT number + 1 FROM blocks.blocks WHERE number + 1 NOT IN (SELECT number FROM blocks.blocks);") - if err != nil { - log.Error("Error running certainty check on blocks database, gaps query", "err", err) - } - defer rows.Close() - if rows.Next() { - var number uint64 - rows.Scan(&number) - if number != highestNumber +1 { - log.Error("gaps found in blocks database", "missing blocks beginning at block number", number) - os.Exit(1) - } - } - } + runStartupChecks(*runCertaintyCheck, *runHeavyCheck, logsdb, cfg) log.Info("has blocks", "blocks", cfg.Databases["blocks"]) } _, hasTx := cfg.Databases["transactions"] @@ -349,3 +333,51 @@ func main() { metrics.Clear() time.Sleep(time.Second) } + +func runStartupChecks(certainty, heavyCheck bool, database *sql.DB, config *config.Config) { + var earliestBlock, latestBlock uint64 + if err := database.QueryRowContext(context.Background(), "SELECT min(number), max(number) FROM blocks.blocks;").Scan(&earliestBlock, &latestBlock); err != nil { + log.Error("Error aquiring highest block from blocks db for startup checks", "err", err) + } + + if certainty { + rows, err := database.QueryContext(context.Background(), "SELECT number + 1 FROM blocks.blocks WHERE number + 1 NOT IN (SELECT number FROM blocks.blocks);") + if err != nil { + log.Error("Error running certainty check on blocks database, gaps query", "err", err) + } + defer rows.Close() + if rows.Next() { + var number uint64 + rows.Scan(&number) + if number != latestBlock +1 { + log.Error("gaps found in blocks database", "missing blocks beginning at block number", number) + os.Exit(1) + } + } + } + + if heavyCheck { + if config.HeavyServer == "" { + log.Error("No heavy server found") + os.Exit(1) + } + heavyLatest, err := heavy.CallHeavy[string](context.Background(), config.HeavyServer, "eth_blockNumber") + if err != nil { + log.Error(fmt.Sprintf("Error calling heavy service at %v, unable to run test, exiting", config.HeavyServer), "err", err) + os.Exit(1) + } + hl, err := hexutil.DecodeUint64(*heavyLatest) + if err != nil { + log.Error("Error decoding response from call to heavy server eth_blockNumber, unable to run test, exiting", "err", err) + os.Exit(1) + } + if earliestBlock > hl { + log.Error("Gap found between local and heavy database", "local earliest", earliestBlock, "heavy latest", heavyLatest) + os.Exit(1) + } + if hl - earliestBlock < 128 { + log.Error("Overlap between local and heavy databases is too small", "local earliest", earliestBlock, "heavy latest", heavyLatest) + os.Exit(1) + } + } +} From c9e78dc7b9166ed1ff96299a0df54d5221ee72e7 Mon Sep 17 00:00:00 2001 From: philip-morlier Date: Tue, 6 Feb 2024 12:28:52 -0800 Subject: [PATCH 02/19] Added call heavy hit/miss metric to blocks methods --- api/block.go | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/api/block.go b/api/block.go index 490ce12..53378da 100644 --- a/api/block.go +++ b/api/block.go @@ -61,6 +61,9 @@ func (api *BlockAPI) BlockNumber(ctx context.Context) hexutil.Uint64 { var ( gbbnHitMeter = metrics.NewMinorMeter("/flume/gbbn/hit") gbbnMissMeter = metrics.NewMinorMeter("/flume/gbbn/miss") + + heavyBlockHit = metrics.NewMinorMeter("/flume/hbc/hit") + heavyBlockMiss = metrics.NewMinorMeter("/flume/hbc/miss") ) func (api *BlockAPI) GetBlockByNumber(ctx context.Context, blockNumber rpc.BlockNumber, includeTxns bool) (*map[string]interface{}, error) { @@ -74,6 +77,10 @@ func (api *BlockAPI) GetBlockByNumber(ctx context.Context, blockNumber rpc.Block if err != nil { return nil, err } + if repsonseShell == nil { + heavyBlockMiss.Mark(1) + } + heavyBlockHit.Mark(1) return responseShell, nil } @@ -137,6 +144,10 @@ func (api *BlockAPI) GetBlockByHash(ctx context.Context, blockHash types.Hash, i if err != nil { return nil, err } + if repsonseShell == nil { + heavyBlockMiss.Mark(1) + } + heavyBlockHit.Mark(1) return responseShell, nil } @@ -193,6 +204,10 @@ func (api *BlockAPI) GetBlockTransactionCountByNumber(ctx context.Context, block if err != nil { return nil, err } + if repsonseShell == nil { + heavyBlockMiss.Mark(1) + } + heavyBlockHit.Mark(1) return count, nil } @@ -235,6 +250,10 @@ func (api *BlockAPI) GetBlockTransactionCountByHash(ctx context.Context, blockHa if err != nil { return nil, err } + if repsonseShell == nil { + heavyBlockMiss.Mark(1) + } + heavyBlockHit.Mark(1) return count, nil } @@ -277,6 +296,10 @@ func (api *BlockAPI) GetUncleCountByBlockNumber(ctx context.Context, blockNumber if err != nil { return nil, err } + if repsonseShell == nil { + heavyBlockMiss.Mark(1) + } + heavyBlockHit.Mark(1) return count, nil } @@ -322,6 +345,10 @@ func (api *BlockAPI) GetUncleCountByBlockHash(ctx context.Context, blockHash typ if err != nil { return nil, err } + if repsonseShell == nil { + heavyBlockMiss.Mark(1) + } + heavyBlockHit.Mark(1) return count, nil } @@ -364,6 +391,10 @@ func (api *BlockAPI) GetBlockReceipts(ctx context.Context, input BlockNumberOrHa if err != nil { return nil, err } + if repsonseShell == nil { + heavyBlockMiss.Mark(1) + } + heavyBlockHit.Mark(1) return *rt, nil } From 117fe00936ca221481356930df2e5d18968b8cdb Mon Sep 17 00:00:00 2001 From: philip-morlier Date: Tue, 6 Feb 2024 12:39:22 -0800 Subject: [PATCH 03/19] Added heavy hit/miss metric to transaction methods cleaned up typos from previous commit --- api/block.go | 24 ++++++++++++++---------- api/transaction.go | 23 +++++++++++++++++++++++ 2 files changed, 37 insertions(+), 10 deletions(-) diff --git a/api/block.go b/api/block.go index 53378da..b411b25 100644 --- a/api/block.go +++ b/api/block.go @@ -19,6 +19,9 @@ import ( var ( hitMeter = metrics.NewMajorMeter("/flume/hit") missMeter = metrics.NewMajorMeter("/flume/miss") + + heavyBlockHit = metrics.NewMinorMeter("/flume/hbc/hit") + heavyBlockMiss = metrics.NewMinorMeter("/flume/hbc/miss") ) type BlockAPI struct { @@ -61,9 +64,6 @@ func (api *BlockAPI) BlockNumber(ctx context.Context) hexutil.Uint64 { var ( gbbnHitMeter = metrics.NewMinorMeter("/flume/gbbn/hit") gbbnMissMeter = metrics.NewMinorMeter("/flume/gbbn/miss") - - heavyBlockHit = metrics.NewMinorMeter("/flume/hbc/hit") - heavyBlockMiss = metrics.NewMinorMeter("/flume/hbc/miss") ) func (api *BlockAPI) GetBlockByNumber(ctx context.Context, blockNumber rpc.BlockNumber, includeTxns bool) (*map[string]interface{}, error) { @@ -77,7 +77,7 @@ func (api *BlockAPI) GetBlockByNumber(ctx context.Context, blockNumber rpc.Block if err != nil { return nil, err } - if repsonseShell == nil { + if responseShell == nil { heavyBlockMiss.Mark(1) } heavyBlockHit.Mark(1) @@ -144,7 +144,7 @@ func (api *BlockAPI) GetBlockByHash(ctx context.Context, blockHash types.Hash, i if err != nil { return nil, err } - if repsonseShell == nil { + if responseShell == nil { heavyBlockMiss.Mark(1) } heavyBlockHit.Mark(1) @@ -204,7 +204,7 @@ func (api *BlockAPI) GetBlockTransactionCountByNumber(ctx context.Context, block if err != nil { return nil, err } - if repsonseShell == nil { + if count == nil { heavyBlockMiss.Mark(1) } heavyBlockHit.Mark(1) @@ -250,7 +250,7 @@ func (api *BlockAPI) GetBlockTransactionCountByHash(ctx context.Context, blockHa if err != nil { return nil, err } - if repsonseShell == nil { + if count == nil { heavyBlockMiss.Mark(1) } heavyBlockHit.Mark(1) @@ -296,7 +296,7 @@ func (api *BlockAPI) GetUncleCountByBlockNumber(ctx context.Context, blockNumber if err != nil { return nil, err } - if repsonseShell == nil { + if count == nil { heavyBlockMiss.Mark(1) } heavyBlockHit.Mark(1) @@ -345,7 +345,7 @@ func (api *BlockAPI) GetUncleCountByBlockHash(ctx context.Context, blockHash typ if err != nil { return nil, err } - if repsonseShell == nil { + if count == nil { heavyBlockMiss.Mark(1) } heavyBlockHit.Mark(1) @@ -391,7 +391,7 @@ func (api *BlockAPI) GetBlockReceipts(ctx context.Context, input BlockNumberOrHa if err != nil { return nil, err } - if repsonseShell == nil { + if rt == nil { heavyBlockMiss.Mark(1) } heavyBlockHit.Mark(1) @@ -440,6 +440,10 @@ func (api *BlockAPI) GetBlockReceipts(ctx context.Context, input BlockNumberOrHa if err != nil { return nil, err } + if rt == nil { + heavyBlockMiss.Mark(1) + } + heavyBlockHit.Mark(1) return *rt, nil } diff --git a/api/transaction.go b/api/transaction.go index bab8dcf..7038217 100644 --- a/api/transaction.go +++ b/api/transaction.go @@ -40,6 +40,9 @@ func NewTransactionAPI(db *sql.DB, network uint64, pl *plugins.PluginLoader, cfg var ( gtbhHitMeter = metrics.NewMinorMeter("/flume/gtbh/hit") gtbhMissMeter = metrics.NewMinorMeter("/flume/gtbh/miss") + + heavyTxHit = metrics.NewMinorMeter("/flume/htc/hit") + heavyTxMiss = metrics.NewMinorMeter("/flume/htc/miss") ) func (api *TransactionAPI) GetTransactionByHash(ctx context.Context, txHash types.Hash) (*map[string]interface{}, error) { @@ -52,6 +55,10 @@ func (api *TransactionAPI) GetTransactionByHash(ctx context.Context, txHash type if err != nil { return nil, err } + if responseShell == nil { + heavyTxMiss.Mark(1) + } + heavyTxHit.Mark(1) return responseShell, nil } @@ -110,6 +117,10 @@ func (api *TransactionAPI) GetTransactionByBlockHashAndIndex(ctx context.Context if err != nil { return nil, err } + if responseShell == nil { + heavyTxMiss.Mark(1) + } + heavyTxHit.Mark(1) return responseShell, nil } @@ -144,6 +155,10 @@ func (api *TransactionAPI) GetTransactionByBlockNumberAndIndex(ctx context.Conte if err != nil { return nil, err } + if responseShell == nil { + heavyTxMiss.Mark(1) + } + heavyTxHit.Mark(1) return responseShell, nil } @@ -185,6 +200,10 @@ func (api *TransactionAPI) GetTransactionReceipt(ctx context.Context, txHash typ if err != nil { return nil, err } + if responseShell == nil { + heavyTxMiss.Mark(1) + } + heavyTxHit.Mark(1) return responseShell, nil } @@ -252,6 +271,10 @@ func (api *TransactionAPI) GetTransactionCount(ctx context.Context, addr common. if err != nil { return nil, err } + if count == nil { + heavyTxMiss.Mark(1) + } + heavyTxHit.Mark(1) return count, nil } From cb812262cde01bad20fe8b9a4fa98eed2e65442e Mon Sep 17 00:00:00 2001 From: philip-morlier Date: Wed, 7 Feb 2024 13:26:07 -0800 Subject: [PATCH 04/19] Added discrete heavy call and parallelization of calls across flume instances in flume namespce api --- api/flume.go | 533 +++++++++++++++++++++++++++++++++++++++---------- heavy/heavy.go | 13 ++ 2 files changed, 435 insertions(+), 111 deletions(-) diff --git a/api/flume.go b/api/flume.go index 5a4efb3..19054e0 100644 --- a/api/flume.go +++ b/api/flume.go @@ -65,92 +65,182 @@ func (api *FlumeAPI) HeavyClientVersion(ctx context.Context) (string, error) { return name, nil } +func exhaustChannels[T any](ch chan T, errChan chan error) { + go func() { + select { + case <- ch: + case <- errChan: + } + }() +} + -func (api *FlumeAPI) GetTransactionsBySender(ctx context.Context, address common.Address, offset *int) (*paginator[map[string]interface{}], error) { +func (api *FlumeAPI) GetTransactionsBySender(ctx *rpc.CallContext, address common.Address, offset *int) (*paginator[map[string]interface{}], error) { + + heavyResult := make(chan *paginator[map[string]interface{}]) + errChan := make(chan error) if len(api.cfg.HeavyServer) > 0 { log.Debug("flume_getTransactionsBySender sent to flume heavy by default") missMeter.Mark(1) - tx, err := heavy.CallHeavy[*paginator[map[string]interface{}]](ctx, api.cfg.HeavyServer, "flume_getTransactionsBySender", address, offset) - if err != nil { - return nil, err - } - return *tx, nil + go func() { + tx, err := heavy.CallHeavyDiscrete[*paginator[map[string]interface{}]](ctx.Context(), api.cfg.HeavyServer, api.cfg.EarliestBlock, "flume_getTransactionsBySender", address, offset) + if err != nil { + log.Error("Error processing request in flume_getTransactionsBySender", "err", err) + errChan <- err + } + heavyResult <- *tx + }() + } else { + close(heavyResult) } if offset == nil { offset = new(int) } - ctxs, err := getFlumeTransactions(ctx, api.db, *offset, 1000, api.network, "sender = ?", trimPrefix(address.Bytes())) + + var ctxs []map[string]interface{} + var err error + if ctx.Latest > 0 { + ctxs, err = getFlumeTransactions(ctx.Context(), api.db, *offset, 1000, api.network, "sender = ? AND block < ?", trimPrefix(address.Bytes()), ctx.Latest) + } else { + ctxs, err = getFlumeTransactions(ctx.Context(), api.db, *offset, 1000, api.network, "sender = ?", trimPrefix(address.Bytes())) + } if err != nil { - log.Error("Error getting txs", "err", err.Error()) + exhaustChannels[*paginator[map[string]interface{}]](heavyResult, errChan) + log.Error("Error getting txs, getTransactionsBySender", "err", err) return nil, err } - txs, err := getPendingTransactions(ctx, api.db, api.mempool, *offset, 1000, api.network, "sender = ?", trimPrefix(address.Bytes())) + + txs, err := getPendingTransactions(ctx.Context(), api.db, api.mempool, *offset, 1000, api.network, "sender = ?", trimPrefix(address.Bytes())) if err != nil { - log.Error("Error getting pending txs", "err", err.Error()) + exhaustChannels[*paginator[map[string]interface{}]](heavyResult, errChan) + log.Error("Error getting pending txs, getTransactionsBySender", "err", err) return nil, err } + ctxs = append(ctxs, txs...) - + result := paginator[map[string]interface{}]{Items: ctxs} if len(ctxs) >= 1000 { result.Token = *offset + len(ctxs) } + + select { + case hr, ok := <- heavyResult: + if ok { + result.Items = append(hr.Items, result.Items...) + if result.Token == nil { + result.Token = hr.Token + } + } + case err := <- errChan: + return nil, err + } + return &result, nil } -func (api *FlumeAPI) GetTransactionReceiptsBySender(ctx context.Context, address common.Address, offset *int) (*paginator[map[string]interface{}], error) { +func (api *FlumeAPI) GetTransactionReceiptsBySender(ctx *rpc.CallContext, address common.Address, offset *int) (*paginator[map[string]interface{}], error) { + heavyResult := make(chan *paginator[map[string]interface{}]) + errChan := make(chan error) + if len(api.cfg.HeavyServer) > 0 { log.Debug("flume_getTransactionReceiptsBySender sent to flume heavy by default") missMeter.Mark(1) - rt, err := heavy.CallHeavy[*paginator[map[string]interface{}]](ctx, api.cfg.HeavyServer, "flume_getTransactionReceiptsBySender", address, offset) - if err != nil { - return nil, err - } - return *rt, nil + go func() { + rt, err := heavy.CallHeavyDiscrete[*paginator[map[string]interface{}]](ctx.Context(), api.cfg.HeavyServer, api.cfg.EarliestBlock, "flume_getTransactionReceiptsBySender", address, offset) + if err != nil { + log.Error("Error processing request in flume_getTransactionReceiptsBySender", "err", err) + errChan <- err + } + heavyResult <- *rt + }() + } else { + close(heavyResult) } if offset == nil { offset = new(int) } - receipts, err := getFlumeTransactionReceipts(ctx, api.db, *offset, 1000, api.network, "sender = ?", trimPrefix(address.Bytes())) + + var receipts []map[string]interface{} + var err error + + if ctx.Latest > 0 { + receipts, err = getFlumeTransactionReceipts(ctx.Context(), api.db, *offset, 1000, api.network, "sender = ? AND block < ?", trimPrefix(address.Bytes()), ctx.Latest) + } else { + receipts, err = getFlumeTransactionReceipts(ctx.Context(), api.db, *offset, 1000, api.network, "sender = ?", trimPrefix(address.Bytes())) + } + if err != nil { - log.Error("Error getting receipts", "err", err.Error()) + exhaustChannels[*paginator[map[string]interface{}]](heavyResult, errChan) + log.Error("Error getting receipts, getTransactionReceiptsBySender", "err", err.Error()) return nil, err } + result := paginator[map[string]interface{}]{Items: receipts} - if len(receipts) == 1000 { + if len(receipts) >= 1000 { result.Token = *offset + len(receipts) } + select { + case hr, ok := <- heavyResult: + if ok { + result.Items = append(hr.Items, result.Items...) + if result.Token == nil { + result.Token = hr.Token + } + } + case err := <- errChan: + return nil, err + } + return &result, nil } -func (api *FlumeAPI) GetTransactionsByRecipient(ctx context.Context, address common.Address, offset *int) (*paginator[map[string]interface{}], error) { +func (api *FlumeAPI) GetTransactionsByRecipient(ctx *rpc.CallContext, address common.Address, offset *int) (*paginator[map[string]interface{}], error) { + + heavyResult := make(chan *paginator[map[string]interface{}]) + errChan := make(chan error) if len(api.cfg.HeavyServer) > 0 { log.Debug("flume_getTransactionsByRecipient sent to flume heavy by default") missMeter.Mark(1) - tx, err := heavy.CallHeavy[*paginator[map[string]interface{}]](ctx, api.cfg.HeavyServer, "flume_getTransactionsByRecipient", address, offset) - if err != nil { - return nil, err - } - return *tx, nil + go func() { + tx, err := heavy.CallHeavyDiscrete[*paginator[map[string]interface{}]](ctx.Context(), api.cfg.HeavyServer, api.cfg.EarliestBlock, "flume_getTransactionsByRecipient", address, offset) + if err != nil { + log.Error("Error processing request in flume_getTransactionsByRecipient", "err", err) + errChan <- err + } + heavyResult <- *tx + }() + } else { + close(heavyResult) } if offset == nil { offset = new(int) } - ctxs, err := getFlumeTransactions(ctx, api.db, *offset, 1000, api.network, "recipient = ?", trimPrefix(address.Bytes())) + + var ctxs []map[string]interface{} + var err error + if ctx.Latest > 0 { + ctxs, err = getFlumeTransactions(ctx.Context(), api.db, *offset, 1000, api.network, "recipient = ? AND block < ?", trimPrefix(address.Bytes()), ctx.Latest) + } else { + ctxs, err = getFlumeTransactions(ctx.Context(), api.db, *offset, 1000, api.network, "recipient = ?", trimPrefix(address.Bytes())) + } if err != nil { - log.Error("Error getting txs", "err", err.Error()) + exhaustChannels[*paginator[map[string]interface{}]](heavyResult, errChan) + log.Error("Error getting txs in GetTransactionsByRecipient", "err", err.Error()) return nil, err } - txs, err := getPendingTransactions(ctx, api.db, api.mempool, *offset, 1000, api.network, "recipient = ?", trimPrefix(address.Bytes())) + + txs, err := getPendingTransactions(ctx.Context(), api.db, api.mempool, *offset, 1000, api.network, "recipient = ?", trimPrefix(address.Bytes())) if err != nil { - log.Error("Error getting pending txs", "err", err.Error()) + exhaustChannels[*paginator[map[string]interface{}]](heavyResult, errChan) + log.Error("Error getting pending txs, GetTransactionsByRecipient", "err", err.Error()) return nil, err } ctxs = append(ctxs, txs...) @@ -159,96 +249,202 @@ func (api *FlumeAPI) GetTransactionsByRecipient(ctx context.Context, address com if len(ctxs) >= 1000 { result.Token = *offset + len(ctxs) } + + select { + case hr, ok := <- heavyResult: + if ok { + result.Items = append(hr.Items, result.Items...) + if result.Token == nil { + result.Token = hr.Token + } + } + case err := <- errChan: + return nil, err + } + return &result, nil } -func (api *FlumeAPI) GetTransactionReceiptsByRecipient(ctx context.Context, address common.Address, offset *int) (*paginator[map[string]interface{}], error) { +func (api *FlumeAPI) GetTransactionReceiptsByRecipient(ctx *rpc.CallContext, address common.Address, offset *int) (*paginator[map[string]interface{}], error) { + + + heavyResult := make(chan *paginator[map[string]interface{}]) + errChan := make(chan error) if len(api.cfg.HeavyServer) > 0 { log.Debug("flume_getTransactionReceiptsByRecipient sent to flume heavy by default") missMeter.Mark(1) - tx, err := heavy.CallHeavy[*paginator[map[string]interface{}]](ctx, api.cfg.HeavyServer, "flume_getTransactionReceiptsByRecipient", address, offset) - if err != nil { - return nil, err - } - return *tx, nil + go func() { + rt, err := heavy.CallHeavyDiscrete[*paginator[map[string]interface{}]](ctx.Context(), api.cfg.HeavyServer, api.cfg.EarliestBlock, "flume_getTransactionReceiptsByRecipient", address, offset) + if err != nil { + log.Error("Error processing request in flume_getTransactionReceiptsByRecipient", "err", err) + errChan <- err + } + heavyResult <- *rt + }() + } else { + close(heavyResult) } if offset == nil { offset = new(int) } - receipts, err := getFlumeTransactionReceipts(ctx, api.db, *offset, 1000, api.network, "recipient = ?", trimPrefix(address.Bytes())) + + var receipts []map[string]interface{} + var err error + + if ctx.Latest > 0 { + receipts, err = getFlumeTransactionReceipts(ctx.Context(), api.db, *offset, 1000, api.network, "recipient = ? AND block < ?", trimPrefix(address.Bytes()), ctx.Latest) + } else { + receipts, err = getFlumeTransactionReceipts(ctx.Context(), api.db, *offset, 1000, api.network, "recipient = ?", trimPrefix(address.Bytes())) + } + if err != nil { - log.Error("Error getting receipts", "err", err.Error()) + exhaustChannels[*paginator[map[string]interface{}]](heavyResult, errChan) + log.Error("Error getting receipts in getTransactionReceiptsByRecipient", "err", err.Error()) return nil, err } + result := paginator[map[string]interface{}]{Items: receipts} if len(receipts) == 1000 { result.Token = *offset + len(receipts) } + + select { + case hr, ok := <- heavyResult: + if ok { + result.Items = append(hr.Items, result.Items...) + if result.Token == nil { + result.Token = hr.Token + } + } + case err := <- errChan: + return nil, err + } + return &result, nil } -func (api *FlumeAPI) GetTransactionsByParticipant(ctx context.Context, address common.Address, offset *int) (*paginator[map[string]interface{}], error) { +func (api *FlumeAPI) GetTransactionsByParticipant(ctx *rpc.CallContext, address common.Address, offset *int) (*paginator[map[string]interface{}], error) { + + heavyResult := make(chan *paginator[map[string]interface{}]) + errChan := make(chan error) if len(api.cfg.HeavyServer) > 0 { log.Debug("flume_getTransactionByParticipant sent to flume heavy by default") missMeter.Mark(1) - tx, err := heavy.CallHeavy[*paginator[map[string]interface{}]](ctx, api.cfg.HeavyServer, "flume_getTransactionsByParticipant", address, offset) - if err != nil { - return nil, err - } - return *tx, nil + go func() { + tx, err := heavy.CallHeavyDiscrete[*paginator[map[string]interface{}]](ctx.Context(), api.cfg.HeavyServer, api.cfg.EarliestBlock, "flume_getTransactionsByParticipant", address, offset) + if err != nil { + log.Error("Error processing request in flume_getTransactionByParticipant", "err", err) + errChan <- err + } + heavyResult <- *tx + }() + } else { + close(heavyResult) } if offset == nil { offset = new(int) } - ctxs, err := getFlumeTransactions(ctx, api.db, *offset, 1000, api.network, "sender = ? OR recipient = ?", trimPrefix(address.Bytes()), trimPrefix(address.Bytes())) + + var ctxs []map[string]interface{} + var err error + + if ctx.Latest > 0 { + ctxs, err = getFlumeTransactions(ctx.Context(), api.db, *offset, 1000, api.network, "(sender = ? OR recipient = ?) AND block < ?", trimPrefix(address.Bytes()), trimPrefix(address.Bytes()), ctx.Latest) + } else { + ctxs, err = getFlumeTransactions(ctx.Context(), api.db, *offset, 1000, api.network, "sender = ? OR recipient = ?", trimPrefix(address.Bytes()), trimPrefix(address.Bytes())) + } if err != nil { - log.Error("Error getting txs", "err", err.Error()) + exhaustChannels[*paginator[map[string]interface{}]](heavyResult, errChan) + log.Error("Error getting txs in getTransactionByParticipant", "err", err.Error()) return nil, err } - txs, err := getPendingTransactions(ctx, api.db, api.mempool, *offset, 1000, api.network, "sender = ? OR recipient = ?", trimPrefix(address.Bytes()), trimPrefix(address.Bytes())) + + txs, err := getPendingTransactions(ctx.Context(), api.db, api.mempool, *offset, 1000, api.network, "sender = ? OR recipient = ?", trimPrefix(address.Bytes()), trimPrefix(address.Bytes())) if err != nil { - log.Error("Error getting pending txs", "err", err.Error()) + exhaustChannels[*paginator[map[string]interface{}]](heavyResult, errChan) + log.Error("Error getting pending txs in getTransactionByParticipant", "err", err.Error()) return nil, err } + ctxs = append(ctxs, txs...) - + result := paginator[map[string]interface{}]{Items: ctxs} if len(ctxs) >= 1000 { result.Token = *offset + len(ctxs) } + select { + case hr, ok := <- heavyResult: + if ok { + result.Items = append(hr.Items, result.Items...) + if result.Token == nil { + result.Token = hr.Token + } + } + case err := <- errChan: + return nil, err + } + return &result, nil } -func (api *FlumeAPI) GetTransactionReceiptsByParticipant(ctx context.Context, address common.Address, offset *int) (*paginator[map[string]interface{}], error) { +func (api *FlumeAPI) GetTransactionReceiptsByParticipant(ctx *rpc.CallContext, address common.Address, offset *int) (*paginator[map[string]interface{}], error) { + + heavyResult := make(chan *paginator[map[string]interface{}]) + errChan := make(chan error) if len(api.cfg.HeavyServer) > 0 { log.Debug("flume_getTransactionReceiptsByParticipant sent to flume heavy by default") missMeter.Mark(1) - rt, err := heavy.CallHeavy[*paginator[map[string]interface{}]](ctx, api.cfg.HeavyServer, "flume_getTransactionReceiptsByParticipant", address, offset) - if err != nil { - return nil, err - } - return *rt, nil + go func() { + rt, err := heavy.CallHeavy[*paginator[map[string]interface{}]](ctx.Context(), api.cfg.HeavyServer, "flume_getTransactionReceiptsByParticipant", address, offset) + if err != nil { + log.Error("Error processing request in flume_getTransactionReceiptsByParticipant", "err", err) + errChan <- err + } + heavyResult <- *rt + }() } if offset == nil { offset = new(int) } - receipts, err := getFlumeTransactionReceipts(ctx, api.db, *offset, 1000, api.network, "sender = ? OR recipient = ?", trimPrefix(address.Bytes()), trimPrefix(address.Bytes())) + + var receipts []map[string]interface{} + var err error + + if ctx.Latest > 0 { + receipts, err = getFlumeTransactionReceipts(ctx.Context(), api.db, *offset, 1000, api.network, "(sender = ? OR recipient = ?) AND block < ?", trimPrefix(address.Bytes()), trimPrefix(address.Bytes()), ctx.Latest) + } else { + receipts, err = getFlumeTransactionReceipts(ctx.Context(), api.db, *offset, 1000, api.network, "sender = ? OR recipient = ?", trimPrefix(address.Bytes()), trimPrefix(address.Bytes())) + } if err != nil { - log.Error("Error getting receipts", "err", err.Error()) + exhaustChannels[*paginator[map[string]interface{}]](heavyResult, errChan) + log.Error("Error getting receipts in getTransactionReceiptsByParticipant", "err", err.Error()) return nil, err } + result := paginator[map[string]interface{}]{Items: receipts} if len(receipts) == 1000 { result.Token = *offset + len(receipts) } + select { + case hr, ok := <- heavyResult: + if ok { + result.Items = append(hr.Items, result.Items...) + if result.Token == nil { + result.Token = hr.Token + } + } + case err := <- errChan: + return nil, err + } + return &result, nil } @@ -368,31 +564,40 @@ func (api *FlumeAPI) GetBlockByTransactionHash(ctx context.Context, txHash types return blockVal, nil } -func (api *FlumeAPI) BlockHashesWithPrefix(ctx context.Context, partialHexString string) ([]string, error) { +func (api *FlumeAPI) BlockHashesWithPrefix(ctx *rpc.CallContext, partialHexString string) ([]string, error) { if len(partialHexString) == 66 { return []string{partialHexString}, nil } + heavyResult := make(chan []string) + errChan := make(chan error) + if len(api.cfg.HeavyServer) > 0 { log.Debug("flume_blockHashesWithPrefix sent to flume heavy by default") missMeter.Mark(1) - hashes, err := heavy.CallHeavy[[]string](ctx, api.cfg.HeavyServer, "flume_blockHashesWithPrefix", partialHexString) - if err != nil { - log.Error("Error calling heavy server, flume_blockHashesWithPrefix", "err", err) - return nil, nil - } - return *hashes, nil + go func() { + hashes, err := heavy.CallHeavyDiscrete[[]string](ctx.Context(), api.cfg.HeavyServer, api.cfg.EarliestBlock, "flume_blockHashesWithPrefix", partialHexString) + if err != nil { + log.Error("Error calling heavy server, flume_blockHashesWithPrefix", "err", err) + errChan <- err + } + heavyResult <- *hashes + }() + } else { + close(heavyResult) } bytes, err := hex.DecodeString(partialHexString[2:]) if err != nil { + exhaustChannels[[]string](heavyResult, errChan) log.Error("Error decoding partial Hex String, flume_blockHashesWithPrefix", "err", err) return nil, err } zeros, err := countLeadingZeros(bytes) if err != nil { + exhaustChannels[[]string](heavyResult, errChan) log.Error("Error trimming input, flume_blockHashesWithPrefix") return nil, err } @@ -401,52 +606,84 @@ func (api *FlumeAPI) BlockHashesWithPrefix(ctx context.Context, partialHexString augmentedBytes := incrementLastByte(bytes) - statement := "SELECT hash FROM blocks.blocks WHERE hash > ? AND hash < ? AND LENGTH(hash) = ? LIMIT 20" - rows, err := api.db.QueryContext(ctx, statement, bytes, augmentedBytes, 32 - zeros) - if err != nil { + var statement string + var rows *sql.Rows + var sqlErr error + + if ctx.Latest > 0 { + statement = "SELECT hash FROM blocks.blocks WHERE hash > ? AND hash < ? AND LENGTH(hash) = ? AND block < ? LIMIT 20" + rows, sqlErr = api.db.QueryContext(ctx.Context(), statement, bytes, augmentedBytes, 32 - zeros, ctx.Latest) + } else { + statement = "SELECT hash FROM blocks.blocks WHERE hash > ? AND hash < ? AND LENGTH(hash) = ? LIMIT 20" + rows, sqlErr = api.db.QueryContext(ctx.Context(), statement, bytes, augmentedBytes, 32 - zeros) + } + if sqlErr != nil { + exhaustChannels[[]string](heavyResult, errChan) log.Error("Error returned from query in flume_blockHashWithPrefix", "err", err) return nil, nil } + defer rows.Close() var result []string + for rows.Next() { - var hashBytes []byte - err := rows.Scan(&hashBytes) - if err != nil { - log.Error("Error scanning rows flume_blockHashesWithPrefix") - return nil, err - } - result = append(result, hexutil.Encode(hashBytes)) + var hashBytes []byte + err := rows.Scan(&hashBytes) + if err != nil { + exhaustChannels[[]string](heavyResult, errChan) + log.Error("Error scanning rows flume_blockHashesWithPrefix") + return nil, err } + result = append(result, hexutil.Encode(hashBytes)) + } + + select { + case hr, ok := <- heavyResult: + if ok { + result = append(hr, result...) + } + case err := <- errChan: + return nil, err + } + return result, nil } -func (api *FlumeAPI) TransactionHashesWithPrefix(ctx context.Context, partialHexString string) ([]string, error) { +func (api *FlumeAPI) TransactionHashesWithPrefix(ctx *rpc.CallContext, partialHexString string) ([]string, error) { if len(partialHexString) == 66 { return []string{partialHexString}, nil } + heavyResult := make(chan []string) + errChan := make(chan error) + if len(api.cfg.HeavyServer) > 0 { log.Debug("flume_TransactionHashesWithPrefix sent to flume heavy by default") missMeter.Mark(1) - hashes, err := heavy.CallHeavy[[]string](ctx, api.cfg.HeavyServer, "flume_TransactionHashesWithPrefix", partialHexString) - if err != nil { - log.Error("Error calling heavy server, flume_transactionHashesWithPrefix", "err", err) - return nil, nil - } - return *hashes, nil + go func() { + hashes, err := heavy.CallHeavy[[]string](ctx.Context(), api.cfg.HeavyServer, "flume_TransactionHashesWithPrefix", partialHexString) + if err != nil { + log.Error("Error calling heavy server, flume_transactionHashesWithPrefix", "err", err) + errChan <- err + } + heavyResult <- *hashes + }() + } else { + close(heavyResult) } bytes, err := hex.DecodeString(partialHexString[2:]) if err != nil { + exhaustChannels[[]string](heavyResult, errChan) log.Error("Error decoding partial Hex String, flume_transactionHashesWithPrefix", "err", err) return nil, err } zeros, err := countLeadingZeros(bytes) if err != nil { + exhaustChannels[[]string](heavyResult, errChan) log.Error("Error trimming input, flume_transactionHashesWithPrefix") return nil, err } @@ -460,8 +697,9 @@ func (api *FlumeAPI) TransactionHashesWithPrefix(ctx context.Context, partialHex if api.mempool { mpStatement := "SELECT hash FROM mempool.transactions WHERE hash > ? AND hash < ? AND LENGTH(hash) = ? LIMIT 20" - mpRows, err := api.db.QueryContext(ctx, mpStatement, bytes, augmentedBytes, 32 - zeros) + mpRows, err := api.db.QueryContext(ctx.Context(), mpStatement, bytes, augmentedBytes, 32 - zeros) if err != nil { + exhaustChannels[[]string](heavyResult, errChan) log.Error("Error returned from mempool query in flume_transactionHashesWithPrefix", "err", err) return nil, nil } @@ -470,58 +708,88 @@ func (api *FlumeAPI) TransactionHashesWithPrefix(ctx context.Context, partialHex var hashBytes []byte err := mpRows.Scan(&hashBytes) if err != nil { + exhaustChannels[[]string](heavyResult, errChan) log.Error("Error scanning mempool rows flume_transactionHashesWithPrefix") return nil, err } result = append(result, hexutil.Encode(hashBytes)) } } - txStatement := "SELECT hash FROM transactions.transactions WHERE hash > ? AND hash < ? AND LENGTH(hash) = ? LIMIT 20" - - txRows, err := api.db.QueryContext(ctx, txStatement, bytes, augmentedBytes, 32 - zeros) - if err != nil { + + var txStatement string + var txRows *sql.Rows + var sqlErr error + + if ctx.Latest > 0 { + txStatement = "SELECT hash FROM transactions.transactions WHERE hash > ? AND hash < ? AND LENGTH(hash) = ? AND block < ?LIMIT 20" + txRows, sqlErr = api.db.QueryContext(ctx.Context(), txStatement, bytes, augmentedBytes, 32 - zeros, ctx.Latest) + } else { + txStatement = "SELECT hash FROM transactions.transactions WHERE hash > ? AND hash < ? AND LENGTH(hash) = ? LIMIT 20" + txRows, sqlErr = api.db.QueryContext(ctx.Context(), txStatement, bytes, augmentedBytes, 32 - zeros) + } + if sqlErr != nil { + exhaustChannels[[]string](heavyResult, errChan) log.Error("Error returned from transaction query in flume_transactionHashesWithPrefix", "err", err) return nil, nil } + defer txRows.Close() for txRows.Next() { - var hashBytes []byte - err := txRows.Scan(&hashBytes) - if err != nil { - log.Error("Error scanning transaction rows flume_transactionHashesWithPrefix") - return nil, err - } - result = append(result, hexutil.Encode(hashBytes)) + var hashBytes []byte + err := txRows.Scan(&hashBytes) + if err != nil { + log.Error("Error scanning transaction rows flume_transactionHashesWithPrefix") + return nil, err } + result = append(result, hexutil.Encode(hashBytes)) + } + + select { + case hr, ok := <- heavyResult: + if ok { + result = append(hr, result...) + } + case err := <- errChan: + return nil, err +} return result, nil } -func (api *FlumeAPI) AddressWithPrefix(ctx context.Context, partialHexString string) ([]string, error) { +func (api *FlumeAPI) AddressWithPrefix(ctx *rpc.CallContext, partialHexString string) ([]string, error) { if len(partialHexString) == 42 { return []string{partialHexString}, nil } + heavyResult := make(chan []string) + errChan := make(chan error) + if len(api.cfg.HeavyServer) > 0 { log.Debug("flume_addressWithPrefix sent to flume heavy by default") missMeter.Mark(1) - addresses, err := heavy.CallHeavy[[]string](ctx, api.cfg.HeavyServer, "flume_addressWithPrefix", partialHexString) - if err != nil { - log.Error("Error calling heavy server, flume_addressWithPrefix", "err", err) - return nil, nil - } - return *addresses, nil + go func() { + addresses, err := heavy.CallHeavyDiscrete[[]string](ctx.Context(), api.cfg.HeavyServer, api.cfg.EarliestBlock, "flume_addressWithPrefix", partialHexString) + if err != nil { + log.Error("Error calling heavy server, flume_addressWithPrefix", "err", err) + errChan <- err + } + heavyResult <- *addresses + }() + } else { + close(heavyResult) } bytes, err := hex.DecodeString(partialHexString[2:]) if err != nil { + exhaustChannels[[]string](heavyResult, errChan) log.Error("Error decoding partial Hex String, flume_addressWithPrefix", "err", err) return nil, err } zeros, err := countLeadingZeros(bytes) if err != nil { + exhaustChannels[[]string](heavyResult, errChan) log.Error("Error trimming input, flume_addressWithPrefix") return nil, err } @@ -530,9 +798,19 @@ func (api *FlumeAPI) AddressWithPrefix(ctx context.Context, partialHexString str augmentedBytes := incrementLastByte(bytes) - statement := "SELECT DISTINCT(address) FROM event_logs WHERE address > ? AND address < ? AND LENGTH(address) = ? LIMIT 20" - rows, err := api.db.QueryContext(ctx, statement, bytes, augmentedBytes, 20 - zeros) - if err != nil { + var statement string + var rows *sql.Rows + var sqlErr error + + if ctx.Latest > 0 { + statement = "SELECT DISTINCT(address) FROM event_logs WHERE address > ? AND address < ? AND LENGTH(address) = ? AND block < 3LIMIT 20" + rows, sqlErr = api.db.QueryContext(ctx.Context(), statement, bytes, augmentedBytes, 20 - zeros, ctx.Latest) + } else { + statement = "SELECT DISTINCT(address) FROM event_logs WHERE address > ? AND address < ? AND LENGTH(address) = ? LIMIT 20" + rows, sqlErr = api.db.QueryContext(ctx.Context(), statement, bytes, augmentedBytes, 20 - zeros) + } + if sqlErr != nil { + exhaustChannels[[]string](heavyResult, errChan) log.Error("Error returned from query in flume_addressWithPrefix", "err", err) return nil, nil } @@ -542,48 +820,81 @@ func (api *FlumeAPI) AddressWithPrefix(ctx context.Context, partialHexString str var addressBytes []byte err := rows.Scan(&addressBytes) if err != nil { + exhaustChannels[[]string](heavyResult, errChan) log.Error("Error scanning rows flume_addressWithPrefix") return nil, err } result = append(result, hexutil.Encode(addressBytes)) } - + + select { + case hr, ok := <- heavyResult: + if ok { + result = append(hr, result...) + } + case err := <- errChan: + return nil, err + } + return result, nil } -func (api *FlumeAPI) ResolvePrefix(ctx context.Context, partialHexString string) (map[string]interface{}, error) { +func (api *FlumeAPI) ResolvePrefix(ctx *rpc.CallContext, partialHexString string) (map[string][]string, error) { + + heavyResult := make(chan map[string][]string) + errChan := make(chan error) if len(api.cfg.HeavyServer) > 0 { log.Debug("flume_resolvePrefix sent to flume heavy by default") missMeter.Mark(1) - hashes, err := heavy.CallHeavy[map[string]interface{}](ctx, api.cfg.HeavyServer, "flume_resolvePrefix", partialHexString) - if err != nil { - return nil, err - } - return *hashes, nil + go func() { + hashes, err := heavy.CallHeavyDiscrete[map[string][]string](ctx.Context(), api.cfg.HeavyServer, api.cfg.EarliestBlock, "flume_resolvePrefix", partialHexString) + if err != nil { + log.Error("Error calling heavy server, flume_ResolvePrefix", "err", err) + errChan <- err + } + heavyResult <- *hashes + }() + } else { + close(heavyResult) } blockHashes, err := api.BlockHashesWithPrefix(ctx, partialHexString) if err != nil { + exhaustChannels[map[string][]string](heavyResult, errChan) log.Error("Error returned from flume_blockHashesWithPrefix, flume_resolvePrefix", "err", err) return nil, err } txHashes, err := api.TransactionHashesWithPrefix(ctx, partialHexString) if err != nil { + exhaustChannels[map[string][]string](heavyResult, errChan) log.Error("Error returned from flume_transactionHashesWithPrefix, flume_resolvePrefix", "err", err) return nil, err } addresses, err := api.AddressWithPrefix(ctx, partialHexString) if err != nil { + exhaustChannels[map[string][]string](heavyResult, errChan) log.Error("Error returned from flume_addressWithPrefix, flume_resolvePrefix", "err", err) return nil, err } - result := map[string]interface{}{ + result := map[string][]string{ "blockHashes": blockHashes, "transactionHashes": txHashes, "addresses": addresses, } + select { + case hr, ok := <- heavyResult: + if ok { + result["blockHashes"] = append(hr["blockHashes"], result["blockHashes"]...) + result["transactionHashes"] = append(hr["transactionHashes"], result["transactionHashes"]...) + result["addresses"] = append(hr["addresses"], result["addresses"]...) + } + case err := <- errChan: + return nil, err + } + + return result, nil } \ No newline at end of file diff --git a/heavy/heavy.go b/heavy/heavy.go index 5429fb1..fc80ea5 100644 --- a/heavy/heavy.go +++ b/heavy/heavy.go @@ -3,6 +3,7 @@ package heavy import ( "bytes" "context" + "strconv" "encoding/json" "io/ioutil" "net" @@ -38,7 +39,15 @@ var client = &http.Client{Transport: &http.Transport{ ExpectContinueTimeout: 1 * time.Second, }} +func CallHeavyDiscrete[T any](ctx context.Context, backendURL string, cutoffBlock uint64, method string, params ...interface{}) (*T, error) { + return callHeavy[T](ctx, backendURL, &cutoffBlock, method, params...) +} + func CallHeavy[T any](ctx context.Context, backendURL string, method string, params ...interface{}) (*T, error) { + return callHeavy[T](ctx, backendURL, nil, method, params...) +} + +func callHeavy[T any](ctx context.Context, backendURL string, cutoffBlock *uint64, method string, params ...interface{}) (*T, error) { if backendURL == "mock" { return nil, &MockError{ @@ -56,6 +65,10 @@ func CallHeavy[T any](ctx context.Context, backendURL string, method string, par request, _ := http.NewRequestWithContext(ctx, "POST", backendURL, bytes.NewReader(callBytes)) request.Header.Add("Content-Type", "application/json") + if cutoffBlock != nil && *cutoffBlock > 0 { + request.Header.Add("X-Cardinal-Latest", strconv.Itoa(int(*cutoffBlock))) + } + log.Debug("call heavy request", "method", "POST", "url", backendURL, "headers", request.Header) resp, err := client.Do(request) From e9f2602298aab943bd9746b0ba671184b49ef0e4 Mon Sep 17 00:00:00 2001 From: philip-morlier Date: Wed, 7 Feb 2024 15:08:32 -0800 Subject: [PATCH 05/19] Composition, parallelization, and trimming added to getLogs --- api/logs.go | 65 ++++++++++++++++++++++++++++++++++++++-------------- api/types.go | 4 ++-- 2 files changed, 50 insertions(+), 19 deletions(-) diff --git a/api/logs.go b/api/logs.go index 86eab85..2d86b3f 100644 --- a/api/logs.go +++ b/api/logs.go @@ -9,6 +9,7 @@ import ( "time" log "github.com/inconshreveable/log15" + "github.com/openrelayxyz/cardinal-rpc" "github.com/openrelayxyz/cardinal-types" "github.com/openrelayxyz/cardinal-types/hexutil" "github.com/openrelayxyz/cardinal-types/metrics" @@ -38,13 +39,20 @@ var ( glgMissMeter = metrics.NewMinorMeter("/flume/glg/miss") ) -func (api *LogsAPI) GetLogs(ctx context.Context, crit FilterQuery) ([]*logType, error) { +func (api *LogsAPI) trimFilterQuery(fq FilterQuery) { + if fq.BlockHash == nil { + var tb *rpc.BlockNumber + val := rpc.BlockNumber(api.cfg.EarliestBlock - 1) + tb = &val + fq.ToBlock = tb + } +} - log.Debug("filter query", "fq", crit) +func (api *LogsAPI) GetLogs(ctx context.Context, crit FilterQuery) ([]*logType, error) { latestBlock, err := getLatestBlock(ctx, api.db) if err != nil { - log.Error("Error retrieving latest block, call.ID, 500", "err", err.Error()) + log.Error("Error retrieving latest block, call.ID, 500", "err", err) return nil, err } @@ -81,15 +89,25 @@ func (api *LogsAPI) GetLogs(ctx context.Context, crit FilterQuery) ([]*logType, } } + heavyResult := make(chan []*logType,) + errChan := make(chan error) + if goHeavy && len(api.cfg.HeavyServer) > 0 { + + api.trimFilterQuery(crit) log.Debug("eth_getLogs sent to flume heavy") missMeter.Mark(1) glgMissMeter.Mark(1) - logs, err := heavy.CallHeavy[[]*logType](ctx, api.cfg.HeavyServer, "eth_getLogs", crit) - if err != nil { - return nil, err - } - return *logs, nil + go func() { + logs, err := heavy.CallHeavy[[]*logType](ctx, api.cfg.HeavyServer, "eth_getLogs", crit) + if err != nil { + log.Error("Error processing request in eth_getLogs", "err", err) + errChan <- err + } + heavyResult <- *logs + }() + } else { + close(heavyResult) } if len(api.cfg.HeavyServer) > 0 { @@ -122,6 +140,7 @@ func (api *LogsAPI) GetLogs(ctx context.Context, crit FilterQuery) ([]*logType, if len(topicsClause) > 0 { whereClause = append(whereClause, fmt.Sprintf("(%v)", strings.Join(topicsClause, " AND "))) } + query := fmt.Sprintf("SELECT address, topic0, topic1, topic2, topic3, data, block, transactionHash, transactionIndex, blockHash, logIndex FROM event_logs %v WHERE %v;", indexClause, strings.Join(whereClause, " AND ")) pluginMethods := api.pl.Lookup("AppendBorLogs", func(v interface{}) bool { _, ok := v.(func(string, string, []interface{}) (string, []interface{})) @@ -144,8 +163,9 @@ func (api *LogsAPI) GetLogs(ctx context.Context, crit FilterQuery) ([]*logType, }() rows, err := api.db.QueryContext(ctx, query, params...) if err != nil { - log.Error("Error selecting query", "query", query, "err", err.Error()) - return nil, err + exhaustChannels[[]*logType](heavyResult, errChan) + log.Error("Error selecting query", "query", query, "err", err) + return nil, fmt.Errorf("database error") } defer rows.Close() logs := sortLogs{} @@ -156,8 +176,8 @@ func (api *LogsAPI) GetLogs(ctx context.Context, crit FilterQuery) ([]*logType, var transactionIndex, logIndex uint err := rows.Scan(&address, &topic0, &topic1, &topic2, &topic3, &data, &blockNumber, &transactionHash, &transactionIndex, &blockHash, &logIndex) if err != nil { - log.Error("Error scanning", "err", err.Error()) - // handleError("database error", call.ID, 500) + exhaustChannels[[]*logType](heavyResult, errChan) + log.Error("Error scanning", "err", err) return nil, fmt.Errorf("database error") } blockNumbersInResponse[blockNumber] = struct{}{} @@ -176,8 +196,8 @@ func (api *LogsAPI) GetLogs(ctx context.Context, crit FilterQuery) ([]*logType, } input, err := decompress(data) if err != nil { - log.Error("Error decompressing data", "err", err.Error()) - // handleError("database error", call.ID, 500) + exhaustChannels[[]*logType](heavyResult, errChan) + log.Error("Error decompressing data in getLogs", "err", err) return nil, fmt.Errorf("database error") } logs = append(logs, &logType{ @@ -191,16 +211,27 @@ func (api *LogsAPI) GetLogs(ctx context.Context, crit FilterQuery) ([]*logType, Index: hexutil.Uint(logIndex), }) if len(logs) > 10000 && len(blockNumbersInResponse) > 1 { - // handleError("query returned more than 10,000 results spanning multiple blocks", call.ID, 413) + exhaustChannels[[]*logType](heavyResult, errChan) return nil, fmt.Errorf("query returned more than 10,000 results spanning multiple blocks") } } if err := rows.Err(); err != nil { - log.Error("Error scanning", "err", err.Error()) - // handleError("database error", call.ID, 500) + exhaustChannels[[]*logType](heavyResult, errChan) + log.Error("Error scanning rows getLogs", "err", err) return nil, fmt.Errorf("database error") } + sort.Sort(logs) + select { + case hr, ok := <- heavyResult: + if ok { + logs = append(hr, logs...) + } + case err := <- errChan: + return nil, err + } + + return logs, nil } diff --git a/api/types.go b/api/types.go index a54331a..af6c83b 100644 --- a/api/types.go +++ b/api/types.go @@ -231,8 +231,8 @@ type rpcTransaction struct { type FilterQuery struct { BlockHash *types.Hash // used by eth_getLogs, return logs only from block with this hash - FromBlock *rpc.BlockNumber // beginning of the queried range, nil means genesis block - ToBlock *rpc.BlockNumber // end of the range, nil means latest block + FromBlock *rpc.BlockNumber // beginning of the queried range, nil means latest block, oldest block in query + ToBlock *rpc.BlockNumber // end of the range, nil means latest block, newest block in query Addresses []common.Address // restricts matches to events created by specific contracts // The Topic list restricts matches to particular event topics. Each event has a list From eeddfe022d0c26ec7b3fcf308911d35d740c46f1 Mon Sep 17 00:00:00 2001 From: philip-morlier Date: Wed, 7 Feb 2024 21:31:00 -0800 Subject: [PATCH 06/19] Added deduping, parallelization, and result composition to flume token api --- api/flume.go | 10 ---- api/flume_tokens.go | 119 ++++++++++++++++++++++++++++++++++---------- api/utils.go | 26 ++++++++++ 3 files changed, 118 insertions(+), 37 deletions(-) diff --git a/api/flume.go b/api/flume.go index 19054e0..bc4b523 100644 --- a/api/flume.go +++ b/api/flume.go @@ -65,16 +65,6 @@ func (api *FlumeAPI) HeavyClientVersion(ctx context.Context) (string, error) { return name, nil } -func exhaustChannels[T any](ch chan T, errChan chan error) { - go func() { - select { - case <- ch: - case <- errChan: - } - }() -} - - func (api *FlumeAPI) GetTransactionsBySender(ctx *rpc.CallContext, address common.Address, offset *int) (*paginator[map[string]interface{}], error) { heavyResult := make(chan *paginator[map[string]interface{}]) diff --git a/api/flume_tokens.go b/api/flume_tokens.go index a8cf97b..21e692d 100644 --- a/api/flume_tokens.go +++ b/api/flume_tokens.go @@ -8,6 +8,7 @@ import ( log "github.com/inconshreveable/log15" "github.com/openrelayxyz/cardinal-evm/common" + "github.com/openrelayxyz/cardinal-rpc" "github.com/openrelayxyz/cardinal-types" "github.com/openrelayxyz/cardinal-flume/config" "github.com/openrelayxyz/cardinal-flume/heavy" @@ -30,30 +31,47 @@ func NewFlumeTokensAPI(db *sql.DB, network uint64, pl *plugins.PluginLoader, cfg } } -func (api *FlumeTokensAPI) Erc20ByAccount(ctx context.Context, addr common.Address, offset *int) (*paginator[common.Address], error) { +func (api *FlumeTokensAPI) Erc20ByAccount(ctx *rpc.CallContext, addr common.Address, offset *int) (*paginator[common.Address], error) { if offset == nil { offset = new(int) } + heavyResult := make(chan *paginator[common.Address]) + errChan := make(chan error) + if len(api.cfg.HeavyServer) > 0 { log.Debug("flume_erc20ByAccount sent to flume heavy by default") missMeter.Mark(1) - address, err := heavy.CallHeavy[*paginator[common.Address]](ctx, api.cfg.HeavyServer, "flume_erc20ByAccount", addr, offset) - if err != nil { - return nil, err - } - return *address, nil + go func() { + address, err := heavy.CallHeavyDiscrete[*paginator[common.Address]](ctx.Context(), api.cfg.HeavyServer, api.cfg.EarliestBlock, "flume_erc20ByAccount", addr, offset) + if err != nil { + log.Error("Error processing request in flume_erc20ByAccount", "err", err) + errChan <- err + } + heavyResult <- *address + }() + } else { + close(heavyResult) } - tctx, cancel := context.WithTimeout(ctx, 5*time.Second) + tctx, cancel := context.WithTimeout(ctx.Context(), 5*time.Second) defer cancel() topic0 := types.HexToHash("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef") - rows, err := api.db.QueryContext(tctx, `SELECT distinct(address) FROM event_logs INDEXED BY topic2_partial WHERE topic0 = ? AND topic2 = ? AND topic3 IS NULL LIMIT 1000 OFFSET ?;`, trimPrefix(topic0.Bytes()), trimPrefix(addr.Bytes()), offset) + + var rows *sql.Rows + var err error + + if ctx.Latest > 0 { + rows, err = api.db.QueryContext(tctx, `SELECT distinct(address) FROM event_logs INDEXED BY topic2_partial WHERE topic0 = ? AND topic2 = ? AND topic3 IS NULL AND block < ? LIMIT 1000 OFFSET ?;`, trimPrefix(topic0.Bytes()), trimPrefix(addr.Bytes()), ctx.Latest, offset) + } else { + rows, err = api.db.QueryContext(tctx, `SELECT distinct(address) FROM event_logs INDEXED BY topic2_partial WHERE topic0 = ? AND topic2 = ? AND topic3 IS NULL LIMIT 1000 OFFSET ?;`, trimPrefix(topic0.Bytes()), trimPrefix(addr.Bytes()), offset) + } if err != nil { - log.Error("Error getting account addresses", "err", err.Error()) - return nil, err + exhaustChannels[*paginator[common.Address]](heavyResult, errChan) + log.Error("Error getting account addresses erc20ByAccount", "err", err) + return nil, fmt.Errorf("database error") } defer rows.Close() addresses := []common.Address{} @@ -61,48 +79,80 @@ func (api *FlumeTokensAPI) Erc20ByAccount(ctx context.Context, addr common.Addre var addrBytes []byte err := rows.Scan(&addrBytes) if err != nil { - log.Error("Query Error", "err", err.Error()) - return nil, err + exhaustChannels[*paginator[common.Address]](heavyResult, errChan) + log.Error("scan error erc20ByAccount", "err", err) + return nil, fmt.Errorf("database error") } addresses = append(addresses, bytesToAddress(addrBytes)) } if err := rows.Err(); err != nil { - log.Error("Query Error", "err", err.Error()) - return nil, err + exhaustChannels[*paginator[common.Address]](heavyResult, errChan) + log.Error("Row error erc20ByAccount", "err", err) + return nil, fmt.Errorf("database error") } result := paginator[common.Address]{Items: addresses} if len(addresses) == 1000 { result.Token = *offset + len(addresses) } + + select { + case hr, ok := <- heavyResult: + if ok { + result.Items = dedup[common.Address](hr.Items, result.Items) + if result.Token == nil { + result.Token = hr.Token + } + } + case err := <- errChan: + return nil, err + } + return &result, nil } -func (api *FlumeTokensAPI) Erc20Holders(ctx context.Context, addr common.Address, offset *int) (*paginator[common.Address], error) { +func (api *FlumeTokensAPI) Erc20Holders(ctx *rpc.CallContext, addr common.Address, offset *int) (*paginator[common.Address], error) { if offset == nil { offset = new(int) } + + heavyResult := make(chan *paginator[common.Address]) + errChan := make(chan error) if len(api.cfg.HeavyServer) > 0 { log.Debug("flume_erc20Holders sent to flume heavy by default") missMeter.Mark(1) - address, err := heavy.CallHeavy[*paginator[common.Address]](ctx, api.cfg.HeavyServer, "flume_erc20Holders", addr, offset) - if err != nil { - return nil, err - } - return *address, nil + go func() { + address, err := heavy.CallHeavyDiscrete[*paginator[common.Address]](ctx.Context(), api.cfg.HeavyServer, api.cfg.EarliestBlock, "flume_erc20Holders", addr, offset) + if err != nil { + log.Error("Error processing request in flume_erc20Holders", "err", err) + errChan <- err + } + heavyResult <- *address + }() + } else { + close(heavyResult) } - tctx, cancel := context.WithTimeout(ctx, 5*time.Second) + tctx, cancel := context.WithTimeout(ctx.Context(), 5*time.Second) defer cancel() topic0 := types.HexToHash("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef") // topic0 must match ERC20, topic3 must be empty (to exclude ERC721) and topic2 is the recipient address - rows, err := api.db.QueryContext(tctx, `SELECT distinct(topic2) FROM logs.event_logs INDEXED BY address_compound WHERE topic0 = ? AND address = ? AND topic3 IS NULL LIMIT 1000 OFFSET ?;`, trimPrefix(topic0.Bytes()), trimPrefix(addr.Bytes()), offset) + + var rows *sql.Rows + var err error + + if ctx.Latest > 0 { + rows, err = api.db.QueryContext(tctx, `SELECT distinct(topic2) FROM logs.event_logs INDEXED BY address_compound WHERE topic0 = ? AND address = ? AND block < ? AND topic3 IS NULL LIMIT 1000 OFFSET ?;`, trimPrefix(topic0.Bytes()), trimPrefix(addr.Bytes()), ctx.Latest, offset) + } else { + rows, err = api.db.QueryContext(tctx, `SELECT distinct(topic2) FROM logs.event_logs INDEXED BY address_compound WHERE topic0 = ? AND address = ? AND topic3 IS NULL LIMIT 1000 OFFSET ?;`, trimPrefix(topic0.Bytes()), trimPrefix(addr.Bytes()), offset) + } if err != nil { - log.Error("Error getting account addresses", "err", err.Error()) - return nil, err + exhaustChannels[*paginator[common.Address]](heavyResult, errChan) + log.Error("Error getting account addresses erc20Holders", "err", err) + return nil, fmt.Errorf("database error") } defer rows.Close() addresses := []common.Address{} @@ -110,18 +160,33 @@ func (api *FlumeTokensAPI) Erc20Holders(ctx context.Context, addr common.Address var addrBytes []byte err := rows.Scan(&addrBytes) if err != nil { - log.Error("Query Error", "err", err.Error()) - return nil, err + exhaustChannels[*paginator[common.Address]](heavyResult, errChan) + log.Error("scan error erc20Holders", "err", err) + return nil, fmt.Errorf("database error") } addresses = append(addresses, bytesToAddress(addrBytes)) } if err := rows.Err(); err != nil { - log.Error("Query Error", "err", err.Error()) + exhaustChannels[*paginator[common.Address]](heavyResult, errChan) + log.Error("row error erc20Holders", "err", err) return nil, fmt.Errorf("database error") } result := paginator[common.Address]{Items: addresses} if len(addresses) == 1000 { result.Token = *offset + len(addresses) } + + select { + case hr, ok := <- heavyResult: + if ok { + result.Items = dedup[common.Address](hr.Items, result.Items) + if result.Token == nil { + result.Token = hr.Token + } + } + case err := <- errChan: + return nil, err + } + return &result, nil } diff --git a/api/utils.go b/api/utils.go index 3e00b31..762e641 100644 --- a/api/utils.go +++ b/api/utils.go @@ -29,6 +29,32 @@ var ( zeroInputError = errors.New("Input must contain non zero characters") ) +func dedup[T comparable](sliceA, sliceB []T) []T { + set := make(map[T]bool) + + for _, slice := range [][]T{sliceA, sliceB} { + for _, item := range slice { + set[item] = true + } + } + + unique := make([]T, 0, len(set)) + for item := range set { + unique = append(unique, item) + } + + return unique +} + +func exhaustChannels[T any](ch chan T, errChan chan error) { + go func() { + select { + case <- ch: + case <- errChan: + } + }() +} + func blockDataPresent(input interface{}, cfg *config.Config, db *sql.DB) bool { present := true switch input.(type) { From ade81bdffe8624cc51e35ee1aff2f83983e4a80a Mon Sep 17 00:00:00 2001 From: philip-morlier Date: Thu, 8 Feb 2024 13:28:00 -0800 Subject: [PATCH 07/19] Changes made in tests to enable flume archive Also api changes due to cases caught by tests --- api/block.go | 1 + api/blockAPI_test.go | 5 +---- api/flume.go | 44 ++++++++++++++++++++++++++++--------- api/flumeAPI_test.go | 22 ++++++++++++++----- api/flume_tokens.go | 8 +++++-- api/flume_tokensAPI_test.go | 5 ++--- api/heavy_test.go | 28 +++++++++++------------ api/logs.go | 4 +++- 8 files changed, 77 insertions(+), 40 deletions(-) diff --git a/api/block.go b/api/block.go index b411b25..ef546d6 100644 --- a/api/block.go +++ b/api/block.go @@ -5,6 +5,7 @@ import ( "database/sql" log "github.com/inconshreveable/log15" + "github.com/openrelayxyz/cardinal-evm/rlp" "github.com/openrelayxyz/cardinal-rpc" "github.com/openrelayxyz/cardinal-types" diff --git a/api/blockAPI_test.go b/api/blockAPI_test.go index f3f4acc..dba4b9b 100644 --- a/api/blockAPI_test.go +++ b/api/blockAPI_test.go @@ -191,10 +191,7 @@ func TestBlockNumber(t *testing.T) { pl, _ := plugins.NewPluginLoader(cfg) b := NewBlockAPI(db, 1, pl, cfg) expectedResult, _ := hexutil.DecodeUint64("0xd59f95") - test, err := b.BlockNumber(context.Background()) - if err != nil { - t.Fatalf(err.Error()) - } + test := b.BlockNumber(context.Background()) if test != hexutil.Uint64(expectedResult) { t.Fatalf("BlockNumber() result not accurate") } diff --git a/api/flume.go b/api/flume.go index bc4b523..a2d0810 100644 --- a/api/flume.go +++ b/api/flume.go @@ -79,7 +79,9 @@ func (api *FlumeAPI) GetTransactionsBySender(ctx *rpc.CallContext, address commo log.Error("Error processing request in flume_getTransactionsBySender", "err", err) errChan <- err } - heavyResult <- *tx + if tx != nil { + heavyResult <- *tx + } }() } else { close(heavyResult) @@ -145,7 +147,9 @@ func (api *FlumeAPI) GetTransactionReceiptsBySender(ctx *rpc.CallContext, addres log.Error("Error processing request in flume_getTransactionReceiptsBySender", "err", err) errChan <- err } - heavyResult <- *rt + if rt != nil { + heavyResult <- *rt + } }() } else { close(heavyResult) @@ -204,7 +208,9 @@ func (api *FlumeAPI) GetTransactionsByRecipient(ctx *rpc.CallContext, address co log.Error("Error processing request in flume_getTransactionsByRecipient", "err", err) errChan <- err } - heavyResult <- *tx + if tx != nil { + heavyResult <- *tx + } }() } else { close(heavyResult) @@ -270,7 +276,9 @@ func (api *FlumeAPI) GetTransactionReceiptsByRecipient(ctx *rpc.CallContext, add log.Error("Error processing request in flume_getTransactionReceiptsByRecipient", "err", err) errChan <- err } - heavyResult <- *rt + if rt != nil { + heavyResult <- *rt + } }() } else { close(heavyResult) @@ -329,7 +337,9 @@ func (api *FlumeAPI) GetTransactionsByParticipant(ctx *rpc.CallContext, address log.Error("Error processing request in flume_getTransactionByParticipant", "err", err) errChan <- err } - heavyResult <- *tx + if tx != nil { + heavyResult <- *tx + } }() } else { close(heavyResult) @@ -396,8 +406,14 @@ func (api *FlumeAPI) GetTransactionReceiptsByParticipant(ctx *rpc.CallContext, a log.Error("Error processing request in flume_getTransactionReceiptsByParticipant", "err", err) errChan <- err } - heavyResult <- *rt + if rt == nil { + return + } else { + heavyResult <- *rt + } }() + } else { + close(heavyResult) } if offset == nil { @@ -572,7 +588,9 @@ func (api *FlumeAPI) BlockHashesWithPrefix(ctx *rpc.CallContext, partialHexStrin log.Error("Error calling heavy server, flume_blockHashesWithPrefix", "err", err) errChan <- err } - heavyResult <- *hashes + if hashes != nil { + heavyResult <- *hashes + } }() } else { close(heavyResult) @@ -658,7 +676,9 @@ func (api *FlumeAPI) TransactionHashesWithPrefix(ctx *rpc.CallContext, partialHe log.Error("Error calling heavy server, flume_transactionHashesWithPrefix", "err", err) errChan <- err } - heavyResult <- *hashes + if hashes != nil { + heavyResult <- *hashes + } }() } else { close(heavyResult) @@ -764,7 +784,9 @@ func (api *FlumeAPI) AddressWithPrefix(ctx *rpc.CallContext, partialHexString st log.Error("Error calling heavy server, flume_addressWithPrefix", "err", err) errChan <- err } - heavyResult <- *addresses + if addresses != nil { + heavyResult <- *addresses + } }() } else { close(heavyResult) @@ -843,7 +865,9 @@ func (api *FlumeAPI) ResolvePrefix(ctx *rpc.CallContext, partialHexString string log.Error("Error calling heavy server, flume_ResolvePrefix", "err", err) errChan <- err } - heavyResult <- *hashes + if hashes != nil { + heavyResult <- *hashes + } }() } else { close(heavyResult) diff --git a/api/flumeAPI_test.go b/api/flumeAPI_test.go index 3bf8d37..f645b01 100644 --- a/api/flumeAPI_test.go +++ b/api/flumeAPI_test.go @@ -4,10 +4,13 @@ import ( "bytes" "context" "encoding/json" + // "net/http" "fmt" "testing" "os" + // log "github.com/inconshreveable/log15" + "github.com/openrelayxyz/cardinal-evm/common" "github.com/openrelayxyz/cardinal-types" "github.com/openrelayxyz/cardinal-rpc" @@ -145,7 +148,14 @@ func (ms sortTxMap) TransactionIndexes() []hexutil.Uint64 { return result } +var mockContext *rpc.CallContext = rpc.NewContext(context.Background()) + func TestFlumeAPI(t *testing.T) { + + // go func() { + // log.Info("pprof running", "port", http.ListenAndServe("localhost:6060", nil)) + // }() + cfg, err := config.LoadConfig("../testing-resources/api_test_config.yml") if err != nil { t.Fatal("Error parsing config TestFlumeAPI", "err", err.Error()) @@ -223,7 +233,7 @@ func TestFlumeAPI(t *testing.T) { t.Fatalf("sender transactions list of incorrect length expected 47 got %v", len(senderTxns)) } t.Run(fmt.Sprintf("GetTransactionsBySender"), func(t *testing.T) { - actual, _ := f.GetTransactionsBySender(context.Background(), sender, nil) + actual, _ := f.GetTransactionsBySender(mockContext, sender, nil) if len(actual.Items) != len(senderTxns) { t.Fatalf("length error getTransactionsBySender on address %v", sender) } @@ -248,7 +258,7 @@ func TestFlumeAPI(t *testing.T) { t.Fatalf("sender transactions list of incorrect length expected 47 got %v", len(senderReceipts)) } t.Run(fmt.Sprintf("GetTransactionReceiptsBySender"), func(t *testing.T) { - actual, _ := f.GetTransactionReceiptsBySender(context.Background(), sender, nil) + actual, _ := f.GetTransactionReceiptsBySender(mockContext, sender, nil) if len(actual.Items) != len(senderReceipts) { t.Fatalf("getTransactionReceiptsBySender result of incorrect length expected %v got %v", len(actual.Items), len(senderReceipts)) } @@ -277,7 +287,7 @@ func TestFlumeAPI(t *testing.T) { t.Fatalf("recipient transactions list of incorrect length expected 107 got %v", len(recipientTxns)) } t.Run(fmt.Sprintf("GetTransactionsByRecipient"), func(t *testing.T) { - actual, _ := f.GetTransactionsByRecipient(context.Background(), recipient, nil) + actual, _ := f.GetTransactionsByRecipient(mockContext, recipient, nil) if len(actual.Items) != len(recipientTxns) { t.Fatalf("getTransactionsByRecipient result of incorrect length expected %v got %v", len(actual.Items), len(recipientTxns)) } @@ -302,7 +312,7 @@ func TestFlumeAPI(t *testing.T) { t.Fatalf("recipient transactions list of incorrect length expected 107 got %v", len(recipientReceipts)) } t.Run(fmt.Sprintf("GetTransactionsReceiptsByRecipient"), func(t *testing.T) { - actual, _ := f.GetTransactionReceiptsByRecipient(context.Background(), recipient, nil) + actual, _ := f.GetTransactionReceiptsByRecipient(mockContext, recipient, nil) if len(actual.Items) != len(recipientReceipts) { t.Fatalf("getTransactionReceiptsByRecipient result of incorrect length expected %v got %v", len(actual.Items), len(recipientReceipts)) } @@ -328,7 +338,7 @@ func TestFlumeAPI(t *testing.T) { participantTxns := getParticipantTransactionList(blockObject, genericAddr, "to", "from") participant := common.HexToAddress(genericAddr) t.Run(fmt.Sprintf("GetTransactionsByParicipant"), func(t *testing.T) { - actual, _ := f.GetTransactionsByParticipant(context.Background(), participant, nil) + actual, _ := f.GetTransactionsByParticipant(mockContext, participant, nil) if len(actual.Items) != len(participantTxns) { t.Fatalf("getTransactionsByParticipant result of incorrect length expected %v got %v", len(actual.Items), len(participantTxns)) } @@ -353,7 +363,7 @@ func TestFlumeAPI(t *testing.T) { }) participantReceipts := getParticipantReceiptList(receiptObject, genericAddr, "to", "from") t.Run(fmt.Sprintf("GetTransactionsReceiptsByParticipant"), func(t *testing.T) { - actual, _ := f.GetTransactionReceiptsByParticipant(context.Background(), participant, nil) + actual, _ := f.GetTransactionReceiptsByParticipant(mockContext, participant, nil) if len(actual.Items) != len(participantReceipts) { t.Fatalf("getTransactionReceiptsByParticipant result of incorrect length expected %v got %v", len(actual.Items), len(participantReceipts)) } diff --git a/api/flume_tokens.go b/api/flume_tokens.go index 21e692d..0c89a4d 100644 --- a/api/flume_tokens.go +++ b/api/flume_tokens.go @@ -49,7 +49,9 @@ func (api *FlumeTokensAPI) Erc20ByAccount(ctx *rpc.CallContext, addr common.Addr log.Error("Error processing request in flume_erc20ByAccount", "err", err) errChan <- err } - heavyResult <- *address + if address != nil { + heavyResult <- *address + } }() } else { close(heavyResult) @@ -128,7 +130,9 @@ func (api *FlumeTokensAPI) Erc20Holders(ctx *rpc.CallContext, addr common.Addres log.Error("Error processing request in flume_erc20Holders", "err", err) errChan <- err } - heavyResult <- *address + if address != nil { + heavyResult <- *address + } }() } else { close(heavyResult) diff --git a/api/flume_tokensAPI_test.go b/api/flume_tokensAPI_test.go index 01b31d9..1134acf 100644 --- a/api/flume_tokensAPI_test.go +++ b/api/flume_tokensAPI_test.go @@ -3,7 +3,6 @@ package api import ( "bytes" "compress/gzip" - "context" "encoding/json" "fmt" "io" @@ -54,7 +53,7 @@ func TestERCMethods(t *testing.T) { address := "0xdac17f958d2ee523a2206206994597c13d831ec7" t.Run(fmt.Sprintf("Erc20Holders"), func(t *testing.T) { - actual, _ := ft.Erc20Holders(context.Background(), common.HexToAddress(address), nil) + actual, _ := ft.Erc20Holders(mockContext, common.HexToAddress(address), nil) for i, addr := range actual.Items { if addr != data[0][i] { t.Fatalf("Erc20Holders error") @@ -62,7 +61,7 @@ func TestERCMethods(t *testing.T) { } }) t.Run(fmt.Sprintf("Erc20ByAccount"), func(t *testing.T) { - actual, _ := ft.Erc20ByAccount(context.Background(), common.HexToAddress(address), nil) + actual, _ := ft.Erc20ByAccount(mockContext, common.HexToAddress(address), nil) for i, addr := range actual.Items { if addr != data[1][i] { t.Fatalf("Erc20ByAccount error") diff --git a/api/heavy_test.go b/api/heavy_test.go index 44271e5..8c0fdd7 100644 --- a/api/heavy_test.go +++ b/api/heavy_test.go @@ -228,7 +228,7 @@ func TestCallHeavy(t *testing.T) { f := NewFlumeAPI(db, 1, pl, cfg, mempool) - _, err = f.GetTransactionsBySender(context.Background(), testAddress, nil) + _, err = f.GetTransactionsBySender(mockContext, testAddress, nil) if err == nil { t.Fatal("GetTransactionsBySender did not return expected error, heavy test", "err", err.Error()) } @@ -238,11 +238,11 @@ func TestCallHeavy(t *testing.T) { if err.(*heavy.MockError).Params[0].(common.Address) != testAddress { t.Fatal("GetTransactionsBySender did not return expected parameter address, heavy test", "err", err.Error()) } - if err.(*heavy.MockError).Params[1].(*int) != nil { + if *err.(*heavy.MockError).Params[1].(*int) != 0 { t.Fatal("GetTransactionsBySender did not return expected parameter offset, heavy test", "err", err.Error()) } - _, err = f.GetTransactionReceiptsBySender(context.Background(), testAddress, nil) + _, err = f.GetTransactionReceiptsBySender(mockContext, testAddress, nil) if err == nil { t.Fatal("GetTransactionReceiptsBySender did not return expected error, heavy test", "err", err.Error()) } @@ -252,11 +252,11 @@ func TestCallHeavy(t *testing.T) { if err.(*heavy.MockError).Params[0].(common.Address) != testAddress { t.Fatal("GetTransactionReceiptsBySender did not return expected parameter address, heavy test", "err", err.Error()) } - if err.(*heavy.MockError).Params[1].(*int) != nil { + if *err.(*heavy.MockError).Params[1].(*int) != 0 { t.Fatal("GetTransactionReceiptsBySender did not return expected parameter offset, heavy test", "err", err.Error()) } - _, err = f.GetTransactionsByRecipient(context.Background(), testAddress, nil) + _, err = f.GetTransactionsByRecipient(mockContext, testAddress, nil) if err == nil { t.Fatal("GetTransactionsByRecipient did not return expected error, heavy test", "err", err.Error()) } @@ -266,11 +266,11 @@ func TestCallHeavy(t *testing.T) { if err.(*heavy.MockError).Params[0].(common.Address) != testAddress { t.Fatal("GetTransactionsByRecipient did not return expected parameter address, heavy test", "err", err.Error()) } - if err.(*heavy.MockError).Params[1].(*int) != nil { + if *err.(*heavy.MockError).Params[1].(*int) != 0 { t.Fatal("GetTransactionsByRecipient did not return expected parameter offset, heavy test", "err", err.Error()) } - _, err = f.GetTransactionReceiptsByRecipient(context.Background(), testAddress, nil) + _, err = f.GetTransactionReceiptsByRecipient(mockContext, testAddress, nil) if err == nil { t.Fatal("GetTransactionReceiptsByRecipient did not return expected error, heavy test", "err", err.Error()) } @@ -280,11 +280,11 @@ func TestCallHeavy(t *testing.T) { if err.(*heavy.MockError).Params[0].(common.Address) != testAddress { t.Fatal("GetTransactionReceiptsByRecipient did not return expected parameter address, heavy test", "err", err.Error()) } - if err.(*heavy.MockError).Params[1].(*int) != nil { + if *err.(*heavy.MockError).Params[1].(*int) != 0 { t.Fatal("GetTransactionReceiptsByRecipient did not return expected parameter offset, heavy test", "err", err.Error()) } - _, err = f.GetTransactionsByParticipant(context.Background(), testAddress, nil) + _, err = f.GetTransactionsByParticipant(mockContext, testAddress, nil) if err == nil { t.Fatal("GetTransactionsByParticipant did not return expected error, heavy test", "err", err.Error()) } @@ -294,11 +294,11 @@ func TestCallHeavy(t *testing.T) { if err.(*heavy.MockError).Params[0].(common.Address) != testAddress { t.Fatal("GetTransactionsByParticipant did not return expected parameter address, heavy test", "err", err.Error()) } - if err.(*heavy.MockError).Params[1].(*int) != nil { + if *err.(*heavy.MockError).Params[1].(*int) != 0 { t.Fatal("GetTransactionsByParticipant did not return expected parameter offset, heavy test", "err", err.Error()) } - _, err = f.GetTransactionReceiptsByParticipant(context.Background(), testAddress, nil) + _, err = f.GetTransactionReceiptsByParticipant(mockContext, testAddress, nil) if err == nil { t.Fatal("GetTransactionReceiptsByParticipant did not return expected error, heavy test", "err", err.Error()) } @@ -308,7 +308,7 @@ func TestCallHeavy(t *testing.T) { if err.(*heavy.MockError).Params[0].(common.Address) != testAddress { t.Fatal("GetTransactionReceiptsByParticipant did not return expected parameter address, heavy test", "err", err.Error()) } - if err.(*heavy.MockError).Params[1].(*int) != nil { + if *err.(*heavy.MockError).Params[1].(*int) != 0 { t.Fatal("GetTransactionReceiptsByParticipant did not return expected parameter offset, heavy test", "err", err.Error()) } @@ -339,7 +339,7 @@ func TestCallHeavy(t *testing.T) { offset = new(int) *offset = 1 - _, err = ft.Erc20ByAccount(context.Background(), testAddress, offset) + _, err = ft.Erc20ByAccount(mockContext, testAddress, offset) if err == nil { t.Fatal("Erc20ByAccount did not return expected error, heavy test", "err", err.Error()) } @@ -353,7 +353,7 @@ func TestCallHeavy(t *testing.T) { t.Fatal("Erc20ByAccount did not return expected parameter offset, heavy test", "err", err.(*heavy.MockError).Params[1]) } - _, err = ft.Erc20Holders(context.Background(), testAddress, offset) + _, err = ft.Erc20Holders(mockContext, testAddress, offset) if err == nil { t.Fatal("Erc20Holders did not return expected error, heavy test", "err", err.Error()) } diff --git a/api/logs.go b/api/logs.go index 2d86b3f..fea1bb3 100644 --- a/api/logs.go +++ b/api/logs.go @@ -104,7 +104,9 @@ func (api *LogsAPI) GetLogs(ctx context.Context, crit FilterQuery) ([]*logType, log.Error("Error processing request in eth_getLogs", "err", err) errChan <- err } - heavyResult <- *logs + if logs != nil { + heavyResult <- *logs + } }() } else { close(heavyResult) From 9d86b01420c6eafb9daf790238ac47947612d30a Mon Sep 17 00:00:00 2001 From: philip-morlier Date: Thu, 8 Feb 2024 15:51:45 -0800 Subject: [PATCH 08/19] Corrected fix to flume and flume_token namespace parallelization methods and tests. Fix addresses offset problem --- api/flume.go | 24 ++++++++++++------------ api/flume_tokens.go | 8 ++++---- api/heavy_test.go | 12 ++++++------ 3 files changed, 22 insertions(+), 22 deletions(-) diff --git a/api/flume.go b/api/flume.go index a2d0810..dd54098 100644 --- a/api/flume.go +++ b/api/flume.go @@ -73,7 +73,7 @@ func (api *FlumeAPI) GetTransactionsBySender(ctx *rpc.CallContext, address commo if len(api.cfg.HeavyServer) > 0 { log.Debug("flume_getTransactionsBySender sent to flume heavy by default") missMeter.Mark(1) - go func() { + go func(offset *int) { tx, err := heavy.CallHeavyDiscrete[*paginator[map[string]interface{}]](ctx.Context(), api.cfg.HeavyServer, api.cfg.EarliestBlock, "flume_getTransactionsBySender", address, offset) if err != nil { log.Error("Error processing request in flume_getTransactionsBySender", "err", err) @@ -82,7 +82,7 @@ func (api *FlumeAPI) GetTransactionsBySender(ctx *rpc.CallContext, address commo if tx != nil { heavyResult <- *tx } - }() + }(offset) } else { close(heavyResult) } @@ -141,7 +141,7 @@ func (api *FlumeAPI) GetTransactionReceiptsBySender(ctx *rpc.CallContext, addres if len(api.cfg.HeavyServer) > 0 { log.Debug("flume_getTransactionReceiptsBySender sent to flume heavy by default") missMeter.Mark(1) - go func() { + go func(offset *int) { rt, err := heavy.CallHeavyDiscrete[*paginator[map[string]interface{}]](ctx.Context(), api.cfg.HeavyServer, api.cfg.EarliestBlock, "flume_getTransactionReceiptsBySender", address, offset) if err != nil { log.Error("Error processing request in flume_getTransactionReceiptsBySender", "err", err) @@ -150,7 +150,7 @@ func (api *FlumeAPI) GetTransactionReceiptsBySender(ctx *rpc.CallContext, addres if rt != nil { heavyResult <- *rt } - }() + }(offset) } else { close(heavyResult) } @@ -202,7 +202,7 @@ func (api *FlumeAPI) GetTransactionsByRecipient(ctx *rpc.CallContext, address co if len(api.cfg.HeavyServer) > 0 { log.Debug("flume_getTransactionsByRecipient sent to flume heavy by default") missMeter.Mark(1) - go func() { + go func(offset *int) { tx, err := heavy.CallHeavyDiscrete[*paginator[map[string]interface{}]](ctx.Context(), api.cfg.HeavyServer, api.cfg.EarliestBlock, "flume_getTransactionsByRecipient", address, offset) if err != nil { log.Error("Error processing request in flume_getTransactionsByRecipient", "err", err) @@ -211,7 +211,7 @@ func (api *FlumeAPI) GetTransactionsByRecipient(ctx *rpc.CallContext, address co if tx != nil { heavyResult <- *tx } - }() + }(offset) } else { close(heavyResult) } @@ -270,7 +270,7 @@ func (api *FlumeAPI) GetTransactionReceiptsByRecipient(ctx *rpc.CallContext, add if len(api.cfg.HeavyServer) > 0 { log.Debug("flume_getTransactionReceiptsByRecipient sent to flume heavy by default") missMeter.Mark(1) - go func() { + go func(offset *int) { rt, err := heavy.CallHeavyDiscrete[*paginator[map[string]interface{}]](ctx.Context(), api.cfg.HeavyServer, api.cfg.EarliestBlock, "flume_getTransactionReceiptsByRecipient", address, offset) if err != nil { log.Error("Error processing request in flume_getTransactionReceiptsByRecipient", "err", err) @@ -279,7 +279,7 @@ func (api *FlumeAPI) GetTransactionReceiptsByRecipient(ctx *rpc.CallContext, add if rt != nil { heavyResult <- *rt } - }() + }(offset) } else { close(heavyResult) } @@ -331,7 +331,7 @@ func (api *FlumeAPI) GetTransactionsByParticipant(ctx *rpc.CallContext, address if len(api.cfg.HeavyServer) > 0 { log.Debug("flume_getTransactionByParticipant sent to flume heavy by default") missMeter.Mark(1) - go func() { + go func(offset *int) { tx, err := heavy.CallHeavyDiscrete[*paginator[map[string]interface{}]](ctx.Context(), api.cfg.HeavyServer, api.cfg.EarliestBlock, "flume_getTransactionsByParticipant", address, offset) if err != nil { log.Error("Error processing request in flume_getTransactionByParticipant", "err", err) @@ -340,7 +340,7 @@ func (api *FlumeAPI) GetTransactionsByParticipant(ctx *rpc.CallContext, address if tx != nil { heavyResult <- *tx } - }() + }(offset) } else { close(heavyResult) } @@ -400,7 +400,7 @@ func (api *FlumeAPI) GetTransactionReceiptsByParticipant(ctx *rpc.CallContext, a if len(api.cfg.HeavyServer) > 0 { log.Debug("flume_getTransactionReceiptsByParticipant sent to flume heavy by default") missMeter.Mark(1) - go func() { + go func(offset *int) { rt, err := heavy.CallHeavy[*paginator[map[string]interface{}]](ctx.Context(), api.cfg.HeavyServer, "flume_getTransactionReceiptsByParticipant", address, offset) if err != nil { log.Error("Error processing request in flume_getTransactionReceiptsByParticipant", "err", err) @@ -411,7 +411,7 @@ func (api *FlumeAPI) GetTransactionReceiptsByParticipant(ctx *rpc.CallContext, a } else { heavyResult <- *rt } - }() + }(offset) } else { close(heavyResult) } diff --git a/api/flume_tokens.go b/api/flume_tokens.go index 0c89a4d..0a1db94 100644 --- a/api/flume_tokens.go +++ b/api/flume_tokens.go @@ -43,7 +43,7 @@ func (api *FlumeTokensAPI) Erc20ByAccount(ctx *rpc.CallContext, addr common.Addr if len(api.cfg.HeavyServer) > 0 { log.Debug("flume_erc20ByAccount sent to flume heavy by default") missMeter.Mark(1) - go func() { + go func(offset *int) { address, err := heavy.CallHeavyDiscrete[*paginator[common.Address]](ctx.Context(), api.cfg.HeavyServer, api.cfg.EarliestBlock, "flume_erc20ByAccount", addr, offset) if err != nil { log.Error("Error processing request in flume_erc20ByAccount", "err", err) @@ -52,7 +52,7 @@ func (api *FlumeTokensAPI) Erc20ByAccount(ctx *rpc.CallContext, addr common.Addr if address != nil { heavyResult <- *address } - }() + }(offset) } else { close(heavyResult) } @@ -124,7 +124,7 @@ func (api *FlumeTokensAPI) Erc20Holders(ctx *rpc.CallContext, addr common.Addres if len(api.cfg.HeavyServer) > 0 { log.Debug("flume_erc20Holders sent to flume heavy by default") missMeter.Mark(1) - go func() { + go func(offset *int) { address, err := heavy.CallHeavyDiscrete[*paginator[common.Address]](ctx.Context(), api.cfg.HeavyServer, api.cfg.EarliestBlock, "flume_erc20Holders", addr, offset) if err != nil { log.Error("Error processing request in flume_erc20Holders", "err", err) @@ -133,7 +133,7 @@ func (api *FlumeTokensAPI) Erc20Holders(ctx *rpc.CallContext, addr common.Addres if address != nil { heavyResult <- *address } - }() + }(offset) } else { close(heavyResult) } diff --git a/api/heavy_test.go b/api/heavy_test.go index 8c0fdd7..1cecce7 100644 --- a/api/heavy_test.go +++ b/api/heavy_test.go @@ -238,7 +238,7 @@ func TestCallHeavy(t *testing.T) { if err.(*heavy.MockError).Params[0].(common.Address) != testAddress { t.Fatal("GetTransactionsBySender did not return expected parameter address, heavy test", "err", err.Error()) } - if *err.(*heavy.MockError).Params[1].(*int) != 0 { + if err.(*heavy.MockError).Params[1].(*int) != nil { t.Fatal("GetTransactionsBySender did not return expected parameter offset, heavy test", "err", err.Error()) } @@ -252,7 +252,7 @@ func TestCallHeavy(t *testing.T) { if err.(*heavy.MockError).Params[0].(common.Address) != testAddress { t.Fatal("GetTransactionReceiptsBySender did not return expected parameter address, heavy test", "err", err.Error()) } - if *err.(*heavy.MockError).Params[1].(*int) != 0 { + if err.(*heavy.MockError).Params[1].(*int) != nil { t.Fatal("GetTransactionReceiptsBySender did not return expected parameter offset, heavy test", "err", err.Error()) } @@ -266,7 +266,7 @@ func TestCallHeavy(t *testing.T) { if err.(*heavy.MockError).Params[0].(common.Address) != testAddress { t.Fatal("GetTransactionsByRecipient did not return expected parameter address, heavy test", "err", err.Error()) } - if *err.(*heavy.MockError).Params[1].(*int) != 0 { + if err.(*heavy.MockError).Params[1].(*int) != nil { t.Fatal("GetTransactionsByRecipient did not return expected parameter offset, heavy test", "err", err.Error()) } @@ -280,7 +280,7 @@ func TestCallHeavy(t *testing.T) { if err.(*heavy.MockError).Params[0].(common.Address) != testAddress { t.Fatal("GetTransactionReceiptsByRecipient did not return expected parameter address, heavy test", "err", err.Error()) } - if *err.(*heavy.MockError).Params[1].(*int) != 0 { + if err.(*heavy.MockError).Params[1].(*int) != nil { t.Fatal("GetTransactionReceiptsByRecipient did not return expected parameter offset, heavy test", "err", err.Error()) } @@ -294,7 +294,7 @@ func TestCallHeavy(t *testing.T) { if err.(*heavy.MockError).Params[0].(common.Address) != testAddress { t.Fatal("GetTransactionsByParticipant did not return expected parameter address, heavy test", "err", err.Error()) } - if *err.(*heavy.MockError).Params[1].(*int) != 0 { + if err.(*heavy.MockError).Params[1].(*int) != nil { t.Fatal("GetTransactionsByParticipant did not return expected parameter offset, heavy test", "err", err.Error()) } @@ -308,7 +308,7 @@ func TestCallHeavy(t *testing.T) { if err.(*heavy.MockError).Params[0].(common.Address) != testAddress { t.Fatal("GetTransactionReceiptsByParticipant did not return expected parameter address, heavy test", "err", err.Error()) } - if *err.(*heavy.MockError).Params[1].(*int) != 0 { + if err.(*heavy.MockError).Params[1].(*int) != nil { t.Fatal("GetTransactionReceiptsByParticipant did not return expected parameter offset, heavy test", "err", err.Error()) } From dc3ecf3a05bea3d2536803ce18312d7f23a54c4b Mon Sep 17 00:00:00 2001 From: philip-morlier Date: Thu, 8 Feb 2024 16:16:50 -0800 Subject: [PATCH 09/19] Updated rpc import to v1.2.1-newcontext1 --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 3292439..1f715d7 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/klauspost/compress v1.15.15 github.com/mattn/go-sqlite3 v1.14.16 github.com/openrelayxyz/cardinal-evm v1.11.0 - github.com/openrelayxyz/cardinal-rpc v1.2.0-sf1 + github.com/openrelayxyz/cardinal-rpc v1.2.1-newcontext1 github.com/openrelayxyz/cardinal-streams v1.4.1 github.com/openrelayxyz/cardinal-types v1.1.1 github.com/xsleonard/go-merkle v1.1.0 diff --git a/go.sum b/go.sum index 62522a2..c8552be 100644 --- a/go.sum +++ b/go.sum @@ -102,8 +102,8 @@ github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLA github.com/openrelayxyz/cardinal-evm v1.11.0 h1:HUsS1OChmnTxY3TBg46fTkQpTv34Uci4ZXYSj3Lzl64= github.com/openrelayxyz/cardinal-evm v1.11.0/go.mod h1:GXka2Ug8HtNER3uYBSCZKJdcCGkhYeHwIJWaAjPbm2k= github.com/openrelayxyz/cardinal-rpc v1.1.0/go.mod h1:UZ5KxcsG51ZMBHvLpaDoIReyHpGfGTkazuSKh8Lx4q8= -github.com/openrelayxyz/cardinal-rpc v1.2.0-sf1 h1:hWQtpcq8eLuttFX+dG1hNge7qZW2HS0GmyHR4IElaq4= -github.com/openrelayxyz/cardinal-rpc v1.2.0-sf1/go.mod h1:UZ5KxcsG51ZMBHvLpaDoIReyHpGfGTkazuSKh8Lx4q8= +github.com/openrelayxyz/cardinal-rpc v1.2.1-newcontext1 h1:Amm6aVEo9CnmCAqG1Ewov8vrGuabL07yCtrHTMdFly4= +github.com/openrelayxyz/cardinal-rpc v1.2.1-newcontext1/go.mod h1:dvM++vpmtimrfMovrxoFG4n0x54IXFRfPe2w8HmtoJU= github.com/openrelayxyz/cardinal-streams v1.4.1 h1:v0jrxk8iyZpWnccQ+uGRfrktdoTuY/sZMGumLz/x7Ic= github.com/openrelayxyz/cardinal-streams v1.4.1/go.mod h1:c1I8beZ/p0dfSiDbIGSFN+EMgniRLWq6yKJZgo9yqaQ= github.com/openrelayxyz/cardinal-types v1.0.0/go.mod h1:4CzsdfRjAWXeOszcXbUuNNjEP7ZR8Np8RO3FO5TFA6E= From 06e9e6cd43fac768825b28ed54d8e8ee2c22f521 Mon Sep 17 00:00:00 2001 From: philip-morlier Date: Thu, 8 Feb 2024 16:47:42 -0800 Subject: [PATCH 10/19] Added heavy hit and miss metrics for block and tx hashes --- api/block.go | 36 ++++++++++-------------------------- api/flume.go | 8 ++++++++ api/logs.go | 9 ++++++++- api/transaction.go | 24 ++++++++---------------- 4 files changed, 34 insertions(+), 43 deletions(-) diff --git a/api/block.go b/api/block.go index ef546d6..36b0940 100644 --- a/api/block.go +++ b/api/block.go @@ -21,8 +21,8 @@ var ( hitMeter = metrics.NewMajorMeter("/flume/hit") missMeter = metrics.NewMajorMeter("/flume/miss") - heavyBlockHit = metrics.NewMinorMeter("/flume/hbc/hit") - heavyBlockMiss = metrics.NewMinorMeter("/flume/hbc/miss") + heavyBlockHashHit = metrics.NewMinorMeter("/flume/hbh/hit") + heavyBlockHashMiss = metrics.NewMinorMeter("/flume/hbh/miss") ) type BlockAPI struct { @@ -78,10 +78,6 @@ func (api *BlockAPI) GetBlockByNumber(ctx context.Context, blockNumber rpc.Block if err != nil { return nil, err } - if responseShell == nil { - heavyBlockMiss.Mark(1) - } - heavyBlockHit.Mark(1) return responseShell, nil } @@ -146,9 +142,9 @@ func (api *BlockAPI) GetBlockByHash(ctx context.Context, blockHash types.Hash, i return nil, err } if responseShell == nil { - heavyBlockMiss.Mark(1) + heavyBlockHashMiss.Mark(1) } - heavyBlockHit.Mark(1) + heavyBlockHashHit.Mark(1) return responseShell, nil } @@ -205,10 +201,6 @@ func (api *BlockAPI) GetBlockTransactionCountByNumber(ctx context.Context, block if err != nil { return nil, err } - if count == nil { - heavyBlockMiss.Mark(1) - } - heavyBlockHit.Mark(1) return count, nil } @@ -252,9 +244,9 @@ func (api *BlockAPI) GetBlockTransactionCountByHash(ctx context.Context, blockHa return nil, err } if count == nil { - heavyBlockMiss.Mark(1) + heavyBlockHashMiss.Mark(1) } - heavyBlockHit.Mark(1) + heavyBlockHashHit.Mark(1) return count, nil } @@ -297,10 +289,6 @@ func (api *BlockAPI) GetUncleCountByBlockNumber(ctx context.Context, blockNumber if err != nil { return nil, err } - if count == nil { - heavyBlockMiss.Mark(1) - } - heavyBlockHit.Mark(1) return count, nil } @@ -347,9 +335,9 @@ func (api *BlockAPI) GetUncleCountByBlockHash(ctx context.Context, blockHash typ return nil, err } if count == nil { - heavyBlockMiss.Mark(1) + heavyBlockHashMiss.Mark(1) } - heavyBlockHit.Mark(1) + heavyBlockHashHit.Mark(1) return count, nil } @@ -392,10 +380,6 @@ func (api *BlockAPI) GetBlockReceipts(ctx context.Context, input BlockNumberOrHa if err != nil { return nil, err } - if rt == nil { - heavyBlockMiss.Mark(1) - } - heavyBlockHit.Mark(1) return *rt, nil } @@ -442,9 +426,9 @@ func (api *BlockAPI) GetBlockReceipts(ctx context.Context, input BlockNumberOrHa return nil, err } if rt == nil { - heavyBlockMiss.Mark(1) + heavyBlockHashMiss.Mark(1) } - heavyBlockHit.Mark(1) + heavyBlockHashHit.Mark(1) return *rt, nil } diff --git a/api/flume.go b/api/flume.go index dd54098..a19227c 100644 --- a/api/flume.go +++ b/api/flume.go @@ -469,6 +469,10 @@ func (api *FlumeAPI) GetTransactionReceiptsByBlockHash(ctx context.Context, bloc if err != nil { return nil, err } + if responseShell == nil { + heavyBlockHashMiss.Mark(1) + } + heavyBlockHashHit.Mark(1) return *rt, nil } @@ -544,6 +548,10 @@ func (api *FlumeAPI) GetBlockByTransactionHash(ctx context.Context, txHash types if err != nil { return nil, err } + if responseShell == nil { + heavyTxHashMiss.Mark(1) + } + heavyBlockTxHit.Mark(1) return responseShell, nil } diff --git a/api/logs.go b/api/logs.go index fea1bb3..8f3990a 100644 --- a/api/logs.go +++ b/api/logs.go @@ -106,7 +106,14 @@ func (api *LogsAPI) GetLogs(ctx context.Context, crit FilterQuery) ([]*logType, } if logs != nil { heavyResult <- *logs - } + if crit.BlockHash != nil { + heavyBlockHashHit.Mark(1) + } + } else { + if crit.BlockHash != nil { + heavyBlockHashMiss.Mark(1) + } + } }() } else { close(heavyResult) diff --git a/api/transaction.go b/api/transaction.go index 7038217..4966738 100644 --- a/api/transaction.go +++ b/api/transaction.go @@ -41,8 +41,8 @@ var ( gtbhHitMeter = metrics.NewMinorMeter("/flume/gtbh/hit") gtbhMissMeter = metrics.NewMinorMeter("/flume/gtbh/miss") - heavyTxHit = metrics.NewMinorMeter("/flume/htc/hit") - heavyTxMiss = metrics.NewMinorMeter("/flume/htc/miss") + heavyTxHashHit = metrics.NewMinorMeter("/flume/hth/hit") + heavyTxHashMiss = metrics.NewMinorMeter("/flume/hth/miss") ) func (api *TransactionAPI) GetTransactionByHash(ctx context.Context, txHash types.Hash) (*map[string]interface{}, error) { @@ -56,9 +56,9 @@ func (api *TransactionAPI) GetTransactionByHash(ctx context.Context, txHash type return nil, err } if responseShell == nil { - heavyTxMiss.Mark(1) + heavyTxHashMiss.Mark(1) } - heavyTxHit.Mark(1) + heavyTxHashHit.Mark(1) return responseShell, nil } @@ -118,9 +118,9 @@ func (api *TransactionAPI) GetTransactionByBlockHashAndIndex(ctx context.Context return nil, err } if responseShell == nil { - heavyTxMiss.Mark(1) + heavyTxHashMiss.Mark(1) } - heavyTxHit.Mark(1) + heavyTxHashHit.Mark(1) return responseShell, nil } @@ -155,10 +155,6 @@ func (api *TransactionAPI) GetTransactionByBlockNumberAndIndex(ctx context.Conte if err != nil { return nil, err } - if responseShell == nil { - heavyTxMiss.Mark(1) - } - heavyTxHit.Mark(1) return responseShell, nil } @@ -201,9 +197,9 @@ func (api *TransactionAPI) GetTransactionReceipt(ctx context.Context, txHash typ return nil, err } if responseShell == nil { - heavyTxMiss.Mark(1) + heavyTxHashMiss.Mark(1) } - heavyTxHit.Mark(1) + heavyTxHashHit.Mark(1) return responseShell, nil } @@ -271,10 +267,6 @@ func (api *TransactionAPI) GetTransactionCount(ctx context.Context, addr common. if err != nil { return nil, err } - if count == nil { - heavyTxMiss.Mark(1) - } - heavyTxHit.Mark(1) return count, nil } From 4bb7c60bf2626cb81b43fe2ddde41d1036200583 Mon Sep 17 00:00:00 2001 From: philip-morlier Date: Thu, 8 Feb 2024 16:56:06 -0800 Subject: [PATCH 11/19] Fixed typos --- api/flume.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/api/flume.go b/api/flume.go index a19227c..9851530 100644 --- a/api/flume.go +++ b/api/flume.go @@ -469,7 +469,7 @@ func (api *FlumeAPI) GetTransactionReceiptsByBlockHash(ctx context.Context, bloc if err != nil { return nil, err } - if responseShell == nil { + if rt == nil { heavyBlockHashMiss.Mark(1) } heavyBlockHashHit.Mark(1) @@ -551,7 +551,7 @@ func (api *FlumeAPI) GetBlockByTransactionHash(ctx context.Context, txHash types if responseShell == nil { heavyTxHashMiss.Mark(1) } - heavyBlockTxHit.Mark(1) + heavyTxHashHit.Mark(1) return responseShell, nil } From 0e9c05058aa8f623f174502b5a1ad8060a9ccc73 Mon Sep 17 00:00:00 2001 From: philip-morlier Date: Fri, 16 Feb 2024 11:33:49 -0800 Subject: [PATCH 12/19] Added condition assuring that resumptionBlockNumber will always be > 128 --- consumer.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/consumer.go b/consumer.go index 699e553..246fac6 100644 --- a/consumer.go +++ b/consumer.go @@ -109,7 +109,14 @@ func AcquireConsumer(db *sql.DB, cfg *config.Config, resumptionTime int64, useBl } log.Debug("Current block aquired from heavy", "block", int64(*highestBlock)) - resumptionBlockNumber := int64(*highestBlock) - reorgThreshold + var resumptionBlockNumber int64 + a := int64(*highestBlock) - reorgThreshold + b := int64(*highestBlock) - 129 + if a < b { + resumptionBlockNumber = a + } else { + resumptionBlockNumber = b + } resumptionBlock, err := heavy.CallHeavy[map[string]json.RawMessage](context.Background(), cfg.HeavyServer, "eth_getBlockByNumber", hexutil.Uint64(resumptionBlockNumber), false) if err != nil { From c8d80b8ad5076c736496aeaad579d555d1733bb3 Mon Sep 17 00:00:00 2001 From: philip-morlier Date: Fri, 16 Feb 2024 21:21:09 -0800 Subject: [PATCH 13/19] Updated rpc import to v1.2.1-newcontext3 --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 1f715d7..70f0e4a 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/klauspost/compress v1.15.15 github.com/mattn/go-sqlite3 v1.14.16 github.com/openrelayxyz/cardinal-evm v1.11.0 - github.com/openrelayxyz/cardinal-rpc v1.2.1-newcontext1 + github.com/openrelayxyz/cardinal-rpc v1.2.1-newcontext3 github.com/openrelayxyz/cardinal-streams v1.4.1 github.com/openrelayxyz/cardinal-types v1.1.1 github.com/xsleonard/go-merkle v1.1.0 diff --git a/go.sum b/go.sum index c8552be..57b70a6 100644 --- a/go.sum +++ b/go.sum @@ -102,8 +102,8 @@ github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLA github.com/openrelayxyz/cardinal-evm v1.11.0 h1:HUsS1OChmnTxY3TBg46fTkQpTv34Uci4ZXYSj3Lzl64= github.com/openrelayxyz/cardinal-evm v1.11.0/go.mod h1:GXka2Ug8HtNER3uYBSCZKJdcCGkhYeHwIJWaAjPbm2k= github.com/openrelayxyz/cardinal-rpc v1.1.0/go.mod h1:UZ5KxcsG51ZMBHvLpaDoIReyHpGfGTkazuSKh8Lx4q8= -github.com/openrelayxyz/cardinal-rpc v1.2.1-newcontext1 h1:Amm6aVEo9CnmCAqG1Ewov8vrGuabL07yCtrHTMdFly4= -github.com/openrelayxyz/cardinal-rpc v1.2.1-newcontext1/go.mod h1:dvM++vpmtimrfMovrxoFG4n0x54IXFRfPe2w8HmtoJU= +github.com/openrelayxyz/cardinal-rpc v1.2.1-newcontext3 h1:0bOeHJh5O7ZF3vQ4bCQhEDVJZ3Z/KrEIfOzVE1d571o= +github.com/openrelayxyz/cardinal-rpc v1.2.1-newcontext3/go.mod h1:dvM++vpmtimrfMovrxoFG4n0x54IXFRfPe2w8HmtoJU= github.com/openrelayxyz/cardinal-streams v1.4.1 h1:v0jrxk8iyZpWnccQ+uGRfrktdoTuY/sZMGumLz/x7Ic= github.com/openrelayxyz/cardinal-streams v1.4.1/go.mod h1:c1I8beZ/p0dfSiDbIGSFN+EMgniRLWq6yKJZgo9yqaQ= github.com/openrelayxyz/cardinal-types v1.0.0/go.mod h1:4CzsdfRjAWXeOszcXbUuNNjEP7ZR8Np8RO3FO5TFA6E= From 0eeb24609bf8aa01c81b1efd82801fba1abfb1e2 Mon Sep 17 00:00:00 2001 From: philip-morlier Date: Fri, 16 Feb 2024 21:21:45 -0800 Subject: [PATCH 14/19] Added comment to consumer resumptionBlockNumber condition --- consumer.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/consumer.go b/consumer.go index 246fac6..75d3e80 100644 --- a/consumer.go +++ b/consumer.go @@ -109,6 +109,9 @@ func AcquireConsumer(db *sql.DB, cfg *config.Config, resumptionTime int64, useBl } log.Debug("Current block aquired from heavy", "block", int64(*highestBlock)) + // below both an archive configuration of flume as well as cardinal streams are relying on resumptionBlockNumber + // archive flume has tests which require a minimum of 128 block overlap between databases, and so we do a comparison + // to catch this contingency while defaulting to the reorgThreshold so as to maintain consistency in all other cases. var resumptionBlockNumber int64 a := int64(*highestBlock) - reorgThreshold b := int64(*highestBlock) - 129 From 91cd8ef5d124e0b1caa42405dada7f66519c4896 Mon Sep 17 00:00:00 2001 From: philip-morlier Date: Fri, 16 Feb 2024 21:22:26 -0800 Subject: [PATCH 15/19] Added python archive tests for analysis --- archive_tests.py | 42 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100644 archive_tests.py diff --git a/archive_tests.py b/archive_tests.py new file mode 100644 index 0000000..997b3ec --- /dev/null +++ b/archive_tests.py @@ -0,0 +1,42 @@ +import requests +import json + +import requests +import json + +def get_all_by_address(url, address, meth): + results = [] + s = requests.session() + try: + outer = s.post(url, json={"id":77,"method":f"flume_get{meth}","params":[address]}).json() + res = outer['result'] + except Exception as e: + print(f"error on stand alone attempt result, error: {e}") + raise + return + while res.get("next"): + results.append(res['items']) + try: + outer = s.post(url, json={"id":77,"method":f"flume_get{meth}","params":[address, res['next']]}).json() + res = outer['result'] + except Exception as e: + print(f"error on inner attemtp result, error: {e}") + raise + return + results.append(res['items']) + # print(f"len of list {len(results[0])}") + #return sorted(results, key=lambda x: int(x[0]['nonce'], 16)) + return sorted(results, key=lambda x: int(x[0]['blockNumber'], 16)) + +def compare_multiple(adrss, method): + flume = 'http://localhost:8000' + rivet = 'https://sepolia.rpc.rivet.cloud/2ebb6d9d2bbb42a3a9b1cf4d129e9609' + for adr in adrss: + try: + r = get_all_by_address(rivet, adr, method) + f = get_all_by_address(flume, adr, method) + except Exception as e: + print(f"error on address {adr}, exception {e}") + raise + return + print(f"{adr}, {f == r}") From d2b52ab1dd830b35c6dd873ac873d6702a4a23d9 Mon Sep 17 00:00:00 2001 From: philip-morlier Date: Sat, 17 Feb 2024 10:39:32 -0800 Subject: [PATCH 16/19] fixed type on flume api --- api/flume.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/flume.go b/api/flume.go index 9851530..e14f2bf 100644 --- a/api/flume.go +++ b/api/flume.go @@ -679,7 +679,7 @@ func (api *FlumeAPI) TransactionHashesWithPrefix(ctx *rpc.CallContext, partialHe log.Debug("flume_TransactionHashesWithPrefix sent to flume heavy by default") missMeter.Mark(1) go func() { - hashes, err := heavy.CallHeavy[[]string](ctx.Context(), api.cfg.HeavyServer, "flume_TransactionHashesWithPrefix", partialHexString) + hashes, err := heavy.CallHeavy[[]string](ctx.Context(), api.cfg.HeavyServer, "flume_transactionHashesWithPrefix", partialHexString) if err != nil { log.Error("Error calling heavy server, flume_transactionHashesWithPrefix", "err", err) errChan <- err From 7a131950d18509c6fa63965c9431c1bc3dcfaffd Mon Sep 17 00:00:00 2001 From: philip-morlier Date: Sat, 17 Feb 2024 15:15:16 -0800 Subject: [PATCH 17/19] Fixed logs in heavytest --- main.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/main.go b/main.go index 38a043f..3359bae 100644 --- a/main.go +++ b/main.go @@ -372,11 +372,11 @@ func runStartupChecks(certainty, heavyCheck bool, database *sql.DB, config *conf os.Exit(1) } if earliestBlock > hl { - log.Error("Gap found between local and heavy database", "local earliest", earliestBlock, "heavy latest", heavyLatest) + log.Error("Gap found between local and heavy database", "local earliest", earliestBlock, "heavy latest", hl) os.Exit(1) } if hl - earliestBlock < 128 { - log.Error("Overlap between local and heavy databases is too small", "local earliest", earliestBlock, "heavy latest", heavyLatest) + log.Error("Overlap between local and heavy databases is too small", "local earliest", earliestBlock, "heavy latest", hl) os.Exit(1) } } From ecd69cce1688f30083993dd243743a4105fc3aa8 Mon Sep 17 00:00:00 2001 From: philip-morlier Date: Thu, 11 Apr 2024 14:13:06 -0700 Subject: [PATCH 18/19] Manual fixes to after merging develop branch --- api/flume.go | 108 ++++++++++++++++++++++++++++------------------ api/heavy_test.go | 5 ++- 2 files changed, 69 insertions(+), 44 deletions(-) diff --git a/api/flume.go b/api/flume.go index 93c151e..e5ed406 100644 --- a/api/flume.go +++ b/api/flume.go @@ -884,51 +884,80 @@ func (api *FlumeAPI) AddressWithPrefix(ctx *rpc.CallContext, partialHexString st } bytes = bytes[zeros:] - augmentedBytes := incrementLastByte(bytes) - var intermediate map[string]struct{} - - statements := []string{ - "SELECT DISTINCT(address) FROM event_logs WHERE address > ? AND address < ? AND LENGTH(address) = ? LIMIT 20", - "SELECT DISTINCT(sender) FROM transactions.transactions WHERE sender > ? AND sender < ? AND LENGTH(sender) = ? LIMIT 20", - "SELECT DISTINCT(recipient) FROM transactions.transactions WHERE recipient > ? AND recipient < ? AND LENGTH(recipient) = ? LIMIT 20", - "SELECT DISTINCT(coinbase) FROM blocks WHERE coinbase > ? AND coinbase < ? AND LENGTH(coinbase) = ? LIMIT 20", - } - if api.mempool { - statements = append(statements, - "SELECT DISTINCT(sender) FROM mempool.transactions WHERE sender > ? AND sender < ? AND LENGTH(sender) = ? LIMIT 20", - "SELECT DISTINCT(recipient) FROM mempool.transactions WHERE recipient > ? AND recipient < ? AND LENGTH(recipient) = ? LIMIT 20", - ) - } - - var statement string var rows *sql.Rows var sqlErr error if ctx.Latest > 0 { - statement = "SELECT DISTINCT(address) FROM event_logs WHERE address > ? AND address < ? AND LENGTH(address) = ? AND block < 3LIMIT 20" - rows, sqlErr = api.db.QueryContext(ctx.Context(), statement, bytes, augmentedBytes, 20 - zeros, ctx.Latest) + statements := []string{ + "SELECT DISTINCT(address) FROM event_logs WHERE address > ? AND address < ? AND LENGTH(address) = ? AND block < ? LIMIT 20", + "SELECT DISTINCT(sender) FROM transactions.transactions WHERE sender > ? AND sender < ? AND LENGTH(sender) = ? AND block < ? LIMIT 20", + "SELECT DISTINCT(recipient) FROM transactions.transactions WHERE recipient > ? AND recipient < ? AND LENGTH(recipient) = ? AND block < ? LIMIT 20", + "SELECT DISTINCT(coinbase) FROM blocks WHERE coinbase > ? AND coinbase < ? AND LENGTH(coinbase) = ? AND block < ? LIMIT 20", + } + if api.mempool { + statements = append(statements, + "SELECT DISTINCT(sender) FROM mempool.transactions WHERE sender > ? AND sender < ? AND LENGTH(sender) = ? AND block < ? LIMIT 20", + "SELECT DISTINCT(recipient) FROM mempool.transactions WHERE recipient > ? AND recipient < ? AND LENGTH(recipient) = ? AND block < ? LIMIT 20", + ) + } + for i, statement := range statements { + rows, sqlErr = api.db.QueryContext(ctx.Context(), statement, bytes, augmentedBytes, 20 - zeros, ctx.Latest) + if sqlErr != nil { + exhaustChannels[[]string](heavyResult, errChan) + log.Error("Error returned from query in flume_addressWithPrefix", "err", err) + return nil, nil + } + defer rows.Close() + for rows.Next() { + var addressBytes []byte + err := rows.Scan(&addressBytes) + if err != nil { + exhaustChannels[[]string](heavyResult, errChan) + log.Error("Error scanning rows flume_addressWithPrefix", "query index", i, "err", err) + return nil, err + } + intermediate[hexutil.Encode(addressBytes)] = struct{}{} + } + } } else { - statement = "SELECT DISTINCT(address) FROM event_logs WHERE address > ? AND address < ? AND LENGTH(address) = ? LIMIT 20" - rows, sqlErr = api.db.QueryContext(ctx.Context(), statement, bytes, augmentedBytes, 20 - zeros) - } - if sqlErr != nil { - exhaustChannels[[]string](heavyResult, errChan) - log.Error("Error returned from query in flume_addressWithPrefix", "err", err) - return nil, nil - } - defer rows.Close() - var result []string - for rows.Next() { - var addressBytes []byte - err := rows.Scan(&addressBytes) - if err != nil { + statements := []string{ + "SELECT DISTINCT(address) FROM event_logs WHERE address > ? AND address < ? AND LENGTH(address) = ? LIMIT 20", + "SELECT DISTINCT(sender) FROM transactions.transactions WHERE sender > ? AND sender < ? AND LENGTH(sender) = ? LIMIT 20", + "SELECT DISTINCT(recipient) FROM transactions.transactions WHERE recipient > ? AND recipient < ? AND LENGTH(recipient) = ? LIMIT 20", + "SELECT DISTINCT(coinbase) FROM blocks WHERE coinbase > ? AND coinbase < ? AND LENGTH(coinbase) = ? LIMIT 20", + } + if api.mempool { + statements = append(statements, + "SELECT DISTINCT(sender) FROM mempool.transactions WHERE sender > ? AND sender < ? AND LENGTH(sender) = ? LIMIT 20", + "SELECT DISTINCT(recipient) FROM mempool.transactions WHERE recipient > ? AND recipient < ? AND LENGTH(recipient) = ? LIMIT 20", + ) + } + for i, statement := range statements { + rows, sqlErr = api.db.QueryContext(ctx.Context(), statement, bytes, augmentedBytes, 20 - zeros) + if sqlErr != nil { exhaustChannels[[]string](heavyResult, errChan) - log.Error("Error scanning rows flume_addressWithPrefix") - return nil, err + log.Error("Error returned from query in flume_addressWithPrefix", "err", err) + return nil, nil } - result = append(result, hexutil.Encode(addressBytes)) + defer rows.Close() + for rows.Next() { + var addressBytes []byte + err := rows.Scan(&addressBytes) + if err != nil { + exhaustChannels[[]string](heavyResult, errChan) + log.Error("Error scanning rows flume_addressWithPrefix", "query index", i, "err", err) + return nil, err + } + intermediate[hexutil.Encode(addressBytes)] = struct{}{} + } + } + } + + result := make([]string, 0, len(intermediate)) + for k, _ := range intermediate { + result = append(result, k) } select { @@ -939,11 +968,6 @@ func (api *FlumeAPI) AddressWithPrefix(ctx *rpc.CallContext, partialHexString st case err := <- errChan: return nil, err } - - result := make([]string, 0, len(intermediate)) - for k, _ := range intermediate { - result = append(result, k) - } return result, nil } @@ -989,7 +1013,7 @@ func (api *FlumeAPI) ResolvePrefix(ctx *rpc.CallContext, partialHexString string return nil, err } - result := map[string]interface{}{ + result := map[string][]string{ "blockHashes": blockHashes, "transactionHashes": txHashes, "addresses": addresses, diff --git a/api/heavy_test.go b/api/heavy_test.go index 1cecce7..ae20c59 100644 --- a/api/heavy_test.go +++ b/api/heavy_test.go @@ -5,7 +5,7 @@ import ( "testing" "os" - // log "github.com/inconshreveable/log15" + log "github.com/inconshreveable/log15" "github.com/openrelayxyz/cardinal-evm/common" "github.com/openrelayxyz/cardinal-types" "github.com/openrelayxyz/cardinal-rpc" @@ -17,6 +17,7 @@ import ( ) func TestCallHeavy(t *testing.T) { + log.Info("testing heavy package expect error logs") cfg, err := config.LoadConfig("../testing-resources/heavy_test_config.yml") if err != nil { t.Fatal("Error parsing config TestCallHeavy", "err", err.Error()) @@ -384,5 +385,5 @@ func TestCallHeavy(t *testing.T) { if err.(*heavy.MockError).Method != "eth_maxPriorityFeePerGas" { t.Fatal("MaxPriorityFeePerGas did not return expected method name, heavy test", "err", err.Error()) } - + log.Info("heavy package test complete") } From db72be765fc56807d8546e5080b6cd72ab59d0f1 Mon Sep 17 00:00:00 2001 From: philip-morlier Date: Thu, 18 Apr 2024 12:00:51 -0700 Subject: [PATCH 19/19] Added differentiated query for lowest and highest block main.go --- main.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/main.go b/main.go index 75f78e5..a67cad8 100644 --- a/main.go +++ b/main.go @@ -337,7 +337,10 @@ func main() { func runStartupChecks(certainty, heavyCheck bool, database *sql.DB, config *config.Config) { var earliestBlock, latestBlock uint64 - if err := database.QueryRowContext(context.Background(), "SELECT min(number), max(number) FROM blocks.blocks;").Scan(&earliestBlock, &latestBlock); err != nil { + if err := database.QueryRowContext(context.Background(), "SELECT min(number) FROM blocks.blocks;").Scan(&earliestBlock); err != nil { + log.Error("Error aquiring lowest block from blocks db for startup checks", "err", err) + } + if err := database.QueryRowContext(context.Background(), "SELECT max(number) FROM blocks.blocks;").Scan(&latestBlock); err != nil { log.Error("Error aquiring highest block from blocks db for startup checks", "err", err) }