Skip to content

Commit

Permalink
iceberg/compat: Introduce evolve_schema
Browse files Browse the repository at this point in the history
Make a copy of the destination schema.
Generate a schema transform from source to dest.
Apply the transform to the copy and return the result (or an error code)

Signed-off-by: Oren Leiman <[email protected]>
  • Loading branch information
oleiman committed Jan 6, 2025
1 parent e1b228b commit 66bcfb8
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 0 deletions.
14 changes: 14 additions & 0 deletions src/v/iceberg/compatibility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -422,4 +422,18 @@ schema_transform_result validate_schema_transform(struct_type& dest) {
});
}

schema_evolution_result
evolve_schema(const struct_type& source, const struct_type& dest) {
auto dest_copy = dest.copy();
auto transform_res = annotate_schema_transform(source, dest_copy);
if (transform_res.has_error()) {
return transform_res.error();
}
if (auto validate_res = validate_schema_transform(dest_copy);
validate_res.has_error()) {
return validate_res.error();
}
return std::move(dest_copy);
}

} // namespace iceberg
27 changes: 27 additions & 0 deletions src/v/iceberg/compatibility.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,31 @@ annotate_schema_transform(const struct_type& source, const struct_type& dest);
*/
schema_transform_result validate_schema_transform(struct_type& dest);

/**
* evolve_schema - Prepares dest for insertion into table metadata.
*
* Take a copy of dest, annotate it with source metadata,
* apply the annotations, and return the copy (or an error).
*
* Preconditions:
* - Both input structs are un-annotated. That is, none of their
* fields have the optional ::meta filled in.
*
* Postconditions:
* - All fields in dest are either assigned a unique ID carried over
* from source, or are marked as "new" (i.e. needing a unique ID
* assigned before insertion into metadata)
* - Each field in source is annotated with metadata indicating whether
* it has a compatible counterpart in dest.
*
* @param source - The source (i.e original) schema
* @param dest - the proposed schema (probably extracted from some incoming
* record)
*
* @return A copy of the dest schema, with column IDs mapped from source
* or field metadata indicating a new column, as approprate.
*/
schema_evolution_result
evolve_schema(const struct_type& source, const struct_type& dest);

} // namespace iceberg
37 changes: 37 additions & 0 deletions src/v/iceberg/tests/compatibility_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1032,3 +1032,40 @@ TEST_P(ValidateAnnotationTest, ValidateCatchesTypeErrors) {

EXPECT_FALSE(res.has_error());
}

struct StructEvoCompatibilityTest : public StructCompatibilityTestBase {};

INSTANTIATE_TEST_SUITE_P(
StructEvolutionTest,
StructEvoCompatibilityTest,
::testing::ValuesIn(valid_plus_errs(invalid_cases)));

TEST_P(StructEvoCompatibilityTest, CanEvolveStructsAndDetectErrors) {
// generate a schema per the test case
auto original_schema_struct = generator();

// manually update a copy of the schema in some way, also specified by the
// test case
auto type = update(original_schema_struct);

// try to evolve the original schema into the new and update the latter
// accordingly. check against expectations (both success and expected
// qualities of the result)
auto evolve_res = evolve_schema(original_schema_struct, type);
ASSERT_EQ(evolve_res.has_error(), err().has_error())
<< (evolve_res.has_error()
? fmt::format("Unexpected error: {}", evolve_res.error())
: fmt::format(
"Expected {} got {}", err().error(), evolve_res.value()));
if (evolve_res.has_error()) {
ASSERT_EQ(evolve_res.error(), err().error());
return;
}

// Full validation step for struct evolution result
ASSERT_TRUE(validator(original_schema_struct, evolve_res.value()))
<< fmt::format(
"Original: {}\nEvolved: {}",
original_schema_struct,
evolve_res.value());
}

0 comments on commit 66bcfb8

Please sign in to comment.