Skip to content

Commit

Permalink
fusedev: support splice to handle FUSE requests.
Browse files Browse the repository at this point in the history
1. Support enable splice_read/splice_write on FuseChannel.
2. Add splice interface for ZeroCopyReader and ZeroCopyWriter.
3. Add unit-test cases for splice interface.

Signed-off-by: henry.hj <[email protected]>
  • Loading branch information
henry.hj committed Apr 14, 2023
1 parent f18d4be commit 0131745
Show file tree
Hide file tree
Showing 9 changed files with 1,443 additions and 248 deletions.
84 changes: 84 additions & 0 deletions src/api/filesystem/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ use crate::abi::fuse_abi::{ino64_t, stat64};
mod async_io;
#[cfg(feature = "async-io")]
pub use async_io::{AsyncFileSystem, AsyncZeroCopyReader, AsyncZeroCopyWriter};
#[cfg(all(target_os = "linux", feature = "fusedev"))]
use std::os::unix::io::{AsRawFd, FromRawFd};

mod sync_io;
pub use sync_io::FileSystem;
Expand Down Expand Up @@ -208,6 +210,16 @@ pub trait ZeroCopyReader: io::Read {
off: u64,
) -> io::Result<usize>;

/// Copies at most `count` bytes from `self` directly into `f` at offset `off` with less data copy
/// `f` could be local file description or tcp socket
#[cfg(all(target_os = "linux", feature = "fusedev"))]
fn splice_to(&mut self, f: &dyn AsRawFd, count: usize, off: u64) -> io::Result<usize> {
let mut file = unsafe { std::fs::File::from_raw_fd(f.as_raw_fd()) };
let res = self.read_to(&mut file, count, off);
std::mem::forget(file);
res
}

/// Copies exactly `count` bytes of data from `self` into `f` at offset `off`. `off + count`
/// must be less than `u64::MAX`.
///
Expand Down Expand Up @@ -251,6 +263,46 @@ pub trait ZeroCopyReader: io::Read {
Ok(())
}

/// Copies exactly `count` bytes of data from `self` into `f` at offset `off`. `off + count`
/// must be less than `u64::MAX`.
/// `f` could be local file description or tcp socket
#[cfg(all(target_os = "linux", feature = "fusedev"))]
fn splice_exact_to(
&mut self,
f: &mut dyn AsRawFd,
mut count: usize,
mut off: u64,
) -> io::Result<()> {
let c = count
.try_into()
.map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
if off.checked_add(c).is_none() {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"`off` + `count` must be less than u64::MAX",
));
}

while count > 0 {
match self.splice_to(f, count, off) {
Ok(0) => {
return Err(io::Error::new(
io::ErrorKind::WriteZero,
"failed to fill whole buffer",
))
}
Ok(n) => {
count -= n;
off += n as u64;
}
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
Err(e) => return Err(e),
}
}

Ok(())
}

/// Copies all remaining bytes from `self` into `f` at offset `off`. Equivalent to repeatedly
/// calling `read_to` until it returns either `Ok(0)` or a non-`ErrorKind::Interrupted` error.
///
Expand All @@ -275,6 +327,24 @@ pub trait ZeroCopyReader: io::Read {
}
}
}

/// Copies all remaining bytes from `self` into `f` at offset `off`.
/// `f` could be local file description or tcp socket
#[cfg(all(target_os = "linux", feature = "fusedev"))]
fn splice_to_end(&mut self, f: &mut dyn AsRawFd, mut off: u64) -> io::Result<usize> {
let mut out = 0;
loop {
match self.splice_to(f, ::std::usize::MAX, off) {
Ok(0) => return Ok(out),
Ok(n) => {
off = off.saturating_add(n as u64);
out += n;
}
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
Err(e) => return Err(e),
}
}
}
}

/// A trait for directly copying data from a `File` into the fuse transport without first storing
Expand All @@ -300,6 +370,20 @@ pub trait ZeroCopyWriter: io::Write {
off: u64,
) -> io::Result<usize>;

