Skip to content

Commit

Permalink
tx/groups: escape hatch for unsafe aborting of group transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
bharathv committed Dec 20, 2024
1 parent 09afe13 commit 6519159
Show file tree
Hide file tree
Showing 7 changed files with 243 additions and 0 deletions.
19 changes: 19 additions & 0 deletions src/v/cluster/tx_gateway_frontend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2974,4 +2974,23 @@ ss::future<tx::errc> tx_gateway_frontend::do_delete_partition_from_tx(
co_return tx::errc::none;
}

/// Escape hatch that unsafely aborts a group transaction.
///
/// Logs the request at warn level and forwards it to the group resource
/// manager proxy via abort_group_tx().
///
/// \param group   consumer group that owns the transaction
/// \param pid     producer identity of the transaction
/// \param tx_seq  sequence number of the transaction
/// \param timeout timeout forwarded to the proxy call
/// \return the `ec` field of the reply produced by abort_group_tx()
ss::future<tx::errc> tx_gateway_frontend::unsafe_abort_group_transaction(
  kafka::group_id group,
  model::producer_identity pid,
  model::tx_seq tx_seq,
  model::timeout_clock::duration timeout) {
    // Hold the frontend's gate for as long as this coroutine is running.
    auto gate_guard = _gate.hold();

    // This is a deliberately loud code path: always leave a trace in the log.
    vlog(
      txlog.warn,
      "Issuing an unsafe abort of group transaction, group: {}, pid: {}, "
      "seq: {}, timeout: {}",
      group,
      pid,
      tx_seq,
      timeout);

    // `group` is not needed after this point, so hand it over to the proxy.
    auto abort_reply = co_await _rm_group_proxy->abort_group_tx(
      std::move(group), pid, tx_seq, timeout);
    co_return abort_reply.ec;
}

} // namespace cluster
6 changes: 6 additions & 0 deletions src/v/cluster/tx_gateway_frontend.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ class tx_gateway_frontend final
ss::future<find_coordinator_reply>
find_coordinator(kafka::transactional_id);

/// Unsafely aborts a group transaction; intended as a debugging escape hatch.
///
/// Arguments, in order: the consumer group id, the producer identity of the
/// transaction, the transaction sequence number and the timeout for the
/// abort request. Resolves to the error code reported for the abort.
ss::future<tx::errc> unsafe_abort_group_transaction(
kafka::group_id,
model::producer_identity,
model::tx_seq,
model::timeout_clock::duration);

ss::future<> stop();

