Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tx/group compaction fixes #24637

Open
wants to merge 12 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions src/v/kafka/server/group.cc
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,13 @@ bool group::valid_previous_state(group_state s) const {
// Tracks a single in-flight transaction for a group.
//
// begin_offset is the log offset of the batch that began the transaction
// (the fence batch); it is surfaced through DescribeProducers as
// current_txn_start_offset. last_update is stamped at construction and
// drives the expiration deadline().
//
// NOTE: the scraped diff contained both the old 3-arg and new 4-arg
// initializer lines; this is the post-change 4-arg form.
group::ongoing_transaction::ongoing_transaction(
  model::tx_seq tx_seq,
  model::partition_id coordinator_partition,
  model::timeout_clock::duration tx_timeout,
  model::offset begin_offset)
  : tx_seq(tx_seq)
  , coordinator_partition(coordinator_partition)
  , timeout(tx_timeout)
  , last_update(model::timeout_clock::now())
  , begin_offset(begin_offset) {}

// A producer record starts with only its epoch; an ongoing_transaction is
// attached later (see group::begin_tx, which resets `transaction`).
group::tx_producer::tx_producer(model::producer_epoch epoch)
  : epoch(epoch) {}
Expand Down Expand Up @@ -1934,7 +1936,8 @@ group::begin_tx(cluster::begin_group_tx_request r) {
r.pid.get_id(), r.pid.get_epoch());
producer_it->second.epoch = r.pid.get_epoch();
producer_it->second.transaction = std::make_unique<ongoing_transaction>(
ongoing_transaction(r.tx_seq, r.tm_partition, r.timeout));
ongoing_transaction(
r.tx_seq, r.tm_partition, r.timeout, result.value().last_offset));

try_arm(producer_it->second.transaction->deadline());

