Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Manage mutation of permissions directly in the proxy #1393

Open
wants to merge 4 commits into
base: garry/vax-1425-create-permissions-protobuf-definitions-and-write-validation
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 7 additions & 17 deletions components/electric/lib/electric/ddlx/command.ex
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ defmodule Electric.DDLX.Command do
}
end

def command_list(%__MODULE__{action: %SatPerms.DDLX{} = ddlx}) do
command_list(ddlx)
end

def command_list(%SatPerms.DDLX{} = ddlx) do
Stream.concat([ddlx.grants, ddlx.revokes, ddlx.assigns, ddlx.unassigns, ddlx.sqlite])
end
Expand Down Expand Up @@ -243,15 +247,9 @@ end

defimpl Command.PgSQL, for: SatPerms.DDLX do
alias Command
alias Electric.Postgres.Extension

def to_sql(%SatPerms.DDLX{} = ddlx, ddl_capture, quote_fun) do
Enum.concat([
serialise_ddlx(ddlx),
ddlx
|> Command.command_list()
|> Enum.flat_map(&Command.PgSQL.to_sql(&1, ddl_capture, quote_fun))
])

def to_sql(%SatPerms.DDLX{}, _ddl_capture, _quote_fun) do
[]
end

def validate_schema(%SatPerms.DDLX{} = ddlx, schema, electrified) do
Expand All @@ -260,14 +258,6 @@ defimpl Command.PgSQL, for: SatPerms.DDLX do
|> Enum.to_list()
|> Command.PgSQL.validate_schema(schema, electrified)
end

defp serialise_ddlx(ddlx) do
encoded = Protox.encode!(ddlx) |> IO.iodata_to_binary() |> Base.encode16()

[
"INSERT INTO #{Extension.ddlx_table()} (ddlx) VALUES ('\\x#{encoded}'::bytea);"
]
end
end

defimpl Command.PgSQL, for: SatPerms.Grant do
Expand Down
56 changes: 41 additions & 15 deletions components/electric/lib/electric/postgres/extension.ex
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,9 @@ defmodule Electric.Postgres.Extension do
def ddl_relation, do: {@schema, @ddl_relation}
def version_relation, do: {@schema, @version_relation}
def electrified_tracking_relation, do: {@schema, @electrified_tracking_relation}
def transaction_marker_relation, do: {@schema, @transaction_marker_relation}
def ddlx_relation, do: {@schema, @ddlx_commands_relation}
def global_perms_relation, do: {@schema, @global_perms_relation}
def acked_client_lsn_relation, do: {@schema, @acked_client_lsn_relation}

def publication_name, do: @publication_name
Expand Down Expand Up @@ -305,17 +307,6 @@ defmodule Electric.Postgres.Extension do
end
end

# These are tables in the "electric" schema, each of which was added to the publication in
# one of the extension migrations. They can be found by searching the codebase for
# "add_table_to_publication_sql".
@published_extension_tables [
{@schema, @ddl_relation},
{@schema, @electrified_tracking_relation},
{@schema, @transaction_marker_relation},
{@schema, @acked_client_lsn_relation},
{@schema, @ddlx_commands_relation}
]

@doc """
The list of fully-qualified table identifiers that should be included in "#{@publication_name}".

Expand All @@ -326,11 +317,19 @@ defmodule Electric.Postgres.Extension do
def published_tables(conn) do
with {:ok, tables} <- electrified_tables(conn) do
tables_with_shadows = Enum.flat_map(tables, &[&1, shadow_of(&1)])
published_tables = Enum.concat(tables_with_shadows, @published_extension_tables)
published_tables = Enum.concat(tables_with_shadows, published_extension_tables())
{:ok, published_tables}
end
end

defp published_extension_tables(module \\ __MODULE__) do
module
|> migration_versions()
|> Enum.flat_map(fn {_version, migration_module} ->
migration_published_tables(migration_module)
end)
end

def create_table_ddl(conn, %Proto.RangeVar{} = table_name) do
name = to_string(table_name)

Expand Down Expand Up @@ -389,11 +388,10 @@ defmodule Electric.Postgres.Extension do
Migrations.Migration_20231206130400_ConvertReplicaTriggersToAlways,
Migrations.Migration_20240110110200_DropUnusedFunctions,
Migrations.Migration_20240205141200_ReinstallTriggerFunctionWriteCorrectMaxTag,
Migrations.Migration_20240212161153_DDLXCommands,
Migrations.Migration_20240213160300_DropGenerateElectrifiedSqlFunction,
Migrations.Migration_20240214131615_PermissionsState,
Migrations.Migration_20240417131000_ClientReconnectionInfoTables,
Migrations.Migration_20240501000000_UnsubPoints
Migrations.Migration_20240501000000_UnsubPoints,
Migrations.Migration_20240618152555_DDLXPermissions
]
end

Expand Down Expand Up @@ -444,7 +442,16 @@ defmodule Electric.Postgres.Extension do
Logger.info("Running extension migration: #{version}")

for sql <- module.up(@schema) do
# guard against adding publications via raw sql to protect consistency of
# `published_tables/1`
if sql =~ ~r/ALTER +PUBLICATION +.?#{@publication_name}.? +ADD +TABLE/,
do:
raise(
"Invalid migration: add relation to `#{module}.published_tables/0` to publish a table"
)

results = :epgsql.squery(txconn, sql) |> List.wrap()

errors = Enum.filter(results, &(elem(&1, 0) == :error))

if errors == [] do
Expand All @@ -456,6 +463,17 @@ defmodule Electric.Postgres.Extension do
end
end

module
|> migration_published_tables()
|> Enum.each(fn
{_schema, _name} = table ->
sql = table |> Electric.Utils.inspect_relation() |> add_table_to_publication_sql()
{:ok, [], []} = :epgsql.squery(txconn, sql)

name when is_binary(name) ->
raise "migration published_tables/0 should return a list of relations in `{schema, name}` form"
end)

{:ok, 1} =
:epgsql.squery(
txconn,
Expand All @@ -465,6 +483,14 @@ defmodule Electric.Postgres.Extension do
:ok
end

defp migration_published_tables(module) do
if function_exported?(module, :published_tables, 0) do
module.published_tables()
else
[]
end
end

# https://dba.stackexchange.com/a/311714
@is_transaction_sql "SELECT transaction_timestamp() != statement_timestamp() AS is_transaction"

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
defmodule Electric.Postgres.Extension.Migration do
@callback version() :: pos_integer()
@callback up(binary()) :: [binary(), ...]
@callback down(binary()) :: [binary()]
@callback published_tables() :: [Electric.Postgres.relation()]
@callback replicated_table_ddls() :: [String.t()]
@optional_callbacks replicated_table_ddls: 0
@optional_callbacks replicated_table_ddls: 0, published_tables: 0

@enforce_keys [:version, :schema, :stmts, :txid, :txts, :timestamp]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,14 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20230328113927 do
##################
"""
CREATE PUBLICATION "#{publication_name}";
""",
Extension.add_table_to_publication_sql(ddl_table)
"""
]
end

