diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index 175fc43d34418..703b65f6c7f4d 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -1339,7 +1339,7 @@ model::offset rm_stm::last_stable_offset() { } auto synced_leader = _raft->is_leader() && _raft->term() == _insync_term; - model::offset lso{-1}; + model::offset lso{model::invalid_lso}; auto last_visible_index = _raft->last_visible_index(); auto next_to_apply = model::next_offset(last_applied); if (first_tx_start <= last_visible_index) { @@ -1464,7 +1464,7 @@ ss::future rm_stm::sync(model::timeout_clock::duration timeout) { auto ready = co_await raft::persisted_stm<>::sync(timeout); if (ready) { if (current_insync_term != _insync_term) { - _last_known_lso = model::offset{-1}; + _last_known_lso = model::invalid_lso; vlog( _ctx_log.trace, "garbage collecting requests from terms < {}", diff --git a/src/v/cluster/rm_stm.h b/src/v/cluster/rm_stm.h index 56a1bef5a33f1..5bca164670427 100644 --- a/src/v/cluster/rm_stm.h +++ b/src/v/cluster/rm_stm.h @@ -435,7 +435,7 @@ class rm_stm final : public raft::persisted_stm<> { // Highest producer ID applied to this stm. model::producer_id _highest_producer_id; // for monotonicity of computed LSO. - model::offset _last_known_lso{-1}; + model::offset _last_known_lso{model::invalid_lso}; /** * LSO lock protects the LSO from being exposed before transaction begin * batch is applied. diff --git a/src/v/cluster_link/model/types.h b/src/v/cluster_link/model/types.h index 8510dae8d06ef..2e2022be8114c 100644 --- a/src/v/cluster_link/model/types.h +++ b/src/v/cluster_link/model/types.h @@ -1109,7 +1109,7 @@ struct shadow_topic_partition_leader_report ::model::partition_id partition; kafka::offset source_partition_start_offset{-1}; kafka::offset source_partition_high_watermark{-1}; - kafka::offset source_partition_last_stable_offset{-1}; + kafka::offset source_partition_last_stable_offset{::model::invalid_lso}; std::chrono::milliseconds last_update_time{0}; kafka::offset shadow_partition_high_watermark{-1}; diff --git a/src/v/cluster_link/replication/types.h b/src/v/cluster_link/replication/types.h index a16eff7fc2b44..ea0f17b4a4fc9 100644 --- a/src/v/cluster_link/replication/types.h +++ b/src/v/cluster_link/replication/types.h @@ -22,7 +22,7 @@ namespace cluster_link::replication { struct partition_offsets_report { kafka::offset source_start_offset{-1}; kafka::offset source_hwm{-1}; - kafka::offset source_lso{-1}; + kafka::offset source_lso{::model::invalid_lso}; ss::lowres_clock::time_point update_time{}; kafka::offset shadow_hwm{-1}; diff --git a/src/v/kafka/client/direct_consumer/api_types.h b/src/v/kafka/client/direct_consumer/api_types.h index 560fa2c6fe0b9..105f14f279bb8 100644 --- a/src/v/kafka/client/direct_consumer/api_types.h +++ b/src/v/kafka/client/direct_consumer/api_types.h @@ -64,7 +64,7 @@ struct fetched_partition_data { kafka::leader_epoch leader_epoch{-1}; kafka::offset start_offset{-1}; kafka::offset high_watermark{-1}; - kafka::offset last_stable_offset{-1}; + kafka::offset last_stable_offset{model::invalid_lso}; // the following have reasonable defaults chunked_vector data{}; @@ -92,7 +92,7 @@ struct source_partition_offsets { // The source partition's log high watermark kafka::offset high_watermark{-1}; // The source partition's log last stable offset - kafka::offset last_stable_offset{-1}; + kafka::offset last_stable_offset{model::offset_cast(model::invalid_lso)}; // The timestamp that the fetch response was received by the client ss::lowres_clock::time_point last_offset_update_timestamp{}; diff --git a/src/v/kafka/client/fetcher.cc b/src/v/kafka/client/fetcher.cc index 25d30a408f703..44286c736a3c1 100644 --- a/src/v/kafka/client/fetcher.cc +++ b/src/v/kafka/client/fetcher.cc @@ -72,7 +72,7 @@ make_fetch_response(const model::topic_partition& tp, std::exception_ptr ex) { .partition_index{tp.partition}, .error_code = error, .high_watermark{model::offset{-1}}, - .last_stable_offset{model::offset{-1}}, + .last_stable_offset{model::invalid_lso}, .log_start_offset{model::offset{-1}}, .aborted_transactions{}, .records{}}; diff --git a/src/v/kafka/server/fetch_session.h b/src/v/kafka/server/fetch_session.h index 5eb2a17fd98f3..18cf3f9392ebd 100644 --- a/src/v/kafka/server/fetch_session.h +++ b/src/v/kafka/server/fetch_session.h @@ -42,7 +42,7 @@ struct fetch_session_partition { , start_offset(p.log_start_offset) , fetch_offset(p.fetch_offset) , high_watermark(model::offset(-1)) - , last_stable_offset(model::offset(-1)) + , last_stable_offset(model::invalid_lso) , current_leader_epoch(p.current_leader_epoch) {} }; /** diff --git a/src/v/kafka/server/handlers/fetch.h b/src/v/kafka/server/handlers/fetch.h index 07dfcc0fd5158..25a1ed05dc31e 100644 --- a/src/v/kafka/server/handlers/fetch.h +++ b/src/v/kafka/server/handlers/fetch.h @@ -275,13 +275,13 @@ struct read_result { explicit read_result(error_code e) : start_offset(-1) , high_watermark(-1) - , last_stable_offset(-1) + , last_stable_offset(model::invalid_lso) , error(e) {} read_result(error_code e, leader_id_and_epoch leader) : start_offset(-1) , high_watermark(-1) - , last_stable_offset(-1) + , last_stable_offset(model::invalid_lso) , current_leader(std::move(leader)) , error(e) {} diff --git a/src/v/model/fundamental.h b/src/v/model/fundamental.h index 56f31648fd8d1..0fe61029b1923 100644 --- a/src/v/model/fundamental.h +++ b/src/v/model/fundamental.h @@ -306,9 +306,7 @@ inline constexpr model::offset prev_offset(model::offset o) { return o - model::offset{1}; } -// An invalid offset indicating that actual LSO is not yet ready to be returned. -// Follows the policy that LSO is the next offset of the decided offset. -inline constexpr model::offset invalid_lso{next_offset(model::offset::min())}; +inline constexpr model::offset invalid_lso{-1}; struct topic_partition_view { topic_partition_view(model::topic_view tp, model::partition_id p)