Skip to content

Commit

Permalink
Revert "Introduce ipc server on chain-observer"
Browse files Browse the repository at this point in the history
This reverts commit 1f21d95.
  • Loading branch information
ffakenz committed Jan 18, 2024
1 parent 7bcd5dd commit 6ba9da4
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 118 deletions.
3 changes: 2 additions & 1 deletion hydra-chain-observer/exe/Main.hs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
module Main where

import Hydra.ChainObserver (defaultObserverHandler)
import Hydra.ChainObserver qualified
import Hydra.Prelude

main :: IO ()
main = Hydra.ChainObserver.main
main = Hydra.ChainObserver.main defaultObserverHandler
5 changes: 1 addition & 4 deletions hydra-chain-observer/hydra-chain-observer.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ source-repository head
common project-config
default-language: GHC2021
default-extensions:
NoImplicitPrelude
BangPatterns
BinaryLiterals
ConstraintKinds
Expand All @@ -42,6 +41,7 @@ common project-config
MultiParamTypeClasses
MultiWayIf
NamedFieldPuns
NoImplicitPrelude
NumericUnderscores
OverloadedStrings
PartialTypeSignatures
Expand All @@ -64,13 +64,10 @@ library
hs-source-dirs: src
ghc-options: -haddock
build-depends:
, base
, hydra-cardano-api
, hydra-node
, hydra-plutus
, hydra-prelude
, io-classes
, network
, optparse-applicative
, ouroboros-network-protocols

Expand Down
98 changes: 12 additions & 86 deletions hydra-chain-observer/src/Hydra/ChainObserver.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@ module Hydra.ChainObserver where

import Hydra.Prelude

