From 01cba0d0fdfa5c4f270faf17e627134963a43734 Mon Sep 17 00:00:00 2001 From: Vlad Date: Mon, 22 Sep 2025 11:37:11 -0400 Subject: [PATCH 1/2] wip: set up local journal --- mempool/mempool.go | 58 ++++-- mempool/txpool/locals/errors.go | 46 +++++ mempool/txpool/locals/journal.go | 186 +++++++++++++++++++ mempool/txpool/locals/tx_tracker.go | 216 +++++++++++++++++++++++ mempool/txpool/locals/tx_tracker_test.go | 165 +++++++++++++++++ 5 files changed, 656 insertions(+), 15 deletions(-) create mode 100644 mempool/txpool/locals/errors.go create mode 100644 mempool/txpool/locals/journal.go create mode 100644 mempool/txpool/locals/tx_tracker.go create mode 100644 mempool/txpool/locals/tx_tracker_test.go diff --git a/mempool/mempool.go b/mempool/mempool.go index a3a7aa62c..6143a1f1b 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -4,7 +4,9 @@ import ( "context" "errors" "fmt" + "github.com/cosmos/evm/mempool/txpool/locals" "sync" + "time" ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/holiman/uint256" @@ -46,9 +48,10 @@ type ( vmKeeper VMKeeperI /** Mempools **/ - txPool *txpool.TxPool - legacyTxPool *legacypool.LegacyPool - cosmosPool sdkmempool.ExtMempool + txPool *txpool.TxPool + legacyTxPool *legacypool.LegacyPool + localTxTracker *locals.TxTracker + cosmosPool sdkmempool.ExtMempool /** Utils **/ logger log.Logger @@ -127,6 +130,7 @@ func NewExperimentalEVMMempool(getCtxCallback func(height int64, prove bool) (sd // from queued into pending, noting their readiness to be executed. legacyPool.BroadcastTxFn = func(txs []*ethtypes.Transaction) error { logger.Debug("broadcasting EVM transactions", "tx_count", len(txs)) + fmt.Println(clientCtx) return broadcastEVMTransactions(clientCtx, txConfig, txs) } } @@ -143,6 +147,21 @@ func NewExperimentalEVMMempool(getCtxCallback func(height int64, prove bool) (sd panic("tx pool should contain only legacypool") } + var localTxTracker *locals.TxTracker + + if !legacyConfig.NoLocals { + rejournal := legacyConfig.Rejournal + if rejournal < time.Second { + logger.Debug("Sanitizing invalid txpool journal time", "provided", rejournal, "updated", time.Second) + rejournal = time.Second + } + localTxTracker = locals.New(legacyConfig.Journal, rejournal, blockchain.Config(), txPool) + err := localTxTracker.Start() + if err != nil { + return nil + } + } + // Create Cosmos Mempool from configuration cosmosPoolConfig := config.CosmosPoolConfig if cosmosPoolConfig == nil { @@ -174,18 +193,19 @@ func NewExperimentalEVMMempool(getCtxCallback func(height int64, prove bool) (sd cosmosPool = sdkmempool.NewPriorityMempool(*cosmosPoolConfig) evmMempool := &ExperimentalEVMMempool{ - vmKeeper: vmKeeper, - txPool: txPool, - legacyTxPool: txPool.Subpools[0].(*legacypool.LegacyPool), - cosmosPool: cosmosPool, - logger: logger, - txConfig: txConfig, - blockchain: blockchain, - bondDenom: bondDenom, - evmDenom: evmDenom, - blockGasLimit: config.BlockGasLimit, - minTip: config.MinTip, - anteHandler: config.AnteHandler, + vmKeeper: vmKeeper, + txPool: txPool, + legacyTxPool: txPool.Subpools[0].(*legacypool.LegacyPool), + localTxTracker: localTxTracker, + cosmosPool: cosmosPool, + logger: logger, + txConfig: txConfig, + blockchain: blockchain, + bondDenom: bondDenom, + evmDenom: evmDenom, + blockGasLimit: config.BlockGasLimit, + minTip: config.MinTip, + anteHandler: config.AnteHandler, } vmKeeper.SetEvmMempool(evmMempool) @@ -303,6 +323,10 @@ func (m *ExperimentalEVMMempool) Remove(tx sdk.Tx) error { m.mtx.Lock() defer m.mtx.Unlock() + if m.blockchain.latestCtx.BlockHeight() == 0 { + return nil + } + m.logger.Debug("removing transaction from mempool") msg, err := m.getEVMMessage(tx) @@ -419,6 +443,10 @@ func (m *ExperimentalEVMMempool) Close() error { errs = append(errs, fmt.Errorf("failed to close txpool: %w", err)) } + if err := m.localTxTracker.Stop(); err != nil { + errs = append(errs, fmt.Errorf("failed to close localTxTracker: %w", err)) + } + return errors.Join(errs...) } diff --git a/mempool/txpool/locals/errors.go b/mempool/txpool/locals/errors.go new file mode 100644 index 000000000..fda50bf21 --- /dev/null +++ b/mempool/txpool/locals/errors.go @@ -0,0 +1,46 @@ +// Copyright 2025 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package locals + +import ( + "errors" + + "github.com/ethereum/go-ethereum/core/txpool" + "github.com/ethereum/go-ethereum/core/txpool/legacypool" +) + +// IsTemporaryReject determines whether the given error indicates a temporary +// reason to reject a transaction from being included in the txpool. The result +// may change if the txpool's state changes later. +func IsTemporaryReject(err error) bool { + switch { + case errors.Is(err, legacypool.ErrOutOfOrderTxFromDelegated): + return true + case errors.Is(err, txpool.ErrInflightTxLimitReached): + return true + case errors.Is(err, legacypool.ErrAuthorityReserved): + return true + case errors.Is(err, txpool.ErrUnderpriced): + return true + case errors.Is(err, legacypool.ErrTxPoolOverflow): + return true + case errors.Is(err, legacypool.ErrFutureReplacePending): + return true + default: + return false + } +} diff --git a/mempool/txpool/locals/journal.go b/mempool/txpool/locals/journal.go new file mode 100644 index 000000000..46fd6de34 --- /dev/null +++ b/mempool/txpool/locals/journal.go @@ -0,0 +1,186 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package locals + +import ( + "errors" + "io" + "io/fs" + "os" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rlp" +) + +// errNoActiveJournal is returned if a transaction is attempted to be inserted +// into the journal, but no such file is currently open. +var errNoActiveJournal = errors.New("no active journal") + +// devNull is a WriteCloser that just discards anything written into it. Its +// goal is to allow the transaction journal to write into a fake journal when +// loading transactions on startup without printing warnings due to no file +// being read for write. +type devNull struct{} + +func (*devNull) Write(p []byte) (n int, err error) { return len(p), nil } +func (*devNull) Close() error { return nil } + +// journal is a rotating log of transactions with the aim of storing locally +// created transactions to allow non-executed ones to survive node restarts. +type journal struct { + path string // Filesystem path to store the transactions at + writer io.WriteCloser // Output stream to write new transactions into +} + +// newTxJournal creates a new transaction journal to +func newTxJournal(path string) *journal { + return &journal{ + path: path, + } +} + +// load parses a transaction journal dump from disk, loading its contents into +// the specified pool. +func (journal *journal) load(add func([]*types.Transaction) []error) error { + // Open the journal for loading any past transactions + input, err := os.Open(journal.path) + if errors.Is(err, fs.ErrNotExist) { + // Skip the parsing if the journal file doesn't exist at all + return nil + } + if err != nil { + return err + } + defer input.Close() + + // Temporarily discard any journal additions (don't double add on load) + journal.writer = new(devNull) + defer func() { journal.writer = nil }() + + // Inject all transactions from the journal into the pool + stream := rlp.NewStream(input, 0) + total, dropped := 0, 0 + + // Create a method to load a limited batch of transactions and bump the + // appropriate progress counters. Then use this method to load all the + // journaled transactions in small-ish batches. + loadBatch := func(txs types.Transactions) { + for _, err := range add(txs) { + if err != nil { + log.Debug("Failed to add journaled transaction", "err", err) + dropped++ + } + } + } + var ( + failure error + batch types.Transactions + ) + for { + // Parse the next transaction and terminate on error + tx := new(types.Transaction) + if err = stream.Decode(tx); err != nil { + if err != io.EOF { + failure = err + } + if batch.Len() > 0 { + loadBatch(batch) + } + break + } + // New transaction parsed, queue up for later, import if threshold is reached + total++ + + if batch = append(batch, tx); batch.Len() > 1024 { + loadBatch(batch) + batch = batch[:0] + } + } + log.Info("Loaded local transaction journal", "transactions", total, "dropped", dropped) + + return failure +} + +// insert adds the specified transaction to the local disk journal. +func (journal *journal) insert(tx *types.Transaction) error { + if journal.writer == nil { + return errNoActiveJournal + } + if err := rlp.Encode(journal.writer, tx); err != nil { + return err + } + return nil +} + +// rotate regenerates the transaction journal based on the current contents of +// the transaction pool. +func (journal *journal) rotate(all map[common.Address]types.Transactions) error { + // Close the current journal (if any is open) + if journal.writer != nil { + if err := journal.writer.Close(); err != nil { + return err + } + journal.writer = nil + } + // Generate a new journal with the contents of the current pool + replacement, err := os.OpenFile(journal.path+".new", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) + if err != nil { + return err + } + journaled := 0 + for _, txs := range all { + for _, tx := range txs { + if err = rlp.Encode(replacement, tx); err != nil { + replacement.Close() + return err + } + } + journaled += len(txs) + } + replacement.Close() + + // Replace the live journal with the newly generated one + if err = os.Rename(journal.path+".new", journal.path); err != nil { + return err + } + sink, err := os.OpenFile(journal.path, os.O_WRONLY|os.O_APPEND, 0644) + if err != nil { + return err + } + journal.writer = sink + + logger := log.Info + if len(all) == 0 { + logger = log.Debug + } + logger("Regenerated local transaction journal", "transactions", journaled, "accounts", len(all)) + + return nil +} + +// close flushes the transaction journal contents to disk and closes the file. +func (journal *journal) close() error { + var err error + + if journal.writer != nil { + err = journal.writer.Close() + journal.writer = nil + } + return err +} diff --git a/mempool/txpool/locals/tx_tracker.go b/mempool/txpool/locals/tx_tracker.go new file mode 100644 index 000000000..0f22b1c42 --- /dev/null +++ b/mempool/txpool/locals/tx_tracker.go @@ -0,0 +1,216 @@ +// Copyright 2023 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +// Package locals implements tracking for "local" transactions +package locals + +import ( + "github.com/cosmos/evm/mempool/txpool" + "github.com/cosmos/evm/mempool/txpool/legacypool" + "slices" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/params" +) + +var ( + recheckInterval = time.Minute + localGauge = metrics.GetOrRegisterGauge("txpool/local", nil) +) + +// TxTracker is a struct used to track priority transactions; it will check from +// time to time if the main pool has forgotten about any of the transaction +// it is tracking, and if so, submit it again. +// This is used to track 'locals'. +// This struct does not care about transaction validity, price-bumps or account limits, +// but optimistically accepts transactions. +type TxTracker struct { + all map[common.Hash]*types.Transaction // All tracked transactions + byAddr map[common.Address]*legacypool.SortedMap // Transactions by address + + journal *journal // Journal of local transaction to back up to disk + rejournal time.Duration // How often to rotate journal + pool *txpool.TxPool // The tx pool to interact with + signer types.Signer + + shutdownCh chan struct{} + mu sync.Mutex + wg sync.WaitGroup +} + +// New creates a new TxTracker +func New(journalPath string, journalTime time.Duration, chainConfig *params.ChainConfig, next *txpool.TxPool) *TxTracker { + pool := &TxTracker{ + all: make(map[common.Hash]*types.Transaction), + byAddr: make(map[common.Address]*legacypool.SortedMap), + signer: types.LatestSigner(chainConfig), + shutdownCh: make(chan struct{}), + pool: next, + } + if journalPath != "" { + pool.journal = newTxJournal(journalPath) + pool.rejournal = journalTime + } + return pool +} + +// Track adds a transaction to the tracked set. +// Note: blob-type transactions are ignored. +func (tracker *TxTracker) Track(tx *types.Transaction) { + tracker.TrackAll([]*types.Transaction{tx}) +} + +// TrackAll adds a list of transactions to the tracked set. +// Note: blob-type transactions are ignored. +func (tracker *TxTracker) TrackAll(txs []*types.Transaction) { + tracker.mu.Lock() + defer tracker.mu.Unlock() + + for _, tx := range txs { + if tx.Type() == types.BlobTxType { + continue + } + // If we're already tracking it, it's a no-op + if _, ok := tracker.all[tx.Hash()]; ok { + continue + } + // Theoretically, checking the error here is unnecessary since sender recovery + // is already part of basic validation. However, retrieving the sender address + // from the transaction cache is effectively a no-op if it was previously verified. + // Therefore, the error is still checked just in case. + addr, err := types.Sender(tracker.signer, tx) + if err != nil { + continue + } + tracker.all[tx.Hash()] = tx + if tracker.byAddr[addr] == nil { + tracker.byAddr[addr] = legacypool.NewSortedMap() + } + tracker.byAddr[addr].Put(tx) + + if tracker.journal != nil { + _ = tracker.journal.insert(tx) + } + } + localGauge.Update(int64(len(tracker.all))) +} + +// recheck checks and returns any transactions that needs to be resubmitted. +func (tracker *TxTracker) recheck(journalCheck bool) (resubmits []*types.Transaction, rejournal map[common.Address]types.Transactions) { + tracker.mu.Lock() + defer tracker.mu.Unlock() + + var ( + numStales = 0 + numOk = 0 + ) + for sender, txs := range tracker.byAddr { + // Wipe the stales + stales := txs.Forward(tracker.pool.Nonce(sender)) + for _, tx := range stales { + delete(tracker.all, tx.Hash()) + } + numStales += len(stales) + + // Check the non-stale + for _, tx := range txs.Flatten() { + if tracker.pool.Has(tx.Hash()) { + numOk++ + continue + } + resubmits = append(resubmits, tx) + } + } + + if journalCheck { // rejournal + rejournal = make(map[common.Address]types.Transactions) + for _, tx := range tracker.all { + addr, _ := types.Sender(tracker.signer, tx) + rejournal[addr] = append(rejournal[addr], tx) + } + // Sort them + for _, list := range rejournal { + // cmp(a, b) should return a negative number when a < b, + slices.SortFunc(list, func(a, b *types.Transaction) int { + return int(a.Nonce() - b.Nonce()) + }) + } + } + localGauge.Update(int64(len(tracker.all))) + log.Debug("Tx tracker status", "need-resubmit", len(resubmits), "stale", numStales, "ok", numOk) + return resubmits, rejournal +} + +// Start implements node.Lifecycle interface +// Start is called after all services have been constructed and the networking +// layer was also initialized to spawn any goroutines required by the service. +func (tracker *TxTracker) Start() error { + tracker.wg.Add(1) + go tracker.loop() + return nil +} + +// Stop implements node.Lifecycle interface +// Stop terminates all goroutines belonging to the service, blocking until they +// are all terminated. +func (tracker *TxTracker) Stop() error { + close(tracker.shutdownCh) + tracker.wg.Wait() + return nil +} + +func (tracker *TxTracker) loop() { + defer tracker.wg.Done() + + if tracker.journal != nil { + tracker.journal.load(func(transactions []*types.Transaction) []error { + tracker.TrackAll(transactions) + return nil + }) + defer tracker.journal.close() + } + var ( + lastJournal = time.Now() + timer = time.NewTimer(10 * time.Second) // Do initial check after 10 seconds, do rechecks more seldom. + ) + for { + select { + case <-tracker.shutdownCh: + return + case <-timer.C: + checkJournal := tracker.journal != nil && time.Since(lastJournal) > tracker.rejournal + resubmits, rejournal := tracker.recheck(checkJournal) + if len(resubmits) > 0 { + tracker.pool.Add(resubmits, false) + } + if checkJournal { + // Lock to prevent journal.rotate <-> journal.insert (via TrackAll) conflicts + tracker.mu.Lock() + lastJournal = time.Now() + if err := tracker.journal.rotate(rejournal); err != nil { + log.Warn("Transaction journal rotation failed", "err", err) + } + tracker.mu.Unlock() + } + timer.Reset(recheckInterval) + } + } +} diff --git a/mempool/txpool/locals/tx_tracker_test.go b/mempool/txpool/locals/tx_tracker_test.go new file mode 100644 index 000000000..367fb6b6d --- /dev/null +++ b/mempool/txpool/locals/tx_tracker_test.go @@ -0,0 +1,165 @@ +// Copyright 2025 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package locals + +import ( + "math/big" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/consensus/ethash" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/txpool" + "github.com/ethereum/go-ethereum/core/txpool/legacypool" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/params" +) + +var ( + key, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + address = crypto.PubkeyToAddress(key.PublicKey) + funds = big.NewInt(1000000000000000) + gspec = &core.Genesis{ + Config: params.TestChainConfig, + Alloc: types.GenesisAlloc{ + address: {Balance: funds}, + }, + BaseFee: big.NewInt(params.InitialBaseFee), + } + signer = types.LatestSigner(gspec.Config) +) + +type testEnv struct { + chain *core.BlockChain + pool *txpool.TxPool + tracker *TxTracker + genDb ethdb.Database +} + +func newTestEnv(t *testing.T, n int, gasTip uint64, journal string) *testEnv { + genDb, blocks, _ := core.GenerateChainWithGenesis(gspec, ethash.NewFaker(), n, func(i int, gen *core.BlockGen) { + tx, err := types.SignTx(types.NewTransaction(gen.TxNonce(address), common.Address{0x00}, big.NewInt(1000), params.TxGas, gen.BaseFee(), nil), signer, key) + if err != nil { + panic(err) + } + gen.AddTx(tx) + }) + + db := rawdb.NewMemoryDatabase() + chain, _ := core.NewBlockChain(db, gspec, ethash.NewFaker(), nil) + + legacyPool := legacypool.New(legacypool.DefaultConfig, chain) + pool, err := txpool.New(gasTip, chain, []txpool.SubPool{legacyPool}) + if err != nil { + t.Fatalf("Failed to create tx pool: %v", err) + } + if n, err := chain.InsertChain(blocks); err != nil { + t.Fatalf("Failed to process block %d: %v", n, err) + } + if err := pool.Sync(); err != nil { + t.Fatalf("Failed to sync the txpool, %v", err) + } + return &testEnv{ + chain: chain, + pool: pool, + tracker: New(journal, time.Minute, gspec.Config, pool), + genDb: genDb, + } +} + +func (env *testEnv) close() { + env.chain.Stop() +} + +// nolint:unused +func (env *testEnv) setGasTip(gasTip uint64) { + env.pool.SetGasTip(new(big.Int).SetUint64(gasTip)) +} + +// nolint:unused +func (env *testEnv) makeTx(nonce uint64, gasPrice *big.Int) *types.Transaction { + if nonce == 0 { + head := env.chain.CurrentHeader() + state, _ := env.chain.StateAt(head.Root) + nonce = state.GetNonce(address) + } + if gasPrice == nil { + gasPrice = big.NewInt(params.GWei) + } + tx, _ := types.SignTx(types.NewTransaction(nonce, common.Address{0x00}, big.NewInt(1000), params.TxGas, gasPrice, nil), signer, key) + return tx +} + +func (env *testEnv) makeTxs(n int) []*types.Transaction { + head := env.chain.CurrentHeader() + state, _ := env.chain.StateAt(head.Root) + nonce := state.GetNonce(address) + + var txs []*types.Transaction + for i := 0; i < n; i++ { + tx, _ := types.SignTx(types.NewTransaction(nonce+uint64(i), common.Address{0x00}, big.NewInt(1000), params.TxGas, big.NewInt(params.GWei), nil), signer, key) + txs = append(txs, tx) + } + return txs +} + +// nolint:unused +func (env *testEnv) commit() { + head := env.chain.CurrentBlock() + block := env.chain.GetBlock(head.Hash(), head.Number.Uint64()) + blocks, _ := core.GenerateChain(env.chain.Config(), block, ethash.NewFaker(), env.genDb, 1, func(i int, gen *core.BlockGen) { + tx, err := types.SignTx(types.NewTransaction(gen.TxNonce(address), common.Address{0x00}, big.NewInt(1000), params.TxGas, gen.BaseFee(), nil), signer, key) + if err != nil { + panic(err) + } + gen.AddTx(tx) + }) + env.chain.InsertChain(blocks) + if err := env.pool.Sync(); err != nil { + panic(err) + } +} + +func TestResubmit(t *testing.T) { + env := newTestEnv(t, 10, 0, "") + defer env.close() + + txs := env.makeTxs(10) + txsA := txs[:len(txs)/2] + txsB := txs[len(txs)/2:] + env.pool.Add(txsA, true) + pending, queued := env.pool.ContentFrom(address) + if len(pending) != len(txsA) || len(queued) != 0 { + t.Fatalf("Unexpected txpool content: %d, %d", len(pending), len(queued)) + } + env.tracker.TrackAll(txs) + + resubmit, all := env.tracker.recheck(true) + if len(resubmit) != len(txsB) { + t.Fatalf("Unexpected transactions to resubmit, got: %d, want: %d", len(resubmit), len(txsB)) + } + if len(all) == 0 || len(all[address]) == 0 { + t.Fatalf("Unexpected transactions being tracked, got: %d, want: %d", 0, len(txs)) + } + if len(all[address]) != len(txs) { + t.Fatalf("Unexpected transactions being tracked, got: %d, want: %d", len(all[address]), len(txs)) + } +} From a720a8d6f52dc5bded6f87da78ade25363410173 Mon Sep 17 00:00:00 2001 From: Abdul Malek Date: Fri, 24 Oct 2025 21:14:48 +0600 Subject: [PATCH 2/2] add rejection handling and new tests --- CHANGELOG.md | 1 + config/server_app_options.go | 9 ++ mempool/mempool.go | 12 ++- mempool/track_local_txs_test.go | 4 + mempool/txpool/locals/errors.go | 20 +---- mempool/txpool/locals/errors_test.go | 48 +++++++++++ mempool/txpool/locals/tx_tracker.go | 4 +- mempool/txpool/locals/tx_tracker_test.go | 104 ++++++++++++++++++++++- rpc/backend/call_tx.go | 15 ++++ rpc/backend/sign_tx.go | 13 +++ 10 files changed, 204 insertions(+), 26 deletions(-) create mode 100644 mempool/track_local_txs_test.go create mode 100644 mempool/txpool/locals/errors_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 1ac4e2df3..74e0fdb89 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -82,6 +82,7 @@ ### FEATURES +- [\#646](https://github.com/cosmos/evm/pull/646) Add TxTracker support for tracking priority transactions and handling temporary rejections - [\#665](https://github.com/cosmos/evm/pull/665) Add EvmCodec address codec implementation - [\#346](https://github.com/cosmos/evm/pull/346) Add eth_createAccessList method and implementation - [\#337](https://github.com/cosmos/evm/pull/337) Support state overrides in eth_call. diff --git a/config/server_app_options.go b/config/server_app_options.go index c98f1bf2a..46bdd305c 100644 --- a/config/server_app_options.go +++ b/config/server_app_options.go @@ -2,6 +2,7 @@ package config import ( "math" + "os" "path/filepath" "github.com/holiman/uint256" @@ -140,6 +141,14 @@ func GetLegacyPoolConfig(appOpts servertypes.AppOptions, logger log.Logger) *leg legacyConfig.Lifetime = lifetime } + // Set journal path under .evmd data dir and ensure dir exists + homeDir := cast.ToString(appOpts.Get(flags.FlagHome)) + if homeDir != "" { + journalDir := filepath.Join(homeDir, "data", "txpool") + _ = os.MkdirAll(journalDir, 0o755) + legacyConfig.Journal = filepath.Join(journalDir, legacyConfig.Journal) + } + return &legacyConfig } diff --git a/mempool/mempool.go b/mempool/mempool.go index d768e2610..d5ea3729d 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -7,8 +7,6 @@ import ( "sync" "time" - "github.com/cosmos/evm/mempool/txpool/locals" - ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/holiman/uint256" @@ -17,6 +15,7 @@ import ( "github.com/cosmos/evm/mempool/miner" "github.com/cosmos/evm/mempool/txpool" "github.com/cosmos/evm/mempool/txpool/legacypool" + "github.com/cosmos/evm/mempool/txpool/locals" "github.com/cosmos/evm/rpc/stream" evmtypes "github.com/cosmos/evm/x/vm/types" @@ -218,6 +217,15 @@ func NewExperimentalEVMMempool( return evmMempool } +// TrackLocalTxs tracks transactions as local priority via TxTracker. +// No-op if local tracking is not initialized. +func (m *ExperimentalEVMMempool) TrackLocalTxs(txs []*ethtypes.Transaction) { + if m == nil || m.localTxTracker == nil || len(txs) == 0 { + return + } + m.localTxTracker.TrackAll(txs) +} + // GetBlockchain returns the blockchain interface used for chain head event notifications. // This is primarily used to notify the mempool when new blocks are finalized. func (m *ExperimentalEVMMempool) GetBlockchain() *Blockchain { diff --git a/mempool/track_local_txs_test.go b/mempool/track_local_txs_test.go new file mode 100644 index 000000000..74aed4921 --- /dev/null +++ b/mempool/track_local_txs_test.go @@ -0,0 +1,4 @@ +// Intentionally left empty; comprehensive TxTracker behavior is tested in +// package locals where internals are accessible. This file reserved for future +// integration wiring tests if needed. +package mempool diff --git a/mempool/txpool/locals/errors.go b/mempool/txpool/locals/errors.go index fda50bf21..2eca36b8e 100644 --- a/mempool/txpool/locals/errors.go +++ b/mempool/txpool/locals/errors.go @@ -1,26 +1,10 @@ -// Copyright 2025 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - package locals import ( "errors" - "github.com/ethereum/go-ethereum/core/txpool" - "github.com/ethereum/go-ethereum/core/txpool/legacypool" + "github.com/cosmos/evm/mempool/txpool" + "github.com/cosmos/evm/mempool/txpool/legacypool" ) // IsTemporaryReject determines whether the given error indicates a temporary diff --git a/mempool/txpool/locals/errors_test.go b/mempool/txpool/locals/errors_test.go new file mode 100644 index 000000000..a163131b7 --- /dev/null +++ b/mempool/txpool/locals/errors_test.go @@ -0,0 +1,48 @@ +package locals + +import ( + "errors" + "testing" + + "github.com/cosmos/evm/mempool/txpool" + "github.com/cosmos/evm/mempool/txpool/legacypool" +) + +func TestIsTemporaryReject_PositiveCases(t *testing.T) { + cases := []struct { + name string + err error + }{ + {name: "delegated out-of-order nonce", err: legacypool.ErrOutOfOrderTxFromDelegated}, + {name: "inflight tx limit reached", err: txpool.ErrInflightTxLimitReached}, + {name: "authority reserved", err: legacypool.ErrAuthorityReserved}, + {name: "underpriced", err: txpool.ErrUnderpriced}, + {name: "txpool overflow", err: legacypool.ErrTxPoolOverflow}, + {name: "future replace pending", err: legacypool.ErrFutureReplacePending}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + if !IsTemporaryReject(tc.err) { + t.Fatalf("expected temporary reject error to be detected, got false: %v", tc.err) + } + }) + } +} + +func TestIsTemporaryReject_NegativeCases(t *testing.T) { + cases := []struct { + name string + err error + }{ + {name: "nil", err: nil}, + {name: "unrelated", err: errors.New("some unrelated error")}, + {name: "substring lookalike", err: errors.New("under price threshold")}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + if IsTemporaryReject(tc.err) { + t.Fatalf("did not expect temporary reject error for: %v", tc.err) + } + }) + } +} diff --git a/mempool/txpool/locals/tx_tracker.go b/mempool/txpool/locals/tx_tracker.go index 0f22b1c42..7b616e167 100644 --- a/mempool/txpool/locals/tx_tracker.go +++ b/mempool/txpool/locals/tx_tracker.go @@ -18,12 +18,12 @@ package locals import ( - "github.com/cosmos/evm/mempool/txpool" - "github.com/cosmos/evm/mempool/txpool/legacypool" "slices" "sync" "time" + "github.com/cosmos/evm/mempool/txpool" + "github.com/cosmos/evm/mempool/txpool/legacypool" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" diff --git a/mempool/txpool/locals/tx_tracker_test.go b/mempool/txpool/locals/tx_tracker_test.go index 367fb6b6d..af9ef713b 100644 --- a/mempool/txpool/locals/tx_tracker_test.go +++ b/mempool/txpool/locals/tx_tracker_test.go @@ -21,15 +21,17 @@ import ( "testing" "time" + "github.com/cosmos/evm/mempool/txpool" + "github.com/cosmos/evm/mempool/txpool/legacypool" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus/ethash" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/rawdb" - "github.com/ethereum/go-ethereum/core/txpool" - "github.com/ethereum/go-ethereum/core/txpool/legacypool" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/params" ) @@ -54,6 +56,22 @@ type testEnv struct { genDb ethdb.Database } +// testChainAdapter adapts geth *core.BlockChain to the local txpool/legacypool BlockChain interfaces. +type testChainAdapter struct{ c *core.BlockChain } + +func (a *testChainAdapter) Config() *params.ChainConfig { return a.c.Config() } +func (a *testChainAdapter) CurrentBlock() *types.Header { return a.c.CurrentHeader() } +func (a *testChainAdapter) GetBlock(hash common.Hash, number uint64) *types.Block { + return a.c.GetBlock(hash, number) +} +func (a *testChainAdapter) StateAt(root common.Hash) (vm.StateDB, error) { + st, err := a.c.StateAt(root) + return st, err +} +func (a *testChainAdapter) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription { + return a.c.SubscribeChainHeadEvent(ch) +} + func newTestEnv(t *testing.T, n int, gasTip uint64, journal string) *testEnv { genDb, blocks, _ := core.GenerateChainWithGenesis(gspec, ethash.NewFaker(), n, func(i int, gen *core.BlockGen) { tx, err := types.SignTx(types.NewTransaction(gen.TxNonce(address), common.Address{0x00}, big.NewInt(1000), params.TxGas, gen.BaseFee(), nil), signer, key) @@ -66,8 +84,9 @@ func newTestEnv(t *testing.T, n int, gasTip uint64, journal string) *testEnv { db := rawdb.NewMemoryDatabase() chain, _ := core.NewBlockChain(db, gspec, ethash.NewFaker(), nil) - legacyPool := legacypool.New(legacypool.DefaultConfig, chain) - pool, err := txpool.New(gasTip, chain, []txpool.SubPool{legacyPool}) + adapter := &testChainAdapter{c: chain} + legacyPool := legacypool.New(legacypool.DefaultConfig, adapter) + pool, err := txpool.New(gasTip, adapter, []txpool.SubPool{legacyPool}) if err != nil { t.Fatalf("Failed to create tx pool: %v", err) } @@ -163,3 +182,80 @@ func TestResubmit(t *testing.T) { t.Fatalf("Unexpected transactions being tracked, got: %d, want: %d", len(all[address]), len(txs)) } } + +func TestTrackAddsAndResubmitsAll(t *testing.T) { + env := newTestEnv(t, 5, 0, "") + defer env.close() + + // Create a contiguous set of txs but do NOT add any to the pool + txs := env.makeTxs(5) + env.tracker.TrackAll(txs) + + // Since none are present in the pool, all should be scheduled for resubmission + resubmit, all := env.tracker.recheck(true) + if len(resubmit) != len(txs) { + t.Fatalf("expected all transactions to be resubmitted, got %d want %d", len(resubmit), len(txs)) + } + if len(all[address]) != len(txs) { + t.Fatalf("expected all transactions tracked, got %d want %d", len(all[address]), len(txs)) + } + + // Now add them to the pool as if resubmitted + env.pool.Add(txs, false) + + resubmit2, _ := env.tracker.recheck(false) + if len(resubmit2) != 0 { + t.Fatalf("expected no resubmissions after promotion, got %d", len(resubmit2)) + } +} + +func TestDropObsoleteOnHigherNonce(t *testing.T) { + env := newTestEnv(t, 5, 0, "") + defer env.close() + + // Make and track 6 txs starting at current nonce + txs := env.makeTxs(6) + env.tracker.TrackAll(txs) + + // Advance the chain/account nonce by 3 (mine 3 blocks each adding a tx from the same account) + for i := 0; i < 3; i++ { + env.commit() + } + + // Recheck should drop the first 3 as stale + resubmit, all := env.tracker.recheck(true) + if len(all[address]) != 3 { + t.Fatalf("expected 3 transactions to remain tracked after nonce advance, got %d", len(all[address])) + } + for _, tx := range resubmit { + // none of the resubmits should have nonce less than current pool nonce + sender, _ := types.Sender(signer, tx) + if tx.Nonce() < env.pool.Nonce(sender) { + t.Fatalf("found stale tx in resubmits: nonce %d < pool nonce %d", tx.Nonce(), env.pool.Nonce(sender)) + } + } +} + +func TestPromoteThenNoRetry(t *testing.T) { + env := newTestEnv(t, 4, 0, "") + defer env.close() + + // Track 4 txs, add 2 to pool. Expect 2 resubmits. + txs := env.makeTxs(4) + txsA := txs[:2] + txsB := txs[2:] + env.pool.Add(txsA, true) + env.tracker.TrackAll(txs) + + resubmit, _ := env.tracker.recheck(false) + if len(resubmit) != len(txsB) { + t.Fatalf("unexpected resubmit count, got %d want %d", len(resubmit), len(txsB)) + } + + // Promote missing ones by adding them; next recheck should yield none + env.pool.Add(resubmit, false) + resubmit2, _ := env.tracker.recheck(false) + if len(resubmit2) != 0 { + t.Fatalf("expected no resubmits after all txs present in pool, got %d", len(resubmit2)) + } +} diff --git a/rpc/backend/call_tx.go b/rpc/backend/call_tx.go index 61dfc0b34..5c65bce3b 100644 --- a/rpc/backend/call_tx.go +++ b/rpc/backend/call_tx.go @@ -17,6 +17,7 @@ import ( "google.golang.org/grpc/status" "github.com/cosmos/evm/mempool" + txlocals "github.com/cosmos/evm/mempool/txpool/locals" rpctypes "github.com/cosmos/evm/rpc/types" evmtypes "github.com/cosmos/evm/x/vm/types" @@ -158,6 +159,14 @@ func (b *Backend) SendRawTransaction(data hexutil.Bytes) (common.Hash, error) { if b.Mempool != nil && strings.Contains(err.Error(), mempool.ErrNonceGap.Error()) { // Transaction was successfully queued due to nonce gap, return success to client b.Logger.Debug("transaction queued due to nonce gap", "hash", txHash.Hex()) + // Track as local for priority and persistence + b.Mempool.TrackLocalTxs([]*ethtypes.Transaction{tx}) + return txHash, nil + } + // Temporary txpool rejections should be locally tracked for resubmission + if b.Mempool != nil && txlocals.IsTemporaryReject(err) { + b.Logger.Debug("temporary rejection, tracking locally", "hash", txHash.Hex(), "err", err.Error()) + b.Mempool.TrackLocalTxs([]*ethtypes.Transaction{tx}) return txHash, nil } if b.Mempool != nil && strings.Contains(err.Error(), mempool.ErrNonceLow.Error()) { @@ -176,6 +185,8 @@ func (b *Backend) SendRawTransaction(data hexutil.Bytes) (common.Hash, error) { } // SendRawTransaction does not return error when committed nonce <= tx.Nonce < pending nonce + // Track as local for persistence until mined + b.Mempool.TrackLocalTxs([]*ethtypes.Transaction{tx}) return txHash, nil } @@ -183,6 +194,10 @@ func (b *Backend) SendRawTransaction(data hexutil.Bytes) (common.Hash, error) { return txHash, fmt.Errorf("failed to broadcast transaction: %w", err) } + // On success, track as local too to persist across restarts until mined + if b.Mempool != nil { + b.Mempool.TrackLocalTxs([]*ethtypes.Transaction{tx}) + } return txHash, nil } diff --git a/rpc/backend/sign_tx.go b/rpc/backend/sign_tx.go index 859c2c3a8..a8bd67437 100644 --- a/rpc/backend/sign_tx.go +++ b/rpc/backend/sign_tx.go @@ -14,6 +14,7 @@ import ( "github.com/ethereum/go-ethereum/signer/core/apitypes" "github.com/cosmos/evm/mempool" + txlocals "github.com/cosmos/evm/mempool/txpool/locals" evmtypes "github.com/cosmos/evm/x/vm/types" errorsmod "cosmossdk.io/errors" @@ -113,6 +114,14 @@ func (b *Backend) SendTransaction(args evmtypes.TransactionArgs) (common.Hash, e if b.Mempool != nil && strings.Contains(err.Error(), mempool.ErrNonceGap.Error()) { // Transaction was successfully queued due to nonce gap, return success to client b.Logger.Debug("transaction queued due to nonce gap", "hash", txHash.Hex()) + // Track as local for priority and persistence + b.Mempool.TrackLocalTxs([]*ethtypes.Transaction{ethTx}) + return txHash, nil + } + // Temporary txpool rejections should be locally tracked for resubmission + if b.Mempool != nil && txlocals.IsTemporaryReject(err) { + b.Logger.Debug("temporary rejection, tracking locally", "hash", txHash.Hex(), "err", err.Error()) + b.Mempool.TrackLocalTxs([]*ethtypes.Transaction{ethTx}) return txHash, nil } b.Logger.Error("failed to broadcast tx", "error", err.Error()) @@ -120,6 +129,10 @@ func (b *Backend) SendTransaction(args evmtypes.TransactionArgs) (common.Hash, e } // Return transaction hash + // On success, track as local too to persist across restarts until mined + if b.Mempool != nil { + b.Mempool.TrackLocalTxs([]*ethtypes.Transaction{ethTx}) + } return txHash, nil }