
Commit

fix review (remove AIOKafkaPropertiesExtractor)
dimastbk committed Aug 31, 2024
1 parent dfa84a1 commit 965ca43
Showing 2 changed files with 77 additions and 105 deletions.
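
To orient the diff below: the extractor helpers move from static methods on AIOKafkaPropertiesExtractor to private module-level functions in opentelemetry.instrumentation.aiokafka.utils, so call sites (and the mock.patch targets in the tests) change accordingly. A minimal sketch of the new call style follows; the argument values are made up for illustration, and the helpers are private APIs of the instrumentation package, shown here only to clarify the refactor:

# Illustrative only: "my-topic" / b"payload" are hypothetical values.
from opentelemetry.instrumentation.aiokafka.utils import (
    _extract_send_topic,
    _extract_send_value,
)

# Positional form of AIOKafkaProducer.send(topic, value=...).
args, kwargs = ("my-topic", b"payload"), {}
assert _extract_send_topic(args, kwargs) == "my-topic"
assert _extract_send_value(args, kwargs) == b"payload"

# Keyword form resolves through _extract_argument's kwargs fallback.
assert _extract_send_topic((), {"topic": "my-topic"}) == "my-topic"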
@@ -15,92 +15,76 @@
 _LOG = getLogger(__name__)
 
 
-class AIOKafkaPropertiesExtractor:
-    @staticmethod
-    def extract_bootstrap_servers(
-        client: aiokafka.AIOKafkaClient,
-    ) -> Union[str, List[str]]:
-        return client._bootstrap_servers
-
-    @staticmethod
-    def _extract_argument(
-        key: str,
-        position: int,
-        default_value: Any,
-        args: Tuple[Any],
-        kwargs: Dict[str, Any],
-    ) -> Any:
-        if len(args) > position:
-            return args[position]
-        return kwargs.get(key, default_value)
-
-    @staticmethod
-    def extract_send_topic(args: Tuple[Any], kwargs: Dict[str, Any]) -> str:
-        """extract topic from `send` method arguments in AIOKafkaProducer class"""
-        return AIOKafkaPropertiesExtractor._extract_argument(
-            "topic", 0, "unknown", args, kwargs
-        )
+def _extract_bootstrap_servers(
+    client: aiokafka.AIOKafkaClient,
+) -> Union[str, List[str]]:
+    return client._bootstrap_servers
 
-    @staticmethod
-    def extract_send_value(
-        args: Tuple[Any], kwargs: Dict[str, Any]
-    ) -> Optional[Any]:
-        """extract value from `send` method arguments in AIOKafkaProducer class"""
-        return AIOKafkaPropertiesExtractor._extract_argument(
-            "value", 1, None, args, kwargs
-        )
-
-    @staticmethod
-    def extract_send_key(
-        args: Tuple[Any], kwargs: Dict[str, Any]
-    ) -> Optional[Any]:
-        """extract key from `send` method arguments in AIOKafkaProducer class"""
-        return AIOKafkaPropertiesExtractor._extract_argument(
-            "key", 2, None, args, kwargs
-        )
+def _extract_argument(
+    key: str,
+    position: int,
+    default_value: Any,
+    args: Tuple[Any],
+    kwargs: Dict[str, Any],
+) -> Any:
+    if len(args) > position:
+        return args[position]
+    return kwargs.get(key, default_value)
 
-    @staticmethod
-    def extract_send_headers(args: Tuple[Any], kwargs: Dict[str, Any]):
-        """extract headers from `send` method arguments in AIOKafkaProducer class"""
-        return AIOKafkaPropertiesExtractor._extract_argument(
-            "headers", 5, None, args, kwargs
-        )
 
-    @staticmethod
-    async def extract_send_partition(
-        instance: aiokafka.AIOKafkaProducer,
-        args: Tuple[Any],
-        kwargs: Dict[str, Any],
-    ) -> Optional[int]:
-        """extract partition `send` method arguments, using the `_partition` method in AIOKafkaProducer class"""
-        try:
-            topic = AIOKafkaPropertiesExtractor.extract_send_topic(
-                args, kwargs
-            )
-            key = AIOKafkaPropertiesExtractor.extract_send_key(args, kwargs)
-            value = AIOKafkaPropertiesExtractor.extract_send_value(
-                args, kwargs
-            )
-            partition = AIOKafkaPropertiesExtractor._extract_argument(
-                "partition", 3, None, args, kwargs
-            )
-            key_bytes, value_bytes = instance._serialize(topic, key, value)
-            valid_types = (bytes, bytearray, memoryview, type(None))
-            if (
-                type(key_bytes) not in valid_types
-                or type(value_bytes) not in valid_types
-            ):
-                return None
-
-            await instance.client._wait_on_metadata(topic)
-
-            return instance._partition(
-                topic, partition, key, value, key_bytes, value_bytes
-            )
-        except Exception as exception:  # pylint: disable=W0703
-            _LOG.debug("Unable to extract partition: %s", exception)
-            return None
+def _extract_send_topic(args: Tuple[Any], kwargs: Dict[str, Any]) -> str:
+    """extract topic from `send` method arguments in AIOKafkaProducer class"""
+    return _extract_argument("topic", 0, "unknown", args, kwargs)
+
+
+def _extract_send_value(
+    args: Tuple[Any], kwargs: Dict[str, Any]
+) -> Optional[Any]:
+    """extract value from `send` method arguments in AIOKafkaProducer class"""
+    return _extract_argument("value", 1, None, args, kwargs)
+
+
+def _extract_send_key(
+    args: Tuple[Any], kwargs: Dict[str, Any]
+) -> Optional[Any]:
+    """extract key from `send` method arguments in AIOKafkaProducer class"""
+    return _extract_argument("key", 2, None, args, kwargs)
+
+
+def _extract_send_headers(args: Tuple[Any], kwargs: Dict[str, Any]):
+    """extract headers from `send` method arguments in AIOKafkaProducer class"""
+    return _extract_argument("headers", 5, None, args, kwargs)
+
+
+async def _extract_send_partition(
+    instance: aiokafka.AIOKafkaProducer,
+    args: Tuple[Any],
+    kwargs: Dict[str, Any],
+) -> Optional[int]:
+    """extract partition `send` method arguments, using the `_partition` method in AIOKafkaProducer class"""
+    try:
+        topic = _extract_send_topic(args, kwargs)
+        key = _extract_send_key(args, kwargs)
+        value = _extract_send_value(args, kwargs)
+        partition = _extract_argument("partition", 3, None, args, kwargs)
+        key_bytes, value_bytes = instance._serialize(topic, key, value)
+        valid_types = (bytes, bytearray, memoryview, type(None))
+        if (
+            type(key_bytes) not in valid_types
+            or type(value_bytes) not in valid_types
+        ):
+            return None
+
+        await instance.client._wait_on_metadata(topic)
+
+        return instance._partition(
+            topic, partition, key, value, key_bytes, value_bytes
+        )
+    except Exception as exception:  # pylint: disable=W0703
+        _LOG.debug("Unable to extract partition: %s", exception)
+        return None
 
 
 ProduceHookT = Optional[Callable[[Span, Tuple, Dict], Awaitable[None]]]
 ConsumeHookT = Optional[
@@ -177,22 +161,14 @@ async def _traced_send(
         args: Tuple[Any],
         kwargs: Dict[str, Any],
     ) -> None:
-        headers = AIOKafkaPropertiesExtractor.extract_send_headers(
-            args, kwargs
-        )
+        headers = _extract_send_headers(args, kwargs)
         if headers is None:
             headers = []
         kwargs["headers"] = headers
 
-        topic = AIOKafkaPropertiesExtractor.extract_send_topic(args, kwargs)
-        bootstrap_servers = (
-            AIOKafkaPropertiesExtractor.extract_bootstrap_servers(
-                instance.client
-            )
-        )
-        partition = await AIOKafkaPropertiesExtractor.extract_send_partition(
-            instance, args, kwargs
-        )
+        topic = _extract_send_topic(args, kwargs)
+        bootstrap_servers = _extract_bootstrap_servers(instance.client)
+        partition = await _extract_send_partition(instance, args, kwargs)
         span_name = _get_span_name("send", topic)
         with tracer.start_as_current_span(
             span_name, kind=trace.SpanKind.PRODUCER
@@ -252,11 +228,7 @@ async def _traced_next(
         record = await func(*args, **kwargs)
 
         if record:
-            bootstrap_servers = (
-                AIOKafkaPropertiesExtractor.extract_bootstrap_servers(
-                    instance._client
-                )
-            )
+            bootstrap_servers = _extract_bootstrap_servers(instance._client)
 
             extracted_context = propagate.extract(
                 record.headers, getter=_aiokafka_getter
@@ -18,10 +18,10 @@
 from opentelemetry.instrumentation.aiokafka.utils import (
     AIOKafkaContextGetter,
     AIOKafkaContextSetter,
-    AIOKafkaPropertiesExtractor,
     _aiokafka_getter,
     _aiokafka_setter,
     _create_consumer_span,
+    _extract_send_partition,
     _get_span_name,
     _wrap_anext,
     _wrap_send,
@@ -54,10 +54,10 @@ def test_context_getter(self) -> None:
         self.assertEqual(["key1"], context_getter.keys(carrier_list))
 
     @mock.patch(
-        "opentelemetry.instrumentation.aiokafka.utils.AIOKafkaPropertiesExtractor.extract_bootstrap_servers"
+        "opentelemetry.instrumentation.aiokafka.utils._extract_bootstrap_servers"
     )
     @mock.patch(
-        "opentelemetry.instrumentation.aiokafka.utils.AIOKafkaPropertiesExtractor.extract_send_partition"
+        "opentelemetry.instrumentation.aiokafka.utils._extract_send_partition"
     )
     @mock.patch("opentelemetry.instrumentation.aiokafka.utils._enrich_span")
     @mock.patch("opentelemetry.trace.set_span_in_context")
@@ -79,10 +79,10 @@ async def test_wrap_send_with_topic_as_arg(
         )
 
     @mock.patch(
-        "opentelemetry.instrumentation.aiokafka.utils.AIOKafkaPropertiesExtractor.extract_bootstrap_servers"
+        "opentelemetry.instrumentation.aiokafka.utils._extract_bootstrap_servers"
    )
     @mock.patch(
-        "opentelemetry.instrumentation.aiokafka.utils.AIOKafkaPropertiesExtractor.extract_send_partition"
+        "opentelemetry.instrumentation.aiokafka.utils._extract_send_partition"
     )
     @mock.patch("opentelemetry.instrumentation.aiokafka.utils._enrich_span")
     @mock.patch("opentelemetry.trace.set_span_in_context")
@@ -160,7 +160,7 @@ async def wrap_send_helper(
         "opentelemetry.instrumentation.aiokafka.utils._create_consumer_span"
     )
     @mock.patch(
-        "opentelemetry.instrumentation.aiokafka.utils.AIOKafkaPropertiesExtractor.extract_bootstrap_servers"
+        "opentelemetry.instrumentation.aiokafka.utils._extract_bootstrap_servers"
     )
     async def test_wrap_next(
         self,
@@ -255,7 +255,7 @@ async def test_kafka_properties_extractor(self):
         aiokafka_instance_mock._partition.return_value = "partition"
         aiokafka_instance_mock.client._wait_on_metadata = mock.AsyncMock()
         assert (
-            await AIOKafkaPropertiesExtractor.extract_send_partition(
+            await _extract_send_partition(
                 aiokafka_instance_mock, self.args, self.kwargs
             )
             == "partition"
@@ -264,7 +264,7 @@ async def test_kafka_properties_extractor(self):
             Exception("mocked error")
         )
         assert (
-            await AIOKafkaPropertiesExtractor.extract_send_partition(
+            await _extract_send_partition(
                 aiokafka_instance_mock, self.args, self.kwargs
             )
             is None

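Because the helpers are now plain module attributes, the tests patch them by their dotted module path rather than as attributes of the removed class. A minimal, self-contained sketch of that pattern; the test class, test name, and the "localhost:9092" value are illustrative and not taken from the diff:

from unittest import TestCase, mock

from opentelemetry.instrumentation.aiokafka import utils


class TestModuleLevelPatching(TestCase):
    @mock.patch(
        "opentelemetry.instrumentation.aiokafka.utils._extract_bootstrap_servers",
        return_value="localhost:9092",  # illustrative value
    )
    def test_patch_targets_module_function(self, extract_bootstrap_servers):
        # Callers that look the helper up on the module (e.g. _wrap_send) now
        # see the mock, because the module attribute itself is replaced.
        self.assertEqual(
            utils._extract_bootstrap_servers(mock.Mock()), "localhost:9092"
        )
        extract_bootstrap_servers.assert_called_once()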