diff --git a/src/v/iceberg/compatibility.cc b/src/v/iceberg/compatibility.cc index 7df169c5ffc8c..e1e3b3c8e399b 100644 --- a/src/v/iceberg/compatibility.cc +++ b/src/v/iceberg/compatibility.cc @@ -422,4 +422,18 @@ schema_transform_result validate_schema_transform(struct_type& dest) { }); } +schema_evolution_result +evolve_schema(const struct_type& source, const struct_type& dest) { + auto dest_copy = dest.copy(); + auto transform_res = annotate_schema_transform(source, dest_copy); + if (transform_res.has_error()) { + return transform_res.error(); + } + if (auto validate_res = validate_schema_transform(dest_copy); + validate_res.has_error()) { + return validate_res.error(); + } + return std::move(dest_copy); +} + } // namespace iceberg diff --git a/src/v/iceberg/compatibility.h b/src/v/iceberg/compatibility.h index a6f786721e7a7..acfa327b4739f 100644 --- a/src/v/iceberg/compatibility.h +++ b/src/v/iceberg/compatibility.h @@ -89,4 +89,31 @@ annotate_schema_transform(const struct_type& source, const struct_type& dest); */ schema_transform_result validate_schema_transform(struct_type& dest); +/** + * evolve_schema - Prepares dest for insertion into table metadata. + * + * Take a copy of dest, annotate it with source metadata, + * apply the annotations, and return the copy (or an error). + * + * Preconditions: + * - Both input structs are un-annotated. That is, none of their + * fields have the optional ::meta filled in. + * + * Postconditions: + * - All fields in dest are either assigned a unique ID carried over + * from source, or are marked as "new" (i.e. needing a unique ID + * assigned before insertion into metadata) + * - Each field in source is annotated with metadata indicating whether + * it has a compatible counterpart in dest. + * + * @param source - The source (i.e original) schema + * @param dest - the proposed schema (probably extracted from some incoming + * record) + * + * @return A copy of the dest schema, with column IDs mapped from source + * or field metadata indicating a new column, as approprate. + */ +schema_evolution_result +evolve_schema(const struct_type& source, const struct_type& dest); + } // namespace iceberg diff --git a/src/v/iceberg/tests/compatibility_test.cc b/src/v/iceberg/tests/compatibility_test.cc index 31833875b53a5..045f99d0beba2 100644 --- a/src/v/iceberg/tests/compatibility_test.cc +++ b/src/v/iceberg/tests/compatibility_test.cc @@ -1032,3 +1032,40 @@ TEST_P(ValidateAnnotationTest, ValidateCatchesTypeErrors) { EXPECT_FALSE(res.has_error()); } + +struct StructEvoCompatibilityTest : public StructCompatibilityTestBase {}; + +INSTANTIATE_TEST_SUITE_P( + StructEvolutionTest, + StructEvoCompatibilityTest, + ::testing::ValuesIn(valid_plus_errs(invalid_cases))); + +TEST_P(StructEvoCompatibilityTest, CanEvolveStructsAndDetectErrors) { + // generate a schema per the test case + auto original_schema_struct = generator(); + + // manually update a copy of the schema in some way, also specified by the + // test case + auto type = update(original_schema_struct); + + // try to evolve the original schema into the new and update the latter + // accordingly. check against expectations (both success and expected + // qualities of the result) + auto evolve_res = evolve_schema(original_schema_struct, type); + ASSERT_EQ(evolve_res.has_error(), err().has_error()) + << (evolve_res.has_error() + ? fmt::format("Unexpected error: {}", evolve_res.error()) + : fmt::format( + "Expected {} got {}", err().error(), evolve_res.value())); + if (evolve_res.has_error()) { + ASSERT_EQ(evolve_res.error(), err().error()); + return; + } + + // Full validation step for struct evolution result + ASSERT_TRUE(validator(original_schema_struct, evolve_res.value())) + << fmt::format( + "Original: {}\nEvolved: {}", + original_schema_struct, + evolve_res.value()); +}