tx/group compaction fixes #24637

Open · wants to merge 12 commits into base: dev
Changes from 1 commit
59 changes: 55 additions & 4 deletions src/v/kafka/server/group_data_parser.h
@@ -101,17 +101,68 @@ class group_data_parser {
b, group::aborted_tx_record_version);
return handle_abort(b.header(), std::move(data));
}
if (
b.header().type == model::record_batch_type::tx_fence
|| b.header().type == model::record_batch_type::group_fence_tx) {
if (b.header().type == model::record_batch_type::tx_fence) {
/*
A potential bug in 24.2.0 resulted in a situation where tx_fence
batches were retained _after_ compaction while their corresponding
data/commit/abort batches were compacted away. This applied only to
group transactions that used tx_fence to begin the transaction.

Historical context: tx_fence was historically the fence batch that
began both group transactions and regular data partition
transactions. That changed starting with 24.2.0, where a dedicated
fence batch type (group_tx_fence) was introduced for group
transaction fencing.

After this buggy compaction, these uncleaned tx_fence batches are
Review comment (Contributor): Is the buggy compaction fixed now? Otherwise, shouldn't we just fix the buggy compaction instead?

accounted as open transactions when computing
max_collectible_offset, thus blocking further compaction after an
upgrade to 24.2.x.

We just ignore tx_fence batches going forward; the rationale is as
follows.

- Firstly, they are no longer in use starting with 24.2 (in favor of
a dedicated group_tx_fence), so anyone starting group transactions
from 24.2 shouldn't see any conflicts.

- For sealed transactions, commit/abort/data batches were already
removed if the compaction ran, so ignoring tx_fence should be the
right thing to do in such cases without any conflicts/correctness
issues.

- Hypothetically, even if the compaction didn't run, it is still ok
to ignore those batches because in group transactions committed
transactions are atomically rewritten as a separate raft_data
batch along with the commit marker, which is applied in the stm
(so no state is lost).

- Any group transaction using tx_fence likely belonged to 24.1.x,
which is at least 6 months old at the time of writing, so it is
reasonable to assume all such transactions are already sealed,
especially since we started enforcing a maximum transaction
timeout of 15 minutes.

- The only case where it could theoretically be a problem is an
upgrade from 24.1.x to 24.2.x (with the fix) with a transaction
that remains open throughout the upgrade; such a transaction would
then be considered aborted (if leadership is assumed on a 24.2.x
broker). This is a highly unlikely scenario, but the suggestion is
to stop all running group transaction (kstreams) applications when
doing the upgrade.
*/
return ss::now();
}
if (b.header().type == model::record_batch_type::group_fence_tx) {
return parse_fence(std::move(b));
}
if (b.header().type == model::record_batch_type::version_fence) {
auto fence = features::feature_table::decode_version_fence(
std::move(b));
return handle_version_fence(fence);
}
vlog(klog.warn, "ignoring batch with type: {}", b.header().type);
vlog(klog.debug, "ignoring batch with type: {}", b.header().type);
return ss::make_ready_future<>();
}

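The long comment above boils down to one mechanism: an orphaned tx_fence entry keeps being counted as an open transaction, which caps max_collectible_offset and stalls compaction indefinitely. Below is a minimal sketch of that mechanism under a simplified model; the names (tx_tracker, open_txs, on_fence, on_commit_or_abort) are illustrative only and not Redpanda's actual implementation.

#include <algorithm>
#include <cstdint>
#include <map>

using offset_t = int64_t;
using producer_id = int64_t;

// Hypothetical, simplified stand-in for the group transaction tracker.
struct tx_tracker {
    // begin offset of every transaction currently considered open
    std::map<producer_id, offset_t> open_txs;

    void on_fence(producer_id pid, offset_t begin) { open_txs.emplace(pid, begin); }
    void on_commit_or_abort(producer_id pid) { open_txs.erase(pid); }

    // compaction may only touch offsets strictly below the oldest open
    // transaction's begin offset; a fence that is never closed pins this value
    offset_t max_collectible_offset(offset_t log_end) const {
        offset_t result = log_end;
        for (const auto& [pid, begin] : open_txs) {
            result = std::min(result, begin - 1);
        }
        return result;
    }
};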
4 changes: 2 additions & 2 deletions src/v/kafka/server/group_stm.cc
@@ -30,7 +30,7 @@ void group_stm::update_tx_offset(
it == _producers.end() || it->second.tx == nullptr
|| offset_md.pid.epoch != it->second.epoch) {
vlog(
cluster::txlog.warn,
cluster::txlog.debug,
"producer {} not found, skipping offsets update",
offset_md.pid);
return;
@@ -57,7 +57,7 @@ void group_stm::commit(model::producer_identity pid) {
|| pid.epoch != it->second.epoch) {
// missing prepare may happen when the consumer log gets truncated
vlog(
cluster::txlog.warn,
cluster::txlog.debug,
"unable to find ongoing transaction for producer: {}, skipping "
"commit",
pid);
11 changes: 3 additions & 8 deletions src/v/kafka/server/group_tx_tracker_stm.cc
@@ -118,14 +118,9 @@ ss::future<> group_tx_tracker_stm::handle_raft_data(model::record_batch) {
}

ss::future<> group_tx_tracker_stm::handle_tx_offsets(
model::record_batch_header header, kafka::group_tx::offsets_metadata data) {
// in case the fence got truncated, try to start the transaction from
// this point on. This is not possible today but may help if delete
// retention is implemented for consumer topics.
maybe_add_tx_begin_offset(
std::move(data.group_id),
model::producer_identity{header.producer_id, header.producer_epoch},
header.base_offset);
model::record_batch_header, kafka::group_tx::offsets_metadata) {
// Transaction boundaries are determined by fence/commit or abort
// batches
Review comment (Contributor): This change doesn't seem to be explained by the commit message?
return ss::now();
}

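With handle_tx_offsets reduced to a no-op, transaction boundaries come solely from fence (begin) and commit/abort (end) batches. A hedged usage example, reusing the illustrative tx_tracker sketch above (not the stm's real API), shows why an offsets batch observed without a fence, for instance after truncation, no longer holds back compaction:

#include <cassert>

// Assumes the illustrative tx_tracker and offset_t from the earlier sketch.
int main() {
    tx_tracker tracker;
    const offset_t log_end = 100;

    // An offsets-style batch with no preceding fence opens nothing (this is
    // what the removed maybe_add_tx_begin_offset call used to do), so
    // compaction is not held back.
    assert(tracker.max_collectible_offset(log_end) == log_end);

    // A fence opens the transaction; a commit or abort closes it.
    tracker.on_fence(/*pid=*/7, /*begin=*/42);
    assert(tracker.max_collectible_offset(log_end) == 41);
    tracker.on_commit_or_abort(7);
    assert(tracker.max_collectible_offset(log_end) == log_end);
    return 0;
}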
2 changes: 1 addition & 1 deletion src/v/kafka/server/tests/consumer_group_recovery_test.cc
@@ -209,7 +209,7 @@ struct cg_recovery_test_fixture : seastar_test {
model::record_batch make_tx_fence_batch(
const model::producer_identity& pid, group_tx::fence_metadata cmd) {
return make_tx_batch(
model::record_batch_type::tx_fence,
model::record_batch_type::group_fence_tx,
group::fence_control_record_version,
pid,
std::move(cmd));