/// Append `count` bytes data from fd `f` at offset `off`
/// `f` should be file description or socket
/// This data always at the end of all data, only available for bufferd writer.
/// For example:
/// We already write "aaa to writer. Then we append fd buf witch contains "bbb".
/// Finally we write "ccc" to writer. The final data is "aaacccbbb".
///
/// # Errors
/// EINVAL: writer doesn't support this operation, should fallback to use `write_from`.
#[cfg(all(target_os = "linux", feature = "fusedev"))]
fn append_fd_buf(&mut self, _f: &dyn AsRawFd, _count: usize, _off: u64) -> io::Result<usize> {
Err(io::Error::from_raw_os_error(libc::EINVAL))
}

/// Copies exactly `count` bytes of data from `f` at offset `off` into `self`. `off + count`
/// must be less than `u64::MAX`.
///
Expand Down
59 changes: 40 additions & 19 deletions src/api/server/async_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,15 @@ impl<'a, S: BitmapSlice> io::Read for AsyncZcReader<'a, S> {
}
}

struct AsyncZcWriter<'a, S: BitmapSlice = ()>(Writer<'a, S>);
struct AsyncZcWriter<'a, 'b, S: BitmapSlice = ()>(Writer<'a, 'b, S>);

// The underlying VolatileSlice contains "*mut u8", which is just a pointer to a u8 array.
// Actually we rely on the AsyncExecutor is a single-threaded worker, and we do not really send
// 'Reader' to other threads.
unsafe impl<'a, S: BitmapSlice> Send for AsyncZcWriter<'a, S> {}
unsafe impl<'a, 'b, S: BitmapSlice> Send for AsyncZcWriter<'a, 'b, S> {}

