Skip to content

Commit

Permalink
cannon: Use exclusive auto-delete queues for temp clients
Browse files Browse the repository at this point in the history
Also:

* Use Data.Unique.Unqiue to keep track of channels instead of (ByteString,
ByteString) because it is way less fuss to generate the Unqiue and it doesn't
really need to be (UserId, ClientId) tuple. This also removes polymorphism for
the RabbitMqPool type.

* Log conneciton close while draining at Debug level instead of Info
  • Loading branch information
akshaymankar committed Dec 17, 2024
1 parent 3166e11 commit d60fc4a
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 72 deletions.
18 changes: 2 additions & 16 deletions libs/wire-api/src/Wire/API/Notification.hs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ module Wire.API.Notification
userNotificationExchangeName,
userNotificationDlxName,
userNotificationDlqName,
RabbitMqClientId (..),
clientNotificationQueueName,
userRoutingKey,
temporaryRoutingKey,
Expand All @@ -51,7 +50,6 @@ import Control.Lens.Operators ((?~))
import Data.Aeson (FromJSON (..), ToJSON (..))
import Data.Aeson.Types qualified as Aeson
import Data.Bits
import Data.ByteString.Conversion
import Data.HashMap.Strict.InsOrd qualified as InsOrdHashMap
import Data.Id
import Data.Json.Util
Expand Down Expand Up @@ -191,17 +189,9 @@ userNotificationDlxName = "dead-user-notifications"
userNotificationDlqName :: Text
userNotificationDlqName = "dead-user-notifications"

data RabbitMqClientId
= RabbitMqClientId ClientId
| RabbitMqTempId Text

instance ToByteString RabbitMqClientId where
builder (RabbitMqClientId cid) = builder cid
builder (RabbitMqTempId temp) = builder temp

clientNotificationQueueName :: UserId -> RabbitMqClientId -> Text
clientNotificationQueueName :: UserId -> ClientId -> Text
clientNotificationQueueName uid cid =
"user-notifications." <> userRoutingKey uid <> "." <> rabbitMqClientToText cid
"user-notifications." <> userRoutingKey uid <> "." <> clientToText cid

userRoutingKey :: UserId -> Text
userRoutingKey = idToText
Expand All @@ -211,7 +201,3 @@ clientRoutingKey uid cid = userRoutingKey uid <> "." <> clientToText cid

temporaryRoutingKey :: UserId -> Text
temporaryRoutingKey uid = userRoutingKey uid <> ".temporary"

