Skip to content

Commit

Permalink
PR feedback first pass
Browse files Browse the repository at this point in the history
  • Loading branch information
filipecabaco committed Oct 10, 2023
1 parent 9a742c8 commit 5b19444
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 30 deletions.
2 changes: 1 addition & 1 deletion config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ config :realtime, :extensions,
},
postgres_cdc_stream: %{
type: :postgres_cdc,
key: "postgres_cdc_rls_stream",
key: "postgres_cdc_stream",
driver: Extensions.PostgresCdcStream,
supervisor: Extensions.PostgresCdcStream.Supervisor,
db_settings: Extensions.PostgresCdcStream.DbSettings
Expand Down
45 changes: 26 additions & 19 deletions lib/extensions/postgres_cdc_stream/cdc_stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ defmodule Extensions.PostgresCdcStream do

def handle_connect(opts) do
Enum.reduce_while(1..5, nil, fn retry, acc ->
case get_manager_conn(opts["id"]) do
nil ->
get_manager_conn(opts["id"])
|> case do
{:error, nil} ->
start_distributed(opts)
if retry > 1, do: Process.sleep(1_000)
{:cont, acc}
Expand All @@ -20,12 +21,13 @@ defmodule Extensions.PostgresCdcStream do
end)
end

def handle_after_connect(_, _, _), do: {:ok, nil}
def handle_after_connect(_, _, _) do
{:ok, nil}
end

def handle_subscribe(pg_change_params, tenant, metadata) do
Enum.each(pg_change_params, fn e ->
tenant
|> topic(e.params)
topic(tenant, e.params)
|> RealtimeWeb.Endpoint.subscribe(metadata)
end)
end
Expand All @@ -40,10 +42,11 @@ defmodule Extensions.PostgresCdcStream do
end
end

@spec get_manager_conn(String.t()) :: nil | {:ok, pid(), pid()}
@spec get_manager_conn(String.t()) :: {:error, nil} | {:ok, pid(), pid()}
def get_manager_conn(id) do
case Phoenix.Tracker.get_by_key(Stream.Tracker, "postgres_cdc_rls_stream", id) do
[] -> nil
Phoenix.Tracker.get_by_key(Stream.Tracker, "postgres_cdc_stream", id)
|> case do
[] -> {:error, nil}
[{_, %{manager_pid: pid, conn: conn}}] -> {:ok, pid, conn}
end
end
Expand Down Expand Up @@ -74,23 +77,27 @@ defmodule Extensions.PostgresCdcStream do
def start(args) do
addrtype =
case args["ip_version"] do
6 -> :inet6
_ -> :inet
6 ->
:inet6

_ ->
:inet
end

args = Map.merge(args, %{"db_socket_opts" => [addrtype]})
args =
Map.merge(args, %{
"db_socket_opts" => [addrtype]
})

Logger.debug("Starting postgres stream extension with args: #{inspect(args, pretty: true)}")

opts = %{
id: args["id"],
start: {Stream.WorkerSupervisor, :start_link, [args]},
restart: :transient
}

DynamicSupervisor.start_child(
{:via, PartitionSupervisor, {Stream.DynamicSupervisor, self()}},
opts
%{
id: args["id"],
start: {Stream.WorkerSupervisor, :start_link, [args]},
restart: :transient
}
)
end

Expand All @@ -102,7 +109,7 @@ defmodule Extensions.PostgresCdcStream do
Phoenix.Tracker.track(
Stream.Tracker,
self(),
"postgres_cdc_rls_stream",
"postgres_cdc_stream",
id,
%{
conn: conn,
Expand Down
4 changes: 2 additions & 2 deletions lib/extensions/postgres_cdc_stream/tracker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ defmodule Extensions.PostgresCdcStream.Tracker do
for {_topic, {_joins, leaves}} <- diff do
for {id, _meta} <- leaves do
Endpoint.local_broadcast(
"postgres_cdc_rls:" <> id,
"postgres_cdc_rls_down",
"postgres_cdc:" <> id,
"postgres_cdc_down",
nil
)
end
Expand Down
5 changes: 0 additions & 5 deletions lib/realtime/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,6 @@ defmodule Realtime.Application do
name: Realtime.Registry.Unique
)

Registry.start_link(
keys: :unique,
name: Realtime.Registry.Tenant
)

:syn.add_node_to_scopes([:users, RegionNodes])
region = Application.get_env(:realtime, :region)
:syn.join(RegionNodes, region, self(), node: node())
Expand Down
6 changes: 4 additions & 2 deletions lib/realtime/tenants/check.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ defmodule Realtime.Tenants.Check do

case :rpc.call(node, __MODULE__, :set_status, [tenant_id]) do
:ok ->
{_, res} = get_status(tenant_id)
res
case get_status(tenant_id) do
{_, %{healthy?: true}} -> :ok
{_, res} -> res
end

error ->
error
Expand Down
3 changes: 2 additions & 1 deletion lib/realtime_web/channels/realtime_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,8 @@ defmodule RealtimeWeb.RealtimeChannel do
{:error, :too_many_joins}

other ->
Logger.error("Unexpected error for " <> tenant <> ": " <> inspect(other))
Logger.error("Unexpected error: " <> inspect(other))

{:error, other}
end
end
Expand Down

0 comments on commit 5b19444

Please sign in to comment.