Skip to content

Conversation

@corasaurus-hex
Copy link
Contributor

Which issue does this PR close?

Rationale for this change

Currently Datafusion can only read Arrow files if the're in the File format, not the Stream format. I work with a bunch of Stream format files and wanted native support.

What changes are included in this PR?

To accomplish the above, this PR splits the Arrow datasource into two separate implementations (ArrowStream* and ArrowFile*) with a facade on top to differentiate between the formats at query planning time.

Are these changes tested?

Yes, there are end-to-end sqllogictests along with tests for the changes within datasource-arrow.

Are there any user-facing changes?

Technically yes, in that we support a new format now. I'm not sure which documentation would need to be updated?

@github-actions github-actions bot added core Core DataFusion crate sqllogictest SQL Logic Tests (.slt) datasource Changes to the datasource crate labels Nov 3, 2025
// correct offset which is a lot of duplicate I/O. We're opting to avoid
// that entirely by only acting on a single partition and reading sequentially.
Ok(None)
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is perhaps the weightiest decision in this PR. if we want to repartition a file in the ipc stream format then we need to read from the beginning of the file for each partition, or figure out another way to create the ad-hoc equivalent of the ipc file format footer so we can minimize duplicate reads (likely by reading the entire file all the way through once and then caching the result in memory for the execution plan to use for each partition)

Copy link

@jdcasale jdcasale Nov 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd argue that while this problem is worth solving, doing so is tangent to this change.
I'd like to see this solved, but I see no reason why we couldn't solve this in a follow-on.

Probably worth documenting the practical consequences of leaving it in this state though -- correct me if I'm wrong here, but I think this means that we end up hydrating the entire file into memory for certain operations, right? That's probably not a good long-term state.

nvm, after rereading I misunderstood this, it only affects IO

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't imagine this would mean I need to read the entire file into memory and keep it there? In my previous message I meant we would need to read all the record batch and dictionary locations and keep them in memory in much the same way that the arrow file format footer does. So it would mean a single pass through to record all of that and then multiple threads can seek to different parts of the file and process it.

That's my understanding of the effect of this, that it means we can't parallelize queries against this file format.

If you believe that the resulting behavior would be pathological to the extreme then we should absolutely document that. Thoughts on how we can reliably test that it is? Or who might be aware of the implications of this? And where to document it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think partitioning is doable, but it's better be done afterwards if anyone has a real use case.

In order to do repartition, this function has to scan once, record the dictionary and batch positions, then split the work evenly to parallel partitioned workers -- this task's can be done at around full disk bandwidth speed (5GB/Sec on recent MacBooks)
Regarding decoding the batches from Arrow IPC Stream file to in-memory arrow RecordBatches, if dictionary encoding and some heavy weigh compression like zstd is applied, the bandwidth can be way lower (several hundred MB/S)
So it's still worth a whole scan up front to make the whole processing faster with partitioning, though I don't known if it's a common requirement to query large IPC Stream file.

Copy link

@jdcasale jdcasale left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is basically right. Couple of nits, one question.

// correct offset which is a lot of duplicate I/O. We're opting to avoid
// that entirely by only acting on a single partition and reading sequentially.
Ok(None)
}
Copy link

@jdcasale jdcasale Nov 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd argue that while this problem is worth solving, doing so is tangent to this change.
I'd like to see this solved, but I see no reason why we couldn't solve this in a follow-on.

Probably worth documenting the practical consequences of leaving it in this state though -- correct me if I'm wrong here, but I think this means that we end up hydrating the entire file into memory for certain operations, right? That's probably not a good long-term state.

nvm, after rereading I misunderstood this, it only affects IO

@github-actions github-actions bot removed the documentation Improvements or additions to documentation label Nov 9, 2025
@corasaurus-hex
Copy link
Contributor Author

And another update: I think the other PR had the right of it. We maintain backwards compatibility and keep users from having to know about the naming messiness. I've merged that PR back into this one and pulled in Adrian's changes from main.

Copy link

@jdcasale jdcasale left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don’t have merge permissions, but have reviewed, all my questions have been addressed — lgtm

Copy link
Contributor

@2010YOUY01 2010YOUY01 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the amazing work -- this looks good to me, left several suggestions.

However, I'm not deeply familiar with the related code, so I'd prefer that others to approve and merge it.

