Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

datalake: helper code to get schemas for records #23308

Merged
merged 2 commits into from
Oct 3, 2024

Conversation

jcipar
Copy link
Contributor

@jcipar jcipar commented Sep 12, 2024

This adds some helper methods to get the schema for a record, and modifies the record multiplexer to use them. The translators for other types are still in progress, so we don't yet do anything with this information.

Backports Required

  • none - not a bug fix
  • none - this is a backport
  • none - issue does not exist in previous branches
  • none - papercut/not impactful enough to backport
  • v24.2.x
  • v24.1.x
  • v23.3.x

Release Notes

  • none

@github-actions github-actions bot added area/redpanda area/wasm WASM Data Transforms labels Sep 12, 2024
*/
#include "model/record.h"

std::optional<uint32_t> get_value_schema_id(const model::record& record) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where is the format of the record defined? is this format defined by the schema registry? if so, perhaps this should be a helper that lives in schema registry?

Comment on lines 49 to 75
v::application
v::features
v::gtest_main
v::kafka_test_utils
v::datalake
v::model_test_utils
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i don't think you need all these dependencies

#pragma once

#include "bytes/iobuf_parser.h"
#include "model/record.h"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

forward declare model::record

*/
#pragma once

#include "bytes/iobuf_parser.h"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unused import

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unused import

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@@ -74,3 +75,6 @@ class WasmTestFixture : public ::testing::Test {
model::transform_metadata _meta;
std::vector<ss::sstring> _log_lines;
};

// For using fake_schema_registry outside of WASM fixtures.
std::unique_ptr<wasm::schema_registry> make_fake_schema_registry();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, maybe the fake schema registry can live with schema registry? cc @rockwotj

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had suggested to @jcipar to move it to src/v/schema_registry. I'm not sure how we want to structure the HTTP service vs the internal functionality. Maybe we have src/v/schema_registry/http. IDK we should involve the enterprise team in this discussion too.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @BenPope hi!

@@ -11,10 +11,23 @@

#include "bytes/iobuf_parser.h"
#include "model/record.h"
#include "pandaproxy/schema_registry/types.h"
#include "wasm/schema_registry.h"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

forward declare schema registry

@@ -11,10 +11,23 @@

#include "bytes/iobuf_parser.h"
#include "model/record.h"
#include "pandaproxy/schema_registry/types.h"
#include "wasm/schema_registry.h"

#include <optional>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needs

Comment on lines 28 to 41
batch.for_each_record([&batch, this](model::record&& record) {
iobuf key = record.release_key();
iobuf val = record.release_value();
// *1000: Redpanda timestamps are milliseconds. Iceberg uses
// microseconds.
int64_t timestamp = (batch.header().first_timestamp.value()
+ record.timestamp_delta())
* 1000;
int64_t offset = static_cast<int64_t>(batch.base_offset())
+ record.offset_delta();
int64_t estimated_size = key.size_bytes() + val.size_bytes() + 16;

// Translate the record
auto& translator = get_translator();
iceberg::struct_value data = std::visit(
[&key, &val, timestamp, offset](schemaless_translator& tr) {
return tr.translate_event(
std::move(key), std::move(val), timestamp, offset);
},
translator);

// Send it to the writer
auto& writer = get_writer();
writer.add_data_struct(std::move(data), estimated_size);
});
co_await batch.for_each_record_async(
[&batch, this](model::record&& record) -> ss::future<> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

separate commit explaining change to async?

int64_t estimated_size = key.size_bytes() + val.size_bytes() + 16;

// Translate the record
auto& translator = co_await get_translator(record);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

co-routine lambda's aren't allowed

Comment on lines 27 to 29
using get_schema_result = std::variant<
pandaproxy::schema_registry::canonical_schema_definition,
get_schema_error>;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something like the following is more idiomatic:

namespace datalake {

enum class error_code  {
    success = 0,
    no_schema_id,
    invalid_schema_id
};

std::error_code make_error_code(error_code e) noexcept;

template<typename T>
using result = result<T, error_info>;

} // namespace datalake

namespace std {

template<>
struct is_error_code_enum<datalake::error_code> : true_type {};

} // namespace std

return std::nullopt;
}
auto id = parser.consume_type<uint32_t>();
return id;
Copy link
Member

@BenPope BenPope Sep 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might not be sufficient for protobuf, which uses zig-zag encoded offsets to get correct message if the Subject Name Strategy isn't TopicName.

It may be worth extracting and reusing some of the code in the existing validation

@jcipar jcipar force-pushed the jcipar/datalake-schema-registry branch 3 times, most recently from 4df5a62 to 65940dd Compare September 30, 2024 21:37
@jcipar jcipar marked this pull request as ready for review September 30, 2024 21:37
*/
#pragma once

#include "bytes/iobuf_parser.h"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unused import

// Extract the schema id from a record's value. This simply extracts the id. It
// does not do any validation. Returns std::nullopt if the record does not have
// a schema id.
get_schema_id_result get_value_schema_id(const iobuf& record);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

forward declare iobuf

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}
iobuf_const_parser parser(buf);
auto magic = parser.consume_type<uint8_t>();
if (magic != 0) {
Copy link
Member

@dotnwat dotnwat Sep 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

curious if this is a value we control?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the schema registry format leading "magic byte".

Some named constants would help in reading the code here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks. was mostly just curious--zero would not be my choice for a magic value, but i guess we can't control everything!

Comment on lines 42 to 71
if (static_cast<size_t>(offset_count) > parser.bytes_left()) {
return get_schema_error::not_enough_bytes;
}
offsets.resize(offset_count);
for (auto& o : offsets) {
std::tie(o, bytes_read) = parser.read_varlong();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't this provide an inconsistent semantic for short read scenarios? for example, even if offset_count < parser.bytes_bytes() then you could still run out of bytes when parsing since some logical offsets may take more than 1 byte read from the parser? in this case, you could get back not_enough_bytes or some exception thrown out of the parser for the same error scenario?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit inconsistent.

There are a few different reasons we might stop parsing a varint:

  • Found a byte with the top bit cleared.
  • Read the maximum number of bytes for the given type.
  • Got to the end of the buffer.

Our varint parser doesn't distinguish between those cases. E.g. detail::var_decoder in vint.h returns true (meaning "stop parsing") upon finding a byte with the top bit cleared, or when "shift > limit", i.e. the number of bits read is greater than the size.

deserialize in that same module loops through a range until it runs out of bytes or val_decoder returns true and returns the number of bytes read.

As far as I can tell, currently the only way we could return 0 bytes read is if we call it at the end of the buffer. Otherwise it always reads at least on byte. Also, we currently don't detect improperly formatted varints. So maybe I should just return not_enough_bytes in both cases.

In the future we could detect improperly-formatted varints and return 0 bytes read from the parser in those cases. In that case it would definitely make sense to separate these cases.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks! i took a closer look at our varint decoder. looks like we have some improvements to it we can make in the future!

return get_schema_error::bad_varint;
}
}
if (offsets.empty()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you also need to validate that the number of offsets read is the same as offset_count?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure how they wouldn't be. The loop that reads the offsets will execute that many times, and if it fails to read an offset it returns an error.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Invalid offsets aside, if offset_count == 0 then we should return {0}. Here's the same method in franz-go:

https://github.com/twmb/franz-go/blob/b77dd13e2bfaee7f5181df27b40ee4a4f6a73b09/pkg/sr/serde.go#L482-L507

We do that here, but in a very roundabout way by resizing then checking the result of that method.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure how they wouldn't be

i'm asking if it is safe to trust the size value you are decoding from the wire format as the prefix on the data.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a test for this case, and generally our iobuf parser throws on short input

@jcipar jcipar force-pushed the jcipar/datalake-schema-registry branch 2 times, most recently from d348baf to d86f6a5 Compare October 1, 2024 15:28
Comment on lines 61 to 62
get_schema_id_result get_value_schema_id(const iobuf& record);
get_proto_offsets_result get_proto_offsets(const iobuf& record);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're going to need to know how much of the value to chop off - so I think the success case needs to either return the remaining data (via share) or how many bytes were read.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

return get_schema_error::bad_varint;
}
}
if (offsets.empty()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Invalid offsets aside, if offset_count == 0 then we should return {0}. Here's the same method in franz-go:

https://github.com/twmb/franz-go/blob/b77dd13e2bfaee7f5181df27b40ee4a4f6a73b09/pkg/sr/serde.go#L482-L507

We do that here, but in a very roundabout way by resizing then checking the result of that method.

@jcipar jcipar force-pushed the jcipar/datalake-schema-registry branch from d86f6a5 to 5a3b749 Compare October 1, 2024 17:39
Comment on lines 74 to 75
// does not do any validation. Returns std::nullopt if the record does not have
// a schema id.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment is out of date about the return type.

Can you talk about the mutability of the input buffer since it's taken by mutable reference?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is still a reference to std::nullopt which is incorrect


uint8_t proto_msg_count = 9;
std::array<uint8_t, 16> encoded;
size_t encoded_size = vint::serialize(proto_msg_count, encoded.data());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: vint::to_bytes is probably simpler and will make this a bit more readable.

Comment on lines 89 to 92
EXPECT_EQ(offsets.size(), proto_msg_count);
for (int32_t o = 0; o < offsets.size(); o++) {
EXPECT_EQ(o, offsets[o]);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be replaced with:

EXPECT_THAT(offsets, ElementsAre(0, 1, 2, 3, 4, 5, 6, 7, 8, 9))

Ref: http://google.github.io/googletest/reference/matchers.html#container-matchers

// that the message is the first one defined in the schema and return {0}.
iobuf value;

uint8_t magic = 0; // Invalid magic
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wrong comment

TEST(DatalakeSchemaRegistry, GetProtoOffsetsOk) {
iobuf value;

uint8_t magic = 0; // Invalid magic
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wrong comment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whoops. Copy-and-paste error :-(


uint8_t magic = 0;
int32_t schema_id = 12;
int32_t schema_id_encoded = htobe32(schema_id);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TBH I find the name htobe32 to be quite horrible (not your fault). Seastar has nice helpers for this: ss::cpu_to_be

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

int32_t schema_id_encoded = htobe32(schema_id);
std::string payload = "Hello world";
value.append(&magic, 1);
value.append(reinterpret_cast<uint8_t*>(&schema_id_encoded), 4);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some of these tests could be more readable with a helper method to construct an iobuf with something like:

void buf_append(iobuf& b, uint8_t byte) { b.append(&byte, 1); }
void buf_append(iobuf& b, const bytes& byte) {
b.append(byte.data(), byte.size());
}
template<typename... Args>
iobuf buf_from(const Args&... args) {
iobuf b;
(buf_append(b, args), ...);
return b;
}

TEST(DatalakeSchemaRegistry, GetProtoOffsetsNotEnoughData) {
iobuf value;

uint8_t magic = 0; // Invalid magic
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wrong comment

@jcipar jcipar force-pushed the jcipar/datalake-schema-registry branch 3 times, most recently from 05d5ee0 to 0fdf809 Compare October 1, 2024 19:50
Copy link
Contributor

@rockwotj rockwotj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good - just two small things

Comment on lines 74 to 75
// does not do any validation. Returns std::nullopt if the record does not have
// a schema id.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is still a reference to std::nullopt which is incorrect

Comment on lines 87 to 88
// value.append(&magic, 1);
// value.append(reinterpret_cast<uint8_t*>(&schema_id_encoded), 4);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove

@jcipar jcipar force-pushed the jcipar/datalake-schema-registry branch from 0fdf809 to c6e3d5d Compare October 1, 2024 20:17
@vbotbuildovich
Copy link
Collaborator

vbotbuildovich commented Oct 1, 2024

new failures in https://buildkite.com/redpanda/redpanda/builds/55595#019249fa-05d7-4bed-865a-46d4432acc78:

"rptest.tests.debug_bundle_test.DebugBundleTest.test_post_debug_bundle.ignore_none=False"

new failures in https://buildkite.com/redpanda/redpanda/builds/55595#019249fa-05d9-4f38-9899-b4215ed935d4:

"rptest.tests.debug_bundle_test.DebugBundleTest.test_post_debug_bundle.ignore_none=True"

new failures in https://buildkite.com/redpanda/redpanda/builds/55595#01924a14-5ba5-4fc8-81e3-2d4b61fdf340:

"rptest.tests.debug_bundle_test.DebugBundleTest.test_post_debug_bundle.ignore_none=True"

new failures in https://buildkite.com/redpanda/redpanda/builds/55595#01924a14-5ba3-4283-9d11-94fb65d19c8e:

"rptest.tests.debug_bundle_test.DebugBundleTest.test_post_debug_bundle.ignore_none=False"

new failures in https://buildkite.com/redpanda/redpanda/builds/55595#01924a5c-4106-4e85-935b-a541fcf58333:

"rptest.tests.debug_bundle_test.DebugBundleTest.test_post_debug_bundle.ignore_none=False"

new failures in https://buildkite.com/redpanda/redpanda/builds/55595#01924a5c-4193-4a3c-a8a3-d8d8e0796adf:

"rptest.tests.debug_bundle_test.DebugBundleTest.test_post_debug_bundle.ignore_none=True"

new failures in https://buildkite.com/redpanda/redpanda/builds/55595#01924adb-fabe-4aae-825a-d9caf5f1440c:

"rptest.tests.debug_bundle_test.DebugBundleTest.test_post_debug_bundle.ignore_none=False"

new failures in https://buildkite.com/redpanda/redpanda/builds/55595#01924adb-fca6-4da1-9877-f67c222b725f:

"rptest.tests.debug_bundle_test.DebugBundleTest.test_post_debug_bundle.ignore_none=True"

new failures in https://buildkite.com/redpanda/redpanda/builds/55595#01924adb-f8f6-4399-b02c-0ed9e689a654:

"rptest.tests.debug_bundle_test.DebugBundleTest.test_post_debug_bundle.ignore_none=True"

new failures in https://buildkite.com/redpanda/redpanda/builds/55595#01924adb-f45a-457d-b114-d3fe446d51bb:

"rptest.tests.debug_bundle_test.DebugBundleTest.test_post_debug_bundle.ignore_none=False"

new failures in https://buildkite.com/redpanda/redpanda/builds/55595#01924d20-098d-405a-b911-37860f623ce7:

"rptest.tests.debug_bundle_test.DebugBundleTest.test_post_debug_bundle.ignore_none=True"

new failures in https://buildkite.com/redpanda/redpanda/builds/55595#01924d20-0930-47b0-a267-0b7849c92664:

"rptest.tests.debug_bundle_test.DebugBundleTest.test_post_debug_bundle.ignore_none=False"

new failures in https://buildkite.com/redpanda/redpanda/builds/55595#01924d20-08b4-4557-9310-71b2d1fad66b:

"rptest.tests.debug_bundle_test.DebugBundleTest.test_post_debug_bundle.ignore_none=True"

new failures in https://buildkite.com/redpanda/redpanda/builds/55595#01924d20-07f0-413b-9d88-569e3ffd68a9:

"rptest.tests.debug_bundle_test.DebugBundleTest.test_post_debug_bundle.ignore_none=False"

new failures in https://buildkite.com/redpanda/redpanda/builds/55595#01924de6-0a86-4b07-9ce1-46463f696ec8:

"rptest.tests.rpk_generate_test.RpkGenerateTest.test_generate_grafana"

Copy link
Member

@BenPope BenPope left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No objections

Comment on lines +12 to +15
#include "pandaproxy/schema_registry/types.h"

#include <system_error>
#include <type_traits>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
#include "pandaproxy/schema_registry/types.h"
#include <system_error>
#include <type_traits>
#include "base/outcome.h"
#include "pandaproxy/schema_registry/types.h"
#include <system_error>
#include <vector>

@jcipar jcipar force-pushed the jcipar/datalake-schema-registry branch from c6e3d5d to c21f5e7 Compare October 2, 2024 19:09
jcipar added 2 commits October 2, 2024 19:29
This adds a function to parse the schema id from the value of a record.
Protobuf records have an additional prefix after the schema id. Since a
protobuf schema can contain multiple message types, and those messages
may be nested, they contain a list of message ids to be traversed to
find the actual message schema for this record. These are encoded as a
varint representing the length followed by a list of varints.
@jcipar jcipar force-pushed the jcipar/datalake-schema-registry branch from c21f5e7 to 0f557dd Compare October 2, 2024 23:29
@jcipar jcipar merged commit 7ae3406 into redpanda-data:dev Oct 3, 2024
17 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/redpanda area/wasm WASM Data Transforms
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants