diff --git a/.changeset/quick-crabs-accept.md b/.changeset/quick-crabs-accept.md new file mode 100644 index 0000000000..1815e1ae5e --- /dev/null +++ b/.changeset/quick-crabs-accept.md @@ -0,0 +1,5 @@ +--- +"@core/electric": patch +--- + +Improve performance of shape unsubscribe API with alternative `SentRowsGraph` pruning method. diff --git a/components/electric/lib/electric/replication/shapes/sent_rows_graph.ex b/components/electric/lib/electric/replication/shapes/sent_rows_graph.ex index 40d591c3c9..494e9107c0 100644 --- a/components/electric/lib/electric/replication/shapes/sent_rows_graph.ex +++ b/components/electric/lib/electric/replication/shapes/sent_rows_graph.ex @@ -48,7 +48,7 @@ defmodule Electric.Replication.Shapes.SentRowsGraph do ...> |> Graph.add_edge(:v1, :v2, label: {"r1", "l2"}) ...> |> pop_by_request_ids(["r1", "r2"]) iex> popped_vertices - [:v2, :v1] + [:v1, :v2] iex> new_graph #Graph @@ -80,25 +80,25 @@ defmodule Electric.Replication.Shapes.SentRowsGraph do defp do_pop_by_request_id(%Graph{} = graph, %MapSet{} = request_ids, root_vertex) do predicate = fn {id, _} -> MapSet.member?(request_ids, id) end - {edges, vertices} = + {edges, vertices_to_drop} = dfs_traverse( [Graph.Utils.vertex_id(root_vertex)], graph, - {[], []}, + {[], %{}}, fn - ^root_vertex, _, acc -> + _, ^root_vertex, _, acc -> {:next, acc} - v, incoming_edges, {edges, vertices} -> + v_id, v, incoming_edges, {edges, vertices} -> incoming_edges |> Enum.flat_map(fn {source_v, meta} -> Enum.map(Map.keys(meta), &{source_v, v, &1}) end) |> Enum.split_with(&predicate.(elem(&1, 2))) |> case do - {_all_edges, []} -> + {_new_edges, []} -> # If all incoming edges match the request ID, we'll pop the vertex - {:next, {edges, [v | vertices]}} + {:next, {edges, Map.put(vertices, v_id, v)}} {new_edges, _rest} -> # If some incoming edges are unaffected, we'll pop the edges explicitly @@ -108,12 +108,19 @@ defmodule Electric.Replication.Shapes.SentRowsGraph do fn meta -> any_key_matches_predicate?(meta, predicate) end ) + vertices_to_keep = + Enum.flat_map(graph.vertices, fn {v_id, v} -> + if is_map_key(vertices_to_drop, v_id), do: [], else: [v] + end) + graph = edges + # Remove all edges relating to the request IDs from the graph |> Enum.reduce(graph, fn {v1, v2, label}, acc -> Graph.delete_edge(acc, v1, v2, label) end) - |> Graph.delete_vertices(vertices) + # Retain the subgraph that does not contain the dropped vertices + |> Graph.subgraph(vertices_to_keep) - {vertices, graph} + {Map.values(vertices_to_drop), graph} end defp any_key_matches_predicate?(map, predicate) when is_map(map), @@ -147,14 +154,14 @@ defmodule Electric.Replication.Shapes.SentRowsGraph do edge_predicate_fun, visited ) - when is_function(fun, 3) and is_function(edge_predicate_fun, 1) do + when is_function(fun, 4) and is_function(edge_predicate_fun, 1) do if MapSet.member?(visited, v_id) do dfs_traverse(rest, g, acc, fun, edge_predicate_fun, visited) else v = Map.get(vs, v_id) in_edges = Enum.map(Map.get(ie, v_id, []), &{Map.fetch!(vs, &1), Map.fetch!(e, {&1, v_id})}) - case fun.(v, in_edges, acc) do + case fun.(v_id, v, in_edges, acc) do {:next, acc2} -> visited = MapSet.put(visited, v_id) diff --git a/components/electric/test/electric/replication/shapes/sent_rows_graph_test.exs b/components/electric/test/electric/replication/shapes/sent_rows_graph_test.exs index 104ab925e7..4235dda92b 100644 --- a/components/electric/test/electric/replication/shapes/sent_rows_graph_test.exs +++ b/components/electric/test/electric/replication/shapes/sent_rows_graph_test.exs @@ -1,5 +1,47 @@ defmodule Electric.Replication.Shapes.SentRowsGraphTest do use ExUnit.Case, async: true + alias Electric.Replication.Shapes.SentRowsGraph doctest Electric.Replication.Shapes.SentRowsGraph, import: true + + describe "pop_by_request_ids/3" do + test "should return the popped vertices and the new graph" do + graph_init = + Graph.new() + |> Graph.add_edge(:root, :v1, label: {"r1", "l1"}) + |> Graph.add_edge(:root, :v1, label: {"r2", "l1"}) + |> Graph.add_edge(:v1, :v2, label: {"r1", "l2"}) + + assert {[:v2], %Graph{} = graph_new} = + SentRowsGraph.pop_by_request_ids(graph_init, "r1", root_vertex: :root) + + assert [:v1, :root] = Graph.vertices(graph_new) + assert 1 = Graph.num_edges(graph_new) + assert Graph.edge(graph_new, :root, :v1, {"r2", "l1"}) + end + + test "should be able to pop list of requests" do + graph_init = + Graph.new() + |> Graph.add_edge(:root, :v1, label: {"r1", "l1"}) + |> Graph.add_edge(:root, :v1, label: {"r2", "l1"}) + |> Graph.add_edge(:v1, :v2, label: {"r1", "l2"}) + + assert {[:v1, :v2], %Graph{} = graph_new} = + SentRowsGraph.pop_by_request_ids(graph_init, ["r1", "r2"], root_vertex: :root) + + assert [:root] = Graph.vertices(graph_new) + assert 0 = Graph.num_edges(graph_new) + end + + test "should be able to pop empty graph" do + graph_init = Graph.new() + + assert {[], %Graph{} = graph_new} = + SentRowsGraph.pop_by_request_ids(graph_init, ["r1", "r2"]) + + assert [] = Graph.vertices(graph_new) + assert 0 = Graph.num_edges(graph_new) + end + end end