Skip to content

Commit

Permalink
Merge pull request #23308 from jcipar/jcipar/datalake-schema-registry
Browse files Browse the repository at this point in the history
datalake: helper code to get schemas for records
  • Loading branch information
jcipar authored Oct 3, 2024
2 parents 1442931 + 0f557dd commit 7ae3406
Show file tree
Hide file tree
Showing 5 changed files with 325 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/v/datalake/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ v_cc_library(
datalake_manager.cc
parquet_writer.cc
record_multiplexer.cc
schema_registry.cc
schemaless_translator.cc
schema_protobuf.cc
protobuf_utils.cc
Expand Down
82 changes: 82 additions & 0 deletions src/v/datalake/schema_registry.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Licensed as a Redpanda Enterprise file under the Redpanda Community
* License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/
#include "datalake/schema_registry.h"

#include "bytes/iobuf_parser.h"
#include "pandaproxy/schema_registry/types.h"

namespace datalake {
get_schema_id_result get_value_schema_id(iobuf& buf) {
// Messages that use the schema registry have a 5-byte prefix:
// offset 0: magic byte with value 0
// offsets 1-4: schema ID as big-endian integer
const uint8_t schema_registry_magic_byte = 0;

if (buf.size_bytes() < sizeof(uint8_t) + sizeof(int32_t)) {
return get_schema_error::not_enough_bytes;
}
iobuf_const_parser parser(buf);
auto magic = parser.consume_type<uint8_t>();
if (magic != schema_registry_magic_byte) {
return get_schema_error::no_schema_id;
}
auto id = parser.consume_be_type<int32_t>();
schema_message_data res = {
.schema_id = pandaproxy::schema_registry::schema_id{id},
.shared_message_data = buf.share(
parser.bytes_consumed(), parser.bytes_left())};

return res;
}

// TODO: this is mostly a copy-and-paste of get_proto_offsets from
// pandaproxy::schema_registry with a slightly different interface. Unify these.
get_proto_offsets_result get_proto_offsets(iobuf& buf) {
auto header = get_value_schema_id(buf);
if (!header.has_value()) {
return header.error();
}
proto_schema_message_data result;
result.schema_id = header.value().schema_id;
iobuf_const_parser parser(header.value().shared_message_data);

// The encoding is a length, followed by indexes into the file or message.
// Each number is a zigzag encoded integer.
auto [offset_count, bytes_read] = parser.read_varlong();
if (!bytes_read) {
return get_schema_error::bad_varint;
}
// Reject more offsets than bytes remaining; it's not possible
if (static_cast<size_t>(offset_count) > parser.bytes_left()) {
return get_schema_error::not_enough_bytes;
}
if (offset_count == 0) {
result.protobuf_offsets.push_back(0);
result.shared_message_data = header.value().shared_message_data.share(
parser.bytes_consumed(), parser.bytes_left());
return result;
}
result.protobuf_offsets.resize(offset_count);
for (auto& o : result.protobuf_offsets) {
if (parser.bytes_left() == 0) {
return get_schema_error::not_enough_bytes;
}
std::tie(o, bytes_read) = parser.read_varlong();
if (!bytes_read) {
return get_schema_error::bad_varint;
}
}

result.shared_message_data = header.value().shared_message_data.share(
parser.bytes_consumed(), parser.bytes_left());
return result;
}

} // namespace datalake
83 changes: 83 additions & 0 deletions src/v/datalake/schema_registry.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Licensed as a Redpanda Enterprise file under the Redpanda Community
* License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/
#pragma once

#include "pandaproxy/schema_registry/types.h"

#include <system_error>
#include <type_traits>

class iobuf;

namespace datalake {

// Stores a pair of schema id and the remaining message data after the schema
// prefix. The message data is shared from the original iobuf.
struct schema_message_data {
pandaproxy::schema_registry::schema_id schema_id;
iobuf shared_message_data;
};

// Stores the schema_id, protobuf message offsets, and remaining message data.
// The message data is shared from the original iobuf.
struct proto_schema_message_data {
pandaproxy::schema_registry::schema_id schema_id;
std::vector<int32_t> protobuf_offsets;
iobuf shared_message_data;
};

enum class get_schema_error {
ok = 0,
no_schema_id,
not_enough_bytes,
bad_varint,
};

struct get_schema_error_category : std::error_category {
const char* name() const noexcept override { return "Get Schema Error"; }

std::string message(int ev) const override {
switch (static_cast<get_schema_error>(ev)) {
case get_schema_error::ok:
return "Ok";
case get_schema_error::no_schema_id:
return "No schema ID";
case get_schema_error::not_enough_bytes:
return "Not enough bytes in message";
case get_schema_error::bad_varint:
return "Bad encoded value for varint";
}
}

static const std::error_category& error_category() {
static get_schema_error_category e;
return e;
}
};

inline std::error_code make_error_code(get_schema_error e) noexcept {
return {static_cast<int>(e), get_schema_error_category::error_category()};
}

using get_schema_id_result = result<schema_message_data, get_schema_error>;
using get_proto_offsets_result
= result<proto_schema_message_data, get_schema_error>;

// Extract the schema id from a record's value. This simply extracts the id. It
// does not validate that the schema exists in the Schema Registry.
get_schema_id_result get_value_schema_id(iobuf& record);
get_proto_offsets_result get_proto_offsets(iobuf& record);

} // namespace datalake

namespace std {
template<>
struct is_error_code_enum<datalake::get_schema_error> : true_type {};
} // namespace std
12 changes: 12 additions & 0 deletions src/v/datalake/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,18 @@ rp_test(
v::datalake_test_proto_cc_files
INPUT_FILES
"${testdata_dir}/iceberg_ready_test_messages_edition2023.proto"
LABELS datalake
ARGS "-- -c 1"
)

rp_test(
GTEST
BINARY_NAME gtest_datalake_schema_registry
SOURCES schema_registry_test.cc
LIBRARIES
v::gtest_main
v::datalake
LABELS datalake
ARGS "-- -c 1"
)

Expand Down
147 changes: 147 additions & 0 deletions src/v/datalake/tests/schema_registry_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Licensed as a Redpanda Enterprise file under the Redpanda Community
* License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/

#include "bytes/iobuf.h"
#include "datalake/schema_registry.h"
#include "gmock/gmock.h"
#include "model/record.h"
#include "utils/vint.h"

#include <gtest/gtest.h>

#include <variant>

namespace {
void buf_append(iobuf& b, uint8_t byte) { b.append(&byte, 1); }
void buf_append(iobuf& b, int32_t val) {
b.append(reinterpret_cast<uint8_t*>(&val), 4);
}
void buf_append(iobuf& b, const bytes& byte) {
b.append(byte.data(), byte.size());
}
void buf_append(iobuf& b, const std::string& str) {
b.append(str.data(), str.size());
}

template<typename... Args>
iobuf buf_from(const Args&... args) {
iobuf b;
(buf_append(b, args), ...);
return b;
}
} // namespace

TEST(DatalakeSchemaRegistry, SchemaIdForValidRecord) {
uint8_t magic = 0;
int32_t schema_id = 12;
int32_t schema_id_encoded = ss::cpu_to_be(schema_id);
std::string payload = "Hello world";

iobuf value = buf_from(magic, schema_id_encoded, payload);

auto res = datalake::get_value_schema_id(value);
ASSERT_TRUE(res.has_value());
EXPECT_EQ(res.value().schema_id(), schema_id);
EXPECT_EQ(res.value().shared_message_data.size_bytes(), payload.size());
}

TEST(DatalakeSchemaRegistry, SchemaIdForShortRecord) {
iobuf value;

uint8_t magic = 0;
int32_t schema_id = 12;
int32_t schema_id_encoded = ss::cpu_to_be(schema_id);
value.append(&magic, 1);
// Only adding 3 bytes here instead of 4 to generate an invalid record.
value.append(reinterpret_cast<uint8_t*>(&schema_id_encoded), 3);

auto res = datalake::get_value_schema_id(value);
ASSERT_TRUE(res.has_error());
EXPECT_EQ(res.error(), datalake::get_schema_error::not_enough_bytes);
}

TEST(DatalakeSchemaRegistry, SchemaIdForBadMagic) {
uint8_t magic = 5; // Invalid magic
int32_t schema_id = 12;
int32_t schema_id_encoded = ss::cpu_to_be(schema_id);

iobuf value = buf_from(magic, schema_id_encoded);

auto res = datalake::get_value_schema_id(value);
ASSERT_TRUE(res.has_error());
EXPECT_EQ(res.error(), datalake::get_schema_error::no_schema_id);
}

TEST(DatalakeSchemaRegistry, GetProtoOffsetsOk) {
uint8_t magic = 0;
uint32_t schema_id = 12;
int32_t schema_id_encoded = ss::cpu_to_be(schema_id);
std::string payload = "Hello world";

uint8_t proto_msg_count = 5;
auto encoded = vint::to_bytes(proto_msg_count);
iobuf value = buf_from(magic, schema_id_encoded, encoded);

for (uint8_t i = 0; i < proto_msg_count; i++) {
encoded = vint::to_bytes(i);
value.append(encoded.data(), encoded.size());
}
value.append(payload.data(), payload.size());

auto res = datalake::get_proto_offsets(value);
ASSERT_TRUE(res.has_value());
const auto& offsets = res.value().protobuf_offsets;
EXPECT_THAT(offsets, testing::ElementsAre(0, 1, 2, 3, 4));
EXPECT_EQ(res.value().schema_id(), schema_id);
EXPECT_EQ(res.value().shared_message_data.size_bytes(), payload.size());
}

TEST(DatalakeSchemaRegistry, GetProtoOffsetsDefaultZero) {
// This tests a special case where the offset count is 0, we should assume
// that the message is the first one defined in the schema and return {0}.

uint8_t magic = 0;
uint32_t schema_id = 12;
int32_t schema_id_encoded = ss::cpu_to_be(schema_id);
std::string payload = "Hello world";

uint8_t proto_msg_count = 0;
auto encoded = vint::to_bytes(proto_msg_count);

iobuf value = buf_from(magic, schema_id_encoded, encoded, payload);

auto res = datalake::get_proto_offsets(value);
ASSERT_TRUE(res.has_value());
const auto& offsets = res.value().protobuf_offsets;
EXPECT_EQ(offsets.size(), 1);
EXPECT_EQ(offsets[0], 0);
EXPECT_EQ(res.value().schema_id(), schema_id);
EXPECT_EQ(res.value().shared_message_data.size_bytes(), payload.size());
}

TEST(DatalakeSchemaRegistry, GetProtoOffsetsNotEnoughData) {
uint8_t magic = 0;
uint32_t schema_id = 12;
int32_t schema_id_encoded = ss::cpu_to_be(schema_id);

uint8_t proto_msg_count = 9;
auto encoded = vint::to_bytes(proto_msg_count);

iobuf value = buf_from(magic, schema_id_encoded, encoded);

for (uint8_t i = 0; i < proto_msg_count - 1; i++) {
encoded = vint::to_bytes(i);
value.append(encoded.data(), encoded.size());
}

auto res = datalake::get_proto_offsets(value);
ASSERT_TRUE(res.has_error());
EXPECT_EQ(res.error(), datalake::get_schema_error::not_enough_bytes);
}

0 comments on commit 7ae3406

Please sign in to comment.