diff --git a/tokio-fs/Cargo.toml b/tokio-fs/Cargo.toml index d4b14ec..a6d7cb4 100644 --- a/tokio-fs/Cargo.toml +++ b/tokio-fs/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dmds-tokio-fs" -version = "0.1.0" +version = "0.2.0" edition = "2021" authors = ["JieningYu "] description = "Dmds I/O handler interacts with the filesystem using Tokio" @@ -15,7 +15,7 @@ maintenance = { status = "actively-developed" } # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -dmds = "0.1" +dmds = "0.2" async-trait = "0.1" tokio = { version = "1.34", features = ["fs", "io-util", "time", "rt"] } futures = { version = "0.3", features = ["executor"] } diff --git a/tokio-fs/src/lib.rs b/tokio-fs/src/lib.rs index e610d1b..8b7e955 100644 --- a/tokio-fs/src/lib.rs +++ b/tokio-fs/src/lib.rs @@ -6,10 +6,13 @@ use std::{ }; use async_trait::async_trait; -use bytes::BytesMut; +use bytes::{BufMut, BytesMut}; use dashmap::DashSet; use dmds::IoHandle; -use tokio::{fs::File, io::BufReader}; +use tokio::{ + fs::File, + io::{AsyncReadExt, BufReader}, +}; #[cfg(test)] mod tests; @@ -38,7 +41,7 @@ impl IoHandle for FsHandle { async fn read_chunk( &self, pos: [usize; DIMS], - ) -> std::io::Result> { + ) -> std::io::Result<(u32, Self::Read<'_>)> { let path = self.path(&pos); let result = File::open(&path).await; @@ -50,11 +53,17 @@ impl IoHandle for FsHandle { self.invalid_chunks.insert(Box::new(pos)); } } - - Ok(FsReader { - _handle: self, - file: BufReader::new(result?), - }) + let mut file = BufReader::new(result?); + let mut buf = [0_u8; 4]; + file.read_exact(&mut &mut buf[..]).await?; + + Ok(( + u32::from_be_bytes(buf), + FsReader { + _handle: self, + file, + }, + )) } } @@ -76,6 +85,7 @@ impl FsHandle { chunk: &dmds::Chunk, ) -> std::io::Result<()> { let mut buf = BytesMut::new(); + buf.put_u32(T::VERSION); chunk.write_buf(&mut buf).await?; let buf = buf.freeze();