diff --git a/CHANGELOG.md b/CHANGELOG.md
index 796e6d3..ae79033 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,16 @@
 # CHANGELOG (v0.2.X)
 
+## 0.2.2 🚀 ()
+
+### ⚠️ Backwards incompatible changes for 0.2.1
+ * None
+
+### Bug fixes
+ * None
+
+### Enhancements
+ * None
+
 ## 0.2.1 🚀 (2025-10-27)
 
 ### ⚠️ Backwards incompatible changes for 0.2.0
diff --git a/lib/observer_web/telemetry.ex b/lib/observer_web/telemetry.ex
index e2664a7..ea2c88e 100644
--- a/lib/observer_web/telemetry.ex
+++ b/lib/observer_web/telemetry.ex
@@ -79,6 +79,13 @@ defmodule ObserverWeb.Telemetry do
   @spec cached_mode() :: :local | :broadcast | :observer
   def cached_mode, do: default().cached_mode()
 
+  @doc """
+  Updates the data retention period through the default telemetry adapter.
+  """
+  @spec update_data_retention_period(non_neg_integer()) :: :ok
+  def update_data_retention_period(retention_period),
+    do: default().update_data_retention_period(retention_period)
+
   ### ==========================================================================
   ### Private functions
   ### ==========================================================================
diff --git a/lib/observer_web/telemetry/adapter.ex b/lib/observer_web/telemetry/adapter.ex
index 4925236..8efc0f7 100644
--- a/lib/observer_web/telemetry/adapter.ex
+++ b/lib/observer_web/telemetry/adapter.ex
@@ -11,4 +11,5 @@ defmodule ObserverWeb.Telemetry.Adapter do
   @callback get_keys_by_node(node()) :: list()
   @callback list_active_nodes() :: list()
   @callback cached_mode() :: nil | :local | :broadcast | :observer
+  @callback update_data_retention_period(non_neg_integer()) :: :ok
 end
diff --git a/lib/observer_web/telemetry/storage.ex b/lib/observer_web/telemetry/storage.ex
index 1c898ec..1e593d1 100644
--- a/lib/observer_web/telemetry/storage.ex
+++ b/lib/observer_web/telemetry/storage.ex
@@ -103,6 +103,16 @@ defmodule ObserverWeb.Telemetry.Storage do
     do_handle_metrics(event, state)
   end
 
+  def handle_cast({:update_data_retention_period, retention_period}, state) do
+    # A shorter retention period takes effect immediately: prune the data it no longer covers
+    if retention_period && state.data_retention_period &&
+         retention_period < state.data_retention_period do
+      prune_old_data_immediately(state, retention_period)
+    end
+
+    {:noreply, %{state | data_retention_period: retention_period}}
+  end
+
   @impl true
   def handle_info({:observer_web_telemetry, event}, state) do
     do_handle_metrics(event, state)
@@ -164,16 +174,10 @@ defmodule ObserverWeb.Telemetry.Storage do
     deletion_period_to = now_minutes - retention_period - 1
     deletion_period_from = deletion_period_to - 2
 
-    prune_keys = fn key, table ->
-      Enum.each(deletion_period_from..deletion_period_to, fn timestamp ->
-        :ets.delete(table, metric_key(key, timestamp))
-      end)
-    end
-
     Enum.each(tables, fn {node, table} ->
       node
       |> get_keys_by_node()
-      |> Enum.each(&prune_keys.(&1, table))
+      |> Enum.each(&prune_keys(&1, table, deletion_period_from, deletion_period_to))
     end)
 
     {:noreply, state}
@@ -351,6 +355,19 @@ defmodule ObserverWeb.Telemetry.Storage do
     ets_lookup_if_exist(@storage_table, @mode_key, nil)
   end
 
+  @impl true
+  def update_data_retention_period(retention_period) do
+    msg = {:update_data_retention_period, retention_period}
+
+    case cached_mode() do
+      mode when mode in [:local, :observer] ->
+        GenServer.cast(__MODULE__, msg)
+
+      _mode_with_no_storage ->
+        :ok
+    end
+  end
+
   ### ==========================================================================
   ### Private functions
   ### ==========================================================================
@@ -394,6 +411,31 @@ defmodule ObserverWeb.Telemetry.Storage do
     end
   end
 
+  defp prune_old_data_immediately(
+         %{node_metric_tables: tables, data_retention_period: old_retention},
+         new_retention
+       ) do
+    now_minutes = unix_to_minutes()
+    new_retention_minutes = trunc(new_retention / @one_minute_in_milliseconds)
+    old_retention_minutes = trunc(old_retention / @one_minute_in_milliseconds)
+
+    # Delete minute buckets from just before the old window up to the start of the new one
+    deletion_from = now_minutes - old_retention_minutes - 2
+    deletion_to = now_minutes - new_retention_minutes
+
+    Enum.each(tables, fn {node, table} ->
+      node
+      |> get_keys_by_node()
+      |> Enum.each(&prune_keys(&1, table, deletion_from, deletion_to))
+    end)
+  end
+
+  defp prune_keys(key, table, deletion_period_from, deletion_period_to) do
+    Enum.each(deletion_period_from..deletion_period_to, fn timestamp ->
+      :ets.delete(table, metric_key(key, timestamp))
+    end)
+  end
+
   # NOTE: PubSub topics
   defp keys_topic, do: "metrics::keys"
   defp metrics_topic(node, key), do: "metrics::#{node}::#{key}"
