Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tx/group compaction fixes #24637

Open
wants to merge 12 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/v/cluster/tx_gateway_frontend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2974,4 +2974,23 @@ ss::future<tx::errc> tx_gateway_frontend::do_delete_partition_from_tx(
co_return tx::errc::none;
}

/// Issues an abort for a consumer group transaction through
/// `_rm_group_proxy` and resolves to the error code of that request.
///
/// The call is logged at warn level before the abort is sent. The admin API
/// route that exposes it is documented as "only for debugging".
///
/// NOTE(review): this function does not show which invariants or semantic
/// guarantees the "unsafe" prefix refers to -- TODO confirm and document them
/// here.
///
/// \param group   id of the consumer group owning the transaction
/// \param pid     producer identity of the transaction
/// \param tx_seq  transaction sequence number
/// \param timeout timeout forwarded to the abort request
/// \return the `ec` field of the abort reply
ss::future<tx::errc> tx_gateway_frontend::unsafe_abort_group_transaction(
  kafka::group_id group,
  model::producer_identity pid,
  model::tx_seq tx_seq,
  model::timeout_clock::duration timeout) {
    // Hold the gate for as long as this coroutine is in flight.
    auto gate_guard = _gate.hold();

    vlog(
      txlog.warn,
      "Issuing an unsafe abort of group transaction, group: {}, pid: {}, seq: "
      "{}, timeout: {}",
      group,
      pid,
      tx_seq,
      timeout);

    // `group` is only moved after it has been logged above.
    const auto abort_reply = co_await _rm_group_proxy->abort_group_tx(
      std::move(group), pid, tx_seq, timeout);
    co_return abort_reply.ec;
}

} // namespace cluster
6 changes: 6 additions & 0 deletions src/v/cluster/tx_gateway_frontend.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ class tx_gateway_frontend final
ss::future<find_coordinator_reply>
find_coordinator(kafka::transactional_id);

/// Aborts a consumer group transaction identified by the group id, the
/// producer identity and the transaction sequence; the duration is the
/// timeout passed on to the abort request. Resolves to the resulting
/// tx::errc.
///
/// NOTE(review): the declaration does not say what makes this "unsafe"
/// (which invariants may break) -- TODO confirm and document.
ss::future<tx::errc> unsafe_abort_group_transaction(
kafka::group_id,
model::producer_identity,
model::tx_seq,
model::timeout_clock::duration);

ss::future<> stop();

