diff --git a/components/electric/lib/electric/ddlx/command.ex b/components/electric/lib/electric/ddlx/command.ex index ee87735f5c..3725a4ea84 100644 --- a/components/electric/lib/electric/ddlx/command.ex +++ b/components/electric/lib/electric/ddlx/command.ex @@ -88,6 +88,10 @@ defmodule Electric.DDLX.Command do } end + def command_list(%__MODULE__{action: %SatPerms.DDLX{} = ddlx}) do + command_list(ddlx) + end + def command_list(%SatPerms.DDLX{} = ddlx) do Stream.concat([ddlx.grants, ddlx.revokes, ddlx.assigns, ddlx.unassigns, ddlx.sqlite]) end @@ -243,15 +247,9 @@ end defimpl Command.PgSQL, for: SatPerms.DDLX do alias Command - alias Electric.Postgres.Extension - - def to_sql(%SatPerms.DDLX{} = ddlx, ddl_capture, quote_fun) do - Enum.concat([ - serialise_ddlx(ddlx), - ddlx - |> Command.command_list() - |> Enum.flat_map(&Command.PgSQL.to_sql(&1, ddl_capture, quote_fun)) - ]) + + def to_sql(%SatPerms.DDLX{}, _ddl_capture, _quote_fun) do + [] end def validate_schema(%SatPerms.DDLX{} = ddlx, schema, electrified) do @@ -260,14 +258,6 @@ defimpl Command.PgSQL, for: SatPerms.DDLX do |> Enum.to_list() |> Command.PgSQL.validate_schema(schema, electrified) end - - defp serialise_ddlx(ddlx) do - encoded = Protox.encode!(ddlx) |> IO.iodata_to_binary() |> Base.encode16() - - [ - "INSERT INTO #{Extension.ddlx_table()} (ddlx) VALUES ('\\x#{encoded}'::bytea);" - ] - end end defimpl Command.PgSQL, for: SatPerms.Grant do diff --git a/components/electric/lib/electric/postgres/extension.ex b/components/electric/lib/electric/postgres/extension.ex index 87051a72ff..5192f5ce96 100644 --- a/components/electric/lib/electric/postgres/extension.ex +++ b/components/electric/lib/electric/postgres/extension.ex @@ -134,7 +134,9 @@ defmodule Electric.Postgres.Extension do def ddl_relation, do: {@schema, @ddl_relation} def version_relation, do: {@schema, @version_relation} def electrified_tracking_relation, do: {@schema, @electrified_tracking_relation} + def transaction_marker_relation, do: {@schema, @transaction_marker_relation} def ddlx_relation, do: {@schema, @ddlx_commands_relation} + def global_perms_relation, do: {@schema, @global_perms_relation} def acked_client_lsn_relation, do: {@schema, @acked_client_lsn_relation} def publication_name, do: @publication_name @@ -305,17 +307,6 @@ defmodule Electric.Postgres.Extension do end end - # These are tables in the "electric" schema, each of which was added to the publication in - # one of the extension migrations. They can be found by searching the codebase for - # "add_table_to_publication_sql". - @published_extension_tables [ - {@schema, @ddl_relation}, - {@schema, @electrified_tracking_relation}, - {@schema, @transaction_marker_relation}, - {@schema, @acked_client_lsn_relation}, - {@schema, @ddlx_commands_relation} - ] - @doc """ The list of fully-qualified table identifiers that should be included in "#{@publication_name}". @@ -326,11 +317,19 @@ defmodule Electric.Postgres.Extension do def published_tables(conn) do with {:ok, tables} <- electrified_tables(conn) do tables_with_shadows = Enum.flat_map(tables, &[&1, shadow_of(&1)]) - published_tables = Enum.concat(tables_with_shadows, @published_extension_tables) + published_tables = Enum.concat(tables_with_shadows, published_extension_tables()) {:ok, published_tables} end end + defp published_extension_tables(module \\ __MODULE__) do + module + |> migration_versions() + |> Enum.flat_map(fn {_version, migration_module} -> + migration_published_tables(migration_module) + end) + end + def create_table_ddl(conn, %Proto.RangeVar{} = table_name) do name = to_string(table_name) @@ -389,11 +388,10 @@ defmodule Electric.Postgres.Extension do Migrations.Migration_20231206130400_ConvertReplicaTriggersToAlways, Migrations.Migration_20240110110200_DropUnusedFunctions, Migrations.Migration_20240205141200_ReinstallTriggerFunctionWriteCorrectMaxTag, - Migrations.Migration_20240212161153_DDLXCommands, Migrations.Migration_20240213160300_DropGenerateElectrifiedSqlFunction, - Migrations.Migration_20240214131615_PermissionsState, Migrations.Migration_20240417131000_ClientReconnectionInfoTables, - Migrations.Migration_20240501000000_UnsubPoints + Migrations.Migration_20240501000000_UnsubPoints, + Migrations.Migration_20240618152555_DDLXPermissions ] end @@ -444,7 +442,16 @@ defmodule Electric.Postgres.Extension do Logger.info("Running extension migration: #{version}") for sql <- module.up(@schema) do + # guard against adding publications via raw sql to protect consistency of + # `published_tables/1` + if sql =~ ~r/ALTER +PUBLICATION +.?#{@publication_name}.? +ADD +TABLE/, + do: + raise( + "Invalid migration: add relation to `#{module}.published_tables/0` to publish a table" + ) + results = :epgsql.squery(txconn, sql) |> List.wrap() + errors = Enum.filter(results, &(elem(&1, 0) == :error)) if errors == [] do @@ -456,6 +463,17 @@ defmodule Electric.Postgres.Extension do end end + module + |> migration_published_tables() + |> Enum.each(fn + {_schema, _name} = table -> + sql = table |> Electric.Utils.inspect_relation() |> add_table_to_publication_sql() + {:ok, [], []} = :epgsql.squery(txconn, sql) + + name when is_binary(name) -> + raise "migration published_tables/0 should return a list of relations in `{schema, name}` form" + end) + {:ok, 1} = :epgsql.squery( txconn, @@ -465,6 +483,14 @@ defmodule Electric.Postgres.Extension do :ok end + defp migration_published_tables(module) do + if function_exported?(module, :published_tables, 0) do + module.published_tables() + else + [] + end + end + # https://dba.stackexchange.com/a/311714 @is_transaction_sql "SELECT transaction_timestamp() != statement_timestamp() AS is_transaction" diff --git a/components/electric/lib/electric/postgres/extension/migration.ex b/components/electric/lib/electric/postgres/extension/migration.ex index 85c7b7899f..4d2b539881 100644 --- a/components/electric/lib/electric/postgres/extension/migration.ex +++ b/components/electric/lib/electric/postgres/extension/migration.ex @@ -1,9 +1,9 @@ defmodule Electric.Postgres.Extension.Migration do @callback version() :: pos_integer() @callback up(binary()) :: [binary(), ...] - @callback down(binary()) :: [binary()] + @callback published_tables() :: [Electric.Postgres.relation()] @callback replicated_table_ddls() :: [String.t()] - @optional_callbacks replicated_table_ddls: 0 + @optional_callbacks replicated_table_ddls: 0, published_tables: 0 @enforce_keys [:version, :schema, :stmts, :txid, :txts, :timestamp] diff --git a/components/electric/lib/electric/postgres/extension/migrations/20230328113927_setup_extension.ex b/components/electric/lib/electric/postgres/extension/migrations/20230328113927_setup_extension.ex index 2761af4f9e..36347aa4c3 100644 --- a/components/electric/lib/electric/postgres/extension/migrations/20230328113927_setup_extension.ex +++ b/components/electric/lib/electric/postgres/extension/migrations/20230328113927_setup_extension.ex @@ -53,13 +53,14 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20230328113927 do ################## """ CREATE PUBLICATION "#{publication_name}"; - """, - Extension.add_table_to_publication_sql(ddl_table) + """ ] end @impl true - def down(_schema) do - [] + def published_tables do + [ + Extension.ddl_relation() + ] end end diff --git a/components/electric/lib/electric/postgres/extension/migrations/20230424154425_ddlx.ex b/components/electric/lib/electric/postgres/extension/migrations/20230424154425_ddlx.ex index 536cc7a87f..1faf7f92c9 100644 --- a/components/electric/lib/electric/postgres/extension/migrations/20230424154425_ddlx.ex +++ b/components/electric/lib/electric/postgres/extension/migrations/20230424154425_ddlx.ex @@ -61,9 +61,4 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20230424154425_DDLX d """ ] end - - @impl true - def down(_schema) do - [] - end end diff --git a/components/electric/lib/electric/postgres/extension/migrations/20230512000000_conflict_resolution_triggers.ex b/components/electric/lib/electric/postgres/extension/migrations/20230512000000_conflict_resolution_triggers.ex index 2cc0aaca94..a71e20e662 100644 --- a/components/electric/lib/electric/postgres/extension/migrations/20230512000000_conflict_resolution_triggers.ex +++ b/components/electric/lib/electric/postgres/extension/migrations/20230512000000_conflict_resolution_triggers.ex @@ -26,9 +26,4 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20230512000000_confli ] |> Enum.map(&String.replace(&1, "electric", schema)) end - - @impl true - def down(_schema) do - [] - end end diff --git a/components/electric/lib/electric/postgres/extension/migrations/20230605141256_electrify_function.ex b/components/electric/lib/electric/postgres/extension/migrations/20230605141256_electrify_function.ex index 11962f1977..308d0e8f73 100644 --- a/components/electric/lib/electric/postgres/extension/migrations/20230605141256_electrify_function.ex +++ b/components/electric/lib/electric/postgres/extension/migrations/20230605141256_electrify_function.ex @@ -22,13 +22,14 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20230605141256_Electr ) """, "CREATE INDEX electrified_tracking_table_name_idx ON #{electrified_tracking_table} (schema_name, table_name)", - "CREATE INDEX electrified_tracking_table_name_oid ON #{electrified_tracking_table} (oid)", - Extension.add_table_to_publication_sql(electrified_tracking_table) + "CREATE INDEX electrified_tracking_table_name_oid ON #{electrified_tracking_table} (oid)" ] end @impl true - def down(_schema) do - [] + def published_tables do + [ + Extension.electrified_tracking_relation() + ] end end diff --git a/components/electric/lib/electric/postgres/extension/migrations/20230715000000_utilities_table.ex b/components/electric/lib/electric/postgres/extension/migrations/20230715000000_utilities_table.ex index ad17780e7d..5c2fcdb9e2 100644 --- a/components/electric/lib/electric/postgres/extension/migrations/20230715000000_utilities_table.ex +++ b/components/electric/lib/electric/postgres/extension/migrations/20230715000000_utilities_table.ex @@ -19,11 +19,14 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20230715000000_Utilit """, """ INSERT INTO #{table} (id, content) VALUES ('magic write', '{}') - """, - Extension.add_table_to_publication_sql(table) + """ ] end @impl true - def down(_), do: [] + def published_tables do + [ + Extension.transaction_marker_relation() + ] + end end diff --git a/components/electric/lib/electric/postgres/extension/migrations/20230814170123_rename_ddlx_functions.ex b/components/electric/lib/electric/postgres/extension/migrations/20230814170123_rename_ddlx_functions.ex index 83716f7d26..4c67749ace 100644 --- a/components/electric/lib/electric/postgres/extension/migrations/20230814170123_rename_ddlx_functions.ex +++ b/components/electric/lib/electric/postgres/extension/migrations/20230814170123_rename_ddlx_functions.ex @@ -165,11 +165,6 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20230814170123_Rename ] end - @impl true - def down(_schema) do - [] - end - EEx.function_from_file(:defp, :ddlgen_sql, sql_file, [ :schema ]) diff --git a/components/electric/lib/electric/postgres/extension/migrations/20230829000000_acknowledged_client_lsns_table.ex b/components/electric/lib/electric/postgres/extension/migrations/20230829000000_acknowledged_client_lsns_table.ex index d56eb194de..5a8d454ad3 100644 --- a/components/electric/lib/electric/postgres/extension/migrations/20230829000000_acknowledged_client_lsns_table.ex +++ b/components/electric/lib/electric/postgres/extension/migrations/20230829000000_acknowledged_client_lsns_table.ex @@ -20,13 +20,16 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20230829000000_Acknow WHEN (pg_trigger_depth() < 1) EXECUTE FUNCTION #{schema}.upsert_acknowledged_client_lsn() """, - "ALTER TABLE #{table} ENABLE REPLICA TRIGGER upsert_acknowledged_client_lsn", - Extension.add_table_to_publication_sql(table) + "ALTER TABLE #{table} ENABLE REPLICA TRIGGER upsert_acknowledged_client_lsn" ] end @impl true - def down(_), do: [] + def published_tables do + [ + Extension.acked_client_lsn_relation() + ] + end @impl true def replicated_table_ddls do diff --git a/components/electric/lib/electric/postgres/extension/migrations/20230918115714_add_unique_constraint_ddl_commands.ex b/components/electric/lib/electric/postgres/extension/migrations/20230918115714_add_unique_constraint_ddl_commands.ex index 8b495418dc..9691d4ed6e 100644 --- a/components/electric/lib/electric/postgres/extension/migrations/20230918115714_add_unique_constraint_ddl_commands.ex +++ b/components/electric/lib/electric/postgres/extension/migrations/20230918115714_add_unique_constraint_ddl_commands.ex @@ -18,7 +18,4 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20230918115714_DDLCom """ ] end - - @impl true - def down(_), do: [] end diff --git a/components/electric/lib/electric/postgres/extension/migrations/20230921161045_drop_event_triggers.ex b/components/electric/lib/electric/postgres/extension/migrations/20230921161045_drop_event_triggers.ex index 1f58fd8595..8f76c0fd68 100644 --- a/components/electric/lib/electric/postgres/extension/migrations/20230921161045_drop_event_triggers.ex +++ b/components/electric/lib/electric/postgres/extension/migrations/20230921161045_drop_event_triggers.ex @@ -24,7 +24,4 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20230921161045_DropEv """ ] end - - @impl true - def down(_), do: [] end diff --git a/components/electric/lib/electric/postgres/extension/migrations/20230921161418_proxy_compatibility.ex b/components/electric/lib/electric/postgres/extension/migrations/20230921161418_proxy_compatibility.ex index 2dcc321fcf..97368b13d3 100644 --- a/components/electric/lib/electric/postgres/extension/migrations/20230921161418_proxy_compatibility.ex +++ b/components/electric/lib/electric/postgres/extension/migrations/20230921161418_proxy_compatibility.ex @@ -101,7 +101,4 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20230921161418_ProxyC """ ] end - - @impl true - def down(_), do: [] end diff --git a/components/electric/lib/electric/postgres/extension/migrations/20231009121515_allow_large_migrations.ex b/components/electric/lib/electric/postgres/extension/migrations/20231009121515_allow_large_migrations.ex index 7e4d8405a2..31c1ae8c45 100644 --- a/components/electric/lib/electric/postgres/extension/migrations/20231009121515_allow_large_migrations.ex +++ b/components/electric/lib/electric/postgres/extension/migrations/20231009121515_allow_large_migrations.ex @@ -25,7 +25,4 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20231009121515_AllowL """ ] end - - @impl true - def down(_), do: [] end diff --git a/components/electric/lib/electric/postgres/extension/migrations/20231010123118_add_priority_to_version.ex b/components/electric/lib/electric/postgres/extension/migrations/20231010123118_add_priority_to_version.ex index cba8c556fc..66a0593ad4 100644 --- a/components/electric/lib/electric/postgres/extension/migrations/20231010123118_add_priority_to_version.ex +++ b/components/electric/lib/electric/postgres/extension/migrations/20231010123118_add_priority_to_version.ex @@ -20,9 +20,4 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20231010123118_AddPri """ ] end - - @impl true - def down(_schema) do - [] - end end diff --git a/components/electric/lib/electric/postgres/extension/migrations/20231016141000_convert_function_to_procedure.ex b/components/electric/lib/electric/postgres/extension/migrations/20231016141000_convert_function_to_procedure.ex index 5cdee37c4d..d91aac37b7 100644 --- a/components/electric/lib/electric/postgres/extension/migrations/20231016141000_convert_function_to_procedure.ex +++ b/components/electric/lib/electric/postgres/extension/migrations/20231016141000_convert_function_to_procedure.ex @@ -10,9 +10,4 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20231016141000_Conver def up(schema) do ["DROP ROUTINE IF EXISTS #{schema}.__validate_table_column_types(text)"] end - - @impl true - def down(_schema) do - [] - end end diff --git a/components/electric/lib/electric/postgres/extension/migrations/20231206130400_convert_replica_triggers_to_always.ex b/components/electric/lib/electric/postgres/extension/migrations/20231206130400_convert_replica_triggers_to_always.ex index dd7420f4c1..074039fccd 100644 --- a/components/electric/lib/electric/postgres/extension/migrations/20231206130400_convert_replica_triggers_to_always.ex +++ b/components/electric/lib/electric/postgres/extension/migrations/20231206130400_convert_replica_triggers_to_always.ex @@ -23,9 +23,4 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20231206130400_Conver String.replace(@migration_sql, "electric", schema) ] end - - @impl true - def down(_schema) do - [] - end end diff --git a/components/electric/lib/electric/postgres/extension/migrations/20240110110200_drop_unused_functions.ex b/components/electric/lib/electric/postgres/extension/migrations/20240110110200_drop_unused_functions.ex index d6f63c973c..eedada7e69 100644 --- a/components/electric/lib/electric/postgres/extension/migrations/20240110110200_drop_unused_functions.ex +++ b/components/electric/lib/electric/postgres/extension/migrations/20240110110200_drop_unused_functions.ex @@ -13,9 +13,4 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20240110110200_DropUn "DROP ROUTINE IF EXISTS #{schema}.__validate_table_column_types(text)" ] end - - @impl true - def down(_schema) do - [] - end end diff --git a/components/electric/lib/electric/postgres/extension/migrations/20240205141200_reinstall_trigger_function__write_correct_max_tag.ex b/components/electric/lib/electric/postgres/extension/migrations/20240205141200_reinstall_trigger_function__write_correct_max_tag.ex index acb8db7740..44df7a133b 100644 --- a/components/electric/lib/electric/postgres/extension/migrations/20240205141200_reinstall_trigger_function__write_correct_max_tag.ex +++ b/components/electric/lib/electric/postgres/extension/migrations/20240205141200_reinstall_trigger_function__write_correct_max_tag.ex @@ -16,9 +16,4 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20240205141200_Reinst "CALL #{schema}.reinstall_trigger_function('install_function__write_correct_max_tag')" ] end - - @impl true - def down(_schema) do - [] - end end diff --git a/components/electric/lib/electric/postgres/extension/migrations/20240212161153_ddlx_commands.ex b/components/electric/lib/electric/postgres/extension/migrations/20240212161153_ddlx_commands.ex deleted file mode 100644 index 8c8a7f0f4f..0000000000 --- a/components/electric/lib/electric/postgres/extension/migrations/20240212161153_ddlx_commands.ex +++ /dev/null @@ -1,32 +0,0 @@ -defmodule Electric.Postgres.Extension.Migrations.Migration_20240212161153_DDLXCommands do - alias Electric.Postgres.Extension - - @behaviour Extension.Migration - - @impl true - def version, do: 2024_02_12_16_11_53 - - @impl true - def up(schema) do - ddlx_table = Extension.ddlx_table() - txid_type = Extension.txid_type() - txts_type = Extension.txts_type() - - [ - """ - CREATE TABLE #{ddlx_table} ( - id serial8 NOT NULL PRIMARY KEY, - txid #{txid_type} NOT NULL DEFAULT #{schema}.current_xact_id(), - txts #{txts_type} NOT NULL DEFAULT #{schema}.current_xact_ts(), - ddlx bytea NOT NULL - ); - """, - Extension.add_table_to_publication_sql(ddlx_table) - ] - end - - @impl true - def down(_schema) do - [] - end -end diff --git a/components/electric/lib/electric/postgres/extension/migrations/20240213160300_drop_generate_electrified_sql_function.ex b/components/electric/lib/electric/postgres/extension/migrations/20240213160300_drop_generate_electrified_sql_function.ex index fb857a1a29..f828d1e041 100644 --- a/components/electric/lib/electric/postgres/extension/migrations/20240213160300_drop_generate_electrified_sql_function.ex +++ b/components/electric/lib/electric/postgres/extension/migrations/20240213160300_drop_generate_electrified_sql_function.ex @@ -14,9 +14,4 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20240213160300_DropGe "DROP ROUTINE IF EXISTS #{schema}.generate_electrified_sql(regclass)" ] end - - @impl true - def down(_schema) do - [] - end end diff --git a/components/electric/lib/electric/postgres/extension/migrations/20240417131000_client_reconnection_info_tables.ex b/components/electric/lib/electric/postgres/extension/migrations/20240417131000_client_reconnection_info_tables.ex index 1809d45022..c7ed68c4be 100644 --- a/components/electric/lib/electric/postgres/extension/migrations/20240417131000_client_reconnection_info_tables.ex +++ b/components/electric/lib/electric/postgres/extension/migrations/20240417131000_client_reconnection_info_tables.ex @@ -57,7 +57,4 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20240417131000_Client """ ] end - - @impl true - def down(_), do: [] end diff --git a/components/electric/lib/electric/postgres/extension/migrations/20240501000000_client_reconnection_unsub_points.ex b/components/electric/lib/electric/postgres/extension/migrations/20240501000000_client_reconnection_unsub_points.ex index 2bcd57c7f2..25d6295ea7 100644 --- a/components/electric/lib/electric/postgres/extension/migrations/20240501000000_client_reconnection_unsub_points.ex +++ b/components/electric/lib/electric/postgres/extension/migrations/20240501000000_client_reconnection_unsub_points.ex @@ -19,7 +19,4 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20240501000000_UnsubP """ ] end - - @impl true - def down(_), do: [] end diff --git a/components/electric/lib/electric/postgres/extension/migrations/20240214131615_permissions_state.ex b/components/electric/lib/electric/postgres/extension/migrations/20240618152555_ddlx_permissions.ex similarity index 80% rename from components/electric/lib/electric/postgres/extension/migrations/20240214131615_permissions_state.ex rename to components/electric/lib/electric/postgres/extension/migrations/20240618152555_ddlx_permissions.ex index b244354ad2..e4128b3eb6 100644 --- a/components/electric/lib/electric/postgres/extension/migrations/20240214131615_permissions_state.ex +++ b/components/electric/lib/electric/postgres/extension/migrations/20240618152555_ddlx_permissions.ex @@ -1,14 +1,17 @@ -defmodule Electric.Postgres.Extension.Migrations.Migration_20240214131615_PermissionsState do +defmodule Electric.Postgres.Extension.Migrations.Migration_20240618152555_DDLXPermissions do alias Electric.Postgres.Extension alias Electric.Satellite.SatPerms @behaviour Extension.Migration @impl true - def version, do: 2024_02_14_13_16_15 + def version, do: 2024_06_18_15_25_55 @impl true def up(schema) do + ddlx_table = Extension.ddlx_table() + txid_type = Extension.txid_type() + txts_type = Extension.txts_type() global_perms_table = Extension.global_perms_table() user_perms_table = Extension.user_perms_table() @@ -16,6 +19,14 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20240214131615_Permis %SatPerms.Rules{id: 1} |> Protox.encode!() |> IO.iodata_to_binary() |> Base.encode16() [ + """ + CREATE TABLE #{ddlx_table} ( + id serial8 NOT NULL PRIMARY KEY, + txid #{txid_type} NOT NULL DEFAULT #{schema}.current_xact_id(), + txts #{txts_type} NOT NULL DEFAULT #{schema}.current_xact_ts(), + ddlx bytea NOT NULL + ); + """, """ CREATE TABLE #{global_perms_table} ( id int8 NOT NULL PRIMARY KEY, @@ -83,7 +94,10 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20240214131615_Permis end @impl true - def down(_schema) do - [] + def published_tables do + [ + Extension.ddlx_relation(), + Extension.global_perms_relation() + ] end end diff --git a/components/electric/lib/electric/postgres/extension/permissions.ex b/components/electric/lib/electric/postgres/extension/permissions.ex index 4e865983a6..6a688f28a0 100644 --- a/components/electric/lib/electric/postgres/extension/permissions.ex +++ b/components/electric/lib/electric/postgres/extension/permissions.ex @@ -6,7 +6,7 @@ defmodule Electric.Postgres.Extension.Permissions do @user_perms_table Extension.user_perms_table() @shared_global_query """ - SELECT "id", "parent_id", "rules" FROM #{@global_perms_table} + SELECT "rules" FROM #{@global_perms_table} """ @current_global_query """ @@ -43,26 +43,6 @@ defmodule Electric.Postgres.Extension.Permissions do LIMIT 1 """ - # We need to duplicate all the current user perms that, which all depend on the previous version - # of the global rules. This query is complicated by the need to only select the most current - # version of each user's permissions (because for a given rules id, a user may have multiple - # versions of their roles). - @save_global_query """ - WITH global_perms AS ( - INSERT INTO #{@global_perms_table} (id, parent_id, rules) - VALUES ($1, $2, $3) RETURNING id, parent_id - ) - INSERT INTO #{@user_perms_table} (user_id, parent_id, roles, global_perms_id) - SELECT u.*, global_perms.id FROM - (SELECT DISTINCT user_id FROM #{@user_perms_table} ORDER BY user_id) uid - JOIN LATERAL ( - SELECT ui.user_id, ui.id, ui.roles FROM #{@user_perms_table} ui - WHERE ui.user_id = uid.user_id - ORDER BY ui.id DESC - LIMIT 1 - ) u ON TRUE, global_perms - """ - @create_user_query """ WITH global_perms AS ( SELECT id, rules @@ -81,16 +61,27 @@ defmodule Electric.Postgres.Extension.Permissions do FROM user_perms, global_perms """ + def global_rules_query, do: @current_global_query + + def save_global_query(%SatPerms.Rules{id: id, parent_id: parent_id} = rules) + when is_integer(id) and is_integer(parent_id) do + with {:ok, iodata} <- Protox.encode(rules), + bytes = IO.iodata_to_binary(iodata), + hex = Base.encode16(bytes) do + generate_save_global_query("#{id}", "#{parent_id}", "'\\x#{hex}'::bytea") + end + end + def global(conn) do with {:ok, _cols, [row]} <- :epgsql.equery(conn, @current_global_query, []), - {_id, _parent_id, bytes} = row do + {bytes} = row do Protox.decode(bytes, SatPerms.Rules) end end def global(conn, id) do with {:ok, _cols, [row]} <- :epgsql.equery(conn, @specific_global_query, [id]), - {_id, _parent_id, bytes} = row do + {bytes} = row do Protox.decode(bytes, SatPerms.Rules) end end @@ -98,7 +89,12 @@ defmodule Electric.Postgres.Extension.Permissions do def save_global(conn, %SatPerms.Rules{id: id, parent_id: parent_id} = rules) do with {:ok, iodata} <- Protox.encode(rules), bytes = IO.iodata_to_binary(iodata), - {:ok, _users} <- :epgsql.equery(conn, @save_global_query, [id, parent_id, bytes]) do + {:ok, _users} <- + :epgsql.equery(conn, generate_save_global_query("$1", "$2", "$3"), [ + id, + parent_id, + bytes + ]) do :ok end end @@ -146,4 +142,26 @@ defmodule Electric.Postgres.Extension.Permissions do {:ok, %SatPerms{id: id, user_id: user_id, rules: rules, roles: roles.roles}} end end + + defp generate_save_global_query(id, parent_id, rules) do + # We need to duplicate all the current user perms that, which all depend on the previous version + # of the global rules. This query is complicated by the need to only select the most current + # version of each user's permissions (because for a given rules id, a user may have multiple + # versions of their roles). + """ + WITH global_perms AS ( + INSERT INTO #{@global_perms_table} (id, parent_id, rules) + VALUES (#{id}, #{parent_id}, #{rules}) RETURNING id, parent_id + ) + INSERT INTO #{@user_perms_table} (user_id, parent_id, roles, global_perms_id) + SELECT u.*, global_perms.id FROM + (SELECT DISTINCT user_id FROM #{@user_perms_table} ORDER BY user_id) uid + JOIN LATERAL ( + SELECT ui.user_id, ui.id, ui.roles FROM #{@user_perms_table} ui + WHERE ui.user_id = uid.user_id + ORDER BY ui.id DESC + LIMIT 1 + ) u ON TRUE, global_perms + """ + end end diff --git a/components/electric/lib/electric/postgres/extension/schema_cache.ex b/components/electric/lib/electric/postgres/extension/schema_cache.ex index 714368c838..91c47e8fb4 100644 --- a/components/electric/lib/electric/postgres/extension/schema_cache.ex +++ b/components/electric/lib/electric/postgres/extension/schema_cache.ex @@ -160,11 +160,6 @@ defmodule Electric.Postgres.Extension.SchemaCache do call(origin, {:global_permissions, id}) end - @impl SchemaLoader - def save_global_permissions(origin, rules) do - call(origin, {:save_global_permissions, rules}) - end - @impl SchemaLoader def user_permissions(origin, user_id) do call(origin, {:user_permissions, user_id}) @@ -410,16 +405,6 @@ defmodule Electric.Postgres.Extension.SchemaCache do {:reply, SchemaLoader.global_permissions(state.backend, id), state} end - def handle_call({:save_global_permissions, rules}, _from, state) do - case SchemaLoader.save_global_permissions(state.backend, rules) do - {:ok, backend} -> - {:reply, {:ok, state.origin}, %{state | backend: backend}} - - error -> - {:reply, error, state} - end - end - def handle_call({:user_permissions, user_id}, _from, state) do case SchemaLoader.user_permissions(state.backend, user_id) do {:ok, backend, roles} -> diff --git a/components/electric/lib/electric/postgres/extension/schema_loader.ex b/components/electric/lib/electric/postgres/extension/schema_loader.ex index 2e771b5bea..5f12ed31c3 100644 --- a/components/electric/lib/electric/postgres/extension/schema_loader.ex +++ b/components/electric/lib/electric/postgres/extension/schema_loader.ex @@ -49,8 +49,6 @@ defmodule Electric.Postgres.Extension.SchemaLoader do @callback user_permissions(state(), user_id :: binary(), id :: integer()) :: {:ok, %SatPerms{}} | {:error, term()} - @callback save_global_permissions(state(), %SatPerms.Rules{}) :: - {:ok, state()} | {:error, term()} @callback save_user_permissions(state(), user_id :: binary(), %SatPerms.Roles{}) :: {:ok, state(), %SatPerms{}} | {:error, term()} @@ -148,13 +146,6 @@ defmodule Electric.Postgres.Extension.SchemaLoader do module.global_permissions(state, id) end - @impl true - def save_global_permissions({module, state}, rules) do - with {:ok, state} <- module.save_global_permissions(state, rules) do - {:ok, {module, state}} - end - end - @impl true def user_permissions({_module, _state} = loader, nil) do with {:ok, rules} <- global_permissions(loader) do diff --git a/components/electric/lib/electric/postgres/extension/schema_loader/epgsql.ex b/components/electric/lib/electric/postgres/extension/schema_loader/epgsql.ex index 8bfab458d9..1533adaf61 100644 --- a/components/electric/lib/electric/postgres/extension/schema_loader/epgsql.ex +++ b/components/electric/lib/electric/postgres/extension/schema_loader/epgsql.ex @@ -207,15 +207,6 @@ defmodule Electric.Postgres.Extension.SchemaLoader.Epgsql do end) end - @impl SchemaLoader - def save_global_permissions(pool, permissions) do - checkout!(pool, fn conn -> - with :ok <- Extension.Permissions.save_global(conn, permissions) do - {:ok, pool} - end - end) - end - @impl SchemaLoader def user_permissions(pool, user_id) do checkout!(pool, fn conn -> diff --git a/components/electric/lib/electric/postgres/proxy/injector.ex b/components/electric/lib/electric/postgres/proxy/injector.ex index cdbff7a7c0..42689f1ec5 100644 --- a/components/electric/lib/electric/postgres/proxy/injector.ex +++ b/components/electric/lib/electric/postgres/proxy/injector.ex @@ -3,6 +3,7 @@ defmodule Electric.Postgres.Proxy.Injector do alias Electric.Postgres alias Electric.Postgres.Proxy.Injector alias Electric.Postgres.Proxy.Injector.{Operation, Send, State} + alias Electric.Satellite.SatPerms require Logger @@ -20,7 +21,10 @@ defmodule Electric.Postgres.Proxy.Injector do @callback quote_query(String.t()) :: String.t() @callback introspect_tables_query(Postgres.relation() | String.t() | [String.t()]) :: String.t() + @callback lock_rules_table_query() :: String.t() @callback electrified_tables_query() :: String.t() + @callback permissions_rules_query() :: String.t() + @callback save_permissions_rules_query(%SatPerms.Rules{}) :: String.t() @callback capture_ddl_query(query :: binary()) :: binary() @callback capture_version_query(version :: binary(), priority :: integer()) :: binary() @callback alter_shadow_table_query(table_modification()) :: binary() @@ -28,6 +32,8 @@ defmodule Electric.Postgres.Proxy.Injector do @default_mode {Injector.Electric, []} + @behaviour __MODULE__ + def new(opts \\ [], connection) do with {:ok, loader} <- Keyword.fetch(opts, :loader) do query_generator = Keyword.get(opts, :query_generator, __MODULE__) @@ -184,10 +190,22 @@ defmodule Electric.Postgres.Proxy.Injector do "SELECT electric.introspect_tables(#{stmts});" end + def lock_rules_table_query do + "LOCK TABLE #{Electric.Postgres.Extension.global_perms_table()} IN EXCLUSIVE MODE" + end + def electrified_tables_query do Electric.Postgres.Extension.electrified_tables_query() end + def permissions_rules_query do + Electric.Postgres.Extension.Permissions.global_rules_query() + end + + def save_permissions_rules_query(rules) do + Electric.Postgres.Extension.Permissions.save_global_query(rules) + end + defp normalise_name({_, _} = relation) do Electric.Utils.inspect_relation(relation) end diff --git a/components/electric/lib/electric/postgres/proxy/injector/electric.ex b/components/electric/lib/electric/postgres/proxy/injector/electric.ex index 9b54e77011..dcd76e22c4 100644 --- a/components/electric/lib/electric/postgres/proxy/injector/electric.ex +++ b/components/electric/lib/electric/postgres/proxy/injector/electric.ex @@ -5,7 +5,7 @@ defmodule Electric.Postgres.Proxy.Injector.Electric do alias PgProtocol.Message, as: M alias Electric.Postgres.Proxy.{Parser, QueryAnalysis} - alias Electric.Postgres.Proxy.Injector.{Operation, Send, State} + alias Electric.Postgres.Proxy.Injector.{Operation, Send} alias __MODULE__ def command_from_analysis(analysis, state) do @@ -20,17 +20,8 @@ defmodule Electric.Postgres.Proxy.Injector.Electric do [%Operation.Begin{}] end - def command_from_analysis(_msg, %{action: {:tx, :commit}}, state) do - case {State.tx_version?(state), State.electrified?(state)} do - {_, false} -> - [%Operation.Commit{}] - - {true, true} -> - [%Operation.Commit{}] - - {false, true} -> - [%Operation.AssignMigrationVersion{}, %Operation.Commit{}] - end + def command_from_analysis(_msg, %{action: {:tx, :commit}}, _state) do + [%Operation.Commit{}] end def command_from_analysis(_msg, %{action: {:tx, :rollback}}, _state) do @@ -213,6 +204,14 @@ defmodule Electric.Postgres.Proxy.Injector.Electric do end end + defp extended_begin_response(msg, tag \\ "BEGIN", status \\ :tx) + + defp extended_begin_response(%M.Parse{}, _, _), do: %M.ParseComplete{} + defp extended_begin_response(%M.Bind{}, _, _), do: %M.BindComplete{} + defp extended_begin_response(%M.Describe{}, _, _), do: %M.NoData{} + defp extended_begin_response(%M.Execute{}, tag, _), do: %M.CommandComplete{tag: tag} + defp extended_begin_response(%M.Sync{}, _, status), do: %M.ReadyForQuery{status: status} + defp handle_parse(msg, msgs, electric, state) do signal = case List.last(msgs) do @@ -262,17 +261,26 @@ defmodule Electric.Postgres.Proxy.Injector.Electric do # psycopg sends its txn commands using the extended protocol, annoyingly # it uses a [parse, describe, bind, execute, sync] message block, so all we # need to do is pass that on and mark the connection as in a transaction - %{action: {:tx, action}} = _analysis when action in [:begin, :rollback, :commit] -> - state = State.transaction(state, action) + %{action: {:tx, :begin}} = _analysis -> + begin = %Operation.Begin{complete_msgs: Enum.map(msgs, &extended_begin_response/1)} - op = - if Enum.any?(msgs, &is_struct(&1, M.Execute)) do - Operation.Wait.new(msgs, state) - else - %Operation.BindExecute{ops: []} - end + {[begin], {electric, state}} + + %{action: {:tx, :commit}} = _analysis -> + commit = + %Operation.Commit{ + complete_msgs: Enum.map(msgs, &extended_begin_response(&1, "COMMIT", :idle)) + } + + {[commit], {electric, state}} + + %{action: {:tx, :rollback}} = _analysis -> + rollback = + %Operation.Rollback{ + complete_msgs: Enum.map(msgs, &extended_begin_response(&1, "ROLLBACK", :idle)) + } - {[op], {electric, state}} + {[rollback], {electric, state}} analysis -> bind = %Operation.BindExecute{ diff --git a/components/electric/lib/electric/postgres/proxy/injector/operation.ex b/components/electric/lib/electric/postgres/proxy/injector/operation.ex index a5d4f4c77c..1e31ce842d 100644 --- a/components/electric/lib/electric/postgres/proxy/injector/operation.ex +++ b/components/electric/lib/electric/postgres/proxy/injector/operation.ex @@ -235,7 +235,8 @@ defmodule Operation.Impl do Used by Electric DDLX commands which fake the responses from Parse/Describe and Bind/Execute. """ - @spec response(PgProtocol.Message.t(), String.t() | nil, State.t()) :: PgProtocol.Message.t() + @spec response(PgProtocol.Message.t(), String.t() | nil, State.t()) :: + PgProtocol.Message.t() def response(msg, tag \\ nil, state) def response(%M.Parse{}, _tag, _state) do @@ -487,113 +488,275 @@ defmodule Operation.Between do end end -defmodule Operation.Begin do - defstruct hidden?: false +defmodule Operation.AssignMigrationVersion do + defstruct [:version] + + alias __MODULE__, as: O defimpl Operation do use Operation.Impl - def activate(op, state, send) do - {if(op.hidden?, do: op, else: nil), State.begin(state), - Send.server(send, [%M.Query{query: "BEGIN"}])} + @generated_version_priority 0 + @session_version_priority 2 + @tx_version_priority 4 + + def activate(%O{} = op, state, send) do + if State.capture_version?(state) do + {version, priority, state} = migration_version(op, state) + sql = state.query_generator.capture_version_query(version, priority) + {op, State.tx_version(state, version), Send.server(send, [query(sql)])} + else + {nil, state, send} + end + end + + defp migration_version(%O{version: nil}, state) do + case State.retrieve_version_metadata(state) do + {{:ok, version}, state} -> + # this version is coming from some previous query, outside the + # current transaction so give it a priority < the priority of any tx + # assigned version. + {version, @session_version_priority, state} + + {:error, state} -> + # priority 0 will only be used if the automatic version assignment + # wasn't called for some reason + {generate_version(state), @generated_version_priority, state} + end + end + + defp migration_version(%O{version: version}, state) + when is_binary(version) do + # this version is coming from the current transaction, so give it the + # highest priority of all these options + {version, @tx_version_priority, state} + end + + defp generate_version(state) do + state.query_generator.migration_version() end end end -defmodule Operation.Rollback do - defstruct hidden?: false +defmodule IntrospectionQuery do + defstruct [:query, :callback, rows: []] defimpl Operation do use Operation.Impl def activate(op, state, send) do - {if(op.hidden?, do: op, else: nil), State.rollback(state), - Send.server(send, [%M.Query{query: "ROLLBACK"}])} + {op, state, Send.server(send, %M.Query{query: op.query})} + end + + def recv_server(op, %M.RowDescription{}, state, send) do + {op, state, send} + end + + def recv_server(op, %M.DataRow{fields: row}, state, send) do + {%{op | rows: [row | op.rows]}, state, send} + end + + def recv_server(op, %M.CommandComplete{}, state, send) do + {op, state, send} + end + + def recv_server(op, %M.ReadyForQuery{}, state, send) do + callback = op.callback || (&null_callback/3) + {state, send} = callback.(Enum.reverse(op.rows), state, send) + {nil, state, send} + end + + defp null_callback(_rows, state, send) do + {state, send} end end end -defmodule Operation.Commit do - defstruct hidden?: false +defmodule Operation.Begin do + defstruct hidden?: false, + complete_msgs: [ + %M.CommandComplete{tag: "BEGIN"}, + %M.ReadyForQuery{status: :tx} + ], + introspect: [:perms], + rules: nil + + alias Electric.Satellite.Permissions + + alias __MODULE__, as: O defimpl Operation do use Operation.Impl def activate(op, state, send) do - {if(op.hidden?, do: op, else: nil), State.commit(state), - Send.server(send, [%M.Query{query: "COMMIT"}])} + {op, State.begin(state), Send.server(send, [%M.Query{query: "BEGIN"}])} end - def send_error(_op, state, send) do - %{client: client} = Send.flush(send) + def recv_server(%O{hidden?: true} = op, %M.ReadyForQuery{}, state, send) do + introspect(op, state, send) + end - Operation.activate( - [ - %Operation.Rollback{hidden?: true}, - Operation.Pass.client([client]) - ], + def recv_server(%O{} = op, %M.ReadyForQuery{}, state, send) do + introspect( + op, state, - Send.new() + send ) end - def recv_error(_op, msgs, state, _send) do - Operation.activate( - [ - %Operation.Rollback{hidden?: true}, - Operation.Pass.client(msgs) - ], - state, - Send.new() - ) + def recv_server(%O{} = op, _msg, state, send) do + {op, state, send} + end + + defp introspect(op, state, send) do + stack = [ + %IntrospectionQuery{ + query: state.query_generator.permissions_rules_query(), + callback: fn [[data]], state, send -> + {:ok, rules} = Permissions.State.decode_rules(data) + {State.tx_permissions(state, rules), send} + end + }, + if(op.hidden?, do: [], else: Operation.Pass.client(op.complete_msgs)) + ] + + Operation.activate(stack, state, send) end end end -defmodule Operation.AssignMigrationVersion do - defstruct [:version] +defmodule Operation.Rollback do + defstruct hidden?: false, + complete_msgs: [ + %M.CommandComplete{tag: "ROLLBACK"}, + %M.ReadyForQuery{status: :idle} + ] defimpl Operation do use Operation.Impl - @generated_version_priority 0 - @session_version_priority 2 - @tx_version_priority 4 + def activate(op, state, send) do + {op, state, Send.server(send, [%M.Query{query: "ROLLBACK"}])} + end + + def recv_server(%{hidden?: false} = op, %M.ReadyForQuery{}, state, send) do + {nil, State.rollback(state), Send.client(send, op.complete_msgs)} + end + + def recv_server(%{hidden?: true}, %M.ReadyForQuery{}, state, send) do + {nil, State.rollback(state), send} + end + + def recv_server(op, _msg, state, send) do + {op, state, send} + end + end +end + +defmodule Operation.Map do + defstruct msgs: [], response: [] + + defimpl Operation do + use Operation.Impl def activate(op, state, send) do - if State.electrified?(state) do - {version, priority, state} = migration_version(op, state) - sql = state.query_generator.capture_version_query(version, priority) - {op, State.tx_version(state, version), Send.server(send, [query(sql)])} + {op, state, Send.server(send, op.msgs)} + end + + def recv_server(op, %M.ReadyForQuery{}, state, send) do + {nil, state, Send.client(send, op.response)} + end + + def recv_server(op, _msg, state, send) do + {op, state, send} + end + end +end + +defmodule Operation.SavePermissionsRules do + defstruct [] + + defimpl Operation do + use Operation.Impl + + def activate(op, state, send) do + if rules = State.permissions_modified(state) do + query = state.query_generator.save_permissions_rules_query(rules) + + {op, state, Send.server(send, %M.Query{query: query})} else {nil, state, send} end end - defp migration_version(%Operation.AssignMigrationVersion{version: nil}, state) do - case State.retrieve_version_metadata(state) do - {{:ok, version}, state} -> - # this version is coming from some previous query, outside the - # current transaction so give it a priority < the priority of any tx - # assigned version. - {version, @session_version_priority, state} + def recv_server(_op, %M.ReadyForQuery{}, state, send) do + {nil, State.permissions_saved(state), send} + end - {:error, state} -> - # priority 0 will only be used if the automatic version assignment - # wasn't called for some reason - {generate_version(state), @generated_version_priority, state} + def recv_server(op, _msg, state, send) do + {op, state, send} + end + end +end + +defmodule Operation.Commit do + defstruct hidden?: false, + complete_msgs: [ + %M.CommandComplete{tag: "COMMIT"}, + %M.ReadyForQuery{status: :idle} + ] + + defmodule Complete do + defstruct [] + + defimpl Operation do + use Operation.Impl + + def activate(_op, state, send) do + {nil, State.commit(state), send} end end + end - defp migration_version(%Operation.AssignMigrationVersion{version: version}, state) - when is_binary(version) do - # this version is coming from the current transaction, so give it the - # highest priority of all these options - {version, @tx_version_priority, state} + defimpl Operation do + use Operation.Impl + + def activate(op, state, send) do + stack = [ + %Operation.SavePermissionsRules{}, + %Operation.AssignMigrationVersion{}, + %Operation.Map{msgs: [%M.Query{query: "COMMIT"}], response: response_msgs(op)}, + %Complete{} + ] + + Operation.activate(stack, state, send) end - defp generate_version(state) do - state.query_generator.migration_version() + defp response_msgs(%{hidden?: true}), do: [] + defp response_msgs(%{hidden?: false, complete_msgs: msgs}), do: msgs + + def send_error(_op, state, send) do + %{client: client} = Send.flush(send) + + Operation.activate( + [ + %Operation.Rollback{hidden?: true}, + Operation.Pass.client([client]) + ], + state, + Send.new() + ) + end + + def recv_error(_op, msgs, state, _send) do + Operation.activate( + [ + %Operation.Rollback{hidden?: true}, + Operation.Pass.client(msgs) + ], + state, + Send.new() + ) end end end @@ -617,7 +780,6 @@ defmodule Operation.Simple do op, %Operation.Between{ commands: [ - %Operation.AssignMigrationVersion{}, %Operation.Commit{hidden?: true} ], status: :idle @@ -708,7 +870,7 @@ defmodule Operation.Electric do :queries, :mode, :initial_query, - introspect: [:electrified, :ddl], + introspect: [:lock, :electrified, :ddl], schema: nil, tables: MapSet.new(), ddl: [] @@ -731,6 +893,7 @@ defmodule Operation.Electric do state = op |> tables() |> Enum.reduce(state, &State.electrify(&2, &1)) + # TODO: refactor as a stack of ops send_query(op, state, send) end @@ -753,6 +916,10 @@ defmodule Operation.Electric do send_query(%{op | introspect: rest}, state, send) end + def recv_server(%O{introspect: [:lock | rest]} = op, %M.ReadyForQuery{}, state, send) do + send_query(%{op | introspect: rest}, state, send) + end + # this ready for query is the end of the ddlx introspection query def recv_server(%O{introspect: [:ddl | _]} = op, %M.DataRow{fields: [ddl]}, state, send) do schema = Schema.update(op.schema, ddl, oid_loader: &oid_loader/3) @@ -810,7 +977,7 @@ defmodule Operation.Electric do end def recv_server( - %O{introspect: false, queries: [query | queries]} = op, + %O{introspect: [], queries: [query | queries]} = op, %M.ReadyForQuery{}, state, send @@ -822,6 +989,14 @@ defmodule Operation.Electric do {op, state, send} end + defp send_query(%O{introspect: [:lock | _]} = op, state, send) do + query = state.query_generator.lock_rules_table_query() + + Logger.debug(fn -> "Locking global rules table" end) + + {op, state, Send.server(send, query(query))} + end + defp send_query(%O{introspect: [:electrified | _]} = op, state, send) do query = state.query_generator.electrified_tables_query() @@ -844,10 +1019,34 @@ defmodule Operation.Electric do defp send_query(%O{introspect: []} = op, state, send) do ddl = Enum.reverse(op.ddl) - [query | queries] = - DDLX.Command.proxy_sql(op.command, ddl, &state.query_generator.quote_query/1) - - {%{op | queries: queries}, state, Send.server(send, query(query))} + # TODO: rename this proxy_sql function, should be command_sql or + # something. only electrify returns something here perms commands return + # nothing for this + # + # TODO: we also need to run actions based on the command so for an + # assign, we need to query the assign table in this txn in order to + # generate roles for all the existing entries in the assign table by + # iterating the rows in that table and running the assign triggers + # against them. this is much better than my idea of capturing a snapshot + # and using it later in the replication consumer. + case DDLX.Command.proxy_sql(op.command, ddl, &state.query_generator.quote_query/1) do + [query | queries] -> + {%{op | queries: queries}, state, Send.server(send, query(query))} + + [] -> + tag = DDLX.Command.tag(op.command) + + reply = + case op.mode do + :simple -> + [%M.CommandComplete{tag: tag}, %M.ReadyForQuery{status: :tx}] + + :extended -> + [] + end + + {nil, State.update_permissions(state, op.command), Send.client(send, reply)} + end end # we don't need real oids @@ -1049,7 +1248,6 @@ defmodule Operation.AutoTx do op.ops, %Operation.Between{ commands: [ - %Operation.AssignMigrationVersion{}, %Operation.Commit{hidden?: true} ], status: :idle diff --git a/components/electric/lib/electric/postgres/proxy/injector/state.ex b/components/electric/lib/electric/postgres/proxy/injector/state.ex index 799581df39..d7e866f1e8 100644 --- a/components/electric/lib/electric/postgres/proxy/injector/state.ex +++ b/components/electric/lib/electric/postgres/proxy/injector/state.ex @@ -14,8 +14,10 @@ defmodule Electric.Postgres.Proxy.Injector.State do id: 0, tables: %{}, rules: nil, + rules_modifications: 0, schema: nil, - failed: false + failed: false, + permissions: %{} @type t() :: %__MODULE__{ electrified: boolean(), @@ -23,24 +25,21 @@ defmodule Electric.Postgres.Proxy.Injector.State do tables: %{Postgres.relation() => true}, id: pos_integer(), rules: nil | %SatPerms.Rules{}, + rules_modifications: non_neg_integer(), schema: nil | Postgres.Schema.t(), - failed: boolean() + failed: boolean(), + permissions: %{module() => term()} } def new(loader) do - # TODO: These rules and schema version could be inconsistent with the database + # TODO: These schema version could be inconsistent with the database # there could be a migration in the replication stream that hasn't reached # our state maintenance consumer (MigrationConsumer) # Perhaps we could move the schema mutation/update to within the proxy itself # and provide a way to retrieve based on txid or something. - # - # We also need to maintain the permissions state in sync with the current - # transaction. - {:ok, rules} = SchemaLoader.global_permissions(loader) {:ok, schema_version} = SchemaLoader.load(loader) %__MODULE__{ - rules: rules, schema: schema_version.schema, id: System.unique_integer([:positive, :monotonic]) } @@ -211,6 +210,19 @@ defmodule Electric.Postgres.Proxy.Injector.State do end end + def capture_version?(%__MODULE__{} = state) do + case {tx_version?(state), electrified?(state)} do + {_, false} -> + false + + {true, true} -> + false + + {false, true} -> + true + end + end + def assign_version_metadata(%__MODULE__{} = state, version) do Map.update!(state, :metadata, &Map.put(&1, :version, to_string(version))) end @@ -231,4 +243,38 @@ defmodule Electric.Postgres.Proxy.Injector.State do def failed?(%__MODULE__{tx: nil}), do: false def failed?(%__MODULE__{tx: tx}), do: tx.failed + + def txn_permissions(%__MODULE__{tx: nil}, _rules) do + raise "no in transaction" + end + + def tx_permissions(%__MODULE__{} = state, rules) do + Map.update!(state, :tx, fn tx -> + %{tx | rules: rules, permissions: Map.put(tx.permissions, Electric.DDLX, rules)} + end) + end + + def update_permissions(%__MODULE__{} = state, %Electric.DDLX.Command{} = command) do + {:ok, n, rules} = + Electric.Satellite.Permissions.State.apply_ddlx_txn(command.action, state.tx.rules) + + Map.update!( + state, + :tx, + &%{&1 | rules: rules, rules_modifications: &1.rules_modifications + n} + ) + end + + def permissions_modified(%__MODULE__{tx: %{rules_modifications: n, rules: rules}}) + when n > 0 do + Electric.Satellite.Permissions.State.commit(rules) + end + + def permissions_modified(%__MODULE__{}) do + nil + end + + def permissions_saved(%__MODULE__{tx: tx} = state) do + %{state | tx: %{tx | rules_modifications: 0}} + end end diff --git a/components/electric/lib/electric/postgres/schema/update/alter_table.ex b/components/electric/lib/electric/postgres/schema/update/alter_table.ex index 5c2b4c8386..6ad3bd8727 100644 --- a/components/electric/lib/electric/postgres/schema/update/alter_table.ex +++ b/components/electric/lib/electric/postgres/schema/update/alter_table.ex @@ -159,7 +159,7 @@ defmodule Electric.Postgres.Schema.Update.AlterTable do def update(%{node: {:alter_table_cmd, %{subtype: subtype} = act}}, table) do Logger.warning( - "Un-supported alter table statement on #{table.name} #{subtype}: #{inspect(act)}" + "Ignoring unsupported alter table statement on #{table.name} #{subtype}: #{inspect(act)}" ) {[], table} diff --git a/components/electric/lib/electric/satellite/permissions.ex b/components/electric/lib/electric/satellite/permissions.ex index d3f8a8ab37..8b37c19bb7 100644 --- a/components/electric/lib/electric/satellite/permissions.ex +++ b/components/electric/lib/electric/satellite/permissions.ex @@ -447,6 +447,8 @@ defmodule Electric.Satellite.Permissions do defp rebuild(perms) do %{id: id, roles: roles, rules: rules, schema: schema_version} = perms.source + Logger.debug(fn -> "Built perms using #{inspect(rules)}" end) + assigned_roles = build_roles(roles, perms.auth, rules.assigns) scoped_roles = compile_scopes(assigned_roles) evaluator = Eval.new(schema_version, perms.auth) diff --git a/components/electric/lib/electric/satellite/permissions/state.ex b/components/electric/lib/electric/satellite/permissions/state.ex index a76ec63a92..415f0af7e9 100644 --- a/components/electric/lib/electric/satellite/permissions/state.ex +++ b/components/electric/lib/electric/satellite/permissions/state.ex @@ -14,6 +14,7 @@ defmodule Electric.Satellite.Permissions.State do require Logger @electric_ddlx Extension.ddlx_relation() + @electric_rules Extension.global_perms_relation() @enforce_keys [:rules, :schema] @@ -38,6 +39,13 @@ defmodule Electric.Satellite.Permissions.State do def new(loader) do with {:ok, schema_version} <- SchemaLoader.load(loader), {:ok, rules} <- SchemaLoader.global_permissions(loader) do + # The rules we load here can be ahead of the replication stream, so we + # may receive updates to the rules that are before the rules we load + # here, and we will potentially be applying triggers from assigns that + # didn't exist when a row from an assigns table was updated. + # + # This is ok though, things will eventually resolve to a consistent + # state. {:ok, create_triggers(%__MODULE__{rules: rules, schema: schema_version})} end end @@ -76,61 +84,86 @@ defmodule Electric.Satellite.Permissions.State do {:ok, changes, state, loader} end - # useful function for testing creation of global state - @doc false - def update_global(%SatPerms.DDLX{} = ddlx, loader) do - with {:ok, rules} <- SchemaLoader.global_permissions(loader) do - case mutate_global(ddlx, rules) do - {rules, 0} -> - {:ok, 0, loader, rules} - - {rules, n} -> - with {:ok, loader} <- SchemaLoader.save_global_permissions(loader, rules) do - {:ok, n, loader, rules} - end - end + def apply_ddlx(ddlx, rules) do + case apply_ddlx_txn(ddlx, rules) do + {:ok, 0, rules} -> + {:ok, 0, rules} + + {:ok, n, rules} -> + {:ok, n, commit(rules)} end end - defp apply_changes([%{relation: @electric_ddlx} | _] = changes, {state, loader}) do - {:ok, rules} = SchemaLoader.global_permissions(loader) + def apply_ddlx!(ddlx, rules) do + {:ok, _n, rules} = apply_ddlx(ddlx, rules) + rules + end - case Enum.reduce(changes, {rules, 0}, &apply_global_change/2) do - {_rules, 0} -> - {[], {state, loader}} + def apply_ddlx_txn( + %Electric.DDLX.Command{action: %SatPerms.DDLX{} = ddlx}, + %SatPerms.Rules{} = rules + ) do + apply_ddlx_txn(ddlx, rules) + end - {rules, _count} -> - Logger.debug(fn -> "Updated global permissions id: #{rules.id}" end) - {:ok, loader} = SchemaLoader.save_global_permissions(loader, rules) + def apply_ddlx_txn(%Electric.DDLX.Command{action: _}, %SatPerms.Rules{} = rules) do + rules + end - { - [updated_global_permissions(rules)], - {create_triggers(%{state | rules: rules}), loader} - } + def apply_ddlx_txn(%SatPerms.DDLX{} = ddlx, %SatPerms.Rules{} = rules) do + with {rules, n} <- mutate_global(ddlx, rules) do + {:ok, n, rules} end end - defp apply_changes(changes, {state, loader}) do - {changes, {_triggers, loader}} = - Enum.flat_map_reduce(changes, {state.triggers, loader}, &apply_triggers/2) + def apply_ddlx_txn(_action, %SatPerms.Rules{} = rules) do + {:ok, 0, rules} + end - {changes, {state, loader}} + def apply_ddlx_txn!(ddlx, %SatPerms.Rules{} = rules) do + {:ok, _, rules} = apply_ddlx_txn(ddlx, rules) + rules + end + + @doc """ + Should be called before saving the permissions state to set up the + permissions `id` and `parent_id` + """ + @spec commit(%SatPerms.Rules{}) :: %SatPerms.Rules{} + def commit(%SatPerms.Rules{} = rules) do + increment_id(rules) end - # the ddlx table is insert-only - defp apply_global_change(%Changes.NewRecord{} = change, {rules, count}) do - %{record: %{"ddlx" => ddlx_bytes}} = change + # just ignoring ddlx commmands for now. Because perms state mutations are + # done in the proxy and arrive here fully-formed, the only ddlx command we + # currently receive are `ELECTRIC SQLITE...`, which currently do nothing. + defp apply_changes([%{relation: @electric_ddlx} | _], {state, loader}) do + {[], {state, loader}} + end - pb_bytes = - case ddlx_bytes do - "\\x" <> rest -> Base.decode16!(rest, case: :lower) - bytes -> bytes - end + defp apply_changes([%{relation: @electric_rules} | _] = changes, {state, loader}) do + # we can just take the last insert and skip any intermediate states + %{record: %{"rules" => bytes}} = List.last(changes) - {:ok, ddlx} = - Protox.decode(pb_bytes, SatPerms.DDLX) + {:ok, new_rules} = decode_pb(bytes, SatPerms.Rules) - mutate_global(ddlx, rules, count) + if new_rules.id > state.rules.id do + Logger.debug(fn -> "Updated global permissions id: #{new_rules.id}" end) + + { + [updated_global_permissions(new_rules)], + {create_triggers(%{state | rules: new_rules}), loader} + } + else + {[], {state, loader}} + end + end + + defp apply_changes(changes, {state, loader}) do + {changes, {_triggers, loader}} = + Enum.flat_map_reduce(changes, {state.triggers, loader}, &apply_triggers/2) + + {changes, {state, loader}} end defp apply_triggers(change, {triggers, loader}) do @@ -235,7 +268,7 @@ defmodule Electric.Satellite.Permissions.State do end defp mutate_global(%SatPerms.DDLX{} = ddlx, rules, count) do - {apply_ddlx(rules, ddlx, count == 0), count + count_changes(ddlx)} + {do_apply_ddlx(rules, ddlx), count + count_changes(ddlx)} end defp role_match?(role1, role2) do @@ -277,16 +310,14 @@ defmodule Electric.Satellite.Permissions.State do # # Public only for its usefulness in tests. @doc false - @spec apply_ddlx(%SatPerms.Rules{}, %SatPerms.DDLX{}) :: %SatPerms.Rules{} - def apply_ddlx(rules, ddlx, is_first? \\ true) + @spec do_apply_ddlx(%SatPerms.Rules{}, %SatPerms.DDLX{}) :: %SatPerms.Rules{} - def apply_ddlx(%SatPerms.Rules{} = rules, %SatPerms.DDLX{} = ddlx, is_first?) do + defp do_apply_ddlx(%SatPerms.Rules{} = rules, %SatPerms.DDLX{} = ddlx) do rules |> update_grants(ddlx.grants) |> update_revokes(ddlx.revokes) |> update_assigns(ddlx.assigns) |> update_unassigns(ddlx.unassigns) - |> increment_id(is_first?) end defp update_grants(rules, grants) do @@ -329,14 +360,10 @@ defmodule Electric.Satellite.Permissions.State do end) end - defp increment_id(%{id: id} = rules, true) do + defp increment_id(%{id: id} = rules) do %{rules | id: id + 1, parent_id: id} end - defp increment_id(rules, false) do - rules - end - defp count_changes(ddlx) do [:grants, :revokes, :assigns, :unassigns] |> Enum.reduce(0, fn key, count -> @@ -369,4 +396,18 @@ defmodule Electric.Satellite.Permissions.State do %{state | triggers: triggers} end + + def decode_rules(bytes) do + decode_pb(bytes, SatPerms.Rules) + end + + defp decode_pb(bytes, message) do + pb_bytes = + case bytes do + "\\x" <> rest -> Base.decode16!(rest, case: :lower) + bytes -> bytes + end + + Protox.decode(pb_bytes, message) + end end diff --git a/components/electric/priv/sql_function_templates/electrify.sql.eex b/components/electric/priv/sql_function_templates/electrify.sql.eex index ad2bb4cc09..031f0db511 100644 --- a/components/electric/priv/sql_function_templates/electrify.sql.eex +++ b/components/electric/priv/sql_function_templates/electrify.sql.eex @@ -14,7 +14,7 @@ BEGIN RAISE WARNING E'Electrification via procedure call is deprecated.' - '\n\nPlease use the `ALTER %I.%I ENABLE ELECTRIC` syntax via the migrations proxy.' + '\n\nPlease use the `ALTER "%"."%" ENABLE ELECTRIC` syntax via the migrations proxy.' '\n\nSee https://electric-sql.com/docs/usage/data-modelling/migrations#migrations-proxy', _schema, _table; -- duplicate validations now run in elixir using a resolved schema diff --git a/components/electric/priv/sql_function_templates/electrify/electrify_with_ddl.sql.eex b/components/electric/priv/sql_function_templates/electrify/electrify_with_ddl.sql.eex index 708a3f6787..1b4ccd7d17 100644 --- a/components/electric/priv/sql_function_templates/electrify/electrify_with_ddl.sql.eex +++ b/components/electric/priv/sql_function_templates/electrify/electrify_with_ddl.sql.eex @@ -21,7 +21,7 @@ BEGIN IF NOT EXISTS ( SELECT oid FROM <%= electrified_tracking_table() %> WHERE schema_name = _schema AND table_name = _table LIMIT 1 ) THEN - RAISE NOTICE 'Electrify table %I.%I', _schema, _table; + RAISE NOTICE 'Electrify table %.%', _schema, _table; RAISE DEBUG '%', ddl; diff --git a/components/electric/test/electric/ddlx/command_test.exs b/components/electric/test/electric/ddlx/command_test.exs index 2f86229687..fdee49c594 100644 --- a/components/electric/test/electric/ddlx/command_test.exs +++ b/components/electric/test/electric/ddlx/command_test.exs @@ -37,73 +37,25 @@ defmodule Electric.DDLX.CommandTest do test "ELECTRIC ASSIGN" do ddlx = "ELECTRIC ASSIGN (projects, memberships.role) TO memberships.user_id" - assert [ - ~S[INSERT INTO "electric"."ddlx_commands" (ddlx) VALUES ('\x] <> hex - ] = pg_sql(ddlx) - - assert %SatPerms.DDLX{assigns: [assign]} = parse_pb(hex) - - assert %SatPerms.Assign{ - table: %{schema: "public", name: "memberships"}, - scope: %{schema: "public", name: "projects"}, - user_column: "user_id", - role_column: "role" - } = assign + assert [] = pg_sql(ddlx) end test "ELECTRIC UNASSIGN" do ddlx = "ELECTRIC UNASSIGN (projects, memberships.role) FROM memberships.user_id" - assert [ - ~S[INSERT INTO "electric"."ddlx_commands" (ddlx) VALUES ('\x] <> hex - ] = pg_sql(ddlx) - - assert %SatPerms.DDLX{unassigns: [unassign]} = parse_pb(hex) - - assert %SatPerms.Unassign{ - table: %{schema: "public", name: "memberships"}, - scope: %{schema: "public", name: "projects"}, - user_column: "user_id", - role_column: "role" - } = unassign + assert [] = pg_sql(ddlx) end test "ELECTRIC GRANT" do ddlx = "ELECTRIC GRANT INSERT ON issues TO (projects, 'member')" - assert [ - ~S[INSERT INTO "electric"."ddlx_commands" (ddlx) VALUES ('\x] <> hex - ] = pg_sql(ddlx) - - assert %SatPerms.DDLX{grants: [grant]} = parse_pb(hex) - - assert %SatPerms.Grant{ - privilege: :INSERT, - table: %{schema: "public", name: "issues"}, - role: %SatPerms.RoleName{role: {:application, "member"}}, - columns: nil, - scope: %{schema: "public", name: "projects"}, - path: nil, - check: nil - } = grant + assert [] = pg_sql(ddlx) end test "ELECTRIC REVOKE" do ddlx = "ELECTRIC REVOKE INSERT ON issues FROM (projects, 'member')" - assert [ - ~S[INSERT INTO "electric"."ddlx_commands" (ddlx) VALUES ('\x] <> hex - ] = pg_sql(ddlx) - - assert %SatPerms.DDLX{revokes: [revoke]} = parse_pb(hex) - - assert %SatPerms.Revoke{ - privilege: :INSERT, - table: %{schema: "public", name: "issues"}, - role: %SatPerms.RoleName{role: {:application, "member"}}, - scope: %{schema: "public", name: "projects"}, - path: nil - } = revoke + assert [] = pg_sql(ddlx) end end end diff --git a/components/electric/test/electric/features_test.exs b/components/electric/test/electric/features_test.exs index 16a54cfeff..b4faea52ce 100644 --- a/components/electric/test/electric/features_test.exs +++ b/components/electric/test/electric/features_test.exs @@ -151,11 +151,12 @@ defmodule Electric.FeaturesTest do test "lists merged flag state", cxt do Features.process_override([disabled_feature: true], cxt.name) - assert Features.list(cxt.name) == [ - __default__: false, - disabled_feature: true, - enabled_feature: true - ] + assert Features.list(cxt.name) |> Enum.sort() == + Enum.sort( + __default__: false, + disabled_feature: true, + enabled_feature: true + ) end end end diff --git a/components/electric/test/electric/postgres/extension/schema_loader/epgsql_test.exs b/components/electric/test/electric/postgres/extension/schema_loader/epgsql_test.exs index 2e089a00fe..e497e2a840 100644 --- a/components/electric/test/electric/postgres/extension/schema_loader/epgsql_test.exs +++ b/components/electric/test/electric/postgres/extension/schema_loader/epgsql_test.exs @@ -43,7 +43,7 @@ defmodule Electric.Postgres.Extension.SchemaLoader.EpgsqlTest do ] } - assert {:ok, _loader} = SchemaLoader.save_global_permissions(loader, rules) + assert :ok = Extension.Permissions.save_global(conn, rules) {loader, rules} end @@ -58,7 +58,7 @@ defmodule Electric.Postgres.Extension.SchemaLoader.EpgsqlTest do assert {:ok, %SatPerms.Rules{id: 1} = _rules} = SchemaLoader.global_permissions(loader, 1) end - test_tx "save_global_permissions/2", fn conn -> + test_tx "save_global/2", fn conn -> loader = epgsql_loader(conn) rules = @@ -88,7 +88,7 @@ defmodule Electric.Postgres.Extension.SchemaLoader.EpgsqlTest do ] } - assert {:ok, _loader} = SchemaLoader.save_global_permissions(loader, rules) + assert :ok = Extension.Permissions.save_global(conn, rules) assert {:ok, %SatPerms.Rules{id: 2} = ^rules} = SchemaLoader.global_permissions(loader) end @@ -150,7 +150,7 @@ defmodule Electric.Postgres.Extension.SchemaLoader.EpgsqlTest do SchemaLoader.user_permissions(loader, "e815dfe6-f64d-472a-a322-bfc9e7993d27") end - test_tx "save_global_permissions/2 migrates existing user roles", fn conn -> + test_tx "save_global/2 migrates existing user roles", fn conn -> {loader, rules} = epgsql_loader_with_rules(conn) assert {:ok, _loader, @@ -199,9 +199,9 @@ defmodule Electric.Postgres.Extension.SchemaLoader.EpgsqlTest do ] ) - rules = State.apply_ddlx(rules, ddlx) + rules = State.apply_ddlx!(ddlx, rules) - assert {:ok, _loader} = SchemaLoader.save_global_permissions(loader, rules) + assert :ok = Extension.Permissions.save_global(conn, rules) assert {:ok, _loader, %SatPerms{ @@ -231,9 +231,9 @@ defmodule Electric.Postgres.Extension.SchemaLoader.EpgsqlTest do ] ) - rules = State.apply_ddlx(rules, ddlx) + rules = State.apply_ddlx!(ddlx, rules) - assert {:ok, _loader} = SchemaLoader.save_global_permissions(loader, rules) + assert :ok = Extension.Permissions.save_global(conn, rules) assert {:ok, _loader, %SatPerms{ diff --git a/components/electric/test/electric/postgres/extension_test.exs b/components/electric/test/electric/postgres/extension_test.exs index ca926a6cca..83c037a505 100644 --- a/components/electric/test/electric/postgres/extension_test.exs +++ b/components/electric/test/electric/postgres/extension_test.exs @@ -23,13 +23,24 @@ defmodule Electric.Postgres.ExtensionTest do "CREATE TABLE #{schema}.things (id uuid PRIMARY KEY)" ] end + end + + defmodule MigrationInvalidPublication do + @behaviour Electric.Postgres.Extension.Migration @impl true - def down(schema) do + def version(), do: 2024_06_25_11_36_50 + + @impl true + def up(_schema) do [ - "DROP TABLE #{schema}.things CASCADE" + Extension.add_table_to_publication_sql("electric.my_table") ] end + + def migrations do + [__MODULE__] + end end def migrations do @@ -105,6 +116,17 @@ defmodule Electric.Postgres.ExtensionTest do tx(&migrate/1, cxt) end + test "adding table to publication via sql raises", cxt do + tx( + fn conn -> + assert_raise RuntimeError, fn -> + migrate_module(conn, cxt, MigrationInvalidPublication) + end + end, + cxt + ) + end + test_tx "we can retrieve and set the current schema json", fn conn -> assert {:ok, nil, %Schema.Proto.Schema{tables: []}} = Extension.current_schema(conn) schema = Schema.new() diff --git a/components/electric/test/electric/postgres/proxy/ddlx_modification_test.exs b/components/electric/test/electric/postgres/proxy/ddlx_modification_test.exs new file mode 100644 index 0000000000..688506ec39 --- /dev/null +++ b/components/electric/test/electric/postgres/proxy/ddlx_modification_test.exs @@ -0,0 +1,160 @@ +defmodule Electric.Postgres.Proxy.DDLXModificationTest do + use ExUnit.Case, async: true + use Electric.Postgres.MockSchemaLoader + + alias PgProtocol.Message, as: M + alias Electric.Postgres.Proxy.Injector + alias Electric.DDLX + alias Electric.Satellite.Permissions + + alias Electric.Postgres.Proxy.TestScenario + + import Electric.Postgres.Proxy.TestScenario + + @projects "CREATE TABLE public.projects (id uuid PRIMARY KEY, name text)" + setup do + # enable all the optional ddlx features + Electric.Features.process_override( + proxy_grant_write_permissions: true, + proxy_ddlx_sqlite: true + ) + + migrations = [ + {"0001", + [ + "CREATE TABLE public.users (id uuid PRIMARY KEY, name text)", + @projects, + "CREATE TABLE public.project_memberships (id uuid PRIMARY KEY, project_id uuid REFERENCES projects (id), user_id uuid REFERENCES users (id), role text)", + "CREATE TABLE public.issues (id uuid PRIMARY KEY, name text, project_id uuid REFERENCES projects (id))" + ]} + ] + + spec = MockSchemaLoader.backend_spec(migrations: migrations) + + {:ok, loader} = SchemaLoader.connect(spec, []) + + {:ok, injector} = + Injector.new( + [loader: loader, query_generator: TestScenario.MockInjector], + username: "electric", + database: "electric" + ) + + {:ok, rules} = SchemaLoader.global_permissions(loader) + + %{injector: injector, loader: loader, rules: rules} + end + + defp rules(rules, stmts) do + stmts + |> Enum.map(fn stmt -> + {:ok, command} = DDLX.parse(stmt) + + {stmt, command} + end) + |> Enum.map_reduce(rules, fn {stmt, command}, rules -> + {{stmt, command}, Permissions.State.apply_ddlx_txn!(command.action, rules)} + end) + |> then(fn {stmts, rules} -> + {stmts, Permissions.State.commit(rules)} + end) + end + + test "ddlx updates are saved", cxt do + ddlx = [ + "ELECTRIC ASSIGN (projects, project_memberships.role) TO project_memberships.user_id", + "ELECTRIC GRANT ALL ON projects TO 'member'" + ] + + {[{ddlx1, command1}, {ddlx2, command2}], rules} = rules(cxt.rules, ddlx) + + cxt.injector + |> electric_begin(client: begin()) + |> electric_preamble([client: ddlx1], command1) + |> server( + introspect_result(@projects), + client: [complete_ready("ELECTRIC ASSIGN", :tx)] + ) + |> electric_preamble([client: ddlx2], command2) + |> server( + introspect_result(@projects), + client: [complete_ready("ELECTRIC GRANT", :tx)] + ) + |> client(commit(), server: [save_permissions_rules_query(rules)]) + |> server(complete_ready("INSERT 1", :tx), server: [capture_version_query()]) + |> server(complete_ready("INSERT 1", :tx), server: commit()) + |> server(complete_ready("COMMIT", :idle)) + end + + test "electrification commands are passed through", cxt do + ddlx = "ALTER TABLE something ENABLE ELECTRIC" + table_schema = "create table something (id uuid primary key, value text)" + {:ok, command} = DDLX.parse(ddlx) + [call] = proxy_sql(command, table_schema) + + cxt.injector + |> electric_begin(client: begin()) + |> electric_preamble([client: ddlx], command) + |> server(introspect_result(table_schema), server: [call]) + |> server(electric_call_complete(), + client: [complete_ready("ELECTRIC ENABLE", :tx)] + ) + |> client(commit(), server: [capture_version_query()]) + |> server(complete_ready("INSERT 1", :tx), server: commit()) + |> server(complete_ready("COMMIT", :idle)) + end + + test "psycopg ddlx", cxt do + ddlx = [ + "ELECTRIC ASSIGN (projects, project_memberships.role) TO project_memberships.user_id", + "ELECTRIC GRANT ALL ON projects TO 'member'" + ] + + {[{ddlx1, command1}, {ddlx2, command2}], rules} = rules(cxt.rules, ddlx) + + cxt.injector + |> electric_begin( + [ + client: [%M.Parse{query: "BEGIN"}, %M.Bind{}, %M.Describe{}, %M.Execute{}, %M.Sync{}] + ], + client: [ + %M.ParseComplete{}, + %M.BindComplete{}, + %M.NoData{}, + %M.CommandComplete{tag: "BEGIN"}, + %M.ReadyForQuery{status: :tx} + ] + ) + |> electric_preamble([client: [%M.Query{query: ddlx1}]], command1) + |> server( + introspect_result(@projects), + client: [complete_ready("ELECTRIC ASSIGN", :tx)] + ) + |> electric_preamble([client: ddlx2], command2) + |> server( + introspect_result(@projects), + client: [complete_ready("ELECTRIC GRANT", :tx)] + ) + |> client( + [ + %M.Parse{name: "", query: "COMMIT", params: []}, + %M.Bind{}, + %M.Describe{}, + %M.Execute{}, + %M.Sync{} + ], + server: [save_permissions_rules_query(rules)] + ) + |> server(complete_ready("INSERT 1", :tx), server: [capture_version_query()]) + |> server(complete_ready("INSERT 1", :tx), server: commit()) + |> server(complete_ready("COMMIT", :idle), + client: [ + %M.ParseComplete{}, + %M.BindComplete{}, + %M.NoData{}, + %M.CommandComplete{tag: "COMMIT"}, + %M.ReadyForQuery{status: :idle} + ] + ) + end +end diff --git a/components/electric/test/electric/postgres/proxy/injector/electric_test.exs b/components/electric/test/electric/postgres/proxy/injector/electric_test.exs index fe142d5fc5..78711e09d2 100644 --- a/components/electric/test/electric/postgres/proxy/injector/electric_test.exs +++ b/components/electric/test/electric/postgres/proxy/injector/electric_test.exs @@ -7,6 +7,8 @@ defmodule Electric.Postgres.Proxy.Injector.ElectricTest do alias Electric.Postgres.Proxy.{Injector, Parser} alias Electric.Postgres.Proxy.TestScenario + alias ElectricTest.PermissionsHelpers.Perms + def simple(sql), do: %M.Query{query: sql} def analyse(sql, cxt) when is_binary(sql) do @@ -34,7 +36,12 @@ defmodule Electric.Postgres.Proxy.Injector.ElectricTest do {:ok, loader} = SchemaLoader.connect(spec, []) - state = %Injector.State{loader: loader} + rules = Perms.to_rules([]) + + state = + %Injector.State{loader: loader} + |> Injector.State.begin() + |> Injector.State.tx_permissions(rules) {:ok, state: state, loader: loader} end diff --git a/components/electric/test/electric/postgres/proxy/injector_test.exs b/components/electric/test/electric/postgres/proxy/injector_test.exs index 2c9b4d460f..ef2300c626 100644 --- a/components/electric/test/electric/postgres/proxy/injector_test.exs +++ b/components/electric/test/electric/postgres/proxy/injector_test.exs @@ -413,22 +413,15 @@ defmodule Electric.Postgres.Proxy.InjectorTest do %M.CommandComplete{tag: "MY TAG"}, %M.ReadyForQuery{status: :idle} ]) - |> client( - parse_describe( - "CREATE TABLE IF NOT EXISTS \"schema_migrations\" (\"version\" bigint, \"inserted_at\" timestamp(0), PRIMARY KEY (\"version\"))" - ), - server: [begin()] - ) - |> server(complete_ready("BEGIN", :tx), - server: + |> electric_begin( + client: parse_describe( "CREATE TABLE IF NOT EXISTS \"schema_migrations\" (\"version\" bigint, \"inserted_at\" timestamp(0), PRIMARY KEY (\"version\"))" ) ) |> server(parse_describe_complete()) |> client(bind_execute()) - |> server(bind_execute_complete("CREATE TABLE", :tx), server: commit()) - |> server(complete_ready("COMMIT", :idle), + |> electric_commit([server: bind_execute_complete("CREATE TABLE", :tx)], client: bind_execute_complete("CREATE TABLE", :idle) ) |> idle!() @@ -478,10 +471,7 @@ defmodule Electric.Postgres.Proxy.InjectorTest do ) ) |> server(bind_execute_complete("INSERT 1")) - |> client(query(query), - server: [begin()] - ) - |> server(complete_ready("BEGIN", :tx), + |> electric_begin([client: query], server: [query("CREATE TABLE something (id uuid PRIMARY KEY, value text)")] ) |> electric_preamble([server: complete_ready("CREATE TABLE")], command) @@ -519,8 +509,7 @@ defmodule Electric.Postgres.Proxy.InjectorTest do ] ) |> server(alter_shadow_table_complete(), server: capture_version_query(version, 2)) - |> server(capture_version_complete(), server: commit()) - |> server(complete_ready("COMMIT", :idle), + |> electric_commit([server: capture_version_complete()], client: [ complete("CREATE TABLE"), complete("ELECTRIC ENABLE"), @@ -543,10 +532,7 @@ defmodule Electric.Postgres.Proxy.InjectorTest do test "close, sync, parse, describe, sync", cxt do cxt.injector - |> client( - [%M.Close{}, %M.Sync{} | parse_describe_sync("SELECT version()")] - # server: [%M.Close{}, %M.Sync{}] - ) + |> client([%M.Close{}, %M.Sync{} | parse_describe_sync("SELECT version()")]) |> server([ %M.CloseComplete{}, %M.ReadyForQuery{status: :idle} | parse_describe_sync_complete(:idle) @@ -585,8 +571,7 @@ defmodule Electric.Postgres.Proxy.InjectorTest do query = Enum.join([query1 <> ";", query2 <> ";", query3 <> ";", query4 <> ";"], "\n") cxt.injector - |> client(begin()) - |> server(complete_ready("BEGIN")) + |> electric_begin(client: begin()) |> client(query(query), server: query(query1)) |> server(complete_ready("CREATE TABLE"), server: query(query2)) |> server(complete_ready("CREATE FUNCTION1"), server: query(query3)) @@ -600,8 +585,7 @@ defmodule Electric.Postgres.Proxy.InjectorTest do ready(:tx) ] ) - |> client(commit()) - |> server(complete_ready("COMMIT", :idle)) + |> electric_commit(client: commit()) |> idle!() end @@ -615,8 +599,7 @@ defmodule Electric.Postgres.Proxy.InjectorTest do query = Enum.join([query1 <> ";", query2 <> ";"], "\n") cxt.injector - |> client(begin()) - |> server(complete_ready("BEGIN")) + |> electric_begin(client: begin()) |> client(query(query), server: query(query1)) |> server(complete_ready("CREATE FUNCTION1"), server: query(query2)) |> server(complete_ready("ALTER TABLE"), @@ -626,28 +609,23 @@ defmodule Electric.Postgres.Proxy.InjectorTest do ready(:tx) ] ) - |> client(commit()) - |> server(complete_ready("COMMIT", :idle)) + |> electric_commit(client: commit()) |> idle!() end test "drop random things", cxt do cxt.injector - |> client(begin()) - |> server(complete_ready("BEGIN")) + |> electric_begin(client: begin()) |> client(query("DROP FUNCTION IF EXISTS \"electric.ddlx_sql_drop_handler\" CASCADE")) |> server(complete_ready("DROP FUNCTION")) - |> client(commit()) - |> server(complete_ready("COMMIT", :idle)) + |> electric_commit(client: commit()) |> idle!() cxt.injector - |> client(begin()) - |> server(complete_ready("BEGIN")) + |> electric_begin(client: begin()) |> client(query("DROP EVENT TRIGGER IF EXISTS \"electric_event_trigger_sql_drop\" CASCADE")) |> server(complete_ready("DROP EVENT TRIGGER")) - |> client(commit()) - |> server(complete_ready("COMMIT", :idle)) + |> electric_commit(client: commit()) |> idle!() end @@ -678,8 +656,7 @@ defmodule Electric.Postgres.Proxy.InjectorTest do "INSERT INTO \"atdatabases_migrations_applied\"\n (\n index, name, script,\n applied_at, ignored_error, obsolete\n )\nVALUES\n (\n $1, $2, $3,\n $4,\n $5,\n $6\n )" cxt.injector - |> client(query("BEGIN")) - |> server(complete_ready("BEGIN", :tx)) + |> electric_begin(client: begin()) |> electric_preamble([client: query("ALTER TABLE public.socks ENABLE ELECTRIC;")], command) |> server(introspect_result(ddl), server: electric) |> server(complete_ready("CALL", :tx), client: complete_ready("ELECTRIC ENABLE")) @@ -739,8 +716,7 @@ defmodule Electric.Postgres.Proxy.InjectorTest do %M.ReadyForQuery{status: :tx} ] ) - |> client(query("COMMIT")) - |> server(complete_ready("COMMIT", :idle)) + |> electric_commit(client: commit()) |> idle!() end @@ -756,8 +732,7 @@ defmodule Electric.Postgres.Proxy.InjectorTest do "INSERT INTO \"atdatabases_migrations_applied\"\n (\n index, name, script,\n applied_at, ignored_error, obsolete\n )\nVALUES\n (\n $1, $2, $3,\n $4,\n $5,\n $6\n )" cxt.injector - |> client(query("BEGIN")) - |> server(complete_ready("BEGIN", :tx)) + |> electric_begin(client: begin()) |> electric_preamble([client: query("ALTER TABLE public.socks ENABLE ELECTRIC;")], command) |> server(introspect_result(ddl), server: electric) |> server(complete_ready("CALL", :tx), client: complete_ready("ELECTRIC ENABLE")) @@ -819,27 +794,24 @@ defmodule Electric.Postgres.Proxy.InjectorTest do %M.ReadyForQuery{status: :tx} ] ) - |> client(query("COMMIT")) - |> server(complete_ready("COMMIT", :idle)) + |> electric_commit(client: commit()) |> idle!() end test "psycopg transactions", cxt do cxt.injector - |> client([ - %M.Parse{query: "BEGIN"}, - %M.Bind{}, - %M.Describe{}, - %M.Execute{}, - %M.Sync{} - ]) - |> server([ - %M.ParseComplete{}, - %M.BindComplete{}, - %M.NoData{}, - %M.CommandComplete{tag: "BEGIN"}, - %M.ReadyForQuery{status: :tx} - ]) + |> electric_begin( + [ + client: [%M.Parse{query: "BEGIN"}, %M.Bind{}, %M.Describe{}, %M.Execute{}, %M.Sync{}] + ], + client: [ + %M.ParseComplete{}, + %M.BindComplete{}, + %M.NoData{}, + %M.CommandComplete{tag: "BEGIN"}, + %M.ReadyForQuery{status: :tx} + ] + ) |> client(%M.Query{query: "select pg_catalog.version()"}) |> server([ %M.RowDescription{ @@ -863,44 +835,47 @@ defmodule Electric.Postgres.Proxy.InjectorTest do %M.CommandComplete{tag: "SELECT 1"}, %M.ReadyForQuery{status: :tx} ]) - |> client([ - %M.Parse{name: "", query: "ROLLBACK", params: []}, - %M.Bind{ - portal: "", - source: "", - parameters: [], - parameter_format_codes: [], - result_format_codes: [0] - }, - %M.Describe{type: "P", name: ""}, - %M.Execute{portal: "", max_rows: 0}, - %M.Sync{} - ]) - |> server([ - %M.ParseComplete{}, - %M.BindComplete{}, - %M.NoData{}, - %M.CommandComplete{tag: "ROLLBACK"}, - %M.ReadyForQuery{status: :idle} - ]) + |> client( + [ + %M.Parse{name: "", query: "ROLLBACK", params: []}, + %M.Bind{ + portal: "", + source: "", + parameters: [], + parameter_format_codes: [], + result_format_codes: [0] + }, + %M.Describe{type: "P", name: ""}, + %M.Execute{portal: "", max_rows: 0}, + %M.Sync{} + ], + server: [rollback()] + ) + |> server(complete_ready("ROLLBACK", :idle), + client: [ + %M.ParseComplete{}, + %M.BindComplete{}, + %M.NoData{}, + %M.CommandComplete{tag: "ROLLBACK"}, + %M.ReadyForQuery{status: :idle} + ] + ) end test "psycopg savepoints", cxt do cxt.injector - |> client([ - %M.Parse{query: "BEGIN"}, - %M.Bind{}, - %M.Describe{}, - %M.Execute{}, - %M.Sync{} - ]) - |> server([ - %M.ParseComplete{}, - %M.BindComplete{}, - %M.NoData{}, - %M.CommandComplete{tag: "BEGIN"}, - %M.ReadyForQuery{status: :tx} - ]) + |> electric_begin( + [ + client: [%M.Parse{query: "BEGIN"}, %M.Bind{}, %M.Describe{}, %M.Execute{}, %M.Sync{}] + ], + client: [ + %M.ParseComplete{}, + %M.BindComplete{}, + %M.NoData{}, + %M.CommandComplete{tag: "BEGIN"}, + %M.ReadyForQuery{status: :tx} + ] + ) |> client([ %M.Parse{name: "", query: "SAVEPOINT \"_pg3_1\"", params: []}, %M.Bind{}, @@ -929,26 +904,31 @@ defmodule Electric.Postgres.Proxy.InjectorTest do %M.CommandComplete{tag: "RELEASE"}, %M.ReadyForQuery{status: :tx} ]) - |> client([ - %M.Parse{name: "", query: "ROLLBACK", params: []}, - %M.Bind{ - portal: "", - source: "", - parameters: [], - parameter_format_codes: [], - result_format_codes: [0] - }, - %M.Describe{type: "P", name: ""}, - %M.Execute{portal: "", max_rows: 0}, - %M.Sync{} - ]) - |> server([ - %M.ParseComplete{}, - %M.BindComplete{}, - %M.NoData{}, - %M.CommandComplete{tag: "ROLLBACK"}, - %M.ReadyForQuery{status: :idle} - ]) + |> client( + [ + %M.Parse{name: "", query: "ROLLBACK", params: []}, + %M.Bind{ + portal: "", + source: "", + parameters: [], + parameter_format_codes: [], + result_format_codes: [0] + }, + %M.Describe{type: "P", name: ""}, + %M.Execute{portal: "", max_rows: 0}, + %M.Sync{} + ], + server: rollback() + ) + |> server(complete_ready("ROLLBACK", :idle), + client: [ + %M.ParseComplete{}, + %M.BindComplete{}, + %M.NoData{}, + %M.CommandComplete{tag: "ROLLBACK"}, + %M.ReadyForQuery{status: :idle} + ] + ) end end diff --git a/components/electric/test/electric/postgres/proxy/query_analyser_test.exs b/components/electric/test/electric/postgres/proxy/query_analyser_test.exs index aa5809007a..b65c75192e 100644 --- a/components/electric/test/electric/postgres/proxy/query_analyser_test.exs +++ b/components/electric/test/electric/postgres/proxy/query_analyser_test.exs @@ -13,18 +13,18 @@ defmodule Electric.Postgres.Proxy.QueryAnalyserTest do def simple(sql), do: %M.Query{query: sql} def extended(sql, attrs \\ []), do: struct(M.Parse, Keyword.put(attrs, :query, sql)) - describe "analyse/2" do - alias Electric.Postgres.Proxy.QueryAnalysis + def analyse(sql, cxt) when is_binary(sql) do + analyse(simple(sql), cxt) + end - def analyse(sql, cxt) when is_binary(sql) do - analyse(simple(sql), cxt) + def analyse(msg, cxt) do + with {:ok, stmts} <- Parser.parse(msg) do + Enum.map(stmts, &Parser.analyse(&1, cxt.state)) end + end - def analyse(msg, cxt) do - with {:ok, stmts} <- Parser.parse(msg) do - Enum.map(stmts, &Parser.analyse(&1, cxt.state)) - end - end + describe "analyse/2" do + alias Electric.Postgres.Proxy.QueryAnalysis setup(cxt) do # enable all the optional ddlx features @@ -43,15 +43,15 @@ defmodule Electric.Postgres.Proxy.QueryAnalyserTest do spec = MockSchemaLoader.backend_spec(migrations: migrations) - {:ok, loader} = - SchemaLoader.connect(spec, []) + {:ok, loader} = SchemaLoader.connect(spec, []) ddlx = Map.get(cxt, :ddlx, []) rules = Perms.to_rules(ddlx) - {:ok, loader} = SchemaLoader.save_global_permissions(loader, rules) - - state = %State{loader: loader} + state = + %State{loader: loader} + |> State.begin() + |> State.tx_permissions(rules) {:ok, state: state, loader: loader} end diff --git a/components/electric/test/electric/postgres/proxy/schema_validation_test.exs b/components/electric/test/electric/postgres/proxy/schema_validation_test.exs index 5e6e906c03..37ee54de65 100644 --- a/components/electric/test/electric/postgres/proxy/schema_validation_test.exs +++ b/components/electric/test/electric/postgres/proxy/schema_validation_test.exs @@ -82,9 +82,7 @@ defmodule Electric.Postgres.Proxy.SchemaValidationTest do end defp txn(injector) do - injector - |> client("BEGIN") - |> server(complete_ready("BEGIN", :tx)) + electric_begin(injector, client: begin()) end defp start_transaction(cxt) do @@ -282,8 +280,7 @@ defmodule Electric.Postgres.Proxy.SchemaValidationTest do {:ok, command} = DDLX.parse(ddlx) cxt.injector - |> client(begin()) - |> server(complete_ready("BEGIN", :tx)) + |> electric_begin(client: begin()) |> electric_preamble([client: ddlx], command) |> server( introspect_result(ddl), @@ -303,8 +300,9 @@ defmodule Electric.Postgres.Proxy.SchemaValidationTest do ddl = create_table_ddl(relation, columns) cxt.injector - |> client("ALTER TABLE #{name} ENABLE ELECTRIC", server: "BEGIN") - |> electric_preamble([server: complete_ready("BEGIN", :tx)], command) + |> client("ALTER TABLE #{name} ENABLE ELECTRIC", server: begin()) + |> server(complete_ready("BEGIN", :tx), server: permissions_rules_query()) + |> electric_preamble([server: rules_query_result()], command) |> server( introspect_result(ddl), server: "ROLLBACK" @@ -322,8 +320,7 @@ defmodule Electric.Postgres.Proxy.SchemaValidationTest do {:ok, command} = DDLX.parse(ddlx) cxt.injector - |> client(begin()) - |> server(complete_ready("BEGIN", :tx)) + |> electric_begin(client: begin()) |> electric_preamble([client: ddlx], command) |> server( introspect_result(ddl), diff --git a/components/electric/test/electric/replication/postgres/migration_consumer_test.exs b/components/electric/test/electric/replication/postgres/migration_consumer_test.exs index c19dd1b322..7cb403a484 100644 --- a/components/electric/test/electric/replication/postgres/migration_consumer_test.exs +++ b/components/electric/test/electric/replication/postgres/migration_consumer_test.exs @@ -407,24 +407,28 @@ defmodule Electric.Replication.Postgres.MigrationConsumerTest do describe "permissions" do alias ElectricTest.PermissionsHelpers.Proto alias ElectricTest.PermissionsHelpers.Chgs + alias ElectricTest.PermissionsHelpers.Perms test "converts ddlx events into global permission change messages", cxt do %{origin: origin, producer: producer, version: version} = cxt assert_receive {MockSchemaLoader, {:connect, _}} + {loader, rules} = + Perms.rules(cxt.loader, + assigns: [ + Proto.assign( + table: Proto.table("project_memberships"), + user_column: "user_id", + role_column: "project_role", + scope: Proto.table("projects") + ) + ] + ) + raw_events = [ %Transaction{ changes: [ - Chgs.ddlx( - assigns: [ - Proto.assign( - table: Proto.table("project_memberships"), - user_column: "user_id", - role_column: "project_role", - scope: Proto.table("projects") - ) - ] - ) + Chgs.rules(rules) ], commit_timestamp: ~U[2023-05-02 10:08:00.948788Z], origin: origin, @@ -433,7 +437,7 @@ defmodule Electric.Replication.Postgres.MigrationConsumerTest do } ] - GenStage.call(producer, {:emit, cxt.loader, raw_events, version}) + GenStage.call(producer, {:emit, loader, raw_events, version}) assert_receive {FakeConsumer, :events, filtered_events}, 1000 @@ -456,19 +460,22 @@ defmodule Electric.Replication.Postgres.MigrationConsumerTest do %{origin: origin, producer: producer, version: version} = cxt assert_receive {MockSchemaLoader, {:connect, _}} + {loader, rules} = + Perms.rules(cxt.loader, + assigns: [ + Proto.assign( + table: Proto.table("project_memberships"), + user_column: "user_id", + role_column: "project_role", + scope: Proto.table("projects") + ) + ] + ) + raw_events = [ %Transaction{ changes: [ - Chgs.ddlx( - assigns: [ - Proto.assign( - table: Proto.table("project_memberships"), - user_column: "user_id", - role_column: "project_role", - scope: Proto.table("projects") - ) - ] - ) + Chgs.rules(rules) ], commit_timestamp: ~U[2023-05-02 10:08:00.948788Z], origin: origin, @@ -477,8 +484,7 @@ defmodule Electric.Replication.Postgres.MigrationConsumerTest do } ] - GenStage.call(producer, {:emit, cxt.loader, raw_events, version}) - assert_receive {MockSchemaLoader, {:save_global_permissions, _}}, 500 + GenStage.call(producer, {:emit, loader, raw_events, version}) assert_receive {FakeConsumer, :events, _filtered_events}, 1000 @@ -503,7 +509,7 @@ defmodule Electric.Replication.Postgres.MigrationConsumerTest do } ] - GenStage.call(producer, {:emit, cxt.loader, raw_events, version}) + GenStage.call(producer, {:emit, loader, raw_events, version}) assert_receive {FakeConsumer, :events, filtered_events}, 1000 @@ -532,6 +538,18 @@ defmodule Electric.Replication.Postgres.MigrationConsumerTest do %{origin: origin, producer: producer, version: version} = cxt assert_receive {MockSchemaLoader, {:connect, _}} + {loader, rules} = + Perms.rules(cxt.loader, + assigns: [ + Proto.assign( + table: Proto.table("team_memberships"), + user_column: "user_id", + role_column: "team_role", + scope: Proto.table("teams") + ) + ] + ) + insert = Chgs.insert( {"public", "team_memberships"}, @@ -573,16 +591,7 @@ defmodule Electric.Replication.Postgres.MigrationConsumerTest do }, tags: [] }, - Chgs.ddlx( - assigns: [ - Proto.assign( - table: Proto.table("team_memberships"), - user_column: "user_id", - role_column: "team_role", - scope: Proto.table("teams") - ) - ] - ), + Chgs.rules(rules), insert ], commit_timestamp: ~U[2023-05-02 10:08:00.948788Z], @@ -592,7 +601,7 @@ defmodule Electric.Replication.Postgres.MigrationConsumerTest do } ] - GenStage.call(producer, {:emit, cxt.loader, raw_events, version}) + GenStage.call(producer, {:emit, loader, raw_events, version}) assert_receive {FakeConsumer, :events, filtered_events}, 1000 diff --git a/components/electric/test/electric/satellite/permissions/state_test.exs b/components/electric/test/electric/satellite/permissions/state_test.exs index 4ecf77c534..b01f39bcd3 100644 --- a/components/electric/test/electric/satellite/permissions/state_test.exs +++ b/components/electric/test/electric/satellite/permissions/state_test.exs @@ -7,10 +7,12 @@ defmodule Electric.Satellite.Permissions.StateTest do alias Electric.Replication.Changes alias Electric.Satellite.Permissions.State alias Electric.Satellite.SatPerms - alias ElectricTest.PermissionsHelpers.{Chgs, Proto} + alias ElectricTest.PermissionsHelpers.{Chgs, Proto, Perms} def apply_ddlx(rules \\ %SatPerms.Rules{}, cmds) do - State.apply_ddlx(rules, Command.ddlx(cmds)) + cmds + |> Command.ddlx() + |> State.apply_ddlx!(rules) end def new(cmds) do @@ -300,11 +302,11 @@ defmodule Electric.Satellite.Permissions.StateTest do rules = ddlx |> parse_ddlx() - |> Enum.reduce(%SatPerms.Rules{}, &State.apply_ddlx(&2, &1)) + |> Enum.reduce(%SatPerms.Rules{id: 1}, &State.apply_ddlx!/2) assert rules == %SatPerms.Rules{ - id: 5, - parent_id: 4, + id: 6, + parent_id: 5, assigns: [ Proto.assign( scope: Proto.scope("projects"), @@ -347,23 +349,33 @@ defmodule Electric.Satellite.Permissions.StateTest do rules = ddlx |> parse_ddlx() - |> Enum.reduce(rules, &State.apply_ddlx(&2, &1)) + |> Enum.reduce(rules, &State.apply_ddlx!/2) assert rules == %SatPerms.Rules{ - id: 9, - parent_id: 8, + id: 10, + parent_id: 9, assigns: [], grants: [] } end end + def update_global(mock_loader, ddlx) do + {:ok, rules} = SchemaLoader.global_permissions(mock_loader) + + {:ok, _, rules} = State.apply_ddlx(ddlx, rules) + + {:ok, loader} = MockSchemaLoader.save_global_permissions(mock_loader, rules) + + {loader, rules} + end + def loader_with_global_perms(cxt, ddlx \\ default_ddlx()) do loader = loader(cxt) ddlx = Command.ddlx(ddlx) - assert {:ok, _, loader, rules} = State.update_global(ddlx, loader) + {loader, rules} = update_global(loader, ddlx) {loader, rules} end @@ -467,12 +479,12 @@ defmodule Electric.Satellite.Permissions.StateTest do role_name: "admin" ) - ddlx = Command.ddlx(assigns: [assign1]) + {loader, rules} = Perms.rules(loader, assigns: [assign1]) tx = Chgs.tx([ Chgs.insert({"public", "kittens"}, %{"size" => "cute"}), - Chgs.ddlx(ddlx) + Chgs.rules(rules) ]) assert {:ok, tx, consumer, loader} = State.update(tx, consumer, loader) @@ -497,11 +509,11 @@ defmodule Electric.Satellite.Permissions.StateTest do role_name: "admin2" ) - ddlx = Command.ddlx(assigns: [assign2]) + {loader, rules} = Perms.rules(loader, assigns: [assign1, assign2]) tx = Chgs.tx([ - Chgs.ddlx(ddlx), + Chgs.rules(rules), Chgs.insert({"public", "kittens"}, %{"size" => "cute"}) ]) @@ -533,7 +545,7 @@ defmodule Electric.Satellite.Permissions.StateTest do role_name: "admin" ) - ddlx1 = Command.ddlx(assigns: [assign1]) + {loader, rules1} = Perms.rules(loader, assigns: [assign1]) assign2 = Proto.assign( @@ -543,7 +555,7 @@ defmodule Electric.Satellite.Permissions.StateTest do role_column: "role" ) - ddlx2 = Command.ddlx(assigns: [assign2]) + {loader, rules2} = Perms.rules(loader, assigns: [assign1, assign2]) assign3 = Proto.assign( @@ -553,15 +565,15 @@ defmodule Electric.Satellite.Permissions.StateTest do role_column: "role" ) - ddlx3 = Command.ddlx(assigns: [assign3]) + {loader, rules3} = Perms.rules(loader, assigns: [assign1, assign2, assign3]) tx = Chgs.tx([ - Chgs.ddlx(ddlx1), - Chgs.ddlx(ddlx2), + Chgs.rules(rules1), + Chgs.rules(rules2), Chgs.insert({"public", "kittens"}, %{"size" => "cute"}), Chgs.insert({"public", "kittens"}, %{"fur" => "furry"}), - Chgs.ddlx(ddlx3) + Chgs.rules(rules3) ]) assert {:ok, tx, _consumer, _loader} = State.update(tx, consumer, loader) @@ -570,7 +582,7 @@ defmodule Electric.Satellite.Permissions.StateTest do %Changes.UpdatedPermissions{ type: :global, permissions: %Changes.UpdatedPermissions.GlobalPermissions{ - permissions_id: 2 + permissions_id: 3 } }, Chgs.insert({"public", "kittens"}, %{"size" => "cute"}), @@ -578,7 +590,7 @@ defmodule Electric.Satellite.Permissions.StateTest do %Changes.UpdatedPermissions{ type: :global, permissions: %Changes.UpdatedPermissions.GlobalPermissions{ - permissions_id: 3 + permissions_id: 4 } } ] @@ -751,11 +763,11 @@ defmodule Electric.Satellite.Permissions.StateTest do role_column: "team_role" ) - ddlx = Command.ddlx(assigns: [assign]) + {loader, rules} = Perms.rules(loader, assigns: [assign]) tx = Chgs.tx([ - Chgs.ddlx(ddlx), + Chgs.rules(rules), Chgs.insert( {"public", "team_memberships"}, %{ @@ -1282,7 +1294,11 @@ defmodule Electric.Satellite.Permissions.StateTest do ] ) - tx = Chgs.tx([Chgs.ddlx(ddlx)]) + {:ok, 1, rules} = State.apply_ddlx(ddlx, rules) + + {loader, rules} = Perms.rules(loader, rules) + + tx = Chgs.tx([Chgs.rules(rules)]) assert {:ok, _tx, consumer, loader} = State.update(tx, consumer, loader) @@ -1461,6 +1477,16 @@ defmodule Electric.Satellite.Permissions.StateTest do ddlx = Command.ddlx(sqlite: [Proto.sqlite("create table local (id primary key)")]) + {loader, _rules} = + Perms.rules(loader, + unassign: + Proto.unassign( + table: Proto.table("site_admins"), + user_column: "user_id", + role_column: "site_role" + ) + ) + tx = Chgs.tx([ Chgs.insert({"public", "kittens"}, %{"size" => "cute"}), diff --git a/components/electric/test/support/injector_test/scenarios/adhoc.ex b/components/electric/test/support/injector_test/scenarios/adhoc.ex index 52ef815b9e..93f5639e95 100644 --- a/components/electric/test/support/injector_test/scenarios/adhoc.ex +++ b/components/electric/test/support/injector_test/scenarios/adhoc.ex @@ -20,14 +20,12 @@ defmodule Electric.Postgres.Proxy.TestScenario.AdHoc do tag = random_tag() injector - |> client(query("BEGIN")) - |> server(complete_ready("BEGIN")) + |> electric_begin(client: begin()) |> client(parse_describe(query)) |> server(parse_describe_complete()) |> client(bind_execute()) |> server(bind_execute_complete(tag)) - |> client(commit()) - |> server(complete_ready("COMMIT", :idle)) + |> electric_commit(client: commit()) |> idle!() end @@ -40,21 +38,18 @@ defmodule Electric.Postgres.Proxy.TestScenario.AdHoc do injector = injector - |> client(query("BEGIN")) - |> server(complete_ready("BEGIN")) + |> electric_begin(client: begin()) queries |> Enum.reduce(injector, &execute_tx_sql(&1, &2, :extended)) |> client(commit(), server: capture_version_query()) - |> server(capture_version_complete(), server: commit()) - |> server(complete_ready("COMMIT", :idle)) + |> electric_commit(server: capture_version_complete()) |> idle!() end def assert_injector_error(injector, query, error_details) do injector - |> client(query("BEGIN")) - |> server(complete_ready("BEGIN")) + |> electric_begin(client: begin()) |> client(parse_describe(query), client: [error(error_details), ready(:failed)]) |> client(rollback()) |> server(complete_ready("ROLLBACK", :idle)) @@ -67,17 +62,31 @@ defmodule Electric.Postgres.Proxy.TestScenario.AdHoc do # may not be used but needs to be valid sql ddl = Keyword.get(opts, :ddl, "CREATE TABLE _not_used_ (id uuid PRIMARY KEY)") - injector - |> client(query("BEGIN")) - |> server(complete_ready("BEGIN")) - |> client(parse_describe(query), client: parse_describe_complete(), server: []) - |> electric([client: bind_execute()], command, ddl, - client: bind_execute_complete(DDLX.Command.tag(command)) - ) - |> client(commit(), server: capture_version_query()) - |> server(capture_version_complete(), server: commit()) - |> server(complete_ready("COMMIT", :idle)) - |> idle!() + if modifies_permissions?(command) do + injector + |> electric_begin(client: begin()) + |> client(parse_describe(query), client: parse_describe_complete(), server: []) + |> electric([client: bind_execute()], command, ddl, + client: bind_execute_complete(DDLX.Command.tag(command)) + ) + |> client(commit(), fn injector -> + rules = permissions_modified!(injector) + [server: save_permissions_rules_query(rules)] + end) + |> server(complete_ready(), server: capture_version_query()) + |> electric_commit(server: capture_version_complete()) + |> idle!() + else + injector + |> electric_begin(client: begin()) + |> client(parse_describe(query), client: parse_describe_complete(), server: []) + |> electric([client: bind_execute()], command, ddl, + client: bind_execute_complete(DDLX.Command.tag(command)) + ) + |> client(commit(), server: capture_version_query()) + |> electric_commit(server: capture_version_complete()) + |> idle!() + end end def assert_electrify_server_error(injector, _framework, query, ddl, error_details) do @@ -90,8 +99,7 @@ defmodule Electric.Postgres.Proxy.TestScenario.AdHoc do |> Enum.map(&query/1) injector - |> client(query("BEGIN")) - |> server(complete_ready("BEGIN")) + |> electric_begin(client: begin()) |> client(parse_describe(query), client: parse_describe_complete()) |> electric_preamble([client: bind_execute()], command) |> server(introspect_result(ddl), server: electrify) diff --git a/components/electric/test/support/injector_test/scenarios/extended_no_tx.ex b/components/electric/test/support/injector_test/scenarios/extended_no_tx.ex index 138dacea84..43aac8be7b 100644 --- a/components/electric/test/support/injector_test/scenarios/extended_no_tx.ex +++ b/components/electric/test/support/injector_test/scenarios/extended_no_tx.ex @@ -19,8 +19,7 @@ defmodule Electric.Postgres.Proxy.TestScenario.ExtendedNoTx do tag = random_tag() injector - |> client(parse_describe(query), server: begin()) - |> server(complete_ready("BEGIN", :tx), server: parse_describe(query)) + |> electric_begin(client: parse_describe(query)) |> server(parse_describe_complete()) |> client(bind_execute()) |> server(bind_execute_complete(tag), server: commit()) @@ -58,17 +57,18 @@ defmodule Electric.Postgres.Proxy.TestScenario.ExtendedNoTx do tag = random_tag() injector - |> client(parse_describe(query), server: begin()) - |> server(complete_ready("BEGIN", :tx), server: parse_describe(query)) + |> electric_begin(client: parse_describe(query)) |> server(parse_describe_complete()) |> client(bind_execute()) - |> server(bind_execute_complete(tag), + |> server( + bind_execute_complete(tag), server: capture_ddl_query(query), client: capture_notice(query) ) |> shadow_add_column(capture_ddl_complete(), opts, server: capture_version_query()) - |> server(capture_version_complete(), server: commit()) - |> server(complete_ready("COMMIT", :idle), client: [bind_execute_complete(tag, :idle)]) + |> electric_commit([server: capture_version_complete()], + client: [bind_execute_complete(tag, :idle)] + ) |> idle!() end @@ -84,24 +84,27 @@ defmodule Electric.Postgres.Proxy.TestScenario.ExtendedNoTx do # may not be used but needs to be valid sql ddl = Keyword.get(opts, :ddl, "CREATE TABLE _not_used_ (id uuid PRIMARY KEY)") - injector - |> client(parse_describe(query), server: begin()) - |> server(complete_ready("BEGIN", :tx), client: parse_describe_complete(), server: []) - |> electric( - [client: bind_execute()], - command, - ddl, - # bind_execute_complete(DDLX.Command.tag(command)), - server: capture_version_query() - ) - |> server(capture_version_complete(), - server: commit() - # client: complete(DDLX.Command.tag(command)) - ) - |> server(complete_ready("COMMIT", :idle), - client: bind_execute_complete(DDLX.Command.tag(command), :idle) - ) - |> idle!() + if modifies_permissions?(command) do + injector + |> electric_begin([client: parse_describe(query)], client: parse_describe_complete()) + |> electric([client: bind_execute()], command, ddl, fn injector -> + rules = permissions_modified!(injector) + [server: save_permissions_rules_query(rules)] + end) + |> server(complete_ready(), server: capture_version_query()) + |> electric_commit([server: capture_version_complete()], + client: [bind_execute_complete(DDLX.Command.tag(command), :idle)] + ) + |> idle!() + else + injector + |> electric_begin([client: parse_describe(query)], client: parse_describe_complete()) + |> electric([client: bind_execute()], command, ddl, server: capture_version_query()) + |> electric_commit([server: capture_version_complete()], + client: [bind_execute_complete(DDLX.Command.tag(command), :idle)] + ) + |> idle!() + end end def assert_electrify_server_error(injector, _framework, query, ddl, error_details) do @@ -114,8 +117,7 @@ defmodule Electric.Postgres.Proxy.TestScenario.ExtendedNoTx do |> Enum.map(&query/1) injector - |> client(parse_describe(query), server: begin()) - |> server(complete_ready("BEGIN", :tx), client: parse_describe_complete(), server: []) + |> electric_begin([client: parse_describe(query)], client: parse_describe_complete()) |> electric_preamble([client: bind_execute()], command) |> server(introspect_result(ddl), server: electrify) |> server([error(error_details), ready(:failed)], server: rollback()) diff --git a/components/electric/test/support/injector_test/scenarios/framework.ex b/components/electric/test/support/injector_test/scenarios/framework.ex index 85f4914f70..83fc404690 100644 --- a/components/electric/test/support/injector_test/scenarios/framework.ex +++ b/components/electric/test/support/injector_test/scenarios/framework.ex @@ -19,15 +19,13 @@ defmodule Electric.Postgres.Proxy.TestScenario.Framework do tag = random_tag() injector - |> client(query("BEGIN")) - |> server(complete_ready("BEGIN")) + |> electric_begin(client: begin()) |> client(parse_describe(query)) |> server(parse_describe_complete()) |> client(bind_execute()) |> server(bind_execute_complete(tag)) |> framework.assign_migration_version("20230822143453") - |> client(commit()) - |> server(complete_ready("COMMIT", :idle)) + |> electric_commit(client: commit()) |> idle!() end @@ -41,21 +39,18 @@ defmodule Electric.Postgres.Proxy.TestScenario.Framework do injector = injector - |> client(query("BEGIN")) - |> server(complete_ready("BEGIN")) + |> electric_begin(client: begin()) queries |> Enum.reduce(injector, &execute_tx_sql(&1, &2, :extended)) |> framework.capture_migration_version(version) - |> client(commit()) - |> server(complete_ready("COMMIT", :idle)) + |> electric_commit(client: commit()) |> idle!() end def assert_injector_error(injector, query, error_details) do injector - |> client(query("BEGIN")) - |> server(complete_ready("BEGIN")) + |> electric_begin(client: begin()) |> client(parse_describe(query), client: [error(error_details), ready(:failed)]) |> client(rollback()) |> server(complete_ready("ROLLBACK", :idle)) @@ -69,15 +64,13 @@ defmodule Electric.Postgres.Proxy.TestScenario.Framework do ddl = Keyword.get(opts, :ddl, "CREATE TABLE _not_used_ (id uuid PRIMARY KEY)") injector - |> client(query("BEGIN")) - |> server(complete_ready("BEGIN")) + |> electric_begin(client: begin()) |> client(parse_describe(query), client: parse_describe_complete(), server: []) |> electric([client: bind_execute()], command, ddl, client: bind_execute_complete(DDLX.Command.tag(command)) ) |> framework.capture_migration_version(version) - |> client(commit()) - |> server(complete_ready("COMMIT", :idle)) + |> electric_commit(client: commit()) |> idle!() end @@ -91,8 +84,7 @@ defmodule Electric.Postgres.Proxy.TestScenario.Framework do |> Enum.map(&query/1) injector - |> client(query("BEGIN")) - |> server(complete_ready("BEGIN")) + |> electric_begin(client: begin()) |> client(parse_describe(query), client: parse_describe_complete()) |> electric_preamble([client: bind_execute()], command) |> server(introspect_result(ddl), server: electrify) diff --git a/components/electric/test/support/injector_test/scenarios/framework_simple.ex b/components/electric/test/support/injector_test/scenarios/framework_simple.ex index 7035d020ac..5ffd243602 100644 --- a/components/electric/test/support/injector_test/scenarios/framework_simple.ex +++ b/components/electric/test/support/injector_test/scenarios/framework_simple.ex @@ -23,13 +23,11 @@ defmodule Electric.Postgres.Proxy.TestScenario.FrameworkSimple do version = random_version() injector - |> client(query("BEGIN")) - |> server(complete_ready("BEGIN")) + |> electric_begin(client: begin()) |> client(query(query)) |> server(complete_ready(tag)) |> framework.assign_migration_version(version) - |> client(commit()) - |> server(complete_ready("COMMIT", :idle)) + |> electric_commit(client: commit()) |> idle!() end @@ -43,21 +41,18 @@ defmodule Electric.Postgres.Proxy.TestScenario.FrameworkSimple do injector = injector - |> client(query("BEGIN")) - |> server(complete_ready("BEGIN")) + |> electric_begin(client: begin()) queries |> Enum.reduce(injector, &execute_tx_sql(&1, &2, :simple)) |> framework.capture_migration_version(version) - |> client(commit()) - |> server(complete_ready("COMMIT", :idle)) + |> electric_commit(client: commit()) |> idle!() end def assert_injector_error(injector, query, error_details) do injector - |> client(query("BEGIN")) - |> server(complete_ready("BEGIN")) + |> electric_begin(client: begin()) |> client(query(query), client: [error(error_details), ready(:failed)]) |> client(rollback()) |> server(complete_ready("ROLLBACK", :idle)) @@ -72,14 +67,12 @@ defmodule Electric.Postgres.Proxy.TestScenario.FrameworkSimple do ddl = Keyword.get(opts, :ddl, "CREATE TABLE _not_used_ (id uuid PRIMARY KEY)") injector - |> client(query("BEGIN")) - |> server(complete_ready("BEGIN")) + |> electric_begin(client: begin()) |> electric([client: query(query)], command, ddl, client: complete_ready(DDLX.Command.tag(command)) ) |> framework.capture_migration_version(version) - |> client(commit()) - |> server(complete_ready("COMMIT", :idle)) + |> electric_commit(client: commit()) |> idle!() end @@ -93,8 +86,7 @@ defmodule Electric.Postgres.Proxy.TestScenario.FrameworkSimple do |> Enum.map(&query/1) injector - |> client(query("BEGIN")) - |> server(complete_ready("BEGIN")) + |> electric_begin(client: begin()) |> electric_preamble([client: query(query)], command) |> server(introspect_result(ddl), server: electrify) |> server([error(error_details), ready(:failed)]) diff --git a/components/electric/test/support/injector_test/scenarios/manual.ex b/components/electric/test/support/injector_test/scenarios/manual.ex index c19b096ab3..f2a0c0fe6e 100644 --- a/components/electric/test/support/injector_test/scenarios/manual.ex +++ b/components/electric/test/support/injector_test/scenarios/manual.ex @@ -17,10 +17,8 @@ defmodule Electric.Postgres.Proxy.TestScenario.Manual do def assert_non_electrified_migration(injector, _framework, query, tag \\ random_tag()) do injector - |> client(query(query), server: begin()) - |> server(complete_ready("BEGIN", :tx), server: query(query)) - |> server(complete_ready(tag, :tx), server: commit()) - |> server(complete_ready("COMMIT", :idle), client: [complete_ready(tag, :idle)]) + |> electric_begin(client: query(query)) + |> electric_commit([server: complete_ready(tag, :tx)], client: [complete_ready(tag, :idle)]) |> idle!() end @@ -50,10 +48,7 @@ defmodule Electric.Postgres.Proxy.TestScenario.Manual do tag = random_tag() injector - |> client(query(query), server: begin()) - |> server(complete_ready("BEGIN", :tx), - server: query(query) - ) + |> electric_begin(client: query(query)) |> server(complete_ready(tag, :tx), server: capture_ddl_query(query), client: [ @@ -61,8 +56,7 @@ defmodule Electric.Postgres.Proxy.TestScenario.Manual do ] ) |> shadow_add_column(capture_ddl_complete(), opts, server: capture_version_query(0)) - |> server(capture_version_complete(), server: commit()) - |> server(complete_ready("COMMIT", :idle), client: [complete_ready(tag, :idle)]) + |> electric_commit([server: capture_version_complete()], client: [complete_ready(tag, :idle)]) |> idle!() end @@ -74,26 +68,44 @@ defmodule Electric.Postgres.Proxy.TestScenario.Manual do def assert_valid_electric_command(injector, _framework, query, opts \\ []) do {:ok, command} = DDLX.parse(query) + rules = Keyword.get(opts, :rules, nil) # may not be used but needs to be valid sql ddl = Keyword.get(opts, :ddl, "CREATE TABLE _not_used_ (id uuid PRIMARY KEY)") - injector - |> client(query(query), server: begin()) - |> electric( - [server: complete_ready("BEGIN", :tx)], - command, - ddl, - server: capture_version_query() - ) - |> server(capture_version_complete(), - server: commit() - # client: complete(DDLX.Command.tag(command)) - ) - |> server(complete_ready("COMMIT", :idle), - client: complete_ready(DDLX.Command.tag(command), :idle) - ) - |> idle!() + if modifies_permissions?(command) do + injector + |> client(query(query), server: begin()) + |> server(complete_ready("BEGIN", :tx), server: permissions_rules_query()) + |> electric( + [server: rules_query_result(rules)], + command, + ddl, + fn injector -> + rules = permissions_modified!(injector) + [server: save_permissions_rules_query(rules)] + end + ) + |> server(complete_ready(), server: capture_version_query()) + |> electric_commit([server: complete_ready("INSERT 1")], + client: complete_ready(DDLX.Command.tag(command), :idle) + ) + |> idle!() + else + injector + |> client(query(query), server: begin()) + |> server(complete_ready("BEGIN", :tx), server: permissions_rules_query()) + |> electric( + [server: rules_query_result(rules)], + command, + ddl, + server: capture_version_query() + ) + |> electric_commit([server: capture_version_complete()], + client: complete_ready(DDLX.Command.tag(command), :idle) + ) + |> idle!() + end end def assert_electrify_server_error(injector, _framework, query, ddl, error_details) do @@ -107,7 +119,8 @@ defmodule Electric.Postgres.Proxy.TestScenario.Manual do injector |> client(query(query), server: begin()) - |> electric_preamble([server: complete_ready("BEGIN", :tx)], command) + |> server(complete_ready("BEGIN", :tx), server: permissions_rules_query()) + |> electric_preamble([server: rules_query_result()], command) |> server(introspect_result(ddl), server: electrify) |> server([error(error_details), ready(:failed)], server: rollback()) |> server(complete_ready("ROLLBACK", :idle), client: [error(error_details), ready(:failed)]) diff --git a/components/electric/test/support/injector_test/scenarios/manual_tx.ex b/components/electric/test/support/injector_test/scenarios/manual_tx.ex index 02d1a22227..5cbafdb8b4 100644 --- a/components/electric/test/support/injector_test/scenarios/manual_tx.ex +++ b/components/electric/test/support/injector_test/scenarios/manual_tx.ex @@ -20,12 +20,10 @@ defmodule Electric.Postgres.Proxy.TestScenario.ManualTx do tag = random_tag() injector - |> client(query("BEGIN")) - |> server(complete_ready("BEGIN")) + |> electric_begin(client: begin()) |> client(query(query)) |> server(complete_ready(tag, :tx)) - |> client(commit()) - |> server(complete_ready("COMMIT", :idle)) + |> electric_commit(client: commit()) |> idle!() end @@ -38,21 +36,18 @@ defmodule Electric.Postgres.Proxy.TestScenario.ManualTx do injector = injector - |> client(query("BEGIN")) - |> server(complete_ready("BEGIN")) + |> electric_begin(client: begin()) queries |> Enum.reduce(injector, &execute_tx_sql(&1, &2, :simple)) |> client(commit(), server: capture_version_query()) - |> server(capture_version_complete(), server: commit()) - |> server(complete_ready("COMMIT", :idle)) + |> electric_commit(server: capture_version_complete()) |> idle!() end def assert_injector_error(injector, query, error_details) do injector - |> client(query("BEGIN")) - |> server(complete_ready("BEGIN")) + |> electric_begin(client: begin()) |> client(query(query), client: [error(error_details), ready(:failed)]) |> client(rollback()) |> server(complete_ready("ROLLBACK", :idle)) @@ -65,16 +60,32 @@ defmodule Electric.Postgres.Proxy.TestScenario.ManualTx do # may not be used but needs to be valid sql ddl = Keyword.get(opts, :ddl, "CREATE TABLE _not_used_ (id uuid PRIMARY KEY)") - injector - |> client(query("BEGIN")) - |> server(complete_ready("BEGIN")) - |> electric([client: query(query)], command, ddl, - client: complete_ready(DDLX.Command.tag(command)) - ) - |> client(commit(), server: capture_version_query()) - |> server(capture_version_complete(), server: commit()) - |> server(complete_ready("COMMIT", :idle)) - |> idle!() + if modifies_permissions?(command) do + injector + |> electric_begin(client: begin()) + |> electric([client: query(query)], command, ddl, + client: complete_ready(DDLX.Command.tag(command)) + ) + |> client( + commit(), + fn injector -> + rules = permissions_modified!(injector) + [server: save_permissions_rules_query(rules)] + end + ) + |> server(complete_ready(), server: capture_version_query()) + |> electric_commit(server: capture_version_complete()) + |> idle!() + else + injector + |> electric_begin(client: begin()) + |> electric([client: query(query)], command, ddl, + client: complete_ready(DDLX.Command.tag(command)) + ) + |> client(commit(), server: capture_version_query()) + |> electric_commit(server: capture_version_complete()) + |> idle!() + end end def assert_electrify_server_error(injector, _framework, query, ddl, error_details) do @@ -87,8 +98,7 @@ defmodule Electric.Postgres.Proxy.TestScenario.ManualTx do |> Enum.map(&query/1) injector - |> client(query("BEGIN")) - |> server(complete_ready("BEGIN")) + |> electric_begin(client: begin()) |> electric_preamble([client: query(query)], command) |> server(introspect_result(ddl), server: electrify) |> server([error(error_details), ready(:failed)]) diff --git a/components/electric/test/support/injector_test/test_scenario.ex b/components/electric/test/support/injector_test/test_scenario.ex index 5dfa3dd16b..770dbbeaaf 100644 --- a/components/electric/test/support/injector_test/test_scenario.ex +++ b/components/electric/test/support/injector_test/test_scenario.ex @@ -2,6 +2,7 @@ defmodule Electric.Postgres.Proxy.TestScenario do alias PgProtocol.Message, as: M alias Electric.Postgres.Proxy.Injector alias Electric.DDLX + alias Electric.Satellite.SatPerms import ExUnit.Assertions @@ -18,10 +19,22 @@ defmodule Electric.Postgres.Proxy.TestScenario do Injector.introspect_tables_query(tables, "'") end + def lock_rules_table_query do + Injector.lock_rules_table_query() + end + def electrified_tables_query do Injector.electrified_tables_query() end + def permissions_rules_query do + Injector.permissions_rules_query() + end + + def save_permissions_rules_query(rules) do + Injector.save_permissions_rules_query(rules) + end + def capture_ddl_query(query) do Injector.capture_ddl_query(query, "$query$") end @@ -69,6 +82,7 @@ defmodule Electric.Postgres.Proxy.TestScenario do alias Electric.DDLX alias unquote(m).MockInjector alias Electric.Postgres.MockSchemaLoader + alias Electric.Postgres.Proxy.Injector unquote(message_aliases) @@ -300,10 +314,22 @@ defmodule Electric.Postgres.Proxy.TestScenario do query(MockInjector.capture_version_query(to_string(version), priority)) end + def lock_rules_table_query do + query(MockInjector.lock_rules_table_query()) + end + def introspect_tables_query(tables) do query(MockInjector.introspect_tables_query(tables)) end + def permissions_rules_query do + query(MockInjector.permissions_rules_query()) + end + + def save_permissions_rules_query(rules) do + query(MockInjector.save_permissions_rules_query(rules)) + end + def electrified_tables_query do query(MockInjector.electrified_tables_query()) end @@ -324,6 +350,44 @@ defmodule Electric.Postgres.Proxy.TestScenario do electric_call_complete(status) end + def modifies_permissions?([_ | _] = cmds) do + Enum.any?(cmds, &modifies_permissions?/1) + end + + def modifies_permissions?(%Electric.DDLX.Command{action: cmd}) do + modifies_permissions?(cmd) + end + + def modifies_permissions?(%SatPerms.DDLX{} = ddlx) do + Enum.any?( + ddlx + |> DDLX.Command.command_list() + |> Enum.to_list(), + &modifies_permissions?/1 + ) + end + + def modifies_permissions?(%DDLX.Command.Enable{}) do + false + end + + def modifies_permissions?(%DDLX.Command.Disable{}) do + false + end + + def modifies_permissions?(%DDLX.Command.Error{}) do + false + end + + def modifies_permissions?(%SatPerms.Sqlite{}) do + false + end + + def modifies_permissions?(%m{}) + when m in [SatPerms.Grant, SatPerms.Revoke, SatPerms.Assign, SatPerms.Unassign] do + true + end + # splitting this out as a function in order to simplify the process of # updating the expected messages when calling an electric procedure def electric_call_complete(status \\ :tx) do @@ -333,6 +397,24 @@ defmodule Electric.Postgres.Proxy.TestScenario do ] end + def state({_stack, state}) do + state + end + + def tx({_stack, %{tx: nil}}) do + raise "No active transaction" + end + + def tx({_stack, %{tx: tx}}) do + tx + end + + def permissions_modified!({_stack, state}) do + if rules = Injector.State.permissions_modified(state), + do: rules, + else: raise("permissions are not modified") + end + @doc """ Asserts that the injector is in the idle state, so outside a transaction with no active capture mode. @@ -359,19 +441,106 @@ defmodule Electric.Postgres.Proxy.TestScenario do tables = Electric.DDLX.Command.table_names(command) introspect_query = introspect_tables_query(tables) - initial_messages - |> case do - [client: msgs] -> - injector - |> client(msgs, server: electrified_tables_query()) - - [server: msgs] -> - injector - |> server(msgs, server: electrified_tables_query()) - end + injector + |> command(initial_messages, server: lock_rules_table_query()) + |> server(complete_ready("LOCK TABLE"), server: electrified_tables_query()) |> server(electrified_tables_result(electrified_tables), server: introspect_query) end + def electric_begin(injector) do + electric_begin(injector, client: begin()) + end + + def electric_begin(injector, initial_messages, opts \\ []) do + rules = + case Keyword.fetch(opts, :rules) do + {:ok, rules} -> + rules + + :error -> + nil + end + + {injector, final_messages} = + case initial_messages do + [client: "BEGIN"] -> + {client(injector, begin(), server: begin()), client: complete_ready("BEGIN", :tx)} + + [client: %M.Query{query: "BEGIN"} = msg] -> + {client(injector, msg, server: begin()), client: complete_ready("BEGIN", :tx)} + + [client: [%M.Query{query: "BEGIN"}] = msgs] -> + {client(injector, msgs, server: begin()), client: complete_ready("BEGIN", :tx)} + + [client: msgs] -> + final = + case Keyword.fetch(opts, :client) do + {:ok, msgs} -> + [client: msgs] + + :error -> + [server: Keyword.get(opts, :server, msgs)] + end + + {client(injector, msgs, server: begin()), final} + end + + injector + |> server(complete_ready("BEGIN", :tx), server: permissions_rules_query()) + |> server(rules_query_result(rules), final_messages) + end + + @doc """ + If the transaction has unwritten permissions updates, then they are written here. + """ + def electric_commit({_stack, state} = injector, initial_messages, final_messages \\ nil) do + version? = Injector.State.capture_version?(state) + + [state_msg | state_messages] = + if rules = Injector.State.permissions_modified(state) do + [ + save_permissions_rules_query(rules), + if(version?, do: capture_version_query(), else: []), + commit() + ] + else + [ + if(version?, do: capture_version_query(), else: []), + commit() + ] + end + |> List.flatten() + + injector = command(injector, initial_messages, server: state_msg) + + commit_complete = complete_ready("COMMIT", :idle) + final_messages = final_messages || [client: commit_complete] + + state_messages + |> Enum.reduce(injector, fn msg, injector -> + server(injector, complete_ready("INSERT 1", :tx), server: msg) + end) + |> server(commit_complete, final_messages) + end + + def rules_query_result() do + rules_query_result(nil) + end + + def rules_query_result(nil) do + rules_query_result(%SatPerms.Rules{id: 1}) + end + + def rules_query_result(%SatPerms.Rules{} = rules) do + rules_data = rules |> Protox.encode!() |> IO.iodata_to_binary() + + [ + %M.RowDescription{}, + %M.DataRow{fields: [rules_data]} + | complete_ready("SELECT 1", :tx) + ] + end + @doc """ Given an injector mid-tx generates migration version query flows for the given framework modules and asserts that the version is captured correctly @@ -414,14 +583,20 @@ defmodule Electric.Postgres.Proxy.TestScenario do def electric(injector, initial_messages, command, ddl, final_messages) do capture_ddl = List.wrap(ddl) + # the initial client message which is a [bind, execute] or [query] message + # triggers a re-write to the real procedure call + + injector = electric_preamble(injector, initial_messages, command) + case proxy_sql(command, capture_ddl) |> Enum.map(&query/1) do + [] -> + # if the electric command doesn't result in any immediate queries, then + # we're done, pending the final message from the preamble introspection + # queries + server(injector, introspect_result(ddl), final_messages) + [query | queries] -> - # the initial client message which is a [bind, execute] or [query] message - # triggers a re-write to the real procedure call - injector = - injector - |> electric_preamble(initial_messages, command) - |> server(introspect_result(ddl), server: query) + injector = server(injector, introspect_result(ddl), server: query) # this real proc call returns a readyforquery etc response which triggers # the next procedure call required for the electric command @@ -455,22 +630,8 @@ defmodule Electric.Postgres.Proxy.TestScenario do server([%M.ReadyForQuery{status: :tx}], server: [%M.Query{query: "COMMIT"}]) """ - def server(injector, server_messages, receipients) do - expected_proxy_server = - Keyword.get(receipients, :server, []) |> to_struct() - - expected_proxy_client = - Keyword.get(receipients, :client, []) |> to_struct() - - {:ok, injector, proxy_server, proxy_client} = - Injector.recv_server(injector, to_struct(server_messages)) - - assert_messages_equal( - proxy_server: {proxy_server, expected_proxy_server}, - proxy_client: {proxy_client, expected_proxy_client} - ) - - injector + def server(injector, server_messages, recipients) do + command(injector, [server: server_messages], recipients) end @doc """ @@ -487,12 +648,31 @@ defmodule Electric.Postgres.Proxy.TestScenario do `:client` or the `:server`. """ - def client(injector, client_messages, receipients) do - expected_proxy_server = Keyword.get(receipients, :server, []) |> to_struct() - expected_proxy_client = Keyword.get(receipients, :client, []) |> to_struct() + def client(injector, client_messages, recipients) do + command(injector, [client: client_messages], recipients) + end + def command(injector, msgs, recipients) do {:ok, injector, proxy_server, proxy_client} = - Injector.recv_client(injector, to_struct(client_messages)) + case msgs do + [client: client_msgs] -> + Injector.recv_client(injector, to_struct(client_msgs)) + + [server: server_msgs] -> + Injector.recv_server(injector, to_struct(server_msgs)) + end + + final = + case recipients do + fun when is_function(fun) -> + fun.(injector) + + list when is_list(list) -> + list + end + + expected_proxy_server = Keyword.get(final, :server, []) |> to_struct() + expected_proxy_client = Keyword.get(final, :client, []) |> to_struct() assert_messages_equal( proxy_server: {proxy_server, expected_proxy_server}, @@ -715,7 +895,5 @@ defmodule Electric.Postgres.Proxy.TestScenario do injector |> server(capture_ddl_complete(), server: capture_version_query(version)) |> server(capture_version_complete(), final_msg) - - # injector = server(injector, capture_ddl_complete(), final_msg) end end diff --git a/components/electric/test/support/mock_schema_loader.ex b/components/electric/test/support/mock_schema_loader.ex index cb932e8d2e..3b0596b736 100644 --- a/components/electric/test/support/mock_schema_loader.ex +++ b/components/electric/test/support/mock_schema_loader.ex @@ -501,12 +501,11 @@ defmodule Electric.Postgres.MockSchemaLoader do end end - @impl SchemaLoader - def save_global_permissions({:agent, pid}, rules) do + def save_global_permissions({__MODULE__, {:agent, pid}}, rules) do Agent.get_and_update(pid, fn state -> - case save_global_permissions(state, rules) do - {:ok, state} -> - {{:ok, {:agent, pid}}, state} + case save_global_permissions({__MODULE__, state}, rules) do + {:ok, {__MODULE__, state}} -> + {{:ok, {__MODULE__, {:agent, pid}}}, state} error -> {error, state} @@ -515,7 +514,7 @@ defmodule Electric.Postgres.MockSchemaLoader do end def save_global_permissions( - %{global_perms: global_perms, opts: opts} = state, + {__MODULE__, %{global_perms: global_perms, opts: opts} = state}, %SatPerms.Rules{} = rules ) do notify(opts, {:save_global_permissions, rules}) @@ -530,7 +529,8 @@ defmodule Electric.Postgres.MockSchemaLoader do end) {:ok, - %{state | user_perms: user_perms ++ state.user_perms, global_perms: [rules | global_perms]}} + {__MODULE__, + %{state | user_perms: user_perms ++ state.user_perms, global_perms: [rules | global_perms]}}} end @impl SchemaLoader diff --git a/components/electric/test/support/permissions_helpers.ex b/components/electric/test/support/permissions_helpers.ex index 0a53af0ce9..1b929b884d 100644 --- a/components/electric/test/support/permissions_helpers.ex +++ b/components/electric/test/support/permissions_helpers.ex @@ -151,6 +151,7 @@ defmodule ElectricTest.PermissionsHelpers do defmodule Perms do alias Electric.Satellite.SatPerms, as: P alias Electric.Satellite.Permissions + alias Electric.Postgres.MockSchemaLoader defmodule Transient do @name __MODULE__.Transient @@ -211,6 +212,31 @@ defmodule ElectricTest.PermissionsHelpers do perms end + # Generate some rules and push them into our mock loader we need to do this + # because the schema loader is no longer responsible for saving the global + # perms, which happens at the point of mutation (e.g. in the proxy). Without + # knowledge of the current global perms, the `user_permissions/[23]` will + # return user perms pointing to the wrong global permissions, which + # wouldn't happen with a real (from pg) loader + def rules(mock_loader, %P.Rules{} = rules) do + {:ok, mock_loader} = MockSchemaLoader.save_global_permissions(mock_loader, rules) + + {mock_loader, rules} + end + + def rules(mock_loader, attrs) do + {:ok, old_rules} = SchemaLoader.global_permissions(mock_loader) + + rules = %P.Rules{ + id: old_rules.id, + parent_id: old_rules.parent_id, + grants: Keyword.get(attrs, :grants, []), + assigns: Keyword.get(attrs, :assigns, []) + } + + rules(mock_loader, Permissions.State.commit(rules)) + end + def to_rules(ddlx) do ddlx |> make_ddlx() @@ -261,6 +287,7 @@ defmodule ElectricTest.PermissionsHelpers do end defmodule Chgs do + alias Electric.Satellite.SatPerms, as: P alias Electric.DDLX.Command alias Electric.Replication.Changes alias Electric.Postgres.Extension @@ -306,6 +333,20 @@ defmodule ElectricTest.PermissionsHelpers do } end + def rules(%P.Rules{} = rules) do + bytes = Protox.encode!(rules) |> IO.iodata_to_binary() + + # this table is append-only + %Changes.NewRecord{ + relation: Extension.global_perms_relation(), + record: %{ + "id" => rules.id, + "parent_id" => rules.parent_id, + "rules" => bytes + } + } + end + def migration(attrs \\ []) do attrs = attrs diff --git a/components/electric/test/support/postgres_test_connection.ex b/components/electric/test/support/postgres_test_connection.ex index 312737c471..541681fea2 100644 --- a/components/electric/test/support/postgres_test_connection.ex +++ b/components/electric/test/support/postgres_test_connection.ex @@ -156,15 +156,21 @@ defmodule Electric.Postgres.TestConnection do def define_permissions(conn, origin, scenario) do ddlx = scenario_ddlx(scenario) - sql = + apply_ddlx(conn, origin, ddlx) + end + + defp apply_ddlx(conn, origin, ddlx) do + alias Electric.Satellite.Permissions + alias Electric.Postgres.Extension + + {:ok, initial_rules} = Extension.Permissions.global(conn) + + rules = ddlx |> Enum.map(&Electric.DDLX.parse!/1) - |> Electric.DDLX.Command.PgSQL.to_sql([], &Electric.Postgres.Proxy.Injector.quote_query/1) - |> Enum.join("\n") + |> Enum.reduce(initial_rules, &Permissions.State.apply_ddlx!/2) - conn - |> :epgsql.squery(["BEGIN;\n", sql, "\nCOMMIT;"]) - |> Enum.each(&(:ok = elem(&1, 0))) + :ok = Extension.Permissions.save_global(conn, %{rules | parent_id: initial_rules.id}) :ok = wait_for_message(origin, Electric.Replication.Changes.UpdatedPermissions) end @@ -384,7 +390,7 @@ defmodule Electric.Postgres.TestConnection do def setup_with_sql_execute(_), do: :ok def setup_with_ddlx(%{conn: conn, ddlx: ddlx, origin: origin}) do - sql = + stmts = ddlx |> List.wrap() |> Enum.map(&String.trim/1) @@ -392,17 +398,8 @@ defmodule Electric.Postgres.TestConnection do "ELECTRIC " <> _ = ddlx -> ddlx ddl -> "ELECTRIC " <> ddl end) - |> Enum.map(&Electric.DDLX.parse!/1) - |> Electric.DDLX.Command.proxy_sql() - |> Enum.join("\n") - - conn - |> :epgsql.squery(["BEGIN;\n", sql, "\nCOMMIT;"]) - |> Enum.each(&(:ok = elem(&1, 0))) - :ok = wait_for_message(origin, Electric.Replication.Changes.UpdatedPermissions) - - :ok + apply_ddlx(conn, origin, stmts) end def setup_with_ddlx(_) do diff --git a/e2e/tests/01.05_electric_can_recreate_publication.lux b/e2e/tests/01.05_electric_can_recreate_publication.lux index 3cbba828ec..f98db511b5 100644 --- a/e2e/tests/01.05_electric_can_recreate_publication.lux +++ b/e2e/tests/01.05_electric_can_recreate_publication.lux @@ -26,12 +26,13 @@ ??electric | ddlx_commands ??electric | electrified ??public | foo + ??electric | global_perms_state ??public | items ??electric | shadow__public__baz ??electric | shadow__public__foo ??electric | shadow__public__items ??electric | transaction_marker - ??(11 rows) + ??(12 rows) # Make sure Electric consumes all migrations from the replication stream before stopping it. [shell electric] @@ -64,12 +65,13 @@ ??electric | ddlx_commands ??electric | electrified ??public | foo + ??electric | global_perms_state ??public | items ??electric | shadow__public__baz ??electric | shadow__public__foo ??electric | shadow__public__items ??electric | transaction_marker - ??(11 rows) + ??(12 rows) [cleanup] [invoke teardown] diff --git a/e2e/tests/02.07_where_clauses_on_subscriptions.lux b/e2e/tests/02.07_where_clauses_on_subscriptions.lux index 194c6d5bcd..a7efc24613 100644 --- a/e2e/tests/02.07_where_clauses_on_subscriptions.lux +++ b/e2e/tests/02.07_where_clauses_on_subscriptions.lux @@ -26,11 +26,7 @@ [shell electric] # We expect to send the transaction to Satellite - ?client_id=client_1_1 .+ user_id=1 \[debug\] trans:(.*)%Electric.Replication.Changes.NewRecord\{(.*)record: %\{ - ?+"content" => "sentinel value" - ?+"content_b" => nil - ?"id" => "00000000-0000-0000-0000-000000000000" - ?\} + ?client_id=client_1_1 .+ user_id=1 \[debug\] Responding with: %.+SatOpLog{ops: \[%.+SatTransOp{op: \{:insert, %.+SatOpInsert\{.*row_data: %.+SatOpRow\{.*values: \["00000000-0000-0000-0000-000000000000", "sentinel value" [shell user_1_ws1] # And receive it there diff --git a/e2e/tests/compose.yaml b/e2e/tests/compose.yaml index 4241049508..63b24c9ea1 100644 --- a/e2e/tests/compose.yaml +++ b/e2e/tests/compose.yaml @@ -19,6 +19,7 @@ services: ELECTRIC_WRITE_TO_PG_MODE: "${ELECTRIC_WRITE_TO_PG_MODE:-logical_replication}" LOGICAL_PUBLISHER_HOST: electric_1 PG_PROXY_LOG_LEVEL: info + # PROXY_TRACING_ENABLE: true # prioritise per-test settings ELECTRIC_FEATURES: "${ELECTRIC_FEATURES:-}:proxy_grant_write_permissions=true:permissions=true" ports: