Skip to content

Commit

Permalink
chore: add a StatsD metric exporter (#1362)
Browse files Browse the repository at this point in the history
  • Loading branch information
icehaunter committed Jun 14, 2024
1 parent 326c68c commit bd42afb
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 28 deletions.
6 changes: 6 additions & 0 deletions components/electric/config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ default_write_to_pg_mode = "logical_replication"
default_proxy_tracing_enable = false
default_resumable_wal_window = 2 * 1024 * 1024 * 1024
default_txn_cache_size = 256 * 1024 * 1024
default_metric_period = 2_000

if config_env() in [:dev, :test] do
source!([".env.#{config_env()}", ".env.#{config_env()}.local", System.get_env()])
Expand Down Expand Up @@ -139,6 +140,9 @@ pg_proxy_password_config =
env!("PG_PROXY_PORT", :string, nil)
|> Electric.Config.parse_pg_proxy_port(default_pg_proxy_port)

metric_period = env!("METRICS_MEASUREMENT_PERIOD", :integer, default_metric_period)
statsd_host = env!("STATSD_HOST", :string?, nil)

potential_errors =
auth_errors ++
[
Expand Down Expand Up @@ -171,6 +175,7 @@ end

{:ok, log_level} = log_level_config
config :logger, level: log_level
config :telemetry_poller, :default, period: metric_period

config :electric, Electric.Satellite.Auth, provider: auth_provider

Expand All @@ -184,6 +189,7 @@ config :electric,
http_port: env!("HTTP_PORT", :integer, default_http_server_port),
pg_server_port: pg_server_port,
listen_on_ipv6?: listen_on_ipv6?,
telemetry_statsd_host: statsd_host,
write_to_pg_mode: write_to_pg_mode

# disable all ddlx commands apart from `ENABLE`
Expand Down
75 changes: 48 additions & 27 deletions components/electric/lib/electric/telemetry.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
defmodule Electric.Telemetry do
use Supervisor
alias Telemetry.Metrics
import Telemetry.Metrics

def start_link(init_arg) do
Expand All @@ -13,6 +12,7 @@ defmodule Electric.Telemetry do
]

children
|> add_statsd_reporter(Application.fetch_env!(:electric, :telemetry_statsd_host))
|> add_call_home_reporter(Application.fetch_env!(:electric, :telemetry))
|> Supervisor.init(strategy: :one_for_one)
end
Expand All @@ -31,6 +31,19 @@ defmodule Electric.Telemetry do

defp add_call_home_reporter(children, _), do: children

# No StatsD host was supplied: return the child list unchanged.
defp add_statsd_reporter(children, nil), do: children

# A StatsD host was supplied: append a `TelemetryMetricsStatsd` child spec to
# the end of `children`. The reporter is given the host, the `:datadog`
# formatter, a global `instance_id` tag taken from `Electric.instance_id/0`,
# and the metric definitions returned by `statsd_metrics/0`.
defp add_statsd_reporter(children, host) do
  reporter_opts = [
    host: host,
    formatter: :datadog,
    global_tags: [instance_id: Electric.instance_id()],
    metrics: statsd_metrics()
  ]

  children ++ [{TelemetryMetricsStatsd, reporter_opts}]
end

def static_info() do
{total_mem, _, _} = :memsup.get_memory_data()
processors = :erlang.system_info(:logical_processors)
Expand Down Expand Up @@ -124,37 +137,45 @@ defmodule Electric.Telemetry do
]
end

