15 changes: 10 additions & 5 deletions src/functions/ducklake_compaction_functions.cpp
@@ -47,16 +47,15 @@ vector<OrderByNode> DuckLakeCompactor::ParseSortOrders(const DuckLakeSort &sort_
 }
 
 //! Binds ORDER BY expressions directly using ExpressionBinder.
-vector<BoundOrderByNode> DuckLakeCompactor::BindSortOrders(Binder &binder, DuckLakeTableEntry &table,
-                                                           TableIndex table_index,
+vector<BoundOrderByNode> DuckLakeCompactor::BindSortOrders(Binder &binder, const ColumnList &columns,
+                                                           const string &table_name, TableIndex table_index,
                                                            vector<OrderByNode> &pre_bound_orders) {
-	auto &columns = table.GetColumns();
 	auto column_names = columns.GetColumnNames();
 	auto column_types = columns.GetColumnTypes();
 
 	// Create a child binder with the table columns in scope
 	auto child_binder = Binder::CreateBinder(binder.context, &binder);
-	child_binder->bind_context.AddGenericBinding(table_index, table.name, column_names, column_types);
+	child_binder->bind_context.AddGenericBinding(table_index, table_name, column_names, column_types);
 
 	// Bind each ORDER BY expression directly
 	vector<BoundOrderByNode> orders;
@@ -69,6 +68,12 @@ vector<BoundOrderByNode> DuckLakeCompactor::BindSortOrders(Binder &binder, DuckL
 	return orders;
 }
 
+vector<BoundOrderByNode> DuckLakeCompactor::BindSortOrders(Binder &binder, DuckLakeTableEntry &table,
+                                                           TableIndex table_index,
+                                                           vector<OrderByNode> &pre_bound_orders) {
+	return BindSortOrders(binder, table.GetColumns(), table.name, table_index, pre_bound_orders);
+}
+
 //===--------------------------------------------------------------------===//
 // Compaction Operator
 //===--------------------------------------------------------------------===//
@@ -375,7 +380,7 @@ unique_ptr<LogicalOperator> DuckLakeCompactor::InsertSort(Binder &binder, unique
 	}
 
 	// Validate all column references in sort expressions exist in the table
-	DuckLakeTableEntry::ValidateSortExpressionColumns(table, pre_bound_orders);
+	DuckLakeTableEntry::ValidateSortExpressionColumns(table.GetColumns(), pre_bound_orders);
 
 	// Resolve types for the input plan (could be LogicalGet or LogicalProjection)
 	plan->ResolveOperatorTypes();
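The split above enables a pattern like the following minimal sketch. PlanSortForPendingTable and its surrounding variables are illustrative names, not part of this PR; the two DuckLakeCompactor calls are the ones declared in this diff.

// Hypothetical caller: bind a stored sort spec against a raw column list,
// before the table entry itself has been created (the CTAS case).
static vector<BoundOrderByNode> PlanSortForPendingTable(Binder &binder, const ColumnList &columns,
                                                        const string &table_name, TableIndex table_index,
                                                        const DuckLakeSort &sort_data) {
	// Parse the persisted sort description into unbound OrderByNode entries.
	auto pre_bound = DuckLakeCompactor::ParseSortOrders(sort_data);
	// Bind using only (columns, table_name) - no DuckLakeTableEntry required.
	return DuckLakeCompactor::BindSortOrders(binder, columns, table_name, table_index, pre_bound);
}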
4 changes: 4 additions & 0 deletions src/include/functions/ducklake_compaction_functions.hpp
@@ -93,6 +93,10 @@ class DuckLakeCompactor {
 	static vector<OrderByNode> ParseSortOrders(const DuckLakeSort &sort_data);
 	static vector<BoundOrderByNode> BindSortOrders(Binder &binder, DuckLakeTableEntry &table, TableIndex table_index,
 	                                               vector<OrderByNode> &pre_bound_orders);
+	//! Overload that takes (columns, table_name) directly so CTAS planning can bind sort orders before any
+	//! DuckLakeTableEntry exists. Behaves identically to the entry-taking overload.
+	static vector<BoundOrderByNode> BindSortOrders(Binder &binder, const ColumnList &columns, const string &table_name,
+	                                               TableIndex table_index, vector<OrderByNode> &pre_bound_orders);
 
 private:
 	ClientContext &context;
3 changes: 3 additions & 0 deletions src/include/storage/ducklake_catalog.hpp
@@ -28,6 +28,7 @@ class DuckLakeFieldData;
 struct DuckLakeFileListEntry;
 struct DuckLakeConfigOption;
 struct DeleteFileMap;
+struct BoundCreateTableInfo;
 class LogicalGet;
 
 //! Cache entry for DuckLake table statistics
@@ -135,6 +136,8 @@ class DuckLakeCatalog : public Catalog {
 
 	optional_ptr<CatalogEntry> CreateSchema(CatalogTransaction transaction, CreateSchemaInfo &info) override;
 
+	ErrorData SupportsCreateTable(BoundCreateTableInfo &info) override;
+
 	void ScanSchemas(ClientContext &context, std::function<void(SchemaCatalogEntry &)> callback) override;
 
 	optional_ptr<SchemaCatalogEntry> LookupSchema(CatalogTransaction transaction, const EntryLookupInfo &schema_lookup,
27 changes: 21 additions & 6 deletions src/include/storage/ducklake_insert.hpp
@@ -15,13 +15,14 @@
 #include "storage/ducklake_stats.hpp"
 #include "common/ducklake_data_file.hpp"
 #include "storage/ducklake_field_data.hpp"
+#include "storage/ducklake_partition_data.hpp"
+#include "storage/ducklake_sort_data.hpp"
 
 namespace duckdb {
 class DuckLakeCatalog;
 class DuckLakeSchemaEntry;
 class DuckLakeTableEntry;
 class DuckLakeFieldData;
-struct DuckLakePartition;
 struct DuckLakeCopyOptions;
 struct DuckLakeCopyInput;
@@ -41,15 +42,19 @@ class DuckLakeInsertGlobalState : public GlobalSinkState {
 
 class DuckLakeInsert : public PhysicalOperator {
 public:
-	//! INSERT INTO
+	//! INSERT INTO an existing table.
 	DuckLakeInsert(PhysicalPlan &physical_plan, const vector<LogicalType> &types, DuckLakeTableEntry &table,
 	               optional_idx partition_id, string encryption_key);
-	//! CREATE TABLE AS
+	//! CREATE TABLE AS - the table is created in GetGlobalSinkState. Any inline
+	//! PARTITIONED BY / SORTED BY clauses are pre-built into ctas_partition_data / ctas_sort_data at planning
+	//! time so the physical copy upstream of this operator can hive-partition the write with a partition_id
+	//! that matches what gets attached to the table entry at sink-state initialization.
 	DuckLakeInsert(PhysicalPlan &physical_plan, const vector<LogicalType> &types, SchemaCatalogEntry &schema,
 	               unique_ptr<BoundCreateTableInfo> info, string table_uuid, string table_data_path,
-	               string encryption_key);
+	               unique_ptr<DuckLakePartition> ctas_partition_data, unique_ptr<DuckLakeSort> ctas_sort_data,
+	               optional_idx partition_id, string encryption_key);
 
-	//! The table to insert into
+	//! The table to insert into (only set for INSERT INTO; nullptr for CTAS until GetGlobalSinkState resolves it)
 	optional_ptr<DuckLakeTableEntry> table;
 	//! Table schema, in case of CREATE TABLE AS
 	optional_ptr<SchemaCatalogEntry> schema;
@@ -59,6 +64,11 @@ class DuckLakeInsert : public PhysicalOperator {
 	string table_uuid;
 	//! The table data path, in case of CREATE TABLE AS
 	string table_data_path;
+	//! Pre-built partition spec for CTAS (allocated at planning time so the physical write carries the
+	//! correct partition_id).
+	unique_ptr<DuckLakePartition> ctas_partition_data;
+	//! Pre-built sort spec for CTAS (same lifecycle as ctas_partition_data).
+	unique_ptr<DuckLakeSort> ctas_sort_data;
 	//! The partition id we are writing into (if any)
 	optional_idx partition_id;
 	//! The encryption key used for writing the Parquet files
@@ -140,8 +150,13 @@ struct DuckLakeCopyOptions {
 
 struct DuckLakeCopyInput {
 	explicit DuckLakeCopyInput(ClientContext &context, DuckLakeTableEntry &table, const string &hive_partition = "");
+	//! CTAS-flavored: take the field-id mapping and (optional) partition spec from the planning-time-built
+	//! spec rather than from a DuckLakeTableEntry that hasn't been created yet. field_data is required
+	//! because the parquet writer always needs it; partition_data is null when CREATE TABLE AS has no
+	//! inline PARTITIONED BY clause.
 	DuckLakeCopyInput(ClientContext &context, DuckLakeSchemaEntry &schema, const ColumnList &columns,
-	                  const string &data_path_p);
+	                  const string &data_path_p, DuckLakeFieldData &field_data,
+	                  optional_ptr<DuckLakePartition> partition_data = nullptr);
 
 	DuckLakeCatalog &catalog;
 	optional_ptr<DuckLakePartition> partition_data;
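A rough sketch of how the CTAS-flavored constructor might be used at plan time. Only the DuckLakeCopyInput signature comes from this diff; the surrounding variables (schema_entry, columns, table_data_path, field_data, ctas_partition_data) are assumed to exist at the call site.

// Illustrative only: build the CTAS write input from planning-time state
// instead of a table entry. field_data must outlive the copy;
// ctas_partition_data may be null when there is no inline PARTITIONED BY.
DuckLakeCopyInput copy_input(context, schema_entry, columns, table_data_path,
                             *field_data, ctas_partition_data.get());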
8 changes: 7 additions & 1 deletion src/include/storage/ducklake_schema_entry.hpp
@@ -10,6 +10,8 @@
 
 #include "duckdb/catalog/catalog_entry/schema_catalog_entry.hpp"
 #include "storage/ducklake_catalog_set.hpp"
+#include "storage/ducklake_partition_data.hpp"
+#include "storage/ducklake_sort_data.hpp"
 
 namespace duckdb {
 class DuckLakeTransaction;
@@ -32,8 +34,12 @@ class DuckLakeSchemaEntry : public SchemaCatalogEntry {
 	}
 
 public:
+	//! Create a DuckLakeTableEntry and register it with the transaction.
+	//! prebuilt_partition_data / prebuilt_sort_data are supplied by CTAS when the new table is partitioned or sorted.
 	optional_ptr<CatalogEntry> CreateTableExtended(CatalogTransaction transaction, BoundCreateTableInfo &info,
-	                                               string table_uuid, string table_data_path);
+	                                               string table_uuid, string table_data_path,
+	                                               unique_ptr<DuckLakePartition> prebuilt_partition_data = nullptr,
+	                                               unique_ptr<DuckLakeSort> prebuilt_sort_data = nullptr);
 	unique_ptr<CreateInfo> GetInfo() const override;
 	optional_ptr<CatalogEntry> CreateTable(CatalogTransaction transaction, BoundCreateTableInfo &info) override;
 	optional_ptr<CatalogEntry> CreateFunction(CatalogTransaction transaction, CreateFunctionInfo &info) override;
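A sketch of the assumed call site (for example inside DuckLakeInsert::GetGlobalSinkState, per the comments in ducklake_insert.hpp above). The intent is that the fresh table entry carries the same partition/sort specs the files were written under; variable names besides the declared parameters are hypothetical.

// Illustrative only: hand the planning-time specs over when the CTAS table
// entry is finally created at sink-state initialization.
auto entry = schema.CreateTableExtended(transaction, *info, std::move(table_uuid),
                                        std::move(table_data_path),
                                        std::move(ctas_partition_data),
                                        std::move(ctas_sort_data));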
13 changes: 11 additions & 2 deletions src/include/storage/ducklake_table_entry.hpp
@@ -122,8 +122,17 @@ class DuckLakeTableEntry : public TableCatalogEntry {
 	virtual_column_map_t GetVirtualColumns() const override;
 	vector<column_t> GetRowIdColumns() const override;
 
-	//! Validates that all column references in sort expressions exist in the table
-	static void ValidateSortExpressionColumns(DuckLakeTableEntry &table, const vector<OrderByNode> &orders);
+	//! Validates that all column references in sort expressions exist in the column list.
+	//! Takes the column list directly so it can run at CTAS planning time, before a DuckLakeTableEntry exists.
+	static void ValidateSortExpressionColumns(const ColumnList &columns, const vector<OrderByNode> &orders);
+
+	//! Build a DuckLakePartition from raw partition expressions (allocates a transaction-local id).
+	static unique_ptr<DuckLakePartition> BuildPartitionData(DuckLakeTransaction &transaction, const ColumnList &columns,
+	                                                        DuckLakeFieldData &field_data,
+	                                                        const vector<unique_ptr<ParsedExpression>> &partition_keys);
+	//! Build a DuckLakeSort from a vector of OrderByNode (allocates a transaction-local id).
+	static unique_ptr<DuckLakeSort> BuildSortData(DuckLakeTransaction &transaction, const ColumnList &columns,
+	                                              const vector<OrderByNode> &orders);
 
 private:
 	unique_ptr<CatalogEntry> AlterTable(DuckLakeTransaction &transaction, RenameTableInfo &info);
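Putting the new statics together, a hypothetical planning-time flow could look like the sketch below. It assumes the CTAS statement's inline clauses have already been split into partition_keys and orders, and that validation runs before building (an assumption about ordering, not something this diff shows).

// Illustrative CTAS planning flow using the helpers declared above.
unique_ptr<DuckLakePartition> ctas_partition_data;
if (!partition_keys.empty()) {
	ctas_partition_data =
	    DuckLakeTableEntry::BuildPartitionData(transaction, columns, *field_data, partition_keys);
}
unique_ptr<DuckLakeSort> ctas_sort_data;
if (!orders.empty()) {
	// Check column references first so errors surface at plan time.
	DuckLakeTableEntry::ValidateSortExpressionColumns(columns, orders);
	ctas_sort_data = DuckLakeTableEntry::BuildSortData(transaction, columns, orders);
}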
11 changes: 11 additions & 0 deletions src/storage/ducklake_catalog.cpp
@@ -10,6 +10,7 @@
 #include "duckdb/parser/parsed_data/create_table_info.hpp"
 #include "duckdb/parser/parsed_data/create_view_info.hpp"
 #include "duckdb/parser/parsed_data/drop_info.hpp"
+#include "duckdb/planner/parsed_data/bound_create_table_info.hpp"
 #include "duckdb/storage/database_size.hpp"
 #include "storage/ducklake_initializer.hpp"
 #include "storage/ducklake_schema_entry.hpp"
@@ -155,6 +156,16 @@ optional_ptr<CatalogEntry> DuckLakeCatalog::CreateSchema(CatalogTransaction tran
 	return result;
 }
 
+ErrorData DuckLakeCatalog::SupportsCreateTable(BoundCreateTableInfo &info) {
+	auto &base = info.Base().Cast<CreateTableInfo>();
+	if (!base.options.empty()) {
+		return ErrorData(
+		    ExceptionType::CATALOG,
+		    StringUtil::Format("WITH clause is not supported for tables in a %s catalog", GetCatalogType()));
+	}
+	return ErrorData();
+}
+
 void DuckLakeCatalog::DropSchema(ClientContext &context, DropInfo &info) {
 	auto schema = GetSchema(GetCatalogTransaction(context), info.name, info.if_not_found);
 	if (!schema) {
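For context, a hedged example of what this check rejects. The statement and catalog name are made up, and the rendered message assumes GetCatalogType() returns "ducklake" for this catalog.

// Illustrative only. With the override in place, binding
//   CREATE TABLE my_lake.main.t (i INTEGER) WITH (foo = 1);
// fails at bind time with an error along the lines of
//   "WITH clause is not supported for tables in a ducklake catalog"
// rather than the unsupported options ever reaching table creation.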