Skip to content

Commit

Permalink
feat(electric): Proposed speedup of Sent Rows Graph popping method (#…
Browse files Browse the repository at this point in the history
…1389)

Addresses
[VAX-1977](https://linear.app/electric-sql/issue/VAX-1977/unsubscribes-really-slow-due-to-sent-rows-graph-popping-implementation)

While running Linearlite multi-project, I noticed unsubscribes took a
very long time, and I found that the `Graph.delete_vertices` operation
is what takes the most time to run by far, not scaling very well.

It doesn't make sense to me that deleting a bunch of vertices from a
tree graph would take this long especially when traversing the whole
graph is fast, so reconstructing it should really not be too much
trouble (?), so my conjecture is that [the `delete_vertices`
implementation](https://github.com/bitwalker/libgraph/blob/460cdfd9a163a533bdab0a160ba7ccf888047927/lib/graph.ex#L890-L926)
which interacts with a map quite a lot is just suboptimal.

The proposed change is a minimal diff to do the same thing which with
the same workload I'm observing >100x improvement in speed with much
better scaling properties, but obviously when you get this kind of
speedup it casts doubt on its correctness.


Instead of deleting vertices, we keep track of all edges to be removed,
as well as vertices to be removed - then run `Graph.remove_edge` which
seems to have a fairly optimal implementation for _all_ edges to be
popped, and instead of removing the vertices with
`Graph.delete_vertices` we use `Graph.subgraph` with a diff of the
graph's existing vertices minus the ones to be removed, which should
generate a maximally connected subgraph containing the specified
vertices.

Since the request related edges have been removed, along with the
disconnected vertices, the subgraph should be (?) the same as what we
were producing before, but I suspect because of [the implementation of
`Graph.subgraph`](https://github.com/bitwalker/libgraph/blob/460cdfd9a163a533bdab0a160ba7ccf888047927/lib/graph.ex#L2224-L2260)
being more aimed at recreating a graph from scratch through traversing
the existing one rather than modifying the existing one it is more
efficient. edit: I suspect also that since once a vertex that forms a
subtree that is going to be removed entirely is marked as removed,
constructing a subgraph ignores the whole subtree whereas
`delete_vertices` will try to clean up the subtree that we're going to
entirely ignore anyway (?)

Even if there is an issue with the correctness of this approach, I
believe we should be able to traverse the graph from the root to
recreate a graph without the specified edges and vertices much faster
than with `Graph.delete_vertices`, even if it requires a more custom
implementation.


Testing with Linearlite varying number of issues and comments, split
over 3 projects/shapes.
The unsubscribe `Shapes.SentRowsGraph.pop_by_request_ids` call takes:

#### For 3k rows  / shape (single shape)
- old ~1200ms
- new ~ 30ms

#### For 10k rows / shape (single shape)
- old ~11000ms
- new ~50ms

#### For 20k rows / shape (multi-shape)
##### Old
- sub 1 shape unsub 1 shape ~ 46000ms
- sub 2 shapes unsub 1 shape ~ 180000ms
- ( didn't bother with 3 shapes)
##### New
- sub 1 shape unsub 1 shape ~100ms
- sub 2 shapes unsub 1 shape ~ 300ms
- sub 3 shapes unsub 1 shape ~ 400ms

---------

Co-authored-by: Ilia Borovitinov <[email protected]>
  • Loading branch information
msfstef and icehaunter committed Jun 25, 2024
1 parent 6002f07 commit 5e16611
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 11 deletions.
5 changes: 5 additions & 0 deletions .changeset/quick-crabs-accept.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/electric": patch
---

Improve performance of shape unsubscribe API with alternative `SentRowsGraph` pruning method.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ defmodule Electric.Replication.Shapes.SentRowsGraph do
...> |> Graph.add_edge(:v1, :v2, label: {"r1", "l2"})
...> |> pop_by_request_ids(["r1", "r2"])
iex> popped_vertices
[:v2, :v1]
[:v1, :v2]
iex> new_graph
#Graph<type: directed, vertices: [:root], edges: []>
Expand Down Expand Up @@ -80,25 +80,25 @@ defmodule Electric.Replication.Shapes.SentRowsGraph do
defp do_pop_by_request_id(%Graph{} = graph, %MapSet{} = request_ids, root_vertex) do
predicate = fn {id, _} -> MapSet.member?(request_ids, id) end

{edges, vertices} =
{edges, vertices_to_drop} =
dfs_traverse(
[Graph.Utils.vertex_id(root_vertex)],
graph,
{[], []},
{[], %{}},
fn
^root_vertex, _, acc ->
_, ^root_vertex, _, acc ->
{:next, acc}

v, incoming_edges, {edges, vertices} ->
v_id, v, incoming_edges, {edges, vertices} ->
incoming_edges
|> Enum.flat_map(fn {source_v, meta} ->
Enum.map(Map.keys(meta), &{source_v, v, &1})
end)
|> Enum.split_with(&predicate.(elem(&1, 2)))
|> case do
{_all_edges, []} ->
{_new_edges, []} ->
# If all incoming edges match the request ID, we'll pop the vertex
{:next, {edges, [v | vertices]}}
{:next, {edges, Map.put(vertices, v_id, v)}}

{new_edges, _rest} ->
# If some incoming edges are unaffected, we'll pop the edges explicitly
Expand All @@ -108,12 +108,19 @@ defmodule Electric.Replication.Shapes.SentRowsGraph do
fn meta -> any_key_matches_predicate?(meta, predicate) end
)

vertices_to_keep =
Enum.flat_map(graph.vertices, fn {v_id, v} ->
if is_map_key(vertices_to_drop, v_id), do: [], else: [v]
end)

graph =
edges
# Remove all edges relating to the request IDs from the graph
|> Enum.reduce(graph, fn {v1, v2, label}, acc -> Graph.delete_edge(acc, v1, v2, label) end)
|> Graph.delete_vertices(vertices)
# Retain the subgraph that does not contain the dropped vertices
|> Graph.subgraph(vertices_to_keep)

{vertices, graph}
{Map.values(vertices_to_drop), graph}
end

defp any_key_matches_predicate?(map, predicate) when is_map(map),
Expand Down Expand Up @@ -147,14 +154,14 @@ defmodule Electric.Replication.Shapes.SentRowsGraph do
edge_predicate_fun,
visited
)
when is_function(fun, 3) and is_function(edge_predicate_fun, 1) do
when is_function(fun, 4) and is_function(edge_predicate_fun, 1) do
if MapSet.member?(visited, v_id) do
dfs_traverse(rest, g, acc, fun, edge_predicate_fun, visited)
else
v = Map.get(vs, v_id)
in_edges = Enum.map(Map.get(ie, v_id, []), &{Map.fetch!(vs, &1), Map.fetch!(e, {&1, v_id})})

case fun.(v, in_edges, acc) do
case fun.(v_id, v, in_edges, acc) do
{:next, acc2} ->
visited = MapSet.put(visited, v_id)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,47 @@
defmodule Electric.Replication.Shapes.SentRowsGraphTest do
use ExUnit.Case, async: true
alias Electric.Replication.Shapes.SentRowsGraph

doctest Electric.Replication.Shapes.SentRowsGraph, import: true

describe "pop_by_request_ids/3" do
test "should return the popped vertices and the new graph" do
graph_init =
Graph.new()
|> Graph.add_edge(:root, :v1, label: {"r1", "l1"})
|> Graph.add_edge(:root, :v1, label: {"r2", "l1"})
|> Graph.add_edge(:v1, :v2, label: {"r1", "l2"})

assert {[:v2], %Graph{} = graph_new} =
SentRowsGraph.pop_by_request_ids(graph_init, "r1", root_vertex: :root)

assert [:v1, :root] = Graph.vertices(graph_new)
assert 1 = Graph.num_edges(graph_new)
assert Graph.edge(graph_new, :root, :v1, {"r2", "l1"})
end

test "should be able to pop list of requests" do
graph_init =
Graph.new()
|> Graph.add_edge(:root, :v1, label: {"r1", "l1"})
|> Graph.add_edge(:root, :v1, label: {"r2", "l1"})
|> Graph.add_edge(:v1, :v2, label: {"r1", "l2"})

assert {[:v1, :v2], %Graph{} = graph_new} =
SentRowsGraph.pop_by_request_ids(graph_init, ["r1", "r2"], root_vertex: :root)

assert [:root] = Graph.vertices(graph_new)
assert 0 = Graph.num_edges(graph_new)
end

test "should be able to pop empty graph" do
graph_init = Graph.new()

assert {[], %Graph{} = graph_new} =
SentRowsGraph.pop_by_request_ids(graph_init, ["r1", "r2"])

assert [] = Graph.vertices(graph_new)
assert 0 = Graph.num_edges(graph_new)
end
end
end

0 comments on commit 5e16611

Please sign in to comment.