Skip to content

Commit 2c02c0c

Browse files
committed
Optimize native versioning
1 parent d0ed204 commit 2c02c0c

File tree

2 files changed

+72
-78
lines changed

2 files changed

+72
-78
lines changed

sqlalchemy_continuum/dialects/postgresql.py

+43-6
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,17 @@
2929
WHERE NOT EXISTS (SELECT 1 FROM upsert);
3030
"""
3131

32+
temporary_transaction_sql = """
33+
CREATE TEMP TABLE {temporary_transaction_table}
34+
({transaction_table_columns})
35+
ON COMMIT DROP;
36+
"""
37+
38+
insert_temporary_transaction_sql = """
39+
INSERT INTO {temporary_transaction_table} ({transaction_table_columns})
40+
VALUES ({transaction_values});
41+
"""
42+
3243
procedure_sql = """
3344
CREATE OR REPLACE FUNCTION {procedure_name}() RETURNS TRIGGER AS $$
3445
DECLARE transaction_id_value INT;
@@ -40,12 +51,8 @@
4051
INSERT INTO transaction (native_tx_id)
4152
VALUES (txid_current()) RETURNING id INTO transaction_id_value;
4253
43-
CREATE TEMP TABLE continuum_temp_transaction
44-
(id BIGINT, PRIMARY KEY(id))
45-
ON COMMIT DROP;
46-
47-
INSERT INTO continuum_temp_transaction (id)
48-
VALUES (transaction_id_value);
54+
{temporary_transaction_sql}
55+
{insert_temporary_transaction_sql}
4956
END;
5057
5158
IF (TG_OP = 'INSERT') THEN
@@ -127,6 +134,13 @@ def transaction_table_name(self):
127134
else:
128135
return 'transaction'
129136

137+
@property
138+
def temporary_transaction_table_name(self):
139+
if self.table.schema:
140+
return '%s.temporary_transaction' % self.table.schema
141+
else:
142+
return 'temporary_transaction'
143+
130144
@property
131145
def version_table_name(self):
132146
version_table_name = self.version_table_name_format % self.table.name
@@ -345,6 +359,23 @@ def __str__(self):
345359
)
346360

347361

362+
class CreateTemporaryTransactionTableSQL(SQLConstruct):
363+
def __str__(self):
364+
return temporary_transaction_sql.format(
365+
temporary_transaction_table=self.temporary_transaction_table_name,
366+
transaction_table_columns='id BIGINT, PRIMARY KEY(id)'
367+
)
368+
369+
370+
class InsertTemporaryTransactionSQL(SQLConstruct):
371+
def __str__(self):
372+
return insert_temporary_transaction_sql.format(
373+
temporary_transaction_table=self.temporary_transaction_table_name,
374+
transaction_table_columns='id',
375+
transaction_values='transaction_id_value'
376+
)
377+
378+
348379
class CreateTriggerFunctionSQL(SQLConstruct):
349380
def __str__(self):
350381
args = self.copy_args()
@@ -362,6 +393,12 @@ def __str__(self):
362393
after_insert=after_insert,
363394
after_update=after_update,
364395
after_delete=after_delete,
396+
temporary_transaction_sql=(
397+
CreateTemporaryTransactionTableSQL(**args)
398+
),
399+
insert_temporary_transaction_sql=(
400+
InsertTemporaryTransactionSQL(**args)
401+
),
365402
upsert_insert=InsertUpsertSQL(**args),
366403
upsert_update=UpdateUpsertSQL(**args),
367404
upsert_delete=DeleteUpsertSQL(**args)

sqlalchemy_continuum/unit_of_work.py

+29-72
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from copy import copy
22

33
import sqlalchemy as sa
4+
from sqlalchemy.orm.attributes import set_committed_value
45
from sqlalchemy_utils import get_primary_keys, identity
56
from .operation import Operations
67
from .utils import (
@@ -11,52 +12,6 @@
1112
versioned_column_properties
1213
)
1314

