Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SpanProcessor for OpenTelemetry #875

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,15 @@ if config_env() == :test do
send_result: :sync,
send_max_attempts: 1,
dedup_events: false,
test_mode: true
test_mode: true,
tracing: true

config :logger, backends: []

config :opentelemetry, span_processor: {Sentry.OpenTelemetry.SpanProcessor, []}

config :opentelemetry,
sampler: {Sentry.OpenTelemetry.Sampler, [drop: ["Elixir.Oban.Stager process"]]}
end

config :phoenix, :json_library, if(Code.ensure_loaded?(JSON), do: JSON, else: Jason)
8 changes: 8 additions & 0 deletions lib/sentry/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ defmodule Sentry.Application do

integrations_config = Config.integrations()

maybe_span_storage =
if Config.tracing?() do
[Sentry.OpenTelemetry.SpanStorage]
else
[]
end

children =
[
{Registry, keys: :unique, name: Sentry.Transport.SenderRegistry},
Expand All @@ -39,6 +46,7 @@ defmodule Sentry.Application do
]}
] ++
maybe_http_client_spec ++
maybe_span_storage ++
[Sentry.Transport.SenderPool]

cache_loaded_applications()
Expand Down
14 changes: 14 additions & 0 deletions lib/sentry/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,17 @@ defmodule Sentry.Config do
be used as the value for this option.
"""
],
tracing: [
type: :boolean,
default: false,
doc: """
Whether to enable tracing functionality based on OpenTelemetry. When enabled,
the Sentry SDK will use OpenTelemetry to collect and report distributed tracing
data to Sentry.

