Commit 339a47b

Merge pull request #5104 from IntersectMBO/mwojtowicz/inbound-governor-turbo
Inbound governor performance improvement
2 parents a66d529 + bc25515 commit 339a47b

31 files changed

+1738
-1664
lines changed

docs/network-spec/connection-manager.tex

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2100,15 +2100,18 @@ \subsubsection{\RemoteIdle}
21002100
connection is used (\warm{} or \hot{}) or not (\cold{}) by the outbound side.
21012101

21022102
\subsubsection{\RemoteWarm}
2103-
A connection enters \RemoteWarm{} state once any of the mini-protocols starts
2104-
to operate. Once all hot mini-protocols start, the state will transition to
2105-
\RemoteHot{}. Note that this is slightly different than the notion of a \warm{}
2106-
peer, for which all \established{} and \warm{} mini-protocols are active, but
2107-
\hot{} ones are idle.
2103+
A connection dwells in \RemoteWarm{} if there are strictly only any warm or established
2104+
responder protocols running. Note also that an established protocol is one that may run
2105+
in both hot and warm states, but cannot be the only type running to maintain hot state
2106+
once all proper hot protocols have terminated. In other words, the connection must be
2107+
demoted in that case.
21082108

21092109
\subsubsection{\RemoteHot}
2110-
A connection enters \RemoteHot{} transition once all hot protocols started, if
2111-
any of them terminates the connection will be put in \RemoteWarm{}.
2110+
A connection enters \RemoteHot{} state once any hot responder protocol has started.
2111+
In particular, if a hot responder is the first to start, the state cycles through \RemoteWarm{}
2112+
first. Once all hot responders terminate, the connection will be put in \RemoteWarm{} regardless
2113+
of whether there are any warm or established responders left. In the latter case, if there aren't any
2114+
other protocols running, the connection will then follow up with further demotion to \RemoteIdle{}.
21122115

21132116
\subsection{Transitions}
21142117

@@ -2166,11 +2169,10 @@ \subsubsection{\MuxTerminated}
21662169
termination of the connection, as it can detect this by itself.
21672170

21682171
\subsubsection{\PromotedToHotRemote}
2169-
The inbound governor detects when all \hot{} mini-protocols started. In such
2172+
The inbound governor detects when any \hot{} mini-protocols have started. In such
21702173
case a \RemoteWarm{} connection is put in \RemoteHot{} state.
21712174

21722175
\subsubsection{\DemotedToWarmRemote}
2173-
Dually to \PromotedToHotRemote{} state transition, as soon as any of the \hot{}
2174-
mini-protocols terminates, the connection will transition to \RemoteWarm{}
2176+
Dually to \PromotedToHotRemote{} state transition, as soon as all of the \hot{}
2177+
mini-protocols terminate, the connection will transition to \RemoteWarm{}
21752178
state.
2176-
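The revised rules for \RemoteWarm{} and \RemoteHot{} can be read as a function of which responder mini-protocols are currently running. The following is a minimal sketch of that reading, not the inbound governor's implementation: the `Responders` record, its fields, and `classify` are hypothetical names introduced for illustration, and the sketch only gives the settled state, ignoring the intermediate transitions (for example, passing through `RemoteWarm` on the way to `RemoteHot` or `RemoteIdle`).

```haskell
module Main (main) where

-- | Hypothetical summary of the responder mini-protocols running on a
-- connection, counted by temperature. Not a type from ouroboros-network.
data Responders = Responders
  { hotRunning         :: Int
  , warmRunning        :: Int
  , establishedRunning :: Int
  }

-- | Remote states named in the specification text.
data RemoteState = RemoteIdle | RemoteWarm | RemoteHot
  deriving (Eq, Show)

-- | Settled remote state under the revised rules (a sketch):
-- any hot responder keeps the connection hot; established responders
-- alone cannot maintain the hot state; no responders means idle.
classify :: Responders -> RemoteState
classify r
  | hotRunning r > 0                         = RemoteHot
  | warmRunning r + establishedRunning r > 0 = RemoteWarm
  | otherwise                                = RemoteIdle

main :: IO ()
main = mapM_ (print . classify)
  [ Responders 1 0 1  -- one hot and one established responder
  , Responders 0 0 1  -- only an established responder remains
  , Responders 0 0 0  -- nothing is running
  ]
```

Under these assumptions the three cases settle in `RemoteHot`, `RemoteWarm` and `RemoteIdle` respectively, which matches the "any hot responder started" and "all hot responders terminated" wording of the new text.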

network-mux/CHANGELOG.md

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,11 @@
33
## next release
44

55
### Breaking changes
6-
* Bearer writeMany function for vector IO
7-
* An optional read buffer for Bearer
8-
* Polling of the egress queue
6+
7+
* run, miniProtocolJob, monitor now accept Tracers record
8+
instead of `Tracer m Trace` type.
99

1010
### Non-breaking changes
11-
* Define msHeaderLength instead of using '8'
12-
* Benchmark for Socket Bearer
13-
* Use ByteString.Builder for the ingress queues
14-
* Signal the kernel that we require at least the full SDU's worth of data
1511

1612
## 0.8.0.1 -- 2025-06-02
1713

@@ -27,6 +23,8 @@
2723

2824
* `MakeBearer` accepts optional `ReadBuffer`
2925
* added fields `egressInterval`, `writeMany`, `batchSize` to `Bearer`
26+
* writeMany provides vector IO, egressInterval supports polling of egress queue
27+
for tuning latency vs. network efficiency
3028
* `socketAsBearer` additionally takes `ReadBuffer`, egress
3129
interval `DiffTime` for egress polling, and batchSize
3230
* changed `IngressQueue` type synonym
@@ -35,6 +33,10 @@
3533
### Non-breaking changes
3634

3735
* added `makeSocketBearer'`, `ReadBuffer`, `withReadBufferIO`
36+
* Define msHeaderLength instead of using '8'
37+
* Benchmark for Socket Bearer
38+
* Use ByteString.Builder for the ingress queues
39+
* Signal the kernel that we require at least the full SDU's worth of data
3840

3941
## 0.7.0.0 -- 2025-02-25
4042

network-mux/demo/mux-demo.hs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ putStrLn_ = BSC.putStrLn . BSC.pack
6868
debugTracer :: Show a => Tracer IO a
6969
debugTracer = showTracing (Tracer putStrLn_)
7070

71+
nullTracers :: (Applicative m) => Tracers m
72+
nullTracers = Tracers nullTracer nullTracer
7173
--
7274
-- Protocols
7375
--
@@ -133,7 +135,7 @@ serverWorker bearer = do
133135
putStrLn $ "Result: " ++ show result
134136
Mx.stop mux
135137

136-
Mx.run nullTracer mux bearer
138+
Mx.run nullTracers mux bearer
137139
where
138140
ptcls :: [MiniProtocolInfo ResponderMode]
139141
ptcls = [ MiniProtocolInfo {
@@ -193,7 +195,7 @@ clientWorker bearer n msg = do
193195
putStrLn $ "Result: " ++ show result
194196
Mx.stop mux
195197

196-
Mx.run nullTracer mux bearer
198+
Mx.run nullTracers mux bearer
197199
where
198200
ptcls :: [MiniProtocolInfo Mx.InitiatorMode]
199201
ptcls = [ MiniProtocolInfo {
@@ -208,4 +210,3 @@ echoClient :: Int -> Int -> ByteString
208210
-> ReqRespClient ByteString ByteString IO Int
209211
echoClient !n 0 _ = SendMsgDone (pure n)
210212
echoClient !n m rawmsg = SendMsgReq rawmsg (pure . echoClient (n+1) (m-1))
211-

network-mux/src/Network/Mux.hs

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ module Network.Mux
4545
, traceBearerState
4646
, BearerState (..)
4747
, Trace (..)
48+
, Tracers (..)
4849
, WithBearer (..)
4950
) where
5051

@@ -212,11 +213,11 @@ run :: forall m (mode :: Mode).
212213
, MonadTimer m
213214
, MonadMask m
214215
)
215-
=> Tracer m Trace
216+
=> Tracers m
216217
-> Mux mode m
217218
-> Bearer m
218219
-> m ()
219-
run tracer
220+
run tracers@Tracers { muxTracer = tracer }
220221
Mux { muxMiniProtocols,
221222
muxControlCmdQueue,
222223
muxStatus
@@ -238,7 +239,7 @@ run tracer
238239
-- Wait for someone to shut us down by calling muxStop or an error.
239240
-- Outstanding jobs are shut down Upon completion of withJobPool.
240241
withTimeoutSerial $ \timeout ->
241-
monitor tracer
242+
monitor tracers
242243
timeout
243244
jobpool
244245
egressQueue
@@ -250,6 +251,7 @@ run tracer
250251
-- deadlock of mini-protocol completion action.
251252
`catch` \(SomeAsyncException e) -> do
252253
atomically $ writeTVar muxStatus (Failed $ toException e)
254+
traceWith tracer $ TraceState Dead
253255
throwIO e
254256
where
255257
muxerJob egressQueue =
@@ -272,12 +274,15 @@ miniProtocolJob
272274
, MonadThread m
273275
, MonadThrow (STM m)
274276
)
275-
=> Tracer m Trace
277+
=> Tracers m
276278
-> EgressQueue m
277279
-> MiniProtocolState mode m
278280
-> MiniProtocolAction m
279281
-> JobPool.Job Group m JobResult
280-
miniProtocolJob tracer egressQueue
282+
miniProtocolJob Tracers {
283+
muxTracer = tracer,
284+
channelTracer }
285+
egressQueue
281286
MiniProtocolState {
282287
miniProtocolInfo =
283288
MiniProtocolInfo {
@@ -300,7 +305,7 @@ miniProtocolJob tracer egressQueue
300305
labelThisThread (case miniProtocolNum of
301306
MiniProtocolNum a -> "prtcl-" ++ show a)
302307
w <- newTVarIO BL.empty
303-
let chan = muxChannel tracer egressQueue (Wanton w)
308+
let chan = muxChannel channelTracer egressQueue (Wanton w)
304309
miniProtocolNum miniProtocolDirEnum
305310
miniProtocolIngressQueue
306311
(result, remainder) <- miniProtocolAction chan
@@ -390,14 +395,16 @@ monitor :: forall mode m.
390395
, Alternative (STM m)
391396
, MonadThrow (STM m)
392397
)
393-
=> Tracer m Trace
398+
=> Tracers m
394399
-> TimeoutFn m
395400
-> JobPool.JobPool Group m JobResult
396401
-> EgressQueue m
397402
-> StrictTQueue m (ControlCmd mode m)
398403
-> StrictTVar m Status
399404
-> m ()
400-
monitor tracer timeout jobpool egressQueue cmdQueue muxStatus =
405+
monitor tracers@Tracers {
406+
muxTracer = tracer }
407+
timeout jobpool egressQueue cmdQueue muxStatus =
401408
go (MonitorCtx Map.empty Map.empty)
402409
where
403410
go :: MonitorCtx m mode -> m ()
@@ -433,9 +440,9 @@ monitor tracer timeout jobpool egressQueue cmdQueue muxStatus =
433440
go monitorCtx
434441

435442
EventJobResult (MiniProtocolException pnum pmode e) -> do
436-
traceWith tracer (TraceState Dead)
437-
traceWith tracer (TraceExceptionExit pnum pmode e)
438443
atomically $ writeTVar muxStatus $ Failed e
444+
traceWith tracer (TraceExceptionExit pnum pmode e)
445+
traceWith tracer (TraceState Dead)
439446
throwIO e
440447

441448
-- These two cover internal and protocol errors. The muxer exception is
@@ -447,11 +454,10 @@ monitor tracer timeout jobpool egressQueue cmdQueue muxStatus =
447454
-- the source of the failure, e.g. specific mini-protocol. If we're
448455
-- propagating exceptions, we don't need to log them.
449456
EventJobResult (MuxerException e) -> do
450-
traceWith tracer (TraceState Dead)
451457
atomically $ writeTVar muxStatus $ Failed e
458+
traceWith tracer (TraceState Dead)
452459
throwIO e
453460
EventJobResult (DemuxerException e) -> do
454-
traceWith tracer (TraceState Dead)
455461
r <- atomically $ do
456462
size <- JobPool.readGroupSize jobpool MiniProtocolJob
457463
case size of
@@ -460,6 +466,7 @@ monitor tracer timeout jobpool egressQueue cmdQueue muxStatus =
460466
>> return True
461467
_ -> writeTVar muxStatus (Failed e)
462468
>> return False
469+
traceWith tracer (TraceState Dead)
463470
unless r (throwIO e)
464471

465472
EventControlCmd (CmdStartProtocolThread
@@ -478,14 +485,14 @@ monitor tracer timeout jobpool egressQueue cmdQueue muxStatus =
478485
Nothing ->
479486
JobPool.forkJob jobpool $
480487
miniProtocolJob
481-
tracer
488+
tracers
482489
egressQueue
483490
ptclState
484491
ptclAction
485492
Just cap ->
486493
JobPool.forkJobOn cap jobpool $
487494
miniProtocolJob
488-
tracer
495+
tracers
489496
egressQueue
490497
ptclState
491498
ptclAction
@@ -585,14 +592,14 @@ monitor tracer timeout jobpool egressQueue cmdQueue muxStatus =
585592
Nothing ->
586593
JobPool.forkJob jobpool $
587594
miniProtocolJob
588-
tracer
595+
tracers
589596
egressQueue
590597
ptclState
591598
ptclAction
592599
Just cap ->
593600
JobPool.forkJobOn cap jobpool $
594601
miniProtocolJob
595-
tracer
602+
tracers
596603
egressQueue
597604
ptclState
598605
ptclAction
@@ -654,7 +661,7 @@ muxChannel
654661
-> IngressQueue m
655662
-> ByteChannel m
656663
muxChannel tracer egressQueue want@(Wanton w) mc md q =
657-
Channel { send, recv}
664+
Channel { send, recv }
658665
where
659666
-- Limit for the message buffer between send and mux thread.
660667
perMiniProtocolBufferSize :: Int64
@@ -797,4 +804,3 @@ runMiniProtocol Mux { muxMiniProtocols, muxControlCmdQueue , muxStatus}
797804
<|> return (Left $ toException (Shutdown Nothing st))
798805
Failed e -> readTMVar completionVar
799806
<|> return (Left $ toException (Shutdown (Just e) st))
800-

network-mux/src/Network/Mux/Trace.hs

Lines changed: 46 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ module Network.Mux.Trace
1010
( Error (..)
1111
, handleIOException
1212
, Trace (..)
13+
, Tracers (..)
1314
, BearerState (..)
1415
, WithBearer (..)
1516
, TraceLabelPeer (..)
@@ -22,6 +23,7 @@ import Text.Printf
2223
import Control.Exception hiding (throwIO)
2324
import Control.Monad.Class.MonadThrow
2425
import Control.Monad.Class.MonadTime.SI
26+
import Control.Tracer (Tracer)
2527
import Data.Bifunctor (Bifunctor (..))
2628
import Data.Word
2729
import GHC.Generics (Generic (..))
@@ -118,9 +120,19 @@ data BearerState = Mature
118120
-- closed.
119121
deriving (Eq, Show)
120122

123+
-- todo The Trace type mixes tags which are output by
124+
-- separate components but share the type. It would make more sense
125+
-- to break this up into separate types. Care must be
126+
-- exercised to ensure that a particular tracer goes
127+
-- into the component that outputs the desired tags. For instance,
128+
-- the low level bearer tags are not output by the tracer which
129+
-- is passed to Mux via 'Tracers'.
130+
121131
-- | Enumeration of Mux events that can be traced.
122132
--
123133
data Trace =
134+
-- low level bearer trace tags (these are not traced by the tracer
135+
-- which is passed to Mux)
124136
TraceRecvHeaderStart
125137
| TraceRecvHeaderEnd SDUHeader
126138
| TraceRecvDeltaQObservation SDUHeader Time
@@ -131,27 +143,37 @@ data Trace =
131143
| TraceSendStart SDUHeader
132144
| TraceSendEnd
133145
| TraceState BearerState
134-
| TraceCleanExit MiniProtocolNum MiniProtocolDir
135-
| TraceExceptionExit MiniProtocolNum MiniProtocolDir SomeException
136-
| TraceChannelRecvStart MiniProtocolNum
137-
| TraceChannelRecvEnd MiniProtocolNum Int
138-
| TraceChannelSendStart MiniProtocolNum Int
139-
| TraceChannelSendEnd MiniProtocolNum
146+
| TraceSDUReadTimeoutException
147+
| TraceSDUWriteTimeoutException
148+
| TraceTCPInfo StructTCPInfo Word16
149+
-- low level handshake bearer tags (not traced by tracer in Mux)
140150
| TraceHandshakeStart
141151
| TraceHandshakeClientEnd DiffTime
142152
| TraceHandshakeServerEnd
143153
| forall e. Exception e => TraceHandshakeClientError e DiffTime
144154
| forall e. Exception e => TraceHandshakeServerError e
145-
| TraceSDUReadTimeoutException
146-
| TraceSDUWriteTimeoutException
155+
-- mid level channel tags traced independently by each mini protocol
156+
-- job in Mux, for each complete message, by the 'channelTracer'
157+
-- within 'Tracers'
158+
| TraceChannelRecvStart MiniProtocolNum
159+
| TraceChannelRecvEnd MiniProtocolNum Int
160+
| TraceChannelSendStart MiniProtocolNum Int
161+
| TraceChannelSendEnd MiniProtocolNum
162+
-- high level Mux tags traced by the main Mux/Connection handler
163+
-- thread forked by CM. These may be monitored by the inbound
164+
-- governor information channel tracer. These should be traced
165+
-- by muxTracer of 'Tracers' and their ordering
166+
-- is significant at call sites or bad things will happen.
167+
-- You have been warned.
168+
| TraceCleanExit MiniProtocolNum MiniProtocolDir
169+
| TraceExceptionExit MiniProtocolNum MiniProtocolDir SomeException
147170
| TraceStartEagerly MiniProtocolNum MiniProtocolDir
148171
| TraceStartOnDemand MiniProtocolNum MiniProtocolDir
149172
| TraceStartOnDemandAny MiniProtocolNum MiniProtocolDir
150173
| TraceStartedOnDemand MiniProtocolNum MiniProtocolDir
151174
| TraceTerminating MiniProtocolNum MiniProtocolDir
152175
| TraceStopping
153176
| TraceStopped
154-
| TraceTCPInfo StructTCPInfo Word16
155177

156178
instance Show Trace where
157179
show TraceRecvHeaderStart = printf "Bearer Receive Header Start"
@@ -208,3 +230,18 @@ instance Show Trace where
208230
show (TraceTCPInfo _ len) = printf "TCPInfo len %d" len
209231
#endif
210232

233+
-- | Bundle of tracers passed to mux
234+
-- Consult the 'Trace' type to determine which
235+
-- tags are required/expected to be served by these tracers.
236+
-- In principle, the channelTracer can be == muxTracer
237+
-- but performance likely degrades in typical conditions
238+
-- unnecessarily.
239+
--
240+
data Tracers m = Tracers {
241+
channelTracer :: Tracer m Trace,
242+
-- ^ a low level tracer for events emitted by a bearer. It emits events as frequently
243+
-- as receiving individual `SDU`s from the network.
244+
muxTracer :: Tracer m Trace
245+
-- ^ mux events which are emitted less frequently. It emits events which allow one
246+
-- to observe the current state of a mini-protocol.
247+
}
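The point of bundling two tracers is that the frequent per-message channel events can be routed separately from the less frequent mux state events. The sketch below illustrates that idea only; to stay self-contained it uses simplified stand-ins for `Tracer`, `traceWith`, `nullTracer` and a cut-down `Trace` type rather than the real `Control.Tracer` and `Network.Mux.Trace` definitions, so everything other than the `Tracers` field names is an assumption.

```haskell
module Main (main) where

import Data.IORef (modifyIORef', newIORef, readIORef)

-- Simplified stand-in for the Tracer type from Control.Tracer (assumption).
newtype Tracer m a = Tracer { runTracer :: a -> m () }

traceWith :: Tracer m a -> a -> m ()
traceWith = runTracer

nullTracer :: Applicative m => Tracer m a
nullTracer = Tracer (\_ -> pure ())

-- Cut-down stand-in for Network.Mux.Trace.Trace with one channel tag
-- and two high level mux tags.
data Trace
  = TraceChannelRecvStart Int
  | TraceStopping
  | TraceStopped
  deriving (Eq, Show)

-- Same shape as the record added in this diff.
data Tracers m = Tracers
  { channelTracer :: Tracer m Trace
  , muxTracer     :: Tracer m Trace
  }

main :: IO ()
main = do
  seen <- newIORef []
  let tracers = Tracers
        { channelTracer = nullTracer  -- drop frequent channel events
        , muxTracer     = Tracer (\ev -> modifyIORef' seen (ev :))
        }
  traceWith (channelTracer tracers) (TraceChannelRecvStart 2)
  traceWith (muxTracer tracers) TraceStopping
  traceWith (muxTracer tracers) TraceStopped
  -- Only the mux events were recorded, in the order they were traced.
  readIORef seen >>= print . reverse
```

Silencing `channelTracer` while keeping `muxTracer` is one possible configuration; the comment in the diff suggests that pointing both fields at the same tracer is allowed but likely costs performance.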