Skip to content

Commit

Permalink
Merge pull request #24604 from vbotbuildovich/backport-pr-24537-v24.2…
Browse files Browse the repository at this point in the history
….x-260

[v24.2.x] pandaproxy: add max memory check for incoming requests
  • Loading branch information
IoannisRP authored Dec 18, 2024
2 parents 17cfda4 + 35f6032 commit cd11afe
Show file tree
Hide file tree
Showing 8 changed files with 201 additions and 12 deletions.
75 changes: 75 additions & 0 deletions src/v/pandaproxy/probe.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,79 @@ void probe::setup_metrics() {
}
}

// Builds a probe for one pandaproxy server instance.
//
// ctx        - server context whose semaphores and limits the gauges read;
//              kept by reference, so it must outlive this probe.
// group_name - metric group under which the gauges are registered; also
//              interpolated into each gauge description.
//
// The metric groups start out empty and are populated right away by
// setup_metrics().
server_probe::server_probe(
server::context_t& ctx, const ss::sstring& group_name)
: _ctx(ctx)
, _group_name(group_name)
, _metrics()
, _public_metrics() {
setup_metrics();
}

void server_probe::setup_metrics() {
namespace sm = ss::metrics;

auto setup_common = [this]<typename MetricDef>() {
const auto usage = [](const size_t current, const size_t max) {
constexpr double divide_by_zero = -1.;
constexpr double invalid_values = -2.;
if (max == 0) {
return divide_by_zero;
}
if (current > max) {
return invalid_values;
}
const auto max_d = static_cast<double>(max);
const auto current_d = static_cast<double>(current);
return (max_d - current_d) / max_d;
};

std::vector<MetricDef> defs;
defs.reserve(3);
defs.emplace_back(
sm::make_gauge(
"inflight_requests_usage_ratio",
[this, usage] {
return usage(_ctx.inflight_sem.current(), _ctx.max_inflight);
},
sm::description(ssx::sformat(
"Usage ratio of in-flight requests in the {}", _group_name)))
.aggregate({}));
defs.emplace_back(
sm::make_gauge(
"inflight_requests_memory_usage_ratio",
[this, usage] {
return usage(_ctx.mem_sem.current(), _ctx.max_memory);
},
sm::description(ssx::sformat(
"Memory usage ratio of in-flight requests in the {}",
_group_name)))
.aggregate({}));
defs.emplace_back(
sm::make_gauge(
"queued_requests_memory_blocked",
[this] { return _ctx.mem_sem.waiters(); },
sm::description(ssx::sformat(
"Number of requests queued in {}, due to memory limitations",
_group_name)))
.aggregate({}));
return defs;
};

if (!config::shard_local_cfg().disable_metrics()) {
_metrics.add_group(
_group_name,
setup_common
.template operator()<ss::metrics::impl::metric_definition_impl>(),
{},
{});
}

if (!config::shard_local_cfg().disable_public_metrics()) {
_public_metrics.add_group(
_group_name,
setup_common.template operator()<ss::metrics::metric_definition>());
}
}

} // namespace pandaproxy
19 changes: 19 additions & 0 deletions src/v/pandaproxy/probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#pragma once

#include "metrics/metrics.h"
#include "pandaproxy/server.h"
#include "utils/log_hist.h"

#include <seastar/core/metrics_registration.hh>
Expand Down Expand Up @@ -81,4 +82,22 @@ class probe {
metrics::public_metric_groups _public_metrics;
};

/// Exposes gauges describing the load on a pandaproxy `server`: in-flight
/// request usage, in-flight memory usage and the number of requests queued
/// on the memory semaphore.
///
/// The gauges are registered during construction and read the server context
/// through `this`, so the object is pinned in place (neither copyable nor
/// movable) and the context passed to the constructor must outlive it.
class server_probe {
public:
    /// \param ctx        server context read by the gauges; held by reference.
    /// \param group_name metric group name; copied into the probe.
    server_probe(server::context_t& ctx, const ss::sstring& group_name);
    server_probe(const server_probe&) = delete;
    server_probe& operator=(const server_probe&) = delete;
    server_probe(server_probe&&) = delete;
    server_probe& operator=(server_probe&&) = delete;
    ~server_probe() = default;

private:
    /// Registers the gauges with the internal and public metric groups.
    void setup_metrics();

