diff --git a/hydra-node/json-schemas/logs.yaml b/hydra-node/json-schemas/logs.yaml index 3484913bbb6..3d90d610240 100644 --- a/hydra-node/json-schemas/logs.yaml +++ b/hydra-node/json-schemas/logs.yaml @@ -483,7 +483,7 @@ definitions: - missing - acknowledged - localCounter - - partyIndex + - theirIndex properties: tag: type: string @@ -500,7 +500,7 @@ definitions: type: array items: type: number - partyIndex: + theirIndex: type: number - title: BroadcastCounter description: >- @@ -509,13 +509,13 @@ definitions: additionalProperties: false required: - tag - - partyIndex + - ourIndex - localCounter properties: tag: type: string enum: ["BroadcastCounter"] - partyIndex: + ourIndex: type: number localCounter: type: array @@ -528,13 +528,13 @@ definitions: additionalProperties: false required: - tag - - partyIndex + - ourIndex - localCounter properties: tag: type: string enum: ["BroadcastPing"] - partyIndex: + ourIndex: type: number localCounter: type: array @@ -549,7 +549,8 @@ definitions: - tag - acknowledged - localCounter - - partyIndex + - theirIndex + - ourIndex properties: tag: type: string @@ -562,7 +563,9 @@ definitions: type: array items: type: number - partyIndex: + theirIndex: + type: number + ourIndex: type: number - title: ClearedMessageQueue description: >- @@ -590,7 +593,8 @@ definitions: - tag - acknowledged - localCounter - - partyIndex + - theirIndex + - ourIndex properties: tag: type: string @@ -603,7 +607,9 @@ definitions: type: array items: type: number - partyIndex: + theirIndex: + type: number + ourIndex: type: number - title: ReliabilityFailedToFindMsg description: >- @@ -2063,7 +2069,7 @@ definitions: enum: ["WaitOnContestationDeadline"] - title: WaitOnTxs description: >- - Some transactions from a proposed snapshot have not been seen yet + Some transactions from a proposed snapshot have not been seen yet. type: object additionalProperties: false required: diff --git a/hydra-node/src/Hydra/Network/Reliability.hs b/hydra-node/src/Hydra/Network/Reliability.hs index dfc664793be..9b13c74c253 100644 --- a/hydra-node/src/Hydra/Network/Reliability.hs +++ b/hydra-node/src/Hydra/Network/Reliability.hs @@ -86,6 +86,7 @@ import Cardano.Binary (serialize') import Cardano.Crypto.Util (SignableRepresentation (getSignableRepresentation)) import Control.Concurrent.Class.MonadSTM ( MonadSTM (readTQueue, writeTQueue), + modifyTVar', newTQueueIO, newTVarIO, readTVarIO, @@ -93,6 +94,8 @@ import Control.Concurrent.Class.MonadSTM ( ) import Control.Tracer (Tracer) import Data.IntMap qualified as IMap +import Data.Sequence.Strict ((|>)) +import Data.Sequence.Strict qualified as Seq import Data.Vector ( Vector, elemIndex, @@ -116,13 +119,13 @@ data ReliableMsg msg = ReliableMsg -- ^ Vector of highest known counter for each known party. Serves as announcement of -- which messages the sender of `ReliableMsg` has seen. The individual counters have -- nothing to do with the `message` also included in this. - , message :: msg + , payload :: msg } deriving stock (Eq, Show, Generic) deriving anyclass (ToJSON, FromJSON) instance ToCBOR msg => ToCBOR (ReliableMsg msg) where - toCBOR ReliableMsg{knownMessageIds, message} = toCBOR knownMessageIds <> toCBOR message + toCBOR ReliableMsg{knownMessageIds, payload} = toCBOR knownMessageIds <> toCBOR payload instance FromCBOR msg => FromCBOR (ReliableMsg msg) where fromCBOR = ReliableMsg <$> fromCBOR <*> fromCBOR @@ -135,11 +138,11 @@ instance ToCBOR msg => SignableRepresentation (ReliableMsg msg) where -- __NOTE__: Log items are documented in a YAML schema file which is not -- currently public, but should be. data ReliabilityLog - = Resending {missing :: Vector Int, acknowledged :: Vector Int, localCounter :: Vector Int, partyIndex :: Int} - | BroadcastCounter {partyIndex :: Int, localCounter :: Vector Int} - | BroadcastPing {partyIndex :: Int, localCounter :: Vector Int} - | Received {acknowledged :: Vector Int, localCounter :: Vector Int, partyIndex :: Int} - | Ignored {acknowledged :: Vector Int, localCounter :: Vector Int, partyIndex :: Int} + = Resending {missing :: Vector Int, acknowledged :: Vector Int, localCounter :: Vector Int, theirIndex :: Int} + | BroadcastCounter {ourIndex :: Int, localCounter :: Vector Int} + | BroadcastPing {ourIndex :: Int, localCounter :: Vector Int} + | Received {acknowledged :: Vector Int, localCounter :: Vector Int, theirIndex :: Int, ourIndex :: Int} + | Ignored {acknowledged :: Vector Int, localCounter :: Vector Int, theirIndex :: Int, ourIndex :: Int} | ReliabilityFailedToFindMsg { missingMsgIndex :: Int , sentMessagesLength :: Int @@ -224,56 +227,60 @@ withReliability :: NetworkComponent m (Authenticated (Heartbeat msg)) (Heartbeat msg) a withReliability tracer MessagePersistence{saveAcks, loadAcks, appendMessage, loadMessages} me otherParties withRawNetwork callback action = do acksCache <- loadAcks >>= newTVarIO + sentMessages <- loadMessages >>= newTVarIO . Seq.fromList resendQ <- newTQueueIO let ourIndex = fromMaybe (error "This cannot happen because we constructed the list with our party inside.") (findPartyIndex me) let resend = writeTQueue resendQ - withRawNetwork (reliableCallback acksCache resend ourIndex) $ \network@Network{broadcast} -> do + withRawNetwork (reliableCallback acksCache sentMessages resend ourIndex) $ \network@Network{broadcast} -> do withAsync (forever $ atomically (readTQueue resendQ) >>= broadcast) $ \_ -> - reliableBroadcast ourIndex acksCache network + reliableBroadcast sentMessages ourIndex acksCache network where allParties = fromList $ sort $ me : otherParties - reliableBroadcast ourIndex acksCache Network{broadcast} = + reliableBroadcast sentMessages ourIndex acksCache Network{broadcast} = action $ Network { broadcast = \msg -> case msg of Data{} -> do - newAckCounter <- incrementAckCounter + localCounter <- atomically $ cacheMessage msg >> incrementAckCounter + saveAcks localCounter appendMessage msg - saveAcks newAckCounter - traceWith tracer (BroadcastCounter ourIndex newAckCounter) - broadcast $ ReliableMsg newAckCounter msg + traceWith tracer BroadcastCounter{ourIndex, localCounter} + broadcast $ ReliableMsg localCounter msg Ping{} -> do - acks <- readTVarIO acksCache - saveAcks acks - traceWith tracer (BroadcastPing ourIndex acks) - broadcast $ ReliableMsg acks msg + localCounter <- readTVarIO acksCache + saveAcks localCounter + traceWith tracer BroadcastPing{ourIndex, localCounter} + broadcast $ ReliableMsg localCounter msg } where - incrementAckCounter = atomically $ do + incrementAckCounter = do acks <- readTVar acksCache let newAcks = constructAcks acks ourIndex writeTVar acksCache newAcks pure newAcks - reliableCallback acksCache resend ourIndex (Authenticated (ReliableMsg acks msg) party) = do - if length acks /= length allParties + cacheMessage msg = + modifyTVar' sentMessages (|> msg) + + reliableCallback acksCache sentMessages resend ourIndex (Authenticated (ReliableMsg acknowledged payload) party) = do + if length acknowledged /= length allParties then traceWith tracer ReceivedMalformedAcks { fromParty = party - , partyAcks = acks + , partyAcks = acknowledged , numberOfParties = length allParties } else do eShouldCallbackWithKnownAcks <- atomically $ runMaybeT $ do loadedAcks <- lift $ readTVar acksCache partyIndex <- hoistMaybe $ findPartyIndex party - messageAckForParty <- hoistMaybe (acks !? partyIndex) + messageAckForParty <- hoistMaybe (acknowledged !? partyIndex) knownAckForParty <- hoistMaybe $ loadedAcks !? partyIndex if - | isPing msg -> + | isPing payload -> -- we do not update indices on Pings but we do propagate them return (True, partyIndex, loadedAcks) | messageAckForParty == knownAckForParty + 1 -> do @@ -286,15 +293,15 @@ withReliability tracer MessagePersistence{saveAcks, loadAcks, appendMessage, loa return (False, partyIndex, loadedAcks) case eShouldCallbackWithKnownAcks of - Just (shouldCallback, partyIndex, knownAcks) -> do + Just (shouldCallback, theirIndex, localCounter) -> do if shouldCallback then do - callback (Authenticated msg party) - traceWith tracer (Received acks knownAcks partyIndex) - else traceWith tracer (Ignored acks knownAcks partyIndex) + callback Authenticated{payload, party} + traceWith tracer Received{acknowledged, localCounter, theirIndex, ourIndex} + else traceWith tracer Ignored{acknowledged, localCounter, theirIndex, ourIndex} - when (isPing msg) $ - resendMessagesIfLagging resend partyIndex knownAcks acks ourIndex + when (isPing payload) $ + resendMessagesIfLagging sentMessages resend theirIndex localCounter acknowledged ourIndex Nothing -> pure () constructAcks acks wantedIndex = @@ -302,8 +309,8 @@ withReliability tracer MessagePersistence{saveAcks, loadAcks, appendMessage, loa partyIndexes = generate (length allParties) id - resendMessagesIfLagging resend partyIndex knownAcks messageAcks myIndex = do - let mmessageAckForUs = messageAcks !? myIndex + resendMessagesIfLagging sentMessages resend theirIndex knownAcks acknowledged myIndex = do + let mmessageAckForUs = acknowledged !? myIndex let mknownAckForUs = knownAcks !? myIndex case (mmessageAckForUs, mknownAckForUs) of (Just messageAckForUs, Just knownAckForUs) -> @@ -311,8 +318,8 @@ withReliability tracer MessagePersistence{saveAcks, loadAcks, appendMessage, loa -- latest message sent when (messageAckForUs < knownAckForUs) $ do let missing = fromList [messageAckForUs + 1 .. knownAckForUs] - storedMessages <- loadMessages - let messages = IMap.fromList (zip [1 ..] storedMessages) + storedMessages <- readTVarIO sentMessages + let messages = IMap.fromList (zip [1 ..] $ toList storedMessages) forM_ missing $ \idx -> do case messages IMap.!? idx of Nothing -> @@ -324,9 +331,9 @@ withReliability tracer MessagePersistence{saveAcks, loadAcks, appendMessage, loa , messageAckForUs = messageAckForUs } Just missingMsg -> do - let newAcks' = zipWith (\ack i -> if i == myIndex then idx else ack) knownAcks partyIndexes - traceWith tracer (Resending missing messageAcks newAcks' partyIndex) - atomically $ resend $ ReliableMsg newAcks' missingMsg + let localCounter = zipWith (\ack i -> if i == myIndex then idx else ack) knownAcks partyIndexes + traceWith tracer Resending{missing, acknowledged, localCounter, theirIndex} + atomically $ resend $ ReliableMsg localCounter missingMsg _ -> pure () -- Find the index of a party in the list of all parties. diff --git a/hydra-node/src/Hydra/Node/Network.hs b/hydra-node/src/Hydra/Node/Network.hs index d6c0d4776bc..6197196cb6e 100644 --- a/hydra-node/src/Hydra/Node/Network.hs +++ b/hydra-node/src/Hydra/Node/Network.hs @@ -143,7 +143,7 @@ withNetwork tracer connectionMessages configuration callback action = do -- * Some state already exists and is loaded, -- * The number of parties is not the same as the number of acknowledgments saved. configureMessagePersistence :: - (MonadIO m, MonadThrow m, FromJSON msg, ToJSON msg) => + (MonadIO m, MonadThrow m, FromJSON msg, ToJSON msg, MonadSTM m, MonadThread m, MonadThrow (STM m)) => Tracer m (HydraNodeLog tx) -> FilePath -> Int -> diff --git a/hydra-node/src/Hydra/Persistence.hs b/hydra-node/src/Hydra/Persistence.hs index 7e27fa2a067..892f5f718f2 100644 --- a/hydra-node/src/Hydra/Persistence.hs +++ b/hydra-node/src/Hydra/Persistence.hs @@ -4,6 +4,8 @@ module Hydra.Persistence where import Hydra.Prelude +import Control.Concurrent.Class.MonadSTM (newTVarIO, throwSTM, writeTVar) +import Control.Monad.Class.MonadFork (myThreadId) import Data.Aeson qualified as Aeson import Data.ByteString qualified as BS import Data.ByteString.Char8 qualified as C8 @@ -11,8 +13,9 @@ import System.Directory (createDirectoryIfMissing, doesFileExist) import System.FilePath (takeDirectory) import UnliftIO.IO.File (withBinaryFile, writeBinaryFileDurableAtomic) -newtype PersistenceException +data PersistenceException = PersistenceException String + | IncorrectAccessException String deriving stock (Eq, Show) instance Exception PersistenceException @@ -53,18 +56,33 @@ data PersistenceIncremental a m = PersistenceIncremental } -- | Initialize persistence handle for given type 'a' at given file path. +-- +-- This instance of `PersistenceIncremental` is "thread-safe" in the sense that +-- it prevents loading from a different thread once one starts `append`ing +-- through the handle. If another thread attempts to `loadAll` after this point, +-- an `IncorrectAccessException` will be raised. createPersistenceIncremental :: - (MonadIO m, MonadThrow m) => + forall a m. + (MonadIO m, MonadThrow m, MonadSTM m, MonadThread m, MonadThrow (STM m)) => FilePath -> m (PersistenceIncremental a m) createPersistenceIncremental fp = do liftIO . createDirectoryIfMissing True $ takeDirectory fp + authorizedThread <- newTVarIO Nothing pure $ PersistenceIncremental { append = \a -> do + tid <- myThreadId + atomically $ writeTVar authorizedThread $ Just tid let bytes = toStrict $ Aeson.encode a <> "\n" liftIO $ withBinaryFile fp AppendMode (`BS.hPut` bytes) - , loadAll = + , loadAll = do + tid <- myThreadId + atomically $ do + authTid <- readTVar authorizedThread + when (isJust authTid && authTid /= Just tid) $ + throwSTM (IncorrectAccessException $ "Trying to load persisted data in " <> fp <> " from different thread") + liftIO (doesFileExist fp) >>= \case False -> pure [] True -> do diff --git a/hydra-node/test/Hydra/API/ServerSpec.hs b/hydra-node/test/Hydra/API/ServerSpec.hs index 2eac9ee4dc0..2d6b68edb34 100644 --- a/hydra-node/test/Hydra/API/ServerSpec.hs +++ b/hydra-node/test/Hydra/API/ServerSpec.hs @@ -365,8 +365,8 @@ spec = describe "ServerSpec" $ guard $ v ^? key "headStatus" == Just (Aeson.String "Initializing") guard $ v ^? key "snapshotUtxo" == Just expectedUtxos - -- expect the api server to load events from apiPersistence and project headStatus correctly - withTestAPIServer port alice apiPersistence tracer $ \_ -> do + newApiPersistence <- createPersistenceIncremental $ persistenceDir <> "/server-output" + withTestAPIServer port alice newApiPersistence tracer $ \_ -> do waitForValue port $ \v -> do guard $ v ^? key "headStatus" == Just (Aeson.String "Initializing") guard $ v ^? key "snapshotUtxo" == Just expectedUtxos diff --git a/hydra-node/test/Hydra/Network/ReliabilitySpec.hs b/hydra-node/test/Hydra/Network/ReliabilitySpec.hs index 17804343193..a4439172c42 100644 --- a/hydra-node/test/Hydra/Network/ReliabilitySpec.hs +++ b/hydra-node/test/Hydra/Network/ReliabilitySpec.hs @@ -87,7 +87,8 @@ spec = parallel $ do receivedMessagesInOrder messageReceived = let refMessages = Data "node-2" <$> [1 ..] - in all (`elem` refMessages) (payload <$> messageReceived) + isInMessage Authenticated{payload} = payload `elem` refMessages + in all isInMessage messageReceived in receivedMessagesInOrder propagatedMessages & counterexample (show propagatedMessages) @@ -125,8 +126,8 @@ spec = parallel $ do aliceFailingNetwork = failingNetwork randomSeed alice (bobToAlice, aliceToBob) bobFailingNetwork = failingNetwork randomSeed bob (aliceToBob, bobToAlice) - bobReliabilityStack = reliabilityStack bobPersistence bobFailingNetwork emittedTraces "bob" bob [alice] - aliceReliabilityStack = reliabilityStack alicePersistence aliceFailingNetwork emittedTraces "alice" alice [bob] + bobReliabilityStack = reliabilityStack bobPersistence bobFailingNetwork (captureTraces emittedTraces) "bob" bob [alice] + aliceReliabilityStack = reliabilityStack alicePersistence aliceFailingNetwork (captureTraces emittedTraces) "alice" alice [bob] runAlice = runPeer aliceReliabilityStack "alice" messagesReceivedByAlice messagesReceivedByBob aliceToBobMessages bobToAliceMessages runBob = runPeer bobReliabilityStack "bob" messagesReceivedByBob messagesReceivedByAlice bobToAliceMessages aliceToBobMessages @@ -169,8 +170,10 @@ spec = parallel $ do it "appends messages to disk and can load them back" $ do withTempDir "network-messages-persistence" $ \tmpDir -> do + let networkMessagesFile = tmpDir <> "/network-messages" + Persistence{load, save} <- createPersistence $ tmpDir <> "/acks" - PersistenceIncremental{loadAll, append} <- createPersistenceIncremental $ tmpDir <> "/network-messages" + PersistenceIncremental{loadAll, append} <- createPersistenceIncremental $ networkMessagesFile let messagePersistence = MessagePersistence @@ -204,8 +207,8 @@ spec = parallel $ do receivedMsgs `shouldBe` [ReliableMsg (fromList [1, 1]) (Data "node-1" msg)] - doesFileExist (tmpDir "network-messages") `shouldReturn` True - loadAll `shouldReturn` [Data "node-1" msg] + doesFileExist networkMessagesFile `shouldReturn` True + reloadAll networkMessagesFile `shouldReturn` [Data "node-1" msg] doesFileExist (tmpDir "acks") `shouldReturn` True load `shouldReturn` Just (fromList [1, 1]) @@ -220,10 +223,10 @@ spec = parallel $ do (waitForAllMessages expectedMessages receivedMessageContainer) (waitForAllMessages messagesToSend sentMessageContainer) - reliabilityStack persistence underlyingNetwork tracesContainer nodeId party peers = + reliabilityStack persistence underlyingNetwork tracer nodeId party peers = withHeartbeat nodeId noop $ withFlipHeartbeats $ - withReliability (captureTraces tracesContainer) persistence party peers underlyingNetwork + withReliability tracer persistence party peers underlyingNetwork failingNetwork seed peer (readQueue, writeQueue) callback action = withAsync @@ -245,6 +248,11 @@ spec = parallel $ do writeTVar seed' newGenSeed pure res + reloadAll :: FilePath -> IO [Heartbeat (Heartbeat String)] + reloadAll fileName = + createPersistenceIncremental fileName + >>= \PersistenceIncremental{loadAll} -> loadAll + noop :: Monad m => b -> m () noop = const $ pure () @@ -273,7 +281,7 @@ captureIncoming receivedMessages msg = atomically $ modifyTVar' receivedMessages (`snoc` msg) capturePayload :: MonadSTM m => TVar m (Vector msg) -> Authenticated (Heartbeat msg) -> m () -capturePayload receivedMessages message = case payload message of +capturePayload receivedMessages Authenticated{payload} = case payload of Data _ msg -> atomically $ modifyTVar' receivedMessages (`snoc` msg) _ -> pure () diff --git a/hydra-node/test/Hydra/PersistenceSpec.hs b/hydra-node/test/Hydra/PersistenceSpec.hs index 5c45611b120..8a14af91a88 100644 --- a/hydra-node/test/Hydra/PersistenceSpec.hs +++ b/hydra-node/test/Hydra/PersistenceSpec.hs @@ -1,3 +1,5 @@ +{-# LANGUAGE LambdaCase #-} + module Hydra.PersistenceSpec where import Hydra.Prelude hiding (label) @@ -6,8 +8,8 @@ import Test.Hydra.Prelude import Data.Aeson (Value (..)) import Data.Aeson qualified as Aeson import Data.Text qualified as Text -import Hydra.Persistence (Persistence (..), PersistenceIncremental (..), createPersistence, createPersistenceIncremental) -import Test.QuickCheck (checkCoverage, cover, elements, oneof, (===)) +import Hydra.Persistence (Persistence (..), PersistenceException (..), PersistenceIncremental (..), createPersistence, createPersistenceIncremental) +import Test.QuickCheck (checkCoverage, cover, elements, oneof, suchThat, (===)) import Test.QuickCheck.Gen (listOf) import Test.QuickCheck.Monadic (monadicIO, monitor, pick, run) @@ -54,6 +56,22 @@ spec = do loadAll pure $ actualResult === items + it "it cannot load from a different thread once having started appending" $ + monadicIO $ do + items <- pick $ listOf genPersistenceItem + moreItems <- pick $ listOf genPersistenceItem `suchThat` ((> 2) . length) + pure $ + withTempDir "hydra-persistence" $ \tmpDir -> do + PersistenceIncremental{loadAll, append} <- createPersistenceIncremental $ tmpDir <> "/data" + forM_ items append + loadAll `shouldReturn` items + race_ + (forever $ threadDelay 0.01 >> loadAll) + (forM_ moreItems $ \item -> append item >> threadDelay 0.01) + `shouldThrow` \case + IncorrectAccessException{} -> True + _ -> False + genPersistenceItem :: Gen Aeson.Value genPersistenceItem = oneof