diff --git a/app/abci_utils.go b/app/abci_utils.go index c1a3a661..b39ad2ce 100644 --- a/app/abci_utils.go +++ b/app/abci_utils.go @@ -1,9 +1,9 @@ package app import ( + "errors" "fmt" - "github.com/cockroachdb/errors" abci "github.com/cometbft/cometbft/abci/types" gethtypes "github.com/ethereum/go-ethereum/core/types" evmtypes "github.com/evmos/ethermint/x/evm/types" @@ -97,11 +97,12 @@ func (h *DefaultProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHan return abci.ResponsePrepareProposal{Txs: h.txSelector.SelectedTxs()} } - iterator := h.mempool.Select(ctx, req.Txs) selectedTxsSignersSeqs := make(map[string]uint64) var selectedTxsNums int - for iterator != nil { - memTx := iterator.Tx() + + var waitRemoveTxs []sdk.Tx + + mempool.SelectBy(ctx, h.mempool, req.Txs, func(memTx sdk.Tx) bool { sigs, err := memTx.(signing.SigVerifiableTx).GetSignaturesV2() if err != nil { panic(fmt.Errorf("failed to get signatures: %w", err)) @@ -157,47 +158,49 @@ func (h *DefaultProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHan } } - if !shouldAdd { - iterator = iterator.Next() - continue - } - - // NOTE: Since transaction verification was already executed in CheckTx, - // which calls mempool.Insert, in theory everything in the pool should be - // valid. But some mempool implementations may insert invalid txs, so we - // check again. - txBz, err := h.txVerifier.PrepareProposalVerifyTx(memTx) - if err != nil { - err := h.mempool.Remove(memTx) - if err != nil && !errors.Is(err, mempool.ErrTxNotFound) { - panic(err) - } - } else { - stop := h.txSelector.SelectTxForProposal(uint64(req.MaxTxBytes), maxBlockGas, memTx, txBz) - if stop { - break - } + if shouldAdd { + // NOTE: Since transaction verification was already executed in CheckTx, + // which calls mempool.Insert, in theory everything in the pool should be + // valid. But some mempool implementations may insert invalid txs, so we + // check again. + txBz, err := h.txVerifier.PrepareProposalVerifyTx(memTx) + if err != nil { + waitRemoveTxs = append(waitRemoveTxs, memTx) + } else { + stop := h.txSelector.SelectTxForProposal(uint64(req.MaxTxBytes), maxBlockGas, memTx, txBz) + if stop { + return false + } - txsLen := len(h.txSelector.SelectedTxs()) - for sender, seq := range txSignersSeqs { - // If txsLen != selectedTxsNums is true, it means that we've - // added a new tx to the selected txs, so we need to update - // the sequence of the sender. - if txsLen != selectedTxsNums { - selectedTxsSignersSeqs[sender] = seq - } else if _, ok := selectedTxsSignersSeqs[sender]; !ok { - // The transaction hasn't been added but it passed the - // verification, so we know that the sequence is correct. - // So we set this sender's sequence to seq-1, in order - // to avoid unnecessary calls to PrepareProposalVerifyTx. - selectedTxsSignersSeqs[sender] = seq - 1 + txsLen := len(h.txSelector.SelectedTxs()) + for sender, seq := range txSignersSeqs { + // If txsLen != selectedTxsNums is true, it means that we've + // added a new tx to the selected txs, so we need to update + // the sequence of the sender. + if txsLen != selectedTxsNums { + selectedTxsSignersSeqs[sender] = seq + } else if _, ok := selectedTxsSignersSeqs[sender]; !ok { + // The transaction hasn't been added but it passed the + // verification, so we know that the sequence is correct. + // So we set this sender's sequence to seq-1, in order + // to avoid unnecessary calls to PrepareProposalVerifyTx. + selectedTxsSignersSeqs[sender] = seq - 1 + } } + selectedTxsNums = txsLen } - selectedTxsNums = txsLen } - iterator = iterator.Next() + return true + }) + + for i := range waitRemoveTxs { + err := h.mempool.Remove(waitRemoveTxs[i]) + if err != nil && !errors.Is(err, mempool.ErrTxNotFound) { + panic(err) + } } + return abci.ResponsePrepareProposal{Txs: h.txSelector.SelectedTxs()} } } diff --git a/app/app.go b/app/app.go index 814f0631..d148e857 100644 --- a/app/app.go +++ b/app/app.go @@ -1068,3 +1068,47 @@ func GetMaccPerms() map[string][]string { } return perms } + +type accountNonceOp struct { + ak evmtypes.AccountKeeper +} + +type AccountNonceOp interface { + GetAccountNonce(ctx sdk.Context, address string) uint64 + SetAccountNonce(ctx sdk.Context, address string, nonce uint64) +} + +func NewAccountNonceOp(app *App) AccountNonceOp { + return &accountNonceOp{ + ak: app.accountKeeper, + } +} + +func (ano *accountNonceOp) GetAccountNonce(ctx sdk.Context, address string) uint64 { + bzAcc, err := sdk.AccAddressFromBech32(address) + if err != nil { + ctx.Logger().Error("GetAccountNonce: failed to parse address", "address", address, "error", err) + return 0 + } + acc := ano.ak.GetAccount(ctx, bzAcc) + if acc == nil { + ctx.Logger().Error("GetAccountNonce: account not found", "address", address) + return 0 + } + return acc.GetSequence() +} + +func (ano *accountNonceOp) SetAccountNonce(ctx sdk.Context, address string, nonce uint64) { + bzAcc, err := sdk.AccAddressFromBech32(address) + if err != nil { + ctx.Logger().Error("SetAccountNonce: failed to parse address", "address", address, "nonce", nonce, "error", err) + return + } + acc := ano.ak.GetAccount(ctx, bzAcc) + if acc != nil { + acc.SetSequence(nonce) + ano.ak.SetAccount(ctx, acc) + } else { + ctx.Logger().Error("SetAccountNonce: account not found", "address", address) + } +} diff --git a/app/priority_nonce.go b/app/priority_nonce.go index a7c604de..4b607cbe 100644 --- a/app/priority_nonce.go +++ b/app/priority_nonce.go @@ -2,10 +2,13 @@ package app import ( "context" + "sync" + "fmt" "math" "github.com/huandu/skiplist" + "github.com/pkg/errors" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/cosmos-sdk/types/mempool" @@ -14,11 +17,16 @@ import ( evmtypes "github.com/evmos/ethermint/x/evm/types" ) -const MAX_TXS_PRE_SENDER_IN_MEMPOOL int = 10 +const MAX_TXS_PRE_SENDER_IN_MEMPOOL int = 48 var ( _ mempool.Mempool = (*PriorityNonceMempool)(nil) _ mempool.Iterator = (*PriorityNonceIterator)(nil) + + errMempoolTxGasPriceTooLow = errors.New("gas price is too low") + errMempoolTooManyTxs = errors.New("tx sender has too many txs in mempool") + errMempoolIsFull = errors.New("mempool is full") + errTxInMempool = errors.New("tx already in mempool") ) // PriorityNonceMempool is a mempool implementation that stores txs @@ -29,14 +37,20 @@ var ( // priority to other sender txs and must be partially ordered by both sender-nonce // and priority. type PriorityNonceMempool struct { - priorityIndex *skiplist.SkipList - priorityCounts map[int64]int - senderIndices map[string]*skiplist.SkipList - scores map[txMeta]txMeta - onRead func(tx sdk.Tx) - txReplacement func(op, np int64, oTx, nTx sdk.Tx) bool - maxTx int + mtx sync.Mutex + priorityIndex *skiplist.SkipList + priorityCounts map[int64]int + senderIndices map[string]*skiplist.SkipList + scores map[txMeta]txMeta + onRead func(tx sdk.Tx) + txReplacement func(op, np int64, oTx, nTx sdk.Tx) bool + maxTx int + + senderTxCntLock sync.RWMutex counterBySender map[string]int + txRecord map[txMeta]struct{} + + txReplacedCallback func(ctx context.Context, oldTx, newTx *TxInfo) } type PriorityNonceIterator struct { @@ -123,6 +137,12 @@ func PriorityNonceWithMaxTx(maxTx int) PriorityNonceMempoolOption { } } +func PriorityNonceWithTxReplacedCallback(cb func(ctx context.Context, oldTx, newTx *TxInfo)) PriorityNonceMempoolOption { + return func(mp *PriorityNonceMempool) { + mp.txReplacedCallback = cb + } +} + // DefaultPriorityMempool returns a priorityNonceMempool with no options. func DefaultPriorityMempool() mempool.Mempool { return NewPriorityMempool() @@ -137,6 +157,7 @@ func NewPriorityMempool(opts ...PriorityNonceMempoolOption) *PriorityNonceMempoo senderIndices: make(map[string]*skiplist.SkipList), scores: make(map[txMeta]txMeta), counterBySender: make(map[string]int), + txRecord: make(map[txMeta]struct{}), } for _, opt := range opts { @@ -169,67 +190,41 @@ func (mp *PriorityNonceMempool) NextSenderTx(sender string) sdk.Tx { // Inserting a duplicate tx with a different priority overwrites the existing tx, // changing the total order of the mempool. func (mp *PriorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error { - if mp.maxTx > 0 && mp.CountTx() >= mp.maxTx { - return mempool.ErrMempoolTxMaxCapacity - } else if mp.maxTx < 0 { + mp.mtx.Lock() + defer mp.mtx.Unlock() + + // if mp.maxTx > 0 && mp.CountTx() >= mp.maxTx { + // return mempool.ErrMempoolTxMaxCapacity + // } else + if mp.maxTx < 0 { return nil } - sigs, err := tx.(signing.SigVerifiableTx).GetSignaturesV2() - if err != nil { - return err - } sdkContext := sdk.UnwrapSDKContext(ctx) priority := sdkContext.Priority() - var sender string - var nonce uint64 - - if len(sigs) == 0 { - msgs := tx.GetMsgs() - if len(msgs) != 1 { - return fmt.Errorf("tx must have at least one signer") - } - msgEthTx, ok := msgs[0].(*evmtypes.MsgEthereumTx) - if !ok { - return fmt.Errorf("tx must have at least one signer") - } - ethTx := msgEthTx.AsTransaction() - signer := gethtypes.NewEIP2930Signer(ethTx.ChainId()) - ethSender, err := signer.Sender(ethTx) - if err != nil { - return fmt.Errorf("tx must have at least one signer") - } - sender = sdk.AccAddress(ethSender.Bytes()).String() - nonce = ethTx.Nonce() - } else { - sig := sigs[0] - sender = sdk.AccAddress(sig.PubKey.Address()).String() - nonce = sig.Sequence + txInfo, err := extractTxInfo(tx) + if err != nil { + return err } - if _, exists := mp.counterBySender[sender]; !exists { - mp.counterBySender[sender] = 1 - } else { - if mp.counterBySender[sender] < MAX_TXS_PRE_SENDER_IN_MEMPOOL { - mp.counterBySender[sender] += 1 - } else { - return fmt.Errorf("tx sender has too many txs in mempool") - } + if !mp.canInsert(txInfo.Sender) { + return errors.Wrapf(errMempoolTooManyTxs, "[%d@%s]sender has too many txs in mempool", txInfo.Nonce, txInfo.Sender) } - key := txMeta{nonce: nonce, priority: priority, sender: sender} - - senderIndex, ok := mp.senderIndices[sender] + // init sender index if not exists + senderIndex, ok := mp.senderIndices[txInfo.Sender] if !ok { senderIndex = skiplist.New(skiplist.LessThanFunc(func(a, b any) int { return skiplist.Uint64.Compare(b.(txMeta).nonce, a.(txMeta).nonce) })) // initialize sender index if not found - mp.senderIndices[sender] = senderIndex + mp.senderIndices[txInfo.Sender] = senderIndex } + newKey := txMeta{nonce: txInfo.Nonce, priority: priority, sender: txInfo.Sender} + // Since mp.priorityIndex is scored by priority, then sender, then nonce, a // changed priority will create a new key, so we must remove the old key and // re-insert it to avoid having the same tx with different priorityIndex indexed @@ -237,35 +232,155 @@ func (mp *PriorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error { // // This O(log n) remove operation is rare and only happens when a tx's priority // changes. - sk := txMeta{nonce: nonce, sender: sender} + + sk := txMeta{nonce: txInfo.Nonce, sender: txInfo.Sender} if oldScore, txExists := mp.scores[sk]; txExists { - if mp.txReplacement != nil && !mp.txReplacement(oldScore.priority, priority, senderIndex.Get(key).Value.(sdk.Tx), tx) { - return fmt.Errorf( - "tx doesn't fit the replacement rule, oldPriority: %v, newPriority: %v, oldTx: %v, newTx: %v", - oldScore.priority, - priority, - senderIndex.Get(key).Value.(sdk.Tx), - tx, - ) + if oldScore.priority < priority { + oldTx := senderIndex.Get(newKey).Value.(sdk.Tx) + return mp.doTxReplace(ctx, newKey, oldScore, oldTx, tx) } + return errors.Wrapf(errTxInMempool, "[%d@%s] tx already in mempool", txInfo.Nonce, txInfo.Sender) + } else { + mempoolSize := mp.priorityIndex.Len() + if mempoolSize >= mp.maxTx { + lowestPriority := mp.getLowestPriority() + // find one to replace + if lowestPriority > 0 && priority <= lowestPriority { + return errors.Wrapf(errMempoolTxGasPriceTooLow, "[%d@%s]tx with priority %d is too low, current lowest priority is %d", newKey.nonce, newKey.sender, priority, lowestPriority) + } + + var maxIndexSize int + var lowerPriority int64 = math.MaxInt64 + var selectedElement *skiplist.Element + for sender, index := range mp.senderIndices { + indexSize := index.Len() + if sender == txInfo.Sender { + continue + } + + if indexSize > 0 { + tail := index.Back() + if tail != nil { + tailKey := tail.Key().(txMeta) + if tailKey.priority < lowerPriority { + lowerPriority = tailKey.priority + maxIndexSize = indexSize + selectedElement = tail + } else if tailKey.priority == lowerPriority { + if indexSize > maxIndexSize { + maxIndexSize = indexSize + selectedElement = tail + } + } + } + } + } + + if selectedElement != nil { + key := selectedElement.Key().(txMeta) + replacedTx, _ := mp.doRemove(key, true) + + // insert new tx + mp.doInsert(newKey, tx, true) + + if mp.txReplacedCallback != nil && replacedTx != nil { + sdkContext.Logger().Debug("txn replaced caused by full of mempool", "old", fmt.Sprintf("%d@%s", key.nonce, key.sender), "new", fmt.Sprintf("%d@%s", newKey.nonce, newKey.sender), "mempoolSize", mempoolSize) + mp.txReplacedCallback(ctx, + &TxInfo{Sender: key.sender, Nonce: key.nonce, Tx: replacedTx}, + &TxInfo{Sender: newKey.sender, Nonce: newKey.nonce, Tx: tx}, + ) + } + } else { + return errors.Wrapf(errMempoolIsFull, "%d@%s with priority%d", newKey.nonce, newKey.sender, newKey.priority) + } + } else { + mp.doInsert(newKey, tx, true) + } + return nil + } +} + +func (mp *PriorityNonceMempool) doInsert(newKey txMeta, tx sdk.Tx, incrCnt bool) { + senderIndex, ok := mp.senderIndices[newKey.sender] + if !ok { + senderIndex = skiplist.New(skiplist.LessThanFunc(func(a, b any) int { + return skiplist.Uint64.Compare(b.(txMeta).nonce, a.(txMeta).nonce) + })) + + // initialize sender index if not found + mp.senderIndices[newKey.sender] = senderIndex + } + + mp.priorityCounts[newKey.priority]++ + newKey.senderElement = senderIndex.Set(newKey, tx) + + mp.scores[txMeta{nonce: newKey.nonce, sender: newKey.sender}] = txMeta{priority: newKey.priority} + mp.priorityIndex.Set(newKey, tx) + + if incrCnt { + mp.incrSenderTxCnt(newKey.sender, newKey.nonce) + } +} + +func (mp *PriorityNonceMempool) doRemove(oldKey txMeta, decrCnt bool) (sdk.Tx, error) { + scoreKey := txMeta{nonce: oldKey.nonce, sender: oldKey.sender} + score, ok := mp.scores[scoreKey] + if !ok { + return nil, errors.Wrapf(mempool.ErrTxNotFound, "%d@%s not found", oldKey.nonce, oldKey.sender) + } + tk := txMeta{nonce: oldKey.nonce, priority: score.priority, sender: oldKey.sender, weight: score.weight} - mp.priorityIndex.Remove(txMeta{ - nonce: nonce, - sender: sender, - priority: oldScore.priority, - weight: oldScore.weight, - }) - mp.priorityCounts[oldScore.priority]-- + senderTxs, ok := mp.senderIndices[oldKey.sender] + if !ok { + return nil, fmt.Errorf("%d@%s not found", oldKey.nonce, oldKey.sender) } - mp.priorityCounts[priority]++ + mp.priorityIndex.Remove(tk) + removedElem := senderTxs.Remove(tk) + delete(mp.scores, scoreKey) + mp.priorityCounts[score.priority]-- - // Since senderIndex is scored by nonce, a changed priority will overwrite the - // existing key. - key.senderElement = senderIndex.Set(key, tx) + if decrCnt { + mp.decrSenderTxCnt(oldKey.sender, oldKey.nonce) + } - mp.scores[sk] = txMeta{priority: priority} - mp.priorityIndex.Set(key, tx) + if removedElem == nil { + return nil, mempool.ErrTxNotFound + } + + return removedElem.Value.(sdk.Tx), nil +} + +func (mp *PriorityNonceMempool) doTxReplace(ctx context.Context, newMate, oldMate txMeta, oldTx, newTx sdk.Tx) error { + if mp.txReplacement != nil && !mp.txReplacement(oldMate.priority, newMate.priority, oldTx, newTx) { + return fmt.Errorf( + "tx doesn't fit the replacement rule, oldPriority: %v, newPriority: %v, oldTx: %v, newTx: %v", + oldMate.priority, + newMate.priority, + oldTx, + newTx, + ) + } + + e := mp.priorityIndex.Remove(txMeta{ + nonce: newMate.nonce, + sender: newMate.sender, + priority: oldMate.priority, + weight: oldMate.weight, + }) + replacedTx := e.Value.(sdk.Tx) + mp.priorityCounts[oldMate.priority]-- + + mp.doInsert(newMate, newTx, false) + + if mp.txReplacedCallback != nil && replacedTx != nil { + sdkContext := sdk.UnwrapSDKContext(ctx) + sdkContext.Logger().Debug("txn update", "txn", fmt.Sprintf("%d@%s", newMate.nonce, newMate.sender), "oldPriority", oldMate.priority, "newPriority", newMate.priority) + mp.txReplacedCallback(ctx, + &TxInfo{Sender: newMate.sender, Nonce: newMate.nonce, Tx: replacedTx}, + &TxInfo{Sender: newMate.sender, Nonce: newMate.nonce, Tx: newTx}, + ) + } return nil } @@ -346,7 +461,24 @@ func (i *PriorityNonceIterator) Tx() sdk.Tx { // // NOTE: It is not safe to use this iterator while removing transactions from // the underlying mempool. -func (mp *PriorityNonceMempool) Select(_ context.Context, _ [][]byte) mempool.Iterator { +func (mp *PriorityNonceMempool) Select(ctx context.Context, txs [][]byte) mempool.Iterator { + mp.mtx.Lock() + defer mp.mtx.Unlock() + + return mp.doSelect(ctx, txs) +} + +func (mp *PriorityNonceMempool) SelectBy(ctx context.Context, txs [][]byte, callback func(sdk.Tx) bool) { + mp.mtx.Lock() + defer mp.mtx.Unlock() + + iter := mp.doSelect(ctx, txs) + for iter != nil && callback(iter.Tx()) { + iter = iter.Next() + } +} + +func (mp *PriorityNonceMempool) doSelect(_ context.Context, _ [][]byte) mempool.Iterator { if mp.priorityIndex.Len() == 0 { return nil } @@ -361,6 +493,16 @@ func (mp *PriorityNonceMempool) Select(_ context.Context, _ [][]byte) mempool.It return iterator.iteratePriority() } +func (mp *PriorityNonceMempool) GetSenderUncommittedTxnCount(ctx context.Context, sender string) int { + mp.mtx.Lock() + defer mp.mtx.Unlock() + + if _, exists := mp.counterBySender[sender]; exists { + return mp.counterBySender[sender] + } + return 0 +} + type reorderKey struct { deleteKey txMeta insertKey txMeta @@ -415,51 +557,34 @@ func senderWeight(senderCursor *skiplist.Element) int64 { // CountTx returns the number of transactions in the mempool. func (mp *PriorityNonceMempool) CountTx() int { + mp.mtx.Lock() + defer mp.mtx.Unlock() return mp.priorityIndex.Len() } // Remove removes a transaction from the mempool in O(log n) time, returning an // error if unsuccessful. func (mp *PriorityNonceMempool) Remove(tx sdk.Tx) error { - sigs, err := tx.(signing.SigVerifiableTx).GetSignaturesV2() + mp.mtx.Lock() + defer mp.mtx.Unlock() + + txInfo, err := extractTxInfo(tx) if err != nil { return err } - var sender string - var nonce uint64 - if len(sigs) == 0 { - msgs := tx.GetMsgs() - if len(msgs) != 1 { - return fmt.Errorf("attempted to remove a tx with no signatures") - } - msgEthTx, ok := msgs[0].(*evmtypes.MsgEthereumTx) - if !ok { - return fmt.Errorf("attempted to remove a tx with no signatures") - } - ethTx := msgEthTx.AsTransaction() - signer := gethtypes.NewEIP2930Signer(ethTx.ChainId()) - ethSender, err := signer.Sender(ethTx) - if err != nil { - return fmt.Errorf("attempted to remove a tx with no signatures") - } - sender = sdk.AccAddress(ethSender.Bytes()).String() - nonce = ethTx.Nonce() - } else { - sig := sigs[0] - sender = sdk.AccAddress(sig.PubKey.Address()).String() - nonce = sig.Sequence - } - scoreKey := txMeta{nonce: nonce, sender: sender} + mp.decrSenderTxCnt(txInfo.Sender, txInfo.Nonce) + + scoreKey := txMeta{nonce: txInfo.Nonce, sender: txInfo.Sender} score, ok := mp.scores[scoreKey] if !ok { return mempool.ErrTxNotFound } - tk := txMeta{nonce: nonce, priority: score.priority, sender: sender, weight: score.weight} + tk := txMeta{nonce: txInfo.Nonce, priority: score.priority, sender: txInfo.Sender, weight: score.weight} - senderTxs, ok := mp.senderIndices[sender] + senderTxs, ok := mp.senderIndices[txInfo.Sender] if !ok { - return fmt.Errorf("sender %s not found", sender) + return fmt.Errorf("sender %s not found", txInfo.Sender) } mp.priorityIndex.Remove(tk) @@ -467,17 +592,76 @@ func (mp *PriorityNonceMempool) Remove(tx sdk.Tx) error { delete(mp.scores, scoreKey) mp.priorityCounts[score.priority]-- + return nil +} + +func (mp *PriorityNonceMempool) getLowestPriority() int64 { + if mp.priorityIndex.Len() == 0 { + return 0 + } + + min := int64(math.MaxInt64) + for priority, count := range mp.priorityCounts { + if count > 0 { + if priority < min { + min = priority + } + } + } + + return min +} + +func (mp *PriorityNonceMempool) canInsert(sender string) bool { + mp.senderTxCntLock.RLock() + defer mp.senderTxCntLock.RUnlock() + if _, exists := mp.counterBySender[sender]; exists { - if mp.counterBySender[sender] > 1 { - mp.counterBySender[sender] -= 1 + return mp.counterBySender[sender] < MAX_TXS_PRE_SENDER_IN_MEMPOOL + } + + return true +} + +func (mp *PriorityNonceMempool) incrSenderTxCnt(sender string, nonce uint64) error { + mp.senderTxCntLock.Lock() + defer mp.senderTxCntLock.Unlock() + + existsKey := txMeta{nonce: nonce, sender: sender} + if _, exists := mp.txRecord[existsKey]; !exists { + mp.txRecord[existsKey] = struct{}{} + + if _, exists := mp.counterBySender[sender]; !exists { + mp.counterBySender[sender] = 1 } else { - delete(mp.counterBySender, sender) + if mp.counterBySender[sender] < MAX_TXS_PRE_SENDER_IN_MEMPOOL { + mp.counterBySender[sender] += 1 + } else { + return fmt.Errorf("tx sender has too many txs in mempool") + } } } - return nil } +func (mp *PriorityNonceMempool) decrSenderTxCnt(sender string, nonce uint64) { + mp.senderTxCntLock.Lock() + defer mp.senderTxCntLock.Unlock() + + existsKey := txMeta{nonce: nonce, sender: sender} + if _, exists := mp.txRecord[existsKey]; exists { + delete(mp.txRecord, existsKey) + + if _, exists := mp.counterBySender[sender]; exists { + if mp.counterBySender[sender] > 1 { + mp.counterBySender[sender] -= 1 + } else { + delete(mp.counterBySender, sender) + } + } + } +} + func IsEmpty(mempool mempool.Mempool) error { mp := mempool.(*PriorityNonceMempool) if mp.priorityIndex.Len() != 0 { @@ -508,3 +692,44 @@ func IsEmpty(mempool mempool.Mempool) error { return nil } + +type TxInfo struct { + Sender string + Nonce uint64 + Tx sdk.Tx +} + +func extractTxInfo(tx sdk.Tx) (*TxInfo, error) { + var sender string + var nonce uint64 + + sigs, err := tx.(signing.SigVerifiableTx).GetSignaturesV2() + if err != nil { + return nil, err + } + + if len(sigs) == 0 { + msgs := tx.GetMsgs() + if len(msgs) != 1 { + return nil, fmt.Errorf("tx must have at least one signer") + } + msgEthTx, ok := msgs[0].(*evmtypes.MsgEthereumTx) + if !ok { + return nil, fmt.Errorf("tx must have at least one signer") + } + ethTx := msgEthTx.AsTransaction() + signer := gethtypes.NewEIP2930Signer(ethTx.ChainId()) + ethSender, err := signer.Sender(ethTx) + if err != nil { + return nil, fmt.Errorf("tx must have at least one signer") + } + sender = sdk.AccAddress(ethSender.Bytes()).String() + nonce = ethTx.Nonce() + } else { + sig := sigs[0] + sender = sdk.AccAddress(sig.PubKey.Address()).String() + nonce = sig.Sequence + } + + return &TxInfo{Sender: sender, Nonce: nonce, Tx: tx}, nil +} diff --git a/cmd/0gchaind/app.go b/cmd/0gchaind/app.go index f09ec847..a2c7ea4d 100644 --- a/cmd/0gchaind/app.go +++ b/cmd/0gchaind/app.go @@ -1,6 +1,7 @@ package main import ( + "context" "errors" "fmt" "io" @@ -19,6 +20,7 @@ import ( snapshottypes "github.com/cosmos/cosmos-sdk/snapshots/types" "github.com/cosmos/cosmos-sdk/store" sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/cosmos/cosmos-sdk/x/auth/signing" "github.com/cosmos/cosmos-sdk/x/crisis" ethermintflags "github.com/evmos/ethermint/server/flags" "github.com/spf13/cast" @@ -26,6 +28,8 @@ import ( "github.com/0glabs/0g-chain/app" "github.com/0glabs/0g-chain/app/params" + gethtypes "github.com/ethereum/go-ethereum/core/types" + evmtypes "github.com/evmos/ethermint/x/evm/types" ) const ( @@ -34,6 +38,8 @@ const ( flagSkipLoadLatest = "skip-load-latest" ) +var accountNonceOp app.AccountNonceOp + // appCreator holds functions used by the sdk server to control the 0g-chain app. // The methods implement types in cosmos-sdk/server/types type appCreator struct { @@ -107,8 +113,6 @@ func (ac appCreator) newApp( skipLoadLatest = cast.ToBool(appOpts.Get(flagSkipLoadLatest)) } - mempool := app.NewPriorityMempool() - bApp := app.NewBaseApp(logger, db, ac.encodingConfig, baseapp.SetPruning(pruningOpts), baseapp.SetMinGasPrices(strings.Replace(cast.ToString(appOpts.Get(server.FlagMinGasPrices)), ";", ",", -1)), @@ -123,8 +127,32 @@ func (ac appCreator) newApp( baseapp.SetIAVLDisableFastNode(cast.ToBool(iavlDisableFastNode)), baseapp.SetIAVLLazyLoading(cast.ToBool(appOpts.Get(server.FlagIAVLLazyLoading))), baseapp.SetChainID(chainID), - baseapp.SetMempool(mempool), + baseapp.SetTxInfoExtracter(extractTxInfo), + ) + + mempool := app.NewPriorityMempool( + app.PriorityNonceWithMaxTx(fixMempoolSize(appOpts)), + app.PriorityNonceWithTxReplacedCallback(func(ctx context.Context, oldTx, newTx *app.TxInfo) { + if oldTx.Sender != newTx.Sender { + sdkContext := sdk.UnwrapSDKContext(ctx) + if accountNonceOp != nil { + nonce := accountNonceOp.GetAccountNonce(sdkContext, oldTx.Sender) + if nonce > 0 { + accountNonceOp.SetAccountNonce(sdkContext, oldTx.Sender, nonce-1) + sdkContext.Logger().Debug("rewind the nonce of the account", "account", oldTx.Sender, "from", nonce, "to", nonce-1) + } else { + sdkContext.Logger().Info("First meeting account", "account", oldTx.Sender) + } + } + } else { + sdkContext := sdk.UnwrapSDKContext(ctx) + sdkContext.Logger().Info("tx replace", "account", oldTx.Sender, "nonce", oldTx.Nonce) + } + bApp.RegisterMempoolTxReplacedEvent(ctx, oldTx.Tx, newTx.Tx) + }), ) + bApp.SetMempool(mempool) + bApp.SetTxEncoder(ac.encodingConfig.TxConfig.TxEncoder()) abciProposalHandler := app.NewDefaultProposalHandler(mempool, bApp) bApp.SetPrepareProposal(abciProposalHandler.PrepareProposalHandler()) @@ -144,6 +172,8 @@ func (ac appCreator) newApp( bApp, ) + accountNonceOp = app.NewAccountNonceOp(newApp) + return newApp } @@ -199,3 +229,72 @@ func accAddressesFromBech32(addresses ...string) ([]sdk.AccAddress, error) { } return decodedAddresses, nil } + +var ErrMustHaveSigner error = errors.New("tx must have at least one signer") + +func extractTxInfo(ctx sdk.Context, tx sdk.Tx) (*sdk.TxInfo, error) { + sigs, err := tx.(signing.SigVerifiableTx).GetSignaturesV2() + if err != nil { + return nil, err + } + + var sender string + var nonce uint64 + var gasPrice uint64 + var gasLimit uint64 + var txType int32 + + if len(sigs) == 0 { + txType = 1 + msgs := tx.GetMsgs() + if len(msgs) != 1 { + return nil, ErrMustHaveSigner + } + msgEthTx, ok := msgs[0].(*evmtypes.MsgEthereumTx) + if !ok { + return nil, ErrMustHaveSigner + } + ethTx := msgEthTx.AsTransaction() + signer := gethtypes.NewEIP2930Signer(ethTx.ChainId()) + ethSender, err := signer.Sender(ethTx) + if err != nil { + return nil, ErrMustHaveSigner + } + sender = sdk.AccAddress(ethSender.Bytes()).String() + nonce = ethTx.Nonce() + gasPrice = ethTx.GasPrice().Uint64() + gasLimit = ethTx.Gas() + } else { + sig := sigs[0] + sender = sdk.AccAddress(sig.PubKey.Address()).String() + nonce = sig.Sequence + } + + return &sdk.TxInfo{ + SignerAddress: sender, + Nonce: nonce, + GasLimit: gasLimit, + GasPrice: gasPrice, + TxType: txType, + }, nil +} + +func fixMempoolSize(appOpts servertypes.AppOptions) int { + val1 := appOpts.Get("mempool.size") + val2 := appOpts.Get(server.FlagMempoolMaxTxs) + + if val1 != nil && val2 != nil { + size1 := cast.ToInt(val1) + size2 := cast.ToInt(val2) + if size1 != size2 { + panic("the value of mempool.size and mempool.max-txs are different") + } + return size1 + } else if val1 == nil && val2 == nil { + panic("not found mempool size in config") + } else if val1 == nil { + return cast.ToInt(val2) + } else { //if val2 == nil { + return cast.ToInt(val1) + } +} diff --git a/go.mod b/go.mod index 15434cb9..591ce683 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,6 @@ require ( cosmossdk.io/simapp v0.0.0-20231127212628-044ff4d8c015 github.com/Kava-Labs/opendb v0.0.0-20240719173129-a2f11f6d7e51 github.com/cenkalti/backoff/v4 v4.1.3 - github.com/cockroachdb/errors v1.11.1 github.com/cometbft/cometbft v0.37.9 github.com/cometbft/cometbft-db v0.9.1 github.com/coniks-sys/coniks-go v0.0.0-20180722014011-11acf4819b71 @@ -32,6 +31,7 @@ require ( github.com/huandu/skiplist v1.2.0 github.com/linxGnu/grocksdb v1.8.13 github.com/pelletier/go-toml/v2 v2.1.0 + github.com/pkg/errors v0.9.1 github.com/shopspring/decimal v1.4.0 github.com/spf13/cast v1.6.0 github.com/spf13/cobra v1.8.0 @@ -78,6 +78,7 @@ require ( github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/chzyer/readline v1.5.1 // indirect github.com/cockroachdb/apd/v2 v2.0.2 // indirect + github.com/cockroachdb/errors v1.11.1 // indirect github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect github.com/cockroachdb/pebble v1.1.0 // indirect github.com/cockroachdb/redact v1.1.5 // indirect @@ -174,7 +175,6 @@ require ( github.com/olekukonko/tablewriter v0.0.5 // indirect github.com/patrickmn/go-cache v2.1.0+incompatible // indirect github.com/petermattis/goid v0.0.0-20231207134359-e60b3f734c67 // indirect - github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_golang v1.14.0 // indirect github.com/prometheus/client_model v0.6.1 // indirect @@ -238,11 +238,11 @@ replace ( // Use the cosmos keyring code github.com/99designs/keyring => github.com/cosmos/keyring v1.2.0 // Use cometbft fork of tendermint - github.com/cometbft/cometbft => github.com/0glabs/cometbft v0.37.9-0glabs.1 + github.com/cometbft/cometbft => github.com/0glabs/cometbft v0.37.9-0glabs.3 github.com/cometbft/cometbft-db => github.com/kava-labs/cometbft-db v0.9.1-kava.2 // Use cosmos-sdk fork with backported fix for unsafe-reset-all, staking transfer events, and custom tally handler support // github.com/cosmos/cosmos-sdk => github.com/0glabs/cosmos-sdk v0.46.11-kava.3 - github.com/cosmos/cosmos-sdk => github.com/0glabs/cosmos-sdk v0.47.10-0glabs.10 + github.com/cosmos/cosmos-sdk => github.com/0glabs/cosmos-sdk v0.47.10-0glabs.12 github.com/cosmos/iavl => github.com/kava-labs/iavl v1.2.0-kava.1 // See https://github.com/cosmos/cosmos-sdk/pull/13093 github.com/dgrijalva/jwt-go => github.com/golang-jwt/jwt/v4 v4.4.2 @@ -250,7 +250,7 @@ replace ( // TODO: Tag before release github.com/ethereum/go-ethereum => github.com/evmos/go-ethereum v1.10.26-evmos-rc2 // Use ethermint fork that respects min-gas-price with NoBaseFee true and london enabled, and includes eip712 support - github.com/evmos/ethermint => github.com/0glabs/ethermint v0.21.0-0g.v3.1.12 + github.com/evmos/ethermint => github.com/0glabs/ethermint v0.21.0-0g.v3.1.15 // See https://github.com/cosmos/cosmos-sdk/pull/10401, https://github.com/cosmos/cosmos-sdk/commit/0592ba6158cd0bf49d894be1cef4faeec59e8320 github.com/gin-gonic/gin => github.com/gin-gonic/gin v1.9.0 // Downgraded to avoid bugs in following commits which causes "version does not exist" errors diff --git a/go.sum b/go.sum index f0821f5f..678504a3 100644 --- a/go.sum +++ b/go.sum @@ -209,12 +209,12 @@ filippo.io/edwards25519 v1.0.0 h1:0wAIcmJUqRdI8IJ/3eGi5/HwXZWPujYXXlkrQogz0Ek= filippo.io/edwards25519 v1.0.0/go.mod h1:N1IkdkCkiLB6tki+MYJoSx2JTY9NUlxZE7eHn5EwJns= git.sr.ht/~sircmpwn/getopt v0.0.0-20191230200459-23622cc906b3/go.mod h1:wMEGFFFNuPos7vHmWXfszqImLppbc0wEhh6JBfJIUgw= git.sr.ht/~sircmpwn/go-bare v0.0.0-20210406120253-ab86bc2846d9/go.mod h1:BVJwbDfVjCjoFiKrhkei6NdGcZYpkDkdyCdg1ukytRA= -github.com/0glabs/cometbft v0.37.9-0glabs.1 h1:KQJG17Y21suKP3QNICLto4b5Ak73XbSmKxeLbg0ZM68= -github.com/0glabs/cometbft v0.37.9-0glabs.1/go.mod h1:j0Q3RqrCd+cztWCugs3obbzC4NyHGBPZZjtm/fWV00I= -github.com/0glabs/cosmos-sdk v0.47.10-0glabs.10 h1:NJp0RwczHBO4EvrQdDxxftHOgUDBtNh7M/vpaG7wFtQ= -github.com/0glabs/cosmos-sdk v0.47.10-0glabs.10/go.mod h1:KskIVnhXTFqrw7CDccMvx7To5KzUsOomIsQV7sPGOog= -github.com/0glabs/ethermint v0.21.0-0g.v3.1.12 h1:IRVTFhDEH2J5w8ywQW7obXQxYhJYib70SNgKqLOXikU= -github.com/0glabs/ethermint v0.21.0-0g.v3.1.12/go.mod h1:6e/gOcDLhvlDWK3JLJVBgki0gD6H4E1eG7l9byocgWA= +github.com/0glabs/cometbft v0.37.9-0glabs.3 h1:sobMz3C+OdFYNRQ3degfCZUHUzyuSPUIZqVMYgDtJs4= +github.com/0glabs/cometbft v0.37.9-0glabs.3/go.mod h1:j0Q3RqrCd+cztWCugs3obbzC4NyHGBPZZjtm/fWV00I= +github.com/0glabs/cosmos-sdk v0.47.10-0glabs.12 h1:mVUhlaGUPn8izK6TfdXD13xakN8+HGl3Y349YF6Kgqc= +github.com/0glabs/cosmos-sdk v0.47.10-0glabs.12/go.mod h1:KskIVnhXTFqrw7CDccMvx7To5KzUsOomIsQV7sPGOog= +github.com/0glabs/ethermint v0.21.0-0g.v3.1.15 h1:j3GwMVy1bjOb7TNyH7v7qOUu5LRl6oruZECIx9W77J0= +github.com/0glabs/ethermint v0.21.0-0g.v3.1.15/go.mod h1:6e/gOcDLhvlDWK3JLJVBgki0gD6H4E1eG7l9byocgWA= github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 h1:/vQbFIOMbk2FiG/kXiLl8BRyzTWDw7gX/Hz7Dd5eDMs= github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4/go.mod h1:hN7oaIRCjzsZ2dE+yG5k+rsdt3qcwykqK6HVGcKwsw4= github.com/Azure/azure-sdk-for-go/sdk/azcore v0.21.1/go.mod h1:fBF9PQNqB8scdgpZ3ufzaLntG0AG7C1WjPMsiFOmfHM=