Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
# CHANGELOG (v0.2.X)

## 0.2.2 🚀 ()

### ⚠️ Backwards incompatible changes for 0.2.1
* None

### Bug fixes
* None

### Enhancements
* None

## 0.2.1 🚀 (2025-10-27)

### ⚠️ Backwards incompatible changes for 0.2.0
Expand Down
7 changes: 7 additions & 0 deletions lib/observer_web/telemetry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,13 @@ defmodule ObserverWeb.Telemetry do
@spec cached_mode() :: :local | :broadcast | :observer
def cached_mode, do: default().cached_mode()

@doc """
Update data retention period
"""
@spec update_data_retention_period(non_neg_integer()) :: :ok
def update_data_retention_period(retention_period),
do: default().update_data_retention_period(retention_period)

### ==========================================================================
### Private functions
### ==========================================================================
Expand Down
1 change: 1 addition & 0 deletions lib/observer_web/telemetry/adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ defmodule ObserverWeb.Telemetry.Adapter do
@callback get_keys_by_node(node()) :: list()
@callback list_active_nodes() :: list()
@callback cached_mode() :: nil | :local | :broadcast | :observer
@callback update_data_retention_period(non_neg_integer()) :: :ok
end
56 changes: 49 additions & 7 deletions lib/observer_web/telemetry/storage.ex
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,16 @@ defmodule ObserverWeb.Telemetry.Storage do
do_handle_metrics(event, state)
end

def handle_cast({:update_data_retention_period, retention_period}, state) do
# If decreasing retention period, immediately prune old data
if retention_period && state.data_retention_period &&
retention_period < state.data_retention_period do
prune_old_data_immediately(state, retention_period)
end

{:noreply, %{state | data_retention_period: retention_period}}
end

@impl true
def handle_info({:observer_web_telemetry, event}, state) do
do_handle_metrics(event, state)
Expand Down Expand Up @@ -164,16 +174,10 @@ defmodule ObserverWeb.Telemetry.Storage do
deletion_period_to = now_minutes - retention_period - 1
deletion_period_from = deletion_period_to - 2

prune_keys = fn key, table ->
Enum.each(deletion_period_from..deletion_period_to, fn timestamp ->
:ets.delete(table, metric_key(key, timestamp))
end)
end

Enum.each(tables, fn {node, table} ->
node
|> get_keys_by_node()
|> Enum.each(&prune_keys.(&1, table))
|> Enum.each(&prune_keys(&1, table, deletion_period_from, deletion_period_to))
end)

{:noreply, state}
Expand Down Expand Up @@ -351,6 +355,19 @@ defmodule ObserverWeb.Telemetry.Storage do
ets_lookup_if_exist(@storage_table, @mode_key, nil)
end

@impl true
def update_data_retention_period(retention_period) do
msg = {:update_data_retention_period, retention_period}

case cached_mode() do
mode when mode in [:local, :observer] ->
GenServer.cast(__MODULE__, msg)

_mode_with_no_storage ->
:ok
end
end

### ==========================================================================
### Private functions
### ==========================================================================
Expand Down Expand Up @@ -394,6 +411,31 @@ defmodule ObserverWeb.Telemetry.Storage do
end
end

defp prune_old_data_immediately(
%{node_metric_tables: tables, data_retention_period: old_retention},
new_retention
) do
now_minutes = unix_to_minutes()
new_retention_minutes = trunc(new_retention / @one_minute_in_milliseconds)
old_retention_minutes = trunc(old_retention / @one_minute_in_milliseconds)

# Delete all data outside the new retention window
deletion_from = now_minutes - old_retention_minutes - 2
deletion_to = now_minutes - new_retention_minutes

Enum.each(tables, fn {node, table} ->
node
|> get_keys_by_node()
|> Enum.each(&prune_keys(&1, table, deletion_from, deletion_to))
end)
end

