Skip to content

Commit

Permalink
KAFKA-17248: Add reporter for adding thread metrics to telemetry pipe…
Browse files Browse the repository at this point in the history
…line and a test [2/N] (#17376)

This PR adds a Reporter instance that will add streams thread metrics to the telemetry pipeline.
For testing, the PR adds a unit test.

Reviewers: Matthias Sax <[email protected]>
  • Loading branch information
bbejeck authored Oct 5, 2024
1 parent 3bb408c commit 930f165
Show file tree
Hide file tree
Showing 7 changed files with 268 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metrics.KafkaMetric;

import java.io.Closeable;
import java.time.Duration;
Expand Down Expand Up @@ -122,6 +123,15 @@ public interface Consumer<K, V> extends Closeable {
*/
void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback);

/**
* @see KafkaConsumer#registerMetricForSubscription(KafkaMetric)
*/
void registerMetricForSubscription(KafkaMetric metric);

/**
* @see KafkaConsumer#unregisterMetricFromSubscription(KafkaMetric)
*/
void unregisterMetricFromSubscription(KafkaMetric metric);
/**
* @see KafkaConsumer#seek(TopicPartition, long)
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.LogContext;
Expand Down Expand Up @@ -1744,4 +1745,14 @@ KafkaConsumerMetrics kafkaConsumerMetrics() {
boolean updateAssignmentMetadataIfNeeded(final Timer timer) {
return delegate.updateAssignmentMetadataIfNeeded(timer);
}

@Override
public void registerMetricForSubscription(KafkaMetric metric) {
throw new UnsupportedOperationException("not implemented");
}

@Override
public void unregisterMetricFromSubscription(KafkaMetric metric) {
throw new UnsupportedOperationException("not implemented");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.utils.LogContext;

import java.time.Duration;
Expand Down Expand Up @@ -76,6 +77,8 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
private Uuid clientInstanceId;
private int injectTimeoutExceptionCounter;

private final List<KafkaMetric> addedMetrics = new ArrayList<>();

public MockConsumer(OffsetResetStrategy offsetResetStrategy) {
this.subscriptions = new SubscriptionState(new LogContext(), offsetResetStrategy);
this.partitions = new HashMap<>();
Expand Down Expand Up @@ -176,6 +179,16 @@ private synchronized void subscribe(Pattern pattern, Optional<ConsumerRebalanceL
subscriptions.assignFromSubscribed(assignedPartitions);
}

@Override
public void registerMetricForSubscription(KafkaMetric metric) {
addedMetrics.add(metric);
}

@Override
public void unregisterMetricFromSubscription(KafkaMetric metric) {
addedMetrics.remove(metric);
}

@Override
public synchronized void assign(Collection<TopicPartition> partitions) {
ensureNotClosed();
Expand Down Expand Up @@ -632,4 +645,8 @@ public void resetShouldRebalance() {
public Duration lastPollTimeout() {
return lastPollTimeout;
}

public List<KafkaMetric> addedMetrics() {
return Collections.unmodifiableList(addedMetrics);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.requests.JoinGroupRequest;
Expand Down Expand Up @@ -642,6 +643,16 @@ private void updateGroupMetadata(final Optional<Integer> memberEpoch, final Opti
);
}

@Override
public void registerMetricForSubscription(KafkaMetric metric) {
throw new UnsupportedOperationException("not implemented");
}

@Override
public void unregisterMetricFromSubscription(KafkaMetric metric) {
throw new UnsupportedOperationException("not implemented");
}

/**
* poll implementation using {@link ApplicationEventHandler}.
* 1. Poll for background events. If there's a fetch response event, process the record and return it. If it is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.serialization.Deserializer;
Expand Down Expand Up @@ -426,6 +427,17 @@ public void subscribe(Collection<String> topics, ConsumerRebalanceListener liste
subscribeInternal(topics, Optional.of(listener));
}


@Override
public void registerMetricForSubscription(KafkaMetric metric) {
throw new UnsupportedOperationException("not implemented");
}

@Override
public void unregisterMetricFromSubscription(KafkaMetric metric) {
throw new UnsupportedOperationException("not implemented");
}

@Override
public void subscribe(Collection<String> topics) {
subscribeInternal(topics, Optional.empty());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.streams.internals.metrics;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;
import java.util.Objects;

public class StreamsThreadMetricsDelegatingReporter implements MetricsReporter {

private static final Logger log = LoggerFactory.getLogger(StreamsThreadMetricsDelegatingReporter.class);
private static final String THREAD_ID_TAG = "thread-id";
private final Consumer<byte[], byte[]> consumer;
private final String threadId;
private final String stateUpdaterThreadId;


public StreamsThreadMetricsDelegatingReporter(final Consumer<byte[], byte[]> consumer, final String threadId, final String stateUpdaterThreadId) {
this.consumer = Objects.requireNonNull(consumer);
this.threadId = Objects.requireNonNull(threadId);
this.stateUpdaterThreadId = Objects.requireNonNull(stateUpdaterThreadId);
log.debug("Creating MetricsReporter for threadId {} and stateUpdaterId {}", threadId, stateUpdaterThreadId);
}

@Override
public void init(final List<KafkaMetric> metrics) {
metrics.forEach(this::metricChange);
}

@Override
public void metricChange(final KafkaMetric metric) {
if (tagMatchStreamOrStateUpdaterThreadId(metric)) {
log.debug("Registering metric {}", metric.metricName());
consumer.registerMetricForSubscription(metric);
}
}

private boolean tagMatchStreamOrStateUpdaterThreadId(final KafkaMetric metric) {
final Map<String, String> tags = metric.metricName().tags();
final boolean shouldInclude = tags.containsKey(THREAD_ID_TAG) && (tags.get(THREAD_ID_TAG).equals(threadId) || tags.get(THREAD_ID_TAG).equals(stateUpdaterThreadId));
if (!shouldInclude) {
log.trace("Rejecting metric {}", metric.metricName());
}
return shouldInclude;
}

@Override
public void metricRemoval(final KafkaMetric metric) {
if (tagMatchStreamOrStateUpdaterThreadId(metric)) {
log.debug("Unregistering metric {}", metric.metricName());
consumer.unregisterMetricFromSubscription(metric);
}
}

@Override
public void close() {
// No op
}

@Override
public void configure(final Map<String, ?> configs) {
// No op
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.streams.internals.metrics;

import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.utils.Time;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;

class StreamsThreadMetricsDelegatingReporterTest {

private MockConsumer<byte[], byte[]> mockConsumer;
private StreamsThreadMetricsDelegatingReporter streamsThreadMetricsDelegatingReporter;

private KafkaMetric kafkaMetricOneHasThreadIdTag;
private KafkaMetric kafkaMetricTwoHasThreadIdTag;
private KafkaMetric kafkaMetricThreeHasThreadIdTag;
private KafkaMetric kafkaMetricWithoutThreadIdTag;
private final Object lock = new Object();
private final MetricConfig metricConfig = new MetricConfig();


@BeforeEach
public void setUp() {
final Map<String, String> threadIdTagMap = new HashMap<>();
final String threadId = "abcxyz-StreamThread-1";
threadIdTagMap.put("thread-id", threadId);

final Map<String, String> threadIdWithStateUpdaterTagMap = new HashMap<>();
final String stateUpdaterId = "deftuv-StateUpdater-1";
threadIdWithStateUpdaterTagMap.put("thread-id", stateUpdaterId);

final Map<String, String> noThreadIdTagMap = new HashMap<>();
noThreadIdTagMap.put("client-id", "foo");

mockConsumer = new MockConsumer<>(OffsetResetStrategy.NONE);
streamsThreadMetricsDelegatingReporter = new StreamsThreadMetricsDelegatingReporter(mockConsumer, threadId, stateUpdaterId);

final MetricName metricNameOne = new MetricName("metric-one", "test-group-one", "foo bar baz", threadIdTagMap);
final MetricName metricNameTwo = new MetricName("metric-two", "test-group-two", "description two", threadIdWithStateUpdaterTagMap);
final MetricName metricNameThree = new MetricName("metric-three", "test-group-three", "description three", threadIdTagMap);
final MetricName metricNameFour = new MetricName("metric-four", "test-group-three", "description three", noThreadIdTagMap);

kafkaMetricOneHasThreadIdTag = new KafkaMetric(lock, metricNameOne, (Measurable) (m, now) -> 1.0, metricConfig, Time.SYSTEM);
kafkaMetricTwoHasThreadIdTag = new KafkaMetric(lock, metricNameTwo, (Measurable) (m, now) -> 2.0, metricConfig, Time.SYSTEM);
kafkaMetricThreeHasThreadIdTag = new KafkaMetric(lock, metricNameThree, (Measurable) (m, now) -> 3.0, metricConfig, Time.SYSTEM);
kafkaMetricWithoutThreadIdTag = new KafkaMetric(lock, metricNameFour, (Measurable) (m, now) -> 4.0, metricConfig, Time.SYSTEM);
}

@AfterEach
public void tearDown() {
mockConsumer.close();
}


@Test
@DisplayName("Init method should register metrics it receives as parameters")
public void shouldInitMetrics() {
final List<KafkaMetric> allMetrics = Arrays.asList(kafkaMetricOneHasThreadIdTag, kafkaMetricTwoHasThreadIdTag, kafkaMetricThreeHasThreadIdTag);
final List<KafkaMetric> expectedMetrics = Arrays.asList(kafkaMetricOneHasThreadIdTag, kafkaMetricTwoHasThreadIdTag, kafkaMetricThreeHasThreadIdTag);
streamsThreadMetricsDelegatingReporter.init(allMetrics);
assertEquals(expectedMetrics, mockConsumer.addedMetrics());
}

@Test
@DisplayName("Should register metrics with thread-id in tag map")
public void shouldRegisterMetrics() {
streamsThreadMetricsDelegatingReporter.metricChange(kafkaMetricOneHasThreadIdTag);
assertEquals(kafkaMetricOneHasThreadIdTag, mockConsumer.addedMetrics().get(0));
}

@Test
@DisplayName("Should remove metrics")
public void shouldRemoveMetrics() {
streamsThreadMetricsDelegatingReporter.metricChange(kafkaMetricOneHasThreadIdTag);
streamsThreadMetricsDelegatingReporter.metricChange(kafkaMetricTwoHasThreadIdTag);
streamsThreadMetricsDelegatingReporter.metricChange(kafkaMetricThreeHasThreadIdTag);
List<KafkaMetric> expected = Arrays.asList(kafkaMetricOneHasThreadIdTag, kafkaMetricTwoHasThreadIdTag, kafkaMetricThreeHasThreadIdTag);
assertEquals(expected, mockConsumer.addedMetrics());
streamsThreadMetricsDelegatingReporter.metricRemoval(kafkaMetricOneHasThreadIdTag);
expected = Arrays.asList(kafkaMetricTwoHasThreadIdTag, kafkaMetricThreeHasThreadIdTag);
assertEquals(expected, mockConsumer.addedMetrics());
}

@Test
@DisplayName("Should not register metrics without thread-id tag")
public void shouldNotRegisterMetricsWithoutThreadIdTag() {
streamsThreadMetricsDelegatingReporter.metricChange(kafkaMetricWithoutThreadIdTag);
assertEquals(0, mockConsumer.addedMetrics().size());
}
}

0 comments on commit 930f165

Please sign in to comment.