Skip to content

Commit c6e3d5d

Browse files
committed
datalake: parser for protobuf message offset list
Protobuf records carry an additional prefix after the schema id. Because a protobuf schema can define multiple message types, and those messages may be nested, the prefix contains a list of message indexes that must be traversed to find the actual message schema for the record. The list is encoded as a varint holding its length, followed by that many varints.
1 parent 739be8a commit c6e3d5d

File tree

3 files changed

+175
-19
lines changed

3 files changed

+175
-19
lines changed

src/v/datalake/schema_registry.cc

+51-2
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,11 @@
99
*/
1010
#include "datalake/schema_registry.h"
1111

12+
#include "bytes/iobuf_parser.h"
1213
#include "pandaproxy/schema_registry/types.h"
1314

1415
namespace datalake {
15-
get_schema_id_result get_value_schema_id(const iobuf& buf) {
16+
get_schema_id_result get_value_schema_id(iobuf& buf) {
1617
// Messages that use the schema registry have a 5-byte prefix:
1718
// offset 0: magic byte with value 0
1819
// offsets 1-4: schema ID as big-endian integer
@@ -27,7 +28,55 @@ get_schema_id_result get_value_schema_id(const iobuf& buf) {
2728
return get_schema_error::no_schema_id;
2829
}
2930
auto id = parser.consume_be_type<int32_t>();
30-
return pandaproxy::schema_registry::schema_id{id};
31+
schema_message_data res = {
32+
.schema_id = pandaproxy::schema_registry::schema_id{id},
33+
.shared_message_data = buf.share(
34+
parser.bytes_consumed(), parser.bytes_left())};
35+
36+
return res;
37+
}
38+
39+
// TODO: this is mostly a copy-and-paste of get_proto_offsets from
40+
// pandaproxy::schema_registry with a slightly different interface. Unify these.
41+
get_proto_offsets_result get_proto_offsets(iobuf& buf) {
42+
auto header = get_value_schema_id(buf);
43+
if (!header.has_value()) {
44+
return header.error();
45+
}
46+
proto_schema_message_data result;
47+
result.schema_id = header.value().schema_id;
48+
iobuf_const_parser parser(header.value().shared_message_data);
49+
50+
// The encoding is a length, followed by indexes into the file or message.
51+
// Each number is a zigzag encoded integer.
52+
auto [offset_count, bytes_read] = parser.read_varlong();
53+
if (!bytes_read) {
54+
return get_schema_error::bad_varint;
55+
}
56+
// Reject more offsets than bytes remaining; it's not possible
57+
if (static_cast<size_t>(offset_count) > parser.bytes_left()) {
58+
return get_schema_error::not_enough_bytes;
59+
}
60+
if (offset_count == 0) {
61+
result.protobuf_offsets.push_back(0);
62+
result.shared_message_data = header.value().shared_message_data.share(
63+
parser.bytes_consumed(), parser.bytes_left());
64+
return result;
65+
}
66+
result.protobuf_offsets.resize(offset_count);
67+
for (auto& o : result.protobuf_offsets) {
68+
if (parser.bytes_left() == 0) {
69+
return get_schema_error::not_enough_bytes;
70+
}
71+
std::tie(o, bytes_read) = parser.read_varlong();
72+
if (!bytes_read) {
73+
return get_schema_error::bad_varint;
74+
}
75+
}
76+
77+
result.shared_message_data = header.value().shared_message_data.share(
78+
parser.bytes_consumed(), parser.bytes_left());
79+
return result;
3180
}
3281

3382
} // namespace datalake

src/v/datalake/schema_registry.h

+24-5
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,26 @@ class iobuf;
1818

1919
namespace datalake {
2020

21+
// Stores a pair of schema id and the remaining message data after the schema
22+
// prefix. The message data is shared from the original iobuf.
23+
struct schema_message_data {
24+
pandaproxy::schema_registry::schema_id schema_id;
25+
iobuf shared_message_data;
26+
};
27+
28+
// Stores the schema_id, protobuf message offsets, and remaining message data.
29+
// The message data is shared from the original iobuf.
30+
struct proto_schema_message_data {
31+
pandaproxy::schema_registry::schema_id schema_id;
32+
std::vector<int32_t> protobuf_offsets;
33+
iobuf shared_message_data;
34+
};
35+
2136
// Error codes produced while reading the schema prefix of a record value.
enum class get_schema_error {
    ok = 0,
    // "No schema ID"
    no_schema_id,
    // "Not enough bytes in message"
    not_enough_bytes,
    // "Bad encoded value for varint"
    bad_varint,
};
2642

2743
struct get_schema_error_category : std::error_category {
@@ -35,6 +51,8 @@ struct get_schema_error_category : std::error_category {
3551
return "No schema ID";
3652
case get_schema_error::not_enough_bytes:
3753
return "Not enough bytes in message";
54+
case get_schema_error::bad_varint:
55+
return "Bad encoded value for varint";
3856
}
3957
}
4058

@@ -48,13 +66,14 @@ inline std::error_code make_error_code(get_schema_error e) noexcept {
4866
return {static_cast<int>(e), get_schema_error_category::error_category()};
4967
}
5068

51-
using get_schema_id_result
52-
= result<pandaproxy::schema_registry::schema_id, get_schema_error>;
69+
using get_schema_id_result = result<schema_message_data, get_schema_error>;
70+
using get_proto_offsets_result
71+
= result<proto_schema_message_data, get_schema_error>;
5372

5473
// Extract the schema id from a record's value. This simply extracts the id. It
55-
// does not do any validation. Returns std::nullopt if the record does not have
56-
// a schema id.
57-
get_schema_id_result get_value_schema_id(const iobuf& record);
74+
// does not validate that the schema exists in the Schema Registry.
75+
get_schema_id_result get_value_schema_id(iobuf& record);
76+
get_proto_offsets_result get_proto_offsets(iobuf& record);
5877

5978
} // namespace datalake
6079

src/v/datalake/tests/schema_registry_test.cc

+100-12
Original file line numberDiff line numberDiff line change
@@ -10,32 +10,54 @@
1010

1111
#include "bytes/iobuf.h"
1212
#include "datalake/schema_registry.h"
13+
#include "gmock/gmock.h"
1314
#include "model/record.h"
15+
#include "utils/vint.h"
1416

1517
#include <gtest/gtest.h>
1618

1719
#include <variant>
1820

19-
TEST(DatalakeSchemaRegistry, SchemaIdForValidRecord) {
20-
iobuf value;
21+
namespace {
22+
void buf_append(iobuf& b, uint8_t byte) { b.append(&byte, 1); }
23+
void buf_append(iobuf& b, int32_t val) {
24+
b.append(reinterpret_cast<uint8_t*>(&val), 4);
25+
}
26+
void buf_append(iobuf& b, const bytes& byte) {
27+
b.append(byte.data(), byte.size());
28+
}
29+
void buf_append(iobuf& b, const std::string& str) {
30+
b.append(str.data(), str.size());
31+
}
2132

33+
template<typename... Args>
34+
iobuf buf_from(const Args&... args) {
35+
iobuf b;
36+
(buf_append(b, args), ...);
37+
return b;
38+
}
39+
} // namespace
40+
41+
TEST(DatalakeSchemaRegistry, SchemaIdForValidRecord) {
2242
uint8_t magic = 0;
2343
int32_t schema_id = 12;
24-
int32_t schema_id_encoded = htobe32(schema_id);
25-
value.append(&magic, 1);
26-
value.append(reinterpret_cast<uint8_t*>(&schema_id_encoded), 4);
44+
int32_t schema_id_encoded = ss::cpu_to_be(schema_id);
45+
std::string payload = "Hello world";
46+
47+
iobuf value = buf_from(magic, schema_id_encoded, payload);
2748

2849
auto res = datalake::get_value_schema_id(value);
2950
ASSERT_TRUE(res.has_value());
30-
EXPECT_EQ(res.value()(), schema_id);
51+
EXPECT_EQ(res.value().schema_id(), schema_id);
52+
EXPECT_EQ(res.value().shared_message_data.size_bytes(), payload.size());
3153
}
3254

3355
TEST(DatalakeSchemaRegistry, SchemaIdForShortRecord) {
3456
iobuf value;
3557

3658
uint8_t magic = 0;
3759
int32_t schema_id = 12;
38-
int32_t schema_id_encoded = htobe32(schema_id);
60+
int32_t schema_id_encoded = ss::cpu_to_be(schema_id);
3961
value.append(&magic, 1);
4062
// Only adding 3 bytes here instead of 4 to generate an invalid record.
4163
value.append(reinterpret_cast<uint8_t*>(&schema_id_encoded), 3);
@@ -46,14 +68,80 @@ TEST(DatalakeSchemaRegistry, SchemaIdForShortRecord) {
4668
}
4769

4870
TEST(DatalakeSchemaRegistry, SchemaIdForBadMagic) {
49-
iobuf value;
50-
5171
uint8_t magic = 5; // Invalid magic
5272
int32_t schema_id = 12;
53-
int32_t schema_id_encoded = htobe32(schema_id);
54-
value.append(&magic, 1);
55-
value.append(reinterpret_cast<uint8_t*>(&schema_id_encoded), 4);
73+
int32_t schema_id_encoded = ss::cpu_to_be(schema_id);
74+
75+
iobuf value = buf_from(magic, schema_id_encoded);
76+
5677
auto res = datalake::get_value_schema_id(value);
5778
ASSERT_TRUE(res.has_error());
5879
EXPECT_EQ(res.error(), datalake::get_schema_error::no_schema_id);
5980
}
81+
82+
// A record with a valid header, five message offsets (0..4) and a payload
// parses into the schema id, the offsets in order, and the untouched payload.
TEST(DatalakeSchemaRegistry, GetProtoOffsetsOk) {
    const uint8_t magic = 0;
    // Signed, like the schema id it is compared against below.
    const int32_t schema_id = 12;
    const int32_t schema_id_encoded = ss::cpu_to_be(schema_id);
    const std::string payload = "Hello world";

    // Record layout: magic, schema id, offset count, offsets, payload.
    const uint8_t proto_msg_count = 5;
    iobuf value = buf_from(
      magic, schema_id_encoded, vint::to_bytes(proto_msg_count));
    for (uint8_t i = 0; i < proto_msg_count; i++) {
        buf_append(value, vint::to_bytes(i));
    }
    buf_append(value, payload);

    auto res = datalake::get_proto_offsets(value);
    ASSERT_TRUE(res.has_value());
    const auto& offsets = res.value().protobuf_offsets;
    EXPECT_THAT(offsets, testing::ElementsAre(0, 1, 2, 3, 4));
    EXPECT_EQ(res.value().schema_id(), schema_id);
    EXPECT_EQ(res.value().shared_message_data.size_bytes(), payload.size());
}
105+
106+
TEST(DatalakeSchemaRegistry, GetProtoOffsetsDefaultZero) {
    // This tests a special case where the offset count is 0, we should assume
    // that the message is the first one defined in the schema and return {0}.

    const uint8_t magic = 0;
    // Signed, like the schema id it is compared against below.
    const int32_t schema_id = 12;
    const int32_t schema_id_encoded = ss::cpu_to_be(schema_id);
    const std::string payload = "Hello world";

    // Record layout: magic, schema id, a zero offset count, payload.
    const uint8_t proto_msg_count = 0;
    iobuf value = buf_from(
      magic, schema_id_encoded, vint::to_bytes(proto_msg_count), payload);

    auto res = datalake::get_proto_offsets(value);
    ASSERT_TRUE(res.has_value());
    const auto& offsets = res.value().protobuf_offsets;
    EXPECT_THAT(offsets, testing::ElementsAre(0));
    EXPECT_EQ(res.value().schema_id(), schema_id);
    EXPECT_EQ(res.value().shared_message_data.size_bytes(), payload.size());
}
128+
129+
// A record that announces more offsets than it actually carries must be
// rejected with not_enough_bytes.
TEST(DatalakeSchemaRegistry, GetProtoOffsetsNotEnoughData) {
    const uint8_t magic = 0;
    const int32_t schema_id = 12;
    const int32_t schema_id_encoded = ss::cpu_to_be(schema_id);

    // Announce 9 offsets...
    const uint8_t proto_msg_count = 9;
    iobuf value = buf_from(
      magic, schema_id_encoded, vint::to_bytes(proto_msg_count));

    // ...but only write 8 of them, and no payload after that.
    for (uint8_t i = 0; i < proto_msg_count - 1; i++) {
        buf_append(value, vint::to_bytes(i));
    }

    auto res = datalake::get_proto_offsets(value);
    ASSERT_TRUE(res.has_error());
    EXPECT_EQ(res.error(), datalake::get_schema_error::not_enough_bytes);
}

0 commit comments

Comments
 (0)