Skip to content

Commit

Permalink
datalake: parser for protobuf message offset list
Browse files Browse the repository at this point in the history
Protobuf records have an additional prefix after the schema id. Since a
protobuf schema can contain multiple message types, and those messages
may be nested, they contain a list of message ids to be traversed to
find the actual message schema for this record. These are encoded as a
varint representing the length followed by a list of varints.
  • Loading branch information
jcipar committed Oct 2, 2024
1 parent 0afdb12 commit 0f557dd
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 19 deletions.
53 changes: 51 additions & 2 deletions src/v/datalake/schema_registry.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@
*/
#include "datalake/schema_registry.h"

#include "bytes/iobuf_parser.h"
#include "pandaproxy/schema_registry/types.h"

namespace datalake {
get_schema_id_result get_value_schema_id(const iobuf& buf) {
get_schema_id_result get_value_schema_id(iobuf& buf) {
// Messages that use the schema registry have a 5-byte prefix:
// offset 0: magic byte with value 0
// offsets 1-4: schema ID as big-endian integer
Expand All @@ -27,7 +28,55 @@ get_schema_id_result get_value_schema_id(const iobuf& buf) {
return get_schema_error::no_schema_id;
}
auto id = parser.consume_be_type<int32_t>();
return pandaproxy::schema_registry::schema_id{id};
schema_message_data res = {
.schema_id = pandaproxy::schema_registry::schema_id{id},
.shared_message_data = buf.share(
parser.bytes_consumed(), parser.bytes_left())};

return res;
}

// TODO: this is mostly a copy-and-paste of get_proto_offsets from
// pandaproxy::schema_registry with a slightly different interface. Unify these.
get_proto_offsets_result get_proto_offsets(iobuf& buf) {
auto header = get_value_schema_id(buf);
if (!header.has_value()) {
return header.error();
}
proto_schema_message_data result;
result.schema_id = header.value().schema_id;
iobuf_const_parser parser(header.value().shared_message_data);

// The encoding is a length, followed by indexes into the file or message.
// Each number is a zigzag encoded integer.
auto [offset_count, bytes_read] = parser.read_varlong();
if (!bytes_read) {
return get_schema_error::bad_varint;
}
// Reject more offsets than bytes remaining; it's not possible
if (static_cast<size_t>(offset_count) > parser.bytes_left()) {
return get_schema_error::not_enough_bytes;
}
if (offset_count == 0) {
result.protobuf_offsets.push_back(0);
result.shared_message_data = header.value().shared_message_data.share(
parser.bytes_consumed(), parser.bytes_left());
return result;
}
result.protobuf_offsets.resize(offset_count);
for (auto& o : result.protobuf_offsets) {
if (parser.bytes_left() == 0) {
return get_schema_error::not_enough_bytes;
}
std::tie(o, bytes_read) = parser.read_varlong();
if (!bytes_read) {
return get_schema_error::bad_varint;
}
}

result.shared_message_data = header.value().shared_message_data.share(
parser.bytes_consumed(), parser.bytes_left());
return result;
}

} // namespace datalake
29 changes: 24 additions & 5 deletions src/v/datalake/schema_registry.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,26 @@ class iobuf;

namespace datalake {

// Stores a pair of schema id and the remaining message data after the schema
// prefix. The message data is shared from the original iobuf.
struct schema_message_data {
pandaproxy::schema_registry::schema_id schema_id;
iobuf shared_message_data;
};

// Stores the schema_id, protobuf message offsets, and remaining message data.
// The message data is shared from the original iobuf.
struct proto_schema_message_data {
pandaproxy::schema_registry::schema_id schema_id;
std::vector<int32_t> protobuf_offsets;
iobuf shared_message_data;
};

enum class get_schema_error {
ok = 0,
no_schema_id,
not_enough_bytes,
bad_varint,
};

struct get_schema_error_category : std::error_category {
Expand All @@ -35,6 +51,8 @@ struct get_schema_error_category : std::error_category {
return "No schema ID";
case get_schema_error::not_enough_bytes:
return "Not enough bytes in message";
case get_schema_error::bad_varint:
return "Bad encoded value for varint";
}
}

Expand All @@ -48,13 +66,14 @@ inline std::error_code make_error_code(get_schema_error e) noexcept {
return {static_cast<int>(e), get_schema_error_category::error_category()};
}

using get_schema_id_result
= result<pandaproxy::schema_registry::schema_id, get_schema_error>;
using get_schema_id_result = result<schema_message_data, get_schema_error>;
using get_proto_offsets_result
= result<proto_schema_message_data, get_schema_error>;

// Extract the schema id from a record's value. This simply extracts the id. It
// does not do any validation. Returns std::nullopt if the record does not have
// a schema id.
get_schema_id_result get_value_schema_id(const iobuf& record);
// does not validate that the schema exists in the Schema Registry.
get_schema_id_result get_value_schema_id(iobuf& record);
get_proto_offsets_result get_proto_offsets(iobuf& record);

} // namespace datalake

Expand Down
112 changes: 100 additions & 12 deletions src/v/datalake/tests/schema_registry_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,32 +10,54 @@

#include "bytes/iobuf.h"
#include "datalake/schema_registry.h"
#include "gmock/gmock.h"
#include "model/record.h"
#include "utils/vint.h"

#include <gtest/gtest.h>

#include <variant>

TEST(DatalakeSchemaRegistry, SchemaIdForValidRecord) {
iobuf value;
namespace {
void buf_append(iobuf& b, uint8_t byte) { b.append(&byte, 1); }
void buf_append(iobuf& b, int32_t val) {
b.append(reinterpret_cast<uint8_t*>(&val), 4);
}
void buf_append(iobuf& b, const bytes& byte) {
b.append(byte.data(), byte.size());
}
void buf_append(iobuf& b, const std::string& str) {
b.append(str.data(), str.size());
}

template<typename... Args>
iobuf buf_from(const Args&... args) {
iobuf b;
(buf_append(b, args), ...);
return b;
}
} // namespace

TEST(DatalakeSchemaRegistry, SchemaIdForValidRecord) {
uint8_t magic = 0;
int32_t schema_id = 12;
int32_t schema_id_encoded = htobe32(schema_id);
value.append(&magic, 1);
value.append(reinterpret_cast<uint8_t*>(&schema_id_encoded), 4);
int32_t schema_id_encoded = ss::cpu_to_be(schema_id);
std::string payload = "Hello world";

iobuf value = buf_from(magic, schema_id_encoded, payload);

auto res = datalake::get_value_schema_id(value);
ASSERT_TRUE(res.has_value());
EXPECT_EQ(res.value()(), schema_id);
EXPECT_EQ(res.value().schema_id(), schema_id);
EXPECT_EQ(res.value().shared_message_data.size_bytes(), payload.size());
}

TEST(DatalakeSchemaRegistry, SchemaIdForShortRecord) {
iobuf value;

uint8_t magic = 0;
int32_t schema_id = 12;
int32_t schema_id_encoded = htobe32(schema_id);
int32_t schema_id_encoded = ss::cpu_to_be(schema_id);
value.append(&magic, 1);
// Only adding 3 bytes here instead of 4 to generate an invalid record.
value.append(reinterpret_cast<uint8_t*>(&schema_id_encoded), 3);
Expand All @@ -46,14 +68,80 @@ TEST(DatalakeSchemaRegistry, SchemaIdForShortRecord) {
}

TEST(DatalakeSchemaRegistry, SchemaIdForBadMagic) {
iobuf value;

uint8_t magic = 5; // Invalid magic
int32_t schema_id = 12;
int32_t schema_id_encoded = htobe32(schema_id);
value.append(&magic, 1);
value.append(reinterpret_cast<uint8_t*>(&schema_id_encoded), 4);
int32_t schema_id_encoded = ss::cpu_to_be(schema_id);

iobuf value = buf_from(magic, schema_id_encoded);

auto res = datalake::get_value_schema_id(value);
ASSERT_TRUE(res.has_error());
EXPECT_EQ(res.error(), datalake::get_schema_error::no_schema_id);
}

TEST(DatalakeSchemaRegistry, GetProtoOffsetsOk) {
uint8_t magic = 0;
uint32_t schema_id = 12;
int32_t schema_id_encoded = ss::cpu_to_be(schema_id);
std::string payload = "Hello world";

uint8_t proto_msg_count = 5;
auto encoded = vint::to_bytes(proto_msg_count);
iobuf value = buf_from(magic, schema_id_encoded, encoded);

for (uint8_t i = 0; i < proto_msg_count; i++) {
encoded = vint::to_bytes(i);
value.append(encoded.data(), encoded.size());
}
value.append(payload.data(), payload.size());

auto res = datalake::get_proto_offsets(value);
ASSERT_TRUE(res.has_value());
const auto& offsets = res.value().protobuf_offsets;
EXPECT_THAT(offsets, testing::ElementsAre(0, 1, 2, 3, 4));
EXPECT_EQ(res.value().schema_id(), schema_id);
EXPECT_EQ(res.value().shared_message_data.size_bytes(), payload.size());
}

TEST(DatalakeSchemaRegistry, GetProtoOffsetsDefaultZero) {
// This tests a special case where the offset count is 0, we should assume
// that the message is the first one defined in the schema and return {0}.

uint8_t magic = 0;
uint32_t schema_id = 12;
int32_t schema_id_encoded = ss::cpu_to_be(schema_id);
std::string payload = "Hello world";

uint8_t proto_msg_count = 0;
auto encoded = vint::to_bytes(proto_msg_count);

iobuf value = buf_from(magic, schema_id_encoded, encoded, payload);

auto res = datalake::get_proto_offsets(value);
ASSERT_TRUE(res.has_value());
const auto& offsets = res.value().protobuf_offsets;
EXPECT_EQ(offsets.size(), 1);
EXPECT_EQ(offsets[0], 0);
EXPECT_EQ(res.value().schema_id(), schema_id);
EXPECT_EQ(res.value().shared_message_data.size_bytes(), payload.size());
}

TEST(DatalakeSchemaRegistry, GetProtoOffsetsNotEnoughData) {
uint8_t magic = 0;
uint32_t schema_id = 12;
int32_t schema_id_encoded = ss::cpu_to_be(schema_id);

uint8_t proto_msg_count = 9;
auto encoded = vint::to_bytes(proto_msg_count);

iobuf value = buf_from(magic, schema_id_encoded, encoded);

for (uint8_t i = 0; i < proto_msg_count - 1; i++) {
encoded = vint::to_bytes(i);
value.append(encoded.data(), encoded.size());
}

auto res = datalake::get_proto_offsets(value);
ASSERT_TRUE(res.has_error());
EXPECT_EQ(res.error(), datalake::get_schema_error::not_enough_bytes);
}

0 comments on commit 0f557dd

Please sign in to comment.