Skip to content

Commit 5a3b749

Browse files
committed
datalake: parser for protobuf message offset list
Protobuf records have an additional prefix after the schema id. Since a protobuf schema can contain multiple message types, and those messages may be nested, each record carries a list of message indexes to be traversed to find the actual message schema for that record. The list is encoded as a varint holding its length, followed by that many varints.
1 parent 739be8a commit 5a3b749

File tree

3 files changed

+162
-6
lines changed

3 files changed

+162
-6
lines changed

src/v/datalake/schema_registry.cc

+51-2
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,11 @@
99
*/
1010
#include "datalake/schema_registry.h"
1111

12+
#include "bytes/iobuf_parser.h"
1213
#include "pandaproxy/schema_registry/types.h"
1314

1415
namespace datalake {
15-
get_schema_id_result get_value_schema_id(const iobuf& buf) {
16+
get_schema_id_result get_value_schema_id(iobuf& buf) {
1617
// Messages that use the schema registry have a 5-byte prefix:
1718
// offset 0: magic byte with value 0
1819
// offsets 1-4: schema ID as big-endian integer
@@ -27,7 +28,55 @@ get_schema_id_result get_value_schema_id(const iobuf& buf) {
2728
return get_schema_error::no_schema_id;
2829
}
2930
auto id = parser.consume_be_type<int32_t>();
30-
return pandaproxy::schema_registry::schema_id{id};
31+
schema_message_data res = {
32+
.schema_id = pandaproxy::schema_registry::schema_id{id},
33+
.shared_message_data = buf.share(
34+
parser.bytes_consumed(), parser.bytes_left())};
35+
36+
return res;
37+
}
38+
39+
// TODO: this is mostly a copy-and-paste of get_proto_offsets from
40+
// pandaproxy::schema_registry with a slightly different interface. Unify these.
41+
get_proto_offsets_result get_proto_offsets(iobuf& buf) {
42+
auto header = get_value_schema_id(buf);
43+
if (!header.has_value()) {
44+
return header.error();
45+
}
46+
proto_schema_message_data result;
47+
result.schema_id = header.value().schema_id;
48+
iobuf_const_parser parser(header.value().shared_message_data);
49+
50+
// The encoding is a length, followed by indexes into the file or message.
51+
// Each number is a zigzag encoded integer.
52+
auto [offset_count, bytes_read] = parser.read_varlong();
53+
if (!bytes_read) {
54+
return get_schema_error::bad_varint;
55+
}
56+
// Reject more offsets than bytes remaining; it's not possible
57+
if (static_cast<size_t>(offset_count) > parser.bytes_left()) {
58+
return get_schema_error::not_enough_bytes;
59+
}
60+
if (offset_count == 0) {
61+
result.protobuf_offsets.push_back(0);
62+
result.shared_message_data = header.value().shared_message_data.share(
63+
parser.bytes_consumed(), parser.bytes_left());
64+
return result;
65+
}
66+
result.protobuf_offsets.resize(offset_count);
67+
for (auto& o : result.protobuf_offsets) {
68+
if (parser.bytes_left() == 0) {
69+
return get_schema_error::not_enough_bytes;
70+
}
71+
std::tie(o, bytes_read) = parser.read_varlong();
72+
if (!bytes_read) {
73+
return get_schema_error::bad_varint;
74+
}
75+
}
76+
77+
result.shared_message_data = header.value().shared_message_data.share(
78+
parser.bytes_consumed(), parser.bytes_left());
79+
return result;
3180
}
3281

3382
} // namespace datalake

src/v/datalake/schema_registry.h

+23-3
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,26 @@ class iobuf;
1818

1919
namespace datalake {
2020

21+
// Stores a pair of schema id and the remaining message data after the schema
22+
// prefix. The message data is shared from the original iobuf.
23+
struct schema_message_data {
24+
pandaproxy::schema_registry::schema_id schema_id;
25+
iobuf shared_message_data;
26+
};
27+
28+
// Stores the schema_id, protobuf message offsets, and remaining message data.
29+
// The message data is shared from the original iobuf.
30+
struct proto_schema_message_data {
31+
pandaproxy::schema_registry::schema_id schema_id;
32+
std::vector<int32_t> protobuf_offsets;
33+
iobuf shared_message_data;
34+
};
35+
2136
// Error codes produced while parsing the schema registry prefix of a record.
// The numeric values are spelled out; they match the implicit numbering.
enum class get_schema_error {
    // No error.
    ok = 0,
    // The record has no schema id.
    no_schema_id = 1,
    // The message does not contain enough bytes.
    not_enough_bytes = 2,
    // A varint has a bad encoded value.
    bad_varint = 3,
};
2642

2743
struct get_schema_error_category : std::error_category {
@@ -35,6 +51,8 @@ struct get_schema_error_category : std::error_category {
3551
return "No schema ID";
3652
case get_schema_error::not_enough_bytes:
3753
return "Not enough bytes in message";
54+
case get_schema_error::bad_varint:
55+
return "Bad encoded value for varint";
3856
}
3957
}
4058

@@ -48,13 +66,15 @@ inline std::error_code make_error_code(get_schema_error e) noexcept {
4866
return {static_cast<int>(e), get_schema_error_category::error_category()};
4967
}
5068

51-
using get_schema_id_result
52-
= result<pandaproxy::schema_registry::schema_id, get_schema_error>;
69+
using get_schema_id_result = result<schema_message_data, get_schema_error>;
70+
using get_proto_offsets_result
71+
= result<proto_schema_message_data, get_schema_error>;
5372

5473
// Extract the schema id from a record's value. This simply extracts the id. It
5574
// does not do any validation. Returns a get_schema_error if the record does not
5675
// have a schema id.
57-
get_schema_id_result get_value_schema_id(const iobuf& record);
76+
get_schema_id_result get_value_schema_id(iobuf& record);
77+
get_proto_offsets_result get_proto_offsets(iobuf& record);
5878

5979
} // namespace datalake
6080

src/v/datalake/tests/schema_registry_test.cc

+88-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include "bytes/iobuf.h"
1212
#include "datalake/schema_registry.h"
1313
#include "model/record.h"
14+
#include "utils/vint.h"
1415

1516
#include <gtest/gtest.h>
1617

@@ -22,12 +23,15 @@ TEST(DatalakeSchemaRegistry, SchemaIdForValidRecord) {
2223
uint8_t magic = 0;
2324
int32_t schema_id = 12;
2425
int32_t schema_id_encoded = htobe32(schema_id);
26+
std::string payload = "Hello world";
2527
value.append(&magic, 1);
2628
value.append(reinterpret_cast<uint8_t*>(&schema_id_encoded), 4);
29+
value.append(payload.data(), payload.size());
2730

2831
auto res = datalake::get_value_schema_id(value);
2932
ASSERT_TRUE(res.has_value());
30-
EXPECT_EQ(res.value()(), schema_id);
33+
EXPECT_EQ(res.value().schema_id(), schema_id);
34+
EXPECT_EQ(res.value().shared_message_data.size_bytes(), payload.size());
3135
}
3236

3337
TEST(DatalakeSchemaRegistry, SchemaIdForShortRecord) {
@@ -57,3 +61,86 @@ TEST(DatalakeSchemaRegistry, SchemaIdForBadMagic) {
5761
ASSERT_TRUE(res.has_error());
5862
EXPECT_EQ(res.error(), datalake::get_schema_error::no_schema_id);
5963
}
64+
65+
// A well-formed record (magic byte, schema id, nine offsets 0..8, payload)
// must yield the schema id, the offsets in order, and the untouched payload.
TEST(DatalakeSchemaRegistry, GetProtoOffsetsOk) {
    iobuf value;

    uint8_t magic = 0; // Valid magic byte
    int32_t schema_id = 12;
    int32_t schema_id_encoded = htobe32(schema_id);
    std::string payload = "Hello world";
    value.append(&magic, 1);
    value.append(reinterpret_cast<uint8_t*>(&schema_id_encoded), 4);

    // Offset list: the count first, then each offset, all as varints.
    uint8_t proto_msg_count = 9;
    std::array<uint8_t, 16> encoded;
    size_t encoded_size = vint::serialize(proto_msg_count, encoded.data());
    value.append(encoded.data(), encoded_size);

    for (uint8_t i = 0; i < proto_msg_count; i++) {
        encoded_size = vint::serialize(i, encoded.data());
        value.append(encoded.data(), encoded_size);
    }
    value.append(payload.data(), payload.size());

    auto res = datalake::get_proto_offsets(value);
    ASSERT_TRUE(res.has_value());
    const auto& offsets = res.value().protobuf_offsets;
    EXPECT_EQ(offsets.size(), proto_msg_count);
    // Index with size_t to avoid a signed/unsigned comparison against size().
    for (size_t o = 0; o < offsets.size(); o++) {
        EXPECT_EQ(offsets[o], static_cast<int32_t>(o));
    }
    EXPECT_EQ(res.value().schema_id(), schema_id);
    EXPECT_EQ(res.value().shared_message_data.size_bytes(), payload.size());
}
96+
97+
TEST(DatalakeSchemaRegistry, GetProtoOffsetsDefaultZero) {
    // This tests a special case where the offset count is 0, we should assume
    // that the message is the first one defined in the schema and return {0}.
    iobuf value;

    uint8_t magic = 0; // Valid magic byte
    int32_t schema_id = 12;
    int32_t schema_id_encoded = htobe32(schema_id);
    std::string payload = "Hello world";
    value.append(&magic, 1);
    value.append(reinterpret_cast<uint8_t*>(&schema_id_encoded), 4);

    // An offset count of zero, followed directly by the payload.
    uint8_t proto_msg_count = 0;
    std::array<uint8_t, 16> encoded;
    size_t encoded_size = vint::serialize(proto_msg_count, encoded.data());
    value.append(encoded.data(), encoded_size);
    value.append(payload.data(), payload.size());

    auto res = datalake::get_proto_offsets(value);
    ASSERT_TRUE(res.has_value());
    const auto& offsets = res.value().protobuf_offsets;
    // Fatal assertion: offsets[0] below must not be read from an empty vector.
    ASSERT_EQ(offsets.size(), 1);
    EXPECT_EQ(offsets[0], 0);
    EXPECT_EQ(res.value().schema_id(), schema_id);
    EXPECT_EQ(res.value().shared_message_data.size_bytes(), payload.size());
}
123+
124+
// A record that declares nine offsets but carries only eight must be rejected
// with not_enough_bytes.
TEST(DatalakeSchemaRegistry, GetProtoOffsetsNotEnoughData) {
    iobuf value;

    uint8_t magic = 0; // Valid magic byte
    int32_t schema_id = 12;
    int32_t schema_id_encoded = htobe32(schema_id);
    value.append(&magic, 1);
    value.append(reinterpret_cast<uint8_t*>(&schema_id_encoded), 4);

    uint8_t proto_msg_count = 9;
    std::array<uint8_t, 16> encoded;
    size_t encoded_size = vint::serialize(proto_msg_count, encoded.data());
    value.append(encoded.data(), encoded_size);

    // Deliberately write one offset fewer than declared, and no payload.
    for (uint8_t i = 0; i < proto_msg_count - 1; i++) {
        encoded_size = vint::serialize(i, encoded.data());
        value.append(encoded.data(), encoded_size);
    }

    auto res = datalake::get_proto_offsets(value);
    ASSERT_TRUE(res.has_error());
    EXPECT_EQ(res.error(), datalake::get_schema_error::not_enough_bytes);
}

0 commit comments

Comments
 (0)