Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alco/observer and process names #1137

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 78 additions & 2 deletions components/electric/lib/electric.ex
Original file line number Diff line number Diff line change
@@ -1,9 +1,70 @@
defmodule Electric do
@moduledoc false

alias Electric.Replication.Connectors

@type connector :: Connectors.config() | Connectors.origin()
@type reg_name :: {:via, :gproc, {:n, :l, term()}}
@type write_to_pg_mode :: :logical_replication | :direct_writes

defmacro __using__(proc_type) do
proc_module = proc_module(proc_type)

quote do
use unquote(proc_module)

alias Electric.Replication.Connectors

@spec start_link(Connectors.config()) :: Supervisor.on_start()
def start_link(connector_config) do
name = static_name(connector_config)
unquote(proc_module).start_link(__MODULE__, connector_config, name: name)
end

@spec reg(Electric.connector()) :: true
def reg(connector) do
connector
|> reg_name()
|> Electric.reg()
end

@spec reg_name(Electric.connector()) :: Electric.reg_name()
def reg_name(connector) do
Electric.name(__MODULE__, connector)
end

@spec static_name(Electric.connector()) :: atom
def static_name(connector) do
Electric.static_name(__MODULE__, connector)
end

@spec ets_table_name(Electric.connector()) :: atom
defp ets_table_name(connector) do
static_name(connector)
end

defoverridable start_link: 1
end
end

defp proc_module(:supervisor), do: Supervisor
defp proc_module(:gen_server), do: GenServer
defp proc_module(:gen_stage), do: GenStage

def static_name(module) do
String.to_atom(trim_module_name(module))
end

def static_name(module, connector) do
String.to_atom(trim_module_name(module) <> ":" <> origin(connector))
end

defp trim_module_name(module) do
module
|> inspect()
|> String.replace_leading("Electric.", "")
end

@doc """
Register process with the given name
"""
Expand Down Expand Up @@ -57,14 +118,29 @@ defmodule Electric do
end
end

def await_reg({:via, :gproc, name}, timeout) do
:gproc.await(name, timeout)
end

@doc """
Helper function for gproc registration
"""
@spec name(module(), term()) :: reg_name
def name(module, term) do
@spec name(module, connector) :: reg_name
def name(module, connector) do
{:via, :gproc, {:n, :l, {module, origin(connector)}}}
end

@spec gen_name(module, term) :: reg_name
def gen_name(module, term) do
{:via, :gproc, {:n, :l, {module, term}}}
end

@spec origin(connector) :: Connectors.origin()
def origin(origin) when is_binary(origin), do: origin

def origin(connector_config) when is_list(connector_config),
do: Connectors.origin(connector_config)

@doc """
Helper function to lookup pid that corresponds to registered gproc name
"""
Expand Down
4 changes: 4 additions & 0 deletions components/electric/lib/electric/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ defmodule Electric.Application do
defp pg_server_port, do: Application.fetch_env!(:electric, :pg_server_port)

defp listener_opts do
[num_acceptors: 1] ++ ipv6_opts()
end

defp ipv6_opts do
use_ipv6? = Application.get_env(:electric, :listen_on_ipv6?, false)

if use_ipv6? do
Expand Down
6 changes: 5 additions & 1 deletion components/electric/lib/electric/features.ex
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,12 @@ defmodule Electric.Features do
@type flag() :: atom()
@type name() :: atom()

def static_name do
Electric.static_name(__MODULE__)
end

def start_link(args) do
GenServer.start_link(__MODULE__, args)
GenServer.start_link(__MODULE__, args, name: static_name())
end

