Skip to content

Commit

Permalink
Process PactQueue concurrently with read-only connection.
Browse files Browse the repository at this point in the history
Signed-off-by: Evgenii Akentev <i@ak3n.com>
Change-Id: Ia007a565d75c625ddb243534a546d57584ec8e7d
Evgenii Akentev committed May 28, 2024
1 parent e363858 commit 09d9a79
Showing 27 changed files with 414 additions and 304 deletions.
2 changes: 1 addition & 1 deletion bench/Chainweb/Pact/Backend/Bench.hs
Original file line number Diff line number Diff line change
@@ -147,7 +147,7 @@ cpWithBench torun =
let neverLogger = genericLogger Error (\_ -> return ())
!sqliteEnv <- openSQLiteConnection dbFile chainwebPragmas
!cenv <-
initRelationalCheckpointer defaultModuleCacheLimit sqliteEnv DoNotPersistIntraBlockWrites neverLogger testVer testChainId
initRelationalCheckpointer defaultModuleCacheLimit (SQLiteEnv ReadWrite sqliteEnv) DoNotPersistIntraBlockWrites neverLogger testVer testChainId
return $ NoopNFData (sqliteEnv, cenv)

teardown (NoopNFData (sqliteEnv, _cenv)) = closeSQLiteConnection sqliteEnv
46 changes: 22 additions & 24 deletions bench/Chainweb/Pact/Backend/ForkingBench.hs
Original file line number Diff line number Diff line change
@@ -87,7 +87,7 @@ import Chainweb.Pact.Backend.Compaction qualified as C
import Chainweb.Pact.Backend.Types
import Chainweb.Pact.Backend.Utils
import Chainweb.Pact.PactService
import Chainweb.Pact.Service.BlockValidation
import Chainweb.Pact.Service.BlockValidation as BlockValidation
import Chainweb.Pact.Service.PactQueue
import Chainweb.Pact.Service.Types
import Chainweb.Pact.Types
@@ -264,7 +264,8 @@ data Resources
, coinAccounts :: !(MVar (Map Account (NonEmpty (DynKeyPair, [SigCapability]))))
, nonceCounter :: !(IORef Word64)
, txPerBlock :: !(IORef Int)
, sqlEnv :: !SQLiteEnv
, writeSqlEnv :: !Database
, readSqlEnv :: !Database
}

type RunPactService =
@@ -296,47 +297,39 @@ withResources rdb trunkLength logLevel compact p f = C.envWithCleanup create des
coinAccounts <- newMVar mempty
nonceCounter <- newIORef 1
txPerBlock <- newIORef 10
sqlEnv <- openSQLiteConnection "" {- temporary SQLite db -} chainwebBenchPragmas

writeSqlEnv <- openSQLiteConnection "" {- temporary SQLite db -} chainwebPragmas
readSqlEnv <- openSQLiteConnection "" {- temporary SQLite db -} chainwebPragmas
mp <- testMemPoolAccess txPerBlock coinAccounts
pactService <-
startPact testVer logger blockHeaderDb payloadDb mp sqlEnv
startPact testVer logger blockHeaderDb payloadDb mp (writeSqlEnv, readSqlEnv)
mainTrunkBlocks <-
playLine payloadDb blockHeaderDb trunkLength genesisBlock (snd pactService) nonceCounter
when (compact == DoCompact) $ do
C.withDefaultLogger Error $ \lgr -> do
void $ C.compact (BlockHeight trunkLength) lgr sqlEnv []
void $ C.compact (BlockHeight trunkLength) lgr writeSqlEnv []

return $ NoopNFData $ Resources {..}

destroy (NoopNFData (Resources {..})) = do
stopPact pactService
stopSqliteDb sqlEnv
stopSqliteDb writeSqlEnv
stopSqliteDb readSqlEnv

pactQueueSize = 2000

logger = genericLogger logLevel T.putStrLn

startPact version l bhdb pdb mempool sqlEnv = do
startPact version l bhdb pdb mempool sqlEnvs = do
reqQ <- newPactQueue pactQueueSize
a <- async $ runPactService version cid l reqQ mempool bhdb pdb sqlEnv testPactServiceConfig
a <- async $ runPactService version cid l reqQ mempool bhdb pdb sqlEnvs testPactServiceConfig
{ _pactBlockGasLimit = 180_000
, _pactPersistIntraBlockWrites = p
}

return (a, reqQ)

stopPact (a, _) = cancel a

chainwebBenchPragmas =
[ "synchronous = NORMAL"
, "journal_mode = WAL"
, "locking_mode = EXCLUSIVE"
-- this is different from the prodcution database that uses @NORMAL@
, "temp_store = MEMORY"
, "auto_vacuum = NONE"
, "page_size = 1024"
]

