Skip to content

Commit

Permalink
chore(electric): Validate column types when electrifying a table (#331)
Browse files Browse the repository at this point in the history
Closes VAX-616.
  • Loading branch information
alco authored Aug 15, 2023
1 parent 57324c4 commit 571119a
Show file tree
Hide file tree
Showing 14 changed files with 146 additions and 40 deletions.
5 changes: 5 additions & 0 deletions .changeset/tasty-chairs-beg.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/electric": patch
---

Add a validation step in the electrify() function that only lets tables with supported column types to be electrified
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20230605141256_Electr

@behaviour Extension.Migration

sql_file = Path.expand("20230605141256_electrify_function/electrify.sql", __DIR__)
sql_template = Path.expand("20230605141256_electrify_function/electrify.sql.eex", __DIR__)

@external_resource sql_file
@external_resource sql_template

@impl true
def version, do: 2023_06_05_14_12_56
Expand All @@ -18,17 +18,21 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20230605141256_Electr
electrified_index_table = Extension.electrified_index_table()
publication = Extension.publication_name()
event_triggers = Extension.event_triggers()
event_trigger_tags = ["'ALTER TABLE'", "'DROP TABLE'", "'DROP INDEX'", "'DROP VIEW'"]

event_trigger_tags =
["'ALTER TABLE'"] ++ for obj <- ["TABLE", "INDEX", "VIEW"], do: "'DROP #{obj}'"
supported_types_sql =
Electric.Satellite.Serialization.supported_pg_types()
|> Enum.map(&"'#{&1}'")
|> Enum.join(",")

electrify_function =
electrify_function_sql(
schema,
electrified_tracking_table,
Extension.electrified_index_table(),
publication,
Extension.add_table_to_publication_sql("%I.%I")
Extension.add_table_to_publication_sql("%I.%I"),
supported_types_sql
)

[
Expand Down Expand Up @@ -57,7 +61,7 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20230605141256_Electr
electrify_function,
"""
CREATE EVENT TRIGGER #{event_triggers[:sql_drop]} ON sql_drop
WHEN TAG IN (#{Enum.join(event_trigger_tags, ", ")})
WHEN TAG IN (#{Enum.join(event_trigger_tags, ", ")})
EXECUTE FUNCTION #{schema}.ddlx_sql_drop_handler();
"""
]
Expand All @@ -68,11 +72,12 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20230605141256_Electr
[]
end

EEx.function_from_file(:defp, :electrify_function_sql, sql_file, [
EEx.function_from_file(:defp, :electrify_function_sql, sql_template, [
:schema,
:electrified_tracking_table,
:electrified_index_table,
:publication_name,
:publication_sql
:publication_sql,
:valid_column_types
])
end
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,39 @@ $function$ LANGUAGE PLPGSQL STABLE;

-------------------------------------------------

CREATE OR REPLACE FUNCTION <%= schema %>.__validate_table_column_types(table_name text)
RETURNS VOID AS $function$
DECLARE
_col_name text;
_col_type text;
_col_typmod int;
_col_type_pretty text;
_invalid_cols text[];
BEGIN
FOR _col_name, _col_type, _col_typmod, _col_type_pretty IN
SELECT attname, typname, atttypmod, format_type(atttypid, atttypmod)
FROM pg_attribute
JOIN pg_type on atttypid = pg_type.oid
WHERE attrelid = table_name::regclass AND attnum > 0 AND NOT attisdropped
ORDER BY attnum
LOOP
IF _col_type NOT IN (<%= valid_column_types %>)
-- We only support unsized varchar type
OR ('varchar' IN (<%= valid_column_types %>) AND _col_type = 'varchar' AND _col_typmod <> -1)
THEN
_invalid_cols = array_append(_invalid_cols, format('"%s" %s', _col_name, _col_type_pretty));
END IF;
END LOOP;

IF _invalid_cols IS NOT NULL THEN
RAISE EXCEPTION E'Cannot electrify "%" because some of its columns have types not supported by Electric:\n %',
table_name, array_to_string(_invalid_cols, E'\n ');
END IF;
END;
$function$ LANGUAGE PLPGSQL;

-------------------------------------------------

CREATE OR REPLACE PROCEDURE <%= schema %>.electrify(
name1 text,
name2 text DEFAULT NULL
Expand All @@ -117,6 +150,8 @@ BEGIN
RAISE EXCEPTION '% is not an ordinary table', _quoted_name;
END IF;

PERFORM <%= schema %>.__validate_table_column_types(_quoted_name);

EXECUTE format('ALTER TABLE %I.%I REPLICA IDENTITY FULL;', _schema, _table);

INSERT INTO <%= electrified_tracking_table %> (schema_name, table_name, oid)
Expand All @@ -133,9 +168,8 @@ BEGIN

IF NOT EXISTS (
SELECT pr.oid FROM pg_publication_rel pr
INNER JOIN pg_publication pp ON pr.prpubid = pp.oid
WHERE pp.pubname = '<%= publication_name %>'
AND pr.prrelid = _oid
INNER JOIN pg_publication pp ON pr.prpubid = pp.oid
WHERE pp.pubname = '<%= publication_name %>' AND pr.prrelid = _oid
) THEN
EXECUTE format('<%= publication_sql %>;', _schema, _table);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,19 @@ defmodule Electric.Postgres.Extension.Migrations.Migration_20230715000000_Utilit

@impl true
def up(_) do
table = Extension.transaction_marker_table()

[
"""
CREATE TABLE #{Extension.transaction_marker_table()} (
CREATE TABLE #{table} (
id VARCHAR(64) PRIMARY KEY,
content jsonb NULL
)
""",
"""
INSERT INTO #{Extension.transaction_marker_table()} (id, content) VALUES ('magic write', '{}')
INSERT INTO #{table} (id, content) VALUES ('magic write', '{}')
""",
Extension.add_table_to_publication_sql(Extension.transaction_marker_table())
Extension.add_table_to_publication_sql(table)
]
end

Expand Down
15 changes: 13 additions & 2 deletions components/electric/lib/electric/satellite/serialization.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,18 @@ defmodule Electric.Satellite.Serialization do
@type relation_mapping() ::
%{Changes.relation() => {PB.relation_id(), [Replication.Column.name()]}}

@spec supported_pg_types :: [atom]
def supported_pg_types do
~w[
bytea
int2 int4 int8
float8
text
uuid
varchar
]a
end

@doc """
Serialize from internal format to Satellite PB format
"""
Expand Down Expand Up @@ -417,8 +429,7 @@ defmodule Electric.Satellite.Serialization do
"""
@spec decode_column_value(binary, atom) :: binary

def decode_column_value(val, type) when type in [:text, :bpchar, :varchar, :bytea] do
# TODO: validate value length for sized bpchar and varchar types.
def decode_column_value(val, type) when type in [:bytea, :text, :varchar] do
val
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ defmodule Electric.Postgres.Extension.ElectrifyTest do
CREATE TABLE buttercup (
id uuid PRIMARY KEY DEFAULT uuid_generate_v4(),
value text,
secret bool
secret integer
);
"""

Expand All @@ -58,7 +58,7 @@ defmodule Electric.Postgres.Extension.ElectrifyTest do
CREATE TABLE buttercup (
id uuid PRIMARY KEY DEFAULT uuid_generate_v4(),
value text,
secret bool
secret integer
);
"""

Expand Down Expand Up @@ -125,7 +125,7 @@ defmodule Electric.Postgres.Extension.ElectrifyTest do
CREATE TABLE balloons.buttercup (
id uuid PRIMARY KEY DEFAULT uuid_generate_v4(),
value text,
secret bool
secret integer
);
"""

Expand Down Expand Up @@ -187,7 +187,7 @@ defmodule Electric.Postgres.Extension.ElectrifyTest do
CREATE TABLE buttercup (
id uuid PRIMARY KEY DEFAULT uuid_generate_v4(),
value text,
secret bool
secret integer
);
"""

Expand Down
48 changes: 48 additions & 0 deletions components/electric/test/electric/postgres/extension_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,54 @@ defmodule Electric.Postgres.ExtensionTest do
)
end

test_tx "table electrification successfully validates column types", fn conn ->
assert [{:ok, [], []}, {:ok, [], []}] ==
:epgsql.squery(conn, """
CREATE TABLE public.t1 (
id UUID PRIMARY KEY,
content TEXT NOT NULL,
words VARCHAR,
num2a INT2,
num2b SMALLINT,
num4a INT4,
num4b INT,
num4c INTEGER,
num8a INT8,
num8b BIGINT,
real8a FLOAT8,
real8b DOUBLE PRECISION
);
CALL electric.electrify('public.t1');
""")
end

test_tx "table electrification rejects invalid column types", fn conn ->
assert [
{:ok, [], []},
{:error, {:error, :error, _, :raise_exception, error_msg, _}}
] =
:epgsql.squery(conn, """
CREATE TABLE public.t1 (
id UUID PRIMARY KEY,
c1 CHARACTER,
c2 CHARACTER(11),
c3 VARCHAR(11),
created_at TIMESTAMP
);
CALL electric.electrify('public.t1');
""")

assert error_msg ==
"""
Cannot electrify "public.t1" because some of its columns have types not supported by Electric:
"c1" character(1)
"c2" character(11)
"c3" character varying(11)
"created_at" timestamp without time zone
"""
|> String.trim()
end

test_tx "electrified?/2", fn conn ->
sql1 = "CREATE TABLE public.buttercup (id int8 GENERATED ALWAYS AS IDENTITY PRIMARY KEY);"
sql2 = "CREATE TABLE public.daisy (id int8 GENERATED ALWAYS AS IDENTITY PRIMARY KEY);"
Expand Down
2 changes: 1 addition & 1 deletion components/electric/test/electric/postgres_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ defmodule Electric.PostgresTest do
import Electric.Postgres.TestConnection

setup do
context = setup_test_db()
context = create_test_db()

# start the replication process here so that it will be stopped before
# we get to the on_exit handler defined in the setup_all
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ defmodule Electric.Satellite.WsValidationsTest do

@ws_listener_name :ws_validations_test
@table_name "foo"
@receive_timeout 500

setup :setup_replicated_db

Expand Down Expand Up @@ -60,7 +61,7 @@ defmodule Electric.Satellite.WsValidationsTest do
# Wait long enough for the server to process our messages, thus confirming it has been accepted
ping_server(conn)

refute_receive {^conn, %SatErrorResp{error_type: :INVALID_REQUEST}}
refute_receive {^conn, %SatErrorResp{error_type: :INVALID_REQUEST}}, @receive_timeout
end)
end

Expand All @@ -85,7 +86,7 @@ defmodule Electric.Satellite.WsValidationsTest do
within_replication_context(ctx, vsn, fn conn ->
tx_op_log = serialize_trans(record)
MockClient.send_data(conn, tx_op_log)
assert_receive {^conn, %SatErrorResp{error_type: :INVALID_REQUEST}}
assert_receive {^conn, %SatErrorResp{error_type: :INVALID_REQUEST}}, @receive_timeout
end)
end)
end
Expand Down Expand Up @@ -139,7 +140,7 @@ defmodule Electric.Satellite.WsValidationsTest do
within_replication_context(ctx, vsn, fn conn ->
tx_op_log = serialize_trans(record)
MockClient.send_data(conn, tx_op_log)
assert_receive {^conn, %SatErrorResp{error_type: :INVALID_REQUEST}}
assert_receive {^conn, %SatErrorResp{error_type: :INVALID_REQUEST}}, @receive_timeout
end)
end)
end
Expand Down Expand Up @@ -194,7 +195,7 @@ defmodule Electric.Satellite.WsValidationsTest do
within_replication_context(ctx, vsn, fn conn ->
tx_op_log = serialize_trans(record)
MockClient.send_data(conn, tx_op_log)
assert_receive {^conn, %SatErrorResp{error_type: :INVALID_REQUEST}}
assert_receive {^conn, %SatErrorResp{error_type: :INVALID_REQUEST}}, @receive_timeout
end)
end)
end
Expand Down Expand Up @@ -233,7 +234,7 @@ defmodule Electric.Satellite.WsValidationsTest do
within_replication_context(ctx, vsn, fn conn ->
tx_op_log = serialize_trans(record)
MockClient.send_data(conn, tx_op_log)
assert_receive {^conn, %SatErrorResp{error_type: :INVALID_REQUEST}}
assert_receive {^conn, %SatErrorResp{error_type: :INVALID_REQUEST}}, @receive_timeout
end)
end)
end
Expand Down
8 changes: 4 additions & 4 deletions components/electric/test/support/postgres_test_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ defmodule Electric.Postgres.TestConnection do
end)
end

def setup_test_db(setup_fun \\ fn _ -> nil end, teardown_fun \\ fn _ -> nil end) do
def create_test_db(setup_fun \\ fn _ -> nil end, teardown_fun \\ fn _ -> nil end) do
db_name = "electric_postgres_test_#{DateTime.utc_now() |> DateTime.to_unix()}"
config = config() |> Keyword.delete(:database)

Expand Down Expand Up @@ -109,7 +109,7 @@ defmodule Electric.Postgres.TestConnection do
)
end

context = Map.merge(context, setup_test_db(setup_fun, teardown_fun))
context = Map.merge(context, create_test_db(setup_fun, teardown_fun))

pg_connector_opts =
context
Expand Down Expand Up @@ -160,8 +160,8 @@ defmodule Electric.Postgres.TestConnection do
:epgsql.squery(conn, """
CREATE TABLE public.my_entries (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
content VARCHAR(64) NOT NULL,
content_b VARCHAR(64)
content VARCHAR NOT NULL,
content_b TEXT
);
""")
Expand Down
8 changes: 4 additions & 4 deletions e2e/init.sql
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
CREATE TABLE entries (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
content VARCHAR(64) NOT NULL,
content_b VARCHAR(64)
content VARCHAR NOT NULL,
content_b TEXT
);

CREATE TABLE owned_entries (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
electric_user_id VARCHAR(255) NOT NULL,
content VARCHAR(64) NOT NULL
electric_user_id TEXT NOT NULL,
content VARCHAR NOT NULL
);
4 changes: 2 additions & 2 deletions e2e/tests/01.04_postgres_alter_table_also_alters_shadows.lux
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

[shell pg_1]
!BEGIN;
!ALTER TABLE entries ADD COLUMN new_column VARCHAR(64);
!ALTER TABLE entries ADD COLUMN new_column VARCHAR;
!COMMIT;
?$psql
# Verify column structure of the shadow table:
Expand All @@ -16,4 +16,4 @@
?$psql

[cleanup]
[invoke teardown]
[invoke teardown]
Loading

0 comments on commit 571119a

Please sign in to comment.