Skip to content

Commit

Permalink
Improve code complexity in Replication Poller
Browse files Browse the repository at this point in the history
  • Loading branch information
filipecabaco committed Sep 21, 2023
1 parent 00e5c0c commit b804b0c
Show file tree
Hide file tree
Showing 8 changed files with 256 additions and 187 deletions.
271 changes: 135 additions & 136 deletions lib/extensions/postgres_cdc_rls/replication_poller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
import Realtime.Helpers,
only: [cancel_timer: 1, decrypt_creds: 5, default_ssl_param: 1, maybe_enforce_ssl_config: 2]

alias Extensions.PostgresCdcRls.{Replications, MessageDispatcher}
alias DBConnection.Backoff
alias Realtime.PubSub

alias Realtime.Adapters.Changes.{
DeletedRecord,
NewRecord,
UpdatedRecord
}
alias Extensions.PostgresCdcRls.MessageDispatcher
alias Extensions.PostgresCdcRls.Replications

alias Realtime.Adapters.Changes.DeletedRecord
alias Realtime.Adapters.Changes.NewRecord
alias Realtime.Adapters.Changes.UpdatedRecord
alias Realtime.PubSub

@queue_target 5_000

Expand All @@ -45,12 +45,7 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
tenant = args["id"]

state = %{
backoff:
Backoff.new(
backoff_min: 100,
backoff_max: 5_000,
backoff_type: :rand_exp
),
backoff: Backoff.new(backoff_min: 100, backoff_max: 5_000, backoff_type: :rand_exp),
conn: conn,
db_host: args["db_host"],
db_port: args["db_port"],
Expand Down Expand Up @@ -99,76 +94,27 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
cancel_timer(poll_ref)
cancel_timer(retry_ref)

try do
{time, response} =
:timer.tc(Replications, :list_changes, [
conn,
slot_name,
publication,
max_changes,
max_record_bytes
])

Realtime.Telemetry.execute(
[:realtime, :replication, :poller, :query, :stop],
%{duration: time},
%{tenant: tenant}
broadcast_count =
conn
|> list_changes_with_telemetry(
slot_name,
publication,
max_changes,
max_record_bytes,
tenant
)
|> handle_list_changes_result(tenant)

response
catch
{:error, reason} ->
{:error, reason}
end
|> case do
{:ok,
%Postgrex.Result{
columns: ["wal", "is_rls_enabled", "subscription_ids", "errors"] = columns,
rows: [_ | _] = rows,
num_rows: rows_count
}} ->
Enum.reduce(rows, [], fn row, acc ->
columns
|> Enum.zip(row)
|> generate_record()
|> case do
nil ->
acc

record_struct ->
[record_struct | acc]
end
end)
|> Enum.reverse()
|> Enum.each(fn change ->
Phoenix.PubSub.broadcast_from(
PubSub,
self(),
"realtime:postgres:" <> tenant,
change,
MessageDispatcher
)
end)

{:ok, rows_count}
case broadcast_count do
{:ok, 0} ->
backoff = Backoff.reset(backoff)
send(self(), :poll)

{:ok, _} ->
{:ok, 0}
{:noreply, %{state | backoff: backoff, poll_ref: nil}}

{:error, reason} ->
{:error, reason}
end
|> case do
{:ok, rows_num} ->
{:ok, _} ->
backoff = Backoff.reset(backoff)

poll_ref =
if rows_num > 0 do
send(self(), :poll)
nil
else
Process.send_after(self(), :poll, poll_interval_ms)
end
poll_ref = Process.send_after(self(), :poll, poll_interval_ms)

{:noreply, %{state | backoff: backoff, poll_ref: poll_ref}}

Expand All @@ -186,14 +132,9 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do

if retry_count > 3 do
case Replications.terminate_backend(conn, slot_name) do
{:ok, :terminated} ->
Logger.warn("Replication slot in use - terminating")

{:error, :slot_not_found} ->
Logger.warn("Replication slot not found")

{:error, error} ->
Logger.warn("Error terminating backend: #{inspect(error)}")
{:ok, :terminated} -> Logger.warn("Replication slot in use - terminating")
{:error, :slot_not_found} -> Logger.warn("Replication slot not found")
{:error, error} -> Logger.warn("Error terminating backend: #{inspect(error)}")
end
end

Expand All @@ -220,6 +161,114 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
{:noreply, prepare_replication(state)}
end

def slot_name_suffix() do
case System.get_env("SLOT_NAME_SUFFIX") do
nil ->
""

value ->
Logger.debug("Using slot name suffix: " <> value)
"_" <> value
end
end

defp convert_errors([_ | _] = errors), do: errors

defp convert_errors(_), do: nil

defp connect_db(host, port, name, user, pass, socket_opts, ssl_enforced) do
{host, port, name, user, pass} = decrypt_creds(host, port, name, user, pass)

[
hostname: host,
port: port,
database: name,
password: pass,
username: user,
queue_target: @queue_target,
parameters: [application_name: "realtime_rls"],
socket_options: socket_opts
]
|> maybe_enforce_ssl_config(ssl_enforced)
|> Postgrex.start_link()
end

defp prepare_replication(
%{backoff: backoff, conn: conn, slot_name: slot_name, retry_count: retry_count} = state
) do
case Replications.prepare_replication(conn, slot_name) do
{:ok, _} ->
send(self(), :poll)
state

{:error, error} ->
Logger.error("Prepare replication error: #{inspect(error)}")
{timeout, backoff} = Backoff.backoff(backoff)
retry_ref = Process.send_after(self(), :retry, timeout)
%{state | backoff: backoff, retry_ref: retry_ref, retry_count: retry_count + 1}
end
end

defp list_changes_with_telemetry(
conn,
slot_name,
publication,
max_changes,
max_record_bytes,
tenant
) do
args = [
conn,
slot_name,
publication,
max_changes,
max_record_bytes
]

{time, response} = :timer.tc(Replications, :list_changes, args)

Realtime.Telemetry.execute(
[:realtime, :replication, :poller, :query, :stop],
%{duration: time},
%{tenant: tenant}
)

response
catch
{:error, reason} -> {:error, reason}
end

defp handle_list_changes_result(
{:ok,
%Postgrex.Result{
columns: ["wal", "is_rls_enabled", "subscription_ids", "errors"] = columns,
rows: [_ | _] = rows,
num_rows: rows_count
}},
tenant
) do
rows
|> Enum.reduce([], fn row, acc ->
columns
|> Enum.zip(row)
|> generate_record()
|> then(fn
nil -> acc
record_struct -> [record_struct | acc]
end)
end)
|> Enum.reverse()
|> Enum.each(fn change ->
topic = "realtime:postgres:" <> tenant
Phoenix.PubSub.broadcast_from(PubSub, self(), topic, change, MessageDispatcher)
end)

{:ok, rows_count}
end

defp handle_list_changes_result({:ok, _}, _), do: {:ok, 0}
defp handle_list_changes_result({:error, reason}, _), do: {:error, reason}

def generate_record([
{"wal",
%{
Expand Down Expand Up @@ -294,54 +343,4 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
end

def generate_record(_), do: nil

def slot_name_suffix() do
case System.get_env("SLOT_NAME_SUFFIX") do
nil ->
""

value ->
Logger.debug("Using slot name suffix: " <> value)
"_" <> value
end
end

defp convert_errors([_ | _] = errors), do: errors

defp convert_errors(_), do: nil

defp connect_db(host, port, name, user, pass, socket_opts, ssl_enforced) do
{host, port, name, user, pass} = decrypt_creds(host, port, name, user, pass)

[
hostname: host,
port: port,
database: name,
password: pass,
username: user,
queue_target: @queue_target,
parameters: [
application_name: "realtime_rls"
],
socket_options: socket_opts
]
|> maybe_enforce_ssl_config(ssl_enforced)
|> Postgrex.start_link()
end

defp prepare_replication(
%{backoff: backoff, conn: conn, slot_name: slot_name, retry_count: retry_count} = state
) do
case Replications.prepare_replication(conn, slot_name) do
{:ok, _} ->
send(self(), :poll)
state

{:error, error} ->
Logger.error("Prepare replication error: #{inspect(error)}")
{timeout, backoff} = Backoff.backoff(backoff)
retry_ref = Process.send_after(self(), :retry, timeout)
%{state | backoff: backoff, retry_ref: retry_ref, retry_count: retry_count + 1}
end
end
end
44 changes: 17 additions & 27 deletions lib/extensions/postgres_cdc_stream/cdc_stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ defmodule Extensions.PostgresCdcStream do

def handle_connect(opts) do
Enum.reduce_while(1..5, nil, fn retry, acc ->
get_manager_conn(opts["id"])
|> case do
case get_manager_conn(opts["id"]) do
nil ->
start_distributed(opts)
if retry > 1, do: Process.sleep(1_000)
Expand All @@ -22,13 +21,12 @@ defmodule Extensions.PostgresCdcStream do
end)
end

def handle_after_connect(_, _, _) do
{:ok, nil}
end
def handle_after_connect(_, _, _), do: {:ok, nil}

def handle_subscribe(pg_change_params, tenant, metadata) do
Enum.each(pg_change_params, fn e ->
topic(tenant, e.params)
tenant
|> topic(e.params)
|> RealtimeWeb.Endpoint.subscribe(metadata)
end)
end
Expand All @@ -45,13 +43,9 @@ defmodule Extensions.PostgresCdcStream do

@spec get_manager_conn(String.t()) :: nil | {:ok, pid(), pid()}
def get_manager_conn(id) do
Phoenix.Tracker.get_by_key(Stream.Tracker, "postgres_cdc_stream", id)
|> case do
[] ->
nil

[{_, %{manager_pid: pid, conn: conn}}] ->
{:ok, pid, conn}
case Phoenix.Tracker.get_by_key(Stream.Tracker, "postgres_cdc_stream", id) do
[] -> nil
[{_, %{manager_pid: pid, conn: conn}}] -> {:ok, pid, conn}
end
end

Expand Down Expand Up @@ -81,27 +75,23 @@ defmodule Extensions.PostgresCdcStream do
def start(args) do
addrtype =
case args["ip_version"] do
6 ->
:inet6

_ ->
:inet
6 -> :inet6
_ -> :inet
end

args =
Map.merge(args, %{
"db_socket_opts" => [addrtype]
})
args = Map.merge(args, %{"db_socket_opts" => [addrtype]})

Logger.debug("Starting postgres stream extension with args: #{inspect(args, pretty: true)}")

opts = %{
id: args["id"],
start: {Stream.WorkerSupervisor, :start_link, [args]},
restart: :transient
}

DynamicSupervisor.start_child(
{:via, PartitionSupervisor, {Stream.DynamicSupervisor, self()}},
%{
id: args["id"],
start: {Stream.WorkerSupervisor, :start_link, [args]},
restart: :transient
}
opts
)
end

Expand Down
Loading

0 comments on commit b804b0c

Please sign in to comment.