@impl true
def down(_schema) do
[]
def published_tables do
[
Extension.ddl_relation()
]
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,4 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20230424154425_DDLX d
"""
]
end

@impl true
def down(_schema) do
[]
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,4 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20230512000000_confli
]
|> Enum.map(&String.replace(&1, "electric", schema))
end

@impl true
def down(_schema) do
[]
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20230605141256_Electr
)
""",
"CREATE INDEX electrified_tracking_table_name_idx ON #{electrified_tracking_table} (schema_name, table_name)",
"CREATE INDEX electrified_tracking_table_name_oid ON #{electrified_tracking_table} (oid)",
Extension.add_table_to_publication_sql(electrified_tracking_table)
"CREATE INDEX electrified_tracking_table_name_oid ON #{electrified_tracking_table} (oid)"
]
end

@impl true
def down(_schema) do
[]
def published_tables do
[
Extension.electrified_tracking_relation()
]
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20230715000000_Utilit
""",
"""
INSERT INTO #{table} (id, content) VALUES ('magic write', '{}')
""",
Extension.add_table_to_publication_sql(table)
"""
]
end

@impl true
def down(_), do: []
def published_tables do
[
Extension.transaction_marker_relation()
]
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -165,11 +165,6 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20230814170123_Rename
]
end

@impl true
def down(_schema) do
[]
end

EEx.function_from_file(:defp, :ddlgen_sql, sql_file, [
:schema
])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20230829000000_Acknow
WHEN (pg_trigger_depth() < 1)
EXECUTE FUNCTION #{schema}.upsert_acknowledged_client_lsn()
""",
"ALTER TABLE #{table} ENABLE REPLICA TRIGGER upsert_acknowledged_client_lsn",
Extension.add_table_to_publication_sql(table)
"ALTER TABLE #{table} ENABLE REPLICA TRIGGER upsert_acknowledged_client_lsn"
]
end

@impl true
def down(_), do: []
def published_tables do
[
Extension.acked_client_lsn_relation()
]
end

@impl true
def replicated_table_ddls do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,4 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20230918115714_DDLCom
"""
]
end

@impl true
def down(_), do: []
end
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,4 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20230921161045_DropEv
"""
]
end

@impl true
def down(_), do: []
end
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,4 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20230921161418_ProxyC
"""
]
end

@impl true
def down(_), do: []
end
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,4 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20231009121515_AllowL
"""
]
end

@impl true
def down(_), do: []
end
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,4 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20231010123118_AddPri
"""
]
end

@impl true
def down(_schema) do
[]
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,4 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20231016141000_Conver
def up(schema) do
["DROP ROUTINE IF EXISTS #{schema}.__validate_table_column_types(text)"]
end

@impl true
def down(_schema) do
[]
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,4 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20231206130400_Conver
String.replace(@migration_sql, "electric", schema)
]
end

@impl true
def down(_schema) do
[]
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,4 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20240110110200_DropUn
"DROP ROUTINE IF EXISTS #{schema}.__validate_table_column_types(text)"
]
end

@impl true
def down(_schema) do
[]
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,4 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20240205141200_Reinst
"CALL #{schema}.reinstall_trigger_function('install_function__write_correct_max_tag')"
]
end

@impl true
def down(_schema) do
[]
end
end

This file was deleted.

Loading
Loading