Skip to content

Commit efe9624

Browse files
committed
pandaproxy: add max memory check for incoming requests
1 parent bedd51b commit efe9624

File tree

6 files changed

+76
-2
lines changed

6 files changed

+76
-2
lines changed

src/v/pandaproxy/reply.h

+4
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,10 @@ inline ss::http::reply& set_reply_too_many_requests(ss::http::reply& rep) {
6565
.add_header("Retry-After", "0");
6666
}
6767

68+
inline ss::http::reply& set_reply_payload_too_large(ss::http::reply& rep) {
69+
return rep.set_status(ss::http::reply::status_type::payload_too_large);
70+
}
71+
6872
inline std::unique_ptr<ss::http::reply> reply_unavailable() {
6973
auto rep = std::make_unique<ss::http::reply>(ss::http::reply{});
7074
set_reply_unavailable(*rep);

src/v/pandaproxy/rest/proxy.cc

+1-1
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ proxy::proxy(
113113
, _inflight_config_binding(config::shard_local_cfg().max_in_flight_pandaproxy_requests_per_shard.bind())
114114
, _client(client)
115115
, _client_cache(client_cache)
116-
, _ctx{{{{}, _mem_sem, _inflight_sem, {}, smp_sg}, *this},
116+
, _ctx{{{{}, max_memory, _mem_sem, _inflight_sem, {}, smp_sg}, *this},
117117
{config::always_true(), config::shard_local_cfg().superusers.bind(), controller},
118118
_config.pandaproxy_api.value()}
119119
, _server(

src/v/pandaproxy/schema_registry/service.cc

+1-1
Original file line numberDiff line numberDiff line change
@@ -614,7 +614,7 @@ service::service(
614614
config::shard_local_cfg()
615615
.max_in_flight_schema_registry_requests_per_shard.bind())
616616
, _client(client)
617-
, _ctx{{{}, _mem_sem, _inflight_sem, {}, smp_sg}, *this}
617+
, _ctx{{{}, max_memory, _mem_sem, _inflight_sem, {}, smp_sg}, *this}
618618
, _server(
619619
"schema_registry", // server_name
620620
"schema_registry", // public_metric_group_name

src/v/pandaproxy/server.cc

+6
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,12 @@ struct handler_adaptor : ss::httpd::handler_base {
104104
co_return std::move(rp.rep);
105105
}
106106
auto req_size = get_request_size(*rq.req);
107+
if (req_size > _ctx.max_memory) {
108+
set_reply_payload_too_large(*rp.rep);
109+
rp.mime_type = _exceptional_mime_type;
110+
set_and_measure_response(rp);
111+
co_return std::move(rp.rep);
112+
}
107113
auto sem_units = co_await ss::get_units(_ctx.mem_sem, req_size);
108114
if (_ctx.as.abort_requested()) {
109115
set_reply_unavailable(*rp.rep);

src/v/pandaproxy/server.h

+1
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ class server {
6969
public:
7070
struct context_t {
7171
std::vector<net::unresolved_address> advertised_listeners;
72+
size_t max_memory;
7273
ssx::semaphore& mem_sem;
7374
adjustable_semaphore& inflight_sem;
7475
ss::abort_source as;

tests/rptest/tests/pandaproxy_test.py

+63
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,15 @@
3535
from rptest.services import tls
3636
from rptest.utils.utf8 import CONTROL_CHARS_MAP
3737
from typing import Optional, List, Dict, Union
38+
from rptest.utils.mode_checks import skip_debug_mode
3839

3940

4041
def create_topic_names(count):
4142
return list(f"pandaproxy-topic-{uuid.uuid4()}" for _ in range(count))
4243

4344

45+
PAYLOAD_TOO_LARGE_HTTP_ERROR_CODE = 413
46+
4447
HTTP_GET_BROKERS_HEADERS = {
4548
"Accept": "application/vnd.kafka.v2+json",
4649
"Content-Type": "application/vnd.kafka.v2+json"
@@ -1284,6 +1287,66 @@ def test_invalid_topics_fetch(self):
12841287
assert sc_res.json(
12851288
)["message"] == f'Invalid parameter \'topic_name\' got \'{topic_name.translate(CONTROL_CHARS_MAP)}\''
12861289

1290+
# Memory tracking is disabled in debug mode.
1291+
@skip_debug_mode
1292+
@cluster(num_nodes=3)
1293+
def test_topic_produce_request_too_big(self):
1294+
"""
1295+
Create a topic and post a request larger than the total available memory.
1296+
"""
1297+
1298+
self.redpanda.set_resource_settings(
1299+
ResourceSettings(memory_mb=256, num_cpus=1))
1300+
self.redpanda.start()
1301+
1302+
name = create_topic_names(1)[0]
1303+
1304+
self.logger.info("Generating request larger than the available memory")
1305+
value = {
1306+
"value":
1307+
("TWVzc2FnZSBTdGFydC4gVXNpbmcgYSBsb25nIHNlbnRlbmNlIHRvIGJlIGFibGUgdG8gcmVhY2ggdGhlIGF2YWlsYWJsZSB"
1308+
"tZW1vcnkgbGltaXQgd2l0aG91dCBoYXZpbmcgdG8gdXNlIHRvbyBtYW55IHJlY29yZHMuIEV2ZXJ5IHJlY29yZCBvYmplY3"
1309+
"QgaXMgOTYgYnl0ZXMgKyBoZWFwLiBJZiBhIHNtYWxsIHZhbHVlIHN0cmluZyBpcyB1c2VkIHBlciBvYmplY3QsIHdoZW4gd"
1310+
"GhpcyBqc29uIGlzIHBhcnNlZCwgdGhlIG1lbW9yeSByZXF1aXJlbWVudHMgYXJlIG11Y2ggbW9yZSB0aGFuIHRoZSByZXF1"
1311+
"ZXN0IGl0c2VsZi4gTWVzc2FnZSBFbmQuIE1lc3NhZ2UgU3RhcnQuIFVzaW5nIGEgbG9uZyBzZW50ZW5jZSB0byBiZSBhYmx"
1312+
"lIHRvIHJlYWNoIHRoZSBhdmFpbGFibGUgbWVtb3J5IGxpbWl0IHdpdGhvdXQgaGF2aW5nIHRvIHVzZSB0b28gbWFueSByZW"
1313+
"NvcmRzLiBFdmVyeSByZWNvcmQgb2JqZWN0IGlzIDk2IGJ5dGVzICsgaGVhcC4gSWYgYSBzbWFsbCB2YWx1ZSBzdHJpbmcga"
1314+
"XMgdXNlZCBwZXIgb2JqZWN0LCB3aGVuIHRoaXMganNvbiBpcyBwYXJzZWQsIHRoZSBtZW1vcnkgcmVxdWlyZW1lbnRzIGFy"
1315+
"ZSBtdWNoIG1vcmUgdGhhbiB0aGUgcmVxdWVzdCBpdHNlbGYuIE1lc3NhZ2UgRW5kLiBNZXNzYWdlIFN0YXJ0LiBVc2luZyB"
1316+
"hIGxvbmcgc2VudGVuY2UgdG8gYmUgYWJsZSB0byByZWFjaCB0aGUgYXZhaWxhYmxlIG1lbW9yeSBsaW1pdCB3aXRob3V0IG"
1317+
"hhdmluZyB0byB1c2UgdG9vIG1hbnkgcmVjb3Jkcy4gRXZlcnkgcmVjb3JkIG9iamVjdCBpcyA5NiBieXRlcyArIGhlYXAuI"
1318+
"ElmIGEgc21hbGwgdmFsdWUgc3RyaW5nIGlzIHVzZWQgcGVyIG9iamVjdCwgd2hlbiB0aGlzIGpzb24gaXMgcGFyc2VkLCB0"
1319+
"aGUgbWVtb3J5IHJlcXVpcmVtZW50cyBhcmUgbXVjaCBtb3JlIHRoYW4gdGhlIHJlcXVlc3QgaXRzZWxmLiBNZXNzYWdlIEV"
1320+
"uZC4gTWVzc2FnZSBTdGFydC4gVXNpbmcgYSBsb25nIHNlbnRlbmNlIHRvIGJlIGFibGUgdG8gcmVhY2ggdGhlIGF2YWlsYW"
1321+
"JsZSBtZW1vcnkgbGltaXQgd2l0aG91dCBoYXZpbmcgdG8gdXNlIHRvbyBtYW55IHJlY29yZHMuIEV2ZXJ5IHJlY29yZCBvY"
1322+
"mplY3QgaXMgOTYgYnl0ZXMgKyBoZWFwLiBJZiBhIHNtYWxsIHZhbHVlIHN0cmluZyBpcyB1c2VkIHBlciBvYmplY3QsIHdo"
1323+
"ZW4gdGhpcyBqc29uIGlzIHBhcnNlZCwgdGhlIG1lbW9yeSByZXF1aXJlbWVudHMgYXJlIG11Y2ggbW9yZSB0aGFuIHRoZSB"
1324+
"yZXF1ZXN0IGl0c2VsZi4gTWVzc2FnZSBFbmQuIE1lc3NhZ2UgU3RhcnQuIFVzaW5nIGEgbG9uZyBzZW50ZW5jZSB0byBiZS"
1325+
"BhYmxlIHRvIHJlYWNoIHRoZSBhdmFpbGFibGUgbWVtb3J5IGxpbWl0IHdpdGhvdXQgaGF2aW5nIHRvIHVzZSB0b28gbWFue"
1326+
"SByZWNvcmRzLiBFdmVyeSByZWNvcmQgb2JqZWN0IGlzIDk2IGJ5dGVzICsgaGVhcC4gSWYgYSBzbWFsbCB2YWx1ZSBzdHJp"
1327+
"bmcgaXMgdXNlZCBwZXIgb2JqZWN0LCB3aGVuIHRoaXMganNvbiBpcyBwYXJzZWQsIHRoZSBtZW1vcnkgcmVxdWlyZW1lbnR"
1328+
"zIGFyZSBtdWNoIG1vcmUgdGhhbiB0aGUgcmVxdWVzdCBpdHNlbGYuIE1lc3NhZ2UgRW5kLg=="
1329+
)
1330+
}
1331+
values = [value for _ in range(50000)]
1332+
data = {"records": values}
1333+
data_json = json.dumps(data)
1334+
1335+
# With 256 MB available per core, the available memory for the kafka services
1336+
# is at most 90.4 MB. We want to ensure that this request is larger than that.
1337+
memory_limit = 90.4 * 1024 * 1024
1338+
assert len(data_json) > memory_limit, \
1339+
f"Expected request larger than {memory_limit}b. Got {len(data_json)}b, instead"
1340+
1341+
self.logger.info(f"Creating test topic: {name}")
1342+
self._create_topics([name], partitions=3)
1343+
1344+
self.logger.info(f"Producing to topic: {name}")
1345+
produce_result_raw = self._produce_topic(name, data_json)
1346+
assert produce_result_raw.status_code == PAYLOAD_TOO_LARGE_HTTP_ERROR_CODE, \
1347+
f"Expected '{PAYLOAD_TOO_LARGE_HTTP_ERROR_CODE}' " \
1348+
f"but got '{produce_result_raw.status_code}' instead"
1349+
12871350

12881351
class PandaProxySASLTest(PandaProxyEndpoints):
12891352
"""

0 commit comments

Comments
 (0)