From f38e36e6bc5531ebd3f49ec6b3b7305c5fcd84a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filipe=20Caba=C3=A7o?= Date: Tue, 10 Oct 2023 00:47:33 +0100 Subject: [PATCH] change check to connect and refactor approach --- lib/realtime/application.ex | 2 +- lib/realtime/tenants/{check.ex => connect.ex} | 58 ++++++++----------- lib/realtime_web/channels/realtime_channel.ex | 4 +- 3 files changed, 28 insertions(+), 36 deletions(-) rename lib/realtime/tenants/{check.ex => connect.ex} (68%) diff --git a/lib/realtime/application.ex b/lib/realtime/application.ex index 97324756e..bf7b7845e 100644 --- a/lib/realtime/application.ex +++ b/lib/realtime/application.ex @@ -63,7 +63,7 @@ defmodule Realtime.Application do {Task.Supervisor, name: Realtime.TaskSupervisor}, Realtime.Latency, Realtime.Telemetry.Logger, - Realtime.Tenants.Check + Realtime.Tenants.Connect ] ++ extensions_supervisors() children = diff --git a/lib/realtime/tenants/check.ex b/lib/realtime/tenants/connect.ex similarity index 68% rename from lib/realtime/tenants/check.ex rename to lib/realtime/tenants/connect.ex index df78f8b69..1e15db206 100644 --- a/lib/realtime/tenants/check.ex +++ b/lib/realtime/tenants/connect.ex @@ -1,10 +1,14 @@ -defmodule Realtime.Tenants.Check do +defmodule Realtime.Tenants.Connect do use GenServer require Logger alias Realtime.Helpers + alias Realtime.PostgresCdc + @cdc "postgres_cdc_rls" + + @spec connection_status(binary()) :: {:ok, DBConnection.t()} | {:error, term()} def connection_status(tenant_id) do case get_status(tenant_id) do :undefined -> @@ -12,18 +16,12 @@ defmodule Realtime.Tenants.Check do node = Realtime.Nodes.get_node_for_tenant_id(tenant_id) case :rpc.call(node, __MODULE__, :set_status, [tenant_id]) do - :ok -> - case get_status(tenant_id) do - {_, %{healthy?: true}} -> :ok - {_, res} -> res - end - - error -> - error + :ok -> get_status(tenant_id) + error -> error end - {_, %{healthy?: true}} -> - :ok + {:ok, conn} -> + {:ok, conn} _ -> {:error, :tenant_database_unavailable} @@ -31,15 +29,15 @@ defmodule Realtime.Tenants.Check do end def set_status(tenant_id) do - __MODULE__ - |> Process.whereis() - |> Process.send({:set_status, tenant_id}, []) - + :ok = GenServer.cast(__MODULE__, {:set_status, tenant_id}) set_status_backoff(tenant_id) end def get_status(tenant_id) do - :syn.lookup(__MODULE__, tenant_id) + case :syn.lookup(__MODULE__, tenant_id) do + {_, %{conn: conn}} when not is_nil(conn) -> {:ok, conn} + error -> error + end end def start_link(_opts) do @@ -53,10 +51,9 @@ defmodule Realtime.Tenants.Check do {:noreply, state} end - def handle_info({:set_status, tenant_id}, state) do + def handle_cast({:set_status, tenant_id}, state) do res = check_tenant_connection(tenant_id) :ok = update_syn_with_conn_check(res, tenant_id) - Process.send_after(self(), {:set_status, tenant_id}, 500) {:noreply, state} end @@ -76,8 +73,8 @@ defmodule Realtime.Tenants.Check do defp update_syn_with_conn_check(res, tenant_id) do case res do - :ok -> :syn.register(__MODULE__, tenant_id, self(), %{healthy?: true}) - {:error, _} -> :syn.register(__MODULE__, tenant_id, self(), %{healthy?: false}) + {:ok, conn} -> :syn.register(__MODULE__, tenant_id, self(), %{conn: conn}) + {:error, _} -> :syn.register(__MODULE__, tenant_id, self(), %{conn: nil}) end end @@ -88,8 +85,8 @@ defmodule Realtime.Tenants.Check do {:error, :tenant_not_found} else tenant - |> then(& &1.extensions) - |> Enum.map(fn %{settings: settings} -> + |> then(&PostgresCdc.filter_settings(@cdc, &1.extensions)) + |> then(fn settings -> ssl_enforced = Helpers.default_ssl_param(settings) host = settings["db_host"] @@ -113,20 +110,15 @@ defmodule Realtime.Tenants.Check do with {:ok, conn} <- Helpers.connect_db(opts) do case Postgrex.query(conn, "SELECT 1", []) do - {:ok, _} -> {:ok, conn} - {:error, _} -> {:error, conn} + {:ok, _} -> + {:ok, conn} + + {:error, e} -> + Logger.error("Error connecting to tenant database: #{inspect(e)}") + {:error, :tenant_database_unavailable} end end end) - # This makes the connection fail - |> tap(fn res -> - Enum.each(res, fn {_, conn} -> Process.exit(conn, :normal) end) - end) - |> Enum.any?(fn res -> elem(res, 0) == :ok end) - |> then(fn - true -> :ok - false -> {:error, :tenant_database_unavailable} - end) end end end diff --git a/lib/realtime_web/channels/realtime_channel.ex b/lib/realtime_web/channels/realtime_channel.ex index 3b9cb2483..b10573e99 100644 --- a/lib/realtime_web/channels/realtime_channel.ex +++ b/lib/realtime_web/channels/realtime_channel.ex @@ -17,7 +17,7 @@ defmodule RealtimeWeb.RealtimeChannel do alias Realtime.RateCounter alias Realtime.SignalHandler alias Realtime.Tenants - alias Realtime.Tenants.Check + alias Realtime.Tenants.Connect alias RealtimeWeb.ChannelsAuthorization alias RealtimeWeb.Endpoint @@ -45,7 +45,7 @@ defmodule RealtimeWeb.RealtimeChannel do start_db_rate_counter(tenant) with false <- SignalHandler.shutdown_in_progress?(), - :ok <- Check.connection_status(tenant), + {:ok, _} <- Connect.connection_status(tenant), :ok <- limit_joins(socket), :ok <- limit_channels(socket), :ok <- limit_max_users(socket),