@@ -27,12 +27,15 @@ module Main (main) where
2727-- from `aeson`
2828import Data.Aeson (FromJSON, ToJSON)
2929
30+ -- from `async`
31+ import Control.Concurrent.Async (async, link, withAsync)
32+
3033-- from `base`
3134import Control.Concurrent (threadDelay, forkIO, killThread)
3235import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
3336import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, isEmptyMVar)
3437import Control.Exception (throwIO, bracket)
35- import Control.Monad (forever)
38+ import Control.Monad (forever, (<=<) )
3639import qualified Data.List
3740import Data.Proxy (Proxy(Proxy))
3841import GHC.Generics (Generic)
@@ -68,7 +71,7 @@ import qualified Network.Wai.Handler.Warp as Warp (run)
6871`` `
6972
7073We start with our scenario: we are tasked with creating an API which will serve random numbers in real-time. We are
71- given a function that creates a producer of characters , and a method for us to stop the producer:
74+ given a function that creates a producer of integers , and a method for us to stop the producer:
7275
7376
7477`` `haskell
@@ -83,7 +86,8 @@ createProducer = do
8386
8487 -- Writer thread will feed our Chan forever. This is where
8588 -- your secret random number generation algorithm would go.
86- -- Unfortunately, we have some technical debt here.
89+ -- For this example, we are using a deterministic stream
90+ -- of numbers, where [1,5,10,20,45] is repeated forever.
8791 _ <- forkIO (go (cycle [1,5,10,20,45]) chan isDone)
8892
8993 pure ( chan
@@ -172,31 +176,26 @@ allocate = do
172176 -- Producer from upstream
173177 (intChan, weAreDone) <- createProducer
174178
175- let -- action to spawn a thread that will continuously write 'KeepAlive' messages
176- keepalive = forkIO (forever (threadDelay 100_000 *> writeChan toDownstream KeepAlive))
177-
178- -- The function below, `go`, is used to forward elements from the upstream
179- -- producer 'intChan' to the 'toDownstream' channel.
180- --
181- -- The wrinkle is that we must send data downstream regularly. Therefore, every time
182- -- a new element is produced by 'toDownstream' , we reset the keepalive thread
183- -- (named 'keepAliveThreadId ' ) by killing it and starting it again.
184- --
185- -- This ensures:
186- --
187- -- * that we send data (either an `Element` or `KeepAlive`) every 0.1 seconds at most;
188- -- * that we do not send more `KeepAlive` messages than necessary.
189- go keepAliveThreadId = do
190- readChan intChan >>= writeChan toDownstream . Element
191- killThread keepAliveThreadId
192- keepalive >>= go
193-
194- loopThreadId <- forkIO (keepalive >>= go)
179+ -- See comment below
180+ (link <=< async) $ interleaveLoop intChan toDownstream
195181
196182 pure (Upstream { getNext = readChan toDownstream
197- , pleaseStop = weAreDone >> killThread loopThreadId
183+ , pleaseStop = weAreDone
198184 }
199185 )
186+ where
187+ -- This loop interleaves integers from upstream, with keep-alive
188+ -- messages.
189+ --
190+ -- The logic here is to spawn a thread that feeds the 'toDownstream' channel
191+ -- with keep-alive messages regularly, until 'readChan intChan' succeeds. At this point,
192+ -- we feed the integer to downstream, and 'withAsync' exits, cancelling
193+ -- the loop feeding 'KeepAlive' messages.
194+ interleaveLoop intChan toDownstream = do
195+ withAsync
196+ (forever $ threadDelay 100_000 *> writeChan toDownstream KeepAlive)
197+ (\_ -> readChan intChan >>= writeChan toDownstream . Element)
198+ interleaveLoop intChan toDownstream
200199`` `
201200
202201Finally, we must tell our server how to allocate and deallocate an `Upstream Int`. The `allocate` function
0 commit comments