From 9a5a1cd6a87529c21a5ab26843d337568058c34d Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Tue, 2 Apr 2024 13:11:50 +0300 Subject: [PATCH] Introduce an Ecto repo in place of ad-hoc DB connections in ClientReconnectionInfo --- .../electric/lib/electric/postgres/repo.ex | 38 +++++ .../electric/replication/postgres/client.ex | 41 ++++- .../replication/postgres_connector_sup.ex | 1 + .../satellite/client_reconnection_info.ex | 142 ++++++++++-------- components/electric/mix.exs | 5 +- 5 files changed, 163 insertions(+), 64 deletions(-) create mode 100644 components/electric/lib/electric/postgres/repo.ex diff --git a/components/electric/lib/electric/postgres/repo.ex b/components/electric/lib/electric/postgres/repo.ex new file mode 100644 index 0000000000..c70029c9a4 --- /dev/null +++ b/components/electric/lib/electric/postgres/repo.ex @@ -0,0 +1,38 @@ +defmodule Electric.Postgres.Repo do + @moduledoc """ + Ecto repo for managing a pool of DB connections. + + This repo must be started as part of a `PostgresConnectorSup` supervision tree, configured + with the same `connector_config` that its siblings use. Under the hood, it will start a + dynamic Ecto repo that can be looked up by the name returned from `name/1`. + + Connections in the pool will use SSL if the Postgres connector is configured with + `DATABASE_REQUIRE_SSL=true`. This is different from `epgsql`'s default behaviour where it + tries to use SSL first and only falls back to using plain TCP if that fails. + """ + + use Ecto.Repo, otp_app: :electric, adapter: Ecto.Adapters.Postgres + + alias Electric.Replication.Connectors + + @default_pool_size 10 + + def config(connector_config, opts) do + origin = Connectors.origin(connector_config) + conn_opts = Connectors.get_connection_opts(connector_config) + + [ + name: name(origin), + hostname: conn_opts.host, + port: conn_opts.port, + username: conn_opts.username, + password: conn_opts.password, + database: conn_opts.database, + ssl: conn_opts.ssl == :required, + pool_size: Keyword.get(opts, :pool_size, @default_pool_size), + log: false + ] + end + + def name(origin), do: :"#{inspect(__MODULE__)}:#{origin}" +end diff --git a/components/electric/lib/electric/replication/postgres/client.ex b/components/electric/lib/electric/replication/postgres/client.ex index c4146effef..391ae8dbb4 100644 --- a/components/electric/lib/electric/replication/postgres/client.ex +++ b/components/electric/lib/electric/replication/postgres/client.ex @@ -8,7 +8,7 @@ defmodule Electric.Replication.Postgres.Client do import Electric.Postgres.Dialect.Postgresql, only: [quote_ident: 1] - alias Electric.Postgres.{Extension, Lsn} + alias Electric.Postgres.{Extension, Lsn, Repo} alias Electric.Replication.Connectors require Logger @@ -27,6 +27,45 @@ defmodule Electric.Replication.Postgres.Client do :epgsql.connect(ip_addr, username, password, epgsql_conn_opts) end + @doc """ + Execute the given function using a pooled DB connection. + + The pool is managed by `Electric.Postgres.Repo` and so the passed function should use the + repo module's API for querying and executing SQL statements instead of `epgsql`. See + `pooled_query!/3` and `query!/2` below for a high-level API built on top of the Ecto repo. + """ + @spec with_pool(Connectors.origin(), (-> any)) :: any + def with_pool(origin, fun) when is_binary(origin) and is_function(fun, 0) do + Repo.put_dynamic_repo(Repo.name(origin)) + Repo.checkout(fun) + end + + @doc """ + Execute the given SQL query/statement using a pooled DB connection. + + The pool is managed by `Electric.Postgres.Repo` and the query is executed by invoking `query!/2`. + """ + @spec pooled_query!(Connectors.origin(), String.t(), [term]) :: {[String.t()], [tuple()]} + def pooled_query!(origin, query_str, params) when is_binary(origin) do + with_pool(origin, fn -> query!(query_str, params) end) + end + + @doc """ + Execute the given SQL query/statement in the context of a checked-out DB connection. + + This function assumes a connection has been checked out from a pool managed by + `Electric.Postgres.Repo` and will fail if that's not the case. Use this to issue multiple + queries/statements on a single DB connection by wrapping them in an anonymous function and + passing it to `with_pool/2`. + """ + @spec query!(String.t(), [term]) :: {[String.t()], [tuple()]} + def query!(query_str, params \\ []) when is_binary(query_str) and is_list(params) do + true = Repo.checked_out?() + + %Postgrex.Result{columns: columns, rows: rows} = Repo.query!(query_str, params) + {columns, rows} + end + @spec with_conn(Connectors.connection_opts(), fun()) :: term() | {:error, term()} def with_conn(conn_opts, fun) do # Best effort capture exit message, expect trap_exit to be set diff --git a/components/electric/lib/electric/replication/postgres_connector_sup.ex b/components/electric/lib/electric/replication/postgres_connector_sup.ex index 6ec4265782..c09dde796f 100644 --- a/components/electric/lib/electric/replication/postgres_connector_sup.ex +++ b/components/electric/lib/electric/replication/postgres_connector_sup.ex @@ -40,6 +40,7 @@ defmodule Electric.Replication.PostgresConnectorSup do ] children = [ + {Electric.Postgres.Repo, Electric.Postgres.Repo.config(connector_config, [])}, {Electric.Satellite.ClientReconnectionInfo, connector_config}, {SchemaCache, connector_config}, {SatelliteCollectorProducer, connector_config}, diff --git a/components/electric/lib/electric/satellite/client_reconnection_info.ex b/components/electric/lib/electric/satellite/client_reconnection_info.ex index b014420734..058fd6f8f4 100644 --- a/components/electric/lib/electric/satellite/client_reconnection_info.ex +++ b/components/electric/lib/electric/satellite/client_reconnection_info.ex @@ -243,10 +243,12 @@ defmodule Electric.Satellite.ClientReconnectionInfo do :ets.match_delete(@additional_data_ets, {{client_id, :_, :_, :_, :_}, :_, :_}) :ets.delete(@checkpoint_ets, client_id) - {:ok, _} = query(origin, @delete_subscriptions_query, [client_id]) - {:ok, _} = query(origin, @delete_checkpoint_query, [client_id]) - {:ok, _} = query(origin, @delete_actions_query, [client_id]) - {:ok, _} = query(origin, @delete_additional_data_for_client_query, [client_id]) + Client.with_pool(origin, fn -> + Client.query!(@delete_subscriptions_query, [client_id]) + Client.query!(@delete_checkpoint_query, [client_id]) + Client.query!(@delete_actions_query, [client_id]) + Client.query!(@delete_additional_data_for_client_query, [client_id]) + end) :ok end @@ -280,8 +282,11 @@ defmodule Electric.Satellite.ClientReconnectionInfo do {{client_id, :_, :_, :subscription, subscription_id}, :_, :_} ) - {:ok, 1} = query(origin, @delete_subscription_query, [client_id, subscription_id]) - {:ok, 1} = query(origin, @delete_subscription_data_query, [client_id, subscription_id]) + Client.with_pool(origin, fn -> + subs_uuid = encode_uuid(subscription_id) + Client.query!(@delete_subscription_query, [client_id, subs_uuid]) + Client.query!(@delete_subscription_data_query, [client_id, subs_uuid]) + end) :ok end @@ -310,8 +315,12 @@ defmodule Electric.Satellite.ClientReconnectionInfo do defp store_client_checkpoint(origin, client_id, wal_pos, sent_rows_graph) do :ets.insert(@checkpoint_ets, {client_id, wal_pos, sent_rows_graph}) sent_rows_graph_bin = :erlang.term_to_binary(sent_rows_graph) - {:ok, 1} = query(origin, @upsert_checkpoint_query, [client_id, wal_pos, sent_rows_graph_bin]) - :ok + + Client.pooled_query!(origin, @upsert_checkpoint_query, [ + client_id, + wal_pos, + sent_rows_graph_bin + ]) end @doc """ @@ -407,7 +416,7 @@ defmodule Electric.Satellite.ClientReconnectionInfo do :ets.match_delete(@additional_data_ets, {{client_id, :_, :_, :_, :_}, :_, :_}) origin = Keyword.fetch!(opts, :origin) - {:ok, _} = query(origin, @delete_additional_data_for_client_query, [client_id]) + Client.pooled_query!(origin, @delete_additional_data_for_client_query, [client_id]) actions = @actions_ets @@ -489,7 +498,7 @@ defmodule Electric.Satellite.ClientReconnectionInfo do [client_id, txid, :erlang.term_to_binary(actions)] end) - {:ok, 1} = query(origin, store_actions_query(map_size(actions_map)), values) + Client.pooled_query!(origin, store_actions_query(map_size(actions_map)), values) :ok end @@ -580,7 +589,7 @@ defmodule Electric.Satellite.ClientReconnectionInfo do defp delete_additional_data(origin, {client_id, _xmin, order, _subject, _subscription_id} = key) do :ets.delete(@additional_data_ets, key) - {:ok, 1} = query(origin, @delete_additional_data_for_key_query, [client_id, order]) + Client.pooled_query!(origin, @delete_additional_data_for_key_query, [client_id, order]) :ok end @@ -611,14 +620,13 @@ defmodule Electric.Satellite.ClientReconnectionInfo do def store_subscription(origin, client_id, subscription_id, xmin, pos, requests) do :ets.insert(@subscriptions_ets, {{client_id, subscription_id}, xmin, requests, pos}) - {:ok, 1} = - query(origin, @insert_subscription_query, [ - client_id, - subscription_id, - xmin, - pos, - :erlang.term_to_binary(requests) - ]) + Client.pooled_query!(origin, @insert_subscription_query, [ + client_id, + encode_uuid(subscription_id), + xmin, + pos, + :erlang.term_to_binary(requests) + ]) :ok end @@ -656,14 +664,13 @@ defmodule Electric.Satellite.ClientReconnectionInfo do {{client_id, xmin, pos, :subscription, subscription_id}, graph_diff, []} ) - {:ok, 1} = - query(origin, @insert_subscription_data_query, [ - client_id, - xmin, - pos, - subscription_id, - :erlang.term_to_binary(graph_diff) - ]) + Client.pooled_query!(origin, @insert_subscription_data_query, [ + client_id, + xmin, + pos, + encode_uuid(subscription_id), + :erlang.term_to_binary(graph_diff) + ]) :ok end @@ -692,14 +699,13 @@ defmodule Electric.Satellite.ClientReconnectionInfo do {{client_id, xmin, pos, :transaction, nil}, graph_diff, included_txns} ) - {:ok, 1} = - query(origin, @insert_transaction_data_query, [ - client_id, - xmin, - pos, - :erlang.term_to_binary(graph_diff), - included_txns - ]) + Client.pooled_query!(origin, @insert_transaction_data_query, [ + client_id, + xmin, + pos, + :erlang.term_to_binary(graph_diff), + included_txns + ]) :ok end @@ -728,7 +734,7 @@ defmodule Electric.Satellite.ClientReconnectionInfo do :ets.select_delete(@actions_ets, matchspec) - {:ok, _} = query(origin, @delete_actions_for_xids_query, [client_id, txids]) + Client.pooled_query!(origin, @delete_actions_for_xids_query, [client_id, txids]) :ok end @@ -744,17 +750,19 @@ defmodule Electric.Satellite.ClientReconnectionInfo do origin = Connectors.origin(connector_config) - restore_checkpoint_cache(origin, checkpoint_table) - restore_subscriptions_cache(origin, subscriptions_table) - restore_additional_data_cache(origin, additional_data_table) - restore_actions_cache(origin, actions_table) + Client.with_pool(origin, fn -> + restore_checkpoint_cache(checkpoint_table) + restore_subscriptions_cache(subscriptions_table) + restore_additional_data_cache(additional_data_table) + restore_actions_cache(actions_table) + end) {:ok, nil} end - defp restore_checkpoint_cache(origin, checkpoint_table) do - {:ok, _, rows} = - query(origin, "SELECT * FROM #{Extension.client_checkpoints_table()}", []) + defp restore_checkpoint_cache(checkpoint_table) do + {["client_id", "pg_wal_pos", "sent_rows_graph"], rows} = + Client.query!("SELECT * FROM #{Extension.client_checkpoints_table()}") checkpoints = for {client_id, wal_pos, sent_rows_graph} <- rows do @@ -764,35 +772,42 @@ defmodule Electric.Satellite.ClientReconnectionInfo do :ets.insert(checkpoint_table, checkpoints) end - defp restore_subscriptions_cache(origin, subscriptions_table) do - {:ok, _, rows} = - query(origin, "SELECT * FROM #{Extension.client_shape_subscriptions_table()}", []) + defp restore_subscriptions_cache(subscriptions_table) do + {["client_id", "subscription_id", "min_txid", "ord", "shape_requests"], rows} = + Client.query!("SELECT * FROM #{Extension.client_shape_subscriptions_table()}") subscriptions = for {client_id, subscription_id, xmin, pos, shape_requests_bin} <- rows do - {{client_id, subscription_id}, String.to_integer(xmin), + {{client_id, decode_uuid(subscription_id)}, String.to_integer(xmin), :erlang.binary_to_term(shape_requests_bin), pos} end :ets.insert(subscriptions_table, subscriptions) end - defp restore_additional_data_cache(origin, additional_data_table) do - {:ok, _, rows} = - query(origin, "SELECT * FROM #{Extension.client_additional_data_table()}", []) + defp restore_additional_data_cache(additional_data_table) do + {[ + "client_id", + "min_txid", + "ord", + "subject", + "subscription_id", + "graph_diff", + "included_txns" + ], rows} = Client.query!("SELECT * FROM #{Extension.client_additional_data_table()}") records = for {client_id, xmin, pos, subject, subscription_id, graph_diff, included_txns} <- rows do {{client_id, String.to_integer(xmin), pos, String.to_existing_atom(subject), - subscription_id}, :erlang.binary_to_term(graph_diff), included_txns} + decode_uuid(subscription_id)}, :erlang.binary_to_term(graph_diff), included_txns} end :ets.insert(additional_data_table, records) end - defp restore_actions_cache(origin, actions_table) do - {:ok, _, rows} = - query(origin, "SELECT * FROM #{Extension.client_actions_table()}", []) + defp restore_actions_cache(actions_table) do + {["client_id", "txid", "subquery_actions"], rows} = + Client.query!("SELECT * FROM #{Extension.client_actions_table()}") actions = for {client_id, txid, actions_bin} <- rows do @@ -802,14 +817,19 @@ defmodule Electric.Satellite.ClientReconnectionInfo do :ets.insert(actions_table, actions) end - defp query(origin, query, params) when is_binary(query) and is_list(params) do - origin - |> connector_config() - |> Connectors.get_connection_opts() - |> Client.with_conn(fn conn -> :epgsql.equery(conn, query, params) end) + # The encode_uuid() and decode_uuid() functions are needed here due to incomplete adoption + # of an Ecto repo for regular DB connections. Consider defining Ecto schemas for the database + # tables referenced in this module to get automatic type conversion. + + defp encode_uuid(uuid_str) do + {:ok, uuid_bin} = Ecto.UUID.dump(uuid_str) + uuid_bin end - defp connector_config(origin) do - # TODO + defp decode_uuid(nil), do: nil + + defp decode_uuid(uuid_bin) do + {:ok, uuid_str} = Ecto.UUID.load(uuid_bin) + uuid_str end end diff --git a/components/electric/mix.exs b/components/electric/mix.exs index 384629766a..b58007b19b 100644 --- a/components/electric/mix.exs +++ b/components/electric/mix.exs @@ -68,8 +68,9 @@ defmodule Electric.MixProject do {:req, "~> 0.4"}, {:pg_protocol, github: "electric-sql/pg_protocol"}, {:nimble_parsec, "~> 1.4"}, - {:postgrex, "~> 0.17", only: [:test]}, - {:ecto_sql, "~> 3.11", only: [:test]}, + {:postgrex, "~> 0.17"}, + {:ecto_sql, "~> 3.11"}, + {:ecto, "~> 3.11"}, {:dotenvy, "~> 0.8"}, {:timex, "~> 3.7"} ]