Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send new transactions to remote mempools immediately #2022

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions src/Chainweb/Chainweb/MempoolSyncClient.hs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ runMempoolSyncClient mgr memP2pConfig peerRes chain = bracket create destroy go
create = do
logg Debug "starting mempool p2p sync"
p2pCreateNode v netId peer (logFunction syncLogger) peerDb mgr True $
mempoolSyncP2pSession chain (_mempoolP2pConfigPollInterval memP2pConfig)
mempoolSyncP2pSession chain
(_mempoolP2pConfigPollInterval memP2pConfig)
(_mempoolP2pConfigSendNewTxsDelay memP2pConfig)
go n = do
-- Run P2P client node
logg Debug "mempool sync p2p node initialized, starting session"
Expand All @@ -91,18 +93,19 @@ runMempoolSyncClient mgr memP2pConfig peerRes chain = bracket create destroy go
mempoolSyncP2pSession
:: ChainResources logger
-> Seconds
-> Micros
-> P2pSession
mempoolSyncP2pSession chain (Seconds pollInterval) logg0 env _ = do
mempoolSyncP2pSession chain (Seconds pollInterval) (Micros sendNewTxsDelayMicros) logg0 env _ = do
logg Debug "mempool sync session starting"
Mempool.syncMempools' logg syncIntervalUs pool peerMempool
Mempool.syncMempools' logg syncIntervalMicros (int sendNewTxsDelayMicros) pool peerMempool
logg Debug "mempool sync session finished"
return True
where
peerMempool = MPC.toMempool v cid txcfg env

-- FIXME Potentially dangerous down-cast.
syncIntervalUs :: Int
syncIntervalUs = int pollInterval * 500000
syncIntervalMicros :: Int
syncIntervalMicros = int pollInterval * 500000

remote = T.pack $ Sv.showBaseUrl $ Sv.baseUrl env
logg d m = logg0 d $ T.concat ["[mempool sync@", remote, "]:", m]
Expand Down
32 changes: 24 additions & 8 deletions src/Chainweb/Mempool/InMem.hs
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ module Chainweb.Mempool.InMem
import Control.Applicative ((<|>))
import Control.Concurrent.Async
import Control.Concurrent.MVar
import Control.Concurrent.STM
import Control.DeepSeq
import Control.Error.Util (hush)
import Control.Exception (bracket, evaluate, mask_, throw)
import Control.Monad

import Data.Aeson
import Data.Bifunctor (bimap)
import Data.ByteString (ByteString)
import qualified Data.ByteString.Short as SB
import Data.Decimal
#if MIN_VERSION_base(4,20,0)
Expand Down Expand Up @@ -79,7 +81,6 @@ import Chainweb.Version (ChainwebVersion)
import qualified Pact.Types.ChainMeta as P

import Numeric.AffineSpace
import Data.ByteString (ByteString)

------------------------------------------------------------------------------
compareOnGasPrice :: TransactionConfig t -> t -> t -> Ordering
Expand All @@ -96,12 +97,12 @@ makeInMemPool :: InMemConfig t
makeInMemPool cfg = mask_ $ do
nonce <- randomIO
dataLock <- newInMemMempoolData >>= newMVar
return $! InMemoryMempool cfg dataLock nonce
newTxsVar <- newTVarIO []
return $! InMemoryMempool cfg dataLock newTxsVar nonce

destroyInMemPool :: InMemoryMempool t -> IO ()
destroyInMemPool = const $ return ()


------------------------------------------------------------------------------
newInMemMempoolData :: IO (InMemoryMempoolData t)
newInMemMempoolData =
Expand Down Expand Up @@ -132,24 +133,27 @@ toMempoolBackend logger mempool = do
, mempoolGetBlock = getBlock
, mempoolPrune = prune
, mempoolGetPendingTransactions = getPending
, mempoolGetNewTransactions = getNew
, mempoolClear = clear
}
where
cfg = _inmemCfg mempool
nonce = _inmemNonce mempool
lockMVar = _inmemDataLock mempool
newTxsVar = _inmemNewTxs mempool

