diff --git a/tests/kafkatest/tests/core/transactions_upgrade_test.py b/tests/kafkatest/tests/core/transactions_upgrade_test.py index 724605c6b24e6..6c952f0a626aa 100644 --- a/tests/kafkatest/tests/core/transactions_upgrade_test.py +++ b/tests/kafkatest/tests/core/transactions_upgrade_test.py @@ -179,7 +179,7 @@ def copy_messages_transactionally_during_upgrade(self, input_topic, output_topic self.perform_upgrade(from_kafka_version) - copier_timeout_sec = 180 + copier_timeout_sec = 360 for copier in copiers: wait_until(lambda: copier.is_done, timeout_sec=copier_timeout_sec, diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java index f99252d37add8..b9345d76cb42d 100644 --- a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java +++ b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java @@ -308,9 +308,11 @@ public static void runEventLoop(Namespace parsedArgs) { String consumerGroup = parsedArgs.getString("consumerGroup"); - final KafkaProducer producer = createProducer(parsedArgs); + KafkaProducer producer = createProducer(parsedArgs); final KafkaConsumer consumer = createConsumer(parsedArgs); + int producerNumber = 0; + final AtomicLong remainingMessages = new AtomicLong( parsedArgs.getInt("maxMessages") == -1 ? Long.MAX_VALUE : parsedArgs.getInt("maxMessages")); @@ -387,7 +389,17 @@ public void onPartitionsAssigned(Collection partitions) { long messagesSentWithinCurrentTxn = records.count(); ConsumerGroupMetadata groupMetadata = useGroupMetadata ? consumer.groupMetadata() : new ConsumerGroupMetadata(consumerGroup); - producer.sendOffsetsToTransaction(consumerPositions(consumer), groupMetadata); + try { + producer.sendOffsetsToTransaction(consumerPositions(consumer), groupMetadata); + } catch (KafkaException e) { + // in case the producer gets stuck here, create a new one and continue the loop + try { producer.close(Duration.ofSeconds(0)); } catch (Exception ignore) {} + parsedArgs.getAttrs().put("transactionalId", parsedArgs.getString("transactionalId") + producerNumber++); + producer = createProducer(parsedArgs); + producer.initTransactions(); + resetToLastCommittedPositions(consumer); + continue; + } if (enableRandomAborts && random.nextInt() % 3 == 0) { abortTransactionAndResetPosition(producer, consumer);