datalake: parser for protobuf message offset list
Protobuf records carry an additional prefix after the schema id. Since a
protobuf schema can contain multiple message types, and those messages
may be nested, each record carries a list of message indexes that must be
traversed to find the actual message schema for the record. The list is
encoded as a varint length followed by that many varints.
jcipar committed Oct 1, 2024
1 parent 739be8a commit d348baf
Showing 3 changed files with 95 additions and 0 deletions.
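For reference, here is a minimal sketch of the framing described in the commit message, assuming the standard schema-registry wire format and the vint helpers used in the tests below; the builder function is illustrative, not part of this commit:

#include "bytes/iobuf.h"
#include "utils/vint.h"

#include <array>
#include <cstdint>

// Build an example record value: a magic byte, a 4-byte schema id, then the
// protobuf message-index list (a varint count followed by that many varints).
iobuf make_example_value() {
    iobuf value;
    uint8_t magic = 0;       // schema-registry magic byte
    uint32_t schema_id = 12; // example schema id; the parser skips it
    value.append(&magic, 1);
    value.append(reinterpret_cast<uint8_t*>(&schema_id), 4);

    // Two indexes: the first top-level message, then its second nested
    // message type.
    std::array<int64_t, 2> indexes = {0, 1};
    std::array<uint8_t, 16> encoded{};
    size_t n = vint::serialize(static_cast<int64_t>(indexes.size()), encoded.data());
    value.append(encoded.data(), n);
    for (auto idx : indexes) {
        n = vint::serialize(idx, encoded.data());
        value.append(encoded.data(), n);
    }
    return value;
}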
37 changes: 37 additions & 0 deletions src/v/datalake/schema_registry.cc
@@ -7,6 +7,7 @@
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/
#include "bytes/iobuf_parser.h"
#include "pandaproxy/schema_registry/types.h"
#include "datalake/schema_registry.h"

#include "pandaproxy/schema_registry/types.h"
@@ -30,4 +31,39 @@ get_schema_id_result get_value_schema_id(const iobuf& buf) {
    return pandaproxy::schema_registry::schema_id{id};
}

// TODO: this is mostly a copy-and-paste of get_proto_offsets from
// pandaproxy::schema_registry with a slightly different interface. Unify these.
get_proto_offsets_result get_proto_offsets(const iobuf& buf) {
    // The encoding is a length, followed by indexes into the file or message.
    // Each number is a zigzag-encoded integer.
    iobuf_const_parser parser(buf);
    // Skip the magic byte and the 4-byte schema id.
    parser.skip(sizeof(uint8_t) + sizeof(int32_t));
    std::vector<int32_t> offsets;
    auto [offset_count, bytes_read] = parser.read_varlong();
    if (!bytes_read) {
        return get_schema_error::bad_varint;
    }
    // Reject counts larger than the remaining bytes; each offset occupies at
    // least one byte, so such a count cannot be valid.
    if (static_cast<size_t>(offset_count) > parser.bytes_left()) {
        return get_schema_error::not_enough_bytes;
    }
    offsets.resize(offset_count);
    for (auto& o : offsets) {
        if (parser.bytes_left() == 0) {
            return get_schema_error::not_enough_bytes;
        }
        std::tie(o, bytes_read) = parser.read_varlong();
        if (!bytes_read) {
            return get_schema_error::bad_varint;
        }
    }
    if (offsets.empty()) {
        // A decoded length of 0 is the optimised encoding for the common
        // case of referring to the first message in the file.
        offsets.push_back(0);
    }
    return offsets;
}

} // namespace datalake
5 changes: 5 additions & 0 deletions src/v/datalake/schema_registry.h
@@ -22,6 +22,7 @@ enum class get_schema_error {
    ok = 0,
    no_schema_id,
    not_enough_bytes,
    bad_varint,
};

struct get_schema_error_category : std::error_category {
@@ -35,6 +36,8 @@ struct get_schema_error_category : std::error_category {
return "No schema ID";
case get_schema_error::not_enough_bytes:
return "Not enough bytes in message";
case get_schema_error::bad_varint:
return "Bad encoded value for varint";
}
}

@@ -50,11 +53,13 @@ inline std::error_code make_error_code(get_schema_error e) noexcept {

using get_schema_id_result
= result<pandaproxy::schema_registry::schema_id, get_schema_error>;
using get_proto_offsets_result = result<std::vector<int32_t>, get_schema_error>;

// Extract the schema id from a record's value. This simply extracts the id;
// it does not do any validation. Returns get_schema_error::no_schema_id if
// the record does not have a schema id.
get_schema_id_result get_value_schema_id(const iobuf& record);
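
// Extract the list of protobuf message indexes that follows the schema id in
// a protobuf record's value. Like get_value_schema_id, this performs no
// validation of the magic byte.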
get_proto_offsets_result get_proto_offsets(const iobuf& record);

} // namespace datalake
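
For illustration, a hypothetical call site showing how the two helpers compose through the result type (describe_record and its wiring are not part of this commit):

std::error_code describe_record(const iobuf& record) {
    // Extract the schema id first; on failure, surface the parse error.
    auto id = datalake::get_value_schema_id(record);
    if (id.has_error()) {
        return datalake::make_error_code(id.error());
    }
    // Then recover the message-index path that selects the concrete protobuf
    // message type within that schema.
    auto offsets = datalake::get_proto_offsets(record);
    if (offsets.has_error()) {
        return datalake::make_error_code(offsets.error());
    }
    // id.value() and offsets.value() together identify the message schema.
    return {};
}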

53 changes: 53 additions & 0 deletions src/v/datalake/tests/schema_registry_test.cc
@@ -11,6 +11,7 @@
#include "bytes/iobuf.h"
#include "datalake/schema_registry.h"
#include "model/record.h"
#include "utils/vint.h"

#include <gtest/gtest.h>

@@ -57,3 +58,55 @@ TEST(DatalakeSchemaRegistry, SchemaIdForBadMagic) {
    ASSERT_TRUE(res.has_error());
    EXPECT_EQ(res.error(), datalake::get_schema_error::no_schema_id);
}

TEST(DatalakeSchemaRegistry, GetProtoOffsetsOk) {
    iobuf value;

    uint8_t magic = 5; // Arbitrary: get_proto_offsets skips the magic byte.
    uint32_t schema_id = 12;
    value.append(&magic, 1);
    value.append(reinterpret_cast<uint8_t*>(&schema_id), 4);

    uint8_t proto_msg_count = 9;
    std::array<uint8_t, 16> encoded;
    size_t encoded_size = vint::serialize(proto_msg_count, encoded.data());
    value.append(encoded.data(), encoded_size);

    for (uint8_t i = 0; i < proto_msg_count; i++) {
        encoded_size = vint::serialize(i, encoded.data());
        value.append(encoded.data(), encoded_size);
    }

    auto res = datalake::get_proto_offsets(value);
    ASSERT_TRUE(res.has_value());
    const auto& offsets = res.value();
    EXPECT_EQ(offsets.size(), proto_msg_count);
    for (size_t i = 0; i < offsets.size(); i++) {
        EXPECT_EQ(static_cast<int32_t>(i), offsets[i]);
    }
}

TEST(DatalakeSchemaRegistry, GetProtoOffsetsNotEnoughData) {
    iobuf value;

    uint8_t magic = 5; // Arbitrary: get_proto_offsets skips the magic byte.
    uint32_t schema_id = 12;
    value.append(&magic, 1);
    value.append(reinterpret_cast<uint8_t*>(&schema_id), 4);

    uint8_t proto_msg_count = 9;
    std::array<uint8_t, 16> encoded;
    size_t encoded_size = vint::serialize(proto_msg_count, encoded.data());
    value.append(encoded.data(), encoded_size);

    // Write one fewer offset than the declared count.
    for (uint8_t i = 0; i < proto_msg_count - 1; i++) {
        encoded_size = vint::serialize(i, encoded.data());
        value.append(encoded.data(), encoded_size);
    }

    auto res = datalake::get_proto_offsets(value);
    ASSERT_TRUE(res.has_error());
    EXPECT_EQ(res.error(), datalake::get_schema_error::not_enough_bytes);
}
