Merged
Changes from 12 commits
49 commits
3e6a570
Infer stream ipc format for arrow data sources
corasaurus-hex Nov 2, 2025
0ad62ed
Allow FileOpener for ArrowSource to open both IPC formats
corasaurus-hex Nov 2, 2025
34ccba4
Split reading file vs stream because repartitioning + ranges
corasaurus-hex Nov 3, 2025
99ebe62
Fix rewind bug
corasaurus-hex Nov 3, 2025
936b2e3
Remove a comment that isn't needed anymore
corasaurus-hex Nov 3, 2025
a8bc19d
Stray reference left over from Rename Symbol fail
corasaurus-hex Nov 3, 2025
93d26b1
Merge branch 'main' into cs--register-arrow-ipc-stream-format-files
corasaurus-hex Nov 3, 2025
21320cf
Merge branch 'main' into cs--register-arrow-ipc-stream-format-files
corasaurus-hex Nov 4, 2025
3c00395
Address clippy error
corasaurus-hex Nov 4, 2025
917c6c3
Address additional clippy errors
corasaurus-hex Nov 4, 2025
0f5642a
Merge branch 'main' into cs--register-arrow-ipc-stream-format-files
corasaurus-hex Nov 4, 2025
8941014
Merge branch 'main' into cs--register-arrow-ipc-stream-format-files
corasaurus-hex Nov 4, 2025
ffeca09
Merge branch 'main' into cs--register-arrow-ipc-stream-format-files
corasaurus-hex Nov 4, 2025
07593b4
Pull out the stream format check into an independent function
corasaurus-hex Nov 6, 2025
0446c32
Refactor schema inference
corasaurus-hex Nov 7, 2025
7409462
Let's move the `into()` outside the parens
corasaurus-hex Nov 7, 2025
3f72b0c
Err, no, on the inside
corasaurus-hex Nov 7, 2025
9e63fc7
Merge branch 'main' into cs--register-arrow-ipc-stream-format-files
corasaurus-hex Nov 7, 2025
c6d4a06
Also include a test for arrow stream source
corasaurus-hex Nov 7, 2025
4e28ef9
Add a bunch more tests
corasaurus-hex Nov 7, 2025
d6c77f8
Merge branch 'main' into cs--register-arrow-ipc-stream-format-files
corasaurus-hex Nov 7, 2025
cd4f45b
Merge branch 'main' into cs--register-arrow-ipc-stream-format-files
corasaurus-hex Nov 8, 2025
d88f5c0
Merge branch 'main' into cs--register-arrow-ipc-stream-format-files
corasaurus-hex Nov 8, 2025
afb17eb
Merge branch 'main' into cs--register-arrow-ipc-stream-format-files
corasaurus-hex Nov 8, 2025
589a1d2
Document rename of `ArrowSource` and addition of `ArrowStreamSource`
corasaurus-hex Nov 9, 2025
0ac2f6d
Rename ArrowStreamSource to ArrowStreamFileSource
corasaurus-hex Nov 9, 2025
d0fb260
Merge branch 'main' into cs--register-arrow-ipc-stream-format-files
corasaurus-hex Nov 9, 2025
c8e8bd1
Keep same public interface but switch between formats
corasaurus-hex Nov 9, 2025
6f01b80
whitespace fixes
corasaurus-hex Nov 9, 2025
6b5ba4b
Remove note from upgrade guide
corasaurus-hex Nov 9, 2025
2e857c7
Improve the docs a little
corasaurus-hex Nov 9, 2025
c67c881
Add a note about the confusing naming
corasaurus-hex Nov 9, 2025
724444f
Merge branch 'main' into cs--register-arrow-ipc-stream-format-files-w…
corasaurus-hex Nov 9, 2025
882e9fa
Stray line in the upgrade doc
corasaurus-hex Nov 9, 2025
9144d9e
Add initializers and fill out shared interface
corasaurus-hex Nov 9, 2025
d5122f0
Formatting
corasaurus-hex Nov 9, 2025
10cc7d7
Merge branch 'main' into cs--register-arrow-ipc-stream-format-files-w…
corasaurus-hex Nov 9, 2025
5f064ff
Merge branch 'main' into cs--register-arrow-ipc-stream-format-files
corasaurus-hex Nov 9, 2025
68d286a
Merge branch 'main' into cs--register-arrow-ipc-stream-format-files-w…
corasaurus-hex Nov 9, 2025
8b314a5
Type error and linting fix
corasaurus-hex Nov 9, 2025
20c8377
Docs
corasaurus-hex Nov 9, 2025
da81767
Merge branch 'cs--register-arrow-ipc-stream-format-files-wrapped' int…
corasaurus-hex Nov 9, 2025
2ddd690
Update docs for clarity
corasaurus-hex Nov 9, 2025
cf08a7d
Add more comments describing the format so that it's easier to
corasaurus-hex Nov 10, 2025
057e0de
Test querying from empty stream format file
corasaurus-hex Nov 10, 2025
4c30eb9
Add tests for corrupted stream format and empty stream
corasaurus-hex Nov 10, 2025
b2041d8
ArrowStreamOpener -> ArrowStreamFileOpener in error text
corasaurus-hex Nov 10, 2025
3718797
Make `inner` public only within the crate
corasaurus-hex Nov 10, 2025
1724b3b
Combine errors for file and stream formats
corasaurus-hex Nov 10, 2025
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/physical_plan/mod.rs
@@ -37,7 +37,7 @@ pub use datafusion_datasource_parquet::{ParquetFileMetrics, ParquetFileReaderFac

pub use json::{JsonOpener, JsonSource};

pub use arrow::{ArrowOpener, ArrowSource};
pub use arrow::{ArrowFileOpener, ArrowFileSource, ArrowStreamOpener, ArrowStreamSource};
pub use csv::{CsvOpener, CsvSource};
pub use datafusion_datasource::file::FileSource;
pub use datafusion_datasource::file_groups::FileGroup;
@@ -23,7 +23,7 @@ use bytes::{BufMut, BytesMut};
use datafusion::common::Result;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::{
ArrowSource, CsvSource, FileSource, JsonSource, ParquetSource,
ArrowFileSource, CsvSource, FileSource, JsonSource, ParquetSource,
};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
@@ -282,9 +282,9 @@ async fn test_multi_source_schema_adapter_reuse() -> Result<()> {
// Create a test factory
let factory = Arc::new(UppercaseAdapterFactory {});

// Test ArrowSource
// Test ArrowFileSource
{
let source = ArrowSource::default();
let source = ArrowFileSource::default();
Member
I think this needs to be added to the migration guide docs/source/library-user-guide/upgrading.md. We're renaming a public-facing struct, so this is a breaking change, if a minor one.

Contributor Author
Would this go under the 51.0.0 upgrade section?

Contributor Author
Updated the upgrade document.
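For downstream code the rename is mechanical. A minimal before/after sketch, assuming the re-exports added in this diff (later commits in this PR revisit the naming again, e.g. `ArrowStreamFileSource`, so the final public surface may differ):

```rust
// Before this PR: a single source type that only understood the IPC file format.
// use datafusion::datasource::physical_plan::ArrowSource;

// After: the old type is renamed, and a stream-format counterpart is added.
use datafusion::datasource::physical_plan::{ArrowFileSource, ArrowStreamSource};

fn default_sources() -> (ArrowFileSource, ArrowStreamSource) {
    // `ArrowFileSource` is the direct replacement for the old `ArrowSource`.
    (ArrowFileSource::default(), ArrowStreamSource::default())
}
```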

let source_with_adapter = source
.clone()
.with_schema_adapter_factory(factory.clone())
266 changes: 167 additions & 99 deletions datafusion/datasource-arrow/src/file_format.rs
@@ -23,12 +23,13 @@ use std::any::Any;
use std::borrow::Cow;
use std::collections::HashMap;
use std::fmt::{self, Debug};
use std::io::{Seek, SeekFrom};
use std::sync::Arc;

use arrow::datatypes::{Schema, SchemaRef};
use arrow::error::ArrowError;
use arrow::ipc::convert::fb_to_schema;
use arrow::ipc::reader::FileReader;
use arrow::ipc::reader::{FileReader, StreamReader};
use arrow::ipc::writer::IpcWriteOptions;
use arrow::ipc::{root_as_message, CompressionType};
use datafusion_common::error::Result;
@@ -49,7 +50,7 @@ use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::dml::InsertOp;
use datafusion_physical_expr_common::sort_expr::LexRequirement;

use crate::source::ArrowSource;
use crate::source::{ArrowFileSource, ArrowStreamSource};
use async_trait::async_trait;
use bytes::Bytes;
use datafusion_datasource::file_compression_type::FileCompressionType;
@@ -61,7 +62,7 @@ use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
use datafusion_session::Session;
use futures::stream::BoxStream;
use futures::StreamExt;
use object_store::{GetResultPayload, ObjectMeta, ObjectStore};
use object_store::{GetOptions, GetRange, GetResultPayload, ObjectMeta, ObjectStore};
use tokio::io::AsyncWriteExt;

/// Initial writing buffer size. Note this is just a size hint for efficiency. It
@@ -150,12 +151,18 @@ impl FileFormat for ArrowFormat {
let schema = match r.payload {
#[cfg(not(target_arch = "wasm32"))]
GetResultPayload::File(mut file, _) => {
let reader = FileReader::try_new(&mut file, None)?;
reader.schema()
}
GetResultPayload::Stream(stream) => {
infer_schema_from_file_stream(stream).await?
match FileReader::try_new(&mut file, None) {
Ok(reader) => reader.schema(),
Err(_) => {
// Not the IPC file format; FileReader consumed some bytes while
// probing, so rewind to the beginning of the file before trying
// the stream format.
file.seek(SeekFrom::Start(0))?;
StreamReader::try_new(&mut file, None)?.schema()
}
}
}
GetResultPayload::Stream(stream) => infer_ipc_schema(stream).await?,
};
schemas.push(schema.as_ref().clone());
}
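Taken in isolation, the fallback above is: probe with `FileReader`, and if the bytes are not the file format, rewind and parse as the stream format. A minimal standalone sketch of that pattern against the arrow-rs readers (simplified error handling; not the PR's exact code):

```rust
use std::fs::File;
use std::io::{Seek, SeekFrom};

use arrow::datatypes::SchemaRef;
use arrow::error::ArrowError;
use arrow::ipc::reader::{FileReader, StreamReader};

/// Read the schema from an IPC file, trying the file format first and
/// falling back to the stream format.
fn ipc_schema(mut file: File) -> Result<SchemaRef, ArrowError> {
    match FileReader::try_new(&mut file, None) {
        Ok(reader) => Ok(reader.schema()),
        Err(_) => {
            // FileReader consumed bytes while probing; rewind before retrying.
            file.seek(SeekFrom::Start(0))?;
            Ok(StreamReader::try_new(&mut file, None)?.schema())
        }
    }
}
```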
@@ -175,10 +182,39 @@

async fn create_physical_plan(
&self,
_state: &dyn Session,
state: &dyn Session,
conf: FileScanConfig,
) -> Result<Arc<dyn ExecutionPlan>> {
let source = Arc::new(ArrowSource::default());
let is_stream_format = if let Some(first_group) = conf.file_groups.first() {
if let Some(first_file) = first_group.files().first() {
let object_store =
state.runtime_env().object_store(&conf.object_store_url)?;

let get_opts = GetOptions {
range: Some(GetRange::Bounded(0..6)),
..Default::default()
};
let result = object_store
.get_opts(&first_file.object_meta.location, get_opts)
.await?;
let bytes = result.bytes().await?;

// assume stream format if the file is too short
// or the file does not start with the magic number
bytes.len() < 6 || bytes[0..6] != ARROW_MAGIC
} else {
false // no files, default to file format
}
} else {
false // no file groups, default to file format
};

let source: Arc<dyn FileSource> = if is_stream_format {
Arc::new(ArrowStreamSource::default())
} else {
Arc::new(ArrowFileSource::default())
};

let config = FileScanConfigBuilder::from(conf)
.with_source(source)
.build();
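The probe needs only the first six bytes of the first file. A sketch of the decision rule that the block above inlines (`looks_like_stream_format` is a hypothetical helper name, not part of the PR):

```rust
/// Magic number that opens Arrow IPC file-format data.
const ARROW_MAGIC: [u8; 6] = *b"ARROW1";

/// Assume the stream format when the prefix is too short to hold the
/// magic number or does not start with it.
fn looks_like_stream_format(prefix: &[u8]) -> bool {
    prefix.len() < 6 || prefix[..6] != ARROW_MAGIC
}
```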
@@ -203,7 +239,9 @@
}

fn file_source(&self) -> Arc<dyn FileSource> {
Arc::new(ArrowSource::default())
// Default to the file-format source since it is the more capable
// of the two in general
Arc::new(ArrowFileSource::default())
}
}

@@ -344,40 +382,68 @@ impl DataSink for ArrowFileSink {
}
}

// Custom implementation of inferring schema. Should eventually be moved upstream to arrow-rs.
// See <https://github.com/apache/arrow-rs/issues/5021>

const ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1'];
const CONTINUATION_MARKER: [u8; 4] = [0xff; 4];

/// Custom implementation of inferring schema. Should eventually be moved upstream to arrow-rs.
/// See <https://github.com/apache/arrow-rs/issues/5021>
async fn infer_schema_from_file_stream(
async fn infer_ipc_schema(
mut stream: BoxStream<'static, object_store::Result<Bytes>>,
) -> Result<SchemaRef> {
// Expected format:
// <magic number "ARROW1"> - 6 bytes
// <empty padding bytes [to 8 byte boundary]> - 2 bytes
// <continuation: 0xFFFFFFFF> - 4 bytes, not present below v0.15.0
// <metadata_size: int32> - 4 bytes
// <metadata_flatbuffer: bytes>
// <rest of file bytes>

// So in first read we need at least all known sized sections,
// which is 6 + 2 + 4 + 4 = 16 bytes.
// Expected IPC format is either:
//
// stream:
// <continuation: 0xFFFFFFFF> - 4 bytes (added in v0.15.0+)
// <metadata_size: int32> - 4 bytes
// <metadata_flatbuffer: bytes>
// <rest of file bytes>
//
// file:
// <magic number "ARROW1"> - 6 bytes
// <empty padding bytes [to 8 byte boundary]> - 2 bytes
// <stream format above>

// Perform the initial read such that we always have the metadata size
let bytes = collect_at_least_n_bytes(&mut stream, 16, None).await?;

// Files should start with these magic bytes
if bytes[0..6] != ARROW_MAGIC {
return Err(ArrowError::ParseError(
"Arrow file does not contain correct header".to_string(),
))?;
}

// Since continuation marker bytes added in later versions
let (meta_len, rest_of_bytes_start_index) = if bytes[8..12] == CONTINUATION_MARKER {
(&bytes[12..16], 16)
// The preamble size is everything before the metadata size
let preamble_size = if bytes[0..6] == ARROW_MAGIC {
// File format starts with magic number "ARROW1"
if bytes[8..12] == CONTINUATION_MARKER {
// Continuation marker was added in v0.15.0
12
} else {
// File format before v0.15.0
8
}
} else if bytes[0..4] == CONTINUATION_MARKER {
// Stream format after v0.15.0 starts with continuation marker
4
} else {
(&bytes[8..12], 12)
// Stream format before v0.15.0 does not have a preamble
0
};

infer_ipc_schema_ignoring_preamble_bytes(bytes, preamble_size, stream).await
}

async fn infer_ipc_schema_ignoring_preamble_bytes(
bytes: Vec<u8>,
preamble_size: usize,
mut stream: BoxStream<'static, object_store::Result<Bytes>>,
) -> Result<SchemaRef> {
let (meta_len, rest_of_bytes_start_index): ([u8; 4], usize) = (
bytes[preamble_size..preamble_size + 4]
.try_into()
.map_err(|err| {
ArrowError::ParseError(format!(
"Unable to read IPC message as metadata length: {err:?}"
))
})?,
preamble_size + 4,
);

let meta_len = [meta_len[0], meta_len[1], meta_len[2], meta_len[3]];
let meta_len = i32::from_le_bytes(meta_len);

@@ -427,7 +493,8 @@ async fn collect_at_least_n_bytes(
if buf.len() < n {
return Err(ArrowError::ParseError(
"Unexpected end of byte stream for Arrow IPC file".to_string(),
))?;
)
.into());
}
Ok(buf)
}
@@ -524,79 +591,80 @@ mod tests {

#[tokio::test]
async fn test_infer_schema_stream() -> Result<()> {
let mut bytes = std::fs::read("tests/data/example.arrow")?;
bytes.truncate(bytes.len() - 20); // mangle end to show we don't need to read whole file
let location = Path::parse("example.arrow")?;
let in_memory_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
in_memory_store.put(&location, bytes.into()).await?;

let state = MockSession::new();
let object_meta = ObjectMeta {
location,
last_modified: DateTime::default(),
size: u64::MAX,
e_tag: None,
version: None,
};

let arrow_format = ArrowFormat {};
let expected = vec!["f0: Int64", "f1: Utf8", "f2: Boolean"];
for file in ["example.arrow", "example_stream.arrow"] {
let mut bytes = std::fs::read(format!("tests/data/{file}"))?;
bytes.truncate(bytes.len() - 20); // mangle end to show we don't need to read whole file
let location = Path::parse(file)?;
let in_memory_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
in_memory_store.put(&location, bytes.into()).await?;

let state = MockSession::new();
let object_meta = ObjectMeta {
location,
last_modified: DateTime::default(),
size: u64::MAX,
e_tag: None,
version: None,
};

// Test chunk sizes where too small so we keep having to read more bytes
// And when large enough that first read contains all we need
for chunk_size in [7, 3000] {
let store = Arc::new(ChunkedStore::new(in_memory_store.clone(), chunk_size));
let inferred_schema = arrow_format
.infer_schema(
&state,
&(store.clone() as Arc<dyn ObjectStore>),
std::slice::from_ref(&object_meta),
)
.await?;
let actual_fields = inferred_schema
.fields()
.iter()
.map(|f| format!("{}: {:?}", f.name(), f.data_type()))
.collect::<Vec<_>>();
assert_eq!(expected, actual_fields);
let arrow_format = ArrowFormat {};
let expected = vec!["f0: Int64", "f1: Utf8", "f2: Boolean"];

// Test chunk sizes that are too small, so we keep having to read
// more bytes, and large enough that the first read contains all we need
for chunk_size in [7, 3000] {
let store =
Arc::new(ChunkedStore::new(in_memory_store.clone(), chunk_size));
let inferred_schema = arrow_format
.infer_schema(
&state,
&(store.clone() as Arc<dyn ObjectStore>),
std::slice::from_ref(&object_meta),
)
.await?;
let actual_fields = inferred_schema
.fields()
.iter()
.map(|f| format!("{}: {:?}", f.name(), f.data_type()))
.collect::<Vec<_>>();
assert_eq!(expected, actual_fields);
}
}

Ok(())
}

#[tokio::test]
async fn test_infer_schema_short_stream() -> Result<()> {
let mut bytes = std::fs::read("tests/data/example.arrow")?;
bytes.truncate(20); // should cause error that file shorter than expected
let location = Path::parse("example.arrow")?;
let in_memory_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
in_memory_store.put(&location, bytes.into()).await?;

let state = MockSession::new();
let object_meta = ObjectMeta {
location,
last_modified: DateTime::default(),
size: u64::MAX,
e_tag: None,
version: None,
};
for file in ["example.arrow", "example_stream.arrow"] {
let mut bytes = std::fs::read(format!("tests/data/{file}"))?;
bytes.truncate(20); // should cause error that file shorter than expected
let location = Path::parse(file)?;
let in_memory_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
in_memory_store.put(&location, bytes.into()).await?;

let state = MockSession::new();
let object_meta = ObjectMeta {
location,
last_modified: DateTime::default(),
size: u64::MAX,
e_tag: None,
version: None,
};

let arrow_format = ArrowFormat {};
let arrow_format = ArrowFormat {};

let store = Arc::new(ChunkedStore::new(in_memory_store.clone(), 7));
let err = arrow_format
.infer_schema(
&state,
&(store.clone() as Arc<dyn ObjectStore>),
std::slice::from_ref(&object_meta),
)
.await;
let store = Arc::new(ChunkedStore::new(in_memory_store.clone(), 7));
let err = arrow_format
.infer_schema(
&state,
&(store.clone() as Arc<dyn ObjectStore>),
std::slice::from_ref(&object_meta),
)
.await;

assert!(err.is_err());
assert_eq!(
"Arrow error: Parser error: Unexpected end of byte stream for Arrow IPC file",
err.unwrap_err().to_string().lines().next().unwrap()
);
assert!(err.is_err());
assert_eq!( "Arrow error: Parser error: Unexpected end of byte stream for Arrow IPC file", err.unwrap_err().to_string().lines().next().unwrap());
}

Ok(())
}
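End to end, these tests cover what the PR enables: both IPC variants flow through the same `ArrowFormat` entry point. A hedged usage sketch (assuming `SessionContext::register_arrow` and the `example_stream.arrow` fixture added in this PR):

```rust
use datafusion::error::Result;
use datafusion::execution::options::ArrowReadOptions;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // File-format IPC (starts with the ARROW1 magic number)...
    ctx.register_arrow("file_tbl", "tests/data/example.arrow", ArrowReadOptions::default())
        .await?;
    // ...and stream-format IPC, now detected and handled transparently.
    ctx.register_arrow("stream_tbl", "tests/data/example_stream.arrow", ArrowReadOptions::default())
        .await?;
    ctx.sql("SELECT f0, f1, f2 FROM stream_tbl").await?.show().await?;
    Ok(())
}
```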