import Control.Concurrent (forkFinally)
import Control.Concurrent.Class.MonadSTM (modifyTVar')
import Control.Exception ()
import Hydra.Cardano.Api (
Block (..),
BlockInMode (..),
Expand Down Expand Up @@ -48,103 +45,32 @@ import Hydra.Contract qualified as Contract
import Hydra.HeadId (HeadId (..))
import Hydra.Ledger.Cardano (adjustUTxO)
import Hydra.Logging (Tracer, Verbosity (..), traceWith, withTracer)
import Hydra.Network (Host (..))
import Hydra.Node.EventQueue (EventQueue (..), Queued (..), createEventQueue)
import Network.Socket (
AddrInfo (..),
SocketOption (..),
SocketType (..),
accept,
bind,
close,
defaultHints,
defaultProtocol,
getAddrInfo,
listen,
setSocketOption,
socket,
socketToHandle,
withSocketsDo,
)
import Options.Applicative (execParser)
import Ouroboros.Network.Protocol.ChainSync.Client (
ChainSyncClient (..),
ClientStIdle (..),
ClientStIntersect (..),
ClientStNext (..),
)
import System.IO (hClose, hPrint)

type ObserverHandler m = [HeadObservation] -> m ()

type ObserverState = [HeadObservation]

observerHandler :: TVar IO ObserverState -> ObserverState -> IO ()
observerHandler observerState observations =
atomically $
modifyTVar' observerState (<> observations)

runIPCServer :: Host -> EventQueue IO ObserverState -> IO ()
runIPCServer Host{hostname, port} eq = withSocketsDo $ do
-- Create a TCP socket
bracket
openTCPListener
close
( \sock -> do
putStrLn $ "Listening on port " ++ show port
forever $ do
-- Accept incoming connection
(conn, _) <- accept sock
-- Fork a new thread to handle the connection
forkFinally
(handleClient conn)
( \_ -> close conn
)
)
where
openTCPListener = do
is <- getAddrInfo (Just defaultHints) (Just $ toString hostname) (Just $ show port)
addr <- case is of
(inf : _) -> pure inf
_ -> die "getAdrrInfo failed"
sock <- socket (addrFamily addr) Stream defaultProtocol
setSocketOption sock ReuseAddr 1
bind sock (addrAddress addr)
listen sock 5
return sock

handleClient conn = do
hdl <- socketToHandle conn ReadWriteMode
hSetBuffering hdl LineBuffering
putStrLn "Client connected"
pushObservation hdl `finally` hClose hdl

pushObservation hdl = forever $ do
Queued{queuedEvent} <- nextEvent eq
hPrint hdl queuedEvent
defaultObserverHandler :: Applicative m => ObserverHandler m
defaultObserverHandler = const $ pure ()

main :: IO ()
main = do
Options{networkId, nodeSocket, host, port, startChainFrom} <- execParser hydraChainObserverOptions
main :: ObserverHandler IO -> IO ()
main observerHandler = do
Options{networkId, nodeSocket, startChainFrom} <- execParser hydraChainObserverOptions
withTracer (Verbose "hydra-chain-observer") $ \tracer -> do
traceWith tracer KnownScripts{scriptInfo = Contract.scriptInfo}
traceWith tracer ConnectingToNode{nodeSocket, networkId}
chainPoint <- case startChainFrom of
Nothing -> queryTip networkId nodeSocket
Just x -> pure x
traceWith tracer StartObservingFrom{chainPoint}
eq@EventQueue{putEvent} <- createEventQueue
race
( runIPCServer Host{hostname = show host, port} eq
`catch` \(e :: SomeException) -> putStrLn $ "Exception: " ++ show e
)
( connectToLocalNode
(connectInfo nodeSocket networkId)
(clientProtocols tracer networkId chainPoint putEvent)
)
>>= \case
Left{} -> error "Something went wrong: "
Right a -> pure a
connectToLocalNode
(connectInfo nodeSocket networkId)
(clientProtocols tracer networkId chainPoint observerHandler)

type ChainObserverLog :: Type
data ChainObserverLog
Expand Down Expand Up @@ -183,9 +109,9 @@ clientProtocols ::
ChainPoint ->
ObserverHandler IO ->
LocalNodeClientProtocols BlockType ChainPoint ChainTip slot tx txid txerr query IO
clientProtocols tracer networkId startingPoint observerHandle =
clientProtocols tracer networkId startingPoint observerHandler =
LocalNodeClientProtocols
{ localChainSyncClient = LocalChainSyncClient $ chainSyncClient tracer networkId startingPoint observerHandle
{ localChainSyncClient = LocalChainSyncClient $ chainSyncClient tracer networkId startingPoint observerHandler
, localTxSubmissionClient = Nothing
, localStateQueryClient = Nothing
, localTxMonitoringClient = Nothing
Expand All @@ -211,7 +137,7 @@ chainSyncClient ::
ChainPoint ->
ObserverHandler m ->
ChainSyncClient BlockType ChainPoint ChainTip m ()
chainSyncClient tracer networkId startingPoint observerHandle =
chainSyncClient tracer networkId startingPoint observerHandler =
ChainSyncClient $
pure $
SendMsgFindIntersect [startingPoint] clientStIntersect
Expand Down Expand Up @@ -240,7 +166,7 @@ chainSyncClient tracer networkId startingPoint observerHandle =
let (utxo', observations) = observeAll networkId utxo txs
-- FIXME we should be exposing OnChainTx instead of working around NoHeadTx.
forM_ observations (maybe (pure ()) (traceWith tracer) . logObservation)
observerHandle observations
observerHandler observations
pure $ clientStIdle utxo'
_ -> pure $ clientStIdle utxo
, recvMsgRollBackward = \point _tip -> ChainSyncClient $ do
Expand Down
7 changes: 0 additions & 7 deletions hydra-chain-observer/src/Hydra/ChainObserver/Options.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,9 @@ module Hydra.ChainObserver.Options where
import Hydra.Prelude

import Hydra.Cardano.Api (ChainPoint, NetworkId, SocketPath)
import Hydra.Network (IP, PortNumber)
import Hydra.Options (
hostParser,
networkIdParser,
nodeSocketParser,
portParser,
startChainFromParser,
)
import Options.Applicative (Parser, ParserInfo, fullDesc, header, helper, info, progDesc)
Expand All @@ -17,8 +14,6 @@ type Options :: Type
data Options = Options
{ networkId :: NetworkId
, nodeSocket :: SocketPath
, host :: IP
, port :: PortNumber
, startChainFrom :: Maybe ChainPoint
-- ^ Point at which to start following the chain.
}
Expand All @@ -29,8 +24,6 @@ optionsParser =
Options
<$> networkIdParser
<*> nodeSocketParser
<*> hostParser
<*> portParser
<*> optional startChainFromParser

hydraChainObserverOptions :: ParserInfo Options
Expand Down
17 changes: 8 additions & 9 deletions hydra-chain-observer/test/Hydra/ChainObserverSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ import Test.Hydra.Prelude
import Hydra.Chain.Direct.Fixture (testNetworkId)
import Hydra.Chain.Direct.State (HasKnownUTxO (getKnownUTxO), genChainStateWithTx)
import Hydra.Chain.Direct.State qualified as Transition
import Hydra.Chain.Direct.Tx (HeadObservation (..))
import Hydra.ChainObserver (observeAll, observeTx)
import Hydra.ChainObserver (ChainObserverLog (..), observeAll, observeTx)
import Hydra.Ledger.Cardano (genSequenceOfSimplePaymentTransactions)
import Test.QuickCheck (counterexample, forAll, forAllBlind, property, (=/=), (===))
import Test.QuickCheck.Property (checkCoverage)
Expand All @@ -22,13 +21,13 @@ spec =
counterexample (show transition) $
let utxo = getKnownUTxO st
in case snd $ observeTx testNetworkId utxo tx of
Just (Init{}) -> transition === Transition.Init
Just (Abort{}) -> transition === Transition.Abort
Just (Commit{}) -> transition === Transition.Commit
Just (CollectCom{}) -> transition === Transition.Collect
Just (Close{}) -> transition === Transition.Close
Just (Contest{}) -> transition === Transition.Contest
Just (Fanout{}) -> transition === Transition.Fanout
Just (HeadInitTx{}) -> transition === Transition.Init
Just (HeadCommitTx{}) -> transition === Transition.Commit
Just (HeadCollectComTx{}) -> transition === Transition.Collect
Just (HeadAbortTx{}) -> transition === Transition.Abort
Just (HeadCloseTx{}) -> transition === Transition.Close
Just (HeadContestTx{}) -> transition === Transition.Contest
Just (HeadFanoutTx{}) -> transition === Transition.Fanout
_ -> property False

prop "Updates UTxO state given transaction part of Head lifecycle" $
Expand Down
2 changes: 0 additions & 2 deletions hydra-cluster/test/Test/ChainObserverSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,6 @@ withChainObserver cardanoNode action =
proc
"hydra-chain-observer"
$ ["--node-socket", unFile nodeSocket]
<> ["--host", "127.0.0.1"]
<> ["--port", "8888"]
<> case networkId of
Mainnet -> ["--mainnet"]
Testnet (NetworkMagic magic) -> ["--testnet-magic", show magic]
Expand Down
16 changes: 7 additions & 9 deletions hydra-explorer/src/Hydra/Explorer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import Hydra.HeadId (HeadId)
import Hydra.Logging (Tracer, Verbosity (..), traceWith, withTracer)
import Hydra.Network (PortNumber)

import Control.Concurrent.Class.MonadSTM (newTVarIO)
import Control.Concurrent.Class.MonadSTM (modifyTVar', newTVarIO)
import Data.Aeson qualified as Aeson
import Data.List qualified as List
import Hydra.API.APIServerLog (APIServerLog (..), Method (..), PathInfo (..))
Expand All @@ -30,6 +30,11 @@ import System.Environment (withArgs)

type ExplorerState = [HeadObservation]

observerHandler :: TVar IO ExplorerState -> ExplorerState -> IO ()
observerHandler explorerState observations = do
atomically $
modifyTVar' explorerState (<> observations)

main :: IO ()
main = do
withTracer (Verbose "hydra-explorer") $ \tracer -> do
Expand All @@ -38,14 +43,7 @@ main = do
args <- getArgs
race
-- FIXME: this is going to be problematic on mainnet.
( withArgs
( args
<> ["--start-chain-from", "0"]
<> ["--host", "127.0.0.1"]
<> ["--port", "8888"]
)
Hydra.ChainObserver.main
)
(withArgs (args <> ["--start-chain-from", "0"]) $ Hydra.ChainObserver.main (observerHandler explorerState))
( traceWith tracer (APIServerStarted (fromIntegral port :: PortNumber))
*> Warp.runSettings (settings tracer) (httpApp tracer getHeadIds)
)
Expand Down

0 comments on commit 6ba9da4

Please sign in to comment.