Skip to content

Commit

Permalink
Add process labels (#1391)
Browse files Browse the repository at this point in the history
Adding labels to processes to make using the Erlang Observer and various
debugging tasks easier
  • Loading branch information
robacourt committed Jun 24, 2024
1 parent a94e860 commit 20334cd
Show file tree
Hide file tree
Showing 23 changed files with 34 additions and 2 deletions.
1 change: 1 addition & 0 deletions components/electric/lib/electric/features.ex
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ defmodule Electric.Features do

def init(args) do
name = Keyword.get(args, :name, __MODULE__)
Process.set_label({:features, name})

table = :ets.new(name, [:set, :public, :named_table, read_concurrency: true])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ defmodule Electric.Postgres.CachedWal.EtsBacked do
@impl GenStage
def init(opts) do
origin = Keyword.fetch!(opts, :origin)
Process.set_label({:wal_cache, origin})

table = :ets.new(ets_table_name(origin), [:named_table, :ordered_set])
Logger.metadata(origin: origin, component: "CachedWal.EtsBacked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ defmodule Electric.Postgres.CachedWal.Producer do
Logger.metadata(component: "CachedWal.Producer")

origin = Keyword.fetch!(opts, :origin)
Process.set_label({:wal_cache_producer, origin})

{:producer,
%{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ defmodule Electric.Postgres.Extension.SchemaCache do
@impl GenServer
def init({conn_config, opts}) do
origin = Connectors.origin(conn_config)
Process.set_label({:schema_cache, origin})

Logger.metadata(pg_producer: origin)
Logger.info("Starting #{__MODULE__} for #{origin}")
Expand Down Expand Up @@ -389,6 +390,7 @@ defmodule Electric.Postgres.Extension.SchemaCache do
def handle_call({:refresh_subscription, name}, from, %{refresh_task: nil} = state) do
task =
Task.async(fn ->
Process.set_label({:refresh_subscription, name})
result = SchemaLoader.refresh_subscription(state.backend, name)
GenServer.reply(from, result)
:ok
Expand Down
1 change: 1 addition & 0 deletions components/electric/lib/electric/postgres/oid_database.ex
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ defmodule Electric.Postgres.OidDatabase do
end

def init(_) do
Process.set_label(:oid_database)
oid_table = :ets.new(@oid_table, [:set, :named_table, keypos: 2, read_concurrency: true])
:ets.insert(oid_table, Electric.Postgres.OidDatabase.Defaults.get_defaults())

Expand Down
1 change: 1 addition & 0 deletions components/electric/lib/electric/postgres/proxy/handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ defmodule Electric.Postgres.Proxy.Handler do

@spec initial_state(Connectors.config(), options()) :: S.t()
def initial_state(connector_config, proxy_opts) do
Process.set_label(:postgres_proxy_handler)
{loader, loader_opts} = Keyword.get(proxy_opts, :loader, {SchemaCache, []})
password = connector_config |> get_in([:proxy, :password]) |> validate_password()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ defmodule Electric.Postgres.Proxy.SASL.SCRAMLockedCache do

@impl true
def init(:ok) do
Process.set_label(:scram_locked_cache)
init()
{:ok, %{keys: %{}, ref_to_key: %{}}}
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ defmodule Electric.Postgres.Proxy.UpstreamConnection do
parent = Keyword.fetch!(args, :parent)
connector_config = Keyword.fetch!(args, :connector_config)
session_id = Keyword.fetch!(args, :session_id)
Process.set_label({:upstream_connection, session_id})

name = name(session_id)
Electric.reg(name)
Expand Down
5 changes: 4 additions & 1 deletion components/electric/lib/electric/replication/initial_sync.ex
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,10 @@ defmodule Electric.Replication.InitialSync do
# 1. is made after the current transaction has started
# 2. is in a separate transaction (thus on a different connection)
# 3. is before the potentially big read queries to ensure this arrives ASAP on any data size
Task.start(fn -> perform_magic_write(conn_opts, marker) end)
Task.start(fn ->
Process.set_label(:magic_write)
perform_magic_write(conn_opts, marker)
end)

{:ok, _, [{xmin_str}]} =
:epgsql.squery(conn, "SELECT pg_snapshot_xmin(pg_current_snapshot())")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ defmodule Electric.Replication.Postgres.LogicalReplicationProducer do
repl_conn_opts = Connectors.get_connection_opts(connector_config, replication: true)
repl_opts = Connectors.get_replication_opts(connector_config)
wal_window_opts = Connectors.get_wal_window_opts(connector_config)
Process.set_label({:logical_replication, origin})

publication = repl_opts.publication
main_slot = repl_opts.slot
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ defmodule Electric.Replication.Postgres.MigrationConsumer do
@impl GenStage
def init({conn_config, opts}) do
origin = Connectors.origin(conn_config)
Process.set_label({:migration_consumer, origin})

%{publication: publication, subscription: subscription} =
Connectors.get_replication_opts(conn_config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ defmodule Electric.Replication.Postgres.SlotServer do
{:via, :gproc, producer} = Keyword.fetch!(opts, :producer)

origin = Connectors.origin(conn_config)
Process.set_label({:slot_server, origin})
replication_opts = Connectors.get_replication_opts(conn_config)
slot = replication_opts.subscription

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ defmodule Electric.Replication.Postgres.TcpServer do

@impl ThousandIsland.Handler
def handle_connection(socket, _init) do
Process.set_label(:tcp_server)
{:ok, {ip, port}} = Socket.peername(socket)

client = "#{:inet.ntoa(ip)}:#{port}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ defmodule Electric.Replication.Postgres.Writer do
origin = Connectors.origin(conn_config)
name = name(origin)
Electric.reg(name)
Process.set_label({:writer_to_pg, origin})

Logger.metadata(origin: origin)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ defmodule Electric.Replication.PostgresConnectorMng do
@impl GenServer
def init(connector_config) do
origin = Connectors.origin(connector_config)
Process.set_label({:postgres_manager, origin})
name = name(origin)
Electric.reg(name)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ defmodule Electric.Replication.SatelliteCollectorConsumer do

@impl GenStage
def init(opts) do
Process.set_label(:satellite_collector_consumer)
{:consumer, Map.new(Keyword.take(opts, [:push_to])), Keyword.take(opts, [:subscribe_to])}
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ defmodule Electric.Replication.SatelliteCollectorProducer do

@impl GenStage
def init(connector_config) do
origin = Connectors.origin(connector_config)
Process.set_label({:satellite_collector_producer, origin})
table = :ets.new(nil, [:ordered_set, keypos: 2])

{:producer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ defmodule Electric.Replication.SatelliteConnector do

@impl Supervisor
def init(%{name: name, producer: producer, origin: origin}) do
Process.set_label({:satellite_collector_supervisor, origin, name})

# `cancel: :temporary` is used here since the death of the Satellite WS process will eventually kill the supervisor,
# but it'll kill SatelliteCollectorConsumer first and cause it to restart with nowhere to resubscribe.
children = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ defmodule Electric.Satellite.ClientManager do

@impl GenServer
def init(_) do
Process.set_label(:client_manager)
{:ok, %State{}}
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1005,7 +1005,9 @@ defmodule Electric.Satellite.ClientReconnectionInfo do

@impl GenServer
def init(connector_config) do
origin = Connectors.origin(connector_config)
Logger.metadata(component: "ClientReconnectionInfo")
Process.set_label({:client_reconnection_info, origin})

checkpoint_table = :ets.new(@checkpoint_ets, [:named_table, :public, :set])
subscriptions_table = :ets.new(@subscriptions_ets, [:named_table, :public, :ordered_set])
Expand Down
3 changes: 3 additions & 0 deletions components/electric/lib/electric/satellite/protocol.ex
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ defmodule Electric.Satellite.Protocol do
when not auth_passed?(state) and client_id != "" and token != "" do
Logger.metadata(client_id: client_id)
Logger.debug("Received auth request")
Process.set_label({:ws_server, state.origin, client_id})

# NOTE: We treat successful registration with Electric.safe_reg as an
# indication that at least the previously connected WS client is down.
Expand Down Expand Up @@ -1102,6 +1103,7 @@ defmodule Electric.Satellite.Protocol do

Task.start(fn ->
# This is `InitialSync.query_subscription_data/2` by default, but can be overridden for tests.
Process.set_label({:initial_sync, state.origin, id})
# Please see documentation on that function for context on the next `receive` block.
fun.({id, requests, context},
reply_to: {ref, parent},
Expand Down Expand Up @@ -1165,6 +1167,7 @@ defmodule Electric.Satellite.Protocol do
move_in_ref = state.out_rep.move_in_next_ref

Task.start(fn ->
Process.set_label({:initial_sync_query_move_in_data, state.origin})
# This is `InitialSync.query_move_in_data/4` by default, but can be overridden for tests.
# Please see documentation on that function for context on the next `receive` block.
fun.(move_in_ref, actions, context,
Expand Down
4 changes: 3 additions & 1 deletion components/electric/lib/electric/satellite/ws_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,15 @@ defmodule Electric.Satellite.WebsocketServer do
@impl WebSock
def init(opts) do
connector_config = Keyword.fetch!(opts, :connector_config)
origin = Connectors.origin(connector_config)
Process.set_label({:ws_server, origin})

{:ok,
schedule_ping(%State{
last_msg_time: :erlang.timestamp(),
auth_provider: Keyword.fetch!(opts, :auth_provider),
connector_config: connector_config,
origin: Connectors.origin(connector_config),
origin: origin,
subscription_data_fun: Keyword.fetch!(opts, :subscription_data_fun),
move_in_data_fun: Keyword.fetch!(opts, :move_in_data_fun),
out_rep: %OutRep{allowed_unacked_txs: Keyword.get(opts, :allowed_unacked_txs, 30)},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ defmodule Electric.Telemetry.CallHomeReporter do
# We need to trap exits here so that `terminate/2` callback has more chances to run
# and send data before crash/shutdown
Process.flag(:trap_exit, true)
Process.set_label({:call_home_reporter, name})

metrics = save_target_path_to_options(metrics)

Expand Down

0 comments on commit 20334cd

Please sign in to comment.