genesisBlock :: BlockHeader
genesisBlock = genesisBlockHeader testVer cid

@@ -373,7 +366,7 @@ testMemPoolAccess txsPerBlock accounts = do
getTestBlock mVarAccounts txOrigTime validate bHeight hash
| bHeight == 1 = do
meta <- setTime txOrigTime <$> makeMeta cid
(as, kss, cmds) <- unzip3 . toList <$> createCoinAccounts testVer meta
(as, kss, cmds) <- unzip3 <$> createCoinAccounts testVer meta twoNames
case traverse validateCommand cmds of
Left err -> throwM $ userError err
Right !r -> do
@@ -468,15 +461,20 @@ stockKey s = do
stockKeyFile :: ByteString
stockKeyFile = $(embedFile "pact/genesis/devnet/keys.yaml")

createCoinAccounts :: ChainwebVersion -> PublicMeta -> IO (NonEmpty (Account, NonEmpty (DynKeyPair, [SigCapability]), Command Text))
createCoinAccounts v meta = traverse (go <*> createCoinAccount v meta) names
createCoinAccounts :: ChainwebVersion -> PublicMeta -> [String] -> IO [(Account, NonEmpty (DynKeyPair, [SigCapability]), Command Text)]
createCoinAccounts v meta names' = traverse (go <*> createCoinAccount v meta) names'
where
go a m = do
(b,c) <- m
return (Account a,b,c)

names :: NonEmpty String
names = NEL.map safeCapitalize . NEL.fromList $ Prelude.take 2 $ words "mary elizabeth patricia jennifer linda barbara margaret susan dorothy jessica james john robert michael william david richard joseph charles thomas"
twoNames :: [String]
twoNames = take 2 names

names :: [String]
names = map safeCapitalize $ names' ++ [(n ++ show x) | n <- names', x <- [0 :: Int ..1000]]
where
names' = words "mary elizabeth patricia jennifer linda barbara margaret susan dorothy jessica james john robert michael william david richard joseph charles thomas"

formatB16PubKey :: DynKeyPair -> Text
formatB16PubKey = \case
1 change: 0 additions & 1 deletion src/Chainweb/Chainweb.hs
Original file line number Diff line number Diff line change
@@ -362,7 +362,6 @@ withChainwebInternal
-> (StartedChainweb logger -> IO ())
-> IO ()
withChainwebInternal conf logger peer serviceSock rocksDb pactDbDir backupDir resetDb inner = do

unless (_configOnlySyncPact conf || _configReadOnlyReplay conf) $
initializePayloadDb v payloadDb

12 changes: 6 additions & 6 deletions src/Chainweb/Pact/Backend/ChainwebPactDb.hs
Original file line number Diff line number Diff line change
@@ -646,7 +646,7 @@ createVersionedTable tablename db = do

-- | Delete any state from the database newer than the input parent header.
rewindDbTo
:: SQLiteEnv
:: Database
-> Maybe ParentHeader
-> IO ()
rewindDbTo db Nothing = rewindDbToGenesis db
@@ -656,7 +656,7 @@ rewindDbTo db mh@(Just (ParentHeader ph)) = do

-- rewind before genesis, delete all user tables and all rows in all tables
rewindDbToGenesis
:: SQLiteEnv
:: Database
-> IO ()
rewindDbToGenesis db = do
exec_ db "DELETE FROM BlockHistory;"
@@ -742,7 +742,7 @@ rewindDbToBlock db bh endingTxId = do
exec' db "DELETE FROM TransactionIndex WHERE blockheight > ?;"
[ SInt (fromIntegral bh) ]

commitBlockStateToDatabase :: SQLiteEnv -> BlockHash -> BlockHeight -> BlockState -> IO ()
commitBlockStateToDatabase :: Database -> BlockHash -> BlockHeight -> BlockState -> IO ()
commitBlockStateToDatabase db hsh bh blockState = do
let newTables = _pendingTableCreation $ _bsPendingBlock blockState
mapM_ (\tn -> createUserTable (Utf8 tn)) newTables
@@ -808,7 +808,7 @@ commitBlockStateToDatabase db hsh bh blockState = do


-- | Create all tables that exist pre-genesis
initSchema :: (Logger logger) => logger -> SQLiteEnv -> IO ()
initSchema :: (Logger logger) => logger -> Database -> IO ()
initSchema logger sql =
withSavepoint sql DbTransaction $ do
createBlockHistoryTable
@@ -860,12 +860,12 @@ initSchema logger sql =
"CREATE INDEX IF NOT EXISTS \
\ transactionIndexByBH ON TransactionIndex(blockheight)";

