Commit

fix: don't send rows to clients that have already seen this particular table (#311)

This is a deliberately simplified enabling implementation that removes the
foot-gun of intersecting shapes while we only support unfiltered
full-table sync (with a single immutable filter on user id).
icehaunter authored Aug 21, 2023
1 parent 92f3258 commit 3cf2bc2
Showing 5 changed files with 138 additions and 31 deletions.
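In short, the initial snapshot query is now gated on a set of tables the client has already received: for any relation present in filtering_context[:sent_tables], no rows are returned. A minimal standalone sketch of that gate, with a hypothetical fetch_rows callback standing in for the real snapshot query in ShapeRequest.query_initial_data:

# Minimal sketch of the gating idea; `fetch_rows` is a hypothetical callback,
# not part of the actual module.
defmodule SentTablesGate do
  def query_initial_data(rel, filtering_context, fetch_rows) do
    sent = filtering_context[:sent_tables]

    if sent && MapSet.member?(sent, rel) do
      # The client already has this table from an earlier shape subscription.
      {:ok, []}
    else
      # First request for this table: run the actual query.
      fetch_rows.(rel)
    end
  end
end

# With sent_tables = MapSet.new([{"public", "users"}]), a second request for
# {"public", "users"} yields {:ok, []}; any other relation still runs the query.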
5 changes: 5 additions & 0 deletions .changeset/healthy-dingos-destroy.md
@@ -0,0 +1,5 @@
+---
+"@core/electric": patch
+---
+
+Support for intersecting shape subscriptions
@@ -97,37 +97,41 @@ defmodule Electric.Replication.Shapes.ShapeRequest do
         origin,
         filtering_context
       ) do
-    table = Enum.find(schema.tables, &(&1.name.schema == schema_name && &1.name.name == name))
-    columns = Enum.map_join(table.columns, ", ", &~s|main."#{&1.name}"|)
-    {:ok, pks} = Schema.primary_keys(table)
-    pk_clause = Enum.map_join(pks, " AND ", &~s|main."#{&1}" = shadow."#{&1}"|)
-
-    ownership_column = Ownership.id_column_name()
-
-    where_clause =
-      if filtering_context[:user_id] && Enum.any?(table.columns, &(&1.name == ownership_column)) do
-        escaped = String.replace(filtering_context[:user_id], "'", "''")
-
-        # We're using explicit interpolation here instead of extended query, because we need all columns regardless of type to be returned as binaries
-        "WHERE #{ownership_column} = '#{escaped}'"
-      else
-        ""
+    if filtering_context[:sent_tables] && MapSet.member?(filtering_context[:sent_tables], rel) do
+      {:ok, []}
+    else
+      table = Enum.find(schema.tables, &(&1.name.schema == schema_name && &1.name.name == name))
+      columns = Enum.map_join(table.columns, ", ", &~s|main."#{&1.name}"|)
+      {:ok, pks} = Schema.primary_keys(table)
+      pk_clause = Enum.map_join(pks, " AND ", &~s|main."#{&1}" = shadow."#{&1}"|)
+
+      ownership_column = Ownership.id_column_name()
+
+      where_clause =
+        if filtering_context[:user_id] && Enum.any?(table.columns, &(&1.name == ownership_column)) do
+          escaped = String.replace(filtering_context[:user_id], "'", "''")
+
+          # We're using explicit interpolation here instead of extended query, because we need all columns regardless of type to be returned as binaries
+          "WHERE #{ownership_column} = '#{escaped}'"
+        else
+          ""
+        end
+
+      query = """
+      SELECT shadow."_tags", #{columns}
+      FROM #{Schema.name(schema_name)}.#{Schema.name(name)} as main
+      JOIN electric."shadow__#{schema_name}__#{name}" as shadow
+      ON #{pk_clause}
+      #{where_clause}
+      """
+
+      case :epgsql.squery(conn, query) do
+        {:ok, _, rows} ->
+          {:ok, rows_to_records_with_tags(rows, Enum.map(table.columns, & &1.name), rel, origin)}
+
+        {:error, _} = error ->
+          error
       end
-
-    query = """
-    SELECT shadow."_tags", #{columns}
-    FROM #{Schema.name(schema_name)}.#{Schema.name(name)} as main
-    JOIN electric."shadow__#{schema_name}__#{name}" as shadow
-    ON #{pk_clause}
-    #{where_clause}
-    """
-
-    case :epgsql.squery(conn, query) do
-      {:ok, _, rows} ->
-        {:ok, rows_to_records_with_tags(rows, Enum.map(table.columns, & &1.name), rel, origin)}
-
-      {:error, _} = error ->
-        error
     end
   end

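For orientation, the filtering_context that reaches this clause carries the two keys used above; a hypothetical example value (user id and table borrowed from the tests further down):

# Hypothetical filtering_context assembled by the protocol layer:
# :user_id from auth state, :sent_tables from previously fulfilled subscriptions.
filtering_context = %{
  user_id: "00000000-0000-0000-0000-000000000001",
  sent_tables: MapSet.new([{"public", "my_entries"}])
}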
11 changes: 10 additions & 1 deletion components/electric/lib/electric/satellite/protocol.ex
@@ -18,6 +18,7 @@ defmodule Electric.Satellite.Protocol do
   alias Electric.Postgres.Extension.SchemaCache
   alias Electric.Replication.Changes
   alias Electric.Replication.Shapes
+  alias Electric.Replication.Shapes.ShapeRequest
   alias Electric.Replication.OffsetStorage
   alias Electric.Satellite.Serialization
   alias Electric.Satellite.ClientManager
@@ -828,7 +829,15 @@ defmodule Electric.Satellite.Protocol do
     # I'm dereferencing these here because calling this in Task implies copying over entire `state` just for two fields.
     fun = state.subscription_data_fun
     opts = state.pg_connector_opts
-    context = %{user_id: Pathex.get(state, path(:auth / :user_id))}
+
+    context = %{
+      user_id: Pathex.get(state, path(:auth / :user_id)),
+      sent_tables:
+        Map.values(state.subscriptions)
+        |> List.flatten()
+        |> Enum.flat_map(fn %ShapeRequest{included_tables: tables} -> tables end)
+        |> MapSet.new()
+    }
 
     Task.start(fn ->
       # This is `InitiaSync.query_subscription_data/2` by default, but can be overridden for tests.
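The sent_tables set built here is simply the union of included_tables across every shape request in the client's existing subscriptions, so overlapping subscriptions are de-duplicated by the MapSet. A rough illustration with made-up subscription data (plain maps instead of the real %ShapeRequest{} structs):

# Hypothetical state.subscriptions: subscription_id => list of shape requests.
subscriptions = %{
  "sub-1" => [%{included_tables: [{"public", "users"}]}],
  "sub-2" => [%{included_tables: [{"public", "users"}, {"public", "documents"}]}]
}

sent_tables =
  Map.values(subscriptions)
  |> List.flatten()
  |> Enum.flat_map(& &1.included_tables)
  |> MapSet.new()

# => MapSet.new([{"public", "documents"}, {"public", "users"}])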
@@ -82,5 +82,21 @@ defmodule Electric.Replication.Shapes.ShapeRequestTest do
                  user_id: "00000000-0000-0000-0000-000000000001"
                })
     end
+
+    @tag with_sql: """
+         INSERT INTO public.my_entries (content) VALUES ('test content');
+         """
+    test "should not fulfill full-table requests if the table has already been sent", %{
+      origin: origin,
+      conn: conn,
+      schema: schema
+    } do
+      request = %ShapeRequest{id: "id", included_tables: [{"public", "my_entries"}]}
+
+      assert {:ok, []} =
+               ShapeRequest.query_initial_data(request, conn, schema, origin, %{
+                 sent_tables: MapSet.new([{"public", "my_entries"}])
+               })
+    end
   end
 end
73 changes: 73 additions & 0 deletions components/electric/test/electric/satellite/subscriptions_test.exs
@@ -312,6 +312,79 @@ defmodule Electric.Satellite.SubscriptionsTest do
       end)
     end
 
+    @tag with_sql: """
+         INSERT INTO public.users (id, name) VALUES ('#{uuid4()}', 'John');
+         """
+    test "Second subscription for the same table yields no data",
+         %{conn: pg_conn} = ctx do
+      MockClient.with_connect([auth: ctx, id: ctx.client_id, port: ctx.port], fn conn ->
+        MockClient.send_data(conn, %SatInStartReplicationReq{})
+        assert_initial_replication_response(conn, 3)
+
+        sub_id1 = uuid4()
+        request_id1 = uuid4()
+
+        MockClient.send_data(conn, %SatSubsReq{
+          subscription_id: sub_id1,
+          shape_requests: [
+            %SatShapeReq{
+              request_id: request_id1,
+              shape_definition: %SatShapeDef{
+                selects: [%SatShapeDef.Select{tablename: "users"}]
+              }
+            }
+          ]
+        })
+
+        assert_receive {^conn, %SatSubsResp{subscription_id: ^sub_id1, err: nil}}
+        received = receive_subscription_data(conn, sub_id1)
+        assert Map.keys(received) == [request_id1]
+        assert [%SatOpInsert{row_data: %{values: [_, "John"]}}] = received[request_id1]
+
+        sub_id2 = uuid4()
+        request_id2 = uuid4()
+
+        MockClient.send_data(conn, %SatSubsReq{
+          subscription_id: sub_id2,
+          shape_requests: [
+            %SatShapeReq{
+              request_id: request_id2,
+              shape_definition: %SatShapeDef{
+                selects: [%SatShapeDef.Select{tablename: "users"}]
+              }
+            }
+          ]
+        })
+
+        # Since we already got the data for this table, we shouldn't receive it again
+        assert_receive {^conn, %SatSubsResp{subscription_id: sub_id2, err: nil}}
+        received = receive_subscription_data(conn, sub_id2)
+        assert Map.keys(received) == [request_id2]
+        assert [] == received[request_id2]
+
+        # But an unsubscribe from one of those still keeps messages coming for the mentioned table
+        MockClient.send_data(conn, %SatUnsubsReq{subscription_ids: [sub_id1]})
+        assert_receive {^conn, %SatUnsubsResp{}}
+
+        # We still get the message because the other subscription is active
+        {:ok, 1} =
+          :epgsql.equery(pg_conn, "INSERT INTO public.users (id, name) VALUES ($1, $2)", [
+            uuid4(),
+            "Garry"
+          ])
+
+        assert_receive {^conn,
+                        %SatOpLog{
+                          ops: [
+                            _begin,
+                            %{op: {:insert, %{row_data: %{values: [_, "Garry"]}}}},
+                            _commit
+                          ]
+                        }},
+                       1000
+      end)
+    end
+
     test "The client can connect and subscribe, and then unsubscribe, and gets no data after unsubscribing",
          %{conn: pg_conn} = ctx do
       MockClient.with_connect([auth: ctx, id: ctx.client_id, port: ctx.port], fn conn ->
