diff --git a/src/api/filesystem/mod.rs b/src/api/filesystem/mod.rs index 9ecadfb71..58c35ef00 100644 --- a/src/api/filesystem/mod.rs +++ b/src/api/filesystem/mod.rs @@ -26,6 +26,8 @@ use crate::abi::fuse_abi::{ino64_t, stat64}; mod async_io; #[cfg(feature = "async-io")] pub use async_io::{AsyncFileSystem, AsyncZeroCopyReader, AsyncZeroCopyWriter}; +#[cfg(all(target_os = "linux", feature = "fusedev"))] +use std::os::unix::io::AsRawFd; mod sync_io; pub use sync_io::FileSystem; @@ -208,6 +210,18 @@ pub trait ZeroCopyReader: io::Read { off: u64, ) -> io::Result; + /// Copies at most `count` bytes from `self` directly into `f` at offset `off` with less data copy + /// `f` could be local file description or tcp socket + #[cfg(all(target_os = "linux", feature = "fusedev"))] + fn splice_to( + &mut self, + _f: &dyn AsRawFd, + _count: usize, + _off: Option, + ) -> io::Result { + Err(io::Error::from_raw_os_error(libc::ENOSYS)) + } + /// Copies exactly `count` bytes of data from `self` into `f` at offset `off`. `off + count` /// must be less than `u64::MAX`. /// @@ -251,6 +265,50 @@ pub trait ZeroCopyReader: io::Read { Ok(()) } + /// Copies exactly `count` bytes of data from `self` into `f` at offset `off`. `off + count` + /// must be less than `u64::MAX`. + /// `f` could be local file description or tcp socket + #[cfg(all(target_os = "linux", feature = "fusedev"))] + fn splice_exact_to( + &mut self, + f: &mut dyn AsRawFd, + mut count: usize, + mut off: Option, + ) -> io::Result<()> { + let c = count + .try_into() + .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?; + if let Some(v) = off.as_ref() { + if v.checked_add(c).is_none() { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "`off` + `count` must be less than u64::MAX", + )); + } + } + + while count > 0 { + match self.splice_to(f, count, off) { + Ok(0) => { + return Err(io::Error::new( + io::ErrorKind::WriteZero, + "failed to fill whole buffer", + )) + } + Ok(n) => { + count -= n; + if let Some(v) = off.as_mut() { + *v += n as u64; + } + } + Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} + Err(e) => return Err(e), + } + } + + Ok(()) + } + /// Copies all remaining bytes from `self` into `f` at offset `off`. Equivalent to repeatedly /// calling `read_to` until it returns either `Ok(0)` or a non-`ErrorKind::Interrupted` error. /// @@ -275,6 +333,24 @@ pub trait ZeroCopyReader: io::Read { } } } + + /// Copies all remaining bytes from `self` into `f` at offset `off`. + /// `f` could be local file description or tcp socket + #[cfg(all(target_os = "linux", feature = "fusedev"))] + fn splice_to_end(&mut self, f: &mut dyn AsRawFd, mut off: Option) -> io::Result { + let mut out = 0; + loop { + match self.splice_to(f, ::std::usize::MAX, off) { + Ok(0) => return Ok(out), + Ok(n) => { + off.as_mut().map(|v| v.saturating_add(n as u64)); + out += n; + } + Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} + Err(e) => return Err(e), + } + } + } } /// A trait for directly copying data from a `File` into the fuse transport without first storing @@ -300,6 +376,25 @@ pub trait ZeroCopyWriter: io::Write { off: u64, ) -> io::Result; + /// Append `count` bytes data from fd `f` at offset `off` + /// `f` should be file description or socket + /// This data always at the end of all data, only available for bufferd writer. + /// For example: + /// We already write "aaa to writer. Then we append fd buf witch contains "bbb". + /// Finally we write "ccc" to writer. The final data is "aaacccbbb". + /// + /// # Errors + /// ENOSYS: writer doesn't support this operation, should fallback to use `write_from`. + #[cfg(all(target_os = "linux", feature = "fusedev"))] + fn append_fd_buf( + &mut self, + _f: &dyn AsRawFd, + _count: usize, + _off: Option, + ) -> io::Result { + Err(io::Error::from_raw_os_error(libc::ENOSYS)) + } + /// Copies exactly `count` bytes of data from `f` at offset `off` into `self`. `off + count` /// must be less than `u64::MAX`. /// diff --git a/src/api/server/async_io.rs b/src/api/server/async_io.rs index 463c292c2..9376c5cbf 100644 --- a/src/api/server/async_io.rs +++ b/src/api/server/async_io.rs @@ -60,15 +60,15 @@ impl<'a, S: BitmapSlice> io::Read for AsyncZcReader<'a, S> { } } -struct AsyncZcWriter<'a, S: BitmapSlice = ()>(Writer<'a, S>); +struct AsyncZcWriter<'a, 'b, S: BitmapSlice = ()>(Writer<'a, 'b, S>); // The underlying VolatileSlice contains "*mut u8", which is just a pointer to a u8 array. // Actually we rely on the AsyncExecutor is a single-threaded worker, and we do not really send // 'Reader' to other threads. -unsafe impl<'a, S: BitmapSlice> Send for AsyncZcWriter<'a, S> {} +unsafe impl<'a, 'b, S: BitmapSlice> Send for AsyncZcWriter<'a, 'b, S> {} #[async_trait(?Send)] -impl<'a, S: BitmapSlice> AsyncZeroCopyWriter for AsyncZcWriter<'a, S> { +impl<'a, 'b, S: BitmapSlice> AsyncZeroCopyWriter for AsyncZcWriter<'a, 'b, S> { async fn async_write_from( &mut self, f: Arc, @@ -79,7 +79,7 @@ impl<'a, S: BitmapSlice> AsyncZeroCopyWriter for AsyncZcWriter<'a, S> { } } -impl<'a, S: BitmapSlice> ZeroCopyWriter for AsyncZcWriter<'a, S> { +impl<'a, 'b, S: BitmapSlice> ZeroCopyWriter for AsyncZcWriter<'a, 'b, S> { fn write_from( &mut self, f: &mut dyn FileReadWriteVolatile, @@ -94,7 +94,7 @@ impl<'a, S: BitmapSlice> ZeroCopyWriter for AsyncZcWriter<'a, S> { } } -impl<'a, S: BitmapSlice> io::Write for AsyncZcWriter<'a, S> { +impl<'a, 'b, S: BitmapSlice> io::Write for AsyncZcWriter<'a, 'b, S> { fn write(&mut self, buf: &[u8]) -> io::Result { self.0.write(buf) } @@ -120,7 +120,7 @@ impl Server { pub async unsafe fn async_handle_message( &self, mut r: Reader<'_, S>, - w: Writer<'_, S>, + w: Writer<'_, '_, S>, vu_req: Option<&mut dyn FsCacheReqHandler>, hook: Option<&dyn MetricsHook>, ) -> Result { @@ -222,7 +222,10 @@ impl Server { res } - async fn async_lookup(&self, mut ctx: SrvContext<'_, F, S>) -> Result { + async fn async_lookup( + &self, + mut ctx: SrvContext<'_, '_, F, S>, + ) -> Result { let buf = ServerUtil::get_message_body(&mut ctx.r, &ctx.in_header, 0)?; let name = match bytes_to_cstr(buf.as_ref()) { Ok(name) => name, @@ -234,8 +237,7 @@ impl Server { return Err(e); } }; - - let version = self.vers.load(); + let version = &self.meta.load().version; let result = self .fs .async_lookup(ctx.context(), ctx.nodeid(), name) @@ -258,7 +260,10 @@ impl Server { } } - async fn async_getattr(&self, mut ctx: SrvContext<'_, F, S>) -> Result { + async fn async_getattr( + &self, + mut ctx: SrvContext<'_, '_, F, S>, + ) -> Result { let GetattrIn { flags, fh, .. } = ctx.r.read_obj().map_err(Error::DecodeMessage)?; let handle = if (flags & GETATTR_FH) != 0 { Some(fh.into()) @@ -273,7 +278,10 @@ impl Server { ctx.async_handle_attr_result(result).await } - async fn async_setattr(&self, mut ctx: SrvContext<'_, F, S>) -> Result { + async fn async_setattr( + &self, + mut ctx: SrvContext<'_, '_, F, S>, + ) -> Result { let setattr_in: SetattrIn = ctx.r.read_obj().map_err(Error::DecodeMessage)?; let handle = if setattr_in.valid & FATTR_FH != 0 { Some(setattr_in.fh.into()) @@ -290,7 +298,7 @@ impl Server { ctx.async_handle_attr_result(result).await } - async fn async_open(&self, mut ctx: SrvContext<'_, F, S>) -> Result { + async fn async_open(&self, mut ctx: SrvContext<'_, '_, F, S>) -> Result { let OpenIn { flags, fuse_flags } = ctx.r.read_obj().map_err(Error::DecodeMessage)?; let result = self .fs @@ -311,7 +319,7 @@ impl Server { } } - async fn async_read(&self, mut ctx: SrvContext<'_, F, S>) -> Result { + async fn async_read(&self, mut ctx: SrvContext<'_, '_, F, S>) -> Result { let ReadIn { fh, offset, @@ -369,7 +377,7 @@ impl Server { .await .map_err(Error::EncodeMessage)?; ctx.w - .async_commit(Some(&data_writer.0)) + .async_commit(Some(&mut data_writer.0)) .await .map_err(Error::EncodeMessage)?; Ok(out.len as usize) @@ -378,7 +386,10 @@ impl Server { } } - async fn async_write(&self, mut ctx: SrvContext<'_, F, S>) -> Result { + async fn async_write( + &self, + mut ctx: SrvContext<'_, '_, F, S>, + ) -> Result { let WriteIn { fh, offset, @@ -430,7 +441,10 @@ impl Server { } } - async fn async_fsync(&self, mut ctx: SrvContext<'_, F, S>) -> Result { + async fn async_fsync( + &self, + mut ctx: SrvContext<'_, '_, F, S>, + ) -> Result { let FsyncIn { fh, fsync_flags, .. } = ctx.r.read_obj().map_err(Error::DecodeMessage)?; @@ -446,7 +460,10 @@ impl Server { } } - async fn async_fsyncdir(&self, mut ctx: SrvContext<'_, F, S>) -> Result { + async fn async_fsyncdir( + &self, + mut ctx: SrvContext<'_, '_, F, S>, + ) -> Result { let FsyncIn { fh, fsync_flags, .. } = ctx.r.read_obj().map_err(Error::DecodeMessage)?; @@ -462,7 +479,10 @@ impl Server { } } - async fn async_create(&self, mut ctx: SrvContext<'_, F, S>) -> Result { + async fn async_create( + &self, + mut ctx: SrvContext<'_, '_, F, S>, + ) -> Result { let args: CreateIn = ctx.r.read_obj().map_err(Error::DecodeMessage)?; let buf = ServerUtil::get_message_body(&mut ctx.r, &ctx.in_header, size_of::())?; let name = match bytes_to_cstr(buf.as_ref()) { @@ -508,7 +528,7 @@ impl Server { async fn async_fallocate( &self, - mut ctx: SrvContext<'_, F, S>, + mut ctx: SrvContext<'_, '_, F, S>, ) -> Result { let FallocateIn { fh, @@ -529,7 +549,7 @@ impl Server { } } -impl<'a, F: AsyncFileSystem, S: BitmapSlice> SrvContext<'a, F, S> { +impl<'a, 'b, F: AsyncFileSystem, S: BitmapSlice> SrvContext<'a, 'b, F, S> { async fn async_reply_ok( &mut self, out: Option, diff --git a/src/api/server/mod.rs b/src/api/server/mod.rs index cf95455c3..5d77ea532 100644 --- a/src/api/server/mod.rs +++ b/src/api/server/mod.rs @@ -20,6 +20,8 @@ use std::ffi::CStr; use std::io::{self, Read}; use std::marker::PhantomData; use std::mem::size_of; +#[cfg(all(target_os = "linux", feature = "fusedev"))] +use std::os::unix::io::AsRawFd; use std::sync::Arc; use arc_swap::ArcSwap; @@ -50,7 +52,7 @@ pub const MAX_REQ_PAGES: u16 = 256; // 1MB /// Fuse Server to handle requests from the Fuse client and vhost user master. pub struct Server { fs: F, - vers: ArcSwap, + meta: ArcSwap, } impl Server { @@ -58,12 +60,19 @@ impl Server { pub fn new(fs: F) -> Server { Server { fs, - vers: ArcSwap::new(Arc::new(ServerVersion { - major: KERNEL_VERSION, - minor: KERNEL_MINOR_VERSION, - })), + meta: ArcSwap::new(Arc::new(ServerMeta::default())), } } + + /// Whether FUSE module support splice read + pub fn is_support_splice_read(&self) -> bool { + self.meta.load().support_splice_read + } + + /// Whether FUSE module support splice write + pub fn is_support_splice_write(&self) -> bool { + self.meta.load().support_splice_write + } } struct ZcReader<'a, S: BitmapSlice = ()>(Reader<'a, S>); @@ -77,6 +86,14 @@ impl<'a, S: BitmapSlice> ZeroCopyReader for ZcReader<'a, S> { ) -> io::Result { self.0.read_to_at(f, count, off) } + + #[cfg(all(target_os = "linux", feature = "fusedev"))] + fn splice_to(&mut self, f: &dyn AsRawFd, count: usize, off: Option) -> io::Result { + match off { + None => self.0.splice_to(f, count), + Some(off) => self.0.splice_to_at(f, count, off), + } + } } impl<'a, S: BitmapSlice> io::Read for ZcReader<'a, S> { @@ -85,9 +102,9 @@ impl<'a, S: BitmapSlice> io::Read for ZcReader<'a, S> { } } -struct ZcWriter<'a, S: BitmapSlice = ()>(Writer<'a, S>); +struct ZcWriter<'a, 'b, S: BitmapSlice = ()>(Writer<'a, 'b, S>); -impl<'a, S: BitmapSlice> ZeroCopyWriter for ZcWriter<'a, S> { +impl<'a, 'b, S: BitmapSlice> ZeroCopyWriter for ZcWriter<'a, 'b, S> { fn write_from( &mut self, f: &mut dyn FileReadWriteVolatile, @@ -97,12 +114,22 @@ impl<'a, S: BitmapSlice> ZeroCopyWriter for ZcWriter<'a, S> { self.0.write_from_at(f, count, off) } + #[cfg(all(target_os = "linux", feature = "fusedev"))] + fn append_fd_buf( + &mut self, + f: &dyn AsRawFd, + count: usize, + off: Option, + ) -> io::Result { + self.0.append_fd_buf(f, count, off) + } + fn available_bytes(&self) -> usize { self.0.available_bytes() } } -impl<'a, S: BitmapSlice> io::Write for ZcWriter<'a, S> { +impl<'a, 'b, S: BitmapSlice> io::Write for ZcWriter<'a, 'b, S> { fn write(&mut self, buf: &[u8]) -> io::Result { self.0.write(buf) } @@ -118,6 +145,23 @@ struct ServerVersion { minor: u32, } +#[derive(Default)] +struct ServerMeta { + #[allow(dead_code)] + version: ServerVersion, + support_splice_read: bool, + support_splice_write: bool, +} + +impl Default for ServerVersion { + fn default() -> Self { + Self { + major: KERNEL_VERSION, + minor: KERNEL_MINOR_VERSION, + } + } +} + struct ServerUtil(); impl ServerUtil { @@ -166,17 +210,17 @@ pub trait MetricsHook { fn release(&self, oh: Option<&OutHeader>); } -struct SrvContext<'a, F, S: BitmapSlice = ()> { +struct SrvContext<'a, 'b, F, S: BitmapSlice = ()> { in_header: InHeader, context: Context, r: Reader<'a, S>, - w: Writer<'a, S>, + w: Writer<'a, 'b, S>, phantom: PhantomData, phantom2: PhantomData, } -impl<'a, F: FileSystem, S: BitmapSlice> SrvContext<'a, F, S> { - fn new(in_header: InHeader, r: Reader<'a, S>, w: Writer<'a, S>) -> Self { +impl<'a, 'b, F: FileSystem, S: BitmapSlice> SrvContext<'a, 'b, F, S> { + fn new(in_header: InHeader, r: Reader<'a, S>, w: Writer<'a, 'b, S>) -> Self { let context = Context::from(&in_header); SrvContext { diff --git a/src/api/server/sync_io.rs b/src/api/server/sync_io.rs index ba2606fca..f48930a01 100644 --- a/src/api/server/sync_io.rs +++ b/src/api/server/sync_io.rs @@ -19,6 +19,7 @@ use crate::abi::virtio_fs::{RemovemappingIn, RemovemappingOne, SetupmappingIn}; use crate::api::filesystem::{ DirEntry, Entry, FileSystem, GetxattrReply, IoctlData, ListxattrReply, }; +use crate::api::server::ServerMeta; #[cfg(feature = "fusedev")] use crate::transport::FuseDevWriter; use crate::transport::{pagesize, FsCacheReqHandler, Reader, Writer}; @@ -66,7 +67,7 @@ impl Server { pub fn handle_message( &self, mut r: Reader<'_, S>, - w: Writer<'_, S>, + w: Writer<'_, '_, S>, vu_req: Option<&mut dyn FsCacheReqHandler>, hook: Option<&dyn MetricsHook>, ) -> Result { @@ -171,7 +172,7 @@ impl Server { res } - fn lookup(&self, mut ctx: SrvContext<'_, F, S>) -> Result { + fn lookup(&self, mut ctx: SrvContext<'_, '_, F, S>) -> Result { let buf = ServerUtil::get_message_body(&mut ctx.r, &ctx.in_header, 0)?; let name = bytes_to_cstr(buf.as_ref()).map_err(|e| { let _ = ctx.reply_error_explicit(io::Error::from_raw_os_error(libc::EINVAL)); @@ -180,7 +181,7 @@ impl Server { })?; #[cfg(not(feature = "fuse-t"))] - let version = self.vers.load(); + let version = &self.meta.load().version; let result = self.fs.lookup(ctx.context(), ctx.nodeid(), name); match result { @@ -205,7 +206,10 @@ impl Server { } } - pub(super) fn forget(&self, mut ctx: SrvContext<'_, F, S>) -> Result { + pub(super) fn forget( + &self, + mut ctx: SrvContext<'_, '_, F, S>, + ) -> Result { let ForgetIn { nlookup } = ctx.r.read_obj().map_err(Error::DecodeMessage)?; self.fs.forget(ctx.context(), ctx.nodeid(), nlookup); @@ -214,7 +218,7 @@ impl Server { Ok(0) } - fn getattr(&self, mut ctx: SrvContext<'_, F, S>) -> Result { + fn getattr(&self, mut ctx: SrvContext<'_, '_, F, S>) -> Result { let GetattrIn { flags, fh, .. } = ctx.r.read_obj().map_err(Error::DecodeMessage)?; let handle = if (flags & GETATTR_FH) != 0 { Some(fh.into()) @@ -226,7 +230,7 @@ impl Server { ctx.handle_attr_result(result) } - fn setattr(&self, mut ctx: SrvContext<'_, F, S>) -> Result { + fn setattr(&self, mut ctx: SrvContext<'_, '_, F, S>) -> Result { let setattr_in: SetattrIn = ctx.r.read_obj().map_err(Error::DecodeMessage)?; let handle = if setattr_in.valid & FATTR_FH != 0 { Some(setattr_in.fh.into()) @@ -242,7 +246,10 @@ impl Server { ctx.handle_attr_result(result) } - pub(super) fn readlink(&self, mut ctx: SrvContext<'_, F, S>) -> Result { + pub(super) fn readlink( + &self, + mut ctx: SrvContext<'_, '_, F, S>, + ) -> Result { match self.fs.readlink(ctx.context(), ctx.nodeid()) { Ok(linkname) => { // We need to disambiguate the option type here even though it is `None`. @@ -252,7 +259,10 @@ impl Server { } } - pub(super) fn symlink(&self, mut ctx: SrvContext<'_, F, S>) -> Result { + pub(super) fn symlink( + &self, + mut ctx: SrvContext<'_, '_, F, S>, + ) -> Result { let buf = ServerUtil::get_message_body(&mut ctx.r, &ctx.in_header, 0)?; // The name and linkname are encoded one after another and separated by a nul character. let (name, linkname) = ServerUtil::extract_two_cstrs(&buf)?; @@ -263,7 +273,7 @@ impl Server { } } - pub(super) fn mknod(&self, mut ctx: SrvContext<'_, F, S>) -> Result { + pub(super) fn mknod(&self, mut ctx: SrvContext<'_, '_, F, S>) -> Result { let MknodIn { mode, rdev, umask, .. } = ctx.r.read_obj().map_err(Error::DecodeMessage)?; @@ -283,7 +293,7 @@ impl Server { } } - pub(super) fn mkdir(&self, mut ctx: SrvContext<'_, F, S>) -> Result { + pub(super) fn mkdir(&self, mut ctx: SrvContext<'_, '_, F, S>) -> Result { let MkdirIn { mode, umask } = ctx.r.read_obj().map_err(Error::DecodeMessage)?; let buf = ServerUtil::get_message_body(&mut ctx.r, &ctx.in_header, size_of::())?; let name = bytes_to_cstr(buf.as_ref()).map_err(|e| { @@ -301,7 +311,10 @@ impl Server { } } - pub(super) fn unlink(&self, mut ctx: SrvContext<'_, F, S>) -> Result { + pub(super) fn unlink( + &self, + mut ctx: SrvContext<'_, '_, F, S>, + ) -> Result { let buf = ServerUtil::get_message_body(&mut ctx.r, &ctx.in_header, 0)?; let name = bytes_to_cstr(buf.as_ref()).map_err(|e| { let _ = ctx.reply_error_explicit(io::Error::from_raw_os_error(libc::EINVAL)); @@ -315,7 +328,7 @@ impl Server { } } - pub(super) fn rmdir(&self, mut ctx: SrvContext<'_, F, S>) -> Result { + pub(super) fn rmdir(&self, mut ctx: SrvContext<'_, '_, F, S>) -> Result { let buf = ServerUtil::get_message_body(&mut ctx.r, &ctx.in_header, 0)?; let name = bytes_to_cstr(buf.as_ref()).map_err(|e| { let _ = ctx.reply_error_explicit(io::Error::from_raw_os_error(libc::EINVAL)); @@ -331,7 +344,7 @@ impl Server { pub(super) fn do_rename( &self, - mut ctx: SrvContext<'_, F, S>, + mut ctx: SrvContext<'_, '_, F, S>, msg_size: usize, newdir: u64, flags: u32, @@ -352,14 +365,20 @@ impl Server { } } - pub(super) fn rename(&self, mut ctx: SrvContext<'_, F, S>) -> Result { + pub(super) fn rename( + &self, + mut ctx: SrvContext<'_, '_, F, S>, + ) -> Result { let RenameIn { newdir, .. } = ctx.r.read_obj().map_err(Error::DecodeMessage)?; self.do_rename(ctx, size_of::(), newdir, 0) } #[cfg(target_os = "linux")] - pub(super) fn rename2(&self, mut ctx: SrvContext<'_, F, S>) -> Result { + pub(super) fn rename2( + &self, + mut ctx: SrvContext<'_, '_, F, S>, + ) -> Result { let Rename2In { newdir, flags, .. } = ctx.r.read_obj().map_err(Error::DecodeMessage)?; let flags = @@ -368,7 +387,7 @@ impl Server { self.do_rename(ctx, size_of::(), newdir, flags) } - pub(super) fn link(&self, mut ctx: SrvContext<'_, F, S>) -> Result { + pub(super) fn link(&self, mut ctx: SrvContext<'_, '_, F, S>) -> Result { let LinkIn { oldnodeid } = ctx.r.read_obj().map_err(Error::DecodeMessage)?; let buf = ServerUtil::get_message_body(&mut ctx.r, &ctx.in_header, size_of::())?; let name = bytes_to_cstr(buf.as_ref()).map_err(|e| { @@ -386,7 +405,7 @@ impl Server { } } - fn open(&self, mut ctx: SrvContext<'_, F, S>) -> Result { + fn open(&self, mut ctx: SrvContext<'_, '_, F, S>) -> Result { let OpenIn { flags, fuse_flags } = ctx.r.read_obj().map_err(Error::DecodeMessage)?; match self.fs.open(ctx.context(), ctx.nodeid(), flags, fuse_flags) { @@ -403,7 +422,7 @@ impl Server { } } - fn read(&self, mut ctx: SrvContext<'_, F, S>) -> Result { + fn read(&self, mut ctx: SrvContext<'_, '_, F, S>) -> Result { let ReadIn { fh, offset, @@ -454,7 +473,7 @@ impl Server { .write_all(out.as_slice()) .map_err(Error::EncodeMessage)?; ctx.w - .commit(Some(&data_writer.0)) + .commit(Some(&mut data_writer.0)) .map_err(Error::EncodeMessage)?; Ok(out.len as usize) } @@ -462,7 +481,7 @@ impl Server { } } - fn write(&self, mut ctx: SrvContext<'_, F, S>) -> Result { + fn write(&self, mut ctx: SrvContext<'_, '_, F, S>) -> Result { let WriteIn { fh, offset, @@ -511,14 +530,20 @@ impl Server { } } - pub(super) fn statfs(&self, mut ctx: SrvContext<'_, F, S>) -> Result { + pub(super) fn statfs( + &self, + mut ctx: SrvContext<'_, '_, F, S>, + ) -> Result { match self.fs.statfs(ctx.context(), ctx.nodeid()) { Ok(st) => ctx.reply_ok(Some(Kstatfs::from(st)), None), Err(e) => ctx.reply_error(e), } } - pub(super) fn release(&self, mut ctx: SrvContext<'_, F, S>) -> Result { + pub(super) fn release( + &self, + mut ctx: SrvContext<'_, '_, F, S>, + ) -> Result { let ReleaseIn { fh, flags, @@ -548,7 +573,7 @@ impl Server { } } - fn fsync(&self, mut ctx: SrvContext<'_, F, S>) -> Result { + fn fsync(&self, mut ctx: SrvContext<'_, '_, F, S>) -> Result { let FsyncIn { fh, fsync_flags, .. } = ctx.r.read_obj().map_err(Error::DecodeMessage)?; @@ -563,7 +588,10 @@ impl Server { } } - pub(super) fn setxattr(&self, mut ctx: SrvContext<'_, F, S>) -> Result { + pub(super) fn setxattr( + &self, + mut ctx: SrvContext<'_, '_, F, S>, + ) -> Result { let SetxattrIn { size, flags } = ctx.r.read_obj().map_err(Error::DecodeMessage)?; let buf = ServerUtil::get_message_body(&mut ctx.r, &ctx.in_header, size_of::())?; @@ -594,7 +622,10 @@ impl Server { } } - pub(super) fn getxattr(&self, mut ctx: SrvContext<'_, F, S>) -> Result { + pub(super) fn getxattr( + &self, + mut ctx: SrvContext<'_, '_, F, S>, + ) -> Result { let GetxattrIn { size, .. } = ctx.r.read_obj().map_err(Error::DecodeMessage)?; if size > MAX_BUFFER_SIZE { return ctx.reply_error_explicit(io::Error::from_raw_os_error(libc::ENOMEM)); @@ -622,7 +653,10 @@ impl Server { } } - pub(super) fn listxattr(&self, mut ctx: SrvContext<'_, F, S>) -> Result { + pub(super) fn listxattr( + &self, + mut ctx: SrvContext<'_, '_, F, S>, + ) -> Result { let GetxattrIn { size, .. } = ctx.r.read_obj().map_err(Error::DecodeMessage)?; if size > MAX_BUFFER_SIZE { @@ -645,7 +679,7 @@ impl Server { pub(super) fn removexattr( &self, - mut ctx: SrvContext<'_, F, S>, + mut ctx: SrvContext<'_, '_, F, S>, ) -> Result { let buf = ServerUtil::get_message_body(&mut ctx.r, &ctx.in_header, 0)?; let name = bytes_to_cstr(buf.as_ref()).map_err(|e| { @@ -660,7 +694,7 @@ impl Server { } } - pub(super) fn flush(&self, mut ctx: SrvContext<'_, F, S>) -> Result { + pub(super) fn flush(&self, mut ctx: SrvContext<'_, '_, F, S>) -> Result { let FlushIn { fh, lock_owner, .. } = ctx.r.read_obj().map_err(Error::DecodeMessage)?; match self @@ -672,7 +706,7 @@ impl Server { } } - pub(super) fn init(&self, mut ctx: SrvContext<'_, F, S>) -> Result { + pub(super) fn init(&self, mut ctx: SrvContext<'_, '_, F, S>) -> Result { let InitIn { major, minor, @@ -742,8 +776,13 @@ impl Server { out.max_pages = MAX_REQ_PAGES; out.max_write = MAX_REQ_PAGES as u32 * pagesize() as u32; // 1MB } - let vers = ServerVersion { major, minor }; - self.vers.store(Arc::new(vers)); + let meta = ServerMeta { + version: ServerVersion { major, minor }, + support_splice_read: capable.contains(FsOptions::SPLICE_READ), + support_splice_write: capable + .contains(FsOptions::SPLICE_WRITE | FsOptions::SPLICE_MOVE), + }; + self.meta.store(Arc::new(meta)); if minor < KERNEL_MINOR_VERSION_INIT_OUT_SIZE { ctx.reply_ok( Some( @@ -772,7 +811,10 @@ impl Server { } } - pub(super) fn opendir(&self, mut ctx: SrvContext<'_, F, S>) -> Result { + pub(super) fn opendir( + &self, + mut ctx: SrvContext<'_, '_, F, S>, + ) -> Result { let OpenIn { flags, .. } = ctx.r.read_obj().map_err(Error::DecodeMessage)?; match self.fs.opendir(ctx.context(), ctx.nodeid(), flags) { @@ -791,7 +833,7 @@ impl Server { fn do_readdir( &self, - mut ctx: SrvContext<'_, F, S>, + mut ctx: SrvContext<'_, '_, F, S>, plus: bool, ) -> Result { let ReadIn { @@ -847,23 +889,28 @@ impl Server { ctx.w .write_all(out.as_slice()) .map_err(Error::EncodeMessage)?; - ctx.w.commit(Some(&cursor)).map_err(Error::EncodeMessage)?; + ctx.w + .commit(Some(&mut cursor)) + .map_err(Error::EncodeMessage)?; Ok(out.len as usize) } } - pub(super) fn readdir(&self, ctx: SrvContext<'_, F, S>) -> Result { + pub(super) fn readdir(&self, ctx: SrvContext<'_, '_, F, S>) -> Result { self.do_readdir(ctx, false) } #[cfg(target_os = "linux")] - pub(super) fn readdirplus(&self, ctx: SrvContext<'_, F, S>) -> Result { + pub(super) fn readdirplus( + &self, + ctx: SrvContext<'_, '_, F, S>, + ) -> Result { self.do_readdir(ctx, true) } pub(super) fn releasedir( &self, - mut ctx: SrvContext<'_, F, S>, + mut ctx: SrvContext<'_, '_, F, S>, ) -> Result { let ReleaseIn { fh, flags, .. } = ctx.r.read_obj().map_err(Error::DecodeMessage)?; @@ -876,7 +923,7 @@ impl Server { } } - fn fsyncdir(&self, mut ctx: SrvContext<'_, F, S>) -> Result { + fn fsyncdir(&self, mut ctx: SrvContext<'_, '_, F, S>) -> Result { let FsyncIn { fh, fsync_flags, .. } = ctx.r.read_obj().map_err(Error::DecodeMessage)?; @@ -891,7 +938,7 @@ impl Server { } } - pub(super) fn getlk(&self, mut ctx: SrvContext<'_, F, S>) -> Result { + pub(super) fn getlk(&self, mut ctx: SrvContext<'_, '_, F, S>) -> Result { let LkIn { fh, owner, @@ -912,7 +959,7 @@ impl Server { } } - pub(super) fn setlk(&self, mut ctx: SrvContext<'_, F, S>) -> Result { + pub(super) fn setlk(&self, mut ctx: SrvContext<'_, '_, F, S>) -> Result { let LkIn { fh, owner, @@ -933,7 +980,10 @@ impl Server { } } - pub(super) fn setlkw(&self, mut ctx: SrvContext<'_, F, S>) -> Result { + pub(super) fn setlkw( + &self, + mut ctx: SrvContext<'_, '_, F, S>, + ) -> Result { let LkIn { fh, owner, @@ -954,7 +1004,10 @@ impl Server { } } - pub(super) fn access(&self, mut ctx: SrvContext<'_, F, S>) -> Result { + pub(super) fn access( + &self, + mut ctx: SrvContext<'_, '_, F, S>, + ) -> Result { let AccessIn { mask, .. } = ctx.r.read_obj().map_err(Error::DecodeMessage)?; match self.fs.access(ctx.context(), ctx.nodeid(), mask) { @@ -963,7 +1016,7 @@ impl Server { } } - fn create(&self, mut ctx: SrvContext<'_, F, S>) -> Result { + fn create(&self, mut ctx: SrvContext<'_, '_, F, S>) -> Result { let args: CreateIn = ctx.r.read_obj().map_err(Error::DecodeMessage)?; let buf = ServerUtil::get_message_body(&mut ctx.r, &ctx.in_header, size_of::())?; let name = bytes_to_cstr(buf.as_ref()).map_err(|e| { @@ -997,9 +1050,9 @@ impl Server { } } - pub(super) fn interrupt(&self, _ctx: SrvContext<'_, F, S>) {} + pub(super) fn interrupt(&self, _ctx: SrvContext<'_, '_, F, S>) {} - pub(super) fn bmap(&self, mut ctx: SrvContext<'_, F, S>) -> Result { + pub(super) fn bmap(&self, mut ctx: SrvContext<'_, '_, F, S>) -> Result { let BmapIn { block, blocksize, .. } = ctx.r.read_obj().map_err(Error::DecodeMessage)?; @@ -1010,14 +1063,14 @@ impl Server { } } - pub(super) fn destroy(&self, mut ctx: SrvContext<'_, F, S>) { + pub(super) fn destroy(&self, mut ctx: SrvContext<'_, '_, F, S>) { self.fs.destroy(); if let Err(e) = ctx.reply_ok(None::, None) { warn!("fuse channel reply destroy failed {:?}", e); } } - pub(super) fn ioctl(&self, mut ctx: SrvContext<'_, F, S>) -> Result { + pub(super) fn ioctl(&self, mut ctx: SrvContext<'_, '_, F, S>) -> Result { let IoctlIn { fh, flags, @@ -1062,7 +1115,7 @@ impl Server { } } - pub(super) fn poll(&self, mut ctx: SrvContext<'_, F, S>) -> Result { + pub(super) fn poll(&self, mut ctx: SrvContext<'_, '_, F, S>) -> Result { let PollIn { fh, kh, @@ -1091,7 +1144,7 @@ impl Server { pub(super) fn notify_reply( &self, - mut ctx: SrvContext<'_, F, S>, + mut ctx: SrvContext<'_, '_, F, S>, ) -> Result { if let Err(e) = self.fs.notify_reply() { ctx.reply_error(e) @@ -1102,7 +1155,7 @@ impl Server { pub(super) fn batch_forget( &self, - mut ctx: SrvContext<'_, F, S>, + mut ctx: SrvContext<'_, '_, F, S>, ) -> Result { let BatchForgetIn { count, .. } = ctx.r.read_obj().map_err(Error::DecodeMessage)?; @@ -1134,7 +1187,7 @@ impl Server { Ok(0) } - fn fallocate(&self, mut ctx: SrvContext<'_, F, S>) -> Result { + fn fallocate(&self, mut ctx: SrvContext<'_, '_, F, S>) -> Result { let FallocateIn { fh, offset, @@ -1153,7 +1206,7 @@ impl Server { } #[cfg(target_os = "linux")] - pub(super) fn lseek(&self, mut ctx: SrvContext<'_, F, S>) -> Result { + pub(super) fn lseek(&self, mut ctx: SrvContext<'_, '_, F, S>) -> Result { let LseekIn { fh, offset, whence, .. } = ctx.r.read_obj().map_err(Error::DecodeMessage)?; @@ -1176,7 +1229,7 @@ impl Server { impl Server { pub(super) fn setupmapping( &self, - mut ctx: SrvContext<'_, F, S>, + mut ctx: SrvContext<'_, '_, F, S>, vu_req: Option<&mut dyn FsCacheReqHandler>, ) -> Result { if let Some(req) = vu_req { @@ -1208,7 +1261,7 @@ impl Server { pub(super) fn removemapping( &self, - mut ctx: SrvContext<'_, F, S>, + mut ctx: SrvContext<'_, '_, F, S>, vu_req: Option<&mut dyn FsCacheReqHandler>, ) -> Result { if let Some(req) = vu_req { @@ -1244,7 +1297,7 @@ impl Server { } } -impl<'a, F: FileSystem, S: BitmapSlice> SrvContext<'a, F, S> { +impl<'a, 'b, F: FileSystem, S: BitmapSlice> SrvContext<'a, 'b, F, S> { fn reply_ok(&mut self, out: Option, data: Option<&[u8]>) -> Result { let data2 = out.as_ref().map(|v| v.as_slice()).unwrap_or(&[]); let data3 = data.unwrap_or(&[]); @@ -1338,7 +1391,7 @@ impl<'a, F: FileSystem, S: BitmapSlice> SrvContext<'a, F, S> { } fn add_dirent( - cursor: &mut Writer<'_, S>, + cursor: &mut Writer<'_, '_, S>, max: u32, d: DirEntry, entry: Option, diff --git a/src/transport/fusedev/linux_session.rs b/src/transport/fusedev/linux_session.rs index 200aa02fd..cff3403dc 100644 --- a/src/transport/fusedev/linux_session.rs +++ b/src/transport/fusedev/linux_session.rs @@ -9,20 +9,25 @@ //! A FUSE session can have multiple FUSE channels so that FUSE requests are handled in parallel. use std::fs::{File, OpenOptions}; +use std::io; +use std::io::IoSlice; +use std::marker::PhantomData; +use std::mem::ManuallyDrop; use std::ops::Deref; use std::os::unix::fs::PermissionsExt; -use std::os::unix::io::AsRawFd; +use std::os::unix::io::{AsRawFd, RawFd}; use std::os::unix::net::UnixStream; use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; use mio::{Events, Poll, Token, Waker}; use nix::errno::Errno; -use nix::fcntl::{fcntl, FcntlArg, FdFlag, OFlag}; +use nix::fcntl::{fcntl, FcntlArg, FdFlag, OFlag, SpliceFFlags}; use nix::mount::{mount, umount2, MntFlags, MsFlags}; use nix::poll::{poll, PollFd, PollFlags}; use nix::sys::epoll::{epoll_ctl, EpollEvent, EpollFlags, EpollOp}; use nix::unistd::{getgid, getuid, read}; +use vm_memory::bitmap::BitmapSlice; use super::{ super::pagesize, @@ -33,6 +38,11 @@ use super::{ // These follows definition from libfuse. const POLL_EVENTS_CAPACITY: usize = 1024; +// fuse header consumes 1 buf + data(1MB) consumes at least 256 bufs +const DEFAULT_SPLICE_PIPE_READ_BUF_SIZE: usize = 260; +// fuse header consumes 1 buf + data(128KB) consumes at least 32 bufs +const DEFAULT_SPLICE_PIPE_WRITE_BUF_SIZE: usize = 64; + const FUSE_DEVICE: &str = "/dev/fuse"; const FUSE_FSTYPE: &str = "fuse"; const FUSERMOUNT_BIN: &str = "fusermount3"; @@ -247,6 +257,10 @@ pub struct FuseChannel { poll: Poll, waker: Arc, buf: Vec, + /// pipe for splice read + r_p: Option, + /// pipe for splice write (memory data, fd data) + w_ps: Option<(Pipe, Pipe)>, } impl FuseChannel { @@ -274,6 +288,8 @@ impl FuseChannel { poll, waker, buf: vec![0x0u8; bufsize], + r_p: None, + w_ps: None, }) } @@ -281,41 +297,143 @@ impl FuseChannel { self.waker.clone() } - /// Get next available FUSE request from the underlying fuse device file. + /// Enable using splice syscall to read FUSE requests + /// + /// Improve performance of write request because of less data copy. + /// It's better to check whether FUSE module supports SPLICE_READ before enable this. + /// + /// let mut session = FuseSession::new("./mnt", "fs", "fs", false)?; + /// let mut ch = session.new_channel()?; + /// let fs = Server::new(MyFs::new()); // MyFs impl FileSystem trait + /// loop { + /// // after handle init request, we know whether kernel support splice read + /// if fs.is_support_splice_read() { + /// ch.enable_splice_read(); + /// } + /// let (r, w) = ch.get_request()?.unwrap(); + /// fs.handle_message(r, w, None, None)?; + /// } + /// + pub fn enable_splice_read(&mut self) -> Result<()> { + if self.r_p.is_some() { + return Ok(()); + } + self.r_p = Some(Pipe::new(DEFAULT_SPLICE_PIPE_READ_BUF_SIZE * pagesize())?); + Ok(()) + } + + /// Enable using splice syscall to reply FUSE requests + /// + /// Improve performance of read request because of less data copy. + /// It's better to check whether FUSE module supports SPLICE_WRITE&SPLICE_MOVE before enable this. + /// + /// let mut session = FuseSession::new("./mnt", "fs", "fs", false)?; + /// let mut ch = session.new_channel()?; + /// let fs = Server::new(MyFs::new()); // MyFs impl FileSystem trait + /// loop { + /// // after handle init request, we know whether kernel support splice write + /// if fs.is_support_splice_write() { + /// ch.enable_splice_write(); + /// } + /// let (r, w) = ch.get_request()?.unwrap(); + /// fs.handle_message(r, w, None, None)?; + /// } + /// + pub fn enable_splice_write(&mut self) -> Result<()> { + if self.w_ps.is_some() { + return Ok(()); + } + if self.w_ps.is_none() { + self.w_ps = Some(( + Pipe::new(DEFAULT_SPLICE_PIPE_WRITE_BUF_SIZE * pagesize())?, + Pipe::new(DEFAULT_SPLICE_PIPE_WRITE_BUF_SIZE * pagesize())?, + )); + } + Ok(()) + } + + /// Check whether next FUSE request is available /// /// Returns: - /// - Ok(None): signal has pending on the exiting event channel - /// - Ok(Some((reader, writer))): reader to receive request and writer to send reply - /// - Err(e): error message - pub fn get_request(&mut self) -> Result> { + /// - Ok((available, need_exit)) whether FUSE request is available and whether fuse server need exit + /// - Err(e) error message + fn is_readable(&mut self) -> Result<(bool, bool)> { let mut events = Events::with_capacity(POLL_EVENTS_CAPACITY); let mut need_exit = false; - loop { - let mut fusereq_available = false; - match self.poll.poll(&mut events, None) { - Ok(_) => {} - Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue, - Err(e) => return Err(SessionFailure(format!("epoll wait: {e}"))), - } + let mut fusereq_available = false; + match self.poll.poll(&mut events, None) { + Ok(_) => {} + Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => return Ok((false, false)), + Err(e) => return Err(SessionFailure(format!("epoll wait: {}", e))), + } - for event in events.iter() { - if event.is_readable() { - match event.token() { - EXIT_FUSE_EVENT => need_exit = true, - FUSE_DEV_EVENT => fusereq_available = true, - x => { - error!("unexpected epoll event"); - return Err(SessionFailure(format!("unexpected epoll event: {}", x.0))); - } + for event in events.iter() { + if event.is_readable() { + match event.token() { + EXIT_FUSE_EVENT => need_exit = true, + FUSE_DEV_EVENT => fusereq_available = true, + x => { + error!("unexpected epoll event"); + return Err(SessionFailure(format!("unexpected epoll event: {}", x.0))); } - } else if event.is_error() { - info!("FUSE channel already closed!"); - return Err(SessionFailure("epoll error".to_string())); - } else { - // We should not step into this branch as other event is not registered. - panic!("unknown epoll result events"); } + } else if event.is_error() { + info!("FUSE channel already closed!"); + return Err(SessionFailure("epoll error".to_string())); + } else { + // We should not step into this branch as other event is not registered. + panic!("unknown epoll result events"); } + } + Ok((fusereq_available, need_exit)) + } + + fn prepare(&mut self) -> Result<()> { + if self.r_p.is_some() && self.r_p.as_ref().unwrap().is_invalid() { + self.r_p = Some(Pipe::new(DEFAULT_SPLICE_PIPE_READ_BUF_SIZE * pagesize())?); + } + if let Some((vm_p, fd_p)) = self.w_ps.as_mut() { + if vm_p.is_invalid() { + *vm_p = Pipe::new(DEFAULT_SPLICE_PIPE_WRITE_BUF_SIZE * pagesize())?; + } + if fd_p.is_invalid() { + *fd_p = Pipe::new(DEFAULT_SPLICE_PIPE_WRITE_BUF_SIZE * pagesize())?; + } + } + Ok(()) + } + + fn do_splice(&mut self) -> io::Result { + loop { + match nix::fcntl::splice( + self.file.as_raw_fd(), + None, + self.r_p.as_ref().unwrap().wfd(), + None, + self.buf.len(), + SpliceFFlags::empty(), + ) { + Ok(n) => return Ok(n), + Err(e) if e == nix::Error::EIO => { + // FUSE reply EIO if pipe's available buffers are not enough to fill fuse request + // So we need resize pipe buf size + let pipe = self.r_p.as_mut().unwrap(); + pipe.set_size(pipe.size() * 2)?; + } + Err(e) => return Err(e.into()), + } + } + } + + /// Get next available FUSE request from the underlying fuse device file. + /// + /// Returns: + /// - Ok(None): signal has pending on the exiting event channel + /// - Ok(Some((reader, writer))): reader to receive request and writer to send reply + /// - Err(e): error message + pub fn get_request(&mut self) -> Result> { + loop { + let (fusereq_available, need_exit) = self.is_readable()?; // Handle wake up event first. We don't read the event fd so that a LEVEL triggered // event can still be delivered to other threads/daemons. @@ -325,46 +443,88 @@ impl FuseChannel { } if fusereq_available { let fd = self.file.as_raw_fd(); - match read(fd, &mut self.buf) { - Ok(len) => { - // ############################################### - // Note: it's a heavy hack to reuse the same underlying data - // buffer for both Reader and Writer, in order to reduce memory - // consumption. Here we assume Reader won't be used anymore once - // we start to write to the Writer. To get rid of this hack, - // just allocate a dedicated data buffer for Writer. - let buf = unsafe { - std::slice::from_raw_parts_mut(self.buf.as_mut_ptr(), self.buf.len()) - }; - // Reader::new() and Writer::new() should always return success. - let reader = - Reader::from_fuse_buffer(FuseBuf::new(&mut self.buf[..len])).unwrap(); - let writer = FuseDevWriter::new(fd, buf).unwrap(); - return Ok(Some((reader, writer))); - } - Err(e) => match e { - Errno::ENOENT => { - // ENOENT means the operation was interrupted, it's safe to restart - trace!("restart reading due to ENOENT"); - continue; - } - Errno::EAGAIN => { - trace!("restart reading due to EAGAIN"); - continue; - } - Errno::EINTR => { - trace!("syscall interrupted"); - continue; + self.prepare().map_err(|e| { + error!("failed to prepare, err = {:?}", e); + e + })?; + let err = if self.r_p.is_some() { + let res = self.do_splice(); + match res { + Ok(n) => { + let buf = unsafe { + std::slice::from_raw_parts_mut( + self.buf.as_mut_ptr(), + self.buf.len(), + ) + }; + return Ok(Some(( + Reader::from_pipe_reader(splice::PipeReader::new( + self.r_p.as_mut().unwrap(), + n, + )), + FuseDevWriter::with_pipe( + fd, + buf, + self.w_ps.as_mut().map(|(vm_p, fd_p)| { + (PipeWriter::new(vm_p), PipeWriter::new(fd_p)) + }), + ) + .unwrap(), + ))); } - Errno::ENODEV => { - info!("fuse filesystem umounted"); - return Ok(None); - } - e => { - warn! {"read fuse dev failed on fd {}: {}", fd, e}; - return Err(SessionFailure(format!("read new request: {e:?}"))); + Err(e) => e, + } + } else { + match read(fd, &mut self.buf) { + Ok(len) => { + let buf = unsafe { + std::slice::from_raw_parts_mut( + self.buf.as_mut_ptr(), + self.buf.len(), + ) + }; + // Reader::new() and Writer::new() should always return success. + let reader = + Reader::from_fuse_buffer(FuseBuf::new(&mut self.buf[..len])) + .unwrap(); + return Ok(Some(( + reader, + FuseDevWriter::with_pipe( + fd, + buf, + self.w_ps.as_mut().map(|(vm_p, fd_p)| { + (PipeWriter::new(vm_p), PipeWriter::new(fd_p)) + }), + ) + .unwrap(), + ))); } - }, + Err(e) => e.into(), + } + }; + match err.raw_os_error().unwrap_or(libc::EIO) { + libc::ENOENT => { + // ENOENT means the operation was interrupted, it's safe to restart + trace!("restart reading due to ENOENT"); + continue; + } + libc::EAGAIN => { + trace!("restart reading due to EAGAIN"); + continue; + } + libc::EINTR => { + trace!("syscall interrupted"); + continue; + } + libc::ENODEV => { + info!("fuse filesystem umounted"); + return Ok(None); + } + code => { + let e = nix::Error::from_i32(code); + warn! {"read fuse dev failed on fd {}: {}", fd, e}; + return Err(SessionFailure(format!("read new request: {e:?}"))); + } } } } @@ -607,6 +767,477 @@ fn fuse_fusermount_umount(mountpoint: &str, fusermount: &str) -> Result<()> { } } +impl<'a, 'b, S: BitmapSlice + Default> FuseDevWriter<'a, 'b, S> { + /// Construct writer with pipe + pub fn with_pipe( + fd: RawFd, + data_buf: &'a mut [u8], + pw: Option<(PipeWriter<'b>, PipeWriter<'b>)>, + ) -> Result> { + let buf = unsafe { Vec::from_raw_parts(data_buf.as_mut_ptr(), 0, data_buf.len()) }; + Ok(FuseDevWriter { + fd, + buffered: false, + buf: ManuallyDrop::new(buf), + pw, + bitmapslice: S::default(), + phantom: PhantomData, + }) + } +} + +impl<'a, 'b, S: BitmapSlice> FuseDevWriter<'a, 'b, S> { + /// Append fd buffer for FUSE reply. + /// Often for FUSE read requests: + /// fuse reply header in buf and data in fd buffer + pub fn append_fd_buf(&mut self, fd: RawFd, size: u64, off: Option) -> io::Result { + if self.pw.is_none() || !self.buffered { + return Err(io::Error::from_raw_os_error(libc::EINVAL)); + } + self.check_available_space(size as usize)?; + let n = self + .pw + .as_mut() + .unwrap() + .1 + .splice_from(fd, size as usize, off)?; + Ok(n) + } + + pub(crate) fn commit_by_splice( + &mut self, + other: Option<&mut FuseDevWriter<'a, 'b, S>>, + ) -> io::Result { + let (other_bufs, (mut pw, fd_pw)) = match other { + Some(w) => (w.buf.as_slice(), w.pw.take().unwrap()), + _ => (&[] as &[u8], self.pw.take().unwrap()), + }; + match (self.buf.len(), other_bufs.len()) { + (0, 0) => return Ok(0), + (0, _) => pw.splice_from_buf(other_bufs), + (_, 0) => pw.splice_from_buf(&self.buf), + (_, _) => { + let bufs = [IoSlice::new(self.buf.as_slice()), IoSlice::new(other_bufs)]; + pw.splice_from_iovec(&bufs) + } + } + .map_err(|e| { + error! {"fail to vmsplice to pipe on commit: {}", e}; + e + })?; + pw.splice_from_pipe(fd_pw)?; + // final commit to fuse fd + pw.splice_to_all(self.fd).map_err(|e| { + error! {"fail to splice pipe fd to fuse device on commit: {}", e}; + e + }) + } +} + +impl<'a, S: BitmapSlice + Default> Reader<'a, S> { + /// Construct a new Reader wrapper over PipeReader + pub fn from_pipe_reader(reader: splice::PipeReader<'a>) -> Reader<'a, S> { + ReaderInner::Pipe(reader).into() + } +} + +pub(crate) mod splice { + use crate::transport::pagesize; + use nix::fcntl::{FcntlArg, OFlag, SpliceFFlags}; + use std::io::IoSlice; + use std::os::unix::io::RawFd; + + fn nr_pages(start: usize, len: usize) -> usize { + let start_vfn = start / pagesize(); + let end_vfn = (start + len - 1) / pagesize(); + end_vfn - start_vfn + 1 + } + + #[inline] + fn buf_nr_pages(buf: &[u8]) -> usize { + let start = (buf.as_ptr() as usize) / pagesize(); + let end = (unsafe { buf.as_ptr().add(buf.len() - 1) as usize }) / pagesize(); + end - start + 1 + } + + #[derive(Debug)] + pub struct Pipe { + r: RawFd, + w: RawFd, + size: usize, + } + + impl Drop for Pipe { + fn drop(&mut self) { + if self.is_invalid() { + return; + } + nix::unistd::close(self.r).expect("failed to close pipe"); + nix::unistd::close(self.w).expect("failed to close pipe"); + } + } + + impl Pipe { + pub fn new(buf_size: usize) -> std::io::Result { + let (r, w) = nix::unistd::pipe2(OFlag::O_NONBLOCK | OFlag::O_CLOEXEC)?; + nix::fcntl::fcntl(w, FcntlArg::F_SETPIPE_SZ(buf_size as nix::libc::c_int))?; + Ok(Self { + r, + w, + size: buf_size, + }) + } + + pub fn rfd(&self) -> RawFd { + self.r + } + + pub fn wfd(&self) -> RawFd { + self.w + } + + pub fn clear(&mut self) { + let _ = nix::unistd::close(self.r); + let _ = nix::unistd::close(self.w); + self.r = -1; + self.w = -1; + } + + pub fn is_invalid(&self) -> bool { + self.r == -1 + } + + pub fn set_size(&mut self, new_size: usize) -> std::io::Result<()> { + nix::fcntl::fcntl(self.w, FcntlArg::F_SETPIPE_SZ(new_size as nix::libc::c_int))?; + self.size = new_size; + Ok(()) + } + + pub fn size(&self) -> usize { + self.size + } + } + + /// Reader for fuse requests using fuse splice + #[derive(Debug)] + pub struct PipeReader<'a> { + p: &'a mut Pipe, + n_bytes: usize, + bytes_consumed: usize, + } + + impl<'a> Drop for PipeReader<'a> { + fn drop(&mut self) { + if self.n_bytes != 0 { + self.p.clear(); + } + } + } + + impl<'a> std::io::Read for PipeReader<'a> { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + if self.n_bytes == 0 { + return Ok(0); + } + let n = nix::unistd::read(self.p.r, buf)?; + self.n_bytes -= n; + self.bytes_consumed += n; + Ok(n) + } + } + + impl<'a> PipeReader<'a> { + pub fn new(p: &'a mut Pipe, n_bytes: usize) -> PipeReader<'a> { + Self { + p, + n_bytes, + bytes_consumed: 0, + } + } + + pub fn available_bytes(&self) -> usize { + self.n_bytes + } + + pub fn bytes_consumed(&self) -> usize { + self.bytes_consumed + } + + pub fn splice_to(&mut self, fd: RawFd, len: usize) -> std::io::Result { + let n = nix::fcntl::splice(self.p.r, None, fd, None, len, SpliceFFlags::empty())?; + self.n_bytes -= n; + self.bytes_consumed += n; + Ok(n) + } + + pub fn splice_to_at( + &mut self, + fd: RawFd, + len: usize, + off: &mut i64, + ) -> std::io::Result { + let n = nix::fcntl::splice(self.p.r, None, fd, Some(off), len, SpliceFFlags::empty())?; + self.n_bytes -= n; + self.bytes_consumed += n; + Ok(n) + } + } + + #[derive(Debug)] + pub struct PipeWriter<'a> { + p: &'a mut Pipe, + n_bufs: usize, + n_bytes: usize, + } + + impl<'a> PipeWriter<'a> { + pub fn new(p: &'a mut Pipe) -> Self { + Self { + p, + n_bufs: 0, + n_bytes: 0, + } + } + + pub fn len(&self) -> usize { + self.n_bytes + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + fn splice_to(&mut self, fd: RawFd, len: usize) -> std::io::Result { + let len = nix::fcntl::splice( + self.p.rfd(), + None, + fd, + None, + len, + SpliceFFlags::SPLICE_F_MOVE, + )?; + self.n_bytes -= len; + Ok(len) + } + + pub fn splice_to_all(mut self, fd: RawFd) -> std::io::Result { + self.splice_to(fd, self.n_bytes) + } + + fn may_grow_pipe(&mut self, more_bufs: usize) -> std::io::Result<()> { + let expect_size = (self.n_bufs + more_bufs) * pagesize(); + if expect_size > self.p.size() { + self.p.set_size(expect_size + 2 * pagesize()) + } else { + Ok(()) + } + } + + pub fn splice_from( + &mut self, + fd: RawFd, + len: usize, + mut off: Option, + ) -> std::io::Result { + let expect_bufs = len / pagesize() + 1; + self.may_grow_pipe(expect_bufs)?; + let ori_off = off; + let n = nix::fcntl::splice( + fd, + off.as_mut(), + self.p.wfd(), + None, + len, + SpliceFFlags::SPLICE_F_MOVE, + )?; + if let Some(off) = ori_off { + self.n_bufs += nr_pages(off as usize, n); + } else { + self.n_bufs += n / pagesize() + 1; + } + self.n_bytes += n; + Ok(n) + } + + pub fn splice_from_pipe(&mut self, mut other: PipeWriter<'_>) -> std::io::Result { + self.may_grow_pipe(other.n_bufs)?; + let n = nix::fcntl::splice( + other.p.rfd(), + None, + self.p.wfd(), + None, + other.len(), + SpliceFFlags::SPLICE_F_MOVE, + )?; + self.n_bytes += n; + other.n_bytes -= n; + self.n_bufs += other.n_bufs; + Ok(n) + } + + pub fn splice_from_buf(&mut self, buf: &[u8]) -> std::io::Result { + let expect_bufs = buf_nr_pages(buf); + self.may_grow_pipe(expect_bufs)?; + let n = nix::fcntl::vmsplice( + self.p.wfd(), + &[IoSlice::new(buf)], + SpliceFFlags::SPLICE_F_MOVE, + )?; + self.n_bufs += buf_nr_pages(&buf[..n]); + self.n_bytes += n; + Ok(n) + } + + pub fn splice_from_iovec(&mut self, iovec: &[IoSlice]) -> std::io::Result { + let mut expect_bufs = 0; + let mut expect_size = 0; + for iv in iovec.iter() { + expect_size += iv.len(); + expect_bufs += buf_nr_pages(iv.as_ref()); + } + self.may_grow_pipe(expect_bufs)?; + let n = nix::fcntl::vmsplice(self.p.wfd(), iovec, SpliceFFlags::SPLICE_F_MOVE)?; + if n != expect_size { + let mut scan_len = 0; + let mut grow_bufs = 0; + for iv in iovec.iter() { + if n - scan_len > iv.len() { + grow_bufs += buf_nr_pages(iv.as_ref()); + } else { + grow_bufs += buf_nr_pages(&iv.as_ref()[..n - scan_len]); + break; + } + scan_len += iv.len(); + } + self.n_bufs += grow_bufs; + } else { + self.n_bufs += expect_bufs; + } + self.n_bytes += n; + Ok(n) + } + } + + impl<'a> Drop for PipeWriter<'a> { + fn drop(&mut self) { + if !self.p.is_invalid() && self.n_bytes != 0 { + self.p.clear(); + } + } + } + + #[cfg(test)] + mod tests { + use crate::transport::fusedev::splice::{Pipe, PipeReader, PipeWriter}; + use nix::fcntl::SpliceFFlags; + use std::io::{IoSlice, Read, Write}; + use std::os::unix::io::AsRawFd; + use std::path::Path; + use vmm_sys_util::tempfile::TempFile; + + #[test] + fn test_splice_reader() { + let p_res = Pipe::new(4096); + assert!(p_res.is_ok()); + let mut p = p_res.unwrap(); + let content = "hello world!"; + // prepare test file data + std::fs::write(Path::new("splice_testdata"), content).unwrap(); + let file = std::fs::File::open("splice_testdata").unwrap(); + let res = nix::fcntl::splice( + file.as_raw_fd(), + None, + p.wfd(), + None, + content.len(), + SpliceFFlags::empty(), + ); + assert!(res.is_ok()); + assert_eq!(content.len(), res.unwrap()); + let mut reader = PipeReader::new(&mut p, content.len()); + let mut buf = vec![0_u8; 1024]; + let res = reader.read(&mut buf); + assert!(res.is_ok()); + assert_eq!(content.len(), res.unwrap()); + assert_eq!(content.as_bytes(), &buf[..content.len()]); + assert_eq!(0, reader.available_bytes()); + assert_eq!(content.len(), reader.bytes_consumed()); + drop(reader); + + let mut new_file = std::fs::OpenOptions::new() + .write(true) + .truncate(true) + .create(true) + .open("splice_testdata_2") + .unwrap(); + let mut off = 0; + nix::fcntl::splice( + file.as_raw_fd(), + Some(&mut off), + p.wfd(), + None, + content.len(), + SpliceFFlags::empty(), + ) + .unwrap(); + let mut reader = PipeReader::new(&mut p, content.len()); + let res = reader.splice_to(new_file.as_raw_fd(), content.len()); + assert!(res.is_ok()); + assert_eq!(content.len(), res.unwrap()); + new_file.flush().unwrap(); + let data = std::fs::read("splice_testdata_2").unwrap(); + assert_eq!(content.as_bytes(), &data); + drop(reader); + + off = 0; + nix::fcntl::splice( + file.as_raw_fd(), + Some(&mut off), + p.wfd(), + None, + content.len(), + SpliceFFlags::empty(), + ) + .unwrap(); + let mut reader = PipeReader::new(&mut p, content.len()); + let mut off2 = 1; + let res = reader.splice_to_at(new_file.as_raw_fd(), content.len(), &mut off2); + assert!(res.is_ok()); + assert_eq!(content.len(), res.unwrap()); + new_file.flush().unwrap(); + let data = std::fs::read("splice_testdata_2").unwrap(); + assert_eq!("hhello world!".as_bytes(), &data); + + drop(file); + drop(new_file); + let _ = std::fs::remove_file("splice_testdata"); + let _ = std::fs::remove_file("splice_testdata_2"); + } + + #[test] + fn test_splice_writer() { + let mut from = TempFile::new().unwrap().into_file(); + let to = TempFile::new().unwrap().into_file(); + let buf = [0_u8; 64]; + from.write(&buf).unwrap(); + let mut pipe = Pipe::new(4096).unwrap(); + let mut pw = PipeWriter::new(&mut pipe); + assert_eq!(64, pw.splice_from_buf(&buf).unwrap()); + let buf2 = [0_u8; 16]; + assert_eq!( + 80, + pw.splice_from_iovec(&[IoSlice::new(&buf), IoSlice::new(&buf2)]) + .unwrap() + ); + assert_eq!(144, pw.n_bytes); + assert_eq!(32, pw.splice_from(from.as_raw_fd(), 32, Some(0)).unwrap()); + assert_eq!(32, pw.splice_from(from.as_raw_fd(), 48, Some(32)).unwrap()); + assert_eq!(208, pw.splice_to_all(to.as_raw_fd()).unwrap()); + assert!(!pipe.is_invalid()); + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -645,6 +1276,8 @@ mod tests { } } +use crate::transport::fusedev::splice::{Pipe, PipeWriter}; +use crate::transport::ReaderInner; #[cfg(feature = "async_io")] pub use asyncio::FuseDevTask; diff --git a/src/transport/fusedev/mod.rs b/src/transport/fusedev/mod.rs index 2d1bd19ef..4ec2d1bd2 100644 --- a/src/transport/fusedev/mod.rs +++ b/src/transport/fusedev/mod.rs @@ -41,6 +41,8 @@ pub use fuse_t_session::*; pub const FUSE_KERN_BUF_PAGES: usize = 256; /// Maximum size of FUSE message header, 4K. pub const FUSE_HEADER_SIZE: usize = 0x1000; +use crate::transport::fusedev::splice::PipeWriter; +use crate::transport::ReaderInner; /// A buffer reference wrapper for fuse requests. #[derive(Debug)] @@ -66,12 +68,11 @@ impl<'a, S: BitmapSlice + Default> Reader<'a, S> { VolatileSlice::with_bitmap(buf.mem.as_mut_ptr(), buf.mem.len(), S::default()) }); - Ok(Reader { - buffers: IoBuffers { - buffers, - bytes_consumed: 0, - }, + Ok(ReaderInner::Buffer(IoBuffers { + buffers, + bytes_consumed: 0, }) + .into()) } } @@ -82,36 +83,77 @@ impl<'a, S: BitmapSlice + Default> Reader<'a, S> { /// 2. If the writer is split, a final commit() MUST be called to issue the /// device write operation. /// 3. Concurrency, caller should not write to the writer concurrently. -#[derive(Debug, PartialEq, Eq)] -pub struct FuseDevWriter<'a, S: BitmapSlice = ()> { +#[derive(Debug)] +pub struct FuseDevWriter<'a, 'b, S: BitmapSlice = ()> { fd: RawFd, buffered: bool, buf: ManuallyDrop>, + pw: Option<(PipeWriter<'b>, PipeWriter<'b>)>, bitmapslice: S, - phantom: PhantomData<&'a mut [S]>, + phantom: PhantomData<(&'a mut [S], &'b mut [S])>, } -impl<'a, S: BitmapSlice + Default> FuseDevWriter<'a, S> { +impl<'a, 'b, S: BitmapSlice + Default> FuseDevWriter<'a, 'b, S> { /// Construct a new [Writer]. - pub fn new(fd: RawFd, data_buf: &'a mut [u8]) -> Result> { + pub fn new(fd: RawFd, data_buf: &'a mut [u8]) -> Result> { + Self::with_pipe(fd, data_buf, None) + } +} + +/// macos doesn't support splice read/write +#[cfg(not(target_os = "linux"))] +pub(crate) mod splice { + #[derive(Debug)] + pub struct PipeWriter<'a>(&'a ()); + + impl<'a> PipeWriter<'a> { + pub fn len(&self) -> usize { + 0 + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + } +} + +#[cfg(not(target_os = "linux"))] +impl<'a, 'b, S: BitmapSlice> FuseDevWriter<'a, 'b, S> { + pub(crate) fn commit_by_splice( + &mut self, + _other: Option<&mut FuseDevWriter<'a, 'b, S>>, + ) -> io::Result { + panic!("can't run here"); + } +} + +#[cfg(not(target_os = "linux"))] +impl<'a, 'b, S: BitmapSlice + Default> FuseDevWriter<'a, 'b, S> { + /// Construct writer with pipe + pub fn with_pipe( + fd: RawFd, + data_buf: &'a mut [u8], + _p: Option<(PipeWriter<'b>, PipeWriter<'b>)>, + ) -> Result> { let buf = unsafe { Vec::from_raw_parts(data_buf.as_mut_ptr(), 0, data_buf.len()) }; Ok(FuseDevWriter { fd, buffered: false, buf: ManuallyDrop::new(buf), + pw: None, bitmapslice: S::default(), phantom: PhantomData, }) } } -impl<'a, S: BitmapSlice> FuseDevWriter<'a, S> { +impl<'a, 'b, S: BitmapSlice> FuseDevWriter<'a, 'b, S> { /// Split the [Writer] at the given offset. /// /// After the split, `self` will be able to write up to `offset` bytes while the returned /// `Writer` can write up to `available_bytes() - offset` bytes. Returns an error if /// `offset > self.available_bytes()`. - pub fn split_at(&mut self, offset: usize) -> Result> { + pub fn split_at(&mut self, offset: usize) -> Result> { if self.buf.capacity() < offset { return Err(Error::SplitOutOfBounds(offset)); } @@ -133,19 +175,18 @@ impl<'a, S: BitmapSlice> FuseDevWriter<'a, S> { fd: self.fd, buffered: true, buf, + pw: self.pw.take(), bitmapslice: self.bitmapslice.clone(), phantom: PhantomData, }) } - /// Compose the FUSE reply message and send the message to `/dev/fuse`. - pub fn commit(&mut self, other: Option<&Writer<'a, S>>) -> io::Result { - if !self.buffered { - return Ok(0); - } - + fn commit_by_write( + &mut self, + other: Option<&mut FuseDevWriter<'a, 'b, S>>, + ) -> io::Result { let o = match other { - Some(Writer::FuseDev(w)) => w.buf.as_slice(), + Some(w) => w.buf.as_slice(), _ => &[], }; let res = match (self.buf.len(), o.len()) { @@ -161,14 +202,42 @@ impl<'a, S: BitmapSlice> FuseDevWriter<'a, S> { res.map_err(|e| io::Error::from_raw_os_error(e as i32)) } + /// Compose the FUSE reply message and send the message to `/dev/fuse`. + pub fn commit(&mut self, other: Option<&mut Writer<'a, 'b, S>>) -> io::Result { + if !self.buffered { + return Ok(0); + } + let other_fw = match other { + Some(Writer::FuseDev(w)) => Some(w), + _ => None, + }; + let use_splice = match other_fw { + None => self.pw.is_some() && !self.pw.as_ref().unwrap().1.is_empty(), + Some(ref w) => w.pw.is_some() && !w.pw.as_ref().unwrap().1.is_empty(), + }; + if !use_splice { + self.commit_by_write(other_fw) + } else { + self.commit_by_splice(other_fw) + } + } + + fn fd_bufs_size(&self) -> usize { + if let Some((_, pw)) = self.pw.as_ref() { + pw.len() + } else { + 0 + } + } + /// Return number of bytes already written to the internal buffer. pub fn bytes_written(&self) -> usize { - self.buf.len() + self.buf.len() + self.fd_bufs_size() } /// Return number of bytes available for writing. pub fn available_bytes(&self) -> usize { - self.buf.capacity() - self.buf.len() + self.buf.capacity() - self.bytes_written() } fn account_written(&mut self, count: usize) { @@ -288,7 +357,7 @@ impl<'a, S: BitmapSlice> FuseDevWriter<'a, S> { } } -impl<'a, S: BitmapSlice> Write for FuseDevWriter<'a, S> { +impl<'a, 'b, S: BitmapSlice> io::Write for FuseDevWriter<'a, 'b, S> { fn write(&mut self, data: &[u8]) -> io::Result { self.check_available_space(data.len())?; @@ -345,7 +414,7 @@ mod async_io { use crate::file_buf::FileVolatileBuf; use crate::file_traits::AsyncFileReadWriteVolatile; - impl<'a, S: BitmapSlice> FuseDevWriter<'a, S> { + impl<'a, 'b, S: BitmapSlice> FuseDevWriter<'a, 'b, S> { /// Write data from a buffer into this writer in asynchronous mode. /// /// Return the number of bytes written to the writer. @@ -479,7 +548,10 @@ mod async_io { /// Commit all internal buffers of the writer and others. /// /// We need this because the lifetime of others is usually shorter than self. - pub async fn async_commit(&mut self, other: Option<&Writer<'a, S>>) -> io::Result { + pub async fn async_commit( + &mut self, + other: Option<&mut Writer<'a, 'b, S>>, + ) -> io::Result { let o = match other { Some(Writer::FuseDev(w)) => w.buf.as_slice(), _ => &[], @@ -519,6 +591,18 @@ mod tests { use std::os::unix::io::AsRawFd; use vmm_sys_util::tempfile::TempFile; + #[cfg(target_os = "linux")] + use crate::transport::fusedev::splice::{Pipe, PipeReader}; + #[cfg(target_os = "linux")] + use std::os::unix::fs::FileExt; + + #[cfg(target_os = "linux")] + fn prepare_pipe(buf: &[u8], buf_size: usize) -> (Pipe, usize) { + let pipe = Pipe::new(buf_size).unwrap(); + let len = nix::unistd::write(pipe.wfd(), buf).unwrap(); + (pipe, len) + } + #[test] fn reader_test_simple_chain() { let mut buf = [0u8; 106]; @@ -542,6 +626,32 @@ mod tests { assert_eq!(reader.available_bytes(), 0); assert_eq!(reader.bytes_read(), 106); + + #[cfg(target_os = "linux")] + { + let buf = [0u8; 106]; + let (mut pipe, len) = prepare_pipe(&buf, 128); + let mut reader = Reader::<()>::from_pipe_reader(PipeReader::new(&mut pipe, len)); + + assert_eq!(reader.available_bytes(), 106); + assert_eq!(reader.bytes_read(), 0); + + let mut buffer = [0 as u8; 64]; + if let Err(_) = reader.read_exact(&mut buffer) { + panic!("read_exact should not fail here"); + } + + assert_eq!(reader.available_bytes(), 42); + assert_eq!(reader.bytes_read(), 64); + + match reader.read(&mut buffer) { + Err(_) => panic!("read should not fail here"), + Ok(length) => assert_eq!(length, 42), + } + + assert_eq!(reader.available_bytes(), 0); + assert_eq!(reader.bytes_read(), 106); + } } #[test] @@ -617,6 +727,21 @@ mod tests { .kind(), io::ErrorKind::UnexpectedEof ); + + #[cfg(target_os = "linux")] + { + let buf = [0u8; 106]; + let (mut pipe, len) = prepare_pipe(&buf, 128); + let mut reader = Reader::<()>::from_pipe_reader(PipeReader::new(&mut pipe, len)); + let mut buf2 = vec![0_u8; 128]; + assert_eq!( + reader + .read_exact(&mut buf2[..]) + .expect_err("read more bytes than available") + .kind(), + io::ErrorKind::UnexpectedEof + ); + } } #[test] @@ -627,6 +752,14 @@ mod tests { assert_eq!(reader.available_bytes(), 32); assert_eq!(other.available_bytes(), 96); + + #[cfg(target_os = "linux")] + { + let buf = [0u8; 128]; + let (mut pipe, len) = prepare_pipe(&buf, 128); + let mut reader = Reader::<()>::from_pipe_reader(PipeReader::new(&mut pipe, len)); + assert!(reader.split_at(32).is_err()); + } } #[test] @@ -634,11 +767,66 @@ mod tests { let mut buf = [0u8; 128]; let mut reader = Reader::<()>::from_fuse_buffer(FuseBuf::new(&mut buf)).unwrap(); - if let Ok(_) = reader.split_at(256) { + if reader.split_at(256).is_ok() { panic!("successfully split Reader with out of bounds offset"); } } + #[cfg(target_os = "linux")] + #[test] + fn writer_append_fd_buf() { + let mut buf = [1_u8; 64]; + let mut data_file = TempFile::new().unwrap().into_file(); + data_file.write(&buf[..32]).unwrap(); + + let dst_file = TempFile::new().unwrap().into_file(); + let mut pipe1 = Pipe::new(4096).unwrap(); + let pwriter1 = PipeWriter::new(&mut pipe1); + let mut pipe2 = Pipe::new(4096).unwrap(); + let pwriter2 = PipeWriter::new(&mut pipe2); + let mut writer = FuseDevWriter::<()>::with_pipe( + dst_file.as_raw_fd(), + &mut buf, + Some((pwriter1, pwriter2)), + ) + .unwrap(); + writer.buffered = true; + let buf2 = [0_u8; 16]; + assert_eq!( + 32, + writer + .append_fd_buf(data_file.as_raw_fd(), 32, Some(0)) + .unwrap() + ); + assert_eq!(32, writer.bytes_written()); + assert_eq!(32, writer.available_bytes()); + assert!(writer.write(&buf2).is_ok()); + assert_eq!(48, writer.bytes_written()); + assert_eq!(16, writer.available_bytes()); + let mut w2 = writer.split_at(8).unwrap(); + assert!(writer + .append_fd_buf(data_file.as_raw_fd(), 32, Some(0)) + .is_err()); + assert_eq!( + 16, + w2.append_fd_buf(data_file.as_raw_fd(), 16, Some(16)) + .unwrap() + ); + assert!(w2.write(&buf2).is_err()); + let mut w2_final = Writer::FuseDev(w2); + let res = writer.commit(Some(&mut w2_final)); + assert!(res.is_ok()); + assert_eq!(64, res.unwrap()); + let mut all_data = vec![0_u8; 64]; + assert_eq!(64, dst_file.read_at(&mut all_data, 0).unwrap()); + assert_eq!(&buf2, &all_data[..16]); + assert_eq!(&[1_u8; 48], &all_data[16..]); + drop(w2_final); + drop(writer); + assert!(!pipe1.is_invalid()); + assert!(!pipe2.is_invalid()); + } + #[test] fn writer_simple_commit_header() { let file = TempFile::new().unwrap().into_file(); @@ -727,7 +915,7 @@ mod tests { 64 ); - writer.commit(Some(&other.into())).unwrap(); + writer.commit(Some(&mut other.into())).unwrap(); } #[test] @@ -740,6 +928,19 @@ mod tests { reader.read(&mut buf[..]).expect("failed to read to buffer"), 48 ); + + #[cfg(target_os = "linux")] + { + let buf2 = [0u8; 48]; + let (mut pipe, len) = prepare_pipe(&buf2, 128); + let mut reader = Reader::<()>::from_pipe_reader(PipeReader::new(&mut pipe, len)); + let mut buf = vec![0u8; 64]; + + assert_eq!( + reader.read(&mut buf[..]).expect("failed to read to buffer"), + 48 + ); + } } #[test] @@ -788,6 +989,21 @@ mod tests { assert_eq!(reader.available_bytes(), 1); assert_eq!(reader.bytes_read(), 8); assert!(reader.read_obj::().is_err()); + + #[cfg(target_os = "linux")] + { + let mut buf2 = [0u8; 9]; + buf2[0] = 1; + let (mut pipe, len) = prepare_pipe(&buf2, 128); + let mut reader = Reader::<()>::from_pipe_reader(PipeReader::new(&mut pipe, len)); + assert_eq!( + 1_u64, + reader.read_obj::().expect("failed to read to file") + ); + assert_eq!(reader.available_bytes(), 1); + assert_eq!(reader.bytes_read(), 8); + assert!(reader.read_obj::().is_err()); + } } #[test] @@ -802,6 +1018,19 @@ mod tests { assert_eq!(reader.available_bytes(), 1); assert_eq!(reader.bytes_read(), 47); + + #[cfg(target_os = "linux")] + { + let buf2 = [0u8; 48]; + let (mut pipe, len) = prepare_pipe(&buf2, 128); + let mut reader = Reader::<()>::from_pipe_reader(PipeReader::new(&mut pipe, len)); + reader + .read_exact_to(&mut file, 47) + .expect("failed to read to file"); + + assert_eq!(reader.available_bytes(), 1); + assert_eq!(reader.bytes_read(), 47); + } } #[test] @@ -818,6 +1047,69 @@ mod tests { ); assert_eq!(reader.available_bytes(), 0); assert_eq!(reader.bytes_read(), 48); + + #[cfg(target_os = "linux")] + { + let buf2 = [0u8; 48]; + let (mut pipe, len) = prepare_pipe(&buf2, 128); + let mut reader = Reader::<()>::from_pipe_reader(PipeReader::new(&mut pipe, len)); + let mut file = TempFile::new().unwrap().into_file(); + + assert_eq!( + reader + .read_to_at(&mut file, 48, 16) + .expect("failed to read to file"), + 48 + ); + assert_eq!(reader.available_bytes(), 0); + assert_eq!(reader.bytes_read(), 48); + } + } + + #[cfg(target_os = "linux")] + #[test] + fn read_to_fd() { + let mut buf2 = [0u8; 48]; + let mut reader = Reader::<()>::from_fuse_buffer(FuseBuf::new(&mut buf2)).unwrap(); + let mut file = TempFile::new().unwrap().into_file(); + assert_eq!( + reader + .splice_to(&mut file, 16) + .expect("failed to read to file"), + 16 + ); + assert_eq!(reader.available_bytes(), 32); + assert_eq!(reader.bytes_read(), 16); + assert_eq!( + reader + .splice_to_at(&mut file, 44, 20) + .expect("failed to read to file"), + 32 + ); + assert_eq!(reader.available_bytes(), 0); + assert_eq!(reader.bytes_read(), 48); + drop(reader); + drop(file); + + let (mut pipe, len) = prepare_pipe(&buf2, 128); + let mut reader = Reader::<()>::from_pipe_reader(PipeReader::new(&mut pipe, len)); + let mut file = TempFile::new().unwrap().into_file(); + assert_eq!( + reader + .splice_to(&mut file, 16) + .expect("failed to read to file"), + 16 + ); + assert_eq!(reader.available_bytes(), 32); + assert_eq!(reader.bytes_read(), 16); + assert_eq!( + reader + .splice_to_at(&mut file, 44, 20) + .expect("failed to read to file"), + 32 + ); + assert_eq!(reader.available_bytes(), 0); + assert_eq!(reader.bytes_read(), 48); } #[test] @@ -1056,8 +1348,9 @@ mod tests { 64 ); - let res = - async_runtime::block_on(async { writer.async_commit(Some(&other.into())).await }); + let res = async_runtime::block_on(async { + writer.async_commit(Some(&mut other.into())).await + }); let _ = res.unwrap(); } } diff --git a/src/transport/mod.rs b/src/transport/mod.rs index 31490ef4f..895025c70 100644 --- a/src/transport/mod.rs +++ b/src/transport/mod.rs @@ -19,6 +19,8 @@ use std::collections::VecDeque; use std::io::{self, IoSlice, Read}; use std::marker::PhantomData; use std::mem::{size_of, MaybeUninit}; +#[cfg(all(target_os = "linux", feature = "fusedev"))] +use std::os::unix::io::{AsRawFd, FromRawFd}; use std::ptr::copy_nonoverlapping; use std::{cmp, fmt}; @@ -32,6 +34,8 @@ use crate::file_buf::FileVolatileSlice; #[cfg(feature = "async-io")] use crate::file_traits::AsyncFileReadWriteVolatile; use crate::file_traits::FileReadWriteVolatile; +#[cfg(all(target_os = "linux", feature = "fusedev"))] +use crate::transport::fusedev::splice; use crate::BitmapSlice; mod fs_cache_req_handler; @@ -74,6 +78,12 @@ pub enum Error { ConvertIndirectDescriptor(virtio_queue::Error), } +impl From for Error { + fn from(e: io::Error) -> Self { + Self::IoError(e) + } +} + impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { use self::Error::*; @@ -354,22 +364,31 @@ impl IoBuffers<'_, S> { } } -/// Reader to access FUSE requests from the transport layer data buffers. +enum ReaderInner<'a, S = ()> { + Buffer(IoBuffers<'a, S>), + #[cfg(all(target_os = "linux", feature = "fusedev"))] + Pipe(splice::PipeReader<'a>), +} + +/// Reader to access FUSE requests. /// /// Note that virtio spec requires driver to place any device-writable /// descriptors after any device-readable descriptors (2.6.4.2 in Virtio Spec v1.1). /// Reader will skip iterating over descriptor chain when first writable /// descriptor is encountered. -#[derive(Clone)] pub struct Reader<'a, S = ()> { - buffers: IoBuffers<'a, S>, + inner: ReaderInner<'a, S>, } impl Default for Reader<'_, S> { fn default() -> Self { - Reader { - buffers: IoBuffers::default(), - } + ReaderInner::Buffer(IoBuffers::default()).into() + } +} + +impl<'a, S: BitmapSlice> From> for Reader<'a, S> { + fn from(r: ReaderInner<'a, S>) -> Reader<'a, S> { + Self { inner: r } } } @@ -400,8 +419,33 @@ impl Reader<'_, S> { mut dst: F, count: usize, ) -> io::Result { - self.buffers - .consume_for_read(count, |bufs| dst.write_vectored_volatile(bufs)) + match &mut self.inner { + ReaderInner::Buffer(buffers) => { + buffers.consume_for_read(count, |bufs| dst.write_vectored_volatile(bufs)) + } + #[cfg(all(target_os = "linux", feature = "fusedev"))] + ReaderInner::Pipe(p) => { + let mut buf = vec![0_u8; count]; + let n = p.read(&mut buf)?; + let slice = unsafe { FileVolatileSlice::from_mut_slice(&mut buf[..n]) }; + dst.write_volatile(slice) + } + } + } + + /// Reads data into a file descriptor using syscall splice. + /// Returns the number of bytes read. + #[cfg(all(target_os = "linux", feature = "fusedev"))] + pub fn splice_to(&mut self, f: &F, count: usize) -> io::Result { + let fd = f.as_raw_fd(); + if let ReaderInner::Pipe(p) = &mut self.inner { + p.splice_to(fd, count) + } else { + let mut file = unsafe { std::fs::File::from_raw_fd(f.as_raw_fd()) }; + let res = self.read_to(&mut file, count); + std::mem::forget(file); + res + } } /// Reads data from the descriptor chain buffer into a File at offset `off`. @@ -414,8 +458,39 @@ impl Reader<'_, S> { count: usize, off: u64, ) -> io::Result { - self.buffers - .consume_for_read(count, |bufs| dst.write_vectored_at_volatile(bufs, off)) + match &mut self.inner { + ReaderInner::Buffer(buffers) => { + buffers.consume_for_read(count, |bufs| dst.write_vectored_at_volatile(bufs, off)) + } + #[cfg(all(target_os = "linux", feature = "fusedev"))] + ReaderInner::Pipe(p) => { + let mut buf = vec![0_u8; count]; + let n = p.read(&mut buf)?; + let slice = unsafe { FileVolatileSlice::from_mut_slice(&mut buf[..n]) }; + dst.write_at_volatile(slice, off) + } + } + } + + /// Reads data into a file descriptor using syscall splice at offset `off`. + /// Returns the number of bytes read. + #[cfg(all(target_os = "linux", feature = "fusedev"))] + pub fn splice_to_at( + &mut self, + f: &F, + count: usize, + off: u64, + ) -> io::Result { + let fd = f.as_raw_fd(); + if let ReaderInner::Pipe(p) = &mut self.inner { + let mut _off = off as i64; + p.splice_to_at(fd, count, &mut _off) + } else { + let mut file = unsafe { std::fs::File::from_raw_fd(f.as_raw_fd()) }; + let res = self.read_to_at(&mut file, count, off); + std::mem::forget(file); + res + } } /// Reads exactly size of data from the descriptor chain buffer into a file descriptor. @@ -446,12 +521,20 @@ impl Reader<'_, S> { /// May return an error if the combined lengths of all the buffers in the DescriptorChain /// would cause an integer overflow. pub fn available_bytes(&self) -> usize { - self.buffers.available_bytes() + match &self.inner { + ReaderInner::Buffer(buffers) => buffers.available_bytes(), + #[cfg(all(target_os = "linux", feature = "fusedev"))] + ReaderInner::Pipe(p) => p.available_bytes(), + } } /// Returns number of bytes already read from the descriptor chain buffer. pub fn bytes_read(&self) -> usize { - self.buffers.bytes_consumed() + match &self.inner { + ReaderInner::Buffer(buffers) => buffers.bytes_consumed(), + #[cfg(all(target_os = "linux", feature = "fusedev"))] + ReaderInner::Pipe(p) => p.bytes_consumed(), + } } /// Splits this `Reader` into two at the given offset in the `DescriptorChain` buffer. @@ -459,29 +542,43 @@ impl Reader<'_, S> { /// `Reader` can read up to `available_bytes() - offset` bytes. Returns an error if /// `offset > self.available_bytes()`. pub fn split_at(&mut self, offset: usize) -> Result { - self.buffers - .split_at(offset) - .map(|buffers| Reader { buffers }) + match &mut self.inner { + ReaderInner::Buffer(buffers) => buffers + .split_at(offset) + .map(|buffers| ReaderInner::Buffer(buffers).into()), + // pipe mode doesn't support split + #[cfg(all(target_os = "linux", feature = "fusedev"))] + ReaderInner::Pipe(_) => Err(io::Error::from_raw_os_error(libc::EINVAL).into()), + } } } impl io::Read for Reader<'_, S> { fn read(&mut self, buf: &mut [u8]) -> io::Result { - self.buffers.consume_for_read(buf.len(), |bufs| { - let mut rem = buf; - let mut total = 0; - for buf in bufs { - let copy_len = cmp::min(rem.len(), buf.len()); - - // Safe because we have already verified that `buf` points to valid memory. - unsafe { - copy_nonoverlapping(buf.as_ptr() as *const u8, rem.as_mut_ptr(), copy_len); - } - rem = &mut rem[copy_len..]; - total += copy_len; + match &mut self.inner { + #[cfg(all(target_os = "linux", feature = "fusedev"))] + ReaderInner::Pipe(p) => p.read(buf), + ReaderInner::Buffer(buffers) => { + buffers.consume_for_read(buf.len(), |bufs| { + let mut rem = buf; + let mut total = 0; + for buf in bufs { + let copy_len = cmp::min(rem.len(), buf.len()); + // Safe because we have already verified that `buf` points to valid memory. + unsafe { + copy_nonoverlapping( + buf.as_ptr() as *const u8, + rem.as_mut_ptr(), + copy_len, + ); + } + rem = &mut rem[copy_len..]; + total += copy_len; + } + Ok(total) + }) } - Ok(total) - }) + } } } @@ -501,36 +598,42 @@ mod async_io { off: u64, ) -> io::Result { // Safe because `bufs` doesn't out-live `self`. - let bufs = unsafe { self.buffers.prepare_io_buf(count) }; - if bufs.is_empty() { - Ok(0) - } else { - let (res, _) = dst.async_write_vectored_at_volatile(bufs, off).await; - match res { - Ok(cnt) => { - self.buffers.mark_used(cnt)?; - Ok(cnt) + match &mut self.inner { + ReaderInner::Buffer(buffers) => { + let bufs = unsafe { buffers.prepare_io_buf(count) }; + if bufs.is_empty() { + Ok(0) + } else { + let (res, _) = dst.async_write_vectored_at_volatile(bufs, off).await; + match res { + Ok(cnt) => { + buffers.mark_used(cnt)?; + Ok(cnt) + } + Err(e) => Err(e), + } } - Err(e) => Err(e), } + #[cfg(all(target_os = "linux", feature = "fusedev"))] + ReaderInner::Pipe(_) => Err(std::io::Error::from_raw_os_error(libc::ENOTSUP)), } } } } /// Writer to send reply message to '/dev/fuse` or virtiofs queue. -pub enum Writer<'a, S: BitmapSlice = ()> { +pub enum Writer<'a, 'b, S: BitmapSlice = ()> { #[cfg(feature = "fusedev")] /// Writer for FuseDev transport driver. - FuseDev(FuseDevWriter<'a, S>), + FuseDev(FuseDevWriter<'a, 'b, S>), #[cfg(feature = "virtiofs")] /// Writer for virtiofs transport driver. VirtioFs(VirtioFsWriter<'a, S>), /// Writer for Noop transport driver. - Noop(PhantomData<&'a S>), + Noop(PhantomData<(&'a S, &'b S)>), } -impl<'a, S: BitmapSlice> Writer<'a, S> { +impl<'a, 'b, S: BitmapSlice> Writer<'a, 'b, S> { /// Write data to the descriptor chain buffer from a File at offset `off`. /// /// Return the number of bytes written to the descriptor chain buffer. @@ -549,6 +652,23 @@ impl<'a, S: BitmapSlice> Writer<'a, S> { } } + /// Append `count` bytes data from `src` at offset `off` + /// `src` should be file description or socket + #[cfg(all(target_os = "linux", feature = "fusedev"))] + pub fn append_fd_buf( + &mut self, + src: &F, + count: usize, + off: Option, + ) -> io::Result { + match self { + Writer::FuseDev(w) => { + w.append_fd_buf(src.as_raw_fd(), count as u64, off.map(|v| v as i64)) + } + _ => Err(std::io::Error::from_raw_os_error(libc::ENOSYS)), + } + } + /// Split this `Writer` into two at the given offset in the `DescriptorChain` buffer. /// /// After the split, `self` will be able to write up to `offset` bytes while the returned @@ -590,7 +710,7 @@ impl<'a, S: BitmapSlice> Writer<'a, S> { } /// Commit all internal buffers of self and others - pub fn commit(&mut self, other: Option<&Self>) -> io::Result { + pub fn commit(&mut self, other: Option<&mut Self>) -> io::Result { match self { #[cfg(feature = "fusedev")] Writer::FuseDev(w) => w.commit(other), @@ -601,7 +721,7 @@ impl<'a, S: BitmapSlice> Writer<'a, S> { } } -impl<'a, S: BitmapSlice> io::Write for Writer<'a, S> { +impl<'a, 'b, S: BitmapSlice> io::Write for Writer<'a, 'b, S> { fn write(&mut self, buf: &[u8]) -> io::Result { match self { #[cfg(feature = "fusedev")] @@ -634,7 +754,7 @@ impl<'a, S: BitmapSlice> io::Write for Writer<'a, S> { } #[cfg(feature = "async-io")] -impl<'a, S: BitmapSlice> Writer<'a, S> { +impl<'a, 'b, S: BitmapSlice> Writer<'a, 'b, S> { /// Write data from a buffer into this writer in asynchronous mode. pub async fn async_write(&mut self, data: &[u8]) -> io::Result { match self { @@ -703,7 +823,10 @@ impl<'a, S: BitmapSlice> Writer<'a, S> { } /// Commit all internal buffers of self and others - pub async fn async_commit(&mut self, other: Option<&Writer<'a, S>>) -> io::Result { + pub async fn async_commit( + &mut self, + other: Option<&mut Writer<'a, 'b, S>>, + ) -> io::Result { match self { #[cfg(feature = "fusedev")] Writer::FuseDev(w) => w.async_commit(other).await, @@ -715,14 +838,14 @@ impl<'a, S: BitmapSlice> Writer<'a, S> { } #[cfg(feature = "fusedev")] -impl<'a, S: BitmapSlice> From> for Writer<'a, S> { - fn from(w: FuseDevWriter<'a, S>) -> Self { +impl<'a, 'b, S: BitmapSlice> From> for Writer<'a, 'b, S> { + fn from(w: FuseDevWriter<'a, 'b, S>) -> Self { Writer::FuseDev(w) } } #[cfg(feature = "virtiofs")] -impl<'a, S: BitmapSlice> From> for Writer<'a, S> { +impl<'a, 'b, S: BitmapSlice> From> for Writer<'a, 'b, S> { fn from(w: VirtioFsWriter<'a, S>) -> Self { Writer::VirtioFs(w) } diff --git a/src/transport/virtiofs/mod.rs b/src/transport/virtiofs/mod.rs index 141490258..f7bd126d8 100644 --- a/src/transport/virtiofs/mod.rs +++ b/src/transport/virtiofs/mod.rs @@ -44,6 +44,7 @@ use std::io::{self, IoSlice, Write}; use std::ops::Deref; use std::ptr::copy_nonoverlapping; +use crate::transport::ReaderInner; use virtio_queue::DescriptorChain; use vm_memory::bitmap::{BitmapSlice, MS}; use vm_memory::{Address, ByteValued, GuestMemory, GuestMemoryRegion, MemoryRegionAddress}; @@ -97,12 +98,11 @@ impl<'a> Reader<'a> { ); } - Ok(Reader { - buffers: IoBuffers { - buffers, - bytes_consumed: 0, - }, + Ok(ReaderInner::Buffer(IoBuffers { + buffers, + bytes_consumed: 0, }) + .into()) } } @@ -247,7 +247,7 @@ impl<'a, S: BitmapSlice> VirtioFsWriter<'a, S> { /// Commit all internal buffers of self and others /// /// This is provided just to be compatible with fusedev - pub fn commit(&mut self, _other: Option<&Writer<'a, S>>) -> io::Result { + pub fn commit(&mut self, _other: Option<&mut Writer<'a, '_, S>>) -> io::Result { Ok(0) } @@ -380,7 +380,10 @@ mod async_io { /// Commit all internal buffers of self and others /// We need this because the lifetime of others is usually shorter than self. - pub async fn async_commit(&mut self, other: Option<&Writer<'a, S>>) -> io::Result { + pub async fn async_commit( + &mut self, + other: Option<&mut Writer<'a, '_, S>>, + ) -> io::Result { self.commit(other) } }