Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Refactor Replication Poller to improve readability #681

Merged
merged 3 commits into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
222 changes: 81 additions & 141 deletions lib/extensions/postgres_cdc_rls/replication_poller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,16 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do

require Logger

import Realtime.Helpers,
only: [cancel_timer: 1, decrypt_creds: 5, default_ssl_param: 1, maybe_enforce_ssl_config: 2]
import Realtime.Helpers, only: [cancel_timer: 1, default_ssl_param: 1, connect_db: 10]

alias Extensions.PostgresCdcRls.{Replications, MessageDispatcher}
alias DBConnection.Backoff
alias Extensions.PostgresCdcRls.{Replications, MessageDispatcher}
alias Realtime.Adapters.Changes.{DeletedRecord, NewRecord, UpdatedRecord}
alias Realtime.PubSub

alias Realtime.Adapters.Changes.{
DeletedRecord,
NewRecord,
UpdatedRecord
}

@queue_target 5_000

# Starts the poller as a GenServer for this module; `opts` is handed to `init/1`.
# Returns whatever `GenServer.start_link/2` returns.
def start_link(opts) do
GenServer.start_link(__MODULE__, opts)
end
# Starts the poller as a GenServer for this module; `opts` is handed to `init/1`.
# Returns whatever `GenServer.start_link/2` returns.
def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

@impl true
def init(args) do
Expand All @@ -39,18 +31,16 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
args["db_user"],
args["db_password"],
args["db_socket_opts"],
ssl_enforced
1,
@queue_target,
ssl_enforced,
"realtime_rls"
)

tenant = args["id"]