/// Does not hold anything special, since [`FileScanConfig`] is sufficient for arrow
/// `FileSource` for Arrow IPC file format. Supports range-based parallel reading.
#[derive(Clone)]
pub struct ArrowSource {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here there seems to be several public API changes like ArrowSource and ArrowOpener, it would be great to include them in the upgrade guide https://github.com/apache/datafusion/blob/main/docs/source/library-user-guide/upgrading.md
Though I'm not sure if we can make it in 51.0.0, if not this should be under 52.0.0 section

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the sounds of it it's not going to make it into 51. Do I just make a section for 52?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that would be great.

@2010YOUY01 2010YOUY01 added the api change Changes the API exposed to users of the crate label Nov 10, 2025
@2010YOUY01
Copy link
Contributor

I do find one thing annoying, but I don't know if it's impacting this PR. We are calling these ArrowFileSource and ArrowStreamSource, but both of them are file readers, right? It's just that one is stored in a random access approach and one is stored in a stream approach. When I see the name ArrowStreamSource I would intuitively think that means some kind of Arrow stream. Especially if I see the two of those next to each other, my intuition would be that one is a streaming source and one is a file source. I know you're reusing the terminology in the Arrow spec, so again I may be overthinking this.

Perhaps ArrowIPCStreamSource if that's the formal term 🤔

}

fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> {
Arc::new(Self { ..self.clone() })
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Arc::new(Self { ..self.clone() })
Arc::new(self.clone())

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the Source impl does not support batch_size nothing is changed and it could just return Self here.
Same from ArrowFileOpener

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be a change in behavior, though, since it it's supposed to return an entirely new clone of the data? And this code is the same as what was here before and also what is given as an example in MockSource in the main crate.

}

fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
Arc::new(Self { ..self.clone() })
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Arc::new(Self { ..self.clone() })
Arc::new(self.clone())

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

..Default::default()
};
let bytes = store
.get_opts(object_location, get_opts)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it return an error here if the file is less than 6 bytes ?
According to https://docs.rs/object_store/latest/object_store/struct.GetOptions.html#structfield.range it returns https://docs.rs/object_store/latest/object_store/enum.Error.html#variant.NotModified
If this error is indeed returned then the check below bytes.len() >= 6 is not really needed. It actually confuses the maintainer that it is possible bytes to be less than 6 bytes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My reading of this is that it can be less than 6 bytes: https://github.com/apache/arrow-rs-object-store/blob/main/src/util.rs#L196-L199

@corasaurus-hex
Copy link
Contributor Author

@2010YOUY01 I'm fine with any name, but technically all the arrow sources in here would be ArrowIPC*, both file and stream. Since these are internal to the crate right now can we decide on a name in another PR or in another venue? Or is there a mechanism for making a decisions around something like this? This would be the third name change, including my own change, in this PR, and I'd rather get consensus somehow so I can make one final change (if needed)

Copy link
Member

@timsaucer timsaucer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the contribution!

@timsaucer timsaucer added this pull request to the merge queue Nov 10, 2025
Merged via the queue into apache:main with commit 900ee65 Nov 10, 2025
28 checks passed
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @corasaurus-hex this is a great contribution

//!
//! # Naming Note
//!
//! The naming in this module can be confusing:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@alamb
Copy link
Contributor

alamb commented Nov 10, 2025

Thanks also to everyone for the review

@corasaurus-hex
Copy link
Contributor Author

Yeah, thanks everyone for your reviews! It's much better than it was at the start!!

@2010YOUY01
Copy link
Contributor

@2010YOUY01 I'm fine with any name, but technically all the arrow sources in here would be ArrowIPC*, both file and stream. Since these are internal to the crate right now can we decide on a name in another PR or in another venue? Or is there a mechanism for making a decisions around something like this? This would be the third name change, including my own change, in this PR, and I'd rather get consensus somehow so I can make one final change (if needed)

There are no formal naming rules, but my approach is to choose names that make their purpose as clear as possible. Using Stream here might be ambiguous due to other system concepts, so I’d prefer using a more explicit name like IPCStream.

This is just a minor point, though — we can proceed with either option.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

api change Changes the API exposed to users of the crate core Core DataFusion crate datasource Changes to the datasource crate sqllogictest SQL Logic Tests (.slt)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add support for registering files in the Arrow IPC stream format as tables using register_arrow or similar

7 participants