getEndTxId :: Text -> SQLiteEnv -> Maybe ParentHeader -> IO TxId
getEndTxId :: Text -> Database -> Maybe ParentHeader -> IO TxId
getEndTxId msg sql pc = case pc of
Nothing -> return 0
Just (ParentHeader ph) -> getEndTxId' msg sql (_blockHeight ph) (_blockHash ph)

getEndTxId' :: Text -> SQLiteEnv -> BlockHeight -> BlockHash -> IO TxId
getEndTxId' :: Text -> Database -> BlockHeight -> BlockHash -> IO TxId
getEndTxId' msg sql bh bhsh = do
r <- qry sql
"SELECT endingtxid FROM BlockHistory WHERE blockheight = ? and hash = ?;"
3 changes: 1 addition & 2 deletions src/Chainweb/Pact/Backend/PactState.hs
Original file line number Diff line number Diff line change
@@ -68,7 +68,6 @@ import Database.SQLite3.Direct qualified as SQL

import Chainweb.BlockHeight (BlockHeight(..))
import Chainweb.Logger (Logger, addLabel)
import Chainweb.Pact.Backend.Types (SQLiteEnv)
import Chainweb.Pact.Backend.Utils (fromUtf8, withSqliteDb)
import Chainweb.Utils (int)
import Chainweb.Version (ChainId, ChainwebVersion, chainIdToText)
@@ -140,7 +139,7 @@ withChainDb :: (Logger logger)
=> ChainId
-> logger
-> FilePath
-> (logger -> SQLiteEnv -> IO x)
-> (logger -> Database -> IO x)
-> IO x
withChainDb cid logger' path f = do
let logger = addChainIdLabel cid logger'
4 changes: 2 additions & 2 deletions src/Chainweb/Pact/Backend/PactState/GrandHash/Calc.hs
Original file line number Diff line number Diff line change
@@ -32,7 +32,7 @@ import Chainweb.Pact.Backend.PactState.EmbeddedSnapshot (Snapshot(..))
import Chainweb.Pact.Backend.PactState.EmbeddedSnapshot.Mainnet qualified as MainnetSnapshot
import Chainweb.Pact.Backend.PactState.GrandHash.Algorithm (ChainGrandHash(..))
import Chainweb.Pact.Backend.PactState.GrandHash.Utils (resolveLatestCutHeaders, resolveCutHeadersAtHeights, computeGrandHashesAt, withConnections, hex, rocksParser, cwvParser)
import Chainweb.Pact.Backend.Types (SQLiteEnv)
import Chainweb.Pact.Backend.Types (Database)
import Chainweb.Storage.Table.RocksDB (RocksDb, withReadOnlyRocksDb, modernDefaultOptions)
import Chainweb.Utils (sshow)
import Chainweb.Version (ChainwebVersion(..), ChainwebVersionName(..))
@@ -75,7 +75,7 @@ data BlockHeightTargets
pactCalc :: (Logger logger)
=> logger
-> ChainwebVersion
-> HashMap ChainId SQLiteEnv
-> HashMap ChainId Database
-- ^ pact database dir
-> RocksDb
-- ^ rocksdb dir
6 changes: 3 additions & 3 deletions src/Chainweb/Pact/Backend/PactState/GrandHash/Import.hs
Original file line number Diff line number Diff line change
@@ -62,7 +62,7 @@ import Chainweb.Pact.Backend.PactState.EmbeddedSnapshot (Snapshot(..))
import Chainweb.Pact.Backend.PactState.EmbeddedSnapshot.Mainnet qualified as MainnetSnapshots
import Chainweb.Pact.Backend.PactState.GrandHash.Utils (resolveLatestCutHeaders, resolveCutHeadersAtHeight, computeGrandHashesAt, exitLog, withConnections, chainwebDbFilePath, rocksParser, cwvParser)
import Chainweb.Pact.Backend.RelationalCheckpointer (withProdRelationalCheckpointer)
import Chainweb.Pact.Backend.Types (SQLiteEnv, _cpRewindTo)
import Chainweb.Pact.Backend.Types (Database, _cpRewindTo, SQLiteEnv(..), SQLiteConnectionType(..))
import Chainweb.Pact.Service.Types (IntraBlockPersistence(..))
import Chainweb.Pact.Types (defaultModuleCacheLimit)
import Chainweb.Storage.Table.RocksDB (RocksDb, withReadOnlyRocksDb, modernDefaultOptions)
@@ -93,7 +93,7 @@ import System.LogLevel (LogLevel(..))
pactVerify :: (Logger logger)
=> logger
-> ChainwebVersion
-> HashMap ChainId SQLiteEnv
-> HashMap ChainId Database
-- ^ pact connections
-> RocksDb
-- ^ rocksDb
@@ -187,7 +187,7 @@ pactDropPostVerified logger v srcDir tgtDir snapshotBlockHeight snapshotChainHas
let logger' = addChainIdLabel cid logger
logFunctionText logger' Info
$ "Dropping anything post verified state (BlockHeight " <> sshow snapshotBlockHeight <> ")"
withProdRelationalCheckpointer logger defaultModuleCacheLimit sqliteEnv DoNotPersistIntraBlockWrites v cid $ \cp -> do
withProdRelationalCheckpointer logger defaultModuleCacheLimit (SQLiteEnv ReadWrite sqliteEnv) DoNotPersistIntraBlockWrites v cid $ \cp -> do
_cpRewindTo cp (Just $ ParentHeader $ blockHeader $ snapshotChainHashes ^?! ix cid)

