Skip to content

Commit

Permalink
use semconv V1_27_0
Browse files — browse the repository at this point in the history
  • Loading branch information
dimastbk committed Sep 13, 2024
1 parent 7f722ec commit 1e7af5e
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ async def async_consume_hook(span, record, args, kwargs):
from opentelemetry.instrumentation.aiokafka.version import __version__
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.semconv.schemas import Schemas


class AIOKafkaInstrumentor(BaseInstrumentor):
Expand Down Expand Up @@ -108,7 +109,7 @@ def _instrument(self, **kwargs):
__name__,
__version__,
tracer_provider=tracer_provider,
schema_url="https://opentelemetry.io/schemas/1.11.0",
schema_url=Schemas.V1_27_0.value,
)

wrap_function_wrapper(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
from opentelemetry import context, propagate, trace
from opentelemetry.context import Context
from opentelemetry.propagators import textmap
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.semconv._incubating.attributes import messaging_attributes
from opentelemetry.semconv.attributes import server_attributes
from opentelemetry.trace import Tracer
from opentelemetry.trace.span import Span

Expand All @@ -21,6 +22,16 @@ def _extract_bootstrap_servers(
return client._bootstrap_servers


def _extract_client_id(client: aiokafka.AIOKafkaClient) -> str:
    """Fetch the configured client id string from an ``aiokafka`` client."""
    # aiokafka exposes no public accessor, so reach into the private attribute.
    client_id: str = client._client_id
    return client_id


def _extract_consumer_group(
    consumer: aiokafka.AIOKafkaConsumer,
) -> Optional[str]:
    """Return the consumer group id, or ``None`` when no group is configured."""
    # The group id only lives on a private attribute; aiokafka has no getter.
    group_id: Optional[str] = consumer._group_id
    return group_id


def _extract_argument(
key: str,
position: int,
Expand Down Expand Up @@ -128,23 +139,108 @@ def set(
_aiokafka_setter = AIOKafkaContextSetter()


def _enrich_span(
def _enrich_base_span(
    span: Span,
    *,
    bootstrap_servers: Union[str, List[str]],
    client_id: str,
    topic: str,
) -> None:
    """Set the messaging attributes shared by producer and consumer spans.

    Applies ``messaging.system``, ``server.address``, ``messaging.client_id``
    and ``messaging.destination.name`` on *span*.
    """
    base_attributes = (
        (
            messaging_attributes.MESSAGING_SYSTEM,
            messaging_attributes.MessagingSystemValues.KAFKA.value,
        ),
        # bootstrap_servers may be a single string or a list of strings,
        # so it is JSON-encoded into a single attribute value.
        (server_attributes.SERVER_ADDRESS, json.dumps(bootstrap_servers)),
        (messaging_attributes.MESSAGING_CLIENT_ID, client_id),
        (messaging_attributes.MESSAGING_DESTINATION_NAME, topic),
    )
    for attribute_key, attribute_value in base_attributes:
        span.set_attribute(attribute_key, attribute_value)


def _enrich_send_span(
    span: Span,
    *,
    bootstrap_servers: Union[str, List[str]],
    client_id: str,
    topic: str,
    partition: Optional[int],
    key: Optional[str],
) -> None:
    """Attach producer-side (publish) messaging attributes to *span*.

    No-op when the span is not recording. Sets the shared base attributes,
    then the partition id (when known), the ``send``/``publish`` operation
    attributes, and the message key (when present).
    """
    if not span.is_recording():
        return

    _enrich_base_span(
        span,
        bootstrap_servers=bootstrap_servers,
        client_id=client_id,
        topic=topic,
    )

    # The partition may be unknown until the broker assigns one.
    if partition is not None:
        span.set_attribute(
            messaging_attributes.MESSAGING_DESTINATION_PARTITION_ID,
            str(partition),
        )

    span.set_attribute(messaging_attributes.MESSAGING_OPERATION_NAME, "send")
    span.set_attribute(
        messaging_attributes.MESSAGING_OPERATION_TYPE,
        messaging_attributes.MessagingOperationTypeValues.PUBLISH.value,
    )

    if key is not None:
        span.set_attribute(
            messaging_attributes.MESSAGING_KAFKA_MESSAGE_KEY, str(key)
        )


def _enrich_anext_span(
    span: Span,
    *,
    bootstrap_servers: Union[str, List[str]],
    client_id: str,
    consumer_group: Optional[str],
    topic: str,
    partition: Optional[int],
    key: Optional[str],
    offset: int,
) -> None:
    """Attach consumer-side (receive) messaging attributes to *span*.

    No-op when the span is not recording. Sets the shared base attributes,
    the consumer group and partition when known, the ``receive`` operation
    attributes, and the message key and offset.
    """
    if not span.is_recording():
        return

    _enrich_base_span(
        span,
        bootstrap_servers=bootstrap_servers,
        client_id=client_id,
        topic=topic,
    )

    # Group id and partition are optional: group-less consumers exist, and
    # the partition may be absent on the record.
    if consumer_group is not None:
        span.set_attribute(
            messaging_attributes.MESSAGING_CONSUMER_GROUP_NAME, consumer_group
        )
    if partition is not None:
        span.set_attribute(
            messaging_attributes.MESSAGING_DESTINATION_PARTITION_ID,
            str(partition),
        )

    span.set_attribute(
        messaging_attributes.MESSAGING_OPERATION_NAME, "receive"
    )
    span.set_attribute(
        messaging_attributes.MESSAGING_OPERATION_TYPE,
        messaging_attributes.MessagingOperationTypeValues.RECEIVE.value,
    )

    # NOTE(review): unlike _enrich_send_span, the key is not coerced via
    # str() here, although record.key may be bytes — confirm whether the
    # asymmetry is intentional.
    if key is not None:
        span.set_attribute(
            messaging_attributes.MESSAGING_KAFKA_MESSAGE_KEY, key
        )

    span.set_attribute(
        messaging_attributes.MESSAGING_KAFKA_MESSAGE_OFFSET, offset
    )


Expand All @@ -168,12 +264,21 @@ async def _traced_send(

topic = _extract_send_topic(args, kwargs)
bootstrap_servers = _extract_bootstrap_servers(instance.client)
client_id = _extract_client_id(instance.client)
key = _extract_send_key(args, kwargs)
partition = await _extract_send_partition(instance, args, kwargs)
span_name = _get_span_name("send", topic)
with tracer.start_as_current_span(
span_name, kind=trace.SpanKind.PRODUCER
) as span:
_enrich_span(span, bootstrap_servers, topic, partition)
_enrich_send_span(
span,
bootstrap_servers=bootstrap_servers,
client_id=client_id,
topic=topic,
partition=partition,
key=key,
)
propagate.inject(
headers,
context=trace.set_span_in_context(span),
Expand All @@ -196,6 +301,8 @@ async def _create_consumer_span(
record: ConsumerRecord,
extracted_context: Context,
bootstrap_servers: Union[str, List[str]],
client_id: str,
consumer_group: Optional[str],
args: Tuple[Any],
kwargs: Dict[str, Any],
):
Expand All @@ -207,7 +314,16 @@ async def _create_consumer_span(
) as span:
new_context = trace.set_span_in_context(span, extracted_context)
token = context.attach(new_context)
_enrich_span(span, bootstrap_servers, record.topic, record.partition)
_enrich_anext_span(
span,
bootstrap_servers=bootstrap_servers,
client_id=client_id,
consumer_group=consumer_group,
topic=record.topic,
partition=record.partition,
key=record.key,
offset=record.offset,
)
try:
if callable(async_consume_hook):
await async_consume_hook(span, record, args, kwargs)
Expand All @@ -229,6 +345,8 @@ async def _traced_next(

if record:
bootstrap_servers = _extract_bootstrap_servers(instance._client)
client_id = _extract_client_id(instance._client)
consumer_group = _extract_consumer_group(instance)

extracted_context = propagate.extract(
record.headers, getter=_aiokafka_getter
Expand All @@ -239,6 +357,8 @@ async def _traced_next(
record,
extracted_context,
bootstrap_servers,
client_id,
consumer_group,
args,
kwargs,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ def test_context_getter(self) -> None:
@mock.patch(
"opentelemetry.instrumentation.aiokafka.utils._extract_send_partition"
)
@mock.patch("opentelemetry.instrumentation.aiokafka.utils._enrich_span")
@mock.patch(
"opentelemetry.instrumentation.aiokafka.utils._enrich_send_span"
)
@mock.patch("opentelemetry.trace.set_span_in_context")
@mock.patch("opentelemetry.propagate.inject")
async def test_wrap_send_with_topic_as_arg(
Expand All @@ -84,7 +86,9 @@ async def test_wrap_send_with_topic_as_arg(
@mock.patch(
"opentelemetry.instrumentation.aiokafka.utils._extract_send_partition"
)
@mock.patch("opentelemetry.instrumentation.aiokafka.utils._enrich_span")
@mock.patch(
"opentelemetry.instrumentation.aiokafka.utils._enrich_send_span"
)
@mock.patch("opentelemetry.trace.set_span_in_context")
@mock.patch("opentelemetry.propagate.inject")
async def test_wrap_send_with_topic_as_kwarg(
Expand Down Expand Up @@ -137,9 +141,11 @@ async def wrap_send_helper(
span = tracer.start_as_current_span().__enter__.return_value
enrich_span.assert_called_once_with(
span,
extract_bootstrap_servers.return_value,
self.topic_name,
extract_send_partition.return_value,
bootstrap_servers=extract_bootstrap_servers.return_value,
client_id=kafka_producer.client._client_id,
topic=self.topic_name,
partition=extract_send_partition.return_value,
key=None,
)

set_span_in_context.assert_called_once_with(span)
Expand All @@ -162,8 +168,16 @@ async def wrap_send_helper(
@mock.patch(
"opentelemetry.instrumentation.aiokafka.utils._extract_bootstrap_servers"
)
@mock.patch(
"opentelemetry.instrumentation.aiokafka.utils._extract_client_id"
)
@mock.patch(
"opentelemetry.instrumentation.aiokafka.utils._extract_consumer_group"
)
async def test_wrap_next(
self,
extract_consumer_group: mock.MagicMock,
extract_client_id: mock.MagicMock,
extract_bootstrap_servers: mock.MagicMock,
_create_consumer_span: mock.MagicMock,
extract: mock.MagicMock,
Expand All @@ -183,6 +197,12 @@ async def test_wrap_next(
)
bootstrap_servers = extract_bootstrap_servers.return_value

extract_client_id.assert_called_once_with(kafka_consumer._client)
client_id = extract_client_id.return_value

extract_consumer_group.assert_called_once_with(kafka_consumer)
consumer_group = extract_consumer_group.return_value

original_next_callback.assert_awaited_once_with(
*self.args, **self.kwargs
)
Expand All @@ -199,13 +219,17 @@ async def test_wrap_next(
record,
context,
bootstrap_servers,
client_id,
consumer_group,
self.args,
self.kwargs,
)

@mock.patch("opentelemetry.trace.set_span_in_context")
@mock.patch("opentelemetry.context.attach")
@mock.patch("opentelemetry.instrumentation.aiokafka.utils._enrich_span")
@mock.patch(
"opentelemetry.instrumentation.aiokafka.utils._enrich_anext_span"
)
@mock.patch("opentelemetry.context.detach")
async def test_create_consumer_span(
self,
Expand All @@ -219,13 +243,17 @@ async def test_create_consumer_span(
bootstrap_servers = mock.MagicMock()
extracted_context = mock.MagicMock()
record = mock.MagicMock()
client_id = mock.MagicMock()
consumer_group = mock.MagicMock()

await _create_consumer_span(
tracer,
consume_hook,
record,
extracted_context,
bootstrap_servers,
client_id,
consumer_group,
self.args,
self.kwargs,
)
Expand All @@ -242,7 +270,14 @@ async def test_create_consumer_span(
attach.assert_called_once_with(set_span_in_context.return_value)

enrich_span.assert_called_once_with(
span, bootstrap_servers, record.topic, record.partition
span,
bootstrap_servers=bootstrap_servers,
client_id=client_id,
consumer_group=consumer_group,
topic=record.topic,
partition=record.partition,
key=record.key,
offset=record.offset,
)
consume_hook.assert_awaited_once_with(
span, record, self.args, self.kwargs
Expand Down

0 comments on commit 1e7af5e

Please sign in to comment.