diff --git a/lib/web/pages/metrics/page.ex b/lib/web/pages/metrics/page.ex
index 5d27b49..aad01e3 100644
--- a/lib/web/pages/metrics/page.ex
+++ b/lib/web/pages/metrics/page.ex
@@ -78,7 +78,7 @@ defmodule Observer.Web.Metrics.Page do
               field={@form[:start_time]}
               type="select"
               label="Start Time"
-              options={["1m", "5m", "15m", "30m", "1h"]}
+              options={["1m", "5m", "15m", "30m", "1h", "6h", "12h", "1d", "3d", "1w"]}
             />
@@ -438,6 +438,11 @@ defmodule Observer.Web.Metrics.Page do
   defp start_time_to_integer("15m"), do: 15
   defp start_time_to_integer("30m"), do: 30
   defp start_time_to_integer("1h"), do: 60
+  defp start_time_to_integer("6h"), do: 360
+  defp start_time_to_integer("12h"), do: 720
+  defp start_time_to_integer("1d"), do: 1_440
+  defp start_time_to_integer("3d"), do: 4_320
+  defp start_time_to_integer("1w"), do: 10_080
 
   defp node_info_new,
     do: %{
diff --git a/test/observer_web/telemetry/observer_storage_test.exs b/test/observer_web/telemetry/observer_storage_test.exs
index 317042e..a5d9e33 100644
--- a/test/observer_web/telemetry/observer_storage_test.exs
+++ b/test/observer_web/telemetry/observer_storage_test.exs
@@ -269,6 +269,36 @@ defmodule ObserverWeb.Telemetry.ObserverStorageTest do
     assert [] = Storage.list_data_by_node_key(node |> to_string(), key_name, order: :asc)
   end
 
+  test "Pruning expiring entries when retention period is updated", %{node: node} do
+    key_name = "test.phoenix"
+
+    now = System.os_time(:millisecond)
+
+    Storage.subscribe_for_new_data(node, key_name)
+
+    with_mock System, os_time: fn _ -> now - 30_000 end do
+      Enum.each(1..5, &Storage.push_data(build_metric(node, key_name, &1)))
+
+      assert_receive {:metrics_new_data, ^node, ^key_name, %{timestamp: _, unit: _, value: 5}},
+                     1_000
+    end
+
+    assert [
+             %ObserverWeb.Telemetry.Data{timestamp: _, unit: _, value: 1, tags: _},
+             %ObserverWeb.Telemetry.Data{timestamp: _, unit: _, value: 2, tags: _},
+             %ObserverWeb.Telemetry.Data{timestamp: _, unit: _, value: 3, tags: _},
+             %ObserverWeb.Telemetry.Data{timestamp: _, unit: _, value: 4, tags: _},
+             %ObserverWeb.Telemetry.Data{timestamp: _, unit: _, value: 5, tags: _}
+           ] = Storage.list_data_by_node_key(node |> to_string(), key_name, order: :asc)
+
+    # Update retention period. The update is an asynchronous cast, so the synchronous :sys
+    # call below, which is handled after it, guarantees the pruning has already run
+    Storage.update_data_retention_period(1_000)
+    :sys.get_state(ObserverWeb.Telemetry.Storage)
+
+    assert [] = Storage.list_data_by_node_key(node |> to_string(), key_name, order: :asc)
+  end
+
   defp build_metric(node, name, value) do
     %{
       metrics: [
diff --git a/test/observer_web/web/live/metrics/page_test.exs b/test/observer_web/web/live/metrics/page_test.exs
index 3a7b855..9fb8342 100644
--- a/test/observer_web/web/live/metrics/page_test.exs
+++ b/test/observer_web/web/live/metrics/page_test.exs
@@ -160,6 +160,36 @@ defmodule Observer.Web.Metrics.PageLiveTest do
     liveview
     |> element("#metrics-update-form")
     |> render_change(%{num_cols: 4, start_time: time}) =~ time
+
+    time = "6h"
+
+    assert liveview
+           |> element("#metrics-update-form")
+           |> render_change(%{num_cols: 4, start_time: time}) =~ time
+
+    time = "12h"
+
+    assert liveview
+           |> element("#metrics-update-form")
+           |> render_change(%{num_cols: 4, start_time: time}) =~ time
+
+    time = "1d"
+
+    assert liveview
+           |> element("#metrics-update-form")
+           |> render_change(%{num_cols: 4, start_time: time}) =~ time
+
+    time = "3d"
+
+    assert liveview
+           |> element("#metrics-update-form")
+           |> render_change(%{num_cols: 4, start_time: time}) =~ time
+
+    time = "1w"
+
+    assert liveview
+           |> element("#metrics-update-form")
+           |> render_change(%{num_cols: 4, start_time: time}) =~ time
   end
 
   test "Testing NodeDown, removing the current node from the selected services", %{conn: conn} do