diff --git a/.changeset/chilled-pillows-live.md b/.changeset/chilled-pillows-live.md new file mode 100644 index 0000000000..9085ef25eb --- /dev/null +++ b/.changeset/chilled-pillows-live.md @@ -0,0 +1,6 @@ +--- +"@core/electric": patch +"electric-sql": patch +--- + +[VAX-1324] Prevent updates to table PKs diff --git a/clients/typescript/src/_generated/protocol/satellite.ts b/clients/typescript/src/_generated/protocol/satellite.ts index 13156e7818..a3938e38bb 100644 --- a/clients/typescript/src/_generated/protocol/satellite.ts +++ b/clients/typescript/src/_generated/protocol/satellite.ts @@ -86,6 +86,12 @@ export interface SatAuthResp { export interface SatErrorResp { $type: "Electric.Satellite.SatErrorResp"; errorType: SatErrorResp_ErrorCode; + /** lsn of the txn that caused the problem, if available */ + lsn?: + | Uint8Array + | undefined; + /** human readable explanation of what went wrong */ + message?: string | undefined; } export enum SatErrorResp_ErrorCode { @@ -931,7 +937,7 @@ export const SatAuthResp = { messageTypeRegistry.set(SatAuthResp.$type, SatAuthResp); function createBaseSatErrorResp(): SatErrorResp { - return { $type: "Electric.Satellite.SatErrorResp", errorType: 0 }; + return { $type: "Electric.Satellite.SatErrorResp", errorType: 0, lsn: undefined, message: undefined }; } export const SatErrorResp = { @@ -941,6 +947,12 @@ export const SatErrorResp = { if (message.errorType !== 0) { writer.uint32(8).int32(message.errorType); } + if (message.lsn !== undefined) { + writer.uint32(18).bytes(message.lsn); + } + if (message.message !== undefined) { + writer.uint32(26).string(message.message); + } return writer; }, @@ -958,6 +970,20 @@ export const SatErrorResp = { message.errorType = reader.int32() as any; continue; + case 2: + if (tag !== 18) { + break; + } + + message.lsn = reader.bytes(); + continue; + case 3: + if (tag !== 26) { + break; + } + + message.message = reader.string(); + continue; } if ((tag & 7) === 4 || tag === 0) { break; @@ -974,6 +1000,8 @@ export const SatErrorResp = { fromPartial, I>>(object: I): SatErrorResp { const message = createBaseSatErrorResp(); message.errorType = object.errorType ?? 0; + message.lsn = object.lsn ?? undefined; + message.message = object.message ?? undefined; return message; }, }; diff --git a/components/electric/lib/electric/plug/migrations.ex b/components/electric/lib/electric/plug/migrations.ex index cf90f75009..f6c6356a69 100644 --- a/components/electric/lib/electric/plug/migrations.ex +++ b/components/electric/lib/electric/plug/migrations.ex @@ -2,7 +2,7 @@ defmodule Electric.Plug.Migrations do use Plug.Router use Electric.Satellite.Protobuf - alias Electric.Postgres.Extension.SchemaCache + alias Electric.Postgres.Extension.{SchemaCache, SchemaLoader} require Logger @@ -106,8 +106,10 @@ defmodule Electric.Plug.Migrations do defp translate_stmts(version, schema, stmts, dialect) do Enum.flat_map(stmts, fn stmt -> + schema_version = SchemaLoader.Version.new(version, schema) + {:ok, msgs, _relations} = - Electric.Postgres.Replication.migrate(schema, version, stmt, dialect) + Electric.Postgres.Replication.migrate(schema_version, stmt, dialect) msgs end) diff --git a/components/electric/lib/electric/postgres/extension/schema_cache.ex b/components/electric/lib/electric/postgres/extension/schema_cache.ex index c5a3ac402f..1b120d3d8f 100644 --- a/components/electric/lib/electric/postgres/extension/schema_cache.ex +++ b/components/electric/lib/electric/postgres/extension/schema_cache.ex @@ -93,16 +93,6 @@ defmodule Electric.Postgres.Extension.SchemaCache do call(origin, {:relation_oid, type, schema, name}) end - @impl SchemaLoader - def primary_keys(origin, {schema, name}) do - call(origin, {:primary_keys, schema, name}) - end - - @impl SchemaLoader - def primary_keys(origin, schema, name) do - call(origin, {:primary_keys, schema, name}) - end - @impl SchemaLoader def refresh_subscription(origin, name) do call(origin, {:refresh_subscription, name}) @@ -246,17 +236,28 @@ defmodule Electric.Postgres.Extension.SchemaCache do @impl GenServer def handle_call({:load, :current}, _from, %{current: nil} = state) do - {result, state} = load_current_schema(state) - - {:reply, result, state} + with {{:ok, schema_version}, state} <- load_current_schema(state) do + {:reply, {:ok, schema_version}, state} + else + {error, state} -> + {:reply, error, state} + end end - def handle_call({:load, :current}, _from, %{current: {version, schema}} = state) do - {:reply, {:ok, version, schema}, state} + def handle_call( + {:load, :current}, + _from, + %{current: %SchemaLoader.Version{} = schema_version} = state + ) do + {:reply, {:ok, schema_version}, state} end - def handle_call({:load, {:version, version}}, _from, %{current: {version, schema}} = state) do - {:reply, {:ok, version, schema}, state} + def handle_call( + {:load, {:version, version}}, + _from, + %{current: %{version: version} = schema_version} = state + ) do + {:reply, {:ok, schema_version}, state} end def handle_call({:load, {:version, version}}, _from, state) do @@ -264,9 +265,10 @@ defmodule Electric.Postgres.Extension.SchemaCache do end def handle_call({:save, version, schema, stmts}, _from, state) do - {:ok, backend} = SchemaLoader.save(state.backend, version, schema, stmts) + {:ok, backend, schema_version} = SchemaLoader.save(state.backend, version, schema, stmts) - {:reply, {:ok, state.origin}, %{state | backend: backend, current: {version, schema}}} + {:reply, {:ok, state.origin, schema_version}, + %{state | backend: backend, current: schema_version}} end def handle_call({:relation_oid, type, schema, name}, _from, state) do @@ -275,8 +277,14 @@ defmodule Electric.Postgres.Extension.SchemaCache do def handle_call({:primary_keys, sname, tname}, _from, state) do {result, state} = - with {{:ok, _version, schema}, state} <- current_schema(state) do - {Schema.primary_keys(schema, sname, tname), state} + with {{:ok, schema_version}, state} <- current_schema(state) do + case SchemaLoader.Version.primary_keys(schema_version, {sname, tname}) do + {:ok, pks} -> + {{:ok, pks}, state} + + {:error, _reason} = error -> + {error, state} + end end {:reply, result, state} @@ -296,8 +304,8 @@ defmodule Electric.Postgres.Extension.SchemaCache do end def handle_call(:electrified_tables, _from, state) do - load_and_reply(state, fn schema -> - {:ok, Schema.table_info(schema)} + load_and_reply(state, fn schema_version -> + {:ok, Schema.table_info(schema_version.schema)} end) end @@ -323,21 +331,21 @@ defmodule Electric.Postgres.Extension.SchemaCache do end def handle_call({:relation, oid}, _from, state) when is_integer(oid) do - load_and_reply(state, fn schema -> - Schema.table_info(schema, oid) + load_and_reply(state, fn schema_version -> + Schema.table_info(schema_version.schema, oid) end) end def handle_call({:relation, {_sname, _tname} = relation}, _from, state) do - load_and_reply(state, fn schema -> - Schema.table_info(schema, relation) + load_and_reply(state, fn schema_version -> + Schema.table_info(schema_version.schema, relation) end) end def handle_call({:relation, relation, version}, _from, state) do {result, state} = - with {:ok, ^version, schema} <- SchemaLoader.load(state.backend, version) do - {Schema.table_info(schema, relation), state} + with {:ok, schema_version} <- SchemaLoader.load(state.backend, version) do + {Schema.table_info(schema_version.schema, relation), state} else error -> {error, state} end @@ -422,14 +430,14 @@ defmodule Electric.Postgres.Extension.SchemaCache do load_current_schema(state) end - defp current_schema(%{current: {version, schema}} = state) do - {{:ok, version, schema}, state} + defp current_schema(%{current: schema_version} = state) do + {{:ok, schema_version}, state} end defp load_current_schema(state) do case SchemaLoader.load(state.backend) do - {:ok, version, schema} -> - {{:ok, version, schema}, %{state | current: {version, schema}}} + {:ok, schema_version} -> + {{:ok, schema_version}, %{state | current: schema_version}} error -> {error, state} @@ -438,8 +446,8 @@ defmodule Electric.Postgres.Extension.SchemaCache do defp load_and_reply(state, process) when is_function(process, 1) do {result, state} = - with {{:ok, _version, schema}, state} <- current_schema(state) do - {process.(schema), state} + with {{:ok, schema_version}, state} <- current_schema(state) do + {process.(schema_version), state} else error -> {error, state} end diff --git a/components/electric/lib/electric/postgres/extension/schema_cache/global.ex b/components/electric/lib/electric/postgres/extension/schema_cache/global.ex index 302324de01..bf790edb98 100644 --- a/components/electric/lib/electric/postgres/extension/schema_cache/global.ex +++ b/components/electric/lib/electric/postgres/extension/schema_cache/global.ex @@ -36,18 +36,6 @@ defmodule Electric.Postgres.Extension.SchemaCache.Global do fun.(pid) end - def primary_keys({_schema, _name} = relation) do - with_instance(fn pid -> - SchemaCache.primary_keys(pid, relation) - end) - end - - def primary_keys(schema, name) when is_binary(schema) and is_binary(name) do - with_instance(fn pid -> - SchemaCache.primary_keys(pid, schema, name) - end) - end - def migration_history(version) do with_instance(fn pid -> SchemaCache.migration_history(pid, version) diff --git a/components/electric/lib/electric/postgres/extension/schema_loader.ex b/components/electric/lib/electric/postgres/extension/schema_loader.ex index 5142364621..d07da1cea6 100644 --- a/components/electric/lib/electric/postgres/extension/schema_loader.ex +++ b/components/electric/lib/electric/postgres/extension/schema_loader.ex @@ -1,6 +1,7 @@ defmodule Electric.Postgres.Extension.SchemaLoader do alias Electric.Postgres.{Schema, Extension.Migration} alias Electric.Replication.Connectors + alias __MODULE__.Version @type state() :: term() @type version() :: String.t() @@ -18,13 +19,11 @@ defmodule Electric.Postgres.Extension.SchemaLoader do @type tx_fk_row() :: %{binary() => integer() | binary()} @callback connect(Connectors.config(), Keyword.t()) :: {:ok, state()} - @callback load(state()) :: {:ok, version(), Schema.t()} - @callback load(state(), version()) :: {:ok, version(), Schema.t()} | {:error, binary()} + @callback load(state()) :: {:ok, Version.t()} + @callback load(state(), version()) :: {:ok, Version.t()} | {:error, binary()} @callback save(state(), version(), Schema.t(), [String.t()]) :: - {:ok, state()} | {:error, term()} + {:ok, state(), Version.t()} | {:error, term()} @callback relation_oid(state(), rel_type(), schema(), name()) :: oid_result() - @callback primary_keys(state(), schema(), name()) :: pk_result() - @callback primary_keys(state(), relation()) :: pk_result() @callback refresh_subscription(state(), name()) :: :ok | {:error, term()} @callback migration_history(state(), version() | nil) :: {:ok, [Migration.t()]} | {:error, term()} @@ -61,8 +60,8 @@ defmodule Electric.Postgres.Extension.SchemaLoader do end def save({module, state}, version, schema, stmts) do - with {:ok, state} <- module.save(state, version, schema, stmts) do - {:ok, {module, state}} + with {:ok, state, schema_version} <- module.save(state, version, schema, stmts) do + {:ok, {module, state}, schema_version} end end @@ -70,14 +69,6 @@ defmodule Electric.Postgres.Extension.SchemaLoader do module.relation_oid(state, rel_type, schema, table) end - def primary_keys({module, state}, schema, table) do - module.primary_keys(state, schema, table) - end - - def primary_keys({_module, _state} = impl, {schema, table}) do - primary_keys(impl, schema, table) - end - def refresh_subscription({module, state}, name) do module.refresh_subscription(state, name) end diff --git a/components/electric/lib/electric/postgres/extension/schema_loader/epgsql.ex b/components/electric/lib/electric/postgres/extension/schema_loader/epgsql.ex index 5fe49fecb8..0cb931178f 100644 --- a/components/electric/lib/electric/postgres/extension/schema_loader/epgsql.ex +++ b/components/electric/lib/electric/postgres/extension/schema_loader/epgsql.ex @@ -93,14 +93,18 @@ defmodule Electric.Postgres.Extension.SchemaLoader.Epgsql do @impl true def load(pool) do checkout!(pool, fn conn -> - Extension.current_schema(conn) + with {:ok, version, schema} <- Extension.current_schema(conn) do + {:ok, SchemaLoader.Version.new(version, schema)} + end end) end @impl true def load(pool, version) do checkout!(pool, fn conn -> - Extension.schema_version(conn, version) + with {:ok, version, schema} <- Extension.schema_version(conn, version) do + {:ok, SchemaLoader.Version.new(version, schema)} + end end) end @@ -108,7 +112,7 @@ defmodule Electric.Postgres.Extension.SchemaLoader.Epgsql do def save(pool, version, schema, stmts) do checkout!(pool, fn conn -> with :ok <- Extension.save_schema(conn, version, schema, stmts) do - {:ok, pool} + {:ok, pool, SchemaLoader.Version.new(version, schema)} end end) end @@ -124,35 +128,6 @@ defmodule Electric.Postgres.Extension.SchemaLoader.Epgsql do end) end - @primary_keys_query """ - SELECT a.attname - FROM pg_class c - INNER JOIN pg_namespace n ON c.relnamespace = n.oid - INNER JOIN pg_index i ON i.indrelid = c.oid - INNER JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) - WHERE - n.nspname = $1 - AND c.relname = $2 - AND c.relkind = 'r' - AND i.indisprimary - """ - - @impl true - def primary_keys(pool, schema, name) do - checkout!(pool, fn conn -> - {:ok, _, pks_data} = :epgsql.equery(conn, @primary_keys_query, [schema, name]) - - {:ok, Enum.map(pks_data, &elem(&1, 0))} - end) - end - - @impl true - def primary_keys(pool, {schema, name}) do - checkout!(pool, fn conn -> - primary_keys(conn, schema, name) - end) - end - @impl true def refresh_subscription(pool, name) do checkout!(pool, fn conn -> diff --git a/components/electric/lib/electric/postgres/extension/schema_loader/version.ex b/components/electric/lib/electric/postgres/extension/schema_loader/version.ex new file mode 100644 index 0000000000..02e7baef58 --- /dev/null +++ b/components/electric/lib/electric/postgres/extension/schema_loader/version.ex @@ -0,0 +1,114 @@ +defmodule Electric.Postgres.Extension.SchemaLoader.Version do + alias Electric.Postgres.Schema + alias Electric.Postgres.Extension.SchemaLoader + alias Electric.Postgres.Schema.Proto.Table + + defstruct [:version, :schema, :fk_graph, tables: %{}, primary_keys: %{}] + + @type version() :: SchemaLoader.version() + @type relation() :: SchemaLoader.relation() + @type name() :: SchemaLoader.name() + @type schema() :: SchemaLoader.schema() + @type table_ref() :: relation() | %Table{} | %Schema.Proto.RangeVar{} + + @type t() :: %__MODULE__{ + version: nil | version(), + schema: Schema.t(), + fk_graph: Graph.t(), + tables: %{relation() => %Table{}}, + primary_keys: %{relation() => [String.t()]} + } + + @spec new(version(), Schema.t()) :: t() + def new(version, %Schema.Proto.Schema{} = schema) do + %__MODULE__{version: version, schema: schema} + |> Map.update!(:tables, &cache_tables_by_name(&1, schema)) + |> Map.update!(:primary_keys, &cache_pks_by_name(&1, schema)) + |> Map.put(:fk_graph, Schema.public_fk_graph(schema)) + end + + defp cache_tables_by_name(tables, schema) do + Enum.reduce(schema.tables, tables, fn table, cache -> + Map.put(cache, table_name(table), table) + end) + end + + defp cache_pks_by_name(pks, schema) do + Enum.reduce(schema.tables, pks, fn table, cache -> + case Schema.primary_keys(table) do + {:ok, pks} -> + Map.put(cache, table_name(table), pks) + + {:error, _} -> + Map.put(cache, table_name(table), []) + end + end) + end + + defp table_name(%{name: %{schema: s, name: n}}) do + {s, n} + end + + defp table_name(%{schema: s, name: n}) do + {s, n} + end + + defp table_name({s, n}) do + {s, n} + end + + @spec tables(t()) :: [%Table{}] + def tables(%__MODULE__{schema: schema}) do + schema.tables + end + + @spec table(t(), schema(), name()) :: {:ok, %Table{}} | {:error, String.t()} + def table(%__MODULE__{tables: tables}, sname, tname) do + fetch_table_value(tables, {sname, tname}) + end + + @spec table(t(), table_ref()) :: {:ok, %Table{}} | {:error, String.t()} + def table(%__MODULE__{tables: tables}, name) do + fetch_table_value(tables, table_name(name)) + end + + @spec table!(t(), table_ref()) :: %Table{} | no_return() + def table!(version, name) do + case table(version, name) do + {:ok, table} -> table + {:error, reason} -> raise ArgumentError, message: reason + end + end + + @spec version(t()) :: version() + def version(%__MODULE__{version: version}) do + version + end + + @spec schema(t()) :: Schema.t() + def schema(%__MODULE__{schema: schema}) do + schema + end + + @spec primary_keys(t(), schema(), name()) :: {:ok, [name()]} | {:error, String.t()} + def primary_keys(%__MODULE__{primary_keys: pks}, sname, tname) do + fetch_table_value(pks, {sname, tname}) + end + + @spec primary_keys(t(), table_ref()) :: {:ok, [name()]} | {:error, String.t()} + def primary_keys(%__MODULE__{primary_keys: pks}, relation) do + fetch_table_value(pks, relation) + end + + @spec fk_graph(t()) :: Graph.t() + def fk_graph(%__MODULE__{fk_graph: fk_graph}) do + fk_graph + end + + defp fetch_table_value(values, relation) do + case Map.fetch(values, relation) do + {:ok, value} -> {:ok, value} + :error -> {:error, "Table #{Electric.Utils.inspect_relation(relation)} not found"} + end + end +end diff --git a/components/electric/lib/electric/postgres/proxy/injector/prisma.ex b/components/electric/lib/electric/postgres/proxy/injector/prisma.ex index 99adf164d2..af962795b6 100644 --- a/components/electric/lib/electric/postgres/proxy/injector/prisma.ex +++ b/components/electric/lib/electric/postgres/proxy/injector/prisma.ex @@ -129,10 +129,10 @@ defmodule Electric.Postgres.Proxy.Injector.Prisma do {:ok, {ps, binds}} -> {:ok, query_module} = Map.fetch(prisma.prepared_statements, ps) # TODO: allow for introspecing a specific version of the schema - {:ok, _version, schema} = schema(state) + {:ok, schema_version} = schema(state) data_rows = - query_module.data_rows(binds, schema, prisma.config) + query_module.data_rows(binds, schema_version, prisma.config) |> Enum.map(&%M.DataRow{fields: &1}) { diff --git a/components/electric/lib/electric/postgres/proxy/prisma/query.ex b/components/electric/lib/electric/postgres/proxy/prisma/query.ex index 3c9a31a9fa..2a5046c06f 100644 --- a/components/electric/lib/electric/postgres/proxy/prisma/query.ex +++ b/components/electric/lib/electric/postgres/proxy/prisma/query.ex @@ -1,13 +1,13 @@ defmodule Electric.Postgres.Proxy.Prisma.Query do alias Electric.Postgres.Proxy.Prisma - alias Electric.Postgres.Schema + alias Electric.Postgres.Extension.SchemaLoader alias PgProtocol.Message, as: M @type data_row() :: [binary()] @callback column_names() :: [String.t()] @callback parameter_description(Prisma.t()) :: [integer()] @callback row_description(Prisma.t()) :: [M.RowDescription.Field.t()] - @callback data_rows([term()], Schema.t(), Prisma.t()) :: [data_row()] + @callback data_rows([term()], SchemaLoader.Version.t(), Prisma.t()) :: [data_row()] # PG_VERSION_NUM => sprintf("%d%04d", $majorver, $minorver) defguard is_major_version(config, v) @@ -32,8 +32,10 @@ defmodule Electric.Postgres.Proxy.Prisma.Query do "PostgreSQL #{v} Electric" end - def namespace_exists?(schema, namespace) do - Enum.any?(schema.tables, &(&1.name.schema == namespace)) + def namespace_exists?(schema_version, namespace) do + schema_version + |> SchemaLoader.Version.tables() + |> Enum.any?(&(&1.name.schema == namespace)) end def bool(b) when is_boolean(b) do @@ -70,14 +72,16 @@ defmodule Electric.Postgres.Proxy.Prisma.Query do Prisma.parse_bind_array(name_array) end - def tables_in_schema(nspname_array, schema) do + def tables_in_schema(nspname_array, schema_version) do nspname_array |> parse_name_array() - |> Enum.flat_map(&tables_for_schema(&1, schema)) + |> Enum.flat_map(&tables_for_schema(&1, schema_version)) end - defp tables_for_schema(nspname, schema) do - Enum.filter(schema.tables, &(&1.name.schema == nspname)) + defp tables_for_schema(nspname, schema_version) do + schema_version + |> SchemaLoader.Version.tables() + |> Enum.filter(&(&1.name.schema == nspname)) end end @@ -105,7 +109,7 @@ defmodule Electric.Postgres.Proxy.Prisma.Query.VersionV5_2 do ] end - def data_rows(_binds, _schema, config) do + def data_rows(_binds, _schema_version, config) do [[server_version_string(config)]] end end @@ -138,8 +142,8 @@ defmodule Electric.Postgres.Proxy.Prisma.Query.NamespaceVersionV5_2 do ] end - def data_rows([nspname], schema, %{server_version: {_, v}} = config) do - exists = namespace_exists?(schema, nspname) + def data_rows([nspname], schema_version, %{server_version: {_, v}} = config) do + exists = namespace_exists?(schema_version, nspname) [[bool(exists), server_version_string(config), i32(v)]] end end @@ -172,12 +176,12 @@ defmodule Electric.Postgres.Proxy.Prisma.Query.NamespaceV5_2 do ] end - def data_rows([nspname_array], schema, _config) do + def data_rows([nspname_array], schema_version, _config) do # see above re questions over purpose of this.. - # exists = namespace_exists?(schema, nspname) + # exists = namespace_exists?(schema_version, nspname) nspname_array |> parse_name_array() - |> Enum.filter(&namespace_exists?(schema, &1)) + |> Enum.filter(&namespace_exists?(schema_version, &1)) |> Enum.map(&[&1]) |> Enum.sort() end @@ -211,9 +215,9 @@ defmodule Electric.Postgres.Proxy.Prisma.Query.TableListV4_8 do ] end - def data_rows([nspname_array], schema, _config) do + def data_rows([nspname_array], schema_version, _config) do nspname_array - |> tables_in_schema(schema) + |> tables_in_schema(schema_version) |> Enum.map(&table_entry/1) |> Enum.sort_by(fn [t, s] -> [s, t] end) end @@ -277,9 +281,9 @@ defmodule Electric.Postgres.Proxy.Prisma.Query.TableV5_2 do ] end - def data_rows([nspname_array], schema, _config) do + def data_rows([nspname_array], schema_version, _config) do nspname_array - |> tables_in_schema(schema) + |> tables_in_schema(schema_version) |> Enum.flat_map(&table_description/1) |> Enum.sort_by(fn [t, s | _] -> [s, t] end) end @@ -355,9 +359,9 @@ defmodule Electric.Postgres.Proxy.Prisma.Query.ConstraintV5_2 do ] end - def data_rows([nspname_array], schema, _config) do + def data_rows([nspname_array], schema_version, _config) do nspname_array - |> tables_in_schema(schema) + |> tables_in_schema(schema_version) |> Enum.flat_map(&table_check_constraints/1) |> Enum.sort_by(fn [ns, tn, cn, ct | _] -> [ns, tn, cn, ct] end) end @@ -410,7 +414,7 @@ defmodule Electric.Postgres.Proxy.Prisma.Query.ViewV4_8 do end # we don't support views - def data_rows([_nspname], _schema, _config) do + def data_rows([_nspname], _schema_version, _config) do [] end end @@ -450,7 +454,7 @@ defmodule Electric.Postgres.Proxy.Prisma.Query.ViewV5_2 do end # we don't support views - def data_rows([_nspname], _schema, _config) do + def data_rows([_nspname], _schema_version, _config) do [] end end @@ -486,7 +490,7 @@ defmodule Electric.Postgres.Proxy.Prisma.Query.TypeV4_8 do end # we don't support custom types - def data_rows([_nspname], _schema, _config) do + def data_rows([_nspname], _schema_version, _config) do [] end end @@ -527,7 +531,7 @@ defmodule Electric.Postgres.Proxy.Prisma.Query.TypeV5_2 do end # we don't support custom types - def data_rows([_nspname], _schema, _config) do + def data_rows([_nspname], _schema_version, _config) do [] end end @@ -612,10 +616,10 @@ defmodule Electric.Postgres.Proxy.Prisma.Query.ColumnV4_8 do ] end - def data_rows([nspname_array], schema, _config) do + def data_rows([nspname_array], schema_version, _config) do # ["public", "items", "id", "text", nil, nil, nil, nil, "text", "pg_catalog", "text", nil, "NO", "NO", nil, nil], nspname_array - |> tables_in_schema(schema) + |> tables_in_schema(schema_version) |> Enum.flat_map(&table_columns/1) |> Enum.sort_by(fn [op, ns, tn | _] -> [ns, tn, op] end) |> Enum.map(fn [_ | rest] -> rest end) @@ -859,10 +863,10 @@ defmodule Electric.Postgres.Proxy.Prisma.Query.ColumnV5_2 do ] end - def data_rows([nspname_array], schema, _config) do + def data_rows([nspname_array], schema_version, _config) do # ["public", "items", "id", "text", nil, nil, nil, nil, "text", "pg_catalog", "text", nil, "NO", "NO", nil, nil], nspname_array - |> tables_in_schema(schema) + |> tables_in_schema(schema_version) |> Enum.flat_map(&table_columns/1) |> Enum.sort_by(fn [op, ns, tn | _] -> [ns, tn, op] end) |> Enum.map(fn [_ | rest] -> rest end) @@ -1067,7 +1071,7 @@ defmodule Electric.Postgres.Proxy.Prisma.Query.ForeignKeyV4_8 do """ @behaviour Electric.Postgres.Proxy.Prisma.Query - alias Electric.Postgres.Schema + alias Electric.Postgres.Extension.SchemaLoader import Electric.Postgres.Proxy.Prisma.Query @@ -1110,17 +1114,17 @@ defmodule Electric.Postgres.Proxy.Prisma.Query.ForeignKeyV4_8 do ] end - def data_rows([nspname_array], schema, _config) do + def data_rows([nspname_array], schema_version, _config) do nspname_array - |> tables_in_schema(schema) - |> Enum.flat_map(&table_fks(&1, schema)) + |> tables_in_schema(schema_version) + |> Enum.flat_map(&table_fks(&1, schema_version)) |> Enum.sort_by(fn [colidx, ci, _, _, _, _, _, _, cn, _, _, tn, ns] -> [ns, tn, cn, ci, colidx] end) |> Enum.map(fn [_ | rest] -> rest end) end - defp table_fks(table, schema) do + defp table_fks(table, schema_version) do table.constraints |> Enum.filter(&is_fk/1) |> Enum.flat_map(fn %{constraint: {:foreign, fk}} -> @@ -1128,7 +1132,7 @@ defmodule Electric.Postgres.Proxy.Prisma.Query.ForeignKeyV4_8 do |> Enum.zip(fk.fk_cols) |> Enum.with_index() |> Enum.map(fn {{parent_column, child_column}, i} -> - {:ok, parent_table} = Schema.fetch_table(schema, fk.pk_table) + {:ok, parent_table} = SchemaLoader.Version.table(schema_version, fk.pk_table) parent_idx = Enum.find_index(parent_table.columns, &(&1.name == parent_column)) || @@ -1221,7 +1225,7 @@ defmodule Electric.Postgres.Proxy.Prisma.Query.ForeignKeyV5_2 do """ @behaviour Electric.Postgres.Proxy.Prisma.Query - alias Electric.Postgres.Schema + alias Electric.Postgres.Extension.SchemaLoader import Electric.Postgres.Proxy.Prisma.Query @@ -1268,17 +1272,17 @@ defmodule Electric.Postgres.Proxy.Prisma.Query.ForeignKeyV5_2 do ] end - def data_rows([nspname_array], schema, _config) do + def data_rows([nspname_array], schema_version, _config) do nspname_array - |> tables_in_schema(schema) - |> Enum.flat_map(&table_fks(&1, schema)) + |> tables_in_schema(schema_version) + |> Enum.flat_map(&table_fks(&1, schema_version)) |> Enum.sort_by(fn [colidx, ci, _, _, _, _, _, _, cn, _, _, tn, ns, _, _] -> [ns, tn, cn, ci, colidx] end) |> Enum.map(fn [_ | rest] -> rest end) end - defp table_fks(table, schema) do + defp table_fks(table, schema_version) do table.constraints |> Enum.filter(&is_fk/1) |> Enum.flat_map(fn %{constraint: {:foreign, fk}} -> @@ -1286,7 +1290,7 @@ defmodule Electric.Postgres.Proxy.Prisma.Query.ForeignKeyV5_2 do |> Enum.zip(fk.fk_cols) |> Enum.with_index() |> Enum.map(fn {{parent_column, child_column}, i} -> - {:ok, parent_table} = Schema.fetch_table(schema, fk.pk_table) + {:ok, parent_table} = SchemaLoader.Version.table(schema_version, fk.pk_table) parent_idx = Enum.find_index(parent_table.columns, &(&1.name == parent_column)) || @@ -1418,15 +1422,15 @@ defmodule Electric.Postgres.Proxy.Prisma.Query.IndexV4_8 do ] end - def data_rows([nspname_array], schema, _config) do + def data_rows([nspname_array], schema_version, _config) do # ["public", "items_pkey", "items", "id", <<1>>, <<1>>, <<0, 0, 0, 0>>, "text_ops", <<1>>, "btree", "ASC", <<0>>, <<0>>, <<0>>] nspname_array - |> tables_in_schema(schema) - |> Enum.flat_map(&table_indexes(&1, schema)) + |> tables_in_schema(schema_version) + |> Enum.flat_map(&table_indexes(&1, schema_version)) |> Enum.sort_by(fn [ns, idn, tn, _, _, _, idx | _] -> [ns, tn, idn, idx] end) end - defp table_indexes(table, schema) do + defp table_indexes(table, schema_version) do Enum.flat_map(table.indexes, fn index -> index.columns |> Enum.with_index() @@ -1447,10 +1451,10 @@ defmodule Electric.Postgres.Proxy.Prisma.Query.IndexV4_8 do to_string(column.ordering) ] end) - end) ++ table_constraints(table, schema) + end) ++ table_constraints(table, schema_version) end - defp table_constraints(table, _schema) do + defp table_constraints(table, _schema_version) do Enum.flat_map(table.constraints, fn %{constraint: {type, constraint}} when type in [:primary, :unique] -> constraint.keys @@ -1592,15 +1596,15 @@ defmodule Electric.Postgres.Proxy.Prisma.Query.IndexV5_2 do ] end - def data_rows([nspname_array], schema, _config) do + def data_rows([nspname_array], schema_version, _config) do # ["public", "items_pkey", "items", "id", <<1>>, <<1>>, <<0, 0, 0, 0>>, "text_ops", <<1>>, "btree", "ASC", <<0>>, <<0>>, <<0>>] nspname_array - |> tables_in_schema(schema) - |> Enum.flat_map(&table_indexes(&1, schema)) + |> tables_in_schema(schema_version) + |> Enum.flat_map(&table_indexes(&1, schema_version)) |> Enum.sort_by(fn [ns, idn, tn, _, _, _, idx | _] -> [ns, tn, idn, idx] end) end - defp table_indexes(table, schema) do + defp table_indexes(table, schema_version) do Enum.flat_map(table.indexes, fn index -> index.columns |> Enum.with_index() @@ -1624,10 +1628,10 @@ defmodule Electric.Postgres.Proxy.Prisma.Query.IndexV5_2 do bool(nil) ] end) - end) ++ table_constraints(table, schema) + end) ++ table_constraints(table, schema_version) end - defp table_constraints(table, _schema) do + defp table_constraints(table, _schema_version) do Enum.flat_map(table.constraints, fn %{constraint: {type, constraint}} when type in [:primary, :unique] -> constraint.keys @@ -1706,7 +1710,7 @@ defmodule Electric.Postgres.Proxy.Prisma.Query.FunctionV5_2 do end # we don't support functions currently - def data_rows([_nspname], _schema, _config) do + def data_rows([_nspname], _schema_version, _config) do [] end end @@ -1745,7 +1749,7 @@ defmodule Electric.Postgres.Proxy.Prisma.Query.ExtensionV5_2 do end # we don't support extensions - def data_rows(_binds, _schema, _config) do + def data_rows(_binds, _schema_version, _config) do [] end end @@ -1801,7 +1805,7 @@ defmodule Electric.Postgres.Proxy.Prisma.Query.SequenceV5_2 do end # we don't support sequences, so this must be empty - def data_rows([_nspname], _schema, _config) do + def data_rows([_nspname], _schema_version, _config) do [] end end diff --git a/components/electric/lib/electric/postgres/replication.ex b/components/electric/lib/electric/postgres/replication.ex index c45ed84b90..8b08891b7d 100644 --- a/components/electric/lib/electric/postgres/replication.ex +++ b/components/electric/lib/electric/postgres/replication.ex @@ -2,7 +2,7 @@ defmodule Electric.Postgres.Replication do use Electric.Satellite.Protobuf alias PgQuery, as: Pg - alias Electric.Postgres.{Dialect, Schema, Schema.AST, Schema.Proto} + alias Electric.Postgres.{Dialect, Extension.SchemaLoader, Schema.AST, Schema.Proto} defmodule Column do alias Electric.Postgres @@ -59,9 +59,9 @@ defmodule Electric.Postgres.Replication do # 4. use the updated schema to get column, fk and pk information for the affected tables # # - creation of indexes doesn't affect any tables so that list should be empty - @spec migrate(Schema.t(), version(), binary(), Electric.Postgres.Dialect.t()) :: + @spec migrate(SchemaLoader.Version.t(), binary(), Electric.Postgres.Dialect.t()) :: {:ok, [%SatOpMigrate{}], [{binary, binary}]} - def migrate(schema, version, stmt, dialect \\ @default_dialect) do + def migrate(schema_version, stmt, dialect \\ @default_dialect) do ast = Electric.Postgres.parse!(stmt) case propagatable_stmt?(ast) do @@ -69,7 +69,7 @@ defmodule Electric.Postgres.Replication do {:ok, [], []} propagate_ast -> - {msg, relations} = build_replication_msg(propagate_ast, version, schema, dialect) + {msg, relations} = build_replication_msg(propagate_ast, schema_version, dialect) {:ok, [msg], relations} end @@ -112,14 +112,14 @@ defmodule Electric.Postgres.Replication do [] end - defp build_replication_msg(ast, version, schema, dialect) do + defp build_replication_msg(ast, schema_version, dialect) do affected_tables = affected_tables(ast, dialect) relations = Enum.map(affected_tables, &{&1.schema, &1.name}) tables = affected_tables - |> Enum.map(&Schema.fetch_table!(schema, &1)) + |> Enum.map(&SchemaLoader.Version.table!(schema_version, &1)) |> Enum.map(&replication_msg_table(&1, dialect)) table = @@ -138,7 +138,7 @@ defmodule Electric.Postgres.Replication do ) {%SatOpMigrate{ - version: version, + version: SchemaLoader.Version.version(schema_version), table: table, stmts: stmts }, relations} diff --git a/components/electric/lib/electric/postgres/shadow_table_transformation.ex b/components/electric/lib/electric/postgres/shadow_table_transformation.ex index e552246acd..39d0de7217 100644 --- a/components/electric/lib/electric/postgres/shadow_table_transformation.ex +++ b/components/electric/lib/electric/postgres/shadow_table_transformation.ex @@ -98,8 +98,8 @@ defmodule Electric.Postgres.ShadowTableTransformation do defp build_bitmask(%Changes.Compensation{}, columns), do: Enum.map(columns, fn _ -> "f" end) - defp build_bitmask(%Changes.UpdatedRecord{old_record: old, record: new}, columns), - do: Enum.map(columns, fn col -> if old[col] != new[col], do: "t", else: "f" end) + defp build_bitmask(%Changes.UpdatedRecord{changed_columns: changed}, columns), + do: Enum.map(columns, &if(MapSet.member?(changed, &1), do: "t", else: "f")) defp build_bitmask(%Changes.DeletedRecord{}, columns), do: Enum.map(columns, fn _ -> "f" end) diff --git a/components/electric/lib/electric/replication/changes.ex b/components/electric/lib/electric/replication/changes.ex index 68efcd77ed..d33238eece 100644 --- a/components/electric/lib/electric/replication/changes.ex +++ b/components/electric/lib/electric/replication/changes.ex @@ -84,14 +84,46 @@ defmodule Electric.Replication.Changes do end defmodule UpdatedRecord do - defstruct [:relation, :old_record, :record, tags: []] + defstruct [:relation, :old_record, :record, tags: [], changed_columns: MapSet.new()] @type t() :: %__MODULE__{ relation: Changes.relation(), old_record: Changes.record() | nil, record: Changes.record(), - tags: [Changes.tag()] + tags: [Changes.tag()], + changed_columns: MapSet.t() } + + def new(attrs) do + __MODULE__ + |> struct(attrs) + |> build_changed_columns() + end + + defp build_changed_columns(%{old_record: nil} = change) do + change + end + + defp build_changed_columns(change) do + %{old_record: old, record: new} = change + + # if the value is in the new but NOT the old, then it's being updated + # if it's in the old but NOT the new, then it's staying the same + changed = + Enum.reduce(new, MapSet.new(), fn {col_name, new_value}, changed -> + case Map.fetch(old, col_name) do + :error -> + MapSet.put(changed, col_name) + + {:ok, old_value} -> + if old_value == new_value, + do: changed, + else: MapSet.put(changed, col_name) + end + end) + + %{change | changed_columns: changed} + end end defmodule DeletedRecord do diff --git a/components/electric/lib/electric/replication/initial_sync.ex b/components/electric/lib/electric/replication/initial_sync.ex index d389731b72..962338fb5e 100644 --- a/components/electric/lib/electric/replication/initial_sync.ex +++ b/components/electric/lib/electric/replication/initial_sync.ex @@ -100,7 +100,7 @@ defmodule Electric.Replication.InitialSync do ) do Client.with_conn(Connectors.get_connection_opts(opts), fn conn -> origin = Connectors.origin(opts) - {:ok, _, schema} = Extension.SchemaCache.load(origin) + {:ok, schema_version} = Extension.SchemaCache.load(origin) Client.with_transaction( "ISOLATION LEVEL REPEATABLE READ READ ONLY", @@ -123,7 +123,13 @@ defmodule Electric.Replication.InitialSync do Enum.reduce_while(requests, [], fn request, results -> start = System.monotonic_time() - case Shapes.ShapeRequest.query_initial_data(request, conn, schema, origin, context) do + case Shapes.ShapeRequest.query_initial_data( + request, + conn, + schema_version, + origin, + context + ) do {:ok, num_records, data} -> Metrics.span_event( span, diff --git a/components/electric/lib/electric/replication/postgres/logical_replication_producer.ex b/components/electric/lib/electric/replication/postgres/logical_replication_producer.ex index 647a444b3b..6092e6b634 100644 --- a/components/electric/lib/electric/replication/postgres/logical_replication_producer.ex +++ b/components/electric/lib/electric/replication/postgres/logical_replication_producer.ex @@ -195,11 +195,12 @@ defmodule Electric.Replication.Postgres.LogicalReplicationProducer do old_data = data_tuple_to_map(relation.columns, msg.old_tuple_data) data = data_tuple_to_map(relation.columns, msg.tuple_data) - updated_record = %UpdatedRecord{ - relation: {relation.namespace, relation.name}, - old_record: old_data, - record: data - } + updated_record = + UpdatedRecord.new( + relation: {relation.namespace, relation.name}, + old_record: old_data, + record: data + ) {lsn, txn} = state.transaction txn = %{txn | changes: [updated_record | txn.changes]} diff --git a/components/electric/lib/electric/replication/postgres/migration_consumer.ex b/components/electric/lib/electric/replication/postgres/migration_consumer.ex index dbe39d66b8..5b1228b4b5 100644 --- a/components/electric/lib/electric/replication/postgres/migration_consumer.ex +++ b/components/electric/lib/electric/replication/postgres/migration_consumer.ex @@ -149,11 +149,11 @@ defmodule Electric.Replication.Postgres.MigrationConsumer do end defp perform_migration({version, stmts}, state) do - {:ok, loader, schema} = apply_migration(version, stmts, state.loader) + {:ok, loader, schema_version} = apply_migration(version, stmts, state.loader) Metrics.non_span_event( [:postgres, :migration], - %{electrified_tables: Schema.num_electrified_tables(schema)}, + %{electrified_tables: Schema.num_electrified_tables(schema_version.schema)}, %{migration_version: version} ) @@ -206,16 +206,16 @@ defmodule Electric.Replication.Postgres.MigrationConsumer do using the given implementation of SchemaLoader. """ @spec apply_migration(String.t(), [String.t()], SchemaLoader.t()) :: - {:ok, SchemaLoader.t(), Schema.t()} | {:error, term()} + {:ok, SchemaLoader.t(), SchemaLoader.Version.t()} | {:error, term()} def apply_migration(version, stmts, loader) when is_list(stmts) do - {:ok, old_version, schema} = SchemaLoader.load(loader) + {:ok, schema_version} = SchemaLoader.load(loader) - Logger.info("Migrating version #{old_version || ""} -> #{version}") + Logger.info("Migrating version #{schema_version.version || ""} -> #{version}") oid_loader = &SchemaLoader.relation_oid(loader, &1, &2, &3) schema = - Enum.reduce(stmts, schema, fn stmt, schema -> + Enum.reduce(stmts, schema_version.schema, fn stmt, schema -> Logger.info("Applying migration #{version}: #{inspect(stmt)}") Schema.update(schema, stmt, oid_loader: oid_loader) end) @@ -223,7 +223,7 @@ defmodule Electric.Replication.Postgres.MigrationConsumer do Logger.info("Saving schema version #{version} /#{inspect(loader)}/") - {:ok, loader} = SchemaLoader.save(loader, version, schema, stmts) - {:ok, loader, schema} + {:ok, loader, schema_version} = SchemaLoader.save(loader, version, schema, stmts) + {:ok, loader, schema_version} end end diff --git a/components/electric/lib/electric/replication/shapes.ex b/components/electric/lib/electric/replication/shapes.ex index fdc8967486..6aafd601aa 100644 --- a/components/electric/lib/electric/replication/shapes.ex +++ b/components/electric/lib/electric/replication/shapes.ex @@ -5,8 +5,7 @@ defmodule Electric.Replication.Shapes do import Electric.Postgres.Extension, only: [is_migration_relation: 1] alias Electric.Utils alias Electric.Replication.Changes.Transaction - alias Electric.Postgres.Extension.SchemaCache - alias Electric.Postgres.Schema + alias Electric.Postgres.Extension.{SchemaCache, SchemaLoader} alias Electric.Replication.Shapes.ShapeRequest use Electric.Satellite.Protobuf @@ -42,12 +41,11 @@ defmodule Electric.Replication.Shapes do @spec validate_requests([%SatShapeReq{}, ...], String.t()) :: {:ok, [ShapeRequest.t(), ...]} | {:error, [{String.t(), atom(), String.t()}]} def validate_requests(shape_requests, origin) do - {:ok, _, schema} = SchemaCache.load(origin) - # TODO: Move this graph calculation to the SchemaCache when #191 is merged - graph = Schema.public_fk_graph(schema) + {:ok, schema_version} = SchemaCache.load(origin) + graph = SchemaLoader.Version.fk_graph(schema_version) shape_requests - |> Enum.map(&validate_request(&1, schema, graph)) + |> Enum.map(&validate_request(&1, graph)) |> Enum.split_with(&is_struct(&1, ShapeRequest)) |> case do {results, []} -> {:ok, results} @@ -55,9 +53,9 @@ defmodule Electric.Replication.Shapes do end end - @spec validate_request(%SatShapeReq{}, Schema.t(), Graph.t()) :: + @spec validate_request(%SatShapeReq{}, Graph.t()) :: ShapeRequest.t() | {String.t(), atom(), String.t()} - defp validate_request(%SatShapeReq{shape_definition: shape} = request, _schema, graph) do + defp validate_request(%SatShapeReq{shape_definition: shape} = request, graph) do with :ok <- request_cannot_be_empty(shape), :ok <- table_names_are_valid(shape), :ok <- tables_should_exist(shape, graph), diff --git a/components/electric/lib/electric/replication/shapes/shape_request.ex b/components/electric/lib/electric/replication/shapes/shape_request.ex index a22dde3525..4f977e38e3 100644 --- a/components/electric/lib/electric/replication/shapes/shape_request.ex +++ b/components/electric/lib/electric/replication/shapes/shape_request.ex @@ -17,6 +17,7 @@ defmodule Electric.Replication.Shapes.ShapeRequest do alias Electric.Replication.Changes.Ownership alias Electric.Postgres.ShadowTableTransformation alias Electric.Postgres.Schema + alias Electric.Postgres.Extension.SchemaLoader alias Electric.Replication.Changes use Electric.Satellite.Protobuf @@ -91,12 +92,18 @@ defmodule Electric.Replication.Shapes.ShapeRequest do set (see [PG documentation](https://www.postgresql.org/docs/current/transaction-iso.html#XACT-REPEATABLE-READ) for details.) """ - @spec query_initial_data(t(), :epgsql.connection(), Schema.t(), String.t(), map()) :: + @spec query_initial_data(t(), :epgsql.connection(), SchemaLoader.Version.t(), String.t(), map()) :: {:ok, non_neg_integer, [Changes.NewRecord.t()]} | {:error, term()} # TODO: `filtering_context` is underdefined by design. It's a stand-in for a more complex solution while we need to enable basic functionality. - def query_initial_data(%__MODULE__{} = request, conn, schema, origin, filtering_context \\ %{}) do + def query_initial_data( + %__MODULE__{} = request, + conn, + schema_version, + origin, + filtering_context \\ %{} + ) do Enum.reduce_while(request.included_tables, {:ok, 0, []}, fn table, {:ok, num_records, acc} -> - case query_full_table(conn, table, schema, origin, filtering_context) do + case query_full_table(conn, table, schema_version, origin, filtering_context) do {:ok, count, results} -> {:cont, {:ok, num_records + count, acc ++ results}} @@ -114,16 +121,16 @@ defmodule Electric.Replication.Shapes.ShapeRequest do defp query_full_table( conn, {schema_name, name} = rel, - %Schema.Proto.Schema{} = schema, + schema_version, origin, filtering_context ) do if filtering_context[:sent_tables] && MapSet.member?(filtering_context[:sent_tables], rel) do {:ok, 0, []} else - table = Enum.find(schema.tables, &(&1.name.schema == schema_name && &1.name.name == name)) + {:ok, table} = SchemaLoader.Version.table(schema_version, rel) columns = Enum.map_join(table.columns, ", ", &~s|main."#{&1.name}"|) - {:ok, pks} = Schema.primary_keys(table) + {:ok, pks} = SchemaLoader.Version.primary_keys(schema_version, rel) pk_clause = Enum.map_join(pks, " AND ", &~s|main."#{&1}" = shadow."#{&1}"|) ownership_column = Ownership.id_column_name() diff --git a/components/electric/lib/electric/satellite/protobuf_messages.ex b/components/electric/lib/electric/satellite/protobuf_messages.ex index 7867f8a3db..38a181c1b8 100644 --- a/components/electric/lib/electric/satellite/protobuf_messages.ex +++ b/components/electric/lib/electric/satellite/protobuf_messages.ex @@ -9225,7 +9225,7 @@ end, defmodule Electric.Satellite.SatErrorResp do @moduledoc false - defstruct error_type: :INTERNAL + defstruct error_type: :INTERNAL, lsn: nil, message: nil ( ( @@ -9240,7 +9240,7 @@ @spec encode!(struct) :: iodata | no_return def encode!(msg) do - [] |> encode_error_type(msg) + [] |> encode_lsn(msg) |> encode_message(msg) |> encode_error_type(msg) end ) @@ -9264,6 +9264,28 @@ ArgumentError -> reraise Protox.EncodingError.new(:error_type, "invalid field value"), __STACKTRACE__ end + end, + defp encode_lsn(acc, msg) do + try do + case msg.lsn do + nil -> [acc] + child_field_value -> [acc, "\x12", Protox.Encode.encode_bytes(child_field_value)] + end + rescue + ArgumentError -> + reraise Protox.EncodingError.new(:lsn, "invalid field value"), __STACKTRACE__ + end + end, + defp encode_message(acc, msg) do + try do + case msg.message do + nil -> [acc] + child_field_value -> [acc, "\x1A", Protox.Encode.encode_string(child_field_value)] + end + rescue + ArgumentError -> + reraise Protox.EncodingError.new(:message, "invalid field value"), __STACKTRACE__ + end end ] @@ -9308,6 +9330,16 @@ {[error_type: value], rest} + {2, _, bytes} -> + {len, bytes} = Protox.Varint.decode(bytes) + {delimited, rest} = Protox.Decode.parse_delimited(bytes, len) + {[lsn: delimited], rest} + + {3, _, bytes} -> + {len, bytes} = Protox.Varint.decode(bytes) + {delimited, rest} = Protox.Decode.parse_delimited(bytes, len) + {[message: delimited], rest} + {tag, wire_type, rest} -> {_, rest} = Protox.Decode.parse_unknown(tag, wire_type, rest) {[], rest} @@ -9367,7 +9399,9 @@ %{ 1 => {:error_type, {:scalar, :INTERNAL}, - {:enum, Electric.Satellite.SatErrorResp.ErrorCode}} + {:enum, Electric.Satellite.SatErrorResp.ErrorCode}}, + 2 => {:lsn, {:oneof, :_lsn}, :bytes}, + 3 => {:message, {:oneof, :_message}, :string} } end @@ -9378,7 +9412,9 @@ def defs_by_name() do %{ error_type: - {1, {:scalar, :INTERNAL}, {:enum, Electric.Satellite.SatErrorResp.ErrorCode}} + {1, {:scalar, :INTERNAL}, {:enum, Electric.Satellite.SatErrorResp.ErrorCode}}, + lsn: {2, {:oneof, :_lsn}, :bytes}, + message: {3, {:oneof, :_message}, :string} } end ) @@ -9395,6 +9431,24 @@ name: :error_type, tag: 1, type: {:enum, Electric.Satellite.SatErrorResp.ErrorCode} + }, + %{ + __struct__: Protox.Field, + json_name: "lsn", + kind: {:oneof, :_lsn}, + label: :proto3_optional, + name: :lsn, + tag: 2, + type: :bytes + }, + %{ + __struct__: Protox.Field, + json_name: "message", + kind: {:oneof, :_message}, + label: :proto3_optional, + name: :message, + tag: 3, + type: :string } ] end @@ -9441,6 +9495,64 @@ }} end ), + ( + def field_def(:lsn) do + {:ok, + %{ + __struct__: Protox.Field, + json_name: "lsn", + kind: {:oneof, :_lsn}, + label: :proto3_optional, + name: :lsn, + tag: 2, + type: :bytes + }} + end + + def field_def("lsn") do + {:ok, + %{ + __struct__: Protox.Field, + json_name: "lsn", + kind: {:oneof, :_lsn}, + label: :proto3_optional, + name: :lsn, + tag: 2, + type: :bytes + }} + end + + [] + ), + ( + def field_def(:message) do + {:ok, + %{ + __struct__: Protox.Field, + json_name: "message", + kind: {:oneof, :_message}, + label: :proto3_optional, + name: :message, + tag: 3, + type: :string + }} + end + + def field_def("message") do + {:ok, + %{ + __struct__: Protox.Field, + json_name: "message", + kind: {:oneof, :_message}, + label: :proto3_optional, + name: :message, + tag: 3, + type: :string + }} + end + + [] + ), def field_def(_) do {:error, :no_such_field} end @@ -9468,6 +9580,12 @@ def default(:error_type) do {:ok, :INTERNAL} end, + def default(:lsn) do + {:error, :no_default_value} + end, + def default(:message) do + {:error, :no_default_value} + end, def default(_) do {:error, :no_such_field} end diff --git a/components/electric/lib/electric/satellite/protocol.ex b/components/electric/lib/electric/satellite/protocol.ex index 2467aa5810..5ba1f6ceaf 100644 --- a/components/electric/lib/electric/satellite/protocol.ex +++ b/components/electric/lib/electric/satellite/protocol.ex @@ -23,6 +23,7 @@ defmodule Electric.Satellite.Protocol do alias Electric.Replication.Shapes.ShapeRequest alias Electric.Satellite.Serialization alias Electric.Satellite.ClientManager + alias Electric.Satellite.WriteValidation alias Electric.Telemetry.Metrics require Logger @@ -563,16 +564,25 @@ defmodule Electric.Satellite.Protocol do {incomplete, complete} -> complete = Enum.reverse(complete) - for tx <- complete do - telemetry_event(state, :transaction_receive, Transaction.count_operations(tx)) - end + case WriteValidation.validate_transactions!( + complete, + {SchemaCache, Connectors.origin(state.pg_connector_opts)} + ) do + {:ok, accepted} -> + {nil, send_transactions(accepted, incomplete, state)} + + {:error, accepted, error, trailing} -> + state = send_transactions(accepted, incomplete, state) + telemetry_event(state, :bad_transaction) - in_rep = - %InRep{in_rep | incomplete_trans: incomplete} - |> InRep.add_to_queue(complete) - |> send_downstream() + Logger.error([ + "WriteValidation.Error: " <> to_string(error), + "\n", + "Dropping #{length(trailing)} unapplied transactions: #{Enum.map(trailing, & &1.lsn) |> inspect()}" + ]) - {nil, %State{state | in_rep: in_rep}} + {:error, WriteValidation.Error.error_response(error)} + end end rescue e -> @@ -600,6 +610,19 @@ defmodule Electric.Satellite.Protocol do {:error, %SatErrorResp{error_type: :INVALID_REQUEST}} end + defp send_transactions(complete, incomplete, state) do + for tx <- complete do + telemetry_event(state, :transaction_receive, Transaction.count_operations(tx)) + end + + in_rep = + %InRep{state.in_rep | incomplete_trans: incomplete} + |> InRep.add_to_queue(complete) + |> send_downstream() + + %State{state | in_rep: in_rep} + end + @spec handle_start_replication_request( %SatInStartReplicationReq{}, binary() | :initial_sync, diff --git a/components/electric/lib/electric/satellite/serialization.ex b/components/electric/lib/electric/satellite/serialization.ex index 47df8b9f4f..02cbcde4f9 100644 --- a/components/electric/lib/electric/satellite/serialization.ex +++ b/components/electric/lib/electric/satellite/serialization.ex @@ -105,10 +105,10 @@ defmodule Electric.Satellite.Serialization do if version && version != v, do: raise("Got DDL transaction with differing migration versions") - {:ok, schema} = maybe_load_schema(origin, schema, v) + {:ok, schema_version} = maybe_load_schema(origin, schema, v) {ops, add_relations} = - case Replication.migrate(schema, v, sql) do + case Replication.migrate(schema_version, sql) do {:ok, [op], relations} -> {[%SatTransOp{op: {:migrate, op}} | ops], relations} @@ -126,7 +126,7 @@ defmodule Electric.Satellite.Serialization do state | ops: ops, migration_version: v, - schema: schema, + schema: schema_version, new_relations: new_relations ++ add_relations, known_relations: known_relations } @@ -167,7 +167,7 @@ defmodule Electric.Satellite.Serialization do end defp maybe_load_schema(origin, nil, version) do - with {:ok, _version, schema} <- Extension.SchemaCache.load(origin, version) do + with {:ok, schema} <- Extension.SchemaCache.load(origin, version) do {:ok, schema} else error -> @@ -416,11 +416,11 @@ defmodule Electric.Satellite.Serialization do %SatOpUpdate{row_data: row_data, old_row_data: old_row_data, tags: tags}, columns ) do - %UpdatedRecord{ + UpdatedRecord.new( record: decode_record!(row_data, columns), old_record: decode_record!(old_row_data, columns), tags: tags - } + ) end defp op_to_change(%SatOpDelete{old_row_data: nil, tags: tags}, _columns) do diff --git a/components/electric/lib/electric/satellite/write_validation.ex b/components/electric/lib/electric/satellite/write_validation.ex new file mode 100644 index 0000000000..33b4c15749 --- /dev/null +++ b/components/electric/lib/electric/satellite/write_validation.ex @@ -0,0 +1,133 @@ +defmodule Electric.Satellite.WriteValidation do + alias Electric.Satellite.WriteValidation + alias Electric.Replication.Changes + alias Electric.Postgres.Extension.SchemaLoader + + @type result() :: :ok | {:error, Changes.change(), String.t()} + @type allowed_result() :: :ok | {:error, reason :: String.t()} + @type txns() :: [Changes.Transaction.t()] + + @type insert() :: Changes.NewRecord.t() + @type update() :: Changes.UpdatedRecord.t() + @type delete() :: Changes.DeletedRecord.t() + + @callback validate_insert(insert(), SchemaLoader.Version.t()) :: allowed_result() + @callback validate_update(update(), SchemaLoader.Version.t()) :: allowed_result() + @callback validate_delete(delete(), SchemaLoader.Version.t()) :: allowed_result() + + defmodule Error do + defstruct [:tx, :reason, :verifier, :change] + + @type t() :: %__MODULE__{ + tx: Changes.Transaction.t(), + reason: String.t(), + verifier: module(), + change: Changes.change() + } + + def error_response(error) do + %Electric.Satellite.SatErrorResp{ + error_type: :INVALID_REQUEST, + lsn: error.tx.lsn, + message: to_string(error) + } + end + + defimpl String.Chars do + def to_string(error) do + "Applying transaction [LSN #{error.tx.lsn || "?"}] " <> + "failed write validation tests for the following reason: " <> + error.reason <> + if(error.verifier, do: " (#{verifier_name(error.verifier)})", else: "") <> + if(error.change, do: " change: #{inspect(error.change)} ", else: "") + end + + defp verifier_name(module) do + Module.split(module) |> Enum.at(-1) + end + end + end + + @validations [ + WriteValidation.ImmutablePrimaryKey + ] + + defmacro __using__(_opts \\ []) do + quote do + alias Electric.Replication.Changes.{UpdatedRecord, NewRecord, DeletedRecord} + + @behaviour Electric.Satellite.WriteValidation + + def validate_insert(_, _), do: :ok + def validate_update(_, _), do: :ok + def validate_delete(_, _), do: :ok + + defoverridable validate_insert: 2, validate_update: 2, validate_delete: 2 + end + end + + @spec validate_transactions!(txns(), SchemaLoader.t()) :: + {:ok, txns()} | {:error, term()} | {:error, txns(), Error.t(), txns()} + def validate_transactions!(txns, schema_loader) do + with {:ok, schema_version} <- SchemaLoader.load(schema_loader) do + take_while_ok(txns, &is_valid_tx?(&1, schema_version), []) + end + end + + @spec is_valid_tx?(Changes.Transaction.t(), SchemaLoader.Version.t()) :: + :ok | {:error, Error.t()} + defp is_valid_tx?(%Changes.Transaction{changes: changes} = tx, schema_version) do + all_ok?(changes, &is_valid_change?(&1, schema_version), fn _src, error -> + {:error, %{error | tx: tx}} + end) + end + + defp is_valid_change?(op, schema_version) do + all_ok?( + @validations, + validation_function(op, schema_version), + &validation_error(&1, &2, op) + ) + end + + defp validation_function(op, schema_version) do + case op do + %Changes.NewRecord{} -> & &1.validate_insert(op, schema_version) + %Changes.UpdatedRecord{} -> & &1.validate_update(op, schema_version) + %Changes.DeletedRecord{} -> & &1.validate_delete(op, schema_version) + # ignore compensation messages + %Changes.Compensation{} -> fn _ -> :ok end + end + end + + defp validation_error(validation_module, reason, change) do + {:error, %Error{reason: reason, verifier: validation_module, change: change}} + end + + defp all_ok?([], _fun, _error_fun) do + :ok + end + + defp all_ok?([c | t], fun, error_fun) do + case fun.(c) do + :ok -> all_ok?(t, fun, error_fun) + {:error, error} -> error_fun.(c, error) + end + end + + @spec take_while_ok(txns(), (Changes.Transaction.t() -> :ok | {:error, Error.t()}), txns()) :: + {:ok, txns()} | {:error, txns(), Error.t(), txns()} + defp take_while_ok([tx | tail], fun, acc) do + case fun.(tx) do + :ok -> + take_while_ok(tail, fun, [tx | acc]) + + {:error, error} -> + {:error, :lists.reverse(acc), error, tail} + end + end + + defp take_while_ok([], _, acc) do + {:ok, :lists.reverse(acc)} + end +end diff --git a/components/electric/lib/electric/satellite/write_validation/immutable_primary_key.ex b/components/electric/lib/electric/satellite/write_validation/immutable_primary_key.ex new file mode 100644 index 0000000000..bc77e6c04b --- /dev/null +++ b/components/electric/lib/electric/satellite/write_validation/immutable_primary_key.ex @@ -0,0 +1,24 @@ +defmodule Electric.Satellite.WriteValidation.ImmutablePrimaryKey do + use Electric.Satellite.WriteValidation + + alias Electric.Postgres.Extension.SchemaLoader + + def validate_update(%UpdatedRecord{} = change, schema_version) do + %{relation: relation, changed_columns: changed} = change + + if Enum.empty?(changed) do + :ok + else + {:ok, pks} = SchemaLoader.Version.primary_keys(schema_version, relation) + + case Enum.filter(pks, &MapSet.member?(changed, &1)) do + [] -> + :ok + + changed_pks -> + {:error, + "Attempt to update table #{Electric.Utils.inspect_relation(relation)} primary key(s) #{inspect(changed_pks)}"} + end + end + end +end diff --git a/components/electric/lib/electric/utils.ex b/components/electric/lib/electric/utils.ex index 05ca4f21c8..50fa5bfc18 100644 --- a/components/electric/lib/electric/utils.ex +++ b/components/electric/lib/electric/utils.ex @@ -371,4 +371,12 @@ defmodule Electric.Utils do params ++ [ssl: true] end end + + @doc """ + Output a 2-tuple relation (table) reference as pg-style `"schema"."table"`. + """ + @spec inspect_relation({String.t(), String.t()}) :: String.t() + def inspect_relation({schema, name}) do + "#{inspect(schema)}.#{inspect(name)}" + end end diff --git a/components/electric/test/electric/postgres/extension/schema_cache_test.exs b/components/electric/test/electric/postgres/extension/schema_cache_test.exs index b18337be61..2716266d80 100644 --- a/components/electric/test/electric/postgres/extension/schema_cache_test.exs +++ b/components/electric/test/electric/postgres/extension/schema_cache_test.exs @@ -96,7 +96,7 @@ defmodule Electric.Postgres.Extension.SchemaCacheTest do use Electric.Extension.Case, async: false alias Electric.Replication.Postgres - alias Electric.Postgres.Extension + alias Electric.Postgres.{Extension, Extension.SchemaLoader} alias Electric.Postgres.Schema alias Electric.Postgres.Replication.{Column, Table} @@ -152,7 +152,8 @@ defmodule Electric.Postgres.Extension.SchemaCacheTest do {conn_config, [producer: producer, refresh_subscription: false]}} ) - {:ok, _pid} = start_supervised({MockConsumer, parent: self(), producer: migration_consumer}) + {:ok, _pid} = + start_supervised({MockConsumer, parent: self(), producer: migration_consumer}) txs = for {version, stmts} <- cxt.migrations do @@ -199,11 +200,15 @@ defmodule Electric.Postgres.Extension.SchemaCacheTest do [_version1, version2] = cxt.versions - assert {:ok, ^version2, schema2} = Extension.SchemaCache.load(cxt.origin) + assert {:ok, %{version: ^version2, schema: schema2} = schema_version} = + Extension.SchemaCache.load(cxt.origin) assert {:ok, table_a} = Schema.fetch_table(schema2, {"public", "a"}) assert {:ok, table_b} = Schema.fetch_table(schema2, {"b", "b"}) + assert {:ok, ^table_a} = SchemaLoader.Version.table(schema_version, {"public", "a"}) + assert {:ok, ^table_b} = SchemaLoader.Version.table(schema_version, {"b", "b"}) + assert table_a.oid == table_oid(conn, "public", "a") assert table_b.oid == table_oid(conn, "b", "b") @@ -214,11 +219,14 @@ defmodule Electric.Postgres.Extension.SchemaCacheTest do [version1, _version2] = cxt.versions - assert {:ok, ^version1, schema1} = Extension.SchemaCache.load(cxt.origin, version1) + assert {:ok, %{version: ^version1, schema: schema1} = schema_version} = + Extension.SchemaCache.load(cxt.origin, version1) assert {:ok, table_a} = Schema.fetch_table(schema1, {"public", "a"}) assert {:error, _} = Schema.fetch_table(schema1, {"b", "b"}) + assert {:ok, ^table_a} = SchemaLoader.Version.table(schema_version, {"public", "a"}) + assert table_a.oid == table_oid(conn, "public", "a") end end @@ -227,11 +235,14 @@ defmodule Electric.Postgres.Extension.SchemaCacheTest do test_tx "provides the correct primary keys for a table", fn conn, cxt -> {:ok, _producer} = bootstrap(conn, cxt) - assert {:ok, ["aid"]} = Extension.SchemaCache.primary_keys(cxt.origin, "public", "a") - assert {:ok, ["aid"]} = Extension.SchemaCache.primary_keys(cxt.origin, {"public", "a"}) + assert {:ok, schema_version} = Extension.SchemaCache.load(cxt.origin) + assert {:ok, ["aid"]} = SchemaLoader.Version.primary_keys(schema_version, "public", "a") + assert {:ok, ["aid"]} = SchemaLoader.Version.primary_keys(schema_version, {"public", "a"}) + + assert {:ok, ["bid1", "bid2"]} = SchemaLoader.Version.primary_keys(schema_version, "b", "b") - assert {:ok, ["bid1", "bid2"]} = Extension.SchemaCache.primary_keys(cxt.origin, "b", "b") - assert {:ok, ["bid1", "bid2"]} = Extension.SchemaCache.primary_keys(cxt.origin, {"b", "b"}) + assert {:ok, ["bid1", "bid2"]} = + SchemaLoader.Version.primary_keys(schema_version, {"b", "b"}) end end diff --git a/components/electric/test/electric/postgres/extension_test.exs b/components/electric/test/electric/postgres/extension_test.exs index 7a48b62612..1b5e3bb8f0 100644 --- a/components/electric/test/electric/postgres/extension_test.exs +++ b/components/electric/test/electric/postgres/extension_test.exs @@ -105,7 +105,7 @@ defmodule Electric.Postgres.ExtensionTest do tx(&migrate/1, cxt) end - test_tx "we can retrieve and set the current schema json", fn conn -> + test_tx("we can retrieve and set the current schema json", fn conn -> assert {:ok, nil, %Schema.Proto.Schema{tables: []}} = Extension.current_schema(conn) schema = Schema.new() version = "20230405171534_1" @@ -137,9 +137,9 @@ defmodule Electric.Postgres.ExtensionTest do ]) assert {:ok, ^version, ^schema} = Extension.current_schema(conn) - end + end) - test_tx "we can retrieve the schema for a given version", fn conn -> + test_tx("we can retrieve the schema for a given version", fn conn -> assert {:ok, nil, %Schema.Proto.Schema{tables: []}} = Extension.current_schema(conn) schema = Schema.new() version = "20230405171534_1" @@ -173,9 +173,9 @@ defmodule Electric.Postgres.ExtensionTest do assert {:ok, ^version, ^schema} = Extension.current_schema(conn) assert {:ok, ^version, ^schema} = Extension.schema_version(conn, version) - end + end) - test_tx "we can retrieve the sql of applied migrations", fn conn -> + test_tx("we can retrieve the sql of applied migrations", fn conn -> migrations = [ {"0001", [ @@ -207,9 +207,9 @@ defmodule Electric.Postgres.ExtensionTest do [_m1, _m2, m3, m4] = migrations assert [m3, m4] == migration_history(conn, "0002") - end + end) - test_tx "logical replication ddl is not captured", fn conn -> + test_tx("logical replication ddl is not captured", fn conn -> sql1 = "CREATE PUBLICATION all_tables FOR ALL TABLES;" sql2 = @@ -223,12 +223,12 @@ defmodule Electric.Postgres.ExtensionTest do end assert {:ok, []} = Extension.ddl_history(conn) - end + end) describe "electrification" do alias Electric.Postgres.SQLGenerator - test_tx "can generate the ddl to create any index", fn conn -> + test_tx("can generate the ddl to create any index", fn conn -> assert {:ok, agent} = SQLGenerator.SchemaAgent.start_link() namespace = "something" @@ -277,10 +277,10 @@ defmodule Electric.Postgres.ExtensionTest do %{} = new_index = Enum.find(new_table.indexes, &(&1.name == index.name)) assert new_index == index end - end + end) @tag timeout: 30_000 - test_tx "can generate the ddl to create any table", fn conn -> + test_tx("can generate the ddl to create any table", fn conn -> assert {:ok, agent} = SQLGenerator.SchemaAgent.start_link() namespace = "something" @@ -321,9 +321,9 @@ defmodule Electric.Postgres.ExtensionTest do assert {:ok, new_table} = Schema.fetch_table(new_schema, table.name) assert new_table == table end - end + end) - test_tx "generated ddl includes defaults and constraints", fn conn -> + test_tx("generated ddl includes defaults and constraints", fn conn -> create_parent_table = """ CREATE TABLE public.parent ( id uuid PRIMARY KEY NOT NULL @@ -361,9 +361,9 @@ defmodule Electric.Postgres.ExtensionTest do """ - end + end) - test_tx "generated ddl includes indexes", fn conn -> + test_tx("generated ddl includes indexes", fn conn -> create_table = """ CREATE TABLE public.something ( id uuid PRIMARY KEY NOT NULL, @@ -398,9 +398,9 @@ defmodule Electric.Postgres.ExtensionTest do CREATE UNIQUE INDEX something_val1_uniq_idx ON public.something USING btree (val1); """ - end + end) - test_tx "table electrification creates shadow tables", fn conn -> + test_tx("table electrification creates shadow tables", fn conn -> sql1 = "CREATE TABLE public.buttercup (id int4 GENERATED ALWAYS AS IDENTITY PRIMARY KEY);" sql2 = "CREATE TABLE public.daisy (id int4 GENERATED ALWAYS AS IDENTITY PRIMARY KEY);" sql3 = "CALL electric.electrify('buttercup')" @@ -420,9 +420,9 @@ defmodule Electric.Postgres.ExtensionTest do conn, "SELECT 1 FROM pg_class JOIN pg_namespace ON relnamespace = pg_namespace.oid WHERE relname = 'shadow__public__daisy' AND nspname = 'electric'" ) - end + end) - test_tx "table electrification successfully validates column types", fn conn -> + test_tx("table electrification successfully validates column types", fn conn -> assert [{:ok, [], []}, {:ok, [], []}] == :epgsql.squery(conn, """ CREATE TABLE public.t1 ( @@ -449,9 +449,9 @@ defmodule Electric.Postgres.ExtensionTest do ); CALL electric.electrify('public.t1'); """) - end + end) - test_tx "table electrification rejects invalid column types", fn conn -> + test_tx("table electrification rejects invalid column types", fn conn -> assert [ {:ok, [], []}, {:error, {:error, :error, _, :raise_exception, error_msg, _}} @@ -476,9 +476,9 @@ defmodule Electric.Postgres.ExtensionTest do created_at time with time zone """ |> String.trim() - end + end) - test_tx "electrified?/3", fn conn -> + test_tx("electrified?/3", fn conn -> sql1 = "CREATE TABLE public.buttercup (id int4 GENERATED ALWAYS AS IDENTITY PRIMARY KEY);" sql2 = "CREATE TABLE public.daisy (id int4 GENERATED ALWAYS AS IDENTITY PRIMARY KEY);" sql3 = "CALL electric.electrify('buttercup')" @@ -492,9 +492,9 @@ defmodule Electric.Postgres.ExtensionTest do assert {:ok, false} = Extension.electrified?(conn, "daisy") assert {:ok, false} = Extension.electrified?(conn, "public", "daisy") - end + end) - test_tx "table electrification rejects default column expressions", fn conn -> + test_tx("table electrification rejects default column expressions", fn conn -> assert [ {:ok, [], []}, {:error, {:error, :error, _, :raise_exception, error_msg, _}} @@ -517,7 +517,7 @@ defmodule Electric.Postgres.ExtensionTest do "Ts" """ |> String.trim() - end + end) end defp migration_history(conn, after_version \\ nil) do diff --git a/components/electric/test/electric/postgres/proxy/prisma/query_test.exs b/components/electric/test/electric/postgres/proxy/prisma/query_test.exs index 52032790d9..51bb311100 100644 --- a/components/electric/test/electric/postgres/proxy/prisma/query_test.exs +++ b/components/electric/test/electric/postgres/proxy/prisma/query_test.exs @@ -25,8 +25,8 @@ defmodule Electric.Postgres.Proxy.Prisma.QueryTest do loader_spec = MockSchemaLoader.backend_spec(migrations: migrations) {:ok, loader} = SchemaLoader.connect(loader_spec, []) - {:ok, version, schema} = SchemaLoader.load(loader) - {:ok, version: version, schema: schema, loader: loader} + {:ok, schema_version} = SchemaLoader.load(loader) + {:ok, version: schema_version.version, schema: schema_version, loader: loader} end test "TableV5_2", cxt do diff --git a/components/electric/test/electric/postgres/proxy/prisma_test.exs b/components/electric/test/electric/postgres/proxy/prisma_test.exs index 7b689007b8..82d2931220 100644 --- a/components/electric/test/electric/postgres/proxy/prisma_test.exs +++ b/components/electric/test/electric/postgres/proxy/prisma_test.exs @@ -505,7 +505,7 @@ defmodule Electric.Postgres.Proxy.PrismaTest do end test "client server session", cxt do - {:ok, _version, schema} = SchemaLoader.load(cxt.loader) + {:ok, schema} = SchemaLoader.load(cxt.loader) Enum.reduce(@queries, {cxt.injector, 0}, fn {module, sql}, {injector, n} -> m = n + 1 diff --git a/components/electric/test/electric/postgres/replication_test.exs b/components/electric/test/electric/postgres/replication_test.exs index e14ef8ed4a..a2c0de2bb1 100644 --- a/components/electric/test/electric/postgres/replication_test.exs +++ b/components/electric/test/electric/postgres/replication_test.exs @@ -3,7 +3,7 @@ defmodule Electric.Postgres.ReplicationTest do use Electric.Satellite.Protobuf - alias Electric.Postgres.{Replication, Schema} + alias Electric.Postgres.{Replication, Schema, Extension.SchemaLoader} def parse(sql) do Electric.Postgres.parse!(sql) @@ -122,8 +122,9 @@ defmodule Electric.Postgres.ReplicationTest do schema = schema_update(stmt) version = "20230405134615" + schema_version = SchemaLoader.Version.new(version, schema) - assert {:ok, [msg], [{"public", "fish"}]} = Replication.migrate(schema, version, stmt) + assert {:ok, [msg], [{"public", "fish"}]} = Replication.migrate(schema_version, stmt) # there are lots of tests that validate the schema is being properly updated # assert Schema.table_names(schema) == [~s("public"."fish"), ~s("frog"), ~s("teeth"."front")] @@ -162,8 +163,9 @@ defmodule Electric.Postgres.ReplicationTest do """ schema = schema_update(schema, stmt) + schema_version = SchemaLoader.Version.new(version, schema) - assert {:ok, [msg], [{"teeth", "front"}]} = Replication.migrate(schema, version, stmt) + assert {:ok, [msg], [{"teeth", "front"}]} = Replication.migrate(schema_version, stmt) assert Schema.table_names(schema) == [~s("public"."fish"), ~s("teeth"."front")] assert %SatOpMigrate{version: ^version} = msg %{stmts: stmts, table: table} = msg @@ -206,8 +208,9 @@ defmodule Electric.Postgres.ReplicationTest do schema = schema_update(stmt) version = "20230405134615" + schema_version = SchemaLoader.Version.new(version, schema) - assert {:ok, [_msg], [{"public", "fish"}]} = Replication.migrate(schema, version, stmt) + assert {:ok, [_msg], [{"public", "fish"}]} = Replication.migrate(schema_version, stmt) # there are lots of tests that validate the schema is being properly updated assert Schema.table_names(schema) == [~s("public"."fish")] @@ -216,8 +219,9 @@ defmodule Electric.Postgres.ReplicationTest do "ALTER TABLE fish ADD COLUMN value jsonb DEFAULT '{}', ADD COLUMN ts timestamp DEFAULT current_timestamp;" schema = schema_update(schema, stmt) + schema_version = SchemaLoader.Version.new(version, schema) - assert {:ok, [msg], [{"public", "fish"}]} = Replication.migrate(schema, version, stmt) + assert {:ok, [msg], [{"public", "fish"}]} = Replication.migrate(schema_version, stmt) assert %SatOpMigrate{version: ^version} = msg @@ -263,14 +267,16 @@ defmodule Electric.Postgres.ReplicationTest do schema = schema_update(stmt) version = "20230405134615" + schema_version = SchemaLoader.Version.new(version, schema) - assert {:ok, [_msg], [{"public", "fish"}]} = Replication.migrate(schema, version, stmt) + assert {:ok, [_msg], [{"public", "fish"}]} = Replication.migrate(schema_version, stmt) stmt = "CREATE INDEX fish_available_index ON public.fish (avilable);" schema = schema_update(schema, stmt) version = "20230405134616" - assert {:ok, [msg], []} = Replication.migrate(schema, version, stmt) + schema_version = SchemaLoader.Version.new(version, schema) + assert {:ok, [msg], []} = Replication.migrate(schema_version, stmt) assert %SatOpMigrate{version: ^version} = msg %{stmts: stmts, table: table} = msg @@ -300,9 +306,10 @@ defmodule Electric.Postgres.ReplicationTest do schema = Schema.new() version = "20230405134615" + schema_version = SchemaLoader.Version.new(version, schema) for stmt <- stmts do - assert {:ok, [], []} = Replication.migrate(schema, version, stmt) + assert {:ok, [], []} = Replication.migrate(schema_version, stmt) end end @@ -322,8 +329,9 @@ defmodule Electric.Postgres.ReplicationTest do # ] # version = "20230405134615" + # schema_version = SchemaLoader.Version.new(version, schema) - # assert {:error, schema} = Replication.migrate(schema, version, stmts) + # assert {:error, schema} = Replication.migrate(schema_version, stmts) # end end end diff --git a/components/electric/test/electric/postgres/shadow_table_transformation_test.exs b/components/electric/test/electric/postgres/shadow_table_transformation_test.exs index 1b7fa1a333..0c1d5c9541 100644 --- a/components/electric/test/electric/postgres/shadow_table_transformation_test.exs +++ b/components/electric/test/electric/postgres/shadow_table_transformation_test.exs @@ -56,12 +56,12 @@ defmodule Electric.Postgres.ShadowTableTransformationTest do test "UPDATE operation gets converted and split correctly" do assert [main_change, shadow_change] = ShadowTableTransformation.split_change_into_main_and_shadow( - %Changes.UpdatedRecord{ + Changes.UpdatedRecord.new( relation: @relation, record: %{"id" => "wow", "content" => "test", "content_b" => "new"}, old_record: %{"id" => "wow", "content" => "test", "content_b" => "old"}, tags: [@observed] - }, + ), relations(), @transaction_tag, nil @@ -213,14 +213,14 @@ defmodule Electric.Postgres.ShadowTableTransformationTest do "_tags" => serialize_pg_array([]) } }, - %Changes.UpdatedRecord{ + Changes.UpdatedRecord.new( relation: shadow_of(@relation), record: %{ "id" => "wow", "_tag" => @pg_transaction_tag, "_tags" => serialize_pg_array([@pg_transaction_tag]) } - } + ) ] # to match the parsing order of logical replication producer |> Enum.reverse() diff --git a/components/electric/test/electric/replication/changes_test.exs b/components/electric/test/electric/replication/changes_test.exs new file mode 100644 index 0000000000..54634039b6 --- /dev/null +++ b/components/electric/test/electric/replication/changes_test.exs @@ -0,0 +1,44 @@ +defmodule Electric.Replication.ChangesTest do + use ExUnit.Case, async: true + + alias Electric.Replication.Changes.UpdatedRecord + + describe "UpdatedRecord.changed_columns" do + test "is empty when old_record is nil" do + changed_columns = MapSet.new([]) + + assert %UpdatedRecord{changed_columns: ^changed_columns} = + UpdatedRecord.new(old_record: nil, record: %{"this" => "that"}) + end + + test "captures column if new value != old value" do + changed_columns = MapSet.new(["first"]) + + assert %UpdatedRecord{changed_columns: ^changed_columns} = + UpdatedRecord.new( + old_record: %{"first" => "first value", "second" => "second value"}, + record: %{"first" => "updated first value", "second" => "second value"} + ) + end + + test "captures column if old record does not have column value" do + changed_columns = MapSet.new(["first", "second"]) + + assert %UpdatedRecord{changed_columns: ^changed_columns} = + UpdatedRecord.new( + old_record: %{"first" => "first value"}, + record: %{"first" => "updated first value", "second" => "second value"} + ) + end + + test "ignores column if new does not have value" do + changed_columns = MapSet.new(["second"]) + + assert %UpdatedRecord{changed_columns: ^changed_columns} = + UpdatedRecord.new( + old_record: %{"first" => "first value", "second" => "second value"}, + record: %{"second" => "second updated value"} + ) + end + end +end diff --git a/components/electric/test/electric/replication/postgres/migration_consumer_test.exs b/components/electric/test/electric/replication/postgres/migration_consumer_test.exs index 3f49f18906..2fbc110d8e 100644 --- a/components/electric/test/electric/replication/postgres/migration_consumer_test.exs +++ b/components/electric/test/electric/replication/postgres/migration_consumer_test.exs @@ -121,7 +121,7 @@ defmodule Electric.Replication.Postgres.MigrationConsumerTest do GenStage.call(producer, {:emit, cxt.loader, events, version}) - assert_receive {MockSchemaLoader, {:refresh_subscription, ^origin}}, 500 + assert_receive {MockSchemaLoader, {:refresh_subscription, ^origin}}, 1500 end test "migration consumer stage captures migration records", cxt do diff --git a/components/electric/test/electric/satellite/serialization_test.exs b/components/electric/test/electric/satellite/serialization_test.exs index 3f0f8b07a2..d16845f6cb 100644 --- a/components/electric/test/electric/satellite/serialization_test.exs +++ b/components/electric/test/electric/satellite/serialization_test.exs @@ -348,7 +348,7 @@ defmodule Electric.Satellite.SerializationTest do {[], schema} end) - assert {:ok, _} = SchemaCache.save(cxt.origin, version, schema, stmts) + assert {:ok, _, _} = SchemaCache.save(cxt.origin, version, schema, stmts) tx end diff --git a/components/electric/test/electric/satellite/write_validation/immutable_primary_key_test.exs b/components/electric/test/electric/satellite/write_validation/immutable_primary_key_test.exs new file mode 100644 index 0000000000..e71217987b --- /dev/null +++ b/components/electric/test/electric/satellite/write_validation/immutable_primary_key_test.exs @@ -0,0 +1,236 @@ +defmodule Electric.Satellite.WriteValidation.ImmutablePrimaryKeyTest do + use ExUnit.Case, async: true + + alias Electric.Postgres.Schema + alias Electric.Postgres.MockSchemaLoader + alias Electric.Postgres.Extension.SchemaLoader + alias Electric.Replication.Changes + + alias Electric.Satellite.WriteValidation.ImmutablePrimaryKey + + setup do + migrations = [ + """ + CREATE TABLE public.single_pk ( + id uuid PRIMARY KEY, + value text, + amount integer + ); + """, + """ + CREATE TABLE public.compound_pk ( + id uuid, + owner uuid, + value text, + amount integer, + PRIMARY KEY (id, owner) + ); + """ + ] + + oid_loader = &MockSchemaLoader.oid_loader/3 + + schema = + Enum.reduce(migrations, Schema.new(), &Schema.update(&2, &1, oid_loader: oid_loader)) + + schema = SchemaLoader.Version.new("001", schema) + + assert {:ok, ["id"]} = SchemaLoader.Version.primary_keys(schema, "public", "single_pk") + + assert {:ok, ["id", "owner"]} = + SchemaLoader.Version.primary_keys(schema, "public", "compound_pk") + + {:ok, schema: schema} + end + + test "allows inserts", cxt do + assert :ok = + ImmutablePrimaryKey.validate_insert( + %Changes.NewRecord{ + relation: {"public", "single_pk"}, + record: %{ + "id" => "471a929e-3e8e-419c-a6d2-9ab3f32294d2", + "owner" => "bba4358f-7ba7-41a2-b32c-d8f145f16f41", + "value" => "something", + "amount" => "3" + } + }, + cxt.schema + ) + + assert :ok = + ImmutablePrimaryKey.validate_insert( + %Changes.NewRecord{ + relation: {"public", "compound_pk"}, + record: %{ + "id" => "471a929e-3e8e-419c-a6d2-9ab3f32294d2", + "value" => "something", + "amount" => "3" + } + }, + cxt.schema + ) + end + + test "allows deletes", cxt do + assert :ok = + ImmutablePrimaryKey.validate_delete( + %Changes.DeletedRecord{ + relation: {"public", "single_pk"}, + old_record: %{ + "id" => "471a929e-3e8e-419c-a6d2-9ab3f32294d2", + "value" => "something", + "amount" => "3" + } + }, + cxt.schema + ) + + assert :ok = + ImmutablePrimaryKey.validate_delete( + %Changes.DeletedRecord{ + relation: {"public", "compound_pk"}, + old_record: %{ + "id" => "471a929e-3e8e-419c-a6d2-9ab3f32294d2", + "owner" => "bba4358f-7ba7-41a2-b32c-d8f145f16f41", + "value" => "something", + "amount" => "3" + } + }, + cxt.schema + ) + end + + test "allows updates that don't affect a primary key", cxt do + assert :ok = + ImmutablePrimaryKey.validate_update( + Changes.UpdatedRecord.new( + relation: {"public", "single_pk"}, + record: %{ + "id" => "471a929e-3e8e-419c-a6d2-9ab3f32294d2", + "value" => "something", + "amount" => "3" + }, + old_record: %{ + "id" => "471a929e-3e8e-419c-a6d2-9ab3f32294d2", + "value" => "something else", + "amount" => "4" + } + ), + cxt.schema + ) + + assert :ok = + ImmutablePrimaryKey.validate_update( + Changes.UpdatedRecord.new( + relation: {"public", "compound_pk"}, + old_record: %{ + "id" => "471a929e-3e8e-419c-a6d2-9ab3f32294d2", + "owner" => "bba4358f-7ba7-41a2-b32c-d8f145f16f41", + "value" => "something", + "amount" => "3" + }, + record: %{ + "id" => "471a929e-3e8e-419c-a6d2-9ab3f32294d2", + "owner" => "bba4358f-7ba7-41a2-b32c-d8f145f16f41", + "value" => "something else", + "amount" => "4" + } + ), + cxt.schema + ) + end + + test "allows upserts", cxt do + assert :ok = + ImmutablePrimaryKey.validate_update( + Changes.UpdatedRecord.new( + relation: {"public", "single_pk"}, + old_record: nil, + record: %{ + "id" => "471a929e-3e8e-419c-a6d2-9ab3f32294d2", + "value" => "something", + "amount" => "3" + } + ), + cxt.schema + ) + + assert :ok = + ImmutablePrimaryKey.validate_update( + Changes.UpdatedRecord.new( + relation: {"public", "compound_pk"}, + old_record: nil, + record: %{ + "id" => "471a929e-3e8e-419c-a6d2-9ab3f32294d2", + "owner" => "bba4358f-7ba7-41a2-b32c-d8f145f16f41", + "value" => "something else", + "amount" => "4" + } + ), + cxt.schema + ) + end + + test "disallows updates that affect a primary key", cxt do + assert {:error, _} = + ImmutablePrimaryKey.validate_update( + Changes.UpdatedRecord.new( + relation: {"public", "single_pk"}, + old_record: %{ + "id" => "f0847f32-d9a5-4006-b9f3-f715654ca0a7", + "value" => "something else", + "amount" => "4" + }, + record: %{ + "id" => "471a929e-3e8e-419c-a6d2-9ab3f32294d2", + "value" => "something", + "amount" => "3" + } + ), + cxt.schema + ) + + assert {:error, _} = + ImmutablePrimaryKey.validate_update( + Changes.UpdatedRecord.new( + relation: {"public", "compound_pk"}, + old_record: %{ + "id" => "471a929e-3e8e-419c-a6d2-9ab3f32294d2", + "owner" => "bba4358f-7ba7-41a2-b32c-d8f145f16f41", + "value" => "something", + "amount" => "3" + }, + record: %{ + "id" => "7ba5e710-7f88-475b-885a-5b33d68e5975", + "owner" => "a6c6bba1-dbc7-4624-a2b0-512212b8f814", + "value" => "something else", + "amount" => "4" + } + ), + cxt.schema + ) + end + + test "disallows updates to any column in a compound pk", cxt do + assert {:error, _} = + ImmutablePrimaryKey.validate_update( + Changes.UpdatedRecord.new( + relation: {"public", "compound_pk"}, + old_record: %{ + "id" => "471a929e-3e8e-419c-a6d2-9ab3f32294d2", + "owner" => "bba4358f-7ba7-41a2-b32c-d8f145f16f41", + "value" => "something", + "amount" => "3" + }, + record: %{ + "id" => "471a929e-3e8e-419c-a6d2-9ab3f32294d2", + "owner" => "a6c6bba1-dbc7-4624-a2b0-512212b8f814", + "value" => "something else", + "amount" => "4" + } + ), + cxt.schema + ) + end +end diff --git a/components/electric/test/electric/satellite/write_validation_test.exs b/components/electric/test/electric/satellite/write_validation_test.exs new file mode 100644 index 0000000000..69bc39ac7e --- /dev/null +++ b/components/electric/test/electric/satellite/write_validation_test.exs @@ -0,0 +1,147 @@ +defmodule Electric.Satellite.WriteValidationTest do + use ExUnit.Case, async: true + + alias Electric.Satellite.WriteValidation + alias Electric.Postgres.MockSchemaLoader + alias Electric.Postgres.Extension.SchemaLoader + alias Electric.Replication.Changes + + @single_pk {"public", "single_pk"} + + def valid_tx(lsn) do + %Changes.Transaction{ + lsn: lsn, + changes: [ + %Changes.NewRecord{ + relation: @single_pk, + record: %{ + "id" => "a6c8f529-7be1-412c-ab4b-a52612137e56", + "value" => "value 1", + "amount" => "1" + } + }, + %Changes.NewRecord{ + relation: @single_pk, + record: %{ + "id" => "5f8bf361-0e9f-4108-a92e-af37e97e38da", + "value" => "value 2", + "amount" => "2" + } + } + ] + } + end + + def invalid_tx(lsn) do + %Changes.Transaction{ + lsn: lsn, + changes: [ + # invalid because it fails WriteValidation.ImmutablePrimaryKey + Changes.UpdatedRecord.new( + relation: @single_pk, + old_record: %{ + "id" => "a6c8f529-7be1-412c-ab4b-a52612137e56", + "value" => "value 1", + "amount" => "1" + }, + record: %{ + "id" => "388005b1-5bc8-4428-9933-6dffa598ce93", + "value" => "value 3", + "amount" => "3" + } + ), + Changes.UpdatedRecord.new( + relation: @single_pk, + old_record: %{ + "id" => "5f8bf361-0e9f-4108-a92e-af37e97e38da", + "value" => "value 2", + "amount" => "2" + }, + record: %{ + "id" => "5f8bf361-0e9f-4108-a92e-af37e97e38da", + "value" => "value 2", + "amount" => "2" + } + ) + ] + } + end + + describe "validate_transactions!/3" do + setup do + migrations = [ + {"001", + [ + """ + CREATE TABLE public.single_pk ( + id uuid PRIMARY KEY, + value text, + amount integer + ); + """, + """ + CREATE TABLE public.compound_pk ( + id uuid, + owner uuid, + value text, + amount integer, + PRIMARY KEY (id, owner) + ); + """ + ]} + ] + + {:ok, loader} = + MockSchemaLoader.backend_spec(migrations: migrations) + |> SchemaLoader.connect([]) + + {:ok, loader: loader} + end + + test "invalid_tx/1", cxt do + {:ok, schema} = SchemaLoader.load(cxt.loader) + + assert Enum.any?( + invalid_tx("001").changes, + &match?( + {:error, _}, + WriteValidation.ImmutablePrimaryKey.validate_update(&1, schema) + ) + ) + end + + test "allows valid transactions", cxt do + txns = [ + valid_tx("001"), + valid_tx("002"), + valid_tx("003") + ] + + assert {:ok, ^txns} = WriteValidation.validate_transactions!(txns, cxt.loader) + end + + test "splits valid and invalid with an error", cxt do + pre = [ + valid_tx("001"), + valid_tx("002") + ] + + invalid = invalid_tx("003") + + post = [ + valid_tx("004"), + valid_tx("005") + ] + + txns = pre ++ [invalid] ++ post + + assert {:error, ^pre, error, ^post} = + WriteValidation.validate_transactions!(txns, cxt.loader) + + assert %{ + tx: ^invalid, + verifier: WriteValidation.ImmutablePrimaryKey + } = error + end + end +end diff --git a/components/electric/test/support/mock_schema_loader.ex b/components/electric/test/support/mock_schema_loader.ex index 0170f76d4b..3905c1b887 100644 --- a/components/electric/test/support/mock_schema_loader.ex +++ b/components/electric/test/support/mock_schema_loader.ex @@ -206,12 +206,12 @@ defmodule Electric.Postgres.MockSchemaLoader do def load({[], opts}) do notify(opts, :load) - {:ok, nil, Schema.new()} + {:ok, SchemaLoader.Version.new(nil, Schema.new())} end def load({[%{version: version, schema: schema} | _versions], opts}) do notify(opts, {:load, version, schema}) - {:ok, version, schema} + {:ok, SchemaLoader.Version.new(version, schema)} end @impl true @@ -224,7 +224,7 @@ defmodule Electric.Postgres.MockSchemaLoader do %Migration{schema: schema} -> notify(opts, {:load, version, schema}) - {:ok, version, schema} + {:ok, SchemaLoader.Version.new(version, schema)} nil -> {:error, "schema version not found: #{version}"} @@ -235,17 +235,18 @@ defmodule Electric.Postgres.MockSchemaLoader do def save({:agent, pid}, version, schema, stmts) do with :ok <- Agent.update(pid, fn state -> - {:ok, state} = save(state, version, schema, stmts) + {:ok, state, _schema_version} = save(state, version, schema, stmts) state end) do - {:ok, {:agent, pid}} + {:ok, {:agent, pid}, SchemaLoader.Version.new(version, schema)} end end def save({versions, opts}, version, schema, stmts) do notify(opts, {:save, version, schema, stmts}) - {:ok, {[mock_version(version, schema, stmts) | versions], opts}} + {:ok, {[mock_version(version, schema, stmts) | versions], opts}, + SchemaLoader.Version.new(version, schema)} end @impl true @@ -269,41 +270,6 @@ defmodule Electric.Postgres.MockSchemaLoader do end end - @impl true - def primary_keys({:agent, pid}, schema, name) do - Agent.get(pid, &primary_keys(&1, schema, name)) - end - - def primary_keys({_versions, %{pks: pks} = opts}, schema, name) when is_map(pks) do - notify(opts, {:primary_keys, schema, name}) - - with {:ok, tpks} <- Map.fetch(pks, {schema, name}) do - {:ok, tpks} - else - :error -> - {:error, "no pks defined for #{schema}.#{name} in #{inspect(opts)}"} - end - end - - def primary_keys({[{_version, schema} | _versions], opts}, sname, tname) do - notify(opts, {:primary_keys, sname, tname}) - - Schema.primary_keys(schema, sname, tname) - end - - def primary_keys({[], _opts}, sname, tname) do - {:error, "unknown table #{sname}.#{tname} and no primary keys configured"} - end - - @impl true - def primary_keys({:agent, pid}, {schema, name}) do - Agent.get(pid, &primary_keys(&1, {schema, name})) - end - - def primary_keys({_versions, _opts} = state, {schema, name}) do - primary_keys(state, schema, name) - end - @impl true def refresh_subscription({:agent, pid}, name) do Agent.get(pid, &refresh_subscription(&1, name)) diff --git a/components/electric/test/support/postgres_test_connection.ex b/components/electric/test/support/postgres_test_connection.ex index bc52cd953b..651e711088 100644 --- a/components/electric/test/support/postgres_test_connection.ex +++ b/components/electric/test/support/postgres_test_connection.ex @@ -218,7 +218,7 @@ defmodule Electric.Postgres.TestConnection do def setup_with_sql_execute(_), do: :ok def load_schema(%{conn: _, origin: origin}) do - {:ok, _, schema} = Electric.Postgres.Extension.SchemaCache.load(origin) + {:ok, schema} = Electric.Postgres.Extension.SchemaCache.load(origin) {:ok, schema: schema} end diff --git a/e2e/README.md b/e2e/README.md index 3b8fa52045..10076a2934 100644 --- a/e2e/README.md +++ b/e2e/README.md @@ -16,6 +16,7 @@ Current groups are: - `3.*` - Replication using an actual typescript client in Node - `4.*` - Auxillary Electric functions not immediately related to replication - `5.*` - Conflict resolution semantics tests +- `6.*` - Permissions and write validations Feel free to add more. diff --git a/e2e/tests/05.06_postgres_trigger_column_name_quoting_regression_test.lux b/e2e/tests/05.06_postgres_trigger_column_name_quoting_regression_test.lux index baadea8493..6a39823884 100644 --- a/e2e/tests/05.06_postgres_trigger_column_name_quoting_regression_test.lux +++ b/e2e/tests/05.06_postgres_trigger_column_name_quoting_regression_test.lux @@ -56,7 +56,7 @@ %Electric.Replication.Changes.UpdatedRecord\{relation: \{"public", "camelCase"\}, \ old_record: %\{"id" => "1", "userId" => nil\}, \ record: %\{"id" => "1", "userId" => "test_user"\}, \ - tags: \["postgres_1@\d+", "client_1_1@\d+"\]\} + tags: \["postgres_1@\d+", "client_1_1@\d+"\] [shell pg_1] [invoke wait-for "SELECT * FROM public.\"camelCase\";" "test_user" 10 ${psql}] diff --git a/e2e/tests/06.01_prevent_updates_to_pk.lux b/e2e/tests/06.01_prevent_updates_to_pk.lux new file mode 100644 index 0000000000..1945a8b45b --- /dev/null +++ b/e2e/tests/06.01_prevent_updates_to_pk.lux @@ -0,0 +1,84 @@ +[doc Updates to table primary keys are rejected] +[include _shared.luxinc] + +[invoke setup] + +[global migration_version_1=20231109154018] + +[shell proxy_1] + [invoke log "run migration $migration_version_1 on postgres"] + """! + BEGIN; + CALL electric.migration_version('$migration_version_1'); + CREATE TABLE "compound_pks" ( + "id1" TEXT NOT NULL, + "id2" TEXT NOT NULL, + "value" TEXT NOT NULL, + CONSTRAINT "compound_pks_pk" PRIMARY KEY ("id1", "id2") + ); + ALTER TABLE "compound_pks" ENABLE ELECTRIC; + COMMIT; + """ + ?$psql + + +[shell electric] + ?? [info] Applying migration $migration_version_1 + +[newshell user_1_ws1] + -$fail_pattern + [invoke start_elixir_test 1] + [invoke client_session 1 1] + + !alias Electric.Satellite.{SatRelation, SatRelationColumn, SatOpInsert, SatOpUpdate, SatOpRow} + + """! + Satellite.TestWsClient.send_data(conn, %SatRelation{ + columns: [ + %SatRelationColumn{name: "id1", type: "text", is_nullable: false}, + %SatRelationColumn{name: "id2", type: "text", is_nullable: false}, + %SatRelationColumn{name: "value", type: "text", is_nullable: false}, + ], + relation_id: 1, + schema_name: "public", + table_name: "compound_pks", + table_type: :TABLE + }) + """ + ?$eprompt + """! + Satellite.TestWsClient.send_data(conn, ProtocolHelpers.transaction("1", DateTime.to_unix(DateTime.utc_now(), :millisecond), [ + %SatOpInsert{relation_id: 1, row_data: %SatOpRow{nulls_bitmask: <<0>>, values: ["11111111-1111-1111-1111-111111111111", "22222222-2222-2222-2222-222222222222", "test content 1"]}}, + %SatOpInsert{relation_id: 1, row_data: %SatOpRow{nulls_bitmask: <<0>>, values: ["33333333-3333-3333-3333-333333333333", "44444444-4444-4444-4444-444444444444", "test content 2"]}} + ])) + """ + ?$eprompt + +[shell electric] + ?row_data: %Electric.Satellite.SatOpRow\{nulls_bitmask: <<0>>, values: \["11111111-1111-1111-1111-111111111111" + ?row_data: %Electric.Satellite.SatOpRow\{nulls_bitmask: <<0>>, values: \["33333333-3333-3333-3333-333333333333" + +[shell user_1_ws1] + # Reset the failure pattern + - + """! + Satellite.TestWsClient.send_data(conn, ProtocolHelpers.transaction("2", DateTime.to_unix(DateTime.utc_now(), :millisecond), [ + %SatOpUpdate{ + relation_id: 1, + old_row_data: %SatOpRow{nulls_bitmask: <<0>>, values: ["11111111-1111-1111-1111-111111111111", "22222222-2222-2222-2222-222222222222", "test content 1"]}, + row_data: %SatOpRow{nulls_bitmask: <<0>>, values: ["21111111-1111-1111-1111-111111111111", "22222222-2222-2222-2222-222222222222", "test content 2"]} + } + ])) + """ + ?$eprompt + +[shell electric] + # Reset the failure pattern + - + ?failed write validation tests + +[shell user_1_ws1] + ?%Electric.Satellite.SatErrorResp\{error_type: :INVALID_REQUEST, lsn: "2" + +[cleanup] + [invoke teardown] diff --git a/protocol/satellite.proto b/protocol/satellite.proto index 0c9b50d6af..75d602ad3d 100644 --- a/protocol/satellite.proto +++ b/protocol/satellite.proto @@ -99,6 +99,11 @@ message SatErrorResp { } ErrorCode error_type = 1; + + // lsn of the txn that caused the problem, if available + optional bytes lsn = 2; + // human readable explanation of what went wrong + optional string message = 3; } // (Consumer) Starts replication stream from producer to consumer