Skip to content

Commit

Permalink
feat(electric): Internal API for distributed tracing using OpenTeleme…
Browse files Browse the repository at this point in the history
…try (#1370)

This PR adds OpenTelemetry libraries and covers parts of the initial
shape sync process with tracing spans.

The E2E setup is modified to run an [OpenTelemetry
Collector](https://opentelemetry.io/docs/collector/) with each test for
collecting both client- and server-originating traces and exporting them
to Honeycomb.io.
  • Loading branch information
alco authored Jun 27, 2024
1 parent cf7f316 commit febb710
Show file tree
Hide file tree
Showing 27 changed files with 534 additions and 183 deletions.
5 changes: 5 additions & 0 deletions .changeset/clever-pears-raise.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/electric": patch
---

Add support for covering the code with trace spans and exporting traces using the OpenTelemetry protocol.
1 change: 1 addition & 0 deletions .github/workflows/e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ jobs:
env:
BUILDKITE_ANALYTICS_TOKEN: ${{ secrets.BUILDKITE_TEST_ANALYTICS_E2E }}
ELECTRIC_WRITE_TO_PG_MODE: ${{ matrix.write_to_pg_mode }}
HONEYCOMB_API_KEY: ${{ secrets.HONEYCOMB_API_KEY }}
steps:
- uses: actions/checkout@v3
with:
Expand Down
2 changes: 1 addition & 1 deletion components/electric/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ RUN make compile ${MAKE_RELEASE_TASK}
FROM ${RUNNER_IMAGE} AS runner_setup

RUN apt-get update -y && \
apt-get install -y libstdc++6 openssl libncurses5 locales && \
apt-get install -y libstdc++6 openssl libncurses5 locales ca-certificates && \
apt-get clean && \
rm -f /var/lib/apt/lists/*_*

Expand Down
41 changes: 41 additions & 0 deletions components/electric/config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,47 @@ telemetry =

config :electric, :telemetry, telemetry

###
# OpenTelemetry
###

# Currently, traces are primarily collected during E2E test runs on CI. We're setting a fairly
# low delay between consecutive export batches to quickly go from one test to the next one.
config :opentelemetry, :processors, otel_batch_processor: %{scheduled_delay_ms: 1000}

otel_export = env!("OTEL_EXPORT", :string, nil)

case otel_export do
"honeycomb" ->
# Exporting directly to Honeycomb.io is left here mostly for debugging purposes. Prefer
# using the generic "otlp" export to a locally running OpenTelemetry Collector.
honeycomb_api_key = env!("HONEYCOMB_API_KEY", :string, "")

config :opentelemetry_exporter,
otlp_endpoint: "https://api.honeycomb.io",
otlp_headers: [{"x-honeycomb-team", honeycomb_api_key}],
otlp_compression: :gzip

"otlp" ->
if endpoint = env!("OTLP_ENDPOINT", :string, nil) do
config :opentelemetry_exporter,
otlp_endpoint: endpoint,
otlp_compression: :gzip
end

"debug" ->
# In this mode, each span is printed to stdout as soon as it ends, without batching.
config :opentelemetry, :processors,
otel_simple_processor: %{exporter: {:otel_exporter_stdout, []}}

_ ->
config :opentelemetry,
processors: [],
traces_exporter: :none
end

###

if config_env() in [:dev, :test] do
path = Path.expand("runtime.#{config_env()}.exs", __DIR__)

Expand Down
2 changes: 2 additions & 0 deletions components/electric/lib/electric/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ defmodule Electric.Application do
require Logger

def start(_type, _args) do
Electric.Telemetry.OpenTelemetry.setup()

children =
[
Electric.Telemetry,
Expand Down
2 changes: 1 addition & 1 deletion components/electric/lib/electric/postgres/extension.ex
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ defmodule Electric.Postgres.Extension do
defp ensure_transaction(conn, fun) when is_function(fun, 1) do
case :epgsql.squery(conn, @is_transaction_sql) do
{:ok, _cols, [{"t"}]} -> fun.(conn)
{:ok, _cols, [{"f"}]} -> Client.with_transaction(conn, fun)
{:ok, _cols, [{"f"}]} -> Client.with_transaction(conn, fun, telemetry: false)
end
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ defmodule Electric.Postgres.Extension.SchemaCache do
alias Electric.Postgres.Extension.SchemaLoader
alias Electric.Postgres.Schema
alias Electric.Postgres.Extension.SchemaCache.Global
alias Electric.Telemetry.OpenTelemetry

require Logger

Expand Down Expand Up @@ -77,12 +78,20 @@ defmodule Electric.Postgres.Extension.SchemaCache do

@impl SchemaLoader
def load(origin) do
call(origin, {:load, :current})
OpenTelemetry.with_span(
"schema_cache.load",
[origin: origin, version: "current"],
fn -> call(origin, {:load, :current}) end
)
end

@impl SchemaLoader
def load(origin, version) do
call(origin, {:load, {:version, version}})
OpenTelemetry.with_span(
"schema_cache.load",
[origin: origin, version: version],
fn -> call(origin, {:load, {:version, version}}) end
)
end

@impl SchemaLoader
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ defmodule Electric.Postgres.Extension.SchemaLoader.Epgsql do
checkout!(pool, fn conn ->
query = ~s|ALTER SUBSCRIPTION "#{name}" REFRESH PUBLICATION WITH (copy_data = false)|

case :epgsql.squery(conn, query) do
case Client.squery(conn, query) do
{:ok, [], []} ->
:ok

Expand Down
6 changes: 5 additions & 1 deletion components/electric/lib/electric/postgres/repo.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ defmodule Electric.Postgres.Repo do

alias Electric.Replication.Connectors

@telemetry_prefix [:electric, :repo]
@default_pool_size 10

def config(connector_config, opts) do
Expand All @@ -34,10 +35,13 @@ defmodule Electric.Postgres.Repo do
socket_options: Map.get(conn_opts, :tcp_opts, []),
pool_size: Keyword.get(opts, :pool_size, @default_pool_size),
log: false,
after_connect: {__MODULE__, :set_display_settings, []}
after_connect: {__MODULE__, :set_display_settings, []},
telemetry_prefix: @telemetry_prefix
]
end

def telemetry_prefix, do: @telemetry_prefix

def name(origin), do: :"#{inspect(__MODULE__)}:#{origin}"

# Explicitly set those configuration parameters that affect formatting of values of certain
Expand Down
65 changes: 39 additions & 26 deletions components/electric/lib/electric/replication/initial_sync.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ defmodule Electric.Replication.InitialSync do

alias Electric.Utils
alias Electric.Telemetry.Metrics
alias Electric.Telemetry.OpenTelemetry
alias Electric.Replication.Shapes
alias Electric.Postgres.{CachedWal, Extension}
alias Electric.Replication.Changes.{NewRecord, Transaction}
Expand Down Expand Up @@ -96,29 +97,36 @@ defmodule Electric.Replication.InitialSync do
{acc_graph, results, req_ids} ->
start = System.monotonic_time()

case Shapes.ShapeRequest.query_initial_data(
request,
conn,
schema_version,
origin,
context
) do
{:ok, data, graph} ->
Metrics.span_event(
span,
:shape_data,
%{duration: System.monotonic_time() - start},
%{shape_hash: request.hash}
)

{:cont,
{Utils.merge_graph_edges(acc_graph, graph),
Map.merge(results, data, fn _, {change, v1}, {_, v2} -> {change, v1 ++ v2} end),
[request.id | req_ids]}}

{:error, reason} ->
{:halt, {:error, reason}}
end
request_attrs = Map.take(request, [:request_id, :source_table, :target_table])
span_attrs = Map.merge(request_attrs, %{subscription_id: subscription_id})

OpenTelemetry.with_span("initial_sync.query_subscription_data", span_attrs, fn ->
Shapes.ShapeRequest.query_initial_data(request, conn, schema_version, origin, context)
|> case do
{:ok, data, graph} ->
Metrics.span_event(
span,
:shape_data,
%{duration: System.monotonic_time() - start},
%{shape_hash: request.hash}
)

graph_attrs = graph |> Graph.info() |> Map.new(fn {k, v} -> {"graph.#{k}", v} end)

OpenTelemetry.add_span_attributes(
Map.merge(graph_attrs, %{num_rows: map_size(data)})
)

{:cont,
{Utils.merge_graph_edges(acc_graph, graph),
Map.merge(results, data, fn _, {change, v1}, {_, v2} -> {change, v1 ++ v2} end),
[request.id | req_ids]}}

{:error, reason} ->
OpenTelemetry.record_exception(inspect(reason))
{:halt, {:error, reason}}
end
end)
end)
|> case do
{:error, reason} ->
Expand Down Expand Up @@ -174,7 +182,7 @@ defmodule Electric.Replication.InitialSync do
conn_opts = Connectors.get_connection_opts(opts)

Client.with_conn(conn_opts, fn conn ->
Client.with_transaction(
Client.with_transaction_mode(
"ISOLATION LEVEL REPEATABLE READ READ ONLY",
conn,
fn conn ->
Expand All @@ -188,11 +196,16 @@ defmodule Electric.Replication.InitialSync do
end)

{:ok, _, [{xmin_str}]} =
:epgsql.squery(conn, "SELECT pg_snapshot_xmin(pg_current_snapshot())")
Client.squery(conn, "SELECT pg_snapshot_xmin(pg_current_snapshot())")

xmin = String.to_integer(xmin_str)
send(parent, {:data_insertion_point, ref, xmin})
fun.(conn, xmin)

OpenTelemetry.with_span(
"initial_sync.readonly_txn_with_checkpoint",
[marker: marker],
fn -> fun.(conn, xmin) end
)
end
)
end)
Expand Down
46 changes: 31 additions & 15 deletions components/electric/lib/electric/replication/postgres/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ defmodule Electric.Replication.Postgres.Client do
alias Electric.Postgres.Extension
alias Electric.Postgres.Lsn
alias Electric.Replication.Connectors
alias Electric.Telemetry.OpenTelemetry

require Logger

Expand All @@ -35,11 +36,13 @@ defmodule Electric.Replication.Postgres.Client do
def with_conn(conn_opts, fun) do
# Best effort capture exit message, expect trap_exit to be set
wait_exit = fn conn, res ->
receive do
{:EXIT, ^conn, _} -> res
after
500 -> res
end
OpenTelemetry.with_span("epgsql.await_exit", [], fn ->
receive do
{:EXIT, ^conn, _} -> res
after
500 -> res
end
end)
end

Logger.info("Postgres.Client.with_conn(#{inspect(sanitize_conn_opts(conn_opts))})")
Expand All @@ -52,7 +55,9 @@ defmodule Electric.Replication.Postgres.Client do
with {:ok, ^conn} <- :epgsql.connect(conn, ip_addr, username, password, epgsql_conn_opts),
:ok <- set_display_settings(conn) do
try do
fun.(conn)
OpenTelemetry.with_span("epgsql.with_conn", [], fn ->
fun.(conn)
end)
rescue
e ->
Logger.error(Exception.format(:error, e, __STACKTRACE__))
Expand Down Expand Up @@ -94,10 +99,20 @@ defmodule Electric.Replication.Postgres.Client do
Wrapper for :epgsql.with_transaction/3 that always sets `reraise` to `true` by default and makes `begin_opts` a
standalone function argument for easier code reading.
"""
def with_transaction(mode \\ "", conn, fun, in_opts \\ [])
when is_binary(mode) and is_list(in_opts) do
def with_transaction_mode(mode, conn, fun, in_opts \\ [])
when is_binary(mode) and is_pid(conn) and is_list(in_opts) do
opts = Keyword.merge([reraise: true, begin_opts: mode], in_opts)
:epgsql.with_transaction(conn, fun, opts)
fun = fn -> :epgsql.with_transaction(conn, fun, opts) end

if Keyword.get(in_opts, :telemetry, true) do
OpenTelemetry.with_span("epgsql.with_transaction", %{"txn.mode" => mode}, fun)
else
fun.()
end
end

def with_transaction(conn, fun, opts \\ []) when is_pid(conn) and is_list(opts) do
with_transaction_mode("", conn, fun, opts)
end

def close(conn) do
Expand Down Expand Up @@ -148,9 +163,10 @@ defmodule Electric.Replication.Postgres.Client do
end
end

defp squery(conn, query) do
Logger.debug("Postgres.Client: #{query}")
:epgsql.squery(conn, query)
def squery(conn, query) do
OpenTelemetry.with_span("epgsql.squery", %{"db.statement" => query}, fn ->
:epgsql.squery(conn, query)
end)
end

@spec get_system_id(connection()) :: {:ok, binary}
Expand Down Expand Up @@ -269,7 +285,7 @@ defmodule Electric.Replication.Postgres.Client do
#
# See `Electric.Postgres.display_settings/0` for details.
defp set_display_settings(conn) do
results = :epgsql.squery(conn, Electric.Postgres.display_settings() |> Enum.join(";"))
results = squery(conn, Electric.Postgres.display_settings() |> Enum.join(";"))
Enum.find(results, :ok, &(not match?({:ok, [], []}, &1)))
end

Expand Down Expand Up @@ -345,8 +361,8 @@ defmodule Electric.Replication.Postgres.Client do
{:ok, {short :: String.t(), long :: String.t(), cluster_id :: String.t()}}
| {:error, term()}
def get_server_versions(conn) do
with {:ok, _, [{short}]} <- :epgsql.squery(conn, "SHOW SERVER_VERSION"),
{:ok, _, [{long}]} <- :epgsql.squery(conn, "SELECT VERSION()"),
with {:ok, _, [{short}]} <- squery(conn, "SHOW SERVER_VERSION"),
{:ok, _, [{long}]} <- squery(conn, "SELECT VERSION()"),
{:ok, _, _, [{cluster_id}]} <- Extension.save_and_get_cluster_id(conn) do
{:ok, {short, long, cluster_id}}
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ defmodule Electric.Replication.Postgres.Writer do
# writes to PG.
#
# [1]: https://www.postgresql.org/docs/14/runtime-config-custom.html
{:ok, [], []} = :epgsql.squery(conn, "SET electric.session_replication_role = replica")
{:ok, [], []} = Client.squery(conn, "SET electric.session_replication_role = replica")

{:via, :gproc, producer_name} = Keyword.fetch!(opts, :producer)

Expand Down Expand Up @@ -88,7 +88,7 @@ defmodule Electric.Replication.Postgres.Writer do

Client.with_transaction(state.conn, fn conn ->
Enum.each(statements, fn stmt ->
case :epgsql.squery(conn, stmt) do
case Client.squery(conn, stmt) do
{:ok, _} ->
:ok

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ defmodule Electric.Replication.PostgresConnectorMng do
{:ok, state, {:continue, :init}}
end

@impl GenServer
def terminate(_reason, _state) do
Logger.info("Terminating #{inspect(__MODULE__)}")
end

defp ets_table_name(origin) do
String.to_atom(inspect(__MODULE__) <> ":" <> origin)
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ defmodule Electric.Replication.Shapes.Querying do
alias Electric.Replication.Changes
alias Electric.Replication.Changes.Ownership
alias Electric.Replication.Eval
alias Electric.Replication.Postgres.Client
alias Electric.Replication.Shapes.ChangeProcessing
alias Electric.Replication.Shapes.ShapeRequest.Layer
alias Electric.Replication.Shapes.SentRowsGraph
Expand Down Expand Up @@ -105,7 +106,7 @@ defmodule Electric.Replication.Shapes.Querying do

# Important reason for `squery` usage here (as opposed to what might be more reasonable `equery`) is that we need
# string representation of all fields, so we don't want to do double-conversion inside epgsql and back
with {:ok, _, rows} <- :epgsql.squery(conn, query) do
with {:ok, _, rows} <- Client.squery(conn, query) do
curr_records =
rows_to_changes_with_tags(rows, Enum.map(table_info.columns, & &1.name), layer, origin)

Expand Down
Loading

0 comments on commit febb710

Please sign in to comment.