    server::context_t& _ctx;
    // Owned copy rather than a reference: the constructor takes a const&,
    // which also binds to temporaries, and a reference member would dangle
    // as soon as such a temporary is destroyed.
    ss::sstring _group_name;
    metrics::internal_metric_groups _metrics;
    metrics::public_metric_groups _public_metrics;
};

} // namespace pandaproxy
4 changes: 4 additions & 0 deletions src/v/pandaproxy/reply.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ inline ss::http::reply& set_reply_too_many_requests(ss::http::reply& rep) {
.add_header("Retry-After", "0");
}

/// Marks \p rep as a "payload too large" response and hands back whatever
/// reference set_status() returns, so calls can be chained.
inline ss::http::reply& set_reply_payload_too_large(ss::http::reply& rep) {
    using status = ss::http::reply::status_type;
    return rep.set_status(status::payload_too_large);
}

inline std::unique_ptr<ss::http::reply> reply_unavailable() {
auto rep = std::make_unique<ss::http::reply>(ss::http::reply{});
set_reply_unavailable(*rep);
Expand Down
9 changes: 6 additions & 3 deletions src/v/pandaproxy/rest/proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ proxy::proxy(
, _inflight_config_binding(config::shard_local_cfg().max_in_flight_pandaproxy_requests_per_shard.bind())
, _client(client)
, _client_cache(client_cache)
, _ctx{{{{}, _mem_sem, _inflight_sem, {}, smp_sg}, *this},
, _ctx{{{{}, max_memory, _mem_sem, _inflight_config_binding(), _inflight_sem, {}, smp_sg}, *this},
{config::always_true(), config::shard_local_cfg().superusers.bind(), controller},
_config.pandaproxy_api.value()}
, _server(
Expand All @@ -126,8 +126,11 @@ proxy::proxy(
json::serialization_format::application_json)
, _ensure_started{[this]() { return do_start(); }}
, _controller(controller) {
_inflight_config_binding.watch(
[this]() { _inflight_sem.set_capacity(_inflight_config_binding()); });
_inflight_config_binding.watch([this]() {
const size_t capacity = _inflight_config_binding();
_inflight_sem.set_capacity(capacity);
_ctx.max_inflight = capacity;
});
}

ss::future<> proxy::start() {
Expand Down
9 changes: 6 additions & 3 deletions src/v/pandaproxy/schema_registry/service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ service::service(
config::shard_local_cfg()
.max_in_flight_schema_registry_requests_per_shard.bind())
, _client(client)
, _ctx{{{}, _mem_sem, _inflight_sem, {}, smp_sg}, *this}
, _ctx{{{}, max_memory, _mem_sem, _inflight_config_binding(), _inflight_sem, {}, smp_sg}, *this}
, _server(
"schema_registry", // server_name
"schema_registry", // public_metric_group_name
Expand All @@ -630,8 +630,11 @@ service::service(
config::always_true(),
config::shard_local_cfg().superusers.bind(),
controller.get()} {
_inflight_config_binding.watch(
[this]() { _inflight_sem.set_capacity(_inflight_config_binding()); });
_inflight_config_binding.watch([this]() {
const size_t capacity = _inflight_config_binding();
_inflight_sem.set_capacity(capacity);
_ctx.max_inflight = capacity;
});
}

ss::future<> service::start() {
Expand Down
24 changes: 20 additions & 4 deletions src/v/pandaproxy/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

#include <charconv>
#include <exception>
#include <memory>

namespace pandaproxy {

Expand Down Expand Up @@ -104,6 +105,12 @@ struct handler_adaptor : ss::httpd::handler_base {
co_return std::move(rp.rep);
}
auto req_size = get_request_size(*rq.req);
if (req_size > _ctx.max_memory) {
set_reply_payload_too_large(*rp.rep);
rp.mime_type = _exceptional_mime_type;
set_and_measure_response(rp);
co_return std::move(rp.rep);
}
auto sem_units = co_await ss::get_units(_ctx.mem_sem, req_size);
if (_ctx.as.abort_requested()) {
set_reply_unavailable(*rp.rep);
Expand Down Expand Up @@ -150,7 +157,8 @@ server::server(
, _api20(std::move(api20))
, _has_routes(false)
, _ctx(ctx)
, _exceptional_mime_type(exceptional_mime_type) {
, _exceptional_mime_type(exceptional_mime_type)
, _probe{} {
_api20.set_api_doc(_server._routes);
_api20.register_api_file(_server._routes, header);
_api20.add_definitions_file(_server._routes, definitions);
Expand Down Expand Up @@ -195,6 +203,9 @@ ss::future<> server::start(
const std::vector<model::broker_endpoint>& advertised) {
_server._routes.register_exeption_handler(
exception_replier{ss::sstring{name(_exceptional_mime_type)}});

_probe = std::make_unique<server_probe>(_ctx, _public_metrics_group_name);

_ctx.advertised_listeners.reserve(endpoints.size());
for (auto& server_endpoint : endpoints) {
auto addr = co_await net::resolve_dns(server_endpoint.address);
Expand Down Expand Up @@ -234,13 +245,18 @@ ss::future<> server::start(
}
co_await _server.listen(addr, cred);
}

co_return;
}

/// Shuts the server down.
///
/// Closes `_pending_reqs` first; once that future resolves (successfully or
/// not) the continuation:
///   1. requests an abort on the context's abort source,
///   2. destroys the probe, dropping the gauges that read `_ctx`,
///   3. stops the underlying HTTP server and returns its future.
///
/// There is a single return path so the probe teardown is always reached.
ss::future<> server::stop() {
    return _pending_reqs.close().finally([this]() {
        _ctx.as.request_abort();
        _probe.reset();
        return _server.stop();
    });
}

// Defaulted here rather than in server.h: the header only forward-declares
// server_probe, and destroying the std::unique_ptr<server_probe> member
// requires the complete type at the point where the destructor is defined.
server::~server() noexcept = default;

} // namespace pandaproxy
10 changes: 8 additions & 2 deletions src/v/pandaproxy/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <seastar/core/future.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/sstring.hh>
#include <seastar/http/api_docs.hh>
#include <seastar/http/handlers.hh>
#include <seastar/http/httpd.hh>
Expand All @@ -40,6 +41,8 @@

namespace pandaproxy {

class server_probe;

inline ss::shard_id user_shard(const ss::sstring& name) {
auto hash = xxhash_64(name.data(), name.length());
return jump_consistent_hash(hash, ss::smp::count);
Expand Down Expand Up @@ -69,7 +72,9 @@ class server {
public:
struct context_t {
std::vector<net::unresolved_address> advertised_listeners;
size_t max_memory;
ssx::semaphore& mem_sem;
size_t max_inflight;
adjustable_semaphore& inflight_sem;
ss::abort_source as;
ss::smp_service_group smp_sg;
Expand Down Expand Up @@ -103,9 +108,9 @@ class server {
};

server() = delete;
~server() = default;
~server() noexcept;
server(const server&) = delete;
server(server&&) noexcept = default;
server(server&&) noexcept = delete;
server& operator=(const server&) = delete;
server& operator=(server&&) = delete;

Expand Down Expand Up @@ -135,6 +140,7 @@ class server {
bool _has_routes;
context_t& _ctx;
json::serialization_format _exceptional_mime_type;
std::unique_ptr<server_probe> _probe;
};

template<typename service_t>
Expand Down
63 changes: 63 additions & 0 deletions tests/rptest/tests/pandaproxy_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,15 @@
from rptest.services import tls
from rptest.utils.utf8 import CONTROL_CHARS_MAP
from typing import Optional, List, Dict, Union
from rptest.utils.mode_checks import skip_debug_mode


def create_topic_names(count):
    """Return a list of ``count`` topic names, each ``pandaproxy-topic-<uuid4>``."""
    return [f"pandaproxy-topic-{uuid.uuid4()}" for _ in range(count)]


PAYLOAD_TOO_LARGE_HTTP_ERROR_CODE = 413

HTTP_GET_BROKERS_HEADERS = {
"Accept": "application/vnd.kafka.v2+json",
"Content-Type": "application/vnd.kafka.v2+json"
Expand Down Expand Up @@ -1284,6 +1287,66 @@ def test_invalid_topics_fetch(self):
assert sc_res.json(
)["message"] == f'Invalid parameter \'topic_name\' got \'{topic_name.translate(CONTROL_CHARS_MAP)}\''

# Memory tracking is disabled in debug builds, so the limit under test is
# not enforced there.
@skip_debug_mode
@cluster(num_nodes=3)
def test_topic_produce_request_too_big(self):
    """
    Produce a request whose body exceeds the memory available to the
    proxy and expect it to be rejected with HTTP 413.
    """

    self.redpanda.set_resource_settings(
        ResourceSettings(memory_mb=256, num_cpus=1))
    self.redpanda.start()

    topic = create_topic_names(1)[0]

    self.logger.info("Generating request larger than the available memory")
    # A long value keeps the record count low: per the original author's
    # sizing, every parsed record object costs 96 bytes plus heap, so many
    # tiny records would need far more memory to parse than the request
    # itself occupies.
    record = {
        "value":
        ("TWVzc2FnZSBTdGFydC4gVXNpbmcgYSBsb25nIHNlbnRlbmNlIHRvIGJlIGFibGUgdG8gcmVhY2ggdGhlIGF2YWlsYWJsZSB"
         "tZW1vcnkgbGltaXQgd2l0aG91dCBoYXZpbmcgdG8gdXNlIHRvbyBtYW55IHJlY29yZHMuIEV2ZXJ5IHJlY29yZCBvYmplY3"
         "QgaXMgOTYgYnl0ZXMgKyBoZWFwLiBJZiBhIHNtYWxsIHZhbHVlIHN0cmluZyBpcyB1c2VkIHBlciBvYmplY3QsIHdoZW4gd"
         "GhpcyBqc29uIGlzIHBhcnNlZCwgdGhlIG1lbW9yeSByZXF1aXJlbWVudHMgYXJlIG11Y2ggbW9yZSB0aGFuIHRoZSByZXF1"
         "ZXN0IGl0c2VsZi4gTWVzc2FnZSBFbmQuIE1lc3NhZ2UgU3RhcnQuIFVzaW5nIGEgbG9uZyBzZW50ZW5jZSB0byBiZSBhYmx"
         "lIHRvIHJlYWNoIHRoZSBhdmFpbGFibGUgbWVtb3J5IGxpbWl0IHdpdGhvdXQgaGF2aW5nIHRvIHVzZSB0b28gbWFueSByZW"
         "NvcmRzLiBFdmVyeSByZWNvcmQgb2JqZWN0IGlzIDk2IGJ5dGVzICsgaGVhcC4gSWYgYSBzbWFsbCB2YWx1ZSBzdHJpbmcga"
         "XMgdXNlZCBwZXIgb2JqZWN0LCB3aGVuIHRoaXMganNvbiBpcyBwYXJzZWQsIHRoZSBtZW1vcnkgcmVxdWlyZW1lbnRzIGFy"
         "ZSBtdWNoIG1vcmUgdGhhbiB0aGUgcmVxdWVzdCBpdHNlbGYuIE1lc3NhZ2UgRW5kLiBNZXNzYWdlIFN0YXJ0LiBVc2luZyB"
         "hIGxvbmcgc2VudGVuY2UgdG8gYmUgYWJsZSB0byByZWFjaCB0aGUgYXZhaWxhYmxlIG1lbW9yeSBsaW1pdCB3aXRob3V0IG"
         "hhdmluZyB0byB1c2UgdG9vIG1hbnkgcmVjb3Jkcy4gRXZlcnkgcmVjb3JkIG9iamVjdCBpcyA5NiBieXRlcyArIGhlYXAuI"
         "ElmIGEgc21hbGwgdmFsdWUgc3RyaW5nIGlzIHVzZWQgcGVyIG9iamVjdCwgd2hlbiB0aGlzIGpzb24gaXMgcGFyc2VkLCB0"
         "aGUgbWVtb3J5IHJlcXVpcmVtZW50cyBhcmUgbXVjaCBtb3JlIHRoYW4gdGhlIHJlcXVlc3QgaXRzZWxmLiBNZXNzYWdlIEV"
         "uZC4gTWVzc2FnZSBTdGFydC4gVXNpbmcgYSBsb25nIHNlbnRlbmNlIHRvIGJlIGFibGUgdG8gcmVhY2ggdGhlIGF2YWlsYW"
         "JsZSBtZW1vcnkgbGltaXQgd2l0aG91dCBoYXZpbmcgdG8gdXNlIHRvbyBtYW55IHJlY29yZHMuIEV2ZXJ5IHJlY29yZCBvY"
         "mplY3QgaXMgOTYgYnl0ZXMgKyBoZWFwLiBJZiBhIHNtYWxsIHZhbHVlIHN0cmluZyBpcyB1c2VkIHBlciBvYmplY3QsIHdo"
         "ZW4gdGhpcyBqc29uIGlzIHBhcnNlZCwgdGhlIG1lbW9yeSByZXF1aXJlbWVudHMgYXJlIG11Y2ggbW9yZSB0aGFuIHRoZSB"
         "yZXF1ZXN0IGl0c2VsZi4gTWVzc2FnZSBFbmQuIE1lc3NhZ2UgU3RhcnQuIFVzaW5nIGEgbG9uZyBzZW50ZW5jZSB0byBiZS"
         "BhYmxlIHRvIHJlYWNoIHRoZSBhdmFpbGFibGUgbWVtb3J5IGxpbWl0IHdpdGhvdXQgaGF2aW5nIHRvIHVzZSB0b28gbWFue"
         "SByZWNvcmRzLiBFdmVyeSByZWNvcmQgb2JqZWN0IGlzIDk2IGJ5dGVzICsgaGVhcC4gSWYgYSBzbWFsbCB2YWx1ZSBzdHJp"
         "bmcgaXMgdXNlZCBwZXIgb2JqZWN0LCB3aGVuIHRoaXMganNvbiBpcyBwYXJzZWQsIHRoZSBtZW1vcnkgcmVxdWlyZW1lbnR"
         "zIGFyZSBtdWNoIG1vcmUgdGhhbiB0aGUgcmVxdWVzdCBpdHNlbGYuIE1lc3NhZ2UgRW5kLg=="
         )
    }
    # Every entry refers to the same dict; only the serialized size matters.
    records = [record] * 50000
    payload = json.dumps({"records": records})

    # With 256 MB per core, the original sizing puts the memory available
    # to the kafka services at 90.4 MB at most; the request must exceed it.
    memory_limit = 90.4 * 1024 * 1024
    payload_size = len(payload)
    assert payload_size > memory_limit, \
        f"Expected request larger than {memory_limit}b. Got {payload_size}b, instead"

    self.logger.info(f"Creating test topic: {topic}")
    self._create_topics([topic], partitions=3)

    self.logger.info(f"Producing to topic: {topic}")
    response = self._produce_topic(topic, payload)
    status = response.status_code
    assert status == PAYLOAD_TOO_LARGE_HTTP_ERROR_CODE, \
        f"Expected '{PAYLOAD_TOO_LARGE_HTTP_ERROR_CODE}' " \
        f"but got '{status}' instead"


class PandaProxySASLTest(PandaProxyEndpoints):
"""
Expand Down

0 comments on commit cd11afe

Please sign in to comment.