tests/kafkatest/tests/core/transactions_upgrade_test.py (2 changes: 1 addition & 1 deletion)
@@ -179,7 +179,7 @@ def copy_messages_transactionally_during_upgrade(self, input_topic, output_topic

self.perform_upgrade(from_kafka_version)

copier_timeout_sec = 180
copier_timeout_sec = 360
Author
Note: due to timeouts and re-creation of the producer, this copier_timeout_sec needed to be increased. I experimented a bit and found that 360s was a consistently reliable value.

Member
As I described in https://issues.apache.org/jira/browse/KAFKA-20000, the performance regression is caused by the backoff logic. Therefore, I suggest fixing the underlying issue instead of increasing the timeout.


Kindly asking, is this something to consider? If so, I would add some tests for this adjustment.

Thanks.

Member

@Pankraz76 thanks for the effort. As Justine suggested, hardcoding the timeout is a bit coarse-grained. Please refer to KAFKA-20000 for more discussion.


Suggested change
copier_timeout_sec = 360
copier_timeout_sec = 360

Sorry, again this is something for SCA, taking the off-topic changes away upfront.

Spotless and Rewrite are both ready to fix this on their own.

for copier in copiers:
wait_until(lambda: copier.is_done,
timeout_sec=copier_timeout_sec,
@@ -308,9 +308,11 @@ public static void runEventLoop(Namespace parsedArgs) {

String consumerGroup = parsedArgs.getString("consumerGroup");

final KafkaProducer<String, String> producer = createProducer(parsedArgs);
KafkaProducer<String, String> producer = createProducer(parsedArgs);
final KafkaConsumer<String, String> consumer = createConsumer(parsedArgs);

int producerNumber = 0;

final AtomicLong remainingMessages = new AtomicLong(
parsedArgs.getInt("maxMessages") == -1 ? Long.MAX_VALUE : parsedArgs.getInt("maxMessages"));

@@ -387,7 +389,17 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
long messagesSentWithinCurrentTxn = records.count();

ConsumerGroupMetadata groupMetadata = useGroupMetadata ? consumer.groupMetadata() : new ConsumerGroupMetadata(consumerGroup);
producer.sendOffsetsToTransaction(consumerPositions(consumer), groupMetadata);
try {
producer.sendOffsetsToTransaction(consumerPositions(consumer), groupMetadata);
} catch (KafkaException e) {
Contributor

IMHO, shouldn't we focus on handling the TimeoutException?

AFAIK, CONCURRENT_TRANSACTIONS eventually manifests as a TimeoutException on the client side. I'm concerned that retrying on such a broad exception scope might mask other underlying errors; in those cases we need to clearly identify the root cause to take appropriate action.

What do you think?
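A rough sketch, for illustration only, of what the narrower catch could look like: the recovery path stays exactly as in this diff, only the exception type changes, and the assumed import is org.apache.kafka.common.errors.TimeoutException (a subclass of KafkaException).

try {
    producer.sendOffsetsToTransaction(consumerPositions(consumer), groupMetadata);
} catch (TimeoutException e) {
    // Only the timeout path (e.g. CONCURRENT_TRANSACTIONS surfacing as a client-side
    // timeout) triggers recovery; any other KafkaException still propagates so its
    // root cause stays visible.
    try { producer.close(Duration.ofSeconds(0)); } catch (Exception ignore) {}
    parsedArgs.getAttrs().put("transactionalId", parsedArgs.getString("transactionalId") + producerNumber++);
    producer = createProducer(parsedArgs);
    producer.initTransactions();
    resetToLastCommittedPositions(consumer);
    continue;
}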

// in case the producer gets stuck here, create a new one and continue the loop
try { producer.close(Duration.ofSeconds(0)); } catch (Exception ignore) {}
parsedArgs.getAttrs().put("transactionalId", parsedArgs.getString("transactionalId") + producerNumber++);
Contributor

Is there a safer way to generate a globally unique transactionalId to avoid collisions?

Simply appending an incremental number (producerNumber++) to the user-provided ID seems risky in a shared cluster environment. If the generated ID happens to match an existing transactionalId of a running production application, it could trigger the fencing mechanism, unintentionally aborting the active transactions of that application.

Perhaps appending a UUID or a random suffix would be a safer approach to ensure uniqueness?
What do you think?
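A minimal sketch of the UUID variant, assuming uniqueness per copier restart is enough; uniqueTransactionalId is a hypothetical helper name, and the only real API used is java.util.UUID.randomUUID().

import java.util.UUID;

// Hypothetical helper: derive a collision-resistant transactional.id from the
// user-provided base id instead of appending an incrementing counter.
static String uniqueTransactionalId(String baseTransactionalId) {
    return baseTransactionalId + "-" + UUID.randomUUID();
}

// Possible use at the recovery site in this diff:
// parsedArgs.getAttrs().put("transactionalId",
//         uniqueTransactionalId(parsedArgs.getString("transactionalId")));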

producer = createProducer(parsedArgs);
producer.initTransactions();
resetToLastCommittedPositions(consumer);
Comment on lines +395 to +400


Suggested change
// in case the producer gets stuck here, create a new one and continue the loop
try { producer.close(Duration.ofSeconds(0)); } catch (Exception ignore) {}
parsedArgs.getAttrs().put("transactionalId", parsedArgs.getString("transactionalId") + producerNumber++);
producer = createProducer(parsedArgs);
producer.initTransactions();
resetToLastCommittedPositions(consumer);
circutBreaker(); // in case the producer gets stuck here, create a new one and continue the loop

A dedicated method could be given to this concern, applying the single responsibility principle and giving each part its own focus. Here it is just about breaking the circuit; how that is actually done looks like a (possibly changing) implementation detail.
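One possible reading of that suggestion, sketched under the assumption that the method lives in the same class as the diff, so createProducer, resetToLastCommittedPositions, Duration, and the argparse4j Namespace are already in scope; the name and signature are made up for illustration.

// Hypothetical extraction: the recovery mechanics move into one method and the
// event loop only states the intent of breaking the circuit.
private static KafkaProducer<String, String> breakCircuit(Namespace parsedArgs,
                                                          KafkaProducer<String, String> stuckProducer,
                                                          KafkaConsumer<String, String> consumer,
                                                          int producerNumber) {
    // Close the stuck producer without waiting; failures on close do not matter here.
    try { stuckProducer.close(Duration.ofSeconds(0)); } catch (Exception ignore) {}
    // Re-create the producer under a fresh transactional id and rewind the consumer.
    parsedArgs.getAttrs().put("transactionalId",
            parsedArgs.getString("transactionalId") + producerNumber);
    KafkaProducer<String, String> freshProducer = createProducer(parsedArgs);
    freshProducer.initTransactions();
    resetToLastCommittedPositions(consumer);
    return freshProducer;
}

// The catch block in the diff would then reduce to:
// producer = breakCircuit(parsedArgs, producer, consumer, producerNumber++);
// continue;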

continue;
}

if (enableRandomAborts && random.nextInt() % 3 == 0) {
abortTransactionAndResetPosition(producer, consumer);