From ebf3566f9b24a81c7b51bfecf54cd517bd1a9541 Mon Sep 17 00:00:00 2001 From: msfstef Date: Wed, 19 Jun 2024 19:01:25 +0300 Subject: [PATCH 1/6] proposed improvement --- .../replication/shapes/sent_rows_graph.ex | 20 ++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/components/electric/lib/electric/replication/shapes/sent_rows_graph.ex b/components/electric/lib/electric/replication/shapes/sent_rows_graph.ex index 40d591c3c9..0f8b89d5fd 100644 --- a/components/electric/lib/electric/replication/shapes/sent_rows_graph.ex +++ b/components/electric/lib/electric/replication/shapes/sent_rows_graph.ex @@ -1,4 +1,5 @@ defmodule Electric.Replication.Shapes.SentRowsGraph do + require Logger @moduledoc """ Module responsible for operations over the sent rows graph. @@ -80,6 +81,8 @@ defmodule Electric.Replication.Shapes.SentRowsGraph do defp do_pop_by_request_id(%Graph{} = graph, %MapSet{} = request_ids, root_vertex) do predicate = fn {id, _} -> MapSet.member?(request_ids, id) end + start = System.monotonic_time() + {edges, vertices} = dfs_traverse( [Graph.Utils.vertex_id(root_vertex)], @@ -96,9 +99,9 @@ defmodule Electric.Replication.Shapes.SentRowsGraph do end) |> Enum.split_with(&predicate.(elem(&1, 2))) |> case do - {_all_edges, []} -> + {all_edges, []} -> # If all incoming edges match the request ID, we'll pop the vertex - {:next, {edges, [v | vertices]}} + {:next, {all_edges ++ edges, [v | vertices]}} {new_edges, _rest} -> # If some incoming edges are unaffected, we'll pop the edges explicitly @@ -108,10 +111,21 @@ defmodule Electric.Replication.Shapes.SentRowsGraph do fn meta -> any_key_matches_predicate?(meta, predicate) end ) + Logger.warning("Traversing graph took #{System.convert_time_unit(System.monotonic_time() - start, :native, :millisecond)}") + start = System.monotonic_time() + graph = edges |> Enum.reduce(graph, fn {v1, v2, label}, acc -> Graph.delete_edge(acc, v1, v2, label) end) - |> Graph.delete_vertices(vertices) + + Logger.warning("Removing edges from graph took #{System.convert_time_unit(System.monotonic_time() - start, :native, :millisecond)}") + start = System.monotonic_time() + vertices_to_keep = MapSet.difference(MapSet.new(Graph.vertices(graph)), MapSet.new(vertices)) |> MapSet.to_list + graph = Graph.subgraph(graph, vertices_to_keep) + # graph = graph + # |> Graph.delete_vertices(vertices) + + Logger.warning("Removing vertices graph took #{System.convert_time_unit(System.monotonic_time() - start, :native, :millisecond)}") {vertices, graph} end From 1cb9ef4562aadd1cb5d427888123df395075f375 Mon Sep 17 00:00:00 2001 From: msfstef Date: Thu, 20 Jun 2024 10:53:34 +0300 Subject: [PATCH 2/6] Remove logs --- .../replication/shapes/sent_rows_graph.ex | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/components/electric/lib/electric/replication/shapes/sent_rows_graph.ex b/components/electric/lib/electric/replication/shapes/sent_rows_graph.ex index 0f8b89d5fd..3bbf7bf2b2 100644 --- a/components/electric/lib/electric/replication/shapes/sent_rows_graph.ex +++ b/components/electric/lib/electric/replication/shapes/sent_rows_graph.ex @@ -81,8 +81,6 @@ defmodule Electric.Replication.Shapes.SentRowsGraph do defp do_pop_by_request_id(%Graph{} = graph, %MapSet{} = request_ids, root_vertex) do predicate = fn {id, _} -> MapSet.member?(request_ids, id) end - start = System.monotonic_time() - {edges, vertices} = dfs_traverse( [Graph.Utils.vertex_id(root_vertex)], @@ -99,9 +97,9 @@ defmodule Electric.Replication.Shapes.SentRowsGraph do end) |> Enum.split_with(&predicate.(elem(&1, 2))) |> case do - {all_edges, []} -> + {new_edges, []} -> # If all incoming edges match the request ID, we'll pop the vertex - {:next, {all_edges ++ edges, [v | vertices]}} + {:next, {new_edges ++ edges, [v | vertices]}} {new_edges, _rest} -> # If some incoming edges are unaffected, we'll pop the edges explicitly @@ -111,21 +109,16 @@ defmodule Electric.Replication.Shapes.SentRowsGraph do fn meta -> any_key_matches_predicate?(meta, predicate) end ) - Logger.warning("Traversing graph took #{System.convert_time_unit(System.monotonic_time() - start, :native, :millisecond)}") - start = System.monotonic_time() + # Remove all edges relating to the request IDs from the graph graph = edges |> Enum.reduce(graph, fn {v1, v2, label}, acc -> Graph.delete_edge(acc, v1, v2, label) end) - Logger.warning("Removing edges from graph took #{System.convert_time_unit(System.monotonic_time() - start, :native, :millisecond)}") - start = System.monotonic_time() + # Retain the maximally connected subgraph that does not contain the + # vertices that have been popped vertices_to_keep = MapSet.difference(MapSet.new(Graph.vertices(graph)), MapSet.new(vertices)) |> MapSet.to_list graph = Graph.subgraph(graph, vertices_to_keep) - # graph = graph - # |> Graph.delete_vertices(vertices) - - Logger.warning("Removing vertices graph took #{System.convert_time_unit(System.monotonic_time() - start, :native, :millisecond)}") {vertices, graph} end From 0961802336673260fa24673bbfbac7448b16da87 Mon Sep 17 00:00:00 2001 From: msfstef Date: Thu, 20 Jun 2024 11:04:34 +0300 Subject: [PATCH 3/6] Fix formatting issues --- .../lib/electric/replication/shapes/sent_rows_graph.ex | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/components/electric/lib/electric/replication/shapes/sent_rows_graph.ex b/components/electric/lib/electric/replication/shapes/sent_rows_graph.ex index 3bbf7bf2b2..c1ba57a9f6 100644 --- a/components/electric/lib/electric/replication/shapes/sent_rows_graph.ex +++ b/components/electric/lib/electric/replication/shapes/sent_rows_graph.ex @@ -1,5 +1,4 @@ defmodule Electric.Replication.Shapes.SentRowsGraph do - require Logger @moduledoc """ Module responsible for operations over the sent rows graph. @@ -109,7 +108,6 @@ defmodule Electric.Replication.Shapes.SentRowsGraph do fn meta -> any_key_matches_predicate?(meta, predicate) end ) - # Remove all edges relating to the request IDs from the graph graph = edges @@ -117,7 +115,10 @@ defmodule Electric.Replication.Shapes.SentRowsGraph do # Retain the maximally connected subgraph that does not contain the # vertices that have been popped - vertices_to_keep = MapSet.difference(MapSet.new(Graph.vertices(graph)), MapSet.new(vertices)) |> MapSet.to_list + vertices_to_keep = + MapSet.difference(MapSet.new(Graph.vertices(graph)), MapSet.new(vertices)) + |> MapSet.to_list() + graph = Graph.subgraph(graph, vertices_to_keep) {vertices, graph} From b863df8a87aa082277f09853a0acf15371222bd6 Mon Sep 17 00:00:00 2001 From: msfstef Date: Thu, 20 Jun 2024 18:21:33 +0300 Subject: [PATCH 4/6] Add a couple of tests --- .../shapes/sent_rows_graph_test.exs | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/components/electric/test/electric/replication/shapes/sent_rows_graph_test.exs b/components/electric/test/electric/replication/shapes/sent_rows_graph_test.exs index 104ab925e7..701e0492cf 100644 --- a/components/electric/test/electric/replication/shapes/sent_rows_graph_test.exs +++ b/components/electric/test/electric/replication/shapes/sent_rows_graph_test.exs @@ -1,5 +1,47 @@ defmodule Electric.Replication.Shapes.SentRowsGraphTest do use ExUnit.Case, async: true + alias Electric.Replication.Shapes.SentRowsGraph doctest Electric.Replication.Shapes.SentRowsGraph, import: true + + describe "pop_by_request_ids/3" do + test "should return the popped vertices and the new graph" do + graph_init = + Graph.new() + |> Graph.add_edge(:root, :v1, label: {"r1", "l1"}) + |> Graph.add_edge(:root, :v1, label: {"r2", "l1"}) + |> Graph.add_edge(:v1, :v2, label: {"r1", "l2"}) + + assert {[:v2], %Graph{} = graph_new} = + SentRowsGraph.pop_by_request_ids(graph_init, "r1", root_vertex: :root) + + assert [:v1, :root] = Graph.vertices(graph_new) + assert 1 = Graph.num_edges(graph_new) + assert Graph.edge(graph_new, :root, :v1, {"r2", "l1"}) + end + + test "should be able to pop list of requests" do + graph_init = + Graph.new() + |> Graph.add_edge(:root, :v1, label: {"r1", "l1"}) + |> Graph.add_edge(:root, :v1, label: {"r2", "l1"}) + |> Graph.add_edge(:v1, :v2, label: {"r1", "l2"}) + + assert {[:v2, :v1], %Graph{} = graph_new} = + SentRowsGraph.pop_by_request_ids(graph_init, ["r1", "r2"], root_vertex: :root) + + assert [:root] = Graph.vertices(graph_new) + assert 0 = Graph.num_edges(graph_new) + end + + test "should be able to pop empty graph" do + graph_init = Graph.new() + + assert {[], %Graph{} = graph_new} = + SentRowsGraph.pop_by_request_ids(graph_init, ["r1", "r2"]) + + assert [] = Graph.vertices(graph_new) + assert 0 = Graph.num_edges(graph_new) + end + end end From 95d5e3a195c07f8e78420ecd2dc074cf40ede0f8 Mon Sep 17 00:00:00 2001 From: Ilia Borovitinov Date: Tue, 25 Jun 2024 12:43:49 +0300 Subject: [PATCH 5/6] further optimized the function to not build up unneeded sets --- .../replication/shapes/sent_rows_graph.ex | 37 +++++++++---------- .../shapes/sent_rows_graph_test.exs | 2 +- 2 files changed, 19 insertions(+), 20 deletions(-) diff --git a/components/electric/lib/electric/replication/shapes/sent_rows_graph.ex b/components/electric/lib/electric/replication/shapes/sent_rows_graph.ex index c1ba57a9f6..494e9107c0 100644 --- a/components/electric/lib/electric/replication/shapes/sent_rows_graph.ex +++ b/components/electric/lib/electric/replication/shapes/sent_rows_graph.ex @@ -48,7 +48,7 @@ defmodule Electric.Replication.Shapes.SentRowsGraph do ...> |> Graph.add_edge(:v1, :v2, label: {"r1", "l2"}) ...> |> pop_by_request_ids(["r1", "r2"]) iex> popped_vertices - [:v2, :v1] + [:v1, :v2] iex> new_graph #Graph @@ -80,25 +80,25 @@ defmodule Electric.Replication.Shapes.SentRowsGraph do defp do_pop_by_request_id(%Graph{} = graph, %MapSet{} = request_ids, root_vertex) do predicate = fn {id, _} -> MapSet.member?(request_ids, id) end - {edges, vertices} = + {edges, vertices_to_drop} = dfs_traverse( [Graph.Utils.vertex_id(root_vertex)], graph, - {[], []}, + {[], %{}}, fn - ^root_vertex, _, acc -> + _, ^root_vertex, _, acc -> {:next, acc} - v, incoming_edges, {edges, vertices} -> + v_id, v, incoming_edges, {edges, vertices} -> incoming_edges |> Enum.flat_map(fn {source_v, meta} -> Enum.map(Map.keys(meta), &{source_v, v, &1}) end) |> Enum.split_with(&predicate.(elem(&1, 2))) |> case do - {new_edges, []} -> + {_new_edges, []} -> # If all incoming edges match the request ID, we'll pop the vertex - {:next, {new_edges ++ edges, [v | vertices]}} + {:next, {edges, Map.put(vertices, v_id, v)}} {new_edges, _rest} -> # If some incoming edges are unaffected, we'll pop the edges explicitly @@ -108,20 +108,19 @@ defmodule Electric.Replication.Shapes.SentRowsGraph do fn meta -> any_key_matches_predicate?(meta, predicate) end ) - # Remove all edges relating to the request IDs from the graph + vertices_to_keep = + Enum.flat_map(graph.vertices, fn {v_id, v} -> + if is_map_key(vertices_to_drop, v_id), do: [], else: [v] + end) + graph = edges + # Remove all edges relating to the request IDs from the graph |> Enum.reduce(graph, fn {v1, v2, label}, acc -> Graph.delete_edge(acc, v1, v2, label) end) + # Retain the subgraph that does not contain the dropped vertices + |> Graph.subgraph(vertices_to_keep) - # Retain the maximally connected subgraph that does not contain the - # vertices that have been popped - vertices_to_keep = - MapSet.difference(MapSet.new(Graph.vertices(graph)), MapSet.new(vertices)) - |> MapSet.to_list() - - graph = Graph.subgraph(graph, vertices_to_keep) - - {vertices, graph} + {Map.values(vertices_to_drop), graph} end defp any_key_matches_predicate?(map, predicate) when is_map(map), @@ -155,14 +154,14 @@ defmodule Electric.Replication.Shapes.SentRowsGraph do edge_predicate_fun, visited ) - when is_function(fun, 3) and is_function(edge_predicate_fun, 1) do + when is_function(fun, 4) and is_function(edge_predicate_fun, 1) do if MapSet.member?(visited, v_id) do dfs_traverse(rest, g, acc, fun, edge_predicate_fun, visited) else v = Map.get(vs, v_id) in_edges = Enum.map(Map.get(ie, v_id, []), &{Map.fetch!(vs, &1), Map.fetch!(e, {&1, v_id})}) - case fun.(v, in_edges, acc) do + case fun.(v_id, v, in_edges, acc) do {:next, acc2} -> visited = MapSet.put(visited, v_id) diff --git a/components/electric/test/electric/replication/shapes/sent_rows_graph_test.exs b/components/electric/test/electric/replication/shapes/sent_rows_graph_test.exs index 701e0492cf..4235dda92b 100644 --- a/components/electric/test/electric/replication/shapes/sent_rows_graph_test.exs +++ b/components/electric/test/electric/replication/shapes/sent_rows_graph_test.exs @@ -27,7 +27,7 @@ defmodule Electric.Replication.Shapes.SentRowsGraphTest do |> Graph.add_edge(:root, :v1, label: {"r2", "l1"}) |> Graph.add_edge(:v1, :v2, label: {"r1", "l2"}) - assert {[:v2, :v1], %Graph{} = graph_new} = + assert {[:v1, :v2], %Graph{} = graph_new} = SentRowsGraph.pop_by_request_ids(graph_init, ["r1", "r2"], root_vertex: :root) assert [:root] = Graph.vertices(graph_new) From 464747fa86b430714f121dc18b814842fc63c255 Mon Sep 17 00:00:00 2001 From: msfstef Date: Tue, 25 Jun 2024 12:49:52 +0300 Subject: [PATCH 6/6] Add changeset --- .changeset/quick-crabs-accept.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/quick-crabs-accept.md diff --git a/.changeset/quick-crabs-accept.md b/.changeset/quick-crabs-accept.md new file mode 100644 index 0000000000..1815e1ae5e --- /dev/null +++ b/.changeset/quick-crabs-accept.md @@ -0,0 +1,5 @@ +--- +"@core/electric": patch +--- + +Improve performance of shape unsubscribe API with alternative `SentRowsGraph` pruning method.