diff --git a/bench/Chainweb/Pact/Backend/ForkingBench.hs b/bench/Chainweb/Pact/Backend/ForkingBench.hs index c59b3dbef3..2bd880cf62 100644 --- a/bench/Chainweb/Pact/Backend/ForkingBench.hs +++ b/bench/Chainweb/Pact/Backend/ForkingBench.hs @@ -87,7 +87,7 @@ import Chainweb.Pact.Backend.Compaction qualified as C import Chainweb.Pact.Backend.Types import Chainweb.Pact.Backend.Utils import Chainweb.Pact.PactService -import Chainweb.Pact.Service.BlockValidation +import Chainweb.Pact.Service.BlockValidation as BlockValidation import Chainweb.Pact.Service.PactQueue import Chainweb.Pact.Service.Types import Chainweb.Pact.Types @@ -262,7 +262,8 @@ data Resources , coinAccounts :: !(MVar (Map Account (NonEmpty (DynKeyPair, [SigCapability])))) , nonceCounter :: !(IORef Word64) , txPerBlock :: !(IORef Int) - , sqlEnv :: !SQLiteEnv + , writeSqlEnv :: !SQLiteEnv + , readSqlEnv :: !SQLiteEnv } type RunPactService = @@ -293,46 +294,38 @@ withResources rdb trunkLength logLevel compact f = C.envWithCleanup create destr coinAccounts <- newMVar mempty nonceCounter <- newIORef 1 txPerBlock <- newIORef 10 - sqlEnv <- openSQLiteConnection "" {- temporary SQLite db -} chainwebBenchPragmas + + writeSqlEnv <- openSQLiteConnection "" {- temporary SQLite db -} chainwebPragmas + readSqlEnv <- openSQLiteConnection "" {- temporary SQLite db -} chainwebPragmas mp <- testMemPoolAccess txPerBlock coinAccounts pactService <- - startPact testVer logger blockHeaderDb payloadDb mp sqlEnv + startPact testVer logger blockHeaderDb payloadDb mp (writeSqlEnv, readSqlEnv) mainTrunkBlocks <- playLine payloadDb blockHeaderDb trunkLength genesisBlock (snd pactService) nonceCounter when (compact == DoCompact) $ do C.withDefaultLogger Error $ \lgr -> do - void $ C.compact (BlockHeight trunkLength) lgr sqlEnv [] + void $ C.compact (BlockHeight trunkLength) lgr writeSqlEnv [] return $ NoopNFData $ Resources {..} destroy (NoopNFData (Resources {..})) = do stopPact pactService - stopSqliteDb sqlEnv + stopSqliteDb writeSqlEnv + stopSqliteDb readSqlEnv pactQueueSize = 2000 logger = genericLogger logLevel T.putStrLn - startPact version l bhdb pdb mempool sqlEnv = do + startPact version l bhdb pdb mempool sqlEnvs = do reqQ <- newPactQueue pactQueueSize - a <- async $ runPactService version cid l reqQ mempool bhdb pdb sqlEnv testPactServiceConfig + a <- async $ runPactService version cid l reqQ mempool bhdb pdb sqlEnvs testPactServiceConfig { _pactBlockGasLimit = 180_000 } - return (a, reqQ) stopPact (a, _) = cancel a - chainwebBenchPragmas = - [ "synchronous = NORMAL" - , "journal_mode = WAL" - , "locking_mode = EXCLUSIVE" - -- this is different from the prodcution database that uses @NORMAL@ - , "temp_store = MEMORY" - , "auto_vacuum = NONE" - , "page_size = 1024" - ] - genesisBlock :: BlockHeader genesisBlock = genesisBlockHeader testVer cid @@ -369,7 +362,7 @@ testMemPoolAccess txsPerBlock accounts = do getTestBlock mVarAccounts txOrigTime validate bHeight hash | bHeight == 1 = do meta <- setTime txOrigTime <$> makeMeta cid - (as, kss, cmds) <- unzip3 . toList <$> createCoinAccounts testVer meta + (as, kss, cmds) <- unzip3 <$> createCoinAccounts testVer meta twoNames case traverse validateCommand cmds of Left err -> throwM $ userError err Right !r -> do @@ -468,15 +461,20 @@ stockKey s = do stockKeyFile :: ByteString stockKeyFile = $(embedFile "pact/genesis/devnet/keys.yaml") -createCoinAccounts :: ChainwebVersion -> PublicMeta -> IO (NonEmpty (Account, NonEmpty (DynKeyPair, [SigCapability]), Command Text)) -createCoinAccounts v meta = traverse (go <*> createCoinAccount v meta) names +createCoinAccounts :: ChainwebVersion -> PublicMeta -> [String] -> IO [(Account, NonEmpty (DynKeyPair, [SigCapability]), Command Text)] +createCoinAccounts v meta names' = traverse (go <*> createCoinAccount v meta) names' where go a m = do (b,c) <- m return (Account a,b,c) -names :: NonEmpty String -names = NEL.map safeCapitalize . NEL.fromList $ Prelude.take 2 $ words "mary elizabeth patricia jennifer linda barbara margaret susan dorothy jessica james john robert michael william david richard joseph charles thomas" +twoNames :: [String] +twoNames = take 2 names + +names :: [String] +names = map safeCapitalize $ names' ++ [(n ++ show x) | n <- names', x <- [0 :: Int ..1000]] + where + names' = words "mary elizabeth patricia jennifer linda barbara margaret susan dorothy jessica james john robert michael william david richard joseph charles thomas" formatB16PubKey :: DynKeyPair -> Text formatB16PubKey = \case diff --git a/src/Chainweb/Pact/PactService.hs b/src/Chainweb/Pact/PactService.hs index 5ecaae1711..5e55be99b5 100644 --- a/src/Chainweb/Pact/PactService.hs +++ b/src/Chainweb/Pact/PactService.hs @@ -99,7 +99,7 @@ import Chainweb.Pact.Backend.RelationalCheckpointer (withProdRelationalCheckpoin import Chainweb.Pact.Backend.Types import Chainweb.Pact.PactService.ExecBlock import Chainweb.Pact.PactService.Checkpointer -import Chainweb.Pact.Service.PactQueue (PactQueue, getNextRequest) +import Chainweb.Pact.Service.PactQueue (PactQueue, getNextWriteRequest, getNextReadRequest) import Chainweb.Pact.Service.Types import Chainweb.Pact.SPV import Chainweb.Pact.TransactionExec @@ -126,13 +126,18 @@ runPactService -> MemPoolAccess -> BlockHeaderDb -> PayloadDb tbl - -> SQLiteEnv + -> (SQLiteEnv, SQLiteEnv) -> PactServiceConfig -> IO () -runPactService ver cid chainwebLogger reqQ mempoolAccess bhDb pdb sqlenv config = - void $ withPactService ver cid chainwebLogger bhDb pdb sqlenv config $ do +runPactService ver cid chainwebLogger reqQ mempoolAccess bhDb pdb (writeSqlEnv, readSqlEnv) config = + void $ withPactService ver cid chainwebLogger bhDb pdb writeSqlEnv config True $ do initialPayloadState mempoolAccess ver cid - serviceRequests mempoolAccess reqQ + pst <- get + pse <- ask + liftIO $ race_ + (runPactServiceM pst pse $ serviceWriteRequests mempoolAccess reqQ) + (threadDelay 1_000_000 >> (withPactService ver cid chainwebLogger bhDb pdb readSqlEnv config False $ + serviceReadRequests mempoolAccess reqQ)) withPactService :: (Logger logger, CanReadablePayloadCas tbl) @@ -143,9 +148,10 @@ withPactService -> PayloadDb tbl -> SQLiteEnv -> PactServiceConfig + -> Bool -> PactServiceM logger tbl a -> IO (T2 a PactServiceState) -withPactService ver cid chainwebLogger bhDb pdb sqlenv config act = +withPactService ver cid chainwebLogger bhDb pdb sqlenv config initLatest act = withProdRelationalCheckpointer checkpointerLogger (_pactModuleCacheLimit config) sqlenv ver cid $ \checkpointer -> do let !rs = readRewards let !pse = PactServiceEnv @@ -195,7 +201,7 @@ withPactService ver cid chainwebLogger bhDb pdb sqlenv config act = -- 'initalPayloadState.readContracts'. We therefore rewind to the latest -- avaliable header in the block header database. -- - exitOnRewindLimitExceeded $ initializeLatestBlock (_pactUnlimitedInitialRewind config) + when initLatest $ exitOnRewindLimitExceeded $ initializeLatestBlock (_pactUnlimitedInitialRewind config) act where pactServiceLogger = setComponent "pact" chainwebLogger @@ -281,36 +287,25 @@ lookupBlockHeader bhash ctx = do throwM $ BlockHeaderLookupFailure $ "failed lookup of parent header in " <> ctx <> ": " <> sshow e --- | Loop forever, serving Pact execution requests and reponses from the queues -serviceRequests +-- | Loop forever, serving Pact execution Write-requests +serviceWriteRequests :: forall logger tbl. (Logger logger, CanReadablePayloadCas tbl) => MemPoolAccess -> PactQueue -> PactServiceM logger tbl () -serviceRequests memPoolAccess reqQ = do - logInfo "Starting service" - go `finally` logInfo "Stopping service" +serviceWriteRequests memPoolAccess reqQ = do + logInfo "Starting Write-requests handling service" + go `finally` logInfo "Stopping Write-requests handling service" where go = do PactServiceEnv{_psLogger} <- ask - logDebug "serviceRequests: wait" - msg <- liftIO $ getNextRequest reqQ + logDebug "serviceWriteRequests: wait" + msg <- liftIO $ getNextWriteRequest reqQ requestId <- liftIO $ UUID.toText <$> UUID.nextRandom let logFn = logFunction $ addLabel ("pact-request-id", requestId) _psLogger - logDebug $ "serviceRequests: " <> sshow msg + logDebug $ "serviceWriteRequests: " <> sshow msg case msg of CloseMsg -> return () - LocalMsg (LocalReq localRequest preflight sigVerify rewindDepth localResultVar) -> do - trace logFn "Chainweb.Pact.PactService.execLocal" () 0 $ - tryOne "execLocal" localResultVar $ - execLocal localRequest preflight sigVerify rewindDepth - go - NewBlockMsg NewBlockReq {..} -> do - trace logFn "Chainweb.Pact.PactService.execNewBlock" - () 1 $ - tryOne "execNewBlock" _newResultVar $ - execNewBlock memPoolAccess _newMiner - go ValidateBlockMsg ValidateBlockReq {..} -> do tryOne "execValidateBlock" _valResultVar $ fmap fst $ trace' logFn "Chainweb.Pact.PactService.execValidateBlock" @@ -318,6 +313,42 @@ serviceRequests memPoolAccess reqQ = do (\(_, g) -> fromIntegral g) (execValidateBlock memPoolAccess _valBlockHeader _valPayloadData) go + SyncToBlockMsg SyncToBlockReq {..} -> do + trace logFn "Chainweb.Pact.PactService.execSyncToBlock" _syncToBlockHeader 1 $ + tryOne "syncToBlockBlock" _syncToResultVar $ + execSyncToBlock _syncToBlockHeader + go + _ -> error $ "impossible: unexpected request " ++ show msg + +-- | Loop forever, serving Pact execution Read-requests +serviceReadRequests + :: forall logger tbl. (Logger logger, CanReadablePayloadCas tbl) + => MemPoolAccess + -> PactQueue + -> PactServiceM logger tbl () +serviceReadRequests memPoolAccess reqQ = do + logInfo "Starting read-requests handling service" + go `finally` (logInfo "Stopping read-requests handling service") + where + go = do + logDebug "serviceReadRequests: wait" + msg <- liftIO $ getNextReadRequest reqQ + requestId <- liftIO $ UUID.toText <$> UUID.nextRandom + PactServiceEnv{_psLogger} <- ask + let logFn = logFunction $ addLabel ("pact-request-id", requestId) _psLogger + logDebug $ "serviceReadRequests: " <> sshow msg + case msg of + NewBlockMsg NewBlockReq {..} -> do + trace logFn "Chainweb.Pact.PactService.execNewBlock" + () 1 $ + tryOne "execNewBlock" _newResultVar $ + execNewBlock memPoolAccess _newMiner + go + LocalMsg (LocalReq localRequest preflight sigVerify rewindDepth localResultVar) -> do + trace logFn "Chainweb.Pact.PactService.execLocal" () 0 $ + tryOne "execLocal" localResultVar $ + execLocal localRequest preflight sigVerify rewindDepth + go LookupPactTxsMsg (LookupPactTxsReq confDepth txHashes resultVar) -> do trace logFn "Chainweb.Pact.PactService.execLookupPactTxs" () (length txHashes) $ @@ -340,81 +371,80 @@ serviceRequests memPoolAccess reqQ = do tryOne "execHistoricalLookup" resultVar $ execHistoricalLookup bh d k go - SyncToBlockMsg SyncToBlockReq {..} -> do - trace logFn "Chainweb.Pact.PactService.execSyncToBlock" _syncToBlockHeader 1 $ - tryOne "syncToBlockBlock" _syncToResultVar $ - execSyncToBlock _syncToBlockHeader - go ReadOnlyReplayMsg ReadOnlyReplayReq {..} -> do trace logFn "Chainweb.Pact.PactService.execReadOnlyReplay" (_readOnlyReplayLowerBound, _readOnlyReplayUpperBound) 1 $ tryOne "readOnlyReplayBlock" _readOnlyReplayResultVar $ execReadOnlyReplay _readOnlyReplayLowerBound _readOnlyReplayUpperBound go + _ -> error $ "impossible: unexpected request " ++ show msg + +tryOne + :: forall logger tbl a. (Logger logger, CanReadablePayloadCas tbl) + => Text + -> MVar (Either PactException a) + -> PactServiceM logger tbl a + -> PactServiceM logger tbl () +tryOne which mvar = tryOne' which mvar Right +tryOne' + :: (Logger logger, CanReadablePayloadCas tbl) + => Text + -> MVar (Either PactException b) + -> (a -> Either PactException b) + -> PactServiceM logger tbl a + -> PactServiceM logger tbl () +tryOne' which mvar post m = + (evalPactOnThread (post <$> m) >>= (liftIO . putMVar mvar)) + `catches` + [ Handler $ \(e :: SomeAsyncException) -> do + logWarn $ T.concat + [ "Received asynchronous exception running pact service (" + , which + , "): " + , sshow e + ] + liftIO $ do + void $ tryPutMVar mvar $! toPactInternalError e + throwM e + , Handler $ \(e :: SomeException) -> do + logError $ mconcat + [ "Received exception running pact service (" + , which + , "): " + , sshow e + ] + liftIO $ do + void $ tryPutMVar mvar $! toPactInternalError e + ] + where toPactInternalError e = Left $ PactInternalError $ T.pack $ show e - tryOne - :: Text - -> MVar (Either PactException a) - -> PactServiceM logger tbl a - -> PactServiceM logger tbl () - tryOne which mvar = tryOne' which mvar Right - - tryOne' - :: Text - -> MVar (Either PactException b) - -> (a -> Either PactException b) - -> PactServiceM logger tbl a - -> PactServiceM logger tbl () - tryOne' which mvar post m = - (evalPactOnThread (post <$> m) >>= (liftIO . putMVar mvar)) - `catches` - [ Handler $ \(e :: SomeAsyncException) -> do - logWarn $ T.concat - [ "Received asynchronous exception running pact service (" - , which - , "): " - , sshow e - ] - liftIO $ do - void $ tryPutMVar mvar $! toPactInternalError e - throwM e - , Handler $ \(e :: SomeException) -> do - logError $ mconcat - [ "Received exception running pact service (" - , which - , "): " - , sshow e - ] - liftIO $ do - void $ tryPutMVar mvar $! toPactInternalError e - ] - where - -- Pact turns AsyncExceptions into textual exceptions within - -- PactInternalError. So there is no easy way for us to distinguish - -- whether an exception originates from within pact or from the outside. - -- - -- A common strategy to deal with this is to run the computation (pact) - -- on a "hidden" internal thread. Lifting `forkIO` into a state - -- monad is generally not thread-safe. It is fine to do here, since - -- there is no concurrency. We use a thread here only to shield the - -- computation from external exceptions. - -- - -- This solution isn't bullet-proof and only meant as a temporary fix. A - -- proper solution is to fix pact, to handle asynchronous exceptions - -- gracefully. - -- - -- No mask is needed here. Asynchronous exceptions are handled - -- by the outer handlers and cause an abort. So no state is lost. - -- - evalPactOnThread :: PactServiceM logger tbl a -> PactServiceM logger tbl a - evalPactOnThread act = do - e <- ask - s <- get - T2 r s' <- liftIO $ - withAsync (runPactServiceM s e act) wait - put $! s' - return $! r + -- Pact turns AsyncExceptions into textual exceptions within + -- PactInternalError. So there is no easy way for us to distinguish + -- whether an exception originates from within pact or from the outside. + -- + -- A common strategy to deal with this is to run the computation (pact) + -- on a "hidden" internal thread. Lifting `forkIO` into a state + -- monad is generally not thread-safe. It is fine to do here, since + -- there is no concurrency. We use a thread here only to shield the + -- computation from external exceptions. + -- + -- This solution isn't bullet-proof and only meant as a temporary fix. A + -- proper solution is to fix pact, to handle asynchronous exceptions + -- gracefully. + -- + -- No mask is needed here. Asynchronous exceptions are handled + -- by the outer handlers and cause an abort. So no state is lost. + -- + evalPactOnThread :: PactServiceM logger tbl a -> PactServiceM logger tbl a + evalPactOnThread act = do + e <- ask + s <- get + T2 r s' <- liftIO $ + withAsync (runPactServiceM s e act) wait + put $! s' + return $! r + execNewBlock :: forall logger tbl. (Logger logger, CanReadablePayloadCas tbl) diff --git a/src/Chainweb/Pact/Service/PactInProcApi.hs b/src/Chainweb/Pact/Service/PactInProcApi.hs index 132f70ff91..adbb24420f 100644 --- a/src/Chainweb/Pact/Service/PactInProcApi.hs +++ b/src/Chainweb/Pact/Service/PactInProcApi.hs @@ -67,8 +67,9 @@ withPactService -> (PactQueue -> IO a) -> IO a withPactService ver cid logger mpc bhdb pdb pactDbDir config action = - withSqliteDb cid logger pactDbDir (_pactResetDb config) $ \sqlenv -> - withPactService' ver cid logger mpa bhdb pdb sqlenv config action + withSqliteDb cid logger pactDbDir (_pactResetDb config) $ \writeSqlEnv -> + withSqliteDb cid logger pactDbDir (_pactResetDb config) $ \readSqlEnv -> + withPactService' ver cid logger mpa bhdb pdb (writeSqlEnv, readSqlEnv) config action where mpa = pactMemPoolAccess mpc $ addLabel ("sub-component", "MempoolAccess") logger @@ -84,18 +85,18 @@ withPactService' -> MemPoolAccess -> BlockHeaderDb -> PayloadDb tbl - -> SQLiteEnv + -> (SQLiteEnv, SQLiteEnv) -> PactServiceConfig -> (PactQueue -> IO a) -> IO a -withPactService' ver cid logger memPoolAccess bhDb pdb sqlenv config action = do +withPactService' ver cid logger memPoolAccess bhDb pdb sqlenvs config action = do reqQ <- newPactQueue (_pactQueueSize config) race (concurrently_ (monitor reqQ) (server reqQ)) (action reqQ) >>= \case Left () -> error "Chainweb.Pact.Service.PactInProcApi: pact service terminated unexpectedly" Right a -> return a where server reqQ = runForever logg "pact-service" - $ PS.runPactService ver cid logger reqQ memPoolAccess bhDb pdb sqlenv config + $ PS.runPactService ver cid logger reqQ memPoolAccess bhDb pdb sqlenvs config logg = logFunction logger monitor = runPactServiceQueueMonitor $ addLabel ("sub-component", "PactQueue") logger diff --git a/src/Chainweb/Pact/Service/PactQueue.hs b/src/Chainweb/Pact/Service/PactQueue.hs index b294aff5c4..e7a8199b02 100644 --- a/src/Chainweb/Pact/Service/PactQueue.hs +++ b/src/Chainweb/Pact/Service/PactQueue.hs @@ -17,7 +17,8 @@ -- module Chainweb.Pact.Service.PactQueue ( addRequest -, getNextRequest +, getNextWriteRequest +, getNextReadRequest , getPactQueueStats , newPactQueue , resetPactQueueStats @@ -51,9 +52,11 @@ import Chainweb.Utils -- other requests. -- data PactQueue = PactQueue - { _pactQueueValidateBlock :: !(TBQueue (T2 RequestMsg (Time Micros))) - , _pactQueueNewBlock :: !(TBQueue (T2 RequestMsg (Time Micros))) - , _pactQueueOtherMsg :: !(TBQueue (T2 RequestMsg (Time Micros))) + { _pactQueueWriteRequests :: !(TBQueue (T2 RequestMsg (Time Micros))) + , _pactQueueCloseRequests :: !(TBQueue (T2 RequestMsg (Time Micros))) + -- NewBlock requests are Read-requests as well but prioritize them with their own queue + , _pactQueueNewBlockRequests :: !(TBQueue (T2 RequestMsg (Time Micros))) + , _pactQueueReadRequests :: !(TBQueue (T2 RequestMsg (Time Micros))) , _pactQueuePactQueueValidateBlockMsgCounters :: !(IORef PactQueueCounters) , _pactQueuePactQueueNewBlockMsgCounters :: !(IORef PactQueueCounters) , _pactQueuePactQueueOtherMsgCounters :: !(IORef PactQueueCounters) @@ -72,6 +75,7 @@ newPactQueue sz = PactQueue <$> newTBQueueIO sz <*> newTBQueueIO sz <*> newTBQueueIO sz + <*> newTBQueueIO sz <*> newIORef initPactQueueCounters <*> newIORef initPactQueueCounters <*> newIORef initPactQueueCounters @@ -84,18 +88,20 @@ addRequest q msg = do atomically $ writeTBQueue priority (T2 msg entranceTime) where priority = case msg of - ValidateBlockMsg {} -> _pactQueueValidateBlock q - NewBlockMsg {} -> _pactQueueNewBlock q - _ -> _pactQueueOtherMsg q - --- | Get the next available request from the Pact execution queue + -- Write-requests + ValidateBlockMsg {} -> _pactQueueWriteRequests q + SyncToBlockMsg {} -> _pactQueueWriteRequests q + CloseMsg -> _pactQueueCloseRequests q + -- Read-requests + NewBlockMsg {} -> _pactQueueNewBlockRequests q + _ -> _pactQueueReadRequests q + +-- | Get the next available Write-request from the Pact execution queue -- -getNextRequest :: PactQueue -> IO RequestMsg -getNextRequest q = do - T2 req entranceTime <- atomically - $ tryReadTBQueueOrRetry (_pactQueueValidateBlock q) - <|> tryReadTBQueueOrRetry (_pactQueueNewBlock q) - <|> tryReadTBQueueOrRetry (_pactQueueOtherMsg q) +getNextWriteRequest :: PactQueue -> IO RequestMsg +getNextWriteRequest q = do + T2 req entranceTime <- atomically $ tryReadTBQueueOrRetry (_pactQueueWriteRequests q) + <|> tryReadTBQueueOrRetry (_pactQueueCloseRequests q) requestTime <- diff <$> getCurrentTimeIntegral <*> pure entranceTime updatePactQueueCounters (counters req q) requestTime return req @@ -105,6 +111,23 @@ getNextRequest q = do Just msg -> return msg counters ValidateBlockMsg{} = _pactQueuePactQueueValidateBlockMsgCounters + counters NewBlockMsg{} = error "getNextWriteRequest.counters.impossible" + counters _ = _pactQueuePactQueueOtherMsgCounters + +-- | Get the next available Read-request from the Pact execution queue +getNextReadRequest :: PactQueue -> IO RequestMsg +getNextReadRequest q = do + T2 req entranceTime <- atomically $ tryReadTBQueueOrRetry (_pactQueueNewBlockRequests q) + <|> tryReadTBQueueOrRetry (_pactQueueReadRequests q) + requestTime <- diff <$> getCurrentTimeIntegral <*> pure entranceTime + updatePactQueueCounters (counters req q) requestTime + return req + where + tryReadTBQueueOrRetry = tryReadTBQueue >=> \case + Nothing -> retry + Just msg -> return msg + + counters ValidateBlockMsg{} = error "getNextReadRequest.counters.impossible" counters NewBlockMsg{} = _pactQueuePactQueueNewBlockMsgCounters counters _ = _pactQueuePactQueueOtherMsgCounters diff --git a/test/Chainweb/Test/Pact/ModuleCacheOnRestart.hs b/test/Chainweb/Test/Pact/ModuleCacheOnRestart.hs index ef7be7ce21..d151b2baf9 100644 --- a/test/Chainweb/Test/Pact/ModuleCacheOnRestart.hs +++ b/test/Chainweb/Test/Pact/ModuleCacheOnRestart.hs @@ -296,7 +296,7 @@ withPact' bdbio ioSqlEnv r (ps, cacheTest) tastylog = do let pdb = _bdbPayloadDb bdb sqlEnv <- ioSqlEnv T2 _ pstate <- withPactService - testVer testChainId logger bhdb pdb sqlEnv testPactServiceConfig ps + testVer testChainId logger bhdb pdb sqlEnv testPactServiceConfig True ps cacheTest r (_psInitCache pstate) where logger = genericLogger Quiet (tastylog . T.unpack) diff --git a/test/Chainweb/Test/Pact/PactReplay.hs b/test/Chainweb/Test/Pact/PactReplay.hs index 8a57fa1b73..48afaec99b 100644 --- a/test/Chainweb/Test/Pact/PactReplay.hs +++ b/test/Chainweb/Test/Pact/PactReplay.hs @@ -9,6 +9,7 @@ module Chainweb.Test.Pact.PactReplay where +import Control.Concurrent (threadDelay) import Control.Concurrent.MVar import Control.Monad (forM_, unless, void) import Control.Monad.Catch @@ -193,6 +194,7 @@ serviceInitializationAfterFork mpio genesisBlock iop = do restartPact = do (_, q, _) <- iop addRequest q CloseMsg + threadDelay 1_000_000 pruneDbs = forM_ cids $ \c -> do (_, _, dbs) <- iop diff --git a/test/Chainweb/Test/Pact/PactSingleChainTest.hs b/test/Chainweb/Test/Pact/PactSingleChainTest.hs index f1900b1acd..bf51a2d327 100644 --- a/test/Chainweb/Test/Pact/PactSingleChainTest.hs +++ b/test/Chainweb/Test/Pact/PactSingleChainTest.hs @@ -18,7 +18,7 @@ module Chainweb.Test.Pact.PactSingleChainTest ) where import Control.Arrow ((&&&)) -import Control.Concurrent (forkIO) +import Control.Concurrent (forkIO, threadDelay) import Control.Concurrent.MVar import Control.DeepSeq import Control.Lens hiding ((.=), matching) @@ -271,12 +271,13 @@ rosettaFailsWithoutFullHistory rdb = pactQueue <- newPactQueue 2000 blockDb <- mkTestBlockDb testVersion rdb bhDb <- getWebBlockHeaderDb (_bdbWebBlockHeaderDb blockDb) cid - sqlEnv <- sqlEnvIO + writeSqlEnv <- sqlEnvIO + readSqlEnv <- sqlEnvIO mempool <- fmap snd dm let payloadDb = _bdbPayloadDb blockDb let cfg = testPactServiceConfig { _pactFullHistoryRequired = True } let logger = genericLogger System.LogLevel.Error (\_ -> return ()) - e <- try $ runPactService testVersion cid logger pactQueue mempool bhDb payloadDb sqlEnv cfg + e <- try $ runPactService testVersion cid logger pactQueue mempool bhDb payloadDb (writeSqlEnv, readSqlEnv) cfg case e of Left (FullHistoryRequired {}) -> do pure () @@ -979,7 +980,8 @@ compactionSetup pat rdb pactCfg f = blockDb <- mkTestBlockDb testVersion rdb bhDb <- getWebBlockHeaderDb (_bdbWebBlockHeaderDb blockDb) cid let payloadDb = _bdbPayloadDb blockDb - sqlEnv <- sqlEnvIO + writeSqlEnv <- sqlEnvIO + readSqlEnv <- sqlEnvIO (mempoolRef, mempool) <- do (ref, nonRef) <- dm pure (pure ref, nonRef) @@ -987,14 +989,15 @@ compactionSetup pat rdb pactCfg f = let logger = genericLogger System.LogLevel.Error (\_ -> return ()) - void $ forkIO $ runPactService testVersion cid logger pactQueue mempool bhDb payloadDb sqlEnv pactCfg + void $ forkIO $ runPactService testVersion cid logger pactQueue mempool bhDb payloadDb (writeSqlEnv,readSqlEnv) pactCfg + threadDelay 1_000_000 setOneShotMempool mempoolRef goldenMemPool f $ CompactionResources { mempoolRef = mempoolRef , mempool = mempool - , sqlEnv = sqlEnv + , sqlEnv = writeSqlEnv , pactQueue = pactQueue , blockDb = blockDb } diff --git a/test/Chainweb/Test/Pact/TTL.hs b/test/Chainweb/Test/Pact/TTL.hs index 2173e7e44c..cf03781d51 100644 --- a/test/Chainweb/Test/Pact/TTL.hs +++ b/test/Chainweb/Test/Pact/TTL.hs @@ -242,10 +242,12 @@ doValidateBlock -> IO () doValidateBlock ctxIO header payload = do ctx <- ctxIO - _mv' <- validateBlock header (payloadWithOutputsToPayloadData payload) $ _ctxQueue ctx + mv <- validateBlock header (payloadWithOutputsToPayloadData payload) $ _ctxQueue ctx + void $ assertNotLeft =<< takeMVar mv + addNewPayload (_ctxPdb ctx) (_blockHeight header) payload unsafeInsertBlockHeaderDb (_ctxBdb ctx) header - -- FIXME FIXME FIXME: do at least some checks? + -- FIXME FIXME: do at least some checks? -- -------------------------------------------------------------------------- -- -- Misc Utils diff --git a/test/Chainweb/Test/Pact/Utils.hs b/test/Chainweb/Test/Pact/Utils.hs index 4dd15596a2..a5d57d95c0 100644 --- a/test/Chainweb/Test/Pact/Utils.hs +++ b/test/Chainweb/Test/Pact/Utils.hs @@ -904,13 +904,14 @@ withPactTestBlockDb' version cid rdb sqlEnvIO mempoolIO pactConfig f = startPact bdbio = do reqQ <- newPactQueue 2000 bdb <- bdbio - sqlEnv <- sqlEnvIO + writeSqlEnv <- sqlEnvIO + readSqlEnv <- sqlEnvIO mempool <- mempoolIO bhdb <- getWebBlockHeaderDb (_bdbWebBlockHeaderDb bdb) cid let pdb = _bdbPayloadDb bdb a <- async $ runForever (\_ _ -> return ()) "Chainweb.Test.Pact.Utils.withPactTestBlockDb" $ - runPactService version cid logger reqQ mempool bhdb pdb sqlEnv pactConfig - return (a, (sqlEnv,reqQ,bdb)) + runPactService version cid logger reqQ mempool bhdb pdb (writeSqlEnv, readSqlEnv) pactConfig + return (a, (writeSqlEnv,reqQ,bdb)) stopPact (a, _) = cancel a @@ -963,10 +964,11 @@ withPactTestBlockDb version cid rdb mempoolIO pactConfig f = mempool <- mempoolIO bhdb <- getWebBlockHeaderDb (_bdbWebBlockHeaderDb bdb) cid let pdb = _bdbPayloadDb bdb - sqlEnv <- startSqliteDb cid logger dir False + writeSqlEnv <- startSqliteDb cid logger dir False + readSqlEnv <- startSqliteDb cid logger dir False a <- async $ runForever (\_ _ -> return ()) "Chainweb.Test.Pact.Utils.withPactTestBlockDb" $ - runPactService version cid logger reqQ mempool bhdb pdb sqlEnv pactConfig - return (a, (sqlEnv,reqQ,bdb)) + runPactService version cid logger reqQ mempool bhdb pdb (writeSqlEnv, readSqlEnv) pactConfig + return (a, (writeSqlEnv,reqQ,bdb)) stopPact (a, (sqlEnv, _, _)) = cancel a >> stopSqliteDb sqlEnv diff --git a/tools/ea/Ea.hs b/tools/ea/Ea.hs index bce1bef14c..d5aa58e31d 100644 --- a/tools/ea/Ea.hs +++ b/tools/ea/Ea.hs @@ -179,7 +179,7 @@ genPayloadModule v tag cid cwTxs = pdb <- newPayloadDb withSystemTempDirectory "ea-pact-db" $ \pactDbDir -> do T2 payloadWO _ <- withSqliteDb cid logger pactDbDir False $ \env -> - withPactService v cid logger bhdb pdb env testPactServiceConfig $ + withPactService v cid logger bhdb pdb env testPactServiceConfig True $ execNewGenesisBlock noMiner (V.fromList cwTxs) return $ TL.toStrict $ TB.toLazyText $ payloadModuleCode tag payloadWO