Skip to content

Commit

Permalink
Merge pull request #40 from nikita-volkov/observability
Browse files Browse the repository at this point in the history
Observability
  • Loading branch information
nikita-volkov authored Feb 25, 2024
2 parents e58850e + 7a0e620 commit 92a6ed9
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 28 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# 0.11

- Optional observability event stream, which can be interpreted into log records and metrics.

# 0.10.1

- Avoid releasing connections on exceptions thrown in session
Expand Down
8 changes: 7 additions & 1 deletion hasql-pool.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,19 @@ common base-settings
library
import: base-settings
hs-source-dirs: library
exposed-modules: Hasql.Pool
exposed-modules:
Hasql.Pool
Hasql.Pool.Observation

other-modules: Hasql.Pool.Prelude
build-depends:
, base >=4.11 && <5
, bytestring >=0.10 && <0.14
, hasql >=1.6.0.1 && <1.7
, stm >=2.5 && <3
, text >=1.2 && <3
, time >=1.9 && <2
, uuid >=1.3 && <2

test-suite test
import: base-settings
Expand Down
101 changes: 75 additions & 26 deletions library/Hasql/Pool.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,36 @@ module Hasql.Pool

-- * Errors
UsageError (..),

-- * Observations
module Hasql.Pool.Observation,
)
where

import qualified Data.Text.Encoding as Text
import qualified Data.Text.Encoding.Error as Text
import qualified Data.UUID.V4 as Uuid
import Hasql.Connection (Connection)
import qualified Hasql.Connection as Connection
import Hasql.Pool.Observation
import Hasql.Pool.Prelude
import qualified Hasql.Session as Session

-- | A connection tagged with metadata.
data Entry = Entry
{ entryConnection :: Connection,
entryCreationTimeNSec :: Word64,
entryUseTimeNSec :: Word64
entryUseTimeNSec :: Word64,
entryId :: UUID
}

entryIsAlive :: Word64 -> Word64 -> Word64 -> Entry -> Bool
entryIsAlive maxLifetime maxIdletime now Entry {..} =
now
<= entryCreationTimeNSec
+ maxLifetime
&& now
<= entryUseTimeNSec
+ maxIdletime
entryIsAged :: Word64 -> Word64 -> Entry -> Bool
entryIsAged maxLifetime now Entry {..} =
now > entryCreationTimeNSec + maxLifetime

entryIsIdle :: Word64 -> Word64 -> Entry -> Bool
entryIsIdle maxIdletime now Entry {..} =
now > entryUseTimeNSec + maxIdletime

-- | Pool of connections to DB.
data Pool = Pool
Expand All @@ -54,7 +61,9 @@ data Pool = Pool
-- | Whether to return a connection to the pool.
poolReuseVar :: TVar (TVar Bool),
-- | To stop the manager thread via garbage collection.
poolReaperRef :: IORef ()
poolReaperRef :: IORef (),
-- | Action for reporting the observations.
poolObserver :: Observation -> IO ()
}

-- | Create a connection-pool, with default settings.
Expand All @@ -72,9 +81,16 @@ acquire ::
DiffTime ->
-- | Connection settings.
Connection.Settings ->
-- | Observation handler.
--
-- Typically it's used for monitoring the state of the pool via metrics and logging.
--
-- If the action is not lightweight, it's recommended to use intermediate bufferring via channels like TBQueue.
-- E.g., if the action is @'atomically' . 'writeTBQueue' yourQueue@, then reading from it and processing can be done on a separate thread.
(Observation -> IO ()) ->
IO Pool
acquire poolSize acqTimeout maxLifetime maxIdletime connectionSettings =
acquireDynamically poolSize acqTimeout maxLifetime maxIdletime (pure connectionSettings)
acquire poolSize acqTimeout maxLifetime maxIdletime connectionSettings observer =
acquireDynamically poolSize acqTimeout maxLifetime maxIdletime (pure connectionSettings) observer

