Skip to content

Commit ed1bf53

Browse files
authored
Infinite steams cookbook recipe (#1849)
* Infinite steams cookbook recipe * Description
1 parent adb9b0b commit ed1bf53

File tree

5 files changed

+318
-0
lines changed

5 files changed

+318
-0
lines changed

cabal.project

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ packages:
5050
doc/cookbook/using-free-client
5151
-- doc/cookbook/open-id-connect
5252
doc/cookbook/managed-resource
53+
doc/cookbook/infinite-streams
5354
doc/cookbook/openapi3
5455
doc/cookbook/expose-prometheus
5556

changelog.d/pr-1849

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
synopsys: Cookbook recipe on serving infinite HTTP streams
2+
prs: 1849
3+
description: {
4+
Servant provides facilities to work with streams of data. In this cookbook, we will be concerned
5+
with serving infinite HTTP streams.
6+
7+
An infinite HTTP stream differs from a finite stream in two major ways. First, resource
8+
cleanup is non-deterministic. Second, data must be sent back to the client regularly to prevent
9+
the connection from closing. Both of these challenges are addressed in this recipe.
10+
}

doc/cookbook/index.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,4 @@ you name it!
4242
testing/Testing.lhs
4343
open-id-connect/OpenIdConnect.lhs
4444
managed-resource/ManagedResource.lhs
45+
infinite-streams/InfiniteStreams.lhs
Lines changed: 279 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,279 @@
1+
# Serving infinite streams
2+
3+
Servant provides facilities to work with streams of data. This is handy for cases where the data may take a while to
4+
fetch, but we can start returning data early. In this cookbook, we will be concerned with serving _infinite_ HTTP
5+
streams.
6+
7+
HTTP streams have many advantages compared to other streaming standards like websockets: they are simple
8+
and are well-supported by a broad range of intermediaries, such as proxy servers, content-delivery networks, and
9+
corporate firewalls.
10+
11+
An _infinite_ HTTP stream differs from a _finite_ stream in two major ways. First, cleaning up resources (such as
12+
connections) associated with infinite streams is not deterministic. We do not know when the handler will
13+
terminate -- if at all! Second, we want to ensure that the connection isn't cut off because no data is flowing.
14+
To keep the connection alive, we will need to send bytes on a regular basis.
15+
16+
This is a Literate Haskell file, so let's get imports out of the way.
17+
18+
```haskell
19+
{-# LANGUAGE BangPatterns #-}
20+
{-# LANGUAGE DeriveGeneric #-}
21+
{-# LANGUAGE LambdaCase #-}
22+
{-# LANGUAGE NumericUnderscores #-}
23+
{-# LANGUAGE OverloadedStrings #-}
24+
{-# LANGUAGE TypeOperators #-}
25+
module Main (main) where
26+
27+
-- from `aeson`
28+
import Data.Aeson (FromJSON, ToJSON)
29+
30+
-- from `async`
31+
import Control.Concurrent.Async (async, link, withAsync)
32+
33+
-- from `base`
34+
import Control.Concurrent (threadDelay, forkIO, killThread)
35+
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
36+
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, isEmptyMVar)
37+
import Control.Exception (throwIO, bracket)
38+
import Control.Monad (forever, (<=<))
39+
import qualified Data.List
40+
import Data.Proxy (Proxy(Proxy))
41+
import GHC.Generics (Generic)
42+
43+
-- from `http-client`
44+
import Network.HTTP.Client (newManager, defaultManagerSettings)
45+
46+
-- from the `resourcet` package
47+
import Control.Monad.Trans.Resource (ReleaseKey)
48+
import Data.Acquire ( mkAcquire, Acquire )
49+
50+
-- from `servant`
51+
import Servant
52+
( WithResource,
53+
type (:>),
54+
StreamGet,
55+
NewlineFraming,
56+
JSON,
57+
SourceIO,
58+
Context((:.), EmptyContext),
59+
Handler,
60+
Application )
61+
import qualified Servant.Types.SourceT as SourceT
62+
63+
-- from `servant-client`
64+
import Servant.Client.Streaming (ClientM, mkClientEnv, client, withClientM, BaseUrl (..), Scheme (Http))
65+
66+
-- from `servant-server`
67+
import Servant.Server (serveWithContext)
68+
69+
-- from `warp`
70+
import qualified Network.Wai.Handler.Warp as Warp (run)
71+
```
72+
73+
We start with our scenario: we are tasked with creating an API which will serve random numbers in real-time. We are
74+
given a function that creates a producer of integers, and a method for us to stop the producer:
75+
76+
77+
```haskell
78+
createProducer :: IO (Chan Int, IO ())
79+
createProducer = do
80+
chan <- newChan
81+
isDone <- newEmptyMVar
82+
83+
let -- This is the action that the consumer of the stream
84+
-- can run to stop feeding the channel
85+
weAreDone = putMVar isDone ()
86+
87+
-- Writer thread will feed our Chan forever. This is where
88+
-- your secret random number generation algorithm would go.
89+
-- For this example, we are using a deterministic stream
90+
-- of numbers, where [1,5,10,20,45] is repeated forever.
91+
_ <- forkIO (go (cycle [1,5,10,20,45]) chan isDone)
92+
93+
pure ( chan
94+
, weAreDone
95+
)
96+
where
97+
go :: [Int] -> Chan Int -> MVar () -> IO ()
98+
go stream chan isDone = do
99+
isEmpty <- isEmptyMVar isDone
100+
if not isEmpty
101+
-- We are done
102+
then pure ()
103+
else case Data.List.uncons stream of
104+
Nothing -> throwIO (userError "Impossible!")
105+
Just (nextNum, restStream) -> do
106+
-- We simulate a random delay in how numbers are returned.
107+
threadDelay (nextNum * 7_000)
108+
chan `writeChan` nextNum
109+
go restStream chan isDone
110+
```
111+
112+
This was a lot of set up; we now have the ability to create an infinite stream of integers, and message the producer
113+
that we are not listening anymore. In practice, this might mean connecting and disconnecting from a source of
114+
data upstream, for example.
115+
116+
We will now define our API. It has a single route: a method for a subscriber to subscribe to our infinite stream
117+
of random numbers. As mentioned previously, there might be a long time between integers being generated upstream.
118+
We will need to send some bytes just to keep the connection open. To do this, we create a type for the elements
119+
of our infinite stream:
120+
121+
```haskell
122+
data InfiniteStream a = Element a | KeepAlive
123+
deriving (Show, Generic)
124+
125+
-- For brevity, we derive these instances generically.
126+
-- In production, you can optimize the representation
127+
-- much better.
128+
instance ToJSON a => ToJSON (InfiniteStream a)
129+
instance FromJSON a => FromJSON (InfiniteStream a)
130+
```
131+
132+
We'll also need to package our resources into a specific type, `Upstream`:
133+
134+
```haskell
135+
data Upstream a =
136+
Upstream { getNext :: IO (InfiniteStream a)
137+
-- ^ Fetch the next element to forward to the client
138+
, pleaseStop :: IO ()
139+
-- ^ Notify upstream to stop sending data
140+
}
141+
```
142+
143+
`Upstream` is a data type which we want to allocate for a handler, and deallocate once the connection
144+
is closed, which means we want to involve `WithResource`. The API definition becomes:
145+
146+
```haskell
147+
type InfiniteIntegersAPI
148+
= WithResource (Upstream Int)
149+
:> StreamGet
150+
NewlineFraming
151+
JSON
152+
(SourceIO (InfiniteStream Int))
153+
```
154+
155+
Let's write our handler, which is pretty simple: return an infinite stream by
156+
continuously calling `getNext`:
157+
158+
```haskell
159+
handleInfiniteIntegersAPI :: (ReleaseKey, Upstream Int) -> Handler (SourceIO (InfiniteStream Int))
160+
handleInfiniteIntegersAPI (_, upstream) =
161+
let neverStop = const False
162+
in pure (SourceT.fromAction neverStop (getNext upstream))
163+
```
164+
165+
Now for the tricky bit. We need to produce data on a regular basis, even if there are no
166+
numbers available upstream. Typically, a HTTP server will break connections after 30 seconds without data.
167+
For this example, we'll provide data 0.1 seconds so that the example runs quickly. We do this when we
168+
allocate a new `Upstream` in `allocate`:
169+
170+
```haskell
171+
allocate :: IO (Upstream Int)
172+
allocate = do
173+
-- Channel that will feed the client
174+
toDownstream <- newChan
175+
176+
-- Producer from upstream
177+
(intChan, weAreDone) <- createProducer
178+
179+
-- See comment below
180+
(link <=< async) $ interleaveLoop intChan toDownstream
181+
182+
pure (Upstream { getNext = readChan toDownstream
183+
, pleaseStop = weAreDone
184+
}
185+
)
186+
where
187+
-- This loop interleaves integers from upstream, with keep-alive
188+
-- messages.
189+
--
190+
-- The logic here is to spawn a thread that feeds the 'toDownstream' channel
191+
-- with keep-alive messages regularly, until 'readChan intChan' succeeds. At this point,
192+
-- we feed the integer to downstream, and 'withAsync' exits, cancelling
193+
-- the loop feeding 'KeepAlive' messages.
194+
interleaveLoop intChan toDownstream = do
195+
withAsync
196+
(forever $ threadDelay 100_000 *> writeChan toDownstream KeepAlive)
197+
(\_ -> readChan intChan >>= writeChan toDownstream . Element)
198+
interleaveLoop intChan toDownstream
199+
```
200+
201+
Finally, we must tell our server how to allocate and deallocate an `Upstream Int`. The `allocate` function
202+
below is executed when a client connects, and the `deallocate` function is executed when the connection is
203+
closed in any way:
204+
205+
```haskell
206+
withUpstream :: Acquire (Upstream Int)
207+
withUpstream = mkAcquire allocate pleaseStop
208+
```
209+
210+
We now have all the pieces to assemble our server:
211+
212+
```haskell
213+
server :: Application
214+
server = serveWithContext
215+
(Proxy :: Proxy InfiniteIntegersAPI)
216+
(withUpstream :. EmptyContext)
217+
handleInfiniteIntegersAPI
218+
```
219+
220+
and our client:
221+
222+
```haskell
223+
getInfiniteIntegers :: ClientM (SourceIO (InfiniteStream Int))
224+
getInfiniteIntegers = client (Proxy :: Proxy InfiniteIntegersAPI)
225+
```
226+
227+
We can see how they interact:
228+
229+
```haskell
230+
main :: IO ()
231+
main = do
232+
mgr <- newManager defaultManagerSettings
233+
let url = (BaseUrl Http "localhost" 8080 "")
234+
bracket (forkIO (Warp.run 8080 server)) killThread (\_ -> do
235+
threadDelay 100_000
236+
withClientM getInfiniteIntegers (mkClientEnv mgr url) (\case
237+
Left clientError -> throwIO clientError
238+
Right stream -> SourceT.unSourceT stream go
239+
)
240+
)
241+
where
242+
go (SourceT.Yield !incoming next) = print incoming >> go next
243+
go (SourceT.Effect !x) = x >>= go
244+
go (SourceT.Skip !next) = go next
245+
-- This cookbook recipe is concerned with infinite streams. While
246+
-- the following two cases should be unreachable, we handle
247+
-- them for completeness.
248+
go (SourceT.Error err) = throwIO (userError err)
249+
go (SourceT.Stop) = pure ()
250+
```
251+
252+
Running this program shows:
253+
254+
```
255+
Element 1
256+
Element 5
257+
Element 10
258+
KeepAlive
259+
Element 20
260+
KeepAlive
261+
KeepAlive
262+
KeepAlive
263+
Element 45
264+
Element 1
265+
Element 5
266+
Element 10
267+
KeepAlive
268+
Element 20
269+
KeepAlive
270+
KeepAlive
271+
KeepAlive
272+
Element 45
273+
Element 1
274+
Element 5
275+
Element 10
276+
KeepAlive
277+
Element 20
278+
...
279+
```
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
cabal-version: 2.2
2+
name: cookbook-infinite-streams
3+
version: 2.1
4+
synopsis: Serving infinite streams
5+
homepage: http://docs.servant.dev/
6+
license: BSD-3-Clause
7+
license-file: ../../../servant/LICENSE
8+
author: Servant Contributors
9+
maintainer: [email protected]
10+
build-type: Simple
11+
12+
executable cookbook-infinite-streams
13+
main-is: InfiniteStreams.lhs
14+
build-tool-depends: markdown-unlit:markdown-unlit
15+
default-language: Haskell2010
16+
ghc-options: -Wall -pgmL markdown-unlit -Wunused-packages -threaded -rtsopts -with-rtsopts=-N
17+
18+
hs-source-dirs: .
19+
build-depends: base >= 4.8 && <5
20+
, aeson
21+
, async
22+
, http-client
23+
, resourcet
24+
, servant
25+
, servant-server
26+
, servant-client
27+
, warp

0 commit comments

Comments
 (0)