feat(sync-service): Streaming the response to the client without waiting for the snapshot to finish (#1517)

Previously, shapes with a million rows or more (170MB+) would time out while
waiting for the snapshot to be created. Now the snapshot is streamed from the
database into storage while simultaneously being streamed from storage to the
client, so the first packets of the response can be sent before the snapshot
has finished. As a result we now support tables with over a million rows.
Ilia is currently benchmarking this branch to see what the new limits are.
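
To make the approach concrete, here is a minimal, hypothetical sketch of the read-while-write idea (illustrative only, not code from this PR): a writer task appends chunks and then an end marker to shared storage, with an Agent standing in for the real CubDB-backed storage, while a reader polls and streams out whatever has landed so far.

defmodule StreamSketch do
  @end_marker :done

  def run do
    {:ok, store} = Agent.start_link(fn -> [] end)

    # Writer: slowly append chunks, then the end marker
    Task.start(fn ->
      for chunk <- 1..5 do
        Process.sleep(20)
        Agent.update(store, &(&1 ++ [chunk]))
      end

      Agent.update(store, &(&1 ++ [@end_marker]))
    end)

    # Reader: start consuming immediately, without waiting for the writer
    read_from(store, 0)
  end

  defp read_from(store, seen) do
    case Agent.get(store, &Enum.drop(&1, seen)) do
      [] ->
        # Caught up with the writer: poll again shortly
        Process.sleep(10)
        read_from(store, seen)

      chunks ->
        {data, rest} = Enum.split_while(chunks, &(&1 != @end_marker))
        Enum.each(data, &IO.inspect/1)
        if rest == [], do: read_from(store, seen + length(data)), else: :ok
    end
  end
end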

This PR addresses #1438 and #1444

I think there are quite a few simplifications that could happen off the
back of this change, but I have kept the refactoring to a minimum in
this PR and will instead address the simplifications in separate PRs.
robacourt authored Aug 14, 2024
1 parent ea0b73f commit 1803392
Showing 13 changed files with 458 additions and 164 deletions.
5 changes: 5 additions & 0 deletions .changeset/kind-starfishes-tap.md
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

Support larger shapes (1 million rows, 170MB+) and faster time to first byte
51 changes: 51 additions & 0 deletions packages/sync-service/lib/electric/concurrent_stream.ex
@@ -0,0 +1,51 @@
defmodule Electric.ConcurrentStream do
@default_poll_time 10

@doc """
Allows a stream to be read while it is still being written.
There can be multiple reading processes, but there must be only one writing process.
The writing process must append an end marker to the stream when it has finished,
to signal to the reading processes that the stream has ended.
If a reading process runs out of data to read before the end marker has been written,
it waits `poll_time_in_ms` for more data to be written, then resumes the stream
with `stream_fun`.
"""

def stream_to_end(opts) do
excluded_start_key = Keyword.fetch!(opts, :excluded_start_key)
end_marker_key = Keyword.fetch!(opts, :end_marker_key)
stream_fun = Keyword.fetch!(opts, :stream_fun)

stream_fun.(excluded_start_key, end_marker_key)
|> continue_if_not_ended(excluded_start_key, opts)
end

defp continue_if_not_ended(stream, latest_key, opts) do
end_marker_key = Keyword.fetch!(opts, :end_marker_key)
stream_fun = Keyword.fetch!(opts, :stream_fun)
poll_time_in_ms = Keyword.get(opts, :poll_time_in_ms, @default_poll_time)

[stream, [:premature_end]]
|> Stream.concat()
|> Stream.transform(latest_key, fn
:premature_end, latest_key ->
# Wait for more items to be added
Process.sleep(poll_time_in_ms)

# Continue from the latest_key
stream =
stream_fun.(latest_key, end_marker_key)
|> continue_if_not_ended(latest_key, opts)

{stream, latest_key}

{^end_marker_key, _}, _latest_key ->
{:halt, :end_marker_seen}

{key, _value} = item, _latest_key ->
{[item], key}
end)
end
end
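
For orientation, a hypothetical caller might look as follows; the key tuples and the `db` variable are invented for illustration (the real caller is the `get_snapshot` change in CubDbStorage below):

Electric.ConcurrentStream.stream_to_end(
  excluded_start_key: {:snapshot, -1},
  end_marker_key: {:snapshot, :end},
  poll_time_in_ms: 10,
  stream_fun: fn excluded_start_key, included_end_key ->
    CubDB.select(db,
      min_key: excluded_start_key,
      max_key: included_end_key,
      min_key_inclusive: false
    )
  end
)
# Yields {key, value} pairs as soon as the writer persists them;
# the end marker itself is never emitted.
|> Stream.map(fn {_key, value} -> value end)
|> Enum.to_list()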
48 changes: 27 additions & 21 deletions packages/sync-service/lib/electric/shape_cache.ex
@@ -22,7 +22,7 @@ defmodule Electric.ShapeCacheBehaviour do
{shape_id(), current_snapshot_offset :: LogOffset.t()}

@callback list_active_shapes(opts :: keyword()) :: [{shape_id(), shape_def(), xmin()}]
@callback wait_for_snapshot(GenServer.name(), shape_id()) :: :ready | {:error, term()}
@callback await_snapshot_start(GenServer.name(), shape_id()) :: :started | {:error, term()}
@callback handle_truncate(GenServer.name(), shape_id()) :: :ok
@callback clean_shape(GenServer.name(), shape_id()) :: :ok
end
@@ -136,9 +136,9 @@ defmodule Electric.ShapeCache do
GenServer.call(server, {:truncate, shape_id})
end

@spec wait_for_snapshot(GenServer.name(), String.t()) :: :ready | {:error, term()}
def wait_for_snapshot(server \\ __MODULE__, shape_id) when is_binary(shape_id) do
GenServer.call(server, {:wait_for_snapshot, shape_id}, 30_000)
@spec await_snapshot_start(GenServer.name(), String.t()) :: :started | {:error, term()}
def await_snapshot_start(server \\ __MODULE__, shape_id) when is_binary(shape_id) do
GenServer.call(server, {:await_snapshot_start, shape_id})
end

def init(opts) do
@@ -148,7 +148,7 @@
state = %{
storage: opts.storage,
shape_meta_table: shape_meta_table,
waiting_for_creation: %{},
awaiting_snapshot_start: %{},
db_pool: opts.db_pool,
create_snapshot_fn: opts.create_snapshot_fn,
prepare_tables_fn: opts.prepare_tables_fn
@@ -188,13 +188,13 @@ defmodule Electric.ShapeCache do
{:reply, {shape_id, latest_offset}, state}
end

def handle_call({:wait_for_snapshot, shape_id}, from, state) do
def handle_call({:await_snapshot_start, shape_id}, from, state) do
cond do
not is_known_shape_id?(state, shape_id) ->
{:reply, {:error, :unknown}, state}

Storage.snapshot_exists?(shape_id, state.storage) ->
{:reply, :ready, state}
Storage.snapshot_started?(shape_id, state.storage) ->
{:reply, :started, state}

true ->
Logger.debug("Starting a wait on the snapshot #{shape_id} for #{inspect(from)}")
@@ -235,10 +235,11 @@
{:noreply, state}
end

def handle_cast({:snapshot_ready, shape_id}, state) do
def handle_cast({:snapshot_started, shape_id}, state) do
Logger.debug("Snapshot for #{shape_id} is ready")
{waiting, state} = pop_in(state, [:waiting_for_creation, shape_id])
for client <- List.wrap(waiting), not is_nil(client), do: GenServer.reply(client, :ready)
Storage.mark_snapshot_as_started(shape_id, state.storage)
{waiting, state} = pop_in(state, [:awaiting_snapshot_start, shape_id])
for client <- List.wrap(waiting), not is_nil(client), do: GenServer.reply(client, :started)
{:noreply, state}
end

@@ -248,8 +249,13 @@
)

clean_up_shape(state, shape_id)
{waiting, state} = pop_in(state, [:waiting_for_creation, shape_id])
for client <- waiting, not is_nil(client), do: GenServer.reply(client, {:error, error})
{waiting, state} = pop_in(state, [:awaiting_snapshot_start, shape_id])

# waiting may be nil here if :snapshot_failed happens after :snapshot_started
if waiting do
for client <- waiting, not is_nil(client), do: GenServer.reply(client, {:error, error})
end

{:noreply, state}
end

@@ -275,12 +281,12 @@
shape
end

defp maybe_start_snapshot(%{waiting_for_creation: map} = state, shape_id, _)
defp maybe_start_snapshot(%{awaiting_snapshot_start: map} = state, shape_id, _)
when is_map_key(map, shape_id),
do: state

defp maybe_start_snapshot(state, shape_id, shape) do
if not Storage.snapshot_exists?(shape_id, state.storage) do
if not Storage.snapshot_started?(shape_id, state.storage) do
parent = self()

%{
@@ -298,9 +304,7 @@
fn ->
try do
Utils.apply_fn_or_mfa(prepare_tables_fn_or_mfa, [pool, affected_tables])

apply(create_snapshot_fn, [parent, shape_id, shape, pool, storage])
GenServer.cast(parent, {:snapshot_ready, shape_id})
rescue
error -> GenServer.cast(parent, {:snapshot_failed, shape_id, error, __STACKTRACE__})
end
@@ -321,10 +325,10 @@ defmodule Electric.ShapeCache do
end
end

defp add_waiter(%{waiting_for_creation: waiters} = state, shape_id, waiter),
defp add_waiter(%{awaiting_snapshot_start: waiters} = state, shape_id, waiter),
do: %{
state
| waiting_for_creation: Map.update(waiters, shape_id, [waiter], &[waiter | &1])
| awaiting_snapshot_start: Map.update(waiters, shape_id, [waiter], &[waiter | &1])
}

@doc false
@@ -340,12 +344,14 @@
%{rows: [[xmin]]} =
Postgrex.query!(conn, "SELECT pg_snapshot_xmin(pg_current_snapshot())", [])

GenServer.cast(parent, {:snapshot_xmin_known, shape_id, xmin})

# Enforce display settings *before* querying initial data to maintain consistent
# formatting between snapshot and live log entries.
Enum.each(Electric.Postgres.display_settings(), &Postgrex.query!(conn, &1, []))

GenServer.cast(parent, {:snapshot_xmin_known, shape_id, xmin})
{query, stream} = Querying.stream_initial_data(conn, shape)
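# Casting :snapshot_started here, before make_new_snapshot! has written any
# data, is what lets waiting clients begin streaming the snapshot while it is
# still being written.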
GenServer.cast(parent, {:snapshot_started, shape_id})

# We could pass the shape here so that make_new_snapshot! can pass it on to
# row_to_snapshot_item; that way it would have the relation, but it would still be missing the pk_cols
@@ -355,7 +361,7 @@
end

defp recover_shapes(state) do
Storage.cleanup_shapes_without_xmins(state.storage)
Storage.initialise(state.storage)

state.storage
|> Storage.list_shapes()
61 changes: 43 additions & 18 deletions packages/sync-service/lib/electric/shape_cache/cub_db_storage.ex
@@ -1,9 +1,14 @@
defmodule Electric.ShapeCache.CubDbStorage do
alias Electric.ConcurrentStream
alias Electric.LogItems
alias Electric.Replication.LogOffset
alias Electric.Telemetry.OpenTelemetry
@behaviour Electric.ShapeCache.Storage

# If the storage format changes, increase `@version` to prevent
# incompatible older versions being read
@version 1
@version_key :version
@snapshot_key_type 0
@log_key_type 1
@snapshot_offset LogOffset.first()
@@ -12,7 +17,7 @@ defmodule Electric.ShapeCache.CubDbStorage do
file_path = Access.get(opts, :file_path, "./shapes")
db = Access.get(opts, :db, :shape_db)

{:ok, %{file_path: file_path, db: db}}
{:ok, %{file_path: file_path, db: db, version: @version}}
end

def child_spec(opts) do
@@ -29,12 +34,20 @@ defmodule Electric.ShapeCache.CubDbStorage do
CubDB.start_link(data_dir: opts.file_path, name: opts.db)
end

def cleanup_shapes_without_xmins(opts) do
def initialise(opts) do
stored_version = stored_version(opts)

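# Drop shapes written by an older storage version, shapes that never recorded
# an xmin, and shapes whose snapshot end marker is missing (i.e. the snapshot
# write never completed).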
opts.db
|> CubDB.select(min_key: shapes_start(), max_key: shapes_end())
|> Stream.map(fn {{:shapes, shape_id}, _} -> shape_id end)
|> Stream.reject(&snapshot_xmin(&1, opts))
|> Stream.filter(fn shape_id ->
stored_version != opts.version ||
snapshot_xmin(shape_id, opts) == nil ||
CubDB.has_key?(opts.db, snapshot_end(shape_id)) == false
end)
|> Enum.each(&cleanup!(&1, opts))

CubDB.put(opts.db, @version_key, @version)
end

def list_shapes(opts) do
@@ -77,17 +90,26 @@ defmodule Electric.ShapeCache.CubDbStorage do
end
end

@spec snapshot_exists?(any(), any()) :: false
def snapshot_exists?(shape_id, opts) do
CubDB.has_key?(opts.db, snapshot_meta_key(shape_id))
@spec snapshot_started?(any(), any()) :: boolean()
def snapshot_started?(shape_id, opts) do
CubDB.has_key?(opts.db, snapshot_start(shape_id))
end

def get_snapshot(shape_id, opts) do
stream =
opts.db
|> CubDB.select(
min_key: snapshot_start(shape_id),
max_key: snapshot_end(shape_id)
ConcurrentStream.stream_to_end(
excluded_start_key: snapshot_start(shape_id),
end_marker_key: snapshot_end(shape_id),
poll_time_in_ms: 10,
stream_fun: fn excluded_start_key, included_end_key ->
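# If the shape has been cleaned up mid-read, the start marker is gone; failing
# fast here avoids polling forever for an end marker that will never arrive.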
if !snapshot_started?(shape_id, opts), do: raise("Snapshot no longer available")

CubDB.select(opts.db,
min_key: excluded_start_key,
max_key: included_end_key,
min_key_inclusive: false
)
end
)
|> Stream.flat_map(fn {_, items} -> items end)
|> Stream.map(fn {_, item} -> item end)
@@ -112,7 +134,11 @@ defmodule Electric.ShapeCache.CubDbStorage do
def has_log_entry?(shape_id, offset, opts) do
# FIXME: this is naive while we don't have snapshot metadata to get real offsets
CubDB.has_key?(opts.db, log_key(shape_id, offset)) or
(snapshot_exists?(shape_id, opts) and offset == @snapshot_offset)
(snapshot_started?(shape_id, opts) and offset == @snapshot_offset)
end

def mark_snapshot_as_started(shape_id, opts) do
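# The value (0) is a placeholder; snapshot_started?/2 only checks the key's presence.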
CubDB.put(opts.db, snapshot_start(shape_id), 0)
end

def make_new_snapshot!(shape_id, shape, query_info, data_stream, opts) do
@@ -127,7 +153,7 @@ defmodule Electric.ShapeCache.CubDbStorage do
|> Stream.each(fn [{key, _} | _] = chunk -> CubDB.put(opts.db, key, chunk) end)
|> Stream.run()

CubDB.put(opts.db, snapshot_meta_key(shape_id), 0)
CubDB.put(opts.db, snapshot_end(shape_id), 0)
end)
end

@@ -141,7 +167,6 @@ defmodule Electric.ShapeCache.CubDbStorage do

def cleanup!(shape_id, opts) do
[
snapshot_meta_key(shape_id),
shape_key(shape_id),
xmin_key(shape_id)
]
@@ -155,10 +180,6 @@
|> Stream.map(&elem(&1, 0))
end

defp snapshot_meta_key(shape_id) do
{:snapshot_metadata, shape_id}
end

defp snapshot_key(shape_id, index) do
{shape_id, @snapshot_key_type, index}
end
@@ -187,6 +208,10 @@ defmodule Electric.ShapeCache.CubDbStorage do
defp log_start(shape_id), do: log_key(shape_id, LogOffset.first())
defp log_end(shape_id), do: log_key(shape_id, LogOffset.last())

defp snapshot_start(shape_id), do: snapshot_key(shape_id, 0)
defp snapshot_start(shape_id), do: snapshot_key(shape_id, -1)
defp snapshot_end(shape_id), do: snapshot_key(shape_id, :end)
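# Erlang term ordering places numbers before atoms, so for any chunk index i >= 0:
# snapshot_key(id, -1) < snapshot_key(id, i) < snapshot_key(id, :end).
# Starting the marker at -1 lets it be written before any data while being
# excluded from range scans via min_key_inclusive: false.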

defp stored_version(opts) do
CubDB.get(opts.db, @version_key)
end
end