-- | Create a connection-pool.
--
Expand All @@ -94,8 +110,15 @@ acquireDynamically ::
DiffTime ->
-- | Action fetching connection settings.
IO Connection.Settings ->
-- | Observation handler.
--
-- Use it for monitoring the state of the pool via metrics and logging.
--
-- If the action is not lightweight, it's recommended to use intermediate bufferring via channels like TBQueue.
-- E.g., if the action is @'atomically' . 'writeTBQueue' yourQueue@, then reading from it and processing can be done on a separate thread.
(Observation -> IO ()) ->
IO Pool
acquireDynamically poolSize acqTimeout maxLifetime maxIdletime fetchConnectionSettings = do
acquireDynamically poolSize acqTimeout maxLifetime maxIdletime fetchConnectionSettings observer = do
connectionQueue <- newTQueueIO
capVar <- newTVarIO poolSize
reuseVar <- newTVarIO =<< newTVarIO True
Expand All @@ -106,17 +129,24 @@ acquireDynamically poolSize acqTimeout maxLifetime maxIdletime fetchConnectionSe
now <- getMonotonicTimeNSec
join . atomically $ do
entries <- flushTQueue connectionQueue
let (keep, close) = partition (entryIsAlive maxLifetimeNanos maxIdletimeNanos now) entries
traverse_ (writeTQueue connectionQueue) keep
return $ forM_ close $ \entry -> do
Connection.release (entryConnection entry)
atomically $ modifyTVar' capVar succ
let (agedEntries, unagedEntries) = partition (entryIsAged maxLifetimeNanos now) entries
(idleEntries, liveEntries) = partition (entryIsIdle maxLifetimeNanos now) unagedEntries
traverse_ (writeTQueue connectionQueue) liveEntries
return $ do
forM_ agedEntries $ \entry -> do
Connection.release (entryConnection entry)
atomically $ modifyTVar' capVar succ
observer (ConnectionObservation (entryId entry) (TerminatedConnectionStatus AgingConnectionTerminationReason))
forM_ idleEntries $ \entry -> do
Connection.release (entryConnection entry)
atomically $ modifyTVar' capVar succ
observer (ConnectionObservation (entryId entry) (TerminatedConnectionStatus IdlenessConnectionTerminationReason))

void . mkWeakIORef reaperRef $ do
-- When the pool goes out of scope, stop the manager.
killThread managerTid

return $ Pool poolSize fetchConnectionSettings acqTimeoutMicros maxLifetimeNanos maxIdletimeNanos connectionQueue capVar reuseVar reaperRef
return $ Pool poolSize fetchConnectionSettings acqTimeoutMicros maxLifetimeNanos maxIdletimeNanos connectionQueue capVar reuseVar reaperRef observer
where
acqTimeoutMicros =
div (fromIntegral (diffTimeToPicoseconds acqTimeout)) 1_000_000
Expand All @@ -143,6 +173,7 @@ release Pool {..} =
return $ forM_ entries $ \entry -> do
Connection.release (entryConnection entry)
atomically $ modifyTVar' poolCapacity succ
poolObserver (ConnectionObservation (entryId entry) (TerminatedConnectionStatus ReleaseConnectionTerminationReason))

-- | Use a connection from the pool to run a session and return the connection
-- to the pool, when finished.
Expand All @@ -152,8 +183,7 @@ release Pool {..} =
-- and a slot gets freed up for a new connection to be established the next
-- time one is needed. The error still gets returned from this function.
--
-- __Warning:__ Due to the mechanism mentioned above you should avoid consuming
-- errors within sessions.
-- __Warning:__ Due to the mechanism mentioned above you should avoid intercepting this error type from within sessions.
use :: Pool -> Session.Session a -> IO (Either UsageError a)
use Pool {..} sess = do
timeout <- do
Expand All @@ -180,37 +210,55 @@ use Pool {..} sess = do
onNewConn reuseVar = do
settings <- poolFetchConnectionSettings
now <- getMonotonicTimeNSec
id <- Uuid.nextRandom
poolObserver (ConnectionObservation id ConnectingConnectionStatus)
connRes <- Connection.acquire settings
case connRes of
Left connErr -> do
poolObserver (ConnectionObservation id (TerminatedConnectionStatus (NetworkErrorConnectionTerminationReason (fmap (Text.decodeUtf8With Text.lenientDecode) connErr))))
atomically $ modifyTVar' poolCapacity succ
return $ Left $ ConnectionUsageError connErr
Right entry -> onLiveConn reuseVar (Entry entry now now)
Right entry -> do
poolObserver (ConnectionObservation id ReadyForUseConnectionStatus)
onLiveConn reuseVar (Entry entry now now id)

onConn reuseVar entry = do
now <- getMonotonicTimeNSec
if entryIsAlive poolMaxLifetime poolMaxIdletime now entry
then onLiveConn reuseVar entry {entryUseTimeNSec = now}
else do
if entryIsAged poolMaxLifetime now entry
then do
Connection.release (entryConnection entry)
poolObserver (ConnectionObservation (entryId entry) (TerminatedConnectionStatus AgingConnectionTerminationReason))
onNewConn reuseVar
else
if entryIsIdle poolMaxIdletime now entry
then do
Connection.release (entryConnection entry)
poolObserver (ConnectionObservation (entryId entry) (TerminatedConnectionStatus IdlenessConnectionTerminationReason))
onNewConn reuseVar
else do
onLiveConn reuseVar entry {entryUseTimeNSec = now}