Expand Down
11 changes: 9 additions & 2 deletions src/v/kafka/server/group.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,10 @@ class group final : public ss::enable_lw_shared_from_this<group> {
*/
struct ongoing_transaction {
ongoing_transaction(
model::tx_seq, model::partition_id, model::timeout_clock::duration);
model::tx_seq,
model::partition_id,
model::timeout_clock::duration,
model::offset);

model::tx_seq tx_seq;
model::partition_id coordinator_partition;
Expand All @@ -177,6 +180,7 @@ class group final : public ss::enable_lw_shared_from_this<group> {
model::timeout_clock::time_point last_update;

bool is_expiration_requested{false};
model::offset begin_offset{-1};

model::timeout_clock::time_point deadline() const {
return last_update + timeout;
Expand All @@ -200,6 +204,8 @@ class group final : public ss::enable_lw_shared_from_this<group> {
std::unique_ptr<ongoing_transaction> transaction;
};

using producers_map = chunked_hash_map<model::producer_id, tx_producer>;

struct offset_metadata {
model::offset log_offset;
model::offset offset;
Expand Down Expand Up @@ -657,6 +663,8 @@ class group final : public ss::enable_lw_shared_from_this<group> {
}
}

const producers_map& producers() const { return _producers; }

// helper for the kafka api: describe groups
described_group describe() const;

Expand Down Expand Up @@ -713,7 +721,6 @@ class group final : public ss::enable_lw_shared_from_this<group> {
private:
using member_map = absl::node_hash_map<kafka::member_id, member_ptr>;
using protocol_support = absl::node_hash_map<kafka::protocol_name, int>;
using producers_map = chunked_hash_map<model::producer_id, tx_producer>;

friend std::ostream& operator<<(std::ostream&, const group&);

Expand Down
61 changes: 60 additions & 1 deletion src/v/kafka/server/group_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "kafka/server/group.h"
#include "kafka/server/group_metadata.h"
#include "kafka/server/group_recovery_consumer.h"
#include "kafka/server/group_tx_tracker_stm.h"
#include "kafka/server/logger.h"
#include "model/fundamental.h"
#include "model/namespace.h"
Expand Down Expand Up @@ -978,7 +979,7 @@ ss::future<> group_manager::do_recover_group(
if (session.tx) {
auto& tx = *session.tx;
group::ongoing_transaction group_tx(
tx.tx_seq, tx.tm_partition, tx.timeout);
tx.tx_seq, tx.tm_partition, tx.timeout, tx.begin_offset);
for (auto& [tp, o_md] : tx.offsets) {
group_tx.offsets[tp] = group::pending_tx_offset{
.offset_metadata = group_tx::partition_offset{
Expand Down Expand Up @@ -1620,6 +1621,64 @@ group_manager::describe_group(const model::ntp& ntp, const kafka::group_id& g) {
return group->describe();
}

/**
 * Build the DescribeProducers response for one partition of the consumer
 * offsets topic by reporting the in-flight transaction of every producer of
 * every group attached to that partition.
 *
 * Returns not_leader_for_partition when this node is not the partition
 * leader; otherwise error_code::none plus one active_producers entry per
 * known producer (fields are -1 when there is no open transaction).
 */
partition_response
group_manager::describe_partition_producers(const model::ntp& ntp) {
    vlog(klog.debug, "describe producers: {}", ntp);
    partition_response response;
    response.partition_index = ntp.tp.partition;
    auto it = _partitions.find(ntp);
    if (it == _partitions.end() || !it->second->partition->is_leader()) {
        response.error_code = error_code::not_leader_for_partition;
        return response;
    }
    response.error_code = kafka::error_code::none;
    // snapshot the list of groups attached to this partition
    // NOTE(review): consider chunked_vector here instead of
    // fragmented_vector (reviewer suggestion).
    fragmented_vector<std::pair<group_id, group_ptr>> groups;
    std::copy_if(
      _groups.begin(),
      _groups.end(),
      std::back_inserter(groups),
      // take the map entry by const reference: copying it would copy the
      // group_ptr (refcount churn) for every group, matched or not.
      [&ntp](const auto& g_pair) {
          const auto& [group_id, group] = g_pair;
          return group->partition()->ntp() == ntp;
      });
    for (auto& [gid, group] : groups) {
        if (group->in_state(group_state::dead)) {
            continue;
        }
        auto partition = group->partition();
        if (!partition) {
            // unlikely, conservative check
            continue;
        }
        for (const auto& [id, state] : group->producers()) {
            const auto& tx = state.transaction;
            int64_t start_offset = -1;
            if (tx && tx->begin_offset >= model::offset{0}) {
                // Translate the raft begin offset into a kafka offset for
                // the client.
                // NOTE(review): it is not established here what
                // from_log_offset does when begin_offset has already been
                // removed by retention — confirm (reviewer question).
                start_offset = partition->get_offset_translator_state()
                                 ->from_log_offset(tx->begin_offset);
            }
            int64_t last_timestamp = -1;
            if (tx) {
                // last_update is on the steady timeout clock; convert its
                // age into a wall-clock timestamp (ms since epoch) as the
                // protocol expects.
                auto time_since_last_update = model::timeout_clock::now()
                                              - tx->last_update;
                auto last_update_ts
                  = (model::timestamp_clock::now() - time_since_last_update);
                last_timestamp = last_update_ts.time_since_epoch() / 1ms;
            }
            response.active_producers.push_back({
              .producer_id = id,
              .producer_epoch = state.epoch,
              .last_sequence = tx ? tx->tx_seq : -1,
              .last_timestamp = last_timestamp,
              .coordinator_epoch = -1,
              .current_txn_start_offset = start_offset,
            });
        }
    }
    return response;
}

ss::future<std::vector<deletable_group_result>> group_manager::delete_groups(
std::vector<std::pair<model::ntp, group_id>> groups) {
std::vector<deletable_group_result> results;
Expand Down
3 changes: 3 additions & 0 deletions src/v/kafka/server/group_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "kafka/protocol/offset_delete.h"
#include "kafka/protocol/offset_fetch.h"
#include "kafka/protocol/schemata/delete_groups_response.h"
#include "kafka/protocol/schemata/describe_producers_response.h"
#include "kafka/protocol/schemata/list_groups_response.h"
#include "kafka/protocol/sync_group.h"
#include "kafka/protocol/txn_offset_commit.h"
Expand Down Expand Up @@ -185,6 +186,8 @@ class group_manager {

described_group describe_group(const model::ntp&, const kafka::group_id&);

partition_response describe_partition_producers(const model::ntp&);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: `partition_response` is confusing because the name carries no context; can we create an alias for this type, e.g. `partition_producers`?


ss::future<std::vector<deletable_group_result>>
delete_groups(std::vector<std::pair<model::ntp, group_id>>);

Expand Down
6 changes: 4 additions & 2 deletions src/v/kafka/server/group_recovery_consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ ss::future<> group_recovery_consumer::handle_fence_v1(
pid.get_epoch(),
data.tx_seq,
data.transaction_timeout_ms,
model::partition_id(0));
model::partition_id(0),
header.base_offset);
co_return;
}

Expand All @@ -92,7 +93,8 @@ ss::future<> group_recovery_consumer::handle_fence(
pid.get_epoch(),
data.tx_seq,
data.transaction_timeout_ms,
data.tm_partition);
data.tm_partition,
header.base_offset);
co_return;
}

Expand Down
4 changes: 3 additions & 1 deletion src/v/kafka/server/group_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,16 @@ void group_stm::try_set_fence(
model::producer_epoch epoch,
model::tx_seq txseq,
model::timeout_clock::duration transaction_timeout_ms,
model::partition_id tm_partition) {
model::partition_id tm_partition,
model::offset fence_offset) {
auto [it, _] = _producers.try_emplace(id, epoch);
if (it->second.epoch <= epoch) {
it->second.epoch = epoch;
it->second.tx = std::make_unique<ongoing_tx>(ongoing_tx{
.tx_seq = txseq,
.tm_partition = tm_partition,
.timeout = transaction_timeout_ms,
.begin_offset = fence_offset,
.offsets = {},
});
}
Expand Down
4 changes: 3 additions & 1 deletion src/v/kafka/server/group_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class group_stm {
model::tx_seq tx_seq;
model::partition_id tm_partition;
model::timeout_clock::duration timeout;
model::offset begin_offset{-1};
chunked_hash_map<model::topic_partition, group::offset_metadata>
offsets;
};
Expand All @@ -57,7 +58,8 @@ class group_stm {
model::producer_epoch epoch,
model::tx_seq txseq,
model::timeout_clock::duration transaction_timeout_ms,
model::partition_id tm_partition);
model::partition_id tm_partition,
model::offset fence_offset);

bool has_data() const {
return !_is_removed && (_is_loaded || _offsets.size() > 0);
Expand Down
24 changes: 21 additions & 3 deletions src/v/kafka/server/group_tx_tracker_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,30 @@ ss::future<> group_tx_tracker_stm::do_apply(const model::record_batch& b) {

// Upper bound on what the log is allowed to compact/collect.
//
// Starts from last_applied_offset() and is held back to just before the
// earliest begin offset of any open transaction across all tracked groups,
// so that batches of unfinished transactions are never collected. Emits
// per-group (and, at trace level, per-producer) diagnostics.
model::offset group_tx_tracker_stm::max_collectible_offset() {
    auto result = last_applied_offset();
    for (const auto& [gid, group_state] : _all_txs) {
        if (group_state.begin_offsets.empty()) {
            continue;
        }
        // begin_offsets is ordered, so the first element is the earliest
        // in-flight tx begin for this group.
        auto group_least_begin = *group_state.begin_offsets.begin();
        result = std::min(result, model::prev_offset(group_least_begin));
        vlog(
          klog.trace,
          "[{}] group: {}, earliest tx begin offset: {}",
          _raft->ntp(),
          gid,
          group_least_begin);
        // guard the per-producer dump so we don't iterate the map when
        // trace logging is disabled.
        if (klog.is_enabled(ss::log_level::trace)) {
            for (const auto& [pid, offset] : group_state.producer_to_begin) {
                vlog(
                  klog.trace,
                  "[{}] group: {}, producer: {}, tx begin offset: {}",
                  _raft->ntp(),
                  gid,
                  pid,
                  offset);
            }
        }
    }

    return result;
}

Expand Down
50 changes: 26 additions & 24 deletions src/v/kafka/server/group_tx_tracker_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,30 @@ class group_tx_tracker_stm final
, public group_data_parser<group_tx_tracker_stm> {
public:
static constexpr std::string_view name = "group_tx_tracker_stm";
    // Per-group bookkeeping of open transactions, serialized into the stm
    // snapshot via serde.
    struct per_group_state
      : serde::envelope<
          per_group_state,
          serde::version<0>,
          serde::compat_version<0>> {
        per_group_state() = default;

        // Convenience constructor: records the begin offset of `pid`'s
        // in-flight transaction.
        per_group_state(model::producer_identity pid, model::offset offset) {
            maybe_add_tx_begin(pid, offset);
        }

        // Records `offset` as the tx begin for `pid`. Implementation is in
        // the .cc file; the "maybe" presumably means it is a no-op if the
        // producer already has an open tx — confirm there.
        void
        maybe_add_tx_begin(model::producer_identity pid, model::offset offset);

        // Ordered set of begin offsets of all open transactions in this
        // group; the smallest entry bounds max_collectible_offset().
        absl::btree_set<model::offset> begin_offsets;

        // Begin offset of the open transaction of each producer.
        absl::btree_map<model::producer_identity, model::offset>
          producer_to_begin;

        // Both containers are snapshotted; keep field order stable for
        // serde compatibility.
        auto serde_fields() {
            return std::tie(begin_offsets, producer_to_begin);
        }
    };
    using all_txs_t = absl::btree_map<kafka::group_id, per_group_state>;

group_tx_tracker_stm(ss::logger&, raft::consensus*);

Expand Down Expand Up @@ -72,31 +96,9 @@ class group_tx_tracker_stm final
model::record_batch_header, kafka::group_tx::commit_metadata);
ss::future<> handle_version_fence(features::feature_table::version_fence);

private:
struct per_group_state
: serde::envelope<
per_group_state,
serde::version<0>,
serde::compat_version<0>> {
per_group_state() = default;

per_group_state(model::producer_identity pid, model::offset offset) {
maybe_add_tx_begin(pid, offset);
}

void
maybe_add_tx_begin(model::producer_identity pid, model::offset offset);

absl::btree_set<model::offset> begin_offsets;

absl::btree_map<model::producer_identity, model::offset>
producer_to_begin;
const all_txs_t& inflight_transactions() const { return _all_txs; }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part seems not relevant for current commit ?


auto serde_fields() {
return std::tie(begin_offsets, producer_to_begin);
}
};
using all_txs_t = absl::btree_map<kafka::group_id, per_group_state>;
private:
struct snapshot
: serde::envelope<snapshot, serde::version<0>, serde::compat_version<0>> {
all_txs_t transactions;
Expand Down
15 changes: 12 additions & 3 deletions src/v/kafka/server/handlers/describe_producers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#include "kafka/protocol/errors.h"
#include "kafka/protocol/produce.h"
#include "kafka/protocol/schemata/describe_producers_response.h"
#include "kafka/server/group_manager.h"
#include "kafka/server/group_router.h"
#include "kafka/server/handlers/details/security.h"
#include "kafka/server/request_context.h"
#include "kafka/server/response.h"
Expand All @@ -41,8 +43,8 @@ make_error_response(model::partition_id id, kafka::error_code ec) {
return partition_response{.partition_index = id, .error_code = ec};
}

partition_response
do_get_producers_for_partition(cluster::partition_manager& pm, model::ktp ntp) {
partition_response do_get_producers_for_data_partition(
cluster::partition_manager& pm, model::ktp ntp) {
auto partition = pm.get(ntp);
if (!partition || !partition->is_leader()) {
return make_error_response(
Expand Down Expand Up @@ -97,9 +99,16 @@ get_producers_for_partition(request_context& ctx, model::ktp ntp) {
ntp.get_partition(), kafka::error_code::not_leader_for_partition);
}

if (ntp.get_topic() == model::kafka_consumer_offsets_topic) {
co_return co_await ctx.groups().get_group_manager().invoke_on(
*shard, [ktp = std::move(ntp)](kafka::group_manager& gm) mutable {
return gm.describe_partition_producers(ktp.to_ntp());
});
}

co_return co_await ctx.partition_manager().invoke_on(
*shard, [ntp = std::move(ntp)](cluster::partition_manager& pm) mutable {
return do_get_producers_for_partition(pm, std::move(ntp));
return do_get_producers_for_data_partition(pm, std::move(ntp));
});
}

Expand Down
2 changes: 1 addition & 1 deletion tests/rptest/clients/kafka_cli_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ def describe_producers(self, topic: str, partition: int):
split_str = res.split("\n")
info_str = split_str[0]
info_key = info_str.strip().split("\t")
assert info_key == expected_columns, f"{info_key}"
assert info_key == expected_columns, f"{info_key} vs {expected_columns}"

assert split_str[-1] == ""

Expand Down
Loading