14-
from sqlalchemy.dialects import postgresql
15-
from sqlalchemy.ext.compiler import compiles
16-
from sqlalchemy.orm.attributes import set_committed_value
17-
from sqlalchemy.schema import DDLElement
18-
from sqlalchemy.sql.expression import CTE, exists
19-
20-
21-
class Parenthesis(DDLElement):
22-
def __init__(self, element):
23-
self.element = element
24-
25-
def __getattr__(self, attr):
26-
return getattr(self.element, attr)
27-
28-
29-
@compiles(Parenthesis)
30-
def visit_alter_column(element, compiler, **kw):
31-
return '(%s)' % compiler.process(element.element)
32-
33-
34-
def select_or_insert(table, select_values, insert_values, select_criteria):
35-
criteria = [
36-
getattr(table.c, key) == sa.text(str(value))
37-
for key, value in select_criteria.items() if value is not None
38-
]
39-
40-
select = (
41-
sa.select(select_values, from_obj=table)
42-
.where(sa.and_(*criteria))
43-
)
44-
insert = (
45-
table.insert()
46-
.from_select(
47-
insert_values.keys(),
48-
sa.select(insert_values.values())
49-
.where(~ exists(select))
50-
)
51-
.returning(*map(sa.text, select_values))
52-
)
53-
insert_cte = CTE(
54-
Parenthesis(insert),
55-
name='new_row'
56-
)
57-
query = sa.select(select_values, from_obj=insert_cte).union(select)
58-
return query
59-
6015

6116
class UnitOfWork(object):
6217
def __init__(self, manager):
@@ -159,33 +114,35 @@ def create_transaction(self, session):
159114
self.current_transaction = Transaction()
160115

161116
if self.manager.options['native_versioning']:
162-
criteria = {'native_tx_id': sa.func.txid_current()}
163-
args.update(criteria)
164-
query = select_or_insert(
165-
table,
166-
['*'],
167-
args,
168-
criteria
169-
)
170-
query_string = str(query.compile(dialect=postgresql.dialect()))
171-
172-
values = session.execute(query_string).fetchone()
173-
for key, value in values.items():
174-
set_committed_value(self.current_transaction, key, value)
175-
176-
session.execute(
177-
'''
178-
CREATE TEMP TABLE IF NOT EXISTS continuum_temp_transaction
179-
(id BIGINT, PRIMARY KEY(id))
180-
ON COMMIT DROP
181-
'''
182-
)
183-
session.execute('''
184-
INSERT INTO continuum_temp_transaction (id)
185-
SELECT :id WHERE NOT EXISTS
186-
(SELECT 1 FROM continuum_temp_transaction WHERE id = :id)''',
187-
{'id': self.current_transaction.id}
117+
tx_id = (
118+
session.execute('SELECT id FROM temporary_transaction')
119+
.scalar()
188120
)
121+
if tx_id:
122+
set_committed_value(self.current_transaction, 'id', tx_id)
123+
else:
124+
criteria = {'native_tx_id': sa.func.txid_current()}
125+
args.update(criteria)
126+
127+
query = table.insert().values(**args)
128+
129+
values = session.execute(query).fetchone()
130+
for key, value in values.items():
131+
set_committed_value(self.current_transaction, key, value)
132+
133+
session.execute(
134+
'''
135+
CREATE TEMP TABLE temporary_transaction
136+
(id BIGINT, PRIMARY KEY(id))
137+
ON COMMIT DROP
138+
'''
139+
)
140+
session.execute('''
141+
INSERT INTO temporary_transaction (id)
142+
VALUES (:id)
143+
''',
144+
{'id': self.current_transaction.id}
145+
)
189146
self.merge_transaction(session, self.current_transaction)
190147
else:
191148
for key, value in args.items():

0 commit comments

Comments
 (0)