diff --git a/.dialyzer_ignore.exs b/.dialyzer_ignore.exs new file mode 100644 index 0000000..020e137 --- /dev/null +++ b/.dialyzer_ignore.exs @@ -0,0 +1,3 @@ +[ + {"lib/observer_web/macros.ex", :unknown_function} +] diff --git a/CHANGELOG.md b/CHANGELOG.md index ee8d6ea..a909e13 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ ### Enhancements * [[`PR-6`](https://github.com/thiagoesteves/observer_web/pull/6)] Adjusted Observer Web Font + * [[`PR-7`](https://github.com/thiagoesteves/observer_web/pull/7)] Adding Live Metrics in Observer Web for capturing the VM statistics using OTP distribution # Previous Releases * [0.1.3 🚀 (2025-02-08)](https://github.com/thiagoesteves/observer_web/blob/v0.1.3/CHANGELOG.md) diff --git a/README.md b/README.md index be8a407..011d11c 100644 --- a/README.md +++ b/README.md @@ -2,9 +2,10 @@ # Observer Web -Observer Web is an easy-to-use tool that integrates into your application to provide observability. -It relies on the OTP distribution to offer tracing using the [Erlang debugger][edb], as well as -Process/Port status and details. +Observer Web is an easy-to-use tool that integrates into your application, providing +enhanced observability. Leveraging OTP distribution, it offers tracing through the +[Erlang debugger][edb], along with detailed insights into process/port statuses +and Beam VM statistics. Powered by [Phoenix LiveView][liv], it is distributed, lightweight, and fully real-time. This library is part of the [DeployEx][dye] project. 
diff --git a/assets/js/app.js b/assets/js/app.js index 64d71eb..9e29347 100644 --- a/assets/js/app.js +++ b/assets/js/app.js @@ -69,6 +69,48 @@ hooks.ObserverEChart = { } } +hooks.LiveMetricsEChart = { + mounted() { + selector = "#" + this.el.id + + const dataConfig = JSON.parse(this.el.dataset.config) + const columns = JSON.parse(this.el.dataset.columns) + + this.chart = echarts.init(this.el.querySelector(selector + "-chart")) + this.chart.setOption(dataConfig) + this.graph_cols = columns + }, + updated() { + const dataConfig = JSON.parse(this.el.dataset.config) + const reset = JSON.parse(this.el.dataset.reset) + const columns = JSON.parse(this.el.dataset.columns) + + if (reset) { + this.chart.setOption(dataConfig) + + } else { + var option = this.chart.getOption(); + var updatedXAxis = option.xAxis[0].data.concat(dataConfig.xAxis.data); + var updatedSeries = option.series.map((series, index) => { + // Concatenate the corresponding dataset to each series + return { + data: series.data.concat(dataConfig.series[index] ? dataConfig.series[index].data : []) + }; + }); + + this.chart.setOption( + { + xAxis: { data: updatedXAxis }, + series: updatedSeries + }) + } + if (columns != this.columns) { + this.chart.resize() + this.columns = columns + } + } +} + const liveSocket = new LiveSocket(livePath, Socket, { transport: liveTran === "longpoll" ? 
LongPoll : WebSocket, params: { _csrf_token: csrfToken }, diff --git a/config/config.exs b/config/config.exs index 8ef3c6f..c3b461d 100644 --- a/config/config.exs +++ b/config/config.exs @@ -30,6 +30,10 @@ if config_env() == :dev do ), cd: Path.expand("../assets", __DIR__) ] + + config :observer_web, ObserverWeb.Telemetry, + adapter: ObserverWeb.Telemetry.Consumer, + data_retention_period: :timer.minutes(15) end # Configures Elixir's Logger @@ -37,6 +41,3 @@ config :logger, level: :warning config :logger, :console, format: "[$level] $message\n" config :phoenix, stacktrace_depth: 20 - -# Rpc Adapter -config :observer_web, ObserverWeb.Rpc, adapter: ObserverWeb.Rpc.Local diff --git a/coveralls.json b/coveralls.json index 21a8cb8..447168d 100644 --- a/coveralls.json +++ b/coveralls.json @@ -5,6 +5,8 @@ "lib/web/components/core.ex", "lib/observer_web/rpc/adapter.ex", "lib/observer_web/rpc/local.ex", + "lib/observer_web/telemetry/adapter.ex", + "lib/observer_web/macros.ex", "deps/*", "test/*" ], diff --git a/dev.exs b/dev.exs index 08ed34d..c6188d4 100644 --- a/dev.exs +++ b/dev.exs @@ -54,7 +54,6 @@ Application.put_env(:observer_web, WebDev.Endpoint, debug_errors: true, http: [port: port], live_view: [signing_salt: "eX7TFPY6Y/+XQ1o2pOUW3DjgAoXGTAdX"], - pubsub_server: WebDev.PubSub, render_errors: [formats: [html: WebDev.ErrorHTML], layout: false], secret_key_base: "jAu3udxm+8tIRDXLLKo+EupAlEvdLsnNG82O8e9nqylpBM9gP8AjUnZ4PWNttztU", url: [host: "localhost"], @@ -76,7 +75,6 @@ Application.put_env(:phoenix, :persistent, true) Task.async(fn -> children = [ - {Phoenix.PubSub, [name: WebDev.PubSub, adapter: Phoenix.PubSub.PG2]}, {WebDev.Endpoint, []} ] diff --git a/guides/installation.md b/guides/installation.md index 5f8f04a..291e919 100644 --- a/guides/installation.md +++ b/guides/installation.md @@ -54,6 +54,23 @@ After you've verified that the dashboard is loading you'll probably want to rest dashboard via authentication, either with a [custom resolver's][ac] access 
controls or [Basic Auth][ba]. +### Retention period for metrics + +The Observer Web can monitor Beam VM metrics by default, using ETS tables to store the data. +However, this means that the data is not persisted across restarts. The retention period +for this data can be configured. + +By default, without a retention time set, the metrics will only show data received during the +current session. If you'd like to persist this data for a longer period, you can configure +a retention time. + +To configure the retention period, use the following optional setting: + +```elixir +config :observer_web, ObserverWeb.Telemetry, + data_retention_period: :timer.minutes(5) +``` + ### Usage with Web and Clustering The Observer Web provides observer ability for the local application as well as any other that is diff --git a/guides/overview.md b/guides/overview.md index 1e05aa2..3dda479 100644 --- a/guides/overview.md +++ b/guides/overview.md @@ -1,8 +1,9 @@ # Overview -Observer Web is an easy-to-use tool that integrates into your application to provide observability. -It relies on the OTP distribution to offer tracing using the [Erlang debugger][edb], as well as -Process/Port status and details. +Observer Web is an easy-to-use tool that integrates into your application, providing +enhanced observability. Leveraging OTP distribution, it offers tracing through the +[Erlang debugger][edb], along with detailed insights into process/port statuses +and Beam VM statistics. Powered by [Phoenix LiveView][liv], it is distributed, lightweight, and fully real-time. This library is part of the [DeployEx][dye] project. @@ -22,6 +23,9 @@ and also function callers, as many other possibilities. - **🔬 Process/Port Inspection** - View processes and ports details as well as their status and connectivity (and much more). +- **📊 Realtime VM Metrics** - Powered by ETS tables and OTP +distribution, VM memory statistics are stored and easily filtered. 
+ ## Installation See the [installation guide](installation.md) for details on installing and configuring Observer Web diff --git a/guides/static/dashboard.png b/guides/static/dashboard.png index 9d9d8b8..ace25a1 100644 Binary files a/guides/static/dashboard.png and b/guides/static/dashboard.png differ diff --git a/lib/observer_web/application.ex b/lib/observer_web/application.ex index 4c02673..74a055b 100644 --- a/lib/observer_web/application.ex +++ b/lib/observer_web/application.ex @@ -5,12 +5,29 @@ defmodule ObserverWeb.Application do use Application + import ObserverWeb.Macros + @impl true def start(_type, _args) do - children = [ObserverWeb.Tracer.Server] + children = + [ + {Phoenix.PubSub, [name: ObserverWeb.PubSub]}, + ObserverWeb.Tracer.Server + ] ++ telemetry_servers() # # See https://hexdocs.pm/elixir/Supervisor.html # # for other strategies and supported options Supervisor.start_link(children, strategy: :one_for_one, name: __MODULE__) end + + # NOTE: DO NOT start these servers when running tests. 
+ if_not_test do + defp telemetry_servers, + do: [ + ObserverWeb.Telemetry.Consumer, + ObserverWeb.Telemetry.VmData + ] + else + defp telemetry_servers, do: [] + end end diff --git a/lib/observer_web/macros.ex b/lib/observer_web/macros.ex new file mode 100644 index 0000000..832665e --- /dev/null +++ b/lib/observer_web/macros.ex @@ -0,0 +1,49 @@ +defmodule ObserverWeb.Macros do + @moduledoc """ + This file contains common macros + """ + + @doc """ + Compiler macro that will only add the do block code if it is not a test build + (prod or dev) and will run the else block of code if it is a test build + + Note: the else block is optional + + ## Example: + import ObserverWeb.Macros + + if_not_test do + @msg "I am NOT a test build" + else + @msg "I am a test build" + end + + """ + @spec if_not_test([{:do, any} | {:else, any}, ...]) :: any + defmacro if_not_test(do: tBlock, else: fBlock) do + case Mix.env() do + # If this is a test build, emit the else block (when one was given) + :test -> + if nil != fBlock do + quote do + unquote(fBlock) + end + end + + # otherwise (dev or prod build) emit the do block + _ -> + quote do + unquote(tBlock) + end + end + end + + defmacro if_not_test(do: tBlock) do + if :test != Mix.env() do + # Not a test build (dev or prod): emit the do block + quote do + unquote(tBlock) + end + end + end +end diff --git a/lib/observer_web/telemetry.ex b/lib/observer_web/telemetry.ex new file mode 100644 index 0000000..7418212 --- /dev/null +++ b/lib/observer_web/telemetry.ex @@ -0,0 +1,75 @@ +defmodule ObserverWeb.Telemetry do + @moduledoc """ + This module will provide telemetry abstraction + """ + + @behaviour ObserverWeb.Telemetry.Adapter + + defmodule Data do + @moduledoc """ + Structure to handle the telemetry event + """ + @type t :: %__MODULE__{ + timestamp: non_neg_integer(), + value: integer() | float(), + unit: String.t(), + tags: map(), + measurements: map() + } + + defstruct timestamp: nil, + value: "", + unit: "", + tags: %{}, + measurements: %{} + end + + ### 
========================================================================== + ### Public functions + ### ========================================================================== + + @doc """ + This function pushes events to the Telemetry module + """ + # coveralls-ignore-start + @spec push_data(any()) :: :ok + def push_data(event), do: default().push_data(event) + # coveralls-ignore-stop + + @doc """ + Subscribe for new keys notifications + """ + @spec subscribe_for_new_keys() :: :ok | {:error, term} + def subscribe_for_new_keys, do: default().subscribe_for_new_keys() + + @doc """ + Subscribe for new data notifications for the respective node/key + """ + @spec subscribe_for_new_data(String.t(), String.t()) :: :ok | {:error, term} + def subscribe_for_new_data(node, key), do: default().subscribe_for_new_data(node, key) + + @doc """ + Unsubscribe for new data notifications for the respective node/key + """ + @spec unsubscribe_for_new_data(String.t(), String.t()) :: :ok + def unsubscribe_for_new_data(node, key), do: default().unsubscribe_for_new_data(node, key) + + @doc """ + Fetch data by node and key + """ + @spec list_data_by_node_key(atom() | String.t(), String.t(), Keyword.t()) :: list() + def list_data_by_node_key(node, key, options), + do: default().list_data_by_node_key(node, key, options) + + @doc """ + List all keys registered for the respective node + """ + @spec get_keys_by_node(atom()) :: list() + def get_keys_by_node(node), do: default().get_keys_by_node(node) + + ### ========================================================================== + ### Private functions + ### ========================================================================== + defp default, + do: Application.get_env(:observer_web, __MODULE__)[:adapter] || ObserverWeb.Telemetry.Consumer +end diff --git a/lib/observer_web/telemetry/adapter.ex b/lib/observer_web/telemetry/adapter.ex new file mode 100644 index 0000000..f5cd4dc --- /dev/null +++ b/lib/observer_web/telemetry/adapter.ex @@ 
-0,0 +1,12 @@ +defmodule ObserverWeb.Telemetry.Adapter do + @moduledoc """ + Behaviour that defines the telemetry adapter callback + """ + + @callback push_data(any()) :: :ok + @callback subscribe_for_new_keys() :: :ok | {:error, term} + @callback subscribe_for_new_data(String.t(), String.t()) :: :ok | {:error, term} + @callback unsubscribe_for_new_data(String.t(), String.t()) :: :ok + @callback list_data_by_node_key(atom() | String.t(), String.t(), Keyword.t()) :: list() + @callback get_keys_by_node(atom()) :: list() +end diff --git a/lib/observer_web/telemetry/consumer.ex b/lib/observer_web/telemetry/consumer.ex new file mode 100644 index 0000000..b822ad9 --- /dev/null +++ b/lib/observer_web/telemetry/consumer.ex @@ -0,0 +1,222 @@ +defmodule ObserverWeb.Telemetry.Consumer do + @moduledoc """ + GenServer that collects the telemetry data received + """ + use GenServer + + import ObserverWeb.Macros + + alias ObserverWeb.Rpc + + @behaviour ObserverWeb.Telemetry.Adapter + + @metric_keys "metric-keys" + @metric_table :observer_web_metrics + + @one_minute_in_milliseconds 60_000 + @retention_data_delete_interval :timer.minutes(1) + + ### ========================================================================== + ### Callback functions + ### ========================================================================== + + @spec start_link(any()) :: :ignore | {:error, any()} | {:ok, pid()} + def start_link(args) do + GenServer.start_link(__MODULE__, args, name: __MODULE__) + end + + @impl true + def init(_args) do + node = Node.self() + + # Create metric tables for the node + :ets.new(@metric_table, [:set, :protected, :named_table]) + :ets.insert(@metric_table, {@metric_keys, []}) + + persist_data? 
= + if data_retention_period() do + :timer.send_interval(@retention_data_delete_interval, :prune_expired_entries) + true + else + false + end + + {:ok, %{node: node, persist_data?: persist_data?}} + end + + @impl true + def handle_cast( + {:observer_web_telemetry, + %{metrics: metrics, reporter: reporter, measurements: measurements}}, + %{node: node, persist_data?: persist_data?} = state + ) + when reporter in [node] do + now = System.os_time(:millisecond) + minute = unix_to_minutes(now) + + keys = get_keys_by_node(reporter) + + new_keys = + Enum.reduce(metrics, [], fn metric, acc -> + {key, timed_key, data} = build_telemetry_data(metric, measurements, now, minute) + + # credo:disable-for-lines:3 + if persist_data? do + current_data = + case :ets.lookup(@metric_table, timed_key) do + [{_, current_list_data}] -> [data | current_list_data] + _ -> [data] + end + + :ets.insert(@metric_table, {timed_key, current_data}) + end + + Phoenix.PubSub.broadcast( + ObserverWeb.PubSub, + metrics_topic(reporter, key), + {:metrics_new_data, reporter, key, data} + ) + + if key in keys do + acc + else + [key | acc] + end + end) + + if new_keys != [] do + :ets.insert(@metric_table, {@metric_keys, new_keys ++ keys}) + + Phoenix.PubSub.broadcast( + ObserverWeb.PubSub, + keys_topic(), + {:metrics_new_keys, reporter, new_keys} + ) + end + + {:noreply, state} + end + + @impl true + def handle_info(:prune_expired_entries, state) do + now_minutes = unix_to_minutes() + retention_period = trunc(data_retention_period() / @one_minute_in_milliseconds) + deletion_period_to = now_minutes - retention_period - 1 + deletion_period_from = deletion_period_to - 2 + + prune_keys = fn key -> + Enum.each(deletion_period_from..deletion_period_to, fn timestamp -> + :ets.delete(@metric_table, metric_key(key, timestamp)) + end) + end + + Node.self() + |> get_keys_by_node() + |> Enum.each(&prune_keys.(&1)) + + {:noreply, state} + end + + ### 
========================================================================== + ### ObserverWeb.Telemetry.Adapter implementation + ### ========================================================================== + @impl true + def push_data(event) do + GenServer.cast(__MODULE__, {:observer_web_telemetry, event}) + end + + @impl true + def subscribe_for_new_keys do + Phoenix.PubSub.subscribe(ObserverWeb.PubSub, keys_topic()) + end + + @impl true + def subscribe_for_new_data(node, key) do + Phoenix.PubSub.subscribe(ObserverWeb.PubSub, metrics_topic(node, key)) + end + + @impl true + def unsubscribe_for_new_data(node, key) do + Phoenix.PubSub.unsubscribe(ObserverWeb.PubSub, metrics_topic(node, key)) + end + + @impl true + def list_data_by_node_key(node, key, options \\ []) + + def list_data_by_node_key(node, key, options) when is_binary(node) do + node + |> String.to_existing_atom() + |> list_data_by_node_key(key, options) + end + + def list_data_by_node_key(node, key, options) when is_atom(node) do + from = Keyword.get(options, :from, 15) + order = Keyword.get(options, :order, :asc) + + now_minutes = unix_to_minutes() + from_minutes = now_minutes - from + + result = + Enum.reduce(from_minutes..now_minutes, [], fn minute, acc -> + case Rpc.call( + node, + :ets, + :lookup, + [@metric_table, metric_key(key, minute)], + :infinity + ) do + [{_, value}] -> + value ++ acc + + _ -> + acc + end + end) + + if order == :asc, do: Enum.reverse(result), else: result + end + + @impl true + def get_keys_by_node(nil), do: [] + + def get_keys_by_node(node) do + case Rpc.call(node, :ets, :lookup, [@metric_table, @metric_keys], :infinity) do + [{_, value}] -> + value + + # coveralls-ignore-start + _ -> + [] + # coveralls-ignore-stop + end + end + + ### ========================================================================== + ### Private functions + ### ========================================================================== + if_not_test do + defp data_retention_period, + do: 
Application.get_env(:observer_web, ObserverWeb.Telemetry)[:data_retention_period] + else + defp data_retention_period, do: :timer.minutes(1) + end + + defp metric_key(metric, timestamp), do: "#{metric}|#{timestamp}" + + defp unix_to_minutes(time \\ System.os_time(:millisecond)), + do: trunc(time / @one_minute_in_milliseconds) + + defp keys_topic, do: "metrics::keys" + defp metrics_topic(node, key), do: "metrics::#{node}::#{key}" + + defp build_telemetry_data(%{name: name} = metric, measurements, now, minute) do + {name, metric_key(name, minute), + %ObserverWeb.Telemetry.Data{ + timestamp: now, + value: metric.value, + unit: metric.unit, + tags: metric.tags, + measurements: measurements + }} + end +end diff --git a/lib/observer_web/telemetry/vm_data.ex b/lib/observer_web/telemetry/vm_data.ex new file mode 100644 index 0000000..b59b075 --- /dev/null +++ b/lib/observer_web/telemetry/vm_data.ex @@ -0,0 +1,98 @@ +defmodule ObserverWeb.Telemetry.VmData do + @moduledoc """ + GenServer that collects the vm metrics and produce its statistics + """ + use GenServer + + @vm_memory_interval :timer.seconds(5) + + ### ========================================================================== + ### Callback functions + ### ========================================================================== + + @spec start_link(any()) :: :ignore | {:error, any()} | {:ok, pid()} + def start_link(args) do + GenServer.start_link(__MODULE__, args, name: __MODULE__) + end + + @impl true + def init(args) do + args + |> Keyword.get(:vm_memory_interval, @vm_memory_interval) + |> :timer.send_interval(:collect_vm_metrics) + + {:ok, %{}} + end + + @impl true + def handle_info(:collect_vm_metrics, state) do + measurements = Enum.into(:erlang.memory(), %{}) + total_run_queue = :erlang.statistics(:total_run_queue_lengths_all) + cpu_run_queue = :erlang.statistics(:total_run_queue_lengths) + io_run_queue = total_run_queue - cpu_run_queue + + reporter = Node.self() + + [ + %{ + metrics: [ + %{ + name: 
"vm.memory.total", + value: measurements.total / 1_000, + unit: " kilobyte", + info: "", + tags: %{}, + type: "summary" + } + ], + reporter: reporter, + measurements: measurements + }, + %{ + metrics: [ + %{ + name: "vm.total_run_queue_lengths.total", + value: total_run_queue, + unit: " kilobyte", + info: "", + tags: %{}, + type: "summary" + } + ], + reporter: reporter, + measurements: %{} + }, + %{ + metrics: [ + %{ + name: "vm.total_run_queue_lengths.cpu", + value: cpu_run_queue, + unit: " kilobyte", + info: "", + tags: %{}, + type: "summary" + } + ], + reporter: reporter, + measurements: %{} + }, + %{ + metrics: [ + %{ + name: "vm.total_run_queue_lengths.io", + value: io_run_queue, + unit: " kilobyte", + info: "", + tags: %{}, + type: "summary" + } + ], + reporter: reporter, + measurements: %{} + } + ] + |> Enum.each(&ObserverWeb.Telemetry.push_data(&1)) + + {:noreply, state} + end +end diff --git a/lib/web.ex b/lib/web.ex index a7c000e..8054b28 100644 --- a/lib/web.ex +++ b/lib/web.ex @@ -18,6 +18,7 @@ defmodule Observer.Web do use Phoenix.LiveComponent unquote(html_helpers()) + defguard is_connected?(socket) when socket.transport_pid != nil end end diff --git a/lib/web/components/attention.ex b/lib/web/components/attention.ex index e73f888..8c86f15 100644 --- a/lib/web/components/attention.ex +++ b/lib/web/components/attention.ex @@ -14,7 +14,7 @@ defmodule Observer.Web.Components.Attention do