Skip to content

Commit

Permalink
Process PactQueue concurrently.
Browse files Browse the repository at this point in the history
Signed-off-by: Evgenii Akentev <[email protected]>
Change-Id: Ia007a565d75c625ddb243534a546d57584ec8e7d
  • Loading branch information
Evgenii Akentev committed Apr 2, 2024
1 parent 8768e48 commit f1d1ed0
Show file tree
Hide file tree
Showing 10 changed files with 214 additions and 153 deletions.
46 changes: 22 additions & 24 deletions bench/Chainweb/Pact/Backend/ForkingBench.hs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ import Chainweb.Pact.Backend.Compaction qualified as C
import Chainweb.Pact.Backend.Types
import Chainweb.Pact.Backend.Utils
import Chainweb.Pact.PactService
import Chainweb.Pact.Service.BlockValidation
import Chainweb.Pact.Service.BlockValidation as BlockValidation
import Chainweb.Pact.Service.PactQueue
import Chainweb.Pact.Service.Types
import Chainweb.Pact.Types
Expand Down Expand Up @@ -262,7 +262,8 @@ data Resources
, coinAccounts :: !(MVar (Map Account (NonEmpty (DynKeyPair, [SigCapability]))))
, nonceCounter :: !(IORef Word64)
, txPerBlock :: !(IORef Int)
, sqlEnv :: !SQLiteEnv
, writeSqlEnv :: !SQLiteEnv
, readSqlEnv :: !SQLiteEnv
}

type RunPactService =
Expand Down Expand Up @@ -293,46 +294,38 @@ withResources rdb trunkLength logLevel compact f = C.envWithCleanup create destr
coinAccounts <- newMVar mempty
nonceCounter <- newIORef 1
txPerBlock <- newIORef 10
sqlEnv <- openSQLiteConnection "" {- temporary SQLite db -} chainwebBenchPragmas

writeSqlEnv <- openSQLiteConnection "" {- temporary SQLite db -} chainwebPragmas
readSqlEnv <- openSQLiteConnection "" {- temporary SQLite db -} chainwebPragmas
mp <- testMemPoolAccess txPerBlock coinAccounts
pactService <-
startPact testVer logger blockHeaderDb payloadDb mp sqlEnv
startPact testVer logger blockHeaderDb payloadDb mp (writeSqlEnv, readSqlEnv)
mainTrunkBlocks <-
playLine payloadDb blockHeaderDb trunkLength genesisBlock (snd pactService) nonceCounter
when (compact == DoCompact) $ do
C.withDefaultLogger Error $ \lgr -> do
void $ C.compact (BlockHeight trunkLength) lgr sqlEnv []
void $ C.compact (BlockHeight trunkLength) lgr writeSqlEnv []

return $ NoopNFData $ Resources {..}

destroy (NoopNFData (Resources {..})) = do
stopPact pactService
stopSqliteDb sqlEnv
stopSqliteDb writeSqlEnv
stopSqliteDb readSqlEnv

pactQueueSize = 2000

logger = genericLogger logLevel T.putStrLn

startPact version l bhdb pdb mempool sqlEnv = do
startPact version l bhdb pdb mempool sqlEnvs = do
reqQ <- newPactQueue pactQueueSize
a <- async $ runPactService version cid l reqQ mempool bhdb pdb sqlEnv testPactServiceConfig
a <- async $ runPactService version cid l reqQ mempool bhdb pdb sqlEnvs testPactServiceConfig
{ _pactBlockGasLimit = 180_000
}

return (a, reqQ)

stopPact (a, _) = cancel a

chainwebBenchPragmas =
[ "synchronous = NORMAL"
, "journal_mode = WAL"
, "locking_mode = EXCLUSIVE"
-- this is different from the prodcution database that uses @NORMAL@
, "temp_store = MEMORY"
, "auto_vacuum = NONE"
, "page_size = 1024"
]

genesisBlock :: BlockHeader
genesisBlock = genesisBlockHeader testVer cid

Expand Down Expand Up @@ -369,7 +362,7 @@ testMemPoolAccess txsPerBlock accounts = do
getTestBlock mVarAccounts txOrigTime validate bHeight hash
| bHeight == 1 = do
meta <- setTime txOrigTime <$> makeMeta cid
(as, kss, cmds) <- unzip3 . toList <$> createCoinAccounts testVer meta
(as, kss, cmds) <- unzip3 <$> createCoinAccounts testVer meta twoNames
case traverse validateCommand cmds of
Left err -> throwM $ userError err
Right !r -> do
Expand Down Expand Up @@ -468,15 +461,20 @@ stockKey s = do
stockKeyFile :: ByteString
stockKeyFile = $(embedFile "pact/genesis/devnet/keys.yaml")