@doc false
# This function is not currently used, but is here as a general reference to the metrics exposed
# by our system. We're likely to want to expose them as prometheus metrics at some point.
def metrics(),
do: [
Metrics.last_value("electric.postgres.migration.electrified_tables"),
Metrics.counter("electric.postgres.replication_from.start.monotonic_time",
defp statsd_metrics() do
[
last_value("vm.memory.total", unit: :byte),
last_value("vm.memory.processes_used", unit: :byte),
last_value("vm.memory.binary", unit: :byte),
last_value("vm.memory.ets", unit: :byte),
last_value("vm.total_run_queue_lengths.total"),
last_value("vm.total_run_queue_lengths.cpu"),
last_value("vm.total_run_queue_lengths.io"),
last_value("electric.resources.wal_cache.cache_memory_total", unit: :byte),
last_value("electric.postgres.migration.electrified_tables"),
counter("electric.postgres.replication_from.start.monotonic_time",
tags: [:short_version]
),
Metrics.last_value("electric.postgres.replication_from.start.electrified_tables"),
Metrics.sum("electric.postgres.replication_from.transaction.operations"),
Metrics.counter("electric.postgres.replication_to.start.monotonic_time"),
Metrics.sum("electric.postgres.replication_to.send.transactions"),
Metrics.summary("electric.satellite.connection.stop.duration"),
Metrics.summary("electric.satellite.replication.start.continued_subscriptions",
drop: & &1[:initial_sync]
),
Metrics.counter("electric.satellite.replication.start.monotonic_time",
last_value("electric.postgres.replication_from.start.electrified_tables"),
sum("electric.postgres.replication_from.transaction.operations"),
counter("electric.postgres.replication_to.start.monotonic_time"),
sum("electric.postgres.replication_to.send.transactions"),
counter("electric.satellite.replication.start.monotonic_time",
keep: & &1[:initial_sync]
),
Metrics.counter("electric.satellite.replication.start.monotonic_time", tags: [:client_id]),
Metrics.counter("electric.satellite.replication.start.monotonic_time", tags: [:user_id]),
Metrics.sum("electric.satellite.replication.transaction_send.operations"),
Metrics.sum("electric.satellite.replication.transaction_receive.operations"),
Metrics.counter("electric.satellite.replication.bad_transaction.monotonic_time"),
Metrics.summary("electric.satellite.replication.new_subscription.start.included_tables"),
Metrics.summary("electric.satellite.replication.new_subscription.start.shapes"),
Metrics.summary("electric.satellite.replication.new_subscription.shape_data.duration"),
Metrics.summary("electric.satellite.replication.new_subscription.stop.duration"),
Metrics.summary("electric.satellite.replication.new_subscription.stop.send_lag")
counter("electric.satellite.replication.start.monotonic_time", tags: [:client_id]),
counter("electric.satellite.replication.start.monotonic_time", tags: [:user_id]),
sum("electric.satellite.replication.transaction_send.operations"),
sum("electric.satellite.replication.transaction_receive.operations"),
counter("electric.satellite.replication.bad_transaction.monotonic_time"),
summary("electric.satellite.replication.new_subscription.start.monotonic_time"),
summary("electric.satellite.replication.new_subscription.shape_data.duration",
unit: {:native, :millisecond}
),
summary("electric.satellite.replication.new_subscription.stop.duration",
unit: {:native, :millisecond}
),
summary("electric.satellite.replication.new_subscription.stop.send_lag",
unit: {:native, :millisecond}
)
]
|> Enum.map(&%{&1 | tags: [:instance_id | &1.tags]})
end

defp periodic_measurements do
[
Expand Down
3 changes: 2 additions & 1 deletion components/electric/mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ defmodule Electric.MixProject do
{:gen_stage, "~> 1.2"},
{:telemetry, "~> 1.1", override: true},
{:telemetry_poller, "~> 1.0"},
{:telemetry_metrics, "~> 1.0"},
{:telemetry_metrics, "~> 1.0", override: true},
{:telemetry_metrics_statsd, "~> 0.7"},
{:joken, "~> 2.6"},
{:libgraph, "~> 0.16.0"},
{:pathex, "~> 2.5.2"},
Expand Down
1 change: 1 addition & 0 deletions components/electric/mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
"stream_data": {:hex, :stream_data, "1.1.1", "fd515ca95619cca83ba08b20f5e814aaf1e5ebff114659dc9731f966c9226246", [:mix], [], "hexpm", "45d0cd46bd06738463fd53f22b70042dbb58c384bb99ef4e7576e7bb7d3b8c8c"},
"telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"},
"telemetry_metrics": {:hex, :telemetry_metrics, "1.0.0", "29f5f84991ca98b8eb02fc208b2e6de7c95f8bb2294ef244a176675adc7775df", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "f23713b3847286a534e005126d4c959ebcca68ae9582118ce436b521d1d47d5d"},
"telemetry_metrics_statsd": {:hex, :telemetry_metrics_statsd, "0.7.0", "92732fae63db31ef2508df6faee7d81401883e33f2976715a82f296a33a45cee", [:mix], [{:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 0.6", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "797e34a856376dfd4e96347da0f747fcff4e0cadf6e6f0f989598f563cad05ff"},
"telemetry_poller": {:hex, :telemetry_poller, "1.1.0", "58fa7c216257291caaf8d05678c8d01bd45f4bdbc1286838a28c4bb62ef32999", [:rebar3], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "9eb9d9cbfd81cbd7cdd24682f8711b6e2b691289a0de6826e58452f28c103c8f"},
"thousand_island": {:hex, :thousand_island, "1.3.5", "6022b6338f1635b3d32406ff98d68b843ba73b3aa95cfc27154223244f3a6ca5", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "2be6954916fdfe4756af3239fb6b6d75d0b8063b5df03ba76fd8a4c87849e180"},
"timex": {:hex, :timex, "3.7.11", "bb95cb4eb1d06e27346325de506bcc6c30f9c6dea40d1ebe390b262fad1862d1", [:mix], [{:combine, "~> 0.10", [hex: :combine, repo: "hexpm", optional: false]}, {:gettext, "~> 0.20", [hex: :gettext, repo: "hexpm", optional: false]}, {:tzdata, "~> 1.1", [hex: :tzdata, repo: "hexpm", optional: false]}], "hexpm", "8b9024f7efbabaf9bd7aa04f65cf8dcd7c9818ca5737677c7b76acbc6a94d1aa"},
Expand Down

0 comments on commit bd42afb

Please sign in to comment.