Skip to content

Commit

Permalink
writing resources by request body chunks
Browse files Browse the repository at this point in the history
  • Loading branch information
JieningYu committed Jan 12, 2024
1 parent 600f7d3 commit c95115a
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 10 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ rand = "0.8"
siphasher = "1.0"
highway = "1.1"
async-trait = "0.1"
http-body-util = "0.1"

[dev-dependencies]
tower = "0.4"
serde_json = "1.0"
mime = "0.3"
http-body-util = "0.1"
1 change: 1 addition & 0 deletions src/account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub mod verify;

/// A permission group of an account.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Hash)]
#[non_exhaustive]
pub enum Permission {
/// Post postings.
///
Expand Down
42 changes: 36 additions & 6 deletions src/handle/resource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,20 +109,50 @@ pub async fn upload<Io: IoHandle>(
config,
..
}): State<Global<Io>>,
payload: axum::body::Bytes,
payload: axum::body::Body,
) -> Result<Json<UploadRes>, Error> {
let select = sd!(worlds.account, auth.account);
va!(auth, select => UploadResource);

let mut hasher = highway::PortableHash::default();
let buf_path = config.resource_path.join(
resource_sessions
.lock()
.await
.buf_name(id)
.ok_or(Error::ResourceUploadSessionNotFound(id))?,
);
let mut file = tokio::fs::File::create(&buf_path)
.await
.map_err(|_| Error::ResourceSaveFailed)?;
let mut stream = http_body_util::BodyStream::new(payload);
while let Some(chunk) = stream
.try_next()
.await
.map_err(|_| Error::ResourceSaveFailed)?
{
let chunk = chunk.into_data().map_err(|_| Error::ResourceSaveFailed)?;
highway::HighwayHash::append(&mut hasher, &chunk);
tokio::io::AsyncWriteExt::write_all(&mut file, &chunk)
.await
.map_err(|_| Error::ResourceSaveFailed)?;
}
tokio::io::AsyncWriteExt::flush(&mut file)
.await
.map_err(|_| Error::ResourceSaveFailed)?;
file.sync_data()
.await
.map_err(|_| Error::ResourceSaveFailed)?;

let resource = resource_sessions
.lock()
.await
.accept(id, &payload, auth.account)?;
.accept(id, hasher, auth.account)?;
let id = resource.id();
let path = config.resource_path.join(resource.file_name());
if let Err(err) = tokio::fs::write(path, payload).await {
tracing::error!("failed to write resource file {resource:?}: {err}");
return Err(Error::PermissionDenied);
}
tokio::fs::rename(buf_path, path)
.await
.map_err(|_| Error::ResourceSaveFailed)?;

worlds
.resource
Expand Down
25 changes: 22 additions & 3 deletions src/resource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,13 @@ impl Resource {
pub fn file_name(&self) -> String {
format!("{}{}", Self::FILE_PREFIX, self.id)
}

const BUF_PREFIX: &'static str = "buf_";

/// Buffer file name of this resource.
pub fn buf_name(&self) -> String {
format!("{}{}", Self::BUF_PREFIX, self.id)
}
}

impl dmds::Data for Resource {
Expand Down Expand Up @@ -198,7 +205,12 @@ impl UploadSessions {
///
/// **Id of the resource will be changed**, so you have to
/// tell the new id to the frontend.
pub fn accept(&mut self, id: u64, data: &[u8], user: u64) -> Result<Resource, Error> {
pub fn accept<H: Hasher>(
&mut self,
id: u64,
mut hasher: H,
user: u64,
) -> Result<Resource, Error> {
self.cleanup();
let res = &self
.inner
Expand All @@ -210,10 +222,17 @@ impl UploadSessions {
}

let mut res = self.inner.remove(&id).unwrap().resource;
res.id =
highway::HighwayHash::hash64(highway::PortableHash::new(highway::Key::default()), data);
SystemTime::now().hash(&mut hasher);
user.hash(&mut hasher);
res.id = hasher.finish();
Ok(res)
}

/// Gets filesystem buffer name of a resource session.
#[inline]
pub fn buf_name(&self, id: u64) -> Option<String> {
self.inner.get(&id).map(|s| s.resource.buf_name())
}
}

/// Type of a [`Resource`].
Expand Down

0 comments on commit c95115a

Please sign in to comment.