datafusion-examples/examples/advanced_parquet_index.rs (6 additions, 7 deletions)

@@ -491,19 +491,18 @@ impl TableProvider for IndexTableProvider {
             .with_file(indexed_file);

         let file_source = Arc::new(
-            ParquetSource::default()
+            ParquetSource::new(schema.clone())
                 // provide the predicate so the DataSourceExec can try and prune
                 // row groups internally
                 .with_predicate(predicate)
                 // provide the factory to create parquet reader without re-reading metadata
                 .with_parquet_file_reader_factory(Arc::new(reader_factory)),
         );
-        let file_scan_config =
-            FileScanConfigBuilder::new(object_store_url, schema, file_source)
-                .with_limit(limit)
-                .with_projection_indices(projection.cloned())
-                .with_file(partitioned_file)
-                .build();
+        let file_scan_config = FileScanConfigBuilder::new(object_store_url, file_source)
+            .with_limit(limit)
+            .with_projection_indices(projection.cloned())
+            .with_file(partitioned_file)
+            .build();

         // Finally, put it all together into a DataSourceExec
         Ok(DataSourceExec::from_data_source(file_scan_config))
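This first hunk is the migration pattern repeated across every file in this PR: the file schema moves out of FileScanConfigBuilder::new and into the source constructor. A minimal sketch of the post-change wiring, using only calls that appear in the hunks of this PR; schema, object_store_url, and partitioned_file are assumed to already be in scope:

    // Before (removed lines): ParquetSource::default() plus
    // FileScanConfigBuilder::new(object_store_url, schema, file_source).
    // After: the source owns the schema; the builder takes only URL + source.
    let file_source = Arc::new(ParquetSource::new(schema.clone()));
    let config = FileScanConfigBuilder::new(object_store_url, file_source)
        .with_file(partitioned_file)
        .build();
    let exec = DataSourceExec::from_data_source(config);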
datafusion-examples/examples/csv_json_opener.rs (13 additions, 9 deletions)

@@ -18,6 +18,7 @@
 use std::sync::Arc;

 use arrow::datatypes::{DataType, Field, Schema};
+use datafusion::common::config::CsvOptions;
 use datafusion::{
     assert_batches_eq,
     datasource::{
@@ -31,9 +32,7 @@ use datafusion::{
     test_util::aggr_test_schema,
 };

-use datafusion::datasource::{
-    physical_plan::FileScanConfigBuilder, table_schema::TableSchema,
-};
+use datafusion::datasource::physical_plan::FileScanConfigBuilder;
 use futures::StreamExt;
 use object_store::{local::LocalFileSystem, memory::InMemory, ObjectStore};

@@ -57,19 +56,25 @@ async fn csv_opener() -> Result<()> {

     let path = std::path::Path::new(&path).canonicalize()?;

+    let options = CsvOptions {
+        has_header: Some(true),
+        delimiter: b',',
+        quote: b'"',
+        ..Default::default()
+    };
+
     let scan_config = FileScanConfigBuilder::new(
         ObjectStoreUrl::local_filesystem(),
-        Arc::clone(&schema),
-        Arc::new(CsvSource::default()),
+        Arc::new(CsvSource::new(Arc::clone(&schema)).with_csv_options(options.clone())),
     )
     .with_projection_indices(Some(vec![12, 0]))
     .with_limit(Some(5))
     .with_file(PartitionedFile::new(path.display().to_string(), 10))
     .build();

-    let config = CsvSource::new(true, b',', b'"')
+    let config = CsvSource::new(Arc::clone(&schema))
+        .with_csv_options(options)
         .with_comment(Some(b'#'))
-        .with_schema(TableSchema::from_file_schema(schema))
         .with_batch_size(8192)
         .with_projection(&scan_config);

@@ -125,8 +130,7 @@ async fn json_opener() -> Result<()> {

     let scan_config = FileScanConfigBuilder::new(
         ObjectStoreUrl::local_filesystem(),
-        schema,
-        Arc::new(JsonSource::default()),
+        Arc::new(JsonSource::new(schema)),
     )
     .with_projection_indices(Some(vec![1, 0]))
     .with_limit(Some(5))
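Two changes overlap in this example. CsvSource::new now takes the schema, and the positional format flags of the old constructor (the bare true, b',', b'"') move into a CsvOptions struct. A short sketch of the new construction, assuming schema: SchemaRef is already defined; every CsvOptions field not set explicitly falls back to its Default:

    // Format settings travel in CsvOptions instead of positional arguments.
    let options = CsvOptions {
        has_header: Some(true), // replaces the old bare `true` argument
        delimiter: b',',
        quote: b'"',
        ..Default::default()
    };
    // The schema goes to the constructor; options attach via a builder method.
    let source = CsvSource::new(Arc::clone(&schema)).with_csv_options(options);

The example clones options because it configures two readers from the same settings; a single consumer can simply pass it by value.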
datafusion-examples/examples/custom_file_format.rs (3 additions, 2 deletions)

@@ -30,6 +30,7 @@ use datafusion::{
             FileFormat, FileFormatFactory,
         },
         physical_plan::{FileScanConfig, FileSinkConfig, FileSource},
+        table_schema::TableSchema,
         MemTable,
     },
     error::Result,
@@ -128,8 +129,8 @@ impl FileFormat for TSVFileFormat {
             .await
     }

-    fn file_source(&self) -> Arc<dyn FileSource> {
-        self.csv_file_format.file_source()
+    fn file_source(&self, table_schema: TableSchema) -> Arc<dyn FileSource> {
+        self.csv_file_format.file_source(table_schema)
     }
 }
datafusion-examples/examples/default_column_values.rs (1 addition, 2 deletions)

@@ -235,7 +235,7 @@ impl TableProvider for DefaultValueTableProvider {
             &df_schema,
         )?;

-        let parquet_source = ParquetSource::default()
+        let parquet_source = ParquetSource::new(schema.clone())
            .with_predicate(filter)
            .with_pushdown_filters(true);

@@ -257,7 +257,6 @@ impl TableProvider for DefaultValueTableProvider {

         let file_scan_config = FileScanConfigBuilder::new(
             ObjectStoreUrl::parse("memory://")?,
-            self.schema.clone(),
             Arc::new(parquet_source),
         )
         .with_projection_indices(projection.cloned())
datafusion-examples/examples/parquet_embedded_index.rs (4 additions, 2 deletions)

@@ -426,8 +426,10 @@ impl TableProvider for DistinctIndexTable {

         // Build ParquetSource to actually read the files
         let url = ObjectStoreUrl::parse("file://")?;
-        let source = Arc::new(ParquetSource::default().with_enable_page_index(true));
-        let mut builder = FileScanConfigBuilder::new(url, self.schema.clone(), source);
+        let source = Arc::new(
+            ParquetSource::new(self.schema.clone()).with_enable_page_index(true),
+        );
+        let mut builder = FileScanConfigBuilder::new(url, source);
         for file in files_to_scan {
             let path = self.dir.join(file);
             let len = std::fs::metadata(&path)?.len();
datafusion-examples/examples/parquet_index.rs (3 additions, 2 deletions)

@@ -242,9 +242,10 @@ impl TableProvider for IndexTableProvider {
         let files = self.index.get_files(predicate.clone())?;

         let object_store_url = ObjectStoreUrl::parse("file://")?;
-        let source = Arc::new(ParquetSource::default().with_predicate(predicate));
+        let source =
+            Arc::new(ParquetSource::new(self.schema()).with_predicate(predicate));
         let mut file_scan_config_builder =
-            FileScanConfigBuilder::new(object_store_url, self.schema(), source)
+            FileScanConfigBuilder::new(object_store_url, source)
                 .with_projection_indices(projection.cloned())
                 .with_limit(limit);
datafusion/catalog-listing/src/table.rs (21 additions, 17 deletions)

@@ -34,7 +34,7 @@ use datafusion_datasource::schema_adapter::{
     DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory,
 };
 use datafusion_datasource::{
-    compute_all_files_statistics, ListingTableUrl, PartitionedFile,
+    compute_all_files_statistics, ListingTableUrl, PartitionedFile, TableSchema,
 };
 use datafusion_execution::cache::cache_manager::FileStatisticsCache;
 use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
@@ -338,7 +338,16 @@ impl ListingTable {
     fn create_file_source_with_schema_adapter(
         &self,
     ) -> datafusion_common::Result<Arc<dyn FileSource>> {
-        let mut source = self.options.format.file_source();
+        let table_schema = TableSchema::new(
+            Arc::clone(&self.file_schema),
+            self.options
+                .table_partition_cols
+                .iter()
+                .map(|(col, field)| Arc::new(Field::new(col, field.clone(), false)))
+                .collect(),
+        );
+
+        let mut source = self.options.format.file_source(table_schema);
         // Apply schema adapter to source if available
         //
         // The source will use this SchemaAdapter to adapt data batches as they flow up the plan.
@@ -418,7 +427,7 @@ impl TableProvider for ListingTable {
             .options
             .table_partition_cols
             .iter()
-            .map(|col| Ok(self.table_schema.field_with_name(&col.0)?.clone()))
+            .map(|col| Ok(Arc::new(self.table_schema.field_with_name(&col.0)?.clone())))
             .collect::<datafusion_common::Result<Vec<_>>>()?;

         let table_partition_col_names = table_partition_cols
@@ -491,20 +500,15 @@ impl TableProvider for ListingTable {
             .format
             .create_physical_plan(
                 state,
-                FileScanConfigBuilder::new(
-                    object_store_url,
-                    Arc::clone(&self.file_schema),
-                    file_source,
-                )
-                .with_file_groups(partitioned_file_lists)
-                .with_constraints(self.constraints.clone())
-                .with_statistics(statistics)
-                .with_projection_indices(projection)
-                .with_limit(limit)
-                .with_output_ordering(output_ordering)
-                .with_table_partition_cols(table_partition_cols)
-                .with_expr_adapter(self.expr_adapter_factory.clone())
-                .build(),
+                FileScanConfigBuilder::new(object_store_url, file_source)
+                    .with_file_groups(partitioned_file_lists)
+                    .with_constraints(self.constraints.clone())
+                    .with_statistics(statistics)
+                    .with_projection_indices(projection)
+                    .with_limit(limit)
+                    .with_output_ordering(output_ordering)
+                    .with_expr_adapter(self.expr_adapter_factory.clone())
+                    .build(),
             )
             .await?;
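This file carries the substance of the change for ListingTable: partition columns no longer ride on the scan config via with_table_partition_cols; they are folded into the TableSchema that FileFormat::file_source now receives. A condensed sketch of that construction, mirroring the hunk above, with file_schema: SchemaRef, partition_cols: Vec<(String, DataType)>, and format: Arc<dyn FileFormat> assumed to be in scope:

    // Partition columns become Arc<Field> entries inside the TableSchema,
    // so the file source itself knows about them and the scan config builder
    // no longer needs `.with_table_partition_cols(...)`.
    let partition_fields: Vec<Arc<Field>> = partition_cols
        .iter()
        .map(|(name, data_type)| Arc::new(Field::new(name, data_type.clone(), false)))
        .collect();
    let table_schema = TableSchema::new(Arc::clone(&file_schema), partition_fields);
    let source = format.file_source(table_schema);

A table with no partition columns passes an empty vector, as the test utility below does with TableSchema::new(file_schema.clone(), vec![]).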
datafusion/core/src/datasource/file_format/mod.rs (4 additions, 2 deletions)

@@ -40,6 +40,7 @@ pub(crate) mod test_util {
     use datafusion_catalog::Session;
     use datafusion_common::Result;
     use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
+    use datafusion_datasource::TableSchema;
     use datafusion_datasource::{file_format::FileFormat, PartitionedFile};
     use datafusion_execution::object_store::ObjectStoreUrl;
     use std::sync::Arc;
@@ -66,6 +67,8 @@ pub(crate) mod test_util {
             .await?
         };

+        let table_schema = TableSchema::new(file_schema.clone(), vec![]);
+
         let statistics = format
             .infer_stats(state, &store, file_schema.clone(), &meta)
             .await?;
@@ -85,8 +88,7 @@ pub(crate) mod test_util {
             state,
             FileScanConfigBuilder::new(
                 ObjectStoreUrl::local_filesystem(),
-                file_schema,
-                format.file_source(),
+                format.file_source(table_schema),
             )
             .with_file_groups(file_groups)
             .with_statistics(statistics)
datafusion/core/src/datasource/mod.rs (5 additions, 8 deletions)

@@ -124,16 +124,13 @@ mod tests {
         let f2 = Field::new("extra_column", DataType::Utf8, true);

         let schema = Arc::new(Schema::new(vec![f1.clone(), f2.clone()]));
-        let source = ParquetSource::default()
+        let source = ParquetSource::new(Arc::clone(&schema))
            .with_schema_adapter_factory(Arc::new(TestSchemaAdapterFactory {}))
            .unwrap();
-        let base_conf = FileScanConfigBuilder::new(
-            ObjectStoreUrl::local_filesystem(),
-            schema,
-            source,
-        )
-        .with_file(partitioned_file)
-        .build();
+        let base_conf =
+            FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), source)
+                .with_file(partitioned_file)
+                .build();

         let parquet_exec = DataSourceExec::from_data_source(base_conf);
datafusion/core/src/datasource/physical_plan/avro.rs (14 additions, 15 deletions)

@@ -34,7 +34,7 @@ mod tests {
     use datafusion_common::{test_util, Result, ScalarValue};
     use datafusion_datasource::file_format::FileFormat;
     use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
-    use datafusion_datasource::PartitionedFile;
+    use datafusion_datasource::{PartitionedFile, TableSchema};
     use datafusion_datasource_avro::source::AvroSource;
     use datafusion_datasource_avro::AvroFormat;
     use datafusion_execution::object_store::ObjectStoreUrl;
@@ -81,15 +81,11 @@ mod tests {
             .infer_schema(&state, &store, std::slice::from_ref(&meta))
             .await?;

-        let source = Arc::new(AvroSource::new());
-        let conf = FileScanConfigBuilder::new(
-            ObjectStoreUrl::local_filesystem(),
-            file_schema,
-            source,
-        )
-        .with_file(meta.into())
-        .with_projection_indices(Some(vec![0, 1, 2]))
-        .build();
+        let source = Arc::new(AvroSource::new(Arc::clone(&file_schema)));
+        let conf = FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), source)
+            .with_file(meta.into())
+            .with_projection_indices(Some(vec![0, 1, 2]))
+            .build();

         let source_exec = DataSourceExec::from_data_source(conf);
         assert_eq!(
@@ -157,8 +153,8 @@ mod tests {
         // Include the missing column in the projection
         let projection = Some(vec![0, 1, 2, actual_schema.fields().len()]);

-        let source = Arc::new(AvroSource::new());
-        let conf = FileScanConfigBuilder::new(object_store_url, file_schema, source)
+        let source = Arc::new(AvroSource::new(Arc::clone(&file_schema)));
+        let conf = FileScanConfigBuilder::new(object_store_url, source)
            .with_file(meta.into())
            .with_projection_indices(projection)
            .build();
@@ -227,13 +223,16 @@ mod tests {
         partitioned_file.partition_values = vec![ScalarValue::from("2021-10-26")];

         let projection = Some(vec![0, 1, file_schema.fields().len(), 2]);
-        let source = Arc::new(AvroSource::new());
-        let conf = FileScanConfigBuilder::new(object_store_url, file_schema, source)
+        let table_schema = TableSchema::new(
+            file_schema.clone(),
+            vec![Arc::new(Field::new("date", DataType::Utf8, false))],
+        );
+        let source = Arc::new(AvroSource::new(table_schema.clone()));
+        let conf = FileScanConfigBuilder::new(object_store_url, source)
            // select specific columns of the files as well as the partitioning
            // column which is supposed to be the last column in the table schema.
            .with_projection_indices(projection)
            .with_file(partitioned_file)
-           .with_table_partition_cols(vec![Field::new("date", DataType::Utf8, false)])
            .build();

         let source_exec = DataSourceExec::from_data_source(conf);
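The last hunk is the partition-column variant of the same migration: the date column is no longer declared on the builder but baked into the TableSchema the AvroSource is constructed from. A before/after sketch assembled from the removed and added lines of this hunk, with file_schema, object_store_url, and partitioned_file assumed in scope:

    // Before (removed): partition column declared on the scan config builder.
    //   let source = Arc::new(AvroSource::new());
    //   let conf = FileScanConfigBuilder::new(object_store_url, file_schema, source)
    //       .with_table_partition_cols(vec![Field::new("date", DataType::Utf8, false)])
    //       .build();

    // After: the partition column lives in the TableSchema given to the source.
    let table_schema = TableSchema::new(
        file_schema.clone(),
        vec![Arc::new(Field::new("date", DataType::Utf8, false))],
    );
    let source = Arc::new(AvroSource::new(table_schema));
    let conf = FileScanConfigBuilder::new(object_store_url, source)
        .with_file(partitioned_file)
        .build();

Worth noting: the earlier hunks in this file construct AvroSource::new from a bare Arc<Schema> while this one passes a TableSchema, so the constructor apparently accepts either form.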