private:
Expand Down
40 changes: 40 additions & 0 deletions src/v/redpanda/admin/api-doc/transaction.json
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,46 @@
]
}
]
},
{
"path": "/v1/transaction/{group_id}/unsafe_abort_group_transaction",
"operations": [
{
"method": "POST",
"summary": "Unsafely abort a transaction from a group, only for debugging",
"type": "void",
"nickname": "unsafe_abort_group_transaction",
"produces": [
"application/json"
],
"parameters": [
{
"name": "group_id",
"in": "path",
"required": true,
"type": "string"
},
{
"name": "producer_id",
"in": "query",
"required": true,
"type": "integer"
},
{
"name": "producer_epoch",
"in": "query",
"required": true,
"type": "integer"
},
{
"name": "sequence",
"in": "query",
"required": true,
"type": "integer"
}
]
}
]
}
],
"models": {
Expand Down
2 changes: 2 additions & 0 deletions src/v/redpanda/admin/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,8 @@ class admin_server {
delete_partition_handler(std::unique_ptr<ss::http::request>);
ss::future<ss::json::json_return_type>
find_tx_coordinator_handler(std::unique_ptr<ss::http::request>);
/// Handler for the `unsafe_abort_group_transaction` transaction admin route
/// (unsafely aborts a group transaction; debugging only).
ss::future<ss::json::json_return_type>
unsafe_abort_group_transaction(std::unique_ptr<ss::http::request>);

/// Cluster routes
ss::future<ss::json::json_return_type>
Expand Down
78 changes: 78 additions & 0 deletions src/v/redpanda/admin/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
#include "cluster/partition_manager.h"
#include "cluster/tx_gateway_frontend.h"
#include "container/lw_shared_container.h"
#include "kafka/server/coordinator_ntp_mapper.h"
#include "kafka/server/server.h"
#include "redpanda/admin/api-doc/transaction.json.hh"
#include "redpanda/admin/server.h"
#include "redpanda/admin/util.h"
Expand All @@ -36,6 +38,12 @@ void admin_server::register_transaction_routes() {
[this](std::unique_ptr<ss::http::request> req) {
return find_tx_coordinator_handler(std::move(req));
});

register_route<user>(
ss::httpd::transaction_json::unsafe_abort_group_transaction,
[this](std::unique_ptr<ss::http::request> req) {
return unsafe_abort_group_transaction(std::move(req));
});
}

ss::future<ss::json::json_return_type>
Expand Down Expand Up @@ -213,3 +221,73 @@ admin_server::delete_partition_handler(std::unique_ptr<ss::http::request> req) {
co_await throw_on_error(*req, res, ntp);
co_return ss::json::json_return_type(ss::json::json_void());
}

/// Admin API handler for
/// POST /v1/transaction/{group_id}/unsafe_abort_group_transaction.
///
/// Reads `group_id` from the path and `producer_id`, `producer_epoch` and
/// `sequence` from the query string, then asks the transaction gateway
/// frontend to unsafely abort the matching group transaction.
///
/// Throws:
///  - bad_request_exception if the transaction gateway frontend is not
///    initialized locally;
///  - bad_param_exception if a parameter is missing, is not an integer or
///    is negative;
///  - server_error_exception if the group cannot be mapped to a coordinator
///    partition.
/// The error code returned by the frontend is handed to throw_on_error().
ss::future<ss::json::json_return_type>
admin_server::unsafe_abort_group_transaction(
  std::unique_ptr<ss::http::request> request) {
    if (!_tx_gateway_frontend.local_is_initialized()) {
        throw ss::httpd::bad_request_exception("Transaction are disabled");
    }

    auto group_id = request->get_path_param("group_id");
    auto pid_str = request->get_query_param("producer_id");
    auto epoch_str = request->get_query_param("producer_epoch");
    auto sequence_str = request->get_query_param("sequence");

    if (group_id.empty()) {
        throw ss::httpd::bad_param_exception("group_id cannot be empty");
    }

    // All three query parameters are mandatory; report absence as such
    // rather than as a range error.
    if (pid_str.empty() || epoch_str.empty() || sequence_str.empty()) {
        throw ss::httpd::bad_param_exception(
          "producer_id, producer_epoch and sequence are required");
    }

    std::optional<model::producer_id> pid;
    try {
        auto parsed_pid = boost::lexical_cast<model::producer_id::type>(
          pid_str);
        // lexical_cast accepts negative numbers; enforce the documented
        // ">= 0" contract explicitly.
        if (parsed_pid < 0) {
            throw ss::httpd::bad_param_exception(fmt::format(
              "invalid producer_id, should be >= 0: {}", pid_str));
        }
        pid = model::producer_id{parsed_pid};
    } catch (const boost::bad_lexical_cast& e) {
        throw ss::httpd::bad_param_exception(
          fmt::format("invalid producer_id, should be >= 0: {}", e));
    }

    std::optional<model::producer_epoch> epoch;
    try {
        auto parsed_epoch = boost::lexical_cast<model::producer_epoch::type>(
          epoch_str);
        if (parsed_epoch < 0) {
            throw ss::httpd::bad_param_exception(fmt::format(
              "invalid producer_epoch, should be >= 0: {}", epoch_str));
        }
        epoch = model::producer_epoch{parsed_epoch};
    } catch (const boost::bad_lexical_cast& e) {
        throw ss::httpd::bad_param_exception(
          fmt::format("invalid producer_epoch, should be >= 0: {}", e));
    }

    std::optional<model::tx_seq> seq;
    try {
        auto parsed_seq = boost::lexical_cast<model::tx_seq::type>(
          sequence_str);
        if (parsed_seq < 0) {
            throw ss::httpd::bad_param_exception(fmt::format(
              "invalid transaction sequence, should be >= 0: {}",
              sequence_str));
        }
        seq = model::tx_seq{parsed_seq};
    } catch (const boost::bad_lexical_cast& e) {
        throw ss::httpd::bad_param_exception(
          fmt::format("invalid transaction sequence, should be >= 0: {}", e));
    }

    // Resolve the coordinator partition for the group; it is only used to
    // give throw_on_error() an ntp to report against.
    auto& mapper = _kafka_server.local().coordinator_mapper();
    auto kafka_gid = kafka::group_id{group_id};
    auto group_ntp = mapper.ntp_for(kafka_gid);
    if (!group_ntp) {
        throw ss::httpd::server_error_exception(
          "consumer_offsets topic not found");
    }

    auto result
      = co_await _tx_gateway_frontend.local().unsafe_abort_group_transaction(
        std::move(kafka_gid),
        model::producer_identity{pid.value(), epoch.value()},
        seq.value(),
        5s);

    co_await throw_on_error(*request, result, group_ntp.value());
    co_return ss::json::json_return_type(ss::json::json_void());
}
12 changes: 12 additions & 0 deletions tests/rptest/services/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -1849,3 +1849,15 @@ def get_debug_bundle_file(self, filename: str, node: MaybeNode = None):
def delete_debug_bundle_file(self, filename: str, node: MaybeNode = None):
    """
    Send a DELETE request for `debug/bundle/file/<filename>`.

    `node` is forwarded unchanged to `_request`, whose result is returned.
    """
    return self._request("DELETE",
                         f"debug/bundle/file/{filename}",
                         node=node)

def unsafe_abort_group_transaction(self, group_id: str, pid: int,
                                   epoch: int, sequence: int):
    """
    POST to the `unsafe_abort_group_transaction` admin endpoint.

    :param group_id: consumer group id, placed in the URL path
    :param pid: producer id, sent as the `producer_id` query parameter
    :param epoch: producer epoch, sent as `producer_epoch`
    :param sequence: transaction sequence, sent as `sequence`
    :return: whatever `_request` returns for the call
    """
    from urllib.parse import quote, urlencode

    query = urlencode({
        "producer_id": pid,
        "producer_epoch": epoch,
        "sequence": sequence,
    })
    # Group ids are arbitrary strings: percent-encode every reserved
    # character (including '/') so the id stays a single path segment.
    encoded_group = quote(group_id, safe="")
    return self._request(
        'POST',
        f"transaction/{encoded_group}/unsafe_abort_group_transaction?{query}")
86 changes: 86 additions & 0 deletions tests/rptest/transactions/transactions_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -945,6 +945,92 @@ def _produce_one(producer, idx):

assert num_consumed == expected_records

@cluster(num_nodes=3)
def unsafe_abort_group_transaction_test(self):
    # Scenario: open a transaction that sends consumer group offsets, abort
    # it through the admin escape hatch, and check that the group partition
    # no longer reports a producer with an open transaction.

    def random_group_name():
        letters = [
            random.choice(string.ascii_uppercase) for _ in range(16)
        ]
        return ''.join(letters)

    def has_open_transaction(entry):
        # Entries without an open transaction report a start offset
        # containing 'None'; anything else must parse as an int.
        start_offset = entry['CurrentTransactionStartOffset']
        return 'None' not in start_offset and int(start_offset) >= 0

    def open_transaction_count():
        # Scan partitions 0..15 of __consumer_offsets.
        total = 0
        for partition_id in range(16):
            described = self.kafka_cli.describe_producers(
                "__consumer_offsets", partition_id)
            total += sum(1 for entry in described
                         if has_open_transaction(entry))
        return total

    def wait_for_active_producers(count: int):
        def reached_expected_count():
            return open_transaction_count() == count

        wait_until(
            reached_expected_count,
            timeout_sec=30,
            backoff_sec=1,
            err_msg=f"Timed out waiting for producer count to reach {count}"
        )

    group_name = random_group_name()
    self.generate_data(self.input_t, 10)

    # A plain group consume makes sure consumer offsets are set up.
    RpkTool(self.redpanda).consume(topic=self.input_t.name,
                                   n=1,
                                   group="test-group")

    wait_for_active_producers(0)

    # Transactional producer plus a consumer in a fresh, random group.
    brokers = self.redpanda.brokers()
    producer = ck.Producer({
        'bootstrap.servers': brokers,
        'transactional.id': 'test-repro',
        # Large-ish timeout
        'transaction.timeout.ms': 300000,
    })

    brokers = self.redpanda.brokers()
    consumer = ck.Consumer({
        'bootstrap.servers': brokers,
        'group.id': group_name,
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': False,
    })
    consumer.subscribe([self.input_t])

    # Consume, then open a transaction and push the consumed offsets into it.
    producer.init_transactions()
    self.consume(consumer)
    producer.begin_transaction()
    positions = consumer.position(consumer.assignment())
    producer.send_offsets_to_transaction(
        positions, consumer.consumer_group_metadata())
    producer.flush()

    def exactly_one_transaction():
        return len(self.admin.get_all_transactions()) == 1

    wait_until(exactly_one_transaction,
               timeout_sec=30,
               backoff_sec=1,
               err_msg="Timed out waiting for transaction to appear")

    wait_for_active_producers(1)

    # Abort via the admin escape hatch; the open transaction must vanish.
    self.admin.unsafe_abort_group_transaction(group_id=group_name,
                                              pid=1,
                                              epoch=0,
                                              sequence=0)
    wait_for_active_producers(0)
    producer.commit_transaction()

    wait_until(
        lambda: self.no_running_transactions(),
        timeout_sec=30,
        backoff_sec=1,
        err_msg="Timed out waiting for running transactions to wind down.")


class TransactionsStreamsTest(RedpandaTest, TransactionsMixin):
topics = (TopicSpec(partition_count=1, replication_factor=3),
Expand Down

0 comments on commit 6519159

Please sign in to comment.