#[async_trait(?Send)]
impl<'a, S: BitmapSlice> AsyncZeroCopyWriter for AsyncZcWriter<'a, S> {
impl<'a, 'b, S: BitmapSlice> AsyncZeroCopyWriter for AsyncZcWriter<'a, 'b, S> {
async fn async_write_from(
&mut self,
f: Arc<dyn AsyncFileReadWriteVolatile>,
Expand All @@ -79,7 +79,7 @@ impl<'a, S: BitmapSlice> AsyncZeroCopyWriter for AsyncZcWriter<'a, S> {
}
}

impl<'a, S: BitmapSlice> ZeroCopyWriter for AsyncZcWriter<'a, S> {
impl<'a, 'b, S: BitmapSlice> ZeroCopyWriter for AsyncZcWriter<'a, 'b, S> {
fn write_from(
&mut self,
f: &mut dyn FileReadWriteVolatile,
Expand All @@ -90,7 +90,7 @@ impl<'a, S: BitmapSlice> ZeroCopyWriter for AsyncZcWriter<'a, S> {
}
}

impl<'a, S: BitmapSlice> io::Write for AsyncZcWriter<'a, S> {
impl<'a, 'b, S: BitmapSlice> io::Write for AsyncZcWriter<'a, 'b, S> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.0.write(buf)
}
Expand All @@ -116,7 +116,7 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
pub async unsafe fn async_handle_message<S: BitmapSlice>(
&self,
mut r: Reader<'_, S>,
w: Writer<'_, S>,
w: Writer<'_, '_, S>,
vu_req: Option<&mut dyn FsCacheReqHandler>,
hook: Option<&dyn MetricsHook>,
) -> Result<usize> {
Expand Down Expand Up @@ -210,10 +210,13 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
res
}

async fn async_lookup<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
async fn async_lookup<S: BitmapSlice>(
&self,
mut ctx: SrvContext<'_, '_, F, S>,
) -> Result<usize> {
let buf = ServerUtil::get_message_body(&mut ctx.r, &ctx.in_header, 0)?;
let name = bytes_to_cstr(buf.as_ref())?;
let version = self.vers.load();
let version = &self.meta.load().version;
let result = self
.fs
.async_lookup(ctx.context(), ctx.nodeid(), name)
Expand All @@ -236,7 +239,10 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
}
}

async fn async_getattr<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
async fn async_getattr<S: BitmapSlice>(
&self,
mut ctx: SrvContext<'_, '_, F, S>,
) -> Result<usize> {
let GetattrIn { flags, fh, .. } = ctx.r.read_obj().map_err(Error::DecodeMessage)?;
let handle = if (flags & GETATTR_FH) != 0 {
Some(fh.into())
Expand All @@ -251,7 +257,10 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
ctx.async_handle_attr_result(result).await
}

async fn async_setattr<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
async fn async_setattr<S: BitmapSlice>(
&self,
mut ctx: SrvContext<'_, '_, F, S>,
) -> Result<usize> {
let setattr_in: SetattrIn = ctx.r.read_obj().map_err(Error::DecodeMessage)?;
let handle = if setattr_in.valid & FATTR_FH != 0 {
Some(setattr_in.fh.into())
Expand All @@ -268,7 +277,7 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
ctx.async_handle_attr_result(result).await
}

async fn async_open<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
async fn async_open<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, '_, F, S>) -> Result<usize> {
let OpenIn { flags, fuse_flags } = ctx.r.read_obj().map_err(Error::DecodeMessage)?;
let result = self
.fs
Expand All @@ -289,7 +298,7 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
}
}

async fn async_read<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
async fn async_read<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, '_, F, S>) -> Result<usize> {
let ReadIn {
fh,
offset,
Expand Down Expand Up @@ -347,7 +356,7 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
.await
.map_err(Error::EncodeMessage)?;
ctx.w
.async_commit(Some(&data_writer.0))
.async_commit(Some(&mut data_writer.0))
.await
.map_err(Error::EncodeMessage)?;
Ok(out.len as usize)
Expand All @@ -356,7 +365,10 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
}
}

async fn async_write<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
async fn async_write<S: BitmapSlice>(
&self,
mut ctx: SrvContext<'_, '_, F, S>,
) -> Result<usize> {
let WriteIn {
fh,
offset,
Expand Down Expand Up @@ -408,7 +420,10 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
}
}

async fn async_fsync<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
async fn async_fsync<S: BitmapSlice>(
&self,
mut ctx: SrvContext<'_, '_, F, S>,
) -> Result<usize> {
let FsyncIn {
fh, fsync_flags, ..
} = ctx.r.read_obj().map_err(Error::DecodeMessage)?;
Expand All @@ -424,7 +439,10 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
}
}

async fn async_fsyncdir<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
async fn async_fsyncdir<S: BitmapSlice>(
&self,
mut ctx: SrvContext<'_, '_, F, S>,
) -> Result<usize> {
let FsyncIn {
fh, fsync_flags, ..
} = ctx.r.read_obj().map_err(Error::DecodeMessage)?;
Expand All @@ -440,7 +458,10 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
}
}

async fn async_create<S: BitmapSlice>(&self, mut ctx: SrvContext<'_, F, S>) -> Result<usize> {
async fn async_create<S: BitmapSlice>(
&self,
mut ctx: SrvContext<'_, '_, F, S>,
) -> Result<usize> {
let args: CreateIn = ctx.r.read_obj().map_err(Error::DecodeMessage)?;
let buf = ServerUtil::get_message_body(&mut ctx.r, &ctx.in_header, size_of::<CreateIn>())?;
let name = bytes_to_cstr(&buf)?;
Expand Down Expand Up @@ -476,7 +497,7 @@ impl<F: AsyncFileSystem + Sync> Server<F> {

async fn async_fallocate<S: BitmapSlice>(
&self,
mut ctx: SrvContext<'_, F, S>,
mut ctx: SrvContext<'_, '_, F, S>,
) -> Result<usize> {
let FallocateIn {
fh,
Expand All @@ -497,7 +518,7 @@ impl<F: AsyncFileSystem + Sync> Server<F> {
}
}

impl<'a, F: AsyncFileSystem, S: BitmapSlice> SrvContext<'a, F, S> {
impl<'a, 'b, F: AsyncFileSystem, S: BitmapSlice> SrvContext<'a, 'b, F, S> {
async fn async_reply_ok<T: ByteValued>(
&mut self,
out: Option<T>,
Expand Down
Loading

0 comments on commit 0131745

Please sign in to comment.