Skip to content

Commit 24705cf

Browse files
committed
Fix KafkaItemReader ClassCastException during ExecutionContext deserialization
Signed-off-by: Hyunwoo Jung <[email protected]>
1 parent d4a7dfd commit 24705cf

File tree

2 files changed

+21
-12
lines changed

2 files changed

+21
-12
lines changed

spring-batch-infrastructure/src/main/java/org/springframework/batch/infrastructure/item/kafka/KafkaItemReader.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2023 the original author or authors.
2+
* Copyright 2019-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -49,6 +49,7 @@
4949
*
5050
* @author Mathieu Ouellet
5151
* @author Mahmoud Ben Hassine
52+
* @author Hyunwoo Jung
5253
* @since 4.2
5354
*/
5455
public class KafkaItemReader<K, V> extends AbstractItemStreamItemReader<V> {
@@ -57,6 +58,8 @@ public class KafkaItemReader<K, V> extends AbstractItemStreamItemReader<V> {
5758

5859
private static final long DEFAULT_POLL_TIMEOUT = 30L;
5960

61+
private final String topicName;
62+
6063
private final List<TopicPartition> topicPartitions;
6164

6265
private @Nullable Map<TopicPartition, Long> partitionOffsets;
@@ -111,6 +114,7 @@ public KafkaItemReader(Properties consumerProperties, String topicName, List<Int
111114
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG + " property must be provided");
112115
this.consumerProperties = consumerProperties;
113116
Assert.hasLength(topicName, "Topic name must not be null or empty");
117+
this.topicName = topicName;
114118
Assert.isTrue(!partitions.isEmpty(), "At least one partition must be provided");
115119
this.topicPartitions = new ArrayList<>();
116120
for (Integer partition : partitions) {
@@ -175,10 +179,10 @@ public void open(ExecutionContext executionContext) {
175179
}
176180
}
177181
if (this.saveState && executionContext.containsKey(TOPIC_PARTITION_OFFSETS)) {
178-
Map<TopicPartition, Long> offsets = (Map<TopicPartition, Long>) executionContext
179-
.get(TOPIC_PARTITION_OFFSETS);
180-
for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
181-
this.partitionOffsets.put(entry.getKey(), entry.getValue() == 0 ? 0 : entry.getValue() + 1);
182+
Map<String, Long> offsets = (Map<String, Long>) executionContext.get(TOPIC_PARTITION_OFFSETS);
183+
for (Map.Entry<String, Long> entry : offsets.entrySet()) {
184+
this.partitionOffsets.put(new TopicPartition(this.topicName, Integer.parseInt(entry.getKey())),
185+
entry.getValue() == 0 ? 0 : entry.getValue() + 1);
182186
}
183187
}
184188
this.kafkaConsumer.assign(this.topicPartitions);
@@ -205,7 +209,11 @@ public void open(ExecutionContext executionContext) {
205209
@Override
206210
public void update(ExecutionContext executionContext) {
207211
if (this.saveState) {
208-
executionContext.put(TOPIC_PARTITION_OFFSETS, new HashMap<>(this.partitionOffsets));
212+
Map<String, Long> offsets = new HashMap<>();
213+
for (Map.Entry<TopicPartition, Long> entry : this.partitionOffsets.entrySet()) {
214+
offsets.put(String.valueOf(entry.getKey().partition()), entry.getValue());
215+
}
216+
executionContext.put(TOPIC_PARTITION_OFFSETS, offsets);
209217
}
210218
this.kafkaConsumer.commitSync();
211219
}

spring-batch-infrastructure/src/test/java/org/springframework/batch/infrastructure/item/kafka/KafkaItemReaderIntegrationTests.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2023 the original author or authors.
2+
* Copyright 2019-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -60,6 +60,7 @@
6060
* @author Mahmoud Ben Hassine
6161
* @author François Martin
6262
* @author Patrick Baumgartner
63+
* @author Hyunwoo Jung
6364
*/
6465
@Testcontainers(disabledWithoutDocker = true)
6566
@ExtendWith(SpringExtension.class)
@@ -267,8 +268,8 @@ void testReadFromSinglePartitionAfterRestart() throws ExecutionException, Interr
267268
future.get();
268269
}
269270
ExecutionContext executionContext = new ExecutionContext();
270-
Map<TopicPartition, Long> offsets = new HashMap<>();
271-
offsets.put(new TopicPartition("topic3", 0), 1L);
271+
Map<String, Long> offsets = new HashMap<>();
272+
offsets.put("0", 1L);
272273
executionContext.put("topic.partition.offsets", offsets);
273274

274275
// topic3-0: val0, val1, val2, val3, val4
@@ -308,9 +309,9 @@ void testReadFromMultiplePartitionsAfterRestart() throws ExecutionException, Int
308309
}
309310

310311
ExecutionContext executionContext = new ExecutionContext();
311-
Map<TopicPartition, Long> offsets = new HashMap<>();
312-
offsets.put(new TopicPartition("topic4", 0), 1L);
313-
offsets.put(new TopicPartition("topic4", 1), 2L);
312+
Map<String, Long> offsets = new HashMap<>();
313+
offsets.put("0", 1L);
314+
offsets.put("1", 2L);
314315
executionContext.put("topic.partition.offsets", offsets);
315316

316317
// topic4-0: val0, val2, val4, val6

0 commit comments

Comments
 (0)