Skip to content

Commit

Permalink
fix: listen on notifications for error handling
Browse files Browse the repository at this point in the history
* Adds Realtime.Tenants.Listen to receive error messages from functions so we can better inform our users
* Changes send function to not handle any partition creation
* Changes send function to report errors via pg_notify on `realtime:system` so we can capture and handle them in Realtime
* Increases number of future partitions to 10 days to prevent issues
  • Loading branch information
filipecabaco committed Dec 20, 2024
1 parent 821bd77 commit 72ff20a
Show file tree
Hide file tree
Showing 11 changed files with 262 additions and 71 deletions.
6 changes: 5 additions & 1 deletion lib/realtime/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,11 @@ defmodule Realtime.Application do
{PartitionSupervisor,
child_spec: DynamicSupervisor,
strategy: :one_for_one,
name: Realtime.BroadcastChanges.Handler.DynamicSupervisor},
name: Realtime.Tenants.BroadcastChanges.Handler.DynamicSupervisor},
{PartitionSupervisor,
child_spec: DynamicSupervisor,
strategy: :one_for_one,
name: Realtime.Tenants.Listen.DynamicSupervisor},
RealtimeWeb.Endpoint,
RealtimeWeb.Presence,
Realtime.MetricsCleaner
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule Realtime.BroadcastChanges.Handler do
defmodule Realtime.Tenants.BroadcastChanges.Handler do
@moduledoc """
BroadcastChanges is a module that provides a way to stream data from a PostgreSQL database using logical replication.
Expand Down Expand Up @@ -327,7 +327,8 @@ defmodule Realtime.BroadcastChanges.Handler do

@spec supervisor_spec(Tenant.t()) :: term()
def supervisor_spec(%Tenant{external_id: tenant_id}) do
{:via, PartitionSupervisor, {Realtime.BroadcastChanges.Handler.DynamicSupervisor, tenant_id}}
{:via, PartitionSupervisor,
{Realtime.Tenants.BroadcastChanges.Handler.DynamicSupervisor, tenant_id}}
end

def publication_name(%__MODULE__{table: table, schema: schema}) do
Expand Down
41 changes: 30 additions & 11 deletions lib/realtime/tenants/connect.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@ defmodule Realtime.Tenants.Connect do
import Realtime.Logs

alias Realtime.Api.Tenant
alias Realtime.BroadcastChanges.Handler
alias Realtime.Rpc
alias Realtime.Tenants
alias Realtime.Tenants.BroadcastChanges.Handler
alias Realtime.Tenants.Connect.CheckConnection
alias Realtime.Tenants.Connect.GetTenant
alias Realtime.Tenants.Connect.Piper
alias Realtime.Tenants.Connect.RegisterProcess
alias Realtime.Tenants.Connect.StartCounters
alias Realtime.Tenants.Listen
alias Realtime.Tenants.Migrations
alias Realtime.UsersCounter

Expand All @@ -38,6 +39,7 @@ defmodule Realtime.Tenants.Connect do
db_conn_reference: nil,
db_conn_pid: nil,
broadcast_changes_pid: nil,
listen_pid: nil,
check_connected_user_interval: nil,
connected_users_bucket: [1]

Expand Down Expand Up @@ -166,8 +168,9 @@ defmodule Realtime.Tenants.Connect do

with :ok <- Migrations.maybe_run_migrations(db_conn_pid, tenant),
:ok <- Migrations.create_partitions(db_conn_pid),
{:ok, broadcast_changes_pid} <- start_replication(tenant) do
{:noreply, %{state | broadcast_changes_pid: broadcast_changes_pid},
{:ok, broadcast_changes_pid} <- start_replication(tenant),
{:ok, listen_pid} <- start_listen(tenant) do
{:noreply, %{state | broadcast_changes_pid: broadcast_changes_pid, listen_pid: listen_pid},
{:continue, :setup_connected_user_events}}
else
error ->
Expand Down Expand Up @@ -210,29 +213,41 @@ defmodule Realtime.Tenants.Connect do
{:noreply, %{state | connected_users_bucket: connected_users_bucket}}
end

def handle_info(
:shutdown,
%{db_conn_pid: db_conn_pid, broadcast_changes_pid: broadcast_changes_pid} = state
) do
# Handles the `:shutdown` message: stops the tenant database connection and the
# optional replication/listen processes, then stops this server normally.
def handle_info(:shutdown, state) do
  %{db_conn_pid: db_conn, broadcast_changes_pid: replication, listen_pid: listener} = state

  Logger.info("Tenant has no connected users, database connection will be terminated")
  :ok = GenServer.stop(db_conn, :normal, 500)

  # Auxiliary pids may be nil or may already have exited; stop them in the
  # same order as they appear in the state (replication first, then listen).
  Enum.each([replication, listener], fn pid ->
    if pid && Process.alive?(pid), do: GenServer.stop(pid, :normal, 500)
  end)

  {:stop, :normal, state}
end

def handle_info(
{:suspend_tenant, _},
%{db_conn_pid: db_conn_pid, broadcast_changes_pid: broadcast_changes_pid} = state
) do
# Handles `{:suspend_tenant, _}`: tears down the tenant database connection and
# the optional replication/listen processes, then stops this server normally.
def handle_info({:suspend_tenant, _}, state) do
  %{db_conn_pid: db_conn, broadcast_changes_pid: replication, listen_pid: listener} = state

  Logger.warning("Tenant was suspended, database connection will be terminated")
  :ok = GenServer.stop(db_conn, :normal, 500)

  # Each auxiliary pid is only stopped when it is set and still alive.
  if replication && Process.alive?(replication), do: GenServer.stop(replication, :normal, 500)
  if listener && Process.alive?(listener), do: GenServer.stop(listener, :normal, 500)

  {:stop, :normal, state}
end

Expand Down Expand Up @@ -316,4 +331,8 @@ defmodule Realtime.Tenants.Connect do
error -> {:error, error}
end
end

# Starts the notification listener for the tenant, unless the tenant has
# `notify_private_alpha` set to `false`, in which case no listener is started
# and `{:ok, nil}` is returned.
defp start_listen(tenant) do
  case tenant do
    %{notify_private_alpha: false} -> {:ok, nil}
    _ -> Listen.start(tenant)
  end
end
end
95 changes: 95 additions & 0 deletions lib/realtime/tenants/listen.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
defmodule Realtime.Tenants.Listen do
  @moduledoc """
  Listens for Postgres notifications on the `realtime:system` channel to identify
  issues with the functions that are being called in a tenant's database.

  Notifications whose JSON payload reports an `"error"` for the `realtime.send`
  function are logged, as are payloads that cannot be decoded as JSON.
  """
  use GenServer, restart: :transient
  require Logger
  alias Realtime.Logs
  alias Realtime.Api.Tenant
  alias Realtime.Database
  alias Realtime.PostgresCdc
  alias Realtime.Registry.Unique

  defstruct tenant_id: nil, listen_conn: nil

  @cdc "postgres_cdc_rls"
  @topic "realtime:system"

  @doc """
  Starts the listener for `tenant`, registered under the tenant's external id.
  """
  def start_link(%Tenant{} = tenant) do
    name = {:via, Registry, {Unique, {__MODULE__, :tenant_id, tenant.external_id}}}
    GenServer.start_link(__MODULE__, tenant, name: name)
  end

  @impl true
  def init(%Tenant{external_id: external_id} = tenant) do
    Logger.metadata(external_id: external_id, project: external_id)

    settings =
      tenant
      |> then(&PostgresCdc.filter_settings(@cdc, &1.extensions))
      |> then(&Database.from_settings(&1, "realtime_listen", :rand_exp, true))
      |> Map.from_struct()

    name = {:via, Registry, {Unique, {Postgrex.Notifications, :tenant_id, external_id}}}

    settings =
      settings
      |> Map.put(:hostname, settings[:host])
      |> Map.put(:database, settings[:name])
      |> Map.put(:password, settings[:pass])
      |> Map.put(:username, "postgres")
      |> Map.put(:port, String.to_integer(settings[:port]))
      |> Map.put(:ssl, settings[:ssl_enforced])
      |> Map.put(:auto_reconnect, true)
      |> Map.put(:name, name)
      |> Enum.to_list()

    Logger.info("Listening for notifications on #{@topic}")

    case Postgrex.Notifications.start_link(settings) do
      {:ok, conn} -> subscribe(conn, external_id)
      # A notifications connection is already registered for this tenant: reuse it.
      {:error, {:already_started, conn}} -> subscribe(conn, external_id)
      {:error, reason} -> {:stop, reason}
    end
  catch
    # Only thrown values are intercepted here; raised exceptions and exits
    # still crash `init/1`.
    e -> {:stop, e}
  end

  @doc """
  Starts a listener for `tenant` under `Realtime.Tenants.Listen.DynamicSupervisor`.

  Returns `{:ok, pid}` when the listener was started or was already running.
  Any other result of `DynamicSupervisor.start_child/2` is wrapped unchanged in
  an `:error` tuple, so a start failure comes back as `{:error, {:error, reason}}`.
  """
  @spec start(Realtime.Api.Tenant.t()) :: {:ok, pid()} | {:error, any()}
  def start(%Tenant{} = tenant) do
    # The partition is selected using the calling process pid as the routing key.
    supervisor = {:via, PartitionSupervisor, {Realtime.Tenants.Listen.DynamicSupervisor, self()}}
    spec = {__MODULE__, tenant}

    case DynamicSupervisor.start_child(supervisor, spec) do
      {:ok, pid} -> {:ok, pid}
      {:error, {:already_started, pid}} -> {:ok, pid}
      error -> {:error, error}
    end
  catch
    e -> {:error, e}
  end

  @impl true
  def handle_info({:notification, _, _, @topic, payload}, state) do
    case Jason.decode(payload) do
      {:ok, %{"function" => "realtime.send", "error" => _} = parsed} ->
        Logs.log_error("FailedSendFromDatabase", parsed)

      {:error, _} ->
        Logs.log_error("FailedToParseDiagnosticMessage", payload)

      _ ->
        :ok
    end

    {:noreply, state}
  end

  def handle_info(_, state), do: {:noreply, state}

  # Subscribes this process to the system topic on `conn` and builds the initial state.
  #
  # The connection is started with `auto_reconnect: true`, so `listen/2` may
  # answer `{:eventually, ref}` when the connection is not established yet; the
  # subscription is then registered once it connects. Both answers are accepted
  # (`listen!/2` would raise on `{:eventually, ref}`).
  defp subscribe(conn, tenant_id) do
    case Postgrex.Notifications.listen(conn, @topic) do
      {:ok, _ref} ->
        :ok

      {:eventually, _ref} ->
        Logger.warning("Connection not ready, listening on #{@topic} once connected")
    end

    {:ok, %{tenant_id: tenant_id, listen_conn: conn}}
  end
end
10 changes: 6 additions & 4 deletions lib/realtime/tenants/migrations.ex
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ defmodule Realtime.Tenants.Migrations do
MessagesUsingUuid,
FixSendFunction,
RecreateEntityIndexUsingBtree,
FixSendFunctionPartitionCreation
FixSendFunctionPartitionCreation,
RealtimeSendHandleExceptionsRemovePartitionCreation
}

@migrations [
Expand Down Expand Up @@ -126,7 +127,8 @@ defmodule Realtime.Tenants.Migrations do
{20_241_108_114_728, MessagesUsingUuid},
{20_241_121_104_152, FixSendFunction},
{20_241_130_184_212, RecreateEntityIndexUsingBtree},
{20_241_220_035_512, FixSendFunctionPartitionCreation}
{20_241_220_035_512, FixSendFunctionPartitionCreation},
{20_241_220_123_912, RealtimeSendHandleExceptionsRemovePartitionCreation}
]

defstruct [:tenant_external_id, :settings]
Expand Down Expand Up @@ -252,9 +254,9 @@ defmodule Realtime.Tenants.Migrations do
Logger.info("Creating partitions for realtime.messages")
today = Date.utc_today()
yesterday = Date.add(today, -1)
tomorrow = Date.add(today, 1)
future = Date.add(today, 10)

dates = [yesterday, today, tomorrow]
dates = Date.range(yesterday, future)

Enum.each(dates, fn date ->
partition_name = "messages_#{date |> Date.to_iso8601() |> String.replace("-", "_")}"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
defmodule Realtime.Tenants.Migrations.RealtimeSendHandleExceptionsRemovePartitionCreation do
  @moduledoc false
  use Ecto.Migration

  # Redefines `realtime.send` so that it only inserts into `realtime.messages`
  # (no partition creation) and, when the insert raises, publishes the error
  # details through `pg_notify` on the `realtime:system` channel.
  @send_function """
  CREATE OR REPLACE FUNCTION realtime.send(payload jsonb, event text, topic text, private boolean DEFAULT true ) RETURNS void
  AS $$
  BEGIN
  BEGIN
  -- Attempt to insert the message
  INSERT INTO realtime.messages (payload, event, topic, private, extension)
  VALUES (payload, event, topic, private, 'broadcast');
  EXCEPTION
  WHEN OTHERS THEN
  -- Capture and notify the error
  PERFORM pg_notify(
  'realtime:system',
  jsonb_build_object(
  'error', SQLERRM,
  'function', 'realtime.send',
  'event', event,
  'topic', topic,
  'private', private
  )::text
  );
  END;
  END;
  $$
  LANGUAGE plpgsql;
  """

  def change, do: execute(@send_function)
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.33.71",
version: "2.33.72",
elixir: "~> 1.17.3",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
Loading

0 comments on commit 72ff20a

Please sign in to comment.