Skip to content

Commit

Permalink
Redo Pact 5 insertion cache to contain decoded Pact values
Browse files Browse the repository at this point in the history
  • Loading branch information
edmundnoble committed Jan 17, 2025
1 parent 518e3a3 commit 7c0eb95
Show file tree
Hide file tree
Showing 12 changed files with 475 additions and 249 deletions.
1 change: 1 addition & 0 deletions chainweb.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ library
, Chainweb.Pact.Backend.PactState.GrandHash.Utils
, Chainweb.Pact.Backend.SQLite.DirectV2
, Chainweb.Pact.Backend.SQLite.V2
, Chainweb.Pact.Backend.InMemDb
, Chainweb.Pact.Backend.Types
, Chainweb.Pact.Backend.Utils
, Chainweb.Pact.Conversion
Expand Down
117 changes: 117 additions & 0 deletions src/Chainweb/Pact/Backend/InMemDb.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
{-# LANGUAGE ImportQualifiedPost #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE PartialTypeSignatures #-}
{-# OPTIONS_GHC -Wno-partial-type-signatures #-}
{-# LANGUAGE RecordWildCards #-}

-- | This type includes both pending writes and cached reads from the Pact state
-- in sqlite.
module Chainweb.Pact.Backend.InMemDb
( Store(..)
, Entry(..)
, empty
, insert
, lookup
, keys
, merge
) where

import Prelude hiding (lookup)
import Data.ByteString (ByteString)
import Data.Map.Strict(Map)
import Data.Map.Strict qualified as Map

import Pact.Core.Persistence
import Pact.Core.Builtin
import Pact.Core.Evaluate
import Pact.Core.Guards
import Pact.Core.Names
import Pact.Core.Namespace
import Pact.Core.DefPacts.Types
import Pact.Core.IR.Term (ModuleCode)

data Entry a
= ReadEntry !ByteString !a
-- WriteEntry bytestring could be intentionally lazy, as most of the time
-- we don't need this until we commit to the db. However, encoding these is
-- gassed, and thus cannot be done lazily.
| WriteEntry !TxId !ByteString !a
deriving (Show, Eq)

data Store = Store
-- TODO: hashmap instead of map? Or maybe an intmap?
{ userTables :: Map TableName (Map RowKey (Entry RowData))
, keySets :: Map KeySetName (Entry KeySet)
, modules :: Map ModuleName (Entry (ModuleData CoreBuiltin Info))
, namespaces :: Map NamespaceName (Entry Namespace)
, defPacts :: Map DefPactId (Entry (Maybe DefPactExec))
, moduleSources :: Map HashedModuleName (Entry ModuleCode)
}
deriving (Show, Eq)

empty :: Store
empty = Store mempty mempty mempty mempty mempty mempty

insert
:: forall k v
. Domain k v CoreBuiltin Info
-> k -> Entry v -> Store -> Store
insert d k v Store {..} = case d of
DUserTables tn -> Store
{ userTables =
Map.insertWith
(\new old -> mergeEntries old new)
tn (Map.singleton k v) userTables
, ..}
DKeySets -> Store {keySets = insertProperlyInto keySets, ..}
DModules -> Store {modules = insertProperlyInto modules, ..}
DNamespaces -> Store {namespaces = insertProperlyInto namespaces, ..}
DDefPacts -> Store {defPacts = insertProperlyInto defPacts, ..}
DModuleSource -> Store {moduleSources = insertProperlyInto moduleSources, ..}
where
insertProperlyInto :: Ord k => Map k (Entry v) -> Map k (Entry v)
insertProperlyInto m = Map.insertWith takeLatestEntry k v m

lookup
:: Domain k v CoreBuiltin Info
-> k -> Store -> Maybe (Entry v)
lookup d k Store {..} = case d of
DUserTables tn -> Map.lookup tn userTables >>= Map.lookup k
DKeySets -> Map.lookup k keySets
DModules -> Map.lookup k modules
DNamespaces -> Map.lookup k namespaces
DDefPacts -> Map.lookup k defPacts
DModuleSource -> Map.lookup k moduleSources

keys :: Domain k v CoreBuiltin Info -> Store -> [k]
keys d Store {..} = case d of
DUserTables tn -> maybe [] Map.keys $ Map.lookup tn userTables
DKeySets -> Map.keys keySets
DModules -> Map.keys modules
DNamespaces -> Map.keys namespaces
DDefPacts -> Map.keys defPacts
DModuleSource -> Map.keys moduleSources

merge :: Store -> Store -> Store
merge old new = Store
{ keySets = mergeEntries (keySets old) (keySets new)
, modules = mergeEntries (modules old) (modules new)
, namespaces = mergeEntries (namespaces old) (namespaces new)
, defPacts = mergeEntries (defPacts old) (defPacts new)
, moduleSources = mergeEntries (moduleSources old) (moduleSources new)
, userTables = Map.unionWith mergeEntries (userTables old) (userTables new)
}

mergeEntries :: Ord k => Map k (Entry a) -> Map k (Entry a) -> Map k (Entry a)
mergeEntries oldMap newMap =
Map.unionWith takeLatestEntry oldMap newMap

takeLatestEntry :: Entry a -> Entry a -> Entry a
takeLatestEntry ReadEntry {} newEntry = newEntry
-- should be impossible. if we wrote before,
-- we would never overwrite with a read.
takeLatestEntry oldEntry ReadEntry {} = oldEntry
takeLatestEntry _ newEntry = newEntry
40 changes: 30 additions & 10 deletions src/Chainweb/Pact/Backend/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE PolyKinds #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE TypeFamilyDependencies #-}

module Chainweb.Pact.Backend.Types
( Checkpointer(..)
Expand All @@ -20,7 +27,8 @@ module Chainweb.Pact.Backend.Types
, BlockHandle(..)
, blockHandleTxId
, blockHandlePending
, emptyBlockHandle
, emptyPact4BlockHandle
, emptyPact5BlockHandle
, SQLitePendingData(..)
, emptySQLitePendingData
, pendingWrites
Expand All @@ -32,6 +40,7 @@ module Chainweb.Pact.Backend.Types
, _Historical
, _NoHistory
, PactDbFor
, PendingWrites
) where

import Control.Lens
Expand All @@ -49,6 +58,7 @@ import Data.HashSet (HashSet)
import Data.HashMap.Strict (HashMap)
import Data.List.NonEmpty (NonEmpty)
import Control.DeepSeq (NFData)
import qualified Chainweb.Pact.Backend.InMemDb as InMemDb

-- | Whether we write rows to the database that were already overwritten
-- in the same block.
Expand Down Expand Up @@ -114,9 +124,9 @@ type SQLitePendingWrites = HashMap ByteString (HashMap ByteString (NonEmpty SQLi
-- these; one for the block as a whole, and one for any pending pact
-- transaction. Upon pact transaction commit, the two 'SQLitePendingData'
-- values are merged together.
data SQLitePendingData = SQLitePendingData
data SQLitePendingData w = SQLitePendingData
{ _pendingTableCreation :: !SQLitePendingTableCreations
, _pendingWrites :: !SQLitePendingWrites
, _pendingWrites :: !w
-- See Note [TxLogs in SQLitePendingData]
, _pendingTxLogMap :: !TxLogMap
, _pendingSuccessfulTxs :: !SQLitePendingSuccessfulTxs
Expand All @@ -125,18 +135,28 @@ data SQLitePendingData = SQLitePendingData

makeLenses ''SQLitePendingData

emptySQLitePendingData :: SQLitePendingData
emptySQLitePendingData = SQLitePendingData mempty mempty mempty mempty
type family PendingWrites (pv :: PactVersion) = w | w -> pv where
PendingWrites Pact4 = SQLitePendingWrites
PendingWrites Pact5 = InMemDb.Store

data BlockHandle = BlockHandle
emptySQLitePendingData :: w -> SQLitePendingData w
emptySQLitePendingData w = SQLitePendingData mempty w mempty mempty

data BlockHandle (pv :: PactVersion) = BlockHandle
{ _blockHandleTxId :: !Pact4.TxId
, _blockHandlePending :: !SQLitePendingData
, _blockHandlePending :: !(SQLitePendingData (PendingWrites pv))
}
deriving (Eq, Show)
deriving instance Eq (BlockHandle Pact4)
deriving instance Eq (BlockHandle Pact5)
deriving instance Show (BlockHandle Pact4)
deriving instance Show (BlockHandle Pact5)
makeLenses ''BlockHandle

emptyBlockHandle :: Pact4.TxId -> BlockHandle
emptyBlockHandle txid = BlockHandle txid emptySQLitePendingData
emptyPact4BlockHandle :: Pact4.TxId -> BlockHandle Pact4
emptyPact4BlockHandle txid = BlockHandle txid (emptySQLitePendingData mempty)

emptyPact5BlockHandle :: Pact4.TxId -> BlockHandle Pact5
emptyPact5BlockHandle txid = BlockHandle txid (emptySQLitePendingData InMemDb.empty)

-- | The result of a historical lookup which might fail to even find the
-- header the history is being queried for.
Expand Down
80 changes: 3 additions & 77 deletions src/Chainweb/Pact/Backend/Utils.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@

{-# OPTIONS_GHC -fno-warn-orphans #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE DataKinds #-}

-- |
-- Module: Chainweb.Pact.ChainwebPactDb
Expand All @@ -27,7 +30,6 @@ module Chainweb.Pact.Backend.Utils
, chainDbFileName
-- * Shared Pact database interactions
, doLookupSuccessful
, commitBlockStateToDatabase
, createVersionedTable
, tbl
, initSchema
Expand Down Expand Up @@ -346,82 +348,6 @@ sqlite_open_readwrite = 0x00000002
sqlite_open_create = 0x00000004
sqlite_open_fullmutex = 0x00010000

commitBlockStateToDatabase :: SQLiteEnv -> BlockHash -> BlockHeight -> BlockHandle -> IO ()
commitBlockStateToDatabase db hsh bh blockHandle = do
let newTables = _pendingTableCreation $ _blockHandlePending blockHandle
mapM_ (\tn -> createUserTable (Utf8 tn)) newTables
let writeV = toChunks $ _pendingWrites (_blockHandlePending blockHandle)
backendWriteUpdateBatch writeV
indexPendingPactTransactions
let nextTxId = _blockHandleTxId blockHandle
blockHistoryInsert nextTxId
where
toChunks writes =
over _2 (concatMap toList . HashMap.elems) .
over _1 Utf8 <$> HashMap.toList writes

backendWriteUpdateBatch
:: [(Utf8, [SQLiteRowDelta])]
-> IO ()
backendWriteUpdateBatch writesByTable = mapM_ writeTable writesByTable
where
prepRow (SQLiteRowDelta _ txid rowkey rowdata) =
[ Pact4.SText (Utf8 rowkey)
, Pact4.SInt (fromIntegral txid)
, Pact4.SBlob rowdata
]

writeTable (tableName, writes) = do
execMulti db q (map prepRow writes)
markTableMutation tableName bh
where
q = "INSERT OR REPLACE INTO " <> tbl tableName <> "(rowkey,txid,rowdata) VALUES(?,?,?)"

-- Mark the table as being mutated during this block, so that we know
-- to delete from it if we rewind past this block.
markTableMutation tablename blockheight = do
Pact4.exec' db mutq [Pact4.SText tablename, Pact4.SInt (fromIntegral blockheight)]
where
mutq = "INSERT OR IGNORE INTO VersionedTableMutation VALUES (?,?);"

-- | Record a block as being in the history of the checkpointer.
blockHistoryInsert :: Pact4.TxId -> IO ()
blockHistoryInsert t =
Pact4.exec' db stmt
[ Pact4.SInt (fromIntegral bh)
, Pact4.SBlob (runPutS (encodeBlockHash hsh))
, Pact4.SInt (fromIntegral t)
]
where
stmt =
"INSERT INTO BlockHistory ('blockheight','hash','endingtxid') VALUES (?,?,?);"

createUserTable :: Utf8 -> IO ()
createUserTable tablename = do
createVersionedTable tablename db
markTableCreation tablename

-- Mark the table as being created during this block, so that we know
-- to drop it if we rewind past this block.
markTableCreation tablename =
Pact4.exec' db insertstmt insertargs
where
insertstmt = "INSERT OR IGNORE INTO VersionedTableCreation VALUES (?,?)"
insertargs = [Pact4.SText tablename, Pact4.SInt (fromIntegral bh)]

-- | Commit the index of pending successful transactions to the database
indexPendingPactTransactions :: IO ()
indexPendingPactTransactions = do
let txs = _pendingSuccessfulTxs $ _blockHandlePending blockHandle
dbIndexTransactions txs

where
toRow b = [Pact4.SBlob b, Pact4.SInt (fromIntegral bh)]
dbIndexTransactions txs = do
let rows = map toRow $ toList txs
execMulti db "INSERT INTO TransactionIndex (txhash, blockheight) \
\ VALUES (?, ?)" rows

tbl :: HasCallStack => Utf8 -> Utf8
tbl t@(Utf8 b)
| B8.elem ']' b = error $ "Chainweb.Pact4.Backend.ChainwebPactDb: Code invariant violation. Illegal SQL table name " <> sshow b <> ". Please report this as a bug."
Expand Down
2 changes: 1 addition & 1 deletion src/Chainweb/Pact/PactService.hs
Original file line number Diff line number Diff line change
Expand Up @@ -1223,7 +1223,7 @@ execPreInsertCheckReq txs = pactLabel "execPreInsertCheckReq" $ do
=> logger
-> ParentHeader
-> Pact5.Pact5Db
-> BlockHandle
-> BlockHandle Pact5
-> Miner
-> Pact5.Transaction
-> ExceptT InsertError IO ()
Expand Down
14 changes: 6 additions & 8 deletions src/Chainweb/Pact/PactService/Checkpointer/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ readFrom
=> Checkpointer logger
-> Maybe ParentHeader
-> PactVersionT pv
-> (PactDbFor logger pv -> BlockHandle -> IO a)
-> (PactDbFor logger pv -> BlockHandle pv -> IO a)
-> IO (Historical a)
readFrom res maybeParent pactVersion doRead = do
let currentHeight = case maybeParent of
Expand Down Expand Up @@ -193,7 +193,7 @@ readFrom res maybeParent pactVersion doRead = do
pactDb
| parentIsLatestHeader = Pact4.chainwebPactDb
| otherwise = Pact4.rewoundPactDb currentHeight startTxId
r <- doRead (mkBlockDbEnv pactDb) (emptyBlockHandle startTxId)
r <- doRead (mkBlockDbEnv pactDb) (emptyPact4BlockHandle startTxId)
finalCache <- Pact4._bsModuleCache . Pact4._benvBlockState <$> readMVar newDbEnv
return (r, finalCache)

Expand All @@ -215,14 +215,13 @@ readFrom res maybeParent pactVersion doRead = do
, Pact5._blockHandlerChainId = res.cpChainId
, Pact5._blockHandlerBlockHeight = currentHeight
, Pact5._blockHandlerMode = Pact5.Transactional
, Pact5._blockHandlerPersistIntraBlockWrites = DoNotPersistIntraBlockWrites
}
let upperBound
| parentIsLatestHeader = Nothing
| otherwise = Just (currentHeight, coerce @Pact4.TxId @Pact5.TxId startTxId)
let pactDb
= Pact5.chainwebPactBlockDb upperBound blockHandlerEnv
r <- doRead pactDb (emptyBlockHandle startTxId)
r <- doRead pactDb (emptyPact5BlockHandle startTxId)
return (r, sharedModuleCache)
| otherwise ->
internalError $
Expand Down Expand Up @@ -345,7 +344,7 @@ restoreAndSave res rewindParent blocks = do
<> sshow (view blockHeight ph) <> ", child height " <> sshow (view blockHeight newBh)
_ -> return ()
-- persist any changes to the database
PactDb.commitBlockStateToDatabase res.cpSql
Pact4.commitBlockStateToDatabase res.cpSql
(view blockHash newBh) (view blockHeight newBh)
(BlockHandle (Pact4._bsTxId nextState) (Pact4._bsPendingBlock nextState))
return (m'', Just (ParentHeader newBh), nextTxId, nextModuleCache)
Expand All @@ -359,11 +358,10 @@ restoreAndSave res rewindParent blocks = do
, Pact5._blockHandlerBlockHeight = bh
, Pact5._blockHandlerChainId = res.cpChainId
, Pact5._blockHandlerMode = Pact5.Transactional
, Pact5._blockHandlerPersistIntraBlockWrites = res.cpIntraBlockPersistence
}
pactDb = Pact5.chainwebPactBlockDb Nothing blockEnv
-- run the block
((m', nextBlockHeader), blockHandle) <- runBlock pactDb maybeParent (emptyBlockHandle txid)
((m', nextBlockHeader), blockHandle) <- runBlock pactDb maybeParent (emptyPact5BlockHandle txid)
-- compute the accumulator early
let !m'' = m <> m'
case maybeParent of
Expand All @@ -375,7 +373,7 @@ restoreAndSave res rewindParent blocks = do
"doRestoreAndSave: non-genesis block should be one higher than its parent. parent at "
<> sshow (view blockHeight ph) <> ", child height " <> sshow (view blockHeight nextBlockHeader)
_ -> return ()
PactDb.commitBlockStateToDatabase res.cpSql
Pact5.commitBlockStateToDatabase res.cpSql
(view blockHash nextBlockHeader) (view blockHeight nextBlockHeader)
blockHandle

Expand Down
Loading

0 comments on commit 7c0eb95

Please sign in to comment.