Skip to content

Commit

Permalink
feat: Connect to tenant database on channel join
Browse files Browse the repository at this point in the history
To improve stability we'll connect to the Tenant database upon channel.
  • Loading branch information
filipecabaco committed Sep 22, 2023
1 parent 43e1278 commit c831d32
Show file tree
Hide file tree
Showing 11 changed files with 445 additions and 367 deletions.
44 changes: 17 additions & 27 deletions lib/extensions/postgres_cdc_stream/cdc_stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ defmodule Extensions.PostgresCdcStream do

def handle_connect(opts) do
Enum.reduce_while(1..5, nil, fn retry, acc ->
get_manager_conn(opts["id"])
|> case do
case get_manager_conn(opts["id"]) do
nil ->
start_distributed(opts)
if retry > 1, do: Process.sleep(1_000)
Expand All @@ -22,13 +21,12 @@ defmodule Extensions.PostgresCdcStream do
end)
end

def handle_after_connect(_, _, _) do
{:ok, nil}
end
def handle_after_connect(_, _, _), do: {:ok, nil}

def handle_subscribe(pg_change_params, tenant, metadata) do
Enum.each(pg_change_params, fn e ->
topic(tenant, e.params)
tenant
|> topic(e.params)
|> RealtimeWeb.Endpoint.subscribe(metadata)
end)
end
Expand All @@ -45,13 +43,9 @@ defmodule Extensions.PostgresCdcStream do

@spec get_manager_conn(String.t()) :: nil | {:ok, pid(), pid()}
def get_manager_conn(id) do
Phoenix.Tracker.get_by_key(Stream.Tracker, "postgres_cdc_stream", id)
|> case do
[] ->
nil

[{_, %{manager_pid: pid, conn: conn}}] ->
{:ok, pid, conn}
case Phoenix.Tracker.get_by_key(Stream.Tracker, "postgres_cdc_stream", id) do
[] -> nil
[{_, %{manager_pid: pid, conn: conn}}] -> {:ok, pid, conn}
end
end

Expand Down Expand Up @@ -81,27 +75,23 @@ defmodule Extensions.PostgresCdcStream do
def start(args) do
addrtype =
case args["ip_version"] do
6 ->
:inet6

_ ->
:inet
6 -> :inet6
_ -> :inet
end

args =
Map.merge(args, %{
"db_socket_opts" => [addrtype]
})
args = Map.merge(args, %{"db_socket_opts" => [addrtype]})

Logger.debug("Starting postgres stream extension with args: #{inspect(args, pretty: true)}")

opts = %{
id: args["id"],
start: {Stream.WorkerSupervisor, :start_link, [args]},
restart: :transient
}

DynamicSupervisor.start_child(
{:via, PartitionSupervisor, {Stream.DynamicSupervisor, self()}},
%{
id: args["id"],
start: {Stream.WorkerSupervisor, :start_link, [args]},
restart: :transient
}
opts
)
end

Expand Down
25 changes: 25 additions & 0 deletions lib/realtime/helpers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,31 @@ defmodule Realtime.Helpers do
|> unpad()
end

@spec connect_db(%{
host: binary(),
port: non_neg_integer(),
name: binary(),
user: binary(),
pass: binary(),
socket_opts: list(),
pool: pos_integer(),
queue_target: pos_integer(),
ssl_enforced: boolean()
}) :: {:ok, pid} | {:error, Postgrex.Error.t() | term()}
def connect_db(%{
host: host,
port: port,
name: name,
user: user,
pass: pass,
socket_opts: socket_opts,
pool: pool,
queue_target: queue_target,
ssl_enforced: ssl_enforced
}) do
connect_db(host, port, name, user, pass, socket_opts, pool, queue_target, ssl_enforced)
end

@spec connect_db(
String.t(),
String.t(),
Expand Down
Loading

0 comments on commit c831d32

Please sign in to comment.