InMemConfig tcfg _ _ _ _ _ _ = cfg
member = memberInMem lockMVar
lookup = lookupInMem tcfg lockMVar
lookupEncoded = lookupEncodedInMem lockMVar
insert = insertInMem cfg lockMVar
insert = insertInMem cfg lockMVar newTxsVar
insertCheck = insertCheckInMem cfg lockMVar
markValidated = markValidatedInMem logger tcfg lockMVar
addToBadList = addToBadListInMem lockMVar
checkBadList = checkBadListInMem lockMVar
getBlock = getBlockInMem logger cfg lockMVar
getPending = getPendingInMem cfg nonce lockMVar
getNew = getNewInMem newTxsVar
prune = pruneInMem lockMVar
clear = clearInMem lockMVar

Expand Down Expand Up @@ -479,10 +483,11 @@ insertInMem
. NFData t
=> InMemConfig t -- ^ in-memory config
-> MVar (InMemoryMempoolData t) -- ^ in-memory state
-> TVar [t]
-> InsertType
-> Vector t -- ^ new transactions
-> IO ()
insertInMem cfg lock runCheck txs0 = do
insertInMem cfg lock newTxsVar runCheck txs0 = do
txhashes <- insertCheck
withMVarMasked lock $ \mdata -> do
pending <- readIORef (_inmemPending mdata)
Expand All @@ -495,9 +500,13 @@ insertInMem cfg lock runCheck txs0 = do
recordRecentTransactions maxRecent newHashes
where
insertCheck :: IO (Vector (T2 TransactionHash t))
insertCheck = if runCheck == CheckedInsert
then insertCheckInMem' cfg lock txs0
else return $! V.map (\tx -> T2 (hasher tx) tx) txs0
insertCheck = case runCheck of
CheckedInsert -> insertCheckInMem' cfg lock txs0
UncheckedInsert -> return $! V.map (\tx -> T2 (hasher tx) tx) txs0
NewInsert -> do
-- we trust the caller to have done all necessary pre-insert checks
atomically $ modifyTVar newTxsVar $ (V.toList txs0 ++)
return $! V.map (\tx -> T2 (hasher tx) tx) txs0

txcfg = _inmemTxCfg cfg
encodeTx = codecEncode (txCodec txcfg)
Expand Down Expand Up @@ -706,6 +715,13 @@ getPendingInMem cfg nonce lock since callback = do
sendChunk _ 0 = return ()
sendChunk dl _ = callback $! V.fromList $ dl []

getNewInMem :: TVar [t] -> IO [t]
getNewInMem v = atomically $ do
ts <- readTVar v
guard (not $ null ts)
writeTVar v []
return ts

------------------------------------------------------------------------------
clearInMem :: MVar (InMemoryMempoolData t) -> IO ()
clearInMem lock = newInMemMempoolData >>= void . swapMVar lock
Expand Down
9 changes: 8 additions & 1 deletion src/Chainweb/Mempool/InMemTypes.hs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import Chainweb.Mempool.CurrentTxs
import Chainweb.Mempool.Mempool
import Chainweb.Time (Micros(..), Time(..))
import Chainweb.Utils (T2)
import Control.Concurrent.STM

