Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Timeout #11

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# resource-pool-0.4.0.0 (2022-08-18)
* Added optional timeout for resource acquiring

# resource-pool-0.3.1.0 (2022-06-15)
* Add `tryWithResource` and `tryTakeResource`.

Expand All @@ -7,4 +10,4 @@
* Remove dependency on `monad-control`.
* Expose the `.Internal` module.
* Add support for introspection.
* Add `PoolConfig`.
* Add `PoolConfig`.
2 changes: 1 addition & 1 deletion resource-pool.cabal
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
cabal-version: 2.4
build-type: Simple
name: resource-pool
version: 0.3.1.0
version: 0.4.0.0
license: BSD-3-Clause
license-file: LICENSE
category: Data, Database, Network
Expand Down
7 changes: 5 additions & 2 deletions src/Data/Pool.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
module Data.Pool
( -- * Pool
PoolConfig(..)
, TimeoutConfig(..)
, TimeoutException(..)
, Pool
, LocalPool
, newPool
Expand All @@ -22,9 +24,9 @@ module Data.Pool

import Control.Concurrent
import Control.Exception
import Data.Time (NominalDiffTime)

import Data.Pool.Internal
import Data.Time (NominalDiffTime)

-- | Take a resource from the pool, perform an action with it and return it to
-- the pool afterwards.
Expand Down Expand Up @@ -64,7 +66,7 @@ takeResource pool = mask_ $ do
then do
q <- newEmptyMVar
putMVar (stripeVar lp) $! stripe { queueR = Queue q (queueR stripe) }
waitForResource (stripeVar lp) q >>= \case
waitForResource (getPoolTimeoutConfig pool) (stripeVar lp) q >>= \case
Just a -> pure (a, lp)
Nothing -> do
a <- createResource (poolConfig pool) `onException` restoreSize (stripeVar lp)
Expand Down Expand Up @@ -103,6 +105,7 @@ createPool create free numStripes idleTime maxResources = newPool PoolConfig
, freeResource = free
, poolCacheTTL = realToFrac idleTime
, poolMaxResources = numStripes * maxResources
, poolTimeoutConfig = Nothing
}

----------------------------------------
Expand Down
30 changes: 28 additions & 2 deletions src/Data/Pool/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
-- This module is intended for internal use only, and may change without warning
-- in subsequent releases.
{-# OPTIONS_HADDOCK not-home #-}
{-# LANGUAGE DeriveAnyClass #-}
module Data.Pool.Internal where

import Control.Concurrent
Expand All @@ -12,6 +13,7 @@ import Data.IORef
import Data.Primitive.SmallArray
import GHC.Clock
import qualified Data.List as L
import System.Timeout (timeout)

-- | Striped resource pool based on "Control.Concurrent.QSem".
--
Expand All @@ -24,6 +26,9 @@ data Pool a = Pool
, reaperRef :: !(IORef ())
}

getPoolTimeoutConfig :: Pool a -> Maybe TimeoutConfig
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd argue we can do without this helper function.

getPoolTimeoutConfig = poolTimeoutConfig . poolConfig

-- | A single, capability-local pool.
data LocalPool a = LocalPool
{ stripeId :: !Int
Expand Down Expand Up @@ -71,8 +76,20 @@ data PoolConfig a = PoolConfig
-- capabilities and rounded up. Therefore the pool might end up creating up to
-- @N - 1@ resources more in total than specified, where @N@ is the number of
-- capabilities.
, poolTimeoutConfig :: Maybe TimeoutConfig
-- ^ Optional timeout for waiting for a resource
}

data TimeoutConfig = TimeoutConfig
{ acquireResourceTimeout :: Int
-- ^ Time to await, microseconds
, timeoutLabel :: String
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it be better to provide a value that should be thrown as exception here instead of a label?

Then you can also get rid of the TimeoutException.

-- ^ Label for TimeoutException
}

newtype TimeoutException = TimeoutException String
deriving (Show, Exception)

-- | Create a new striped resource pool.
--
-- The number of stripes is equal to the number of capabilities.
Expand All @@ -87,6 +104,8 @@ newPool pc = do
error "poolCacheTTL must be at least 0.5"
when (poolMaxResources pc < 1) $ do
error "poolMaxResources must be at least 1"
when (maybe False (< 0) (acquireResourceTimeout <$> poolTimeoutConfig pc)) $ do
error "acquireResourceTimeout must be at least 0"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd argue it should be more than 0 :)

numStripes <- getNumCapabilities
when (numStripes < 1) $ do
error "numStripes must be at least 1"
Expand Down Expand Up @@ -175,8 +194,8 @@ getLocalPool pools = do
pure $ pools `indexSmallArray` (cid `rem` sizeofSmallArray pools)

-- | Wait for the resource to be put into a given 'MVar'.
waitForResource :: MVar (Stripe a) -> MVar (Maybe a) -> IO (Maybe a)
waitForResource mstripe q = takeMVar q `onException` cleanup
waitForResource :: Maybe TimeoutConfig -> MVar (Stripe a) -> MVar (Maybe a) -> IO (Maybe a)
waitForResource timeoutConfig mstripe q = limitByTime (takeMVar q) `onException` cleanup
where
cleanup = uninterruptibleMask_ $ do -- Note [signal uninterruptible]
stripe <- takeMVar mstripe
Expand All @@ -192,6 +211,13 @@ waitForResource mstripe q = takeMVar q `onException` cleanup
putMVar q $ error "unreachable"
pure stripe
putMVar mstripe newStripe
limitByTime = case timeoutConfig of
Just cfg -> timeout (acquireResourceTimeout cfg) >=> throwOnTimeout cfg
Nothing -> id
throwOnTimeout cfg =
\case Just a -> pure a
Nothing -> throwIO $ TimeoutException (timeoutLabel cfg)


-- | If an exception is received while a resource is being created, restore the
-- original size of the stripe.
Expand Down
2 changes: 1 addition & 1 deletion src/Data/Pool/Introspection.hs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ takeResource pool = mask_ $ do
then do
q <- newEmptyMVar
putMVar (stripeVar lp) $! stripe { queueR = Queue q (queueR stripe) }
waitForResource (stripeVar lp) q >>= \case
waitForResource (getPoolTimeoutConfig pool) (stripeVar lp) q >>= \case
Just a -> do
t2 <- getMonotonicTime
let res = Resource
Expand Down