onLiveConn reuseVar entry = do
poolObserver (ConnectionObservation (entryId entry) InUseConnectionStatus)
sessRes <- try @SomeException (Session.run sess (entryConnection entry))

case sessRes of
Left exc -> do
returnConn
throwIO exc
Right (Left err) -> case err of
Session.QueryError _ _ (Session.ClientError _) -> do
Session.QueryError _ _ (Session.ClientError details) -> do
Connection.release (entryConnection entry)
atomically $ modifyTVar' poolCapacity succ
poolObserver (ConnectionObservation (entryId entry) (TerminatedConnectionStatus (NetworkErrorConnectionTerminationReason (fmap (Text.decodeUtf8With Text.lenientDecode) details))))
return $ Left $ SessionUsageError err
_ -> do
returnConn
poolObserver (ConnectionObservation (entryId entry) ReadyForUseConnectionStatus)
return $ Left $ SessionUsageError err
Right (Right res) -> do
returnConn
poolObserver (ConnectionObservation (entryId entry) ReadyForUseConnectionStatus)
return $ Right res
where
returnConn =
Expand All @@ -221,6 +269,7 @@ use Pool {..} sess = do
else return $ do
Connection.release (entryConnection entry)
atomically $ modifyTVar' poolCapacity succ
poolObserver (ConnectionObservation (entryId entry) (TerminatedConnectionStatus ReleaseConnectionTerminationReason))

-- | Union over all errors that 'use' can result in.
data UsageError
Expand Down
36 changes: 36 additions & 0 deletions library/Hasql/Pool/Observation.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
module Hasql.Pool.Observation where

import Hasql.Pool.Prelude

data Observation
= ConnectionObservation
-- | Generated connection ID.
-- For grouping the observations by one connection.
UUID
-- | Connection status that it has entered.
ConnectionStatus
deriving (Show, Eq)

data ConnectionStatus
= -- | Connection is being established.
ConnectingConnectionStatus
| -- | Connection is established and not occupied.
ReadyForUseConnectionStatus
| -- | Is being used by some session.
--
-- After it's done the status will transition to 'ReadyForUseConnectionStatus' or 'TerminatedConnectionStatus'.
InUseConnectionStatus
| -- | Connection terminated.
TerminatedConnectionStatus ConnectionTerminationReason
deriving (Show, Eq)

data ConnectionTerminationReason
= -- | The age timeout of the connection has passed.
AgingConnectionTerminationReason
| -- | The timeout of how long a connection may remain idle in the pool has passed.
IdlenessConnectionTerminationReason
| -- | Connectivity issues with the server.
NetworkErrorConnectionTerminationReason (Maybe Text)
| -- | User has invoked the 'Hasql.Pool.release' procedure.
ReleaseConnectionTerminationReason
deriving (Show, Eq)
3 changes: 3 additions & 0 deletions library/Hasql/Pool/Prelude.hs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import Control.Monad.ST as Exports
import Data.Bifunctor as Exports
import Data.Bits as Exports
import Data.Bool as Exports
import Data.ByteString as Exports (ByteString)
import Data.Char as Exports
import Data.Coerce as Exports
import Data.Complex as Exports
Expand All @@ -40,9 +41,11 @@ import Data.Proxy as Exports
import Data.Ratio as Exports
import Data.STRef as Exports
import Data.String as Exports
import Data.Text as Exports (Text)
import Data.Time as Exports
import Data.Traversable as Exports
import Data.Tuple as Exports
import Data.UUID as Exports (UUID)
import Data.Unique as Exports
import Data.Version as Exports
import Data.Void as Exports
Expand Down
2 changes: 1 addition & 1 deletion test/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ main :: IO ()
main = do
connectionSettings <- getConnectionSettings
let withPool poolSize acqTimeout maxLifetime maxIdletime connectionSettings =
bracket (acquire poolSize acqTimeout maxLifetime maxIdletime connectionSettings) release
bracket (acquire poolSize acqTimeout maxLifetime maxIdletime connectionSettings (const (pure ()))) release
withDefaultPool =
withPool 3 10 1_800 1_800 connectionSettings

Expand Down

0 comments on commit 92a6ed9

Please sign in to comment.