Skip to content

Commit

Permalink
change check to connect and refactor approach
Browse files Browse the repository at this point in the history
  • Loading branch information
filipecabaco committed Oct 10, 2023
1 parent 5b19444 commit f38e36e
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 36 deletions.
2 changes: 1 addition & 1 deletion lib/realtime/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ defmodule Realtime.Application do
{Task.Supervisor, name: Realtime.TaskSupervisor},
Realtime.Latency,
Realtime.Telemetry.Logger,
Realtime.Tenants.Check
Realtime.Tenants.Connect
] ++ extensions_supervisors()

children =
Expand Down
58 changes: 25 additions & 33 deletions lib/realtime/tenants/check.ex → lib/realtime/tenants/connect.ex
Original file line number Diff line number Diff line change
@@ -1,45 +1,43 @@
defmodule Realtime.Tenants.Check do
defmodule Realtime.Tenants.Connect do
use GenServer

require Logger

alias Realtime.Helpers
alias Realtime.PostgresCdc

@cdc "postgres_cdc_rls"

@spec connection_status(binary()) :: {:ok, DBConnection.t()} | {:error, term()}
def connection_status(tenant_id) do
case get_status(tenant_id) do
:undefined ->
:ok
node = Realtime.Nodes.get_node_for_tenant_id(tenant_id)

case :rpc.call(node, __MODULE__, :set_status, [tenant_id]) do
:ok ->
case get_status(tenant_id) do
{_, %{healthy?: true}} -> :ok
{_, res} -> res
end

error ->
error
:ok -> get_status(tenant_id)
error -> error
end

{_, %{healthy?: true}} ->
:ok
{:ok, conn} ->
{:ok, conn}

_ ->
{:error, :tenant_database_unavailable}
end
end

def set_status(tenant_id) do
__MODULE__
|> Process.whereis()
|> Process.send({:set_status, tenant_id}, [])

:ok = GenServer.cast(__MODULE__, {:set_status, tenant_id})
set_status_backoff(tenant_id)
end

def get_status(tenant_id) do
:syn.lookup(__MODULE__, tenant_id)
case :syn.lookup(__MODULE__, tenant_id) do
{_, %{conn: conn}} when not is_nil(conn) -> {:ok, conn}
error -> error
end
end

def start_link(_opts) do
Expand All @@ -53,10 +51,9 @@ defmodule Realtime.Tenants.Check do
{:noreply, state}
end

def handle_info({:set_status, tenant_id}, state) do
def handle_cast({:set_status, tenant_id}, state) do
res = check_tenant_connection(tenant_id)
:ok = update_syn_with_conn_check(res, tenant_id)
Process.send_after(self(), {:set_status, tenant_id}, 500)
{:noreply, state}
end

Expand All @@ -76,8 +73,8 @@ defmodule Realtime.Tenants.Check do

defp update_syn_with_conn_check(res, tenant_id) do
case res do
:ok -> :syn.register(__MODULE__, tenant_id, self(), %{healthy?: true})
{:error, _} -> :syn.register(__MODULE__, tenant_id, self(), %{healthy?: false})
{:ok, conn} -> :syn.register(__MODULE__, tenant_id, self(), %{conn: conn})
{:error, _} -> :syn.register(__MODULE__, tenant_id, self(), %{conn: nil})
end
end

Expand All @@ -88,8 +85,8 @@ defmodule Realtime.Tenants.Check do
{:error, :tenant_not_found}
else
tenant
|> then(& &1.extensions)
|> Enum.map(fn %{settings: settings} ->
|> then(&PostgresCdc.filter_settings(@cdc, &1.extensions))
|> then(fn settings ->
ssl_enforced = Helpers.default_ssl_param(settings)

host = settings["db_host"]
Expand All @@ -113,20 +110,15 @@ defmodule Realtime.Tenants.Check do

with {:ok, conn} <- Helpers.connect_db(opts) do
case Postgrex.query(conn, "SELECT 1", []) do
{:ok, _} -> {:ok, conn}
{:error, _} -> {:error, conn}
{:ok, _} ->
{:ok, conn}

{:error, e} ->
Logger.error("Error connecting to tenant database: #{inspect(e)}")
{:error, :tenant_database_unavailable}
end
end
end)
# This makes the connection fail
|> tap(fn res ->
Enum.each(res, fn {_, conn} -> Process.exit(conn, :normal) end)
end)
|> Enum.any?(fn res -> elem(res, 0) == :ok end)
|> then(fn
true -> :ok
false -> {:error, :tenant_database_unavailable}
end)
end
end
end
4 changes: 2 additions & 2 deletions lib/realtime_web/channels/realtime_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ defmodule RealtimeWeb.RealtimeChannel do
alias Realtime.RateCounter
alias Realtime.SignalHandler
alias Realtime.Tenants
alias Realtime.Tenants.Check
alias Realtime.Tenants.Connect

alias RealtimeWeb.ChannelsAuthorization
alias RealtimeWeb.Endpoint
Expand Down Expand Up @@ -45,7 +45,7 @@ defmodule RealtimeWeb.RealtimeChannel do
start_db_rate_counter(tenant)

with false <- SignalHandler.shutdown_in_progress?(),
:ok <- Check.connection_status(tenant),
{:ok, _} <- Connect.connection_status(tenant),
:ok <- limit_joins(socket),
:ok <- limit_channels(socket),
:ok <- limit_max_users(socket),
Expand Down

0 comments on commit f38e36e

Please sign in to comment.