diff --git a/lib/realtime/nodes.ex b/lib/realtime/nodes.ex index cf64aeb24..19d83e78c 100644 --- a/lib/realtime/nodes.ex +++ b/lib/realtime/nodes.ex @@ -4,17 +4,30 @@ defmodule Realtime.Nodes do """ require Logger alias Realtime.Tenants + alias Realtime.Api.Tenant @doc """ - Gets the node to launch the Postgres connection on for a tenant. + Gets the node to launch the Postgres connection on for a tenant id. """ @spec get_node_for_tenant_id(String.t()) :: node() def get_node_for_tenant_id(tenant_id) do - with %{extensions: extensions} <- Tenants.get_tenant_by_external_id(tenant_id), - region <- get_region(extensions), + with tenant when not is_nil(tenant) <- Tenants.get_tenant_by_external_id(tenant_id), + {:ok, node} <- get_node_for_tenant(tenant) do + node + end + end + + @doc """ + Gets the node to launch the Postgres connection on for a tenant. + """ + @spec get_node_for_tenant(Tenant.t()) :: {:ok, node()} | {:error, term()} + def get_node_for_tenant(nil), do: {:error, :tenant_not_found} + + def get_node_for_tenant(%Tenant{extensions: extensions, external_id: tenant_id}) do + with region <- get_region(extensions), tenant_region <- platform_region_translator(region), node <- launch_node(tenant_id, tenant_region, node()) do - node + {:ok, node} end end diff --git a/lib/realtime/tenants/connect.ex b/lib/realtime/tenants/connect.ex index 1e15db206..8cd0dc74f 100644 --- a/lib/realtime/tenants/connect.ex +++ b/lib/realtime/tenants/connect.ex @@ -5,26 +5,16 @@ defmodule Realtime.Tenants.Connect do alias Realtime.Helpers alias Realtime.PostgresCdc + alias Realtime.Tenants @cdc "postgres_cdc_rls" @spec connection_status(binary()) :: {:ok, DBConnection.t()} | {:error, term()} def connection_status(tenant_id) do case get_status(tenant_id) do - :undefined -> - :ok - node = Realtime.Nodes.get_node_for_tenant_id(tenant_id) - - case :rpc.call(node, __MODULE__, :set_status, [tenant_id]) do - :ok -> get_status(tenant_id) - error -> error - end - - {:ok, conn} -> - {:ok, conn} - - _ -> - {:error, :tenant_database_unavailable} + :undefined -> call_external_node(tenant_id) + {:ok, conn} -> {:ok, conn} + _ -> {:error, :tenant_database_unavailable} end end @@ -40,12 +30,11 @@ defmodule Realtime.Tenants.Connect do end end - def start_link(_opts) do - GenServer.start_link(__MODULE__, %{}, name: __MODULE__) - end + def start_link(_opts), do: GenServer.start_link(__MODULE__, %{}, name: __MODULE__) def init(state), do: {:ok, state, {:continue, :setup_syn}} + ## GenServer callbacks def handle_continue(:setup_syn, state) do :ok = :syn.add_node_to_scopes([__MODULE__]) {:noreply, state} @@ -57,6 +46,14 @@ defmodule Realtime.Tenants.Connect do {:noreply, state} end + defp call_external_node(tenant_id) do + with tenant <- Tenants.Cache.get_tenant_by_external_id(tenant_id), + {:ok, node} <- Realtime.Nodes.get_node_for_tenant(tenant), + :ok <- :rpc.call(node, __MODULE__, :set_status, [tenant_id]) do + get_status(tenant_id) + end + end + defp set_status_backoff(tenant_id, times \\ 5, backoff \\ 500) defp set_status_backoff(_, 0, _), do: {:error, :tenant_database_unavailable}