diff --git a/src/functions/ducklake_compaction_functions.cpp b/src/functions/ducklake_compaction_functions.cpp index 50c754dff55..f953820f17e 100644 --- a/src/functions/ducklake_compaction_functions.cpp +++ b/src/functions/ducklake_compaction_functions.cpp @@ -47,16 +47,15 @@ vector DuckLakeCompactor::ParseSortOrders(const DuckLakeSort &sort_ } //! Binds ORDER BY expressions directly using ExpressionBinder. -vector DuckLakeCompactor::BindSortOrders(Binder &binder, DuckLakeTableEntry &table, - TableIndex table_index, +vector DuckLakeCompactor::BindSortOrders(Binder &binder, const ColumnList &columns, + const string &table_name, TableIndex table_index, vector &pre_bound_orders) { - auto &columns = table.GetColumns(); auto column_names = columns.GetColumnNames(); auto column_types = columns.GetColumnTypes(); // Create a child binder with the table columns in scope auto child_binder = Binder::CreateBinder(binder.context, &binder); - child_binder->bind_context.AddGenericBinding(table_index, table.name, column_names, column_types); + child_binder->bind_context.AddGenericBinding(table_index, table_name, column_names, column_types); // Bind each ORDER BY expression directly vector orders; @@ -69,6 +68,12 @@ vector DuckLakeCompactor::BindSortOrders(Binder &binder, DuckL return orders; } +vector DuckLakeCompactor::BindSortOrders(Binder &binder, DuckLakeTableEntry &table, + TableIndex table_index, + vector &pre_bound_orders) { + return BindSortOrders(binder, table.GetColumns(), table.name, table_index, pre_bound_orders); +} + //===--------------------------------------------------------------------===// // Compaction Operator //===--------------------------------------------------------------------===// @@ -375,7 +380,7 @@ unique_ptr DuckLakeCompactor::InsertSort(Binder &binder, unique } // Validate all column references in sort expressions exist in the table - DuckLakeTableEntry::ValidateSortExpressionColumns(table, pre_bound_orders); + 
DuckLakeTableEntry::ValidateSortExpressionColumns(table.GetColumns(), pre_bound_orders); // Resolve types for the input plan (could be LogicalGet or LogicalProjection) plan->ResolveOperatorTypes(); diff --git a/src/include/functions/ducklake_compaction_functions.hpp b/src/include/functions/ducklake_compaction_functions.hpp index a88d3c3ebeb..5a75e2dc085 100644 --- a/src/include/functions/ducklake_compaction_functions.hpp +++ b/src/include/functions/ducklake_compaction_functions.hpp @@ -93,6 +93,10 @@ class DuckLakeCompactor { static vector ParseSortOrders(const DuckLakeSort &sort_data); static vector BindSortOrders(Binder &binder, DuckLakeTableEntry &table, TableIndex table_index, vector &pre_bound_orders); + //! Overload that takes (columns, table_name) directly so CTAS planning can sort-bind before any + //! DuckLakeTableEntry exists. Behaves identically to the entry-taking overload. + static vector BindSortOrders(Binder &binder, const ColumnList &columns, const string &table_name, + TableIndex table_index, vector &pre_bound_orders); private: ClientContext &context; diff --git a/src/include/storage/ducklake_catalog.hpp b/src/include/storage/ducklake_catalog.hpp index f3d5cb014ab..a2b69e56b43 100644 --- a/src/include/storage/ducklake_catalog.hpp +++ b/src/include/storage/ducklake_catalog.hpp @@ -28,6 +28,7 @@ class DuckLakeFieldData; struct DuckLakeFileListEntry; struct DuckLakeConfigOption; struct DeleteFileMap; +struct BoundCreateTableInfo; class LogicalGet; //! 
Cache entry for DuckLake table statistics @@ -135,6 +136,8 @@ class DuckLakeCatalog : public Catalog { optional_ptr CreateSchema(CatalogTransaction transaction, CreateSchemaInfo &info) override; + ErrorData SupportsCreateTable(BoundCreateTableInfo &info) override; + void ScanSchemas(ClientContext &context, std::function callback) override; optional_ptr LookupSchema(CatalogTransaction transaction, const EntryLookupInfo &schema_lookup, diff --git a/src/include/storage/ducklake_insert.hpp b/src/include/storage/ducklake_insert.hpp index c591763ca50..4bd5ad493d3 100644 --- a/src/include/storage/ducklake_insert.hpp +++ b/src/include/storage/ducklake_insert.hpp @@ -15,13 +15,14 @@ #include "storage/ducklake_stats.hpp" #include "common/ducklake_data_file.hpp" #include "storage/ducklake_field_data.hpp" +#include "storage/ducklake_partition_data.hpp" +#include "storage/ducklake_sort_data.hpp" namespace duckdb { class DuckLakeCatalog; class DuckLakeSchemaEntry; class DuckLakeTableEntry; class DuckLakeFieldData; -struct DuckLakePartition; struct DuckLakeCopyOptions; struct DuckLakeCopyInput; @@ -41,15 +42,19 @@ class DuckLakeInsertGlobalState : public GlobalSinkState { class DuckLakeInsert : public PhysicalOperator { public: - //! INSERT INTO + //! INSERT INTO an existing table. DuckLakeInsert(PhysicalPlan &physical_plan, const vector &types, DuckLakeTableEntry &table, optional_idx partition_id, string encryption_key); - //! CREATE TABLE AS + //! CREATE TABLE AS - the table is created in GetGlobalSinkState. Any inline + //! PARTITIONED BY / SORTED BY clauses are pre-built into ctas_partition_data / ctas_sort_data at planning + //! time so the physical_copy upstream of this operator can hive-partition the write with a partition_id + //! that matches what gets attached to the table entry at sink-state-init. 
DuckLakeInsert(PhysicalPlan &physical_plan, const vector &types, SchemaCatalogEntry &schema, unique_ptr info, string table_uuid, string table_data_path, - string encryption_key); + unique_ptr ctas_partition_data, unique_ptr ctas_sort_data, + optional_idx partition_id, string encryption_key); - //! The table to insert into + //! The table to insert into (only set for INSERT INTO; nullptr for CTAS until GetGlobalSinkState resolves) optional_ptr table; //! Table schema, in case of CREATE TABLE AS optional_ptr schema; @@ -59,6 +64,11 @@ class DuckLakeInsert : public PhysicalOperator { string table_uuid; //! The table data path, in case of CREATE TABLE AS string table_data_path; + //! Pre-built partition spec for CTAS (allocated at planning time so the physical write carries the + //! correct partition_id). + unique_ptr ctas_partition_data; + //! Pre-built sort spec for CTAS (same lifecycle as ctas_partition_data). + unique_ptr ctas_sort_data; //! The partition id we are writing into (if any) optional_idx partition_id; //! The encryption key used for writing the Parquet files @@ -140,8 +150,13 @@ struct DuckLakeCopyOptions { struct DuckLakeCopyInput { explicit DuckLakeCopyInput(ClientContext &context, DuckLakeTableEntry &table, const string &hive_partition = ""); + //! CTAS-flavored: take the field-id mapping and (optional) partition spec from the planning-time-built + //! spec rather than from a DuckLakeTableEntry that hasn't been created yet. field_data is required + //! because the parquet writer always needs it; partition_data is null when CREATE TABLE AS has no + //! inline PARTITIONED BY clause. 
DuckLakeCopyInput(ClientContext &context, DuckLakeSchemaEntry &schema, const ColumnList &columns, - const string &data_path_p); + const string &data_path_p, DuckLakeFieldData &field_data, + optional_ptr partition_data = nullptr); DuckLakeCatalog &catalog; optional_ptr partition_data; diff --git a/src/include/storage/ducklake_schema_entry.hpp b/src/include/storage/ducklake_schema_entry.hpp index d50f8e637c9..3257444acd8 100644 --- a/src/include/storage/ducklake_schema_entry.hpp +++ b/src/include/storage/ducklake_schema_entry.hpp @@ -10,6 +10,8 @@ #include "duckdb/catalog/catalog_entry/schema_catalog_entry.hpp" #include "storage/ducklake_catalog_set.hpp" +#include "storage/ducklake_partition_data.hpp" +#include "storage/ducklake_sort_data.hpp" namespace duckdb { class DuckLakeTransaction; @@ -32,8 +34,12 @@ class DuckLakeSchemaEntry : public SchemaCatalogEntry { } public: + //! Create a DuckLakeTableEntry and register it with the transaction. + //! prebuilt_partition_data / prebuilt_sort_data are supplied by CTAS when the table is partitioned or sorted optional_ptr CreateTableExtended(CatalogTransaction transaction, BoundCreateTableInfo &info, - string table_uuid, string table_data_path); + string table_uuid, string table_data_path, + unique_ptr prebuilt_partition_data = nullptr, + unique_ptr prebuilt_sort_data = nullptr); unique_ptr GetInfo() const override; optional_ptr CreateTable(CatalogTransaction transaction, BoundCreateTableInfo &info) override; optional_ptr CreateFunction(CatalogTransaction transaction, CreateFunctionInfo &info) override; diff --git a/src/include/storage/ducklake_table_entry.hpp b/src/include/storage/ducklake_table_entry.hpp index 0f97e664a2f..8dc63131f21 100644 --- a/src/include/storage/ducklake_table_entry.hpp +++ b/src/include/storage/ducklake_table_entry.hpp @@ -122,8 +122,17 @@ class DuckLakeTableEntry : public TableCatalogEntry { virtual_column_map_t GetVirtualColumns() const override; vector GetRowIdColumns() const override; - //! 
Validates that all column references in sort expressions exist in the table - static void ValidateSortExpressionColumns(DuckLakeTableEntry &table, const vector &orders); + //! Validates that all column references in sort expressions exist in the column list. + //! Takes (columns) directly so it can run at CTAS planning time before a DuckLakeTableEntry exists. + static void ValidateSortExpressionColumns(const ColumnList &columns, const vector &orders); + + //! Build a DuckLakePartition from raw partition expressions (allocates a transaction-local id). + static unique_ptr BuildPartitionData(DuckLakeTransaction &transaction, const ColumnList &columns, + DuckLakeFieldData &field_data, + const vector> &partition_keys); + //! Build a DuckLakeSort from a vector of OrderByNode (allocates a transaction-local id). + static unique_ptr BuildSortData(DuckLakeTransaction &transaction, const ColumnList &columns, + const vector &orders); private: unique_ptr AlterTable(DuckLakeTransaction &transaction, RenameTableInfo &info); diff --git a/src/storage/ducklake_catalog.cpp b/src/storage/ducklake_catalog.cpp index c2c7e1f8a5a..ec77b4bfed4 100644 --- a/src/storage/ducklake_catalog.cpp +++ b/src/storage/ducklake_catalog.cpp @@ -10,6 +10,7 @@ #include "duckdb/parser/parsed_data/create_table_info.hpp" #include "duckdb/parser/parsed_data/create_view_info.hpp" #include "duckdb/parser/parsed_data/drop_info.hpp" +#include "duckdb/planner/parsed_data/bound_create_table_info.hpp" #include "duckdb/storage/database_size.hpp" #include "storage/ducklake_initializer.hpp" #include "storage/ducklake_schema_entry.hpp" @@ -155,6 +156,16 @@ optional_ptr DuckLakeCatalog::CreateSchema(CatalogTransaction tran return result; } +ErrorData DuckLakeCatalog::SupportsCreateTable(BoundCreateTableInfo &info) { + auto &base = info.Base().Cast(); + if (!base.options.empty()) { + return ErrorData( + ExceptionType::CATALOG, + StringUtil::Format("WITH clause is not supported for tables in a %s catalog", 
GetCatalogType())); + } + return ErrorData(); +} + void DuckLakeCatalog::DropSchema(ClientContext &context, DropInfo &info) { auto schema = GetSchema(GetCatalogTransaction(context), info.name, info.if_not_found); if (!schema) { diff --git a/src/storage/ducklake_insert.cpp b/src/storage/ducklake_insert.cpp index 162b9bdc158..9a407d1249e 100644 --- a/src/storage/ducklake_insert.cpp +++ b/src/storage/ducklake_insert.cpp @@ -42,10 +42,13 @@ DuckLakeInsert::DuckLakeInsert(PhysicalPlan &physical_plan, const vector &types, SchemaCatalogEntry &schema, unique_ptr info, string table_uuid_p, - string table_data_path_p, string encryption_key_p) + string table_data_path_p, unique_ptr ctas_partition_data_p, + unique_ptr ctas_sort_data_p, optional_idx partition_id, + string encryption_key_p) : PhysicalOperator(physical_plan, PhysicalOperatorType::EXTENSION, types, 1), table(nullptr), schema(&schema), info(std::move(info)), table_uuid(std::move(table_uuid_p)), table_data_path(std::move(table_data_path_p)), - encryption_key(std::move(encryption_key_p)) { + ctas_partition_data(std::move(ctas_partition_data_p)), ctas_sort_data(std::move(ctas_sort_data_p)), + partition_id(partition_id), encryption_key(std::move(encryption_key_p)) { } //===--------------------------------------------------------------------===// @@ -58,12 +61,18 @@ DuckLakeInsertGlobalState::DuckLakeInsertGlobalState(DuckLakeTableEntry &table) unique_ptr DuckLakeInsert::GetGlobalSinkState(ClientContext &context) const { optional_ptr table_ptr; if (info) { - // CREATE TABLE AS - create the table + // CREATE TABLE AS - materialize the table entry now, attaching a *clone* of the partition / sort spec + // we built at planning time so the on-disk partition_id and the in-memory partition_data agree. + // We clone (rather than std::move) so the operator's prebuilt spec stays intact across executions. + auto partition_clone = ctas_partition_data ? make_uniq(*ctas_partition_data) : nullptr; + auto sort_clone = ctas_sort_data ? 
make_uniq(*ctas_sort_data) : nullptr; + auto &catalog = schema->catalog; auto &ducklake_schema = schema.get_mutable()->Cast(); auto transaction = catalog.GetCatalogTransaction(context); - table_ptr = &ducklake_schema.CreateTableExtended(transaction, *info, table_uuid, table_data_path) - ->Cast(); + auto created = ducklake_schema.CreateTableExtended(transaction, *info, table_uuid, table_data_path, + std::move(partition_clone), std::move(sort_clone)); + table_ptr = &created->Cast(); } else { // INSERT INTO table_ptr = table; @@ -344,8 +353,11 @@ DuckLakeCopyInput::DuckLakeCopyInput(ClientContext &context, DuckLakeTableEntry } DuckLakeCopyInput::DuckLakeCopyInput(ClientContext &context, DuckLakeSchemaEntry &schema, const ColumnList &columns, - const string &data_path_p) + const string &data_path_p, DuckLakeFieldData &field_data_p, + optional_ptr partition_data_p) : catalog(schema.ParentCatalog().Cast()), columns(columns), data_path(data_path_p) { + field_data = &field_data_p; + partition_data = partition_data_p; schema_id = schema.GetSchemaId(); encryption_key = catalog.GenerateEncryptionKey(context); } @@ -745,40 +757,41 @@ static void ResolveColumnRefs(unique_ptr &expr) { ExpressionIterator::EnumerateChildren(*expr, [](unique_ptr &child) { ResolveColumnRefs(child); }); } -static optional_ptr PlanInsertSort(ClientContext &context, PhysicalPlanGenerator &planner, - PhysicalOperator &plan, DuckLakeTableEntry &table, - optional_ptr sort_data) { - // Parse the sort expressions from the sort_data +static optional_ptr PlanInsertSortFromColumns(ClientContext &context, PhysicalPlanGenerator &planner, + PhysicalOperator &plan, const ColumnList &columns, + const string &table_name, + optional_ptr sort_data) { + // Shared between the INSERT path (called with an existing table's columns/name) and the CTAS path + // (called with the planning-time CreateTableInfo's columns and the to-be-created table's name). 
auto pre_bound_orders = DuckLakeCompactor::ParseSortOrders(*sort_data); if (pre_bound_orders.empty()) { return nullptr; } + DuckLakeTableEntry::ValidateSortExpressionColumns(columns, pre_bound_orders); - // Validate all column references in sort expressions exist in the table - DuckLakeTableEntry::ValidateSortExpressionColumns(table, pre_bound_orders); - - // Bind the ORDER BY expressions auto binder = Binder::CreateBinder(context); TableIndex table_index(0); - auto orders = DuckLakeCompactor::BindSortOrders(*binder, table, table_index, pre_bound_orders); - - // Convert BoundColumnRefExpression to BoundReferenceExpression for physical plan + auto orders = DuckLakeCompactor::BindSortOrders(*binder, columns, table_name, table_index, pre_bound_orders); for (auto &order : orders) { ResolveColumnRefs(order.expression); } - // Create identity projection map vector projection_map; for (idx_t i = 0; i < plan.types.size(); i++) { projection_map.push_back(i); } - auto &order_op = planner.Make(plan.types, std::move(orders), std::move(projection_map), plan.estimated_cardinality); order_op.children.push_back(plan); return &order_op; } +static optional_ptr PlanInsertSort(ClientContext &context, PhysicalPlanGenerator &planner, + PhysicalOperator &plan, DuckLakeTableEntry &table, + optional_ptr sort_data) { + return PlanInsertSortFromColumns(context, planner, plan, table.GetColumns(), table.name, sort_data); +} + PhysicalOperator &DuckLakeCatalog::PlanInsert(ClientContext &context, PhysicalPlanGenerator &planner, LogicalInsert &op, optional_ptr plan) { if (op.return_chunk) { @@ -792,10 +805,10 @@ PhysicalOperator &DuckLakeCatalog::PlanInsert(ClientContext &context, PhysicalPl } auto &ducklake_table = op.table.Cast(); - // Sort data according to the table's SET SORTED BY configuration + // Sort wrap (when SET SORTED BY is configured and sort_on_insert is enabled). 
auto sort_data = ducklake_table.GetSortData(); - auto &ducklake_schema_for_sort = ducklake_table.ParentSchema().Cast(); - bool sort_on_insert = GetConfigOption("sort_on_insert", ducklake_schema_for_sort.GetSchemaId(), + auto &ducklake_schema = ducklake_table.ParentSchema().Cast(); + bool sort_on_insert = GetConfigOption("sort_on_insert", ducklake_schema.GetSchemaId(), ducklake_table.GetTableId(), "true") == "true"; if (sort_data && sort_on_insert) { auto sorted_plan = PlanInsertSort(context, planner, *plan, ducklake_table, sort_data); @@ -804,17 +817,13 @@ PhysicalOperator &DuckLakeCatalog::PlanInsert(ClientContext &context, PhysicalPl } } + // Data-inlining wrap. When sort_on_insert=false and inlining is enabled, sort AFTER the inline operator + // so overflow rows reach Parquet sorted; inlined rows skip the sort (they live in metadata, not Parquet). optional_ptr inline_data; - idx_t data_inlining_row_limit = GetInliningLimit(context, ducklake_table); if (data_inlining_row_limit > 0) { plan = planner.Make(*plan, data_inlining_row_limit); inline_data = plan->Cast(); - - // When sort_on_insert=false but inlining is enabled, add sorting AFTER - // the inline data operator. Data that exceeds the inlining limit passes - // through to parquet files and must be sorted. Data that is inlined - // (absorbed by DuckLakeInlineData) never reaches this sort operator. 
if (sort_data && !sort_on_insert) { auto sorted_plan = PlanInsertSort(context, planner, *plan, ducklake_table, sort_data); if (sorted_plan) { @@ -822,6 +831,7 @@ PhysicalOperator &DuckLakeCatalog::PlanInsert(ClientContext &context, PhysicalPl } } } + DuckLakeCopyInput copy_input(context, ducklake_table); auto &physical_copy = DuckLakeInsert::PlanCopyForInsert(context, planner, copy_input, plan); auto &insert = DuckLakeInsert::PlanInsert(context, planner, ducklake_table, std::move(copy_input.encryption_key)); @@ -838,7 +848,51 @@ PhysicalOperator &DuckLakeCatalog::PlanCreateTableAs(ClientContext &context, Phy auto &columns = create_info.columns; auto &duck_transaction = DuckLakeTransaction::Get(context, *this); auto &duck_schema = op.schema.Cast(); + + // CTAS planning time: the table entry does NOT exist yet (it will be materialized at sink-state-init by + // DuckLakeInsert::GetGlobalSinkState). However the physical write we're + // about to build needs the partition spec + field-id mapping right now so the parquet files land in the + // right hive directories with the right partition_id stamped on them. + // + // The split: we build field_data + partition_data + sort_data SYNTHETICALLY here from the raw inline + // clauses on `op.info`. We hand the partition_data / sort_data to the DuckLakeInsert operator. + + idx_t column_id = 1; + auto field_data = DuckLakeFieldData::FromColumns(columns, column_id); + unique_ptr partition_data; + if (!create_info.partition_keys.empty()) { + partition_data = + DuckLakeTableEntry::BuildPartitionData(duck_transaction, columns, *field_data, create_info.partition_keys); + } + + unique_ptr sort_data; + if (!create_info.sort_keys.empty()) { + // FIXME: TODO: The syntax needs to be enabled in DuckDB upstream + // CREATE TABLE AS ... SORTED BY (e) accepts only raw expressions; wrap each in an OrderByNode using the + // same defaults the DuckDB transformer produces for a bare expression list (ASCENDING + ORDER_DEFAULT). 
+ // Matches the ALTER TABLE ... SET SORTED BY (e) path. + vector orders; + orders.reserve(create_info.sort_keys.size()); + for (auto &expr : create_info.sort_keys) { + orders.emplace_back(OrderType::ASCENDING, OrderByNullType::ORDER_DEFAULT, expr->Copy()); + } + sort_data = DuckLakeTableEntry::BuildSortData(duck_transaction, columns, orders); + } + reference root = plan; + + // Sort wrap (mirrors PlanInsert's sort_on_insert wrap). For CTAS we don't have a table_id yet so the + // table-level sort_on_insert override doesn't apply; we honor the schema-level setting (defaults to true). + bool sort_on_insert = + GetConfigOption("sort_on_insert", duck_schema.GetSchemaId(), TableIndex(), "true") == "true"; + if (sort_data && sort_on_insert) { + auto sorted_plan = + PlanInsertSortFromColumns(context, planner, root.get(), columns, create_info.table, sort_data.get()); + if (sorted_plan) { + root = *sorted_plan; + } + } + optional_ptr inline_data; idx_t data_inlining_row_limit = DataInliningRowLimit(context, duck_schema.GetSchemaId(), TableIndex()); auto &metadata_manager = duck_transaction.GetMetadataManager(); @@ -846,17 +900,24 @@ PhysicalOperator &DuckLakeCatalog::PlanCreateTableAs(ClientContext &context, Phy root = planner.Make(root.get(), data_inlining_row_limit); inline_data = root.get().Cast(); } - for (auto &col : op.info->Base().columns.Logical()) { + for (auto &col : columns.Logical()) { DuckLakeTypes::CheckSupportedType(col.Type()); } auto table_uuid = duck_transaction.GenerateUUID(); auto table_data_path = duck_schema.DataPath() + DuckLakeCatalog::GeneratePathFromName(table_uuid, create_info.table); - DuckLakeCopyInput copy_input(context, duck_schema, columns, table_data_path); + DuckLakeCopyInput copy_input(context, duck_schema, columns, table_data_path, *field_data, partition_data.get()); auto &physical_copy = DuckLakeInsert::PlanCopyForInsert(context, planner, copy_input, root.get()); - auto &insert = planner.Make(op.types, op.schema, std::move(op.info), 
std::move(table_uuid), - std::move(table_data_path), std::move(copy_input.encryption_key)); + + optional_idx insert_partition_id; + if (partition_data) { + insert_partition_id = partition_data->partition_id; + } + + auto &insert = planner.Make( + op.types, op.schema, std::move(op.info), std::move(table_uuid), std::move(table_data_path), + std::move(partition_data), std::move(sort_data), insert_partition_id, std::move(copy_input.encryption_key)); if (inline_data) { inline_data->insert = insert.Cast(); } diff --git a/src/storage/ducklake_schema_entry.cpp b/src/storage/ducklake_schema_entry.cpp index 10840d41b9f..b7b03fc962b 100644 --- a/src/storage/ducklake_schema_entry.cpp +++ b/src/storage/ducklake_schema_entry.cpp @@ -4,6 +4,7 @@ #include "duckdb/parser/parsed_data/comment_on_column_info.hpp" #include "duckdb/parser/parsed_data/create_view_info.hpp" #include "duckdb/parser/parsed_data/drop_info.hpp" +#include "duckdb/parser/result_modifier.hpp" #include "duckdb/planner/parsed_data/bound_create_table_info.hpp" #include "storage/ducklake_catalog.hpp" #include "storage/ducklake_table_entry.hpp" @@ -61,9 +62,10 @@ bool DuckLakeSchemaEntry::HandleCreateConflict(CatalogTransaction transaction, C return true; } -optional_ptr DuckLakeSchemaEntry::CreateTableExtended(CatalogTransaction transaction, - BoundCreateTableInfo &info, string table_uuid, - string table_data_path) { +optional_ptr +DuckLakeSchemaEntry::CreateTableExtended(CatalogTransaction transaction, BoundCreateTableInfo &info, string table_uuid, + string table_data_path, unique_ptr prebuilt_partition_data, + unique_ptr prebuilt_sort_data) { auto &duck_transaction = transaction.transaction->Cast(); auto &base_info = info.Base(); // check if we have an existing entry with this name @@ -85,6 +87,33 @@ optional_ptr DuckLakeSchemaEntry::CreateTableExtended(CatalogTrans auto table_entry = make_uniq(ParentCatalog(), *this, base_info, table_id, std::move(table_uuid), std::move(table_data_path), std::move(field_data), 
column_id, std::move(inlined_tables), LocalChangeType::CREATED); + // Two routes for partition / sort data: + // (A) CTAS: partition_data and sort_data were pre-built at planning time so the planner-built physical + // write could embed the right partition_id into the data files. Attach them as-is - rebuilding here + // would allocate new ids that wouldn't match the ids baked into the on-disk files. + // (B) Plain CREATE TABLE (non-CTAS): rebuild from the inline clauses on info.Base().partition_keys / + // sort_keys. There's no planning-time write to coordinate with, so the id allocated here is the only id. + if (prebuilt_partition_data) { + table_entry->SetPartitionData(std::move(prebuilt_partition_data)); + } else if (!base_info.partition_keys.empty()) { + table_entry->SetPartitionData(DuckLakeTableEntry::BuildPartitionData( + duck_transaction, table_entry->GetColumns(), table_entry->GetFieldData(), base_info.partition_keys)); + } + if (prebuilt_sort_data) { + table_entry->SetSortData(std::move(prebuilt_sort_data)); + } else if (!base_info.sort_keys.empty()) { + // TODO: FIXME: Needs a fix in upstream DuckDB then a fix here + // CREATE TABLE ... SORTED BY (e) accepts only raw expressions (no ASC/DESC/NULLS modifiers). + // Wrap each in an OrderByNode using the same defaults the DuckDB transformer produces for a bare + // expression list (ASCENDING + ORDER_DEFAULT) - matches the ALTER TABLE ... SET SORTED BY (e) path. 
+ vector orders; + orders.reserve(base_info.sort_keys.size()); + for (auto &expr : base_info.sort_keys) { + orders.emplace_back(OrderType::ASCENDING, OrderByNullType::ORDER_DEFAULT, expr->Copy()); + } + table_entry->SetSortData( + DuckLakeTableEntry::BuildSortData(duck_transaction, table_entry->GetColumns(), orders)); + } auto result = table_entry.get(); duck_transaction.CreateEntry(std::move(table_entry)); return result; diff --git a/src/storage/ducklake_table_entry.cpp b/src/storage/ducklake_table_entry.cpp index f9335e1da11..677e75a21b4 100644 --- a/src/storage/ducklake_table_entry.cpp +++ b/src/storage/ducklake_table_entry.cpp @@ -422,7 +422,7 @@ string GetPartitionColumnName(ColumnRefExpression &colref) { return colref.GetColumnName(); } -void DuckLakeTableEntry::ValidateSortExpressionColumns(DuckLakeTableEntry &table, const vector &orders) { +void DuckLakeTableEntry::ValidateSortExpressionColumns(const ColumnList &columns, const vector &orders) { vector missing_columns; for (auto &order : orders) { ParsedExpressionIterator::VisitExpression( @@ -432,7 +432,7 @@ void DuckLakeTableEntry::ValidateSortExpressionColumns(DuckLakeTableEntry &table "Unexpected qualified column reference - only unqualified columns are supported"); } string column_name = colref.GetColumnName(); - if (!table.ColumnExists(column_name)) { + if (!columns.ColumnExists(column_name)) { if (std::find(missing_columns.begin(), missing_columns.end(), column_name) == missing_columns.end()) { missing_columns.push_back(column_name); @@ -453,7 +453,8 @@ void DuckLakeTableEntry::ValidateSortExpressionColumns(DuckLakeTableEntry &table } } -DuckLakePartitionField GetPartitionField(DuckLakeTableEntry &table, ParsedExpression &expr) { +DuckLakePartitionField GetPartitionField(const ColumnList &columns, DuckLakeFieldData &field_data, + ParsedExpression &expr) { string column_name; DuckLakePartitionField field; @@ -521,12 +522,12 @@ DuckLakePartitionField GetPartitionField(DuckLakeTableEntry &table, 
ParsedExpres "Unsupported partition key %s - only identity columns and year/month/day/hour/bucket are supported", expr.ToString()); } - if (!table.ColumnExists(column_name)) { + if (!columns.ColumnExists(column_name)) { throw CatalogException("Unexpected partition key - column \"%s\" does not exist", column_name); } - auto &col = table.GetColumn(column_name); + auto &col = columns.GetColumn(column_name); PhysicalIndex column_index(col.StorageOid()); - auto &field_id = table.GetFieldData().GetByRootIndex(column_index); + auto &field_id = field_data.GetByRootIndex(column_index); field.field_id = field_id.GetFieldIndex(); return field; } @@ -547,22 +548,41 @@ static bool PartitionFieldsMatch(optional_ptr current_partiti return true; } -unique_ptr DuckLakeTableEntry::AlterTable(DuckLakeTransaction &transaction, SetPartitionedByInfo &info) { - auto create_info = GetInfo(); - auto &table_info = create_info->Cast(); - // create a complete copy of this table with the partition info added +unique_ptr +DuckLakeTableEntry::BuildPartitionData(DuckLakeTransaction &transaction, const ColumnList &columns, + DuckLakeFieldData &field_data, + const vector> &partition_keys) { + // Always returns a non-null DuckLakePartition. Empty partition_keys yields a partition with no fields, + // which is the shape RESET PARTITIONED BY (and SET PARTITIONED BY () with empty list) needs to drop the + // existing partition spec at commit time. CTAS callers that mean "no PARTITIONED BY clause at all" should + // short-circuit and not call this helper. 
auto partition_data = make_uniq(); partition_data->partition_id = transaction.GetLocalCatalogId(); - for (idx_t expr_idx = 0; expr_idx < info.partition_keys.size(); expr_idx++) { - auto &expr = *info.partition_keys[expr_idx]; - auto partition_field = GetPartitionField(*this, expr); + for (idx_t expr_idx = 0; expr_idx < partition_keys.size(); expr_idx++) { + auto &expr = *partition_keys[expr_idx]; + auto partition_field = GetPartitionField(columns, field_data, expr); + // Reject duplicate partition keys: two expressions that resolve to the same (field_id, transform). + // `(year(ts), month(ts))` is fine - same column, different transforms - but `(a, a)` is not. + for (auto &existing : partition_data->fields) { + if (existing.field_id == partition_field.field_id && existing.transform == partition_field.transform) { + throw BinderException("Duplicate partition key: expression \"%s\" matches an earlier partition key", + expr.ToString()); + } + } partition_field.partition_key_index = expr_idx; partition_data->fields.push_back(partition_field); } + return partition_data; +} + +unique_ptr DuckLakeTableEntry::AlterTable(DuckLakeTransaction &transaction, SetPartitionedByInfo &info) { + auto create_info = GetInfo(); + auto &table_info = create_info->Cast(); + auto partition_data = BuildPartitionData(transaction, GetColumns(), GetFieldData(), info.partition_keys); if (PartitionFieldsMatch(GetPartitionData(), *partition_data)) { return nullptr; } - + auto new_entry = make_uniq(*this, table_info, std::move(partition_data)); return std::move(new_entry); } @@ -1258,24 +1278,16 @@ unique_ptr DuckLakeTableEntry::AlterTable(DuckLakeTransaction &tra return std::move(new_entry); } -unique_ptr DuckLakeTableEntry::AlterTable(DuckLakeTransaction &transaction, SetSortedByInfo &info) { - auto create_info = GetInfo(); - auto &table_info = create_info->Cast(); - - if (info.orders.empty()) { - // RESET SORTED BY - clear sort data - auto new_entry = make_uniq(*this, table_info, unique_ptr()); 
- return std::move(new_entry); +unique_ptr DuckLakeTableEntry::BuildSortData(DuckLakeTransaction &transaction, const ColumnList &columns, + const vector &orders) { + if (orders.empty()) { + return nullptr; } - - // Validate all column references in all sort expressions - ValidateSortExpressionColumns(*this, info.orders); - + ValidateSortExpressionColumns(columns, orders); auto sort_data = make_uniq(); sort_data->sort_id = transaction.GetLocalCatalogId(); - for (idx_t order_node_idx = 0; order_node_idx < info.orders.size(); order_node_idx++) { - auto &order_node = info.orders[order_node_idx]; - + for (idx_t order_node_idx = 0; order_node_idx < orders.size(); order_node_idx++) { + auto &order_node = orders[order_node_idx]; DuckLakeSortField sort_field; sort_field.sort_key_index = order_node_idx; sort_field.expression = order_node.expression->ToString(); @@ -1284,7 +1296,14 @@ unique_ptr DuckLakeTableEntry::AlterTable(DuckLakeTransaction &tra sort_field.null_order = order_node.null_order; sort_data->fields.push_back(sort_field); } + return sort_data; +} + +unique_ptr DuckLakeTableEntry::AlterTable(DuckLakeTransaction &transaction, SetSortedByInfo &info) { + auto create_info = GetInfo(); + auto &table_info = create_info->Cast(); + auto sort_data = BuildSortData(transaction, GetColumns(), info.orders); auto new_entry = make_uniq(*this, table_info, std::move(sort_data)); return std::move(new_entry); } diff --git a/src/storage/ducklake_transaction.cpp b/src/storage/ducklake_transaction.cpp index 6a7f3ae064c..0f77a983c4c 100644 --- a/src/storage/ducklake_transaction.cpp +++ b/src/storage/ducklake_transaction.cpp @@ -1748,6 +1748,18 @@ void DuckLakeTransaction::GetNewTableInfo(DuckLakeCommitState &commit_state, Duc inlined_entry.uuid = latest_table.GetTableUUID(); inlined_entry.columns = latest_table.GetTableColumns(); result.new_inlined_data_tables.push_back(std::move(inlined_entry)); + + // CREATE TABLE ... 
PARTITIONED BY / SORTED BY + if (local_change.type == LocalChangeType::CREATED) { + if (table.GetPartitionData()) { + auto partition_key = GetNewPartitionKey(commit_state, table); + result.new_partition_keys.push_back(std::move(partition_key)); + } + if (table.GetSortData()) { + auto sort_key = GetNewSortKey(commit_state, table); + result.new_sort_keys.push_back(std::move(sort_key)); + } + } break; } default: diff --git a/test/sql/create_table_inline_partition_sort/create_table_inline_basic.test b/test/sql/create_table_inline_partition_sort/create_table_inline_basic.test new file mode 100644 index 00000000000..38b97dd29f4 --- /dev/null +++ b/test/sql/create_table_inline_partition_sort/create_table_inline_basic.test @@ -0,0 +1,311 @@ +# name: test/sql/create_table_inline_partition_sort/create_table_inline_basic.test +# description: CREATE TABLE with inline PARTITIONED BY / SORTED BY clauses +# group: [create_table_inline_partition_sort] + +require ducklake + +require parquet + +test-env DUCKLAKE_CONNECTION {TEST_DIR}/{UUID}.db + +test-env DATA_PATH {TEST_DIR} + +statement ok +ATTACH 'ducklake:{DUCKLAKE_CONNECTION}' AS ducklake (DATA_PATH '{DATA_PATH}/inline_basic', METADATA_CATALOG 'ducklake_metadata', DATA_INLINING_ROW_LIMIT 0) + +statement ok +USE ducklake + +# Test 1: CREATE TABLE with inline PARTITIONED BY only (single column) +statement ok +CREATE TABLE t_part (i INTEGER, v VARCHAR) PARTITIONED BY (i) + +statement ok +INSERT INTO t_part VALUES (1, 'a'), (2, 'b'), (1, 'c') + +query II +SELECT i, v FROM t_part ORDER BY i, v +---- +1 a +1 c +2 b + +# verify partition metadata was persisted at CREATE time (no ALTER ran) +query I +SELECT count(*) FROM ducklake_metadata.ducklake_partition_info WHERE table_id = (SELECT table_id FROM ducklake_metadata.ducklake_table WHERE table_name = 't_part') +---- +1 + +query I +SELECT count(*) FROM ducklake_metadata.ducklake_partition_column WHERE table_id = (SELECT table_id FROM ducklake_metadata.ducklake_table WHERE table_name = 
't_part') +---- +1 + +# verify the on-disk Parquet files landed in hive-style partition-specific directories +# (DATA_INLINING_ROW_LIMIT 0 forces INSERT to write Parquet directly; one file per partition value) +query I +SELECT count(*) FROM ducklake_metadata.ducklake_data_file WHERE table_id = (SELECT table_id FROM ducklake_metadata.ducklake_table WHERE table_name = 't_part') +---- +2 + +query I +SELECT DISTINCT regexp_extract(path, '.*(i=[0-9]+)[/\\].*', 1) FROM ducklake_metadata.ducklake_data_file WHERE table_id = (SELECT table_id FROM ducklake_metadata.ducklake_table WHERE table_name = 't_part') ORDER BY 1 +---- +i=1 +i=2 + +# Test 2: CREATE TABLE with inline SORTED BY only (single column) +statement ok +CREATE TABLE t_sort (i INTEGER, v VARCHAR) SORTED BY (i) + +statement ok +INSERT INTO t_sort VALUES (3, 'x'), (1, 'y'), (2, 'z') + +# t_sort has a single file (DATA_INLINING_ROW_LIMIT 0, no partitioning, one INSERT batch). +# SORTED BY (i) means rows are i-ascending in the file - SELECT without ORDER BY returns the +# file's row order directly, which IS the sort verification. +query II +SELECT i, v FROM t_sort +---- +1 y +2 z +3 x + +query I +SELECT count(*) FROM ducklake_metadata.ducklake_sort_info WHERE table_id = (SELECT table_id FROM ducklake_metadata.ducklake_table WHERE table_name = 't_sort') +---- +1 + +query II +SELECT sort_key_index, expression FROM ducklake_metadata.ducklake_sort_expression WHERE table_id = (SELECT table_id FROM ducklake_metadata.ducklake_table WHERE table_name = 't_sort') ORDER BY sort_key_index +---- +0 i + +# verify the inline path persisted the same defaults a bare ALTER TABLE ... 
SET SORTED BY (col) would +# (the metadata writer collapses OrderType::ASCENDING -> "ASC" and OrderByNullType::ORDER_DEFAULT -> "NULLS_LAST") +query III +SELECT sort_direction, null_order, dialect FROM ducklake_metadata.ducklake_sort_expression WHERE table_id = (SELECT table_id FROM ducklake_metadata.ducklake_table WHERE table_name = 't_sort') +---- +ASC NULLS_LAST duckdb + +# Test 3: CREATE TABLE with both clauses, PARTITIONED first +statement ok +CREATE TABLE t_both1 (a INTEGER, b INTEGER, v VARCHAR) PARTITIONED BY (a) SORTED BY (b) + +statement ok +INSERT INTO t_both1 VALUES (1, 30, 'p'), (2, 10, 'q'), (1, 20, 'r') + +# t_both1 is PARTITIONED BY (a), so each value of a lives in its own file. SORTED BY (b) means +# within each partition's file rows are b-ascending. Filter to one partition at a time and verify +# row order without ORDER BY - that order IS the on-disk row order of the partition's file. +query III +SELECT a, b, v FROM t_both1 WHERE a = 1 +---- +1 20 r +1 30 p + +# on-disk: one Parquet file per distinct partition value (a=1 and a=2), each in its own a=N directory +query I +SELECT DISTINCT regexp_extract(path, '.*(a=[0-9]+)[/\\].*', 1) FROM ducklake_metadata.ducklake_data_file WHERE table_id = (SELECT table_id FROM ducklake_metadata.ducklake_table WHERE table_name = 't_both1') ORDER BY 1 +---- +a=1 +a=2 + +# Test 4: CREATE TABLE with both clauses, SORTED first (verify either-order grammar) +statement ok +CREATE TABLE t_both2 (a INTEGER, b INTEGER) SORTED BY (b) PARTITIONED BY (a) + +query I +SELECT count(*) FROM ducklake_metadata.ducklake_partition_info WHERE table_id = (SELECT table_id FROM ducklake_metadata.ducklake_table WHERE table_name = 't_both2') +---- +1 + +query I +SELECT count(*) FROM ducklake_metadata.ducklake_sort_info WHERE table_id = (SELECT table_id FROM ducklake_metadata.ducklake_table WHERE table_name = 't_both2') +---- +1 + +# Test 5: multi-column partition + multi-column sort +statement ok +CREATE TABLE t_multi (a INTEGER, b 
INTEGER, c INTEGER) PARTITIONED BY (a, b) SORTED BY (c, a) + +query I +SELECT count(*) FROM ducklake_metadata.ducklake_partition_column WHERE table_id = (SELECT table_id FROM ducklake_metadata.ducklake_table WHERE table_name = 't_multi') +---- +2 + +query I +SELECT count(*) FROM ducklake_metadata.ducklake_sort_expression WHERE table_id = (SELECT table_id FROM ducklake_metadata.ducklake_table WHERE table_name = 't_multi') +---- +2 + +statement ok +INSERT INTO t_multi VALUES (1, 1, 30), (1, 1, 10), (1, 2, 20), (2, 1, 5) + +# t_multi is PARTITIONED BY (a, b), so each (a, b) combination lives in its own file. SORTED BY +# (c, a) means rows within a partition's file are (c, a)-ascending. +query III +SELECT a, b, c FROM t_multi WHERE a = 1 AND b = 1 +---- +1 1 10 +1 1 30 + +# multi-column hive-style nesting: a=N/b=M/ +query II +SELECT regexp_extract(path, '.*(a=[0-9]+)[/\\].*', 1) AS a_dir, regexp_extract(path, '.*(b=[0-9]+)[/\\].*', 1) AS b_dir +FROM ducklake_metadata.ducklake_data_file +WHERE table_id = (SELECT table_id FROM ducklake_metadata.ducklake_table WHERE table_name = 't_multi') +ORDER BY a_dir, b_dir +---- +a=1 b=1 +a=1 b=2 +a=2 b=1 + +# the a directory must come *before* the b directory in the path (PARTITIONED BY (a, b) ordering) +query I +SELECT count(*) FROM ducklake_metadata.ducklake_data_file +WHERE table_id = (SELECT table_id FROM ducklake_metadata.ducklake_table WHERE table_name = 't_multi') + AND regexp_matches(path, 'a=[0-9]+/b=[0-9]+') +---- +3 + +# Test 6: PARTITIONED BY with a transform function +statement ok +CREATE TABLE t_transform (ts TIMESTAMP, v INTEGER) PARTITIONED BY (year(ts)) + +query II +SELECT partition_key_index, transform FROM ducklake_metadata.ducklake_partition_column WHERE table_id = (SELECT table_id FROM ducklake_metadata.ducklake_table WHERE table_name = 't_transform') +---- +0 year + +statement ok +INSERT INTO t_transform VALUES (TIMESTAMP '2023-06-01', 1), (TIMESTAMP '2023-09-15', 2), (TIMESTAMP '2024-03-10', 3) + +# year() 
transform produces hive directories named with the partition key NAME (year, not ts) +query I +SELECT DISTINCT regexp_extract(path, '.*(year=[0-9]+)[/\\].*', 1) FROM ducklake_metadata.ducklake_data_file WHERE table_id = (SELECT table_id FROM ducklake_metadata.ducklake_table WHERE table_name = 't_transform') ORDER BY 1 +---- +year=2023 +year=2024 + +# Test 7: PARTITIONED BY only writes partition metadata, NOT sort metadata (asymmetric) +statement ok +CREATE TABLE t_only_part (a INTEGER) PARTITIONED BY (a) + +query I +SELECT count(*) FROM ducklake_metadata.ducklake_partition_info WHERE table_id = (SELECT table_id FROM ducklake_metadata.ducklake_table WHERE table_name = 't_only_part') +---- +1 + +query I +SELECT count(*) FROM ducklake_metadata.ducklake_sort_info WHERE table_id = (SELECT table_id FROM ducklake_metadata.ducklake_table WHERE table_name = 't_only_part') +---- +0 + +statement ok +INSERT INTO t_only_part VALUES (10), (20), (10) + +query I +SELECT DISTINCT regexp_extract(path, '.*(a=[0-9]+)[/\\].*', 1) FROM ducklake_metadata.ducklake_data_file WHERE table_id = (SELECT table_id FROM ducklake_metadata.ducklake_table WHERE table_name = 't_only_part') ORDER BY 1 +---- +a=10 +a=20 + +# Test 8: SORTED BY only writes sort metadata, NOT partition metadata (asymmetric) +statement ok +CREATE TABLE t_only_sort (a INTEGER) SORTED BY (a) + +query I +SELECT count(*) FROM ducklake_metadata.ducklake_partition_info WHERE table_id = (SELECT table_id FROM ducklake_metadata.ducklake_table WHERE table_name = 't_only_sort') +---- +0 + +query I +SELECT count(*) FROM ducklake_metadata.ducklake_sort_info WHERE table_id = (SELECT table_id FROM ducklake_metadata.ducklake_table WHERE table_name = 't_only_sort') +---- +1 + +# Test 9: PARTITIONED BY with bucket(N, col) transform happy path +statement ok +CREATE TABLE t_bucket (id BIGINT, v VARCHAR) PARTITIONED BY (bucket(8, id)) + +query II +SELECT partition_key_index, transform FROM ducklake_metadata.ducklake_partition_column WHERE 
table_id = (SELECT table_id FROM ducklake_metadata.ducklake_table WHERE table_name = 't_bucket') +---- +0 bucket(8) + +statement ok +INSERT INTO t_bucket VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'e'), (6, 'f'), (7, 'g'), (8, 'h') + +# bucket() transform writes into hive directories named "bucket=N" (modulo bucket count) +# we don't fix specific bucket assignments (hash-dependent) but we assert at least 1 and at most 8 distinct buckets +query I +SELECT count(DISTINCT regexp_extract(path, '.*(bucket=[0-9]+)[/\\].*', 1)) BETWEEN 1 AND 8 FROM ducklake_metadata.ducklake_data_file WHERE table_id = (SELECT table_id FROM ducklake_metadata.ducklake_table WHERE table_name = 't_bucket') +---- +true + +# every file must live in a bucket=N directory +query I +SELECT count(*) FROM ducklake_metadata.ducklake_data_file WHERE table_id = (SELECT table_id FROM ducklake_metadata.ducklake_table WHERE table_name = 't_bucket') AND NOT regexp_matches(path, 'bucket=[0-9]+') +---- +0 + +# Test 10: same-transaction CREATE-inline + ALTER-set must NOT emit duplicate partition rows. +# The reverse-chain commit walk has to dedupe so only the latest (ALTER) wins. +statement ok +BEGIN + +statement ok +CREATE TABLE t_create_then_alter (a INTEGER, b INTEGER) PARTITIONED BY (a) SORTED BY (a) + +statement ok +ALTER TABLE t_create_then_alter SET PARTITIONED BY (b) + +statement ok +ALTER TABLE t_create_then_alter SET SORTED BY (b DESC) + +statement ok +COMMIT + +# exactly one partition_info row (the ALTER's), not two +query I +SELECT count(*) FROM ducklake_metadata.ducklake_partition_info WHERE table_id = (SELECT table_id FROM ducklake_metadata.ducklake_table WHERE table_name = 't_create_then_alter') +---- +1 + +# the surviving partition row references column b (the ALTER's choice), not a (the inline create table's). 
+query III +SELECT pc.partition_key_index, c.column_name, pc.transform +FROM ducklake_metadata.ducklake_partition_column pc +JOIN ducklake_metadata.ducklake_column c + ON pc.table_id = c.table_id AND pc.column_id = c.column_id AND c.end_snapshot IS NULL +WHERE pc.table_id = (SELECT table_id FROM ducklake_metadata.ducklake_table WHERE table_name = 't_create_then_alter') +---- +0 b identity + +# exactly one sort_info row, and its expression is b DESC (the ALTER's), not a +query I +SELECT count(*) FROM ducklake_metadata.ducklake_sort_info WHERE table_id = (SELECT table_id FROM ducklake_metadata.ducklake_table WHERE table_name = 't_create_then_alter') +---- +1 + +query III +SELECT expression, sort_direction, null_order FROM ducklake_metadata.ducklake_sort_expression WHERE table_id = (SELECT table_id FROM ducklake_metadata.ducklake_table WHERE table_name = 't_create_then_alter') +---- +b DESC NULLS_LAST + +# INSERT after the chain: data must land in b=N directories (ALTER won), NOT a=N directories +statement ok +INSERT INTO t_create_then_alter VALUES (1, 100), (2, 100), (3, 200) + +query I +SELECT DISTINCT regexp_extract(path, '.*(b=[0-9]+)[/\\].*', 1) FROM ducklake_metadata.ducklake_data_file WHERE table_id = (SELECT table_id FROM ducklake_metadata.ducklake_table WHERE table_name = 't_create_then_alter') ORDER BY 1 +---- +b=100 +b=200 + +# none of the files should be under an a=N directory (regression guard for the dedup logic) +query I +SELECT count(*) FROM ducklake_metadata.ducklake_data_file WHERE table_id = (SELECT table_id FROM ducklake_metadata.ducklake_table WHERE table_name = 't_create_then_alter') AND regexp_matches(path, 'a=[0-9]+/') +---- +0 diff --git a/test/sql/create_table_inline_partition_sort/create_table_inline_ctas.test b/test/sql/create_table_inline_partition_sort/create_table_inline_ctas.test new file mode 100644 index 00000000000..d93155f7202 --- /dev/null +++ b/test/sql/create_table_inline_partition_sort/create_table_inline_ctas.test @@ -0,0 
+1,248 @@ +# name: test/sql/create_table_inline_partition_sort/create_table_inline_ctas.test +# description: CREATE TABLE AS SELECT with inline PARTITIONED BY / SORTED BY clauses +# group: [create_table_inline_partition_sort] + +require ducklake + +require parquet + +test-env DUCKLAKE_CONNECTION {TEST_DIR}/{UUID}.db + +test-env DATA_PATH {TEST_DIR} + +statement ok +ATTACH 'ducklake:{DUCKLAKE_CONNECTION}' AS ducklake (DATA_PATH '{DATA_PATH}/inline_ctas', METADATA_CATALOG 'ducklake_metadata', DATA_INLINING_ROW_LIMIT 0) + +statement ok +USE ducklake + +# Test 1: CTAS with PARTITIONED BY only +statement ok +CREATE TABLE ctas_part PARTITIONED BY (i) AS SELECT range AS i, range::VARCHAR AS v FROM range(5) + +query II +SELECT i, v FROM ctas_part ORDER BY i +---- +0 0 +1 1 +2 2 +3 3 +4 4 + +query I +SELECT count(*) FROM ducklake_metadata.ducklake_partition_info WHERE table_id = (SELECT table_id FROM ducklake_metadata.ducklake_table WHERE table_name = 'ctas_part') +---- +1 + +# CTAS must hive-partition on the initial write: 5 distinct values from range(5) -> 5 hive directories +query I +SELECT DISTINCT regexp_extract(path, '.*(i=[0-9]+)[/\\].*', 1) FROM ducklake_metadata.ducklake_data_file WHERE table_id = (SELECT table_id FROM ducklake_metadata.ducklake_table WHERE table_name = 'ctas_part') ORDER BY 1 +---- +i=0 +i=1 +i=2 +i=3 +i=4 + +# Test 2: CTAS with SORTED BY only. +statement ok +CREATE TABLE ctas_sort SORTED BY (j) AS SELECT range AS i, 9 - range AS j FROM range(10) + +# ctas_sort is non-partitioned; CTAS produces a single file. SELECT without ORDER BY returns the +# file's row order directly. SORTED BY (j) means j-ascending; i is forced descending by construction. 
+query II +SELECT i, j FROM ctas_sort +---- +9 0 +8 1 +7 2 +6 3 +5 4 +4 5 +3 6 +2 7 +1 8 +0 9 + +query I +SELECT count(*) FROM ducklake_metadata.ducklake_sort_info WHERE table_id = (SELECT table_id FROM ducklake_metadata.ducklake_table WHERE table_name = 'ctas_sort') +---- +1 + +# Test 3: CTAS with both clauses, PARTITIONED first. 30 rows partitioned into 3 groups of 10; +# sort key v = (29 - i) is the inverse of the SELECT's natural order so the sort has real work. +statement ok +CREATE TABLE ctas_both PARTITIONED BY (g) SORTED BY (v) AS +SELECT range % 3 AS g, 29 - range AS v FROM range(30) + +query II +SELECT + (SELECT count(*) FROM ducklake_metadata.ducklake_partition_info WHERE table_id = (SELECT table_id FROM ducklake_metadata.ducklake_table WHERE table_name = 'ctas_both')), + (SELECT count(*) FROM ducklake_metadata.ducklake_sort_info WHERE table_id = (SELECT table_id FROM ducklake_metadata.ducklake_table WHERE table_name = 'ctas_both')) +---- +1 1 + +# 3 distinct partition values -> 3 hive directories +query I +SELECT count(DISTINCT regexp_extract(path, '.*(g=[0-9]+)[/\\].*', 1)) FROM ducklake_metadata.ducklake_data_file WHERE table_id = (SELECT table_id FROM ducklake_metadata.ducklake_table WHERE table_name = 'ctas_both') +---- +3 + +# verify sort actually happened: each partition's file holds rows in v-ascending order +query II +SELECT g, v FROM ctas_both WHERE g = 0 +---- +0 2 +0 5 +0 8 +0 11 +0 14 +0 17 +0 20 +0 23 +0 26 +0 29 + +query II +SELECT g, v FROM ctas_both WHERE g = 1 +---- +1 1 +1 4 +1 7 +1 10 +1 13 +1 16 +1 19 +1 22 +1 25 +1 28 + +query II +SELECT g, v FROM ctas_both WHERE g = 2 +---- +2 0 +2 3 +2 6 +2 9 +2 12 +2 15 +2 18 +2 21 +2 24 +2 27 + +# Test 4: CTAS with SORTED first then PARTITIONED (either-order grammar). 
Same shape as Test 3 but +statement ok +CREATE TABLE ctas_swap SORTED BY (v) PARTITIONED BY (g) AS +SELECT range % 3 AS g, 29 - range AS v FROM range(30) + +query II +SELECT + (SELECT count(*) FROM ducklake_metadata.ducklake_partition_info WHERE table_id = (SELECT table_id FROM ducklake_metadata.ducklake_table WHERE table_name = 'ctas_swap')), + (SELECT count(*) FROM ducklake_metadata.ducklake_sort_info WHERE table_id = (SELECT table_id FROM ducklake_metadata.ducklake_table WHERE table_name = 'ctas_swap')) +---- +1 1 + +query I +SELECT count(DISTINCT regexp_extract(path, '.*(g=[0-9]+)[/\\].*', 1)) FROM ducklake_metadata.ducklake_data_file WHERE table_id = (SELECT table_id FROM ducklake_metadata.ducklake_table WHERE table_name = 'ctas_swap') +---- +3 + +# partition g=1's file +query II +SELECT g, v FROM ctas_swap WHERE g = 1 +---- +1 1 +1 4 +1 7 +1 10 +1 13 +1 16 +1 19 +1 22 +1 25 +1 28 + +# Test 5: CTAS partitioning by a transform expression +statement ok +CREATE TABLE ctas_transform PARTITIONED BY (year(ts)) AS +SELECT TIMESTAMP '2024-01-01' + INTERVAL (i) YEAR AS ts, i AS v FROM range(3) t(i) + +query II +SELECT partition_key_index, transform FROM ducklake_metadata.ducklake_partition_column WHERE table_id = (SELECT table_id FROM ducklake_metadata.ducklake_table WHERE table_name = 'ctas_transform') +---- +0 year + +# CTAS with year() transform must hive-partition on the initial write +query I +SELECT DISTINCT regexp_extract(path, '.*(year=[0-9]+)[/\\].*', 1) FROM ducklake_metadata.ducklake_data_file WHERE table_id = (SELECT table_id FROM ducklake_metadata.ducklake_table WHERE table_name = 'ctas_transform') ORDER BY 1 +---- +year=2024 +year=2025 +year=2026 + +# Test 6: CTAS with multi-column SORTED BY using string and integer columns +statement ok +CREATE TABLE ctas_multi SORTED BY (a, b) AS +SELECT i AS a, ('tag_' || i) AS b FROM range(4) t(i) + +# ctas_multi is non-partitioned; CTAS produces a single file. 
SORTED BY (a, b) means rows are +# (a, b)-ascending in that file - SELECT without ORDER BY returns the file's actual row order. +query II +SELECT a, b FROM ctas_multi +---- +0 tag_0 +1 tag_1 +2 tag_2 +3 tag_3 + +query II +SELECT sort_key_index, expression FROM ducklake_metadata.ducklake_sort_expression WHERE table_id = (SELECT table_id FROM ducklake_metadata.ducklake_table WHERE table_name = 'ctas_multi') ORDER BY sort_key_index +---- +0 a +1 b + +# Test 7: CTAS with a CTE source (regression: planning shape differs from a flat SELECT) +statement ok +CREATE TABLE ctas_cte PARTITIONED BY (g) SORTED BY (v) AS +WITH src AS (SELECT i % 3 AS g, i AS v FROM range(9) t(i)) +SELECT g, v FROM src + +query II +SELECT g, count(*) FROM ctas_cte GROUP BY g ORDER BY g +---- +0 3 +1 3 +2 3 + +# ctas_cte is PARTITIONED BY (g), so each value of g lives in its own file. SORTED BY (v) means +# rows within each partition's file are v-ascending. Filter to one partition; the row order +# returned IS the on-disk file order. 
+query II +SELECT g, v FROM ctas_cte WHERE g = 0 +---- +0 0 +0 3 +0 6 + +query II +SELECT g, v FROM ctas_cte WHERE g = 1 +---- +1 1 +1 4 +1 7 + +query II +SELECT g, v FROM ctas_cte WHERE g = 2 +---- +2 2 +2 5 +2 8 + +# CTE-sourced CTAS must still partition on the initial write (regression: planner passes the spec through) +query I +SELECT DISTINCT regexp_extract(path, '.*(g=[0-9]+)[/\\].*', 1) FROM ducklake_metadata.ducklake_data_file WHERE table_id = (SELECT table_id FROM ducklake_metadata.ducklake_table WHERE table_name = 'ctas_cte') ORDER BY 1 +---- +g=0 +g=1 +g=2 diff --git a/test/sql/create_table_inline_partition_sort/create_table_inline_ctas_sort_verification.test b/test/sql/create_table_inline_partition_sort/create_table_inline_ctas_sort_verification.test new file mode 100644 index 00000000000..8aaad79b5b0 --- /dev/null +++ b/test/sql/create_table_inline_partition_sort/create_table_inline_ctas_sort_verification.test @@ -0,0 +1,88 @@ +# name: test/sql/create_table_inline_partition_sort/create_table_inline_ctas_sort_verification.test +# description: Verify SORTED BY actually sorts data during CTAS by inspecting parquet rowgroup statistics. +# group: [create_table_inline_partition_sort] + +# Uses 3000+ rows across multiple 2048-row rowgroups; sort column is the INVERSE of the SELECT's +# natural order so any "sort skipped" regression would leave rowgroups in descending order. + +require ducklake + +require parquet + +test-env DUCKLAKE_CONNECTION {TEST_DIR}/{UUID}.db + +test-env DATA_PATH {TEST_DIR} + +statement ok +ATTACH 'ducklake:{DUCKLAKE_CONNECTION}' AS ducklake (DATA_PATH '{DATA_PATH}/inline_ctas_sort_verify', METADATA_CATALOG 'ducklake_metadata', DATA_INLINING_ROW_LIMIT 0) + +# 2048 is the minimum parquet rowgroup size in DuckDB; pick it so 3000 rows produce >=2 rowgroups. 
+statement ok +CALL ducklake.set_option('parquet_row_group_size', 2048) + +# Single-threaded + preserved insertion order keeps rowgroup boundaries deterministic, so we can assert +# specific stats_min values per rowgroup rather than an order-tolerant "monotonic" check. +statement ok +SET threads=1 + +statement ok +SET preserve_insertion_order=true + +statement ok +USE ducklake + +# CTAS feeding from range(3000): natural SELECT order is `a` ascending. Sort key `b` is `2999 - a` so its +# natural in-stream order is strictly descending. If SORTED BY (b) is honored at CTAS time, the data is +# resorted to b ascending and the per-rowgroup stats_min/max for `b` will be disjoint and increasing. +statement ok +CREATE TABLE sort_verify SORTED BY (b) AS +SELECT range AS a, (2999 - range) AS b FROM range(3000) + +# logical correctness: data round-trips +query I +SELECT count(*) FROM sort_verify +---- +3000 + +# Confirm we got at least 2 rowgroups (otherwise the per-rowgroup-stats check is meaningless). +query I +SELECT count(DISTINCT row_group_id) >= 2 +FROM parquet_metadata('{DATA_PATH}/inline_ctas_sort_verify/main/sort_verify/*.parquet') +WHERE path_in_schema = 'b' +---- +true + +# ---------------------------------------------------------------------------------------------------------------------- +# Hardcoded per-rowgroup min/max sanity check on `sort_verify`. +# Layout: 3000 rows, parquet_row_group_size=2048, single thread, preserve_insertion_order=true. 
After SORT BY b ASC: +# ---------------------------------------------------------------------------------------------------------------------- + +# column `b` +query III +SELECT row_group_id, stats_min::INTEGER AS rg_min, stats_max::INTEGER AS rg_max +FROM parquet_metadata('{DATA_PATH}/inline_ctas_sort_verify/main/sort_verify/*.parquet') +WHERE path_in_schema = 'b' +ORDER BY row_group_id +---- +0 0 2047 +1 2048 2999 + +# column `a` +query III +SELECT row_group_id, stats_min::INTEGER AS rg_min, stats_max::INTEGER AS rg_max +FROM parquet_metadata('{DATA_PATH}/inline_ctas_sort_verify/main/sort_verify/*.parquet') +WHERE path_in_schema = 'a' +ORDER BY row_group_id +---- +0 952 2999 +1 0 951 + +# row counts per rowgroup confirm the 2048 + 952 split +query II +SELECT row_group_id, row_group_num_rows +FROM parquet_metadata('{DATA_PATH}/inline_ctas_sort_verify/main/sort_verify/*.parquet') +WHERE path_in_schema = 'b' +ORDER BY row_group_id +---- +0 2048 +1 952 diff --git a/test/sql/create_table_inline_partition_sort/create_table_inline_errors.test b/test/sql/create_table_inline_partition_sort/create_table_inline_errors.test new file mode 100644 index 00000000000..85343564a51 --- /dev/null +++ b/test/sql/create_table_inline_partition_sort/create_table_inline_errors.test @@ -0,0 +1,96 @@ +# name: test/sql/create_table_inline_partition_sort/create_table_inline_errors.test +# description: validation errors for inline PARTITIONED BY / SORTED BY on CREATE TABLE / CTAS +# group: [create_table_inline_partition_sort] + +require ducklake + +require parquet + +test-env DUCKLAKE_CONNECTION {TEST_DIR}/{UUID}.db + +test-env DATA_PATH {TEST_DIR} + +statement ok +ATTACH 'ducklake:{DUCKLAKE_CONNECTION}' AS ducklake (DATA_PATH '{DATA_PATH}/inline_errors', METADATA_CATALOG 'ducklake_metadata', DATA_INLINING_ROW_LIMIT 0) + +statement ok +USE ducklake + +# PARTITIONED BY references nonexistent column +statement error +CREATE TABLE bad_part (a INTEGER) PARTITIONED BY (nonexistent) +---- +column 
"nonexistent" does not exist + +# SORTED BY references nonexistent column +statement error +CREATE TABLE bad_sort (a INTEGER) SORTED BY (nonexistent) +---- +:.*nonexistent.* + +# WITH (...) clause is rejected by SupportsCreateTable override (DuckLake has no WITH semantics) +statement error +CREATE TABLE bad_with (a INTEGER) WITH (some_option='x') +---- +WITH clause is not supported for tables in a ducklake catalog + +# unsupported transform function +statement error +CREATE TABLE bad_transform (a INTEGER) PARTITIONED BY (foo(a)) +---- +:.*Unsupported partition function.* + +# malformed bucket() expression - missing args +statement error +CREATE TABLE bad_bucket (a INTEGER) PARTITIONED BY (bucket(a)) +---- +Expected bucket(bucket_count, column) + +# duplicate partition key (same column with same transform) is rejected +statement error +CREATE TABLE bad_dup (a INTEGER) PARTITIONED BY (a, a) +---- +Duplicate partition key + +# CTAS variants: SORTED BY references nonexistent column +statement error +CREATE TABLE bad_ctas SORTED BY (nonexistent) AS SELECT 1 AS a +---- +:.*nonexistent.* + +# CTAS variants: PARTITIONED BY references nonexistent column +statement error +CREATE TABLE bad_ctas_part PARTITIONED BY (nonexistent) AS SELECT 1 AS a +---- +:.*nonexistent.* + +# verify the failing CREATEs left no metadata behind +query I +SELECT count(*) FROM ducklake_metadata.ducklake_partition_info +---- +0 + +query I +SELECT count(*) FROM ducklake_metadata.ducklake_sort_info +---- +0 + +# A successful CREATE after failures still works (no transaction state corruption) +statement ok +CREATE TABLE ok_after_errors (a INTEGER) PARTITIONED BY (a) + +query I +SELECT count(*) FROM ducklake_metadata.ducklake_partition_info WHERE table_id = (SELECT table_id FROM ducklake_metadata.ducklake_table WHERE table_name = 'ok_after_errors') +---- +1 + +# Same column with DIFFERENT transforms is allowed (e.g. year + month of the same timestamp). 
+# This must NOT be flagged as a duplicate partition key. +statement ok +CREATE TABLE ok_diff_transforms (ts TIMESTAMP) PARTITIONED BY (year(ts), month(ts)) + +query II +SELECT partition_key_index, transform FROM ducklake_metadata.ducklake_partition_column WHERE table_id = (SELECT table_id FROM ducklake_metadata.ducklake_table WHERE table_name = 'ok_diff_transforms') ORDER BY partition_key_index +---- +0 year +1 month diff --git a/test/sql/create_table_inline_partition_sort/create_table_inline_transactions.test b/test/sql/create_table_inline_partition_sort/create_table_inline_transactions.test new file mode 100644 index 00000000000..d04656cfc00 --- /dev/null +++ b/test/sql/create_table_inline_partition_sort/create_table_inline_transactions.test @@ -0,0 +1,453 @@ +# name: test/sql/create_table_inline_partition_sort/create_table_inline_transactions.test +# description: Transaction semantics for inline PARTITIONED BY / SORTED BY on CREATE TABLE / CTAS: +# rollback discards everything (entry, files, transaction-local ids); concurrent CTAS with +# distinct names cannot conflict; concurrent CTAS with the same name produces a commit-time +# conflict; +# group: [create_table_inline_partition_sort] + +require ducklake + +require parquet + +test-env DUCKLAKE_CONNECTION {TEST_DIR}/{UUID}.db + +test-env DATA_PATH {TEST_DIR} + +statement ok +ATTACH 'ducklake:{DUCKLAKE_CONNECTION}' AS ducklake (DATA_PATH '{DATA_PATH}/inline_tx', METADATA_CATALOG 'ducklake_meta', DATA_INLINING_ROW_LIMIT 0) + +statement ok +SET immediate_transaction_mode=true + +statement ok +USE ducklake + +# ---------------------------------------------------------------------------------------------------------------------- +# Test 1: ROLLBACK after a successful inline-partitioned CREATE+INSERT. +# Rollback must discard the table entry, the planning-time-allocated partition_id/sort_id, and the data files. 
+# ---------------------------------------------------------------------------------------------------------------------- + +statement ok +BEGIN + +statement ok +CREATE TABLE rollback_part (i INTEGER, v VARCHAR) PARTITIONED BY (i) + +statement ok +INSERT INTO rollback_part VALUES (1, 'a'), (2, 'b') + +# table is visible inside the transaction +query I +SELECT count(*) FROM rollback_part +---- +2 + +statement ok +ROLLBACK + +# table is gone after rollback +statement error +SELECT * FROM rollback_part +---- +:.*not exist.* + +# the next inline CTAS proceeds cleanly - no leftover transaction-local catalog state poisoning subsequent +# planning (regression guard for the planning-time partition_id allocator). +statement ok +CREATE TABLE post_rollback PARTITIONED BY (a) AS SELECT 1 AS a + +query I +SELECT * FROM post_rollback +---- +1 + +# ---------------------------------------------------------------------------------------------------------------------- +# Test 2: ROLLBACK after CTAS with both inline clauses. The most invasive shape - exercises the full +# planning-time field_data + partition_data + sort_data construction, the sink-time CreateTableExtended +# branch, the data-file write, and the rollback cleanup. 
+# ---------------------------------------------------------------------------------------------------------------------- + +statement ok +BEGIN + +statement ok +CREATE TABLE rollback_ctas PARTITIONED BY (g) SORTED BY (v) AS +SELECT i % 3 AS g, i AS v FROM range(9) t(i) + +query I +SELECT count(*) FROM rollback_ctas +---- +9 + +statement ok +ROLLBACK + +statement error +SELECT * FROM rollback_ctas +---- +:.*not exist.* + +# verify the metadata catalog never saw the rolled-back partition_info / sort_info rows +query I +SELECT count(*) FROM ducklake_meta.ducklake_partition_info +WHERE table_id IN (SELECT table_id FROM ducklake_meta.ducklake_table WHERE table_name = 'rollback_ctas') +---- +0 + +query I +SELECT count(*) FROM ducklake_meta.ducklake_sort_info +WHERE table_id IN (SELECT table_id FROM ducklake_meta.ducklake_table WHERE table_name = 'rollback_ctas') +---- +0 + +# ---------------------------------------------------------------------------------------------------------------------- +# Test 3: ROLLBACK + same-name CREATE in a fresh transaction succeeds. The rolled-back table_id / +# partition_id were transaction-local and should not leak into the next transaction's namespace. +# ---------------------------------------------------------------------------------------------------------------------- + +statement ok +BEGIN + +statement ok +CREATE TABLE name_reuse PARTITIONED BY (a) AS SELECT 1 AS a, 'x' AS v + +statement ok +ROLLBACK + +# Same name, same partition column - works because the rollback wiped the prior attempt entirely. +statement ok +CREATE TABLE name_reuse PARTITIONED BY (a) AS SELECT 99 AS a, 'y' AS v + +query II +SELECT a, v FROM name_reuse +---- +99 y + +# ---------------------------------------------------------------------------------------------------------------------- +# Test 4: Two concurrent transactions both CREATE TABLE foo PARTITIONED BY ... AS SELECT with the SAME +# name. First to commit wins; second errors at COMMIT time. 
This is a name-collision conflict - the new +# inline clauses don't introduce a different conflict mode. +# ---------------------------------------------------------------------------------------------------------------------- + +statement ok con1 +BEGIN + +statement ok con2 +BEGIN + +statement ok con1 +CREATE TABLE ducklake.conflict_same_name PARTITIONED BY (i) AS SELECT 1 AS i + +statement ok con2 +CREATE TABLE ducklake.conflict_same_name SORTED BY (i) AS SELECT 2 AS i + +statement ok con1 +COMMIT + +statement error con2 +COMMIT +---- + +# con1's table is the surviving one - check its inline partition spec landed +query I +SELECT count(*) FROM ducklake_meta.ducklake_partition_info +WHERE table_id IN (SELECT table_id FROM ducklake_meta.ducklake_table WHERE table_name = 'conflict_same_name') +---- +1 + +query I +SELECT * FROM ducklake.conflict_same_name +---- +1 + +# ---------------------------------------------------------------------------------------------------------------------- +# Test 5: Two concurrent CTAS-with-inline-spec with DIFFERENT names. No conflict - each transaction +# allocates partition_id from its own counter; commit-time remap assigns globally-unique committed ids. +# This proves the planning-time partition_id allocation isn't a serialization point. 
+# ---------------------------------------------------------------------------------------------------------------------- + +statement ok con1 +BEGIN + +statement ok con2 +BEGIN + +statement ok con1 +CREATE TABLE ducklake.concurrent_a PARTITIONED BY (i) AS SELECT range AS i, 'a' AS src FROM range(5) + +statement ok con2 +CREATE TABLE ducklake.concurrent_b PARTITIONED BY (i) SORTED BY (i) AS SELECT range AS i, 'b' AS src FROM range(7) + +statement ok con1 +COMMIT + +statement ok con2 +COMMIT + +# both tables exist and have their own partition_info rows +query II +SELECT + (SELECT count(*) FROM ducklake_meta.ducklake_partition_info + WHERE table_id IN (SELECT table_id FROM ducklake_meta.ducklake_table WHERE table_name = 'concurrent_a')), + (SELECT count(*) FROM ducklake_meta.ducklake_partition_info + WHERE table_id IN (SELECT table_id FROM ducklake_meta.ducklake_table WHERE table_name = 'concurrent_b')) +---- +1 1 + +# the two tables have DISTINCT committed partition_ids (proves the commit-time remap deduplicates the +# transaction-local ids that started identical at planning time on each connection). +query I +SELECT count(DISTINCT partition_id) FROM ducklake_meta.ducklake_partition_info +WHERE table_id IN (SELECT table_id FROM ducklake_meta.ducklake_table WHERE table_name IN ('concurrent_a', 'concurrent_b')) +---- +2 + +query II +SELECT count(*), max(i) FROM ducklake.concurrent_a +---- +5 4 + +query II +SELECT count(*), max(i) FROM ducklake.concurrent_b +---- +7 6 + +# ---------------------------------------------------------------------------------------------------------------------- +# Test 6: Same-transaction inline CTAS + ALTER SET PARTITIONED BY + COMMIT. Last writer wins (the +# metadata-layer dedup collapses by table_id; final on-disk state references the ALTER's partition column). +# This is regression coverage for the chain-walk behavior change in this branch. 
+# ---------------------------------------------------------------------------------------------------------------------- + +statement ok +BEGIN + +statement ok +CREATE TABLE chain_winner PARTITIONED BY (a) SORTED BY (a) AS +SELECT range AS a, range + 100 AS b FROM range(3) + +statement ok +ALTER TABLE chain_winner SET PARTITIONED BY (b) + +statement ok +ALTER TABLE chain_winner SET SORTED BY (b DESC) + +statement ok +COMMIT + +# exactly one partition_info row (the last ALTER's) and one sort_info row (the last ALTER's) +query I +SELECT count(*) FROM ducklake_meta.ducklake_partition_info +WHERE table_id IN (SELECT table_id FROM ducklake_meta.ducklake_table WHERE table_name = 'chain_winner') +---- +1 + +query I +SELECT count(*) FROM ducklake_meta.ducklake_sort_info +WHERE table_id IN (SELECT table_id FROM ducklake_meta.ducklake_table WHERE table_name = 'chain_winner') +---- +1 + +# the surviving sort row is `b DESC`, not the inline `a ASC` +query III +SELECT expression, sort_direction, null_order FROM ducklake_meta.ducklake_sort_expression +WHERE table_id IN (SELECT table_id FROM ducklake_meta.ducklake_table WHERE table_name = 'chain_winner') +---- +b DESC NULLS_LAST + +# ---------------------------------------------------------------------------------------------------------------------- +# Test 7: CTAS that errors mid-execution because of a runtime constraint violation. 
The transaction must +# unwind cleanly - no zombie entry, no partition_info row +# ---------------------------------------------------------------------------------------------------------------------- + +statement error +CREATE TABLE error_ctas PARTITIONED BY (i) AS +SELECT CAST(range AS UTINYINT) AS i FROM range(300) +---- +:.*[Cc]onversion.*out of range.* + +# the failed CTAS must not have partially registered the table +statement error +SELECT * FROM error_ctas +---- +:.*not exist.* + +query I +SELECT count(*) FROM ducklake_meta.ducklake_partition_info +WHERE table_id IN (SELECT table_id FROM ducklake_meta.ducklake_table WHERE table_name = 'error_ctas') +---- +0 + +# ---------------------------------------------------------------------------------------------------------------------- +# Test 8: CTAS that errors at the BINDER stage (unknown column in PARTITIONED BY). This fails in +# PlanCreateTableAs *before* any planning-time partition_id allocation completes, so transaction state +# must be uncorrupted. +# ---------------------------------------------------------------------------------------------------------------------- + +statement ok +BEGIN + +statement error +CREATE TABLE binder_error PARTITIONED BY (nonexistent) AS SELECT 1 AS i +---- +:.*nonexistent.* + +# subsequent statements in the same transaction still work +statement ok +CREATE TABLE recovers_after_binder_error (i INTEGER) PARTITIONED BY (i) + +statement ok +INSERT INTO recovers_after_binder_error VALUES (42) + +statement ok +COMMIT + +query I +SELECT i FROM recovers_after_binder_error +---- +42 + +# ---------------------------------------------------------------------------------------------------------------------- +# Test 9: Long transaction that mixes inline CTAS + plain INSERT into other tables. Verifies the inline +# clauses don't disturb the transaction's broader commit logic. 
+# ---------------------------------------------------------------------------------------------------------------------- + +statement ok +CREATE TABLE existing_target (k INTEGER, v VARCHAR) + +statement ok +BEGIN + +statement ok +INSERT INTO existing_target VALUES (1, 'first') + +statement ok +CREATE TABLE long_tx_ctas PARTITIONED BY (g) AS SELECT i AS g, i + 100 AS v FROM range(4) t(i) + +statement ok +INSERT INTO existing_target VALUES (2, 'second') + +statement ok +INSERT INTO long_tx_ctas VALUES (10, 110), (11, 111) + +statement ok +COMMIT + +query II +SELECT k, v FROM existing_target ORDER BY k +---- +1 first +2 second + +query II +SELECT g, count(*) FROM long_tx_ctas GROUP BY g ORDER BY g +---- +0 1 +1 1 +2 1 +3 1 +10 1 +11 1 + +# ---------------------------------------------------------------------------------------------------------------------- +# Test 10: Two inline CTAS in one transaction, ROLLBACK. Both tables AND both partition specs unwind. +# Guards against any per-CTAS state that might accumulate beyond the transaction-local catalog set. 
+# ---------------------------------------------------------------------------------------------------------------------- + +statement ok +BEGIN + +statement ok +CREATE TABLE rb_a PARTITIONED BY (i) AS SELECT range AS i FROM range(3) + +statement ok +CREATE TABLE rb_b PARTITIONED BY (j) SORTED BY (j) AS SELECT range AS j FROM range(4) + +statement ok +ROLLBACK + +statement error +SELECT * FROM rb_a +---- +:.*not exist.* + +statement error +SELECT * FROM rb_b +---- +:.*not exist.* + +# neither rolled-back table left a partition_info row +query I +SELECT count(*) FROM ducklake_meta.ducklake_partition_info +WHERE table_id IN (SELECT table_id FROM ducklake_meta.ducklake_table WHERE table_name IN ('rb_a','rb_b')) +---- +0 + +# subsequent CTAS in fresh transaction succeeds and gets fresh ids - no collision with rolled-back local ids +statement ok +CREATE TABLE rb_after PARTITIONED BY (i) AS SELECT 1 AS i + +query I +SELECT * FROM rb_after +---- +1 + +# ---------------------------------------------------------------------------------------------------------------------- +# Test 11: Snapshot isolation. con2 begins BEFORE con1's inline-partitioned CTAS commits. con2 must NOT +# observe the new table even though it's already committed in the metadata DB. The partition_info row's +# begin_snapshot is the visibility gate. 
+# ---------------------------------------------------------------------------------------------------------------------- + +statement ok con2 +BEGIN + +statement ok con1 +CREATE TABLE ducklake.snap_iso PARTITIONED BY (i) AS SELECT 1 AS i + +# con2's snapshot predates the CTAS commit +statement error con2 +SELECT * FROM ducklake.snap_iso +---- +:.*not exist.* + +statement ok con2 +COMMIT + +# new transaction sees it +query I +SELECT * FROM ducklake.snap_iso +---- +1 + +# ---------------------------------------------------------------------------------------------------------------------- +# Test 12: Same-transaction INSERT after inline CTAS uses the partition spec. After commit, every data +# file (CTAS-written + INSERT-written) carries a partition_id, and on-disk paths are hive-partitioned. +# ---------------------------------------------------------------------------------------------------------------------- + +statement ok +BEGIN + +statement ok +CREATE TABLE same_tx_part PARTITIONED BY (g) AS SELECT range AS g, range + 100 AS v FROM range(3) + +statement ok +INSERT INTO same_tx_part VALUES (10, 110), (10, 111), (11, 112) + +statement ok +COMMIT + +# every data file references the partition_id - CTAS-written and INSERT-written alike +query I +SELECT count(*) FROM ducklake_meta.ducklake_data_file +WHERE table_id = (SELECT table_id FROM ducklake_meta.ducklake_table WHERE table_name='same_tx_part') + AND partition_id IS NULL +---- +0 + +query II +SELECT g, count(*) FROM same_tx_part GROUP BY g ORDER BY g +---- +0 1 +1 1 +2 1 +10 2 +11 1 diff --git a/test/sql/create_table_inline_partition_sort/create_table_inline_with_inlining.test b/test/sql/create_table_inline_partition_sort/create_table_inline_with_inlining.test new file mode 100644 index 00000000000..7538a72c6da --- /dev/null +++ b/test/sql/create_table_inline_partition_sort/create_table_inline_with_inlining.test @@ -0,0 +1,215 @@ +# name: 
test/sql/create_table_inline_partition_sort/create_table_inline_with_inlining.test +# description: CTAS with inline PARTITIONED BY / SORTED BY interacting with the data-inlining row limit. +# When a CTAS's row count is below DATA_INLINING_ROW_LIMIT, rows go to the per-table +# ducklake_inlined_data__ catalog table instead of Parquet. Partition and sort +# metadata must still be recorded; the inline operator coordinates with the about-to-be-created +# table_id at sink-state-init time. Once a flush is called, inlined rows materialize as +# partition-aware Parquet files. (On-disk hive directory verification is intentionally omitted - +# that's covered by create_table_inline_basic.test / create_table_inline_ctas.test.) +# group: [create_table_inline_partition_sort] + +require ducklake + +require parquet + +test-env DUCKLAKE_CONNECTION {TEST_DIR}/{UUID}.db + +test-env DATA_PATH {TEST_DIR} + +# DATA_INLINING_ROW_LIMIT is non-zero (the historic default-shaped CTAS path). Pick 1000 so we can write +# both fully-inlined and overflow-to-Parquet cases below. +statement ok +ATTACH 'ducklake:{DUCKLAKE_CONNECTION}' AS ducklake (DATA_PATH '{DATA_PATH}/inline_with_inlining', METADATA_CATALOG 'ducklake_meta', DATA_INLINING_ROW_LIMIT 1000) + +statement ok +USE ducklake + +# ---------------------------------------------------------------------------------------------------------------------- +# Test 1: CTAS that fits entirely under DATA_INLINING_ROW_LIMIT, with PARTITIONED BY (i). +# Rows go to ducklake_inlined_data__*; ducklake_data_file stays empty for this table. +# Partition metadata is still recorded - the partition_id is allocated at planning time and bound to the +# table entry at sink-state-init regardless of whether the rows landed in Parquet or in the inlined table. 
+# ---------------------------------------------------------------------------------------------------------------------- + +statement ok +CREATE TABLE small_part PARTITIONED BY (i) AS +SELECT range AS i, range::VARCHAR AS v FROM range(50) + +# data round-trips (rows readable whether inlined or in Parquet) +query I +SELECT count(*) FROM small_part +---- +50 + +# partition metadata recorded for the inlined CTAS +query I +SELECT count(*) FROM ducklake_meta.ducklake_partition_info +WHERE table_id = (SELECT table_id FROM ducklake_meta.ducklake_table WHERE table_name = 'small_part') +---- +1 + +query II +SELECT partition_key_index, transform FROM ducklake_meta.ducklake_partition_column +WHERE table_id = (SELECT table_id FROM ducklake_meta.ducklake_table WHERE table_name = 'small_part') +---- +0 identity + +# zero data_file rows: rows are sitting in the inlined-data catalog table, not on Parquet +query I +SELECT count(*) FROM ducklake_meta.ducklake_data_file +WHERE table_id = (SELECT table_id FROM ducklake_meta.ducklake_table WHERE table_name = 'small_part') +---- +0 + +# the per-table inlined-data table exists in the metadata catalog +query I +SELECT count(*) FROM ducklake_meta.ducklake_inlined_data_tables +WHERE table_id = (SELECT table_id FROM ducklake_meta.ducklake_table WHERE table_name = 'small_part') +---- +1 + +# ---------------------------------------------------------------------------------------------------------------------- +# Test 2: CTAS that fits under DATA_INLINING_ROW_LIMIT with both PARTITIONED BY and SORTED BY clauses. +# Both partition and sort metadata land in their respective catalog tables even though the data is inlined. 
+# ---------------------------------------------------------------------------------------------------------------------- + +statement ok +CREATE TABLE small_both PARTITIONED BY (g) SORTED BY (v) AS +SELECT i % 4 AS g, i AS v FROM range(20) t(i) + +query I +SELECT count(*) FROM small_both +---- +20 + +query I +SELECT count(*) FROM ducklake_meta.ducklake_partition_info +WHERE table_id = (SELECT table_id FROM ducklake_meta.ducklake_table WHERE table_name = 'small_both') +---- +1 + +query I +SELECT count(*) FROM ducklake_meta.ducklake_sort_info +WHERE table_id = (SELECT table_id FROM ducklake_meta.ducklake_table WHERE table_name = 'small_both') +---- +1 + +# the sort_expression row carries the planning-time-allocated direction/null_order +query III +SELECT expression, sort_direction, null_order FROM ducklake_meta.ducklake_sort_expression +WHERE table_id = (SELECT table_id FROM ducklake_meta.ducklake_table WHERE table_name = 'small_both') +---- +v ASC NULLS_LAST + +# fully inlined: zero Parquet files +query I +SELECT count(*) FROM ducklake_meta.ducklake_data_file +WHERE table_id = (SELECT table_id FROM ducklake_meta.ducklake_table WHERE table_name = 'small_both') +---- +0 + +# ---------------------------------------------------------------------------------------------------------------------- +# Test 3: Flushing inlined data after a fully-inlined partitioned CTAS. +# `ducklake_flush_inlined_data` materializes the inlined rows as Parquet files using the table's +# partition spec. Post-flush, ducklake_data_file gains rows that reference the partition_id. 
+# ---------------------------------------------------------------------------------------------------------------------- + +statement ok +CALL ducklake_flush_inlined_data('ducklake') + +# at least one data file now exists for small_part, and each carries a partition_id (the inlining-then-flush +# path hands the spec through correctly) +query I +SELECT count(*) > 0 FROM ducklake_meta.ducklake_data_file +WHERE table_id = (SELECT table_id FROM ducklake_meta.ducklake_table WHERE table_name = 'small_part') +---- +true + +query I +SELECT count(*) FROM ducklake_meta.ducklake_data_file +WHERE table_id = (SELECT table_id FROM ducklake_meta.ducklake_table WHERE table_name = 'small_part') + AND partition_id IS NULL +---- +0 + +# data still visible after flush +query I +SELECT count(*) FROM small_part +---- +50 + +# ---------------------------------------------------------------------------------------------------------------------- +# Test 4: CTAS that OVERFLOWS DATA_INLINING_ROW_LIMIT (1000). 5000 rows -> the operator routes everything +# above the limit to Parquet. The partition spec applies to the Parquet writes; the sort spec applies to +# the row stream BEFORE the inline operator (sort_on_insert=true default). 
+# ---------------------------------------------------------------------------------------------------------------------- + +statement ok +CREATE TABLE overflow_part PARTITIONED BY (g) SORTED BY (v) AS +SELECT i % 8 AS g, i AS v FROM range(5000) t(i) + +query I +SELECT count(*) FROM overflow_part +---- +5000 + +# partition + sort metadata persist +query II +SELECT + (SELECT count(*) FROM ducklake_meta.ducklake_partition_info + WHERE table_id = (SELECT table_id FROM ducklake_meta.ducklake_table WHERE table_name = 'overflow_part')), + (SELECT count(*) FROM ducklake_meta.ducklake_sort_info + WHERE table_id = (SELECT table_id FROM ducklake_meta.ducklake_table WHERE table_name = 'overflow_part')) +---- +1 1 + +# overflow path: at least one Parquet file was written, and each has a non-null partition_id +query I +SELECT count(*) > 0 FROM ducklake_meta.ducklake_data_file +WHERE table_id = (SELECT table_id FROM ducklake_meta.ducklake_table WHERE table_name = 'overflow_part') +---- +true + +query I +SELECT count(*) FROM ducklake_meta.ducklake_data_file +WHERE table_id = (SELECT table_id FROM ducklake_meta.ducklake_table WHERE table_name = 'overflow_part') + AND partition_id IS NULL +---- +0 + +# ---------------------------------------------------------------------------------------------------------------------- +# Test 5: Flushing the overflow_part table (which had data both inlined and on Parquet). After flush, all +# rows should be on Parquet with non-null partition_id, and the inlined-data table count for it goes to 0 +# (or the inlined-data tables are cleared per-table by the flush). 
+# ----------------------------------------------------------------------------------------------------------------------
+
+statement ok
+CALL ducklake_flush_inlined_data('ducklake')
+
+# overflow_part: after the flush, every data-file row for it carries a non-null partition_id
+query I
+SELECT count(*) FROM ducklake_meta.ducklake_data_file
+WHERE table_id = (SELECT table_id FROM ducklake_meta.ducklake_table WHERE table_name = 'overflow_part')
+  AND partition_id IS NULL
+----
+0
+
+# the row count is preserved end-to-end (inlined → flushed → Parquet)
+query I
+SELECT count(*) FROM overflow_part
+----
+5000
+
+# small_both: same checks - the partition+sort spec persists through inlining + flush
+query I
+SELECT count(*) FROM small_both
+----
+20
+
+query II
+SELECT g, count(*) FROM small_both GROUP BY g ORDER BY g
+----
+0 5
+1 5
+2 5
+3 5