Skip to content
Open
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
0aeb7ea
Update delta-rs to add FileFormatOptions
corwinjoy Sep 27, 2025
c081278
Update optimize to use zstd compression by default. Fix clippy warnings.
corwinjoy Sep 29, 2025
9059d8e
Merge branch 'main' into file_format_options_squashed
corwinjoy Sep 29, 2025
a41d3d3
Merge branch 'main' into file_format_options_squashed
corwinjoy Oct 1, 2025
cd88ef8
Fix build error created by merging main
corwinjoy Oct 1, 2025
e5915e0
Merge branch 'main' into file_format_options_squashed
corwinjoy Oct 2, 2025
56ec968
Update optimize.rs to fix merge conflicts from main
corwinjoy Oct 2, 2025
1b7d6c2
cargo format
corwinjoy Oct 2, 2025
4a89682
Merge branch 'main' into file_format_options_squashed
corwinjoy Oct 2, 2025
56a0cb5
Merge latest from main
corwinjoy Oct 2, 2025
73cccbe
Undo refactorization of optimize routines to make diffs clearer
corwinjoy Oct 7, 2025
5359a9e
Move file_format_options into DeltaTableConfig
corwinjoy Oct 13, 2025
2d29203
Merge branch 'main' into file_format_options_squashed
corwinjoy Oct 13, 2025
b9376bd
Fix build errors from merge with main
corwinjoy Oct 13, 2025
4ae8443
Update comment for with_table_config in CreateBuilder
corwinjoy Oct 14, 2025
c7281de
Refined optimize.rs, cargo fmt, comment on builder
corwinjoy Oct 14, 2025
24fba72
Inline and Remove build_writer_properties_factory_ffo Function.
corwinjoy Oct 29, 2025
d71bda5
Inline and Remove to_table_parquet_options_from_ffo Function.
corwinjoy Oct 29, 2025
2c2a7fd
Remove file_format_options argument from the find_files and find_file…
corwinjoy Oct 29, 2025
a0921c4
cargo fmt
corwinjoy Oct 29, 2025
a8aba72
Work in progress. Begin removal of file_format_options from DeltaScan…
corwinjoy Oct 29, 2025
31569a6
Finish removal of file_format_options from DeltaScanBuilder
corwinjoy Oct 29, 2025
762b6b5
cargo fmt
corwinjoy Oct 29, 2025
9d48c94
Remove dead code flagged by clippy
corwinjoy Oct 30, 2025
6913ec8
Merge branch 'main' into file_format_options_squashed
corwinjoy Oct 30, 2025
46097ee
Fix errors introduced by merge from main.
corwinjoy Oct 30, 2025
9495f13
Cargo clippy + fmt
corwinjoy Oct 30, 2025
020d409
Remove extension to create WriterPropertiesBuilder from WriterPropert…
corwinjoy Oct 30, 2025
2d14fbf
Change try_with_config to not be async to match main.
corwinjoy Nov 3, 2025
cd21447
Remove build_writer_properties_factory_wp and convert to trait.
corwinjoy Nov 3, 2025
ac71c65
Convert build_writer_properties_factory_or_default_ffo to trait.
corwinjoy Nov 3, 2025
8047afe
cargo fmt
corwinjoy Nov 3, 2025
a036d7f
Update parquet-key-management to version 0.4.1 to avoid build depende…
corwinjoy Nov 3, 2025
fe4a8ea
Move KMS class from file_format_options.rs to new file kms_encryption.rs
corwinjoy Nov 4, 2025
fd9e6dd
Use integration-test just for the encryption example.
corwinjoy Nov 4, 2025
ada7ab7
Require datafusion for the kms_encryption module.
corwinjoy Nov 5, 2025
9a304ca
Move TableEncryption into kms_encryption.rs
corwinjoy Nov 5, 2025
c5577a6
Use async kms
jgiannuzzi Nov 5, 2025
e44dbf8
Merge pull request #21 from corwinjoy/move_kms
corwinjoy Nov 5, 2025
af6ff45
Merge branch 'main' into file_format_options_squashed
corwinjoy Nov 5, 2025
8d86757
Add documentation to top of kms_encryption.rs
corwinjoy Nov 5, 2025
c071ab0
Remove whitespace changes in crates/deltalake/Cargo.toml
corwinjoy Nov 6, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ object_store = { version = "0.12.1" }
parquet = { version = "56.2" }

# datafusion
datafusion = "50.2"
datafusion = { version = "50.2", features = ["default", "parquet_encryption"] }
datafusion-ffi = "50.2"
datafusion-proto = "50.2"

