Skip to content

Commit

Permalink
Consolidate process naming to ease the use of :observer
Browse files Browse the repository at this point in the history
  • Loading branch information
alco committed Apr 19, 2024
1 parent 3923259 commit affd6b5
Show file tree
Hide file tree
Showing 24 changed files with 247 additions and 226 deletions.
80 changes: 78 additions & 2 deletions components/electric/lib/electric.ex
Original file line number Diff line number Diff line change
@@ -1,9 +1,70 @@
defmodule Electric do
@moduledoc false

alias Electric.Replication.Connectors

@type connector :: Connectors.config() | Connectors.origin()
@type reg_name :: {:via, :gproc, {:n, :l, term()}}
@type write_to_pg_mode :: :logical_replication | :direct_writes

defmacro __using__(proc_type) do
proc_module = proc_module(proc_type)

quote do
use unquote(proc_module)

alias Electric.Replication.Connectors

@spec start_link(Connectors.config()) :: Supervisor.on_start()
def start_link(connector_config) do
name = static_name(connector_config)
unquote(proc_module).start_link(__MODULE__, connector_config, name: name)
end

@spec reg(Electric.connector()) :: true
def reg(connector) do
connector
|> reg_name()
|> Electric.reg()
end

@spec reg_name(Electric.connector()) :: Electric.reg_name()
def reg_name(connector) do
Electric.name(__MODULE__, connector)
end

@spec static_name(Electric.connector()) :: atom
def static_name(connector) do
Electric.static_name(__MODULE__, connector)
end

@spec ets_table_name(Electric.connector()) :: atom
defp ets_table_name(connector) do
static_name(connector)
end

defoverridable start_link: 1
end
end

defp proc_module(:supervisor), do: Supervisor
defp proc_module(:gen_server), do: GenServer
defp proc_module(:gen_stage), do: GenStage

def static_name(module) do
String.to_atom(trim_module_name(module))
end

def static_name(module, connector) do
String.to_atom(trim_module_name(module) <> ":" <> origin(connector))
end

defp trim_module_name(module) do
module
|> inspect()
|> String.replace_leading("Electric.", "")
end

@doc """
Register process with the given name
"""
Expand Down Expand Up @@ -57,14 +118,29 @@ defmodule Electric do
end
end

def await_reg({:via, :gproc, name}, timeout) do
:gproc.await(name, timeout)
end

@doc """
Helper function for gproc registration
"""
@spec name(module(), term()) :: reg_name
def name(module, term) do
@spec name(module, connector) :: reg_name
def name(module, connector) do
{:via, :gproc, {:n, :l, {module, origin(connector)}}}
end

@spec gen_name(module, term) :: reg_name
def gen_name(module, term) do
{:via, :gproc, {:n, :l, {module, term}}}
end

@spec origin(connector) :: Connectors.origin()
def origin(origin) when is_binary(origin), do: origin

def origin(connector_config) when is_list(connector_config),
do: Connectors.origin(connector_config)

@doc """
Helper function to lookup pid that corresponds to registered gproc name
"""
Expand Down
4 changes: 4 additions & 0 deletions components/electric/lib/electric/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ defmodule Electric.Application do
defp pg_server_port, do: Application.fetch_env!(:electric, :pg_server_port)

defp listener_opts do
[num_acceptors: 1] ++ ipv6_opts()
end

defp ipv6_opts do
use_ipv6? = Application.get_env(:electric, :listen_on_ipv6?, false)

if use_ipv6? do
Expand Down
6 changes: 5 additions & 1 deletion components/electric/lib/electric/features.ex
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,12 @@ defmodule Electric.Features do
@type flag() :: atom()
@type name() :: atom()

def static_name do
Electric.static_name(__MODULE__)
end

def start_link(args) do
GenServer.start_link(__MODULE__, args)
GenServer.start_link(__MODULE__, args, name: static_name())
end

