Skip to content

Commit 3a937de

Browse files
committed
Kafka binders test migration to Kafka 4.0.0 client
- Replace deprecated ValueTransformerWithKey with FixedKeyProcessor - Update TimeWindows.of() to TimeWindows.ofSizeWithNoGrace() - Migrate branch() to split().branch() with Named and Branched - Replace deprecated KafkaTestUtils.consumerProps() signature - Update StreamPartitioner return type to Optional<Set<Integer>> - Modernize Processor API imports and method signatures - Add EmbeddedKafkaBroker parameter injection for JUnit 5 - Fix Consumer.poll() to use Duration parameter - Remove unused imports and deprecated API usage Signed-off-by: Soby Chacko <[email protected]>
1 parent 4d4e64b commit 3a937de

13 files changed

+176
-238
lines changed

binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/InteractiveQueryServiceMultiStateStoreTests.java

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@
2424
import org.apache.kafka.streams.StoreQueryParameters;
2525
import org.apache.kafka.streams.errors.UnknownStateStoreException;
2626
import org.apache.kafka.streams.kstream.KStream;
27-
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
28-
import org.apache.kafka.streams.processor.ProcessorContext;
27+
import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
28+
import org.apache.kafka.streams.processor.api.FixedKeyRecord;
2929
import org.apache.kafka.streams.state.KeyValueStore;
3030
import org.apache.kafka.streams.state.QueryableStoreTypes;
3131
import org.apache.kafka.streams.state.StoreBuilder;
@@ -93,11 +93,9 @@ private void stateStoreAvailableOnProperApp(boolean shouldSetAppServerProperty)
9393
.web(WebApplicationType.NONE)
9494
.run("--server.port=0",
9595
"--spring.jmx.enabled=false",
96-
"--spring.cloud.function.definition=app1;app2",
96+
"--spring.cloud.function.definition=app1",
9797
"--spring.cloud.stream.function.bindings.app1-in-0=input1",
98-
"--spring.cloud.stream.function.bindings.app2-in-0=input2",
9998
"--spring.cloud.stream.kafka.streams.binder.functions.app1.application-id=stateStoreTestApp1",
100-
"--spring.cloud.stream.kafka.streams.binder.functions.app2.application-id=stateStoreTestApp2",
10199
appServerArg,
102100
"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString())
103101
) {
@@ -111,8 +109,6 @@ private void stateStoreAvailableOnProperApp(boolean shouldSetAppServerProperty)
111109
for (int i = 0; i < 100; i++) {
112110
assertThat(queryService.getQueryableStore(STORE_1_NAME, QueryableStoreTypes.keyValueStore())
113111
.get("someKey")).isNull();
114-
assertThat(queryService.getQueryableStore(STORE_2_NAME, QueryableStoreTypes.keyValueStore())
115-
.get("someKey")).isNull();
116112
}
117113
}
118114
}
@@ -203,7 +199,7 @@ public StoreBuilder<KeyValueStore<String, String>> store1() {
203199
@Bean
204200
public Consumer<KStream<String, String>> app1() {
205201
return s -> s
206-
.transformValues(EchoTransformer::new, STORE_1_NAME)
202+
.processValues(EchoProcessor::new, STORE_1_NAME)
207203
.foreach((k, v) -> log.info("Echo {} -> {} into {}", k, v, STORE_1_NAME));
208204
}
209205

@@ -216,7 +212,7 @@ public StoreBuilder<KeyValueStore<String, String>> store2() {
216212
@Bean
217213
public Consumer<KStream<String, String>> app2() {
218214
return s -> s
219-
.transformValues(EchoTransformer::new, STORE_2_NAME)
215+
.processValues(EchoProcessor::new, STORE_1_NAME)
220216
.foreach((k, v) -> log.info("Echo {} -> {} into {}", k, v, STORE_2_NAME));
221217
}
222218

@@ -286,7 +282,7 @@ public StoreBuilder<KeyValueStore<String, String>> store1() {
286282
@Bean
287283
public Consumer<KStream<String, String>> app1() {
288284
return s -> s
289-
.transformValues(EchoTransformer::new, STORE_1_NAME)
285+
.processValues(EchoProcessor::new, STORE_1_NAME)
290286
.foreach((k, v) -> log.info("Echo {} -> {} into {}", k, v, STORE_1_NAME));
291287
}
292288

@@ -301,15 +297,12 @@ StoreQueryParametersCustomizer<?> storeQueryParametersCustomizer() {
301297
}
302298
}
303299

304-
static class EchoTransformer implements ValueTransformerWithKey<String, String, String> {
300+
static class EchoProcessor implements FixedKeyProcessor<String, String, String> {
305301

306-
@Override
307-
public void init(ProcessorContext context) {
308-
}
309302

310303
@Override
311-
public String transform(String key, String value) {
312-
return value;
304+
public void process(FixedKeyRecord<String, String> fixedKeyRecord) {
305+
313306
}
314307

315308
@Override

binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/MultipleFunctionsInSameAppTests.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@
2929
import org.apache.kafka.clients.consumer.ConsumerRecord;
3030
import org.apache.kafka.streams.KeyValue;
3131
import org.apache.kafka.streams.StreamsConfig;
32+
import org.apache.kafka.streams.kstream.Branched;
3233
import org.apache.kafka.streams.kstream.KStream;
34+
import org.apache.kafka.streams.kstream.Named;
3335
import org.junit.jupiter.api.AfterAll;
3436
import org.junit.jupiter.api.BeforeAll;
3537
import org.junit.jupiter.api.Test;
@@ -65,8 +67,7 @@ class MultipleFunctionsInSameAppTests {
6567
@BeforeAll
6668
public static void setUp() {
6769
embeddedKafka = EmbeddedKafkaCondition.getBroker();
68-
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("purchase-groups", "false",
69-
embeddedKafka);
70+
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(embeddedKafka, "purchase-groups", false);
7071
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
7172
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
7273
consumer = cf.createConsumer();
@@ -80,7 +81,7 @@ public static void tearDown() {
8081

8182
@Test
8283
@SuppressWarnings("unchecked")
83-
void testMultiFunctionsInSameApp() throws InterruptedException {
84+
void testMultiFunctionsInSameApp(EmbeddedKafkaBroker embeddedKafka) throws InterruptedException {
8485
SpringApplication app = new SpringApplication(MultipleFunctionsInSameApp.class);
8586
app.setWebApplicationType(WebApplicationType.NONE);
8687

@@ -103,7 +104,7 @@ void testMultiFunctionsInSameApp() throws InterruptedException {
103104
"--spring.cloud.stream.kafka.streams.binder.functions.analyze.configuration.client.id=analyze-client",
104105
"--spring.cloud.stream.kafka.streams.binder.functions.anotherProcess.configuration.client.id=anotherProcess-client",
105106
"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString())) {
106-
receiveAndValidate("purchases", "coffee", "electronics");
107+
receiveAndValidate(embeddedKafka, "purchases", "coffee", "electronics");
107108

108109
StreamsBuilderFactoryBean processStreamsBuilderFactoryBean = context
109110
.getBean("&stream-builder-processItem", StreamsBuilderFactoryBean.class);
@@ -138,7 +139,7 @@ void testMultiFunctionsInSameApp() throws InterruptedException {
138139
}
139140

140141
@Test
141-
void testMultiFunctionsInSameAppWithMultiBinders() throws Exception {
142+
void testMultiFunctionsInSameAppWithMultiBinders(EmbeddedKafkaBroker embeddedKafka) throws Exception {
142143
SpringApplication app = new SpringApplication(MultipleFunctionsInSameApp.class);
143144
app.setWebApplicationType(WebApplicationType.NONE);
144145

@@ -168,7 +169,7 @@ void testMultiFunctionsInSameAppWithMultiBinders() throws Exception {
168169
"--spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.configuration.client.id=analyze-client")) {
169170

170171
Thread.sleep(1000);
171-
receiveAndValidate("purchases", "coffee", "electronics");
172+
receiveAndValidate(embeddedKafka, "purchases", "coffee", "electronics");
172173

173174
StreamsBuilderFactoryBean processStreamsBuilderFactoryBean = context
174175
.getBean("&stream-builder-processItem", StreamsBuilderFactoryBean.class);
@@ -192,7 +193,7 @@ void testMultiFunctionsInSameAppWithMultiBinders() throws Exception {
192193
}
193194
}
194195

195-
private void receiveAndValidate(String in, String... out) throws InterruptedException {
196+
private void receiveAndValidate(EmbeddedKafkaBroker embeddedKafka, String in, String... out) throws InterruptedException {
196197
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
197198
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
198199
try {
@@ -218,9 +219,16 @@ public static class MultipleFunctionsInSameApp {
218219

219220
@Bean
220221
public Function<KStream<String, String>, KStream<String, String>[]> processItem() {
221-
return input -> input.branch(
222-
(s, p) -> p.equalsIgnoreCase("coffee"),
223-
(s, p) -> p.equalsIgnoreCase("electronics"));
222+
return input -> {
223+
Map<String, KStream<String, String>> branches = input.split(Named.as("split-"))
224+
.branch((s, p) -> p.equalsIgnoreCase("coffee"), Branched.as("coffee"))
225+
.branch((s, p) -> p.equalsIgnoreCase("electronics"), Branched.as("electronics"))
226+
.defaultBranch(Branched.as("other"));
227+
return new KStream[] {
228+
branches.get("split-coffee"),
229+
branches.get("split-electronics")
230+
};
231+
};
224232
}
225233

226234
// Testing for the scenario under https://github.com/spring-cloud/spring-cloud-stream/issues/2817

binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsBinderWordCountBranchesFunctionTests.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,7 @@ class KafkaStreamsBinderWordCountBranchesFunctionTests {
6363
@BeforeAll
6464
public static void setUp() throws Exception {
6565
embeddedKafka = EmbeddedKafkaCondition.getBroker();
66-
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("groupx", "false",
67-
embeddedKafka);
66+
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(embeddedKafka, "groupx", false);
6867
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
6968
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
7069
consumer = cf.createConsumer();
@@ -77,7 +76,7 @@ public static void tearDown() {
7776
}
7877

7978
@Test
80-
void kstreamWordCountWithStringInputAndPojoOuput() throws Exception {
79+
void kstreamWordCountWithStringInputAndPojoOuput(EmbeddedKafkaBroker embeddedKafka) throws Exception {
8180
SpringApplication app = new SpringApplication(WordCountProcessorApplication.class);
8281
app.setWebApplicationType(WebApplicationType.NONE);
8382

@@ -100,14 +99,14 @@ void kstreamWordCountWithStringInputAndPojoOuput() throws Exception {
10099
"=KafkaStreamsBinderWordCountBranchesFunctionTests-abc",
101100
"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString());
102101
try {
103-
receiveAndValidate(context);
102+
receiveAndValidate(embeddedKafka);
104103
}
105104
finally {
106105
context.close();
107106
}
108107
}
109108

110-
private void receiveAndValidate(ConfigurableApplicationContext context) throws Exception {
109+
private void receiveAndValidate(EmbeddedKafkaBroker embeddedKafka) throws Exception {
111110
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
112111
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
113112
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
@@ -193,7 +192,7 @@ public Function<KStream<Object, String>, KStream<?, WordCount>[]> process() {
193192
final Map<String, KStream<Object, WordCount>> stringKStreamMap = input
194193
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.ROOT).split("\\W+")))
195194
.groupBy((key, value) -> value)
196-
.windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
195+
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(5)))
197196
.count(Materialized.as("WordCounts-branch"))
198197
.toStream()
199198
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value,

binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsBinderWordCountFunctionTests.java

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.time.Duration;
2020
import java.util.Arrays;
2121
import java.util.Collection;
22+
import java.util.Collections;
2223
import java.util.Date;
2324
import java.util.List;
2425
import java.util.Locale;
@@ -86,8 +87,7 @@ class KafkaStreamsBinderWordCountFunctionTests {
8687
@BeforeAll
8788
public static void setUp() {
8889
embeddedKafka = EmbeddedKafkaCondition.getBroker();
89-
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("group", "false",
90-
embeddedKafka);
90+
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(embeddedKafka, "group", false);
9191
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
9292
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
9393
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
@@ -102,7 +102,7 @@ public static void tearDown() {
102102

103103
@Test
104104
@SuppressWarnings("unchecked")
105-
void basicKStreamTopologyExecution() throws Exception {
105+
void basicKStreamTopologyExecution(EmbeddedKafkaBroker embeddedKafka) throws Exception {
106106
SpringApplication app = new SpringApplication(WordCountProcessorApplication.class);
107107
app.setWebApplicationType(WebApplicationType.NONE);
108108

@@ -123,7 +123,7 @@ void basicKStreamTopologyExecution() throws Exception {
123123
"--spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.consumedAs=custom-consumer",
124124
"--spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.producedAs=custom-producer",
125125
"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString())) {
126-
receiveAndValidate("words", "counts");
126+
receiveAndValidate(embeddedKafka, "words", "counts");
127127

128128
final MeterRegistry meterRegistry = context.getBean(MeterRegistry.class);
129129
Thread.sleep(100);
@@ -179,7 +179,7 @@ void basicKStreamTopologyExecution() throws Exception {
179179
}
180180

181181
@Test
182-
void kstreamWordCountWithApplicationIdSpecifiedAtDefaultConsumer() {
182+
void kstreamWordCountWithApplicationIdSpecifiedAtDefaultConsumer(EmbeddedKafkaBroker embeddedKafka) throws Exception {
183183
SpringApplication app = new SpringApplication(WordCountProcessorApplication.class);
184184
app.setWebApplicationType(WebApplicationType.NONE);
185185

@@ -195,12 +195,12 @@ void kstreamWordCountWithApplicationIdSpecifiedAtDefaultConsumer() {
195195
+ "=org.apache.kafka.common.serialization.Serdes$StringSerde",
196196
"--spring.cloud.stream.kafka.binder.brokers="
197197
+ embeddedKafka.getBrokersAsString())) {
198-
receiveAndValidate("words-5", "counts-5");
198+
receiveAndValidate(embeddedKafka, "words-5", "counts-5");
199199
}
200200
}
201201

202202
@Test
203-
void kstreamWordCountFunctionWithCustomProducerStreamPartitioner() throws Exception {
203+
void kstreamWordCountFunctionWithCustomProducerStreamPartitioner(EmbeddedKafkaBroker embeddedKafka) throws Exception {
204204
SpringApplication app = new SpringApplication(WordCountProcessorApplication.class);
205205
app.setWebApplicationType(WebApplicationType.NONE);
206206

@@ -240,7 +240,7 @@ void kstreamWordCountFunctionWithCustomProducerStreamPartitioner() throws Except
240240
}
241241

242242
@Test
243-
void kstreamBinderAutoStartup() throws Exception {
243+
void kstreamBinderAutoStartup(EmbeddedKafkaBroker embeddedKafka) throws Exception {
244244
SpringApplication app = new SpringApplication(WordCountProcessorApplication.class);
245245
app.setWebApplicationType(WebApplicationType.NONE);
246246

@@ -262,7 +262,7 @@ void kstreamBinderAutoStartup() throws Exception {
262262
}
263263

264264
@Test
265-
void kstreamIndividualBindingAutoStartup() throws Exception {
265+
void kstreamIndividualBindingAutoStartup(EmbeddedKafkaBroker embeddedKafka) throws Exception {
266266
SpringApplication app = new SpringApplication(WordCountProcessorApplication.class);
267267
app.setWebApplicationType(WebApplicationType.NONE);
268268

@@ -287,8 +287,7 @@ void kstreamIndividualBindingAutoStartup() throws Exception {
287287
// The following test verifies the fixes made for this issue:
288288
// https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/774
289289
@Test
290-
void outboundNullValueIsHandledGracefully()
291-
throws Exception {
290+
void outboundNullValueIsHandledGracefully(EmbeddedKafkaBroker embeddedKafka) {
292291
SpringApplication app = new SpringApplication(OutboundNullApplication.class);
293292
app.setWebApplicationType(WebApplicationType.NONE);
294293

@@ -323,7 +322,7 @@ void outboundNullValueIsHandledGracefully()
323322
}
324323
}
325324

326-
private void receiveAndValidate(String in, String out) {
325+
private void receiveAndValidate(EmbeddedKafkaBroker embeddedKafka, String in, String out) {
327326
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
328327
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
329328
try {
@@ -401,7 +400,7 @@ Function<KStream<Object, String>, KStream<String, WordCount>> process() {
401400
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.ROOT).split("\\W+")))
402401
.map((key, value) -> new KeyValue<>(value, value))
403402
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
404-
.windowedBy(TimeWindows.of(Duration.ofMillis(5000)))
403+
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(5000)))
405404
.count(Materialized.as("foo-WordCounts"))
406405
.toStream()
407406
.map((key, value) -> new KeyValue<>(key.key(), new WordCount(key.key(), value,
@@ -429,7 +428,14 @@ public void configureBuilder(StreamsBuilder builder) {
429428

430429
@Bean
431430
StreamPartitioner<String, WordCount> streamPartitioner() {
432-
return (t, k, v, n) -> k.equals("foo") ? 0 : 1;
431+
return (topic, key, value, numPartitions) -> {
432+
if (key.equals("foo")) {
433+
return Optional.of(Collections.singleton(0));
434+
}
435+
else {
436+
return Optional.of(Collections.singleton(1));
437+
}
438+
};
433439
}
434440
}
435441

binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/DlqDestinationResolverTests.java

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.apache.kafka.streams.kstream.KStream;
3232
import org.apache.kafka.streams.kstream.Materialized;
3333
import org.apache.kafka.streams.kstream.TimeWindows;
34-
import org.junit.jupiter.api.BeforeAll;
3534
import org.junit.jupiter.api.Test;
3635

3736
import org.springframework.boot.SpringApplication;
@@ -46,7 +45,6 @@
4645
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
4746
import org.springframework.kafka.core.KafkaTemplate;
4847
import org.springframework.kafka.test.EmbeddedKafkaBroker;
49-
import org.springframework.kafka.test.condition.EmbeddedKafkaCondition;
5048
import org.springframework.kafka.test.context.EmbeddedKafka;
5149
import org.springframework.kafka.test.utils.KafkaTestUtils;
5250

@@ -58,15 +56,8 @@
5856
@EmbeddedKafka(topics = {"topic1-dlq", "topic2-dlq"})
5957
class DlqDestinationResolverTests {
6058

61-
private static EmbeddedKafkaBroker embeddedKafka;
62-
63-
@BeforeAll
64-
public static void setUp() {
65-
embeddedKafka = EmbeddedKafkaCondition.getBroker();
66-
}
67-
6859
@Test
69-
void dlqDestinationResolverWorks() throws Exception {
60+
void dlqDestinationResolverWorks(EmbeddedKafkaBroker embeddedKafka) throws Exception {
7061
SpringApplication app = new SpringApplication(WordCountProcessorApplication.class);
7162
app.setWebApplicationType(WebApplicationType.NONE);
7263

@@ -92,8 +83,7 @@ void dlqDestinationResolverWorks() throws Exception {
9283
template.setDefaultTopic("word2");
9384
template.sendDefault("foobar");
9485

95-
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("some-random-group",
96-
"false", embeddedKafka);
86+
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(embeddedKafka, "some-random-group", false);
9787
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
9888
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(
9989
consumerProps);
@@ -128,7 +118,7 @@ public Function<KStream<Object, String>, KStream<?, String>> process() {
128118
value -> Arrays.asList(value.toLowerCase(Locale.ROOT).split("\\W+")))
129119
.map((key, value) -> new KeyValue<>(value, value))
130120
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
131-
.windowedBy(TimeWindows.of(Duration.ofSeconds(5))).count(Materialized.as("foo-WordCounts-x"))
121+
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(5))).count(Materialized.as("foo-WordCounts-x"))
132122
.toStream().map((key, value) -> new KeyValue<>(null,
133123
"Count for " + key.key() + " : " + value));
134124
}

0 commit comments

Comments
 (0)