Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix migration race #50

Merged
merged 8 commits into from
Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 20 additions & 17 deletions domaindriven-core/domaindriven-core.cabal
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cabal-version: 1.12

-- This file has been generated from package.yaml by hpack version 0.35.1.
-- This file has been generated from package.yaml by hpack version 0.36.0.
--
-- see: https://github.com/sol/hpack

Expand Down Expand Up @@ -70,7 +70,7 @@ library
TypeOperators
TypeSynonymInstances
ViewPatterns
ghc-options: -Wall -Wcompat -Widentities -Wincomplete-record-updates -Wincomplete-uni-patterns -Wpartial-fields -Wredundant-constraints -Wincomplete-record-updates -Wincomplete-patterns
ghc-options: -Wall -Wcompat -Widentities -Wincomplete-record-updates -Wincomplete-uni-patterns -Wpartial-fields -Wredundant-constraints -Wincomplete-record-updates -Wincomplete-patterns -Wunused-packages
build-depends:
aeson >=2.0.3 && <2.2
, async >=2.2.4 && <2.3
Expand All @@ -82,15 +82,16 @@ library
, generic-lens >=2.2.1.0 && <2.3
, http-types >=0.12.3 && <0.13
, microlens >=0.4.12.0 && <0.5
, mtl >=2.2.2 && <2.3
, postgresql-simple >=0.6.4 && <0.7
, mtl >=2.2.2 && <2.4
, postgresql-simple >=0.6.4 && <0.8
, random >=1.2.1.1 && <1.3
, streamly >=0.8.1.1 && <0.9
, template-haskell >=2.18.0.0 && <2.19
, time >=1.11.1 && <1.12
, transformers >=0.5.6.2 && <0.6
, streamly >=0.9 && <0.11
, streamly-core ==0.2.*
, template-haskell >=2.18.0.0 && <2.21
, time >=1.11.1 && <1.13
, transformers >=0.5.6.2 && <0.7
, unliftio >=0.2.0.1 && <0.3
, unliftio-pool >=0.2.2.0 && <0.3
, unliftio-pool >=0.2.2.0 && <0.5
, unordered-containers >=0.2.19.1 && <0.3
, uuid >=1.3.15 && <1.4
, vector >=0.12.3.1 && <0.14
Expand All @@ -100,7 +101,7 @@ test-suite domaindriven-core-test
type: exitcode-stdio-1.0
main-is: Spec.hs
other-modules:
DomainDriven.Persistance.PostgresIORefStateSpec
DomainDriven.Persistance.PostgresSpec
Paths_domaindriven_core
hs-source-dirs:
test
Expand Down Expand Up @@ -137,15 +138,17 @@ test-suite domaindriven-core-test
TypeOperators
TypeSynonymInstances
ViewPatterns
ghc-options: -Wall -Wcompat -Widentities -Wincomplete-record-updates -Wincomplete-uni-patterns -Wpartial-fields -Wredundant-constraints -Wincomplete-record-updates -Wincomplete-patterns -threaded -rtsopts -with-rtsopts=-N -Wall -Wunused-packages
ghc-options: -Wall -Wcompat -Widentities -Wincomplete-record-updates -Wincomplete-uni-patterns -Wpartial-fields -Wredundant-constraints -Wincomplete-record-updates -Wincomplete-patterns -Wunused-packages -threaded -rtsopts -with-rtsopts=-N -Wall -Wunused-packages
build-depends:
aeson >=2.0.3 && <2.2
, base >=4.7 && <5
, domaindriven-core >=0.5.0 && <0.6
, hspec >=2.9.7 && <2.10
, postgresql-simple >=0.6.4 && <0.7
, streamly >=0.8.1.1 && <0.9
, time >=1.11.1 && <1.12
, unliftio-pool >=0.2.2.0 && <0.3
, domaindriven-core
, hspec >=2.9 && <2.12
, postgresql-simple >=0.6.4 && <0.8
, streamly >=0.9 && <0.11
, streamly-core ==0.2.*
, time >=1.11.1 && <1.13
, unliftio >=0.2.0.1 && <0.3
, unliftio-pool >=0.2.2.0 && <0.5
, uuid >=1.3.15 && <1.4
default-language: Haskell2010
62 changes: 33 additions & 29 deletions domaindriven-core/package.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ category: Web
description: Please see the README on GitHub at <https://github.com/tommyengstrom/domaindriven#readme>

