diff --git a/instrumentation/kamon-kafka/src/main/java/kamon/instrumentation/kafka/client/advisor/PollMethodAdvisor_3_7_0_and_up_Async.java b/instrumentation/kamon-kafka/src/main/java/kamon/instrumentation/kafka/client/advisor/PollMethodAdvisor_3_7_0_and_up_Async.java new file mode 100644 index 000000000..04285cab6 --- /dev/null +++ b/instrumentation/kamon-kafka/src/main/java/kamon/instrumentation/kafka/client/advisor/PollMethodAdvisor_3_7_0_and_up_Async.java @@ -0,0 +1,46 @@ +/* + * Copyright 2013-2020 The Kamon Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kamon.instrumentation.kafka.client.advisor; + +import kamon.Kamon; +import kamon.instrumentation.kafka.client.RecordProcessor; +import kanela.agent.libs.net.bytebuddy.asm.Advice; +import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; +import org.apache.kafka.clients.consumer.ConsumerRecords; + +import java.time.Instant; +import java.util.Optional; + +/** + * Consumer Instrumentation + */ +public class PollMethodAdvisor_3_7_0_and_up_Async { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter(@Advice.Local("startTime") Instant startTime) { + startTime = Kamon.clock().instant(); + } + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onExit( + @Advice.Local("startTime") Instant startTime, + @Advice.FieldValue(value = "groupMetadata") Optional groupMetadata, + @Advice.FieldValue("clientId") String clientId, + @Advice.Return(readOnly = false) ConsumerRecords records) { + + records = RecordProcessor.process(startTime, clientId, groupMetadata, records); + } +} \ No newline at end of file diff --git a/instrumentation/kamon-kafka/src/main/java/kamon/instrumentation/kafka/client/advisor/PollMethodAdvisor_3_7_0_and_up_Legacy.java b/instrumentation/kamon-kafka/src/main/java/kamon/instrumentation/kafka/client/advisor/PollMethodAdvisor_3_7_0_and_up_Legacy.java new file mode 100644 index 000000000..3bfc22aca --- /dev/null +++ b/instrumentation/kamon-kafka/src/main/java/kamon/instrumentation/kafka/client/advisor/PollMethodAdvisor_3_7_0_and_up_Legacy.java @@ -0,0 +1,47 @@ +/* + * Copyright 2013-2020 The Kamon Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kamon.instrumentation.kafka.client.advisor; + +import kamon.Kamon; +import kamon.instrumentation.kafka.client.RecordProcessor; +import kanela.agent.libs.net.bytebuddy.asm.Advice; +import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator; + +import java.time.Instant; +import java.util.Optional; + +/** + * Consumer Instrumentation + */ +public class PollMethodAdvisor_3_7_0_and_up_Legacy { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter(@Advice.Local("startTime") Instant startTime) { + startTime = Kamon.clock().instant(); + } + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onExit( + @Advice.Local("startTime") Instant startTime, + @Advice.FieldValue(value = "coordinator") ConsumerCoordinator coordinator, + @Advice.FieldValue("clientId") String clientId, + @Advice.Return(readOnly = false) ConsumerRecords records) { + + records = RecordProcessor.process(startTime, clientId, coordinator, records); + } +} \ No newline at end of file diff --git a/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/client/ConsumerInstrumentation.scala b/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/client/ConsumerInstrumentation.scala index 4aa497018..b34ecc443 100644 --- a/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/client/ConsumerInstrumentation.scala +++ b/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/client/ConsumerInstrumentation.scala @@ -17,11 +17,15 @@ package kamon.instrumentation.kafka.client import java.time.{Duration, Instant} - import kamon.context.Context import kamon.instrumentation.kafka.client.ConsumedRecordData.ConsumerInfo -import kamon.instrumentation.kafka.client.advisor.PollMethodAdvisor +import kamon.instrumentation.kafka.client.advisor.{ + PollMethodAdvisor, + PollMethodAdvisor_3_7_0_and_up_Async, + PollMethodAdvisor_3_7_0_and_up_Legacy +} import kanela.agent.api.instrumentation.InstrumentationBuilder +import kanela.agent.libs.net.bytebuddy.matcher.ElementMatchers.{declaresField, hasType, named} class ConsumerInstrumentation extends InstrumentationBuilder { @@ -29,16 +33,34 @@ class ConsumerInstrumentation extends InstrumentationBuilder { * Instruments org.apache.kafka.clients.consumer.KafkaConsumer::poll(Long) * Kafka version < 2.3 */ - onType("org.apache.kafka.clients.consumer.KafkaConsumer") + onTypesMatching(named("org.apache.kafka.clients.consumer.KafkaConsumer").and(declaresField(named("groupId")))) .advise(method("poll").and(withArgument(0, classOf[Long])), classOf[PollMethodAdvisor]) /** * Instruments org.apache.kafka.clients.consumer.KafkaConsumer::poll(Duration) - * Kafka version >= 2.3 + * Kafka version >= 2.3 < 3.7 */ - onType("org.apache.kafka.clients.consumer.KafkaConsumer") + onTypesMatching(named("org.apache.kafka.clients.consumer.KafkaConsumer").and(declaresField(named("groupId")))) .advise(method("poll").and(withArgument(0, classOf[Duration])), classOf[PollMethodAdvisor]) + /** + * Instruments org.apache.kafka.clients.consumer.KafkaConsumer::poll(Duration) + * Kafka version >= 3.7 + */ + onTypesMatching( + named("org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer").and(declaresField(named("groupMetadata"))) + ) + .advise(method("poll").and(withArgument(0, classOf[Duration])), classOf[PollMethodAdvisor_3_7_0_and_up_Async]) + + /** + * Instruments org.apache.kafka.clients.consumer.KafkaConsumer::poll(Duration) + * Kafka version >= 3.7 + */ + onTypesMatching( + named("org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer").and(declaresField(named("coordinator"))) + ) + .advise(method("poll").and(withArgument(0, classOf[Duration])), classOf[PollMethodAdvisor_3_7_0_and_up_Legacy]) + /** * Instruments org.apache.kafka.clients.consumer.ConsumerRecord with the HasSpan mixin in order * to make the span available as parent for down stream operations diff --git a/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/client/RecordProcessor.scala b/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/client/RecordProcessor.scala index c4e023554..b84c2cafe 100644 --- a/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/client/RecordProcessor.scala +++ b/instrumentation/kamon-kafka/src/main/scala/kamon/instrumentation/kafka/client/RecordProcessor.scala @@ -18,11 +18,11 @@ package kamon.instrumentation.kafka.client import java.time.Instant import java.util.Optional - import kamon.Kamon import kamon.context.Context import kamon.instrumentation.kafka.client.ConsumedRecordData.ConsumerInfo -import org.apache.kafka.clients.consumer.ConsumerRecords +import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator +import org.apache.kafka.clients.consumer.{ConsumerGroupMetadata, ConsumerRecords} private[kafka] object RecordProcessor { @@ -61,8 +61,14 @@ private[kafka] object RecordProcessor { * KafkaConsumer which versions < 2.5 relies on internal groupId: String and higher versions in Optional[String]. */ private def resolve(groupId: AnyRef): Option[String] = groupId match { - case opt: Optional[String] => if (opt.isPresent) Some(opt.get()) else None - case value: String => Option(value) - case _ => None + case opt: Optional[_] => + if (opt.isPresent) opt.get() match { + case s: String => Some(s) + case meta: ConsumerGroupMetadata => Some(meta.groupId()) + } + else None + case value: String => Option(value) + case coord: ConsumerCoordinator => Some(coord.groupMetadata().groupId()) + case _ => None } }