Skip to content

Commit 90afffb

Browse files
committed
GH-3062: Fix KafkaBinderMetrics for resource leaks
Fixes: #3062 The `KafkaBinderMetrics` creates `KafkaConsumer` instances and schedules a fixed-rate task for them, but never closes them, even when the `scheduler` is shut down * Implement the `Lifecycle` contract in `KafkaBinderMetrics` and call `close()` from `stop()` to satisfy CRaC resource management expectations. * Also close all the `KafkaConsumer` instances held in `metadataConsumers` **Cherry-pick to `4.1.x`**
1 parent af6b4ff commit 90afffb

File tree

2 files changed

+58
-13
lines changed

2 files changed

+58
-13
lines changed

binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderMetrics.java

Lines changed: 53 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2024 the original author or authors.
2+
* Copyright 2016-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,16 +20,15 @@
2020
import java.util.LinkedList;
2121
import java.util.List;
2222
import java.util.Map;
23-
import java.util.Optional;
2423
import java.util.concurrent.ConcurrentHashMap;
2524
import java.util.concurrent.ExecutionException;
26-
import java.util.concurrent.ExecutorService;
2725
import java.util.concurrent.Executors;
2826
import java.util.concurrent.Future;
2927
import java.util.concurrent.ScheduledExecutorService;
3028
import java.util.concurrent.ScheduledThreadPoolExecutor;
3129
import java.util.concurrent.TimeUnit;
3230
import java.util.concurrent.TimeoutException;
31+
import java.util.concurrent.atomic.AtomicBoolean;
3332
import java.util.concurrent.locks.ReentrantLock;
3433
import java.util.function.ToDoubleFunction;
3534

@@ -50,10 +49,12 @@
5049
import org.springframework.cloud.stream.binder.kafka.common.TopicInformation;
5150
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
5251
import org.springframework.context.ApplicationListener;
52+
import org.springframework.context.Lifecycle;
5353
import org.springframework.kafka.core.ConsumerFactory;
5454
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
5555
import org.springframework.lang.Nullable;
5656
import org.springframework.util.ObjectUtils;
57+
import org.springframework.util.ReflectionUtils;
5758

5859
/**
5960
* Metrics for Kafka binder.
@@ -72,7 +73,7 @@
7273
* @author Omer Celik
7374
*/
7475
public class KafkaBinderMetrics
75-
implements MeterBinder, ApplicationListener<BindingCreatedEvent>, AutoCloseable {
76+
implements MeterBinder, ApplicationListener<BindingCreatedEvent>, AutoCloseable, Lifecycle {
7677

7778
private static final int DEFAULT_TIMEOUT = 5;
7879

@@ -101,6 +102,8 @@ public class KafkaBinderMetrics
101102

102103
private final ReentrantLock consumerFactoryLock = new ReentrantLock();
103104

105+
private final AtomicBoolean running = new AtomicBoolean();
106+
104107
public KafkaBinderMetrics(KafkaMessageChannelBinder binder,
105108
KafkaBinderConfigurationProperties binderConfigurationProperties,
106109
ConsumerFactory<?, ?> defaultConsumerFactory,
@@ -125,14 +128,14 @@ public void setTimeout(int timeout) {
125128

126129
@Override
127130
public void bindTo(MeterRegistry registry) {
128-
/**
131+
/*
129132
* We can't just replace one scheduler with another.
130133
* Before and even after the old one is gathered by GC, its threads still exist, consume memory and CPU resources to switch contexts.
131134
* Theoretically, as a result of processing n topics, there will be about (1+n)*n/2 threads simultaneously at the same time.
132135
*/
133136
if (this.scheduler != null) {
134137
LOG.info("Try to shutdown the old scheduler with " + ((ScheduledThreadPoolExecutor) scheduler).getPoolSize() + " threads");
135-
this.scheduler.shutdown();
138+
this.scheduler.shutdownNow();
136139
}
137140

138141
this.scheduler = Executors.newScheduledThreadPool(this.binder.getTopicsInUse().size());
@@ -278,10 +281,50 @@ public void onApplicationEvent(BindingCreatedEvent event) {
278281
}
279282

280283
@Override
281-
public void close() throws Exception {
282-
if (this.meterRegistry != null) {
283-
this.meterRegistry.find(OFFSET_LAG_METRIC_NAME).meters().forEach(this.meterRegistry::remove);
284+
public void close() {
285+
if (this.scheduler != null) {
286+
this.consumerFactoryLock.lock();
287+
try {
288+
if (this.meterRegistry != null) {
289+
this.meterRegistry.find(OFFSET_LAG_METRIC_NAME).meters().forEach(this.meterRegistry::remove);
290+
}
291+
this.scheduler.shutdownNow();
292+
try {
293+
this.scheduler.awaitTermination(
294+
binderConfigurationProperties.getMetrics().getOffsetLagMetricsInterval().toSeconds(),
295+
TimeUnit.SECONDS);
296+
}
297+
catch (InterruptedException ex) {
298+
Thread.currentThread().interrupt();
299+
ReflectionUtils.rethrowRuntimeException(ex);
300+
}
301+
}
302+
finally {
303+
this.scheduler = null;
304+
this.metadataConsumers.values().forEach(Consumer::close);
305+
this.metadataConsumers.clear();
306+
this.consumerFactoryLock.unlock();
307+
}
284308
}
285-
Optional.ofNullable(scheduler).ifPresent(ExecutorService::shutdown);
286309
}
310+
311+
@Override
312+
public void start() {
313+
this.running.set(true);
314+
// Nothing else to do here. The 'bindTo()' is called from the 'onApplicationEvent()',
315+
// which, in turn, is emitted from the 'AbstractBindingLifecycle.start()' logic.
316+
}
317+
318+
@Override
319+
public void stop() {
320+
if (this.running.compareAndSet(true, false)) {
321+
close();
322+
}
323+
}
324+
325+
@Override
326+
public boolean isRunning() {
327+
return this.running.get();
328+
}
329+
287330
}

binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderMetricsTest.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2024 the original author or authors.
2+
* Copyright 2016-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -57,6 +57,7 @@
5757
* @author Tomek Szmytka
5858
* @author Nico Heller
5959
* @author Kurt Hong
60+
* @author Artem Bilan
6061
*/
6162
class KafkaBinderMetricsTest {
6263

@@ -346,10 +347,11 @@ public void usesBeginningOffsetIfNoCommittedOffsetFound() {
346347
}
347348

348349
@Test
349-
public void shouldShutdownSchedulerOnClose() throws Exception {
350+
public void shouldShutdownSchedulerOnClose() {
350351
metrics.bindTo(meterRegistry);
352+
assertThat(metrics.scheduler).isNotNull();
351353
metrics.close();
352-
assertThat(metrics.scheduler.isShutdown()).isTrue();
354+
assertThat(metrics.scheduler).isNull();
353355
}
354356

355357
@Test

0 commit comments

Comments
 (0)