def default_key, do: @default_key
Expand Down
21 changes: 7 additions & 14 deletions components/electric/lib/electric/postgres/cached_wal/ets_backed.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ defmodule Electric.Postgres.CachedWal.EtsBacked do
removing oldest entries (FIFO)
"""

use GenStage
use Electric, :gen_stage

alias Electric.Replication.Changes.Transaction
alias Electric.Replication.Connectors
Expand All @@ -34,17 +34,12 @@ defmodule Electric.Postgres.CachedWal.EtsBacked do

# Public API

@spec name(Connectors.origin()) :: Electric.reg_name()
def name(origin) do
Electric.name(__MODULE__, origin)
end

@doc """
Start the cache. See module docs for options
"""
def start_link(opts) do
origin = Keyword.fetch!(opts, :origin)
GenStage.start_link(__MODULE__, opts, name: name(origin))
GenStage.start_link(__MODULE__, opts, name: static_name(origin))
end

def clear_cache(stage) do
Expand Down Expand Up @@ -87,12 +82,12 @@ defmodule Electric.Postgres.CachedWal.EtsBacked do

@impl Api
def request_notification(origin, wal_pos) do
GenStage.call(name(origin), {:request_notification, wal_pos})
GenStage.call(reg_name(origin), {:request_notification, wal_pos})
end

@impl Api
def cancel_notification_request(origin, ref) do
GenStage.call(name(origin), {:cancel_notification, ref})
GenStage.call(reg_name(origin), {:cancel_notification, ref})
end

@impl Api
Expand All @@ -108,7 +103,7 @@ defmodule Electric.Postgres.CachedWal.EtsBacked do

@impl Api
def telemetry_stats(origin) do
GenStage.call(name(origin), :telemetry_stats)
GenStage.call(reg_name(origin), :telemetry_stats)
catch
:exit, _ -> nil
end
Expand All @@ -124,6 +119,8 @@ defmodule Electric.Postgres.CachedWal.EtsBacked do
def init(opts) do
origin = Keyword.fetch!(opts, :origin)

reg(origin)

table = :ets.new(ets_table_name(origin), [:named_table, :ordered_set])
Logger.metadata(origin: origin, component: "CachedWal.EtsBacked")

Expand All @@ -142,10 +139,6 @@ defmodule Electric.Postgres.CachedWal.EtsBacked do
end
end

defp ets_table_name(origin) do
String.to_atom(inspect(__MODULE__) <> ":" <> origin)
end

@impl GenStage
def handle_call({:request_notification, wal_pos}, {from, _}, state) do
ref = make_ref()
Expand Down
38 changes: 17 additions & 21 deletions components/electric/lib/electric/postgres/extension/schema_cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ defmodule Electric.Postgres.Extension.SchemaCache do
itself (via the functions in the `Extension` module).
"""

use GenServer
use Electric, :gen_server

import Electric.Postgres.Extension, only: [is_extension_relation: 1]

Expand Down Expand Up @@ -44,27 +44,18 @@ defmodule Electric.Postgres.Extension.SchemaCache do
Supervisor.child_spec(default, [])
end

def start_link({conn_config, opts}) do
start_link(conn_config, opts)
def start_link({connector_config, opts}) do
start_link(connector_config, opts)
end

def start_link(conn_config, opts \\ []) do
GenServer.start_link(__MODULE__, {conn_config, opts}, name: name(conn_config))
end

@spec name(Connectors.config()) :: Electric.reg_name()
def name(conn_config) when is_list(conn_config) do
name(Connectors.origin(conn_config))
end

@spec name(Connectors.origin()) :: Electric.reg_name()
def name(origin) when is_binary(origin) do
Electric.name(__MODULE__, origin)
def start_link(connector_config, opts \\ []) do
name = static_name(connector_config)
GenServer.start_link(__MODULE__, {connector_config, opts}, name: name)
end

@spec ready?(Connectors.origin()) :: boolean()
def ready?(origin) do
case Electric.lookup_pid(name(origin)) do
case Electric.lookup_pid(reg_name(origin)) do
pid when is_pid(pid) -> true
_ -> false
end
Expand Down Expand Up @@ -200,7 +191,7 @@ defmodule Electric.Postgres.Extension.SchemaCache do
end

defp call(name, msg) when is_binary(name) do
call(name(name), msg)
call(reg_name(name), msg)
end

