Skip to content

Commit 1d1d65b

Browse files
authored
fix(mempool/v1): data race (#1655)
Closes celestiaorg/celestia-app#4379 I don't know if this fixes all the issues with the v1 mempool but it should fix the data race reported in the issue. We had to move the mempool lock earlier in `CheckTx` before this map look up ```go txmp.txByKey[txKey] ``` ## Testing ```shell # Before $ go test ./mempool/v1 -race -run TestConcurrentCheckTxDataRace # github.com/tendermint/tendermint/mempool/v1.test ================== WARNING: DATA RACE Write at 0x00c0005987e0 by goroutine 89: # After $ go test ./mempool/v1 -race -run TestConcurrentCheckTxDataRace # github.com/tendermint/tendermint/mempool/v1.test ok github.com/tendermint/tendermint/mempool/v1 (cached) ```
1 parent 6611cb0 commit 1d1d65b

File tree

2 files changed

+89
-5
lines changed

2 files changed

+89
-5
lines changed

mempool/v1/mempool.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,11 @@ func (txmp *TxMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo memp
205205

206206
txKey := tx.Key()
207207

208+
// At this point, we need to ensure that passing CheckTx and adding to
209+
// the mempool is atomic.
210+
txmp.Lock()
211+
defer txmp.Unlock()
212+
208213
// Check for the transaction in the cache.
209214
if !txmp.cache.Push(tx) {
210215
// If the cached transaction is also in the pool, record its sender.
@@ -216,11 +221,6 @@ func (txmp *TxMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo memp
216221
return mempool.ErrTxInCache
217222
}
218223

219-
// At this point, we need to ensure that passing CheckTx and adding to
220-
// the mempool is atomic.
221-
txmp.Lock()
222-
defer txmp.Unlock()
223-
224224
// Invoke an ABCI CheckTx for this transaction.
225225
rsp, err := txmp.proxyAppConn.CheckTxSync(abci.RequestCheckTx{Tx: tx})
226226
if err != nil {

mempool/v1/mempool_test.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -748,3 +748,87 @@ func abciResponses(n int, code uint32) []*abci.ResponseDeliverTx {
748748
}
749749
return responses
750750
}
751+
752+
// TestConcurrentCheckTxPanic reproduces the concurrent map read/write panic
753+
// that happens when multiple goroutines call CheckTx simultaneously.
754+
// This test should be run with the race detector enabled.
755+
//
756+
// See https://github.com/celestiaorg/celestia-app/issues/4379
757+
func TestConcurrentCheckTxDataRace(t *testing.T) {
758+
// Skip in short mode as this test is timing-dependent
759+
if testing.Short() {
760+
t.Skip("skipping in short mode")
761+
}
762+
763+
// Use a WaitGroup to ensure all goroutines finish
764+
var wg sync.WaitGroup
765+
766+
wg.Add(1)
767+
go func() {
768+
defer wg.Done()
769+
770+
// Create a mempool for testing
771+
app := kvstore.NewApplication()
772+
cc := proxy.NewLocalClientCreator(app)
773+
appConnMem, _ := cc.NewABCIClient()
774+
err := appConnMem.Start()
775+
require.NoError(t, err)
776+
defer func() {
777+
err := appConnMem.Stop()
778+
require.NoError(t, err)
779+
}()
780+
781+
logger := log.TestingLogger()
782+
cfg := config.DefaultMempoolConfig()
783+
txmp := NewTxMempool(logger, cfg, appConnMem, 0)
784+
785+
// Create test transactions
786+
tx1 := types.Tx("test_transaction_1")
787+
tx2 := types.Tx("test_transaction_2")
788+
tx3 := types.Tx("test_transaction_3")
789+
790+
// First, add tx1 to both the cache and mempool so it shows up in txByKey
791+
_ = txmp.CheckTx(tx1, nil, mempool.TxInfo{})
792+
793+
// Create multiple goroutines that all try to check the same transaction
794+
// simultaneously, which should trigger the race condition
795+
numGoroutines := 100
796+
var startWg sync.WaitGroup
797+
var startCh = make(chan struct{})
798+
799+
for i := 0; i < numGoroutines; i++ {
800+
startWg.Add(1)
801+
802+
go func(id int) {
803+
startWg.Done()
804+
// Wait for the signal to start
805+
<-startCh
806+
807+
// Use different transactions for different goroutines
808+
// but ensure some overlap to trigger the race
809+
var tx types.Tx
810+
if id%3 == 0 {
811+
tx = tx1 // Already in cache, will access txByKey
812+
} else if id%3 == 1 {
813+
tx = tx2 // New transaction
814+
} else {
815+
tx = tx3 // New transaction
816+
}
817+
818+
// This will race with other goroutines
819+
_ = txmp.CheckTx(tx, nil, mempool.TxInfo{})
820+
}(i)
821+
}
822+
823+
// Wait for all goroutines to be ready
824+
startWg.Wait()
825+
// Signal all goroutines to start simultaneously
826+
close(startCh)
827+
828+
// Wait a short time for the panic to occur
829+
time.Sleep(1 * time.Second)
830+
}()
831+
832+
// Wait for the test goroutine to complete
833+
wg.Wait()
834+
}

0 commit comments

Comments
 (0)