Skip to content

Commit

Permalink
Use a custom setting to control which triggers are invoked for Electr…
Browse files Browse the repository at this point in the history
…ic writes

This gives us control over which triggers fire regardless of whether
writes are streamed to Postgres over a logical replication connection or
applied directly as DML statements.
  • Loading branch information
alco committed Nov 27, 2023
1 parent f32d591 commit c19fb5d
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
CREATE OR REPLACE FUNCTION <%= @schema %>.__session_replication_role(OUT role text) AS $$
BEGIN
SELECT INTO role current_setting('electric.session_replication_role');
EXCEPTION WHEN undefined_object THEN
SELECT INTO role current_setting('session_replication_role');
END;
$$ LANGUAGE PLPGSQL;
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20230512000000_confli
# This function definition is included here because it is referenced in the definition of
# "trigger_function_installers" below it.
Extension.Functions.by_name(:perform_reordered_op_installer_function),
Extension.Functions.by_name(:__session_replication_role),
@contents["trigger_function_installers"],
@contents["shadow_table_creation_and_update"]
# We need to actually run shadow table creation/updates, but that's handled in the next migration.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,60 +508,62 @@ BEGIN
CREATE OR REPLACE TRIGGER postgres_write__upsert_generate_shadow_rows
BEFORE INSERT OR UPDATE ON %s
FOR EACH ROW
WHEN (electric.__session_replication_role() <> 'replica')
EXECUTE PROCEDURE electric.%I();
$$, full_table_identifier, generated_functions->>'create_shadow_row_from_upsert');

EXECUTE format($$ ALTER TABLE %s ENABLE TRIGGER postgres_write__upsert_generate_shadow_rows $$, full_table_identifier);
EXECUTE format($$ ALTER TABLE %s ENABLE ALWAYS TRIGGER postgres_write__upsert_generate_shadow_rows $$, full_table_identifier);

EXECUTE format($$
CREATE OR REPLACE TRIGGER postgres_write__delete_generate_shadow_rows
BEFORE DELETE ON %s
FOR EACH ROW
WHEN (electric.__session_replication_role() <> 'replica')
EXECUTE PROCEDURE electric.%I();
$$, full_table_identifier, generated_functions->>'update_shadow_row_from_delete');

EXECUTE format($$ ALTER TABLE %s ENABLE TRIGGER postgres_write__delete_generate_shadow_rows $$, full_table_identifier);
EXECUTE format($$ ALTER TABLE %s ENABLE ALWAYS TRIGGER postgres_write__delete_generate_shadow_rows $$, full_table_identifier);

EXECUTE format($$ DROP TRIGGER IF EXISTS postgres_write__write_resolved_tags ON electric.%I $$, shadow_table_name);
EXECUTE format($$
CREATE CONSTRAINT TRIGGER postgres_write__write_resolved_tags
AFTER UPDATE ON electric.%I
DEFERRABLE INITIALLY DEFERRED
FOR EACH ROW
WHEN (NOT NEW._resolved)
WHEN (electric.__session_replication_role() <> 'replica' AND NOT NEW._resolved)
EXECUTE PROCEDURE electric.%I();
$$, shadow_table_name, generated_functions->>'write_correct_max_tag');

EXECUTE format($$ ALTER TABLE electric.%I ENABLE TRIGGER postgres_write__write_resolved_tags $$, shadow_table_name);
EXECUTE format($$ ALTER TABLE electric.%I ENABLE ALWAYS TRIGGER postgres_write__write_resolved_tags $$, shadow_table_name);

EXECUTE format($$
CREATE OR REPLACE TRIGGER satellite_write__upsert_rows
BEFORE INSERT ON electric.%I
FOR EACH ROW
WHEN (pg_trigger_depth() < 1 AND NEW._currently_reordering IS NULL)
WHEN (electric.__session_replication_role() = 'replica' AND pg_trigger_depth() < 1 AND NEW._currently_reordering IS NULL)
EXECUTE PROCEDURE electric.%I();
$$, shadow_table_name, generated_functions->>'shadow_insert_to_upsert');

EXECUTE format($$ ALTER TABLE electric.%I ENABLE REPLICA TRIGGER satellite_write__upsert_rows $$, shadow_table_name);
EXECUTE format($$ ALTER TABLE electric.%I ENABLE ALWAYS TRIGGER satellite_write__upsert_rows $$, shadow_table_name);

EXECUTE format($$
CREATE OR REPLACE TRIGGER satellite_write__resolve_observed_tags
BEFORE UPDATE ON electric.%I
FOR EACH ROW
WHEN (NEW._currently_reordering IS NULL)
WHEN (electric.__session_replication_role() = 'replica' AND NEW._currently_reordering IS NULL)
EXECUTE PROCEDURE electric.%I();
$$, shadow_table_name, generated_functions->>'resolve_observed_tags');

EXECUTE format($$ ALTER TABLE electric.%I ENABLE REPLICA TRIGGER satellite_write__resolve_observed_tags $$, shadow_table_name);
EXECUTE format($$ ALTER TABLE electric.%I ENABLE ALWAYS TRIGGER satellite_write__resolve_observed_tags $$, shadow_table_name);

EXECUTE format($$
CREATE OR REPLACE TRIGGER satellite_write__save_operation_for_reordering
BEFORE INSERT OR UPDATE ON %s
FOR EACH ROW
WHEN (pg_trigger_depth() < 1)
WHEN (electric.__session_replication_role() = 'replica' AND pg_trigger_depth() < 1)
EXECUTE PROCEDURE electric.%I();
$$, full_table_identifier, generated_functions->>'reorder_main_op');

EXECUTE format($$ ALTER TABLE %s ENABLE REPLICA TRIGGER satellite_write__save_operation_for_reordering $$, full_table_identifier);
EXECUTE format($$ ALTER TABLE %s ENABLE ALWAYS TRIGGER satellite_write__save_operation_for_reordering $$, full_table_identifier);
END
$function$;
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ defmodule Electric.Replication.Postgres.Writer do

conn_opts = Connectors.get_connection_opts(conn_config)
{:ok, conn} = Client.connect(conn_opts)
{:ok, [], []} = :epgsql.squery(conn, "SET electric.session_replication_role = replica")

{:consumer, %{conn: conn, origin: origin, producer_pid: nil}}
end
Expand Down

0 comments on commit c19fb5d

Please sign in to comment.