-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix migration race #50
Changes from 7 commits
2e8a15a
78cd1f8
6793b5e
94d5c64
b600c96
db59651
79dd32a
b95c58b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,15 +21,19 @@ import Lens.Micro | |
( to | ||
, (^.) | ||
) | ||
import Streamly.Data.Fold qualified as Fold | ||
import Streamly.Data.Stream.Prelude (Stream) | ||
import Streamly.Data.Stream.Prelude qualified as Stream | ||
import Streamly.Data.Unfold qualified as Unfold | ||
import Streamly.Prelude qualified as S | ||
import UnliftIO (MonadUnliftIO (..)) | ||
import UnliftIO.Pool | ||
( LocalPool | ||
, Pool | ||
, createPool | ||
, destroyResource | ||
, mkDefaultPoolConfig | ||
, newPool | ||
, putResource | ||
, setNumStripes | ||
, takeResource | ||
, withResource | ||
) | ||
|
@@ -122,12 +126,19 @@ createRetireFunction conn = | |
\language plpgsql;" | ||
|
||
simplePool' :: MonadUnliftIO m => PG.ConnectInfo -> m (Pool Connection) | ||
simplePool' connInfo = | ||
createPool (liftIO $ PG.connect connInfo) (liftIO . PG.close) 1 5 5 | ||
simplePool' connInfo = simplePool (PG.connect connInfo) | ||
|
||
simplePool :: MonadUnliftIO m => IO Connection -> m (Pool Connection) | ||
simplePool getConn = | ||
createPool (liftIO getConn) (liftIO . PG.close) 1 5 5 | ||
simplePool getConn = do | ||
-- Using stripesAndResources because the default is crazy: | ||
-- "Set num resources to cores and crash if there are fewer stripes" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you add a link to scrive/pool#16 to add some context? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah and the comment is backwards. It's 'set striped to num cores and crash if there are fewer resources than stripes'. |
||
let stripesAndResources :: Int | ||
stripesAndResources = 5 | ||
poolCfg <- | ||
setNumStripes (Just stripesAndResources) | ||
<$> mkDefaultPoolConfig (liftIO getConn) (liftIO . PG.close) 1.5 stripesAndResources | ||
|
||
newPool poolCfg | ||
|
||
-- | Setup the persistance model and verify that the tables exist. | ||
postgresWriteModelNoMigration | ||
|
@@ -173,7 +184,10 @@ runMigrations trans et = do | |
(MigrateUsing{}, [Only True]) -> pure () | ||
(InitialVersion _, [Only False]) -> createTable | ||
(MigrateUsing mig prevEt, [Only False]) -> do | ||
-- Ensure migrations are done up until the previous table | ||
runMigrations trans prevEt | ||
-- Then lock lock the previous table before we start | ||
exclusiveLock trans (getEventTableName prevEt) | ||
createTable | ||
mig (getEventTableName prevEt) (getEventTableName et) conn | ||
retireTable conn (getEventTableName prevEt) | ||
|
@@ -298,23 +312,23 @@ writeEvents conn eventTable storedEvents = do | |
("select coalesce(max(event_number),1) from \"" <> fromString eventTable <> "\"") | ||
|
||
getEventStream' | ||
:: FromJSON event => PostgresEventTrans model event -> S.SerialT IO (Stored event) | ||
:: FromJSON event => PostgresEventTrans model event -> Stream IO (Stored event) | ||
getEventStream' pgt = | ||
S.map fst $ | ||
mkEventStream | ||
fst | ||
<$> mkEventStream | ||
(pgt ^. field @"chunkSize") | ||
(pgt ^. field @"transaction" . field @"connection") | ||
(pgt ^. field @"eventTableName" . to mkEventQuery) | ||
|
||
-- | A transaction that is always rolled back at the end. | ||
-- This is useful when using cursors as they can only be used inside a transaction. | ||
withStreamReadTransaction | ||
:: forall t m a model event | ||
. (S.IsStream t, S.MonadAsync m, MonadCatch m) | ||
:: forall m a model event | ||
. (Stream.MonadAsync m, MonadCatch m) | ||
=> PostgresEvent model event | ||
-> (PostgresEventTrans model event -> t m a) | ||
-> t m a | ||
withStreamReadTransaction pg = S.bracket startTrans rollbackTrans | ||
-> (PostgresEventTrans model event -> Stream m a) | ||
-> Stream m a | ||
withStreamReadTransaction pg = Stream.bracket startTrans rollbackTrans | ||
where | ||
startTrans :: m (PostgresEventTrans model event) | ||
startTrans = liftIO $ do | ||
|
@@ -387,7 +401,7 @@ mkEventStream | |
=> ChunkSize | ||
-> Connection | ||
-> EventQuery | ||
-> S.SerialT IO (Stored event, EventNumber) | ||
-> Stream IO (Stored event, EventNumber) | ||
mkEventStream chunkSize conn q = do | ||
let step :: Cursor.Cursor -> IO (Maybe (Seq EventRowOut, Cursor.Cursor)) | ||
step cursor = do | ||
|
@@ -397,12 +411,17 @@ mkEventStream chunkSize conn q = do | |
Left a -> pure $ Just (a, cursor) | ||
Right a -> pure $ Just (a, cursor) | ||
|
||
cursor <- liftIO $ Cursor.declareCursor conn (getPgQuery q) | ||
S.mapM fromEventRow $ | ||
S.unfoldMany Unfold.fromList . fmap toList $ | ||
S.unfoldrM | ||
step | ||
cursor | ||
-- cursor <- liftIO $ Cursor.declareCursor conn (getPgQuery q) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. any reason you want to leave the comment? |
||
Stream.bracketIO | ||
(Cursor.declareCursor conn (getPgQuery q)) | ||
(Cursor.closeCursor) | ||
( \cursor -> | ||
Stream.mapM fromEventRow $ | ||
Stream.unfoldMany Unfold.fromList . fmap toList $ | ||
Stream.unfoldrM | ||
step | ||
cursor | ||
) | ||
|
||
getModel' :: forall e m. FromJSON e => PostgresEventTrans m e -> IO m | ||
getModel' pgt = do | ||
|
@@ -423,8 +442,7 @@ refreshModel pg = do | |
-- refresh doesn't write any events but changes the state and thus needs a lock | ||
exclusiveLock (pg ^. field @"transaction") (pg ^. field @"eventTableName") | ||
NumberedModel model lastEventNo <- readIORef (pg ^. field @"modelIORef") | ||
let eventStream :: S.SerialT IO (Stored e, EventNumber) | ||
eventStream = | ||
let eventStream = | ||
mkEventStream | ||
(pg ^. field @"chunkSize") | ||
(pg ^. field @"transaction" . field @"connection") | ||
|
@@ -435,9 +453,11 @@ refreshModel pg = do | |
NumberedModel ((pg ^. field @"app") m ev) evNumber | ||
|
||
NumberedModel newModel lastNewEventNo <- | ||
S.foldl' | ||
applyModel | ||
(NumberedModel model lastEventNo) | ||
Stream.fold | ||
( Fold.foldl' | ||
applyModel | ||
(NumberedModel model lastEventNo) | ||
) | ||
eventStream | ||
|
||
_ <- writeIORef (pg ^. field @"modelIORef") $ NumberedModel newModel lastNewEventNo | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this just be fromEffect?
As in,
fromEffect (getEventList ff)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok nvm, you are right.