Skip to content

Commit a8a9c8e

Browse files
authored
core, eth, miner: start propagating and consuming blob txs (ethereum#28243)
* core, eth, miner: start propagating and consuming blob txs * eth/protocols/eth: disable eth/67 if Cancun is enabled * core/txpool, eth, miner: pass gas limit infos in lazy tx for mienr filtering * core/txpool, miner: add lazy resolver for pending txs too * core, eth: fix review noticed bugs * eth, miner: minor polishes in the mining and announcing logs * core/expool: unsubscribe the event scope
1 parent bc6d184 commit a8a9c8e

14 files changed

+145
-73
lines changed

Diff for: core/txpool/blobpool/blobpool.go

+40-15
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,8 @@ type blobTxMeta struct {
9797
execTipCap *uint256.Int // Needed to prioritize inclusion order across accounts and validate replacement price bump
9898
execFeeCap *uint256.Int // Needed to validate replacement price bump
9999
blobFeeCap *uint256.Int // Needed to validate replacement price bump
100+
execGas uint64 // Needed to check inclusion validity before reading the blob
101+
blobGas uint64 // Needed to check inclusion validity before reading the blob
100102

101103
basefeeJumps float64 // Absolute number of 1559 fee adjustments needed to reach the tx's fee cap
102104
blobfeeJumps float64 // Absolute number of 4844 fee adjustments needed to reach the tx's blob fee cap
@@ -118,6 +120,8 @@ func newBlobTxMeta(id uint64, size uint32, tx *types.Transaction) *blobTxMeta {
118120
execTipCap: uint256.MustFromBig(tx.GasTipCap()),
119121
execFeeCap: uint256.MustFromBig(tx.GasFeeCap()),
120122
blobFeeCap: uint256.MustFromBig(tx.BlobGasFeeCap()),
123+
execGas: tx.Gas(),
124+
blobGas: tx.BlobGas(),
121125
}
122126
meta.basefeeJumps = dynamicFeeJumps(meta.execFeeCap)
123127
meta.blobfeeJumps = dynamicFeeJumps(meta.blobFeeCap)
@@ -307,8 +311,8 @@ type BlobPool struct {
307311
spent map[common.Address]*uint256.Int // Expenditure tracking for individual accounts
308312
evict *evictHeap // Heap of cheapest accounts for eviction when full
309313

310-
eventFeed event.Feed // Event feed to send out new tx events on pool inclusion
311-
eventScope event.SubscriptionScope // Event scope to track and mass unsubscribe on termination
314+
discoverFeed event.Feed // Event feed to send out new tx events on pool discovery (reorg excluded)
315+
insertFeed event.Feed // Event feed to send out new tx events on pool inclusion (reorg included)
312316

313317
lock sync.RWMutex // Mutex protecting the pool during reorg handling
314318
}
@@ -436,8 +440,6 @@ func (p *BlobPool) Close() error {
436440
if err := p.store.Close(); err != nil {
437441
errs = append(errs, err)
438442
}
439-
p.eventScope.Close()
440-
441443
switch {
442444
case errs == nil:
443445
return nil
@@ -758,15 +760,21 @@ func (p *BlobPool) Reset(oldHead, newHead *types.Header) {
758760
// Run the reorg between the old and new head and figure out which accounts
759761
// need to be rechecked and which transactions need to be readded
760762
if reinject, inclusions := p.reorg(oldHead, newHead); reinject != nil {
763+
var adds []*types.Transaction
761764
for addr, txs := range reinject {
762765
// Blindly push all the lost transactions back into the pool
763766
for _, tx := range txs {
764-
p.reinject(addr, tx.Hash())
767+
if err := p.reinject(addr, tx.Hash()); err == nil {
768+
adds = append(adds, tx.WithoutBlobTxSidecar())
769+
}
765770
}
766771
// Recheck the account's pooled transactions to drop included and
767772
// invalidated one
768773
p.recheck(addr, inclusions)
769774
}
775+
if len(adds) > 0 {
776+
p.insertFeed.Send(core.NewTxsEvent{Txs: adds})
777+
}
770778
}
771779
// Flush out any blobs from limbo that are older than the latest finality
772780
if p.chain.Config().IsCancun(p.head.Number, p.head.Time) {
@@ -921,13 +929,13 @@ func (p *BlobPool) reorg(oldHead, newHead *types.Header) (map[common.Address][]*
921929
// Note, the method will not initialize the eviction cache values as those will
922930
// be done once for all transactions belonging to an account after all individual
923931
// transactions are injected back into the pool.
924-
func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) {
932+
func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error {
925933
// Retrieve the associated blob from the limbo. Without the blobs, we cannot
926934
// add the transaction back into the pool as it is not mineable.
927935
tx, err := p.limbo.pull(txhash)
928936
if err != nil {
929937
log.Error("Blobs unavailable, dropping reorged tx", "err", err)
930-
return
938+
return err
931939
}
932940
// TODO: seems like an easy optimization here would be getting the serialized tx
933941
// from limbo instead of re-serializing it here.
@@ -936,20 +944,20 @@ func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) {
936944
blob, err := rlp.EncodeToBytes(tx)
937945
if err != nil {
938946
log.Error("Failed to encode transaction for storage", "hash", tx.Hash(), "err", err)
939-
return
947+
return err
940948
}
941949
id, err := p.store.Put(blob)
942950
if err != nil {
943951
log.Error("Failed to write transaction into storage", "hash", tx.Hash(), "err", err)
944-
return
952+
return err
945953
}
946954

947955
// Update the indixes and metrics
948956
meta := newBlobTxMeta(id, p.store.Size(id), tx)
949957
if _, ok := p.index[addr]; !ok {
950958
if err := p.reserve(addr, true); err != nil {
951959
log.Warn("Failed to reserve account for blob pool", "tx", tx.Hash(), "from", addr, "err", err)
952-
return
960+
return err
953961
}
954962
p.index[addr] = []*blobTxMeta{meta}
955963
p.spent[addr] = meta.costCap
@@ -960,6 +968,7 @@ func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) {
960968
}
961969
p.lookup[meta.hash] = meta.id
962970
p.stored += uint64(meta.size)
971+
return nil
963972
}
964973

965974
// SetGasTip implements txpool.SubPool, allowing the blob pool's gas requirements
@@ -1154,9 +1163,19 @@ func (p *BlobPool) Get(hash common.Hash) *types.Transaction {
11541163
// Add inserts a set of blob transactions into the pool if they pass validation (both
11551164
// consensus validity and pool restictions).
11561165
func (p *BlobPool) Add(txs []*types.Transaction, local bool, sync bool) []error {
1157-
errs := make([]error, len(txs))
1166+
var (
1167+
adds = make([]*types.Transaction, 0, len(txs))
1168+
errs = make([]error, len(txs))
1169+
)
11581170
for i, tx := range txs {
11591171
errs[i] = p.add(tx)
1172+
if errs[i] == nil {
1173+
adds = append(adds, tx.WithoutBlobTxSidecar())
1174+
}
1175+
}
1176+
if len(adds) > 0 {
1177+
p.discoverFeed.Send(core.NewTxsEvent{Txs: adds})
1178+
p.insertFeed.Send(core.NewTxsEvent{Txs: adds})
11601179
}
11611180
return errs
11621181
}
@@ -1384,6 +1403,8 @@ func (p *BlobPool) Pending(enforceTips bool) map[common.Address][]*txpool.LazyTr
13841403
Time: time.Now(), // TODO(karalabe): Maybe save these and use that?
13851404
GasFeeCap: tx.execFeeCap.ToBig(),
13861405
GasTipCap: tx.execTipCap.ToBig(),
1406+
Gas: tx.execGas,
1407+
BlobGas: tx.blobGas,
13871408
})
13881409
}
13891410
if len(lazies) > 0 {
@@ -1468,10 +1489,14 @@ func (p *BlobPool) updateLimboMetrics() {
14681489
limboSlotusedGauge.Update(int64(slotused))
14691490
}
14701491

1471-
// SubscribeTransactions registers a subscription of NewTxsEvent and
1472-
// starts sending event to the given channel.
1473-
func (p *BlobPool) SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription {
1474-
return p.eventScope.Track(p.eventFeed.Subscribe(ch))
1492+
// SubscribeTransactions registers a subscription for new transaction events,
1493+
// supporting feeding only newly seen or also resurrected transactions.
1494+
func (p *BlobPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription {
1495+
if reorgs {
1496+
return p.insertFeed.Subscribe(ch)
1497+
} else {
1498+
return p.discoverFeed.Subscribe(ch)
1499+
}
14751500
}
14761501

14771502
// Nonce returns the next nonce of an account, with all transactions executable

Diff for: core/txpool/legacypool/legacypool.go

+10-8
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,6 @@ type LegacyPool struct {
208208
chain BlockChain
209209
gasTip atomic.Pointer[big.Int]
210210
txFeed event.Feed
211-
scope event.SubscriptionScope
212211
signer types.Signer
213212
mu sync.RWMutex
214213

@@ -404,9 +403,6 @@ func (pool *LegacyPool) loop() {
404403

405404
// Close terminates the transaction pool.
406405
func (pool *LegacyPool) Close() error {
407-
// Unsubscribe all subscriptions registered from txpool
408-
pool.scope.Close()
409-
410406
// Terminate the pool reorger and return
411407
close(pool.reorgShutdownCh)
412408
pool.wg.Wait()
@@ -425,10 +421,14 @@ func (pool *LegacyPool) Reset(oldHead, newHead *types.Header) {
425421
<-wait
426422
}
427423

428-
// SubscribeTransactions registers a subscription of NewTxsEvent and
429-
// starts sending event to the given channel.
430-
func (pool *LegacyPool) SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription {
431-
return pool.scope.Track(pool.txFeed.Subscribe(ch))
424+
// SubscribeTransactions registers a subscription for new transaction events,
425+
// supporting feeding only newly seen or also resurrected transactions.
426+
func (pool *LegacyPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription {
427+
// The legacy pool has a very messed up internal shuffling, so it's kind of
428+
// hard to separate newly discovered transaction from resurrected ones. This
429+
// is because the new txs are added to the queue, resurrected ones too and
430+
// reorgs run lazily, so separating the two would need a marker.
431+
return pool.txFeed.Subscribe(ch)
432432
}
433433

434434
// SetGasTip updates the minimum gas tip required by the transaction pool for a
@@ -552,6 +552,8 @@ func (pool *LegacyPool) Pending(enforceTips bool) map[common.Address][]*txpool.L
552552
Time: txs[i].Time(),
553553
GasFeeCap: txs[i].GasFeeCap(),
554554
GasTipCap: txs[i].GasTipCap(),
555+
Gas: txs[i].Gas(),
556+
BlobGas: txs[i].BlobGas(),
555557
}
556558
}
557559
pending[addr] = lazies

Diff for: core/txpool/subpool.go

+16-3
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,16 @@ import (
3030
// enough for the miner and other APIs to handle large batches of transactions;
3131
// and supports pulling up the entire transaction when really needed.
3232
type LazyTransaction struct {
33-
Pool SubPool // Transaction subpool to pull the real transaction up
33+
Pool LazyResolver // Transaction resolver to pull the real transaction up
3434
Hash common.Hash // Transaction hash to pull up if needed
3535
Tx *types.Transaction // Transaction if already resolved
3636

3737
Time time.Time // Time when the transaction was first seen
3838
GasFeeCap *big.Int // Maximum fee per gas the transaction may consume
3939
GasTipCap *big.Int // Maximum miner tip per gas the transaction can pay
40+
41+
Gas uint64 // Amount of gas required by the transaction
42+
BlobGas uint64 // Amount of blob gas required by the transaction
4043
}
4144

4245
// Resolve retrieves the full transaction belonging to a lazy handle if it is still
@@ -48,6 +51,14 @@ func (ltx *LazyTransaction) Resolve() *types.Transaction {
4851
return ltx.Tx
4952
}
5053

54+
// LazyResolver is a minimal interface needed for a transaction pool to satisfy
55+
// resolving lazy transactions. It's mostly a helper to avoid the entire sub-
56+
// pool being injected into the lazy transaction.
57+
type LazyResolver interface {
58+
// Get returns a transaction if it is contained in the pool, or nil otherwise.
59+
Get(hash common.Hash) *types.Transaction
60+
}
61+
5162
// AddressReserver is passed by the main transaction pool to subpools, so they
5263
// may request (and relinquish) exclusive access to certain addresses.
5364
type AddressReserver func(addr common.Address, reserve bool) error
@@ -99,8 +110,10 @@ type SubPool interface {
99110
// account and sorted by nonce.
100111
Pending(enforceTips bool) map[common.Address][]*LazyTransaction
101112

102-
// SubscribeTransactions subscribes to new transaction events.
103-
SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription
113+
// SubscribeTransactions subscribes to new transaction events. The subscriber
114+
// can decide whether to receive notifications only for newly seen transactions
115+
// or also for reorged out ones.
116+
SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription
104117

105118
// Nonce returns the next nonce of an account, with all transactions executable
106119
// by the pool already applied on top.

Diff for: core/txpool/txpool.go

+7-5
Original file line numberDiff line numberDiff line change
@@ -155,13 +155,15 @@ func (p *TxPool) Close() error {
155155
if err := <-errc; err != nil {
156156
errs = append(errs, err)
157157
}
158-
159158
// Terminate each subpool
160159
for _, subpool := range p.subpools {
161160
if err := subpool.Close(); err != nil {
162161
errs = append(errs, err)
163162
}
164163
}
164+
// Unsubscribe anyone still listening for tx events
165+
p.subs.Close()
166+
165167
if len(errs) > 0 {
166168
return fmt.Errorf("subpool close errors: %v", errs)
167169
}
@@ -316,12 +318,12 @@ func (p *TxPool) Pending(enforceTips bool) map[common.Address][]*LazyTransaction
316318
return txs
317319
}
318320

319-
// SubscribeNewTxsEvent registers a subscription of NewTxsEvent and starts sending
320-
// events to the given channel.
321-
func (p *TxPool) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
321+
// SubscribeTransactions registers a subscription for new transaction events,
322+
// supporting feeding only newly seen or also resurrected transactions.
323+
func (p *TxPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription {
322324
subs := make([]event.Subscription, len(p.subpools))
323325
for i, subpool := range p.subpools {
324-
subs[i] = subpool.SubscribeTransactions(ch)
326+
subs[i] = subpool.SubscribeTransactions(ch, reorgs)
325327
}
326328
return p.subs.Track(event.JoinSubscriptions(subs...))
327329
}

Diff for: eth/api_backend.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,7 @@ func (b *EthAPIBackend) TxPool() *txpool.TxPool {
334334
}
335335

336336
func (b *EthAPIBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
337-
return b.eth.txPool.SubscribeNewTxsEvent(ch)
337+
return b.eth.txPool.SubscribeTransactions(ch, true)
338338
}
339339

340340
func (b *EthAPIBackend) SyncProgress() ethereum.SyncProgress {

Diff for: eth/catalyst/simulated_beacon.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ func (c *SimulatedBeacon) sealBlock(withdrawals []*types.Withdrawal) error {
199199
func (c *SimulatedBeacon) loopOnDemand() {
200200
var (
201201
newTxs = make(chan core.NewTxsEvent)
202-
sub = c.eth.TxPool().SubscribeNewTxsEvent(newTxs)
202+
sub = c.eth.TxPool().SubscribeTransactions(newTxs, true)
203203
)
204204
defer sub.Unsubscribe()
205205

Diff for: eth/handler.go

+24-17
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,10 @@ type txPool interface {
7575
// The slice should be modifiable by the caller.
7676
Pending(enforceTips bool) map[common.Address][]*txpool.LazyTransaction
7777

78-
// SubscribeNewTxsEvent should return an event subscription of
79-
// NewTxsEvent and send events to the given channel.
80-
SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
78+
// SubscribeTransactions subscribes to new transaction events. The subscriber
79+
// can decide whether to receive notifications only for newly seen transactions
80+
// or also for reorged out ones.
81+
SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription
8182
}
8283

8384
// handlerConfig is the collection of initialization parameters to create a full
@@ -509,10 +510,10 @@ func (h *handler) unregisterPeer(id string) {
509510
func (h *handler) Start(maxPeers int) {
510511
h.maxPeers = maxPeers
511512

512-
// broadcast transactions
513+
// broadcast and announce transactions (only new ones, not resurrected ones)
513514
h.wg.Add(1)
514515
h.txsCh = make(chan core.NewTxsEvent, txChanSize)
515-
h.txsSub = h.txpool.SubscribeNewTxsEvent(h.txsCh)
516+
h.txsSub = h.txpool.SubscribeTransactions(h.txsCh, false)
516517
go h.txBroadcastLoop()
517518

518519
// broadcast mined blocks
@@ -592,26 +593,33 @@ func (h *handler) BroadcastBlock(block *types.Block, propagate bool) {
592593
}
593594

594595
// BroadcastTransactions will propagate a batch of transactions
595-
// - To a square root of all peers
596+
// - To a square root of all peers for non-blob transactions
596597
// - And, separately, as announcements to all peers which are not known to
597598
// already have the given transaction.
598599
func (h *handler) BroadcastTransactions(txs types.Transactions) {
599600
var (
600-
annoCount int // Count of announcements made
601-
annoPeers int
602-
directCount int // Count of the txs sent directly to peers
603-
directPeers int // Count of the peers that were sent transactions directly
601+
blobTxs int // Number of blob transactions to announce only
602+
largeTxs int // Number of large transactions to announce only
603+
604+
directCount int // Number of transactions sent directly to peers (duplicates included)
605+
directPeers int // Number of peers that were sent transactions directly
606+
annCount int // Number of transactions announced across all peers (duplicates included)
607+
annPeers int // Number of peers announced about transactions
604608

605609
txset = make(map[*ethPeer][]common.Hash) // Set peer->hash to transfer directly
606610
annos = make(map[*ethPeer][]common.Hash) // Set peer->hash to announce
607-
608611
)
609612
// Broadcast transactions to a batch of peers not knowing about it
610613
for _, tx := range txs {
611614
peers := h.peers.peersWithoutTransaction(tx.Hash())
612615

613616
var numDirect int
614-
if tx.Size() <= txMaxBroadcastSize {
617+
switch {
618+
case tx.Type() == types.BlobTxType:
619+
blobTxs++
620+
case tx.Size() > txMaxBroadcastSize:
621+
largeTxs++
622+
default:
615623
numDirect = int(math.Sqrt(float64(len(peers))))
616624
}
617625
// Send the tx unconditionally to a subset of our peers
@@ -629,13 +637,12 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) {
629637
peer.AsyncSendTransactions(hashes)
630638
}
631639
for peer, hashes := range annos {
632-
annoPeers++
633-
annoCount += len(hashes)
640+
annPeers++
641+
annCount += len(hashes)
634642
peer.AsyncSendPooledTransactionHashes(hashes)
635643
}
636-
log.Debug("Transaction broadcast", "txs", len(txs),
637-
"announce packs", annoPeers, "announced hashes", annoCount,
638-
"tx packs", directPeers, "broadcast txs", directCount)
644+
log.Debug("Distributed transactions", "plaintxs", len(txs)-blobTxs-largeTxs, "blobtxs", blobTxs, "largetxs", largeTxs,
645+
"bcastpeers", directPeers, "bcastcount", directCount, "annpeers", annPeers, "anncount", annCount)
639646
}
640647

641648
// minedBroadcastLoop sends mined blocks to connected peers.

0 commit comments

Comments
 (0)