state = %{
backoff:
Backoff.new(
backoff_min: 100,
backoff_max: 5_000,
backoff_type: :rand_exp
),
backoff: Backoff.new(backoff_min: 100, backoff_max: 5_000, backoff_type: :rand_exp),
abc3 marked this conversation as resolved.
Show resolved Hide resolved
conn: conn,
db_host: args["db_host"],
db_port: args["db_port"],
Expand Down Expand Up @@ -99,78 +89,23 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
cancel_timer(poll_ref)
cancel_timer(retry_ref)

try do
{time, response} =
:timer.tc(Replications, :list_changes, [
conn,
slot_name,
publication,
max_changes,
max_record_bytes
])

Realtime.Telemetry.execute(
[:realtime, :replication, :poller, :query, :stop],
%{duration: time},
%{tenant: tenant}
)

response
catch
{:error, reason} ->
{:error, reason}
end
|> case do
{:ok,
%Postgrex.Result{
columns: ["wal", "is_rls_enabled", "subscription_ids", "errors"] = columns,
rows: [_ | _] = rows,
num_rows: rows_count
}} ->
Enum.reduce(rows, [], fn row, acc ->
columns
|> Enum.zip(row)
|> generate_record()
|> case do
nil ->
acc

record_struct ->
[record_struct | acc]
end
end)
|> Enum.reverse()
|> Enum.each(fn change ->
Phoenix.PubSub.broadcast_from(
PubSub,
self(),
"realtime:postgres:" <> tenant,
change,
MessageDispatcher
)
end)

{:ok, rows_count}

{:ok, _} ->
{:ok, 0}
args = [conn, slot_name, publication, max_changes, max_record_bytes]
{time, list_changes} = :timer.tc(Replications, :list_changes, args)
record_list_changes_telemetry(time, tenant)

{:error, reason} ->
{:error, reason}
end
|> case do
{:ok, rows_num} ->
backoff = Backoff.reset(backoff)
case handle_list_changes_result(list_changes, tenant) do
{:ok, row_count} ->
# Rebind the result: Elixir data is immutable, so calling Backoff.reset/1
# without capturing its return value leaves `backoff` unchanged, and the
# un-reset value would be stored in state by the `%{state | backoff: backoff, ...}`
# update at the end of this branch.
backoff = Backoff.reset(backoff)

poll_ref =
if rows_num > 0 do
pool_ref =
if row_count > 0 do
send(self(), :poll)
nil
else
Process.send_after(self(), :poll, poll_interval_ms)
end

{:noreply, %{state | backoff: backoff, poll_ref: poll_ref}}
{:noreply, %{state | backoff: backoff, poll_ref: pool_ref}}

{:error, %Postgrex.Error{postgres: %{code: :object_in_use, message: msg}}} ->
Logger.error("Error polling replication: :object_in_use")
Expand All @@ -186,14 +121,9 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do

if retry_count > 3 do
case Replications.terminate_backend(conn, slot_name) do
{:ok, :terminated} ->
Logger.warn("Replication slot in use - terminating")

{:error, :slot_not_found} ->
Logger.warn("Replication slot not found")

{:error, error} ->
Logger.warn("Error terminating backend: #{inspect(error)}")
{:ok, :terminated} -> Logger.warn("Replication slot in use - terminating")
{:error, :slot_not_found} -> Logger.warn("Replication slot not found")
{:error, error} -> Logger.warn("Error terminating backend: #{inspect(error)}")
end
end

Expand All @@ -220,6 +150,66 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
{:noreply, prepare_replication(state)}
end

@doc """
Returns the replication slot name suffix taken from the `SLOT_NAME_SUFFIX`
environment variable.

When the variable is unset the result is `""`; otherwise the value is logged
at debug level and returned prefixed with an underscore.
"""
def slot_name_suffix() do
  if suffix = System.get_env("SLOT_NAME_SUFFIX") do
    Logger.debug("Using slot name suffix: " <> suffix)
    "_" <> suffix
  else
    ""
  end
end

# Passes a non-empty list of errors through untouched; any other value
# (empty list, nil, non-list) is normalised to nil.
defp convert_errors(errors) do
  case errors do
    [_ | _] -> errors
    _ -> nil
  end
end

# Asks Replications to prepare replication for the slot held in `state`.
#
# On success a :poll message is sent to this process and the state is returned
# unchanged. On failure the error is logged, a :retry message is scheduled after
# the next backoff timeout, and the returned state carries the advanced backoff,
# the new retry timer reference and an incremented retry count.
defp prepare_replication(
       %{backoff: current_backoff, conn: db_conn, slot_name: slot, retry_count: attempts} = state
     ) do
  case Replications.prepare_replication(db_conn, slot) do
    {:error, reason} ->
      Logger.error("Prepare replication error: #{inspect(reason)}")
      {delay, next_backoff} = Backoff.backoff(current_backoff)
      timer = Process.send_after(self(), :retry, delay)
      %{state | backoff: next_backoff, retry_ref: timer, retry_count: attempts + 1}

    {:ok, _} ->
      send(self(), :poll)
      state
  end
end

# Emits the poller query :stop telemetry event, reporting `time` as the
# duration measurement and `tenant` as event metadata.
defp record_list_changes_telemetry(time, tenant) do
  event = [:realtime, :replication, :poller, :query, :stop]
  measurements = %{duration: time}
  metadata = %{tenant: tenant}

  Realtime.Telemetry.execute(event, measurements, metadata)
end

# Turns the result of listing changes into `{:ok, row_count}` or `{:error, reason}`.
#
# Errors are passed through as-is. A successful result with the expected four
# columns and at least one row has each row converted with generate_record/1;
# every resulting change is broadcast from this process on the tenant's
# "realtime:postgres:" topic via MessageDispatcher, and the result's row count
# is returned. Any other successful result counts as zero rows.
defp handle_list_changes_result({:error, reason}, _tenant), do: {:error, reason}

defp handle_list_changes_result(
       {:ok,
        %Postgrex.Result{
          columns: ["wal", "is_rls_enabled", "subscription_ids", "errors"] = column_names,
          rows: [_ | _] = result_rows,
          num_rows: total
        }},
       tenant
     ) do
  Enum.each(result_rows, fn values ->
    column_names
    |> Enum.zip(values)
    |> generate_record()
    # nil (no record for this row) wraps to [], so nothing is broadcast for it
    |> List.wrap()
    |> Enum.each(fn change ->
      topic = "realtime:postgres:" <> tenant
      Phoenix.PubSub.broadcast_from(PubSub, self(), topic, change, MessageDispatcher)
    end)
  end)

  {:ok, total}
end

defp handle_list_changes_result({:ok, _}, _tenant), do: {:ok, 0}

def generate_record([
{"wal",
%{
Expand Down Expand Up @@ -294,54 +284,4 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
end

def generate_record(_), do: nil

# Returns the slot name suffix read from the SLOT_NAME_SUFFIX environment variable:
# "" when the variable is unset, otherwise "_" followed by its value.
def slot_name_suffix() do
case System.get_env("SLOT_NAME_SUFFIX") do
nil ->
""

value ->
# Log the raw value before prefixing it with the underscore separator.
Logger.debug("Using slot name suffix: " <> value)
"_" <> value
end
end

# A non-empty list of errors is returned unchanged.
defp convert_errors([_ | _] = errors), do: errors

# Anything else (including an empty list) is normalised to nil.
defp convert_errors(_), do: nil

# Starts a Postgrex connection for the poller.
#
# The five credential arguments are first passed through decrypt_creds/5; the
# decrypted values, together with `socket_opts`, the module's @queue_target and
# the "realtime_rls" application name, form the connection options. Those
# options go through maybe_enforce_ssl_config/2 with `ssl_enforced` before
# being given to Postgrex.start_link/1, whose result is returned.
defp connect_db(host, port, name, user, pass, socket_opts, ssl_enforced) do
{host, port, name, user, pass} = decrypt_creds(host, port, name, user, pass)

[
hostname: host,
port: port,
database: name,
password: pass,
username: user,
queue_target: @queue_target,
parameters: [
application_name: "realtime_rls"
],
socket_options: socket_opts
]
|> maybe_enforce_ssl_config(ssl_enforced)
|> Postgrex.start_link()
end

# Asks Replications to prepare replication for `slot_name` on `conn`.
# Returns the (possibly updated) state map.
defp prepare_replication(
%{backoff: backoff, conn: conn, slot_name: slot_name, retry_count: retry_count} = state
) do
case Replications.prepare_replication(conn, slot_name) do
{:ok, _} ->
# Ready: trigger the first poll immediately and keep the state as-is.
send(self(), :poll)
state

{:error, error} ->
Logger.error("Prepare replication error: #{inspect(error)}")
# Schedule a :retry after the next backoff timeout and record the attempt.
{timeout, backoff} = Backoff.backoff(backoff)
retry_ref = Process.send_after(self(), :retry, timeout)
%{state | backoff: backoff, retry_ref: retry_ref, retry_count: retry_count + 1}
end
end
end
16 changes: 6 additions & 10 deletions lib/realtime/helpers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ defmodule Realtime.Helpers do
list(),
non_neg_integer(),
non_neg_integer(),
boolean()
boolean(),
String.t()
) ::
{:ok, pid} | {:error, Postgrex.Error.t() | term()}
def connect_db(
Expand All @@ -42,15 +43,10 @@ defmodule Realtime.Helpers do
socket_opts,
pool \\ 5,
queue_target \\ 5_000,
ssl_enforced \\ true
ssl_enforced \\ true,
application_name \\ "supabase_realtime"
) do
secure_key = Application.get_env(:realtime, :db_enc_key)

host = decrypt!(host, secure_key)
port = decrypt!(port, secure_key)
name = decrypt!(name, secure_key)
pass = decrypt!(pass, secure_key)
user = decrypt!(user, secure_key)
{host, port, name, user, pass} = decrypt_creds(host, port, name, user, pass)

[
hostname: host,
Expand All @@ -61,7 +57,7 @@ defmodule Realtime.Helpers do
pool_size: pool,
queue_target: queue_target,
parameters: [
application_name: "supabase_realtime"
application_name: application_name
],
socket_options: socket_opts
]
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.22.21",
version: "2.22.22",
elixir: "~> 1.14.0",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
2 changes: 1 addition & 1 deletion test/realtime/cluster_strategy/postgres_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
alias Realtime.Cluster.Strategy.Postgres

setup do
Ecto.Adapters.SQL.Sandbox.mode(Realtime.Repo, :auto)

Check warning on line 9 in test/realtime/cluster_strategy/postgres_test.exs

View workflow job for this annotation

GitHub Actions / Formatting Checks

Nested modules could be aliased at the top of the invoking module.
end

test "handle_event/4, :internal, :connect is successful" do
Expand All @@ -24,7 +24,7 @@
{:ok, conn_notif} = PN.start_link(state.meta.opts.())
PN.listen(conn_notif, channel_name)
node = "#{node()}"
assert_receive {:notification, _, _, channel_name, ^node}
assert_receive {:notification, _, _, ^channel_name, ^node}
end

defp libcluster_state() do
Expand Down
Loading