From c95115aa9eb3efd5b81b194257152931115f51c3 Mon Sep 17 00:00:00 2001 From: Jiening Yu Date: Fri, 12 Jan 2024 23:53:46 +0800 Subject: [PATCH] writing resources by request body chunks --- Cargo.toml | 2 +- src/account.rs | 1 + src/handle/resource.rs | 42 ++++++++++++++++++++++++++++++++++++------ src/resource.rs | 25 ++++++++++++++++++++++--- 4 files changed, 60 insertions(+), 10 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 7898b1a..a55ca21 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,9 +33,9 @@ rand = "0.8" siphasher = "1.0" highway = "1.1" async-trait = "0.1" +http-body-util = "0.1" [dev-dependencies] tower = "0.4" serde_json = "1.0" mime = "0.3" -http-body-util = "0.1" diff --git a/src/account.rs b/src/account.rs index 0af8187..5b1d939 100644 --- a/src/account.rs +++ b/src/account.rs @@ -15,6 +15,7 @@ pub mod verify; /// A permission group of an account. #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Hash)] +#[non_exhaustive] pub enum Permission { /// Post postings. /// diff --git a/src/handle/resource.rs b/src/handle/resource.rs index 9907467..7d797c5 100644 --- a/src/handle/resource.rs +++ b/src/handle/resource.rs @@ -109,20 +109,50 @@ pub async fn upload( config, .. }): State>, - payload: axum::body::Bytes, + payload: axum::body::Body, ) -> Result, Error> { let select = sd!(worlds.account, auth.account); va!(auth, select => UploadResource); + + let mut hasher = highway::PortableHash::default(); + let buf_path = config.resource_path.join( + resource_sessions + .lock() + .await + .buf_name(id) + .ok_or(Error::ResourceUploadSessionNotFound(id))?, + ); + let mut file = tokio::fs::File::create(&buf_path) + .await + .map_err(|_| Error::ResourceSaveFailed)?; + let mut stream = http_body_util::BodyStream::new(payload); + while let Some(chunk) = stream + .try_next() + .await + .map_err(|_| Error::ResourceSaveFailed)? + { + let chunk = chunk.into_data().map_err(|_| Error::ResourceSaveFailed)?; + highway::HighwayHash::append(&mut hasher, &chunk); + tokio::io::AsyncWriteExt::write_all(&mut file, &chunk) + .await + .map_err(|_| Error::ResourceSaveFailed)?; + } + tokio::io::AsyncWriteExt::flush(&mut file) + .await + .map_err(|_| Error::ResourceSaveFailed)?; + file.sync_data() + .await + .map_err(|_| Error::ResourceSaveFailed)?; + let resource = resource_sessions .lock() .await - .accept(id, &payload, auth.account)?; + .accept(id, hasher, auth.account)?; let id = resource.id(); let path = config.resource_path.join(resource.file_name()); - if let Err(err) = tokio::fs::write(path, payload).await { - tracing::error!("failed to write resource file {resource:?}: {err}"); - return Err(Error::PermissionDenied); - } + tokio::fs::rename(buf_path, path) + .await + .map_err(|_| Error::ResourceSaveFailed)?; worlds .resource diff --git a/src/resource.rs b/src/resource.rs index 2f3d2d4..8f5b79e 100644 --- a/src/resource.rs +++ b/src/resource.rs @@ -93,6 +93,13 @@ impl Resource { pub fn file_name(&self) -> String { format!("{}{}", Self::FILE_PREFIX, self.id) } + + const BUF_PREFIX: &'static str = "buf_"; + + /// Buffer file name of this resource. + pub fn buf_name(&self) -> String { + format!("{}{}", Self::BUF_PREFIX, self.id) + } } impl dmds::Data for Resource { @@ -198,7 +205,12 @@ impl UploadSessions { /// /// **Id of the resource will be changed**, so you have to /// tell the new id to the frontend. - pub fn accept(&mut self, id: u64, data: &[u8], user: u64) -> Result { + pub fn accept( + &mut self, + id: u64, + mut hasher: H, + user: u64, + ) -> Result { self.cleanup(); let res = &self .inner @@ -210,10 +222,17 @@ impl UploadSessions { } let mut res = self.inner.remove(&id).unwrap().resource; - res.id = - highway::HighwayHash::hash64(highway::PortableHash::new(highway::Key::default()), data); + SystemTime::now().hash(&mut hasher); + user.hash(&mut hasher); + res.id = hasher.finish(); Ok(res) } + + /// Gets filesystem buffer name of a resource session. + #[inline] + pub fn buf_name(&self, id: u64) -> Option { + self.inner.get(&id).map(|s| s.resource.buf_name()) + } } /// Type of a [`Resource`].