Skip to content

Commit 456eb45

Browse files
committed
Add SpanProcessor for OpenTelemetry
1 parent 5f6a0c9 commit 456eb45

File tree

15 files changed

+989
-7
lines changed

15 files changed

+989
-7
lines changed

config/config.exs

+5
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,8 @@ if config_env() == :test do
1616
end
1717

1818
config :phoenix, :json_library, if(Code.ensure_loaded?(JSON), do: JSON, else: Jason)
19+
20+
config :opentelemetry, span_processor: {Sentry.OpenTelemetry.SpanProcessor, []}
21+
22+
config :opentelemetry,
23+
sampler: {Sentry.OpenTelemetry.Sampler, [drop: ["Elixir.Oban.Stager process"]]}

lib/sentry/application.ex

+1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ defmodule Sentry.Application do
3232
Sentry.Sources,
3333
Sentry.Dedupe,
3434
Sentry.ClientReport.Sender,
35+
Sentry.OpenTelemetry.SpanStorage,
3536
{Sentry.Integrations.CheckInIDMappings,
3637
[
3738
max_expected_check_in_time:

lib/sentry/opentelemetry/sampler.ex

+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
defmodule Sentry.OpenTelemetry.Sampler do
2+
@moduledoc false
3+
4+
def setup(config) do
5+
config
6+
end
7+
8+
def description(_) do
9+
"SentrySampler"
10+
end
11+
12+
def should_sample(
13+
_ctx,
14+
_trace_id,
15+
_links,
16+
span_name,
17+
_span_kind,
18+
_attributes,
19+
config
20+
) do
21+
if span_name in config[:drop] do
22+
{:drop, [], []}
23+
else
24+
{:record_and_sample, [], []}
25+
end
26+
end
27+
end
+199
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
if Code.ensure_loaded?(OpenTelemetry) do
2+
defmodule Sentry.OpenTelemetry.SpanProcessor do
3+
@moduledoc false
4+
5+
require OpenTelemetry.SemConv.ClientAttributes, as: ClientAttributes
6+
require OpenTelemetry.SemConv.Incubating.DBAttributes, as: DBAttributes
7+
require OpenTelemetry.SemConv.Incubating.HTTPAttributes, as: HTTPAttributes
8+
require OpenTelemetry.SemConv.Incubating.URLAttributes, as: URLAttributes
9+
require OpenTelemetry.SemConv.Incubating.MessagingAttributes, as: MessagingAttributes
10+
@behaviour :otel_span_processor
11+
12+
require Logger
13+
14+
alias Sentry.{Transaction, OpenTelemetry.SpanStorage, OpenTelemetry.SpanRecord}
15+
alias Sentry.Interfaces.Span
16+
17+
@impl true
18+
def on_start(_ctx, otel_span, _config) do
19+
span_record = SpanRecord.new(otel_span)
20+
21+
SpanStorage.store_span(span_record)
22+
23+
otel_span
24+
end
25+
26+
@impl true
27+
def on_end(otel_span, _config) do
28+
span_record = SpanRecord.new(otel_span)
29+
30+
SpanStorage.update_span(span_record)
31+
32+
if span_record.parent_span_id == nil do
33+
root_span_record = SpanStorage.get_root_span(span_record.span_id)
34+
child_span_records = SpanStorage.get_child_spans(span_record.span_id)
35+
transaction = build_transaction(root_span_record, child_span_records)
36+
37+
result =
38+
case Sentry.send_transaction(transaction) do
39+
{:ok, _id} ->
40+
true
41+
42+
:ignored ->
43+
true
44+
45+
{:error, error} ->
46+
Logger.error("Failed to send transaction to Sentry: #{inspect(error)}")
47+
{:error, :invalid_span}
48+
end
49+
50+
:ok = SpanStorage.remove_span(span_record.span_id)
51+
52+
result
53+
else
54+
true
55+
end
56+
end
57+
58+
@impl true
59+
def force_flush(_config) do
60+
:ok
61+
end
62+
63+
defp build_transaction(root_span_record, child_span_records) do
64+
root_span = build_span(root_span_record)
65+
child_spans = Enum.map(child_span_records, &build_span(&1))
66+
67+
Transaction.new(%{
68+
span_id: root_span.span_id,
69+
transaction: transaction_name(root_span_record),
70+
transaction_info: %{source: :custom},
71+
start_timestamp: root_span_record.start_time,
72+
timestamp: root_span_record.end_time,
73+
contexts: %{
74+
trace: build_trace_context(root_span_record),
75+
otel: build_otel_context(root_span_record)
76+
},
77+
spans: child_spans
78+
})
79+
end
80+
81+
defp transaction_name(
82+
%{attributes: %{unquote(to_string(MessagingAttributes.messaging_system())) => :oban}} =
83+
span_record
84+
) do
85+
span_record.attributes["oban.job.worker"]
86+
end
87+
88+
defp transaction_name(span_record), do: span_record.name
89+
90+
defp build_trace_context(span_record) do
91+
{op, description} = get_op_description(span_record)
92+
93+
%{
94+
trace_id: span_record.trace_id,
95+
span_id: span_record.span_id,
96+
parent_span_id: span_record.parent_span_id,
97+
op: op,
98+
description: description,
99+
origin: span_record.origin,
100+
data: span_record.attributes
101+
}
102+
end
103+
104+
defp build_otel_context(span_record), do: span_record.attributes
105+
106+
defp get_op_description(
107+
%{
108+
attributes: %{
109+
unquote(to_string(HTTPAttributes.http_request_method())) => http_request_method
110+
}
111+
} = span_record
112+
) do
113+
op = "http.#{span_record.kind}"
114+
115+
client_address =
116+
Map.get(span_record.attributes, to_string(ClientAttributes.client_address()))
117+
118+
url_path = Map.get(span_record.attributes, to_string(URLAttributes.url_path()))
119+
120+
description =
121+
to_string(http_request_method) <>
122+
((client_address && " from #{client_address}") || "") <>
123+
((url_path && " #{url_path}") || "")
124+
125+
{op, description}
126+
end
127+
128+
defp get_op_description(
129+
%{attributes: %{unquote(to_string(DBAttributes.db_system())) => _db_system}} =
130+
span_record
131+
) do
132+
db_query_text = Map.get(span_record.attributes, "db.statement")
133+
134+
{"db", db_query_text}
135+
end
136+
137+
defp get_op_description(%{
138+
attributes:
139+
%{unquote(to_string(MessagingAttributes.messaging_system())) => :oban} = attributes
140+
}) do
141+
{"queue.process", attributes["oban.job.worker"]}
142+
end
143+
144+
defp get_op_description(span_record) do
145+
{span_record.name, span_record.name}
146+
end
147+
148+
defp build_span(span_record) do
149+
{op, description} = get_op_description(span_record)
150+
151+
%Span{
152+
op: op,
153+
description: description,
154+
start_timestamp: span_record.start_time,
155+
timestamp: span_record.end_time,
156+
trace_id: span_record.trace_id,
157+
span_id: span_record.span_id,
158+
parent_span_id: span_record.parent_span_id,
159+
origin: span_record.origin,
160+
data: Map.put(span_record.attributes, "otel.kind", span_record.kind),
161+
status: span_status(span_record)
162+
}
163+
end
164+
165+
defp span_status(%{
166+
attributes: %{
167+
unquote(to_string(HTTPAttributes.http_response_status_code())) =>
168+
http_response_status_code
169+
}
170+
}) do
171+
to_status(http_response_status_code)
172+
end
173+
174+
defp span_status(_span_record), do: nil
175+
176+
# WebSocket upgrade spans doesn't have a HTTP status
177+
defp to_status(nil), do: nil
178+
179+
defp to_status(status) when status in 200..299, do: "ok"
180+
181+
for {status, string} <- %{
182+
400 => "invalid_argument",
183+
401 => "unauthenticated",
184+
403 => "permission_denied",
185+
404 => "not_found",
186+
409 => "already_exists",
187+
429 => "resource_exhausted",
188+
499 => "cancelled",
189+
500 => "internal_error",
190+
501 => "unimplemented",
191+
503 => "unavailable",
192+
504 => "deadline_exceeded"
193+
} do
194+
defp to_status(unquote(status)), do: unquote(string)
195+
end
196+
197+
defp to_status(_any), do: "unknown_error"
198+
end
199+
end
+72
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
if Code.ensure_loaded?(OpenTelemetry) do
2+
defmodule Sentry.OpenTelemetry.SpanRecord do
3+
require Record
4+
require OpenTelemetry
5+
6+
@fields Record.extract(:span, from_lib: "opentelemetry/include/otel_span.hrl")
7+
Record.defrecordp(:span, @fields)
8+
9+
defstruct @fields ++ [:origin]
10+
11+
def new(otel_span) do
12+
otel_attrs = span(otel_span)
13+
14+
{:attributes, _, _, _, attributes} = otel_attrs[:attributes]
15+
16+
origin =
17+
case otel_attrs[:instrumentation_scope] do
18+
{:instrumentation_scope, origin, _version, _} ->
19+
origin
20+
21+
_ ->
22+
:undefined
23+
end
24+
25+
attrs =
26+
otel_attrs
27+
|> Keyword.delete(:attributes)
28+
|> Keyword.merge(
29+
trace_id: cast_trace_id(otel_attrs[:trace_id]),
30+
span_id: cast_span_id(otel_attrs[:span_id]),
31+
parent_span_id: cast_span_id(otel_attrs[:parent_span_id]),
32+
origin: origin,
33+
start_time: cast_timestamp(otel_attrs[:start_time]),
34+
end_time: cast_timestamp(otel_attrs[:end_time]),
35+
attributes: normalize_attributes(attributes)
36+
)
37+
|> Map.new()
38+
39+
struct(__MODULE__, attrs)
40+
end
41+
42+
defp normalize_attributes(attributes) do
43+
Enum.map(attributes, fn {key, value} ->
44+
{to_string(key), value}
45+
end)
46+
|> Map.new()
47+
end
48+
49+
defp cast_span_id(nil), do: nil
50+
defp cast_span_id(:undefined), do: nil
51+
defp cast_span_id(span_id), do: bytes_to_hex(span_id, 16)
52+
53+
defp cast_trace_id(trace_id), do: bytes_to_hex(trace_id, 32)
54+
55+
defp cast_timestamp(:undefined), do: nil
56+
defp cast_timestamp(nil), do: nil
57+
58+
defp cast_timestamp(timestamp) do
59+
nano_timestamp = OpenTelemetry.timestamp_to_nano(timestamp)
60+
{:ok, datetime} = DateTime.from_unix(div(nano_timestamp, 1_000_000), :millisecond)
61+
62+
DateTime.to_iso8601(datetime)
63+
end
64+
65+
defp bytes_to_hex(bytes, length) do
66+
case(:otel_utils.format_binary_string("~#{length}.16.0b", [bytes])) do
67+
{:ok, result} -> result
68+
{:error, _} -> raise "Failed to convert bytes to hex: #{inspect(bytes)}"
69+
end
70+
end
71+
end
72+
end

0 commit comments

Comments
 (0)