Skip to content

Commit

Permalink
Merge pull request #23301 from bharathv/fix-23296
Browse files Browse the repository at this point in the history
kafka: fix fetching from partitions with negative indexes
  • Loading branch information
piyushredpanda authored Sep 13, 2024
2 parents 915b229 + f15f38d commit 0341050
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 14 deletions.
26 changes: 12 additions & 14 deletions src/v/container/contiguous_range_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,7 @@ class contiguous_range_map {
* Determines if element comparing equal to given key exists in a map.
*/
bool contains(KeyT key) const {
check_key(key);
return _values.size() > static_cast<size_t>(key)
return valid_key(key) && _values.size() > static_cast<size_t>(key)
&& _values[key].has_value();
}

Expand All @@ -165,7 +164,10 @@ class contiguous_range_map {
*/
template<typename... Args>
std::pair<iterator, bool> emplace(KeyT key, Args&&... args) {
check_key(key);
if (!valid_key(key)) {
throw std::invalid_argument(
fmt::format("Invalid key {}, must be positive", key));
}
/**
* Inserting last element directly with emplace_back
*/
Expand Down Expand Up @@ -234,8 +236,7 @@ class contiguous_range_map {
* element is not present in the map
*/
iterator find(KeyT key) {
check_key(key);
if (static_cast<size_t>(key) < _values.size()) {
if (valid_key(key) && static_cast<size_t>(key) < _values.size()) {
auto it = _values.begin() + static_cast<size_t>(key);
if (it->has_value()) {
return iterator(this, it);
Expand All @@ -248,8 +249,7 @@ class contiguous_range_map {
* element is not present in the map
*/
const_iterator find(KeyT key) const {
check_key(key);
if (static_cast<size_t>(key) < _values.size()) {
if (valid_key(key) && static_cast<size_t>(key) < _values.size()) {
auto it = _values.begin() + static_cast<size_t>(key);
if (it->has_value()) {
return const_iterator(this, it);
Expand All @@ -259,12 +259,12 @@ class contiguous_range_map {
}
/**
* Removes the element with given key from the map.
* Invalid keys are ignored.
*
* NOTE: it does not shrink the underlying data structure.
*/
void erase(KeyT key) {
check_key(key);
if (static_cast<size_t>(key) < _values.size()) {
if (valid_key(key) && static_cast<size_t>(key) < _values.size()) {
if (_values[static_cast<size_t>(key)].has_value()) {
_values[static_cast<size_t>(key)].reset();
--_size;
Expand Down Expand Up @@ -293,11 +293,9 @@ class contiguous_range_map {
}

private:
static void check_key(KeyT key) {
vassert(
key >= 0,
"contiguous_range_map keys must be positive. current key {}",
key);
static bool valid_key(KeyT key) {
// map keys must be positive.
return key >= 0;
}
underlying_t _values;
// number of valid entries
Expand Down
33 changes: 33 additions & 0 deletions src/v/kafka/server/tests/fetch_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,39 @@ FIXTURE_TEST(fetch_response_iterator_test, redpanda_thread_fixture) {
}
};

FIXTURE_TEST(fetch_non_existent, redpanda_thread_fixture) {
model::topic topic("foo");
model::partition_id pid(0);
auto ntp = make_default_ntp(topic, pid);
auto log_config = make_default_config();
wait_for_controller_leadership().get0();
add_topic(model::topic_namespace_view(ntp)).get();
kafka::fetch_request non_existent_ntp;
non_existent_ntp.data.max_wait_ms = std::chrono::milliseconds(1000);
non_existent_ntp.data.topics.emplace_back(kafka::fetch_topic{
.name = topic,
.fetch_partitions = {{
.partition_index = model::partition_id{-1},
.current_leader_epoch = kafka::leader_epoch(0),
.fetch_offset = model::offset(0),
}}});

auto client = make_kafka_client().get0();
client.connect().get();
auto defer = ss::defer([&client] {
client.stop().then([&client] { client.shutdown(); }).get();
});
auto resp = client
.dispatch(std::move(non_existent_ntp), kafka::api_version(6))
.get0();
BOOST_REQUIRE_EQUAL(resp.data.topics.size(), 1);
BOOST_REQUIRE(resp.data.topics.at(0).errored());
BOOST_REQUIRE_EQUAL(resp.data.topics.at(0).partitions.size(), 1);
BOOST_REQUIRE_EQUAL(
resp.data.topics.at(0).partitions.at(0).error_code,
kafka::error_code::unknown_topic_or_partition);
}

FIXTURE_TEST(fetch_empty, redpanda_thread_fixture) {
// create a topic partition with some data
model::topic topic("foo");
Expand Down

0 comments on commit 0341050

Please sign in to comment.