data PactImportConfig = PactImportConfig
18 changes: 9 additions & 9 deletions src/Chainweb/Pact/Backend/PactState/GrandHash/Utils.hs
Original file line number Diff line number Diff line change
@@ -33,7 +33,7 @@ import Chainweb.Logger (Logger, logFunctionText)
import Chainweb.Pact.Backend.PactState (getLatestPactStateAt, getLatestBlockHeight, addChainIdLabel)
import Chainweb.Pact.Backend.PactState.EmbeddedSnapshot (Snapshot(..))
import Chainweb.Pact.Backend.PactState.GrandHash.Algorithm (computeGrandHash)
import Chainweb.Pact.Backend.Types (SQLiteEnv)
import Chainweb.Pact.Backend.Types (Database)
import Chainweb.Pact.Backend.Utils (startSqliteDb, stopSqliteDb)
import Chainweb.Storage.Table.RocksDB (RocksDb)
import Chainweb.TreeDB (seekAncestor)
@@ -69,7 +69,7 @@ limitCut :: (Logger logger)
=> logger
-> WebBlockHeaderDb
-> HashMap ChainId BlockHeader -- ^ latest cut headers
-> HashMap ChainId SQLiteEnv
-> HashMap ChainId Database
-> BlockHeight
-> IO (HashMap ChainId BlockHeader)
limitCut logger wbhdb latestCutHeaders pactConns blockHeight = do
@@ -117,7 +117,7 @@ getLatestCutHeaders v rocksDb = do
resolveLatestCutHeaders :: (Logger logger)
=> logger
-> ChainwebVersion
-> HashMap ChainId SQLiteEnv
-> HashMap ChainId Database
-> RocksDb
-> IO (BlockHeight, HashMap ChainId BlockHeader)
resolveLatestCutHeaders logger v pactConns rocksDb = do
@@ -131,7 +131,7 @@ resolveLatestCutHeaders logger v pactConns rocksDb = do
resolveCutHeadersAtHeight :: (Logger logger)
=> logger
-> ChainwebVersion
-> HashMap ChainId SQLiteEnv
-> HashMap ChainId Database
-> RocksDb
-> BlockHeight
-> IO (HashMap ChainId BlockHeader)
@@ -146,7 +146,7 @@ resolveCutHeadersAtHeight logger v pactConns rocksDb target = do
resolveCutHeadersAtHeights :: (Logger logger)
=> logger
-> ChainwebVersion
-> HashMap ChainId SQLiteEnv
-> HashMap ChainId Database
-> RocksDb
-> [BlockHeight] -- ^ targets
-> IO [(BlockHeight, HashMap ChainId BlockHeader)]
@@ -159,7 +159,7 @@ resolveCutHeadersAtHeights logger v pactConns rocksDb targets = do
-- a 'BlockHeader' with the computed 'ChainGrandHash' at the header's
-- 'BlockHeight'.
computeGrandHashesAt :: ()
=> HashMap ChainId SQLiteEnv
=> HashMap ChainId Database
-- ^ pact connections
-> HashMap ChainId BlockHeader
-- ^ Resolved targets, i.e, blockheights that are accessible per each
@@ -202,17 +202,17 @@ withConnections :: (Logger logger)
=> logger
-> FilePath
-> [ChainId]
-> (HashMap ChainId SQLiteEnv -> IO x)
-> (HashMap ChainId Database -> IO x)
-> IO x
withConnections logger pactDir cids f = do
checkPactDbsExist pactDir cids
bracket openConnections closeConnections f
where
openConnections :: IO (HashMap ChainId SQLiteEnv)
openConnections :: IO (HashMap ChainId Database)
openConnections = fmap HM.fromList $ forM cids $ \cid -> do
(cid, ) <$> startSqliteDb cid logger pactDir False

closeConnections :: HashMap ChainId SQLiteEnv -> IO ()
closeConnections :: HashMap ChainId Database -> IO ()
closeConnections = mapM_ stopSqliteDb

hex :: ByteString -> Text
Loading

0 comments on commit 09d9a79

Please sign in to comment.