Skip to content

Commit 829205a

Browse files
authored
Merge pull request #23519 from rockwotj/parquet-metadata
2 parents 4c2c24c + 8a7e3b9 commit 829205a

Some content is hidden

Large commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+2672
-51
lines changed

.bazelrc

+3
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ build:system-clang-17 --action_env=CXX=clang++-17 --host_action_env=CXX=clang++-
2121
build:system-clang-18 --config=system-clang
2222
build:system-clang-18 --action_env=CC=clang-18 --host_action_env=CC=clang-18
2323
build:system-clang-18 --action_env=CXX=clang++-18 --host_action_env=CXX=clang++-18
24+
build:system-clang-19 --config=system-clang
25+
build:system-clang-19 --action_env=CC=clang-19 --host_action_env=CC=clang-19
26+
build:system-clang-19 --action_env=CXX=clang++-19 --host_action_env=CXX=clang++-19
2427

2528
# https://github.com/bazelbuild/rules_foreign_cc/issues/1065
2629
# https://github.com/bazelbuild/rules_foreign_cc/issues/1186#issuecomment-2053550487

MODULE.bazel

+3
Original file line numberDiff line numberDiff line change
@@ -189,13 +189,15 @@ use_repo(
189189
"com_github_hamba_avro_v2",
190190
"com_github_hashicorp_go_multierror",
191191
"com_github_kballard_go_shellquote",
192+
"com_github_kr_pretty",
192193
"com_github_lestrrat_go_jwx",
193194
"com_github_linkedin_goavro_v2",
194195
"com_github_lorenzosaino_go_sysctl",
195196
"com_github_mattn_go_isatty",
196197
"com_github_moby_term",
197198
"com_github_opencontainers_go_digest",
198199
"com_github_opencontainers_image_spec",
200+
"com_github_parquet_go_parquet_go",
199201
"com_github_pkg_browser",
200202
"com_github_pkg_errors",
201203
"com_github_prometheus_client_model",
@@ -206,6 +208,7 @@ use_repo(
206208
"com_github_safchain_ethtool",
207209
"com_github_santhosh_tekuri_jsonschema_v6",
208210
"com_github_schollz_progressbar_v3",
211+
"com_github_segmentio_encoding",
209212
"com_github_spf13_afero",
210213
"com_github_spf13_cobra",
211214
"com_github_spf13_pflag",

bazel/thirdparty/go.work

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ go 1.23.0
44
// Everything we build with Bazel goes here.
55
use (
66
src/go/rpk
7-
src/go/kreq-gen
7+
src/v/test_utils/go
88
src/transform-sdk/go/transform
99
src/transform-sdk/go/transform/internal/testdata
1010
src/transform-sdk/tests

src/CMakeLists.txt

-1
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,4 @@ find_package(Boost REQUIRED
77
find_package(absl REQUIRED)
88
find_program(GO_PROGRAM go REQUIRED)
99
add_subdirectory(v)
10-
add_subdirectory(go/kreq-gen)
1110
add_subdirectory(transform-sdk/go/transform/internal/testdata)

src/go/kreq-gen/go.mod

-9
This file was deleted.

src/go/kreq-gen/go.sum

-4
This file was deleted.

src/v/bytes/bytes.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ class bytes {
9191
bool empty() const noexcept { return data_.empty(); }
9292

9393
void resize(size_type size) { data_.resize(size); }
94-
94+
void reserve(size_type size) { data_.reserve(size); }
9595
void push_back(value_type v) { data_.push_back(v); }
9696

9797
friend bool operator==(const bytes&, const bytes&) = default;

src/v/container/fragmented_vector.h

+31-27
Original file line numberDiff line numberDiff line change
@@ -52,21 +52,24 @@ class fragmented_vector {
5252

5353
// calculate the maximum number of elements per fragment while
5454
// keeping the element count a power of two
55-
static constexpr size_t calc_elems_per_frag(size_t esize) {
56-
size_t max = fragment_size_bytes / esize;
55+
static consteval size_t calc_elems_per_frag() {
56+
size_t max = fragment_size_bytes / sizeof(T);
5757
if constexpr (is_chunked_vector) {
58-
max = max_allocation_size / esize;
58+
max = max_allocation_size / sizeof(T);
5959
}
60-
assert(max > 0);
6160
return std::bit_floor(max);
6261
}
6362

64-
static constexpr size_t elems_per_frag = calc_elems_per_frag(sizeof(T));
65-
66-
static_assert(
67-
(elems_per_frag & (elems_per_frag - 1)) == 0,
68-
"element count per fragment must be a power of 2");
69-
static_assert(elems_per_frag >= 1);
63+
/**
64+
* The maximum number of bytes per fragment as specified in
65+
* as part of the type. Note that for most types, the true
66+
* number of bytes in a full fragment may be as low as half
67+
* of this amount (+1) since the number of elements is restricted
68+
* to a power of two.
69+
*/
70+
static consteval size_t calc_max_frag_bytes() {
71+
return calc_elems_per_frag() * sizeof(T);
72+
}
7073

7174
public:
7275
using this_type = fragmented_vector<T, fragment_size_bytes>;
@@ -81,19 +84,6 @@ class fragmented_vector {
8184
using pointer = T*;
8285
using const_pointer = const T*;
8386

84-
/**
85-
* The maximum number of bytes per fragment as specified in
86-
* as part of the type. Note that for most types, the true
87-
* number of bytes in a full fragment may as low as half
88-
* of this amount (+1) since the number of elements is restricted
89-
* to a power of two.
90-
*/
91-
static constexpr size_t max_frag_bytes = elems_per_frag * sizeof(T);
92-
93-
static_assert(
94-
max_frag_bytes <= max_allocation_size,
95-
"max size of a fragment must be <= 128KiB");
96-
9787
fragmented_vector() noexcept = default;
9888
explicit fragmented_vector(allocator_type alloc)
9989
: _frags(alloc) {}
@@ -180,7 +170,7 @@ class fragmented_vector {
180170
--_size;
181171
if (_frags.back().empty()) {
182172
_frags.pop_back();
183-
_capacity -= std::min(elems_per_frag, _capacity);
173+
_capacity -= std::min(calc_elems_per_frag(), _capacity);
184174
}
185175
update_generation();
186176
}
@@ -203,7 +193,7 @@ class fragmented_vector {
203193
while (n >= _frags.back().size()) {
204194
n -= _frags.back().size();
205195
_frags.pop_back();
206-
_capacity -= elems_per_frag;
196+
_capacity -= calc_elems_per_frag();
207197
}
208198

209199
for (size_t i = 0; i < n; ++i) {
@@ -213,18 +203,22 @@ class fragmented_vector {
213203
}
214204

215205
const_reference at(size_t index) const {
206+
static constexpr size_t elems_per_frag = calc_elems_per_frag();
216207
return _frags.at(index / elems_per_frag).at(index % elems_per_frag);
217208
}
218209

219210
reference at(size_t index) {
211+
static constexpr size_t elems_per_frag = calc_elems_per_frag();
220212
return _frags.at(index / elems_per_frag).at(index % elems_per_frag);
221213
}
222214

223215
const_reference operator[](size_t index) const {
216+
static constexpr size_t elems_per_frag = calc_elems_per_frag();
224217
return _frags[index / elems_per_frag][index % elems_per_frag];
225218
}
226219

227220
reference operator[](size_t index) {
221+
static constexpr size_t elems_per_frag = calc_elems_per_frag();
228222
return _frags[index / elems_per_frag][index % elems_per_frag];
229223
}
230224

@@ -275,6 +269,7 @@ class fragmented_vector {
275269
// For fixed size fragments we noop, as we already reserve the full size
276270
// of vector
277271
if constexpr (is_chunked_vector) {
272+
static constexpr size_t elems_per_frag = calc_elems_per_frag();
278273
if (new_cap > _capacity) {
279274
if (_frags.empty()) {
280275
auto& frag = _frags.emplace_back();
@@ -310,7 +305,11 @@ class fragmented_vector {
310305
/**
311306
* Returns the (maximum) number of elements in each fragment of this vector.
312307
*/
313-
static constexpr size_t elements_per_fragment() { return elems_per_frag; }
308+
static constexpr size_t elements_per_fragment() {
309+
return calc_elems_per_frag();
310+
}
311+
312+
static constexpr size_t max_frag_bytes() { return calc_max_frag_bytes(); }
314313

315314
/**
316315
* Remove all elements from the vector.
@@ -499,6 +498,10 @@ class fragmented_vector {
499498
}
500499

501500
void add_capacity() {
501+
static constexpr size_t elems_per_frag = calc_elems_per_frag();
502+
static_assert(
503+
calc_max_frag_bytes() <= max_allocation_size,
504+
"max size of a fragment must be <= 128KiB");
502505
if constexpr (is_chunked_vector) {
503506
if (
504507
_frags.size() == 1 && _frags.back().capacity() < elems_per_frag) {
@@ -510,7 +513,8 @@ class fragmented_vector {
510513
} else if (_frags.empty()) {
511514
// At least one element or 32 bytes worth of elements for small
512515
// items.
513-
constexpr size_t initial_cap = std::max(1UL, 32UL / sizeof(T));
516+
static constexpr size_t initial_cap = std::max(
517+
1UL, 32UL / sizeof(T));
514518
_capacity = initial_cap;
515519
_frags.emplace_back(_frags.get_allocator()).reserve(_capacity);
516520
return;

src/v/container/tests/fragmented_vector_test.cc

+3-3
Original file line numberDiff line numberDiff line change
@@ -60,15 +60,15 @@ class fragmented_vector_validator {
6060
calc_cap += f.capacity();
6161

6262
if (i + 1 < v._frags.size()) {
63-
if (f.size() < v.elems_per_frag) {
63+
if (f.size() < v.elements_per_fragment()) {
6464
return AssertionFailure() << fmt::format(
6565
"fragment {} is undersized ({} < {})",
6666
i,
6767
f.size(),
68-
v.elems_per_frag);
68+
v.elements_per_fragment());
6969
}
7070
}
71-
if (f.capacity() > std::decay_t<decltype(v)>::max_frag_bytes) {
71+
if (f.capacity() > std::decay_t<decltype(v)>::max_frag_bytes()) {
7272
return AssertionFailure() << fmt::format(
7373
"fragment {} capacity over max_frag_bytes ({})",
7474
i,

src/v/kafka/protocol/tests/BUILD

+2-2
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,11 @@ redpanda_cc_btest(
88
"protocol_test.cc",
99
],
1010
data = [
11-
"//src/go/kreq-gen:kafka_request_generator",
11+
"//src/v/test_utils/go/kreq-gen:kafka_request_generator",
1212
],
1313
# could also use runfiles here, which would be easier once cmake is gone
1414
# because then we could use bazel_tools runfiles library
15-
env = {"GENERATOR_BIN": "$(rootpath //src/go/kreq-gen:kafka_request_generator)"},
15+
env = {"GENERATOR_BIN": "$(rootpath //src/v/test_utils/go/kreq-gen:kafka_request_generator)"},
1616
deps = [
1717
"//src/v/kafka/protocol",
1818
"//src/v/test_utils:seastar_boost",

src/v/kafka/protocol/tests/CMakeLists.txt

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
set(KAFKA_REQUEST_GENERATOR "${CMAKE_BINARY_DIR}/src/go/kreq-gen/kafka-request-generator")
1+
set(KAFKA_REQUEST_GENERATOR "${CMAKE_BINARY_DIR}/src/v/test_utils/go/kreq-gen/kafka-request-generator")
22

33
rp_test(
44
UNIT_TEST

src/v/kafka/server/handlers/metadata.cc

+2-1
Original file line numberDiff line numberDiff line change
@@ -619,6 +619,7 @@ metadata_memory_estimator(size_t request_size, connection_context& conn_ctx) {
619619
// generally ~8000 bytes). Finally, we add max_frag_bytes to account for the
620620
// worst-case overshoot during vector re-allocation.
621621
return default_memory_estimate(request_size) + size_estimate
622-
+ large_fragment_vector<metadata_response_partition>::max_frag_bytes;
622+
+ large_fragment_vector<
623+
metadata_response_partition>::max_frag_bytes();
623624
}
624625
} // namespace kafka

src/v/serde/parquet/BUILD

+45
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
load("//bazel:build.bzl", "redpanda_cc_library")
2+
3+
package(
4+
default_visibility = ["//src/v/serde/parquet:__subpackages__"],
5+
)
6+
7+
redpanda_cc_library(
8+
name = "schema",
9+
srcs = [
10+
"flattened_schema.cc",
11+
],
12+
hdrs = [
13+
"flattened_schema.h",
14+
"schema.h",
15+
],
16+
include_prefix = "serde/parquet",
17+
deps = [
18+
"//src/v/base",
19+
"//src/v/container:fragmented_vector",
20+
"//src/v/utils:uuid",
21+
"@seastar",
22+
],
23+
)
24+
25+
redpanda_cc_library(
26+
name = "metadata",
27+
srcs = [
28+
"metadata.cc",
29+
],
30+
hdrs = [
31+
"metadata.h",
32+
],
33+
include_prefix = "serde/parquet",
34+
deps = [
35+
":schema",
36+
"//src/v/base",
37+
"//src/v/bytes:iobuf",
38+
"//src/v/container:fragmented_vector",
39+
"//src/v/serde/thrift:compact",
40+
"//src/v/utils:uuid",
41+
"//src/v/utils:vint",
42+
"@abseil-cpp//absl/container:flat_hash_map",
43+
"@seastar",
44+
],
45+
)

src/v/serde/parquet/CMakeLists.txt

+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
2+
v_cc_library(
3+
NAME serde_parquet
4+
SRCS
5+
metadata.cc
6+
DEPS
7+
Seastar::seastar
8+
v::bytes
9+
v::container
10+
v::utils
11+
)

src/v/serde/parquet/README.md

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# Parquet Library
2+
3+
Here lies a library to be able to write parquet files for Redpanda's Iceberg integration.
4+
Due to Redpanda's usage of Seastar off the shelf preexisting parquet libraries do not meet
5+
our strict requirements, imposed by our userland task scheduler and virtual memory avoiding
6+
allocator.
7+
8+
9+
### Metadata
10+
11+
Parquet metadata is serialized using [Apache Thrift's compact wire format][thrift-compact-format].
12+
13+
We use metadata that is the logical representation of what our application needs, and then we write out
14+
the wire format with all the deprecated and legacy types to be compatible with legacy query systems.
15+
16+
The physical format of serialized parquet metadata is documented [here][parquet-thrift].
17+
18+
19+
[parquet-thrift]: https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift
20+
[thrift-compact-format]: https://github.com/apache/thrift/blob/master/doc/specs/thrift-compact-protocol.md
+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright 2024 Redpanda Data, Inc.
3+
*
4+
* Use of this software is governed by the Business Source License
5+
* included in the file licenses/BSL.md
6+
*
7+
* As of the Change Date specified in that file, in accordance with
8+
* the Business Source License, use of this software will be governed
9+
* by the Apache License, Version 2.0
10+
*/
11+
12+
#include "serde/parquet/flattened_schema.h"
13+
14+
namespace serde::parquet {
15+
16+
chunked_vector<flattened_schema> flatten(const schema_element& root) {
17+
chunked_vector<flattened_schema> flattened;
18+
root.for_each([&flattened](const schema_element& elem) {
19+
flattened.emplace_back(
20+
elem.type,
21+
elem.repetition_type,
22+
elem.name,
23+
elem.children.size(),
24+
elem.field_id,
25+
elem.logical_type);
26+
});
27+
return flattened;
28+
}
29+
30+
} // namespace serde::parquet

0 commit comments

Comments
 (0)