rabbitMqClientToText :: RabbitMqClientId -> Text
rabbitMqClientToText (RabbitMqClientId cid) = clientToText cid
rabbitMqClientToText (RabbitMqTempId temp) = "temp-" <> temp
1 change: 0 additions & 1 deletion services/cannon/cannon.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ library
, lens >=4.4
, lens-family-core >=1.1
, metrics-wai >=0.4
, MonadRandom
, mwc-random >=0.13
, prometheus-client
, retry >=0.7
Expand Down
2 changes: 0 additions & 2 deletions services/cannon/default.nix
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
, lens-family-core
, lib
, metrics-wai
, MonadRandom
, mwc-random
, prometheus-client
, QuickCheck
Expand Down Expand Up @@ -92,7 +91,6 @@ mkDerivation {
lens
lens-family-core
metrics-wai
MonadRandom
mwc-random
prometheus-client
retry
Expand Down
38 changes: 20 additions & 18 deletions services/cannon/src/Cannon/RabbitMq.hs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import Data.List.Extra
import Data.Map qualified as Map
import Data.Text qualified as T
import Data.Timeout
import Data.Unique
import Imports hiding (threadDelay)
import Network.AMQP qualified as Q
import Network.AMQP.Extended
Expand All @@ -41,16 +42,16 @@ data RabbitMqPoolException

instance Exception RabbitMqPoolException

data PooledConnection key = PooledConnection
data PooledConnection = PooledConnection
{ connId :: Word64,
inner :: Q.Connection,
channels :: !(Map key Q.Channel)
channels :: !(Map Unique Q.Channel)
}

data RabbitMqPool key = RabbitMqPool
data RabbitMqPool = RabbitMqPool
{ opts :: RabbitMqPoolOptions,
nextId :: TVar Word64,
connections :: TVar [PooledConnection key],
connections :: TVar [PooledConnection],
-- | draining mode
draining :: TVar Bool,
logger :: Logger,
Expand All @@ -64,7 +65,7 @@ data RabbitMqPoolOptions = RabbitMqPoolOptions
retryEnabled :: Bool
}

createRabbitMqPool :: (Ord key) => RabbitMqPoolOptions -> Logger -> Codensity IO (RabbitMqPool key)
createRabbitMqPool :: RabbitMqPoolOptions -> Logger -> Codensity IO RabbitMqPool
createRabbitMqPool opts logger = Codensity $ bracket create destroy
where
create = do
Expand All @@ -78,7 +79,7 @@ createRabbitMqPool opts logger = Codensity $ bracket create destroy
pure pool
destroy pool = putMVar pool.deadVar ()

drainRabbitMqPool :: (ToByteString key) => RabbitMqPool key -> DrainOpts -> IO ()
drainRabbitMqPool :: RabbitMqPool -> DrainOpts -> IO ()
drainRabbitMqPool pool opts = do
atomically $ writeTVar pool.draining True

Expand Down Expand Up @@ -118,11 +119,11 @@ drainRabbitMqPool pool opts = do
(liftIO $ threadDelay ((opts ^. millisecondsBetweenBatches) # MilliSecond))
Log.info pool.logger $ Log.msg (Log.val "Draining complete")
where
closeChannel :: (ToByteString key) => Log.Logger -> (key, Q.Channel) -> IO ()
closeChannel :: Log.Logger -> (Unique, Q.Channel) -> IO ()
closeChannel l (key, chan) = do
Log.info l $
Log.debug l $
Log.msg (Log.val "closing rabbitmq channel")
. Log.field "key" (toByteString' key)
. Log.field "key_hash" (toByteString' $ hashUnique key)
Q.closeChannel chan

logExpired :: Log.Logger -> Word64 -> IO ()
Expand All @@ -139,7 +140,7 @@ drainRabbitMqPool pool opts = do
. Log.field "batchSize" batchSize
. Log.field "maxNumberOfBatches" m

createConnection :: (Ord key) => RabbitMqPool key -> IO (PooledConnection key)
createConnection :: RabbitMqPool -> IO PooledConnection
createConnection pool = mask_ $ do
conn <- openConnection pool
mpconn <- runMaybeT . atomically $ do
Expand Down Expand Up @@ -176,7 +177,7 @@ createConnection pool = mask_ $ do
putMVar closedVar ()
pure pconn

openConnection :: RabbitMqPool key -> IO Q.Connection
openConnection :: RabbitMqPool -> IO Q.Connection
openConnection pool = do
-- This might not be the correct connection ID that will eventually be
-- assigned to this connection, since there are potential races with other
Expand Down Expand Up @@ -229,13 +230,14 @@ ackMessage chan deliveryTag multiple = do

type QueueName = Text

type CreateQueue = Q.Channel -> Codensity IO ()
type CreateQueue = Q.Channel -> Codensity IO QueueName

createChannel :: (Ord key) => RabbitMqPool key -> QueueName -> CreateQueue -> key -> Codensity IO RabbitMqChannel
createChannel pool queueName createQueue key = do
createChannel :: RabbitMqPool -> CreateQueue -> Codensity IO RabbitMqChannel
createChannel pool createQueue = do
closedVar <- lift newEmptyMVar
inner <- lift newEmptyMVar
msgVar <- lift newEmptyMVar
key <- lift newUnique

let handleException e = do
retry <- case (Q.isNormalChannelClose e, fromException e) of
Expand Down Expand Up @@ -267,7 +269,7 @@ createChannel pool queueName createQueue key = do
if connSize > pool.opts.maxChannels
then pure True
else do
createQueue chan
queueName <- createQueue chan

liftIO $ Q.addChannelExceptionHandler chan handleException
putMVar inner chan
Expand All @@ -286,7 +288,7 @@ createChannel pool queueName createQueue key = do
`finally` putMVar msgVar Nothing
pure RabbitMqChannel {inner = inner, msgVar = msgVar}

acquireConnection :: (Ord key) => RabbitMqPool key -> IO (PooledConnection key)
acquireConnection :: RabbitMqPool -> IO PooledConnection
acquireConnection pool = do
findConnection pool >>= \case
Nothing -> do
Expand All @@ -301,7 +303,7 @@ acquireConnection pool = do
pure conn
Just conn -> pure conn

findConnection :: RabbitMqPool key -> IO (Maybe (PooledConnection key))
findConnection :: RabbitMqPool -> IO (Maybe PooledConnection)
findConnection pool = (either throwIO pure <=< (atomically . runExceptT . runMaybeT)) $ do
conns <- lift . lift $ readTVar pool.connections
guard (notNull conns)
Expand All @@ -313,7 +315,7 @@ findConnection pool = (either throwIO pure <=< (atomically . runExceptT . runMay
else mzero
pure pconn

releaseConnection :: (Ord key) => RabbitMqPool key -> key -> PooledConnection key -> IO ()
releaseConnection :: RabbitMqPool -> Unique -> PooledConnection -> IO ()
releaseConnection pool key conn = atomically $ do
modifyTVar pool.connections $ map $ \c ->
if c.connId == conn.connId
Expand Down
37 changes: 14 additions & 23 deletions services/cannon/src/Cannon/RabbitMqConsumerApp.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,11 @@ import Cannon.RabbitMq
import Cannon.WS hiding (env)
import Cassandra as C hiding (batch)
import Control.Concurrent.Async
import Control.Exception (Handler (..), bracket, catch, catches, finally, throwIO, try)
import Control.Exception (Handler (..), bracket, catch, catches, throwIO, try)
import Control.Lens hiding ((#))
import Control.Monad.Codensity
import Control.Monad.Random.Class
import Data.Aeson hiding (Key)
import Data.Id
import Data.Text qualified as T
import Imports hiding (min, threadDelay)
import Network.AMQP (newQueue)
import Network.AMQP qualified as Q
Expand Down Expand Up @@ -111,22 +109,20 @@ rabbitMQWebSocketApp uid mcid e pendingConn = do

sendNotifications :: WS.Connection -> IO ()
sendNotifications wsConn = lowerCodensity $ do
cid <- lift $ mkRabbitMqClientId mcid
let key = mkKeyRabbit uid cid
let queueName = clientNotificationQueueName uid cid

let createQueue chan = case mcid of
Nothing -> Codensity $ \k ->
( do
void $ Q.declareQueue chan newQueue {Q.queueName = queueName}
for_ [userRoutingKey uid, temporaryRoutingKey uid] $
Q.bindQueue chan queueName userNotificationExchangeName
k ()
)
`finally` Q.deleteQueue chan queueName
Just _ -> pure ()

chan <- createChannel e.pool queueName createQueue key
Nothing -> Codensity $ \k -> do
(queueName, _, _) <-
Q.declareQueue chan $
newQueue
{ Q.queueExclusive = True,
Q.queueAutoDelete = True
}
for_ [userRoutingKey uid, temporaryRoutingKey uid] $
Q.bindQueue chan queueName userNotificationExchangeName
k queueName
Just cid -> Codensity $ \k -> k $ clientNotificationQueueName uid cid

chan <- createChannel e.pool createQueue

let consumeRabbitMq = forever $ do
eventData <- getEventData chan
Expand Down Expand Up @@ -237,8 +233,3 @@ data WebSocketServerError
deriving (Show)

instance Exception WebSocketServerError

mkRabbitMqClientId :: Maybe ClientId -> IO RabbitMqClientId
mkRabbitMqClientId (Just cid) = pure (RabbitMqClientId cid)
mkRabbitMqClientId Nothing =
RabbitMqTempId . T.pack <$> replicateM 8 (getRandomR ('a', 'z'))
5 changes: 3 additions & 2 deletions services/cannon/src/Cannon/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import Control.Monad.Catch
import Control.Monad.Codensity
import Data.Id
import Data.Text.Encoding
import Data.Unique
import Imports
import Network.AMQP qualified as Q
import Network.AMQP.Extended (AmqpEndpoint)
Expand All @@ -63,7 +64,7 @@ data Env = Env
{ opts :: !Opts,
applog :: !Logger,
websockets :: !(Dict Key Websocket),
rabbitConnections :: (Dict Key Q.Connection),
rabbitConnections :: (Dict Unique Q.Connection),
reqId :: !RequestId,
env :: !WS.Env
}
Expand Down Expand Up @@ -103,7 +104,7 @@ mkEnv ::
ClientState ->
Logger ->
Dict Key Websocket ->
Dict Key Q.Connection ->
Dict Unique Q.Connection ->
Manager ->
GenIO ->
Clock ->
Expand Down
14 changes: 5 additions & 9 deletions services/cannon/src/Cannon/WS.hs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ module Cannon.WS
connIdent,
Key,
mkKey,
mkKeyRabbit,
key2bytes,
client,
sendMsg,
Expand Down Expand Up @@ -70,6 +69,7 @@ import Data.Id (ClientId, ConnId (..), UserId, defRequestId)
import Data.List.Extra (chunksOf)
import Data.Text.Encoding (decodeUtf8)
import Data.Timeout (TimeoutUnit (..), (#))
import Data.Unique
import Imports hiding (threadDelay)
import Network.AMQP qualified as Q
import Network.HTTP.Types.Method
Expand All @@ -80,7 +80,6 @@ import System.Logger qualified as Logger
import System.Logger.Class hiding (Error, Settings, close, (.=))
import System.Random.MWC (GenIO, uniform)
import UnliftIO.Async (async, cancel, pooledMapConcurrentlyN_)
import Wire.API.Notification
import Wire.API.Presence

-----------------------------------------------------------------------------
Expand All @@ -94,9 +93,6 @@ newtype Key = Key
mkKey :: UserId -> ConnId -> Key
mkKey u c = Key (toByteString' u, fromConnId c)

mkKeyRabbit :: UserId -> RabbitMqClientId -> Key
mkKeyRabbit u c = Key (toByteString' u, toByteString' c)

instance ToByteString Key where
builder = B.fromByteString . key2bytes

Expand Down Expand Up @@ -155,12 +151,12 @@ data Env = Env
logg :: !Logger,
manager :: !Manager,
websockets :: !(Dict Key Websocket),
rabbitConnections :: !(Dict Key Q.Connection),
rabbitConnections :: !(Dict Unique Q.Connection),
rand :: !GenIO,
clock :: !Clock,
drainOpts :: DrainOpts,
cassandra :: ClientState,
pool :: RabbitMqPool Key
pool :: RabbitMqPool
}

setRequestId :: RequestId -> Env -> Env
Expand Down Expand Up @@ -203,12 +199,12 @@ env ::
Logger ->
Manager ->
Dict Key Websocket ->
Dict Key Q.Connection ->
Dict Unique Q.Connection ->
GenIO ->
Clock ->
DrainOpts ->
ClientState ->
RabbitMqPool Key ->
RabbitMqPool ->
Env
env leh lp gh gp = Env leh lp (Bilge.host gh . Bilge.port gp $ empty) (RequestId defRequestId)

Expand Down
2 changes: 1 addition & 1 deletion services/gundeck/src/Gundeck/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ setupConsumableNotifications ::
ClientId ->
IO Text
setupConsumableNotifications chan uid cid = do
let qName = clientNotificationQueueName uid (RabbitMqClientId cid)
let qName = clientNotificationQueueName uid cid
void $
declareQueue
chan
Expand Down

0 comments on commit d60fc4a

Please sign in to comment.