-
-
Notifications
You must be signed in to change notification settings - Fork 297
Sub-transactions API #912
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Sub-transactions API #912
Changes from 7 commits
f7455b1
a35d24e
aa8ae18
8998091
351d157
b38d9d7
df654a1
d04aea8
1cc57bc
3ed2afe
94f7cf8
cb812f4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,103 @@ | ||
| /* | ||
| Portions Copyright 2019-2021 ZomboDB, LLC. | ||
| Portions Copyright 2021-2022 Technology Concepts & Design, Inc. <[email protected]> | ||
|
|
||
| All rights reserved. | ||
|
|
||
| Use of this source code is governed by the MIT license that can be found in the LICENSE file. | ||
| */ | ||
|
|
||
| #[cfg(any(test, feature = "pg_test"))] | ||
| #[pgx::pg_schema] | ||
| mod tests { | ||
| #[allow(unused_imports)] | ||
| use crate as pgx_tests; | ||
|
|
||
| use pgx::prelude::*; | ||
| use pgx::SpiClient; | ||
|
|
||
| #[pg_test] | ||
| fn test_subxact_smoketest() { | ||
| Spi::execute(|c| { | ||
| c.update("CREATE TABLE a (v INTEGER)", None, None); | ||
| let c = c.sub_transaction(|xact| { | ||
| xact.update("INSERT INTO a VALUES (0)", None, None); | ||
| assert_eq!( | ||
| 0, | ||
| xact.select("SELECT v FROM a", Some(1), None) | ||
| .first() | ||
| .get_datum::<i32>(1) | ||
| .unwrap() | ||
| ); | ||
| let xact = xact.sub_transaction(|xact| { | ||
| xact.update("INSERT INTO a VALUES (1)", None, None); | ||
| assert_eq!( | ||
| 2, | ||
| xact.select("SELECT COUNT(*) FROM a", Some(1), None) | ||
| .first() | ||
| .get_datum::<i32>(1) | ||
| .unwrap() | ||
| ); | ||
| xact.rollback() | ||
| }); | ||
| xact.rollback() | ||
| }); | ||
| assert_eq!( | ||
| 0, | ||
| c.select("SELECT COUNT(*) FROM a", Some(1), None) | ||
| .first() | ||
| .get_datum::<i32>(1) | ||
| .unwrap() | ||
| ); | ||
| }) | ||
| } | ||
|
|
||
| #[pg_test] | ||
| fn test_commit_on_drop() { | ||
| Spi::execute(|c| { | ||
| c.update("CREATE TABLE a (v INTEGER)", None, None); | ||
| // The type below is explicit to ensure it's commit on drop by default | ||
| c.sub_transaction(|xact: SubTransaction<SpiClient, true>| { | ||
| xact.update("INSERT INTO a VALUES (0)", None, None); | ||
| // Dropped explicitly for illustration purposes | ||
| drop(xact); | ||
| }); | ||
| // Create a new client to check the state | ||
| Spi::execute(|c| { | ||
| // The above insert should have been committed | ||
| assert_eq!( | ||
| 1, | ||
| c.select("SELECT COUNT(*) FROM a", Some(1), None) | ||
| .first() | ||
| .get_datum::<i32>(1) | ||
| .unwrap() | ||
| ); | ||
| }); | ||
| }) | ||
| } | ||
|
|
||
| #[pg_test] | ||
| fn test_rollback_on_drop() { | ||
| Spi::execute(|c| { | ||
| c.update("CREATE TABLE a (v INTEGER)", None, None); | ||
| // The type below is explicit to ensure it's commit on drop by default | ||
| c.sub_transaction(|xact: SubTransaction<SpiClient, true>| { | ||
| xact.update("INSERT INTO a VALUES (0)", None, None); | ||
| let xact = xact.rollback_on_drop(); | ||
| // Dropped explicitly for illustration purposes | ||
| drop(xact); | ||
| }); | ||
| // Create a new client to check the state | ||
| Spi::execute(|c| { | ||
| // The above insert should NOT have been committed | ||
| assert_eq!( | ||
| 0, | ||
| c.select("SELECT COUNT(*) FROM a", Some(1), None) | ||
| .first() | ||
| .get_datum::<i32>(1) | ||
| .unwrap() | ||
| ); | ||
| }); | ||
| }) | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,190 @@ | ||
| use crate::{pg_sys, PgMemoryContexts, SpiClient}; | ||
| use std::fmt::Debug; | ||
| use std::ops::Deref; | ||
|
|
||
| /// Sub-transaction | ||
| /// | ||
| /// Can be created by calling `SpiClient::sub_transaction`, `SubTransaction<Parent>::sub_transaction` | ||
| /// or any other implementation of `SubTransactionExt` and obtaining it as an argument to the provided closure. | ||
|
||
| /// | ||
| /// Unless rolled back or committed explicitly, it'll commit if `COMMIT` generic parameter is `true` | ||
| /// (default) or roll back if it is `false`. | ||
| #[derive(Debug)] | ||
| pub struct SubTransaction<Parent: SubTransactionExt, const COMMIT: bool = true> { | ||
| memory_context: pg_sys::MemoryContext, | ||
| // Resource ownership before the transaction | ||
| // | ||
| // Based on information from src/backend/utils/resowner/README | ||
| // as well as practical use of it in src/pl/plpython/plpy_spi.c | ||
| resource_owner: pg_sys::ResourceOwner, | ||
| // Should the transaction be released, or was it already committed or rolled back? | ||
| // | ||
| // The reason we are not calling this `released` as we're also using this flag when | ||
| // we convert between commit_on_drop and rollback_on_drop to ensure it doesn't get released | ||
| // on the drop of the original value. | ||
| should_release: bool, | ||
|
||
| parent: Option<Parent>, | ||
| } | ||
|
|
||
| impl<Parent: SubTransactionExt, const COMMIT: bool> SubTransaction<Parent, COMMIT> { | ||
| /// Create a new sub-transaction. | ||
| fn new(parent: Parent) -> Self { | ||
| // Remember the memory context before starting the sub-transaction | ||
| let ctx = PgMemoryContexts::CurrentMemoryContext.value(); | ||
| // Remember resource owner before starting the sub-transaction | ||
| let resource_owner = unsafe { pg_sys::CurrentResourceOwner }; | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we get a note mentioning when this variable is set (i.e. "what is the last write we are expecting to be reading")? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you please elaborate on this request? I am not 100% sure I understood what you're asking me to do. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think Jubilee means when might Postgres initially set (or later change) this and how are we certain it even has a valid value at this point in time? |
||
| unsafe { | ||
| pg_sys::BeginInternalSubTransaction(std::ptr::null() /* [no] transaction name */); | ||
| } | ||
| // Switch to the outer memory context so that all allocations remain | ||
| // there instead of the sub-transaction's context | ||
| PgMemoryContexts::For(ctx).set_as_current(); | ||
| Self { memory_context: ctx, should_release: true, resource_owner, parent: Some(parent) } | ||
| } | ||
|
|
||
| /// Commit the transaction, returning its parent | ||
| pub fn commit(mut self) -> Parent { | ||
| self.internal_commit(); | ||
| self.should_release = false; | ||
| self.parent.take().unwrap() | ||
| } | ||
|
|
||
| /// Rollback the transaction, returning its parent | ||
| pub fn rollback(mut self) -> Parent { | ||
| self.internal_rollback(); | ||
| self.should_release = false; | ||
| self.parent.take().unwrap() | ||
| } | ||
yrashk marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| fn internal_rollback(&self) { | ||
| unsafe { | ||
| pg_sys::RollbackAndReleaseCurrentSubTransaction(); | ||
| pg_sys::CurrentResourceOwner = self.resource_owner; | ||
yrashk marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
| PgMemoryContexts::For(self.memory_context).set_as_current(); | ||
| } | ||
|
|
||
| fn internal_commit(&self) { | ||
| unsafe { | ||
| pg_sys::ReleaseCurrentSubTransaction(); | ||
| pg_sys::CurrentResourceOwner = self.resource_owner; | ||
| } | ||
| PgMemoryContexts::For(self.memory_context).set_as_current(); | ||
| } | ||
| } | ||
|
|
||
| impl<Parent: SubTransactionExt> SubTransaction<Parent, true> { | ||
| /// Make this sub-transaction roll back on drop | ||
| pub fn rollback_on_drop(self) -> SubTransaction<Parent, false> { | ||
| self.into() | ||
| } | ||
| } | ||
|
|
||
| impl<Parent: SubTransactionExt> SubTransaction<Parent, false> { | ||
| /// Make this sub-transaction commit on drop | ||
| pub fn commit_on_drop(self) -> SubTransaction<Parent, true> { | ||
| self.into() | ||
| } | ||
| } | ||
|
|
||
| impl<Parent: SubTransactionExt> Into<SubTransaction<Parent, false>> | ||
| for SubTransaction<Parent, true> | ||
| { | ||
| fn into(mut self) -> SubTransaction<Parent, false> { | ||
| let result = SubTransaction { | ||
| memory_context: self.memory_context, | ||
| resource_owner: self.resource_owner, | ||
| should_release: self.should_release, | ||
| parent: self.parent.take(), | ||
| }; | ||
| // Make sure original sub-transaction won't commit | ||
| self.should_release = false; | ||
| result | ||
| } | ||
|
||
| } | ||
|
|
||
| impl<Parent: SubTransactionExt> Into<SubTransaction<Parent, true>> | ||
| for SubTransaction<Parent, false> | ||
| { | ||
| fn into(mut self) -> SubTransaction<Parent, true> { | ||
| let result = SubTransaction { | ||
| memory_context: self.memory_context, | ||
| resource_owner: self.resource_owner, | ||
| should_release: self.should_release, | ||
| parent: self.parent.take(), | ||
| }; | ||
| // Make sure original sub-transaction won't roll back | ||
| self.should_release = false; | ||
| result | ||
| } | ||
| } | ||
|
|
||
| impl<Parent: SubTransactionExt, const COMMIT: bool> Drop for SubTransaction<Parent, COMMIT> { | ||
| fn drop(&mut self) { | ||
| if self.should_release { | ||
| if COMMIT { | ||
| self.internal_commit(); | ||
| } else { | ||
| self.internal_rollback(); | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // This allows SubTransaction to be de-referenced to SpiClient | ||
| impl<'conn, const COMMIT: bool> Deref for SubTransaction<SpiClient<'conn>, COMMIT> { | ||
| type Target = SpiClient<'conn>; | ||
|
|
||
| fn deref(&self) -> &Self::Target { | ||
| self.parent.as_ref().unwrap() | ||
| } | ||
| } | ||
|
|
||
| // This allows a SubTransaction of a SubTransaction to be de-referenced to SpiClient | ||
| impl<Parent: SubTransactionExt, const COMMIT: bool> Deref | ||
| for SubTransaction<SubTransaction<Parent>, COMMIT> | ||
| { | ||
| type Target = Parent; | ||
|
|
||
| fn deref(&self) -> &Self::Target { | ||
| self.parent.as_ref().and_then(|p| p.parent.as_ref()).unwrap() | ||
| } | ||
| } | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wouldn't it be more general for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is how it was initially https://github.com/supabase/pgx-contrib-spiext/blob/main/src/subtxn.rs#L137-L143 However, this has prevented me from using However, with this special case, I no longer need a Box |
||
|
|
||
| /// Trait that allows creating a sub-transaction off any type | ||
| pub trait SubTransactionExt { | ||
| /// Parent's type | ||
| /// | ||
| /// In most common cases, it'll be equal to `Self`. However, in some cases | ||
| /// it may be desirable to use a different type to achieve certain goals. | ||
| type Parent: SubTransactionExt; | ||
|
|
||
| /// Consume `self` and execute a closure with a sub-transaction | ||
| /// | ||
| /// If further use of the given sub-transaction is necessary, it must | ||
| /// be returned by the closure alongside with its intended result. Otherwise, | ||
| /// the sub-transaction be released when dropped. | ||
| fn sub_transaction<F: FnOnce(SubTransaction<Self::Parent>) -> R, R>(self, f: F) -> R | ||
| where | ||
| Self: Sized; | ||
| } | ||
|
|
||
| impl<'a> SubTransactionExt for SpiClient<'a> { | ||
| type Parent = Self; | ||
| fn sub_transaction<F: FnOnce(SubTransaction<Self::Parent>) -> R, R>(self, f: F) -> R | ||
| where | ||
| Self: Sized, | ||
| { | ||
| f(SubTransaction::new(self)) | ||
| } | ||
| } | ||
|
|
||
| impl<Parent: SubTransactionExt> SubTransactionExt for SubTransaction<Parent> { | ||
| type Parent = Self; | ||
| fn sub_transaction<F: FnOnce(SubTransaction<Self::Parent>) -> R, R>(self, f: F) -> R | ||
| where | ||
| Self: Sized, | ||
| { | ||
| f(SubTransaction::new(self)) | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably better to limit this to just the module:
pub use crate::subxact;.I think we're trying to be a little better about globbing everything into the current namespace.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure this is the best. At the very least, I think we should consider
pub use crate::subxact::{self, SubTransactionExt}for the sub-transaction functionality to be easily available.