@@ -974,19 +974,12 @@ rm_stm::do_mark_expired(model::producer_identity pid) {
974
974
co_return std::error_code (co_await do_try_abort_old_tx (pid));
975
975
}
976
976
977
- ss::future<result<kafka_result>> rm_stm::do_sync_and_transactional_replicate (
977
+ ss::future<result<kafka_result>> rm_stm::transactional_replicate (
978
+ model::term_id synced_term,
978
979
producer_ptr producer,
979
980
model::batch_identity bid,
980
981
model::record_batch_reader rdr,
981
982
ssx::semaphore_units units) {
982
- if (!co_await sync (_sync_timeout)) {
983
- vlog (
984
- _ctx_log.trace ,
985
- " processing name:replicate_tx pid:{} => stale leader" ,
986
- bid.pid );
987
- co_return errc::not_leader;
988
- }
989
- auto synced_term = _insync_term;
990
983
auto result = co_await do_transactional_replicate (
991
984
synced_term, producer, bid, std::move (rdr));
992
985
if (!result) {
@@ -1105,31 +1098,34 @@ ss::future<result<kafka_result>> rm_stm::transactional_replicate(
1105
1098
if (!check_tx_permitted ()) {
1106
1099
co_return errc::generic_tx_error;
1107
1100
}
1101
+ if (!co_await sync (_sync_timeout)) {
1102
+ vlog (
1103
+ _ctx_log.trace ,
1104
+ " processing name:replicate_tx pid:{} => stale leader" ,
1105
+ bid.pid );
1106
+ co_return errc::not_leader;
1107
+ }
1108
+ auto synced_term = _insync_term;
1108
1109
auto [producer, _] = maybe_create_producer (bid.pid );
1109
1110
co_return co_await producer
1110
1111
->run_with_lock ([&](ssx::semaphore_units units) {
1111
- return do_sync_and_transactional_replicate (
1112
- producer, bid, std::move (rdr), std::move (units));
1112
+ return transactional_replicate (
1113
+ synced_term, producer, bid, std::move (rdr), std::move (units));
1113
1114
})
1114
1115
.finally ([this , producer] {
1115
1116
_producer_state_manager.local ().touch (*producer, _vcluster_id);
1116
1117
});
1117
1118
}
1118
1119
1119
- ss::future<result<kafka_result>> rm_stm::do_sync_and_idempotent_replicate (
1120
+ ss::future<result<kafka_result>> rm_stm::idempotent_replicate (
1121
+ model::term_id synced_term,
1120
1122
producer_ptr producer,
1121
1123
model::batch_identity bid,
1122
1124
model::record_batch_reader br,
1123
1125
raft::replicate_options opts,
1124
1126
ss::lw_shared_ptr<available_promise<>> enqueued,
1125
1127
ssx::semaphore_units units,
1126
1128
producer_previously_known known_producer) {
1127
- if (!co_await sync (_sync_timeout)) {
1128
- // it's ok not to set enqueued on early return because
1129
- // the safety check in replicate_in_stages sets it automatically
1130
- co_return errc::not_leader;
1131
- }
1132
- auto synced_term = _insync_term;
1133
1129
auto result = co_await do_idempotent_replicate (
1134
1130
synced_term,
1135
1131
producer,
@@ -1256,10 +1252,17 @@ ss::future<result<kafka_result>> rm_stm::idempotent_replicate(
1256
1252
raft::replicate_options opts,
1257
1253
ss::lw_shared_ptr<available_promise<>> enqueued) {
1258
1254
try {
1255
+ if (!co_await sync (_sync_timeout)) {
1256
+ // it's ok not to set enqueued on early return because
1257
+ // the safety check in replicate_in_stages sets it automatically
1258
+ co_return errc::not_leader;
1259
+ }
1260
+ auto synced_term = _insync_term;
1259
1261
auto [producer, known_producer] = maybe_create_producer (bid.pid );
1260
1262
co_return co_await producer
1261
1263
->run_with_lock ([&, known_producer](ssx::semaphore_units units) {
1262
- return do_sync_and_idempotent_replicate (
1264
+ return idempotent_replicate (
1265
+ synced_term,
1263
1266
producer,
1264
1267
bid,
1265
1268
std::move (br),
0 commit comments