From 923dfa21b66ed2087cfdf997a0b8db18692dd218 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filipe=20Caba=C3=A7o?= Date: Fri, 22 Sep 2023 10:23:54 +0100 Subject: [PATCH 1/3] fix: Refactor Replication Poller to improve readability --- .../postgres_cdc_rls/replication_poller.ex | 271 +++++++++--------- mix.exs | 2 +- 2 files changed, 136 insertions(+), 137 deletions(-) diff --git a/lib/extensions/postgres_cdc_rls/replication_poller.ex b/lib/extensions/postgres_cdc_rls/replication_poller.ex index b6a39f9f7..523595d84 100644 --- a/lib/extensions/postgres_cdc_rls/replication_poller.ex +++ b/lib/extensions/postgres_cdc_rls/replication_poller.ex @@ -11,15 +11,15 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do import Realtime.Helpers, only: [cancel_timer: 1, decrypt_creds: 5, default_ssl_param: 1, maybe_enforce_ssl_config: 2] - alias Extensions.PostgresCdcRls.{Replications, MessageDispatcher} alias DBConnection.Backoff - alias Realtime.PubSub - alias Realtime.Adapters.Changes.{ - DeletedRecord, - NewRecord, - UpdatedRecord - } + alias Extensions.PostgresCdcRls.MessageDispatcher + alias Extensions.PostgresCdcRls.Replications + + alias Realtime.Adapters.Changes.DeletedRecord + alias Realtime.Adapters.Changes.NewRecord + alias Realtime.Adapters.Changes.UpdatedRecord + alias Realtime.PubSub @queue_target 5_000 @@ -45,12 +45,7 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do tenant = args["id"] state = %{ - backoff: - Backoff.new( - backoff_min: 100, - backoff_max: 5_000, - backoff_type: :rand_exp - ), + backoff: Backoff.new(backoff_min: 100, backoff_max: 5_000, backoff_type: :rand_exp), conn: conn, db_host: args["db_host"], db_port: args["db_port"], @@ -99,76 +94,27 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do cancel_timer(poll_ref) cancel_timer(retry_ref) - try do - {time, response} = - :timer.tc(Replications, :list_changes, [ - conn, - slot_name, - publication, - max_changes, - max_record_bytes - ]) - - Realtime.Telemetry.execute( - [:realtime, :replication, :poller, :query, :stop], - %{duration: time}, - %{tenant: tenant} + broadcast_count = + conn + |> list_changes_with_telemetry( + slot_name, + publication, + max_changes, + max_record_bytes, + tenant ) + |> handle_list_changes_result(tenant) - response - catch - {:error, reason} -> - {:error, reason} - end - |> case do - {:ok, - %Postgrex.Result{ - columns: ["wal", "is_rls_enabled", "subscription_ids", "errors"] = columns, - rows: [_ | _] = rows, - num_rows: rows_count - }} -> - Enum.reduce(rows, [], fn row, acc -> - columns - |> Enum.zip(row) - |> generate_record() - |> case do - nil -> - acc - - record_struct -> - [record_struct | acc] - end - end) - |> Enum.reverse() - |> Enum.each(fn change -> - Phoenix.PubSub.broadcast_from( - PubSub, - self(), - "realtime:postgres:" <> tenant, - change, - MessageDispatcher - ) - end) - - {:ok, rows_count} + case broadcast_count do + {:ok, 0} -> + backoff = Backoff.reset(backoff) + send(self(), :poll) - {:ok, _} -> - {:ok, 0} + {:noreply, %{state | backoff: backoff, poll_ref: nil}} - {:error, reason} -> - {:error, reason} - end - |> case do - {:ok, rows_num} -> + {:ok, _} -> backoff = Backoff.reset(backoff) - - poll_ref = - if rows_num > 0 do - send(self(), :poll) - nil - else - Process.send_after(self(), :poll, poll_interval_ms) - end + poll_ref = Process.send_after(self(), :poll, poll_interval_ms) {:noreply, %{state | backoff: backoff, poll_ref: poll_ref}} @@ -186,14 +132,9 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do if retry_count > 3 do case Replications.terminate_backend(conn, slot_name) do - {:ok, :terminated} -> - Logger.warn("Replication slot in use - terminating") - - {:error, :slot_not_found} -> - Logger.warn("Replication slot not found") - - {:error, error} -> - Logger.warn("Error terminating backend: #{inspect(error)}") + {:ok, :terminated} -> Logger.warn("Replication slot in use - terminating") + {:error, :slot_not_found} -> Logger.warn("Replication slot not found") + {:error, error} -> Logger.warn("Error terminating backend: #{inspect(error)}") end end @@ -220,6 +161,114 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do {:noreply, prepare_replication(state)} end + def slot_name_suffix() do + case System.get_env("SLOT_NAME_SUFFIX") do + nil -> + "" + + value -> + Logger.debug("Using slot name suffix: " <> value) + "_" <> value + end + end + + defp convert_errors([_ | _] = errors), do: errors + + defp convert_errors(_), do: nil + + defp connect_db(host, port, name, user, pass, socket_opts, ssl_enforced) do + {host, port, name, user, pass} = decrypt_creds(host, port, name, user, pass) + + [ + hostname: host, + port: port, + database: name, + password: pass, + username: user, + queue_target: @queue_target, + parameters: [application_name: "realtime_rls"], + socket_options: socket_opts + ] + |> maybe_enforce_ssl_config(ssl_enforced) + |> Postgrex.start_link() + end + + defp prepare_replication( + %{backoff: backoff, conn: conn, slot_name: slot_name, retry_count: retry_count} = state + ) do + case Replications.prepare_replication(conn, slot_name) do + {:ok, _} -> + send(self(), :poll) + state + + {:error, error} -> + Logger.error("Prepare replication error: #{inspect(error)}") + {timeout, backoff} = Backoff.backoff(backoff) + retry_ref = Process.send_after(self(), :retry, timeout) + %{state | backoff: backoff, retry_ref: retry_ref, retry_count: retry_count + 1} + end + end + + defp list_changes_with_telemetry( + conn, + slot_name, + publication, + max_changes, + max_record_bytes, + tenant + ) do + args = [ + conn, + slot_name, + publication, + max_changes, + max_record_bytes + ] + + {time, response} = :timer.tc(Replications, :list_changes, args) + + Realtime.Telemetry.execute( + [:realtime, :replication, :poller, :query, :stop], + %{duration: time}, + %{tenant: tenant} + ) + + response + catch + {:error, reason} -> {:error, reason} + end + + defp handle_list_changes_result( + {:ok, + %Postgrex.Result{ + columns: ["wal", "is_rls_enabled", "subscription_ids", "errors"] = columns, + rows: [_ | _] = rows, + num_rows: rows_count + }}, + tenant + ) do + rows + |> Enum.reduce([], fn row, acc -> + columns + |> Enum.zip(row) + |> generate_record() + |> then(fn + nil -> acc + record_struct -> [record_struct | acc] + end) + end) + |> Enum.reverse() + |> Enum.each(fn change -> + topic = "realtime:postgres:" <> tenant + Phoenix.PubSub.broadcast_from(PubSub, self(), topic, change, MessageDispatcher) + end) + + {:ok, rows_count} + end + + defp handle_list_changes_result({:ok, _}, _), do: {:ok, 0} + defp handle_list_changes_result({:error, reason}, _), do: {:error, reason} + def generate_record([ {"wal", %{ @@ -294,54 +343,4 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do end def generate_record(_), do: nil - - def slot_name_suffix() do - case System.get_env("SLOT_NAME_SUFFIX") do - nil -> - "" - - value -> - Logger.debug("Using slot name suffix: " <> value) - "_" <> value - end - end - - defp convert_errors([_ | _] = errors), do: errors - - defp convert_errors(_), do: nil - - defp connect_db(host, port, name, user, pass, socket_opts, ssl_enforced) do - {host, port, name, user, pass} = decrypt_creds(host, port, name, user, pass) - - [ - hostname: host, - port: port, - database: name, - password: pass, - username: user, - queue_target: @queue_target, - parameters: [ - application_name: "realtime_rls" - ], - socket_options: socket_opts - ] - |> maybe_enforce_ssl_config(ssl_enforced) - |> Postgrex.start_link() - end - - defp prepare_replication( - %{backoff: backoff, conn: conn, slot_name: slot_name, retry_count: retry_count} = state - ) do - case Replications.prepare_replication(conn, slot_name) do - {:ok, _} -> - send(self(), :poll) - state - - {:error, error} -> - Logger.error("Prepare replication error: #{inspect(error)}") - {timeout, backoff} = Backoff.backoff(backoff) - retry_ref = Process.send_after(self(), :retry, timeout) - %{state | backoff: backoff, retry_ref: retry_ref, retry_count: retry_count + 1} - end - end end diff --git a/mix.exs b/mix.exs index 14a8a9cac..3253e700b 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do def project do [ app: :realtime, - version: "2.22.21", + version: "2.22.22", elixir: "~> 1.14.0", elixirc_paths: elixirc_paths(Mix.env()), start_permanent: Mix.env() == :prod, From f51407477261e640bacc257ed16bd036fb105d1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filipe=20Caba=C3=A7o?= Date: Fri, 22 Sep 2023 17:40:04 +0100 Subject: [PATCH 2/3] Remove function in favour of with statement --- .../postgres_cdc_rls/replication_poller.ex | 60 +++++-------------- 1 file changed, 15 insertions(+), 45 deletions(-) diff --git a/lib/extensions/postgres_cdc_rls/replication_poller.ex b/lib/extensions/postgres_cdc_rls/replication_poller.ex index 523595d84..48548ef75 100644 --- a/lib/extensions/postgres_cdc_rls/replication_poller.ex +++ b/lib/extensions/postgres_cdc_rls/replication_poller.ex @@ -94,30 +94,21 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do cancel_timer(poll_ref) cancel_timer(retry_ref) - broadcast_count = - conn - |> list_changes_with_telemetry( - slot_name, - publication, - max_changes, - max_record_bytes, - tenant - ) - |> handle_list_changes_result(tenant) - - case broadcast_count do - {:ok, 0} -> - backoff = Backoff.reset(backoff) - send(self(), :poll) - - {:noreply, %{state | backoff: backoff, poll_ref: nil}} - - {:ok, _} -> - backoff = Backoff.reset(backoff) - poll_ref = Process.send_after(self(), :poll, poll_interval_ms) - - {:noreply, %{state | backoff: backoff, poll_ref: poll_ref}} + with args <- [conn, slot_name, publication, max_changes, max_record_bytes], + {time, list_changes} <- :timer.tc(Replications, :list_changes, args), + _ <- record_list_changes_telemetry(time, tenant), + {:ok, row_count} <- handle_list_changes_result(list_changes, tenant), + backoff <- Backoff.reset(backoff) do + pool_ref = + if row_count == 0 do + send(self(), :poll) + nil + else + Process.send_after(self(), :poll, poll_interval_ms) + end + {:noreply, %{state | backoff: backoff, poll_ref: pool_ref}} + else {:error, %Postgrex.Error{postgres: %{code: :object_in_use, message: msg}}} -> Logger.error("Error polling replication: :object_in_use") @@ -209,33 +200,12 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do end end - defp list_changes_with_telemetry( - conn, - slot_name, - publication, - max_changes, - max_record_bytes, - tenant - ) do - args = [ - conn, - slot_name, - publication, - max_changes, - max_record_bytes - ] - - {time, response} = :timer.tc(Replications, :list_changes, args) - + defp record_list_changes_telemetry(time, tenant) do Realtime.Telemetry.execute( [:realtime, :replication, :poller, :query, :stop], %{duration: time}, %{tenant: tenant} ) - - response - catch - {:error, reason} -> {:error, reason} end defp handle_list_changes_result( From 32d6955b245c72e29b1cb9eed32638b89be357bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filipe=20Caba=C3=A7o?= Date: Tue, 26 Sep 2023 00:33:47 +0100 Subject: [PATCH 3/3] Apply PR feedback --- .../postgres_cdc_rls/replication_poller.ex | 85 ++++++------------- lib/realtime/helpers.ex | 16 ++-- .../cluster_strategy/postgres_test.exs | 2 +- 3 files changed, 35 insertions(+), 68 deletions(-) diff --git a/lib/extensions/postgres_cdc_rls/replication_poller.ex b/lib/extensions/postgres_cdc_rls/replication_poller.ex index 48548ef75..093dcf8eb 100644 --- a/lib/extensions/postgres_cdc_rls/replication_poller.ex +++ b/lib/extensions/postgres_cdc_rls/replication_poller.ex @@ -8,24 +8,16 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do require Logger - import Realtime.Helpers, - only: [cancel_timer: 1, decrypt_creds: 5, default_ssl_param: 1, maybe_enforce_ssl_config: 2] + import Realtime.Helpers, only: [cancel_timer: 1, default_ssl_param: 1, connect_db: 10] alias DBConnection.Backoff - - alias Extensions.PostgresCdcRls.MessageDispatcher - alias Extensions.PostgresCdcRls.Replications - - alias Realtime.Adapters.Changes.DeletedRecord - alias Realtime.Adapters.Changes.NewRecord - alias Realtime.Adapters.Changes.UpdatedRecord + alias Extensions.PostgresCdcRls.{Replications, MessageDispatcher} + alias Realtime.Adapters.Changes.{DeletedRecord, NewRecord, UpdatedRecord} alias Realtime.PubSub @queue_target 5_000 - def start_link(opts) do - GenServer.start_link(__MODULE__, opts) - end + def start_link(opts), do: GenServer.start_link(__MODULE__, opts) @impl true def init(args) do @@ -39,7 +31,10 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do args["db_user"], args["db_password"], args["db_socket_opts"], - ssl_enforced + 1, + @queue_target, + ssl_enforced, + "realtime_rls" ) tenant = args["id"] @@ -94,21 +89,24 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do cancel_timer(poll_ref) cancel_timer(retry_ref) - with args <- [conn, slot_name, publication, max_changes, max_record_bytes], - {time, list_changes} <- :timer.tc(Replications, :list_changes, args), - _ <- record_list_changes_telemetry(time, tenant), - {:ok, row_count} <- handle_list_changes_result(list_changes, tenant), - backoff <- Backoff.reset(backoff) do - pool_ref = - if row_count == 0 do - send(self(), :poll) - nil - else - Process.send_after(self(), :poll, poll_interval_ms) - end + args = [conn, slot_name, publication, max_changes, max_record_bytes] + {time, list_changes} = :timer.tc(Replications, :list_changes, args) + record_list_changes_telemetry(time, tenant) + + case handle_list_changes_result(list_changes, tenant) do + {:ok, row_count} -> + Backoff.reset(backoff) + + pool_ref = + if row_count > 0 do + send(self(), :poll) + nil + else + Process.send_after(self(), :poll, poll_interval_ms) + end + + {:noreply, %{state | backoff: backoff, poll_ref: pool_ref}} - {:noreply, %{state | backoff: backoff, poll_ref: pool_ref}} - else {:error, %Postgrex.Error{postgres: %{code: :object_in_use, message: msg}}} -> Logger.error("Error polling replication: :object_in_use") @@ -167,23 +165,6 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do defp convert_errors(_), do: nil - defp connect_db(host, port, name, user, pass, socket_opts, ssl_enforced) do - {host, port, name, user, pass} = decrypt_creds(host, port, name, user, pass) - - [ - hostname: host, - port: port, - database: name, - password: pass, - username: user, - queue_target: @queue_target, - parameters: [application_name: "realtime_rls"], - socket_options: socket_opts - ] - |> maybe_enforce_ssl_config(ssl_enforced) - |> Postgrex.start_link() - end - defp prepare_replication( %{backoff: backoff, conn: conn, slot_name: slot_name, retry_count: retry_count} = state ) do @@ -217,21 +198,11 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do }}, tenant ) do - rows - |> Enum.reduce([], fn row, acc -> - columns - |> Enum.zip(row) - |> generate_record() - |> then(fn - nil -> acc - record_struct -> [record_struct | acc] - end) - end) - |> Enum.reverse() - |> Enum.each(fn change -> + for row <- rows, + change <- columns |> Enum.zip(row) |> generate_record() |> List.wrap() do topic = "realtime:postgres:" <> tenant Phoenix.PubSub.broadcast_from(PubSub, self(), topic, change, MessageDispatcher) - end) + end {:ok, rows_count} end diff --git a/lib/realtime/helpers.ex b/lib/realtime/helpers.ex index e312dca1d..0e8875b8e 100644 --- a/lib/realtime/helpers.ex +++ b/lib/realtime/helpers.ex @@ -30,7 +30,8 @@ defmodule Realtime.Helpers do list(), non_neg_integer(), non_neg_integer(), - boolean() + boolean(), + String.t() ) :: {:ok, pid} | {:error, Postgrex.Error.t() | term()} def connect_db( @@ -42,15 +43,10 @@ defmodule Realtime.Helpers do socket_opts, pool \\ 5, queue_target \\ 5_000, - ssl_enforced \\ true + ssl_enforced \\ true, + application_name \\ "supabase_realtime" ) do - secure_key = Application.get_env(:realtime, :db_enc_key) - - host = decrypt!(host, secure_key) - port = decrypt!(port, secure_key) - name = decrypt!(name, secure_key) - pass = decrypt!(pass, secure_key) - user = decrypt!(user, secure_key) + {host, port, name, user, pass} = decrypt_creds(host, port, name, user, pass) [ hostname: host, @@ -61,7 +57,7 @@ defmodule Realtime.Helpers do pool_size: pool, queue_target: queue_target, parameters: [ - application_name: "supabase_realtime" + application_name: application_name ], socket_options: socket_opts ] diff --git a/test/realtime/cluster_strategy/postgres_test.exs b/test/realtime/cluster_strategy/postgres_test.exs index 8891326c0..1bc6aa681 100644 --- a/test/realtime/cluster_strategy/postgres_test.exs +++ b/test/realtime/cluster_strategy/postgres_test.exs @@ -24,7 +24,7 @@ defmodule Realtime.Cluster.Strategy.PostgresTest do {:ok, conn_notif} = PN.start_link(state.meta.opts.()) PN.listen(conn_notif, channel_name) node = "#{node()}" - assert_receive {:notification, _, _, channel_name, ^node} + assert_receive {:notification, _, _, ^channel_name, ^node} end defp libcluster_state() do