Skip to content

Commit

Permalink
Use tenant cache for node fetching
Browse files Browse the repository at this point in the history
  • Loading branch information
filipecabaco committed Oct 10, 2023
1 parent f38e36e commit 11e4a79
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 21 deletions.
21 changes: 17 additions & 4 deletions lib/realtime/nodes.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,30 @@ defmodule Realtime.Nodes do
"""
require Logger
alias Realtime.Tenants
alias Realtime.Api.Tenant

@doc """
Gets the node to launch the Postgres connection on for a tenant.
Gets the node to launch the Postgres connection on for a tenant id.
"""
@spec get_node_for_tenant_id(String.t()) :: node()
def get_node_for_tenant_id(tenant_id) do
with %{extensions: extensions} <- Tenants.get_tenant_by_external_id(tenant_id),
region <- get_region(extensions),
with tenant when not is_nil(tenant) <- Tenants.get_tenant_by_external_id(tenant_id),
{:ok, node} <- get_node_for_tenant(tenant) do
node
end
end

@doc """
Gets the node to launch the Postgres connection on for a tenant.
"""
@spec get_node_for_tenant(Tenant.t()) :: {:ok, node()} | {:error, term()}
def get_node_for_tenant(nil), do: {:error, :tenant_not_found}

def get_node_for_tenant(%Tenant{extensions: extensions, external_id: tenant_id}) do
with region <- get_region(extensions),
tenant_region <- platform_region_translator(region),
node <- launch_node(tenant_id, tenant_region, node()) do
node
{:ok, node}
end
end

Expand Down
31 changes: 14 additions & 17 deletions lib/realtime/tenants/connect.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,16 @@ defmodule Realtime.Tenants.Connect do

alias Realtime.Helpers
alias Realtime.PostgresCdc
alias Realtime.Tenants

@cdc "postgres_cdc_rls"

@spec connection_status(binary()) :: {:ok, DBConnection.t()} | {:error, term()}
def connection_status(tenant_id) do
case get_status(tenant_id) do
:undefined ->
:ok
node = Realtime.Nodes.get_node_for_tenant_id(tenant_id)

case :rpc.call(node, __MODULE__, :set_status, [tenant_id]) do
:ok -> get_status(tenant_id)
error -> error
end

{:ok, conn} ->
{:ok, conn}

_ ->
{:error, :tenant_database_unavailable}
:undefined -> call_external_node(tenant_id)
{:ok, conn} -> {:ok, conn}
_ -> {:error, :tenant_database_unavailable}
end
end

Expand All @@ -40,12 +30,11 @@ defmodule Realtime.Tenants.Connect do
end
end

def start_link(_opts) do
GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
end
def start_link(_opts), do: GenServer.start_link(__MODULE__, %{}, name: __MODULE__)

def init(state), do: {:ok, state, {:continue, :setup_syn}}

## GenServer callbacks
def handle_continue(:setup_syn, state) do
:ok = :syn.add_node_to_scopes([__MODULE__])
{:noreply, state}
Expand All @@ -57,6 +46,14 @@ defmodule Realtime.Tenants.Connect do
{:noreply, state}
end

defp call_external_node(tenant_id) do
with tenant <- Tenants.Cache.get_tenant_by_external_id(tenant_id),
{:ok, node} <- Realtime.Nodes.get_node_for_tenant(tenant),
:ok <- :rpc.call(node, __MODULE__, :set_status, [tenant_id]) do
get_status(tenant_id)
end
end

defp set_status_backoff(tenant_id, times \\ 5, backoff \\ 500)
defp set_status_backoff(_, 0, _), do: {:error, :tenant_database_unavailable}

Expand Down

0 comments on commit 11e4a79

Please sign in to comment.