diff --git a/.dialyzer_ignore.exs b/.dialyzer_ignore.exs
index 020e137..2214525 100644
--- a/.dialyzer_ignore.exs
+++ b/.dialyzer_ignore.exs
@@ -1,3 +1,4 @@
[
- {"lib/observer_web/macros.ex", :unknown_function}
+ {"lib/observer_web/macros.ex", :unknown_function},
+ {"lib/observer_web/monitor/process_port.ex", :pattern_match}
]
diff --git a/lib/observer_web/application.ex b/lib/observer_web/application.ex
index c2ef04c..504b45a 100644
--- a/lib/observer_web/application.ex
+++ b/lib/observer_web/application.ex
@@ -13,6 +13,7 @@ defmodule ObserverWeb.Application do
[
Observer.Web.Telemetry,
ObserverWeb.Tracer.Server,
+ ObserverWeb.Monitor.ProcessPort,
{Phoenix.PubSub, [name: ObserverWeb.PubSub]}
] ++ telemetry_servers()
diff --git a/lib/observer_web/apps/port.ex b/lib/observer_web/apps/port.ex
index 9789a8a..9d85a9c 100644
--- a/lib/observer_web/apps/port.ex
+++ b/lib/observer_web/apps/port.ex
@@ -5,6 +5,16 @@ defmodule ObserverWeb.Apps.Port do
alias ObserverWeb.Rpc
+ @type t :: %{
+ name: charlist() | String.t(),
+ id: non_neg_integer(),
+ connected: pid(),
+ os_pid: non_neg_integer() | :undefined,
+ memory: non_neg_integer()
+ }
+
+ defstruct [:name, :id, :connected, :os_pid, :memory]
+
@doc """
Return port information
@@ -12,24 +22,25 @@ defmodule ObserverWeb.Apps.Port do
iex> alias ObserverWeb.Observer.Port
...> [h | _] = :erlang.ports()
- ...> assert %{connected: _, id: _, name: _, os_pid: _} = Port.info(h)
+ ...> assert %{connected: _, id: _, name: _, os_pid: _, memory: _} = Port.info(h)
...> assert :undefined = Port.info(nil)
...> assert :undefined = Port.info("")
"""
- @spec info(atom(), port()) ::
- :undefined | %{connected: any(), id: any(), name: any(), os_pid: any()}
+ @spec info(node :: atom(), port :: port()) :: :undefined | ObserverWeb.Apps.Port.t()
def info(node \\ Node.self(), port)
def info(node, port) when is_port(port) do
- case Rpc.call(node, :erlang, :port_info, [port], :infinity) do
- data when is_list(data) ->
- %{
- name: Keyword.get(data, :name, 0),
- id: Keyword.get(data, :id, 0),
- connected: Keyword.get(data, :connected, 0),
- os_pid: Keyword.get(data, :os_pid, 0)
- }
-
+ with data <- Rpc.call(node, :erlang, :port_info, [port], :infinity),
+ true <- is_list(data),
+ {:memory, memory} <- Rpc.call(node, :erlang, :port_info, [port, :memory], :infinity) do
+ %__MODULE__{
+ name: Keyword.get(data, :name, 0),
+ id: Keyword.get(data, :id, 0),
+ connected: Keyword.get(data, :connected, 0),
+ os_pid: Keyword.get(data, :os_pid, 0),
+ memory: memory
+ }
+ else
_ ->
:undefined
end
diff --git a/lib/observer_web/apps/process.ex b/lib/observer_web/apps/process.ex
index 26a67d3..5e670cc 100644
--- a/lib/observer_web/apps/process.ex
+++ b/lib/observer_web/apps/process.ex
@@ -11,6 +11,54 @@ defmodule ObserverWeb.Apps.Process do
@default_get_state_timeout 100
+ @type t :: %{
+ pid: pid(),
+ registered_name: atom() | nil,
+ priority: :low | :normal | :high | :max,
+ trap_exit: boolean(),
+ message_queue_len: non_neg_integer(),
+ error_handler: module() | :none,
+ relations: %{
+ group_leader: pid() | nil,
+ ancestors: [pid()],
+ links: [pid()] | nil,
+ monitored_by: [pid()] | nil,
+ monitors: [pid() | {module(), term()}] | nil
+ },
+ memory: %{
+ total: non_neg_integer(),
+ stack_and_heap: non_neg_integer(),
+ heap_size: non_neg_integer(),
+ stack_size: non_neg_integer(),
+ gc_min_heap_size: non_neg_integer(),
+ gc_full_sweep_after: non_neg_integer()
+ },
+ meta: %{
+ init: String.t(),
+ current: String.t(),
+ status: :running | :waiting | :exiting | :garbage_collecting | :suspended | :runnable,
+ class: :supervisor | :application | :unknown | atom()
+ },
+ state: String.t(),
+ dictionary: keyword() | nil,
+ phx_lv_socket: Phoenix.LiveView.Socket.t() | nil
+ }
+
+ defstruct [
+ :pid,
+ :registered_name,
+ :priority,
+ :trap_exit,
+ :message_queue_len,
+ :error_handler,
+ :relations,
+ :memory,
+ :meta,
+ :state,
+ :dictionary,
+ :phx_lv_socket
+ ]
+
@process_full [
:registered_name,
:priority,
@@ -36,7 +84,8 @@ defmodule ObserverWeb.Apps.Process do
@doc """
Creates a complete overview of process stats based on the given `pid`.
"""
- @spec info(pid :: pid(), timeout :: non_neg_integer()) :: :undefined | map
+ @spec info(pid :: pid(), timeout :: non_neg_integer()) ::
+ :undefined | ObserverWeb.Apps.Process.t()
def info(pid, timeout \\ @default_get_state_timeout) do
process_info(pid, @process_full, &structure_full/3, timeout)
end
@@ -119,7 +168,7 @@ defmodule ObserverWeb.Apps.Process do
end
end
- %{
+ %__MODULE__{
pid: pid,
registered_name: Keyword.get(data, :registered_name, nil),
priority: Keyword.get(data, :priority, :normal),
diff --git a/lib/observer_web/monitor.ex b/lib/observer_web/monitor.ex
new file mode 100644
index 0000000..4c83d1a
--- /dev/null
+++ b/lib/observer_web/monitor.ex
@@ -0,0 +1,35 @@
+defmodule ObserverWeb.Monitor do
+ @moduledoc """
+ This module will provide process/port monitor context
+ """
+
+ alias ObserverWeb.Monitor.ProcessPort
+
+ ### ==========================================================================
+ ### Public functions
+ ### ==========================================================================
+
+ @doc """
+ Starts monitoring a process/port memory usage
+ """
+ @spec start_id_monitor(pid_or_port :: pid() | port()) ::
+ {:ok, ObserverWeb.Monitor.ProcessPort.t()}
+ def start_id_monitor(pid_or_port), do: ProcessPort.start_id_monitor(pid_or_port)
+
+ @doc """
+ Stops monitoring a process/port memory usage
+ """
+ @spec stop_id_monitor(pid_or_port :: pid() | port()) :: :ok
+ def stop_id_monitor(pid_or_port), do: ProcessPort.stop_id_monitor(pid_or_port)
+
+ @doc """
+ Checks if memory monitoring is enabled for a process/port
+ """
+ @spec id_info(pid_or_port :: pid() | port()) ::
+ {:ok, ObserverWeb.Monitor.ProcessPort.t()} | {:error, :not_found | :rescued}
+ def id_info(pid_or_port), do: ProcessPort.id_info(pid_or_port)
+
+ ### ==========================================================================
+ ### Private functions
+ ### ==========================================================================
+end
diff --git a/lib/observer_web/monitor/process_port.ex b/lib/observer_web/monitor/process_port.ex
new file mode 100644
index 0000000..a38ef34
--- /dev/null
+++ b/lib/observer_web/monitor/process_port.ex
@@ -0,0 +1,221 @@
+defmodule ObserverWeb.Monitor.ProcessPort do
+ @moduledoc """
+ This module contains the reporting functions for Beam VM
+ """
+
+ import Telemetry.Metrics
+
+ use GenServer
+
+ alias ObserverWeb.Apps
+ alias ObserverWeb.Telemetry.Consumer
+
+ @default_poll_interval 1_000
+
+ @type t :: %__MODULE__{
+ event: String.t(),
+ metric: String.t(),
+ metric_summary: map(),
+ atomized_id: atom()
+ }
+
+ defstruct [:event, :metric, :metric_summary, :atomized_id]
+
+ ### ==========================================================================
+ ### Callback functions
+ ### ==========================================================================
+
+ @spec start_link(any()) :: :ignore | {:error, any()} | {:ok, pid()}
+ def start_link(args) do
+ GenServer.start_link(__MODULE__, args, name: __MODULE__)
+ end
+
+ @impl true
+ def init(_args) do
+ poll_interval =
+ Application.get_env(:observer_web, :beam_vm_poller_interval_ms) || @default_poll_interval
+
+ :timer.send_interval(poll_interval, :refresh_metrics)
+
+ {:ok, %{}}
+ end
+
+ @impl true
+ def handle_call({:id_info, pid_or_port}, _from, monitored_ids) do
+ case monitored_ids[pid_or_port] do
+ nil ->
+ {:reply, {:error, :not_found}, monitored_ids}
+
+ info ->
+ {:reply, {:ok, info}, monitored_ids}
+ end
+ end
+
+ def handle_call({:attach_id, pid_or_port}, _from, monitored_ids) do
+ case monitored_ids[pid_or_port] do
+ nil ->
+ id_info = id_to_struct(pid_or_port)
+
+ :telemetry.attach(
+ {__MODULE__, id_info.event, self()},
+ id_info.event,
+ &Consumer.handle_event/4,
+ {[id_info.metric_summary], nil, Node.self()}
+ )
+
+ {:reply, {:ok, id_info}, Map.put(monitored_ids, pid_or_port, id_info)}
+
+ info ->
+ # Already in the list of monitored ids
+ {:reply, {:ok, info}, monitored_ids}
+ end
+ end
+
+ def handle_call({:detach_id, pid_or_port}, _from, monitored_ids) do
+ case monitored_ids[pid_or_port] do
+ nil ->
+ {:reply, :ok, monitored_ids}
+
+ info ->
+ # Add a gap in the chart to indicate that data sending was disabled
+ :telemetry.execute(info.event, empty_memory_data(pid_or_port), %{})
+
+ # Detach the telemetry handler
+ :telemetry.detach({__MODULE__, info.event, self()})
+
+ {:reply, :ok, Map.drop(monitored_ids, [pid_or_port])}
+ end
+ end
+
+ @impl true
+ def handle_info(:refresh_metrics, monitored_ids) do
+ updated_monitored_ids =
+ Enum.reduce(monitored_ids, %{}, fn
+ {pid, process_info}, acc when is_pid(pid) ->
+ case Apps.Process.info(pid) do
+ %Apps.Process{memory: memory} ->
+ :telemetry.execute(
+ process_info.event,
+ Map.merge(empty_memory_data(pid), memory),
+ %{}
+ )
+
+ Map.put(acc, pid, process_info)
+
+ :undefined ->
+ # NOTE: The process is either dead or not available (remove)
+ :telemetry.detach({__MODULE__, process_info.event, self()})
+
+ acc
+ end
+
+ {port, port_info}, acc when is_port(port) ->
+ case Apps.Port.info(port) do
+ %Apps.Port{memory: memory} ->
+ :telemetry.execute(port_info.event, %{total: memory}, %{})
+
+ Map.put(acc, port, port_info)
+
+ :undefined ->
+ # NOTE: The port is either dead or not available (remove)
+ :telemetry.detach({__MODULE__, port_info.event, self()})
+
+ acc
+ end
+ end)
+
+ {:noreply, updated_monitored_ids}
+ end
+
+ ### ==========================================================================
+ ### Public APIs
+ ### ==========================================================================
+
+ @spec start_id_monitor(pid_or_port :: pid() | port()) :: {:ok, __MODULE__.t()}
+ def start_id_monitor(pid_or_port) do
+ target_node = node(pid_or_port)
+ GenServer.call({__MODULE__, target_node}, {:attach_id, pid_or_port})
+ end
+
+ @spec stop_id_monitor(pid_or_port :: pid() | port()) :: :ok
+ def stop_id_monitor(pid_or_port) do
+ target_node = node(pid_or_port)
+
+ try do
+ GenServer.call({__MODULE__, target_node}, {:detach_id, pid_or_port})
+ catch
+ _, _ ->
+ :ok
+ end
+ end
+
+ @spec id_info(pid_or_port :: pid() | port()) :: {:ok, map} | {:error, :not_found | :rescued}
+ def id_info(pid_or_port) do
+ target_node = node(pid_or_port)
+
+ try do
+ GenServer.call({__MODULE__, target_node}, {:id_info, pid_or_port})
+ catch
+ _, _ ->
+ {:error, :rescued}
+ end
+ end
+
+ ### ==========================================================================
+ ### Private Functions
+ ### ==========================================================================
+ defp empty_memory_data(pid) when is_pid(pid) do
+ %{
+ total: nil,
+ stack_and_heap: nil,
+ heap_size: nil,
+ stack_size: nil,
+ gc_min_heap_size: nil,
+ gc_full_sweep_after: nil
+ }
+ end
+
+ defp empty_memory_data(port) when is_port(port), do: %{total: nil}
+
+ defp id_to_struct(pid) when is_pid(pid) do
+ atomized_id =
+ pid
+ |> inspect()
+ |> String.replace(["#PID<"], "pid_")
+ |> String.replace(["."], "_")
+ |> String.replace([">"], "")
+ |> String.to_atom()
+
+ event = [:vm, :process, :memory, atomized_id]
+ metric = (event ++ [:total]) |> Enum.join(".")
+ metric_summary = summary(metric)
+
+ %__MODULE__{
+ event: event,
+ metric: metric,
+ metric_summary: metric_summary,
+ atomized_id: atomized_id
+ }
+ end
+
+ defp id_to_struct(port) when is_port(port) do
+ atomized_id =
+ port
+ |> inspect()
+ |> String.replace(["#Port<"], "port_")
+ |> String.replace(["."], "_")
+ |> String.replace([">"], "")
+ |> String.to_atom()
+
+ event = [:vm, :port, :memory, atomized_id]
+ metric = (event ++ [:total]) |> Enum.join(".")
+ metric_summary = summary(metric)
+
+ %__MODULE__{
+ event: event,
+ metric: metric,
+ metric_summary: metric_summary,
+ atomized_id: atomized_id
+ }
+ end
+end
diff --git a/lib/observer_web/telemetry/consumer.ex b/lib/observer_web/telemetry/consumer.ex
index c526e56..79fe98d 100644
--- a/lib/observer_web/telemetry/consumer.ex
+++ b/lib/observer_web/telemetry/consumer.ex
@@ -108,9 +108,6 @@ defmodule ObserverWeb.Telemetry.Consumer do
|> add_phoenix_tags(metadata)
cond do
- is_nil(measurement) ->
- acc
-
not keep?(metric, metadata) ->
acc
diff --git a/lib/web/components/metrics/vm_port_memory.ex b/lib/web/components/metrics/vm_port_memory.ex
new file mode 100644
index 0000000..32de912
--- /dev/null
+++ b/lib/web/components/metrics/vm_port_memory.ex
@@ -0,0 +1,127 @@
+defmodule Observer.Web.Components.Metrics.VmPortMemory do
+ @moduledoc false
+ use Observer.Web, :html
+
+ alias Observer.Web.Components.Metrics.Common
+ alias Observer.Web.Helpers
+
+ attr :title, :string, required: true
+ attr :service, :string, required: true
+ attr :metric, :string, required: true
+ attr :metrics, :list, required: true
+ attr :cols, :integer, default: 2
+
+ def content(assigns) do
+ ~H"""
+
+ <% id = Helpers.normalize_id("#{@service}-#{@metric}") %>
+
+
+
+ <% metrics = Enum.map(@metrics.inserts, fn {_id, _index, data, _} -> data end) %>
+ <% normalized_metrics = normalize(metrics) %>
+ <% echart_config = config(normalized_metrics) %>
+
+
+
+
+ """
+ end
+
+ defp port_regex, do: ~r/^vm\.port\.memory\..+\.total$/
+
+ # NOTE: Streams are retrieved in the reverse order
+ defp normalize(metrics) do
+ empty_series_data = %{
+ total: []
+ }
+
+ {series_data, categories_data} =
+ Enum.reduce(metrics, {empty_series_data, []}, fn
+ %ObserverWeb.Telemetry.Data{value: nil} = metric, {series_data, categories_data} ->
+ timestamp = Common.timestamp_to_string(metric.timestamp)
+
+ {%{
+ total: [nil] ++ series_data.total
+ }, [timestamp] ++ categories_data}
+
+ metric, {series_data, categories_data} ->
+ timestamp = Common.timestamp_to_string(metric.timestamp)
+
+ {%{
+ total: [metric.measurements.total] ++ series_data.total
+ }, [timestamp] ++ categories_data}
+ end)
+
+ datasets =
+ [
+ %{
+ name: "Total",
+ type: "line",
+ data: series_data.total
+ }
+ ]
+
+ %{
+ datasets: datasets,
+ categories: categories_data
+ }
+ end
+
+ defp config(%{datasets: datasets, categories: categories}) do
+ %{
+ tooltip: %{
+ trigger: "axis"
+ },
+ legend: %{
+ data: [
+ "Total"
+ ],
+ right: "25%"
+ },
+ grid: %{
+ left: "3%",
+ right: "4%",
+ bottom: "3%",
+ top: "30%",
+ containLabel: true
+ },
+ toolbox: %{
+ feature: %{
+ dataZoom: %{},
+ dataView: %{},
+ saveAsImage: %{}
+ }
+ },
+ yAxis: %{
+ type: "value",
+ axisLabel: %{
+ formatter: "{value} bytes"
+ }
+ },
+ series: datasets,
+ xAxis: %{
+ type: "category",
+ boundaryGap: false,
+ data: categories
+ }
+ }
+ end
+end
diff --git a/lib/web/components/metrics/vm_process_memory.ex b/lib/web/components/metrics/vm_process_memory.ex
new file mode 100644
index 0000000..0b627d4
--- /dev/null
+++ b/lib/web/components/metrics/vm_process_memory.ex
@@ -0,0 +1,174 @@
+defmodule Observer.Web.Components.Metrics.VmProcessMemory do
+ @moduledoc false
+ use Observer.Web, :html
+
+ alias Observer.Web.Components.Metrics.Common
+ alias Observer.Web.Helpers
+
+ attr :title, :string, required: true
+ attr :service, :string, required: true
+ attr :metric, :string, required: true
+ attr :metrics, :list, required: true
+ attr :cols, :integer, default: 2
+
+ def content(assigns) do
+ ~H"""
+
+ <% id = Helpers.normalize_id("#{@service}-#{@metric}") %>
+
+
+
+ <% metrics = Enum.map(@metrics.inserts, fn {_id, _index, data, _} -> data end) %>
+ <% normalized_metrics = normalize(metrics) %>
+ <% echart_config = config(normalized_metrics) %>
+
+
+
+
+ """
+ end
+
+ defp process_regex, do: ~r/^vm\.process\.memory\..+\.total$/
+
+ # NOTE: Streams are retrieved in the reverse order
+ defp normalize(metrics) do
+ empty_series_data = %{
+ total: [],
+ stack_and_heap: [],
+ heap_size: [],
+ stack_size: [],
+ gc_min_heap_size: [],
+ gc_full_sweep_after: []
+ }
+
+ {series_data, categories_data} =
+ Enum.reduce(metrics, {empty_series_data, []}, fn
+ %ObserverWeb.Telemetry.Data{value: nil} = metric, {series_data, categories_data} ->
+ timestamp = Common.timestamp_to_string(metric.timestamp)
+
+ {%{
+ total: [nil] ++ series_data.total,
+ stack_and_heap: [nil] ++ series_data.stack_and_heap,
+ stack_size: [nil] ++ series_data.stack_size,
+ heap_size: [nil] ++ series_data.heap_size,
+ gc_min_heap_size: [nil] ++ series_data.gc_min_heap_size,
+ gc_full_sweep_after: [nil] ++ series_data.gc_full_sweep_after
+ }, [timestamp] ++ categories_data}
+
+ metric, {series_data, categories_data} ->
+ timestamp = Common.timestamp_to_string(metric.timestamp)
+
+ {%{
+ total: [metric.measurements.total] ++ series_data.total,
+ stack_and_heap: [metric.measurements.stack_and_heap] ++ series_data.stack_and_heap,
+ stack_size: [metric.measurements.stack_size] ++ series_data.stack_size,
+ heap_size: [metric.measurements.heap_size] ++ series_data.heap_size,
+ gc_min_heap_size:
+ [metric.measurements.gc_min_heap_size] ++ series_data.gc_min_heap_size,
+ gc_full_sweep_after:
+ [metric.measurements.gc_full_sweep_after] ++ series_data.gc_full_sweep_after
+ }, [timestamp] ++ categories_data}
+ end)
+
+ datasets =
+ [
+ %{
+ name: "Total",
+ type: "line",
+ data: series_data.total
+ },
+ %{
+ name: "Stack+Heap",
+ type: "line",
+ data: series_data.stack_and_heap
+ },
+ %{
+ name: "Stack",
+ type: "line",
+ data: series_data.stack_size
+ },
+ %{
+ name: "Heap",
+ type: "line",
+ data: series_data.heap_size
+ },
+ %{
+ name: "GC min heap",
+ type: "line",
+ data: series_data.gc_min_heap_size
+ },
+ %{
+ name: "GC full sweep",
+ type: "line",
+ data: series_data.gc_full_sweep_after
+ }
+ ]
+
+ %{
+ datasets: datasets,
+ categories: categories_data
+ }
+ end
+
+ defp config(%{datasets: datasets, categories: categories}) do
+ %{
+ tooltip: %{
+ trigger: "axis"
+ },
+ legend: %{
+ data: [
+ "Total",
+ "Stack+Heap",
+ "Stack",
+ "Heap",
+ "GC min heap",
+ "GC full sweep"
+ ],
+ right: "25%"
+ },
+ grid: %{
+ left: "3%",
+ right: "4%",
+ bottom: "3%",
+ top: "30%",
+ containLabel: true
+ },
+ toolbox: %{
+ feature: %{
+ dataZoom: %{},
+ dataView: %{},
+ saveAsImage: %{}
+ }
+ },
+ yAxis: %{
+ type: "value",
+ axisLabel: %{
+ formatter: "{value} bytes"
+ }
+ },
+ series: datasets,
+ xAxis: %{
+ type: "category",
+ boundaryGap: false,
+ data: categories
+ }
+ }
+ end
+end
diff --git a/lib/web/pages/apps/identifier.ex b/lib/web/pages/apps/identifier.ex
new file mode 100644
index 0000000..a28b8fe
--- /dev/null
+++ b/lib/web/pages/apps/identifier.ex
@@ -0,0 +1,26 @@
+defmodule Observer.Web.Apps.Identifier do
+ @moduledoc """
+ This module provides Identifier Structure
+ """
+
+ # NOTE: Debouncing for reading the selected process
+ @tooltip_debouncing 50
+
+ @type t :: %__MODULE__{
+ info: ObserverWeb.Apps.Process.t() | ObserverWeb.Apps.Port.t() | nil,
+ id_string: String.t() | nil,
+ type: nil,
+ debouncing: non_neg_integer(),
+ node: atom() | nil,
+ metric: String.t() | nil,
+ memory_monitor: boolean()
+ }
+
+ defstruct info: nil,
+ id_string: nil,
+ type: nil,
+ debouncing: @tooltip_debouncing,
+ node: nil,
+ metric: nil,
+ memory_monitor: false
+end
diff --git a/lib/web/pages/apps/page.ex b/lib/web/pages/apps/page.ex
index faa7044..14fe7aa 100644
--- a/lib/web/pages/apps/page.ex
+++ b/lib/web/pages/apps/page.ex
@@ -7,6 +7,7 @@ defmodule Observer.Web.Apps.Page do
use Observer.Web, :live_component
+ alias Observer.Web.Apps.Identifier
alias Observer.Web.Apps.Legend
alias Observer.Web.Apps.Port
alias Observer.Web.Apps.Process
@@ -16,8 +17,8 @@ defmodule Observer.Web.Apps.Page do
alias Observer.Web.Helpers
alias Observer.Web.Page
alias ObserverWeb.Apps
-
- @tooltip_debouncing 50
+ alias ObserverWeb.Monitor
+ alias ObserverWeb.Telemetry
@impl Phoenix.LiveComponent
def render(assigns) do
@@ -143,15 +144,26 @@ defmodule Observer.Web.Apps.Page do
{Jason.encode!(@chart_tree_data)}
+ <% data_key = data_key(@current_selected_id.node, @current_selected_id.metric) %>
<%= if @current_selected_id.type == "pid" do %>
<% else %>
-
+
<% end %>
@@ -190,12 +202,12 @@ defmodule Observer.Web.Apps.Page do
|> assign(:node_info, update_node_info())
|> assign(:node_data, %{})
|> assign(:observer_data, %{})
- |> assign(:current_selected_id, reset_current_selected_id())
+ |> assign(:current_selected_id, %Identifier{})
|> assign(form: to_form(default_form_options()))
|> assign(process_msg_form: to_form(%{"message" => ""}))
|> assign(:show_observer_options, false)
- |> assign(:process_memory_monitor, false)
|> assign(:selected_id_action_confirmation, nil)
+ |> stream(:empty, [])
end
def handle_mount(socket) do
@@ -203,12 +215,12 @@ defmodule Observer.Web.Apps.Page do
|> assign(:node_info, node_info_new())
|> assign(:node_data, %{})
|> assign(:observer_data, %{})
- |> assign(:current_selected_id, reset_current_selected_id())
+ |> assign(:current_selected_id, %Identifier{})
|> assign(form: to_form(default_form_options()))
|> assign(process_msg_form: to_form(%{"message" => ""}))
|> assign(:show_observer_options, false)
- |> assign(:process_memory_monitor, false)
|> assign(:selected_id_action_confirmation, nil)
+ |> stream(:empty, [])
end
# coveralls-ignore-start
@@ -271,7 +283,16 @@ defmodule Observer.Web.Apps.Page do
%{assigns: %{current_selected_id: current_selected_id}} = socket
) do
pid_string = current_selected_id.id_string
- true = pid_string |> Helpers.string_to_pid() |> :erlang.garbage_collect()
+ pid = Helpers.string_to_pid(pid_string)
+
+ node = node(pid)
+
+ true =
+ if node == node() do
+ :erlang.garbage_collect(pid)
+ else
+ :rpc.call(node, :erlang, :garbage_collect, [pid])
+ end
{:noreply,
socket
@@ -295,25 +316,63 @@ defmodule Observer.Web.Apps.Page do
end
def handle_parent_event(
- "request_process_action",
- _params,
+ action,
+ %{"type" => "toggle-memory"},
%{
assigns: %{
- current_selected_id: current_selected_id,
- process_memory_monitor: process_memory_monitor
+ current_selected_id: current_selected_id
}
} = socket
- ) do
- new_process_memory_monitor = !process_memory_monitor
- text = if new_process_memory_monitor, do: "enabled", else: "disabled"
- pid_string = current_selected_id.id_string
+ )
+ when action in ["request_process_action", "request_port_action"] do
+ new_process_memory_monitor = !current_selected_id.memory_monitor
+
+ {_pid_or_port, id} = Helpers.parse_identifier(current_selected_id.id_string)
# NOTE: Add monitor enable actions here
+ socket =
+ if new_process_memory_monitor do
+ {:ok, %{metric: metric}} = Monitor.start_id_monitor(id)
+
+ # Subscribe to receive events
+ Telemetry.subscribe_for_new_data(current_selected_id.node, metric)
+
+ # Fetch current data
+ data_key = data_key(current_selected_id.node, metric)
+
+ data =
+ current_selected_id.node
+ |> Telemetry.list_data_by_node_key(metric, from: 15)
+ |> Enum.map(&Map.put(&1, :id, &1.timestamp))
+
+ socket
+ |> stream(data_key, [], reset: true)
+ |> stream(data_key, data)
+ |> assign(:current_selected_id, %{
+ current_selected_id
+ | metric: metric,
+ memory_monitor: new_process_memory_monitor
+ })
+ else
+ Monitor.stop_id_monitor(id)
+ data_key = data_key(current_selected_id.node, current_selected_id.metric)
+
+ socket
+ |> stream(data_key, [], reset: true)
+ |> assign(:current_selected_id, %{
+ current_selected_id
+ | memory_monitor: new_process_memory_monitor
+ })
+ end
+
+ text = if new_process_memory_monitor, do: "enabled", else: "disabled"
{:noreply,
socket
- |> assign(:process_memory_monitor, new_process_memory_monitor)
- |> put_flash(:info, "Memory monitoring #{text} for process pid: #{pid_string}")}
+ |> put_flash(
+ :info,
+ "Memory monitor #{text} for id: #{current_selected_id.id_string}"
+ )}
end
def handle_parent_event(
@@ -343,7 +402,7 @@ defmodule Observer.Web.Apps.Page do
socket
|> put_flash(:info, "Port id: #{id_string} successfully closed")
|> assign(:selected_id_action_confirmation, nil)
- |> assign(:current_selected_id, reset_current_selected_id())}
+ |> assign(:current_selected_id, %Identifier{})}
end
def handle_parent_event("process-kill-confirmation", %{"id" => id_string}, socket) do
@@ -353,7 +412,7 @@ defmodule Observer.Web.Apps.Page do
socket
|> put_flash(:info, "Process pid: #{id_string} successfully terminated")
|> assign(:selected_id_action_confirmation, nil)
- |> assign(:current_selected_id, reset_current_selected_id())}
+ |> assign(:current_selected_id, %Identifier{})}
end
def handle_parent_event("confirm-close-modal", _, socket) do
@@ -412,7 +471,7 @@ defmodule Observer.Web.Apps.Page do
{:noreply,
socket
|> assign(:node_info, node_info)
- |> assign(:current_selected_id, reset_current_selected_id())}
+ |> assign(:current_selected_id, %Identifier{})}
end
def handle_parent_event(
@@ -436,7 +495,7 @@ defmodule Observer.Web.Apps.Page do
{:noreply,
socket
|> assign(:node_info, node_info)
- |> assign(:current_selected_id, reset_current_selected_id())}
+ |> assign(:current_selected_id, %Identifier{})}
end
def handle_parent_event(
@@ -474,7 +533,7 @@ defmodule Observer.Web.Apps.Page do
{:noreply,
socket
|> assign(:node_info, node_info)
- |> assign(:current_selected_id, reset_current_selected_id())}
+ |> assign(:current_selected_id, %Identifier{})}
end
def handle_parent_event(
@@ -512,7 +571,7 @@ defmodule Observer.Web.Apps.Page do
{:noreply,
socket
|> assign(:node_info, node_info)
- |> assign(:current_selected_id, reset_current_selected_id())}
+ |> assign(:current_selected_id, %Identifier{})}
end
@impl Page
@@ -528,33 +587,91 @@ defmodule Observer.Web.Apps.Page do
)
when id_string != request_id or debouncing < 0 do
get_state_timeout = form.params["get_state_timeout"] |> String.to_integer()
+ [service, _app] = String.split(series_name, "::")
+ node = String.to_existing_atom(service)
+
+ case Helpers.parse_identifier(request_id) do
+ {:pid, pid} ->
+ current_selected_id = %Identifier{
+ info: Apps.Process.info(pid, get_state_timeout),
+ id_string: request_id,
+ type: "pid",
+ node: node
+ }
- current_selected_id =
- case Helpers.parse_identifier(request_id) do
- {:pid, pid} ->
- %{
- info: Apps.Process.info(pid, get_state_timeout),
- id_string: request_id,
- type: "pid",
- debouncing: @tooltip_debouncing
- }
+ case Monitor.id_info(pid) do
+ {:ok, %{metric: metric}} ->
+ # Subscribe to receive events
+ Telemetry.subscribe_for_new_data(service, metric)
+
+ # Read current metrics
+ data_key = data_key(service, metric)
+
+ data =
+ service
+ |> Telemetry.list_data_by_node_key(metric, from: 15)
+ |> Enum.map(&Map.put(&1, :id, &1.timestamp))
+
+ {:noreply,
+ socket
+ |> stream(data_key, [], reset: true)
+ |> stream(data_key, data)
+ |> assign(:current_selected_id, %{
+ current_selected_id
+ | metric: metric,
+ memory_monitor: true
+ })}
+
+ _ ->
+ {:noreply,
+ socket
+ |> assign(:current_selected_id, %{current_selected_id | memory_monitor: false})}
+ end
- {:port, port} ->
- [service, _app] = String.split(series_name, "::")
- node = String.to_existing_atom(service)
+ {:port, port} ->
+ current_selected_id = %Identifier{
+ info: Apps.Port.info(node, port),
+ id_string: request_id,
+ type: "port",
+ node: node
+ }
- %{
- info: Apps.Port.info(node, port),
- id_string: request_id,
- type: "port",
- debouncing: @tooltip_debouncing
- }
+ case Monitor.id_info(port) do
+ {:ok, %{metric: metric}} ->
+ # Subscribe to receive events
+ Telemetry.subscribe_for_new_data(service, metric)
+
+ # Read current metrics
+ data_key = data_key(service, metric)
+
+ data =
+ service
+ |> Telemetry.list_data_by_node_key(metric, from: 15)
+ |> Enum.map(&Map.put(&1, :id, &1.timestamp))
+
+ {:noreply,
+ socket
+ |> stream(data_key, [], reset: true)
+ |> stream(data_key, data)
+ |> assign(:current_selected_id, %{
+ current_selected_id
+ | metric: metric,
+ memory_monitor: true
+ })}
+
+ _ ->
+ {:noreply,
+ socket
+ |> assign(:current_selected_id, %{current_selected_id | memory_monitor: false})}
+ end
- {:none, _any} ->
- reset_current_selected_id(request_id)
- end
+ {:none, _any} ->
+ current_selected_id = %Identifier{id_string: request_id, node: node}
- {:noreply, assign(socket, :current_selected_id, current_selected_id)}
+ {:noreply,
+ socket
+ |> assign(:current_selected_id, current_selected_id)}
+ end
end
# The debouncing added here will reduce the number of Process.info requests since
@@ -570,6 +687,13 @@ defmodule Observer.Web.Apps.Page do
})}
end
+ @impl Page
+ def handle_info({:metrics_new_data, service, key, data}, socket) do
+ data_key = data_key(service, key)
+
+ {:noreply, stream_insert(socket, data_key, Map.put(data, :id, data.timestamp))}
+ end
+
def handle_info({:nodeup, _node}, %{assigns: %{node_info: node_info}} = socket) do
node_info =
update_node_info(
@@ -599,7 +723,7 @@ defmodule Observer.Web.Apps.Page do
{:noreply,
socket
|> assign(:node_info, node_info)
- |> assign(:current_selected_id, reset_current_selected_id())}
+ |> assign(:current_selected_id, %Identifier{})}
end
defp data_key(service, apps), do: "#{service}::#{apps}"
@@ -678,9 +802,6 @@ defmodule Observer.Web.Apps.Page do
end)
end
- defp reset_current_selected_id(id_string \\ nil),
- do: %{info: nil, id_string: id_string, type: nil, debouncing: @tooltip_debouncing}
-
defp flare_chart_data(series) do
%{
tooltip: %{
diff --git a/lib/web/pages/apps/port.ex b/lib/web/pages/apps/port.ex
index 4f976b1..6359809 100644
--- a/lib/web/pages/apps/port.ex
+++ b/lib/web/pages/apps/port.ex
@@ -6,9 +6,14 @@ defmodule Observer.Web.Apps.Port do
alias Observer.Web.Apps.PortActions
alias Observer.Web.Components.Attention
+ alias Observer.Web.Components.Metrics.VmPortMemory
attr :info, :map, required: true
attr :id, :string, required: true
+ attr :memory_monitor, :boolean, required: true
+ attr :node, :atom, required: true
+ attr :metric, :string, required: true
+ attr :metrics, :list, required: true
def content(assigns) do
info = assigns.info
@@ -19,7 +24,8 @@ defmodule Observer.Web.Apps.Port do
%{name: "Id", value: "#{info.id}"},
%{name: "Name", value: "#{info.name}"},
%{name: "Os Pid", value: "#{info.os_pid}"},
- %{name: "Connected", value: "#{inspect(info.connected)}"}
+ %{name: "Connected", value: "#{inspect(info.connected)}"},
+ %{name: "Memory (bytes)", value: "#{info.memory}"}
]
else
nil
@@ -38,12 +44,17 @@ defmodule Observer.Web.Apps.Port do
id="observer-port"
title="Warning"
class="border-red-400 text-red-500"
- message={"Port #{@id} is either dead or protected and therefore can not be shown."}
+ message={"#{@id} is either dead or protected and therefore can not be shown."}
/>
<% true -> %>
<% end %>
+
+ <%= if @memory_monitor do %>
+
+
+
+ <% end %>
"""
end
diff --git a/lib/web/pages/apps/port_actions.ex b/lib/web/pages/apps/port_actions.ex
index 0b8313a..5ec86c4 100644
--- a/lib/web/pages/apps/port_actions.ex
+++ b/lib/web/pages/apps/port_actions.ex
@@ -8,6 +8,8 @@ defmodule Observer.Web.Apps.PortActions do
attr :id, :string, required: true
attr :on_action, :any, required: true
+ attr :memory_monitor, :boolean, required: true
+ attr :node, :atom, required: true
def content(assigns) do
~H"""
@@ -31,10 +33,35 @@ defmodule Observer.Web.Apps.PortActions do
-
- Port ID: {@id}
-
+
+
+
+
+ Memory Monitor
+
+
+
+
+ Node: {@node}
+
+
+ Port ID:
+ {port_info(@node, @id)}
+
+
"""
end
+
+ defp port_info(node, id) do
+ if node == node(), do: "#{id} (local)", else: "#{id} (remote)"
+ end
end
diff --git a/lib/web/pages/apps/process.ex b/lib/web/pages/apps/process.ex
index d6038c2..4529753 100644
--- a/lib/web/pages/apps/process.ex
+++ b/lib/web/pages/apps/process.ex
@@ -7,11 +7,15 @@ defmodule Observer.Web.Apps.Process do
alias Observer.Web.Apps.ProcessActions
alias Observer.Web.Components.Attention
alias Observer.Web.Components.CopyToClipboard
+ alias Observer.Web.Components.Metrics.VmProcessMemory
attr :info, :map, required: true
attr :id, :string, required: true
attr :form, :map, required: true
- attr :process_memory_monitor, :boolean, required: true
+ attr :memory_monitor, :boolean, required: true
+ attr :node, :atom, required: true
+ attr :metric, :string, required: true
+ attr :metrics, :list, required: true
def content(assigns) do
info = assigns.info
@@ -86,7 +90,7 @@ defmodule Observer.Web.Apps.Process do
id="apps-process-alert"
title="Warning"
class="border-red-400 text-red-500"
- message={"Process #{@id} is either dead or protected and therefore can not be shown."}
+ message={"#{@id} is either dead or protected and therefore can not be shown."}
/>
<% true -> %>
+ <%= if @memory_monitor do %>
+
+
+
+ <% end %>
+
<.relations
title="State"
diff --git a/lib/web/pages/apps/process_actions.ex b/lib/web/pages/apps/process_actions.ex
index c964d19..a8a49fc 100644
--- a/lib/web/pages/apps/process_actions.ex
+++ b/lib/web/pages/apps/process_actions.ex
@@ -9,7 +9,8 @@ defmodule Observer.Web.Apps.ProcessActions do
attr :id, :map, required: true
attr :on_action, :any, required: true
attr :form, :map, required: true
- attr :process_memory_monitor, :boolean, required: true
+ attr :memory_monitor, :boolean, required: true
+ attr :node, :atom, required: true
def content(assigns) do
~H"""
@@ -84,23 +85,34 @@ defmodule Observer.Web.Apps.ProcessActions do
Memory Monitor
-
- Process ID: {@id}
-
+
+
+ Node: {@node}
+
+
+ Process ID:
+ {process_info(@node, @id)}
+
+
"""
end
defp border_error(true), do: "border-red-200 focus:border-red-400 hover:border-red-300"
defp border_error(_false), do: "border-slate-200 focus:border-slate-400 hover:border-slate-300"
+
+ defp process_info(node, id) do
+ if node == node(), do: "#{id} (local)", else: "#{id} (remote)"
+ end
end
diff --git a/lib/web/pages/metrics/page.ex b/lib/web/pages/metrics/page.ex
index d033f97..753da9c 100644
--- a/lib/web/pages/metrics/page.ex
+++ b/lib/web/pages/metrics/page.ex
@@ -13,6 +13,8 @@ defmodule Observer.Web.Metrics.Page do
alias Observer.Web.Components.Metrics.PhxLvSocket
alias Observer.Web.Components.Metrics.VmLimits
alias Observer.Web.Components.Metrics.VmMemory
+ alias Observer.Web.Components.Metrics.VmPortMemory
+ alias Observer.Web.Components.Metrics.VmProcessMemory
alias Observer.Web.Components.Metrics.VmRunQueue
alias Observer.Web.Components.MultiSelect
alias Observer.Web.Page
@@ -152,6 +154,20 @@ defmodule Observer.Web.Metrics.Page do
transition={@metric_config[data_key]["transition"]}
metrics={Map.get(@streams, data_key)}
/>
+
+
<% end %>
<% end %>
<% end %>
diff --git a/test/observer_web/monitor/process_port_test.exs b/test/observer_web/monitor/process_port_test.exs
new file mode 100644
index 0000000..de8ab14
--- /dev/null
+++ b/test/observer_web/monitor/process_port_test.exs
@@ -0,0 +1,291 @@
+defmodule ObserverWeb.Monitor.ProcessPortTest do
+ use ExUnit.Case, async: false
+
+ alias ObserverWeb.Monitor
+ alias ObserverWeb.Monitor.ProcessPort
+
+ import Mox
+
+ setup [
+ :set_mox_global,
+ :verify_on_exit!
+ ]
+
+ setup do
+ # Clear any existing telemetry handlers
+ on_exit(fn ->
+ :telemetry.list_handlers([])
+ |> Enum.each(fn %{id: id} -> :telemetry.detach(id) end)
+ end)
+
+ ObserverWeb.TelemetryMock
+ |> stub(:push_data, fn _event -> :ok end)
+
+ ObserverWeb.RpcMock
+ |> stub(:pinfo, fn pid, information -> :rpc.pinfo(pid, information) end)
+ |> stub(:call, fn node, module, function, args, timeout ->
+ :rpc.call(node, module, function, args, timeout)
+ end)
+
+ :ok
+ end
+
+ describe "start_link/1" do
+ test "starts the GenServer with default config" do
+ pid = Process.whereis(ProcessPort)
+ assert is_pid(pid)
+ assert Process.alive?(pid)
+ end
+ end
+
+ describe "start_id_monitor/1 and stop_id_monitor/1" do
+ test "monitors a process and returns process info" do
+ test_pid = spawn(fn -> Process.sleep(:infinity) end)
+
+ {:ok, info} = Monitor.start_id_monitor(test_pid)
+
+ assert %ProcessPort{} = info
+ assert info.event == [:vm, :process, :memory, info.atomized_id]
+ assert is_atom(info.atomized_id)
+ assert String.starts_with?(Atom.to_string(info.atomized_id), "pid_")
+ assert info.metric == Enum.join(info.event ++ [:total], ".")
+
+ # Clean up
+ Process.exit(test_pid, :kill)
+ Monitor.stop_id_monitor(test_pid)
+ end
+
+ test "monitors a port and returns port info" do
+ {:ok, port} = :gen_tcp.listen(0, [])
+
+ {:ok, info} = Monitor.start_id_monitor(port)
+
+ assert %ProcessPort{} = info
+ assert info.event == [:vm, :port, :memory, info.atomized_id]
+ assert is_atom(info.atomized_id)
+ assert String.starts_with?(Atom.to_string(info.atomized_id), "port_")
+ assert info.metric == Enum.join(info.event ++ [:total], ".")
+
+ # Clean up
+ :gen_tcp.close(port)
+ Monitor.stop_id_monitor(port)
+ end
+
+ test "returns same info when attaching same process twice" do
+ test_pid = spawn(fn -> Process.sleep(:infinity) end)
+
+ {:ok, info1} = Monitor.start_id_monitor(test_pid)
+ {:ok, info2} = Monitor.start_id_monitor(test_pid)
+
+ assert info1 == info2
+
+ # Clean up
+ Process.exit(test_pid, :kill)
+ Monitor.stop_id_monitor(test_pid)
+ end
+
+ test "stops monitoring a process" do
+ test_pid = spawn(fn -> Process.sleep(:infinity) end)
+
+ {:ok, _info} = Monitor.start_id_monitor(test_pid)
+ assert :ok = Monitor.stop_id_monitor(test_pid)
+
+ # Verify it's no longer monitored
+ assert {:error, :not_found} = Monitor.id_info(test_pid)
+
+ # Clean up
+ Process.exit(test_pid, :kill)
+ end
+
+ test "stop_id_monitor returns :ok even if process not monitored" do
+ test_pid = spawn(fn -> Process.sleep(:infinity) end)
+
+ assert :ok = Monitor.stop_id_monitor(test_pid)
+
+ # Clean up
+ Process.exit(test_pid, :kill)
+ end
+
+ test "stop_id_monitor handles errors gracefully" do
+ # Create a fake PID that doesn't exist on this node
+ fake_pid = :erlang.list_to_pid(~c"<0.999999.0>")
+
+ assert :ok = Monitor.stop_id_monitor(fake_pid)
+ end
+ end
+
+ describe "id_info/1" do
+ test "returns process info for monitored process" do
+ test_pid = spawn(fn -> Process.sleep(:infinity) end)
+
+ {:ok, expected_info} = Monitor.start_id_monitor(test_pid)
+ {:ok, retrieved_info} = Monitor.id_info(test_pid)
+
+ assert retrieved_info == expected_info
+
+ # Clean up
+ Process.exit(test_pid, :kill)
+ Monitor.stop_id_monitor(test_pid)
+ end
+
+ test "returns error for non-monitored process" do
+ test_pid = spawn(fn -> Process.sleep(:infinity) end)
+
+ assert {:error, :not_found} = Monitor.id_info(test_pid)
+
+ # Clean up
+ Process.exit(test_pid, :kill)
+ end
+
+ test "returns error when Pid is not present" do
+ # Create a fake PID on a non-existent node
+ fake_pid = :erlang.list_to_pid(~c"<0.999999.0>")
+
+ assert {:error, :not_found} = Monitor.id_info(fake_pid)
+ end
+ end
+
+ describe "handle_info(:refresh_metrics, _)" do
+ test "removes dead processes from monitoring" do
+ test_pid = spawn(fn -> :ok end)
+ # Ensure process dies
+ Process.sleep(10)
+
+ {:ok, _info} = Monitor.start_id_monitor(test_pid)
+
+ # Trigger refresh - should detect dead process
+ send(Process.whereis(ProcessPort), :refresh_metrics)
+ Process.sleep(100)
+
+ # Process should no longer be in monitored list
+ assert {:error, :not_found} = Monitor.id_info(test_pid)
+ end
+
+ test "updates metrics for monitored ports" do
+ {:ok, port} = :gen_tcp.listen(0, [])
+
+ {:ok, _info} = Monitor.start_id_monitor(port)
+
+ # Trigger refresh manually
+ send(Process.whereis(ProcessPort), :refresh_metrics)
+ Process.sleep(100)
+
+ # Port should still be monitored
+ assert {:ok, _info} = Monitor.id_info(port)
+
+ # Clean up
+ :gen_tcp.close(port)
+ Monitor.stop_id_monitor(port)
+ end
+
+ test "removes closed ports from monitoring" do
+ {:ok, port} = :gen_tcp.listen(0, [])
+ {:ok, _info} = Monitor.start_id_monitor(port)
+
+ # Close the port
+ :gen_tcp.close(port)
+
+ # Trigger refresh - should detect closed port
+ send(Process.whereis(ProcessPort), :refresh_metrics)
+ Process.sleep(100)
+
+ # Port should no longer be in monitored list
+ assert {:error, :not_found} = Monitor.id_info(port)
+ end
+ end
+
+ describe "id_to_struct/1 for PIDs" do
+ test "converts PID to proper struct format" do
+ test_pid = spawn(fn -> Process.sleep(:infinity) end)
+
+ {:ok, info} = Monitor.start_id_monitor(test_pid)
+ atomized_id = info.atomized_id
+
+ # Check atomized_id format
+ atomized_str = Atom.to_string(atomized_id)
+ assert String.starts_with?(atomized_str, "pid_")
+ refute String.contains?(atomized_str, "#PID<")
+ refute String.contains?(atomized_str, ">")
+ refute String.contains?(atomized_str, ".")
+
+ # Check event structure
+ assert [:vm, :process, :memory, ^atomized_id] = info.event
+
+ # Check metric structure
+ assert String.starts_with?(info.metric, "vm.process.memory.")
+ assert String.ends_with?(info.metric, ".total")
+
+ # Clean up
+ Process.exit(test_pid, :kill)
+ Monitor.stop_id_monitor(test_pid)
+ end
+ end
+
+ describe "id_to_struct/1 for Ports" do
+ test "converts Port to proper struct format" do
+ {:ok, port} = :gen_tcp.listen(0, [])
+
+ {:ok, info} = Monitor.start_id_monitor(port)
+ atomized_id = info.atomized_id
+
+ # Check atomized_id format
+ atomized_str = Atom.to_string(atomized_id)
+ assert String.starts_with?(atomized_str, "port_")
+ refute String.contains?(atomized_str, "#Port<")
+ refute String.contains?(atomized_str, ">")
+ refute String.contains?(atomized_str, ".")
+
+ # Check event structure
+ assert [:vm, :port, :memory, ^atomized_id] = info.event
+
+ # Check metric structure
+ assert String.starts_with?(info.metric, "vm.port.memory.")
+ assert String.ends_with?(info.metric, ".total")
+
+ # Clean up
+ :gen_tcp.close(port)
+ Monitor.stop_id_monitor(port)
+ end
+ end
+
+ describe "telemetry integration" do
+ test "attaches telemetry handler when monitoring starts" do
+ test_pid = spawn(fn -> Process.sleep(:infinity) end)
+
+ initial_handlers = :telemetry.list_handlers([])
+ {:ok, _info} = Monitor.start_id_monitor(test_pid)
+ final_handlers = :telemetry.list_handlers([])
+
+ # Should have one more handler
+ assert length(final_handlers) == length(initial_handlers) + 1
+
+ # Clean up
+ Process.exit(test_pid, :kill)
+ Monitor.stop_id_monitor(test_pid)
+ end
+
+ test "detaches telemetry handler when monitoring stops" do
+ test_pid = spawn(fn -> Process.sleep(:infinity) end)
+
+ {:ok, info} = Monitor.start_id_monitor(test_pid)
+ handlers_before = :telemetry.list_handlers([])
+
+ Monitor.stop_id_monitor(test_pid)
+ handlers_after = :telemetry.list_handlers([])
+
+ # Should have one fewer handler
+ assert length(handlers_after) == length(handlers_before) - 1
+
+ # Verify handler is removed
+ handler =
+ Enum.find(handlers_after, fn h ->
+ h.id == {ProcessPort, info.event, self()}
+ end)
+
+ assert handler == nil
+
+ # Clean up
+ Process.exit(test_pid, :kill)
+ end
+ end
+end
diff --git a/test/observer_web/telemetry_consumer_test.exs b/test/observer_web/telemetry_consumer_test.exs
index 2a24825..05915b0 100644
--- a/test/observer_web/telemetry_consumer_test.exs
+++ b/test/observer_web/telemetry_consumer_test.exs
@@ -114,7 +114,7 @@ defmodule ObserverWeb.TelemetryConsumerTest do
end
end
- test "prints missing and bad measurements", %{reporter: reporter} do
+ test "Allow nil values", %{reporter: reporter} do
test_pid_process = self()
with_mock ObserverWeb.Telemetry,
@@ -125,26 +125,40 @@ defmodule ObserverWeb.TelemetryConsumerTest do
send(test_pid_process, {:capture_event, called, event})
:ok
end do
- :telemetry.execute([:vm, :memory], %{binary: :hundred}, %{foo: :bar})
+ :telemetry.execute([:vm, :memory], %{binary: nil, total: 2000}, %{foo: :bar})
assert_receive {:capture_event, 0, event}, 1_000
- assert %{metrics: [], measurements: %{binary: :hundred}, reporter: ^reporter} = event
+ assert %{
+ metrics: [
+ %Consumer{
+ name: "vm.memory.total",
+ value: 2.0,
+ unit: " kilobyte",
+ info: "",
+ tags: %{},
+ type: "summary"
+ }
+ ],
+ measurements: %{binary: nil, total: 2000},
+ reporter: ^reporter
+ } = event
assert_receive {:capture_event, 1, event}, 1_000
assert %{
metrics: [
+ _any,
%Consumer{
name: "vm.memory.binary",
- value: :hundred,
+ value: nil,
unit: " byte",
info: " (WARNING! measurement should be a number)",
tags: %{},
type: "last_value"
}
],
- measurements: %{binary: :hundred},
+ measurements: %{binary: nil, total: 2000},
reporter: ^reporter
} = event
end
diff --git a/test/observer_web/web/live/apps/page_test.exs b/test/observer_web/web/live/apps/page_test.exs
index 5ca3cb0..eca9635 100644
--- a/test/observer_web/web/live/apps/page_test.exs
+++ b/test/observer_web/web/live/apps/page_test.exs
@@ -342,7 +342,7 @@ defmodule Observer.Web.Apps.PageLiveTest do
assert html = render(index_live)
# Check the Process information is not being shown
- assert html =~ "Process #PID<0.0.11111> is either dead or protected"
+ assert html =~ "#PID<0.0.11111> is either dead or protected"
end
test "Select Service+Apps and Kill a process", %{conn: conn} do
@@ -517,6 +517,10 @@ defmodule Observer.Web.Apps.PageLiveTest do
:rpc.pinfo(pid, information)
end)
+ ObserverWeb.TelemetryMock
+ |> expect(:subscribe_for_new_data, fn _node, _metric -> :ok end)
+ |> expect(:list_data_by_node_key, fn _node, _metric, _options -> [] end)
+
TelemetryStubber.defaults()
{:ok, index_live, _html} = live(conn, "/observer/applications")
@@ -547,11 +551,11 @@ defmodule Observer.Web.Apps.PageLiveTest do
assert index_live
|> element("input[type=\"checkbox\"]")
- |> render_click() =~ "Memory monitoring enabled for process pid:"
+ |> render_click() =~ "Memory monitor enabled for id: #PID"
assert index_live
|> element("input[type=\"checkbox\"]")
- |> render_click() =~ "Memory monitoring disabled for process pid:"
+ |> render_click() =~ "Memory monitor disabled for id: #PID"
end
test "Select Service+Apps and Send a message to a process", %{conn: conn} do
@@ -709,6 +713,61 @@ defmodule Observer.Web.Apps.PageLiveTest do
assert html =~ "Connected"
end
+ test "Select Service+Apps and Toggle port Monitor", %{conn: conn} do
+ node = Node.self() |> to_string
+ service = Helpers.normalize_id(node)
+ test_pid_process = self()
+
+ ObserverWeb.RpcMock
+ |> stub(:call, fn node, module, function, args, timeout ->
+ :rpc.call(node, module, function, args, timeout)
+ end)
+ |> stub(:pinfo, fn pid, information ->
+ send(test_pid_process, {:apps_page_pid, self()})
+ :rpc.pinfo(pid, information)
+ end)
+
+ ObserverWeb.TelemetryMock
+ |> expect(:subscribe_for_new_data, fn _node, _metric -> :ok end)
+ |> expect(:list_data_by_node_key, fn _node, _metric, _options -> [] end)
+
+ TelemetryStubber.defaults()
+
+ {:ok, index_live, _html} = live(conn, "/observer/applications")
+
+ index_live
+ |> element("#apps-multi-select-toggle-options")
+ |> render_click()
+
+ index_live
+ |> element("#apps-multi-select-apps-kernel-add-item")
+ |> render_click()
+
+ index_live
+ |> element("#apps-multi-select-services-#{service}-add-item")
+ |> render_click()
+
+ port = Enum.random(:erlang.ports())
+
+ # Send the request 2 times to validate the path where the request
+ # was already executed.
+ id = "#{inspect(port)}"
+ series_name = "#{Node.self()}::kernel"
+
+ assert_receive {:apps_page_pid, apps_page_pid}, 1_000
+
+ send(apps_page_pid, {"request-process", %{"id" => id, "series_name" => series_name}})
+ send(apps_page_pid, {"request-process", %{"id" => id, "series_name" => series_name}})
+
+ assert index_live
+ |> element("input[type=\"checkbox\"]")
+ |> render_click() =~ "Memory monitor enabled for id: #Port"
+
+ assert index_live
+ |> element("input[type=\"checkbox\"]")
+ |> render_click() =~ "Memory monitor disabled for id: #Port"
+ end
+
test "Select Service+Apps and select a port that is dead or doesn't exist", %{conn: conn} do
node = Node.self() |> to_string
service = Helpers.normalize_id(node)
@@ -751,7 +810,7 @@ defmodule Observer.Web.Apps.PageLiveTest do
assert html = render(index_live)
# Check the Port information is not being shown
- assert html =~ "Port #Port<0.100> is either dead or protected"
+ assert html =~ "#Port<0.100> is either dead or protected"
end
test "Select Service+Apps and close a port", %{conn: conn} do
diff --git a/test/observer_web/web/live/metrics/vm_port_memory_test.exs b/test/observer_web/web/live/metrics/vm_port_memory_test.exs
new file mode 100644
index 0000000..847c8b1
--- /dev/null
+++ b/test/observer_web/web/live/metrics/vm_port_memory_test.exs
@@ -0,0 +1,174 @@
+defmodule Observer.Web.Metrics.VmPortMemoryTest do
+ use Observer.Web.ConnCase, async: false
+
+ import Phoenix.LiveViewTest
+ import Mox
+
+ alias Observer.Web.Mocks.TelemetryStubber
+ alias ObserverWeb.TelemetryFixtures
+
+ setup [
+ :set_mox_global,
+ :verify_on_exit!
+ ]
+
+ test "Add/Remove Service + vm.port.memory.port_0_300.total", %{conn: conn} do
+ node = Node.self() |> to_string
+ service_id = Helpers.normalize_id(node)
+ metric = "vm.port.memory.port_0_300.total"
+ metric_id = Helpers.normalize_id(metric)
+ telemetry_data = TelemetryFixtures.build_telemetry_data_vm_port_total_memory()
+
+ TelemetryStubber.defaults()
+ |> expect(:subscribe_for_new_keys, fn -> :ok end)
+ |> expect(:subscribe_for_new_data, fn ^node, ^metric -> :ok end)
+ |> expect(:unsubscribe_for_new_data, fn ^node, ^metric -> :ok end)
+ |> expect(:list_data_by_node_key, fn ^node, ^metric, _ -> [telemetry_data] end)
+ |> stub(:get_keys_by_node, fn _node -> [metric] end)
+
+ {:ok, liveview, _html} = live(conn, "/observer/metrics")
+
+ liveview
+ |> element("#metrics-multi-select-toggle-options")
+ |> render_click()
+
+ liveview
+ |> element("#metrics-multi-select-services-#{service_id}-add-item")
+ |> render_click()
+
+ html =
+ liveview
+ |> element("#metrics-multi-select-metrics-#{metric_id}-add-item")
+ |> render_click()
+
+ assert html =~ "services:#{node}"
+ assert html =~ "metrics:#{metric}"
+
+ html =
+ liveview
+ |> element("#metrics-multi-select-services-#{service_id}-remove-item")
+ |> render_click()
+
+ refute html =~ "services:#{node}"
+ assert html =~ "metrics:#{metric}"
+
+ html =
+ liveview
+ |> element("#metrics-multi-select-metrics-#{metric_id}-remove-item")
+ |> render_click()
+
+ refute html =~ "services:#{node}"
+ refute html =~ "metrics:#{metric}"
+ end
+
+ test "Add/Remove vm.port.memory.port_0_300.total + Service", %{conn: conn} do
+ node = Node.self() |> to_string
+ service_id = Helpers.normalize_id(node)
+ metric = "vm.port.memory.port_0_300.total"
+ metric_id = Helpers.normalize_id(metric)
+ telemetry_data = TelemetryFixtures.build_telemetry_data_vm_port_total_memory()
+
+ TelemetryStubber.defaults()
+ |> expect(:subscribe_for_new_keys, fn -> :ok end)
+ |> expect(:subscribe_for_new_data, fn ^node, ^metric -> :ok end)
+ |> expect(:unsubscribe_for_new_data, fn ^node, ^metric -> :ok end)
+ |> expect(:list_data_by_node_key, fn ^node, ^metric, _ -> [telemetry_data] end)
+ |> stub(:get_keys_by_node, fn _ -> [metric] end)
+
+ {:ok, liveview, _html} = live(conn, "/observer/metrics")
+
+ liveview
+ |> element("#metrics-multi-select-toggle-options")
+ |> render_click()
+
+ liveview
+ |> element("#metrics-multi-select-metrics-#{metric_id}-add-item")
+ |> render_click()
+
+ html =
+ liveview
+ |> element("#metrics-multi-select-services-#{service_id}-add-item")
+ |> render_click()
+
+ assert html =~ "services:#{node}"
+ assert html =~ "metrics:#{metric}"
+
+ html =
+ liveview
+ |> element("#metrics-multi-select-metrics-#{metric_id}-remove-item")
+ |> render_click()
+
+ assert html =~ "services:#{node}"
+ refute html =~ "metrics:#{metric}"
+
+ html =
+ liveview
+ |> element("#metrics-multi-select-services-#{service_id}-remove-item")
+ |> render_click()
+
+ refute html =~ "services:#{node}"
+ refute html =~ "metrics:#{metric}"
+ end
+
+ test "Init and Push vm.port.memory.port_0_300.total data", %{conn: conn} do
+ node = Node.self() |> to_string
+ service_id = Helpers.normalize_id(node)
+ metric = "vm.port.memory.port_0_300.total"
+ metric_id = Helpers.normalize_id(metric)
+
+ test_pid_process = self()
+
+ TelemetryStubber.defaults()
+ |> expect(:subscribe_for_new_keys, fn -> :ok end)
+ |> expect(:subscribe_for_new_data, fn ^node, ^metric ->
+ send(test_pid_process, {:liveview_pid, self()})
+ :ok
+ end)
+ |> expect(:list_data_by_node_key, fn ^node, ^metric, _ ->
+ [
+ TelemetryFixtures.build_telemetry_data_vm_port_total_memory(1_737_982_379_123)
+ ]
+ end)
+ |> stub(:get_keys_by_node, fn _ -> [metric] end)
+
+ {:ok, liveview, _html} = live(conn, "/observer/metrics")
+
+ liveview
+ |> element("#metrics-multi-select-toggle-options")
+ |> render_click()
+
+ liveview
+ |> element("#metrics-multi-select-metrics-#{metric_id}-add-item")
+ |> render_click()
+
+ html =
+ liveview
+ |> element("#metrics-multi-select-services-#{service_id}-add-item")
+ |> render_click()
+
+ assert_receive {:liveview_pid, liveview_pid}, 1_000
+
+ # assert initial data
+ assert html =~ "2025-01-27 12:52:59.123Z"
+
+ # assert live updated data
+ send(
+ liveview_pid,
+ {:metrics_new_data, node, metric,
+ TelemetryFixtures.build_telemetry_data_vm_port_total_memory(1_737_982_379_456)}
+ )
+
+ html = render(liveview)
+ assert html =~ "2025-01-27 12:52:59.456Z"
+
+ # Assert nil data received, this will indicate the application has restarted
+ send(
+ liveview_pid,
+ {:metrics_new_data, node, metric,
+ TelemetryFixtures.build_telemetry_data(1_737_982_379_789, nil)}
+ )
+
+ html = render(liveview)
+ assert html =~ "2025-01-27 12:52:59.789Z"
+ end
+end
diff --git a/test/observer_web/web/live/metrics/vm_process_memory_test.exs b/test/observer_web/web/live/metrics/vm_process_memory_test.exs
new file mode 100644
index 0000000..167748b
--- /dev/null
+++ b/test/observer_web/web/live/metrics/vm_process_memory_test.exs
@@ -0,0 +1,174 @@
+defmodule Observer.Web.Metrics.VmProcessMemoryTest do
+ use Observer.Web.ConnCase, async: false
+
+ import Phoenix.LiveViewTest
+ import Mox
+
+ alias Observer.Web.Mocks.TelemetryStubber
+ alias ObserverWeb.TelemetryFixtures
+
+ setup [
+ :set_mox_global,
+ :verify_on_exit!
+ ]
+
+ test "Add/Remove Service + vm.process.memory.pid_0_300_0.total", %{conn: conn} do
+ node = Node.self() |> to_string
+ service_id = Helpers.normalize_id(node)
+ metric = "vm.process.memory.pid_0_300_0.total"
+ metric_id = Helpers.normalize_id(metric)
+ telemetry_data = TelemetryFixtures.build_telemetry_data_vm_process_total_memory()
+
+ TelemetryStubber.defaults()
+ |> expect(:subscribe_for_new_keys, fn -> :ok end)
+ |> expect(:subscribe_for_new_data, fn ^node, ^metric -> :ok end)
+ |> expect(:unsubscribe_for_new_data, fn ^node, ^metric -> :ok end)
+ |> expect(:list_data_by_node_key, fn ^node, ^metric, _ -> [telemetry_data] end)
+ |> stub(:get_keys_by_node, fn _node -> [metric] end)
+
+ {:ok, liveview, _html} = live(conn, "/observer/metrics")
+
+ liveview
+ |> element("#metrics-multi-select-toggle-options")
+ |> render_click()
+
+ liveview
+ |> element("#metrics-multi-select-services-#{service_id}-add-item")
+ |> render_click()
+
+ html =
+ liveview
+ |> element("#metrics-multi-select-metrics-#{metric_id}-add-item")
+ |> render_click()
+
+ assert html =~ "services:#{node}"
+ assert html =~ "metrics:#{metric}"
+
+ html =
+ liveview
+ |> element("#metrics-multi-select-services-#{service_id}-remove-item")
+ |> render_click()
+
+ refute html =~ "services:#{node}"
+ assert html =~ "metrics:#{metric}"
+
+ html =
+ liveview
+ |> element("#metrics-multi-select-metrics-#{metric_id}-remove-item")
+ |> render_click()
+
+ refute html =~ "services:#{node}"
+ refute html =~ "metrics:#{metric}"
+ end
+
+ test "Add/Remove vm.process.memory.pid_0_300_0.total + Service", %{conn: conn} do
+ node = Node.self() |> to_string
+ service_id = Helpers.normalize_id(node)
+ metric = "vm.process.memory.pid_0_300_0.total"
+ metric_id = Helpers.normalize_id(metric)
+ telemetry_data = TelemetryFixtures.build_telemetry_data_vm_process_total_memory()
+
+ TelemetryStubber.defaults()
+ |> expect(:subscribe_for_new_keys, fn -> :ok end)
+ |> expect(:subscribe_for_new_data, fn ^node, ^metric -> :ok end)
+ |> expect(:unsubscribe_for_new_data, fn ^node, ^metric -> :ok end)
+ |> expect(:list_data_by_node_key, fn ^node, ^metric, _ -> [telemetry_data] end)
+ |> stub(:get_keys_by_node, fn _ -> [metric] end)
+
+ {:ok, liveview, _html} = live(conn, "/observer/metrics")
+
+ liveview
+ |> element("#metrics-multi-select-toggle-options")
+ |> render_click()
+
+ liveview
+ |> element("#metrics-multi-select-metrics-#{metric_id}-add-item")
+ |> render_click()
+
+ html =
+ liveview
+ |> element("#metrics-multi-select-services-#{service_id}-add-item")
+ |> render_click()
+
+ assert html =~ "services:#{node}"
+ assert html =~ "metrics:#{metric}"
+
+ html =
+ liveview
+ |> element("#metrics-multi-select-metrics-#{metric_id}-remove-item")
+ |> render_click()
+
+ assert html =~ "services:#{node}"
+ refute html =~ "metrics:#{metric}"
+
+ html =
+ liveview
+ |> element("#metrics-multi-select-services-#{service_id}-remove-item")
+ |> render_click()
+
+ refute html =~ "services:#{node}"
+ refute html =~ "metrics:#{metric}"
+ end
+
+ test "Init and Push vm.process.memory.pid_0_300_0.total data", %{conn: conn} do
+ node = Node.self() |> to_string
+ service_id = Helpers.normalize_id(node)
+ metric = "vm.process.memory.pid_0_300_0.total"
+ metric_id = Helpers.normalize_id(metric)
+
+ test_pid_process = self()
+
+ TelemetryStubber.defaults()
+ |> expect(:subscribe_for_new_keys, fn -> :ok end)
+ |> expect(:subscribe_for_new_data, fn ^node, ^metric ->
+ send(test_pid_process, {:liveview_pid, self()})
+ :ok
+ end)
+ |> expect(:list_data_by_node_key, fn ^node, ^metric, _ ->
+ [
+ TelemetryFixtures.build_telemetry_data_vm_process_total_memory(1_737_982_379_123)
+ ]
+ end)
+ |> stub(:get_keys_by_node, fn _ -> [metric] end)
+
+ {:ok, liveview, _html} = live(conn, "/observer/metrics")
+
+ liveview
+ |> element("#metrics-multi-select-toggle-options")
+ |> render_click()
+
+ liveview
+ |> element("#metrics-multi-select-metrics-#{metric_id}-add-item")
+ |> render_click()
+
+ html =
+ liveview
+ |> element("#metrics-multi-select-services-#{service_id}-add-item")
+ |> render_click()
+
+ assert_receive {:liveview_pid, liveview_pid}, 1_000
+
+ # assert initial data
+ assert html =~ "2025-01-27 12:52:59.123Z"
+
+ # assert live updated data
+ send(
+ liveview_pid,
+ {:metrics_new_data, node, metric,
+ TelemetryFixtures.build_telemetry_data_vm_process_total_memory(1_737_982_379_456)}
+ )
+
+ html = render(liveview)
+ assert html =~ "2025-01-27 12:52:59.456Z"
+
+ # Assert nil data received, this will indicate the application has restarted
+ send(
+ liveview_pid,
+ {:metrics_new_data, node, metric,
+ TelemetryFixtures.build_telemetry_data(1_737_982_379_789, nil)}
+ )
+
+ html = render(liveview)
+ assert html =~ "2025-01-27 12:52:59.789Z"
+ end
+end
diff --git a/test/support/fixtures/telemetry.ex b/test/support/fixtures/telemetry.ex
index 70f6844..adda197 100644
--- a/test/support/fixtures/telemetry.ex
+++ b/test/support/fixtures/telemetry.ex
@@ -53,6 +53,41 @@ defmodule ObserverWeb.TelemetryFixtures do
}
end
+ def build_telemetry_data_vm_process_total_memory(
+ timestamp \\ :rand.uniform(2_000_000_000_000),
+ value \\ 70_973.064
+ ) do
+ %ObserverWeb.Telemetry.Data{
+ timestamp: timestamp,
+ value: value,
+ unit: " kilobyte",
+ tags: %{},
+ measurements: %{
+ gc_full_sweep_after: 31_784_038,
+ gc_min_heap_size: 2_973_664,
+ stack_size: 14_523_720,
+ heap_size: 14_439_392,
+ stack_and_heap: 56_449_344,
+ total: 70_973_064
+ }
+ }
+ end
+
+ def build_telemetry_data_vm_port_total_memory(
+ timestamp \\ :rand.uniform(2_000_000_000_000),
+ value \\ 70_973.064
+ ) do
+ %ObserverWeb.Telemetry.Data{
+ timestamp: timestamp,
+ value: value,
+ unit: " kilobyte",
+ tags: %{},
+ measurements: %{
+ total: 70_973_064
+ }
+ }
+ end
+
def build_telemetry_data_phx_lv_socket_total(
timestamp \\ :rand.uniform(2_000_000_000_000),
value \\ 555