Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tx/group compaction fixes #24637

Open
wants to merge 12 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 22 additions & 6 deletions src/v/cluster/producer_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ class request {
}
}

seq_t first_sequence() const { return _first_sequence; }
seq_t last_sequence() const { return _last_sequence; }
model::term_id term() const { return _term; }

void set_value(request_result_t::value_type);
void set_error(request_result_t::error_type);
void mark_request_in_progress() { _state = request_state::in_progress; }
Expand Down Expand Up @@ -106,6 +110,14 @@ class request {
// Kafka clients only issue requests in batches of 5, the queue is fairly small
// at all times.
class requests {
private:
static constexpr int32_t requests_cached_max = 5;
// chunk size of the request containers to avoid wastage.
static constexpr size_t chunk_size = std::bit_ceil(
static_cast<unsigned long>(requests_cached_max));

using request_queue = ss::chunked_fifo<request_ptr, chunk_size>;

public:
result<request_ptr> try_emplace(
seq_t first, seq_t last, model::term_id current, bool reset_sequences);
Expand All @@ -118,17 +130,19 @@ class requests {
bool operator==(const requests&) const;
friend std::ostream& operator<<(std::ostream&, const requests&);

const request_queue& inflight_requests() const {
return _inflight_requests;
}

const request_queue& fnished_requests() const { return _finished_requests; }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: fnished -> finished


private:
static constexpr int32_t requests_cached_max = 5;
// chunk size of the request containers to avoid wastage.
static constexpr size_t chunk_size = std::bit_ceil(
static_cast<unsigned long>(requests_cached_max));
bool is_valid_sequence(seq_t incoming) const;
std::optional<request_ptr> last_request() const;
void gc_requests_from_older_terms(model::term_id current);
void reset(request_result_t::error_type);
ss::chunked_fifo<request_ptr, chunk_size> _inflight_requests;
ss::chunked_fifo<request_ptr, chunk_size> _finished_requests;
request_queue _inflight_requests;
request_queue _finished_requests;
friend producer_state;
};

Expand Down Expand Up @@ -271,6 +285,8 @@ class producer_state {
// progress transactions with older epoch.
void reset_with_new_epoch(model::producer_epoch new_epoch);

const requests& idempotent_request_state() const { return _requests; }

private:
prefix_logger& _logger;

Expand Down
3 changes: 3 additions & 0 deletions src/v/cluster/rm_group_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,8 @@ class rm_group_proxy {

virtual ss::future<abort_group_tx_reply>
abort_group_tx_locally(abort_group_tx_request) = 0;

virtual ss::future<get_producers_reply>
get_group_producers_locally(get_producers_request) = 0;
};
} // namespace cluster
52 changes: 52 additions & 0 deletions src/v/cluster/rm_partition_frontend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -559,4 +559,56 @@ ss::future<abort_tx_reply> rm_partition_frontend::do_abort_tx(
});
}