------------------------------------------------------------------------------
data PendingEntry = PendingEntry
Expand Down Expand Up @@ -80,6 +81,11 @@ data InMemConfig t = InMemConfig {
data InMemoryMempool t = InMemoryMempool {
_inmemCfg :: !(InMemConfig t)
, _inmemDataLock :: !(MVar (InMemoryMempoolData t))
, _inmemNewTxs :: !(TVar [t])
-- ^ The set of new transactions that have been submitted to this mempool
-- by a user and not by another mempool, and which haven't been sent to
-- other mempools yet. This is used to quickly send these transactions
-- to other mempool without waiting for the usual polling period.
, _inmemNonce :: !ServerNonce
}

Expand All @@ -102,11 +108,12 @@ data InMemoryMempoolData t = InMemoryMempoolData {
-- possibly have to pay gas for it several times.

, _inmemCurrentTxs :: !(IORef CurrentTxs)
-- ^ The set of non-expired transactions that have been addeded to a block.
-- ^ The set of non-expired transactions that have been added to a block.
-- Transactions are remove from the set of pending transactions when they
-- are added to a block. This set is used to prevent transactions from being
-- re-inserts when synchronizing with nodes that haven't yet validated the
-- block.

}

------------------------------------------------------------------------------
Expand Down
38 changes: 31 additions & 7 deletions src/Chainweb/Mempool/Mempool.hs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ module Chainweb.Mempool.Mempool
import Control.DeepSeq (NFData)
import Control.Exception
import Control.Lens hiding ((.=))
import Control.Monad (replicateM, unless)
import Control.Monad (replicateM, unless, forever)

import Crypto.Hash (hash)
import Crypto.Hash.Algorithms (SHA512t_256)
Expand Down Expand Up @@ -142,6 +142,7 @@ import Chainweb.Transaction
import Chainweb.Utils
import Chainweb.Utils.Serialization
import Data.LogMessage (LogFunctionText)
import Control.Concurrent.Async

------------------------------------------------------------------------------
data LookupResult t = Missing
Expand Down Expand Up @@ -215,7 +216,7 @@ data TransactionConfig t = TransactionConfig {
type MempoolTxId = Int64
type ServerNonce = Int
type HighwaterMark = (ServerNonce, MempoolTxId)
data InsertType = CheckedInsert | UncheckedInsert
data InsertType = CheckedInsert | UncheckedInsert | NewInsert
deriving (Show, Eq)

data InsertError = InsertErrorDuplicate
Expand Down Expand Up @@ -310,6 +311,10 @@ data MempoolBackend t = MempoolBackend {
-- | Discard any expired transactions.
, mempoolPrune :: IO ()

-- | Returns only transactions that have been newly constructed to send them
-- to the rest of the network for the first time.
, mempoolGetNewTransactions :: IO [t]

-- | given a previous high-water mark and a chunk callback function, loops
-- through the pending candidate transactions and supplies the hashes to
-- the callback in chunks. No ordering of hashes is presupposed. Returns
Expand Down Expand Up @@ -342,6 +347,7 @@ noopMempool = do
, mempoolGetBlock = noopGetBlock
, mempoolPrune = return ()
, mempoolGetPendingTransactions = noopGetPending
, mempoolGetNewTransactions = return []
, mempoolClear = noopClear
}
where
Expand Down Expand Up @@ -407,6 +413,9 @@ data SyncState = SyncState {
, _syncTooMany :: !Bool
}

-- sendNewTx :: t -> MempoolBackend t -> IO ()


-- | Pulls any missing pending transactions from a remote mempool.
--
-- The initial sync procedure:
Expand All @@ -425,12 +434,14 @@ syncMempools'
=> LogFunctionText
-> Int
-- ^ polling interval in microseconds
-> Int
-- ^ sending new txs delay in microseconds
-> MempoolBackend t
-- ^ local mempool
-> MempoolBackend t
-- ^ remote mempool
-> IO ()
syncMempools' log0 us localMempool remoteMempool = sync
syncMempools' log0 syncDelayMicros sendNewTxsDelayMicros localMempool remoteMempool = sync

where
maxCnt = 5000
Expand Down Expand Up @@ -489,7 +500,11 @@ syncMempools' log0 us localMempool remoteMempool = sync
deb :: Text -> IO ()
deb = log0 Debug

sync = finally (initialSync >>= subsequentSync) (deb "sync exiting")
sync = finally
(do
hw <- initialSync
subsequentSync hw `race_` sendNewTxs)
(deb "sync exiting")

initialSync = do
deb "Get full list of pending hashes from remote"
Expand Down Expand Up @@ -525,9 +540,17 @@ syncMempools' log0 us localMempool remoteMempool = sync
, " new remote hashes need to be fetched"
]
traverse_ fetchMissing missingChunks
approximateThreadDelay us
approximateThreadDelay syncDelayMicros
subsequentSync remoteHw'

sendNewTxs = forever $ do
newTxs <- mempoolGetNewTransactions localMempool
deb $
"Sending newly constructed transactions: " <>
sshow (txHasher (mempoolTxConfig localMempool) <$> newTxs)
mempoolInsert remoteMempool CheckedInsert $! V.fromList newTxs
approximateThreadDelay sendNewTxsDelayMicros

-- get pending hashes from remote since the given (optional) high water mark
fetchSince oldRemoteHw = do
-- Intialize and collect SyncState
Expand Down Expand Up @@ -563,11 +586,12 @@ syncMempools
:: Show t
=> LogFunctionText
-> Int -- ^ polling interval in microseconds
-> Int -- ^ new tx sending delay in microseconds
-> MempoolBackend t -- ^ local mempool
-> MempoolBackend t -- ^ remote mempool
-> IO ()
syncMempools log us localMempool remoteMempool =
syncMempools' log us localMempool remoteMempool
syncMempools log syncDelayMicros sendNewTxsDelayMicros localMempool remoteMempool =
syncMempools' log syncDelayMicros sendNewTxsDelayMicros localMempool remoteMempool

------------------------------------------------------------------------------
-- | Raw/unencoded transaction hashes.
Expand Down
10 changes: 9 additions & 1 deletion src/Chainweb/Mempool/P2pConfig.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE NumericUnderscores #-}

-- |
-- Module: Chainweb.Mempool.P2pConfig
Expand All @@ -19,6 +20,7 @@ module Chainweb.Mempool.P2pConfig
, mempoolP2pConfigMaxSessionCount
, mempoolP2pConfigSessionTimeout
, mempoolP2pConfigPollInterval
, mempoolP2pConfigSendNewTxsDelay
, defaultMempoolP2pConfig
, pMempoolP2pConfig
) where
Expand All @@ -42,6 +44,7 @@ data MempoolP2pConfig = MempoolP2pConfig
, _mempoolP2pConfigSessionTimeout :: !Seconds
-- ^ timeout in seconds
, _mempoolP2pConfigPollInterval :: !Seconds
, _mempoolP2pConfigSendNewTxsDelay :: !Micros
}
deriving (Show, Eq, Ord, Generic)

