
tx/group compaction fixes #24637

Open
wants to merge 12 commits into base: dev

Conversation

bharathv
Contributor

@bharathv bharathv commented Dec 21, 2024

Check individual commit messages for details

Main changes

  • Adds a lot of observability
    • Consumer offsets partitions are now supported in the describe_producers Kafka API
    • Adds a v1/producers/kafka/topic/partition endpoint that gives detailed producer-level debug info
  • Fixes a couple of compaction bugs with group transactions

Backports Required

  • none - not a bug fix
  • none - this is a backport
  • none - issue does not exist in previous branches
  • none - papercut/not impactful enough to backport
  • v24.3.x
  • v24.2.x
  • v24.1.x

Release Notes

Bug Fixes

  • Fixes an issue that blocked the compaction of consumer offsets with group transactions.

@bharathv
Contributor Author

/dt

@vbotbuildovich
Collaborator

vbotbuildovich commented Dec 21, 2024

CI test results

test results on build#60037
test_id test_kind job_url test_status passed
test_consumer_group_recovery_rpunit.test_consumer_group_recovery_rpunit unit https://buildkite.com/redpanda/redpanda/builds/60037#0193e7ef-114e-474c-b463-9426fd2ed955 FAIL 0/2
test_consumer_group_recovery_rpunit.test_consumer_group_recovery_rpunit unit https://buildkite.com/redpanda/redpanda/builds/60037#0193e7ef-1152-4aea-a1e2-19d14ea9a00a FAIL 0/2
test results on build#60049
test_id test_kind job_url test_status passed
kafka_server_rpfixture.kafka_server_rpfixture unit https://buildkite.com/redpanda/redpanda/builds/60049#0193f07e-533f-4715-b8a1-0f31c067b1da FLAKY 1/2
rptest.tests.cloud_storage_scrubber_test.CloudStorageScrubberTest.test_scrubber.cloud_storage_type=CloudStorageType.S3 ducktape https://buildkite.com/redpanda/redpanda/builds/60049#0193f0bf-dcc3-4557-af8c-d9940a72aff1 FAIL 0/1
rptest.tests.datalake.partition_movement_test.PartitionMovementTest.test_cross_core_movements.cloud_storage_type=CloudStorageType.S3 ducktape https://buildkite.com/redpanda/redpanda/builds/60049#0193f0bf-dcc1-4950-bb16-fd655ecc86af FLAKY 3/6
rptest.tests.datalake.partition_movement_test.PartitionMovementTest.test_cross_core_movements.cloud_storage_type=CloudStorageType.S3 ducktape https://buildkite.com/redpanda/redpanda/builds/60049#0193f0bf-dcc2-4f4b-8cde-76741d798378 FLAKY 4/6
rptest.tests.datalake.partition_movement_test.PartitionMovementTest.test_cross_core_movements.cloud_storage_type=CloudStorageType.S3 ducktape https://buildkite.com/redpanda/redpanda/builds/60049#0193f0c5-171e-474d-aa8e-0d0ffb161139 FLAKY 1/6
rptest.tests.datalake.partition_movement_test.PartitionMovementTest.test_cross_core_movements.cloud_storage_type=CloudStorageType.S3 ducktape https://buildkite.com/redpanda/redpanda/builds/60049#0193f0c5-171f-4e2a-8beb-403808ca77b4 FLAKY 2/6
rptest.tests.e2e_topic_recovery_test.EndToEndTopicRecovery.test_restore_with_aborted_tx.recovery_overrides=.retention.local.target.bytes.1024.redpanda.remote.write.True.redpanda.remote.read.True.cloud_storage_type=CloudStorageType.ABS ducktape https://buildkite.com/redpanda/redpanda/builds/60049#0193f0c5-171b-40a1-bfbf-37f5a6e814bf FAIL 0/1
rptest.transactions.consumer_offsets_test.VerifyConsumerOffsetsThruUpgrades.test_consumer_group_offsets.versions_to_upgrade=3 ducktape https://buildkite.com/redpanda/redpanda/builds/60049#0193f0c5-1719-46be-8112-309c6520cae5 FAIL 0/1
test results on build#60058
test_id test_kind job_url test_status passed
rptest.tests.cloud_storage_scrubber_test.CloudStorageScrubberTest.test_scrubber.cloud_storage_type=CloudStorageType.S3 ducktape https://buildkite.com/redpanda/redpanda/builds/60058#0193f31d-2f6d-4a69-8dc9-12c8e28584c6 FAIL 0/1
rptest.tests.data_migrations_api_test.DataMigrationsApiTest.test_mount_inexistent ducktape https://buildkite.com/redpanda/redpanda/builds/60058#0193f337-48f5-4f5d-b1f2-e3b95d1ca948 FLAKY 5/6
rptest.tests.datalake.partition_movement_test.PartitionMovementTest.test_cross_core_movements.cloud_storage_type=CloudStorageType.S3 ducktape https://buildkite.com/redpanda/redpanda/builds/60058#0193f31d-2f6c-47ff-9829-30ea21edb404 FLAKY 2/6
rptest.transactions.producers_api_test.ProducersAdminAPITest.test_producers_state_api_during_load ducktape https://buildkite.com/redpanda/redpanda/builds/60058#0193f31d-2f6f-44e5-8895-92629bdc07fb FAIL 0/1
rptest.transactions.producers_api_test.ProducersAdminAPITest.test_producers_state_api_during_load ducktape https://buildkite.com/redpanda/redpanda/builds/60058#0193f337-48f5-4f5d-b1f2-e3b95d1ca948 FAIL 0/1

@bharathv
Contributor Author

/ci-repeat 3

@vbotbuildovich
Collaborator

vbotbuildovich commented Dec 23, 2024

Retry command for Build#60049

please wait until all jobs are finished before running the slash command



/ci-repeat 1
tests/rptest/tests/cloud_storage_scrubber_test.py::CloudStorageScrubberTest.test_scrubber@{"cloud_storage_type":1}
tests/rptest/transactions/consumer_offsets_test.py::VerifyConsumerOffsetsThruUpgrades.test_consumer_group_offsets@{"versions_to_upgrade":3}
tests/rptest/tests/e2e_topic_recovery_test.py::EndToEndTopicRecovery.test_restore_with_aborted_tx@{"cloud_storage_type":2,"recovery_overrides":{"redpanda.remote.read":true,"redpanda.remote.write":true,"retention.local.target.bytes":1024}}

@bharathv bharathv marked this pull request as ready for review December 23, 2024 09:29
@bharathv bharathv requested a review from a team as a code owner December 23, 2024 09:29
A bug in 24.2.0 resulted in a situation where tx_fence batches were
retained _after_ compaction while their corresponding data/commit/abort
batches were compacted away. This applied only to group transactions
that used tx_fence to begin the transaction.

Historical context: tx_fence batches were historically used to fence
both group transactions and regular data partition transactions. That
changed starting 24.2.0, where a dedicated fence batch type
(group_tx_fence) was introduced for group transaction fencing.

After this buggy compaction, these uncleaned tx_fence batches are
accounted as open transactions when computing max_collectible_offset,
thus blocking further compaction after an upgrade to 24.2.x.

We just ignore tx_fence batches going forward; the rationale is as
follows.

- Firstly, they are not currently in use starting 24.2 (in favor of
a dedicated group_tx_fence), so anyone starting group transactions
from 24.2 shouldn't see any conflicts

- For sealed transactions, commit/abort/data batches were already
removed if compaction ran, so ignoring tx_fence should be the right
thing to do in such cases without any conflicts/correctness issues

- Hypothetically, if compaction didn't run, it is still ok to ignore
those batches because in group transactions committed transactions are
atomically rewritten as a separate raft_data batch along with the
commit marker, which will be applied in the stm (so no state will be lost)

- Any group transaction using tx_fence likely belonged to 24.1.x, which
is at least 6 months old at the time of writing, so it is reasonable to
assume all such transactions are already sealed, especially since we
started enforcing a max transaction timeout of 15 mins.

- The only case where it could theoretically be a problem is an
upgrade from 24.1.x to 24.2.x (with the fix) with a transaction that is
open at the start of the upgrade and remains open throughout it, which
would then be considered aborted (if leadership is assumed on a 24.2.x
broker). This is a highly unlikely scenario, but the suggestion is to
stop all running group transaction (kstreams) applications when doing
the upgrade

note: this only affects group transactions, so regular transactions that
do not do offset commits as part of a transaction are safe.
This will result in hanging transactions and subsequent blocking
of compaction.
.. for a given partition, to be hooked up with REST API in the next
commit.
/v1/debug/producers/{namespace}/{topic}/{partition}

.. includes low level debug information about producers for
idempotency/transactional state.
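
For illustration, here is a minimal sketch of querying the new debug endpoint, assuming the default admin API address/port and treating the response simply as JSON; the path comes from the commit message above, while the host, port, and response handling are assumptions, not part of this PR:

```python
import json
import requests

# Hypothetical usage of the producers debug endpoint added in this PR.
ADMIN = "http://localhost:9644"  # assumed Redpanda admin API address
namespace, topic, partition = "kafka", "__consumer_offsets", 0

resp = requests.get(
    f"{ADMIN}/v1/debug/producers/{namespace}/{topic}/{partition}", timeout=10)
resp.raise_for_status()

# Pretty-print whatever idempotency/transactional producer state comes back.
print(json.dumps(resp.json(), indent=2))
```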
@@ -185,6 +186,8 @@ class group_manager {

described_group describe_group(const model::ntp&, const kafka::group_id&);

partition_response describe_partition_producers(const model::ntp&);
Member

nit: partition_response is confusing as it has no context, can we create an alias for this type, e.g. partition_producers?

}
response.error_code = kafka::error_code::none;
// snapshot the list of groups attached to this partition
fragmented_vector<std::pair<group_id, group_ptr>> groups;
Member

chunked_vector ?


absl::btree_map<model::producer_identity, model::offset>
producer_to_begin;
const all_txs_t& inflight_transactions() const { return _all_txs; }
Member

This part seems not relevant for current commit ?

Comment on lines +69 to +70
"[{}] group: {}, producer: {}, begin: {}",
_raft->ntp(),
Member

nit: can we add more context to this log line? i.e. it is not clear what "begin" refers to


if (pid_str.empty() || epoch_str.empty() || sequence_str.empty()) {
throw ss::httpd::bad_param_exception(
"invalid producer_id/epoch, should be >= 0");
Member

nit: the error message is misaligned

@vbotbuildovich
Collaborator

vbotbuildovich commented Dec 23, 2024

Retry command for Build#60058

please wait until all jobs are finished before running the slash command



/ci-repeat 1
tests/rptest/transactions/producers_api_test.py::ProducersAdminAPITest.test_producers_state_api_during_load
tests/rptest/tests/cloud_storage_scrubber_test.py::CloudStorageScrubberTest.test_scrubber@{"cloud_storage_type":1}

auto& tx = state.transaction;
int64_t start_offset = -1;
if (tx && tx->begin_offset >= model::offset{0}) {
start_offset = partition->get_offset_translator_state()
Contributor

is it possible that tx->begin_offset doesn't exist anymore/removed by retention?

Contributor

@nvartolomei nvartolomei left a comment

Skimmed the PR to be up-to-date with the changes. Hope the observations are useful.

@@ -2974,4 +2974,23 @@ ss::future<tx::errc> tx_gateway_frontend::do_delete_partition_from_tx(
co_return tx::errc::none;
}

ss::future<tx::errc> tx_gateway_frontend::unsafe_abort_group_transaction(
Contributor

can you add a comment describing what is "unsafe" about this operation? what invariants will break? what semantic behavior breaks?

},
{
"name": "sequence",
"in": "int",
Contributor

typo? should be "in": "query",

}

if (pid_str.empty() || epoch_str.empty() || sequence_str.empty()) {
throw ss::httpd::bad_param_exception(
Contributor

This should happen out of the box btw. This seems to be called implicitly when handling requests https://github.com/redpanda-data/seastar/blob/09a59a23ff2740a2fa591b0e65d978ca83d2b9e3/include/seastar/http/handlers.hh#L76

auto sequence_str = request->get_query_param("sequence");

if (group_id.empty()) {
throw ss::httpd::bad_param_exception("group_id cannot be empty");
Contributor

Can this actually happen or is it guaranteed by the router that this is not empty? nit: anyway, the right error here would be 404.

@@ -1849,3 +1849,15 @@ def get_debug_bundle_file(self, filename: str, node: MaybeNode = None):
def delete_debug_bundle_file(self, filename: str, node: MaybeNode = None):
path = f"debug/bundle/file/{filename}"
return self._request("DELETE", path, node=node)

def unsafe_abort_group_transaction(self, group_id: str, pid: int,
Contributor

nit: def unsafe_abort_group_transaction(self, group_id: str, *, pid: int, ... to force the caller to specify the param names; too many ints that are easy to reorder (see the sketch below)
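
A minimal sketch of the suggested keyword-only signature; the epoch/sequence parameter and query-string names, the HTTP method, and the exact path are assumptions based on other parts of this diff, not the final API:

```python
class Admin:
    # The real client has many more helpers; only the relevant bit is sketched
    # here, assuming a self._request(method, path, node=None) helper like the
    # one used elsewhere in this file.

    # Reviewer's suggestion: everything after group_id is keyword-only, so
    # callers cannot silently reorder the integer arguments.
    def unsafe_abort_group_transaction(self, group_id: str, *, pid: int,
                                       epoch: int, sequence: int, node=None):
        path = (f"transaction/{group_id}/unsafe_abort_group_transaction"
                f"?producer_id={pid}&producer_epoch={epoch}"
                f"&sequence={sequence}")
        return self._request("POST", path, node=node)

# Call sites must then name each integer explicitly, e.g.:
# admin.unsafe_abort_group_transaction("my-group", pid=7, epoch=0, sequence=3)
```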

dedicated fence batch type (group_tx_fence) was used for group
transaction fencing.

After this buggy compaction, these uncleaned tx_fence batches are
Contributor

Is the buggy compaction fixed now? Otherwise, shouldn't we just fix the buggy compaction instead?

model::producer_identity{header.producer_id, header.producer_epoch},
header.base_offset);
model::record_batch_header, kafka::group_tx::offsets_metadata) {
// Transaction boundaries are determined by fence/commit or abort
Contributor

This change doesn't seem to be explained by the commit message?

@@ -92,6 +92,46 @@
]
}
]
},
{
"path": "/v1/transaction/{group_id}/unsafe_abort_group_transaction",
Contributor

Nit: this looks a bit inconsistent, because group_id can be easily confused with transactional_id (when comparing with other endpoints)

cluster::get_producers_request{ntp, timeout});
if (result.error_code != cluster::tx::errc::none) {
throw ss::httpd::server_error_exception(fmt::format(
"Error {} processing partition state for ntp: {}",
Contributor

nit: "processing partition state" sounds a bit ambiguous, maybe "getting producers for ntp:" instead?

const model::ntp ntp = parse_ntp_from_request(req->param);
auto timeout = std::chrono::duration_cast<model::timeout_clock::duration>(
10s);
auto result = co_await _tx_gateway_frontend.local().get_producers(
Contributor

should we check here if the frontend is initialized (as is done for the other endpoint)?

pid = model::producer_id{parsed_pid};
} catch (const boost::bad_lexical_cast& e) {
throw ss::httpd::bad_param_exception(
fmt::format("invalid producer_id, should be >= 0: {}", e));
Contributor

nit: I guess printing values should be more useful than printing the exception?

auto group_ntp = mapper.ntp_for(kafka::group_id{group_id});
if (!group_ntp) {
throw ss::httpd::server_error_exception(
"consumer_offsets topic now found");
Contributor

typo now -> not

producers.producers.push(std::move(producer_state));
co_await ss::coroutine::maybe_yield();
}
co_return ss::json::json_return_type(std::move(producers));
Contributor

ss::json::stream_range_as_array?

return _inflight_requests;
}

const request_queue& fnished_requests() const { return _finished_requests; }
Contributor

typo: fnished -> finished
