-
Notifications
You must be signed in to change notification settings - Fork 601
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[CORE-8525] - Iceberg schema evolution #24676
[CORE-8525] - Iceberg schema evolution #24676
Conversation
eef6479
to
9d607f2
Compare
Signed-off-by: Oren Leiman <[email protected]>
0773533
to
e0afd1f
Compare
CI Failures:
|
e0afd1f
to
02b5d2c
Compare
force push - refactored interfaces to indicate at each step whether anything changed (fields added/removed, types promoted, requiredness changed). Might want more granularity at some point, but basically as a first step want to signal to calling code whether a metadata update is required. |
CI Failures:
|
CI test resultstest results on build#60330
test results on build#60369
test results on build#60379
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you lay out the next steps from this piece? Here we have the ability to validate and have all the delta information between the two schemas.
I'm assuming we need to:
- apply transformations in the data path from old data to new schemas based on the metadata here.
- alter tables as needed in iceberg (which there is already some code for right?)
}, | ||
}, | ||
struct_evolution_test_case{ | ||
.description = "removing a required field works", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought we just make these null
- this isn't backwards compatible... I guess the Iceberg spec allows this so it's fine?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought we just make these
null
Ah, that's the other thing we talked about last week. There's a product preference for "clean" removal of columns. I don't feel strongly about it either way, considering 👇
this isn't backwards compatible
I think it is. As a parquet reader, I assume a column whose ID doesn't appear in the schema is just skipped?
Iceberg spec allows this
☝️
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As a parquet reader, I assume a column whose ID doesn't appear in the schema is just skipped?
I'm talking if you have a query that is SELECT foo, bar FROM table
and you delete column bar
then the query fails.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some automation will list all of the column names in a SELECT statement, but maybe this isn't a big deal because someone will have to explicitly remove said column from source schema and this is a case of we recommend you turn on iceberg compat checks in SR first
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm talking if you have a query that is
SELECT foo, bar FROM table
Interesting. I think we'll need to revisit this with @mattschumpert , as the whole motivation for dropping the column "for real" was something like "you dropped a field from your schema, it's confusing if it still appears there, even if always null".
I don't have a good sense of which invariant is more important from a product perspective. Doesn't seem like a blocker on this PR though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it might be OK if we drop something for real to mirror that. Connector based approaches don't do that AFAIK, but that might be something that can be done and we just recommend enabling the SR compat checks so you don't break queries
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Iceberg spec allows this
For my own understanding, can you elaborate on how this behaves with various query engines? Does this make the entire table unusable for certain query engines? Or does this only affect as-of queries? Or is it something that's inconsistent across engines at the moment?
Presumably if the current schema has a required field removed, subsequent queries should correctly not be able to use that field, though I guess that's only unsurprising if we think of "required" as "non-nullable"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From a SQL statement/validation side of things removing columns is only an issue if you explicitly use that column. If you're just doing SELECT * or never referencing it explicitly then it's fine to remove.
Presumably if the current schema has a required field removed, subsequent queries should correctly not be able to use that field, though I guess that's only unsurprising if we think of "required" as "non-nullable"
If you're saying the case of making a column nullable from NOT NULL, then yes nothing "breaks" in terms of the query runs, but predicates/transforms on that column may have unintended side effects, but they should not "break" from my understanding of ANSI SQL. If a query engine has different semantics we can't really do anything about it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Anyways, I think it's fine to remove columns that are removed from the source schema because we can add compat checks later to SR that would catch these (and likely this would be caught in the existing SR validation).
// TODO(oren): do we need to detect field reordering? or just support it? | ||
// should reordered fields in a schema correspond to some parquet layout | ||
// change on disk? | ||
.any_change = schema_changed::no, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think schema registry canonicalization should save us from some of this (we should probably check that we're traversing the protobuf schema correctly), but I don't think we should care about ordering of fields, since parquet is columar it should not matter the order of columns (where in row oriented formats you might be able to stop parsing early if you don't need all the fields?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
schema registry canonicalization should save us
great point
parquet is columnar it should not matter the order of columns
That's my read of the correctness rule - i.e. "it doesn't matter what order fields appear in the schema" vs "you can change the column order by schema evolution".
I think the first one is covered by |
In my mind, both of these are covered by 8530.
Or something. @andrwng had suggested another approach that I understood as (paraphrasing) "keep writing 'old data' to the original Parquet file with the understanding that it can be read by the new schema". So we need to agree on implementation detail, but generally keeping forward compatible records off the DLQ is the goal.
Yup, though I think most of the existing code is fit for purpose. There's a bunch of stuff in place already for updating schemas, so we're mostly adding support for "interesting" updates. |
Agreed on the end goal - I'm not going to fight too hard for one or the other, but note that you can have producers writing both, so you might get small files during a translation interval. It does seem simpler tho so I'm happy to go that route if that's true. |
Yeah no strong opinion on my end either, though my initial instinct was to upcast. |
In general this all LGTM outside the nits and small cleanups around the variant, I just want to make sure I have a good understanding of the current standing of things and the roadmap here. Thanks for this! |
02b5d2c
to
3094cfe
Compare
force push CR comments (comments & adjusting the shape of |
CI Failure:
|
Yeah I think a bunch of agents got nuked or something 🤷 |
/ci-repeat 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice! Overall this looks pretty solid.
"keep writing 'old data' to the original Parquet file with the understanding that it can be read by the new schema". So we need to agree on implementation detail
Yea the thought here was that it's hopefully uncommon for many schemas to be in use at once, and that with out current data primitives it's probably a good amount of non-trivial code to cast all the different kinds of schema changes to the actual data (especially thinking about field removal, nullability, and such).
My hunch is that the onus would generally be on query engines to reconcile the data schema with the current Iceberg schema. If that is the case, the "write old data" approach seems pretty reasonable. Wondering if either of you have had experience with that yet?
using evolution_metadata | ||
= std::variant<std::nullopt_t, src_info, is_new, removed>; | ||
mutable evolution_metadata meta{std::nullopt}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I roughly understand why it is the case, but it's interesting that this field isn't used in the comparison operator of nested_field
. Just want to call that out to make sure that's intentional.
It also makes me wonder whether we should do schema evolution on some struct field_and_evo_meta { field, evolution_metadata }
rather than on the fields themselves, though I guess that'd also require use to duplicate a ton of code for "evolvable complex types"..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, it's intentional. The general idea is to distinguish this little annotation field from those which carry Iceberg semantics.
As far as wrapping in an aggregate, I thought about that. I also have a previous iteration in the can that does the traversal and builds up a flat vector of "actions" (w/ field references inside) that can be evaluated post facto.
I think these alternatives mostly work fine (if they require more code), but it's a bit cumbersome to reason about pushing that information down towards table metadata w/o more API shenanigans.
I agree it looks sort of wacky, but my intuition is that sticking this extra couple bytes on each field is the simplest thing.
}, | ||
}, | ||
struct_evolution_test_case{ | ||
.description = "removing a required field works", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Iceberg spec allows this
For my own understanding, can you elaborate on how this behaves with various query engines? Does this make the entire table unusable for certain query engines? Or does this only affect as-of queries? Or is it something that's inconsistent across engines at the moment?
Presumably if the current schema has a required field removed, subsequent queries should correctly not be able to use that field, though I guess that's only unsurprising if we think of "required" as "non-nullable"
schema_transform_result | ||
annotate_schema_transform(const struct_type& source, const struct_type& dest); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a thought, this seems like a thing we could fuzz test
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah i agree. probably should. noted for a follow up 🙂
return os << tc.description; | ||
} | ||
|
||
static const std::vector<struct_evolution_test_case> valid_cases{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wondering whether the current code supports multiple updates in the same evolution?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you mean prior to this diff? I think so
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I meant with the new compatibility checks -- for instance, I was wondering if it made sense to test an add and remove in the same evolution (unless i missed cases where multiple operations are happening)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh, yeah definitely. i don't think there's a specific test geared towards that, but the one whose description begins "grouping multiple fields into a struct..." has that effect
No I actually agree here that this is the simpler path forward and we should start with this. With compaction and the assumption that schema evolution is not a frequent operation, I think we may not need the more complicated thing. |
Retry command for Build#60369please wait until all jobs are finished before running the slash command
|
variant: - bool is_new - bool removed - struct src_info - id of the source field - requiredness of the source field - optionally source field type (if primitive) Used in subsequent commits to: - cheaply identify struct fields that have changed shape - defer type & requiredness invariant checks to the end Only track primitive types because a) they are cheap to copy and b) structured types are effectively checked during traversal Signed-off-by: Oren Leiman <[email protected]>
Signed-off-by: Oren Leiman <[email protected]>
Templatize the visitor to add a const nested_field* version. Signed-off-by: Oren Leiman <[email protected]>
Some functions to support recursive iteration and transformation of the fields under a field or struct. Signed-off-by: Oren Leiman <[email protected]>
And lightly refactor test suite base class for reuse Signed-off-by: Oren Leiman <[email protected]>
Mix of compatible & incompatible, structural & type oriented Does not include any real tests Signed-off-by: Oren Leiman <[email protected]>
Signed-off-by: Oren Leiman <[email protected]>
Signed-off-by: Oren Leiman <[email protected]>
Make a copy of the destination schema. Generate a schema transform from source to dest. Apply the transform to the copy and return the result (or an error code) Signed-off-by: Oren Leiman <[email protected]>
Generates increasingly nested structs and measures the time it takes to: - collect all fields of two copies w/ field_collecting_visitor - generate a schema transform from one copy to another - visit a generated schema transform Serves as a decent point of comparison between the limited schema verification already present in the tree and the utilities introduced in this commit stack. Signed-off-by: Oren Leiman <[email protected]>
Signed-off-by: Oren Leiman <[email protected]>
3094cfe
to
1bf54b7
Compare
force push contents (CR):
|
S& s, detail::FieldOp<T*> auto&& fn, detail::FieldPredicate auto&& filter) { | ||
static_assert(std::is_const_v<S> == std::is_const_v<T>); | ||
chunked_vector<T*> stk; | ||
for (const auto& f : std::ranges::reverse_view(s.fields)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting that this was reversed between revisions without too much recourse. Curious what the implication is/was?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nothing functional. Just an accidental omission - I intended to duplicate the control flow of the field collector visitor, though it shouldn't have any effect here.
return os << tc.description; | ||
} | ||
|
||
static const std::vector<struct_evolution_test_case> valid_cases{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I meant with the new compatibility checks -- for instance, I was wondering if it made sense to test an add and remove in the same evolution (unless i missed cases where multiple operations are happening)
This PR implements the Iceberg side of Schema Evolution. In broad strokes, it adds:
iceberg::nested_field
, intended to support (1), both in progress and for post-traversal validation.iceberg::nested_field::evolution_metadata
, which validates the result of (1).The visitor pattern in (1) is largely inspired by Apache Iceberg: CheckCompatibility.java and TypeUtil.java, though our implementation is somewhat simpler in the sense that we have
std::variant
w/ proper (enough) dynamic dispatch to arbitrary callables.Microbenchmark
The benchmark included with this PR seeks to compare two mechanisms for traversing a pair of input schemas and updating field IDs:
catalog_schema_manager::fill_field_ids
)Notably this benchmark does not cover cases where the two schemas differ. This is primarily to test the computational cost and memory pressure of the two traversal mechanisms.
Conclusions
Backports Required
Release Notes