Skip to content

Commit

Permalink
Introduce an Ecto repo in place of ad-hoc DB connections in ClientRec…
Browse files Browse the repository at this point in the history
…onnectionInfo
  • Loading branch information
alco committed Apr 10, 2024
1 parent 020a809 commit 9a5a1cd
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 64 deletions.
38 changes: 38 additions & 0 deletions components/electric/lib/electric/postgres/repo.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
defmodule Electric.Postgres.Repo do
@moduledoc """
Ecto repo for managing a pool of DB connections.
This repo must be started as part of a `PostgresConnectorSup` supervision tree, configured
with the same `connector_config` that its siblings use. Under the hood, it will start a
dynamic Ecto repo that can be looked up by the name returned from `name/1`.
Connections in the pool will use SSL if the Postgres connector is configured with
`DATABASE_REQUIRE_SSL=true`. This is different from `epgsql`'s default behaviour where it
tries to use SSL first and only falls back to using plain TCP if that fails.
"""

use Ecto.Repo, otp_app: :electric, adapter: Ecto.Adapters.Postgres

alias Electric.Replication.Connectors

@default_pool_size 10

def config(connector_config, opts) do
origin = Connectors.origin(connector_config)
conn_opts = Connectors.get_connection_opts(connector_config)

[
name: name(origin),
hostname: conn_opts.host,
port: conn_opts.port,
username: conn_opts.username,
password: conn_opts.password,
database: conn_opts.database,
ssl: conn_opts.ssl == :required,
pool_size: Keyword.get(opts, :pool_size, @default_pool_size),
log: false
]
end

def name(origin), do: :"#{inspect(__MODULE__)}:#{origin}"
end
41 changes: 40 additions & 1 deletion components/electric/lib/electric/replication/postgres/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule Electric.Replication.Postgres.Client do

import Electric.Postgres.Dialect.Postgresql, only: [quote_ident: 1]

alias Electric.Postgres.{Extension, Lsn}
alias Electric.Postgres.{Extension, Lsn, Repo}
alias Electric.Replication.Connectors

require Logger
Expand All @@ -27,6 +27,45 @@ defmodule Electric.Replication.Postgres.Client do
:epgsql.connect(ip_addr, username, password, epgsql_conn_opts)
end

@doc """
Execute the given function using a pooled DB connection.
The pool is managed by `Electric.Postgres.Repo` and so the passed function should use the
repo module's API for querying and executing SQL statements instead of `epgsql`. See
`pooled_query!/3` and `query!/2` below for a high-level API built on top of the Ecto repo.
"""
@spec with_pool(Connectors.origin(), (-> any)) :: any
def with_pool(origin, fun) when is_binary(origin) and is_function(fun, 0) do
Repo.put_dynamic_repo(Repo.name(origin))
Repo.checkout(fun)
end

@doc """
Execute the given SQL query/statement using a pooled DB connection.
The pool is managed by `Electric.Postgres.Repo` and the query is executed by invoking `query!/2`.
"""
@spec pooled_query!(Connectors.origin(), String.t(), [term]) :: {[String.t()], [tuple()]}
def pooled_query!(origin, query_str, params) when is_binary(origin) do
with_pool(origin, fn -> query!(query_str, params) end)
end

@doc """
Execute the given SQL query/statement in the context of a checked-out DB connection.
This function assumes a connection has been checked out from a pool managed by
`Electric.Postgres.Repo` and will fail if that's not the case. Use this to issue multiple
queries/statements on a single DB connection by wrapping them in an anonymous function and
passing it to `with_pool/2`.
"""
@spec query!(String.t(), [term]) :: {[String.t()], [tuple()]}
def query!(query_str, params \\ []) when is_binary(query_str) and is_list(params) do
true = Repo.checked_out?()

%Postgrex.Result{columns: columns, rows: rows} = Repo.query!(query_str, params)
{columns, rows}
end

@spec with_conn(Connectors.connection_opts(), fun()) :: term() | {:error, term()}
def with_conn(conn_opts, fun) do
# Best effort capture exit message, expect trap_exit to be set
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ defmodule Electric.Replication.PostgresConnectorSup do
]

children = [
{Electric.Postgres.Repo, Electric.Postgres.Repo.config(connector_config, [])},
{Electric.Satellite.ClientReconnectionInfo, connector_config},
{SchemaCache, connector_config},
{SatelliteCollectorProducer, connector_config},
Expand Down
142 changes: 81 additions & 61 deletions components/electric/lib/electric/satellite/client_reconnection_info.ex
Original file line number Diff line number Diff line change
Expand Up @@ -243,10 +243,12 @@ defmodule Electric.Satellite.ClientReconnectionInfo do
:ets.match_delete(@additional_data_ets, {{client_id, :_, :_, :_, :_}, :_, :_})
:ets.delete(@checkpoint_ets, client_id)

{:ok, _} = query(origin, @delete_subscriptions_query, [client_id])
{:ok, _} = query(origin, @delete_checkpoint_query, [client_id])
{:ok, _} = query(origin, @delete_actions_query, [client_id])
{:ok, _} = query(origin, @delete_additional_data_for_client_query, [client_id])
Client.with_pool(origin, fn ->
Client.query!(@delete_subscriptions_query, [client_id])
Client.query!(@delete_checkpoint_query, [client_id])
Client.query!(@delete_actions_query, [client_id])
Client.query!(@delete_additional_data_for_client_query, [client_id])
end)

:ok
end
Expand Down Expand Up @@ -280,8 +282,11 @@ defmodule Electric.Satellite.ClientReconnectionInfo do
{{client_id, :_, :_, :subscription, subscription_id}, :_, :_}
)

{:ok, 1} = query(origin, @delete_subscription_query, [client_id, subscription_id])
{:ok, 1} = query(origin, @delete_subscription_data_query, [client_id, subscription_id])
Client.with_pool(origin, fn ->
subs_uuid = encode_uuid(subscription_id)
Client.query!(@delete_subscription_query, [client_id, subs_uuid])
Client.query!(@delete_subscription_data_query, [client_id, subs_uuid])
end)

:ok
end
Expand Down Expand Up @@ -310,8 +315,12 @@ defmodule Electric.Satellite.ClientReconnectionInfo do
defp store_client_checkpoint(origin, client_id, wal_pos, sent_rows_graph) do
:ets.insert(@checkpoint_ets, {client_id, wal_pos, sent_rows_graph})
sent_rows_graph_bin = :erlang.term_to_binary(sent_rows_graph)
{:ok, 1} = query(origin, @upsert_checkpoint_query, [client_id, wal_pos, sent_rows_graph_bin])
:ok

Client.pooled_query!(origin, @upsert_checkpoint_query, [
client_id,
wal_pos,
sent_rows_graph_bin
])
end

