Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 34 additions & 3 deletions vortex-io/src/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
use std::fs::File;
use std::io;
use std::ops::{Deref, Range};
use std::os::unix::fs::FileExt;
use std::path::Path;
use std::sync::Arc;

Expand Down Expand Up @@ -50,12 +49,15 @@ impl Deref for TokioFile {
}

impl VortexReadAt for TokioFile {

#[cfg(unix)]
#[tracing::instrument(skip_all, fields(range, alignment))]
async fn read_byte_range(
&self,
range: Range<u64>,
alignment: Alignment,
) -> io::Result<ByteBuffer> {
use std::os::unix::fs::FileExt;
let len = usize::try_from(range.end - range.start).vortex_expect("range too big for usize");
let this = self.clone();

Expand All @@ -68,6 +70,26 @@ impl VortexReadAt for TokioFile {
.await?
}

#[cfg(windows)]
#[tracing::instrument(skip_all, fields(range, alignment))]
async fn read_byte_range(
&self,
range: Range<u64>,
alignment: Alignment,
) -> io::Result<ByteBuffer> {
let len = usize::try_from(range.end - range.start).expect("range too big for usize");
let this = self.clone();

spawn_blocking(move || {
use std::os::windows::fs::FileExt;
let mut buffer = ByteBufferMut::with_capacity_aligned(len, alignment);
unsafe { buffer.set_len(len) }
this.seek_read(&mut buffer, range.start)?;
Ok(buffer.freeze())
})
.await?
}

fn performance_hint(&self) -> PerformanceHint {
PerformanceHint::local()
}
Expand Down Expand Up @@ -98,7 +120,6 @@ mod tests {
use std::fs::File;
use std::io::Write;
use std::ops::Deref;
use std::os::unix::fs::FileExt;

use tempfile::NamedTempFile;
use vortex_buffer::Alignment;
Expand Down Expand Up @@ -138,8 +159,18 @@ mod tests {

// Create a function to test if we can read from the file
let can_read = |file: &File| {

let mut buffer = vec![0; 7];
file.read_exact_at(&mut buffer, 0).is_ok()
#[cfg(unix)]
{
use std::os::unix::fs::FileExt;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we better using https://docs.rs/positioned-io/latest/positioned_io/ do you think?

Sorry, not dropping this PR, we're just making lots of changes to I/O at the moment!

file.read_exact_at(&mut buffer, 0).is_ok()
}
#[cfg(windows)]
{
use std::os::windows::fs::FileExt;
file.seek_read(&mut buffer, 0).map(|read| read == buffer.len()).unwrap_or(false)
}
};

// Test initial read
Expand Down
Loading