From 0150f6254c27573a29ff533e5e96d3a59b4e8f4e Mon Sep 17 00:00:00 2001 From: Stas Date: Wed, 13 Sep 2023 14:53:23 +0200 Subject: [PATCH] fix: update postgres libcluster strategy (#671) --- lib/realtime/cluster_strategy/postgres.ex | 361 ++++------------------ mix.exs | 2 +- 2 files changed, 67 insertions(+), 296 deletions(-) diff --git a/lib/realtime/cluster_strategy/postgres.ex b/lib/realtime/cluster_strategy/postgres.ex index 9f466583f..453c5d1e3 100644 --- a/lib/realtime/cluster_strategy/postgres.ex +++ b/lib/realtime/cluster_strategy/postgres.ex @@ -2,336 +2,107 @@ defmodule Realtime.Cluster.Strategy.Postgres do @moduledoc """ A libcluster strategy that uses Postgres LISTEN/NOTIFY to determine the cluster topology. - This strategy works by having all nodes in the cluster listen and notify to Postgres notifications - on the same channel. + This strategy operates by having all nodes in the cluster listen for and send notifications to a shared Postgres channel. - When a node comes online, it will notify the other nodes on the channel by sending a sync message. - Other nodes that receive this message will immediately send a heartbeat message back to this node. - Node will acknowledge the heartbeat messages from other nodes and connect to them. + When a node comes online, it begins to broadcast its name in a "heartbeat" message to the channel. All other nodes that receive this message attempt to connect to it. - All nodes will periodically send heartbeat messages to all other nodes. If a node fails to send a heartbeat - message inside of a certain time interval, it is considered inactive and will be disconnected from the cluster. + This strategy does not check connectivity between nodes and does not disconnect them ## Options - * `hostname` - The hostname of the database server (required) - * `username` - The username to connect to the database with (required) - * `password` - The password to connect to the database with (required) - * `database` - The database to connect to (required) - * `port` - The port to connect to (required) - * `parameters` - Additional database parameters, e.g. application_name (optional) * `heartbeat_interval` - The interval at which to send heartbeat messages in milliseconds (optional; default: 5_000) - * `node_timeout` - The interval after which a node is considered inactive in milliseconds (optional; default: 15_000) - - ## Usage - - config :libcluster, - topologies: [ - postgres: [ - strategy: #{__MODULE__}, - config: [ - hostname: "locahost", - username: "postgres", - password: "postgres", - database: "postgres", - port: 5432, - parameters: [ - application_name: "cluster_node_#{node()}" - ], - heartbeat_interval: 5_000, - node_timeout: 15_000 - ] - ] - ] + * `channel_name` - The name of the channel to which nodes will listen and notify (optional; default: "cluster) """ + use GenServer - @behaviour :gen_statem - use Cluster.Strategy - - alias Cluster.Logger alias Cluster.Strategy - alias Cluster.Strategy.State - alias Ecto.Adapters.SQL - alias Postgrex.Notifications, as: PN - alias Realtime.Repo - - @channel "cluster" - - defmodule MetaState do - @moduledoc false - defstruct conn: nil, - listen_ref: nil, - inactive_nodes: MapSet.new() - end - - def start_link(opts) do - :gen_statem.start_link(__MODULE__, opts, []) - end - - @state :no_state + alias Cluster.Logger + alias Postgrex, as: P - @impl :gen_statem - def callback_mode, do: :handle_event_function + def start_link(args), do: GenServer.start_link(__MODULE__, args) - @impl :gen_statem - def init([%State{config: config} = state]) do - Process.flag(:trap_exit, true) + def init([state]) do + opts = + [ + hostname: Keyword.fetch!(state.config, :hostname), + username: Keyword.fetch!(state.config, :username), + password: Keyword.fetch!(state.config, :password), + database: Keyword.fetch!(state.config, :database), + port: Keyword.fetch!(state.config, :port), + parameters: Keyword.fetch!(state.config, :parameters) + ] new_config = - config - |> Keyword.put_new(:parameters, []) + state.config |> Keyword.put_new(:heartbeat_interval, 5_000) - |> Keyword.put_new(:node_timeout, 15_000) - - state = %{ - state - | config: new_config, - meta: %MetaState{} + |> Keyword.put_new(:channel_name, "realtime_cluster") + |> Keyword.delete(:url) + + meta = %{ + opts: fn -> opts end, + conn: nil, + conn_notif: nil, + heartbeat_ref: make_ref() } - {:ok, @state, state, {:next_event, :internal, :connect}} + {:ok, %{state | config: new_config, meta: meta}, {:continue, :connect}} end - @impl :gen_statem - def handle_event(type, content, statem_state, state) + def handle_continue(:connect, state) do + with {:ok, conn} <- P.start_link(state.meta.opts.()), + {:ok, conn_notif} <- P.Notifications.start_link(state.meta.opts.()), + {_, _} <- P.Notifications.listen(conn_notif, state.config[:channel_name]) do + Logger.info(state.topology, "Connected to Postgres database") - def handle_event( - :internal, - :connect, - @state, - %State{config: config, topology: topology} = state - ) do - [ - hostname: Keyword.fetch!(config, :hostname), - username: Keyword.fetch!(config, :username), - password: Keyword.fetch!(config, :password), - database: Keyword.fetch!(config, :database), - port: Keyword.fetch!(config, :port), - parameters: Keyword.fetch!(config, :parameters) - ] - |> PN.start_link() - |> case do - {:ok, pid} -> - Logger.info(topology, "Connected to Postgres database") + meta = %{ + state.meta + | conn: conn, + conn_notif: conn_notif, + heartbeat_ref: heartbeat(0) + } - new_state = update_meta_state(state, :conn, pid) - - {:keep_state, new_state, {{:timeout, :listen}, 0, nil}} - - _ -> - Logger.error(topology, "Failed to connect to Postgres database") - - {:keep_state, state, {{:timeout, :connect}, rand(1_000), nil}} + {:noreply, put_in(state.meta, meta)} + else + reason -> + Logger.error(state.topology, "Failed to connect to Postgres: #{inspect(reason)}") + {:noreply, state} end end - def handle_event( - :internal, - :listen, - @state, - %State{meta: %{conn: conn}, topology: topology} = state - ) do - case PN.listen(conn, @channel) do - {:ok, ref} -> - Logger.info(topology, "Listening to Postgres notifications on channel #{@channel}") - - new_state = update_meta_state(state, :listen_ref, ref) - - {:keep_state, new_state, {{:timeout, {:notify, :sync}}, 0, nil}} - - _ -> - Logger.error( - topology, - "Failed to listen to Postgres notifications on channel #{@channel}" - ) - - {:keep_state, state, {{:timeout, :listen}, rand(1_000), nil}} - end + def handle_info(:heartbeat, state) do + Process.cancel_timer(state.meta.heartbeat_ref) + P.query(state.meta.conn, "NOTIFY #{state.config[:channel_name]}, '#{node()}'", []) + ref = heartbeat(state.config[:heartbeat_interval]) + {:noreply, put_in(state.meta.heartbeat_ref, ref)} end - def handle_event( - :internal, - {:notify, notify_event}, - @state, - %State{config: config, topology: topology} = state - ) do - heartbeat_interval = config[:heartbeat_interval] - - message = "#{notify_event}::#{node()}" + def handle_info({:notification, _, _, _, node}, state) do + node = String.to_atom(node) - timeout = - Repo - |> SQL.query("NOTIFY #{@channel}, '#{message}'", []) - |> case do - {:ok, _} -> - Logger.debug( - topology, - "Notified Postgres on channel #{@channel} with message: " <> message - ) + if node != node() do + topology = state.topology + Logger.debug(topology, "Trying to connect to node: #{node}") - rand(heartbeat_interval - 1_000, heartbeat_interval + 1_000) - - {:error, _} -> - Logger.error( - topology, - "Failed to notify Postgres on channel #{@channel} with message: " <> message - ) - - rand(1_000) - end - - {:keep_state, state, {{:timeout, {:notify, :heartbeat}}, timeout, nil}} - end - - def handle_event( - :info, - {:notification, _pid, _ref, _channel, message}, - @state, - %State{config: config, meta: %{inactive_nodes: inactive_nodes}} = state - ) do - [notify_event, node] = String.split(message, "::") - node = :"#{node}" - self = node() - new_inactive_nodes = MapSet.delete(inactive_nodes, node) - new_state = update_meta_state(state, :inactive_nodes, new_inactive_nodes) - new_inactive_nodes = disconnect_nodes(new_state) - new_state = update_meta_state(state, :inactive_nodes, new_inactive_nodes) - - if node != self do - new_state - |> connect_node(node) - |> case do + case Strategy.connect_nodes(topology, state.connect, state.list_nodes, [node]) do :ok -> - case notify_event do - "sync" -> - {:keep_state, new_state, {{:timeout, {:notify, :heartbeat}}, 0, nil}} - - "heartbeat" -> - {:keep_state, new_state, {{:timeout, {:listen, node}}, config[:node_timeout], nil}} - end + Logger.debug(topology, "Connected to node: #{node}") - :error -> - {:keep_state, new_state, {{:timeout, {:notify, :sync}}, 0, nil}} + {:error, _} -> + Logger.error(topology, "Failed to connect to node: #{node}") end - else - {:keep_state, new_state} - end - end - - def handle_event( - {:timeout, {:listen, node}}, - nil, - @state, - %State{meta: %{inactive_nodes: inactive_nodes}} = state - ) do - new_inactive_nodes = MapSet.put(inactive_nodes, node) - - new_inactive_nodes = - state - |> update_meta_state(:inactive_nodes, new_inactive_nodes) - |> disconnect_nodes() - - new_state = update_meta_state(state, :inactive_nodes, new_inactive_nodes) - - {:keep_state, new_state} - end - - def handle_event({:timeout, event}, nil, @state, state) do - {:keep_state, state, {:next_event, :internal, event}} - end - - def handle_event( - :info, - {:EXIT, _pid, _reason}, - @state, - %State{topology: topology} = state - ) do - Logger.error(topology, "Postgres notifications connection terminated") - - new_state = - state - |> update_meta_state(:conn, nil) - |> update_meta_state(:listen_ref, nil) - - {:keep_state, new_state, {{:timeout, :connect}, rand(1_000), nil}} - end - - def handle_event(:terminate, reason, @state, %State{ - meta: %{conn: conn, listen_ref: ref}, - topology: topology - }) do - Logger.warn(topology, "Postgres clustering strategy terminating") - - PN.unlisten(conn, ref) - - GenServer.stop(conn, reason, 100) - - {:stop, reason} - end - - defp connect_node(%State{connect: connect, list_nodes: list_nodes, topology: topology}, node) - when is_atom(node) do - Strategy.connect_nodes( - topology, - connect, - list_nodes, - [node] - ) - |> case do - :ok -> - Logger.debug(topology, "Connected to node: #{node}") - :ok - - {:error, _} -> - Logger.error(topology, "Failed to connect to node: #{node}") - :error end - end - - defp disconnect_nodes(%State{ - disconnect: disconnect, - list_nodes: list_nodes, - meta: %{inactive_nodes: inactive_nodes}, - topology: topology - }) do - case Strategy.disconnect_nodes( - topology, - disconnect, - list_nodes, - MapSet.to_list(inactive_nodes) - ) do - :ok -> - Logger.debug( - topology, - "Disconnected from inactive nodes: " <> inspect(inactive_nodes) - ) - - MapSet.new() - - {:error, bad_nodes} -> - new_inactive_nodes = - Enum.reduce(bad_nodes, MapSet.new(), fn {n, _}, acc -> MapSet.put(acc, n) end) - - Logger.error( - topology, - "Failed to disconnect from inactive nodes: " <> inspect(new_inactive_nodes) - ) - - new_inactive_nodes - end - end - defp update_meta_state(%State{meta: %MetaState{} = meta} = state, key, value) do - %{state | meta: Map.put(meta, key, value)} + {:noreply, state} end - defp rand(max) do - rand(0, max) + def handle_info(msg, state) do + Logger.error(state.topology, "Undefined message #{inspect(msg, pretty: true)}") + {:noreply, state} end - defp rand(min, max) do - min..max - |> Enum.random() - |> abs() + ### Internal functions + @spec heartbeat(non_neg_integer()) :: reference() + defp heartbeat(interval) when interval >= 0 do + Process.send_after(self(), :heartbeat, interval) end end diff --git a/mix.exs b/mix.exs index 9f6c6ab2e..f821a34d4 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do def project do [ app: :realtime, - version: "2.22.13", + version: "2.22.14", elixir: "~> 1.14.0", elixirc_paths: elixirc_paths(Mix.env()), start_permanent: Mix.env() == :prod,