@doc """
Expand Down Expand Up @@ -407,7 +416,7 @@ defmodule Electric.Satellite.ClientReconnectionInfo do
:ets.match_delete(@additional_data_ets, {{client_id, :_, :_, :_, :_}, :_, :_})

origin = Keyword.fetch!(opts, :origin)
{:ok, _} = query(origin, @delete_additional_data_for_client_query, [client_id])
Client.pooled_query!(origin, @delete_additional_data_for_client_query, [client_id])

actions =
@actions_ets
Expand Down Expand Up @@ -489,7 +498,7 @@ defmodule Electric.Satellite.ClientReconnectionInfo do
[client_id, txid, :erlang.term_to_binary(actions)]
end)

{:ok, 1} = query(origin, store_actions_query(map_size(actions_map)), values)
Client.pooled_query!(origin, store_actions_query(map_size(actions_map)), values)

:ok
end
Expand Down Expand Up @@ -580,7 +589,7 @@ defmodule Electric.Satellite.ClientReconnectionInfo do
defp delete_additional_data(origin, {client_id, _xmin, order, _subject, _subscription_id} = key) do
:ets.delete(@additional_data_ets, key)

{:ok, 1} = query(origin, @delete_additional_data_for_key_query, [client_id, order])
Client.pooled_query!(origin, @delete_additional_data_for_key_query, [client_id, order])

:ok
end
Expand Down Expand Up @@ -611,14 +620,13 @@ defmodule Electric.Satellite.ClientReconnectionInfo do
def store_subscription(origin, client_id, subscription_id, xmin, pos, requests) do
:ets.insert(@subscriptions_ets, {{client_id, subscription_id}, xmin, requests, pos})

{:ok, 1} =
query(origin, @insert_subscription_query, [
client_id,
subscription_id,
xmin,
pos,
:erlang.term_to_binary(requests)
])
Client.pooled_query!(origin, @insert_subscription_query, [
client_id,
encode_uuid(subscription_id),
xmin,
pos,
:erlang.term_to_binary(requests)
])

:ok
end
Expand Down Expand Up @@ -656,14 +664,13 @@ defmodule Electric.Satellite.ClientReconnectionInfo do
{{client_id, xmin, pos, :subscription, subscription_id}, graph_diff, []}
)

{:ok, 1} =
query(origin, @insert_subscription_data_query, [
client_id,
xmin,
pos,
subscription_id,
:erlang.term_to_binary(graph_diff)
])
Client.pooled_query!(origin, @insert_subscription_data_query, [
client_id,
xmin,
pos,
encode_uuid(subscription_id),
:erlang.term_to_binary(graph_diff)
])

:ok
end
Expand Down Expand Up @@ -692,14 +699,13 @@ defmodule Electric.Satellite.ClientReconnectionInfo do
{{client_id, xmin, pos, :transaction, nil}, graph_diff, included_txns}
)

{:ok, 1} =
query(origin, @insert_transaction_data_query, [
client_id,
xmin,
pos,
:erlang.term_to_binary(graph_diff),
included_txns
])
Client.pooled_query!(origin, @insert_transaction_data_query, [
client_id,
xmin,
pos,
:erlang.term_to_binary(graph_diff),
included_txns
])

:ok
end
Expand Down Expand Up @@ -728,7 +734,7 @@ defmodule Electric.Satellite.ClientReconnectionInfo do

:ets.select_delete(@actions_ets, matchspec)

{:ok, _} = query(origin, @delete_actions_for_xids_query, [client_id, txids])
Client.pooled_query!(origin, @delete_actions_for_xids_query, [client_id, txids])

:ok
end
Expand All @@ -744,17 +750,19 @@ defmodule Electric.Satellite.ClientReconnectionInfo do

origin = Connectors.origin(connector_config)

restore_checkpoint_cache(origin, checkpoint_table)
restore_subscriptions_cache(origin, subscriptions_table)
restore_additional_data_cache(origin, additional_data_table)
restore_actions_cache(origin, actions_table)
Client.with_pool(origin, fn ->
restore_checkpoint_cache(checkpoint_table)
restore_subscriptions_cache(subscriptions_table)
restore_additional_data_cache(additional_data_table)
restore_actions_cache(actions_table)
end)

{:ok, nil}
end

defp restore_checkpoint_cache(origin, checkpoint_table) do
{:ok, _, rows} =
query(origin, "SELECT * FROM #{Extension.client_checkpoints_table()}", [])
defp restore_checkpoint_cache(checkpoint_table) do
{["client_id", "pg_wal_pos", "sent_rows_graph"], rows} =
Client.query!("SELECT * FROM #{Extension.client_checkpoints_table()}")

checkpoints =
for {client_id, wal_pos, sent_rows_graph} <- rows do
Expand All @@ -764,35 +772,42 @@ defmodule Electric.Satellite.ClientReconnectionInfo do
:ets.insert(checkpoint_table, checkpoints)
end

defp restore_subscriptions_cache(origin, subscriptions_table) do
{:ok, _, rows} =
query(origin, "SELECT * FROM #{Extension.client_shape_subscriptions_table()}", [])
defp restore_subscriptions_cache(subscriptions_table) do
{["client_id", "subscription_id", "min_txid", "ord", "shape_requests"], rows} =
Client.query!("SELECT * FROM #{Extension.client_shape_subscriptions_table()}")

subscriptions =
for {client_id, subscription_id, xmin, pos, shape_requests_bin} <- rows do
{{client_id, subscription_id}, String.to_integer(xmin),
{{client_id, decode_uuid(subscription_id)}, String.to_integer(xmin),
:erlang.binary_to_term(shape_requests_bin), pos}
end

:ets.insert(subscriptions_table, subscriptions)
end

defp restore_additional_data_cache(origin, additional_data_table) do
{:ok, _, rows} =
query(origin, "SELECT * FROM #{Extension.client_additional_data_table()}", [])
defp restore_additional_data_cache(additional_data_table) do
{[
"client_id",
"min_txid",
"ord",
"subject",
"subscription_id",
"graph_diff",
"included_txns"
], rows} = Client.query!("SELECT * FROM #{Extension.client_additional_data_table()}")

records =
for {client_id, xmin, pos, subject, subscription_id, graph_diff, included_txns} <- rows do
{{client_id, String.to_integer(xmin), pos, String.to_existing_atom(subject),
subscription_id}, :erlang.binary_to_term(graph_diff), included_txns}
decode_uuid(subscription_id)}, :erlang.binary_to_term(graph_diff), included_txns}
end

:ets.insert(additional_data_table, records)
end

defp restore_actions_cache(origin, actions_table) do
{:ok, _, rows} =
query(origin, "SELECT * FROM #{Extension.client_actions_table()}", [])
defp restore_actions_cache(actions_table) do
{["client_id", "txid", "subquery_actions"], rows} =
Client.query!("SELECT * FROM #{Extension.client_actions_table()}")

actions =
for {client_id, txid, actions_bin} <- rows do
Expand All @@ -802,14 +817,19 @@ defmodule Electric.Satellite.ClientReconnectionInfo do
:ets.insert(actions_table, actions)
end

defp query(origin, query, params) when is_binary(query) and is_list(params) do
origin
|> connector_config()
|> Connectors.get_connection_opts()
|> Client.with_conn(fn conn -> :epgsql.equery(conn, query, params) end)
# The encode_uuid() and decode_uuid() functions are needed here due to incomplete adoption
# of an Ecto repo for regular DB connections. Consider defining Ecto schemas for the database
# tables referenced in this module to get automatic type conversion.

defp encode_uuid(uuid_str) do
{:ok, uuid_bin} = Ecto.UUID.dump(uuid_str)
uuid_bin
end

defp connector_config(origin) do
# TODO
defp decode_uuid(nil), do: nil

defp decode_uuid(uuid_bin) do
{:ok, uuid_str} = Ecto.UUID.load(uuid_bin)
uuid_str
end
end
5 changes: 3 additions & 2 deletions components/electric/mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,9 @@ defmodule Electric.MixProject do
{:req, "~> 0.4"},
{:pg_protocol, github: "electric-sql/pg_protocol"},
{:nimble_parsec, "~> 1.4"},
{:postgrex, "~> 0.17", only: [:test]},
{:ecto_sql, "~> 3.11", only: [:test]},
{:postgrex, "~> 0.17"},
{:ecto_sql, "~> 3.11"},
{:ecto, "~> 3.11"},
{:dotenvy, "~> 0.8"},
{:timex, "~> 3.7"}
]
Expand Down

0 comments on commit 9a5a1cd

Please sign in to comment.