From 02b600b8d3e3b38ddf7fd12d8b39923d69b8ff79 Mon Sep 17 00:00:00 2001 From: rob Date: Tue, 18 Jun 2024 11:59:19 +0100 Subject: [PATCH 1/4] Add process labels --- .../electric/lib/electric/postgres/cached_wal/ets_backed.ex | 1 + .../electric/lib/electric/postgres/cached_wal/producer.ex | 1 + .../replication/postgres/logical_replication_producer.ex | 1 + .../lib/electric/replication/postgres/migration_consumer.ex | 1 + .../electric/lib/electric/replication/postgres/writer.ex | 1 + components/electric/lib/electric/satellite/protocol.ex | 1 + components/electric/lib/electric/satellite/ws_server.ex | 4 +++- 7 files changed, 9 insertions(+), 1 deletion(-) diff --git a/components/electric/lib/electric/postgres/cached_wal/ets_backed.ex b/components/electric/lib/electric/postgres/cached_wal/ets_backed.ex index 331b50d0c9..aac007e894 100644 --- a/components/electric/lib/electric/postgres/cached_wal/ets_backed.ex +++ b/components/electric/lib/electric/postgres/cached_wal/ets_backed.ex @@ -123,6 +123,7 @@ defmodule Electric.Postgres.CachedWal.EtsBacked do @impl GenStage def init(opts) do origin = Keyword.fetch!(opts, :origin) + Process.set_label({:wal_cache, origin}) table = :ets.new(ets_table_name(origin), [:named_table, :ordered_set]) Logger.metadata(origin: origin, component: "CachedWal.EtsBacked") diff --git a/components/electric/lib/electric/postgres/cached_wal/producer.ex b/components/electric/lib/electric/postgres/cached_wal/producer.ex index 9ab951238e..9fbf5ceb7b 100644 --- a/components/electric/lib/electric/postgres/cached_wal/producer.ex +++ b/components/electric/lib/electric/postgres/cached_wal/producer.ex @@ -28,6 +28,7 @@ defmodule Electric.Postgres.CachedWal.Producer do Logger.metadata(component: "CachedWal.Producer") origin = Keyword.fetch!(opts, :origin) + Process.set_label({:wal_cache_producer, origin}) {:producer, %{ diff --git a/components/electric/lib/electric/replication/postgres/logical_replication_producer.ex b/components/electric/lib/electric/replication/postgres/logical_replication_producer.ex index ea0e361c43..23af2641f0 100644 --- a/components/electric/lib/electric/replication/postgres/logical_replication_producer.ex +++ b/components/electric/lib/electric/replication/postgres/logical_replication_producer.ex @@ -108,6 +108,7 @@ defmodule Electric.Replication.Postgres.LogicalReplicationProducer do repl_conn_opts = Connectors.get_connection_opts(connector_config, replication: true) repl_opts = Connectors.get_replication_opts(connector_config) wal_window_opts = Connectors.get_wal_window_opts(connector_config) + Process.set_label({:logical_replication, origin}) publication = repl_opts.publication main_slot = repl_opts.slot diff --git a/components/electric/lib/electric/replication/postgres/migration_consumer.ex b/components/electric/lib/electric/replication/postgres/migration_consumer.ex index 33b3742253..397eb54dc9 100644 --- a/components/electric/lib/electric/replication/postgres/migration_consumer.ex +++ b/components/electric/lib/electric/replication/postgres/migration_consumer.ex @@ -43,6 +43,7 @@ defmodule Electric.Replication.Postgres.MigrationConsumer do @impl GenStage def init({conn_config, opts}) do origin = Connectors.origin(conn_config) + Process.set_label({:migration_consumer, origin}) %{publication: publication, subscription: subscription} = Connectors.get_replication_opts(conn_config) diff --git a/components/electric/lib/electric/replication/postgres/writer.ex b/components/electric/lib/electric/replication/postgres/writer.ex index bdc3159e0e..134daf9321 100644 --- a/components/electric/lib/electric/replication/postgres/writer.ex +++ b/components/electric/lib/electric/replication/postgres/writer.ex @@ -29,6 +29,7 @@ defmodule Electric.Replication.Postgres.Writer do origin = Connectors.origin(conn_config) name = name(origin) Electric.reg(name) + Process.set_label({:writer_to_pg, origin}) Logger.metadata(origin: origin) diff --git a/components/electric/lib/electric/satellite/protocol.ex b/components/electric/lib/electric/satellite/protocol.ex index 1ac49069d9..4ddda43e09 100644 --- a/components/electric/lib/electric/satellite/protocol.ex +++ b/components/electric/lib/electric/satellite/protocol.ex @@ -49,6 +49,7 @@ defmodule Electric.Satellite.Protocol do when not auth_passed?(state) and client_id != "" and token != "" do Logger.metadata(client_id: client_id) Logger.debug("Received auth request") + Process.set_label({:ws_server, state.origin, client_id}) # NOTE: We treat successful registration with Electric.safe_reg as an # indication that at least the previously connected WS client is down. diff --git a/components/electric/lib/electric/satellite/ws_server.ex b/components/electric/lib/electric/satellite/ws_server.ex index ebbbc481c8..5fc69c4d52 100644 --- a/components/electric/lib/electric/satellite/ws_server.ex +++ b/components/electric/lib/electric/satellite/ws_server.ex @@ -55,13 +55,15 @@ defmodule Electric.Satellite.WebsocketServer do @impl WebSock def init(opts) do connector_config = Keyword.fetch!(opts, :connector_config) + origin = Connectors.origin(connector_config) + Process.set_label({:ws_server, origin}) {:ok, schedule_ping(%State{ last_msg_time: :erlang.timestamp(), auth_provider: Keyword.fetch!(opts, :auth_provider), connector_config: connector_config, - origin: Connectors.origin(connector_config), + origin: origin, subscription_data_fun: Keyword.fetch!(opts, :subscription_data_fun), move_in_data_fun: Keyword.fetch!(opts, :move_in_data_fun), out_rep: %OutRep{allowed_unacked_txs: Keyword.get(opts, :allowed_unacked_txs, 30)}, From d8d756fc435fefc85b6d58996aa64f32b67ca69d Mon Sep 17 00:00:00 2001 From: rob Date: Tue, 18 Jun 2024 16:11:56 +0100 Subject: [PATCH 2/4] Add label for ClientReconnectionInfo process --- .../electric/lib/electric/satellite/client_reconnection_info.ex | 2 ++ 1 file changed, 2 insertions(+) diff --git a/components/electric/lib/electric/satellite/client_reconnection_info.ex b/components/electric/lib/electric/satellite/client_reconnection_info.ex index 275c530247..cdb95160cc 100644 --- a/components/electric/lib/electric/satellite/client_reconnection_info.ex +++ b/components/electric/lib/electric/satellite/client_reconnection_info.ex @@ -1005,7 +1005,9 @@ defmodule Electric.Satellite.ClientReconnectionInfo do @impl GenServer def init(connector_config) do + origin = Connectors.origin(connector_config) Logger.metadata(component: "ClientReconnectionInfo") + Process.set_label({:client_reconnection_info, origin}) checkpoint_table = :ets.new(@checkpoint_ets, [:named_table, :public, :set]) subscriptions_table = :ets.new(@subscriptions_ets, [:named_table, :public, :ordered_set]) From c44b452cc611b5cee05ab256692712e25092ed1e Mon Sep 17 00:00:00 2001 From: rob Date: Wed, 19 Jun 2024 20:37:02 +0100 Subject: [PATCH 3/4] Add more labels --- components/electric/lib/electric/features.ex | 1 + .../electric/lib/electric/postgres/extension/schema_cache.ex | 1 + components/electric/lib/electric/postgres/oid_database.ex | 1 + components/electric/lib/electric/postgres/proxy/handler.ex | 1 + .../lib/electric/postgres/proxy/sasl/scram_locked_cache.ex | 1 + .../electric/lib/electric/postgres/proxy/upstream_connection.ex | 1 + .../electric/lib/electric/replication/postgres/slot_server.ex | 1 + .../electric/lib/electric/replication/postgres/tcp_server.ex | 1 + .../electric/lib/electric/replication/postgres_manager.ex | 1 + .../lib/electric/replication/satellite_collector_consumer.ex | 1 + .../lib/electric/replication/satellite_collector_producer.ex | 2 ++ .../electric/lib/electric/replication/satellite_connector.ex | 2 ++ components/electric/lib/electric/satellite/client_manager.ex | 1 + .../electric/lib/electric/telemetry/call_home_reporter.ex | 1 + 14 files changed, 16 insertions(+) diff --git a/components/electric/lib/electric/features.ex b/components/electric/lib/electric/features.ex index 279cd3ed7c..8e7fa35d04 100644 --- a/components/electric/lib/electric/features.ex +++ b/components/electric/lib/electric/features.ex @@ -92,6 +92,7 @@ defmodule Electric.Features do def init(args) do name = Keyword.get(args, :name, __MODULE__) + Process.set_label({:features, name}) table = :ets.new(name, [:set, :public, :named_table, read_concurrency: true]) diff --git a/components/electric/lib/electric/postgres/extension/schema_cache.ex b/components/electric/lib/electric/postgres/extension/schema_cache.ex index a438b54218..105c14b964 100644 --- a/components/electric/lib/electric/postgres/extension/schema_cache.ex +++ b/components/electric/lib/electric/postgres/extension/schema_cache.ex @@ -214,6 +214,7 @@ defmodule Electric.Postgres.Extension.SchemaCache do @impl GenServer def init({conn_config, opts}) do origin = Connectors.origin(conn_config) + Process.set_label({:schema_cache, origin}) Logger.metadata(pg_producer: origin) Logger.info("Starting #{__MODULE__} for #{origin}") diff --git a/components/electric/lib/electric/postgres/oid_database.ex b/components/electric/lib/electric/postgres/oid_database.ex index a95d0b83eb..b8d168836e 100644 --- a/components/electric/lib/electric/postgres/oid_database.ex +++ b/components/electric/lib/electric/postgres/oid_database.ex @@ -63,6 +63,7 @@ defmodule Electric.Postgres.OidDatabase do end def init(_) do + Process.set_label(:oid_database) oid_table = :ets.new(@oid_table, [:set, :named_table, keypos: 2, read_concurrency: true]) :ets.insert(oid_table, Electric.Postgres.OidDatabase.Defaults.get_defaults()) diff --git a/components/electric/lib/electric/postgres/proxy/handler.ex b/components/electric/lib/electric/postgres/proxy/handler.ex index 835a42d309..1f74b29a85 100644 --- a/components/electric/lib/electric/postgres/proxy/handler.ex +++ b/components/electric/lib/electric/postgres/proxy/handler.ex @@ -56,6 +56,7 @@ defmodule Electric.Postgres.Proxy.Handler do @spec initial_state(Connectors.config(), options()) :: S.t() def initial_state(connector_config, proxy_opts) do + Process.set_label(:postgres_proxy_handler) {loader, loader_opts} = Keyword.get(proxy_opts, :loader, {SchemaCache, []}) password = connector_config |> get_in([:proxy, :password]) |> validate_password() diff --git a/components/electric/lib/electric/postgres/proxy/sasl/scram_locked_cache.ex b/components/electric/lib/electric/postgres/proxy/sasl/scram_locked_cache.ex index 48479a007c..ada841e760 100644 --- a/components/electric/lib/electric/postgres/proxy/sasl/scram_locked_cache.ex +++ b/components/electric/lib/electric/postgres/proxy/sasl/scram_locked_cache.ex @@ -75,6 +75,7 @@ defmodule Electric.Postgres.Proxy.SASL.SCRAMLockedCache do @impl true def init(:ok) do + Process.set_label(:scram_locked_cache) init() {:ok, %{keys: %{}, ref_to_key: %{}}} end diff --git a/components/electric/lib/electric/postgres/proxy/upstream_connection.ex b/components/electric/lib/electric/postgres/proxy/upstream_connection.ex index 397dc07e5a..43e55cd4f8 100644 --- a/components/electric/lib/electric/postgres/proxy/upstream_connection.ex +++ b/components/electric/lib/electric/postgres/proxy/upstream_connection.ex @@ -29,6 +29,7 @@ defmodule Electric.Postgres.Proxy.UpstreamConnection do parent = Keyword.fetch!(args, :parent) connector_config = Keyword.fetch!(args, :connector_config) session_id = Keyword.fetch!(args, :session_id) + Process.set_label({:upstream_connection, session_id}) name = name(session_id) Electric.reg(name) diff --git a/components/electric/lib/electric/replication/postgres/slot_server.ex b/components/electric/lib/electric/replication/postgres/slot_server.ex index c06a7ffa4c..90f9625723 100644 --- a/components/electric/lib/electric/replication/postgres/slot_server.ex +++ b/components/electric/lib/electric/replication/postgres/slot_server.ex @@ -135,6 +135,7 @@ defmodule Electric.Replication.Postgres.SlotServer do {:via, :gproc, producer} = Keyword.fetch!(opts, :producer) origin = Connectors.origin(conn_config) + Process.set_label({:slot_server, origin}) replication_opts = Connectors.get_replication_opts(conn_config) slot = replication_opts.subscription diff --git a/components/electric/lib/electric/replication/postgres/tcp_server.ex b/components/electric/lib/electric/replication/postgres/tcp_server.ex index 075081e784..46d157177c 100644 --- a/components/electric/lib/electric/replication/postgres/tcp_server.ex +++ b/components/electric/lib/electric/replication/postgres/tcp_server.ex @@ -144,6 +144,7 @@ defmodule Electric.Replication.Postgres.TcpServer do @impl ThousandIsland.Handler def handle_connection(socket, _init) do + Process.set_label(:tcp_server) {:ok, {ip, port}} = Socket.peername(socket) client = "#{:inet.ntoa(ip)}:#{port}" diff --git a/components/electric/lib/electric/replication/postgres_manager.ex b/components/electric/lib/electric/replication/postgres_manager.ex index 0494bd7855..bba4e95ac0 100644 --- a/components/electric/lib/electric/replication/postgres_manager.ex +++ b/components/electric/lib/electric/replication/postgres_manager.ex @@ -80,6 +80,7 @@ defmodule Electric.Replication.PostgresConnectorMng do @impl GenServer def init(connector_config) do origin = Connectors.origin(connector_config) + Process.set_label({:postgres_manager, origin}) name = name(origin) Electric.reg(name) diff --git a/components/electric/lib/electric/replication/satellite_collector_consumer.ex b/components/electric/lib/electric/replication/satellite_collector_consumer.ex index ed9ef01318..931994707f 100644 --- a/components/electric/lib/electric/replication/satellite_collector_consumer.ex +++ b/components/electric/lib/electric/replication/satellite_collector_consumer.ex @@ -17,6 +17,7 @@ defmodule Electric.Replication.SatelliteCollectorConsumer do @impl GenStage def init(opts) do + Process.set_label(:satellite_collector_consumer) {:consumer, Map.new(Keyword.take(opts, [:push_to])), Keyword.take(opts, [:subscribe_to])} end diff --git a/components/electric/lib/electric/replication/satellite_collector_producer.ex b/components/electric/lib/electric/replication/satellite_collector_producer.ex index eed1ed32bc..75f41c4efc 100644 --- a/components/electric/lib/electric/replication/satellite_collector_producer.ex +++ b/components/electric/lib/electric/replication/satellite_collector_producer.ex @@ -35,6 +35,8 @@ defmodule Electric.Replication.SatelliteCollectorProducer do @impl GenStage def init(connector_config) do + origin = Connectors.origin(connector_config) + Process.set_label({:satellite_collector_producer, origin}) table = :ets.new(nil, [:ordered_set, keypos: 2]) {:producer, diff --git a/components/electric/lib/electric/replication/satellite_connector.ex b/components/electric/lib/electric/replication/satellite_connector.ex index d7145ae16f..34dcd89b94 100644 --- a/components/electric/lib/electric/replication/satellite_connector.ex +++ b/components/electric/lib/electric/replication/satellite_connector.ex @@ -20,6 +20,8 @@ defmodule Electric.Replication.SatelliteConnector do @impl Supervisor def init(%{name: name, producer: producer, origin: origin}) do + Process.set_label({:satellite_collector_supervisor, origin, name}) + # `cancel: :temporary` is used here since the death of the Satellite WS process will eventually kill the supervisor, # but it'll kill SatelliteCollectorConsumer first and cause it to restart with nowhere to resubscribe. children = [ diff --git a/components/electric/lib/electric/satellite/client_manager.ex b/components/electric/lib/electric/satellite/client_manager.ex index bc5234e7c3..19d4783061 100644 --- a/components/electric/lib/electric/satellite/client_manager.ex +++ b/components/electric/lib/electric/satellite/client_manager.ex @@ -52,6 +52,7 @@ defmodule Electric.Satellite.ClientManager do @impl GenServer def init(_) do + Process.set_label(:client_manager) {:ok, %State{}} end diff --git a/components/electric/lib/electric/telemetry/call_home_reporter.ex b/components/electric/lib/electric/telemetry/call_home_reporter.ex index a3cf00e9ef..2c2cda4119 100644 --- a/components/electric/lib/electric/telemetry/call_home_reporter.ex +++ b/components/electric/lib/electric/telemetry/call_home_reporter.ex @@ -44,6 +44,7 @@ defmodule Electric.Telemetry.CallHomeReporter do # We need to trap exits here so that `terminate/2` callback has more chances to run # and send data before crash/shutdown Process.flag(:trap_exit, true) + Process.set_label({:call_home_reporter, name}) metrics = save_target_path_to_options(metrics) From f7c37cb4090cd41ff78395ac17aac12008e64b7f Mon Sep 17 00:00:00 2001 From: rob Date: Thu, 20 Jun 2024 14:27:23 +0100 Subject: [PATCH 4/4] Add process labels for Tasks --- .../electric/lib/electric/postgres/extension/schema_cache.ex | 1 + components/electric/lib/electric/replication/initial_sync.ex | 5 ++++- components/electric/lib/electric/satellite/protocol.ex | 2 ++ 3 files changed, 7 insertions(+), 1 deletion(-) diff --git a/components/electric/lib/electric/postgres/extension/schema_cache.ex b/components/electric/lib/electric/postgres/extension/schema_cache.ex index 105c14b964..5ec040c83f 100644 --- a/components/electric/lib/electric/postgres/extension/schema_cache.ex +++ b/components/electric/lib/electric/postgres/extension/schema_cache.ex @@ -390,6 +390,7 @@ defmodule Electric.Postgres.Extension.SchemaCache do def handle_call({:refresh_subscription, name}, from, %{refresh_task: nil} = state) do task = Task.async(fn -> + Process.set_label({:refresh_subscription, name}) result = SchemaLoader.refresh_subscription(state.backend, name) GenServer.reply(from, result) :ok diff --git a/components/electric/lib/electric/replication/initial_sync.ex b/components/electric/lib/electric/replication/initial_sync.ex index d9a59a1fa8..8070c0f6ea 100644 --- a/components/electric/lib/electric/replication/initial_sync.ex +++ b/components/electric/lib/electric/replication/initial_sync.ex @@ -182,7 +182,10 @@ defmodule Electric.Replication.InitialSync do # 1. is made after the current transaction has started # 2. is in a separate transaction (thus on a different connection) # 3. is before the potentially big read queries to ensure this arrives ASAP on any data size - Task.start(fn -> perform_magic_write(conn_opts, marker) end) + Task.start(fn -> + Process.set_label(:magic_write) + perform_magic_write(conn_opts, marker) + end) {:ok, _, [{xmin_str}]} = :epgsql.squery(conn, "SELECT pg_snapshot_xmin(pg_current_snapshot())") diff --git a/components/electric/lib/electric/satellite/protocol.ex b/components/electric/lib/electric/satellite/protocol.ex index 4ddda43e09..d41f30aae9 100644 --- a/components/electric/lib/electric/satellite/protocol.ex +++ b/components/electric/lib/electric/satellite/protocol.ex @@ -1103,6 +1103,7 @@ defmodule Electric.Satellite.Protocol do Task.start(fn -> # This is `InitialSync.query_subscription_data/2` by default, but can be overridden for tests. + Process.set_label({:initial_sync, state.origin, id}) # Please see documentation on that function for context on the next `receive` block. fun.({id, requests, context}, reply_to: {ref, parent}, @@ -1166,6 +1167,7 @@ defmodule Electric.Satellite.Protocol do move_in_ref = state.out_rep.move_in_next_ref Task.start(fn -> + Process.set_label({:initial_sync_query_move_in_data, state.origin}) # This is `InitialSync.query_move_in_data/4` by default, but can be overridden for tests. # Please see documentation on that function for context on the next `receive` block. fun.(move_in_ref, actions, context,