diff --git a/src/psycopack/__init__.py b/src/psycopack/__init__.py
index d7c3608..ba8f022 100644
--- a/src/psycopack/__init__.py
+++ b/src/psycopack/__init__.py
@@ -5,6 +5,7 @@
 from ._conn import get_db_connection
 from ._cur import get_cursor
 from ._introspect import BackfillBatch
+from ._registry import RegistryException, UnexpectedSyncStrategy
 from ._repack import (
     BasePsycopackError,
     CompositePrimaryKey,
@@ -26,6 +27,7 @@
     TableIsEmpty,
     UnsupportedPrimaryKey,
 )
+from ._sync_strategy import SyncStrategy
 from ._tracker import FailureDueToLockTimeout, Stage
 
@@ -46,11 +48,14 @@
     "PostBackfillBatchCallback",
     "PrimaryKeyNotFound",
     "ReferringForeignKeyInDifferentSchema",
+    "RegistryException",
     "Psycopack",
     "Stage",
+    "SyncStrategy",
     "TableDoesNotExist",
     "TableHasTriggers",
     "TableIsEmpty",
+    "UnexpectedSyncStrategy",
     "UnsupportedPrimaryKey",
     "get_cursor",
     "get_db_connection",
diff --git a/src/psycopack/_commands.py b/src/psycopack/_commands.py
index eec0e25..3d248ba 100644
--- a/src/psycopack/_commands.py
+++ b/src/psycopack/_commands.py
@@ -172,7 +172,7 @@ def drop_trigger_if_exists(self, *, table: str, trigger: str) -> None:
             .as_string(self.conn)
         )
 
-    def create_copy_trigger(
+    def create_source_to_copy_trigger(
         self,
         trigger_name: str,
         function: str,
@@ -252,6 +252,182 @@ def create_backfill_log(self, *, table: str) -> None:
             .as_string(self.conn)
         )
 
+    def create_change_log(self, *, src_table: str, change_log: str) -> None:
+        pk_info = self.introspector.get_primary_key_info(table=src_table)
+
+        # At this point, tables without primary keys or with composite keys
+        # have already been prevented from proceeding, as they aren't
+        # supported by Psycopack.
+        assert pk_info is not None
+        assert len(pk_info.columns) == 1
+        assert len(pk_info.data_types) == 1
+
+        self.cur.execute(
+            psycopg.sql.SQL(
+                dedent("""
+                    CREATE TABLE {schema}.{table} (
+                        id BIGSERIAL PRIMARY KEY,
+                        src_pk {data_type} UNIQUE NOT NULL
+                    );
+                """)
+            )
+            .format(
+                table=psycopg.sql.Identifier(change_log),
+                schema=psycopg.sql.Identifier(self.schema),
+                data_type=psycopg.sql.SQL(pk_info.data_types[0]),
+            )
+            .as_string(self.conn)
+        )
+
+    def create_change_log_function(
+        self,
+        *,
+        function: str,
+        table_from: str,
+        table_to: str,
+    ) -> None:
+        # Note: assumes the PK to be an integer-like type that can be
+        # automatically promoted to bigint.
+        # TODO: generalise for other PK types.
+        self.cur.execute(
+            psycopg.sql.SQL(
+                dedent("""
+                    CREATE OR REPLACE FUNCTION {schema}.{function}(BIGINT)
+                    RETURNS VOID AS $$
+
+                    INSERT INTO {schema}.{table_to}
+                    (src_pk) VALUES ($1)
+                    ON CONFLICT DO NOTHING
+
+                    $$ LANGUAGE SQL SECURITY DEFINER;
+                """)
+            )
+            .format(
+                function=psycopg.sql.Identifier(function),
+                table_from=psycopg.sql.Identifier(table_from),
+                table_to=psycopg.sql.Identifier(table_to),
+                schema=psycopg.sql.Identifier(self.schema),
+            )
+            .as_string(self.conn)
+        )
+
+    def create_change_log_copy_function(
+        self,
+        *,
+        function: str,
+        table_from: str,
+        table_to: str,
+        change_log: str,
+        columns: list[str],
+        pk_column: str,
+    ) -> None:
+        # Note: assumes the PK to be an integer-like type that can be
+        # automatically promoted to bigint.
+        # TODO: generalise for other PK types.
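+        #
+        # The statement below locks the affected rows in the source, copy,
+        # and change log tables (FOR UPDATE), deletes the copy and change
+        # log rows, and re-inserts them from the source. This makes each
+        # replay idempotent and serialises it with concurrent writers
+        # touching the same keys.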
+ self.cur.execute( + psycopg.sql.SQL( + dedent(""" + CREATE OR REPLACE FUNCTION {schema}.{function}(VARIADIC pks BIGINT[]) + RETURNS VOID + LANGUAGE plpgsql + SECURITY DEFINER + AS $$ + BEGIN + -- Lock source rows + PERFORM 1 + FROM {schema}.{table_from} + WHERE {pk_column} = ANY (pks) + FOR UPDATE; + + -- Lock destination rows + PERFORM 1 + FROM {schema}.{table_to} + WHERE {pk_column} = ANY (pks) + FOR UPDATE; + + -- Lock change log rows + PERFORM 1 + FROM {schema}.{change_log} + WHERE src_pk = ANY (pks) + FOR UPDATE; + + -- Delete destination rows + DELETE FROM {schema}.{table_to} + WHERE {pk_column} = ANY (pks); + + -- Delete change log rows + DELETE FROM {schema}.{change_log} + WHERE src_pk = ANY (pks); + + -- Insert from source + INSERT INTO {schema}.{table_to} + OVERRIDING SYSTEM VALUE + SELECT {columns} + FROM {schema}.{table_from} + WHERE {pk_column} = ANY (pks); + END; + $$; + """) + ) + .format( + function=psycopg.sql.Identifier(function), + table_from=psycopg.sql.Identifier(table_from), + table_to=psycopg.sql.Identifier(table_to), + change_log=psycopg.sql.Identifier(change_log), + schema=psycopg.sql.Identifier(self.schema), + columns=psycopg.sql.SQL(",").join( + [psycopg.sql.Identifier(c) for c in columns] + ), + pk_column=psycopg.sql.Identifier(pk_column), + ) + .as_string(self.conn) + ) + + def create_change_log_trigger( + self, + *, + trigger_name: str, + table_from: str, + table_to: str, + pk_column: str, + function: str, + ) -> None: + self.cur.execute( + psycopg.sql.SQL( + dedent(""" + CREATE OR REPLACE FUNCTION {schema}.{trigger_name}() + RETURNS TRIGGER AS + $$ + BEGIN + IF ( TG_OP = 'INSERT') THEN + PERFORM {schema}.{function}(NEW.{pk_column}); + RETURN NEW; + ELSIF ( TG_OP = 'UPDATE') THEN + PERFORM {schema}.{function}(NEW.{pk_column}); + RETURN NEW; + ELSIF ( TG_OP = 'DELETE') THEN + PERFORM {schema}.{function}(OLD.{pk_column}); + RETURN OLD; + END IF; + END; + $$ LANGUAGE PLPGSQL SECURITY DEFINER; + + CREATE TRIGGER {trigger_name} + AFTER INSERT OR UPDATE OR DELETE ON {schema}.{table_from} + FOR EACH ROW EXECUTE PROCEDURE {schema}.{trigger_name}(); + """) + ) + .format( + trigger_name=psycopg.sql.Identifier(trigger_name), + function=psycopg.sql.Identifier(function), + table_from=psycopg.sql.Identifier(table_from), + table_to=psycopg.sql.Identifier(table_to), + pk_column=psycopg.sql.Identifier(pk_column), + schema=psycopg.sql.Identifier(self.schema), + ) + .as_string(self.conn) + ) + @contextmanager def session_lock(self, *, name: str) -> Iterator[None]: # Based on: @@ -536,6 +712,22 @@ def execute_copy_function( .as_string(self.conn) ) + def execute_change_log_copy_function( + self, + *, + function: str, + pks: list[int], + ) -> None: + self.cur.execute( + psycopg.sql.SQL("SELECT {schema}.{function}({pks});") + .format( + function=psycopg.sql.Identifier(function), + schema=psycopg.sql.Identifier(self.schema), + pks=psycopg.sql.SQL(", ").join(map(psycopg.sql.Literal, pks)), + ) + .as_string(self.conn) + ) + def set_batch_to_finished( self, *, table: str, batch: _introspect.BackfillBatch ) -> None: diff --git a/src/psycopack/_introspect.py b/src/psycopack/_introspect.py index 109acf1..9128794 100644 --- a/src/psycopack/_introspect.py +++ b/src/psycopack/_introspect.py @@ -61,6 +61,12 @@ class BackfillBatch: end: int +@dataclasses.dataclass +class ChangeLogEntry: + id: int + src_pk: int + + class Introspector: def __init__( self, *, conn: psycopg.Connection, cur: _cur.Cursor, schema: str @@ -672,6 +678,38 @@ def get_backfill_batch(self, *, table: str) -> BackfillBatch 
| None: return None return BackfillBatch(id=result[0], start=result[1], end=result[2]) + def get_change_log_batch( + self, *, table: str, batch_size: int + ) -> list[ChangeLogEntry] | None: + self.cur.execute( + psycopg.sql.SQL( + dedent(""" + SELECT + id, + src_pk + FROM + {schema}.{table} + FOR UPDATE SKIP LOCKED + LIMIT {batch_size}; + """) + ) + .format( + table=psycopg.sql.Identifier(table), + batch_size=psycopg.sql.Literal(batch_size), + schema=psycopg.sql.Identifier(self.schema), + ) + .as_string(self.conn) + ) + if not (results := self.cur.fetchall()): + return None + return [ + ChangeLogEntry( + id=id, + src_pk=src_pk, + ) + for id, src_pk in results + ] + def get_user(self) -> str: self.cur.execute(psycopg.sql.SQL("SELECT current_user;").as_string(self.conn)) result = self.cur.fetchone() diff --git a/src/psycopack/_registry.py b/src/psycopack/_registry.py index 294b84d..dfc8e24 100644 --- a/src/psycopack/_registry.py +++ b/src/psycopack/_registry.py @@ -1,10 +1,18 @@ import dataclasses from textwrap import dedent -from . import _commands, _const, _cur, _introspect +from . import _commands, _const, _cur, _introspect, _sync_strategy from . import _psycopg as psycopg +class RegistryException(Exception): + pass + + +class UnexpectedSyncStrategy(RegistryException): + pass + + @dataclasses.dataclass class RegistryRow: original_table: str @@ -16,6 +24,11 @@ class RegistryRow: repacked_name: str repacked_function: str repacked_trigger: str + sync_strategy: _sync_strategy.SyncStrategy + change_log_trigger: str | None + change_log_function: str | None + change_log_copy_function: str | None + change_log: str | None class Registry: @@ -28,16 +41,19 @@ def __init__( command: _commands.Command, schema: str, table: str, + sync_strategy: _sync_strategy.SyncStrategy, ) -> None: self.conn = conn self.cur = cur self.introspector = introspector self.schema = schema self.table = table + self.sync_strategy = sync_strategy def get_registry_row(self) -> RegistryRow: row = self._get_row_from_registry_table() if row: + self._validate_row(row=row) return row oid = self.introspector.get_table_oid(table=self.table) @@ -57,6 +73,18 @@ def get_registry_row(self) -> RegistryRow: repacked_trigger = trigger.replace( _const.NAME_PREFIX, _const.REPACKED_NAME_PREFIX ) + + if self.sync_strategy == _sync_strategy.SyncStrategy.CHANGE_LOG: + change_log = f"{copy_table}_change_log" + change_log_trigger = f"{change_log}_tgr" + change_log_function = f"{change_log}_fun" + change_log_copy_function = f"{change_log}_copy_fun" + else: + change_log = None + change_log_trigger = None + change_log_function = None + change_log_copy_function = None + row = RegistryRow( original_table=self.table, copy_table=copy_table, @@ -67,6 +95,11 @@ def get_registry_row(self) -> RegistryRow: repacked_name=repacked_name, repacked_function=repacked_function, repacked_trigger=repacked_trigger, + sync_strategy=self.sync_strategy, + change_log_trigger=change_log_trigger, + change_log_function=change_log_function, + change_log_copy_function=change_log_copy_function, + change_log=change_log, ) self._insert_row_into_registry(row=row) return row @@ -85,7 +118,12 @@ def _get_row_from_registry_table(self) -> RegistryRow | None: backfill_log, repacked_name, repacked_function, - repacked_trigger + repacked_trigger, + sync_strategy, + change_log_trigger, + change_log_function, + change_log_copy_function, + change_log FROM {schema}.{registry_table} WHERE @@ -113,13 +151,80 @@ def _get_row_from_registry_table(self) -> RegistryRow | None: 
repacked_name=result[6],
             repacked_function=result[7],
             repacked_trigger=result[8],
+            sync_strategy=_sync_strategy.SyncStrategy[result[9]],
+            change_log_trigger=result[10],
+            change_log_function=result[11],
+            change_log_copy_function=result[12],
+            change_log=result[13],
         )
 
     def _ensure_registry_table_exists(self) -> None:
         if self._registry_table_exists():
+            self._ensure_registry_has_sync_strategy_updates()
             return
         self._create_registry_table()
 
+    def _ensure_registry_has_sync_strategy_updates(self) -> None:
+        """
+        The registry table was changed to accommodate the sync strategy
+        upgrade. The upgrade added the following columns:
+        - sync_strategy
+        - change_log (optional, only for CHANGE_LOG strategy).
+        - change_log_trigger (optional, only for CHANGE_LOG strategy).
+        - change_log_function (optional, only for CHANGE_LOG strategy).
+        - change_log_copy_function (optional, only for CHANGE_LOG strategy).
+
+        However, given that some users of Psycopack already had the old
+        table, their databases need to be migrated to the new Registry table
+        schema.
+        """
+        columns = self.introspector.get_table_columns(table=_const.PSYCOPACK_REGISTRY)
+        if "sync_strategy" in columns:
+            return
+
+        self.cur.execute(
+            psycopg.sql.SQL(
+                dedent("""
+                    ALTER TABLE
+                        {schema}.{registry_table}
+                    ADD COLUMN
+                        sync_strategy VARCHAR(32) DEFAULT 'DIRECT_TRIGGER' NOT NULL,
+                    ADD COLUMN
+                        change_log_trigger VARCHAR(63) UNIQUE,
+                    ADD COLUMN
+                        change_log_function VARCHAR(63) UNIQUE,
+                    ADD COLUMN
+                        change_log_copy_function VARCHAR(63) UNIQUE,
+                    ADD COLUMN
+                        change_log VARCHAR(63) UNIQUE
+                    ;
+                """)
+            )
+            .format(
+                registry_table=psycopg.sql.Identifier(_const.PSYCOPACK_REGISTRY),
+                schema=psycopg.sql.Identifier(self.schema),
+            )
+            .as_string(self.conn)
+        )
+        # The DEFAULT above was only set to backfill the rows already present
+        # in the registry table. It is not necessary going forward, as the
+        # Python code should always set a value for sync_strategy.
+ self.cur.execute( + psycopg.sql.SQL( + dedent(""" + ALTER TABLE + {schema}.{registry_table} + ALTER COLUMN + sync_strategy DROP DEFAULT + ; + """) + ) + .format( + registry_table=psycopg.sql.Identifier(_const.PSYCOPACK_REGISTRY), + schema=psycopg.sql.Identifier(self.schema), + ) + .as_string(self.conn) + ) + def _registry_table_exists(self) -> bool: return bool(self.introspector.get_table_oid(table=_const.PSYCOPACK_REGISTRY)) @@ -140,7 +245,12 @@ def _create_registry_table(self) -> None: backfill_log VARCHAR(63) NOT NULL UNIQUE, repacked_name VARCHAR(63) NOT NULL UNIQUE, repacked_function VARCHAR(63) NOT NULL UNIQUE, - repacked_trigger VARCHAR(63) NOT NULL UNIQUE + repacked_trigger VARCHAR(63) NOT NULL UNIQUE, + sync_strategy VARCHAR(32) NOT NULL, + change_log_trigger VARCHAR(63) UNIQUE, + change_log_function VARCHAR(63) UNIQUE, + change_log_copy_function VARCHAR(63) UNIQUE, + change_log VARCHAR(63) UNIQUE ); """) ) @@ -166,7 +276,12 @@ def _insert_row_into_registry(self, row: RegistryRow) -> None: backfill_log, repacked_name, repacked_function, - repacked_trigger + repacked_trigger, + sync_strategy, + change_log_trigger, + change_log_function, + change_log_copy_function, + change_log ) VALUES ( @@ -178,7 +293,12 @@ def _insert_row_into_registry(self, row: RegistryRow) -> None: {backfill_log}, {repacked_name}, {repacked_function}, - {repacked_trigger} + {repacked_trigger}, + {sync_strategy}, + {change_log_trigger}, + {change_log_function}, + {change_log_copy_function}, + {change_log} ); """) ) @@ -194,6 +314,13 @@ def _insert_row_into_registry(self, row: RegistryRow) -> None: repacked_name=psycopg.sql.Literal(row.repacked_name), repacked_function=psycopg.sql.Literal(row.repacked_function), repacked_trigger=psycopg.sql.Literal(row.repacked_trigger), + sync_strategy=psycopg.sql.Literal(row.sync_strategy.value), + change_log_trigger=psycopg.sql.Literal(row.change_log_trigger), + change_log_function=psycopg.sql.Literal(row.change_log_function), + change_log_copy_function=psycopg.sql.Literal( + row.change_log_copy_function + ), + change_log=psycopg.sql.Literal(row.change_log), ) .as_string(self.conn) ) @@ -215,3 +342,12 @@ def delete_row_for(self, *, table: str) -> None: ) .as_string(self.conn) ) + + def _validate_row(self, *, row: RegistryRow) -> None: + if row.sync_strategy != self.sync_strategy: + raise UnexpectedSyncStrategy( + f"Psycopack was instantiated with sync_strategy: {self.sync_strategy}, " + f"but a process already exists with a different sync_strategy " + f"({row.sync_strategy}). " + f"Either finish the existing process, or reset it via .reset()" + ) diff --git a/src/psycopack/_repack.py b/src/psycopack/_repack.py index c512b78..1b7d24b 100644 --- a/src/psycopack/_repack.py +++ b/src/psycopack/_repack.py @@ -7,7 +7,15 @@ import typing from collections import defaultdict -from . import _commands, _cur, _identifiers, _introspect, _registry, _tracker +from . import ( + _commands, + _cur, + _identifiers, + _introspect, + _registry, + _sync_strategy, + _tracker, +) from . 
import _psycopg as psycopg @@ -139,6 +147,8 @@ def __init__( schema: str = "public", allow_empty: bool = False, skip_permissions_check: bool = False, + sync_strategy: _sync_strategy.SyncStrategy = _sync_strategy.SyncStrategy.DIRECT_TRIGGER, + change_log_batch_size: int | None = None, ) -> None: self.conn = conn self.cur = cur @@ -165,6 +175,7 @@ def __init__( self.lock_timeout = lock_timeout self.convert_pk_to_bigint = convert_pk_to_bigint self.allow_empty = allow_empty + self.sync_strategy = sync_strategy # Names for psycopack objects are stored in the Registry self.registry = _registry.Registry( @@ -174,6 +185,7 @@ def __init__( introspector=self.introspector, command=self.command, table=table, + sync_strategy=sync_strategy, ) registry_row = self.registry.get_registry_row() self.copy_table = registry_row.copy_table @@ -185,6 +197,12 @@ def __init__( self.repacked_name = registry_row.repacked_name self.repacked_function = registry_row.repacked_function self.repacked_trigger = registry_row.repacked_trigger + # Names specific to the CHANGE_LOG strategy + self.change_log = registry_row.change_log + self.change_log_trigger = registry_row.change_log_trigger + self.change_log_function = registry_row.change_log_function + self.change_log_copy_function = registry_row.change_log_copy_function + self.change_log_batch_size = change_log_batch_size self.tracker = _tracker.Tracker( table=self.table, @@ -195,6 +213,9 @@ def __init__( backfill_log=self.backfill_log, repacked_name=self.repacked_name, repacked_trigger=self.repacked_trigger, + change_log=self.change_log, + change_log_trigger=self.change_log_trigger, + sync_strategy=self.sync_strategy, introspector=self.introspector, command=self.command, schema=schema, @@ -225,6 +246,7 @@ def full(self) -> None: self.setup_repacking() self.backfill() self.sync_schemas() + self.post_sync_update() self.swap() self.clean_up() @@ -232,17 +254,25 @@ def full(self) -> None: self.setup_repacking() self.backfill() self.sync_schemas() + self.post_sync_update() self.swap() self.clean_up() if stage == _tracker.Stage.BACKFILL: self.backfill() self.sync_schemas() + self.post_sync_update() self.swap() self.clean_up() if stage == _tracker.Stage.SYNC_SCHEMAS: self.sync_schemas() + self.post_sync_update() + self.swap() + self.clean_up() + + if stage == _tracker.Stage.POST_SYNC_UPDATE: + self.post_sync_update() self.swap() self.clean_up() @@ -359,23 +389,86 @@ def setup_repacking(self) -> None: ): self._create_copy_table() self._create_copy_function() - self._create_copy_trigger() self._create_backfill_log() self._populate_backfill_log() + if self.sync_strategy == _sync_strategy.SyncStrategy.CHANGE_LOG: + self._create_change_log() + self._create_change_log_function() + self._create_change_log_trigger() + self._create_change_log_copy_function() + elif self.sync_strategy == _sync_strategy.SyncStrategy.DIRECT_TRIGGER: + self._create_source_to_copy_table_trigger() + def backfill(self) -> None: with self.tracker.track(_tracker.Stage.BACKFILL): self._perform_backfill() def sync_schemas(self) -> None: with self.tracker.track(_tracker.Stage.SYNC_SCHEMAS): - self._create_indexes() + self._create_indexes( + concurrently=bool( + self.sync_strategy != _sync_strategy.SyncStrategy.CHANGE_LOG + ) + ) with self.command.lock_timeout(self.lock_timeout): self._create_unique_constraints() self._create_check_and_fk_constraints() self._create_referring_fks() self.command.analyze(table=self.table) + if self.sync_strategy == _sync_strategy.SyncStrategy.CHANGE_LOG: + # Schema is synced. 
Rely on the src-to-copy trigger from
+                # now on and remove the change log trigger to avoid making
+                # the change log grow indefinitely.
+                with self.command.db_transaction():
+                    self.command.acquire_access_exclusive_lock(table=self.table)
+                    self.command.acquire_access_exclusive_lock(
+                        table=self.copy_table
+                    )
+                    assert self.change_log is not None
+                    self.command.acquire_access_exclusive_lock(
+                        table=self.change_log
+                    )
+                    self._create_source_to_copy_table_trigger()
+                    assert self.change_log_trigger is not None
+                    self.command.drop_trigger_if_exists(
+                        table=self.table, trigger=self.change_log_trigger
+                    )
+                    assert self.change_log_function is not None
+                    self.command.drop_function_if_exists(
+                        function=self.change_log_function
+                    )
+
+    def post_sync_update(self) -> None:
+        with self.tracker.track(_tracker.Stage.POST_SYNC_UPDATE):
+            if self.sync_strategy == _sync_strategy.SyncStrategy.DIRECT_TRIGGER:
+                return
+            elif self.sync_strategy == _sync_strategy.SyncStrategy.CHANGE_LOG:
+                return self._post_sync_update_for_change_log()
+            else:
+                raise NotImplementedError
+
+    def _post_sync_update_for_change_log(self) -> None:
+        assert self.change_log is not None
+        assert self.change_log_batch_size is not None
+        assert self.change_log_copy_function is not None
+
+        while True:
+            with self.command.db_transaction():
+                change_log_batch = self.introspector.get_change_log_batch(
+                    table=self.change_log,
+                    batch_size=self.change_log_batch_size,
+                )
+                if not change_log_batch:
+                    # No batches available to process at present.
+                    break
+
+                self.command.execute_change_log_copy_function(
+                    function=self.change_log_copy_function,
+                    pks=[change.src_pk for change in change_log_batch],
+                )
+
     def swap(self) -> None:
         """
         1. Drop the trigger (and function) that kept the copy table in sync.
@@ -420,7 +513,7 @@ def swap(self) -> None:
             self.command.drop_trigger_if_exists(
                 table=self.table, trigger=self.repacked_trigger
             )
-            self.command.create_copy_trigger(
+            self.command.create_source_to_copy_trigger(
                 trigger_name=self.repacked_trigger,
                 function=self.repacked_function,
                 table_from=self.table,
@@ -467,7 +560,7 @@ def revert_swap(self) -> None:
                 table_from=self.repacked_name, table_to=self.table
             )
             self._create_copy_function()
-            self._create_copy_trigger()
+            self._create_source_to_copy_table_trigger()
 
     def clean_up(self) -> None:
         with self.tracker.track(_tracker.Stage.CLEAN_UP):
@@ -539,6 +632,13 @@ def clean_up(self) -> None:
             self.command.drop_table_if_exists(table=self.backfill_log)
             self.registry.delete_row_for(table=self.table)
 
+            if self.sync_strategy == _sync_strategy.SyncStrategy.CHANGE_LOG:
+                # The change log trigger and function have already been
+                # dropped during the schema sync stage. The table is the
+                # only artefact remaining.
+                assert self.change_log is not None
+                self.command.drop_table_if_exists(table=self.change_log)
+
     def reset(self) -> None:
         current_stage = self.tracker.get_current_stage()
         if current_stage == _tracker.Stage.PRE_VALIDATION:
@@ -567,6 +667,18 @@ def reset(self) -> None:
         self.command.drop_sequence_if_exists(seq=self.id_seq)
         self.command.drop_table_if_exists(table=self.tracker.tracker_table)
 
+        if self.sync_strategy == _sync_strategy.SyncStrategy.CHANGE_LOG:
+            # Drop all remaining artefacts associated with the change log
+            # table.
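+            # The drops below are all IF EXISTS variants, so the reset is
+            # safe at any stage: the trigger and function may already have
+            # been dropped by sync_schemas, while the table only goes away
+            # at clean-up.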
+ assert self.change_log is not None + assert self.change_log_trigger is not None + assert self.change_log_function is not None + self.command.drop_table_if_exists(table=self.change_log) + self.command.drop_trigger_if_exists( + table=self.table, trigger=self.change_log_trigger + ) + self.command.drop_function_if_exists(function=self.change_log_function) + def _create_copy_table(self) -> None: # Checks if other relating objects have FKs pointing to the copy table # first. Deletes them (if any) as they might have been created by a @@ -649,9 +761,9 @@ def _create_copy_function(self) -> None: pk_column=self.pk_column, ) - def _create_copy_trigger(self) -> None: + def _create_source_to_copy_table_trigger(self) -> None: self.command.drop_trigger_if_exists(table=self.table, trigger=self.trigger) - self.command.create_copy_trigger( + self.command.create_source_to_copy_trigger( trigger_name=self.trigger, function=self.function, table_from=self.table, @@ -663,6 +775,51 @@ def _create_backfill_log(self) -> None: self.command.drop_table_if_exists(table=self.backfill_log) self.command.create_backfill_log(table=self.backfill_log) + def _create_change_log(self) -> None: + assert self.change_log is not None + self.command.drop_table_if_exists(table=self.change_log) + self.command.create_change_log(src_table=self.table, change_log=self.change_log) + + def _create_change_log_function(self) -> None: + assert self.change_log_function is not None + assert self.change_log is not None + + self.command.drop_function_if_exists(function=self.change_log_function) + self.command.create_change_log_function( + function=self.change_log_function, + table_from=self.table, + table_to=self.change_log, + ) + + def _create_change_log_copy_function(self) -> None: + assert self.change_log_copy_function is not None + assert self.change_log is not None + + self.command.drop_function_if_exists(function=self.change_log_copy_function) + self.command.create_change_log_copy_function( + function=self.change_log_copy_function, + table_from=self.table, + table_to=self.copy_table, + pk_column=self.pk_column, + change_log=self.change_log, + columns=self.introspector.get_table_columns(table=self.table), + ) + + def _create_change_log_trigger(self) -> None: + assert self.change_log_trigger is not None + assert self.change_log_function is not None + + self.command.drop_trigger_if_exists( + table=self.table, trigger=self.change_log_trigger + ) + self.command.create_change_log_trigger( + trigger_name=self.change_log_trigger, + function=self.change_log_function, + table_from=self.table, + table_to=self.copy_table, + pk_column=self.pk_column, + ) + def _populate_backfill_log(self) -> None: # positive pk values min_and_max = self.introspector.get_min_and_max_pk( @@ -709,7 +866,7 @@ def _perform_backfill(self) -> None: if self.post_backfill_batch_callback: self.post_backfill_batch_callback(batch) - def _create_indexes(self) -> None: + def _create_indexes(self, concurrently: bool) -> None: # Start by checking if there are any invalid indexes already created # due to a previous Psycopack run that failed midway through and delete # them. 
This excludes the primary index and exclusion constraints which @@ -738,13 +895,24 @@ def _create_indexes(self) -> None: for index in indexes: name = index.name sql = index.definition - sql = sql.replace( - "CREATE INDEX", "CREATE INDEX CONCURRENTLY IF NOT EXISTS" - ) - sql = sql.replace( - "CREATE UNIQUE INDEX", - "CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS", - ) + if concurrently: + sql = sql.replace( + "CREATE INDEX", "CREATE INDEX CONCURRENTLY IF NOT EXISTS" + ) + else: + sql = sql.replace("CREATE INDEX", "CREATE INDEX IF NOT EXISTS") + + if concurrently: + sql = sql.replace( + "CREATE UNIQUE INDEX", + "CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS", + ) + else: + sql = sql.replace( + "CREATE UNIQUE INDEX", + "CREATE UNIQUE INDEX IF NOT EXISTS", + ) + sql_arr = sql.split(" ON") new_name = _identifiers.build_postgres_identifier([name], "psycopack") sql_arr[0] = sql_arr[0].replace(name, new_name) diff --git a/src/psycopack/_sync_strategy.py b/src/psycopack/_sync_strategy.py new file mode 100644 index 0000000..77088ac --- /dev/null +++ b/src/psycopack/_sync_strategy.py @@ -0,0 +1,13 @@ +import enum + + +class SyncStrategy(enum.Enum): + # The DIRECT_TRIGGER strategy uses a trigger to sync data between the + # source table and the copy table. As a consequence, the schema + # synchronisation is done CONCURRENTLY. + DIRECT_TRIGGER = "DIRECT_TRIGGER" + # The CHANGE_LOG strategy uses a trigger to push data from the source + # table onto a table that logs the row changes during/after backfilling. In + # this case, the schema synchronisation can be done without CONCURRENT DDLs + # being necessary. + CHANGE_LOG = "CHANGE_LOG" diff --git a/src/psycopack/_tracker.py b/src/psycopack/_tracker.py index 42349b4..020f432 100644 --- a/src/psycopack/_tracker.py +++ b/src/psycopack/_tracker.py @@ -4,7 +4,7 @@ from textwrap import dedent from typing import Iterator -from . import _commands, _const, _cur, _introspect +from . import _commands, _const, _cur, _introspect, _sync_strategy from . import _psycopg as psycopg @@ -59,8 +59,9 @@ class Stage(enum.Enum): SETUP = StageInfo(name="SETUP", step=2) BACKFILL = StageInfo(name="BACKFILL", step=3) SYNC_SCHEMAS = StageInfo(name="SYNC_SCHEMAS", step=4) - SWAP = StageInfo(name="SWAP", step=5) - CLEAN_UP = StageInfo(name="CLEAN_UP", step=6) + POST_SYNC_UPDATE = StageInfo(name="POST_SYNC_UPDATE", step=5) + SWAP = StageInfo(name="SWAP", step=6) + CLEAN_UP = StageInfo(name="CLEAN_UP", step=7) class Tracker: @@ -87,6 +88,9 @@ def __init__( backfill_log: str, repacked_name: str, repacked_trigger: str, + change_log: str | None, + change_log_trigger: str | None, + sync_strategy: _sync_strategy.SyncStrategy, introspector: _introspect.Introspector, command: _commands.Command, schema: str, @@ -103,6 +107,9 @@ def __init__( self.backfill_log = backfill_log self.repacked_name = repacked_name self.repacked_trigger = repacked_trigger + self.change_log = change_log + self.change_log_trigger = change_log_trigger + self.sync_strategy = sync_strategy self.tracker_table = self._get_tracker_table_name() self.tracker_lock = f"{self.tracker_table}_lock" @@ -170,23 +177,55 @@ def _validate_stage_dependencies(self, *, stage: Stage) -> None: deleting a table or a trigger that needs to be used by the passed in stage. 
""" - table_dependencies = { - Stage.SETUP: [self.table], - Stage.BACKFILL: [self.table, self.copy_table, self.backfill_log], - Stage.SYNC_SCHEMAS: [self.table, self.copy_table], - Stage.SWAP: [self.table, self.copy_table], - Stage.CLEAN_UP: [self.repacked_name, self.table], - } + if self.sync_strategy == _sync_strategy.SyncStrategy.DIRECT_TRIGGER: + table_dependencies = { + Stage.SETUP: [self.table], + Stage.BACKFILL: [self.table, self.copy_table, self.backfill_log], + Stage.SYNC_SCHEMAS: [self.table, self.copy_table], + Stage.POST_SYNC_UPDATE: [self.table, self.copy_table], + Stage.SWAP: [self.table, self.copy_table], + Stage.CLEAN_UP: [self.repacked_name, self.table], + } + elif self.sync_strategy == _sync_strategy.SyncStrategy.CHANGE_LOG: + assert self.change_log is not None + table_dependencies = { + Stage.SETUP: [self.table], + Stage.BACKFILL: [ + self.table, + self.copy_table, + self.backfill_log, + self.change_log, + ], + Stage.SYNC_SCHEMAS: [self.table, self.copy_table, self.change_log], + Stage.POST_SYNC_UPDATE: [self.table, self.copy_table, self.change_log], + Stage.SWAP: [self.table, self.copy_table], + Stage.CLEAN_UP: [self.repacked_name, self.table], + } + else: + raise NotImplementedError + if stage in table_dependencies: for table in table_dependencies[stage]: self._validate_table_exists(table=table, stage=stage) - trigger_dependencies = { - Stage.BACKFILL: [self.trigger], - Stage.SYNC_SCHEMAS: [self.trigger], - Stage.SWAP: [self.trigger], - Stage.CLEAN_UP: [self.repacked_trigger], - } + if self.sync_strategy == _sync_strategy.SyncStrategy.DIRECT_TRIGGER: + trigger_dependencies = { + Stage.BACKFILL: [self.trigger], + Stage.SYNC_SCHEMAS: [self.trigger], + Stage.SWAP: [self.trigger], + Stage.CLEAN_UP: [self.repacked_trigger], + } + elif self.sync_strategy == _sync_strategy.SyncStrategy.CHANGE_LOG: + assert self.change_log_trigger is not None + trigger_dependencies = { + Stage.BACKFILL: [self.change_log_trigger], + Stage.SYNC_SCHEMAS: [self.change_log_trigger], + Stage.SWAP: [self.trigger], + Stage.CLEAN_UP: [self.repacked_trigger], + } + else: + raise NotImplementedError + if stage in trigger_dependencies: for trigger in trigger_dependencies[stage]: self._validate_trigger_exists(trigger=trigger, stage=stage) @@ -203,6 +242,9 @@ def _finish_stage(self, *, stage: Stage) -> None: self._set_stage(stage_from=stage, stage_to=Stage.SYNC_SCHEMAS) return case Stage.SYNC_SCHEMAS: + self._set_stage(stage_from=stage, stage_to=Stage.POST_SYNC_UPDATE) + return + case Stage.POST_SYNC_UPDATE: self._set_stage(stage_from=stage, stage_to=Stage.SWAP) return case Stage.SWAP: diff --git a/tests/test_repack.py b/tests/test_repack.py index 2ebbe7a..30aaf83 100644 --- a/tests/test_repack.py +++ b/tests/test_repack.py @@ -21,9 +21,11 @@ PrimaryKeyNotFound, Psycopack, ReferringForeignKeyInDifferentSchema, + SyncStrategy, TableDoesNotExist, TableHasTriggers, TableIsEmpty, + UnexpectedSyncStrategy, UnsupportedPrimaryKey, _const, _cur, @@ -82,15 +84,27 @@ def _collect_table_info( class _TriggerInfo: trigger_exists: bool repacked_trigger_exists: bool + change_log_trigger_exists: bool def _get_trigger_info(repack: Psycopack, cur: _cur.Cursor) -> _TriggerInfo: cur.execute(f"SELECT 1 FROM pg_trigger WHERE tgname = '{repack.trigger}'") trigger_exists = cur.fetchone() is not None cur.execute(f"SELECT 1 FROM pg_trigger WHERE tgname = '{repack.repacked_trigger}'") + repacked_trigger_exists = cur.fetchone() is not None + if repack.change_log is not None: + cur.execute( + f"SELECT 1 FROM pg_trigger WHERE tgname = 
'{repack.change_log_trigger}'"
+        )
+        change_log_trigger_exists = cur.fetchone() is not None
+    else:
+        change_log_trigger_exists = False
+
-    repacked_trigger_exists = cur.fetchone() is not None
     return _TriggerInfo(
-        trigger_exists=trigger_exists, repacked_trigger_exists=repacked_trigger_exists
+        trigger_exists=trigger_exists,
+        repacked_trigger_exists=repacked_trigger_exists,
+        change_log_trigger_exists=change_log_trigger_exists,
     )
 
@@ -98,6 +112,7 @@ class _FunctionInfo:
     function_exists: bool
     repacked_function_exists: bool
+    change_log_function_exists: bool
 
 
 def _get_function_info(repack: Psycopack, cur: _cur.Cursor) -> _FunctionInfo:
@@ -105,9 +120,17 @@ def _get_function_info(repack: Psycopack, cur: _cur.Cursor) -> _FunctionInfo:
     function_exists = cur.fetchone() is not None
     cur.execute(f"SELECT 1 FROM pg_proc WHERE proname = '{repack.repacked_function}'")
     repacked_function_exists = cur.fetchone() is not None
+    if repack.change_log_function is not None:
+        cur.execute(
+            f"SELECT 1 FROM pg_proc WHERE proname = '{repack.change_log_function}'"
+        )
+        change_log_function_exists = cur.fetchone() is not None
+    else:
+        change_log_function_exists = False
     return _FunctionInfo(
         function_exists=function_exists,
         repacked_function_exists=repacked_function_exists,
+        change_log_function_exists=change_log_function_exists,
     )
 
@@ -160,12 +183,21 @@ def _assert_repack(
 
 
 def _assert_reset(repack: Psycopack, cur: _cur.Cursor) -> None:
-    assert _get_trigger_info(repack, cur).trigger_exists is False
-    assert _get_function_info(repack, cur).function_exists is False
+    trigger_info = _get_trigger_info(repack, cur)
+    assert trigger_info.trigger_exists is False
+
+    function_info = _get_function_info(repack, cur)
+    assert function_info.function_exists is False
     assert _get_sequence_info(repack, cur).sequence_exists is False
     assert repack.introspector.get_table_oid(table=repack.copy_table) is None
     assert repack.introspector.get_table_oid(table=repack.tracker.tracker_table) is None
 
+    if repack.sync_strategy == SyncStrategy.CHANGE_LOG:
+        assert trigger_info.change_log_trigger_exists is False
+        assert function_info.change_log_function_exists is False
+        assert repack.change_log is not None
+        assert repack.introspector.get_table_oid(table=repack.change_log) is None
+
 
 def _do_writes(
     table: str,
@@ -531,8 +563,8 @@ def test_repack_full_after_sync_schemas_called(connection: _psycopg.Connection)
         repack.setup_repacking()
         repack.backfill()
         repack.sync_schemas()
-        # Sync schemas finished. Next stage is swap.
-        assert repack.tracker.get_current_stage() == _tracker.Stage.SWAP
+        # Sync schemas finished. Next stage is post sync update.
+        assert repack.tracker.get_current_stage() == _tracker.Stage.POST_SYNC_UPDATE
         repack.full()
         table_after = _collect_table_info(table="to_repack", connection=connection)
         _assert_repack(
@@ -570,6 +602,7 @@ def test_repack_full_after_swap_called(connection: _psycopg.Connection) -> None:
         repack.setup_repacking()
         repack.backfill()
         repack.sync_schemas()
+        repack.post_sync_update()
         repack.swap()
         # Swap finished. Next stage is clean up.
assert repack.tracker.get_current_stage() == _tracker.Stage.CLEAN_UP @@ -606,6 +639,7 @@ def test_clean_up_finishes_the_repacking(connection: _psycopg.Connection) -> Non repack.setup_repacking() repack.backfill() repack.sync_schemas() + repack.post_sync_update() repack.swap() repack.clean_up() table_after = _collect_table_info(table="to_repack", connection=connection) @@ -741,7 +775,7 @@ def test_when_tracker_removed_after_sync_schemas( repack.setup_repacking() repack.backfill() repack.sync_schemas() - assert repack.tracker.get_current_stage() == _tracker.Stage.SWAP + assert repack.tracker.get_current_stage() == _tracker.Stage.POST_SYNC_UPDATE # Deleting the tracker table will remove the ability to pick up the # repacking process where it left. But nonetheless, it should resume @@ -841,6 +875,75 @@ def test_when_rogue_row_inserted_in_tracker_table( repack.full() +def test_registry_table_sync_strategy_upgrade(connection: _psycopg.Connection) -> None: + with _cur.get_cursor(connection, logged=True) as cur: + factories.create_table_for_repacking( + connection=connection, + cur=cur, + table_name="to_repack", + rows=100, + ) + # Create the Registry table with the OLD schema manually. + cur.execute( + dedent(f""" + CREATE TABLE public.{_const.PSYCOPACK_REGISTRY} ( + original_table VARCHAR(63) NOT NULL UNIQUE, + copy_table VARCHAR(63) NOT NULL UNIQUE, + id_seq VARCHAR(63) NOT NULL UNIQUE, + function VARCHAR(63) NOT NULL UNIQUE, + trigger VARCHAR(63) NOT NULL UNIQUE, + backfill_log VARCHAR(63) NOT NULL UNIQUE, + repacked_name VARCHAR(63) NOT NULL UNIQUE, + repacked_function VARCHAR(63) NOT NULL UNIQUE, + repacked_trigger VARCHAR(63) NOT NULL UNIQUE + ); + """) + ) + + # Update is checked upon initialisation. + repack = Psycopack( + table="to_repack", + batch_size=1, + conn=connection, + cur=cur, + ) + columns = repack.introspector.get_table_columns(table=_const.PSYCOPACK_REGISTRY) + assert "sync_strategy" in columns + assert "change_log_trigger" in columns + assert "change_log" in columns + assert "change_log_function" in columns + assert "change_log_copy_function" in columns + + +def test_when_user_changes_existing_sync_strategy( + connection: _psycopg.Connection, +) -> None: + with _cur.get_cursor(connection, logged=True) as cur: + factories.create_table_for_repacking( + connection=connection, + cur=cur, + table_name="to_repack", + rows=100, + ) + # Initiate psycopack with 'DIRECT_TRIGGER' strategy - this creates the + # Registry table. + Psycopack( + table="to_repack", + batch_size=1, + conn=connection, + cur=cur, + sync_strategy=SyncStrategy.DIRECT_TRIGGER, + ) + with pytest.raises(UnexpectedSyncStrategy): + Psycopack( + table="to_repack", + batch_size=1, + conn=connection, + cur=cur, + sync_strategy=SyncStrategy.CHANGE_LOG, + ) + + def test_table_to_repack_deleted_after_pre_validation( connection: _psycopg.Connection, ) -> None: @@ -922,6 +1025,10 @@ def test_cannot_repeat_finished_stage(connection: _psycopg.Connection) -> None: with pytest.raises(_tracker.StageAlreadyFinished): repack.sync_schemas() + repack.post_sync_update() + with pytest.raises(_tracker.StageAlreadyFinished): + repack.post_sync_update() + repack.swap() with pytest.raises(_tracker.StageAlreadyFinished): repack.swap() @@ -983,7 +1090,12 @@ def test_cannot_skip_order_of_stages(connection: _psycopg.Connection) -> None: repack.sync_schemas() with pytest.raises(_tracker.InvalidPsycopackStep): - # Can't go to clean up without swapping first. + # Can't go to swap without post sync update. 
+ repack.swap() + + repack.post_sync_update() + with pytest.raises(_tracker.InvalidPsycopackStep): + # Can't go to clean up without swap. repack.clean_up() repack.swap() @@ -1028,6 +1140,7 @@ def test_revert_swap_after_swap_called( repack.setup_repacking() repack.backfill() repack.sync_schemas() + repack.post_sync_update() repack.swap() # After the swap, repacking is ready for the clean-up stage. @@ -1433,6 +1546,8 @@ def test_with_writes_when_table_has_negative_pk_values( _do_writes(table="to_repack", cur=cur, check_table=repack.copy_table) repack.sync_schemas() _do_writes(table="to_repack", cur=cur, check_table=repack.copy_table) + repack.post_sync_update() + _do_writes(table="to_repack", cur=cur, check_table=repack.copy_table) repack.swap() _do_writes(table="to_repack", cur=cur, check_table=repack.repacked_name) repack.clean_up() @@ -1625,7 +1740,9 @@ def side_effect(*args: object, **kwargs: object) -> None: repack.pre_validate() # Setting up the copy relations may time out (specially the trigger). - with mock.patch.object(repack.command, "create_copy_trigger") as mocked: + with mock.patch.object( + repack.command, "create_source_to_copy_trigger" + ) as mocked: mocked.side_effect = side_effect with pytest.raises(FailureDueToLockTimeout): repack.setup_repacking() @@ -1651,6 +1768,10 @@ def side_effect(*args: object, **kwargs: object) -> None: # reentrant. repack.sync_schemas() + # It can be called again successfully as the function is idempotent and + # reentrant. + repack.post_sync_update() + # Swapping also has many DDLs that may time out as it is moving the # tables around. with mock.patch.object(repack.command, "rename_table") as mocked: @@ -1696,7 +1817,11 @@ def side_effect(*args: object, **kwargs: object) -> None: ) -def test_reset(connection: _psycopg.Connection) -> None: +@pytest.mark.parametrize( + "sync_strategy", + [SyncStrategy.DIRECT_TRIGGER, SyncStrategy.CHANGE_LOG], +) +def test_reset(connection: _psycopg.Connection, sync_strategy: SyncStrategy) -> None: with _cur.get_cursor(connection, logged=True) as cur: factories.create_table_for_repacking( connection=connection, @@ -1705,12 +1830,18 @@ def test_reset(connection: _psycopg.Connection) -> None: rows=100, ) table_before = _collect_table_info(table="to_repack", connection=connection) + repack = Psycopack( table="to_repack", batch_size=1, conn=connection, cur=cur, + sync_strategy=sync_strategy, + change_log_batch_size=10 + if sync_strategy == SyncStrategy.CHANGE_LOG + else None, ) + # Psycopack hasn't run yet, no reason to reset. 
with pytest.raises(InvalidStageForReset, match="Psycopack hasn't run yet"): repack.reset() @@ -1741,11 +1872,21 @@ def test_reset(connection: _psycopg.Connection) -> None: repack.reset() _assert_reset(repack, cur) + # Resetting after post_sync_update + repack.pre_validate() + repack.setup_repacking() + repack.backfill() + repack.sync_schemas() + repack.post_sync_update() + repack.reset() + _assert_reset(repack, cur) + # Resetting after swap results in error repack.pre_validate() repack.setup_repacking() repack.backfill() repack.sync_schemas() + repack.post_sync_update() repack.swap() with pytest.raises(InvalidStageForReset, match="reset from the CLEAN_UP stage"): repack.reset() @@ -1761,6 +1902,7 @@ def test_reset(connection: _psycopg.Connection) -> None: repack.setup_repacking() repack.backfill() repack.sync_schemas() + repack.post_sync_update() repack.swap() repack.clean_up() @@ -1828,6 +1970,7 @@ def test_with_non_default_schema(connection: _psycopg.Connection) -> None: repack.setup_repacking() repack.backfill() repack.sync_schemas() + repack.post_sync_update() repack.swap() repack.revert_swap() repack.swap() @@ -2207,6 +2350,7 @@ def test_when_repack_is_reinstantiated_after_swapping( repack.setup_repacking() repack.backfill() repack.sync_schemas() + repack.post_sync_update() repack.swap() # Re-instantiate to trigger the edge-case @@ -2287,3 +2431,175 @@ def test_with_skip_permissions_check( skip_permissions_check=True, ) mocked.assert_not_called() + + +@pytest.mark.parametrize( + "pk_type", + ("bigint", "bigserial", "integer", "serial", "smallint", "smallserial"), +) +def test_repack_with_change_log_strategy( + connection: _psycopg.Connection, pk_type: str +) -> None: + with _cur.get_cursor(connection, logged=True) as cur: + factories.create_table_for_repacking( + connection=connection, + cur=cur, + table_name="to_repack", + rows=100, + pk_type=pk_type, + ) + repack = Psycopack( + table="to_repack", + batch_size=1, + conn=connection, + cur=cur, + sync_strategy=SyncStrategy.CHANGE_LOG, + change_log_batch_size=10, + ) + repack.pre_validate() + repack.setup_repacking() + + create_row_sql = dedent( + """ + INSERT INTO to_repack ( + var_with_btree, + var_with_pattern_ops, + int_with_check, + int_with_not_valid_check, + int_with_long_index_name, + var_with_unique_idx, + var_with_unique_const, + valid_fk, + not_valid_fk, + to_repack, + var_maybe_with_exclusion, + var_with_multiple_idx + ) + SELECT + substring(md5(random()::text), 1, 10), + substring(md5(random()::text), 1, 10), + (floor(random() * 10) + 1)::int, + (floor(random() * 10) + 1)::int, + (floor(random() * 10) + 1)::int, + substring(md5(random()::text), 1, 10), + substring(md5(random()::text), 1, 10), + (floor(random() * 10) + 1)::int, + (floor(random() * 10) + 1)::int, + (floor(random() * 10) + 1)::int, + substring(md5(random()::text), 1, 10), + substring(md5(random()::text), 1, 10) + FROM + generate_series(1, 1) + RETURNING + id + ; + """ + ) + # Insert a row in the table to_repack, to verify the trigger places the + # pk of the row being changed in the change log. + cur.execute(create_row_sql) + + cur.execute(f"SELECT * FROM {repack.change_log};") + # The fixture adds 100 rows, and so the insert above was 101. + assert cur.fetchall() == [(1, 101)] + + # Updating the same row doesn't create a new row in the change log + # and doesn't err due to the "ON CONFLICT DO NOTHING". 
+ cur.execute("UPDATE to_repack SET int_with_check = 42 WHERE id = 101;") + cur.execute(f"SELECT * FROM {repack.change_log};") + assert cur.fetchall() == [(1, 101)] + + # Unless... It's updating the id itself. + # The older id still remains in the table, however. Given that the copy + # function is idempotent, this doesn't matter. But it additionally + # covers for the corner case where the id is inserted again. + cur.execute("UPDATE to_repack SET id = 9999 WHERE id = 101;") + cur.execute(f"SELECT * FROM {repack.change_log};") + assert cur.fetchall() == [(1, 101), (3, 9999)] + + # Deleting the row doesn't create a new row in the change log. + cur.execute("DELETE FROM to_repack WHERE id = 9999;") + cur.execute(f"SELECT * FROM {repack.change_log};") + assert cur.fetchall() == [(1, 101), (3, 9999)] + repack.backfill() + + # Create a row post-backfill, this row shouldn't be in the copy table + # yet as it didn't exist prior to the backfill process. + cur.execute(create_row_sql) + row = cur.fetchone() + assert row is not None + created_row_id = row[0] + assert created_row_id == 102 + cur.execute( + f"SELECT count(*) FROM {repack.copy_table} WHERE id = {created_row_id};" + ) + row = cur.fetchone() + assert row is not None + assert row[0] == 0 + + # But it is in the change log. + cur.execute(f"SELECT * FROM {repack.change_log};") + assert cur.fetchall() == [(1, 101), (3, 9999), (5, 102)] + + # Before syncing the schema, the src-to-copy trigger doesn't exist. + # But the change log trigger and function exist. + cur.execute(f"SELECT 1 FROM pg_trigger WHERE tgname = '{repack.trigger}'") + assert cur.fetchone() is None + cur.execute( + f"SELECT 1 FROM pg_proc WHERE proname = '{repack.change_log_function}'" + ) + assert cur.fetchone() is not None + cur.execute( + f"SELECT 1 FROM pg_trigger WHERE tgname = '{repack.change_log_trigger}'" + ) + assert cur.fetchone() is not None + + repack.sync_schemas() + + # After syncing the schema, the src-to-copy trigger exists. + # But the change log trigger and function don't exist. + cur.execute(f"SELECT 1 FROM pg_trigger WHERE tgname = '{repack.trigger}'") + assert cur.fetchone() is not None + cur.execute( + f"SELECT 1 FROM pg_proc WHERE proname = '{repack.change_log_function}'" + ) + assert cur.fetchone() is None + cur.execute( + f"SELECT 1 FROM pg_trigger WHERE tgname = '{repack.change_log_trigger}'" + ) + assert cur.fetchone() is None + + # Before the post sync-update, there must be three rows in the change + # log (id 101, 102, 9999). + cur.execute(f"SELECT COUNT(*) FROM {repack.change_log};") + row = cur.fetchone() + assert row is not None + assert row[0] == 3 + + repack.post_sync_update() + + # Change log rows have been deleted. + cur.execute(f"SELECT COUNT(*) FROM {repack.change_log};") + row = cur.fetchone() + assert row is not None + assert row[0] == 0 + + table_before = _collect_table_info(table="to_repack", connection=connection) + repack.swap() + + # Before the clean-up, the change_log table is still there. + assert repack.change_log is not None + repack.introspector.get_table_oid(table=repack.change_log) + + repack.clean_up() + + # After the clean-up, the change_log table has been deleted. + repack.introspector.get_table_oid(table=repack.change_log) + + table_after = _collect_table_info(table="to_repack", connection=connection) + _assert_repack( + table_before=table_before, + table_after=table_after, + repack=repack, + cur=cur, + )