Skip to content

Commit

Permalink
fix(electric): Advance the active replication slot even when there ar…
Browse files Browse the repository at this point in the history
…e no writes to electrified tables (#1402)

We've had the logic for advancing the main replication slot for some
time now. That slot is used as a way to retain WAL records, allowing the
sync service to maintain a window within which it can go back in time
and replay old transactions.

A separate replication slot is used for the logical replication
connection that the sync service maintains with Postgres when it's
running. This slot is also known as the active replication slot.

The issue this PR addresses is caused by the fact that the active
replication slot is never advanced when electrified tables aren't seeing
any writes. This manifests as an endlessly growing replication lag and
disk usage when the database sees a constant rate of writes to
non-electrified tables:

![Screenshot from 2024-03-23
18-19-05](https://github.com/electric-sql/electric/assets/207748/2047f530-4945-4d03-a6fd-e6e61fbe5c9a)

With the fix in this PR in place, the sync service can now maintain
constant replication lag that we can further adjust, for example, to
prevent DigitalOcean from raising alerts about high replication lag. On
the chart below you can see how disk usage goes down when the sync
service is upgraded to a patched version: it never goes up the same way
it used to:

![Screenshot from 2024-06-25
13-00-58](https://github.com/electric-sql/electric/assets/207748/a427fbe0-f188-474a-b319-36b0a7d31062)

And here's zoomed-in view of a 6-hour time span with the patched version
running:
![Screenshot from 2024-06-25
13-01-09](https://github.com/electric-sql/electric/assets/207748/9834ca7b-2a2c-4a1a-8140-d355cdeb4739)

Fixes #1285.
  • Loading branch information
alco committed Jul 10, 2024
1 parent 392a36b commit 3dcff9a
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 13 deletions.
22 changes: 21 additions & 1 deletion components/electric/lib/electric/replication/postgres/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ defmodule Electric.Replication.Postgres.Client do
doesn't support connecting via a unix socket.
"""

import Electric.Postgres.Dialect.Postgresql, only: [quote_ident: 1]
import Electric.Postgres.Dialect.Postgresql, only: [escape_quotes: 2, quote_ident: 1]

alias Electric.Postgres.Extension
alias Electric.Postgres.Lsn
Expand Down Expand Up @@ -322,6 +322,26 @@ defmodule Electric.Replication.Postgres.Client do
end
end

@type logical_message_option :: {:transactional?, boolean} | {:prefix, String.t()}
@doc """
Emit a logical message to be consumed by LogicalReplicationProducer.
"""
@spec emit_logical_message(connection, String.t(), [logical_message_option()]) ::
:ok | {:error, term}
def emit_logical_message(conn, message, opts \\ []) do
transactional? = Keyword.get(opts, :transactional?, false)
prefix = Keyword.get(opts, :prefix, "") |> escape_quotes(?')
message = escape_quotes(message, ?')

with {:ok, _, _} <-
squery(
conn,
"SELECT pg_logical_emit_message(#{transactional?}, '#{prefix}', '#{message}')"
) do
:ok
end
end

@relkind %{table: ["r"], index: ["i"], view: ["v", "m"]}

@pg_class_query """
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ defmodule Electric.Replication.Postgres.LogicalReplicationProducer do
advance_timer: nil,
main_slot: "",
main_slot_lsn: %Lsn{},
acked_lsn: %Lsn{},
resumable_wal_window: 1

@type t() :: %__MODULE__{
Expand All @@ -70,6 +71,7 @@ defmodule Electric.Replication.Postgres.LogicalReplicationProducer do
advance_timer: reference() | nil,
main_slot: binary(),
main_slot_lsn: Lsn.t(),
acked_lsn: Lsn.t(),
resumable_wal_window: pos_integer()
}
end
Expand All @@ -86,6 +88,10 @@ defmodule Electric.Replication.Postgres.LogicalReplicationProducer do
@advance_timeout 1_000
end

# 100 MB is somewhat arbitrary, chosen to balance between the frequency of acks and extra
# disk usage.
@active_slot_lag_bytes 100 * 1024 * 1024

@spec start_link(Connectors.config()) :: :ignore | {:error, any} | {:ok, pid}
def start_link(connector_config) do
GenStage.start_link(__MODULE__, connector_config, name: name(connector_config))
Expand Down Expand Up @@ -121,7 +127,7 @@ defmodule Electric.Replication.Postgres.LogicalReplicationProducer do
"Starting replication with publication=#{publication} and slots=#{main_slot},#{tmp_slot}}"
)

# The replication connection is used to consumer the logical replication stream from
# The replication connection is used to consume the logical replication stream from
# Postgres and to send acknowledgements about received transactions back to Postgres,
# allowing it to advance the replication slot forward and discard obsolete WAL records.
with {:ok, repl_conn} <- Client.connect(repl_conn_opts),
Expand Down Expand Up @@ -228,7 +234,8 @@ defmodule Electric.Replication.Postgres.LogicalReplicationProducer do
end

defp process_message(
%Message{transactional?: true, prefix: "electric.fk_chain_touch", content: content},
%Message{transactional?: true, prefix: "electric.fk_chain_touch", content: content} =
msg,
state
) do
received = Jason.decode!(content)
Expand All @@ -241,6 +248,8 @@ defmodule Electric.Replication.Postgres.LogicalReplicationProducer do
ShadowTableTransformation.convert_tag_list_pg_to_satellite(received["tags"], state.origin)
}

state = ack_message(msg, state)

{lsn, txn} = state.transaction

{:noreply, [],
Expand All @@ -249,7 +258,7 @@ defmodule Electric.Replication.Postgres.LogicalReplicationProducer do

defp process_message(%Message{} = msg, state) do
Logger.info("Got a message from PG via logical replication: #{inspect(msg)}")

state = ack_message(msg, state)
{:noreply, [], state}
end

Expand Down Expand Up @@ -385,16 +394,14 @@ defmodule Electric.Replication.Postgres.LogicalReplicationProducer do

defp dispatch_events(%{demand: demand, queue: queue, queue_len: queue_len} = state)
when demand >= queue_len do
queue |> :queue.last() |> ack(state)

state = queue |> :queue.last() |> ack_transaction(state)
state = %{state | queue: :queue.new(), queue_len: 0, demand: demand - queue_len}
{:noreply, :queue.to_list(queue), state}
end

defp dispatch_events(%{demand: demand, queue: queue, queue_len: queue_len} = state) do
{to_emit, queue_remaining} = :queue.split(demand, queue)
to_emit |> :queue.last() |> ack(state)

state = to_emit |> :queue.last() |> ack_transaction(state)
state = %{state | queue: queue_remaining, queue_len: queue_len - demand, demand: 0}
{:noreply, :queue.to_list(to_emit), state}
end
Expand All @@ -408,25 +415,60 @@ defmodule Electric.Replication.Postgres.LogicalReplicationProducer do
|> Map.new(fn {column, data} -> {column.name, data} end)
end

@spec ack(Transaction.t(), State.t()) :: :ok
@spec ack_message(Message.t(), State.t()) :: State.t()

if Mix.env() == :test do
def ack(%Transaction{}, %State{repl_conn: :conn}) do
:ok
def ack_message(_, %State{repl_conn: :conn} = state), do: state
end

def ack_message(%Message{lsn: lsn}, state) do
cond do
state.queue_len > 0 ->
# We still have unacknowledged transactions waiting in the queue. Can't acknowledge the
# message before all transactions in the queue have been processed.
state

Lsn.compare(lsn, state.acked_lsn) != :gt ->
# Either lsn == state.acked_lsn, in which case we can skip acknowledging it for the second time,
# or lsn < state.acked_lsn, which could mean that the message's LSN is lower than the
# LSN of the transaction it was emitted from or another transaction with a higher LSN
# has already been acknowledged.
state

true ->
ack_lsn(lsn, state)
end
end

def ack(%Transaction{lsn: lsn}, state) do
@spec ack_transaction(Transaction.t(), State.t()) :: State.t()

if Mix.env() == :test do
def ack_transaction(_, %State{repl_conn: :conn} = state), do: state
end

def ack_transaction(%Transaction{lsn: lsn}, state) do
ack_lsn(lsn, state)
end

def ack_lsn(%Lsn{} = lsn, state) do
assert_lsn_is_advancing!(lsn, state.acked_lsn, Lsn.compare(lsn, state.acked_lsn))

Logger.debug("Acknowledging #{lsn}", origin: state.origin)
Client.acknowledge_lsn(state.repl_conn, lsn)
:ok = Client.acknowledge_lsn(state.repl_conn, lsn)
%{state | acked_lsn: lsn}
end

defp assert_lsn_is_advancing!(_lsn, _acked_lsn, :gt), do: :ok

# Advance the replication slot to let Postgres discard old WAL records.
#
# TODO: make sure we're not removing transactions that are about to be requested by a newly
# connected client. See VAX-1552.
defp advance_main_slot(state) do
{:ok, current_lsn} = Client.current_lsn(state.svc_conn)

check_active_slot_lag(current_lsn, state)

min_in_window_lsn = Lsn.increment(current_lsn, -state.resumable_wal_window)

if Lsn.compare(state.main_slot_lsn, min_in_window_lsn) == :lt do
Expand All @@ -440,6 +482,17 @@ defmodule Electric.Replication.Postgres.LogicalReplicationProducer do
end
end

defp check_active_slot_lag(current_lsn, state) do
lsn_threshold = Lsn.increment(current_lsn, -@active_slot_lag_bytes)

if Lsn.compare(state.acked_lsn, lsn_threshold) == :lt do
# If there's more than `@active_slot_lag_bytes` between the current LSN and the last
# ack'ed LSN, emit a logical message to be consumed by the producer in order to advance
# the active slot and prevent it from stalling removal of old WAL records by Postgres.
:ok = Client.emit_logical_message(state.svc_conn, "advance active slot")
end
end

defp schedule_main_slot_advance(state) do
tref = :erlang.start_timer(@advance_timeout, self(), @advance_msg)
%State{state | advance_timer: tref}
Expand Down
47 changes: 47 additions & 0 deletions e2e/tests/01.08_electric_acknowledges_logical_message_lsn.lux
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
[doc Electric acknowledges logical message's LSN]
[include _shared.luxinc]

[invoke setup]

[shell pg_1]
!SELECT pg_logical_emit_message(false, '', 'hello from PG');
?? pg_logical_emit_message
??-------------------------
?(\d+/[0-9a-fA-F]+)
[my logical_msg_lsn=$1]
??(1 row)

[shell electric]
??Got a message from PG via logical replication: \
%Electric.Postgres.LogicalReplication.Messages.Message{\
transactional?: false, \
lsn: #Lsn<$logical_msg_lsn>, \
prefix: "", \
content: "hello from PG"}
??Acknowledging $logical_msg_lsn

[shell pg_1]
!BEGIN;
??BEGIN

!SELECT pg_logical_emit_message(true, '', 'hello from PG transaction');
?? pg_logical_emit_message
??-------------------------
?(\d+/[0-9a-fA-F]+)
[my logical_msg_lsn=$1]
??(1 row)

!COMMIT;
??COMMIT

[shell electric]
??Got a message from PG via logical replication: \
%Electric.Postgres.LogicalReplication.Messages.Message{\
transactional?: true, \
lsn: #Lsn<$logical_msg_lsn>, \
prefix: "", \
content: "hello from PG transaction"}
??Acknowledging $logical_msg_lsn

[cleanup]
[invoke teardown]

0 comments on commit 3dcff9a

Please sign in to comment.