Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/v/cluster/rm_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1339,7 +1339,7 @@ model::offset rm_stm::last_stable_offset() {
}

auto synced_leader = _raft->is_leader() && _raft->term() == _insync_term;
model::offset lso{-1};
model::offset lso{model::invalid_lso};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

q: the new behavior is that it returns offset_not_available and that triggers a retry?

return error_code::offset_not_available;

If yes, I wonder (without this fix) if the translation here would throw an exception because it attempts translation before the start offset? How is it returning -1?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from the logs

TRACE 2025-12-23 01:45:37,297 [shard 1:fetc] tx - [{kafka/source-topic/0}] - rm_stm.cc:1322 - lso update in progress, last_known_lso: -1, last_applied: 0

from the code

    auto maybe_lso = _partition->last_stable_offset();
    if (maybe_lso == model::invalid_lso) {
        return error_code::offset_not_available;
    }
    return _translator->from_log_offset(maybe_lso);

and then translator is actually just a translator_state, code here

model::offset offset_translator_state::from_log_offset(model::offset o) const {
    const auto d = delta(o);
    return model::offset(o - d);
}

no checks on valid offset nor exceptions

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, this is a +1 for trying to use name constants as much as possible as opposed to magic numbers

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's try to be consistent. We track high watermark as the inclusive offset and add one when it is returned. Would it make sense to track LSO in the same way i.e. leave it as -1 and only add 1 when we need to retrieve it ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We track high watermark as the inclusive offset and add one when it is returned
Can you point me in the direction of where this occurs?

I poked around and it seems hwm and lso get the same treatment almost everywhere in code. Its possible that lso actually gets an increment on some fetch path, in which case, maybe invalid_lso should actually be -1

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in which case, maybe invalid_lso should actually be -1

that makes sense.. hopefully nothing breaks :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tests pass, looks good afaik

auto last_visible_index = _raft->last_visible_index();
auto next_to_apply = model::next_offset(last_applied);
if (first_tx_start <= last_visible_index) {
Expand Down Expand Up @@ -1464,7 +1464,7 @@ ss::future<bool> rm_stm::sync(model::timeout_clock::duration timeout) {
auto ready = co_await raft::persisted_stm<>::sync(timeout);
if (ready) {
if (current_insync_term != _insync_term) {
_last_known_lso = model::offset{-1};
_last_known_lso = model::invalid_lso;
vlog(
_ctx_log.trace,
"garbage collecting requests from terms < {}",
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster/rm_stm.h
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ class rm_stm final : public raft::persisted_stm<> {
// Highest producer ID applied to this stm.
model::producer_id _highest_producer_id;
// for monotonicity of computed LSO.
model::offset _last_known_lso{-1};
model::offset _last_known_lso{model::invalid_lso};
/**
* LSO lock protects the LSO from being exposed before transaction begin
* batch is applied.
Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster_link/model/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -1109,7 +1109,7 @@ struct shadow_topic_partition_leader_report
::model::partition_id partition;
kafka::offset source_partition_start_offset{-1};
kafka::offset source_partition_high_watermark{-1};
kafka::offset source_partition_last_stable_offset{-1};
kafka::offset source_partition_last_stable_offset{::model::invalid_lso};
std::chrono::milliseconds last_update_time{0};
kafka::offset shadow_partition_high_watermark{-1};

Expand Down
2 changes: 1 addition & 1 deletion src/v/cluster_link/replication/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ namespace cluster_link::replication {
struct partition_offsets_report {
kafka::offset source_start_offset{-1};
kafka::offset source_hwm{-1};
kafka::offset source_lso{-1};
kafka::offset source_lso{::model::invalid_lso};
ss::lowres_clock::time_point update_time{};
kafka::offset shadow_hwm{-1};

Expand Down
4 changes: 2 additions & 2 deletions src/v/kafka/client/direct_consumer/api_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ struct fetched_partition_data {
kafka::leader_epoch leader_epoch{-1};
kafka::offset start_offset{-1};
kafka::offset high_watermark{-1};
kafka::offset last_stable_offset{-1};
kafka::offset last_stable_offset{model::invalid_lso};

// the following have reasonable defaults
chunked_vector<model::record_batch> data{};
Expand Down Expand Up @@ -92,7 +92,7 @@ struct source_partition_offsets {
// The source partition's log high watermark
kafka::offset high_watermark{-1};
// The source partition's log last stable offset
kafka::offset last_stable_offset{-1};
kafka::offset last_stable_offset{model::offset_cast(model::invalid_lso)};
// The timestamp that the fetch response was received by the client
ss::lowres_clock::time_point last_offset_update_timestamp{};

Expand Down
2 changes: 1 addition & 1 deletion src/v/kafka/client/fetcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ make_fetch_response(const model::topic_partition& tp, std::exception_ptr ex) {
.partition_index{tp.partition},
.error_code = error,
.high_watermark{model::offset{-1}},
.last_stable_offset{model::offset{-1}},
.last_stable_offset{model::invalid_lso},
.log_start_offset{model::offset{-1}},
.aborted_transactions{},
.records{}};
Expand Down
2 changes: 1 addition & 1 deletion src/v/kafka/server/fetch_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ struct fetch_session_partition {
, start_offset(p.log_start_offset)
, fetch_offset(p.fetch_offset)
, high_watermark(model::offset(-1))
, last_stable_offset(model::offset(-1))
, last_stable_offset(model::invalid_lso)
, current_leader_epoch(p.current_leader_epoch) {}
};
/**
Expand Down
4 changes: 2 additions & 2 deletions src/v/kafka/server/handlers/fetch.h
Original file line number Diff line number Diff line change
Expand Up @@ -275,13 +275,13 @@ struct read_result {
explicit read_result(error_code e)
: start_offset(-1)
, high_watermark(-1)
, last_stable_offset(-1)
, last_stable_offset(model::invalid_lso)
, error(e) {}

read_result(error_code e, leader_id_and_epoch leader)
: start_offset(-1)
, high_watermark(-1)
, last_stable_offset(-1)
, last_stable_offset(model::invalid_lso)
, current_leader(std::move(leader))
, error(e) {}

Expand Down
4 changes: 1 addition & 3 deletions src/v/model/fundamental.h
Original file line number Diff line number Diff line change
Expand Up @@ -306,9 +306,7 @@ inline constexpr model::offset prev_offset(model::offset o) {
return o - model::offset{1};
}

// An invalid offset indicating that actual LSO is not yet ready to be returned.
// Follows the policy that LSO is the next offset of the decided offset.
inline constexpr model::offset invalid_lso{next_offset(model::offset::min())};
inline constexpr model::offset invalid_lso{-1};

struct topic_partition_view {
topic_partition_view(model::topic_view tp, model::partition_id p)
Expand Down