Expand All @@ -66,7 +66,7 @@ tempfile = { version = "3" }
uuid = { version = "1" }

# runtime / async
async-trait = { version = "0.1" }
async-trait = { version = "0.1.89" }
futures = { version = "0.3" }
tokio = { version = "1" }

Expand Down
8 changes: 7 additions & 1 deletion crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ arrow-ord = { workspace = true }
arrow-row = { workspace = true }
arrow-schema = { workspace = true, features = ["serde"] }
arrow-select = { workspace = true }
parquet = { workspace = true, features = ["async", "object_store"] }
parquet = { workspace = true, features = ["async", "object_store", "encryption"] }
object_store = { workspace = true }

# datafusion
Expand Down Expand Up @@ -94,6 +94,8 @@ datatest-stable = "0.3"
deltalake-test = { path = "../test" }
dotenvy = "0"
fs_extra = "1.2.0"
parquet-key-management = { version = "0.4.0", features = ["_test_utils", "datafusion"] }
paste = "1"
pretty_assertions = "1.2.1"
pretty_env_logger = "0.5.0"
rstest = { version = "0.26.1" }
Expand Down Expand Up @@ -145,6 +147,10 @@ required-features = ["datafusion"]
name = "command_vacuum"
required-features = ["datafusion"]

[[test]]
name = "commands_with_encryption"
required-features = ["datafusion"]

[[test]]
name = "commit_info_format"
required-features = ["datafusion"]
Expand Down
43 changes: 39 additions & 4 deletions crates/core/src/delta_datafusion/table_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,11 +343,18 @@ impl DeltaScanConfigBuilder {
None
};

let table_parquet_options = snapshot
.load_config()
.file_format_options
.as_ref()
.map(|ffo| ffo.table_options().parquet);

Ok(DeltaScanConfig {
file_column_name,
wrap_partition_values: self.wrap_partition_values.unwrap_or(true),
enable_parquet_pushdown: self.enable_parquet_pushdown,
schema: self.schema.clone(),
table_parquet_options,
})
}
}
Expand All @@ -363,6 +370,9 @@ pub struct DeltaScanConfig {
pub enable_parquet_pushdown: bool,
/// Schema to read as
pub schema: Option<SchemaRef>,
/// Options that control how Parquet files are read
#[serde(skip)]
pub table_parquet_options: Option<TableParquetOptions>,
}

