Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 13 additions & 19 deletions lib/realtime/tenants.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ defmodule Realtime.Tenants do

require Logger
alias Realtime.Repo.Replica
alias Realtime.Api
alias Realtime.{Api.Tenant, PostgresCdc, UsersCounter}
alias Realtime.{Api.Tenant, UsersCounter, Tenants}

@doc """
Gets a list of connected tenant `external_id` strings in the cluster or a node.
Expand All @@ -23,21 +22,20 @@ defmodule Realtime.Tenants do
end

@doc """
Gets the database connection pid of the SubscriptionManager `manager` and the
Postgrex connection pool `subs_pool`.

When this function returns `:wait` the database connection tree is starting and the
caller should not try to start the database connection tree again.
Gets the database connection pid managed by the Tenants.Connect process.

## Examples

iex> get_manager_conn(Extensions.PostgresCdcRls, "not_started_external_id")
{:error, nil}
iex> get_health_conn(%Tenant{external_id: "not_found_tenant"})
{:error, :tenant_database_unavailable}
"""

@spec get_manager_conn(:atom, %Tenant{}) :: {:error, :wait | nil} | {:ok, pid(), pid()}
def get_manager_conn(module, %Tenant{external_id: external_id}) do
module.get_manager_conn(external_id)
@spec get_health_conn(%Tenant{}) :: {:error, term()} | {:ok, pid()}
def get_health_conn(%Tenant{external_id: external_id}) do
case Tenants.Connect.get_status(external_id) do
{:ok, conn} -> {:ok, conn}
{:error, reason} -> {:error, reason}
end
end

@doc """
Expand All @@ -60,19 +58,15 @@ defmodule Realtime.Tenants do
| %{connected_cluster: pos_integer, db_connected: false, healthy: false}}
| {:ok, %{connected_cluster: non_neg_integer, db_connected: true, healthy: true}}
def health_check(external_id) when is_binary(external_id) do
with %Tenant{} = tenant <- Api.get_tenant_by_external_id(external_id),
{:ok, module} <- PostgresCdc.driver(tenant.postgres_cdc_default),
{:error, _} <- get_manager_conn(module, tenant),
with %Tenant{} = tenant <- Tenants.Cache.get_tenant_by_external_id(external_id),
{:error, _} <- get_health_conn(tenant),
connected_cluster when connected_cluster > 0 <- UsersCounter.tenant_users(external_id) do
{:error, %{healthy: false, db_connected: false, connected_cluster: connected_cluster}}
else
nil ->
{:error, :tenant_not_found}

{:error, _mod} ->
{:error, "Bad value for tenant field `postgres_cdc_default`"}

{:ok, _manager, _subs_pool} ->
{:ok, _health_conn} ->
connected_cluster = UsersCounter.tenant_users(external_id)

{:ok, %{healthy: true, db_connected: true, connected_cluster: connected_cluster}}
Expand Down
25 changes: 16 additions & 9 deletions lib/realtime/tenants/connect.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,23 @@ defmodule Realtime.Tenants.Connect do
@spec lookup_or_start_connection(binary()) :: {:ok, DBConnection.t()} | {:error, term()}
def lookup_or_start_connection(tenant_id) do
case get_status(tenant_id) do
:undefined -> call_external_node(tenant_id)
{:ok, conn} -> {:ok, conn}
_ -> {:error, :tenant_database_unavailable}
{:error, :tenant_database_unavailable} -> call_external_node(tenant_id)
{:error, :initializing} -> {:error, :tenant_database_unavailable}
end
end

@doc """
Returns the database connection pid from :syn if it exists.
"""

@spec get_status(binary()) ::
{:ok, DBConnection.t()} | {:error, :tenant_database_unavailable | :initializing}
def get_status(tenant_id) do
case :syn.lookup(__MODULE__, tenant_id) do
{_, %{conn: conn}} when not is_nil(conn) -> {:ok, conn}
{_, %{conn: nil}} -> {:error, :initializing}
_error -> {:error, :tenant_database_unavailable}
end
end

Expand Down Expand Up @@ -74,13 +88,6 @@ defmodule Realtime.Tenants.Connect do

## Private functions

defp get_status(tenant_id) do
case :syn.lookup(__MODULE__, tenant_id) do
{_, %{conn: conn}} when not is_nil(conn) -> {:ok, conn}
error -> error
end
end

defp call_external_node(tenant_id) do
with tenant <- Tenants.Cache.get_tenant_by_external_id(tenant_id),
{:ok, node} <- Realtime.Nodes.get_node_for_tenant(tenant) do
Expand Down
5 changes: 0 additions & 5 deletions lib/realtime_web/controllers/tenant_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -261,11 +261,6 @@ defmodule RealtimeWeb.TenantController do
conn
|> put_status(404)
|> render("not_found.json", tenant: nil)

{:error, message} when is_binary(message) ->
conn
|> put_status(422)
|> json(%{errors: [message]})
end
end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.25.0",
version: "2.25.1",
elixir: "~> 1.14.0",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
9 changes: 5 additions & 4 deletions test/realtime_web/controllers/tenant_controller_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -211,10 +211,11 @@ defmodule RealtimeWeb.TenantControllerTest do
UsersCounter.add(self(), ext_id)

# Fake a db connection
{:ok, mod} = PostgresCdc.driver(tenant.postgres_cdc_default)
:syn.add_node_to_scopes([mod])
:syn.register(mod, ext_id, self(), %{region: nil, manager: nil, subs_pool: nil})
mod.update_meta(ext_id, self(), self())
:syn.register(Realtime.Tenants.Connect, ext_id, self(), %{conn: nil})

:syn.update_registry(Realtime.Tenants.Connect, ext_id, fn _pid, meta ->
%{meta | conn: conn}
end)

conn = get(conn, Routes.tenant_path(conn, :health, ext_id))
data = json_response(conn, 200)["data"]
Expand Down