Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<!--
A new scriv changelog fragment.

Uncomment the section that is right (remove the HTML comment wrapper).
For top level release notes, leave all the headers commented out.
-->

### Breaking

- generalized mempool `getWriter` to validate tx's in a given context
and returning user defined validation errors


<!--
### Non-Breaking

- A bullet item for the Non-Breaking category.

-->
Original file line number Diff line number Diff line change
@@ -1,40 +1,49 @@
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE DisambiguateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeFamilies #-}

-- | The module should be imported qualified.
--
module Ouroboros.Network.TxSubmission.Mempool.Simple
( Mempool (..)
( TxValidationFail
, Mempool (..)
, MempoolSeq (..)
, MempoolWriter (..)
, empty
, new
, read
, getReader
, getWriter
, writerAdapter
) where

import Prelude hiding (read, seq)

import Control.Concurrent.Class.MonadSTM.Strict
import Control.Monad (when)
import Control.Exception (assert)
import Control.Monad ((>=>))
import Control.Monad.Class.MonadThrow

import Data.Bifunctor (bimap)
import Data.Either (partitionEithers)
import Control.Monad.Trans.Except
import Data.Bifunctor (bimap, first, second)
import Data.Either
import Data.Foldable (toList)
import Data.Foldable qualified as Foldable
import Data.Function (on)
import Data.List (find, nubBy)
import Data.List (find)
import Data.Maybe (isJust)
import Data.Sequence (Seq)
import Data.Sequence qualified as Seq
import Data.Set (Set)
import Data.Set qualified as Set
import Data.Typeable (Typeable)

import Ouroboros.Network.Protocol.LocalTxSubmission.Type (SubmitResult (..))
import Ouroboros.Network.SizeInBytes
import Ouroboros.Network.TxSubmission.Inbound.V2.Types
import Ouroboros.Network.TxSubmission.Mempool.Reader
Expand Down Expand Up @@ -106,68 +115,101 @@ getReader getTxId getTxSize (Mempool mempool) =
f idx tx = (getTxId tx, idx, getTxSize tx)


data InvalidTxsError where
InvalidTxsError :: forall txid failure.
( Typeable txid
, Typeable failure
, Show txid
, Show failure
)
=> [(txid, failure)]
-> InvalidTxsError

deriving instance Show InvalidTxsError
instance Exception InvalidTxsError

-- | type of user-defined validation failures
--
data family TxValidationFail tx

-- | A simple mempool writer.
-- | A mempool writer which generalizes the tx submission mempool writer
-- TODO: We could replace TxSubmissionMempoolWriter with this at some point
--
data MempoolWriter txid tx idx m =
MempoolWriter {

-- | Compute the transaction id from a transaction.
--
-- This is used in the protocol handler to verify a full transaction
-- matches a previously given transaction id.
--
txId :: tx -> txid,

-- | Supply a batch of transactions to the mempool. They are either
-- accepted or rejected individually, but in the order supplied.
--
-- The 'txid's of all transactions that were added successfully are
-- returned.
mempoolAddTxs
:: [tx]
-> m (Either (txid, TxValidationFail tx) [(txid, SubmitResult (TxValidationFail tx))])
}


-- | A mempool writer with validation harness
-- PRECONDITION: no duplicates given to mempoolAddTxs
--
getWriter :: forall tx txid ctx failure m.
getWriter :: forall tx txid ctx m.
( MonadSTM m
, MonadThrow m
-- , NFData txid
-- , NFData tx
-- , NFData (TxValidationFail tx)
, Ord txid
, Typeable txid
, Typeable failure
, Show txid
, Show failure
)
=> (tx -> txid)
-- ^ get txid of a tx
-- ^ project txid
-> m ctx
-- ^ monadic validation ctx
-> (ctx -> tx -> Either failure ())
-- ^ validate a tx, any failing `tx` throws an exception.
-> (failure -> Bool)
-- ^ return `True` when a failure should throw an exception
-- ^ acquire validation context
-> ( [tx]
-> ctx
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we remove the ctx, I realised at some point that we don't need inside it inside getWriter``: we acquire it in m ctx callback, which can be done outside of `getWriter` and passed to the validation callback. This will slightly simplify `getWriter`.

I should have cleaned this up before, so sorry for asking you to do that in this PR.

-> ExceptT (tx, TxValidationFail tx) m
[(tx, Either (TxValidationFail tx) ())])
-- ^ validation callback
-> TxValidationFail tx
-- ^ replace duplicates
-> Mempool m txid tx
-> TxSubmissionMempoolWriter txid tx Int m
getWriter getTxId getValidationCtx validateTx failureFilterFn (Mempool mempool) =
TxSubmissionMempoolWriter {
txId = getTxId,

mempoolAddTxs = \txs -> do
ctx <- getValidationCtx
(invalidTxIds, validTxs) <- atomically $ do
MempoolSeq { mempoolSet, mempoolSeq } <- readTVar mempool
let (invalidTxIds, validTxs) =
bimap (filter (failureFilterFn . snd))
(nubBy (on (==) getTxId))
. partitionEithers
. map (\tx -> case validateTx ctx tx of
Left e -> Left (getTxId tx, e)
Right _ -> Right tx
)
. filter (\tx -> getTxId tx `Set.notMember` mempoolSet)
$ txs
mempoolTxs' = MempoolSeq {
mempoolSet = Foldable.foldl' (\s tx -> getTxId tx `Set.insert` s)
mempoolSet
validTxs,
mempoolSeq = Foldable.foldl' (Seq.|>) mempoolSeq validTxs
}
writeTVar mempool mempoolTxs'
return (invalidTxIds, map getTxId validTxs)
when (not (null invalidTxIds)) $
throwIO (InvalidTxsError invalidTxIds)
return validTxs
}
-> MempoolWriter txid tx Int m
getWriter getTxId acquireCtx validateTxs duplicateFail (Mempool mempool) =
MempoolWriter {
txId = getTxId,

mempoolAddTxs = \txs -> assert (not . null $ txs) $ do
ctx <- acquireCtx
first (first getTxId)
<$> runExceptT do
-- TODO probably should force the results before entering the atomically block
-- to limit contention
!vTxs <- zipWith ((,) . getTxId) txs <$> validateTxs txs ctx

ExceptT . atomically $ do
MempoolSeq { mempoolSet, mempoolSeq } <- readTVar mempool
let result =
[if duplicate then
Left (txid, duplicateFail)
else
bimap (txid,) (const (txid, tx)) eResult
| (txid, (tx, eResult)) <- vTxs
, let duplicate = txid `Set.member` mempoolSet
]
(validIds, validTxs) = unzip . rights $ result
mempoolTxs' = MempoolSeq {
mempoolSet = Set.union mempoolSet (Set.fromList validIds),
mempoolSeq = Foldable.foldl' (Seq.|>) mempoolSeq validTxs
}
writeTVar mempool mempoolTxs'
return . Right $ either (second SubmitFail) (second (const SubmitSuccess)) <$> result
}


-- | Takes the general mempool writer defined here
-- and adapts it to the API of the tx submission mempool writer
-- to maintain backwards compatibility.
--
writerAdapter :: ( Exception (TxValidationFail tx)
, MonadThrow m
)
=> MempoolWriter txid tx idx m
-> TxSubmissionMempoolWriter txid tx idx m
writerAdapter MempoolWriter { txId, mempoolAddTxs } =
TxSubmissionMempoolWriter { txId, mempoolAddTxs = adapter }
where
adapter =
mempoolAddTxs >=> either (throwIO . snd) (return . fmap fst . filter success)
success = \case (_txid, SubmitSuccess) -> True; _otherwise -> False
Loading