private:
Expand Down
40 changes: 40 additions & 0 deletions src/v/redpanda/admin/api-doc/transaction.json
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,46 @@
]
}
]
},
{
"path": "/v1/transaction/{group_id}/unsafe_abort_group_transaction",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: this looks a bit inconsistent, because group_id can be easily confused with transactional_id (when comparing with other endpoints)

"operations": [
{
"method": "POST",
"summary": "Unsafely abort a transaction from a group, only for debugging",
"type": "void",
"nickname": "unsafe_abort_group_transaction",
"produces": [
"application/json"
],
"parameters": [
{
"name": "group_id",
"in": "path",
"required": true,
"type": "string"
},
{
"name": "producer_id",
"in": "query",
"required": true,
"type": "integer"
},
{
"name": "producer_epoch",
"in": "query",
"required": true,
"type": "integer"
},
{
"name": "sequence",
"in": "query",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo? should be "in": "query",

"required": true,
"type": "integer"
}
]
}
]
}
],
"models": {
Expand Down
2 changes: 2 additions & 0 deletions src/v/redpanda/admin/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,8 @@ class admin_server {
delete_partition_handler(std::unique_ptr<ss::http::request>);
ss::future<ss::json::json_return_type>
find_tx_coordinator_handler(std::unique_ptr<ss::http::request>);
/// Handler for the unsafe_abort_group_transaction admin route; receives the
/// HTTP request and resolves to the JSON reply.
ss::future<ss::json::json_return_type>
unsafe_abort_group_transaction(std::unique_ptr<ss::http::request>);

/// Cluster routes
ss::future<ss::json::json_return_type>
Expand Down
78 changes: 78 additions & 0 deletions src/v/redpanda/admin/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
#include "cluster/partition_manager.h"
#include "cluster/tx_gateway_frontend.h"
#include "container/lw_shared_container.h"
#include "kafka/server/coordinator_ntp_mapper.h"
#include "kafka/server/server.h"
#include "redpanda/admin/api-doc/transaction.json.hh"
#include "redpanda/admin/server.h"
#include "redpanda/admin/util.h"
Expand All @@ -36,6 +38,12 @@ void admin_server::register_transaction_routes() {
[this](std::unique_ptr<ss::http::request> req) {
return find_tx_coordinator_handler(std::move(req));
});

register_route<user>(
ss::httpd::transaction_json::unsafe_abort_group_transaction,
[this](std::unique_ptr<ss::http::request> req) {
return unsafe_abort_group_transaction(std::move(req));
});
}

ss::future<ss::json::json_return_type>
Expand Down Expand Up @@ -213,3 +221,73 @@ admin_server::delete_partition_handler(std::unique_ptr<ss::http::request> req) {
co_await throw_on_error(*req, res, ntp);
co_return ss::json::json_return_type(ss::json::json_void());
}

ss::future<ss::json::json_return_type>
admin_server::unsafe_abort_group_transaction(
std::unique_ptr<ss::http::request> request) {
if (!_tx_gateway_frontend.local_is_initialized()) {
throw ss::httpd::bad_request_exception("Transaction are disabled");
}

auto group_id = request->get_path_param("group_id");
auto pid_str = request->get_query_param("producer_id");
auto epoch_str = request->get_query_param("producer_epoch");
auto sequence_str = request->get_query_param("sequence");

if (group_id.empty()) {
throw ss::httpd::bad_param_exception("group_id cannot be empty");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this actually happen or it is guaranteed by the router that this is not empty? nit: Anyway, the right error here would be 404.

}

if (pid_str.empty() || epoch_str.empty() || sequence_str.empty()) {
throw ss::httpd::bad_param_exception(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should happen out of the box btw. This seems to be called implicitly when handling requests https://github.com/redpanda-data/seastar/blob/09a59a23ff2740a2fa591b0e65d978ca83d2b9e3/include/seastar/http/handlers.hh#L76

"invalid producer_id/epoch, should be >= 0");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: the error message is misaligned

}

std::optional<model::producer_id> pid;
try {
auto parsed_pid = boost::lexical_cast<model::producer_id::type>(
pid_str);
pid = model::producer_id{parsed_pid};
} catch (const boost::bad_lexical_cast& e) {
throw ss::httpd::bad_param_exception(
fmt::format("invalid producer_id, should be >= 0: {}", e));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I guess printing values should be more useful than printing the exception?

}

std::optional<model::producer_epoch> epoch;
try {
auto parsed_epoch = boost::lexical_cast<model::producer_epoch::type>(
epoch_str);
epoch = model::producer_epoch{parsed_epoch};
} catch (const boost::bad_lexical_cast& e) {
throw ss::httpd::bad_param_exception(
fmt::format("invalid producer_epoch, should be >= 0: {}", e));
}

std::optional<model::tx_seq> seq;
try {
auto parsed_seq = boost::lexical_cast<model::tx_seq::type>(
sequence_str);
seq = model::tx_seq{parsed_seq};
} catch (const boost::bad_lexical_cast& e) {
throw ss::httpd::bad_param_exception(
fmt::format("invalid transaction sequence, should be >= 0: {}", e));
}

auto& mapper = _kafka_server.local().coordinator_mapper();
auto kafka_gid = kafka::group_id{group_id};
auto group_ntp = mapper.ntp_for(kafka::group_id{group_id});
if (!group_ntp) {
throw ss::httpd::server_error_exception(
"consumer_offsets topic now found");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo now -> not

}

auto result
= co_await _tx_gateway_frontend.local().unsafe_abort_group_transaction(
std::move(kafka_gid),
model::producer_identity{pid.value(), epoch.value()},
seq.value(),
5s);

co_await throw_on_error(*request, result, group_ntp.value());
co_return ss::json::json_return_type(ss::json::json_void());
}
12 changes: 12 additions & 0 deletions tests/rptest/services/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -1849,3 +1849,15 @@ def get_debug_bundle_file(self, filename: str, node: MaybeNode = None):
def delete_debug_bundle_file(self, filename: str, node: MaybeNode = None):
path = f"debug/bundle/file/{filename}"
return self._request("DELETE", path, node=node)

def unsafe_abort_group_transaction(self, group_id: str, pid: int,
                                   epoch: int, sequence: int):
    """
    POST to the admin API's unsafe_abort_group_transaction endpoint.

    Prefer passing pid/epoch/sequence by keyword: all three are ints and are
    easy to swap when passed positionally.

    :param group_id: consumer group id; percent-encoded into the URL path
    :param pid: producer id, sent as the ``producer_id`` query parameter
    :param epoch: producer epoch, sent as ``producer_epoch``
    :param sequence: transaction sequence, sent as ``sequence``
    :return: whatever ``self._request`` returns for the POST
    """
    # Imported locally so the method does not depend on module-level imports.
    from urllib.parse import quote, urlencode

    query = urlencode({
        "producer_id": pid,
        "producer_epoch": epoch,
        "sequence": sequence,
    })
    # safe="" also escapes "/", so a group id can never alter the route.
    encoded_group = quote(group_id, safe="")
    return self._request(
        'POST',
        f"transaction/{encoded_group}/unsafe_abort_group_transaction?{query}")
86 changes: 86 additions & 0 deletions tests/rptest/transactions/transactions_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -945,6 +945,92 @@ def _produce_one(producer, idx):

assert num_consumed == expected_records

@cluster(num_nodes=3)
def unsafe_abort_group_transaction_test(self):
    """
    Leave a group transaction open, abort it through the admin API's
    unsafe_abort_group_transaction endpoint, and wait until no producer on
    __consumer_offsets reports an open transaction any more.
    """
    def has_open_transaction(producer_state):
        # The start offset is reported as text; entries containing 'None'
        # are skipped, the rest count when the offset is non-negative.
        start_offset = producer_state['CurrentTransactionStartOffset']
        return 'None' not in start_offset and int(start_offset) >= 0

    def producers_with_open_transactions():
        # Scans partitions 0..15 of __consumer_offsets.
        return [
            state for partition in range(0, 16)
            for state in self.kafka_cli.describe_producers(
                "__consumer_offsets", partition)
            if has_open_transaction(state)
        ]

    def wait_for_active_producers(count: int):
        wait_until(
            lambda: len(producers_with_open_transactions()) == count,
            timeout_sec=30,
            backoff_sec=1,
            err_msg=f"Timed out waiting for producer count to reach {count}")

    group_name = ''.join(
        random.choice(string.ascii_uppercase) for _ in range(16))
    self.generate_data(self.input_t, 10)

    # Consume once with a group to set up consumer offsets.
    RpkTool(self.redpanda).consume(topic=self.input_t.name,
                                   n=1,
                                   group="test-group")

    wait_for_active_producers(0)

    # Transactional producer with a large-ish transaction timeout, paired
    # with a consumer in the randomly named group reading the input topic.
    producer = ck.Producer({
        'bootstrap.servers': self.redpanda.brokers(),
        'transactional.id': 'test-repro',
        'transaction.timeout.ms': 300000,
    })
    consumer = ck.Consumer({
        'bootstrap.servers': self.redpanda.brokers(),
        'group.id': group_name,
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': False,
    })
    consumer.subscribe([self.input_t])

    producer.init_transactions()
    self.consume(consumer)

    # Begin a transaction and attach the consumer's offsets to it.
    producer.begin_transaction()
    producer.send_offsets_to_transaction(
        consumer.position(consumer.assignment()),
        consumer.consumer_group_metadata())
    producer.flush()

    wait_until(lambda: len(self.admin.get_all_transactions()) == 1,
               timeout_sec=30,
               backoff_sec=1,
               err_msg="Timed out waiting for transaction to appear")

    wait_for_active_producers(1)

    # Abort the group side of the transaction through the admin API.
    self.admin.unsafe_abort_group_transaction(group_id=group_name,
                                              pid=1,
                                              epoch=0,
                                              sequence=0)
    wait_for_active_producers(0)
    producer.commit_transaction()

    wait_until(
        self.no_running_transactions,
        timeout_sec=30,
        backoff_sec=1,
        err_msg="Timed out waiting for running transactions to wind down.")


class TransactionsStreamsTest(RedpandaTest, TransactionsMixin):
topics = (TopicSpec(partition_count=1, replication_factor=3),
Expand Down