dependencies:
- aeson >= 2.0.3 && < 2.2
- base >= 4.7 && < 5
- aeson >=2.0.3 && <2.2
- base >=4.7 && <5


default-extensions:
Expand Down Expand Up @@ -68,30 +68,32 @@ ghc-options:
- -Wredundant-constraints
- -Wincomplete-record-updates
- -Wincomplete-patterns
- -Wunused-packages

library:
source-dirs: src
dependencies:
- async >= 2.2.4 && < 2.3
- bytestring >= 0.11.3 && < 0.12
- containers >= 0.6.5.1 && < 0.7
- deepseq >= 1.4.6.1 && < 1.5
- exceptions >= 0.10.4 && < 0.11
- generic-lens >= 2.2.1.0 && < 2.3
- http-types >= 0.12.3 && < 0.13
- microlens >= 0.4.12.0 && < 0.5
- mtl >= 2.2.2 && < 2.3
- postgresql-simple >= 0.6.4 && < 0.7
- random >= 1.2.1.1 && < 1.3
- streamly >= 0.8.1.1 && < 0.9
- template-haskell >= 2.18.0.0 && < 2.19
- time >= 1.11.1 && < 1.12
- transformers >= 0.5.6.2 && < 0.6
- unliftio >= 0.2.0.1 && < 0.3
- unliftio-pool >= 0.2.2.0 && < 0.3
- unordered-containers >= 0.2.19.1 && < 0.3
- uuid >= 1.3.15 && < 1.4
- vector >= 0.12.3.1 && < 0.14
- async >=2.2.4 && <2.3
- bytestring >=0.11.3 && <0.12
- containers >=0.6.5.1 && <0.7
- deepseq >=1.4.6.1 && <1.5
- exceptions >=0.10.4 && <0.11
- generic-lens >=2.2.1.0 && <2.3
- http-types >=0.12.3 && <0.13
- microlens >=0.4.12.0 && <0.5
- mtl >=2.2.2 && <2.4
- postgresql-simple >=0.6.4 && <0.8
- random >=1.2.1.1 && <1.3
- streamly >=0.9 && <0.11
- streamly-core >=0.2 && <0.3
- template-haskell >=2.18.0.0 && <2.21
- time >=1.11.1 && <1.13
- transformers >=0.5.6.2 && <0.7
- unliftio >=0.2.0.1 && <0.3
- unliftio-pool >=0.2.2.0 && <0.5
- uuid >=1.3.15 && <1.4
- unordered-containers >=0.2.19.1 && <0.3
- vector >=0.12.3.1 && <0.14
tests:
domaindriven-core-test:
main: Spec.hs
Expand All @@ -104,10 +106,12 @@ tests:
- -Wall
- -Wunused-packages
dependencies:
- domaindriven-core >= 0.5.0 && < 0.6
- hspec >= 2.9.7 && < 2.10
- postgresql-simple >= 0.6.4 && < 0.7
- streamly >= 0.8.1.1 && < 0.9
- time >= 1.11.1 && < 1.12
- unliftio-pool >= 0.2.2.0 && < 0.3
- uuid >= 1.3.15 && < 1.4
- domaindriven-core
- hspec >=2.9 && <2.12
- postgresql-simple >=0.6.4 && <0.8
- streamly >=0.9 && <0.11
- streamly-core >=0.2 && <0.3
- time >=1.11.1 && <1.13
- unliftio >=0.2.0.1 && <0.3
- unliftio-pool >=0.2.2.0 && <0.5
- uuid >=1.3.15 && <1.4
4 changes: 2 additions & 2 deletions domaindriven-core/src/DomainDriven/Persistance/Class.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import Data.Kind
import Data.Time
import Data.UUID (UUID)
import GHC.Generics (Generic)
import Streamly.Prelude (SerialT)
import Streamly.Data.Stream.Prelude (Stream)
import System.Random
import UnliftIO
import Prelude
Expand All @@ -23,7 +23,7 @@ class ReadModel p where
applyEvent :: p -> Model p -> Stored (Event p) -> Model p
getModel :: p -> IO (Model p)
getEventList :: p -> IO [Stored (Event p)]
getEventStream :: p -> SerialT IO (Stored (Event p))
getEventStream :: p -> Stream IO (Stored (Event p))

