Skip to content

Commit

Permalink
fix: Improve failure telemetry (#1307)
Browse files Browse the repository at this point in the history
  • Loading branch information
filipecabaco authored Feb 21, 2025
1 parent b7bfb91 commit 5192428
Show file tree
Hide file tree
Showing 10 changed files with 168 additions and 46 deletions.
8 changes: 6 additions & 2 deletions lib/realtime/monitoring/prom_ex.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
defmodule Realtime.PromEx do
alias Realtime.PromEx.Plugins.{OsMon, Phoenix, Tenants, Tenant}

alias Realtime.Nodes
alias Realtime.PromEx.Plugins.Channels
alias Realtime.PromEx.Plugins.OsMon
alias Realtime.PromEx.Plugins.Phoenix
alias Realtime.PromEx.Plugins.Tenant
alias Realtime.PromEx.Plugins.Tenants

@moduledoc """
Be sure to add the following to finish setting up PromEx:
Expand Down Expand Up @@ -70,6 +73,7 @@ defmodule Realtime.PromEx do
{OsMon, poll_rate: poll_rate},
{Tenants, poll_rate: poll_rate},
{Tenant, poll_rate: poll_rate},
{Channels, poll_rate: poll_rate},
{PromEx.Plugins.Ecto, otp_app: :realtime, poll_rate: poll_rate, metric_prefix: [:ecto]}
]
end
Expand Down
20 changes: 20 additions & 0 deletions lib/realtime/monitoring/prom_ex/plugins/channels.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
defmodule Realtime.PromEx.Plugins.Channels do
  @moduledoc """
  PromEx plugin for Realtime channels.

  Registers a counter on the `[:realtime, :channel, :error]` telemetry event,
  tagged by the `:code` found in the event metadata.
  """
  use PromEx.Plugin
  require Logger

  # Telemetry event name; it is also used as the metric name.
  @channel_error_event [:realtime, :channel, :error]

  @impl true
  def event_metrics(_opts) do
    channel_error_counter =
      counter(@channel_error_event,
        event_name: @channel_error_event,
        measurement: :code,
        tags: [:code],
        description: "Count of errors in the Realtime channels initialization"
      )

    Event.build(:realtime, [channel_error_counter])
  end
end
10 changes: 2 additions & 8 deletions lib/realtime/monitoring/prom_ex/plugins/tenants.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,7 @@ defmodule Realtime.PromEx.Plugins.Tenants do
@event_connected [:prom_ex, :plugin, :realtime, :tenants, :connected]

@impl true
def event_metrics(opts) do
rpc_metrics(opts)
end

defp rpc_metrics(_opts) do
def event_metrics(_) do
Event.build(:realtime, [
distribution(
[:realtime, :rpc],
Expand Down Expand Up @@ -58,9 +54,7 @@ defmodule Realtime.PromEx.Plugins.Tenants do
-1
end

execute_metrics(@event_connected, %{
connected: connected
})
execute_metrics(@event_connected, %{connected: connected})
end

defp execute_metrics(event, metrics) do
Expand Down
50 changes: 31 additions & 19 deletions lib/realtime/rpc.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,11 @@ defmodule Realtime.Rpc do
def call(node, mod, func, args, opts \\ []) do
timeout = Keyword.get(opts, :timeout, Application.get_env(:realtime, :rpc_timeout))
{latency, response} = :timer.tc(fn -> :rpc.call(node, mod, func, args, timeout) end)
tenant = Keyword.get(opts, :tenant, nil)

Telemetry.execute(
[:realtime, :tenants, :rpc],
[:realtime, :rpc],
%{latency: latency},
%{
tenant: tenant,
mod: mod,
func: func,
target_node: node,
origin_node: node()
}
%{mod: mod, func: func, target_node: node, origin_node: node()}
)

response
Expand All @@ -36,21 +29,36 @@ defmodule Realtime.Rpc do
def enhanced_call(node, mod, func, args \\ [], opts \\ []) do
timeout = Keyword.get(opts, :timeout, Application.get_env(:realtime, :rpc_timeout))

with {latency, {status, _} = response} <-
with {latency, response} <-
:timer.tc(fn -> :erpc.call(node, mod, func, args, timeout) end) do
Telemetry.execute(
[:realtime, :rpc],
%{latency: latency, success?: status == :ok},
%{mod: mod, func: func, target_node: node, origin_node: node()}
)

case response do
{status, _} when status in [:ok, :error] -> response
_ -> {:error, response}
{:ok, _} ->
Telemetry.execute(
[:realtime, :rpc],
%{latency: latency, success?: true},
%{mod: mod, func: func, target_node: node, origin_node: node()}
)

response

{:error, response} ->
Telemetry.execute(
[:realtime, :rpc],
%{latency: latency, success?: false},
%{mod: mod, func: func, target_node: node, origin_node: node()}
)

{:error, response}
end
end
catch
kind, reason ->
Telemetry.execute(
[:realtime, :rpc],
%{latency: 0, success?: false},
%{mod: mod, func: func, target_node: node, origin_node: node()}
)

log_error(
"ErrorOnRpcCall",
%{target: node, mod: mod, func: func, error: {kind, reason}},
Expand All @@ -59,6 +67,10 @@ defmodule Realtime.Rpc do
target: node
)

{:error, "RPC call error"}
case reason do
{:erpc, :timeout} -> {:error, :rpc_error, :timeout}
{:exception, error, _} -> {:error, :rpc_error, error}
_ -> {:error, reason}
end
end
end
14 changes: 14 additions & 0 deletions lib/realtime_web/channels/realtime_channel/logging.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule RealtimeWeb.RealtimeChannel.Logging do
"""
require Logger
import Realtime.Logs
alias Realtime.Telemetry

@doc """
Logs messages according to user options given on config
Expand All @@ -20,6 +21,17 @@ defmodule RealtimeWeb.RealtimeChannel.Logging do
socket
end

@doc """
List of errors that are system triggered and not user driven
"""
# Codes are plain strings; log_error_message/4 checks membership with `code in system_errors()`.
def system_errors do
  ~w(UnableToSetPolicies InitializingProjectConnection DatabaseConnectionIssue UnknownErrorOnChannel)
end

@doc """
Logs errors in an expected format
"""
Expand All @@ -32,6 +44,8 @@ defmodule RealtimeWeb.RealtimeChannel.Logging do
def log_error_message(level, code, error, metadata \\ [])

def log_error_message(:error, code, error, metadata) do
  # Only codes listed in system_errors/0 emit the channel error telemetry event;
  # the same map is sent as both measurements and metadata.
  if code in system_errors() do
    payload = %{code: code}
    Telemetry.execute([:realtime, :channel, :error], payload, payload)
  end

  log_error(code, error, metadata)
  {:error, %{reason: error}}
end
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.34.29",
version: "2.34.30",
elixir: "~> 1.17.3",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
19 changes: 9 additions & 10 deletions test/realtime/database_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ defmodule Realtime.DatabaseTest do
use Realtime.DataCase, async: false

import ExUnit.CaptureLog
import Mock

alias Realtime.Database
doctest Realtime.Database
# :telemetry handler used by these tests. Handlers are invoked as
# (event_name, measurements, metadata, config): the second argument is the
# measurements map (e.g. %{latency: _}), not the metadata. The event name and
# measurements are forwarded to the test process passed in the handler config
# (`pid: pid`); the metadata (third argument) is ignored.
def handle_telemetry(event, measurements, _metadata, pid: pid) do
  send(pid, {event, measurements})
end

setup do
tenant = tenant_fixture()

:telemetry.attach(__MODULE__, [:realtime, :database, :transaction], &__MODULE__.handle_telemetry/4, pid: self())
on_exit(fn -> :telemetry.detach(__MODULE__) end)
# Ensure no replication slot is present before the test
Cleanup.ensure_no_replication_slot()

Expand Down Expand Up @@ -163,15 +164,13 @@ defmodule Realtime.DatabaseTest do
test "with telemetry event defined, emits telemetry event", %{db_conn: db_conn} do
event = [:realtime, :database, :transaction]

with_mock Realtime.Telemetry, execute: fn _, _, _ -> :ok end do
Database.transaction(
db_conn,
fn conn -> Postgrex.query!(conn, "SELECT pg_sleep(6)", []) end,
telemetry: event
)
Database.transaction(
db_conn,
fn conn -> Postgrex.query!(conn, "SELECT pg_sleep(6)", []) end,
telemetry: event
)

assert_called(Realtime.Telemetry.execute(event, %{latency: :_}, %{}))
end
assert_receive {^event, %{latency: _}}
end
end

Expand Down
19 changes: 19 additions & 0 deletions test/realtime/monitoring/prom_ex_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
defmodule Realtime.PromExTest do
  use ExUnit.Case
  doctest Realtime.PromEx
  alias Realtime.PromEx

  describe "get_metrics/0" do
    test "builds metrics in prometheus format which includes host region and id" do
      metrics = PromEx.get_metrics()

      # HELP line, TYPE line and the labelled sample must all be present in the output.
      expected_fragments = [
        "# HELP beam_system_schedulers_online_info The number of scheduler threads that are online.",
        "# TYPE beam_system_schedulers_online_info gauge",
        "beam_system_schedulers_online_info{host=\"nohost\",region=\"\",id=\"nohost\"}"
      ]

      for fragment <- expected_fragments do
        assert metrics =~ fragment
      end
    end
  end
end
53 changes: 48 additions & 5 deletions test/realtime/rpc_test.exs
Original file line number Diff line number Diff line change
@@ -1,30 +1,73 @@
defmodule Realtime.RpcTest do
use ExUnit.Case
alias Realtime.Rpc

import ExUnit.CaptureLog

alias Realtime.Rpc

defmodule TestRpc do
def test_raise, do: raise("test")
def test_timeout, do: Process.sleep(1000)
def test_timeout, do: Process.sleep(200)
def test_success, do: {:ok, "success"}
end

# :telemetry handler used by these tests. Handlers are invoked as
# (event_name, measurements, metadata, config): the second argument is the
# measurements map (e.g. %{latency: _, success?: _}), not the metadata. The
# event name and measurements are forwarded to the test process passed in the
# handler config (`pid: pid`); the metadata (third argument) is ignored.
def handle_telemetry(event, measurements, _metadata, pid: pid) do
  send(pid, {event, measurements})
end

setup do
:telemetry.attach(__MODULE__, [:realtime, :rpc], &__MODULE__.handle_telemetry/4, pid: self())
on_exit(fn -> :telemetry.detach(__MODULE__) end)
:ok
end

describe "call/5" do
test "successful RPC call returns exactly what the original function returns" do
assert {:ok, "success"} = Rpc.call(node(), TestRpc, :test_success, [])
assert_receive {[:realtime, :rpc], %{latency: _}}
end

test "raised exceptions are properly caught and logged" do
assert {:badrpc,
{:EXIT,
{%RuntimeError{message: "test"},
[
{Realtime.RpcTest.TestRpc, :test_raise, 0,
[file: ~c"test/realtime/rpc_test.exs", line: 9, error_info: %{module: Exception}]}
]}}} =
Rpc.call(node(), TestRpc, :test_raise, [])

assert_receive {[:realtime, :rpc], %{latency: _}}
end

test "timeouts are properly caught and logged" do
assert {:badrpc, :timeout} =
Rpc.call(node(), TestRpc, :test_timeout, [], timeout: 100)

assert_receive {[:realtime, :rpc], %{latency: _}}
end
end

describe "enhanced_call/5" do
test "successful RPC call returns exactly what the original function returns" do
assert {:ok, "success"} = Rpc.enhanced_call(node(), TestRpc, :test_success)
assert_receive {[:realtime, :rpc], %{latency: _, success?: true}}
end

test "raised exceptions are properly caught and logged" do
assert capture_log(fn ->
assert {:error, "RPC call error"} = Rpc.enhanced_call(node(), TestRpc, :test_raise)
assert {:error, :rpc_error, %RuntimeError{message: "test"}} =
Rpc.enhanced_call(node(), TestRpc, :test_raise)
end) =~ "ErrorOnRpcCall"

assert_receive {[:realtime, :rpc], %{latency: _, success?: false}}
end

test "timeouts are properly caught and logged" do
assert capture_log(fn ->
assert {:error, "RPC call error"} =
Rpc.enhanced_call(node(), TestRpc, :test_timeout, 500)
assert {:error, :rpc_error, :timeout} =
Rpc.enhanced_call(node(), TestRpc, :test_timeout, [], timeout: 100)
end) =~ "ErrorOnRpcCall"

assert_receive {[:realtime, :rpc], %{latency: 0, success?: false}}
end
end
end
19 changes: 18 additions & 1 deletion test/realtime_web/channels/realtime_channel/logging_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@ defmodule RealtimeWeb.RealtimeChannel.LoggingTest do
import ExUnit.CaptureLog
alias RealtimeWeb.RealtimeChannel.Logging

def handle_telemetry(event, measures, metadata, pid: pid), do: send(pid, {event, measures, metadata})

setup do
:telemetry.attach(__MODULE__, [:realtime, :channel, :error], &__MODULE__.handle_telemetry/4, pid: self())
on_exit(fn -> :telemetry.detach(__MODULE__) end)

level = Logger.level()
Logger.configure(level: :debug)
on_exit(fn -> Logger.configure(level: level) end)
Expand Down Expand Up @@ -38,9 +43,21 @@ defmodule RealtimeWeb.RealtimeChannel.LoggingTest do

test "handles error level errors" do
assert capture_log(fn ->
result = Logging.log_error_message(:error, :test_code, "test error")
result = Logging.log_error_message(:error, "TestCodeError", "test error")
assert {:error, %{reason: "test error"}} = result
end) =~ "test error"
end

test "only emits telemetry for system errors" do
  # Every code listed in system_errors/0 must emit the channel error event,
  # carrying the code both as measurement and as metadata.
  for error <- Logging.system_errors() do
    Logging.log_error_message(:error, error, "test error")
    assert_receive {[:realtime, :channel, :error], %{code: ^error}, %{code: ^error}}
  end

  # A code outside system_errors/0 must not emit any channel error event.
  # Guard the fixture itself so the negative check cannot silently become vacuous.
  user_error = "TestCodeError"
  refute user_error in Logging.system_errors()

  Logging.log_error_message(:error, user_error, "test error")
  refute_receive {[:realtime, :channel, :error], _, _}
end
end
end

0 comments on commit 5192428

Please sign in to comment.