ss::future<get_producers_reply>
rm_partition_frontend::get_producers_locally(get_producers_request request) {
get_producers_reply reply;
auto partition = _partition_manager.local().get(request.ntp);
if (!partition || !partition->is_leader()) {
reply.error_code = tx::errc::not_coordinator;
co_return reply;
}
reply.error_code = tx::errc::none;
auto stm = partition->raft()->stm_manager()->get<rm_stm>();
if (!stm) {
// maybe an internal (non data) partition
co_return reply;
}
const auto& producers = stm->get_producers();
for (const auto& [pid, state] : producers) {
producer_state_info producer_info;
producer_info.pid = state->id();
// fill in the idempotent producer state.
const auto& requests = state->idempotent_request_state();
for (const auto& request : requests.inflight_requests()) {
idempotent_request_info request_info;
request_info.first_sequence = request->first_sequence();
request_info.last_sequence = request->last_sequence();
request_info.term = request->term();
producer_info.inflight_requests.push_back(std::move(request_info));
}

for (const auto& request : requests.fnished_requests()) {
idempotent_request_info request_info;
request_info.first_sequence = request->first_sequence();
request_info.last_sequence = request->last_sequence();
request_info.term = request->term();
producer_info.finished_requests.push_back(std::move(request_info));
}
producer_info.last_update = state->last_update_timestamp();

// Fill in transactional producer state, if any.
const auto& tx_state = state->transaction_state();
if (state->has_transaction_in_progress() && tx_state) {
producer_info.tx_begin_offset = tx_state->first;
producer_info.tx_end_offset = tx_state->last;
producer_info.tx_seq = tx_state->sequence;
producer_info.tx_timeout = tx_state->timeout;
producer_info.coordinator_partition
= tx_state->coordinator_partition;
}
reply.producers.push_back(std::move(producer_info));
}
co_return reply;
}

} // namespace cluster
2 changes: 2 additions & 0 deletions src/v/cluster/rm_partition_frontend.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ class rm_partition_frontend {
model::producer_identity,
model::tx_seq,
model::timeout_clock::duration);
ss::future<get_producers_reply>
get_producers_locally(get_producers_request);
ss::future<> stop() {
_as.request_abort();
return ss::make_ready_future<>();
Expand Down
6 changes: 6 additions & 0 deletions src/v/cluster/tx_gateway.cc
Original file line number Diff line number Diff line change
Expand Up @@ -119,4 +119,10 @@ ss::future<find_coordinator_reply> tx_gateway::find_coordinator(
co_return co_await _tx_gateway_frontend.local().find_coordinator(r.tid);
}

ss::future<get_producers_reply> tx_gateway::get_producers(
get_producers_request request, rpc::streaming_context&) {
co_return co_await _tx_gateway_frontend.local().get_producers(
std::move(request));
}

} // namespace cluster
3 changes: 3 additions & 0 deletions src/v/cluster/tx_gateway.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ class tx_gateway final : public tx_gateway_service {
ss::future<find_coordinator_reply> find_coordinator(
find_coordinator_request, rpc::streaming_context&) override;

ss::future<get_producers_reply>
get_producers(get_producers_request, rpc::streaming_context&) override;

private:
ss::sharded<cluster::tx_gateway_frontend>& _tx_gateway_frontend;
rm_group_proxy* _rm_group_proxy;
Expand Down
5 changes: 5 additions & 0 deletions src/v/cluster/tx_gateway.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@
"name": "find_coordinator",
"input_type": "find_coordinator_request",
"output_type": "find_coordinator_reply"
},
{
"name": "get_producers",
"input_type": "get_producers_request",
"output_type": "get_producers_reply"
}
]
}
82 changes: 82 additions & 0 deletions src/v/cluster/tx_gateway_frontend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2974,4 +2974,86 @@ ss::future<tx::errc> tx_gateway_frontend::do_delete_partition_from_tx(
co_return tx::errc::none;
}

ss::future<tx::errc> tx_gateway_frontend::unsafe_abort_group_transaction(
bharathv marked this conversation as resolved.
Show resolved Hide resolved
kafka::group_id group,
model::producer_identity pid,
model::tx_seq tx_seq,
model::timeout_clock::duration timeout) {
auto holder = _gate.hold();
vlog(
txlog.warn,
"Issuing an unsafe abort of group transaction, group: {}, pid: {}, seq: "
"{}, timeout: {}",
group,
pid,
tx_seq,
timeout);
auto result = co_await _rm_group_proxy->abort_group_tx(
std::move(group), pid, tx_seq, timeout);
co_return result.ec;
}

