From e1144db9a6b97b2fba2a948c982ecf8f55aeec9d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filipe=20Caba=C3=A7o?= Date: Tue, 19 Sep 2023 23:48:32 +0100 Subject: [PATCH] Improve code complexity in Replication Poller --- .../postgres_cdc_rls/replication_poller.ex | 258 +++++++++--------- .../postgres_cdc_stream/cdc_stream.ex | 44 ++- 2 files changed, 146 insertions(+), 156 deletions(-) diff --git a/lib/extensions/postgres_cdc_rls/replication_poller.ex b/lib/extensions/postgres_cdc_rls/replication_poller.ex index b6a39f9f7..b6ad3f38a 100644 --- a/lib/extensions/postgres_cdc_rls/replication_poller.ex +++ b/lib/extensions/postgres_cdc_rls/replication_poller.ex @@ -11,15 +11,15 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do import Realtime.Helpers, only: [cancel_timer: 1, decrypt_creds: 5, default_ssl_param: 1, maybe_enforce_ssl_config: 2] - alias Extensions.PostgresCdcRls.{Replications, MessageDispatcher} alias DBConnection.Backoff - alias Realtime.PubSub - alias Realtime.Adapters.Changes.{ - DeletedRecord, - NewRecord, - UpdatedRecord - } + alias Extensions.PostgresCdcRls.MessageDispatcher + alias Extensions.PostgresCdcRls.Replications + + alias Realtime.Adapters.Changes.DeletedRecord + alias Realtime.Adapters.Changes.NewRecord + alias Realtime.Adapters.Changes.UpdatedRecord + alias Realtime.PubSub @queue_target 5_000 @@ -45,12 +45,7 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do tenant = args["id"] state = %{ - backoff: - Backoff.new( - backoff_min: 100, - backoff_max: 5_000, - backoff_type: :rand_exp - ), + backoff: Backoff.new(backoff_min: 100, backoff_max: 5_000, backoff_type: :rand_exp), conn: conn, db_host: args["db_host"], db_port: args["db_port"], @@ -99,66 +94,18 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do cancel_timer(poll_ref) cancel_timer(retry_ref) - try do - {time, response} = - :timer.tc(Replications, :list_changes, [ - conn, - slot_name, - publication, - max_changes, - max_record_bytes - ]) - - Realtime.Telemetry.execute( - [:realtime, :replication, :poller, :query, :stop], - %{duration: time}, - %{tenant: tenant} + broadcast_count = + conn + |> list_changes_with_telemetry( + slot_name, + publication, + max_changes, + max_record_bytes, + tenant ) + |> handle_list_changes_result(tenant) - response - catch - {:error, reason} -> - {:error, reason} - end - |> case do - {:ok, - %Postgrex.Result{ - columns: ["wal", "is_rls_enabled", "subscription_ids", "errors"] = columns, - rows: [_ | _] = rows, - num_rows: rows_count - }} -> - Enum.reduce(rows, [], fn row, acc -> - columns - |> Enum.zip(row) - |> generate_record() - |> case do - nil -> - acc - - record_struct -> - [record_struct | acc] - end - end) - |> Enum.reverse() - |> Enum.each(fn change -> - Phoenix.PubSub.broadcast_from( - PubSub, - self(), - "realtime:postgres:" <> tenant, - change, - MessageDispatcher - ) - end) - - {:ok, rows_count} - - {:ok, _} -> - {:ok, 0} - - {:error, reason} -> - {:error, reason} - end - |> case do + case broadcast_count do {:ok, rows_num} -> backoff = Backoff.reset(backoff) @@ -186,14 +133,9 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do if retry_count > 3 do case Replications.terminate_backend(conn, slot_name) do - {:ok, :terminated} -> - Logger.warn("Replication slot in use - terminating") - - {:error, :slot_not_found} -> - Logger.warn("Replication slot not found") - - {:error, error} -> - Logger.warn("Error terminating backend: #{inspect(error)}") + {:ok, :terminated} -> Logger.warn("Replication slot in use - terminating") + {:error, :slot_not_found} -> Logger.warn("Replication slot not found") + {:error, error} -> Logger.warn("Error terminating backend: #{inspect(error)}") end end @@ -220,6 +162,114 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do {:noreply, prepare_replication(state)} end + def slot_name_suffix() do + case System.get_env("SLOT_NAME_SUFFIX") do + nil -> + "" + + value -> + Logger.debug("Using slot name suffix: " <> value) + "_" <> value + end + end + + defp convert_errors([_ | _] = errors), do: errors + + defp convert_errors(_), do: nil + + defp connect_db(host, port, name, user, pass, socket_opts, ssl_enforced) do + {host, port, name, user, pass} = decrypt_creds(host, port, name, user, pass) + + [ + hostname: host, + port: port, + database: name, + password: pass, + username: user, + queue_target: @queue_target, + parameters: [application_name: "realtime_rls"], + socket_options: socket_opts + ] + |> maybe_enforce_ssl_config(ssl_enforced) + |> Postgrex.start_link() + end + + defp prepare_replication( + %{backoff: backoff, conn: conn, slot_name: slot_name, retry_count: retry_count} = state + ) do + case Replications.prepare_replication(conn, slot_name) do + {:ok, _} -> + send(self(), :poll) + state + + {:error, error} -> + Logger.error("Prepare replication error: #{inspect(error)}") + {timeout, backoff} = Backoff.backoff(backoff) + retry_ref = Process.send_after(self(), :retry, timeout) + %{state | backoff: backoff, retry_ref: retry_ref, retry_count: retry_count + 1} + end + end + + defp list_changes_with_telemetry( + conn, + slot_name, + publication, + max_changes, + max_record_bytes, + tenant + ) do + args = [ + conn, + slot_name, + publication, + max_changes, + max_record_bytes + ] + + {time, response} = :timer.tc(Replications, :list_changes, args) + + Realtime.Telemetry.execute( + [:realtime, :replication, :poller, :query, :stop], + %{duration: time}, + %{tenant: tenant} + ) + + response + catch + {:error, reason} -> {:error, reason} + end + + defp handle_list_changes_result( + {:ok, + %Postgrex.Result{ + columns: ["wal", "is_rls_enabled", "subscription_ids", "errors"] = columns, + rows: [_ | _] = rows, + num_rows: rows_count + }}, + tenant + ) do + rows + |> Enum.reduce([], fn row, acc -> + columns + |> Enum.zip(row) + |> generate_record() + |> then(fn + nil -> acc + record_struct -> [record_struct | acc] + end) + end) + |> Enum.reverse() + |> Enum.each(fn change -> + topic = "realtime:postgres:" <> tenant + Phoenix.PubSub.broadcast_from(PubSub, self(), topic, change, MessageDispatcher) + end) + + {:ok, rows_count} + end + + defp handle_list_changes_result({:ok, _}, _), do: {:ok, 0} + defp handle_list_changes_result({:error, reason}, _), do: {:error, reason} + def generate_record([ {"wal", %{ @@ -294,54 +344,4 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do end def generate_record(_), do: nil - - def slot_name_suffix() do - case System.get_env("SLOT_NAME_SUFFIX") do - nil -> - "" - - value -> - Logger.debug("Using slot name suffix: " <> value) - "_" <> value - end - end - - defp convert_errors([_ | _] = errors), do: errors - - defp convert_errors(_), do: nil - - defp connect_db(host, port, name, user, pass, socket_opts, ssl_enforced) do - {host, port, name, user, pass} = decrypt_creds(host, port, name, user, pass) - - [ - hostname: host, - port: port, - database: name, - password: pass, - username: user, - queue_target: @queue_target, - parameters: [ - application_name: "realtime_rls" - ], - socket_options: socket_opts - ] - |> maybe_enforce_ssl_config(ssl_enforced) - |> Postgrex.start_link() - end - - defp prepare_replication( - %{backoff: backoff, conn: conn, slot_name: slot_name, retry_count: retry_count} = state - ) do - case Replications.prepare_replication(conn, slot_name) do - {:ok, _} -> - send(self(), :poll) - state - - {:error, error} -> - Logger.error("Prepare replication error: #{inspect(error)}") - {timeout, backoff} = Backoff.backoff(backoff) - retry_ref = Process.send_after(self(), :retry, timeout) - %{state | backoff: backoff, retry_ref: retry_ref, retry_count: retry_count + 1} - end - end end diff --git a/lib/extensions/postgres_cdc_stream/cdc_stream.ex b/lib/extensions/postgres_cdc_stream/cdc_stream.ex index fc0a466e7..c63962636 100644 --- a/lib/extensions/postgres_cdc_stream/cdc_stream.ex +++ b/lib/extensions/postgres_cdc_stream/cdc_stream.ex @@ -9,8 +9,7 @@ defmodule Extensions.PostgresCdcStream do def handle_connect(opts) do Enum.reduce_while(1..5, nil, fn retry, acc -> - get_manager_conn(opts["id"]) - |> case do + case get_manager_conn(opts["id"]) do nil -> start_distributed(opts) if retry > 1, do: Process.sleep(1_000) @@ -22,13 +21,12 @@ defmodule Extensions.PostgresCdcStream do end) end - def handle_after_connect(_, _, _) do - {:ok, nil} - end + def handle_after_connect(_, _, _), do: {:ok, nil} def handle_subscribe(pg_change_params, tenant, metadata) do Enum.each(pg_change_params, fn e -> - topic(tenant, e.params) + tenant + |> topic(e.params) |> RealtimeWeb.Endpoint.subscribe(metadata) end) end @@ -45,13 +43,9 @@ defmodule Extensions.PostgresCdcStream do @spec get_manager_conn(String.t()) :: nil | {:ok, pid(), pid()} def get_manager_conn(id) do - Phoenix.Tracker.get_by_key(Stream.Tracker, "postgres_cdc_stream", id) - |> case do - [] -> - nil - - [{_, %{manager_pid: pid, conn: conn}}] -> - {:ok, pid, conn} + case Phoenix.Tracker.get_by_key(Stream.Tracker, "postgres_cdc_stream", id) do + [] -> nil + [{_, %{manager_pid: pid, conn: conn}}] -> {:ok, pid, conn} end end @@ -81,27 +75,23 @@ defmodule Extensions.PostgresCdcStream do def start(args) do addrtype = case args["ip_version"] do - 6 -> - :inet6 - - _ -> - :inet + 6 -> :inet6 + _ -> :inet end - args = - Map.merge(args, %{ - "db_socket_opts" => [addrtype] - }) + args = Map.merge(args, %{"db_socket_opts" => [addrtype]}) Logger.debug("Starting postgres stream extension with args: #{inspect(args, pretty: true)}") + opts = %{ + id: args["id"], + start: {Stream.WorkerSupervisor, :start_link, [args]}, + restart: :transient + } + DynamicSupervisor.start_child( {:via, PartitionSupervisor, {Stream.DynamicSupervisor, self()}}, - %{ - id: args["id"], - start: {Stream.WorkerSupervisor, :start_link, [args]}, - restart: :transient - } + opts ) end