defp prune_keys(key, table, deletion_period_from, deletion_period_to) do
Enum.each(deletion_period_from..deletion_period_to, fn timestamp ->
:ets.delete(table, metric_key(key, timestamp))
end)
end

# NOTE: PubSub topics
defp keys_topic, do: "metrics::keys"
defp metrics_topic(node, key), do: "metrics::#{node}::#{key}"
Expand Down
7 changes: 6 additions & 1 deletion lib/web/pages/metrics/page.ex
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ defmodule Observer.Web.Metrics.Page do
field={@form[:start_time]}
type="select"
label="Start Time"
options={["1m", "5m", "15m", "30m", "1h"]}
options={["1m", "5m", "15m", "30m", "1h", "6h", "12h", "1d", "3d", "1w"]}
/>

<div>
Expand Down Expand Up @@ -438,6 +438,11 @@ defmodule Observer.Web.Metrics.Page do
defp start_time_to_integer("15m"), do: 15
defp start_time_to_integer("30m"), do: 30
defp start_time_to_integer("1h"), do: 60
defp start_time_to_integer("6h"), do: 360
defp start_time_to_integer("12h"), do: 720
defp start_time_to_integer("1d"), do: 1_440
defp start_time_to_integer("3d"), do: 4_320
defp start_time_to_integer("1w"), do: 10_080

defp node_info_new,
do: %{
Expand Down
30 changes: 30 additions & 0 deletions test/observer_web/telemetry/observer_storage_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,36 @@ defmodule ObserverWeb.Telemetry.ObserverStorageTest do
assert [] = Storage.list_data_by_node_key(node |> to_string(), key_name, order: :asc)
end

test "Pruning expiring entries when retention period is updated", %{node: node} do
key_name = "test.phoenix"

now = System.os_time(:millisecond)

Storage.subscribe_for_new_data(node, key_name)

with_mock System, os_time: fn _ -> now - 30_000 end do
Enum.each(1..5, &Storage.push_data(build_metric(node, key_name, &1)))

assert_receive {:metrics_new_data, ^node, ^key_name, %{timestamp: _, unit: _, value: 5}},
1_000
end

assert [
%ObserverWeb.Telemetry.Data{timestamp: _, unit: _, value: 1, tags: _},
%ObserverWeb.Telemetry.Data{timestamp: _, unit: _, value: 2, tags: _},
%ObserverWeb.Telemetry.Data{timestamp: _, unit: _, value: 3, tags: _},
%ObserverWeb.Telemetry.Data{timestamp: _, unit: _, value: 4, tags: _},
%ObserverWeb.Telemetry.Data{timestamp: _, unit: _, value: 5, tags: _}
] = Storage.list_data_by_node_key(node |> to_string(), key_name, order: :asc)

# Update retention period
Storage.update_data_retention_period(1_000)

:timer.sleep(100)

assert [] = Storage.list_data_by_node_key(node |> to_string(), key_name, order: :asc)
end

defp build_metric(node, name, value) do
%{
metrics: [
Expand Down
30 changes: 30 additions & 0 deletions test/observer_web/web/live/metrics/page_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,36 @@ defmodule Observer.Web.Metrics.PageLiveTest do
liveview
|> element("#metrics-update-form")
|> render_change(%{num_cols: 4, start_time: time}) =~ time

time = "6h"

liveview
|> element("#metrics-update-form")
|> render_change(%{num_cols: 4, start_time: time}) =~ time

time = "12h"

liveview
|> element("#metrics-update-form")
|> render_change(%{num_cols: 4, start_time: time}) =~ time

time = "1d"

liveview
|> element("#metrics-update-form")
|> render_change(%{num_cols: 4, start_time: time}) =~ time

time = "3d"

liveview
|> element("#metrics-update-form")
|> render_change(%{num_cols: 4, start_time: time}) =~ time

time = "1w"

liveview
|> element("#metrics-update-form")
|> render_change(%{num_cols: 4, start_time: time}) =~ time
end

test "Testing NodeDown, removing the current node from the selected services", %{conn: conn} do
Expand Down