ss::future<get_producers_reply>
tx_gateway_frontend::get_producers(get_producers_request request) {
auto holder = _gate.hold();
const auto& ntp = request.ntp;
if (!_metadata_cache.local().contains(ntp)) {
co_return get_producers_reply{
.error_code = tx::errc::partition_not_exists};
}

auto leader_opt = _leaders.local().get_leader(ntp);
if (!leader_opt) {
co_return get_producers_reply{.error_code = tx::errc::leader_not_found};
}
auto leader = leader_opt.value();
if (leader == _self) {
auto shard = _shard_table.local().shard_for(ntp);
if (!shard.has_value()) {
co_return get_producers_reply{
.error_code = tx::errc::shard_not_found};
}
co_return co_await container().invoke_on(
shard.value(),
_ssg,
[request = std::move(request)](tx_gateway_frontend& local) mutable {
return local.get_producers_locally(std::move(request));
});
}
auto timeout = request.timeout;
auto result = co_await _connection_cache.local()
.with_node_client<tx_gateway_client_protocol>(
_self,
ss::this_shard_id(),
leader,
model::timeout_clock::now() + timeout,
[request = std::move(request),
timeout](tx_gateway_client_protocol cp) mutable {
return cp.get_producers(
std::move(request),
rpc::client_opts(
model::timeout_clock::now() + timeout));
});
if (result.has_error()) {
co_return get_producers_reply{.error_code = tx::errc::not_coordinator};
}
co_return std::move(result.value().data);
}

ss::future<get_producers_reply>
tx_gateway_frontend::get_producers_locally(get_producers_request request) {
auto& ntp = request.ntp;
bool is_consumer_offsets_ntp = ntp.ns()
== model::kafka_consumer_offsets_nt.ns()
&& ntp.tp.topic
== model::kafka_consumer_offsets_nt.tp;

if (is_consumer_offsets_ntp) {
co_return co_await _rm_group_proxy->get_group_producers_locally(
std::move(request));
}
co_return co_await _rm_partition_frontend.local().get_producers_locally(
std::move(request));
}

} // namespace cluster
11 changes: 11 additions & 0 deletions src/v/cluster/tx_gateway_frontend.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,14 @@ class tx_gateway_frontend final
ss::future<find_coordinator_reply>
find_coordinator(kafka::transactional_id);

ss::future<tx::errc> unsafe_abort_group_transaction(
kafka::group_id,
model::producer_identity,
model::tx_seq,
model::timeout_clock::duration);

ss::future<get_producers_reply> get_producers(get_producers_request);

ss::future<> stop();

private:
Expand Down Expand Up @@ -271,6 +279,9 @@ class tx_gateway_frontend final
model::timeout_clock::duration,
bool ignore_update_ts);

ss::future<get_producers_reply>
get_producers_locally(get_producers_request);

friend tx_gateway;
};
} // namespace cluster
40 changes: 40 additions & 0 deletions src/v/cluster/tx_protocol_types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -236,4 +236,44 @@ std::ostream& operator<<(std::ostream& o, const try_abort_reply& r) {
r.ec);
return o;
}

std::ostream& operator<<(std::ostream& o, const idempotent_request_info& info) {
fmt::print(
o,
"{{ first: {}, last: {}, term: {} }}",
info.first_sequence,
info.last_sequence,
info.term);
return o;
}

std::ostream& operator<<(std::ostream& o, const producer_state_info& info) {
fmt::print(
o,
"{{ pid: {}, inflight_requests: {}, finished: {}, begin offset: {}, end "
"offset: {}, sequence: {}, timeout: "
"{}, coordinator: {}, last_update: {}, group: {} }}",
info.pid,
info.inflight_requests,
info.finished_requests,
info.tx_begin_offset,
info.tx_end_offset,
info.tx_seq,
info.tx_timeout,
info.coordinator_partition,
info.last_update,
info.group_id);
return o;
}

std::ostream& operator<<(std::ostream& o, const get_producers_reply& r) {
fmt::print(o, "{{ ec: {}, producers: {} }}", r.error_code, r.producers);
return o;
}

std::ostream& operator<<(std::ostream& o, const get_producers_request& r) {
fmt::print(o, "{{ ntp: {} }}", r.ntp);
return o;
}

} // namespace cluster
Loading
Loading