diff --git a/src/v/kafka/server/group_data_parser.h b/src/v/kafka/server/group_data_parser.h
index 753cf42cca65..1dff125db497 100644
--- a/src/v/kafka/server/group_data_parser.h
+++ b/src/v/kafka/server/group_data_parser.h
@@ -101,9 +101,59 @@ class group_data_parser {
               b, group::aborted_tx_record_version);
             return handle_abort(b.header(), std::move(data));
         }
-        if (
-          b.header().type == model::record_batch_type::tx_fence
-          || b.header().type == model::record_batch_type::group_fence_tx) {
+        if (b.header().type == model::record_batch_type::tx_fence) {
+            /*
+              A potential bug in 24.2.0 resulted in a situation where
+              tx_fence batches were retained _after_ compaction while their
+              corresponding data/commit/abort batches were compacted away.
+              This applied only to group transactions that used tx_fence to
+              begin the transaction.
+
+              Historical context: tx_fence was historically used as the
+              fence batch that begins both a group transaction and a regular
+              data partition transaction. That changed starting 24.2.0,
+              where a dedicated fence batch type (group_fence_tx) was
+              introduced for group transaction fencing.
+
+              After this buggy compaction, the uncleaned tx_fence batches
+              are accounted as open transactions when computing
+              max_collectible_offset, thus blocking further compaction after
+              an upgrade to 24.2.x.
+
+              We just ignore tx_fence batches going forward; the rationale
+              is as follows.
+
+              - Firstly, they are no longer in use starting 24.2 (in favor
+                of the dedicated group_fence_tx), so anyone starting group
+                transactions from 24.2 shouldn't see any conflicts.
+
+              - For sealed transactions, commit/abort/data batches were
+                already removed if the compaction ran, so ignoring tx_fence
+                should be the right thing to do in such cases without any
+                conflicts/correctness issues.
+
+              - Hypothetically, if the compaction didn't run, it is still
+                ok to ignore those batches, because committed group
+                transactions are atomically rewritten as a separate
+                raft_data batch along with the commit marker, which is
+                applied in the stm (so no state is lost).
+
+              - Any group transaction using tx_fence likely belonged to
+                24.1.x, which is at least 6 months old at the time of
+                writing, so it is reasonable to assume all such
+                transactions are already sealed, especially since we
+                started enforcing a max transaction timeout of 15 mins.
+
+              - The only case where it could theoretically be a problem is an
+                upgrade from 24.1.x to 24.2.x (with the fix) where a
+                transaction is open when the upgrade starts and remains open
+                throughout; it would then be considered aborted (if leadership
+                is assumed by a 24.2.x broker). This is a highly unlikely
+                scenario, but the suggestion is to stop all applications
+                running group transactions (kstreams) when doing the upgrade.
+            */
+        }
+        if (b.header().type == model::record_batch_type::group_fence_tx) {
             return parse_fence(std::move(b));
         }
         if (b.header().type == model::record_batch_type::version_fence) {
diff --git a/src/v/kafka/server/group_stm.cc b/src/v/kafka/server/group_stm.cc
index 95a2fd14ec89..a9f6e71625f8 100644
--- a/src/v/kafka/server/group_stm.cc
+++ b/src/v/kafka/server/group_stm.cc
@@ -30,7 +30,7 @@ void group_stm::update_tx_offset(
       it == _producers.end() || it->second.tx == nullptr
       || offset_md.pid.epoch != it->second.epoch) {
         vlog(
-          cluster::txlog.warn,
+          cluster::txlog.debug,
           "producer {} not found, skipping offsets update",
           offset_md.pid);
         return;
@@ -57,7 +57,7 @@ void group_stm::commit(model::producer_identity pid) {
       || pid.epoch != it->second.epoch) {
         // missing prepare may happen when the consumer log gets truncated
         vlog(
-          cluster::txlog.warn,
+          cluster::txlog.debug,
           "unable to find ongoing transaction for producer: {}, skipping "
           "commit",
           pid);
diff --git a/src/v/kafka/server/group_tx_tracker_stm.cc b/src/v/kafka/server/group_tx_tracker_stm.cc
index d2163506493c..23da50b43dc4 100644
--- a/src/v/kafka/server/group_tx_tracker_stm.cc
+++ b/src/v/kafka/server/group_tx_tracker_stm.cc
@@ -118,14 +118,9 @@ ss::future<> group_tx_tracker_stm::handle_raft_data(model::record_batch) {
 }
 
 ss::future<> group_tx_tracker_stm::handle_tx_offsets(
-  model::record_batch_header header, kafka::group_tx::offsets_metadata data) {
-    // in case the fence got truncated, try to start the transaction from
-    // this point on. This is not possible today but may help if delete
-    // retention is implemented for consumer topics.
-    maybe_add_tx_begin_offset(
-      std::move(data.group_id),
-      model::producer_identity{header.producer_id, header.producer_epoch},
-      header.base_offset);
+  model::record_batch_header, kafka::group_tx::offsets_metadata) {
+    // Transaction boundaries are determined by fence (begin) and
+    // commit/abort (end) batches.
     return ss::now();
 }
 
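
For reviewers, here is a minimal, self-contained C++ sketch (not part of the patch) of the dispatch order the group_data_parser change establishes: tx_fence batches are now consumed as no-ops, while group_fence_tx batches still begin a transaction. The record_batch_type values mirror the Redpanda enum names; the action type and dispatch function are hypothetical stand-ins introduced only to make the sketch runnable.

// Sketch under the assumptions stated above; not the actual Redpanda code.
#include <cstdio>

enum class record_batch_type { raft_data, tx_fence, group_fence_tx };
enum class action { apply_data, ignore, parse_fence };

action dispatch(record_batch_type t) {
    if (t == record_batch_type::tx_fence) {
        // Legacy (pre-24.2) group fence: deliberately ignored, so stale
        // tx_fence batches left behind by the buggy compaction no longer
        // hold back max_collectible_offset.
        return action::ignore;
    }
    if (t == record_batch_type::group_fence_tx) {
        // Dedicated group-transaction fence used from 24.2.0 onwards.
        return action::parse_fence;
    }
    return action::apply_data;
}

int main() {
    std::printf(
      "tx_fence -> %d (ignored)\n",
      static_cast<int>(dispatch(record_batch_type::tx_fence)));
}

The point of the two separate if blocks in the patch (rather than the old combined condition) is exactly this asymmetry: only group_fence_tx reaches parse_fence, so a lingering tx_fence batch can never open a transaction in the tracker again.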