type TransactionalUpdate model event m a = (model -> m (model -> a, [event])) -> m a

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
{-# LANGUAGE InstanceSigs #-}

module DomainDriven.Persistance.ForgetfulInMemory where

import Data.List (foldl')
import DomainDriven.Persistance.Class
import GHC.Generics (Generic)
import Streamly.Prelude qualified as S
import Streamly.Data.Stream.Prelude qualified as Stream
import UnliftIO
import Prelude

Expand Down Expand Up @@ -32,12 +34,15 @@ data ForgetfulInMemory model event = ForgetfulInMemory
instance ReadModel (ForgetfulInMemory model e) where
type Model (ForgetfulInMemory model e) = model
type Event (ForgetfulInMemory model e) = e
applyEvent ff = apply ff
applyEvent = apply
getModel :: ForgetfulInMemory model e -> IO (Model (ForgetfulInMemory model e))
getModel ff = readIORef $ stateRef ff
getEventList ff = readIORef $ events ff
getEventStream ff = do
l <- liftIO $ getEventList ff
S.fromList l
getEventStream ff =
Stream.bracketIO
(getEventList ff)
(const (pure ()))
Stream.fromList
Comment on lines +41 to +45
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this just be fromEffect?

As in, fromEffect (getEventList ff)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok nvm, you are right.


instance WriteModel (ForgetfulInMemory model e) where
transactionalUpdate ff evalCmd =
Expand Down
72 changes: 46 additions & 26 deletions domaindriven-core/src/DomainDriven/Persistance/Postgres/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,19 @@ import Lens.Micro
( to
, (^.)
)
import Streamly.Data.Fold qualified as Fold
import Streamly.Data.Stream.Prelude (Stream)
import Streamly.Data.Stream.Prelude qualified as Stream
import Streamly.Data.Unfold qualified as Unfold
import Streamly.Prelude qualified as S
import UnliftIO (MonadUnliftIO (..))
import UnliftIO.Pool
( LocalPool
, Pool
, createPool
, destroyResource
, mkDefaultPoolConfig
, newPool
, putResource
, setNumStripes
, takeResource
, withResource
)
Expand Down Expand Up @@ -122,12 +126,20 @@ createRetireFunction conn =
\language plpgsql;"

simplePool' :: MonadUnliftIO m => PG.ConnectInfo -> m (Pool Connection)
simplePool' connInfo =
createPool (liftIO $ PG.connect connInfo) (liftIO . PG.close) 1 5 5
simplePool' connInfo = simplePool (PG.connect connInfo)

simplePool :: MonadUnliftIO m => IO Connection -> m (Pool Connection)
simplePool getConn =
createPool (liftIO getConn) (liftIO . PG.close) 1 5 5
simplePool getConn = do
-- Using stripesAndResources because the default is crazy:
-- https://github.com/scrive/pool/pull/16
-- "Set number of stripes to number of cores and crash if there are fewer resources"
let stripesAndResources :: Int
stripesAndResources = 5
poolCfg <-
setNumStripes (Just stripesAndResources)
<$> mkDefaultPoolConfig (liftIO getConn) (liftIO . PG.close) 1.5 stripesAndResources

newPool poolCfg

-- | Setup the persistance model and verify that the tables exist.
postgresWriteModelNoMigration
Expand Down Expand Up @@ -173,7 +185,10 @@ runMigrations trans et = do
(MigrateUsing{}, [Only True]) -> pure ()
(InitialVersion _, [Only False]) -> createTable
(MigrateUsing mig prevEt, [Only False]) -> do
-- Ensure migrations are done up until the previous table
runMigrations trans prevEt
-- Then lock lock the previous table before we start
exclusiveLock trans (getEventTableName prevEt)
createTable
mig (getEventTableName prevEt) (getEventTableName et) conn
retireTable conn (getEventTableName prevEt)
Expand Down Expand Up @@ -298,23 +313,23 @@ writeEvents conn eventTable storedEvents = do
("select coalesce(max(event_number),1) from \"" <> fromString eventTable <> "\"")

getEventStream'
:: FromJSON event => PostgresEventTrans model event -> S.SerialT IO (Stored event)
:: FromJSON event => PostgresEventTrans model event -> Stream IO (Stored event)
getEventStream' pgt =
S.map fst $
mkEventStream
fst
<$> mkEventStream
(pgt ^. field @"chunkSize")
(pgt ^. field @"transaction" . field @"connection")
(pgt ^. field @"eventTableName" . to mkEventQuery)

-- | A transaction that is always rolled back at the end.
-- This is useful when using cursors as they can only be used inside a transaction.
withStreamReadTransaction
:: forall t m a model event
. (S.IsStream t, S.MonadAsync m, MonadCatch m)
:: forall m a model event
. (Stream.MonadAsync m, MonadCatch m)
=> PostgresEvent model event
-> (PostgresEventTrans model event -> t m a)
-> t m a
withStreamReadTransaction pg = S.bracket startTrans rollbackTrans
-> (PostgresEventTrans model event -> Stream m a)
-> Stream m a
withStreamReadTransaction pg = Stream.bracket startTrans rollbackTrans
where
startTrans :: m (PostgresEventTrans model event)
startTrans = liftIO $ do
Expand Down Expand Up @@ -387,7 +402,7 @@ mkEventStream
=> ChunkSize
-> Connection
-> EventQuery
-> S.SerialT IO (Stored event, EventNumber)
-> Stream IO (Stored event, EventNumber)
mkEventStream chunkSize conn q = do
let step :: Cursor.Cursor -> IO (Maybe (Seq EventRowOut, Cursor.Cursor))
step cursor = do
Expand All @@ -397,12 +412,16 @@ mkEventStream chunkSize conn q = do
Left a -> pure $ Just (a, cursor)
Right a -> pure $ Just (a, cursor)

cursor <- liftIO $ Cursor.declareCursor conn (getPgQuery q)
S.mapM fromEventRow $
S.unfoldMany Unfold.fromList . fmap toList $
S.unfoldrM
step
cursor
Stream.bracketIO
(Cursor.declareCursor conn (getPgQuery q))
(Cursor.closeCursor)
( \cursor ->
Stream.mapM fromEventRow $
Stream.unfoldMany Unfold.fromList . fmap toList $
Stream.unfoldrM
step
cursor
)

getModel' :: forall e m. FromJSON e => PostgresEventTrans m e -> IO m
getModel' pgt = do
Expand All @@ -423,8 +442,7 @@ refreshModel pg = do
-- refresh doesn't write any events but changes the state and thus needs a lock
exclusiveLock (pg ^. field @"transaction") (pg ^. field @"eventTableName")
NumberedModel model lastEventNo <- readIORef (pg ^. field @"modelIORef")
let eventStream :: S.SerialT IO (Stored e, EventNumber)
eventStream =
let eventStream =
mkEventStream
(pg ^. field @"chunkSize")
(pg ^. field @"transaction" . field @"connection")
Expand All @@ -435,9 +453,11 @@ refreshModel pg = do
NumberedModel ((pg ^. field @"app") m ev) evNumber

NumberedModel newModel lastNewEventNo <-
S.foldl'
applyModel
(NumberedModel model lastEventNo)
Stream.fold
( Fold.foldl'
applyModel
(NumberedModel model lastEventNo)
)
eventStream

_ <- writeIORef (pg ^. field @"modelIORef") $ NumberedModel newModel lastNewEventNo
Expand Down
Loading
Loading