Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(electric): Proposed speedup of Sent Rows Graph popping method #1389

Merged
merged 6 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ defmodule Electric.Replication.Shapes.SentRowsGraph do
...> |> Graph.add_edge(:v1, :v2, label: {"r1", "l2"})
...> |> pop_by_request_ids(["r1", "r2"])
iex> popped_vertices
[:v2, :v1]
[:v1, :v2]
iex> new_graph
#Graph<type: directed, vertices: [:root], edges: []>

Expand Down Expand Up @@ -80,25 +80,25 @@ defmodule Electric.Replication.Shapes.SentRowsGraph do
defp do_pop_by_request_id(%Graph{} = graph, %MapSet{} = request_ids, root_vertex) do
predicate = fn {id, _} -> MapSet.member?(request_ids, id) end

{edges, vertices} =
{edges, vertices_to_drop} =
dfs_traverse(
[Graph.Utils.vertex_id(root_vertex)],
graph,
{[], []},
{[], %{}},
fn
^root_vertex, _, acc ->
_, ^root_vertex, _, acc ->
{:next, acc}

v, incoming_edges, {edges, vertices} ->
v_id, v, incoming_edges, {edges, vertices} ->
incoming_edges
|> Enum.flat_map(fn {source_v, meta} ->
Enum.map(Map.keys(meta), &{source_v, v, &1})
end)
|> Enum.split_with(&predicate.(elem(&1, 2)))
|> case do
{_all_edges, []} ->
{_new_edges, []} ->
# If all incoming edges match the request ID, we'll pop the vertex
{:next, {edges, [v | vertices]}}
{:next, {edges, Map.put(vertices, v_id, v)}}

{new_edges, _rest} ->
# If some incoming edges are unaffected, we'll pop the edges explicitly
Expand All @@ -108,12 +108,19 @@ defmodule Electric.Replication.Shapes.SentRowsGraph do
fn meta -> any_key_matches_predicate?(meta, predicate) end
)

vertices_to_keep =
Enum.flat_map(graph.vertices, fn {v_id, v} ->
if is_map_key(vertices_to_drop, v_id), do: [], else: [v]
end)

graph =
edges
# Remove all edges relating to the request IDs from the graph
|> Enum.reduce(graph, fn {v1, v2, label}, acc -> Graph.delete_edge(acc, v1, v2, label) end)
|> Graph.delete_vertices(vertices)
# Retain the subgraph that does not contain the dropped vertices
|> Graph.subgraph(vertices_to_keep)

{vertices, graph}
{Map.values(vertices_to_drop), graph}
end

defp any_key_matches_predicate?(map, predicate) when is_map(map),
Expand Down Expand Up @@ -147,14 +154,14 @@ defmodule Electric.Replication.Shapes.SentRowsGraph do
edge_predicate_fun,
visited
)
when is_function(fun, 3) and is_function(edge_predicate_fun, 1) do
when is_function(fun, 4) and is_function(edge_predicate_fun, 1) do
if MapSet.member?(visited, v_id) do
dfs_traverse(rest, g, acc, fun, edge_predicate_fun, visited)
else
v = Map.get(vs, v_id)
in_edges = Enum.map(Map.get(ie, v_id, []), &{Map.fetch!(vs, &1), Map.fetch!(e, {&1, v_id})})

case fun.(v, in_edges, acc) do
case fun.(v_id, v, in_edges, acc) do
{:next, acc2} ->
visited = MapSet.put(visited, v_id)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,47 @@
defmodule Electric.Replication.Shapes.SentRowsGraphTest do
  use ExUnit.Case, async: true
  alias Electric.Replication.Shapes.SentRowsGraph

  doctest Electric.Replication.Shapes.SentRowsGraph, import: true

  describe "pop_by_request_ids/3" do
    # NOTE: vertex lists are compared after sorting. The popped vertices are
    # produced from a map keyed by internal vertex id, so the order in which
    # they come back is incidental and should not be pinned by these tests.
    # The same is presumably true of `Graph.vertices/1` - confirm against the
    # libgraph documentation.

    test "should return the popped vertices and the new graph" do
      # :v1 is reachable through both "r1" and "r2"; :v2 only through "r1".
      graph_init =
        Graph.new()
        |> Graph.add_edge(:root, :v1, label: {"r1", "l1"})
        |> Graph.add_edge(:root, :v1, label: {"r2", "l1"})
        |> Graph.add_edge(:v1, :v2, label: {"r1", "l2"})

      assert {[:v2], %Graph{} = graph_new} =
               SentRowsGraph.pop_by_request_ids(graph_init, "r1", root_vertex: :root)

      # :v1 survives because the "r2" edge still points at it.
      assert Enum.sort(Graph.vertices(graph_new)) == [:root, :v1]
      assert Graph.num_edges(graph_new) == 1
      assert Graph.edge(graph_new, :root, :v1, {"r2", "l1"})
    end

    test "should be able to pop list of requests" do
      graph_init =
        Graph.new()
        |> Graph.add_edge(:root, :v1, label: {"r1", "l1"})
        |> Graph.add_edge(:root, :v1, label: {"r2", "l1"})
        |> Graph.add_edge(:v1, :v2, label: {"r1", "l2"})

      assert {popped_vertices, %Graph{} = graph_new} =
               SentRowsGraph.pop_by_request_ids(graph_init, ["r1", "r2"], root_vertex: :root)

      # With both request ids popped, every non-root vertex must be dropped.
      assert Enum.sort(popped_vertices) == [:v1, :v2]
      assert Graph.vertices(graph_new) == [:root]
      assert Graph.num_edges(graph_new) == 0
    end

    test "should be able to pop empty graph" do
      graph_init = Graph.new()

      # No `:root_vertex` option is given here, so the default root is used.
      assert {[], %Graph{} = graph_new} =
               SentRowsGraph.pop_by_request_ids(graph_init, ["r1", "r2"])

      assert Graph.vertices(graph_new) == []
      assert Graph.num_edges(graph_new) == 0
    end
  end
end
Loading