def default_key, do: @default_key
Expand Down
21 changes: 7 additions & 14 deletions components/electric/lib/electric/postgres/cached_wal/ets_backed.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ defmodule Electric.Postgres.CachedWal.EtsBacked do
removing oldest entries (FIFO)
"""

use GenStage
use Electric, :gen_stage

alias Electric.Replication.Changes.Transaction
alias Electric.Replication.Connectors
Expand All @@ -34,17 +34,12 @@ defmodule Electric.Postgres.CachedWal.EtsBacked do

# Public API

@spec name(Connectors.origin()) :: Electric.reg_name()
def name(origin) do
Electric.name(__MODULE__, origin)
end

@doc """
Start the cache. See module docs for options
"""
def start_link(opts) do
origin = Keyword.fetch!(opts, :origin)
GenStage.start_link(__MODULE__, opts, name: name(origin))
GenStage.start_link(__MODULE__, opts, name: static_name(origin))
end

def clear_cache(stage) do
Expand Down Expand Up @@ -87,12 +82,12 @@ defmodule Electric.Postgres.CachedWal.EtsBacked do

@impl Api
def request_notification(origin, wal_pos) do
GenStage.call(name(origin), {:request_notification, wal_pos})
GenStage.call(reg_name(origin), {:request_notification, wal_pos})
end

@impl Api
def cancel_notification_request(origin, ref) do
GenStage.call(name(origin), {:cancel_notification, ref})
GenStage.call(reg_name(origin), {:cancel_notification, ref})
end

@impl Api
Expand All @@ -108,7 +103,7 @@ defmodule Electric.Postgres.CachedWal.EtsBacked do

@impl Api
def telemetry_stats(origin) do
GenStage.call(name(origin), :telemetry_stats)
GenStage.call(reg_name(origin), :telemetry_stats)
catch
:exit, _ -> nil
end
Expand All @@ -124,6 +119,8 @@ defmodule Electric.Postgres.CachedWal.EtsBacked do
def init(opts) do
origin = Keyword.fetch!(opts, :origin)

reg(origin)

table = :ets.new(ets_table_name(origin), [:named_table, :ordered_set])
Logger.metadata(origin: origin, component: "CachedWal.EtsBacked")

Expand All @@ -142,10 +139,6 @@ defmodule Electric.Postgres.CachedWal.EtsBacked do
end
end

defp ets_table_name(origin) do
String.to_atom(inspect(__MODULE__) <> ":" <> origin)
end

@impl GenStage
def handle_call({:request_notification, wal_pos}, {from, _}, state) do
ref = make_ref()
Expand Down
38 changes: 17 additions & 21 deletions components/electric/lib/electric/postgres/extension/schema_cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ defmodule Electric.Postgres.Extension.SchemaCache do
itself (via the functions in the `Extension` module).
"""

use GenServer
use Electric, :gen_server

import Electric.Postgres.Extension, only: [is_extension_relation: 1]

Expand Down Expand Up @@ -44,27 +44,18 @@ defmodule Electric.Postgres.Extension.SchemaCache do
Supervisor.child_spec(default, [])
end

def start_link({conn_config, opts}) do
start_link(conn_config, opts)
def start_link({connector_config, opts}) do
start_link(connector_config, opts)
end

def start_link(conn_config, opts \\ []) do
GenServer.start_link(__MODULE__, {conn_config, opts}, name: name(conn_config))
end

@spec name(Connectors.config()) :: Electric.reg_name()
def name(conn_config) when is_list(conn_config) do
name(Connectors.origin(conn_config))
end

@spec name(Connectors.origin()) :: Electric.reg_name()
def name(origin) when is_binary(origin) do
Electric.name(__MODULE__, origin)
def start_link(connector_config, opts \\ []) do
name = static_name(connector_config)
GenServer.start_link(__MODULE__, {connector_config, opts}, name: name)
end

@spec ready?(Connectors.origin()) :: boolean()
def ready?(origin) do
case Electric.lookup_pid(name(origin)) do
case Electric.lookup_pid(reg_name(origin)) do
pid when is_pid(pid) -> true
_ -> false
end
Expand Down Expand Up @@ -200,7 +191,7 @@ defmodule Electric.Postgres.Extension.SchemaCache do
end

defp call(name, msg) when is_binary(name) do
call(name(name), msg)
call(reg_name(name), msg)
end

