Skip to content
Open
15 changes: 11 additions & 4 deletions src/functions/ducklake_compaction_functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,16 @@ vector<OrderByNode> DuckLakeCompactor::ParseSortOrders(const DuckLakeSort &sort_
}

//! Binds ORDER BY expressions directly using ExpressionBinder.
vector<BoundOrderByNode> DuckLakeCompactor::BindSortOrders(Binder &binder, DuckLakeTableEntry &table, idx_t table_index,
vector<BoundOrderByNode> DuckLakeCompactor::BindSortOrders(Binder &binder, const ColumnList &columns,
const string &table_name, TableIndex table_index,
vector<OrderByNode> &pre_bound_orders) {
auto &columns = table.GetColumns();
auto column_names = columns.GetColumnNames();
auto column_types = columns.GetColumnTypes();

// Create a child binder with the table columns in scope
auto child_binder = Binder::CreateBinder(binder.context, &binder);
child_binder->bind_context.AddGenericBinding(table_index, table.name, column_names, column_types);
// AddGenericBinding takes the raw idx_t binding id, not DuckLake's TableIndex wrapper.
child_binder->bind_context.AddGenericBinding(table_index.index, table_name, column_names, column_types);

// Bind each ORDER BY expression directly
vector<BoundOrderByNode> orders;
Expand All @@ -68,6 +69,12 @@ vector<BoundOrderByNode> DuckLakeCompactor::BindSortOrders(Binder &binder, DuckL
return orders;
}

vector<BoundOrderByNode> DuckLakeCompactor::BindSortOrders(Binder &binder, DuckLakeTableEntry &table,
                                                           idx_t table_index,
                                                           vector<OrderByNode> &pre_bound_orders) {
	// Convenience overload for callers that already hold a table entry: pull the column list and
	// table name out of the entry and delegate to the (columns, table_name) overload so the actual
	// binding logic lives in exactly one place.
	auto &table_columns = table.GetColumns();
	return BindSortOrders(binder, table_columns, table.name, TableIndex(table_index), pre_bound_orders);
}

//===--------------------------------------------------------------------===//
// Compaction Operator
//===--------------------------------------------------------------------===//
Expand Down Expand Up @@ -384,7 +391,7 @@ unique_ptr<LogicalOperator> DuckLakeCompactor::InsertSort(Binder &binder, unique
}

// Validate all column references in sort expressions exist in the table
DuckLakeTableEntry::ValidateSortExpressionColumns(table, pre_bound_orders);
DuckLakeTableEntry::ValidateSortExpressionColumns(table.GetColumns(), pre_bound_orders);

// Resolve types for the input plan (could be LogicalGet or LogicalProjection)
plan->ResolveOperatorTypes();
Expand Down
61 changes: 48 additions & 13 deletions src/functions/ducklake_set_option.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,40 @@
#include "storage/ducklake_table_entry.hpp"
#include "storage/ducklake_schema_entry.hpp"

#include "duckdb/execution/expression_executor.hpp"
#include "duckdb/parser/parsed_expression.hpp"
#include "duckdb/planner/binder.hpp"
#include "duckdb/planner/expression_binder/constant_binder.hpp"

namespace duckdb {

vector<DuckLakeTag> ValidateOptionsInCreateWith(ClientContext &context,
                                                const case_insensitive_map_t<unique_ptr<ParsedExpression>> &options) {
	// Bind each WITH value through a ConstantBinder so it must fold to a literal (no column refs,
	// no subqueries) — the same pattern upstream uses for ATTACH (bind_attach.cpp) and CREATE SECRET
	// (bind_create.cpp). EvaluateScalar is invoked with allow_unfoldable=true so volatile functions
	// such as random() are accepted: only the resulting Value is persisted, never the expression tree.
	// Each expression is copied before binding because CTAS runs this twice (plan time + sink init)
	// over the same options map; consuming the map in place (the upstream pattern) would break the
	// second pass.
	auto binder = Binder::CreateBinder(context);
	ConstantBinder with_binder(*binder, context, "DuckLake WITH option");
	vector<DuckLakeTag> tags;
	tags.reserve(options.size());
	for (auto &entry : options) {
		auto &key = entry.first;
		auto &value_expr = entry.second;
		if (!value_expr) {
			throw BinderException("WITH option \"%s\" requires a value", key);
		}
		auto copied = value_expr->Copy();
		auto bound = with_binder.Bind(copied);
		if (bound->HasParameter()) {
			throw ParameterNotResolvedException();
		}
		auto folded = ExpressionExecutor::EvaluateScalar(context, *bound, true);
		tags.push_back(ValidateDuckLakeConfigOption(context, key, folded));
	}
	return tags;
}

struct DuckLakeSetOptionData : public TableFunctionData {
DuckLakeSetOptionData(Catalog &catalog, DuckLakeConfigOption option_p)
: catalog(catalog), option(std::move(option_p)) {
Expand All @@ -15,17 +47,12 @@ struct DuckLakeSetOptionData : public TableFunctionData {
DuckLakeConfigOption option;
};

static unique_ptr<FunctionData> DuckLakeSetOptionBind(ClientContext &context, TableFunctionBindInput &input,
vector<LogicalType> &return_types, vector<string> &names) {
auto &catalog = DuckLakeBaseMetadataFunction::GetCatalog(context, input.inputs[0]);
DuckLakeConfigOption config_option;
auto &option = config_option.option.key;
auto &value = config_option.option.value;

option = StringUtil::Lower(StringValue::Get(input.inputs[1]));
auto &val = input.inputs[2];
DuckLakeTag ValidateDuckLakeConfigOption(ClientContext &context, const string &option_key, const Value &val) {
DuckLakeTag result;
auto option = StringUtil::Lower(option_key);
result.key = option;
auto &value = result.value;

// read the option
if (option == "parquet_compression") {
auto codec = val.DefaultCastAs(LogicalType::VARCHAR).GetValue<string>();
vector<string> supported_algorithms {"uncompressed", "snappy", "gzip", "zstd", "brotli", "lz4", "lz4_raw"};
Expand Down Expand Up @@ -82,9 +109,8 @@ static unique_ptr<FunctionData> DuckLakeSetOptionBind(ClientContext &context, Ta
} else if (option == "delete_older_than" || option == "expire_older_than") {
auto interval_value = val.ToString();
if (!interval_value.empty()) {
// Let's verify this is actually an interval
interval_t result;
if (!Interval::FromString(val.ToString(), result)) {
interval_t interval_result;
if (!Interval::FromString(val.ToString(), interval_result)) {
throw BinderException("%s is not a valid interval value.", option);
}
}
Expand All @@ -103,6 +129,15 @@ static unique_ptr<FunctionData> DuckLakeSetOptionBind(ClientContext &context, Ta
} else {
throw NotImplementedException("Unsupported option %s", option);
}
return result;
}

static unique_ptr<FunctionData> DuckLakeSetOptionBind(ClientContext &context, TableFunctionBindInput &input,
vector<LogicalType> &return_types, vector<string> &names) {
auto &catalog = DuckLakeBaseMetadataFunction::GetCatalog(context, input.inputs[0]);
DuckLakeConfigOption config_option;
config_option.option = ValidateDuckLakeConfigOption(context, StringValue::Get(input.inputs[1]), input.inputs[2]);
auto &option = config_option.option.key;

// read the scope
string schema;
Expand Down
4 changes: 4 additions & 0 deletions src/include/functions/ducklake_compaction_functions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ class DuckLakeCompactor {
static vector<OrderByNode> ParseSortOrders(const DuckLakeSort &sort_data);
static vector<BoundOrderByNode> BindSortOrders(Binder &binder, DuckLakeTableEntry &table, idx_t table_index,
vector<OrderByNode> &pre_bound_orders);
//! Overload that takes (columns, table_name) directly so CTAS planning can sort-bind before any
//! DuckLakeTableEntry exists. Behaves identically to the entry-taking overload.
static vector<BoundOrderByNode> BindSortOrders(Binder &binder, const ColumnList &columns, const string &table_name,
TableIndex table_index, vector<OrderByNode> &pre_bound_orders);

private:
ClientContext &context;
Expand Down
15 changes: 15 additions & 0 deletions src/include/functions/ducklake_table_functions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,22 @@

namespace duckdb {
class DuckLakeCatalog;
class ParsedExpression;
struct DuckLakeSnapshotInfo;
struct DuckLakeTag;

//! Validate and canonicalize a single (key, value) DuckLake config option.
//! Shared by `CALL ducklake.set_option(...)` and CREATE TABLE / CTAS `WITH (...)`.
DuckLakeTag ValidateDuckLakeConfigOption(ClientContext &context, const string &option_key, const Value &val);

//! Bind, fold, and validate every entry in a CREATE TABLE / CTAS `WITH (...)` options map.
//! Each value expression must fold to a literal under ConstantBinder (constants, `getvariable(...)`,
//! `upper('zstd')`, etc.); each key/value is then run through ValidateDuckLakeConfigOption.
//! Empty input → empty output. Throws BinderException on any unbindable expression or unknown key.
//! The input map is left intact — copies each expression before binding because CTAS calls this
//! both at plan time (PlanCreateTableAs) and again during sink-state-init (CreateTableExtended).
vector<DuckLakeTag> ValidateOptionsInCreateWith(ClientContext &context,
const case_insensitive_map_t<unique_ptr<ParsedExpression>> &options);

class DuckLakeTableFunctionUtil {
public:
Expand Down
3 changes: 3 additions & 0 deletions src/include/storage/ducklake_catalog.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class DuckLakeFieldData;
struct DuckLakeFileListEntry;
struct DuckLakeConfigOption;
struct DeleteFileMap;
struct BoundCreateTableInfo;
class LogicalGet;

//! Cache entry for DuckLake table statistics
Expand Down Expand Up @@ -135,6 +136,8 @@ class DuckLakeCatalog : public Catalog {

optional_ptr<CatalogEntry> CreateSchema(CatalogTransaction transaction, CreateSchemaInfo &info) override;

ErrorData SupportsCreateTable(BoundCreateTableInfo &info) override;

void ScanSchemas(ClientContext &context, std::function<void(SchemaCatalogEntry &)> callback) override;

optional_ptr<SchemaCatalogEntry> LookupSchema(CatalogTransaction transaction, const EntryLookupInfo &schema_lookup,
Expand Down
41 changes: 35 additions & 6 deletions src/include/storage/ducklake_insert.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,17 @@
#include "duckdb/common/index_vector.hpp"
#include "storage/ducklake_stats.hpp"
#include "common/ducklake_data_file.hpp"
#include "common/ducklake_options.hpp"
#include "storage/ducklake_field_data.hpp"
#include "storage/ducklake_partition_data.hpp"
#include "storage/ducklake_sort_data.hpp"

namespace duckdb {
class DuckLakeCatalog;
class DuckLakeSchemaEntry;
class DuckLakeTableEntry;
class ParsedExpression;
class DuckLakeFieldData;
struct DuckLakePartition;
struct DuckLakeCopyOptions;
struct DuckLakeCopyInput;

Expand All @@ -41,15 +44,19 @@ class DuckLakeInsertGlobalState : public GlobalSinkState {

class DuckLakeInsert : public PhysicalOperator {
public:
//! INSERT INTO
//! INSERT INTO an existing table.
DuckLakeInsert(PhysicalPlan &physical_plan, const vector<LogicalType> &types, DuckLakeTableEntry &table,
optional_idx partition_id, string encryption_key);
//! CREATE TABLE AS
//! CREATE TABLE AS - the table is created in GetGlobalSinkState. Any inline
//! PARTITIONED BY / SORTED BY clauses are pre-built into ctas_partition_data / ctas_sort_data at planning
//! time so the physical_copy upstream of this operator can hive-partition the write with a partition_id
//! that matches what gets attached to the table entry at sink-state-init.
DuckLakeInsert(PhysicalPlan &physical_plan, const vector<LogicalType> &types, SchemaCatalogEntry &schema,
unique_ptr<BoundCreateTableInfo> info, string table_uuid, string table_data_path,
string encryption_key);
unique_ptr<DuckLakePartition> ctas_partition_data, unique_ptr<DuckLakeSort> ctas_sort_data,
optional_idx partition_id, string encryption_key);

//! The table to insert into
//! The table to insert into (only set for INSERT INTO; nullptr for CTAS until GetGlobalSinkState resolves)
optional_ptr<DuckLakeTableEntry> table;
//! Table schema, in case of CREATE TABLE AS
optional_ptr<SchemaCatalogEntry> schema;
Expand All @@ -59,6 +66,11 @@ class DuckLakeInsert : public PhysicalOperator {
string table_uuid;
//! The table data path, in case of CREATE TABLE AS
string table_data_path;
//! Pre-built partition spec for CTAS (allocated at planning time so the physical write carries the
//! correct partition_id).
unique_ptr<DuckLakePartition> ctas_partition_data;
//! Pre-built sort spec for CTAS (same lifecycle as ctas_partition_data).
unique_ptr<DuckLakeSort> ctas_sort_data;
//! The partition id we are writing into (if any)
optional_idx partition_id;
//! The encryption key used for writing the Parquet files
Expand Down Expand Up @@ -140,8 +152,22 @@ struct DuckLakeCopyOptions {

struct DuckLakeCopyInput {
explicit DuckLakeCopyInput(ClientContext &context, DuckLakeTableEntry &table, const string &hive_partition = "");
//! CTAS-flavored: take the field-id mapping and (optional) partition spec from the planning-time-built
//! spec rather than from a DuckLakeTableEntry that hasn't been created yet. field_data is required
//! because the parquet writer always needs it; partition_data is null when CREATE TABLE AS has no
//! inline PARTITIONED BY clause. `options_in_create_with` is the raw `WITH (...)` clause from the
//! parser; it is validated and surfaced through the `options_in_create_with` map so the parquet
//! write honors it.
DuckLakeCopyInput(ClientContext &context, DuckLakeSchemaEntry &schema, const ColumnList &columns,
const string &data_path_p);
const string &data_path_p, DuckLakeFieldData &field_data,
optional_ptr<DuckLakePartition> partition_data = nullptr,
const case_insensitive_map_t<unique_ptr<ParsedExpression>> &options_in_create_with = {});

//! Look up an effective DuckLake config option for this write: returns the WITH override if one is
//! present in `options_in_create_with`, otherwise falls through to the catalog's table/schema/global
//! lookup. Used by PlanCopyForInsert so a CTAS write whose new table_id is not yet allocated still
//! picks up the user's WITH (...) values.
bool GetEffectiveOption(const string &key, string &out) const;

DuckLakeCatalog &catalog;
optional_ptr<DuckLakePartition> partition_data;
Expand All @@ -153,6 +179,9 @@ struct DuckLakeCopyInput {
TableIndex table_id;
InsertVirtualColumns virtual_columns = InsertVirtualColumns::NONE;
optional_idx get_table_index;
//! CREATE TABLE / CTAS WITH (...) options that override same-key catalog options for this write.
//! Empty for plain INSERT. Validated at plan time before reaching here.
option_map_t options_in_create_with;
};

} // namespace duckdb
1 change: 1 addition & 0 deletions src/include/storage/ducklake_metadata_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ class DuckLakeMetadataManager {
virtual string WriteNewPartitionKeys(DuckLakeSnapshot commit_snapshot,
const vector<DuckLakePartitionInfo> &new_partitions);
virtual string WriteNewSortKeys(DuckLakeSnapshot commit_snapshot, const vector<DuckLakeSortInfo> &new_sorts);
virtual string WriteOptionsInCreateWith(const vector<DuckLakeConfigOption> &options_in_create_with);
virtual string WriteDroppedColumns(const vector<DuckLakeDroppedColumn> &dropped_columns);
virtual string WriteNewColumns(const vector<DuckLakeNewColumn> &new_columns);
virtual string WriteNewTags(const vector<DuckLakeTagInfo> &new_tags);
Expand Down
9 changes: 9 additions & 0 deletions src/include/storage/ducklake_partition_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,21 @@ enum class DuckLakeTransformType { IDENTITY, BUCKET, YEAR, MONTH, DAY, HOUR };
//! A partition transform: how a source column value is mapped onto a partition key.
struct DuckLakeTransform {
	//! The kind of transform applied to the column value.
	DuckLakeTransformType type;
	//! Number of buckets; only meaningful when type == DuckLakeTransformType::BUCKET.
	idx_t bucket_count = 0; // only for BUCKET

	bool operator==(const DuckLakeTransform &other) const {
		return type == other.type && bucket_count == other.bucket_count;
	}
	//! C++17 does not synthesize operator!= from operator==; provide it for symmetry.
	bool operator!=(const DuckLakeTransform &other) const {
		return !(*this == other);
	}
};

//! One field of a partition specification: which column (by field id) is partitioned, its position
//! within the partition key, and the transform applied to it.
struct DuckLakePartitionField {
	//! Position of this field within the partition key (0-based).
	idx_t partition_key_index = 0;
	//! The table column this partition field is derived from.
	FieldIndex field_id;
	//! The transform applied to the column value (identity, bucket, year, ...).
	DuckLakeTransform transform;

	bool operator==(const DuckLakePartitionField &other) const {
		return partition_key_index == other.partition_key_index && field_id == other.field_id &&
		       transform == other.transform;
	}
	//! C++17 does not synthesize operator!= from operator==; provide it for symmetry.
	bool operator!=(const DuckLakePartitionField &other) const {
		return !(*this == other);
	}
};

struct DuckLakePartition {
Expand Down
8 changes: 7 additions & 1 deletion src/include/storage/ducklake_schema_entry.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

#include "duckdb/catalog/catalog_entry/schema_catalog_entry.hpp"
#include "storage/ducklake_catalog_set.hpp"
#include "storage/ducklake_partition_data.hpp"
#include "storage/ducklake_sort_data.hpp"

namespace duckdb {
class DuckLakeTransaction;
Expand All @@ -32,8 +34,12 @@ class DuckLakeSchemaEntry : public SchemaCatalogEntry {
}

public:
//! Create a DuckLakeTableEntry and register it with the transaction.
//! prebuilt_partition_data / prebuilt_sort_data are supplied by CTAS when the new table is partitioned or sorted.
optional_ptr<CatalogEntry> CreateTableExtended(CatalogTransaction transaction, BoundCreateTableInfo &info,
string table_uuid, string table_data_path);
string table_uuid, string table_data_path,
unique_ptr<DuckLakePartition> prebuilt_partition_data = nullptr,
unique_ptr<DuckLakeSort> prebuilt_sort_data = nullptr);
unique_ptr<CreateInfo> GetInfo() const override;
optional_ptr<CatalogEntry> CreateTable(CatalogTransaction transaction, BoundCreateTableInfo &info) override;
optional_ptr<CatalogEntry> CreateFunction(CatalogTransaction transaction, CreateFunctionInfo &info) override;
Expand Down
22 changes: 20 additions & 2 deletions src/include/storage/ducklake_table_entry.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@ class DuckLakeTableEntry : public TableCatalogEntry {
optional_ptr<const DuckLakeFieldId> GetFieldId(FieldIndex field_index) const;
void SetPartitionData(unique_ptr<DuckLakePartition> partition_data);
void SetSortData(unique_ptr<DuckLakeSort> sort_data);
//! Table-scoped config options collected from a CREATE TABLE / CTAS WITH (...) clause.
//! Takes ownership of the validated tags; they are held on the entry pending commit.
void SetOptionsInCreateWith(vector<DuckLakeTag> options) {
options_in_create_with = std::move(options);
}
//! Returns the table-scoped WITH (...) options set at creation time (empty when none were given).
const vector<DuckLakeTag> &GetOptionsInCreateWith() const {
return options_in_create_with;
}
shared_ptr<DuckLakeTableStats> GetTableStats(ClientContext &context);
shared_ptr<DuckLakeTableStats> GetTableStats(DuckLakeTransaction &transaction);
idx_t GetNetDataFileRowCount(DuckLakeTransaction &transaction);
Expand Down Expand Up @@ -122,8 +129,17 @@ class DuckLakeTableEntry : public TableCatalogEntry {
virtual_column_map_t GetVirtualColumns() const override;
vector<column_t> GetRowIdColumns() const override;

//! Validates that all column references in sort expressions exist in the table
static void ValidateSortExpressionColumns(DuckLakeTableEntry &table, const vector<OrderByNode> &orders);
//! Validates that all column references in sort expressions exist in the column list.
//! Takes (columns) directly so it can run at CTAS planning time before a DuckLakeTableEntry exists.
static void ValidateSortExpressionColumns(const ColumnList &columns, const vector<OrderByNode> &orders);

//! Build a DuckLakePartition from raw partition expressions (allocates a transaction-local id).
static unique_ptr<DuckLakePartition> BuildPartitionData(DuckLakeTransaction &transaction, const ColumnList &columns,
DuckLakeFieldData &field_data,
const vector<unique_ptr<ParsedExpression>> &partition_keys);
//! Build a DuckLakeSort from a vector of OrderByNode (allocates a transaction-local id).
static unique_ptr<DuckLakeSort> BuildSortData(DuckLakeTransaction &transaction, const ColumnList &columns,
const vector<OrderByNode> &orders);

private:
unique_ptr<CatalogEntry> AlterTable(DuckLakeTransaction &transaction, RenameTableInfo &info);
Expand Down Expand Up @@ -178,6 +194,8 @@ class DuckLakeTableEntry : public TableCatalogEntry {
unique_ptr<DuckLakeSort> sort_data;
// only set for REMOVED_COLUMN
unique_ptr<ColumnChangeInfo> changed_fields;
// table-scoped config options collected from CREATE TABLE / CTAS WITH (...), pending until commit
vector<DuckLakeTag> options_in_create_with;
};

} // namespace duckdb
Loading
Loading