Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: reannounce local pending transactions (#2) #2

Draft
wants to merge 1 commit into
base: semita-base
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ var (
utils.TxPoolAccountQueueFlag,
utils.TxPoolGlobalQueueFlag,
utils.TxPoolLifetimeFlag,
utils.TxPoolReannounceTimeFlag,
utils.SyncModeFlag,
utils.SyncTargetFlag,
utils.ExitWhenSyncedFlag,
Expand Down
9 changes: 9 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,12 @@ var (
Value: ethconfig.Defaults.TxPool.Lifetime,
Category: flags.TxPoolCategory,
}
TxPoolReannounceTimeFlag = &cli.DurationFlag{
Name: "txpool.reannouncetime",
Usage: "Duration for announcing local pending transactions again (default = 10 years, minimum = 1 minute)",
Value: ethconfig.Defaults.TxPool.ReannounceTime,
Category: flags.TxPoolCategory,
}

// Performance tuning settings
CacheFlag = &cli.IntFlag{
Expand Down Expand Up @@ -1657,6 +1663,9 @@ func setTxPool(ctx *cli.Context, cfg *txpool.Config) {
if ctx.IsSet(TxPoolLifetimeFlag.Name) {
cfg.Lifetime = ctx.Duration(TxPoolLifetimeFlag.Name)
}
if ctx.IsSet(TxPoolReannounceTimeFlag.Name) {
cfg.ReannounceTime = ctx.Duration(TxPoolReannounceTimeFlag.Name)
}
}

func setEthash(ctx *cli.Context, cfg *ethconfig.Config) {
Expand Down
3 changes: 3 additions & 0 deletions core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import (
// NewTxsEvent is posted when a batch of transactions enter the transaction pool.
type NewTxsEvent struct{ Txs []*types.Transaction }

// ReannoTxsEvent is posted when a batch of local pending transactions exceed a specified duration.
type ReannoTxsEvent struct{ Txs []*types.Transaction }

// NewMinedBlockEvent is posted when a block has been imported.
type NewMinedBlockEvent struct{ Block *types.Block }

Expand Down
74 changes: 60 additions & 14 deletions core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ const (
// more expensive to propagate; larger transactions also take more resources
// to validate whether they fit into the pool or not.
txMaxSize = 4 * txSlotSize // 128KB

// txReannoMaxNum is the maximum number of transactions a reannounce action can include.
txReannoMaxNum = 1024
)

var (
Expand Down Expand Up @@ -104,7 +107,8 @@ var (
// L1 Info Gas Overhead is the amount of gas the the L1 info deposit consumes.
// It is removed from the tx pool max gas to better indicate that L2 transactions
// are not able to consume all of the gas in a L2 block as the L1 info deposit is always present.
l1InfoGasOverhead = uint64(70_000)
l1InfoGasOverhead = uint64(70_000)
reannounceInterval = time.Minute // Time interval to check for reannounce transactions
)

var (
Expand Down Expand Up @@ -184,7 +188,8 @@ type Config struct {
AccountQueue uint64 // Maximum number of non-executable transaction slots permitted per account
GlobalQueue uint64 // Maximum number of non-executable transaction slots for all accounts

Lifetime time.Duration // Maximum amount of time non-executable transaction are queued
Lifetime time.Duration // Maximum amount of time non-executable transaction are queued
ReannounceTime time.Duration // Duration for announcing local pending transactions again
}

// DefaultConfig contains the default configurations for the transaction
Expand All @@ -201,7 +206,8 @@ var DefaultConfig = Config{
AccountQueue: 64,
GlobalQueue: 1024,

Lifetime: 3 * time.Hour,
Lifetime: 3 * time.Hour,
ReannounceTime: 10 * 365 * 24 * time.Hour,
}

// sanitize checks the provided user configurations and changes anything that's
Expand Down Expand Up @@ -240,6 +246,10 @@ func (config *Config) sanitize() Config {
log.Warn("Sanitizing invalid txpool lifetime", "provided", conf.Lifetime, "updated", DefaultConfig.Lifetime)
conf.Lifetime = DefaultConfig.Lifetime
}
if conf.ReannounceTime < time.Minute {
log.Warn("Sanitizing invalid txpool reannounce time", "provided", conf.ReannounceTime, "updated", time.Minute)
conf.ReannounceTime = time.Minute
}
return conf
}

Expand All @@ -251,14 +261,15 @@ func (config *Config) sanitize() Config {
// current state) and future transactions. Transactions move between those
// two states over time as they are received and processed.
type TxPool struct {
config Config
chainconfig *params.ChainConfig
chain blockChain
gasPrice *big.Int
txFeed event.Feed
scope event.SubscriptionScope
signer types.Signer
mu sync.RWMutex
config Config
chainconfig *params.ChainConfig
chain blockChain
gasPrice *big.Int
txFeed event.Feed
reannoTxFeed event.Feed // Event feed for announcing transactions again
scope event.SubscriptionScope
signer types.Signer
mu sync.RWMutex

istanbul atomic.Bool // Fork indicator whether we are in the istanbul stage.
eip2718 atomic.Bool // Fork indicator whether we are using EIP-2718 type transactions.
Expand Down Expand Up @@ -367,14 +378,16 @@ func (pool *TxPool) loop() {
var (
prevPending, prevQueued, prevStales int
// Start the stats reporting and transaction eviction tickers
report = time.NewTicker(statsReportInterval)
evict = time.NewTicker(evictionInterval)
journal = time.NewTicker(pool.config.Rejournal)
report = time.NewTicker(statsReportInterval)
evict = time.NewTicker(evictionInterval)
reannounce = time.NewTicker(reannounceInterval)
journal = time.NewTicker(pool.config.Rejournal)
// Track the previous head headers for transaction reorgs
head = pool.chain.CurrentBlock()
)
defer report.Stop()
defer evict.Stop()
defer reannounce.Stop()
defer journal.Stop()

// Notify tests that the init phase is done
Expand Down Expand Up @@ -424,6 +437,33 @@ func (pool *TxPool) loop() {
}
pool.mu.Unlock()

case <-reannounce.C:
pool.mu.RLock()
reannoTxs := func() []*types.Transaction {
txs := make([]*types.Transaction, 0)
for addr, list := range pool.pending {
if !pool.locals.contains(addr) {
continue
}

for _, tx := range list.Flatten() {
// Default ReannounceTime is 10 years, won't announce by default.
if time.Since(tx.Time()) < pool.config.ReannounceTime {
break
}
txs = append(txs, tx)
if len(txs) >= txReannoMaxNum {
return txs
}
}
}
return txs
}()
pool.mu.RUnlock()
if len(reannoTxs) > 0 {
pool.reannoTxFeed.Send(core.ReannoTxsEvent{reannoTxs})
}

// Handle local transaction journal rotation
case <-journal.C:
if pool.journal != nil {
Expand Down Expand Up @@ -458,6 +498,12 @@ func (pool *TxPool) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subsc
return pool.scope.Track(pool.txFeed.Subscribe(ch))
}

// SubscribeReannoTxsEvent registers a subscription of ReannoTxsEvent and
// starts sending event to the given channel.
func (pool *TxPool) SubscribeReannoTxsEvent(ch chan<- core.ReannoTxsEvent) event.Subscription {
return pool.scope.Track(pool.reannoTxFeed.Subscribe(ch))
}

// GasPrice returns the current gas price enforced by the transaction pool.
func (pool *TxPool) GasPrice() *big.Int {
pool.mu.RLock()
Expand Down
5 changes: 5 additions & 0 deletions core/types/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ type TxData interface {
effectiveGasPrice(dst *big.Int, baseFee *big.Int) *big.Int
}

// Time returns transaction's time
func (tx *Transaction) Time() time.Time {
return tx.time
}

// EncodeRLP implements rlp.Encoder
func (tx *Transaction) EncodeRLP(w io.Writer) error {
if tx.Type() == LegacyTxType {
Expand Down
44 changes: 44 additions & 0 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ type txPool interface {
// SubscribeNewTxsEvent should return an event subscription of
// NewTxsEvent and send events to the given channel.
SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription

// SubscribeReannoTxsEvent should return an event subscription of
// ReannoTxsEvent and send events to the given channel.
SubscribeReannoTxsEvent(chan<- core.ReannoTxsEvent) event.Subscription
}

// handlerConfig is the collection of initialization parameters to create a full
Expand Down Expand Up @@ -116,6 +120,8 @@ type handler struct {
eventMux *event.TypeMux
txsCh chan core.NewTxsEvent
txsSub event.Subscription
reannoTxsCh chan core.ReannoTxsEvent
reannoTxsSub event.Subscription
minedBlockSub *event.TypeMuxSubscription

requiredBlocks map[uint64]common.Hash
Expand Down Expand Up @@ -552,6 +558,12 @@ func (h *handler) Start(maxPeers int) {
h.txsSub = h.txpool.SubscribeNewTxsEvent(h.txsCh)
go h.txBroadcastLoop()

// announce local pending transactions again
h.wg.Add(1)
h.reannoTxsCh = make(chan core.ReannoTxsEvent, txChanSize)
h.reannoTxsSub = h.txpool.SubscribeReannoTxsEvent(h.reannoTxsCh)
go h.txReannounceLoop()

// broadcast mined blocks
h.wg.Add(1)
h.minedBlockSub = h.eventMux.Subscribe(core.NewMinedBlockEvent{})
Expand All @@ -564,6 +576,7 @@ func (h *handler) Start(maxPeers int) {

func (h *handler) Stop() {
h.txsSub.Unsubscribe() // quits txBroadcastLoop
h.reannoTxsSub.Unsubscribe() // quits txReannounceLoop
h.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop

// Quit chainSync and txsync64.
Expand Down Expand Up @@ -668,6 +681,24 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) {
"tx packs", directPeers, "broadcast txs", directCount)
}

// ReannounceTransactions will announce a batch of local pending transactions
// to a square root of all peers.
func (h *handler) ReannounceTransactions(txs types.Transactions) {
hashes := make([]common.Hash, 0, txs.Len())
for _, tx := range txs {
hashes = append(hashes, tx.Hash())
}

// Announce transactions hash to a batch of peers
peersCount := uint(math.Sqrt(float64(h.peers.len())))
peers := h.peers.headPeers(peersCount)
for _, peer := range peers {
peer.AsyncSendPooledTransactionHashes(hashes)
}
log.Debug("Transaction reannounce", "txs", len(txs),
"announce packs", peersCount, "announced hashes", peersCount*uint(len(hashes)))
}

// minedBroadcastLoop sends mined blocks to connected peers.
func (h *handler) minedBroadcastLoop() {
defer h.wg.Done()
Expand All @@ -692,3 +723,16 @@ func (h *handler) txBroadcastLoop() {
}
}
}

// txReannounceLoop announces local pending transactions to connected peers again.
func (h *handler) txReannounceLoop() {
defer h.wg.Done()
for {
select {
case event := <-h.reannoTxsCh:
h.ReannounceTransactions(event.Txs)
case <-h.reannoTxsSub.Err():
return
}
}
}
53 changes: 53 additions & 0 deletions eth/handler_eth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,59 @@ func testTransactionPropagation(t *testing.T, protocol uint) {
}
}

// Tests that local pending transactions get propagated to peers.
func TestTransactionPendingReannounce(t *testing.T) {
t.Parallel()

// Create a source handler to announce transactions from and a sink handler
// to receive them.
source := newTestHandler()
defer source.close()

sink := newTestHandler()
defer sink.close()
sink.handler.acceptTxs = 1 // mark synced to accept transactions

sourcePipe, sinkPipe := p2p.MsgPipe()
defer sourcePipe.Close()
defer sinkPipe.Close()

sourcePeer := eth.NewPeer(eth.ETH66, p2p.NewPeer(enode.ID{0}, "", nil), sourcePipe, source.txpool)
sinkPeer := eth.NewPeer(eth.ETH66, p2p.NewPeer(enode.ID{0}, "", nil), sinkPipe, sink.txpool)
defer sourcePeer.Close()
defer sinkPeer.Close()

go source.handler.runEthPeer(sourcePeer, func(peer *eth.Peer) error {
return eth.Handle((*ethHandler)(source.handler), peer)
})
go sink.handler.runEthPeer(sinkPeer, func(peer *eth.Peer) error {
return eth.Handle((*ethHandler)(sink.handler), peer)
})

// Subscribe transaction pools
txCh := make(chan core.NewTxsEvent, 1024)
sub := sink.txpool.SubscribeNewTxsEvent(txCh)
defer sub.Unsubscribe()

txs := make([]*types.Transaction, 64)
for nonce := range txs {
tx := types.NewTransaction(uint64(nonce), common.Address{}, big.NewInt(0), 100000, big.NewInt(0), nil)
tx, _ = types.SignTx(tx, types.HomesteadSigner{}, testKey)

txs[nonce] = tx
}
source.txpool.ReannouceTransactions(txs)

for arrived := 0; arrived < len(txs); {
select {
case event := <-txCh:
arrived += len(event.Txs)
case <-time.NewTimer(time.Second).C:
t.Errorf("sink: transaction propagation timed out: have %d, want %d", arrived, len(txs))
}
}
}

// Tests that post eth protocol handshake, clients perform a mutual checkpoint
// challenge to validate each other's chains. Hash mismatches, or missing ones
// during a fast sync should lead to the peer getting dropped.
Expand Down
23 changes: 21 additions & 2 deletions eth/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ var (
type testTxPool struct {
pool map[common.Hash]*types.Transaction // Hash map of collected transactions

txFeed event.Feed // Notification feed to allow waiting for inclusion
lock sync.RWMutex // Protects the transaction pool
txFeed event.Feed // Notification feed to allow waiting for inclusion
reannoTxFeed event.Feed // Notification feed to trigger reannouce
lock sync.RWMutex // Protects the transaction pool
}

// newTestTxPool creates a mock transaction pool.
Expand Down Expand Up @@ -91,6 +92,18 @@ func (p *testTxPool) AddRemotes(txs []*types.Transaction) []error {
return make([]error, len(txs))
}

// ReannouceTransactions announce the transactions to some peers.
func (p *testTxPool) ReannouceTransactions(txs []*types.Transaction) []error {
p.lock.Lock()
defer p.lock.Unlock()

for _, tx := range txs {
p.pool[tx.Hash()] = tx
}
p.reannoTxFeed.Send(core.ReannoTxsEvent{Txs: txs})
return make([]error, len(txs))
}

// Pending returns all the transactions known to the pool
func (p *testTxPool) Pending(enforceTips bool) map[common.Address]types.Transactions {
p.lock.RLock()
Expand All @@ -113,6 +126,12 @@ func (p *testTxPool) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subs
return p.txFeed.Subscribe(ch)
}

// SubscribeReannoTxsEvent should return an event subscription of ReannoTxsEvent and
// send events to the given channel.
func (p *testTxPool) SubscribeReannoTxsEvent(ch chan<- core.ReannoTxsEvent) event.Subscription {
return p.reannoTxFeed.Subscribe(ch)
}

// testHandler is a live implementation of the Ethereum protocol handler, just
// preinitialized with some sane testing defaults and the transaction pool mocked
// out.
Expand Down
Loading