Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add process labels #1391

Merged
merged 4 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions components/electric/lib/electric/features.ex
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ defmodule Electric.Features do

def init(args) do
name = Keyword.get(args, :name, __MODULE__)
Process.set_label({:features, name})

table = :ets.new(name, [:set, :public, :named_table, read_concurrency: true])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ defmodule Electric.Postgres.CachedWal.EtsBacked do
@impl GenStage
def init(opts) do
origin = Keyword.fetch!(opts, :origin)
Process.set_label({:wal_cache, origin})

table = :ets.new(ets_table_name(origin), [:named_table, :ordered_set])
Logger.metadata(origin: origin, component: "CachedWal.EtsBacked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ defmodule Electric.Postgres.CachedWal.Producer do
Logger.metadata(component: "CachedWal.Producer")

origin = Keyword.fetch!(opts, :origin)
Process.set_label({:wal_cache_producer, origin})

{:producer,
%{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ defmodule Electric.Postgres.Extension.SchemaCache do
@impl GenServer
def init({conn_config, opts}) do
origin = Connectors.origin(conn_config)
Process.set_label({:schema_cache, origin})

Logger.metadata(pg_producer: origin)
Logger.info("Starting #{__MODULE__} for #{origin}")
Expand Down Expand Up @@ -389,6 +390,7 @@ defmodule Electric.Postgres.Extension.SchemaCache do
def handle_call({:refresh_subscription, name}, from, %{refresh_task: nil} = state) do
task =
Task.async(fn ->
Process.set_label({:refresh_subscription, name})
result = SchemaLoader.refresh_subscription(state.backend, name)
GenServer.reply(from, result)
:ok
Expand Down
1 change: 1 addition & 0 deletions components/electric/lib/electric/postgres/oid_database.ex
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ defmodule Electric.Postgres.OidDatabase do
end

def init(_) do
Process.set_label(:oid_database)
oid_table = :ets.new(@oid_table, [:set, :named_table, keypos: 2, read_concurrency: true])
:ets.insert(oid_table, Electric.Postgres.OidDatabase.Defaults.get_defaults())

Expand Down
1 change: 1 addition & 0 deletions components/electric/lib/electric/postgres/proxy/handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ defmodule Electric.Postgres.Proxy.Handler do

@spec initial_state(Connectors.config(), options()) :: S.t()
def initial_state(connector_config, proxy_opts) do
Process.set_label(:postgres_proxy_handler)
{loader, loader_opts} = Keyword.get(proxy_opts, :loader, {SchemaCache, []})
password = connector_config |> get_in([:proxy, :password]) |> validate_password()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ defmodule Electric.Postgres.Proxy.SASL.SCRAMLockedCache do

@impl true
def init(:ok) do
Process.set_label(:scram_locked_cache)
init()
{:ok, %{keys: %{}, ref_to_key: %{}}}
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ defmodule Electric.Postgres.Proxy.UpstreamConnection do
parent = Keyword.fetch!(args, :parent)
connector_config = Keyword.fetch!(args, :connector_config)
session_id = Keyword.fetch!(args, :session_id)
Process.set_label({:upstream_connection, session_id})

name = name(session_id)
Electric.reg(name)
Expand Down
5 changes: 4 additions & 1 deletion components/electric/lib/electric/replication/initial_sync.ex
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,10 @@ defmodule Electric.Replication.InitialSync do
# 1. is made after the current transaction has started
# 2. is in a separate transaction (thus on a different connection)
# 3. is before the potentially big read queries to ensure this arrives ASAP on any data size
Task.start(fn -> perform_magic_write(conn_opts, marker) end)
Task.start(fn ->
Process.set_label(:magic_write)
perform_magic_write(conn_opts, marker)
end)

{:ok, _, [{xmin_str}]} =
:epgsql.squery(conn, "SELECT pg_snapshot_xmin(pg_current_snapshot())")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ defmodule Electric.Replication.Postgres.LogicalReplicationProducer do
repl_conn_opts = Connectors.get_connection_opts(connector_config, replication: true)
repl_opts = Connectors.get_replication_opts(connector_config)
wal_window_opts = Connectors.get_wal_window_opts(connector_config)
Process.set_label({:logical_replication, origin})

publication = repl_opts.publication
main_slot = repl_opts.slot
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ defmodule Electric.Replication.Postgres.MigrationConsumer do
@impl GenStage
def init({conn_config, opts}) do
origin = Connectors.origin(conn_config)
Process.set_label({:migration_consumer, origin})

%{publication: publication, subscription: subscription} =
Connectors.get_replication_opts(conn_config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ defmodule Electric.Replication.Postgres.SlotServer do
{:via, :gproc, producer} = Keyword.fetch!(opts, :producer)

origin = Connectors.origin(conn_config)
Process.set_label({:slot_server, origin})
replication_opts = Connectors.get_replication_opts(conn_config)
slot = replication_opts.subscription

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ defmodule Electric.Replication.Postgres.TcpServer do

@impl ThousandIsland.Handler
def handle_connection(socket, _init) do
Process.set_label(:tcp_server)
{:ok, {ip, port}} = Socket.peername(socket)

client = "#{:inet.ntoa(ip)}:#{port}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ defmodule Electric.Replication.Postgres.Writer do
origin = Connectors.origin(conn_config)
name = name(origin)
Electric.reg(name)
Process.set_label({:writer_to_pg, origin})

Logger.metadata(origin: origin)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ defmodule Electric.Replication.PostgresConnectorMng do
@impl GenServer
def init(connector_config) do
origin = Connectors.origin(connector_config)
Process.set_label({:postgres_manager, origin})
name = name(origin)
Electric.reg(name)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ defmodule Electric.Replication.SatelliteCollectorConsumer do

@impl GenStage
def init(opts) do
Process.set_label(:satellite_collector_consumer)
{:consumer, Map.new(Keyword.take(opts, [:push_to])), Keyword.take(opts, [:subscribe_to])}
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ defmodule Electric.Replication.SatelliteCollectorProducer do

@impl GenStage
def init(connector_config) do
origin = Connectors.origin(connector_config)
Process.set_label({:satellite_collector_producer, origin})
table = :ets.new(nil, [:ordered_set, keypos: 2])

{:producer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ defmodule Electric.Replication.SatelliteConnector do

@impl Supervisor
def init(%{name: name, producer: producer, origin: origin}) do
Process.set_label({:satellite_collector_supervisor, origin, name})

# `cancel: :temporary` is used here since the death of the Satellite WS process will eventually kill the supervisor,
# but it'll kill SatelliteCollectorConsumer first and cause it to restart with nowhere to resubscribe.
children = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ defmodule Electric.Satellite.ClientManager do

@impl GenServer
def init(_) do
Process.set_label(:client_manager)
{:ok, %State{}}
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1005,7 +1005,9 @@ defmodule Electric.Satellite.ClientReconnectionInfo do

@impl GenServer
def init(connector_config) do
origin = Connectors.origin(connector_config)
Logger.metadata(component: "ClientReconnectionInfo")
Process.set_label({:client_reconnection_info, origin})

checkpoint_table = :ets.new(@checkpoint_ets, [:named_table, :public, :set])
subscriptions_table = :ets.new(@subscriptions_ets, [:named_table, :public, :ordered_set])
Expand Down
3 changes: 3 additions & 0 deletions components/electric/lib/electric/satellite/protocol.ex
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ defmodule Electric.Satellite.Protocol do
when not auth_passed?(state) and client_id != "" and token != "" do
Logger.metadata(client_id: client_id)
Logger.debug("Received auth request")
Process.set_label({:ws_server, state.origin, client_id})

# NOTE: We treat successful registration with Electric.safe_reg as an
# indication that at least the previously connected WS client is down.
Expand Down Expand Up @@ -1102,6 +1103,7 @@ defmodule Electric.Satellite.Protocol do

Task.start(fn ->
# This is `InitialSync.query_subscription_data/2` by default, but can be overridden for tests.
Process.set_label({:initial_sync, state.origin, id})
# Please see documentation on that function for context on the next `receive` block.
fun.({id, requests, context},
reply_to: {ref, parent},
Expand Down Expand Up @@ -1165,6 +1167,7 @@ defmodule Electric.Satellite.Protocol do
move_in_ref = state.out_rep.move_in_next_ref

Task.start(fn ->
Process.set_label({:initial_sync_query_move_in_data, state.origin})
# This is `InitialSync.query_move_in_data/4` by default, but can be overridden for tests.
# Please see documentation on that function for context on the next `receive` block.
fun.(move_in_ref, actions, context,
Expand Down
4 changes: 3 additions & 1 deletion components/electric/lib/electric/satellite/ws_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,15 @@ defmodule Electric.Satellite.WebsocketServer do
@impl WebSock
def init(opts) do
connector_config = Keyword.fetch!(opts, :connector_config)
origin = Connectors.origin(connector_config)
Process.set_label({:ws_server, origin})

{:ok,
schedule_ping(%State{
last_msg_time: :erlang.timestamp(),
auth_provider: Keyword.fetch!(opts, :auth_provider),
connector_config: connector_config,
origin: Connectors.origin(connector_config),
origin: origin,
subscription_data_fun: Keyword.fetch!(opts, :subscription_data_fun),
move_in_data_fun: Keyword.fetch!(opts, :move_in_data_fun),
out_rep: %OutRep{allowed_unacked_txs: Keyword.get(opts, :allowed_unacked_txs, 30)},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ defmodule Electric.Telemetry.CallHomeReporter do
# We need to trap exits here so that `terminate/2` callback has more chances to run
# and send data before crash/shutdown
Process.flag(:trap_exit, true)
Process.set_label({:call_home_reporter, name})

metrics = save_target_path_to_options(metrics)

Expand Down
Loading