diff --git a/.github/workflows/haskell-ci.yml b/.github/workflows/haskell-ci.yml new file mode 100644 index 0000000..4231913 --- /dev/null +++ b/.github/workflows/haskell-ci.yml @@ -0,0 +1,228 @@ +# This GitHub workflow config has been generated by a script via +# +# haskell-ci 'github' '--config=cabal.haskell-ci' 'resource-pool.cabal' +# +# To regenerate the script (for example after adjusting tested-with) run +# +# haskell-ci regenerate +# +# For more information, see https://github.com/haskell-CI/haskell-ci +# +# version: 0.14.3 +# +# REGENDATA ("0.14.3",["github","--config=cabal.haskell-ci","resource-pool.cabal"]) +# +name: Haskell-CI +on: + push: + branches: + - master + pull_request: + branches: + - master +jobs: + linux: + name: Haskell-CI - Linux - ${{ matrix.compiler }} + runs-on: ubuntu-18.04 + timeout-minutes: + 60 + container: + image: buildpack-deps:bionic + continue-on-error: ${{ matrix.allow-failure }} + strategy: + matrix: + include: + - compiler: ghc-9.2.2 + compilerKind: ghc + compilerVersion: 9.2.2 + setup-method: ghcup + allow-failure: false + - compiler: ghc-9.0.2 + compilerKind: ghc + compilerVersion: 9.0.2 + setup-method: ghcup + allow-failure: false + - compiler: ghc-8.10.7 + compilerKind: ghc + compilerVersion: 8.10.7 + setup-method: ghcup + allow-failure: false + - compiler: ghc-8.8.4 + compilerKind: ghc + compilerVersion: 8.8.4 + setup-method: hvr-ppa + allow-failure: false + - compiler: ghc-8.6.5 + compilerKind: ghc + compilerVersion: 8.6.5 + setup-method: hvr-ppa + allow-failure: false + - compiler: ghc-8.4.4 + compilerKind: ghc + compilerVersion: 8.4.4 + setup-method: hvr-ppa + allow-failure: false + fail-fast: false + steps: + - name: apt + run: | + apt-get update + apt-get install -y --no-install-recommends gnupg ca-certificates dirmngr curl git software-properties-common libtinfo5 + if [ "${{ matrix.setup-method }}" = ghcup ]; then + mkdir -p "$HOME/.ghcup/bin" + curl -sL https://downloads.haskell.org/ghcup/0.1.17.5/x86_64-linux-ghcup-0.1.17.5 > "$HOME/.ghcup/bin/ghcup" + chmod a+x "$HOME/.ghcup/bin/ghcup" + "$HOME/.ghcup/bin/ghcup" install ghc "$HCVER" + "$HOME/.ghcup/bin/ghcup" install cabal 3.6.2.0 + else + apt-add-repository -y 'ppa:hvr/ghc' + apt-get update + apt-get install -y "$HCNAME" + mkdir -p "$HOME/.ghcup/bin" + curl -sL https://downloads.haskell.org/ghcup/0.1.17.5/x86_64-linux-ghcup-0.1.17.5 > "$HOME/.ghcup/bin/ghcup" + chmod a+x "$HOME/.ghcup/bin/ghcup" + "$HOME/.ghcup/bin/ghcup" install cabal 3.6.2.0 + fi + env: + HCKIND: ${{ matrix.compilerKind }} + HCNAME: ${{ matrix.compiler }} + HCVER: ${{ matrix.compilerVersion }} + - name: Set PATH and environment variables + run: | + echo "$HOME/.cabal/bin" >> $GITHUB_PATH + echo "LANG=C.UTF-8" >> "$GITHUB_ENV" + echo "CABAL_DIR=$HOME/.cabal" >> "$GITHUB_ENV" + echo "CABAL_CONFIG=$HOME/.cabal/config" >> "$GITHUB_ENV" + HCDIR=/opt/$HCKIND/$HCVER + if [ "${{ matrix.setup-method }}" = ghcup ]; then + HC=$HOME/.ghcup/bin/$HCKIND-$HCVER + echo "HC=$HC" >> "$GITHUB_ENV" + echo "HCPKG=$HOME/.ghcup/bin/$HCKIND-pkg-$HCVER" >> "$GITHUB_ENV" + echo "HADDOCK=$HOME/.ghcup/bin/haddock-$HCVER" >> "$GITHUB_ENV" + echo "CABAL=$HOME/.ghcup/bin/cabal-3.6.2.0 -vnormal+nowrap" >> "$GITHUB_ENV" + else + HC=$HCDIR/bin/$HCKIND + echo "HC=$HC" >> "$GITHUB_ENV" + echo "HCPKG=$HCDIR/bin/$HCKIND-pkg" >> "$GITHUB_ENV" + echo "HADDOCK=$HCDIR/bin/haddock" >> "$GITHUB_ENV" + echo "CABAL=$HOME/.ghcup/bin/cabal-3.6.2.0 -vnormal+nowrap" >> "$GITHUB_ENV" + fi + + HCNUMVER=$(${HC} --numeric-version|perl -ne '/^(\d+)\.(\d+)\.(\d+)(\.(\d+))?$/; print(10000 * $1 + 100 * $2 + ($3 == 0 ? $5 != 1 : $3))') + echo "HCNUMVER=$HCNUMVER" >> "$GITHUB_ENV" + echo "ARG_TESTS=--enable-tests" >> "$GITHUB_ENV" + echo "ARG_BENCH=--enable-benchmarks" >> "$GITHUB_ENV" + echo "HEADHACKAGE=false" >> "$GITHUB_ENV" + echo "ARG_COMPILER=--$HCKIND --with-compiler=$HC" >> "$GITHUB_ENV" + echo "GHCJSARITH=0" >> "$GITHUB_ENV" + env: + HCKIND: ${{ matrix.compilerKind }} + HCNAME: ${{ matrix.compiler }} + HCVER: ${{ matrix.compilerVersion }} + - name: env + run: | + env + - name: write cabal config + run: | + mkdir -p $CABAL_DIR + cat >> $CABAL_CONFIG <> $CABAL_CONFIG < cabal-plan.xz + echo 'de73600b1836d3f55e32d80385acc055fd97f60eaa0ab68a755302685f5d81bc cabal-plan.xz' | sha256sum -c - + xz -d < cabal-plan.xz > $HOME/.cabal/bin/cabal-plan + rm -f cabal-plan.xz + chmod a+x $HOME/.cabal/bin/cabal-plan + cabal-plan --version + - name: checkout + uses: actions/checkout@v2 + with: + path: source + - name: initial cabal.project for sdist + run: | + touch cabal.project + echo "packages: $GITHUB_WORKSPACE/source/." >> cabal.project + cat cabal.project + - name: sdist + run: | + mkdir -p sdist + $CABAL sdist all --output-dir $GITHUB_WORKSPACE/sdist + - name: unpack + run: | + mkdir -p unpacked + find sdist -maxdepth 1 -type f -name '*.tar.gz' -exec tar -C $GITHUB_WORKSPACE/unpacked -xzvf {} \; + - name: generate cabal.project + run: | + PKGDIR_resource_pool="$(find "$GITHUB_WORKSPACE/unpacked" -maxdepth 1 -type d -regex '.*/resource-pool-[0-9.]*')" + echo "PKGDIR_resource_pool=${PKGDIR_resource_pool}" >> "$GITHUB_ENV" + rm -f cabal.project cabal.project.local + touch cabal.project + touch cabal.project.local + echo "packages: ${PKGDIR_resource_pool}" >> cabal.project + echo "package resource-pool" >> cabal.project + echo " ghc-options: -Werror=missing-methods" >> cabal.project + cat >> cabal.project <> cabal.project.local + cat cabal.project + cat cabal.project.local + - name: dump install plan + run: | + $CABAL v2-build $ARG_COMPILER $ARG_TESTS $ARG_BENCH --dry-run all + cabal-plan + - name: cache + uses: actions/cache@v2 + with: + key: ${{ runner.os }}-${{ matrix.compiler }}-${{ github.sha }} + path: ~/.cabal/store + restore-keys: ${{ runner.os }}-${{ matrix.compiler }}- + - name: install dependencies + run: | + $CABAL v2-build $ARG_COMPILER --disable-tests --disable-benchmarks --dependencies-only -j2 all + $CABAL v2-build $ARG_COMPILER $ARG_TESTS $ARG_BENCH --dependencies-only -j2 all + - name: build w/o tests + run: | + $CABAL v2-build $ARG_COMPILER --disable-tests --disable-benchmarks all + - name: build + run: | + $CABAL v2-build $ARG_COMPILER $ARG_TESTS $ARG_BENCH all --write-ghc-environment-files=always + - name: cabal check + run: | + cd ${PKGDIR_resource_pool} || false + ${CABAL} -vnormal check + - name: haddock + run: | + $CABAL v2-haddock $ARG_COMPILER --with-haddock $HADDOCK $ARG_TESTS $ARG_BENCH all + - name: unconstrained build + run: | + rm -f cabal.project.local + $CABAL v2-build $ARG_COMPILER --disable-tests --disable-benchmarks all diff --git a/.hgignore b/.hgignore deleted file mode 100644 index 51da823..0000000 --- a/.hgignore +++ /dev/null @@ -1,10 +0,0 @@ -.*\.(?:aux|h[ip]|o|orig|out|pdf|prof|ps|rej)$ -^(?:dist|\.DS_Store)$ -^tests/(?:qc) - -syntax: glob -cabal-dev -*~ -.*.swp -.\#* -\#* diff --git a/.hgtags b/.hgtags deleted file mode 100644 index 1407166..0000000 --- a/.hgtags +++ /dev/null @@ -1,14 +0,0 @@ -4c30c410223808c6ba0f4045b76c79dccb2b3386 0.1.0.0 -7c58de97c097a6e67252460b3cdcddea5f4b688e 0.1.0.1 -639cfbbbfcb73b0effa51064e9b59bf7cb397ac7 0.1.0.2 -639cfbbbfcb73b0effa51064e9b59bf7cb397ac7 0.1.0.2 -1fc8b99aad9ca0947fa6c614eab7e3d793dc1b0d 0.1.0.2 -33fb0f106b0153d24d4281ef88b8cf0eb681e13d 0.1.1.0 -caf8342bace0ca22f74f08458419229cbff87727 0.2.0.0 -606c1e147f6c363a9850e1239f6ec01dc1cea842 0.2.0.1 -3c0453d9ed9f57ea13873e9fb6f3cf423c6f9fc8 0.2.0.2 -89a5ed47828f804e352296f66ccd10cd8639aebe 0.2.0.3 -f3fc5de81581943b601e4e0605c0a41e822e89b2 0.2.0.4 -1453c7d3314ff954f040b0179abe65790f4819fe 0.2.1.0 -ccaa7c42c382361181286cff1db1d305fb88a4a5 0.2.1.1 -e7c91e7b0e1cf6bf89b628b97e901820e4a42032 0.2.2.0 diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..13d445c --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,2 @@ +# resource-pool-0.3.0.0 (2022-??-??) +* Rewrite based on `Control.Concurrent.QSem` for better throughput and latency. diff --git a/Data/Pool.hs b/Data/Pool.hs deleted file mode 100644 index 6764e8b..0000000 --- a/Data/Pool.hs +++ /dev/null @@ -1,393 +0,0 @@ -{-# LANGUAGE CPP, NamedFieldPuns, RecordWildCards, ScopedTypeVariables, RankNTypes, DeriveDataTypeable #-} - -#if MIN_VERSION_monad_control(0,3,0) -{-# LANGUAGE FlexibleContexts #-} -#endif - -#if !MIN_VERSION_base(4,3,0) -{-# LANGUAGE RankNTypes #-} -#endif - --- | --- Module: Data.Pool --- Copyright: (c) 2011 MailRank, Inc. --- License: BSD3 --- Maintainer: Bryan O'Sullivan , --- Bas van Dijk --- Stability: experimental --- Portability: portable --- --- A high-performance striped pooling abstraction for managing --- flexibly-sized collections of resources such as database --- connections. --- --- \"Striped\" means that a single 'Pool' consists of several --- sub-pools, each managed independently. A single stripe is fine for --- many applications, and probably what you should choose by default. --- More stripes will lead to reduced contention in high-performance --- multicore applications, at a trade-off of causing the maximum --- number of simultaneous resources in use to grow. -module Data.Pool - ( - Pool(idleTime, maxResources, numStripes) - , LocalPool - , createPool - , withResource - , takeResource - , tryWithResource - , tryTakeResource - , destroyResource - , putResource - , destroyAllResources - ) where - -import Control.Applicative ((<$>)) -import Control.Concurrent (ThreadId, forkIOWithUnmask, killThread, myThreadId, threadDelay) -import Control.Concurrent.STM -import Control.Exception (SomeException, onException, mask_) -import Control.Monad (forM_, forever, join, liftM3, unless, when) -import Data.Hashable (hash) -import Data.IORef (IORef, newIORef, mkWeakIORef) -import Data.List (partition) -import Data.Time.Clock (NominalDiffTime, UTCTime, diffUTCTime, getCurrentTime) -import Data.Typeable (Typeable) -import GHC.Conc.Sync (labelThread) -import qualified Control.Exception as E -import qualified Data.Vector as V - -#if MIN_VERSION_monad_control(0,3,0) -import Control.Monad.Trans.Control (MonadBaseControl, control) -import Control.Monad.Base (liftBase) -#else -import Control.Monad.IO.Control (MonadControlIO, controlIO) -import Control.Monad.IO.Class (liftIO) -#define control controlIO -#define liftBase liftIO -#endif - -#if MIN_VERSION_base(4,3,0) -import Control.Exception (mask) -#else --- Don't do any async exception protection for older GHCs. -mask :: ((forall a. IO a -> IO a) -> IO b) -> IO b -mask f = f id -#endif - --- | A single resource pool entry. -data Entry a = Entry { - entry :: a - , lastUse :: UTCTime - -- ^ Time of last return. - } - --- | A single striped pool. -data LocalPool a = LocalPool { - inUse :: TVar Int - -- ^ Count of open entries (both idle and in use). - , entries :: TVar [Entry a] - -- ^ Idle entries. - , lfin :: IORef () - -- ^ empty value used to attach a finalizer to (internal) - } deriving (Typeable) - -data Pool a = Pool { - create :: IO a - -- ^ Action for creating a new entry to add to the pool. - , destroy :: a -> IO () - -- ^ Action for destroying an entry that is now done with. - , numStripes :: Int - -- ^ The number of stripes (distinct sub-pools) to maintain. - -- The smallest acceptable value is 1. - , idleTime :: NominalDiffTime - -- ^ Amount of time for which an unused resource is kept alive. - -- The smallest acceptable value is 0.5 seconds. - -- - -- The elapsed time before closing may be a little longer than - -- requested, as the reaper thread wakes at 1-second intervals. - , maxResources :: Int - -- ^ Maximum number of resources to maintain per stripe. The - -- smallest acceptable value is 1. - -- - -- Requests for resources will block if this limit is reached on a - -- single stripe, even if other stripes have idle resources - -- available. - , localPools :: V.Vector (LocalPool a) - -- ^ Per-capability resource pools. - , fin :: IORef () - -- ^ empty value used to attach a finalizer to (internal) - } deriving (Typeable) - -instance Show (Pool a) where - show Pool{..} = "Pool {numStripes = " ++ show numStripes ++ ", " ++ - "idleTime = " ++ show idleTime ++ ", " ++ - "maxResources = " ++ show maxResources ++ "}" - --- | Create a striped resource pool. --- --- Although the garbage collector will destroy all idle resources when --- the pool is garbage collected it's recommended to manually --- 'destroyAllResources' when you're done with the pool so that the --- resources are freed up as soon as possible. -createPool - :: IO a - -- ^ Action that creates a new resource. - -> (a -> IO ()) - -- ^ Action that destroys an existing resource. - -> Int - -- ^ The number of stripes (distinct sub-pools) to maintain. - -- The smallest acceptable value is 1. - -> NominalDiffTime - -- ^ Amount of time for which an unused resource is kept open. - -- The smallest acceptable value is 0.5 seconds. - -- - -- The elapsed time before destroying a resource may be a little - -- longer than requested, as the reaper thread wakes at 1-second - -- intervals. - -> Int - -- ^ Maximum number of resources to keep open per stripe. The - -- smallest acceptable value is 1. - -- - -- Requests for resources will block if this limit is reached on a - -- single stripe, even if other stripes have idle resources - -- available. - -> IO (Pool a) -createPool create destroy numStripes idleTime maxResources = do - when (numStripes < 1) $ - modError "pool " $ "invalid stripe count " ++ show numStripes - when (idleTime < 0.5) $ - modError "pool " $ "invalid idle time " ++ show idleTime - when (maxResources < 1) $ - modError "pool " $ "invalid maximum resource count " ++ show maxResources - localPools <- V.replicateM numStripes $ - liftM3 LocalPool (newTVarIO 0) (newTVarIO []) (newIORef ()) - reaperId <- forkIOLabeledWithUnmask "resource-pool: reaper" $ \unmask -> - unmask $ reaper destroy idleTime localPools - fin <- newIORef () - let p = Pool { - create - , destroy - , numStripes - , idleTime - , maxResources - , localPools - , fin - } - mkWeakIORef fin (killThread reaperId) >> - V.mapM_ (\lp -> mkWeakIORef (lfin lp) (purgeLocalPool destroy lp)) localPools - return p - --- TODO: Propose 'forkIOLabeledWithUnmask' for the base library. - --- | Sparks off a new thread using 'forkIOWithUnmask' to run the given --- IO computation, but first labels the thread with the given label --- (using 'labelThread'). --- --- The implementation makes sure that asynchronous exceptions are --- masked until the given computation is executed. This ensures the --- thread will always be labeled which guarantees you can always --- easily find it in the GHC event log. --- --- Like 'forkIOWithUnmask', the given computation is given a function --- to unmask asynchronous exceptions. See the documentation of that --- function for the motivation of this. --- --- Returns the 'ThreadId' of the newly created thread. -forkIOLabeledWithUnmask :: String - -> ((forall a. IO a -> IO a) -> IO ()) - -> IO ThreadId -forkIOLabeledWithUnmask label m = mask_ $ forkIOWithUnmask $ \unmask -> do - tid <- myThreadId - labelThread tid label - m unmask - --- | Periodically go through all pools, closing any resources that --- have been left idle for too long. -reaper :: (a -> IO ()) -> NominalDiffTime -> V.Vector (LocalPool a) -> IO () -reaper destroy idleTime pools = forever $ do - threadDelay (1 * 1000000) - now <- getCurrentTime - let isStale Entry{..} = now `diffUTCTime` lastUse > idleTime - V.forM_ pools $ \LocalPool{..} -> do - resources <- atomically $ do - (stale,fresh) <- partition isStale <$> readTVar entries - unless (null stale) $ do - writeTVar entries fresh - modifyTVar_ inUse (subtract (length stale)) - return (map entry stale) - forM_ resources $ \resource -> do - destroy resource `E.catch` \(_::SomeException) -> return () - --- | Destroy all idle resources of the given 'LocalPool' and remove them from --- the pool. -purgeLocalPool :: (a -> IO ()) -> LocalPool a -> IO () -purgeLocalPool destroy LocalPool{..} = do - resources <- atomically $ do - idle <- swapTVar entries [] - modifyTVar_ inUse (subtract (length idle)) - return (map entry idle) - forM_ resources $ \resource -> - destroy resource `E.catch` \(_::SomeException) -> return () - --- | Temporarily take a resource from a 'Pool', perform an action with --- it, and return it to the pool afterwards. --- --- * If the pool has an idle resource available, it is used --- immediately. --- --- * Otherwise, if the maximum number of resources has not yet been --- reached, a new resource is created and used. --- --- * If the maximum number of resources has been reached, this --- function blocks until a resource becomes available. --- --- If the action throws an exception of any type, the resource is --- destroyed, and not returned to the pool. --- --- It probably goes without saying that you should never manually --- destroy a pooled resource, as doing so will almost certainly cause --- a subsequent user (who expects the resource to be valid) to throw --- an exception. -withResource :: -#if MIN_VERSION_monad_control(0,3,0) - (MonadBaseControl IO m) -#else - (MonadControlIO m) -#endif - => Pool a -> (a -> m b) -> m b -{-# SPECIALIZE withResource :: Pool a -> (a -> IO b) -> IO b #-} -withResource pool act = control $ \runInIO -> mask $ \restore -> do - (resource, local) <- takeResource pool - ret <- restore (runInIO (act resource)) `onException` - destroyResource pool local resource - putResource local resource - return ret -#if __GLASGOW_HASKELL__ >= 700 -{-# INLINABLE withResource #-} -#endif - --- | Take a resource from the pool, following the same results as --- 'withResource'. Note that this function should be used with caution, as --- improper exception handling can lead to leaked resources. --- --- This function returns both a resource and the @LocalPool@ it came from so --- that it may either be destroyed (via 'destroyResource') or returned to the --- pool (via 'putResource'). -takeResource :: Pool a -> IO (a, LocalPool a) -takeResource pool@Pool{..} = do - local@LocalPool{..} <- getLocalPool pool - resource <- liftBase . join . atomically $ do - ents <- readTVar entries - case ents of - (Entry{..}:es) -> writeTVar entries es >> return (return entry) - [] -> do - used <- readTVar inUse - when (used == maxResources) retry - writeTVar inUse $! used + 1 - return $ - create `onException` atomically (modifyTVar_ inUse (subtract 1)) - return (resource, local) -#if __GLASGOW_HASKELL__ >= 700 -{-# INLINABLE takeResource #-} -#endif - --- | Similar to 'withResource', but only performs the action if a resource could --- be taken from the pool /without blocking/. Otherwise, 'tryWithResource' --- returns immediately with 'Nothing' (ie. the action function is /not/ called). --- Conversely, if a resource can be borrowed from the pool without blocking, the --- action is performed and it's result is returned, wrapped in a 'Just'. -tryWithResource :: forall m a b. -#if MIN_VERSION_monad_control(0,3,0) - (MonadBaseControl IO m) -#else - (MonadControlIO m) -#endif - => Pool a -> (a -> m b) -> m (Maybe b) -tryWithResource pool act = control $ \runInIO -> mask $ \restore -> do - res <- tryTakeResource pool - case res of - Just (resource, local) -> do - ret <- restore (runInIO (Just <$> act resource)) `onException` - destroyResource pool local resource - putResource local resource - return ret - Nothing -> restore . runInIO $ return (Nothing :: Maybe b) -#if __GLASGOW_HASKELL__ >= 700 -{-# INLINABLE tryWithResource #-} -#endif - --- | A non-blocking version of 'takeResource'. The 'tryTakeResource' function --- returns immediately, with 'Nothing' if the pool is exhausted, or @'Just' (a, --- 'LocalPool' a)@ if a resource could be borrowed from the pool successfully. -tryTakeResource :: Pool a -> IO (Maybe (a, LocalPool a)) -tryTakeResource pool@Pool{..} = do - local@LocalPool{..} <- getLocalPool pool - resource <- liftBase . join . atomically $ do - ents <- readTVar entries - case ents of - (Entry{..}:es) -> writeTVar entries es >> return (return . Just $ entry) - [] -> do - used <- readTVar inUse - if used == maxResources - then return (return Nothing) - else do - writeTVar inUse $! used + 1 - return $ Just <$> - create `onException` atomically (modifyTVar_ inUse (subtract 1)) - return $ (flip (,) local) <$> resource -#if __GLASGOW_HASKELL__ >= 700 -{-# INLINABLE tryTakeResource #-} -#endif - --- | Get a (Thread-)'LocalPool' --- --- Internal, just to not repeat code for 'takeResource' and 'tryTakeResource' -getLocalPool :: Pool a -> IO (LocalPool a) -getLocalPool Pool{..} = do - i <- liftBase $ ((`mod` numStripes) . hash) <$> myThreadId - return $ localPools V.! i -#if __GLASGOW_HASKELL__ >= 700 -{-# INLINABLE getLocalPool #-} -#endif - --- | Destroy a resource. Note that this will ignore any exceptions in the --- destroy function. -destroyResource :: Pool a -> LocalPool a -> a -> IO () -destroyResource Pool{..} LocalPool{..} resource = do - destroy resource `E.catch` \(_::SomeException) -> return () - atomically (modifyTVar_ inUse (subtract 1)) -#if __GLASGOW_HASKELL__ >= 700 -{-# INLINABLE destroyResource #-} -#endif - --- | Return a resource to the given 'LocalPool'. -putResource :: LocalPool a -> a -> IO () -putResource LocalPool{..} resource = do - now <- getCurrentTime - atomically $ modifyTVar_ entries (Entry resource now:) -#if __GLASGOW_HASKELL__ >= 700 -{-# INLINABLE putResource #-} -#endif - --- | Destroy all resources in all stripes in the pool. Note that this --- will ignore any exceptions in the destroy function. --- --- This function is useful when you detect that all resources in the --- pool are broken. For example after a database has been restarted --- all connections opened before the restart will be broken. In that --- case it's better to close those connections so that 'takeResource' --- won't take a broken connection from the pool but will open a new --- connection instead. --- --- Another use-case for this function is that when you know you are --- done with the pool you can destroy all idle resources immediately --- instead of waiting on the garbage collector to destroy them, thus --- freeing up those resources sooner. -destroyAllResources :: Pool a -> IO () -destroyAllResources Pool{..} = V.forM_ localPools $ purgeLocalPool destroy - -modifyTVar_ :: TVar a -> (a -> a) -> STM () -modifyTVar_ v f = readTVar v >>= \a -> writeTVar v $! f a - -modError :: String -> String -> a -modError func msg = - error $ "Data.Pool." ++ func ++ ": " ++ msg diff --git a/README.markdown b/README.markdown deleted file mode 100644 index f30471c..0000000 --- a/README.markdown +++ /dev/null @@ -1,28 +0,0 @@ -# Welcome to pool - -pool is a fast Haskell library for managing medium-lifetime pooled -resources, such as database connections. - -# Join in! - -We are happy to receive bug reports, fixes, documentation enhancements, -and other improvements. - -Please report bugs via the -[github issue tracker](http://github.com/bos/pool/issues). - -Master [git repository](http://github.com/bos/pool): - -* `git clone git://github.com/bos/pool.git` - -There's also a [Mercurial mirror](http://bitbucket.org/bos/pool): - -* `hg clone http://bitbucket.org/bos/pool` - -(You can create and contribute changes using either git or Mercurial.) - -Authors -------- - -This library is written and maintained by Bryan O'Sullivan, -. diff --git a/README.md b/README.md new file mode 100644 index 0000000..833ee37 --- /dev/null +++ b/README.md @@ -0,0 +1,10 @@ +# resource-pool + +[![Build Status](https://github.com/scrive/pool/workflows/Haskell-CI/badge.svg?branch=master)](https://github.com/scrive/pool/actions?query=branch%3Amaster) +[![Hackage](https://img.shields.io/hackage/v/resource-pool.svg)](https://hackage.haskell.org/package/resource-pool) +[![Dependencies](https://img.shields.io/hackage-deps/v/resource-pool.svg)](https://packdeps.haskellers.com/feed?needle=andrzej@rybczak.net) +[![Stackage LTS](https://www.stackage.org/package/resource-pool/badge/lts)](https://www.stackage.org/lts/package/resource-pool) +[![Stackage Nightly](https://www.stackage.org/package/resource-pool/badge/nightly)](https://www.stackage.org/nightly/package/resource-pool) + +A high-performance striped resource pooling implementation for Haskell based on +[QSem](https://hackage.haskell.org/package/base/docs/Control-Concurrent-QSem.html). diff --git a/Setup.lhs b/Setup.lhs deleted file mode 100755 index 5bde0de..0000000 --- a/Setup.lhs +++ /dev/null @@ -1,3 +0,0 @@ -#!/usr/bin/env runhaskell -> import Distribution.Simple -> main = defaultMain diff --git a/resource-pool.cabal b/resource-pool.cabal index 6a9bc09..cde8eb1 100644 --- a/resource-pool.cabal +++ b/resource-pool.cabal @@ -1,55 +1,46 @@ +cabal-version: 2.4 +build-type: Simple name: resource-pool -version: 0.2.3.2 -synopsis: A high-performance striped resource pooling implementation -description: - A high-performance striped pooling abstraction for managing - flexibly-sized collections of resources such as database - connections. - -homepage: http://github.com/bos/pool -license: BSD3 +version: 0.3.0.0 +license: BSD-3-Clause license-file: LICENSE -author: Bryan O'Sullivan -maintainer: Bryan O'Sullivan , - Bas van Dijk -copyright: Copyright 2011 MailRank, Inc. category: Data, Database, Network -build-type: Simple -extra-source-files: - README.markdown +maintainer: andrzej@rybczak.net +author: Andrzej Rybczak, Bryan O'Sullivan -cabal-version: >=1.8 +synopsis: A high-performance striped resource pooling implementation -flag developer - description: operate in developer mode - default: False - manual: True +description: A high-performance striped pooling abstraction for managing + flexibly-sized collections of resources such as database + connections. -library - exposed-modules: - Data.Pool - - build-depends: - base >= 4.4 && < 5, - hashable, - monad-control >= 0.2.0.1, - transformers, - transformers-base >= 0.4, - stm >= 2.3, - time, - vector >= 0.7 - - if flag(developer) - ghc-options: -Werror - ghc-prof-options: -auto-all - cpp-options: -DASSERTS -DDEBUG - - ghc-options: -Wall +tested-with: GHC ==8.4.4 || ==8.6.5 || ==8.8.4 || ==8.10.7 || ==9.0.2 || ==9.2.2 + +extra-doc-files: + CHANGELOG.md + README.md +bug-reports: https://github.com/scrive/pool/issues source-repository head type: git - location: http://github.com/bos/pool + location: https://github.com/scrive/pool.git -source-repository head - type: mercurial - location: http://bitbucket.org/bos/pool +library + hs-source-dirs: src + + exposed-modules: Data.Pool + Data.Pool.Internal + Data.Pool.Introspection + + build-depends: base >= 4.11 && < 5 + , primitive >= 0.7 + , time + + ghc-options: -Wall -Wcompat + + default-language: Haskell2010 + + default-extensions: DeriveGeneric + , LambdaCase + , RankNTypes + , TypeApplications diff --git a/src/Data/Pool.hs b/src/Data/Pool.hs new file mode 100644 index 0000000..339007d --- /dev/null +++ b/src/Data/Pool.hs @@ -0,0 +1,92 @@ +-- | A high-performance pooling abstraction for managing flexibly-sized +-- collections of resources such as database connections. +module Data.Pool + ( -- * Pool + PoolConfig(..) + , Pool + , LocalPool + , newPool + + -- * Resource management + , withResource + , takeResource + , putResource + , destroyResource + , destroyAllResources + + -- * Compatibility with 0.2 + , createPool + ) where + +import Control.Concurrent +import Control.Exception +import Data.Time (NominalDiffTime) + +import Data.Pool.Internal + +-- | Take a resource from the pool, perform an action with it and return it to +-- the pool afterwards. +-- +-- * If the pool has an idle resource available, it is used immediately. +-- +-- * Otherwise, if the maximum number of resources has not yet been reached, a +-- new resource is created and used. +-- +-- * If the maximum number of resources has been reached, this function blocks +-- until a resource becomes available. +-- +-- If the action throws an exception of any type, the resource is destroyed and +-- not returned to the pool. +-- +-- It probably goes without saying that you should never manually destroy a +-- pooled resource, as doing so will almost certainly cause a subsequent user +-- (who expects the resource to be valid) to throw an exception. +withResource :: Pool a -> (a -> IO r) -> IO r +withResource pool act = mask $ \unmask -> do + (res, localPool) <- takeResource pool + r <- unmask (act res) `onException` destroyResource pool localPool res + putResource localPool res + pure r + +-- | Take a resource from the pool, following the same results as +-- 'withResource'. +-- +-- /Note:/ this function returns both a resource and the 'LocalPool' it came +-- from so that it may either be destroyed (via 'destroyResource') or returned +-- to the pool (via 'putResource'). +takeResource :: Pool a -> IO (a, LocalPool a) +takeResource pool = mask_ $ do + lp <- getLocalPool (localPools pool) + stripe <- takeMVar (stripeVar lp) + if available stripe == 0 + then do + q <- newEmptyMVar + putMVar (stripeVar lp) $! stripe { queueR = Queue q (queueR stripe) } + waitForResource (stripeVar lp) q >>= \case + Just a -> pure (a, lp) + Nothing -> do + a <- createResource (poolConfig pool) `onException` restoreSize (stripeVar lp) + pure (a, lp) + else case cache stripe of + [] -> do + putMVar (stripeVar lp) $! stripe { available = available stripe - 1 } + a <- createResource (poolConfig pool) `onException` restoreSize (stripeVar lp) + pure (a, lp) + Entry a _ : as -> do + putMVar (stripeVar lp) $! stripe + { available = available stripe - 1 + , cache = as + } + pure (a, lp) + +{-# DEPRECATED createPool "Use newPool instead" #-} +-- | Provided for compatibility with @resource-pool < 0.3@. +-- +-- Use 'newPool' instead. +createPool :: IO a -> (a -> IO ()) -> Int -> NominalDiffTime -> Int -> IO (Pool a) +createPool create free numStripes idleTime maxResources = newPool PoolConfig + { createResource = create + , freeResource = free + , poolCacheTTL = realToFrac idleTime + , poolMaxResources = numStripes * maxResources + } diff --git a/src/Data/Pool/Internal.hs b/src/Data/Pool/Internal.hs new file mode 100644 index 0000000..85c5557 --- /dev/null +++ b/src/Data/Pool/Internal.hs @@ -0,0 +1,275 @@ +-- | Internal implementation details for "Data.Pool". +-- +-- This module is intended for internal use only, and may change without warning +-- in subsequent releases. +{-# OPTIONS_HADDOCK not-home #-} +module Data.Pool.Internal where + +import Control.Concurrent +import Control.Exception +import Control.Monad +import Data.IORef +import Data.Primitive.SmallArray +import GHC.Clock +import qualified Data.List as L + +-- | Striped resource pool based on "Control.Concurrent.QSem". +-- +-- The number of stripes is arranged to be equal to the number of capabilities +-- so that they never compete over access to the same stripe. This results in a +-- very good performance in a multi-threaded environment. +data Pool a = Pool + { poolConfig :: !(PoolConfig a) + , localPools :: !(SmallArray (LocalPool a)) + , reaperRef :: !(IORef ()) + } + +-- | A single, capability-local pool. +data LocalPool a = LocalPool + { stripeId :: !Int + , stripeVar :: !(MVar (Stripe a)) + , cleanerRef :: !(IORef ()) + } + +-- | Stripe of a resource pool. If @available@ is 0, the list of threads waiting +-- for a resource (each with an associated 'MVar') is @queue ++ reverse queueR@. +data Stripe a = Stripe + { available :: !Int + , cache :: ![Entry a] + , queue :: !(Queue a) + , queueR :: !(Queue a) + } + +-- | An existing resource currently sitting in a pool. +data Entry a = Entry + { entry :: a + , lastUsed :: !Double + } + +-- | A queue of MVarS corresponding to threads waiting for resources. +-- +-- Basically a monomorphic list to save two pointer indirections. +data Queue a = Queue !(MVar (Maybe a)) (Queue a) | Empty + +-- | Configuration of a 'Pool'. +data PoolConfig a = PoolConfig + { createResource :: !(IO a) + -- ^ The action that creates a new resource. + , freeResource :: !(a -> IO ()) + -- ^ The action that destroys an existing resource. + , poolCacheTTL :: !Double + -- ^ The amount of seconds for which an unused resource is kept around. The + -- smallest acceptable value is @0.5@. + -- + -- /Note:/ the elapsed time before destroying a resource may be a little + -- longer than requested, as the collector thread wakes at 1-second intervals. + , poolMaxResources :: !Int + -- ^ The maximum number of resources to keep open across all stripes. The + -- smallest acceptable value is @1@. + -- + -- /Note:/ for each stripe the number of resources is divided by the number of + -- capabilities and rounded up. Therefore the pool might end up creating up to + -- @N - 1@ resources more in total than specified, where @N@ is the number of + -- capabilities. + } + +-- | Create a new striped resource pool. +-- +-- The number of stripes is equal to the number of capabilities. +-- +-- /Note:/ although the runtime system will destroy all idle resources when the +-- pool is garbage collected, it's recommended to manually call +-- 'destroyAllResources' when you're done with the pool so that the resources +-- are freed up as soon as possible. +newPool :: PoolConfig a -> IO (Pool a) +newPool pc = do + when (poolCacheTTL pc < 0.5) $ do + error "poolCacheTTL must be at least 0.5" + when (poolMaxResources pc < 1) $ do + error "poolMaxResources must be at least 1" + numStripes <- getNumCapabilities + when (numStripes < 1) $ do + error "numStripes must be at least 1" + pools <- fmap (smallArrayFromListN numStripes) . forM [1..numStripes] $ \n -> do + ref <- newIORef () + stripe <- newMVar Stripe + { available = poolMaxResources pc `quotCeil` numStripes + , cache = [] + , queue = Empty + , queueR = Empty + } + -- When the local pool goes out of scope, free its resources. + void . mkWeakIORef ref $ cleanStripe (const True) (freeResource pc) stripe + pure LocalPool { stripeId = n + , stripeVar = stripe + , cleanerRef = ref + } + mask_ $ do + ref <- newIORef () + collectorA <- forkIOWithUnmask $ \unmask -> unmask $ collector pools + void . mkWeakIORef ref $ do + -- When the pool goes out of scope, stop the collector. Resources existing + -- in stripes will be taken care by their cleaners. + killThread collectorA + pure Pool { poolConfig = pc + , localPools = pools + , reaperRef = ref + } + where + quotCeil :: Int -> Int -> Int + quotCeil x y = + -- Basically ceiling (x / y) without going through Double. + let (z, r) = x `quotRem` y in if r == 0 then z else z + 1 + + -- Collect stale resources from the pool once per second. + collector pools = forever $ do + threadDelay 1000000 + now <- getMonotonicTime + let isStale e = now - lastUsed e > poolCacheTTL pc + mapM_ (cleanStripe isStale (freeResource pc) . stripeVar) pools + +-- | Destroy a resource. +-- +-- Note that this will ignore any exceptions in the destroy function. +destroyResource :: Pool a -> LocalPool a -> a -> IO () +destroyResource pool lp a = do + uninterruptibleMask_ $ do -- Note [signal uninterruptible] + stripe <- takeMVar (stripeVar lp) + newStripe <- signal stripe Nothing + putMVar (stripeVar lp) newStripe + void . try @SomeException $ freeResource (poolConfig pool) a + +-- | Return a resource to the given 'LocalPool'. +putResource :: LocalPool a -> a -> IO () +putResource lp a = do + uninterruptibleMask_ $ do -- Note [signal uninterruptible] + stripe <- takeMVar (stripeVar lp) + newStripe <- signal stripe (Just a) + putMVar (stripeVar lp) newStripe + +-- | Destroy all resources in all stripes in the pool. +-- +-- Note that this will ignore any exceptions in the destroy function. +-- +-- This function is useful when you detect that all resources in the pool are +-- broken. For example after a database has been restarted all connections +-- opened before the restart will be broken. In that case it's better to close +-- those connections so that 'takeResource' won't take a broken connection from +-- the pool but will open a new connection instead. +-- +-- Another use-case for this function is that when you know you are done with +-- the pool you can destroy all idle resources immediately instead of waiting on +-- the garbage collector to destroy them, thus freeing up those resources +-- sooner. +destroyAllResources :: Pool a -> IO () +destroyAllResources pool = forM_ (localPools pool) $ \lp -> do + cleanStripe (const True) (freeResource (poolConfig pool)) (stripeVar lp) + +---------------------------------------- +-- Helpers + +-- | Get a capability-local pool. +getLocalPool :: SmallArray (LocalPool a) -> IO (LocalPool a) +getLocalPool pools = do + (cid, _) <- threadCapability =<< myThreadId + pure $ pools `indexSmallArray` (cid `rem` sizeofSmallArray pools) + +-- | Wait for the resource to be put into a given 'MVar'. +waitForResource :: MVar (Stripe a) -> MVar (Maybe a) -> IO (Maybe a) +waitForResource mstripe q = takeMVar q `onException` cleanup + where + cleanup = uninterruptibleMask_ $ do -- Note [signal uninterruptible] + stripe <- takeMVar mstripe + newStripe <- tryTakeMVar q >>= \case + Just ma -> do + -- Between entering the exception handler and taking ownership of + -- the stripe we got the resource we wanted. We don't need it + -- anymore though, so pass it to someone else. + signal stripe ma + Nothing -> do + -- If we're still waiting, fill up the MVar with an undefined value + -- so that 'signal' can discard our MVar from the queue. + putMVar q $ error "unreachable" + pure stripe + putMVar mstripe newStripe + +-- | If an exception is received while a resource is being created, restore the +-- original size of the stripe. +restoreSize :: MVar (Stripe a) -> IO () +restoreSize mstripe = uninterruptibleMask_ $ do + -- 'uninterruptibleMask_' is used since 'takeMVar' might block. + stripe <- takeMVar mstripe + putMVar mstripe $! stripe { available = available stripe + 1 } + +-- | Free resource entries in the stripes that fulfil a given condition. +cleanStripe + :: (Entry a -> Bool) + -> (a -> IO ()) + -> MVar (Stripe a) + -> IO () +cleanStripe isStale free mstripe = mask $ \unmask -> do + -- Asynchronous exceptions need to be masked here to prevent leaking of + -- 'stale' resources before they're freed. + stale <- modifyMVar mstripe $ \stripe -> unmask $ do + let (stale, fresh) = L.partition isStale (cache stripe) + -- There's no need to update 'available' here because it only tracks + -- the number of resources taken from the pool. + newStripe = stripe { cache = fresh } + newStripe `seq` pure (newStripe, map entry stale) + -- We need to ignore exceptions in the 'free' function, otherwise if an + -- exception is thrown half-way, we leak the rest of the resources. Also, + -- asynchronous exceptions need to be hard masked here since freeing a + -- resource might in theory block. + uninterruptibleMask_ . forM_ stale $ try @SomeException . free + +-- Note [signal uninterruptible] +-- +-- If we have +-- +-- bracket takeResource putResource (...) +-- +-- and an exception arrives at the putResource, then we must not lose the +-- resource. The putResource is masked by bracket, but taking the MVar might +-- block, and so it would be interruptible. Hence we need an uninterruptible +-- variant of mask here. +signal :: Stripe a -> Maybe a -> IO (Stripe a) +signal stripe ma = if available stripe == 0 + then loop (queue stripe) (queueR stripe) + else do + newCache <- case ma of + Just a -> do + now <- getMonotonicTime + pure $ Entry a now : cache stripe + Nothing -> pure $ cache stripe + pure $! stripe { available = available stripe + 1 + , cache = newCache + } + where + loop Empty Empty = do + newCache <- case ma of + Just a -> do + now <- getMonotonicTime + pure [Entry a now] + Nothing -> pure [] + pure $! Stripe { available = 1 + , cache = newCache + , queue = Empty + , queueR = Empty + } + loop Empty qR = loop (reverseQueue qR) Empty + loop (Queue q qs) qR = tryPutMVar q ma >>= \case + -- This fails when 'waitForResource' went into the exception handler and + -- filled the MVar (with an undefined value) itself. In such case we + -- simply ignore it. + False -> loop qs qR + True -> pure $! stripe { available = 0 + , queue = qs + , queueR = qR + } + +reverseQueue :: Queue a -> Queue a +reverseQueue = go Empty + where + go acc = \case + Empty -> acc + Queue x xs -> go (Queue x acc) xs diff --git a/src/Data/Pool/Introspection.hs b/src/Data/Pool/Introspection.hs new file mode 100644 index 0000000..2bab6aa --- /dev/null +++ b/src/Data/Pool/Introspection.hs @@ -0,0 +1,115 @@ +-- | A variant of "Data.Pool" with introspection capabilities. +module Data.Pool.Introspection + ( -- * Pool + PoolConfig(..) + , Pool + , LocalPool + , newPool + + -- * Resource management + , Resource(..) + , Acquisition(..) + , withResource + , takeResource + , putResource + , destroyResource + , destroyAllResources + ) where + +import Control.Concurrent +import Control.Exception +import GHC.Clock +import GHC.Generics (Generic) + +import Data.Pool.Internal + +-- | A resource taken from the pool along with additional information. +data Resource a = Resource + { resource :: a + , stripeNumber :: !Int + , availableResources :: !Int + , acquisition :: !Acquisition + , acquisitionTime :: !Double + , creationTime :: !(Maybe Double) + } deriving (Eq, Show, Generic) + +-- | Describes how a resource was acquired from the pool. +data Acquisition + = Immediate + -- ^ A resource was taken from the pool immediately. + | Delayed + -- ^ The thread had to wait until a resource was released. + deriving (Eq, Show, Generic) + +-- | 'Data.Pool.withResource' with introspection capabilities. +withResource :: Pool a -> (Resource a -> IO r) -> IO r +withResource pool act = mask $ \unmask -> do + (res, localPool) <- takeResource pool + r <- unmask (act res) `onException` destroyResource pool localPool (resource res) + putResource localPool (resource res) + pure r + +-- | 'Data.Pool.takeResource' with introspection capabilities. +takeResource :: Pool a -> IO (Resource a, LocalPool a) +takeResource pool = mask_ $ do + t1 <- getMonotonicTime + lp <- getLocalPool (localPools pool) + stripe <- takeMVar (stripeVar lp) + if available stripe == 0 + then do + q <- newEmptyMVar + putMVar (stripeVar lp) $! stripe { queueR = Queue q (queueR stripe) } + waitForResource (stripeVar lp) q >>= \case + Just a -> do + t2 <- getMonotonicTime + let res = Resource + { resource = a + , stripeNumber = stripeId lp + , availableResources = 0 + , acquisition = Delayed + , acquisitionTime = t2 - t1 + , creationTime = Nothing + } + pure (res, lp) + Nothing -> do + t2 <- getMonotonicTime + a <- createResource (poolConfig pool) `onException` restoreSize (stripeVar lp) + t3 <- getMonotonicTime + let res = Resource + { resource = a + , stripeNumber = stripeId lp + , availableResources = 0 + , acquisition = Delayed + , acquisitionTime = t2 - t1 + , creationTime = Just $! t3 - t2 + } + pure (res, lp) + else case cache stripe of + [] -> do + let newAvailable = available stripe - 1 + putMVar (stripeVar lp) $! stripe { available = newAvailable } + t2 <- getMonotonicTime + a <- createResource (poolConfig pool) `onException` restoreSize (stripeVar lp) + t3 <- getMonotonicTime + let res = Resource + { resource = a + , stripeNumber = stripeId lp + , availableResources = newAvailable + , acquisition = Immediate + , acquisitionTime = t2 - t1 + , creationTime = Just $! t3 - t2 + } + pure (res, lp) + Entry a _ : as -> do + let newAvailable = available stripe - 1 + putMVar (stripeVar lp) $! stripe { available = newAvailable, cache = as } + t2 <- getMonotonicTime + let res = Resource + { resource = a + , stripeNumber = stripeId lp + , availableResources = newAvailable + , acquisition = Immediate + , acquisitionTime = t2 - t1 + , creationTime = Nothing + } + pure (res, lp)