diff --git a/src/v/bytes/details/io_byte_iterator.h b/src/v/bytes/details/io_byte_iterator.h index 9db6749d1ce91..aa3cb57caffb9 100644 --- a/src/v/bytes/details/io_byte_iterator.h +++ b/src/v/bytes/details/io_byte_iterator.h @@ -14,12 +14,18 @@ #include "bytes/details/io_fragment.h" #include +#include // See io_iterator_consumer for iterator validity notes. namespace details { -class io_byte_iterator { + +template +class io_byte_iterator_base { public: - using io_const_iterator = io_fragment_list::const_iterator; + using io_const_iterator = std::conditional_t< + Forward, + io_fragment_list::const_iterator, + io_fragment_list::const_reverse_iterator>; // iterator_traits using difference_type = void; @@ -28,24 +34,35 @@ class io_byte_iterator { using reference = const char&; using iterator_category = std::forward_iterator_tag; - io_byte_iterator( + io_byte_iterator_base( const io_const_iterator& begin, const io_const_iterator& end) noexcept : _frag(begin) , _frag_end(end) { if (_frag != _frag_end) { - _frag_index = _frag->get(); - // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) - _frag_index_end = _frag->get() + _frag->size(); - // handle an empty fragment - if (_frag_index == _frag_index_end) { - next_fragment(); + if constexpr (Forward) { + _frag_index = _frag->get(); + // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) + _frag_index_end = _frag->get() + _frag->size(); + // handle an empty fragment + if (_frag_index == _frag_index_end) { + next_fragment(); + } + } else { + auto frag_size = _frag->size(); + if (frag_size == 0) { + next_fragment(); + return; + } + // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) + _frag_index = _frag->get() + (_frag->size() - 1); + _frag_index_end = _frag->get(); } } else { _frag_index = nullptr; _frag_index_end = nullptr; } } - io_byte_iterator( + io_byte_iterator_base( const io_const_iterator& begin, const io_const_iterator& end, const char* frag_index, @@ -59,20 +76,27 @@ class io_byte_iterator { reference operator*() const noexcept { return *_frag_index; } pointer operator->() const noexcept { return _frag_index; } /// true if pointing to the byte-value (not necessarily the same address) - bool operator==(const io_byte_iterator& o) const noexcept { + bool operator==(const io_byte_iterator_base& o) const noexcept { return _frag_index == o._frag_index; } - bool operator!=(const io_byte_iterator& o) const noexcept { + bool operator!=(const io_byte_iterator_base& o) const noexcept { return !(*this == o); } - io_byte_iterator& operator++() { - // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) - if (++_frag_index == _frag_index_end) { - next_fragment(); + io_byte_iterator_base& operator++() { + if constexpr (Forward) { + // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) + if (++_frag_index == _frag_index_end) { + next_fragment(); + } + } else { + // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) + if (_frag_index-- == _frag_index_end) { + next_fragment(); + } } return *this; } - io_byte_iterator operator++(int) { + io_byte_iterator_base operator++(int) { auto tmp = *this; ++*this; return tmp; @@ -83,12 +107,22 @@ class io_byte_iterator { while (true) { ++_frag; if (_frag != _frag_end) { - _frag_index = _frag->get(); - // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) - _frag_index_end = _frag->get() + _frag->size(); - // handle an empty fragment - if (_frag_index == _frag_index_end) { - continue; + if constexpr (Forward) { + _frag_index = _frag->get(); + // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) + _frag_index_end = _frag->get() + _frag->size(); + // handle an empty fragment + if (_frag_index == _frag_index_end) { + continue; + } + } else { + auto frag_size = _frag->size(); + if (frag_size == 0) { + continue; + } + // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) + _frag_index = _frag->get() + (frag_size - 1); + _frag_index_end = _frag->get(); } return; } @@ -104,4 +138,7 @@ class io_byte_iterator { const char* _frag_index_end = nullptr; }; +using io_byte_iterator = io_byte_iterator_base; +using reverse_io_byte_iterator = io_byte_iterator_base; + } // namespace details diff --git a/src/v/bytes/details/io_fragment.h b/src/v/bytes/details/io_fragment.h index 6532b7a675500..70898d0e1baaf 100644 --- a/src/v/bytes/details/io_fragment.h +++ b/src/v/bytes/details/io_fragment.h @@ -231,6 +231,7 @@ class io_fragment_list { using iterator = iter; using reverse_iterator = iter; using const_iterator = iter; + using const_reverse_iterator = iter; bool empty() const { check_consistency(); @@ -338,11 +339,14 @@ class io_fragment_list { reverse_iterator rend() { return {this, nullptr}; } const_iterator cbegin() const { return {this, _head}; } const_iterator cend() const { return {this, nullptr}; } + const_reverse_iterator crbegin() const { return {this, _tail}; } + const_reverse_iterator crend() const { return {this, nullptr}; } private: friend class iter; friend class iter; friend class iter; + friend class iter; inline void update_generation() { #ifndef NDEBUG ++_generation; diff --git a/src/v/bytes/details/io_placeholder.h b/src/v/bytes/details/io_placeholder.h index aa4390e287b38..336b7ec1d5478 100644 --- a/src/v/bytes/details/io_placeholder.h +++ b/src/v/bytes/details/io_placeholder.h @@ -27,7 +27,10 @@ class io_placeholder { , _byte_index(initial_index) , _remaining_size(max_size_to_write) {} - [[gnu::always_inline]] void write(const char* src, size_t len) { + template + [[gnu::always_inline]] void write(const T* src, size_t len) + requires(sizeof(T) == 1) + { details::check_out_of_range(len, _remaining_size); std::copy_n(src, len, mutable_index()); _remaining_size -= len; diff --git a/src/v/bytes/iobuf.cc b/src/v/bytes/iobuf.cc index e96313f90b2c3..afe2d2134cd62 100644 --- a/src/v/bytes/iobuf.cc +++ b/src/v/bytes/iobuf.cc @@ -197,7 +197,7 @@ std::strong_ordering iobuf::operator<=>(const iobuf& o) const { rhs.remove_prefix(n); if (rhs.empty()) { rhs = other_next_view(); - if (o_it == o.cend()) { + if (rhs.empty()) { break; } } diff --git a/src/v/bytes/iobuf.h b/src/v/bytes/iobuf.h index 2544585ba212b..c587f460c4c68 100644 --- a/src/v/bytes/iobuf.h +++ b/src/v/bytes/iobuf.h @@ -71,8 +71,10 @@ class iobuf { using iterator = typename container::iterator; using reverse_iterator = typename container::reverse_iterator; using const_iterator = typename container::const_iterator; + using const_reverse_iterator = typename container::const_reverse_iterator; using iterator_consumer = details::io_iterator_consumer; using byte_iterator = details::io_byte_iterator; + using reverse_byte_iterator = details::reverse_io_byte_iterator; using placeholder = details::io_placeholder; static iobuf from(std::string_view view) { @@ -268,6 +270,8 @@ class iobuf { const_iterator end() const; const_iterator cbegin() const; const_iterator cend() const; + const_reverse_iterator crbegin() const; + const_reverse_iterator crend() const; std::string hexdump(size_t) const; @@ -307,6 +311,12 @@ inline iobuf::const_iterator iobuf::begin() const { return _frags.cbegin(); } inline iobuf::const_iterator iobuf::end() const { return _frags.cend(); } inline iobuf::const_iterator iobuf::cbegin() const { return _frags.cbegin(); } inline iobuf::const_iterator iobuf::cend() const { return _frags.cend(); } +inline iobuf::const_reverse_iterator iobuf::crbegin() const { + return _frags.crbegin(); +} +inline iobuf::const_reverse_iterator iobuf::crend() const { + return _frags.crend(); +} inline bool iobuf::operator!=(const iobuf& o) const { return !(*this == o); } inline bool iobuf::operator!=(std::string_view o) const { diff --git a/src/v/bytes/tests/iobuf_tests.cc b/src/v/bytes/tests/iobuf_tests.cc index 27e700abb55c9..09a2db46d7a1a 100644 --- a/src/v/bytes/tests/iobuf_tests.cc +++ b/src/v/bytes/tests/iobuf_tests.cc @@ -99,6 +99,10 @@ SEASTAR_THREAD_TEST_CASE(test_cmp_str_view) { std::strong_ordering::equal, (multiple_frags.share(0, 3) <=> "cat")); BOOST_CHECK_LT(iobuf::from(""), iobuf::from("cat")); BOOST_CHECK_GT(iobuf::from("cat"), iobuf::from("")); + auto multi_frags = iobuf::from("ab"); + multi_frags.append_fragments(iobuf::from("d")); + BOOST_CHECK_EQUAL( + true, (iobuf::from("abc") <=> multi_frags) == std::strong_ordering::less); } SEASTAR_THREAD_TEST_CASE(test_appended_data_is_retained) { @@ -303,6 +307,52 @@ SEASTAR_THREAD_TEST_CASE(traver_all_bytes_one_at_a_time) { } BOOST_CHECK_EQUAL(str, expected.data()); } +SEASTAR_THREAD_TEST_CASE(iterate_bytes_backward_no_frags) { + auto b = iobuf(); + auto begin = iobuf::reverse_byte_iterator(b.crbegin(), b.crend()); + auto end = iobuf::reverse_byte_iterator(b.crend(), b.crend()); + size_t bytes_read = 0; + for (; begin != end; begin++) { + bytes_read++; + } + BOOST_CHECK_EQUAL(bytes_read, 0); +} +SEASTAR_THREAD_TEST_CASE(iterate_bytes_backward_empty_frags) { + auto b = iobuf(); + b.append(std::make_unique(0)); + b.append(std::make_unique(1)); + auto begin = iobuf::reverse_byte_iterator(b.crbegin(), b.crend()); + auto end = iobuf::reverse_byte_iterator(b.crend(), b.crend()); + size_t bytes_read = 0; + for (; begin != end; begin++) { + bytes_read++; + } + BOOST_CHECK_EQUAL(bytes_read, 0); +} +SEASTAR_THREAD_TEST_CASE(iterate_bytes_backward_mult_frags) { + auto b = iobuf(); + std::string str1 = "hello"; + std::string str2 = "world"; + + b.append( + std::make_unique( + ss::temporary_buffer::copy_of(str1))); + b.append(std::make_unique(1)); + b.append( + std::make_unique( + ss::temporary_buffer::copy_of(str2))); + + auto begin = iobuf::reverse_byte_iterator(b.crbegin(), b.crend()); + auto end = iobuf::reverse_byte_iterator(b.crend(), b.crend()); + + iobuf rev; + for (; begin != end; begin++) { + rev.append(&*begin, 1); + } + BOOST_CHECK_EQUAL( + rev.linearize_to_string(), + std::ranges::to(std::views::reverse(str1 + str2))); +} SEASTAR_THREAD_TEST_CASE(not_equal_by_size) { auto a = iobuf(); auto b = iobuf(); diff --git a/src/v/serde/parquet/BUILD b/src/v/serde/parquet/BUILD index f1bc79a8c194e..bf3bd04889a91 100644 --- a/src/v/serde/parquet/BUILD +++ b/src/v/serde/parquet/BUILD @@ -120,6 +120,7 @@ redpanda_cc_library( "//src/v/compression", "//src/v/container:chunked_vector", "//src/v/hashing:crc32", + "//src/v/strings:utf8", "@abseil-cpp//absl/numeric:int128", "@seastar", ], diff --git a/src/v/serde/parquet/column_stats_collector.cc b/src/v/serde/parquet/column_stats_collector.cc index 1e7b02ac37424..27a33c99714e2 100644 --- a/src/v/serde/parquet/column_stats_collector.cc +++ b/src/v/serde/parquet/column_stats_collector.cc @@ -121,6 +121,131 @@ fixed_byte_array_value copy(fixed_byte_array_value& v) { return {v.val.share(0, v.val.size_bytes())}; } +std::optional binary_bound_truncator::get_min_bound(iobuf& b) const { + if (b.size_bytes() <= _max_bound_size) { + return {}; + } + return truncate_to_max_bound_size(b, is_valid_utf8(b)); +} + +std::optional binary_bound_truncator::get_max_bound(iobuf& b) const { + if (b.size_bytes() <= _max_bound_size) { + return {}; + } + + bool is_utf8 = is_valid_utf8(b); + return truncate_to_max_bound_size(b, is_utf8).and_then([&](auto trun) { + return try_increment(trun, is_utf8); + }); +} + +bool binary_bound_truncator::is_valid_utf8(const iobuf& b) const { + auto cbegin = iobuf::byte_iterator(b.cbegin(), b.cend()); + auto cend = iobuf::byte_iterator(b.cend(), b.cend()); + // Only validate up to the max bound size. This makes the runtime constant + // and an incremented utf8 sequence will still be greater than or equal to + // an incremented plain byte sequence. + return utf::is_valid_utf8(cbegin, cend, _max_bound_size); +} + +std::optional +binary_bound_truncator::try_increment(iobuf& b, bool is_utf8) const { + if (is_utf8) { + return try_increment_utf8(b); + } else { + return try_increment_bytes(b); + } +} + +std::optional +binary_bound_truncator::try_increment_utf8(iobuf& b) const { + auto crbegin = iobuf::reverse_byte_iterator(b.crbegin(), b.crend()); + auto crend = iobuf::reverse_byte_iterator(b.crend(), b.crend()); + auto utf_iter = utf::utf32_reverse_iterator(crbegin, crend); + auto utf_end = utf::utf32_reverse_iterator(crend, crend); + + size_t bytes_read = 0; + size_t code_points_read = 0; + std::optional inc_char; + for (; utf_iter != utf_end; ++utf_iter) { + ++code_points_read; + bytes_read += utf_iter->utf8_encoding_length(); + // TODO: `try_increment` only increments if the resulting utf8 encoding + // is the same length as the original. This can likely be relaxed if its + // acceptable that the truncated value exceeds `_max_bound_size` by a + // few bytes. + if (auto inc_c = utf_iter->try_increment()) { + inc_char = inc_c; + break; + } + } + + if (!inc_char) { + return {}; + } + + auto trun = b.share(0, b.size_bytes() - bytes_read); + auto ph = trun.reserve( + inc_char->utf8_encoding_length() + (code_points_read - 1)); + ph.write( + inc_char->utf8_encoding().data(), inc_char->utf8_encoding_length()); + const uint8_t z = 0; + for (size_t i = 0; i < (code_points_read - 1); ++i) { + ph.write(&z, 1); + } + return trun; +} + +std::optional +binary_bound_truncator::try_increment_bytes(iobuf& b) const { + size_t bytes_read = 0; + std::optional char_found; + auto crbegin = iobuf::reverse_byte_iterator(b.crbegin(), b.crend()); + auto crend = iobuf::reverse_byte_iterator(b.crend(), b.crend()); + for (; crbegin != crend; crbegin++) { + bytes_read++; + uint8_t c = *crbegin; + if (c != std::numeric_limits::max()) { + char_found = c + 1; + break; + } + } + + if (!char_found) { + return {}; + } + + auto trun = b.share(0, b.size_bytes() - bytes_read); + auto ph = trun.reserve(bytes_read); + ph.write(&char_found.value(), 1); + // Note that it'd be more efficient to just re-use the tail-end of `b`. + // However, zero-ing out the tail end results in a closer bound. + const uint8_t z = 0; + for (size_t i = 0; i < (bytes_read - 1); i++) { + ph.write(&z, 1); + } + return trun; +} + +std::optional binary_bound_truncator::truncate_to_max_bound_size( + iobuf& b, bool valid_utf8) const { + auto trun = b.share(0, _max_bound_size); + + if (valid_utf8) { + auto crbegin = iobuf::reverse_byte_iterator( + trun.crbegin(), trun.crend()); + auto crend = iobuf::reverse_byte_iterator(trun.crend(), trun.crend()); + auto n = utf::find_incomplete_code_point(crbegin, crend); + if (trun.size_bytes() == n) { + return {}; + } + + trun = trun.share(0, trun.size_bytes() - n); + } + + return trun; +} + } // namespace internal } // namespace serde::parquet diff --git a/src/v/serde/parquet/column_stats_collector.h b/src/v/serde/parquet/column_stats_collector.h index 7a82b1c0f9d56..95afcb81e157c 100644 --- a/src/v/serde/parquet/column_stats_collector.h +++ b/src/v/serde/parquet/column_stats_collector.h @@ -49,12 +49,46 @@ template<> byte_array_value copy(byte_array_value&); template<> fixed_byte_array_value copy(fixed_byte_array_value&); + +class noop_bound_truncator { +public: + explicit noop_bound_truncator(size_t) {} + std::optional get_min_bound(iobuf&) { return {}; } + std::optional get_max_bound(iobuf&) { return {}; } +}; + +class binary_bound_truncator { +public: + explicit binary_bound_truncator(size_t max_bound_size_bytes) + : _max_bound_size(max_bound_size_bytes) {} + + std::optional get_min_bound(iobuf&) const; + std::optional get_max_bound(iobuf&) const; + +private: + size_t _max_bound_size; + + bool is_valid_utf8(const iobuf&) const; + std::optional try_increment(iobuf&, bool) const; + std::optional try_increment_utf8(iobuf&) const; + std::optional try_increment_bytes(iobuf&) const; + std::optional truncate_to_max_bound_size(iobuf&, bool) const; +}; + } // namespace internal // We incrementally collect stats on columns so we can serialize // it in the metadata for query engine performance. -template +template< + typename value_type, + auto comparator, + typename truncator = internal::noop_bound_truncator> class column_stats_collector { + struct bound_t { + std::optional val; + bool is_exact = false; + }; + public: using ref_type = std::conditional_t< std::is_trivially_copyable_v, @@ -65,6 +99,10 @@ class column_stats_collector { std::optional, std::optional&>; + column_stats_collector() = default; + explicit column_stats_collector(std::optional max_bound_size) + : _max_bound_size(max_bound_size) {} + // Record a value in the collector void record_value(ref_type v) { if constexpr (std::is_floating_point_v) { @@ -72,11 +110,15 @@ class column_stats_collector { return; } } - if (!_min || comparator(v, *_min) == std::strong_ordering::less) { - _min = internal::copy(v); + + if ( + !_min.val || comparator(v, *_min.val) == std::strong_ordering::less) { + set_bound(_min, v); } - if (!_max || comparator(v, *_max) == std::strong_ordering::greater) { - _max = internal::copy(v); + if ( + !_max.val + || comparator(v, *_max.val) == std::strong_ordering::greater) { + set_bound(_max, v); } } @@ -87,27 +129,29 @@ class column_stats_collector { void merge(column_stats_collector& other) { _null_count += other._null_count; if ( - other._min - && (!_min || comparator(*other._min, *_min) == std::strong_ordering::less)) { - _min = internal::copy(*other._min); + other._min.val + && (!_min.val || comparator(*other._min.val, *_min.val) == std::strong_ordering::less)) { + _min = {internal::copy(*other._min.val), other._min.is_exact}; } if ( - other._max - && (!_max || comparator(*other._max, *_max) == std::strong_ordering::greater)) { - _max = internal::copy(*other._max); + other._max.val + && (!_max.val || comparator(*other._max.val, *_max.val) == std::strong_ordering::greater)) { + _max = {internal::copy(*other._max.val), other._max.is_exact}; } } - void reset() { _null_count = 0; - _min = std::nullopt; - _max = std::nullopt; + _min = {std::nullopt, false}; + _max = {std::nullopt, false}; } int64_t null_count() const { return _null_count; } - bound_ref_type min() { return normalize(_min, true); } - bound_ref_type max() { return normalize(_max, false); } + bound_ref_type min() { return normalize(_min.val, true); } + bool min_is_exact() const { return _min.is_exact; } + + bound_ref_type max() { return normalize(_max.val, false); } + bool max_is_exact() const { return _max.is_exact; } private: bound_ref_type normalize(bound_ref_type v, bool min) { @@ -120,8 +164,35 @@ class column_stats_collector { return v; } - std::optional _min; - std::optional _max; + template + void set_bound(bound_t& b, ref_type v) { + if constexpr ( + std::is_same_v + || std::is_same_v) { + if (_max_bound_size) { + iobuf& val = v.val; + std::optional t_b; + // Immediately truncating the new bound limits the length of any + // comparison in `record_value` to `_max_bound_size`. + if constexpr (is_min_bound) { + t_b = truncator{*_max_bound_size}.get_min_bound(val); + } else { + t_b = truncator{*_max_bound_size}.get_max_bound(val); + } + + if (t_b) { + b = {value_type{std::move(*t_b)}, false}; + return; + } + } + } + + b = {internal::copy(v), true}; + } + + std::optional _max_bound_size; + bound_t _min; + bound_t _max; int64_t _null_count = 0; }; diff --git a/src/v/serde/parquet/column_writer.cc b/src/v/serde/parquet/column_writer.cc index 224bfb8c5baf3..c2b8d3ec7fcaa 100644 --- a/src/v/serde/parquet/column_writer.cc +++ b/src/v/serde/parquet/column_writer.cc @@ -62,11 +62,19 @@ crc::crc32 compute_crc32(Args&&... args) { return crc; } -template +template< + typename value_type, + auto comparator, + typename truncator = internal::noop_bound_truncator> class buffered_column_writer final : public column_writer::impl { + using stats_t = column_stats_collector; + using bound_ref_t = stats_t::bound_ref_type; + public: buffered_column_writer(const schema_element& schema_element, options opts) - : _max_rep_level(schema_element.max_repetition_level) + : _current_page_stats(opts.max_bound_size_bytes) + , _flushed_stats(opts.max_bound_size_bytes) + , _max_rep_level(schema_element.max_repetition_level) , _max_def_level(schema_element.max_definition_level) , _opts(opts) {} @@ -134,23 +142,14 @@ class buffered_column_writer final : public column_writer::impl { size_t compressed_page_size = encoded_def_levels.size_bytes() + encoded_rep_levels.size_bytes() + encoded_data.size_bytes(); - using bound_type = decltype(_flushed_stats)::bound_ref_type; std::optional max_bound; - if (bound_type max = _current_page_stats.max()) { - // TODO: consider truncating large values instead of writing them - // (is_exact=false) - max_bound.emplace( - /*value=*/encode_for_stats(*max), - /*is_exact=*/true); + if (bound_ref_t max = _current_page_stats.max()) { + max_bound = get_bound(max, _current_page_stats.max_is_exact()); _flushed_stats.record_value(*max); } std::optional min_bound; - if (bound_type min = _current_page_stats.min()) { - // TODO: consider truncating large values instead of writing them - // (is_exact=false) - min_bound.emplace( - /*value=*/encode_for_stats(*min), - /*is_exact=*/true); + if (bound_ref_t min = _current_page_stats.min()) { + min_bound = get_bound(min, _current_page_stats.min_is_exact()); _flushed_stats.record_value(*min); } _flushed_stats.record_null(_current_page_stats.null_count()); @@ -213,20 +212,9 @@ class buffered_column_writer final : public column_writer::impl { statistics full_stats{ .null_count = _flushed_stats.null_count(), - .max = {}, - .min = {}, + .max = get_bound(_flushed_stats.max(), _flushed_stats.max_is_exact()), + .min = get_bound(_flushed_stats.min(), _flushed_stats.min_is_exact()), }; - using bound_type = decltype(_flushed_stats)::bound_ref_type; - if (bound_type max = _flushed_stats.max()) { - full_stats.max.emplace( - /*value=*/encode_for_stats(*max), - /*is_exact=*/true); - } - if (bound_type min = _flushed_stats.min()) { - full_stats.min.emplace( - /*value=*/encode_for_stats(*min), - /*is_exact=*/true); - } _flushed_stats.reset(); _total_memory_usage = 0; co_return flushed_pages{ @@ -236,8 +224,8 @@ class buffered_column_writer final : public column_writer::impl { } private: - column_stats_collector _current_page_stats; - column_stats_collector _flushed_stats; + stats_t _current_page_stats; + stats_t _flushed_stats; int64_t _total_memory_usage = 0; plain_encoder _value_buffer; chunked_vector _def_levels; @@ -248,6 +236,15 @@ class buffered_column_writer final : public column_writer::impl { rep_level _max_rep_level; def_level _max_def_level; options _opts; + + std::optional get_bound(bound_ref_t r, bool is_exact) { + if (!r) { + return {}; + } + + return statistics::bound{ + .value = encode_for_stats(*r), .is_exact = is_exact}; + } }; template class buffered_column_writer; @@ -258,9 +255,17 @@ template class buffered_column_writer; template class buffered_column_writer; template class buffered_column_writer; template class buffered_column_writer; +template class buffered_column_writer< + byte_array_value, + ordering::byte_array, + internal::binary_bound_truncator>; template class buffered_column_writer< fixed_byte_array_value, ordering::fixed_byte_array>; +template class buffered_column_writer< + fixed_byte_array_value, + ordering::fixed_byte_array, + internal::binary_bound_truncator>; template class buffered_column_writer< fixed_byte_array_value, ordering::int128_be>; @@ -308,20 +313,46 @@ make_impl(const schema_element& e, f64_type, options opts) { } std::unique_ptr make_impl(const schema_element& e, byte_array_type t, options opts) { - if (t.fixed_length.has_value()) { - if ( - t.fixed_length == sizeof(absl::int128) - && std::holds_alternative(e.logical_type)) { - return std::make_unique>(e, opts); - } + using ret_t = std::unique_ptr; + auto truncating_byte_array_writer = [&] -> ret_t { return std::make_unique>(e, opts); - } - return std::make_unique< - buffered_column_writer>(e, opts); + byte_array_value, + ordering::byte_array, + internal::binary_bound_truncator>>(e, opts); + }; + return ss::visit( + e.logical_type, + [&](const string_type&) { return truncating_byte_array_writer(); }, + [&](const enum_type&) { return truncating_byte_array_writer(); }, + [&](const json_type&) { return truncating_byte_array_writer(); }, + [&](const bson_type&) { return truncating_byte_array_writer(); }, + [&](const std::monostate&) -> ret_t { + if (t.fixed_length.has_value()) { + return std::make_unique>(e, opts); + } + + return truncating_byte_array_writer(); + }, + [&](const auto&) -> ret_t { + if (t.fixed_length.has_value()) { + if ( + t.fixed_length == sizeof(absl::int128) + && std::holds_alternative(e.logical_type)) { + return std::make_unique>(e, opts); + } + return std::make_unique>(e, opts); + } + return std::make_unique< + buffered_column_writer>( + e, opts); + }); } } // namespace diff --git a/src/v/serde/parquet/column_writer.h b/src/v/serde/parquet/column_writer.h index f856e26b17d7e..05c8a580f919c 100644 --- a/src/v/serde/parquet/column_writer.h +++ b/src/v/serde/parquet/column_writer.h @@ -48,6 +48,9 @@ class column_writer { struct options { // If true, use zstd compression for the column data. bool compress; + // If a value exists then an attempt will be made to truncate most + // binary bounds to this size. + std::optional max_bound_size_bytes; }; explicit column_writer(const schema_element&, options); diff --git a/src/v/serde/parquet/encoding.cc b/src/v/serde/parquet/encoding.cc index 992410d2d2c78..93cb5218a7f81 100644 --- a/src/v/serde/parquet/encoding.cc +++ b/src/v/serde/parquet/encoding.cc @@ -182,6 +182,6 @@ iobuf encode_for_stats(float64_value v) { e.add_value(v); return e.get_encoded_buf(); } -iobuf encode_for_stats(const byte_array_value& v) { return v.val.copy(); } -iobuf encode_for_stats(const fixed_byte_array_value& v) { return v.val.copy(); } +iobuf encode_for_stats(byte_array_value& v) { return v.val.share(); } +iobuf encode_for_stats(fixed_byte_array_value& v) { return v.val.share(); } } // namespace serde::parquet diff --git a/src/v/serde/parquet/encoding.h b/src/v/serde/parquet/encoding.h index 4bc2795c21d75..ff988465a4184 100644 --- a/src/v/serde/parquet/encoding.h +++ b/src/v/serde/parquet/encoding.h @@ -105,7 +105,7 @@ iobuf encode_for_stats(int32_value); iobuf encode_for_stats(int64_value); iobuf encode_for_stats(float32_value); iobuf encode_for_stats(float64_value); -iobuf encode_for_stats(const byte_array_value&); -iobuf encode_for_stats(const fixed_byte_array_value&); +iobuf encode_for_stats(byte_array_value&); +iobuf encode_for_stats(fixed_byte_array_value&); } // namespace serde::parquet diff --git a/src/v/serde/parquet/tests/BUILD b/src/v/serde/parquet/tests/BUILD index df5562830d755..915ff204d6b38 100644 --- a/src/v/serde/parquet/tests/BUILD +++ b/src/v/serde/parquet/tests/BUILD @@ -186,6 +186,7 @@ redpanda_cc_gtest( deps = [ "//src/v/serde/parquet:column_writer", "//src/v/serde/parquet:value", + "//src/v/strings:utf8", "//src/v/test_utils:gtest", "@googletest//:gtest", "@seastar", diff --git a/src/v/serde/parquet/tests/column_stats_collector_test.cc b/src/v/serde/parquet/tests/column_stats_collector_test.cc index d715703e40982..03fc069a43928 100644 --- a/src/v/serde/parquet/tests/column_stats_collector_test.cc +++ b/src/v/serde/parquet/tests/column_stats_collector_test.cc @@ -12,11 +12,13 @@ #include "gmock/gmock.h" #include "serde/parquet/column_stats_collector.h" #include "serde/parquet/value.h" +#include "strings/utf8.h" #include #include #include +#include #include #include @@ -186,4 +188,172 @@ TEST(ColumnStatsCollector, Merge) { EXPECT_THAT(collector_a.max(), Optional(int64_value{99})); } +TEST(NoopTrunactor, Basic) { + serde::parquet::internal::noop_bound_truncator t{0}; + iobuf b; + EXPECT_EQ(t.get_max_bound(b), std::nullopt); + EXPECT_EQ(t.get_min_bound(b), std::nullopt); +} + +namespace { +bool is_utf8(iobuf& b, size_t bound_size) { + auto begin = iobuf::byte_iterator(b.cbegin(), b.cend()); + auto end = iobuf::byte_iterator(b.cend(), b.cend()); + return utf::is_valid_utf8(begin, end, bound_size); +} +} // namespace + +TEST(BinaryTrunactor, Bytes) { + struct test_params { + size_t max_bound_size; + bool is_min_bound; + std::vector bound; + }; + + std::vector>>> + tests{ + {{.max_bound_size = 2, + .is_min_bound = true, + .bound = {0xFF, 0xFF, 0xFF}}, + {{0xFF, 0xFF}}}, + {{.max_bound_size = 2, + .is_min_bound = false, + .bound = {0xFF, 0xFF, 0xFF}}, + {}}, + {{.max_bound_size = 2, + .is_min_bound = false, + .bound = {0xFF, 0x1, 0xFF}}, + {{0xFF, 0x2}}}, + {{.max_bound_size = 2, + .is_min_bound = false, + .bound = {0xFE, 0xFF, 0xFF}}, + {{0xFF, 0x0}}}, + }; + + for (const auto& [params, expec] : tests) { + iobuf buf; + buf.append(¶ms.bound.front(), params.bound.size()); + auto param_bound_str = buf.linearize_to_string(); + auto res = expec.transform([](auto& e) { + iobuf b; + b.append(&e.front(), e.size()); + return b; + }); + + EXPECT_FALSE(is_utf8(buf, params.max_bound_size)); + + serde::parquet::internal::binary_bound_truncator t{ + params.max_bound_size}; + std::optional bound; + if (params.is_min_bound) { + bound = t.get_min_bound(buf); + } else { + bound = t.get_max_bound(buf); + } + EXPECT_EQ(bound, res); + + // Ensure the original buffer remains un-modified. + EXPECT_EQ(buf.linearize_to_string(), param_bound_str); + + if (bound) { + if (params.is_min_bound) { + EXPECT_LE( + bound->linearize_to_string(), buf.linearize_to_string()); + } else { + EXPECT_GE( + bound->linearize_to_string(), buf.linearize_to_string()); + } + } + } +} + +TEST(BinaryTrunactor, MinMax) { + auto b = iobuf::from( + "vkNOQZeDacDujKTSpi3tqFjam5Q7I0PaBS8uXvMeSYsNm8Q2yegdvbTOkjzo2bRSGDSSMjBJ" + "esftbKb7RmIjMh"); + auto b_min + = serde::parquet::internal::binary_bound_truncator{64}.get_min_bound(b); + auto b_max + = serde::parquet::internal::binary_bound_truncator{64}.get_max_bound(b); + EXPECT_TRUE((b_min <=> b_max) == std::strong_ordering::less); +} + +TEST(BinaryTrunactor, UTF8) { + struct test_params { + size_t max_bound_size; + bool is_min_bound; + std::string bound; + }; + + const auto max_code_point = utf::utf32_code_point{0x10FFFF}; + const std::string max_code_point_s = { + max_code_point.utf8_encoding().data(), + max_code_point.utf8_encoding_length()}; + + std::vector>> tests{ + {{.max_bound_size = 2, .is_min_bound = true, .bound = ""}, {}}, + {{.max_bound_size = 2, .is_min_bound = false, .bound = ""}, {}}, + {{.max_bound_size = 2, .is_min_bound = true, .bound = "hello"}, {"he"}}, + {{.max_bound_size = 2, .is_min_bound = false, .bound = "hello"}, {"hf"}}, + {{.max_bound_size = 4, .is_min_bound = false, .bound = "hello"}, + {"helm"}}, + {{.max_bound_size = 5, .is_min_bound = false, .bound = "hello"}, {}}, + {{.max_bound_size = 5, .is_min_bound = true, .bound = "hello"}, {}}, + {{.max_bound_size = 2, .is_min_bound = true, .bound = max_code_point_s}, + {}}, + {{.max_bound_size = 8, + .is_min_bound = false, + .bound = max_code_point_s + max_code_point_s + max_code_point_s}, + {}}, + {{.max_bound_size = 8, + .is_min_bound = true, + .bound = max_code_point_s + max_code_point_s + max_code_point_s}, + {max_code_point_s + max_code_point_s}}, + {{.max_bound_size = 2, + .is_min_bound = false, + .bound = "h" + max_code_point_s + max_code_point_s}, + {"i"}}, + {{.max_bound_size = 4, + .is_min_bound = false, + .bound = "h" + max_code_point_s + max_code_point_s}, + {"i"}}, + {{.max_bound_size = 5, + .is_min_bound = false, + .bound = "h" + max_code_point_s + max_code_point_s}, + {std::string{"i"} + '\0'}}, + }; + + for (auto& [params, expec] : tests) { + serde::parquet::internal::binary_bound_truncator t{ + params.max_bound_size}; + auto buf = iobuf::from(params.bound); + auto res = expec.transform([](auto& r) { return iobuf::from(r); }); + + EXPECT_TRUE(is_utf8(buf, params.max_bound_size)); + + std::optional bound; + if (params.is_min_bound) { + bound = t.get_min_bound(buf); + } else { + bound = t.get_max_bound(buf); + } + EXPECT_EQ(bound, res); + + // Ensure the original buffer remains un-modified. + EXPECT_EQ(buf.linearize_to_string(), params.bound); + + if (bound) { + EXPECT_TRUE(is_utf8(*bound, params.max_bound_size)); + + if (params.is_min_bound) { + EXPECT_LE( + bound->linearize_to_string(), buf.linearize_to_string()); + } else { + EXPECT_GE( + bound->linearize_to_string(), buf.linearize_to_string()); + } + } + } +} + // NOLINTEND(*magic-number*) diff --git a/src/v/serde/parquet/tests/generate_file.cc b/src/v/serde/parquet/tests/generate_file.cc index 0a7ea0717ccac..c01f22bbc2a5e 100644 --- a/src/v/serde/parquet/tests/generate_file.cc +++ b/src/v/serde/parquet/tests/generate_file.cc @@ -245,7 +245,11 @@ schema_element all_types_schema() { leaf_node("C", field_repetition_type::required, i64_type{}), leaf_node("D", field_repetition_type::required, f32_type{}), leaf_node("E", field_repetition_type::required, f64_type{}), - leaf_node("F", field_repetition_type::required, byte_array_type{}))); + leaf_node("F", field_repetition_type::required, byte_array_type{}), + leaf_node( + "G", + field_repetition_type::required, + byte_array_type{.fixed_length = 128}))); // TODO: also add logical types } @@ -284,7 +288,7 @@ value generate_required(const schema_element& root) { return fixed_byte_array_value{iobuf::from( random_generators::gen_alphanum_string(*t.fixed_length))}; } - auto size = random_generators::get_int(64); + auto size = random_generators::get_int(65, 128); return byte_array_value{ iobuf::from(random_generators::gen_alphanum_string(size))}; }); diff --git a/src/v/serde/parquet/writer.cc b/src/v/serde/parquet/writer.cc index 971df2e32a374..109e24b61fe4e 100644 --- a/src/v/serde/parquet/writer.cc +++ b/src/v/serde/parquet/writer.cc @@ -58,6 +58,7 @@ class writer::impl { element, { .compress = _opts.compress, + .max_bound_size_bytes = _opts.max_bound_size, }), }); }); diff --git a/src/v/serde/parquet/writer.h b/src/v/serde/parquet/writer.h index 2bddaa1714e4a..c6de336bf92d4 100644 --- a/src/v/serde/parquet/writer.h +++ b/src/v/serde/parquet/writer.h @@ -52,6 +52,11 @@ class writer { // group). Ecosystem libraries tend to default between 256Kib-1MiB static constexpr int64_t default_page_size = 512_KiB; int64_t page_buffer_size = default_page_size; + + // The target maximum size for bound values in statistic metadata. + // Most ecosystem libraries tend to set this to 60-64bytes. + static constexpr size_t default_max_bound_size = 64; + std::optional max_bound_size = default_max_bound_size; }; // Create a new parquet file writer using the given options that diff --git a/src/v/strings/BUILD b/src/v/strings/BUILD index d355ab2fab4f1..ab1b24c0d7095 100644 --- a/src/v/strings/BUILD +++ b/src/v/strings/BUILD @@ -11,6 +11,7 @@ redpanda_cc_library( visibility = ["//visibility:public"], deps = [ "//src/v/base", + "@boost//:container", "@boost//:locale", ], ) diff --git a/src/v/strings/tests/BUILD b/src/v/strings/tests/BUILD index 7e691e1ca940e..9d8191dc91080 100644 --- a/src/v/strings/tests/BUILD +++ b/src/v/strings/tests/BUILD @@ -8,8 +8,10 @@ redpanda_cc_btest_no_seastar( "static_str_test.cc", "utf8_control_chars.cc", "utf8_substring.cc", + "utf8_test.cc", ], deps = [ + "//src/v/bytes", "//src/v/strings:static_str", "//src/v/strings:string_switch", "//src/v/strings:utf8", diff --git a/src/v/strings/tests/utf8_test.cc b/src/v/strings/tests/utf8_test.cc new file mode 100644 index 0000000000000..74e0843c001fb --- /dev/null +++ b/src/v/strings/tests/utf8_test.cc @@ -0,0 +1,140 @@ +/* + * Copyright 2025 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#include "bytes/iobuf.h" +#include "strings/utf8.h" + +#include + +#include +#include + +BOOST_AUTO_TEST_CASE(is_valid_utf8_test) { + auto ts = [](char v) { return std::string{v}; }; + + auto test_cases = std::vector>{ + {"世", true}, + {"hello", true}, + {ts(0x80), false}, + {ts(0xFF), false}, + {ts(0xC2), false}, + {ts(0xE0), false}, + {ts(0xFF) + "hello", false}, + }; + + for (auto& [param, expected] : test_cases) { + BOOST_CHECK_EQUAL( + utf::is_valid_utf8(param.cbegin(), param.cend(), param.size()), + expected); + } +} + +BOOST_AUTO_TEST_CASE(is_valid_utf8_read_limit) { + auto ts = [](char v) { return std::string{v}; }; + + using test_params = std::pair; + auto test_cases = std::vector>{ + {{"世", 1}, true}, + {{ts(0x80), 1}, false}, + {{ts(0xFF), 1}, false}, + {{"hello" + ts(0xFF), 5}, true}, + {{"hello" + ts(0xFF), 6}, false}, + }; + + for (auto& [params, expected] : test_cases) { + auto& [val, max_read_bytes] = params; + BOOST_CHECK_EQUAL( + utf::is_valid_utf8(val.cbegin(), val.cend(), max_read_bytes), + expected); + } +} + +BOOST_AUTO_TEST_CASE(utf32_code_point_to_utf8) { + auto test_cases + = std::vector>>{ + {{U'a'}, {'a', 0, 0, 0}}, + }; + for (auto& [param, expected] : test_cases) { + BOOST_CHECK(param.utf8_encoding() == expected); + } +} + +BOOST_AUTO_TEST_CASE(utf32_code_point_encoding_length) { + auto test_cases = std::vector>{ + {{U'a'}, 1}, + {{0x007F + 1}, 2}, + {{0x07FF + 1}, 3}, + {{0xFFFF + 1}, 4}, + }; + + for (auto& [param, expected] : test_cases) { + BOOST_CHECK_EQUAL(param.utf8_encoding_length(), expected); + } +} + +BOOST_AUTO_TEST_CASE(utf32_code_point_try_increment) { + auto test_cases = std::vector< + std::pair>>{ + {{U'a'}, {{U'b'}}}, + {{U'é'}, {{U'ê'}}}, + {{U'世'}, {{U'丗'}}}, + {{0xD800}, {{0xE000}}}, // surrogate range + {{0xDFFF}, {{0xE000}}}, // ^^ + {{0x007F}, {}}, // can't increment without increasing utf8 encoding length + {{0x07FF}, {}}, // ^^ + {{0xFFFF}, {}}, // ^^ + {{0x10FFFF}, {}}, // max utf32 encoding + }; + + for (auto& [param, expected] : test_cases) { + BOOST_CHECK(param.try_increment() == expected); + } +} + +BOOST_AUTO_TEST_CASE(find_incomplete_code_point_test) { + std::string a = std::string{"hello世"}; + iobuf b = iobuf::from(a); + b.trim_back(1); + auto begin = iobuf::reverse_byte_iterator(b.crbegin(), b.crend()); + auto end = iobuf::reverse_byte_iterator(b.crend(), b.crend()); + auto r = utf::find_incomplete_code_point(begin, end); + BOOST_CHECK_EQUAL(r, 2); + + b.trim_back(r); + BOOST_CHECK_EQUAL(b.linearize_to_string(), "hello"); + + begin = iobuf::reverse_byte_iterator(b.crbegin(), b.crend()); + end = iobuf::reverse_byte_iterator(b.crend(), b.crend()); + r = utf::find_incomplete_code_point(begin, end); + BOOST_CHECK_EQUAL(r, 0); +} + +BOOST_AUTO_TEST_CASE(utf32_reverse_iterator_test) { + auto test_cases + = std::vector>>{ + {"", {}}, + {"a", {U'a'}}, + {"hello世", {U'h', U'e', U'l', U'l', U'o', U'世'}}, + }; + for (auto& [param, expected] : test_cases) { + iobuf b = iobuf::from(param); + auto begin = iobuf::reverse_byte_iterator(b.crbegin(), b.crend()); + auto end = iobuf::reverse_byte_iterator(b.crend(), b.crend()); + auto rcurr = utf::utf32_reverse_iterator(begin, end); + auto rend = utf::utf32_reverse_iterator(end, end); + std::vector rev_res{}; + for (; rcurr != rend; ++rcurr) { + rev_res.push_back(rcurr->code_point); + } + std::vector res{rev_res.crbegin(), rev_res.crend()}; + BOOST_CHECK_EQUAL(expected, res); + } +} diff --git a/src/v/strings/utf8.h b/src/v/strings/utf8.h index ca07651c70c85..6dae00ef9d9bf 100644 --- a/src/v/strings/utf8.h +++ b/src/v/strings/utf8.h @@ -14,10 +14,13 @@ #include "base/likely.h" #include "base/seastarx.h" +#include #include #include #include +#include +#include #include #include @@ -165,3 +168,185 @@ inline void validate_utf8(std::string_view s, const Thrower& thrower) { inline void validate_utf8(std::string_view s) { validate_utf8(s, default_utf8_thrower{}); } + +namespace utf { + +using code_point_t = boost::locale::utf::code_point; +using utf_traits_t = boost::locale::utf::utf_traits; + +template +concept char_iterator = std::same_as, char>; + +/// \brief Checks whether the bytes in [begin, end) are a valid utf-8 sequence. +/// \param begin a forward iterator to the start of the byte sequence. +/// \param end a forward iterator to the end of the byte sequence. +/// \param max_read_bytes the function will only validated code points that +/// start within `max_read_bytes` of `begin`. +/// \return True if a valid utf-8 sequence, false otherwise. +template +bool is_valid_utf8(T begin, T end, size_t max_read_bytes) { + size_t read_bytes = 0; + while (begin != end) { + const boost::locale::utf::code_point c + = boost::locale::utf::utf_traits::decode(begin, end); + if (!boost::locale::utf::is_valid_codepoint(c)) { + return false; + } + read_bytes += utf_traits_t::width(c); + if (read_bytes >= max_read_bytes) { + return true; + } + } + return true; +} + +/// \brief Finds the incomplete codepoint of a truncated utf-8 encoded string if +/// one exists. Assumes that the utf-8 string was valid prior to truncation. +/// \param rbegin the begin reverse iterator to the truncated utf-8 string. +/// \param rend the end reverse iterator to the truncated utf-8 string. +/// \return the number of bytes the incomplete codepoint on the end of the +/// string is. 0 if last codepoint is valid. +template +size_t find_incomplete_code_point(T rbegin, T rend) { + boost::container::static_vector rev_code_units{}; + for (auto curr = rbegin; curr != rend; ++curr) { + rev_code_units.push_back(*curr); + if (utf_traits_t::is_lead(*curr)) { + break; + } + } + + auto crbegin = rev_code_units.crbegin(); + const auto c = utf_traits_t::decode(crbegin, rev_code_units.crend()); + if (!boost::locale::utf::is_valid_codepoint(c)) { + return rev_code_units.size(); + } + + return 0; +} + +struct utf32_code_point { + // `code_point` is default constructed to an invalid state so that a default + // constructed `utf32_code_point` can be differentiated from one constructed + // with a valid code_point. + code_point_t code_point{boost::locale::utf::illegal}; + + bool operator==(const utf32_code_point&) const = default; + + /// \brief The length in bytes of this code point's utf-8 encoding. + unsigned utf8_encoding_length() const { + return utf_traits_t::width(code_point); + } + + /// \brief Encodes the utf-32 code point to utf-8 + /// Note that only the [0, utf8_encoding_length()) indices in the returned + /// array represent the utf-8 encoding. The remaining indices will just be + /// zero-initialized. + /// \returns an array containing the utf-8 encoding. + std::array utf8_encoding() const { + std::array ret{}; + utf_traits_t::encode(code_point, ret.begin()); + return ret; + } + + /// \brief Tries to increment the utf-32 code point. + /// Only increments the code point if the original utf-8 encoding byte + /// length can be preserved. Result may be larger than (c + 1) in order to + /// ensure a valid code point. + /// \return a new code point greater than `c` if possible. Otherwise + /// std::nullopt is returned. + std::optional try_increment() const { + if (code_point >= 0x10FFFF) { + // value limit for utf-32 code points. + return {}; + } + + auto new_c = code_point + 1; + + if (new_c >= 0xD800 && new_c <= 0xDFFF) { + // avoid utf-16 surrogate range. + new_c = 0xE000; + } + + if (utf_traits_t::width(new_c) > utf_traits_t::width(code_point)) { + // ensure utf-8 byte length for the code point remains the same. + return {}; + } + + return utf32_code_point{new_c}; + } +}; + +/** + * Adapts a valid reversed utf-8 byte sequence to a utf-32 code point sequence. + */ +template +class utf32_reverse_iterator { +public: + using value_type = utf32_code_point; + using difference_type = void; + using pointer = const value_type*; + using reference = const value_type&; + using iterator_category = std::forward_iterator_tag; + + /// Note that this constructor expects reverse iterators to a valid utf-8 + /// byte sequence. + /// \param start an iterator to the start of a valid reversed utf-8 byte + /// sequence. + /// \param end an iterator to the end of a valid reversed utf-8 byte + /// sequence. + utf32_reverse_iterator(I start, I end) + : _curr(start) + , _end(end) + , _current() { + decode_next(); + } + + reference operator*() const noexcept { return _current; } + pointer operator->() const noexcept { return &_current; } + + utf32_reverse_iterator& operator++() { + decode_next(); + return *this; + } + utf32_reverse_iterator operator++(int) { + auto tmp = *this; + ++*this; + return tmp; + } + + bool operator==(const utf32_reverse_iterator& o) const noexcept { + return _curr == o._curr && _current == o._current; + } + bool operator!=(const utf32_reverse_iterator& o) const noexcept { + return !(*this == o); + } + +private: + I _curr; + I _end; + utf32_code_point _current; + + void decode_next() { + if (_curr == _end) { + _current = {}; + return; + } + + boost::container::static_vector rev_code_units{}; + for (; _curr != _end; ++_curr) { + rev_code_units.push_back(*_curr); + if (utf_traits_t::is_lead(*_curr)) { + break; + } + } + + ++_curr; + + auto crbegin = rev_code_units.crbegin(); + const auto c = utf_traits_t::decode(crbegin, rev_code_units.crend()); + _current = {c}; + } +}; + +} // namespace utf diff --git a/src/v/test_utils/go/parquet_verifier/main.go b/src/v/test_utils/go/parquet_verifier/main.go index 87f78418320a6..c8429cc2e4f9b 100644 --- a/src/v/test_utils/go/parquet_verifier/main.go +++ b/src/v/test_utils/go/parquet_verifier/main.go @@ -163,18 +163,47 @@ func main() { i, j, col.NumValues(), col.NullCount(), ) } - if !parquet.Equal(cmin, colMin) { + if len(cmin.Bytes()) > 64 || len(cmax.Bytes()) > 64 { log.Fatalf( - "❌ : invalid min value for page in row group %d, column %d: %v != %v\n", - i, j, cmin, colMin, + "❌ : invalid length for cmin %v(len %d) or cmax %v(len %d) needs to be <= 64 bytes\n", + cmin, len(cmin.Bytes()), cmax, len(cmax.Bytes()), ) } - if !parquet.Equal(cmax, colMax) { - log.Fatalf( - "❌ : invalid max value for page in row group %d, column %d: %v != %v\n", - i, j, cmax, colMax, - ) + + switch cmin.Kind() { + case parquet.ByteArray, parquet.FixedLenByteArray: + if parquet.ByteArrayType.Compare(cmin, colMin) > 0 { + log.Fatalf( + "❌ : invalid min value for page in row group %d, column %d: %v > %v\n", + i, j, cmin, colMin, + ) + } + default: + if !parquet.Equal(cmin, colMin) { + log.Fatalf( + "❌ : invalid min value for page in row group %d, column %d: %v != %v\n", + i, j, cmin, colMin, + ) + } + } + + switch cmax.Kind() { + case parquet.ByteArray, parquet.FixedLenByteArray: + if parquet.ByteArrayType.Compare(cmax, colMax) < 0 { + log.Fatalf( + "❌ : invalid max value for page in row group %d, column %d: %v < %v\n", + i, j, cmax, colMax, + ) + } + default: + if !parquet.Equal(cmax, colMax) { + log.Fatalf( + "❌ : invalid max value for page in row group %d, column %d: %v != %v\n", + i, j, cmax, colMax, + ) + } } + _ = pages.Close() } } @@ -233,18 +262,40 @@ func main() { ) } - if !reflect.DeepEqual(omin, rmin) { - log.Fatalf( - "❌ row group %d, column %d min value mismatch: %v != %v\n", - i, j, omin, rmin, - ) + // Note that parquet-go currently does not truncate bounds in page stats. + // Hence the bounds will not be equal in the rewritten file. + switch omin.Kind() { + case parquet.ByteArray, parquet.FixedLenByteArray: + if parquet.ByteArrayType.Compare(omin, rmin) > 0 { + log.Fatalf( + "❌ row group %d, column %d min value mismatch: %v > %v\n", + i, j, omin, rmin, + ) + } + default: + if !parquet.Equal(omin, rmin) { + log.Fatalf( + "❌ row group %d, column %d min value mismatch: %v != %v\n", + i, j, omin, rmin, + ) + } } - if !reflect.DeepEqual(omax, rmax) { - log.Fatalf( - "❌ row group %d, column %d max value mismatch: %v != %v\n", - i, j, omax, rmin, - ) + switch omax.Kind() { + case parquet.ByteArray, parquet.FixedLenByteArray: + if parquet.ByteArrayType.Compare(omax, rmax) < 0 { + log.Fatalf( + "❌ row group %d, column %d max value mismatch: %v < %v\n", + i, j, omax, rmax, + ) + } + default: + if !parquet.Equal(omax, rmax) { + log.Fatalf( + "❌ row group %d, column %d max value mismatch: %v != %v\n", + i, j, omax, rmin, + ) + } } } }