Skip to content

Commit

Permalink
* Extract Node logic to new module
Browse files Browse the repository at this point in the history
* Create Check module (might move later to a partitioned mode)
  • Loading branch information
filipecabaco committed Oct 10, 2023
1 parent e68c1a7 commit 9a742c8
Show file tree
Hide file tree
Showing 14 changed files with 263 additions and 238 deletions.
5 changes: 2 additions & 3 deletions lib/extensions/postgres_cdc_rls/cdc_rls.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ defmodule Extensions.PostgresCdcRls do
require Logger

alias RealtimeWeb.Endpoint
alias Realtime.PostgresCdc
alias Extensions.PostgresCdcRls, as: Rls
alias Rls.Subscriptions

Expand Down Expand Up @@ -61,8 +60,8 @@ defmodule Extensions.PostgresCdcRls do
## Internal functions

def start_distributed(%{"region" => region, "id" => tenant} = args) do
platform_region = PostgresCdc.platform_region_translator(region)
launch_node = PostgresCdc.launch_node(tenant, platform_region, node())
platform_region = Realtime.Nodes.platform_region_translator(region)
launch_node = Realtime.Nodes.launch_node(tenant, platform_region, node())

Logger.warning(
"Starting distributed postgres extension #{inspect(lauch_node: launch_node, region: region, platform_region: platform_region)}"
Expand Down
7 changes: 3 additions & 4 deletions lib/extensions/postgres_cdc_stream/cdc_stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ defmodule Extensions.PostgresCdcStream do

require Logger

alias Realtime.PostgresCdc
alias Extensions.PostgresCdcStream, as: Stream

def handle_connect(opts) do
Expand Down Expand Up @@ -41,7 +40,7 @@ defmodule Extensions.PostgresCdcStream do
end
end

@spec get_manager_conn(String.t()) :: {:error, nil} | {:ok, pid(), pid()}
@spec get_manager_conn(String.t()) :: nil | {:ok, pid(), pid()}
def get_manager_conn(id) do
case Phoenix.Tracker.get_by_key(Stream.Tracker, "postgres_cdc_rls_stream", id) do
[] -> nil
Expand All @@ -50,8 +49,8 @@ defmodule Extensions.PostgresCdcStream do
end

def start_distributed(%{"region" => region, "id" => tenant} = args) do
platform_region = PostgresCdc.platform_region_translator(region)
launch_node = PostgresCdc.launch_node(tenant, platform_region, node())
platform_region = Realtime.Nodes.platform_region_translator(region)
launch_node = Realtime.Nodes.launch_node(tenant, platform_region, node())

Logger.warning(
"Starting distributed postgres extension #{inspect(lauch_node: launch_node, region: region, platform_region: platform_region)}"
Expand Down
3 changes: 2 additions & 1 deletion lib/realtime/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ defmodule Realtime.Application do
RealtimeWeb.Presence,
{Task.Supervisor, name: Realtime.TaskSupervisor},
Realtime.Latency,
Realtime.Telemetry.Logger
Realtime.Telemetry.Logger,
Realtime.Tenants.Check
] ++ extensions_supervisors()

children =
Expand Down
4 changes: 2 additions & 2 deletions lib/realtime/helpers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,8 @@ defmodule Realtime.Helpers do
Enum.reduce(:syn.group_names(:users), 0, fn tenant, acc ->
case :syn.lookup(Extensions.PostgresCdcRls, tenant) do
{pid, %{region: region}} ->
platform_region = Realtime.PostgresCdc.platform_region_translator(region)
launch_node = Realtime.PostgresCdc.launch_node(tenant, platform_region, false)
platform_region = Realtime.Nodes.platform_region_translator(region)
launch_node = Realtime.Nodes.launch_node(tenant, platform_region, false)
current_node = node(pid)

case launch_node do
Expand Down
117 changes: 117 additions & 0 deletions lib/realtime/nodes.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
defmodule Realtime.Nodes do
@moduledoc """
Handles common needs for :syn module operations
"""
require Logger
alias Realtime.Tenants

@doc """
Gets the node to launch the Postgres connection on for a tenant.
"""
@spec get_node_for_tenant_id(String.t()) :: node()
def get_node_for_tenant_id(tenant_id) do
with %{extensions: extensions} <- Tenants.get_tenant_by_external_id(tenant_id),
region <- get_region(extensions),
tenant_region <- platform_region_translator(region),
node <- launch_node(tenant_id, tenant_region, node()) do
node
end
end

defp get_region(extensions) do
extensions
|> Enum.map(fn %{settings: %{"region" => region}} -> region end)
|> Enum.uniq()
|> hd()
end

@doc """
Translates a region from a platform to the closest Supabase tenant region
"""
@spec platform_region_translator(String.t()) :: nil | binary()
def platform_region_translator(tenant_region) when is_binary(tenant_region) do
platform = Application.get_env(:realtime, :platform)
region_mapping(platform, tenant_region)
end

defp region_mapping(:aws, tenant_region) do
case tenant_region do
"us-west-1" -> "us-west-1"
"us-west-2" -> "us-west-1"
"us-east-1" -> "us-east-1"
"sa-east-1" -> "us-east-1"
"ca-central-1" -> "us-east-1"
"ap-southeast-1" -> "ap-southeast-1"
"ap-northeast-1" -> "ap-southeast-1"
"ap-northeast-2" -> "ap-southeast-1"
"ap-southeast-2" -> "ap-southeast-2"
"ap-south-1" -> "ap-southeast-1"
"eu-west-1" -> "eu-west-2"
"eu-west-2" -> "eu-west-2"
"eu-west-3" -> "eu-west-2"
"eu-central-1" -> "eu-west-2"
_ -> nil
end
end

defp region_mapping(:fly, tenant_region) do
case tenant_region do
"us-east-1" -> "iad"
"us-west-1" -> "sea"
"sa-east-1" -> "iad"
"ca-central-1" -> "iad"
"ap-southeast-1" -> "syd"
"ap-northeast-1" -> "syd"
"ap-northeast-2" -> "syd"
"ap-southeast-2" -> "syd"
"ap-south-1" -> "syd"
"eu-west-1" -> "lhr"
"eu-west-2" -> "lhr"
"eu-west-3" -> "lhr"
"eu-central-1" -> "lhr"
_ -> nil
end
end

defp region_mapping(_, tenant_region), do: tenant_region

@doc """
Lists the nodes in a region. Sorts by node name in case the list order
is unstable.
"""

@spec region_nodes(String.t()) :: [atom()]
def region_nodes(region) when is_binary(region) do
:syn.members(RegionNodes, region)
|> Enum.map(fn {_pid, [node: node]} -> node end)
|> Enum.sort()
end

@doc """
Picks the node to launch the Postgres connection on.
If there are not two nodes in a region the connection is established from
the `default` node given.
"""
@spec launch_node(String.t(), String.t(), atom()) :: atom()
def launch_node(tenant_id, region, default) do
case region_nodes(region) do
[node] ->
Logger.warning(
"Only one region node (#{inspect(node)}) for #{region} using default #{inspect(default)}"
)

default

[] ->
Logger.warning("Zero region nodes for #{region} using #{inspect(default)}")
default

regions_nodes ->
member_count = Enum.count(regions_nodes)
index = :erlang.phash2(tenant_id, member_count)

Enum.at(regions_nodes, index)
end
end
end
92 changes: 0 additions & 92 deletions lib/realtime/postgres_cdc.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ defmodule Realtime.PostgresCdc do
@moduledoc false

require Logger

@timeout 10_000
@extensions Application.compile_env(:realtime, :extensions)

Expand Down Expand Up @@ -64,97 +63,6 @@ defmodule Realtime.PostgresCdc do
end
end

@doc """
Translates a region from a platform to the closest Supabase tenant region
"""
@spec platform_region_translator(String.t()) :: nil | binary()
def platform_region_translator(tenant_region) when is_binary(tenant_region) do
platform = Application.get_env(:realtime, :platform)
region_mapping(platform, tenant_region)
end

defp region_mapping(:aws, tenant_region) do
case tenant_region do
"us-west-1" -> "us-west-1"
"us-west-2" -> "us-west-1"
"us-east-1" -> "us-east-1"
"sa-east-1" -> "us-east-1"
"ca-central-1" -> "us-east-1"
"ap-southeast-1" -> "ap-southeast-1"
"ap-northeast-1" -> "ap-southeast-1"
"ap-northeast-2" -> "ap-southeast-1"
"ap-southeast-2" -> "ap-southeast-2"
"ap-south-1" -> "ap-southeast-1"
"eu-west-1" -> "eu-west-2"
"eu-west-2" -> "eu-west-2"
"eu-west-3" -> "eu-west-2"
"eu-central-1" -> "eu-west-2"
_ -> nil
end
end

defp region_mapping(:fly, tenant_region) do
case tenant_region do
"us-east-1" -> "iad"
"us-west-1" -> "sea"
"sa-east-1" -> "iad"
"ca-central-1" -> "iad"
"ap-southeast-1" -> "syd"
"ap-northeast-1" -> "syd"
"ap-northeast-2" -> "syd"
"ap-southeast-2" -> "syd"
"ap-south-1" -> "syd"
"eu-west-1" -> "lhr"
"eu-west-2" -> "lhr"
"eu-west-3" -> "lhr"
"eu-central-1" -> "lhr"
_ -> nil
end
end

defp region_mapping(_, tenant_region), do: tenant_region

@doc """
Lists the nodes in a region. Sorts by node name in case the list order
is unstable.
"""

@spec region_nodes(String.t()) :: [atom()]
def region_nodes(region) when is_binary(region) do
:syn.members(RegionNodes, region)
|> Enum.map(fn {_pid, [node: node]} -> node end)
|> Enum.sort()
end

@doc """
Picks the node to launch the Postgres connection on.
If there are not two nodes in a region the connection is established from
the `default` node given.
"""

@spec launch_node(String.t(), String.t(), atom()) :: atom()
def launch_node(tenant, fly_region, default) do
case region_nodes(fly_region) do
[node] ->
Logger.warning(
"Only one region node (#{inspect(node)}) for #{fly_region} using default #{inspect(default)}"
)

default

[] ->
Logger.warning("Zero region nodes for #{fly_region} using #{inspect(default)}")
default

regions_nodes ->
member_count = Enum.count(regions_nodes)
index = :erlang.phash2(tenant, member_count)

Enum.at(regions_nodes, index)
end
end

@callback handle_connect(any()) :: {:ok, any()} | nil
@callback handle_after_connect(any(), any(), any()) :: {:ok, any()} | {:error, any()}
@callback handle_subscribe(any(), any(), any()) :: :ok
Expand Down
2 changes: 0 additions & 2 deletions lib/realtime/syn.ex

This file was deleted.

2 changes: 1 addition & 1 deletion lib/realtime/syn_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ defmodule Realtime.SynHandler do
end

def resolve_registry_conflict(mod, name, {pid1, %{region: region}, time1}, {pid2, _, time2}) do
platform_region = Realtime.PostgresCdc.platform_region_translator(region)
platform_region = Realtime.Nodes.platform_region_translator(region)

platform_region_nodes =
RegionNodes
Expand Down
Loading

0 comments on commit 9a742c8

Please sign in to comment.