Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Connect to tenant database on channel join #677

Merged
merged 16 commits into from
Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions lib/extensions/postgres_cdc_rls/cdc_rls.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ defmodule Extensions.PostgresCdcRls do
require Logger

alias RealtimeWeb.Endpoint
alias Realtime.PostgresCdc
alias Extensions.PostgresCdcRls, as: Rls
alias Rls.Subscriptions

Expand Down Expand Up @@ -61,8 +60,8 @@ defmodule Extensions.PostgresCdcRls do
## Internal functions

def start_distributed(%{"region" => region, "id" => tenant} = args) do
platform_region = PostgresCdc.platform_region_translator(region)
launch_node = PostgresCdc.launch_node(tenant, platform_region, node())
platform_region = Realtime.Nodes.platform_region_translator(region)
launch_node = Realtime.Nodes.launch_node(tenant, platform_region, node())

Logger.warning(
"Starting distributed postgres extension #{inspect(lauch_node: launch_node, region: region, platform_region: platform_region)}"
Expand Down
8 changes: 4 additions & 4 deletions lib/extensions/postgres_cdc_rls/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Extensions.PostgresCdcRls.Supervisor do
"""
use Supervisor

alias Extensions.PostgresCdcRls, as: Rls
alias Extensions.PostgresCdcRls

@spec start_link :: :ignore | {:error, any} | {:ok, pid}
def start_link() do
Expand All @@ -15,16 +15,16 @@ defmodule Extensions.PostgresCdcRls.Supervisor do
def init(_args) do
load_migrations_modules()

:syn.set_event_handler(Rls.SynHandler)
:syn.add_node_to_scopes([Rls])
:syn.set_event_handler(Realtime.SynHandler)
:syn.add_node_to_scopes([PostgresCdcRls])

children = [
{
PartitionSupervisor,
partitions: 20,
child_spec: DynamicSupervisor,
strategy: :one_for_one,
name: Rls.DynamicSupervisor
name: PostgresCdcRls.DynamicSupervisor
}
]

Expand Down
69 changes: 0 additions & 69 deletions lib/extensions/postgres_cdc_rls/syn_handler.ex

This file was deleted.

12 changes: 4 additions & 8 deletions lib/extensions/postgres_cdc_stream/cdc_stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ defmodule Extensions.PostgresCdcStream do

require Logger

alias Realtime.PostgresCdc
alias Extensions.PostgresCdcStream, as: Stream

def handle_connect(opts) do
Expand Down Expand Up @@ -47,17 +46,14 @@ defmodule Extensions.PostgresCdcStream do
def get_manager_conn(id) do
Phoenix.Tracker.get_by_key(Stream.Tracker, "postgres_cdc_stream", id)
|> case do
[] ->
{:error, nil}

[{_, %{manager_pid: pid, conn: conn}}] ->
{:ok, pid, conn}
[] -> {:error, nil}
[{_, %{manager_pid: pid, conn: conn}}] -> {:ok, pid, conn}
end
end

def start_distributed(%{"region" => region, "id" => tenant} = args) do
platform_region = PostgresCdc.platform_region_translator(region)
launch_node = PostgresCdc.launch_node(tenant, platform_region, node())
platform_region = Realtime.Nodes.platform_region_translator(region)
launch_node = Realtime.Nodes.launch_node(tenant, platform_region, node())

Logger.warning(
"Starting distributed postgres extension #{inspect(lauch_node: launch_node, region: region, platform_region: platform_region)}"
Expand Down
9 changes: 7 additions & 2 deletions lib/realtime/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@
name: Realtime.Registry.Unique
)

:syn.add_node_to_scopes([:users, RegionNodes])
:ok = :syn.add_node_to_scopes([Realtime.Tenants.Connect])
:ok = :syn.add_node_to_scopes([:users, RegionNodes])
region = Application.get_env(:realtime, :region)
:syn.join(RegionNodes, region, self(), node: node())

Expand All @@ -62,11 +63,15 @@
RealtimeWeb.Presence,
{Task.Supervisor, name: Realtime.TaskSupervisor},
Realtime.Latency,
Realtime.Telemetry.Logger
Realtime.Telemetry.Logger,
{PartitionSupervisor,
child_spec: DynamicSupervisor,
strategy: :one_for_one,
name: Realtime.Tenants.Connect.DynamicSupervisor}
] ++ extensions_supervisors()

children =
case Realtime.Repo.Replica.replica() do

Check warning on line 74 in lib/realtime/application.ex

View workflow job for this annotation

GitHub Actions / Formatting Checks

Nested modules could be aliased at the top of the invoking module.
Realtime.Repo -> children
replica -> List.insert_at(children, 2, replica)
end
Expand Down
80 changes: 78 additions & 2 deletions lib/realtime/helpers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ defmodule Realtime.Helpers do
This module includes helper functions for different contexts that can't be union in one module.
"""

alias Realtime.Api.Tenant
alias Realtime.PostgresCdc

require Logger

@spec cancel_timer(reference() | nil) :: non_neg_integer() | false | :ok | nil
def cancel_timer(nil), do: nil
def cancel_timer(ref), do: Process.cancel_timer(ref)
Expand All @@ -21,6 +26,32 @@ defmodule Realtime.Helpers do
|> unpad()
end

@spec connect_db(%{
:host => binary,
:name => binary,
:pass => binary,
:pool => non_neg_integer,
:port => binary,
:queue_target => non_neg_integer,
:socket_opts => list,
:ssl_enforced => boolean,
:user => binary,
optional(any) => any
}) :: {:error, any} | {:ok, pid}
def connect_db(%{
host: host,
port: port,
name: name,
user: user,
pass: pass,
socket_opts: socket_opts,
pool: pool,
queue_target: queue_target,
ssl_enforced: ssl_enforced
}) do
connect_db(host, port, name, user, pass, socket_opts, pool, queue_target, ssl_enforced)
end

@spec connect_db(
String.t(),
String.t(),
Expand Down Expand Up @@ -65,6 +96,51 @@ defmodule Realtime.Helpers do
|> Postgrex.start_link()
end

@cdc "postgres_cdc_rls"
@doc """
Checks if the Tenant CDC extension information is properly configured and that we're able to query against the tenant database.
"""
@spec check_tenant_connection(Tenant.t()) :: {:error, atom()} | {:ok, pid()}
def check_tenant_connection(nil), do: {:error, :tenant_not_found}

def check_tenant_connection(tenant) do
tenant
|> then(&PostgresCdc.filter_settings(@cdc, &1.extensions))
|> then(fn settings ->
ssl_enforced = default_ssl_param(settings)

host = settings["db_host"]
port = settings["db_port"]
name = settings["db_name"]
user = settings["db_user"]
password = settings["db_password"]
socket_opts = settings["db_socket_opts"]

opts = %{
host: host,
port: port,
name: name,
user: user,
pass: password,
socket_opts: socket_opts,
pool: 1,
queue_target: 1000,
ssl_enforced: ssl_enforced
}

with {:ok, conn} <- connect_db(opts) do
case Postgrex.query(conn, "SELECT 1", []) do
{:ok, _} ->
{:ok, conn}

{:error, e} ->
Logger.error("Error connecting to tenant database: #{inspect(e)}")
{:error, :tenant_database_unavailable}
end
end
end)
end

@spec default_ssl_param(map) :: boolean
def default_ssl_param(%{"ssl_enforced" => ssl_enforced}) when is_boolean(ssl_enforced),
do: ssl_enforced
Expand Down Expand Up @@ -198,8 +274,8 @@ defmodule Realtime.Helpers do
Enum.reduce(:syn.group_names(:users), 0, fn tenant, acc ->
case :syn.lookup(Extensions.PostgresCdcRls, tenant) do
{pid, %{region: region}} ->
platform_region = Realtime.PostgresCdc.platform_region_translator(region)
launch_node = Realtime.PostgresCdc.launch_node(tenant, platform_region, false)
platform_region = Realtime.Nodes.platform_region_translator(region)
launch_node = Realtime.Nodes.launch_node(tenant, platform_region, false)
current_node = node(pid)

case launch_node do
Expand Down
118 changes: 118 additions & 0 deletions lib/realtime/nodes.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
defmodule Realtime.Nodes do
@moduledoc """
Handles common needs for :syn module operations
"""
require Logger
alias Realtime.Api.Tenant

@doc """
Gets the node to launch the Postgres connection on for a tenant.
"""
@spec get_node_for_tenant(Tenant.t()) :: {:ok, node()} | {:error, term()}
def get_node_for_tenant(nil), do: {:error, :tenant_not_found}

def get_node_for_tenant(%Tenant{extensions: extensions, external_id: tenant_id}) do
with region <- get_region(extensions),
tenant_region <- platform_region_translator(region),
node <- launch_node(tenant_id, tenant_region, node()) do
{:ok, node}
end
end

defp get_region(extensions) do
extensions
|> Enum.map(fn %{settings: %{"region" => region}} -> region end)
|> Enum.uniq()
|> hd()
end

@doc """
Translates a region from a platform to the closest Supabase tenant region
"""
@spec platform_region_translator(String.t()) :: nil | binary()
def platform_region_translator(tenant_region) when is_binary(tenant_region) do
platform = Application.get_env(:realtime, :platform)
region_mapping(platform, tenant_region)
end

defp region_mapping(:aws, tenant_region) do
case tenant_region do
"us-west-1" -> "us-west-1"
"us-west-2" -> "us-west-1"
"us-east-1" -> "us-east-1"
"sa-east-1" -> "us-east-1"
"ca-central-1" -> "us-east-1"
"ap-southeast-1" -> "ap-southeast-1"
"ap-northeast-1" -> "ap-southeast-1"
"ap-northeast-2" -> "ap-southeast-1"
"ap-southeast-2" -> "ap-southeast-2"
"ap-south-1" -> "ap-southeast-1"
"eu-west-1" -> "eu-west-2"
"eu-west-2" -> "eu-west-2"
"eu-west-3" -> "eu-west-2"
"eu-central-1" -> "eu-west-2"
_ -> nil
end
end

defp region_mapping(:fly, tenant_region) do
case tenant_region do
"us-east-1" -> "iad"
"us-west-1" -> "sea"
"sa-east-1" -> "iad"
"ca-central-1" -> "iad"
"ap-southeast-1" -> "syd"
"ap-northeast-1" -> "syd"
"ap-northeast-2" -> "syd"
"ap-southeast-2" -> "syd"
"ap-south-1" -> "syd"
"eu-west-1" -> "lhr"
"eu-west-2" -> "lhr"
"eu-west-3" -> "lhr"
"eu-central-1" -> "lhr"
_ -> nil
end
end

defp region_mapping(_, tenant_region), do: tenant_region

@doc """
Lists the nodes in a region. Sorts by node name in case the list order
is unstable.
"""

@spec region_nodes(String.t()) :: [atom()]
def region_nodes(region) when is_binary(region) do
:syn.members(RegionNodes, region)
|> Enum.map(fn {_pid, [node: node]} -> node end)
|> Enum.sort()
end

@doc """
Picks the node to launch the Postgres connection on.

If there are not two nodes in a region the connection is established from
the `default` node given.
"""
@spec launch_node(String.t(), String.t(), atom()) :: atom()
def launch_node(tenant_id, region, default) do
case region_nodes(region) do
[node] ->
Logger.warning(
"Only one region node (#{inspect(node)}) for #{region} using default #{inspect(default)}"
)

default

[] ->
Logger.warning("Zero region nodes for #{region} using #{inspect(default)}")
default

regions_nodes ->
member_count = Enum.count(regions_nodes)
index = :erlang.phash2(tenant_id, member_count)

Enum.fetch!(regions_nodes, index)
end
end
end
Loading
Loading