Expand All @@ -51,7 +54,8 @@ defaultMempoolP2pConfig :: MempoolP2pConfig
defaultMempoolP2pConfig = MempoolP2pConfig
{ _mempoolP2pConfigMaxSessionCount = 6
, _mempoolP2pConfigSessionTimeout = 300
, _mempoolP2pConfigPollInterval = 30
, _mempoolP2pConfigPollInterval = Seconds 30
, _mempoolP2pConfigSendNewTxsDelay = Micros 500_000
}

instance ToJSON MempoolP2pConfig where
Expand All @@ -72,6 +76,7 @@ instance FromJSON MempoolP2pConfig where
<$> o .: "maxSessionCount"
<*> o .: "sessionTimeout"
<*> o .: "pollInterval"
<*> o .: "sendNewTxsDelay"

pMempoolP2pConfig :: MParser MempoolP2pConfig
pMempoolP2pConfig = id
Expand All @@ -84,3 +89,6 @@ pMempoolP2pConfig = id
<*< mempoolP2pConfigPollInterval .:: textOption
% long "mempool-p2p-poll-interval"
<> help "poll interval for synchronizing mempools in seconds"
<*< mempoolP2pConfigPollInterval .:: textOption
% long "mempool-p2p-send-new-txs-delay"
<> help "delay between sending new transactions to other mempools in micros"
1 change: 1 addition & 0 deletions src/Chainweb/Mempool/RestAPI/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ toMempool version chain txcfg env =
, mempoolCheckBadList = const unsupported
, mempoolGetBlock = \_ _ _ _ -> unsupported
, mempoolGetPendingTransactions = getPending
, mempoolGetNewTransactions = unsupported
, mempoolPrune = unsupported
, mempoolClear = clear
}
Expand Down
2 changes: 1 addition & 1 deletion src/Chainweb/Pact/RestAPI/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ sendHandler logger v cid mempool (SubmitBatch cmds) = Handler $ do
let txs = V.fromList $ NEL.toList enriched
-- If any of the txs in the batch fail validation, we reject them all.
liftIO (mempoolInsertCheck mempool txs) >>= checkResult
liftIO (mempoolInsert mempool UncheckedInsert txs)
liftIO (mempoolInsert mempool NewInsert txs)
return $! RequestKeys $ NEL.map cmdToRequestKey enriched
Left err -> failWith $ "Validation failed: " <> T.pack err
where
Expand Down
Loading
Loading