diff --git a/rust/sedona-geoparquet/src/file_opener.rs b/rust/sedona-geoparquet/src/file_opener.rs index d634b92aa..fd2948590 100644 --- a/rust/sedona-geoparquet/src/file_opener.rs +++ b/rust/sedona-geoparquet/src/file_opener.rs @@ -23,6 +23,7 @@ use datafusion::datasource::{ }; use datafusion_common::Result; use datafusion_datasource_parquet::metadata::DFParquetMetadata; +use datafusion_execution::cache::cache_manager::FileMetadataCache; use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_plan::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder}; use object_store::ObjectStore; @@ -86,6 +87,7 @@ pub struct GeoParquetFileOpener { file_schema: SchemaRef, enable_pruning: bool, metrics: GeoParquetFileOpenerMetrics, + file_metadata_cache: Option<Arc<dyn FileMetadataCache>>, } impl GeoParquetFileOpener { @@ -98,6 +100,7 @@ impl GeoParquetFileOpener { file_schema: SchemaRef, enable_pruning: bool, execution_plan_global_metrics: &ExecutionPlanMetricsSet, + file_metadata_cache: Option<Arc<dyn FileMetadataCache>>, ) -> Self { Self { inner, @@ -107,6 +110,7 @@ impl GeoParquetFileOpener { file_schema, enable_pruning, metrics: GeoParquetFileOpenerMetrics::new(execution_plan_global_metrics), + file_metadata_cache, } } } @@ -116,9 +120,11 @@ impl FileOpener for GeoParquetFileOpener { let self_clone = self.clone(); Ok(Box::pin(async move { + let file_metadata_cache = self_clone.file_metadata_cache.or(None); let parquet_metadata = DFParquetMetadata::new(&self_clone.object_store, &file_meta.object_meta) .with_metadata_size_hint(self_clone.metadata_size_hint) + .with_file_metadata_cache(file_metadata_cache) .fetch_metadata() .await?; diff --git a/rust/sedona-geoparquet/src/format.rs b/rust/sedona-geoparquet/src/format.rs index e20adb1cc..689751063 100644 --- a/rust/sedona-geoparquet/src/format.rs +++ b/rust/sedona-geoparquet/src/format.rs @@ -35,6 +35,7 @@ use datafusion::{ use datafusion_catalog::{memory::DataSourceExec, Session}; use datafusion_common::{plan_err, GetExt, Result, Statistics}; use 
datafusion_datasource_parquet::metadata::DFParquetMetadata; +use datafusion_execution::cache::cache_manager::FileMetadataCache; use datafusion_physical_expr::{LexRequirement, PhysicalExpr}; use datafusion_physical_plan::{ filter_pushdown::FilterPushdownPropagation, metrics::ExecutionPlanMetricsSet, ExecutionPlan, @@ -189,11 +190,16 @@ impl FileFormat for GeoParquetFormat { // copy more ParquetFormat code. It may be that caching at the object // store level is the way to go here. let metadatas: Vec<_> = futures::stream::iter(objects) - .map(|object| async move { - DFParquetMetadata::new(store.as_ref(), object) - .with_metadata_size_hint(self.inner().metadata_size_hint()) - .fetch_metadata() - .await + .map(|object| { + let file_metadata_cache = + state.runtime_env().cache_manager.get_file_metadata_cache(); + async move { + DFParquetMetadata::new(store.as_ref(), object) + .with_metadata_size_hint(self.inner().metadata_size_hint()) + .with_file_metadata_cache(Some(file_metadata_cache)) + .fetch_metadata() + .await + } }) .boxed() // Workaround https://github.com/rust-lang/rust/issues/64552 .buffered(state.config_options().execution.meta_fetch_concurrency) @@ -270,7 +276,7 @@ impl FileFormat for GeoParquetFormat { async fn create_physical_plan( &self, - _state: &dyn Session, + state: &dyn Session, config: FileScanConfig, ) -> Result<Arc<dyn ExecutionPlan>> { // A copy of ParquetSource::create_physical_plan() that ensures the underlying @@ -287,6 +293,9 @@ impl FileFormat for GeoParquetFormat { source = source.with_metadata_size_hint(metadata_size_hint) } + source = source + .with_file_metadata_cache(state.runtime_env().cache_manager.get_file_metadata_cache()); + let conf = FileScanConfigBuilder::from(config) .with_source(Arc::new(source)) .build(); @@ -327,6 +336,8 @@ pub struct GeoParquetFileSource { inner: ParquetSource, metadata_size_hint: Option<usize>, predicate: Option<Arc<dyn PhysicalExpr>>, + // pub(crate) parquet_file_reader_factory: Option<Arc<dyn ParquetFileReaderFactory>>, + file_metadata_cache: Option<Arc<dyn FileMetadataCache>>, } impl GeoParquetFileSource { @@ 
-336,6 +347,7 @@ impl GeoParquetFileSource { inner: ParquetSource::new(options.inner.clone()), metadata_size_hint: None, predicate: None, + file_metadata_cache: None, } } @@ -383,6 +395,7 @@ impl GeoParquetFileSource { inner: parquet_source.clone(), metadata_size_hint, predicate: new_predicate, + file_metadata_cache: None, }) } else { sedona_internal_err!("GeoParquetFileSource constructed from non-ParquetSource") } @@ -395,9 +408,18 @@ impl GeoParquetFileSource { inner: self.inner.with_predicate(predicate.clone()), metadata_size_hint: self.metadata_size_hint, predicate: Some(predicate), + file_metadata_cache: self.file_metadata_cache.clone(), } } + pub fn with_file_metadata_cache( + mut self, + file_metadata_cache: Arc<dyn FileMetadataCache>, + ) -> Self { + self.file_metadata_cache = Some(file_metadata_cache); + self + } + /// Apply a [SchemaAdapterFactory] to the inner [ParquetSource] pub fn with_schema_adapter_factory( &self, @@ -419,6 +441,7 @@ impl GeoParquetFileSource { inner: parquet_source, metadata_size_hint: self.metadata_size_hint, predicate: self.predicate.clone(), + file_metadata_cache: self.file_metadata_cache.clone(), } } @@ -428,6 +451,7 @@ impl GeoParquetFileSource { inner: self.inner.clone().with_metadata_size_hint(hint), metadata_size_hint: Some(hint), predicate: self.predicate.clone(), + file_metadata_cache: self.file_metadata_cache.clone(), } } } @@ -458,6 +482,7 @@ impl FileSource for GeoParquetFileSource { // HACK: Since there is no public API to set inner's metrics, so we use // inner's metrics as the ExecutionPlan-global metrics self.inner.metrics(), + self.file_metadata_cache.clone(), )) } @@ -469,11 +494,14 @@ impl FileSource for GeoParquetFileSource { let inner_result = self.inner.try_pushdown_filters(filters.clone(), config)?; match &inner_result.updated_node { Some(updated_node) => { - let updated_inner = Self::try_from_file_source( + let mut updated_inner = Self::try_from_file_source( updated_node.clone(), self.metadata_size_hint, None, )?; + if let 
Some(file_metadata_cache) = self.file_metadata_cache.clone() { + updated_inner = updated_inner.with_file_metadata_cache(file_metadata_cache); + } Ok(inner_result.with_updated_node(Arc::new(updated_inner))) } None => Ok(inner_result),