createCoinAccounts :: ChainwebVersion -> PublicMeta -> IO (NonEmpty (Account, NonEmpty (DynKeyPair, [SigCapability]), Command Text))
createCoinAccounts v meta = traverse (go <*> createCoinAccount v meta) names
createCoinAccounts :: ChainwebVersion -> PublicMeta -> [String] -> IO [(Account, NonEmpty (DynKeyPair, [SigCapability]), Command Text)]
createCoinAccounts v meta names' = traverse (go <*> createCoinAccount v meta) names'
where
go a m = do
(b,c) <- m
return (Account a,b,c)

names :: NonEmpty String
names = NEL.map safeCapitalize . NEL.fromList $ Prelude.take 2 $ words "mary elizabeth patricia jennifer linda barbara margaret susan dorothy jessica james john robert michael william david richard joseph charles thomas"
twoNames :: [String]
twoNames = take 2 names

names :: [String]
names = map safeCapitalize $ names' ++ [(n ++ show x) | n <- names', x <- [0 :: Int ..1000]]
where
names' = words "mary elizabeth patricia jennifer linda barbara margaret susan dorothy jessica james john robert michael william david richard joseph charles thomas"

formatB16PubKey :: DynKeyPair -> Text
formatB16PubKey = \case
Expand Down
216 changes: 123 additions & 93 deletions src/Chainweb/Pact/PactService.hs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ import Chainweb.Pact.Backend.RelationalCheckpointer (withProdRelationalCheckpoin
import Chainweb.Pact.Backend.Types
import Chainweb.Pact.PactService.ExecBlock
import Chainweb.Pact.PactService.Checkpointer
import Chainweb.Pact.Service.PactQueue (PactQueue, getNextRequest)
import Chainweb.Pact.Service.PactQueue (PactQueue, getNextWriteRequest, getNextReadRequest)
import Chainweb.Pact.Service.Types
import Chainweb.Pact.SPV
import Chainweb.Pact.TransactionExec
Expand All @@ -126,13 +126,18 @@ runPactService
-> MemPoolAccess
-> BlockHeaderDb
-> PayloadDb tbl
-> SQLiteEnv
-> (SQLiteEnv, SQLiteEnv)
-> PactServiceConfig
-> IO ()
runPactService ver cid chainwebLogger reqQ mempoolAccess bhDb pdb sqlenv config =
void $ withPactService ver cid chainwebLogger bhDb pdb sqlenv config $ do
runPactService ver cid chainwebLogger reqQ mempoolAccess bhDb pdb (writeSqlEnv, readSqlEnv) config =
void $ withPactService ver cid chainwebLogger bhDb pdb writeSqlEnv config True $ do
initialPayloadState mempoolAccess ver cid
serviceRequests mempoolAccess reqQ
pst <- get
pse <- ask
liftIO $ race_
(runPactServiceM pst pse $ serviceWriteRequests mempoolAccess reqQ)
(threadDelay 1_000_000 >> (withPactService ver cid chainwebLogger bhDb pdb readSqlEnv config False $
serviceReadRequests mempoolAccess reqQ))

withPactService
:: (Logger logger, CanReadablePayloadCas tbl)
Expand All @@ -143,9 +148,10 @@ withPactService
-> PayloadDb tbl
-> SQLiteEnv
-> PactServiceConfig
-> Bool
-> PactServiceM logger tbl a
-> IO (T2 a PactServiceState)
withPactService ver cid chainwebLogger bhDb pdb sqlenv config act =
withPactService ver cid chainwebLogger bhDb pdb sqlenv config initLatest act =
withProdRelationalCheckpointer checkpointerLogger (_pactModuleCacheLimit config) sqlenv ver cid $ \checkpointer -> do
let !rs = readRewards
let !pse = PactServiceEnv
Expand Down Expand Up @@ -195,7 +201,7 @@ withPactService ver cid chainwebLogger bhDb pdb sqlenv config act =
-- 'initalPayloadState.readContracts'. We therefore rewind to the latest
-- avaliable header in the block header database.
--
exitOnRewindLimitExceeded $ initializeLatestBlock (_pactUnlimitedInitialRewind config)
when initLatest $ exitOnRewindLimitExceeded $ initializeLatestBlock (_pactUnlimitedInitialRewind config)
act
where
pactServiceLogger = setComponent "pact" chainwebLogger
Expand Down Expand Up @@ -281,43 +287,68 @@ lookupBlockHeader bhash ctx = do
throwM $ BlockHeaderLookupFailure $
"failed lookup of parent header in " <> ctx <> ": " <> sshow e

-- | Loop forever, serving Pact execution requests and reponses from the queues
serviceRequests
-- | Loop forever, serving Pact execution Write-requests
serviceWriteRequests
:: forall logger tbl. (Logger logger, CanReadablePayloadCas tbl)
=> MemPoolAccess
-> PactQueue
-> PactServiceM logger tbl ()
serviceRequests memPoolAccess reqQ = do
logInfo "Starting service"
go `finally` logInfo "Stopping service"
serviceWriteRequests memPoolAccess reqQ = do
logInfo "Starting Write-requests handling service"
go `finally` logInfo "Stopping Write-requests handling service"
where
go = do
PactServiceEnv{_psLogger} <- ask
logDebug "serviceRequests: wait"
msg <- liftIO $ getNextRequest reqQ
logDebug "serviceWriteRequests: wait"
msg <- liftIO $ getNextWriteRequest reqQ
requestId <- liftIO $ UUID.toText <$> UUID.nextRandom
let logFn = logFunction $ addLabel ("pact-request-id", requestId) _psLogger
logDebug $ "serviceRequests: " <> sshow msg
logDebug $ "serviceWriteRequests: " <> sshow msg
case msg of
CloseMsg -> return ()
LocalMsg (LocalReq localRequest preflight sigVerify rewindDepth localResultVar) -> do
trace logFn "Chainweb.Pact.PactService.execLocal" () 0 $
tryOne "execLocal" localResultVar $
execLocal localRequest preflight sigVerify rewindDepth
go
NewBlockMsg NewBlockReq {..} -> do
trace logFn "Chainweb.Pact.PactService.execNewBlock"
() 1 $
tryOne "execNewBlock" _newResultVar $
execNewBlock memPoolAccess _newMiner
go
ValidateBlockMsg ValidateBlockReq {..} -> do
tryOne "execValidateBlock" _valResultVar $
fmap fst $ trace' logFn "Chainweb.Pact.PactService.execValidateBlock"
_valBlockHeader
(\(_, g) -> fromIntegral g)
(execValidateBlock memPoolAccess _valBlockHeader _valPayloadData)
go
SyncToBlockMsg SyncToBlockReq {..} -> do
trace logFn "Chainweb.Pact.PactService.execSyncToBlock" _syncToBlockHeader 1 $
tryOne "syncToBlockBlock" _syncToResultVar $
execSyncToBlock _syncToBlockHeader
go
_ -> error $ "impossible: unexpected request " ++ show msg

-- | Loop forever, serving Pact execution Read-requests
serviceReadRequests
:: forall logger tbl. (Logger logger, CanReadablePayloadCas tbl)
=> MemPoolAccess
-> PactQueue
-> PactServiceM logger tbl ()
serviceReadRequests memPoolAccess reqQ = do
logInfo "Starting read-requests handling service"
go `finally` (logInfo "Stopping read-requests handling service")
where
go = do
logDebug "serviceReadRequests: wait"
msg <- liftIO $ getNextReadRequest reqQ
requestId <- liftIO $ UUID.toText <$> UUID.nextRandom
PactServiceEnv{_psLogger} <- ask
let logFn = logFunction $ addLabel ("pact-request-id", requestId) _psLogger
logDebug $ "serviceReadRequests: " <> sshow msg
case msg of
NewBlockMsg NewBlockReq {..} -> do
trace logFn "Chainweb.Pact.PactService.execNewBlock"
() 1 $
tryOne "execNewBlock" _newResultVar $
execNewBlock memPoolAccess _newMiner
go
LocalMsg (LocalReq localRequest preflight sigVerify rewindDepth localResultVar) -> do
trace logFn "Chainweb.Pact.PactService.execLocal" () 0 $
tryOne "execLocal" localResultVar $
execLocal localRequest preflight sigVerify rewindDepth
go
LookupPactTxsMsg (LookupPactTxsReq confDepth txHashes resultVar) -> do
trace logFn "Chainweb.Pact.PactService.execLookupPactTxs" ()
(length txHashes) $
Expand All @@ -340,81 +371,80 @@ serviceRequests memPoolAccess reqQ = do
tryOne "execHistoricalLookup" resultVar $
execHistoricalLookup bh d k
go
SyncToBlockMsg SyncToBlockReq {..} -> do
trace logFn "Chainweb.Pact.PactService.execSyncToBlock" _syncToBlockHeader 1 $
tryOne "syncToBlockBlock" _syncToResultVar $
execSyncToBlock _syncToBlockHeader
go
ReadOnlyReplayMsg ReadOnlyReplayReq {..} -> do
trace logFn "Chainweb.Pact.PactService.execReadOnlyReplay" (_readOnlyReplayLowerBound, _readOnlyReplayUpperBound) 1 $
tryOne "readOnlyReplayBlock" _readOnlyReplayResultVar $
execReadOnlyReplay _readOnlyReplayLowerBound _readOnlyReplayUpperBound
go
_ -> error $ "impossible: unexpected request " ++ show msg

tryOne
:: forall logger tbl a. (Logger logger, CanReadablePayloadCas tbl)
=> Text
-> MVar (Either PactException a)
-> PactServiceM logger tbl a
-> PactServiceM logger tbl ()
tryOne which mvar = tryOne' which mvar Right

tryOne'
:: (Logger logger, CanReadablePayloadCas tbl)
=> Text
-> MVar (Either PactException b)
-> (a -> Either PactException b)
-> PactServiceM logger tbl a
-> PactServiceM logger tbl ()
tryOne' which mvar post m =
(evalPactOnThread (post <$> m) >>= (liftIO . putMVar mvar))
`catches`
[ Handler $ \(e :: SomeAsyncException) -> do
logWarn $ T.concat
[ "Received asynchronous exception running pact service ("
, which
, "): "
, sshow e
]
liftIO $ do
void $ tryPutMVar mvar $! toPactInternalError e
throwM e
, Handler $ \(e :: SomeException) -> do
logError $ mconcat
[ "Received exception running pact service ("
, which
, "): "
, sshow e
]
liftIO $ do
void $ tryPutMVar mvar $! toPactInternalError e
]
where
toPactInternalError e = Left $ PactInternalError $ T.pack $ show e

tryOne
:: Text
-> MVar (Either PactException a)
-> PactServiceM logger tbl a
-> PactServiceM logger tbl ()
tryOne which mvar = tryOne' which mvar Right

tryOne'
:: Text
-> MVar (Either PactException b)
-> (a -> Either PactException b)
-> PactServiceM logger tbl a
-> PactServiceM logger tbl ()
tryOne' which mvar post m =
(evalPactOnThread (post <$> m) >>= (liftIO . putMVar mvar))
`catches`
[ Handler $ \(e :: SomeAsyncException) -> do
logWarn $ T.concat
[ "Received asynchronous exception running pact service ("
, which
, "): "
, sshow e
]
liftIO $ do
void $ tryPutMVar mvar $! toPactInternalError e
throwM e
, Handler $ \(e :: SomeException) -> do
logError $ mconcat
[ "Received exception running pact service ("
, which
, "): "
, sshow e
]
liftIO $ do
void $ tryPutMVar mvar $! toPactInternalError e
]
where
-- Pact turns AsyncExceptions into textual exceptions within
-- PactInternalError. So there is no easy way for us to distinguish
-- whether an exception originates from within pact or from the outside.
--
-- A common strategy to deal with this is to run the computation (pact)
-- on a "hidden" internal thread. Lifting `forkIO` into a state
-- monad is generally not thread-safe. It is fine to do here, since
-- there is no concurrency. We use a thread here only to shield the
-- computation from external exceptions.
--
-- This solution isn't bullet-proof and only meant as a temporary fix. A
-- proper solution is to fix pact, to handle asynchronous exceptions
-- gracefully.
--
-- No mask is needed here. Asynchronous exceptions are handled
-- by the outer handlers and cause an abort. So no state is lost.
--
evalPactOnThread :: PactServiceM logger tbl a -> PactServiceM logger tbl a
evalPactOnThread act = do
e <- ask
s <- get
T2 r s' <- liftIO $
withAsync (runPactServiceM s e act) wait
put $! s'
return $! r
-- Pact turns AsyncExceptions into textual exceptions within
-- PactInternalError. So there is no easy way for us to distinguish
-- whether an exception originates from within pact or from the outside.
--
-- A common strategy to deal with this is to run the computation (pact)
-- on a "hidden" internal thread. Lifting `forkIO` into a state
-- monad is generally not thread-safe. It is fine to do here, since
-- there is no concurrency. We use a thread here only to shield the
-- computation from external exceptions.
--
-- This solution isn't bullet-proof and only meant as a temporary fix. A
-- proper solution is to fix pact, to handle asynchronous exceptions
-- gracefully.
--
-- No mask is needed here. Asynchronous exceptions are handled
-- by the outer handlers and cause an abort. So no state is lost.
--
evalPactOnThread :: PactServiceM logger tbl a -> PactServiceM logger tbl a
evalPactOnThread act = do
e <- ask
s <- get
T2 r s' <- liftIO $
withAsync (runPactServiceM s e act) wait
put $! s'
return $! r


execNewBlock
:: forall logger tbl. (Logger logger, CanReadablePayloadCas tbl)
Expand Down
Loading

0 comments on commit f1d1ed0

Please sign in to comment.