defp call(pid, msg) when is_pid(pid) do
Expand All @@ -212,8 +203,9 @@ defmodule Electric.Postgres.Extension.SchemaCache do
end

@impl GenServer
def init({conn_config, opts}) do
origin = Connectors.origin(conn_config)
def init({connector_config, opts}) do
reg(connector_config)
origin = Connectors.origin(connector_config)

Logger.metadata(pg_producer: origin)
Logger.info("Starting #{__MODULE__} for #{origin}")
Expand All @@ -226,12 +218,12 @@ defmodule Electric.Postgres.Extension.SchemaCache do
{:ok, backend} =
opts
|> SchemaLoader.get(:backend)
|> SchemaLoader.connect(conn_config)
|> SchemaLoader.connect(connector_config)

state = %{
origin: origin,
backend: backend,
conn_config: conn_config,
conn_config: connector_config,
opts: opts,
current: nil,
refresh_task: nil,
Expand All @@ -242,6 +234,10 @@ defmodule Electric.Postgres.Extension.SchemaCache do
{:ok, state}
end

def init(connector_config) when is_list(connector_config) do
init({connector_config, []})
end

@impl GenServer
def handle_call({:load, :current}, _from, %{current: nil} = state) do
with {{:ok, schema_version}, state} <- load_current_schema(state) do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,10 @@ defmodule Electric.Postgres.Extension.SchemaCache.Global do

require Logger

{:via, :gproc, key} = name = Electric.name(SchemaCache, :__global__)

@name name
@key key

def name, do: @name
def name, do: Electric.name(SchemaCache, "__global__")

def register(origin) do
case Electric.reg_or_locate(@name, origin) do
case Electric.reg_or_locate(name(), origin) do
:ok ->
# Kept as a warning to remind us that this is wrong... ;)
Logger.warning("SchemaCache #{inspect(origin)} registered as the global instance")
Expand All @@ -32,7 +27,7 @@ defmodule Electric.Postgres.Extension.SchemaCache.Global do
end

defp with_instance(timeout \\ 5_000, fun) when is_function(fun, 1) do
{pid, _value} = :gproc.await(@key, timeout)
{pid, _value} = Electric.await_reg(name(), timeout)
fun.(pid)
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ defmodule Electric.Postgres.Extension.SchemaLoader.Epgsql do
# only connect when required, not immediately
lazy: true,
pool_size: 4,
worker_idle_timeout: 30_000
worker_idle_timeout: 30_000,
name: Electric.static_name(__MODULE__)
)
end

Expand Down
10 changes: 7 additions & 3 deletions components/electric/lib/electric/postgres/oid_database.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,12 @@ defmodule Electric.Postgres.OidDatabase do

@oid_table :oid_database

def static_name do
Electric.static_name(__MODULE__)
end

def start_link(_) do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
GenServer.start_link(__MODULE__, [], name: static_name())
end

def update_oids(conn, kinds \\ nil) do
Expand All @@ -22,11 +26,11 @@ defmodule Electric.Postgres.OidDatabase do
end

with {:ok, oids} <- res do
save_oids(__MODULE__, oids)
save_oids(static_name(), oids)
end
end

def save_oids(server \\ __MODULE__, values) do
def save_oids(server \\ static_name(), values) do
GenServer.call(server, {:save_oids, Enum.map(values, &pg_type_from_tuple/1)})
end

Expand Down
7 changes: 6 additions & 1 deletion components/electric/lib/electric/postgres/proxy.ex
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,12 @@ defmodule Electric.Postgres.Proxy do
Logger.info("Starting Proxy server listening on port #{listen_opts[:port]}")

ThousandIsland.child_spec(
Keyword.merge(listen_opts, handler_module: Handler, handler_options: handler_state)
Keyword.merge(listen_opts,
handler_module: Handler,
handler_options: handler_state,
num_acceptors: 1,
supervisor_options: [name: Electric.static_name(__MODULE__, connector_config)]
)
)
end

Expand Down
Loading
Loading