defp call(pid, msg) when is_pid(pid) do
Expand All @@ -212,8 +203,9 @@ defmodule Electric.Postgres.Extension.SchemaCache do
end

@impl GenServer
def init({conn_config, opts}) do
origin = Connectors.origin(conn_config)
def init({connector_config, opts}) do
reg(connector_config)
origin = Connectors.origin(connector_config)

Logger.metadata(pg_producer: origin)
Logger.info("Starting #{__MODULE__} for #{origin}")
Expand All @@ -226,12 +218,12 @@ defmodule Electric.Postgres.Extension.SchemaCache do
{:ok, backend} =
opts
|> SchemaLoader.get(:backend)
|> SchemaLoader.connect(conn_config)
|> SchemaLoader.connect(connector_config)

state = %{
origin: origin,
backend: backend,
conn_config: conn_config,
conn_config: connector_config,
opts: opts,
current: nil,
refresh_task: nil,
Expand All @@ -242,6 +234,10 @@ defmodule Electric.Postgres.Extension.SchemaCache do
{:ok, state}
end

def init(connector_config) when is_list(connector_config) do
init({connector_config, []})
end

@impl GenServer
def handle_call({:load, :current}, _from, %{current: nil} = state) do
with {{:ok, schema_version}, state} <- load_current_schema(state) do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,10 @@ defmodule Electric.Postgres.Extension.SchemaCache.Global do

require Logger

{:via, :gproc, key} = name = Electric.name(SchemaCache, :__global__)

@name name
@key key

def name, do: @name
def name, do: Electric.name(SchemaCache, "__global__")

def register(origin) do
case Electric.reg_or_locate(@name, origin) do
case Electric.reg_or_locate(name(), origin) do
:ok ->
# Kept as a warning to remind us that this is wrong... ;)
Logger.warning("SchemaCache #{inspect(origin)} registered as the global instance")
Expand All @@ -32,7 +27,7 @@ defmodule Electric.Postgres.Extension.SchemaCache.Global do
end

defp with_instance(timeout \\ 5_000, fun) when is_function(fun, 1) do
{pid, _value} = :gproc.await(@key, timeout)
{pid, _value} = Electric.await_reg(name(), timeout)
fun.(pid)
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ defmodule Electric.Postgres.Extension.SchemaLoader.Epgsql do
# only connect when required, not immediately
lazy: true,
pool_size: 4,
worker_idle_timeout: 30_000
worker_idle_timeout: 30_000,
name: Electric.static_name(__MODULE__)
)
end

Expand Down
10 changes: 7 additions & 3 deletions components/electric/lib/electric/postgres/oid_database.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,12 @@ defmodule Electric.Postgres.OidDatabase do

@oid_table :oid_database

def static_name do
Electric.static_name(__MODULE__)
end

def start_link(_) do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
GenServer.start_link(__MODULE__, [], name: static_name())
end

def update_oids(conn, kinds \\ nil) do
Expand All @@ -22,11 +26,11 @@ defmodule Electric.Postgres.OidDatabase do
end

with {:ok, oids} <- res do
save_oids(__MODULE__, oids)
save_oids(static_name(), oids)
end
end

def save_oids(server \\ __MODULE__, values) do
def save_oids(server \\ static_name(), values) do
GenServer.call(server, {:save_oids, Enum.map(values, &pg_type_from_tuple/1)})
end

Expand Down
7 changes: 6 additions & 1 deletion components/electric/lib/electric/postgres/proxy.ex
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,12 @@ defmodule Electric.Postgres.Proxy do
Logger.info("Starting Proxy server listening on port #{listen_opts[:port]}")

ThousandIsland.child_spec(
Keyword.merge(listen_opts, handler_module: Handler, handler_options: handler_state)
Keyword.merge(listen_opts,
handler_module: Handler,
handler_options: handler_state,
num_acceptors: 1,
supervisor_options: [name: Electric.static_name(__MODULE__, connector_config)]
)
)
end

Expand Down
Loading

0 comments on commit affd6b5

Please sign in to comment.