Skip to content

Commit

Permalink
Fix Kafka lag instrumentation for version 2.7+ of Kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
piochelepiotr committed Nov 12, 2024
1 parent cb6a6fc commit 31cd5c9
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,28 @@ public void methodAdvice(MethodTransformer transformer) {
.and(takesArgument(2, named("org.apache.kafka.common.serialization.Deserializer"))),
KafkaConsumerInfoInstrumentation.class.getName() + "$ConstructorAdvice");

transformer.applyAdvice(
isConstructor()
.and(takesArgument(0, Map.class))
.and(takesArgument(1, named("org.apache.kafka.common.serialization.Deserializer")))
.and(takesArgument(2, named("org.apache.kafka.common.serialization.Deserializer"))),
KafkaConsumerInfoInstrumentation.class.getName() + "$SecondConstructorAdvice");

transformer.applyAdvice(
isMethod()
.and(isPublic())
.and(named("poll"))
.and(takesArguments(1))
.and(takesArgument(0, long.class))
.and(returns(named("org.apache.kafka.clients.consumer.ConsumerRecords"))),
KafkaConsumerInfoInstrumentation.class.getName() + "$RecordsAdvice");

transformer.applyAdvice(
isMethod()
.and(isPublic())
.and(named("poll"))
.and(takesArguments(1))
.and(takesArgument(0, named("java.time.Duration")))
.and(returns(named("org.apache.kafka.clients.consumer.ConsumerRecords"))),
KafkaConsumerInfoInstrumentation.class.getName() + "$RecordsAdvice");
}
Expand All @@ -94,6 +111,7 @@ public static void captureGroup(
@Advice.FieldValue("metadata") Metadata metadata,
@Advice.FieldValue("coordinator") ConsumerCoordinator coordinator,
@Advice.Argument(0) ConsumerConfig consumerConfig) {
System.out.println("constructor advice!!!");
String consumerGroup = consumerConfig.getString(ConsumerConfig.GROUP_ID_CONFIG);
String normalizedConsumerGroup =
consumerGroup != null && !consumerGroup.isEmpty() ? consumerGroup : null;
Expand Down Expand Up @@ -131,6 +149,66 @@ public static void muzzleCheck(ConsumerRecord record) {
}
}

public static class SecondConstructorAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void captureGroup(
@Advice.This KafkaConsumer consumer,
@Advice.FieldValue("metadata") Metadata metadata,
@Advice.FieldValue("coordinator") ConsumerCoordinator coordinator,
@Advice.Argument(0) Map<String, Object> consumerConfig) {
System.out.println("new constructor advice!!!");
Object groupID = consumerConfig.get("group.id");
String consumerGroup = groupID instanceof String ? (String) groupID : null;
String normalizedConsumerGroup =
consumerGroup != null && !consumerGroup.isEmpty() ? consumerGroup : null;
System.out.println("consume group " + normalizedConsumerGroup);

String bootstrapServers = null;
Object bootstrapServersObj = consumerConfig.get("bootstrap.servers");
if (bootstrapServersObj instanceof String) {
bootstrapServers = (String) bootstrapServersObj;
System.out.println("bootstrap servers " + bootstrapServers);
} else {
System.out.println("it s not a string");
}
// if (bootstrapServersList != null && !bootstrapServersList.isEmpty()) {
// bootstrapServers = String.join(",", bootstrapServersList);
// }
// Object bootstrapServersObj = consumerConfig.get("bootstrap.servers");
// if (bootstrapServersObj instanceof List) {
// List<?> tempList = (List<?>) bootstrapServersObj;

// // Verify each element is a String
// if (!tempList.isEmpty() && tempList.stream().allMatch(element -> element instanceof String)) {
// bootstrapServers = String.join(",", (List<String>) tempList);
// }
// }
KafkaConsumerInfo kafkaConsumerInfo;
if (Config.get().isDataStreamsEnabled()) {
kafkaConsumerInfo =
new KafkaConsumerInfo(normalizedConsumerGroup, metadata, bootstrapServers);
} else {
kafkaConsumerInfo = new KafkaConsumerInfo(normalizedConsumerGroup, bootstrapServers);
}

if (kafkaConsumerInfo.getConsumerGroup() != null
|| kafkaConsumerInfo.getClientMetadata() != null) {
InstrumentationContext.get(KafkaConsumer.class, KafkaConsumerInfo.class)
.put(consumer, kafkaConsumerInfo);
if (coordinator != null) {
InstrumentationContext.get(ConsumerCoordinator.class, KafkaConsumerInfo.class)
.put(coordinator, kafkaConsumerInfo);
}
}
}

public static void muzzleCheck(ConsumerRecord record) {
// KafkaConsumerInstrumentation only applies for kafka versions with headers
// Make an explicit call so KafkaConsumerGroupInstrumentation does the same
record.headers();
}
}

/**
* this method transfers the consumer group from the KafkaConsumer class key to the
* ConsumerRecords key. This is necessary because in the poll method, we don't have access to the
Expand All @@ -157,6 +235,7 @@ public static void captureGroup(
@Advice.Enter final AgentScope scope,
@Advice.This KafkaConsumer consumer,
@Advice.Return ConsumerRecords records) {
System.out.println("polling!");
int recordsCount = 0;
if (records != null) {
KafkaConsumerInfo kafkaConsumerInfo =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,11 @@ public static void wrap(
KafkaConsumerInfo kafkaConsumerInfo =
InstrumentationContext.get(ConsumerRecords.class, KafkaConsumerInfo.class).get(records);
String group = KafkaConsumerInstrumentationHelper.extractGroup(kafkaConsumerInfo);
System.out.println("consumer info retrieved" + group);
String clusterId =
KafkaConsumerInstrumentationHelper.extractClusterId(
kafkaConsumerInfo, InstrumentationContext.get(Metadata.class, String.class));
System.out.println("cluster ID retrieved" + clusterId);
String bootstrapServers =
KafkaConsumerInstrumentationHelper.extractBootstrapServers(kafkaConsumerInfo);
iterator =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public static class MetadataUpdateBefore22Advice {
public static void onEnter(
@Advice.This final Metadata metadata, @Advice.Argument(0) final Cluster newCluster) {
if (newCluster != null && !newCluster.isBootstrapConfigured()) {
System.out.println("getting cluster ID before 22: " + newCluster.clusterResource().clusterId());
InstrumentationContext.get(Metadata.class, String.class)
.put(metadata, newCluster.clusterResource().clusterId());
}
Expand All @@ -83,6 +84,7 @@ public static class MetadataUpdate22AndAfterAdvice {
public static void onEnter(
@Advice.This final Metadata metadata, @Advice.Argument(1) final MetadataResponse response) {
if (response != null) {
System.out.println("getting cluster ID after 22: " + response.clusterId());
InstrumentationContext.get(Metadata.class, String.class)
.put(metadata, response.clusterId());
}
Expand Down

0 comments on commit 31cd5c9

Please sign in to comment.