This feature requires `opentelemetry` package and its integrations with Bandit, Phoenix or Ecto.
"""
],
included_environments: [
type: {:or, [{:in, [:all]}, {:list, {:or, [:atom, :string]}}]},
deprecated: "Use :dsn to control whether to send events to Sentry.",
Expand Down Expand Up @@ -627,6 +638,9 @@ defmodule Sentry.Config do
@spec integrations() :: keyword()
def integrations, do: fetch!(:integrations)

@spec tracing?() :: boolean()
def tracing?, do: fetch!(:tracing)

@spec put_config(atom(), term()) :: :ok
def put_config(key, value) when is_atom(key) do
unless key in @valid_keys do
Expand Down
27 changes: 27 additions & 0 deletions lib/sentry/opentelemetry/sampler.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
defmodule Sentry.OpenTelemetry.Sampler do
@moduledoc false

def setup(config) do
config
end

def description(_) do
"SentrySampler"
end

def should_sample(
_ctx,
_trace_id,
_links,
span_name,
_span_kind,
_attributes,
config
) do
if span_name in config[:drop] do
{:drop, [], []}
else
{:record_and_sample, [], []}
end
end
end
199 changes: 199 additions & 0 deletions lib/sentry/opentelemetry/span_processor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
if Code.ensure_loaded?(OpenTelemetry) do
defmodule Sentry.OpenTelemetry.SpanProcessor do
@moduledoc false

require OpenTelemetry.SemConv.ClientAttributes, as: ClientAttributes
require OpenTelemetry.SemConv.Incubating.DBAttributes, as: DBAttributes
require OpenTelemetry.SemConv.Incubating.HTTPAttributes, as: HTTPAttributes
require OpenTelemetry.SemConv.Incubating.URLAttributes, as: URLAttributes
require OpenTelemetry.SemConv.Incubating.MessagingAttributes, as: MessagingAttributes
@behaviour :otel_span_processor

require Logger

alias Sentry.{Transaction, OpenTelemetry.SpanStorage, OpenTelemetry.SpanRecord}
alias Sentry.Interfaces.Span

@impl true
def on_start(_ctx, otel_span, _config) do
span_record = SpanRecord.new(otel_span)

SpanStorage.store_span(span_record)

otel_span
end

@impl true
def on_end(otel_span, _config) do
span_record = SpanRecord.new(otel_span)

SpanStorage.update_span(span_record)

if span_record.parent_span_id == nil do
root_span_record = SpanStorage.get_root_span(span_record.span_id)
child_span_records = SpanStorage.get_child_spans(span_record.span_id)
transaction = build_transaction(root_span_record, child_span_records)

result =
case Sentry.send_transaction(transaction) do
{:ok, _id} ->
true

:ignored ->
true

{:error, error} ->
Logger.error("Failed to send transaction to Sentry: #{inspect(error)}")
{:error, :invalid_span}
end

:ok = SpanStorage.remove_span(span_record.span_id)

result
else
true
end
end

@impl true
def force_flush(_config) do
:ok
end

defp build_transaction(root_span_record, child_span_records) do
root_span = build_span(root_span_record)
child_spans = Enum.map(child_span_records, &build_span(&1))

Transaction.new(%{
span_id: root_span.span_id,
transaction: transaction_name(root_span_record),
transaction_info: %{source: :custom},
start_timestamp: root_span_record.start_time,
timestamp: root_span_record.end_time,
contexts: %{
trace: build_trace_context(root_span_record),
otel: build_otel_context(root_span_record)
},
spans: child_spans
})
end

defp transaction_name(
%{attributes: %{unquote(to_string(MessagingAttributes.messaging_system())) => :oban}} =
span_record
) do
span_record.attributes["oban.job.worker"]
end

defp transaction_name(span_record), do: span_record.name

defp build_trace_context(span_record) do
{op, description} = get_op_description(span_record)

%{
trace_id: span_record.trace_id,
span_id: span_record.span_id,
parent_span_id: span_record.parent_span_id,
op: op,
description: description,
origin: span_record.origin,
data: span_record.attributes
}
end

defp build_otel_context(span_record), do: span_record.attributes

defp get_op_description(
%{
attributes: %{
unquote(to_string(HTTPAttributes.http_request_method())) => http_request_method
}
} = span_record
) do
op = "http.#{span_record.kind}"

client_address =
Map.get(span_record.attributes, to_string(ClientAttributes.client_address()))

url_path = Map.get(span_record.attributes, to_string(URLAttributes.url_path()))

description =
to_string(http_request_method) <>
((client_address && " from #{client_address}") || "") <>
((url_path && " #{url_path}") || "")

{op, description}
end

defp get_op_description(
%{attributes: %{unquote(to_string(DBAttributes.db_system())) => _db_system}} =
span_record
) do
db_query_text = Map.get(span_record.attributes, "db.statement")

{"db", db_query_text}
end

defp get_op_description(%{
attributes:
%{unquote(to_string(MessagingAttributes.messaging_system())) => :oban} = attributes
}) do
{"queue.process", attributes["oban.job.worker"]}
end

defp get_op_description(span_record) do
{span_record.name, span_record.name}
end

defp build_span(span_record) do
{op, description} = get_op_description(span_record)

%Span{
op: op,
description: description,
start_timestamp: span_record.start_time,
timestamp: span_record.end_time,
trace_id: span_record.trace_id,
span_id: span_record.span_id,
parent_span_id: span_record.parent_span_id,
origin: span_record.origin,
data: Map.put(span_record.attributes, "otel.kind", span_record.kind),
status: span_status(span_record)
}
end

defp span_status(%{
attributes: %{
unquote(to_string(HTTPAttributes.http_response_status_code())) =>
http_response_status_code
}
}) do
to_status(http_response_status_code)
end

defp span_status(_span_record), do: nil

# WebSocket upgrade spans doesn't have a HTTP status
defp to_status(nil), do: nil

defp to_status(status) when status in 200..299, do: "ok"

for {status, string} <- %{
400 => "invalid_argument",
401 => "unauthenticated",
403 => "permission_denied",
404 => "not_found",
409 => "already_exists",
429 => "resource_exhausted",
499 => "cancelled",
500 => "internal_error",
501 => "unimplemented",
503 => "unavailable",
504 => "deadline_exceeded"
} do
defp to_status(unquote(status)), do: unquote(string)
end

defp to_status(_any), do: "unknown_error"
end
end
72 changes: 72 additions & 0 deletions lib/sentry/opentelemetry/span_record.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
if Code.ensure_loaded?(OpenTelemetry) do
defmodule Sentry.OpenTelemetry.SpanRecord do
require Record
require OpenTelemetry

@fields Record.extract(:span, from_lib: "opentelemetry/include/otel_span.hrl")
Record.defrecordp(:span, @fields)

defstruct @fields ++ [:origin]

def new(otel_span) do
otel_attrs = span(otel_span)

{:attributes, _, _, _, attributes} = otel_attrs[:attributes]

origin =
case otel_attrs[:instrumentation_scope] do
{:instrumentation_scope, origin, _version, _} ->
origin

_ ->
:undefined
end

attrs =
otel_attrs
|> Keyword.delete(:attributes)
|> Keyword.merge(
trace_id: cast_trace_id(otel_attrs[:trace_id]),
span_id: cast_span_id(otel_attrs[:span_id]),
parent_span_id: cast_span_id(otel_attrs[:parent_span_id]),
origin: origin,
start_time: cast_timestamp(otel_attrs[:start_time]),
end_time: cast_timestamp(otel_attrs[:end_time]),
attributes: normalize_attributes(attributes)
)
|> Map.new()

struct(__MODULE__, attrs)
end

defp normalize_attributes(attributes) do
Enum.map(attributes, fn {key, value} ->
{to_string(key), value}
end)
|> Map.new()
end

defp cast_span_id(nil), do: nil
defp cast_span_id(:undefined), do: nil
defp cast_span_id(span_id), do: bytes_to_hex(span_id, 16)

defp cast_trace_id(trace_id), do: bytes_to_hex(trace_id, 32)

defp cast_timestamp(:undefined), do: nil
defp cast_timestamp(nil), do: nil

defp cast_timestamp(timestamp) do
nano_timestamp = OpenTelemetry.timestamp_to_nano(timestamp)
{:ok, datetime} = DateTime.from_unix(div(nano_timestamp, 1_000_000), :millisecond)

DateTime.to_iso8601(datetime)
end

defp bytes_to_hex(bytes, length) do
case(:otel_utils.format_binary_string("~#{length}.16.0b", [bytes])) do
{:ok, result} -> result
{:error, _} -> raise "Failed to convert bytes to hex: #{inspect(bytes)}"
end
end
end
end
Loading
Loading