Skip to content

Commit 8269f92

Browse files
committed
Add event as part of message type so we have metadata to distinguish between messages coming from WS clients and messages generated by the server
1 parent 4f230b1 commit 8269f92

File tree

3 files changed

+32
-24
lines changed

3 files changed

+32
-24
lines changed

src/PostgresWebsockets/HasqlBroadcast.hs

+2-2
Original file line numberDiff line numberDiff line change
@@ -99,11 +99,11 @@ newHasqlBroadcasterForChannel onConnectionFailure ch getCon = do
9999
_ -> d
100100
lookupStringDef _ d _ = d
101101
channelDef = lookupStringDef "channel"
102-
openProducer msgs = do
102+
openProducer msgQ = do
103103
con <- getCon
104104
listen con $ toPgIdentifier ch
105105
waitForNotifications
106-
(\c m-> atomically $ writeTQueue msgs $ toMsg c m)
106+
(\c m-> atomically $ writeTQueue msgQ $ toMsg c m)
107107
con
108108

109109
putErrLn :: Text -> IO ()

src/PostgresWebsockets/Middleware.hs

+28-22
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import Data.Time.Clock (UTCTime)
1515
import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds, posixSecondsToUTCTime)
1616
import Control.Concurrent.AlarmClock (newAlarmClock, setAlarm)
1717
import qualified Hasql.Notifications as H
18+
import qualified Hasql.Pool as H
1819
import qualified Network.Wai as Wai
1920
import qualified Network.Wai.Handler.WebSockets as WS
2021
import qualified Network.WebSockets as WS
@@ -31,17 +32,25 @@ import PostgresWebsockets.Context ( Context(..) )
3132
import PostgresWebsockets.Config (AppConfig(..))
3233
import qualified PostgresWebsockets.Broadcast as B
3334

35+
36+
data Event =
37+
WebsocketMessage
38+
| ServerStateChange
39+
deriving (Show, Eq, Generic)
40+
3441
data Message = Message
3542
{ claims :: A.Object
36-
, channel :: Text
43+
, event :: Event
3744
, payload :: Text
45+
, channel :: Text
3846
} deriving (Show, Eq, Generic)
3947

48+
instance A.ToJSON Event
4049
instance A.ToJSON Message
4150

4251
-- | Given a secret, a function to fetch the system time, a Hasql Pool and a Multiplexer this will give you a WAI middleware.
4352
postgresWsMiddleware :: Context -> Wai.Middleware
44-
postgresWsMiddleware =
53+
postgresWsMiddleware =
4554
WS.websocketsOr WS.defaultConnectionOptions . wsApp
4655

4756
-- private functions
@@ -85,15 +94,15 @@ wsApp Context{..} pendingConn =
8594
Just _ -> pure ()
8695
Nothing -> pure ()
8796

88-
let sendNotification =
89-
relayChannelData
90-
(void . H.notifyPool ctxPool (configListenChannel ctxConfig) . toS)
91-
validClaims
92-
ctxGetTime
97+
let sendNotification msg channel = sendMessageWithTimestamp $ websocketMessageForChannel msg channel
98+
sendMessageToDatabase = sendToDatabase ctxPool (configListenChannel ctxConfig)
99+
sendMessageWithTimestamp = timestampMessage ctxGetTime >=> sendMessageToDatabase
100+
websocketMessageForChannel = Message validClaims WebsocketMessage
101+
serverStateChangeMessage = Message validClaims ServerStateChange
93102

94103
case configMetaChannel ctxConfig of
95104
Nothing -> pure ()
96-
Just ch -> sendNotification "Connecion Open" ch
105+
Just ch -> sendMessageWithTimestamp $ serverStateChangeMessage "Connecion Open" ch
97106

98107
when (hasRead mode) $
99108
forM_ chs $ flip (onMessage ctxMulti) $ WS.sendTextData conn . B.payload
@@ -107,25 +116,22 @@ wsApp Context{..} pendingConn =
107116
-- Having both channel and claims as parameters seem redundant
108117
-- But it allows the function to ignore the claims structure and the source
109118
-- of the channel, so all claims decoding can be coded in the caller
110-
notifySession :: WS.Connection -> (ByteString -> Text -> IO ()) -> [ByteString] -> IO ()
119+
notifySession :: WS.Connection -> (Text -> Text -> IO ()) -> [ByteString] -> IO ()
111120
notifySession wsCon sendToChannel chs =
112121
withAsync (forever relayData) wait
113122
where
114-
relayData = do
123+
relayData = do
115124
msg <- WS.receiveData wsCon
116125
forM_ chs (sendToChannel msg . toS)
117126

118-
relayChannelData :: (ByteString -> IO ()) -> A.Object -> IO UTCTime -> ByteString -> Text -> IO ()
119-
relayChannelData send claimsToSend getTime msg ch =
120-
claimsWithTime >>= (send . jsonMsg)
127+
sendToDatabase :: H.Pool -> Text -> Message -> IO ()
128+
sendToDatabase pool dbChannel =
129+
notify . jsonMsg
121130
where
122-
-- we need to decode the bytestring to re-encode valid JSON for the notification
123-
jsonMsg :: M.HashMap Text A.Value -> ByteString
124-
jsonMsg cl = BL.toStrict . A.encode . Message cl ch . decodeUtf8With T.lenientDecode $ msg
125-
126-
claimsWithTime :: IO (M.HashMap Text A.Value)
127-
claimsWithTime = do
128-
time <- utcTimeToPOSIXSeconds <$> getTime
129-
return $ M.insert "message_delivered_at" (A.Number $ realToFrac time) claimsWithChannel
131+
notify = void . H.notifyPool pool dbChannel . toS
132+
jsonMsg = BL.toStrict . A.encode
130133

131-
claimsWithChannel = M.insert "channel" (A.String ch) claimsToSend
134+
timestampMessage :: IO UTCTime -> Message -> IO Message
135+
timestampMessage getTime msg@Message{..} = do
136+
time <- utcTimeToPOSIXSeconds <$> getTime
137+
return $ msg{ claims = M.insert "message_delivered_at" (A.Number $ realToFrac time) claims}

test/ServerSpec.hs

+2
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@ testServerConfig = AppConfig
2020
, configPort = 8080
2121
, configListenChannel = "postgres-websockets-test-channel"
2222
, configJwtSecret = "reallyreallyreallyreallyverysafe"
23+
, configMetaChannel = Nothing
2324
, configJwtSecretIsBase64 = False
2425
, configPool = 10
26+
, configRetries = 5
2527
}
2628

2729
startTestServer :: IO ThreadId

0 commit comments

Comments
 (0)