Skip to content
5 changes: 5 additions & 0 deletions src/psycopack/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from ._conn import get_db_connection
from ._cur import get_cursor
from ._introspect import BackfillBatch
from ._registry import RegistryException, UnexpectedSyncStrategy
from ._repack import (
BasePsycopackError,
CompositePrimaryKey,
Expand All @@ -26,6 +27,7 @@
TableIsEmpty,
UnsupportedPrimaryKey,
)
from ._sync_strategy import SyncStrategy
from ._tracker import FailureDueToLockTimeout, Stage


Expand All @@ -46,11 +48,14 @@
"PostBackfillBatchCallback",
"PrimaryKeyNotFound",
"ReferringForeignKeyInDifferentSchema",
"RegistryException",
"Psycopack",
"Stage",
"SyncStrategy",
"TableDoesNotExist",
"TableHasTriggers",
"TableIsEmpty",
"UnexpectedSyncStrategy",
"UnsupportedPrimaryKey",
"get_cursor",
"get_db_connection",
Expand Down
194 changes: 193 additions & 1 deletion src/psycopack/_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ def drop_trigger_if_exists(self, *, table: str, trigger: str) -> None:
.as_string(self.conn)
)

def create_copy_trigger(
def create_source_to_copy_trigger(
self,
trigger_name: str,
function: str,
Expand Down Expand Up @@ -252,6 +252,182 @@ def create_backfill_log(self, *, table: str) -> None:
.as_string(self.conn)
)

def create_change_log(self, *, src_table: str, change_log: str) -> None:
    """Create the change-log table recording PKs of modified source rows.

    The table gets its own BIGSERIAL ``id`` plus a unique ``src_pk`` column
    typed after the source table's primary key.
    """
    pk_info = self.introspector.get_primary_key_info(table=src_table)

    # Tables without a primary key, or with a composite one, were already
    # prevented from proceeding earlier on: Psycopack does not support them.
    assert pk_info is not None
    assert len(pk_info.columns) == 1
    assert len(pk_info.data_types) == 1

    ddl = psycopg.sql.SQL(
        dedent("""
            CREATE TABLE {schema}.{table} (
                id BIGSERIAL PRIMARY KEY,
                src_pk {data_type} UNIQUE NOT NULL
            );
        """)
    ).format(
        table=psycopg.sql.Identifier(change_log),
        schema=psycopg.sql.Identifier(self.schema),
        data_type=psycopg.sql.SQL(pk_info.data_types[0]),
    )
    self.cur.execute(ddl.as_string(self.conn))

def create_change_log_function(
    self,
    *,
    function: str,
    table_from: str,
    table_to: str,
) -> None:
    """Create a SQL function that records one source PK into the change log.

    The generated function takes a single BIGINT primary-key value and
    inserts it into ``table_to`` (the change-log table), with
    ``ON CONFLICT DO NOTHING`` so repeated changes to the same row are
    logged only once. It is ``SECURITY DEFINER`` so trigger callers do not
    need direct privileges on the change-log table.

    ``table_from`` is not referenced by the generated SQL; the parameter is
    kept for signature symmetry with the other change-log helpers.
    """
    # Note: assumes the PK to be an integer-like type that can be
    # automatically promoted to bigint.
    # TODO: generalise for other PK types.
    self.cur.execute(
        psycopg.sql.SQL(
            dedent("""
                CREATE OR REPLACE FUNCTION {schema}.{function}(BIGINT)
                RETURNS VOID AS $$

                    INSERT INTO {schema}.{table_to}
                    (src_pk) VALUES ($1)
                    ON CONFLICT DO NOTHING

                $$ LANGUAGE SQL SECURITY DEFINER;
            """)
        )
        .format(
            # Fix: dropped the unused table_from= format argument — the
            # template has no {table_from} placeholder.
            function=psycopg.sql.Identifier(function),
            table_to=psycopg.sql.Identifier(table_to),
            schema=psycopg.sql.Identifier(self.schema),
        )
        .as_string(self.conn)
    )

def create_change_log_copy_function(
    self,
    *,
    function: str,
    table_from: str,
    table_to: str,
    change_log: str,
    columns: list[str],
    pk_column: str,
) -> None:
    """Create a function that re-copies the given PKs from source to copy.

    For a variadic batch of PKs the generated function first row-locks the
    matching rows in the source, destination, and change-log tables, then
    deletes the stale destination and change-log rows, and finally
    re-inserts fresh rows selected from the source table.
    """
    # Note: assumes the PK to be an integer-like type that can be
    # automatically promoted to bigint.
    # TODO: generalise for other PK types.
    column_list = psycopg.sql.SQL(",").join(
        psycopg.sql.Identifier(name) for name in columns
    )
    statement = psycopg.sql.SQL(
        dedent("""
            CREATE OR REPLACE FUNCTION {schema}.{function}(VARIADIC pks BIGINT[])
            RETURNS VOID
            LANGUAGE plpgsql
            SECURITY DEFINER
            AS $$
            BEGIN
                -- Lock source rows
                PERFORM 1
                FROM {schema}.{table_from}
                WHERE {pk_column} = ANY (pks)
                FOR UPDATE;

                -- Lock destination rows
                PERFORM 1
                FROM {schema}.{table_to}
                WHERE {pk_column} = ANY (pks)
                FOR UPDATE;

                -- Lock change log rows
                PERFORM 1
                FROM {schema}.{change_log}
                WHERE src_pk = ANY (pks)
                FOR UPDATE;

                -- Delete destination rows
                DELETE FROM {schema}.{table_to}
                WHERE {pk_column} = ANY (pks);

                -- Delete change log rows
                DELETE FROM {schema}.{change_log}
                WHERE src_pk = ANY (pks);

                -- Insert from source
                INSERT INTO {schema}.{table_to}
                OVERRIDING SYSTEM VALUE
                SELECT {columns}
                FROM {schema}.{table_from}
                WHERE {pk_column} = ANY (pks);
            END;
            $$;
        """)
    ).format(
        function=psycopg.sql.Identifier(function),
        table_from=psycopg.sql.Identifier(table_from),
        table_to=psycopg.sql.Identifier(table_to),
        change_log=psycopg.sql.Identifier(change_log),
        schema=psycopg.sql.Identifier(self.schema),
        columns=column_list,
        pk_column=psycopg.sql.Identifier(pk_column),
    )
    self.cur.execute(statement.as_string(self.conn))

def create_change_log_trigger(
    self,
    *,
    trigger_name: str,
    table_from: str,
    table_to: str,
    pk_column: str,
    function: str,
) -> None:
    """Install an AFTER row trigger on the source table feeding the log.

    Creates a plpgsql trigger function (same name as the trigger) that
    calls ``function`` with the affected row's PK — NEW for INSERT/UPDATE,
    OLD for DELETE — and then attaches it to ``table_from`` for all three
    operations. ``SECURITY DEFINER`` lets it run regardless of the writing
    user's privileges on the change-log machinery.

    ``table_to`` is not referenced by the generated SQL; the parameter is
    kept for signature symmetry with the other change-log helpers.
    """
    self.cur.execute(
        psycopg.sql.SQL(
            dedent("""
                CREATE OR REPLACE FUNCTION {schema}.{trigger_name}()
                RETURNS TRIGGER AS
                $$
                BEGIN
                    IF ( TG_OP = 'INSERT') THEN
                        PERFORM {schema}.{function}(NEW.{pk_column});
                        RETURN NEW;
                    ELSIF ( TG_OP = 'UPDATE') THEN
                        PERFORM {schema}.{function}(NEW.{pk_column});
                        RETURN NEW;
                    ELSIF ( TG_OP = 'DELETE') THEN
                        PERFORM {schema}.{function}(OLD.{pk_column});
                        RETURN OLD;
                    END IF;
                END;
                $$ LANGUAGE PLPGSQL SECURITY DEFINER;

                CREATE TRIGGER {trigger_name}
                AFTER INSERT OR UPDATE OR DELETE ON {schema}.{table_from}
                FOR EACH ROW EXECUTE PROCEDURE {schema}.{trigger_name}();
            """)
        )
        .format(
            # Fix: dropped the unused table_to= format argument — the
            # template has no {table_to} placeholder.
            trigger_name=psycopg.sql.Identifier(trigger_name),
            function=psycopg.sql.Identifier(function),
            table_from=psycopg.sql.Identifier(table_from),
            pk_column=psycopg.sql.Identifier(pk_column),
            schema=psycopg.sql.Identifier(self.schema),
        )
        .as_string(self.conn)
    )

@contextmanager
def session_lock(self, *, name: str) -> Iterator[None]:
# Based on:
Expand Down Expand Up @@ -536,6 +712,22 @@ def execute_copy_function(
.as_string(self.conn)
)

def execute_change_log_copy_function(
    self,
    *,
    function: str,
    pks: list[int],
) -> None:
    """Call the change-log copy function with the given PKs as arguments."""
    pk_literals = psycopg.sql.SQL(", ").join(
        psycopg.sql.Literal(pk) for pk in pks
    )
    query = psycopg.sql.SQL("SELECT {schema}.{function}({pks});").format(
        function=psycopg.sql.Identifier(function),
        schema=psycopg.sql.Identifier(self.schema),
        pks=pk_literals,
    )
    self.cur.execute(query.as_string(self.conn))

def set_batch_to_finished(
self, *, table: str, batch: _introspect.BackfillBatch
) -> None:
Expand Down
38 changes: 38 additions & 0 deletions src/psycopack/_introspect.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ class BackfillBatch:
end: int


@dataclasses.dataclass
class ChangeLogEntry:
    """A single row fetched from a change-log table.

    ``id`` is the change-log row's own serial primary key; ``src_pk`` is
    the primary key of the changed row in the source table.
    """

    id: int
    src_pk: int


class Introspector:
def __init__(
self, *, conn: psycopg.Connection, cur: _cur.Cursor, schema: str
Expand Down Expand Up @@ -672,6 +678,38 @@ def get_backfill_batch(self, *, table: str) -> BackfillBatch | None:
return None
return BackfillBatch(id=result[0], start=result[1], end=result[2])

def get_change_log_batch(
    self, *, table: str, batch_size: int
) -> list[ChangeLogEntry] | None:
    """Fetch and row-lock up to ``batch_size`` change-log entries.

    Uses ``FOR UPDATE SKIP LOCKED`` so concurrent workers pick disjoint
    batches. Returns None when no unlocked rows are available.
    """
    query = psycopg.sql.SQL(
        dedent("""
            SELECT
                id,
                src_pk
            FROM
                {schema}.{table}
            FOR UPDATE SKIP LOCKED
            LIMIT {batch_size};
        """)
    ).format(
        table=psycopg.sql.Identifier(table),
        batch_size=psycopg.sql.Literal(batch_size),
        schema=psycopg.sql.Identifier(self.schema),
    )
    self.cur.execute(query.as_string(self.conn))

    rows = self.cur.fetchall()
    if not rows:
        return None
    return [ChangeLogEntry(id=row[0], src_pk=row[1]) for row in rows]

def get_user(self) -> str:
self.cur.execute(psycopg.sql.SQL("SELECT current_user;").as_string(self.conn))
result = self.cur.fetchone()
Expand Down
Loading
Loading