pub(crate) struct DeltaScanBuilder<'a> {
Expand Down Expand Up @@ -644,13 +654,31 @@ impl<'a> DeltaScanBuilder<'a> {

let stats = stats.unwrap_or(Statistics::new_unknown(&schema));

let parquet_options = TableParquetOptions {
global: self.session.config().options().execution.parquet.clone(),
..Default::default()
};
let parquet_options: TableParquetOptions = config
.table_parquet_options
.clone()
.unwrap_or_else(|| self.session.table_options().parquet.clone());

// We have to set the encryption factory on the ParquetSource based on the Parquet options,
// as this is usually handled by the ParquetFormat type in DataFusion,
// which is not used in delta-rs.
let encryption_factory = parquet_options
.crypto
.factory_id
.as_ref()
.map(|factory_id| {
self.session
.runtime_env()
.parquet_encryption_factory(factory_id)
})
.transpose()?;

let mut file_source = ParquetSource::new(parquet_options);

if let Some(encryption_factory) = encryption_factory {
file_source = file_source.with_encryption_factory(encryption_factory);
}

// Sometimes (i.e Merge) we want to prune files that don't make the
// filter and read the entire contents for files that do match the
// filter
Expand Down Expand Up @@ -731,6 +759,9 @@ impl TableProvider for DeltaTable {
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
register_store(self.log_store(), session.runtime_env().as_ref());
if let Some(format_options) = &self.config.file_format_options {
format_options.update_session(session)?;
}
let filter_expr = conjunction(filters.iter().cloned());

let scan = DeltaScanBuilder::new(self.snapshot()?.snapshot(), self.log_store(), session)
Expand Down Expand Up @@ -819,6 +850,10 @@ impl TableProvider for DeltaTableProvider {
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
register_store(self.log_store.clone(), session.runtime_env().as_ref());
if let Some(format_options) = &self.snapshot.load_config().file_format_options {
format_options.update_session(session)?;
}

let filter_expr = conjunction(filters.iter().cloned());

let mut scan = DeltaScanBuilder::new(&self.snapshot, self.log_store.clone(), session)
Expand Down
3 changes: 1 addition & 2 deletions crates/core/src/operations/add_column.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
//! Add a new column to a table

use std::sync::Arc;

use delta_kernel::schema::StructType;
use futures::future::BoxFuture;
use itertools::Itertools;
use std::sync::Arc;

use super::{CustomExecuteHandler, Operation};
use crate::kernel::schema::merge_delta_struct;
Expand Down
18 changes: 13 additions & 5 deletions crates/core/src/operations/create.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
//! Command for creating a new delta table
// https://github.com/delta-io/delta/blob/master/core/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala

use std::collections::HashMap;
use std::sync::Arc;

use delta_kernel::schema::MetadataValue;
use futures::future::BoxFuture;
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use tracing::log::*;
use uuid::Uuid;

Expand All @@ -21,7 +20,7 @@ use crate::logstore::LogStoreRef;
use crate::protocol::{DeltaOperation, SaveMode};
use crate::table::builder::ensure_table_uri;
use crate::table::config::TableProperty;
use crate::{DeltaTable, DeltaTableBuilder};
use crate::{DeltaTable, DeltaTableBuilder, DeltaTableConfig};

#[derive(thiserror::Error, Debug)]
enum CreateError {
Expand Down Expand Up @@ -61,6 +60,7 @@ pub struct CreateBuilder {
storage_options: Option<HashMap<String, String>>,
actions: Vec<Action>,
log_store: Option<LogStoreRef>,
table_config: DeltaTableConfig,
configuration: HashMap<String, Option<String>>,
/// Additional information to add to the commit
commit_properties: CommitProperties,
Expand Down Expand Up @@ -98,6 +98,7 @@ impl CreateBuilder {
storage_options: None,
actions: Default::default(),
log_store: None,
table_config: DeltaTableConfig::default(),
configuration: Default::default(),
commit_properties: CommitProperties::default(),
raise_if_key_not_exists: true,
Expand Down Expand Up @@ -238,6 +239,12 @@ impl CreateBuilder {
self
}

/// Set configuration options for the table
pub fn with_table_config(mut self, table_config: DeltaTableConfig) -> Self {
self.table_config = table_config;
self
}

/// Set a custom execute handler, for pre and post execution
pub fn with_custom_execute_handler(mut self, handler: Arc<dyn CustomExecuteHandler>) -> Self {
self.custom_execute_handler = Some(handler);
Expand All @@ -262,14 +269,15 @@ impl CreateBuilder {
let (storage_url, table) = if let Some(log_store) = self.log_store {
(
ensure_table_uri(log_store.root_uri())?.as_str().to_string(),
DeltaTable::new(log_store, Default::default()),
DeltaTable::new(log_store, self.table_config.clone()),
)
} else {
let storage_url =
ensure_table_uri(self.location.clone().ok_or(CreateError::MissingLocation)?)?;
(
storage_url.as_str().to_string(),
DeltaTableBuilder::from_uri(storage_url)?
.with_table_config(self.table_config.clone())
.with_storage_options(self.storage_options.clone().unwrap_or_default())
.build()?,
)
Expand Down
55 changes: 41 additions & 14 deletions crates/core/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

use async_trait::async_trait;
use datafusion::catalog::Session;
use datafusion::common::ScalarValue;
use datafusion::common::{exec_datafusion_err, ScalarValue};
use datafusion::dataframe::DataFrame;
use datafusion::datasource::provider_as_source;
use datafusion::error::Result as DataFusionResult;
Expand Down Expand Up @@ -59,6 +59,9 @@ use crate::operations::write::WriterStatsConfig;
use crate::operations::CustomExecuteHandler;
use crate::protocol::DeltaOperation;
use crate::table::config::TablePropertiesExt as _;
use crate::table::file_format_options::{
build_writer_properties_factory_wp, state_with_file_format_options, WriterPropertiesFactoryRef,
};
use crate::table::state::DeltaTableState;
use crate::{DeltaTable, DeltaTableError};

Expand All @@ -78,7 +81,7 @@ pub struct DeleteBuilder {
/// Datafusion session state relevant for executing the input plan
session: Option<Arc<dyn Session>>,
/// Properties passed to underlying parquet writer for when files are rewritten
writer_properties: Option<WriterProperties>,
writer_properties_factory: Option<WriterPropertiesFactoryRef>,
/// Commit properties and configuration
commit_properties: CommitProperties,
custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
Expand Down Expand Up @@ -126,13 +129,22 @@ impl super::Operation for DeleteBuilder {
impl DeleteBuilder {
/// Create a new [`DeleteBuilder`]
pub(crate) fn new(log_store: LogStoreRef, snapshot: Option<EagerSnapshot>) -> Self {
let file_format_options = snapshot
.as_ref()
.map(|ss| ss.load_config().file_format_options.clone());
let writer_properties_factory = match file_format_options {
Some(file_format_options) => file_format_options
.clone()
.map(|ffo| ffo.writer_properties_factory()),
None => None,
};
Self {
predicate: None,
snapshot,
log_store,
session: None,
commit_properties: CommitProperties::default(),
writer_properties: None,
writer_properties_factory,
custom_execute_handler: None,
}
}
Expand All @@ -157,7 +169,8 @@ impl DeleteBuilder {

/// Writer properties passed to parquet writer for when files are rewritten
pub fn with_writer_properties(mut self, writer_properties: WriterProperties) -> Self {
self.writer_properties = Some(writer_properties);
let writer_properties_factory = build_writer_properties_factory_wp(writer_properties);
self.writer_properties_factory = Some(writer_properties_factory);
self
}

Expand Down Expand Up @@ -189,6 +202,20 @@ impl std::future::IntoFuture for DeleteBuilder {

register_store(this.log_store.clone(), session.runtime_env().as_ref());

let file_format_options = &snapshot.load_config().file_format_options;
let session_state =
session
.as_any()
.downcast_ref::<SessionState>()
.ok_or_else(|| {
exec_datafusion_err!("Failed to downcast Session to SessionState")
})?;

let session = Arc::new(state_with_file_format_options(
session_state.clone(),
file_format_options.as_ref(),
)?);

let predicate = match this.predicate {
Some(predicate) => match predicate {
Expression::DataFusion(expr) => Some(expr),
Expand All @@ -203,8 +230,8 @@ impl std::future::IntoFuture for DeleteBuilder {
predicate,
this.log_store.clone(),
snapshot,
session.as_ref(),
this.writer_properties,
session.as_ref().clone(),
this.writer_properties_factory,
this.commit_properties,
operation_id,
this.custom_execute_handler.as_ref(),
Expand Down Expand Up @@ -268,7 +295,7 @@ async fn execute_non_empty_expr(
expression: &Expr,
rewrite: &[Add],
metrics: &mut DeleteMetrics,
writer_properties: Option<WriterProperties>,
writer_properties_factory: Option<WriterPropertiesFactoryRef>,
partition_scan: bool,
operation_id: Uuid,
) -> DeltaResult<Vec<Action>> {
Expand Down Expand Up @@ -322,7 +349,7 @@ async fn execute_non_empty_expr(
log_store.object_store(Some(operation_id)),
Some(snapshot.table_properties().target_file_size().get() as usize),
None,
writer_properties.clone(),
writer_properties_factory.clone(),
writer_stats_config.clone(),
)
.await?;
Expand Down Expand Up @@ -359,7 +386,7 @@ async fn execute_non_empty_expr(
log_store.object_store(Some(operation_id)),
Some(snapshot.table_properties().target_file_size().get() as usize),
None,
writer_properties,
writer_properties_factory,
writer_stats_config,
)
.await?;
Expand All @@ -375,8 +402,8 @@ async fn execute(
predicate: Option<Expr>,
log_store: LogStoreRef,
snapshot: EagerSnapshot,
session: &dyn Session,
writer_properties: Option<WriterProperties>,
session: SessionState,
writer_properties_factory: Option<WriterPropertiesFactoryRef>,
mut commit_properties: CommitProperties,
operation_id: Uuid,
handle: Option<&Arc<dyn CustomExecuteHandler>>,
Expand All @@ -389,7 +416,7 @@ async fn execute(
let mut metrics = DeleteMetrics::default();

let scan_start = Instant::now();
let candidates = find_files(&snapshot, log_store.clone(), session, predicate.clone()).await?;
let candidates = find_files(&snapshot, log_store.clone(), &session, predicate.clone()).await?;
metrics.scan_time_ms = Instant::now().duration_since(scan_start).as_millis() as u64;

let predicate = predicate.unwrap_or(lit(true));
Expand All @@ -399,11 +426,11 @@ async fn execute(
let add = execute_non_empty_expr(
&snapshot,
log_store.clone(),
session,
&session,
&predicate,
&candidates.candidates,
&mut metrics,
writer_properties,
writer_properties_factory.clone(),
candidates.partition_scan,
operation_id,
)
Expand Down
Loading
Loading