Skip to content

Commit

Permalink
Add basic implementation of blob upload (#1486)
Browse files Browse the repository at this point in the history
* Add basic implementation of blob upload

* Add streaming, minimize params

* Use random name for blob key

* Use a trait to define a blob store

* cleanup

* Adress comments
  • Loading branch information
V-FEXrt authored Dec 12, 2023
1 parent 319b681 commit d3217d9
Show file tree
Hide file tree
Showing 6 changed files with 230 additions and 41 deletions.
93 changes: 75 additions & 18 deletions rust/rsc/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion rust/rsc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ entity = { path = "../entity" }
migration = { path = "../migration" }
sea-orm = { version = "0.12.6", features = ["sqlx-postgres", "runtime-tokio-native-tls"]}
serde = "1.0.164"
futures = "0.3.29"
tokio = { version = "1.28.2", features = ["full"] }
tokio-util = { version = "0.7.1", features = ["io"] }
tracing = "0.1"
tracing-subscriber = "0.3"
axum = "0.6.18"
axum = { version = "0.6.18", features = ["multipart"] }
tower = "0.4.12"
hyper = "0.14.27"
serde_bytes = "0.11.9"
Expand All @@ -35,3 +37,4 @@ config = "0.13.3"
serde_json = "1.0.100"
clap = { version = "4.3.23", features = ["derive"] }
is-terminal = "0.4.9"
async-trait = "0.1.74"
4 changes: 4 additions & 0 deletions rust/rsc/src/common/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ pub struct RSCConfigOverride {
pub database_url: Option<String>,
pub server_addr: Option<String>,
pub standalone: Option<bool>,
// TODO: the backing store should be configurable via URI
pub local_store: Option<String>,
}

#[derive(Debug, Deserialize, Serialize)]
Expand All @@ -15,6 +17,7 @@ pub struct RSCConfig {
// TODO: We should allow setting a domain as well
pub server_addr: String,
pub standalone: bool,
pub local_store: Option<String>,
}

impl RSCConfig {
Expand All @@ -35,6 +38,7 @@ impl RSCConfig {
.set_override_option("database_url", overrides.database_url)?
.set_override_option("server_addr", overrides.server_addr)?
.set_override_option("standalone", overrides.standalone)?
.set_override_option("local_store", overrides.local_store)?
.build()?;

config.try_deserialize()
Expand Down
78 changes: 78 additions & 0 deletions rust/rsc/src/rsc/blob.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use crate::types::GetUploadUrlResponse;
use async_trait::async_trait;
use axum::{extract::Multipart, http::StatusCode, Json};
use data_encoding::BASE64URL;
use futures::TryStreamExt;
use rand_core::{OsRng, RngCore};
use sea_orm::DatabaseConnection;
use std::sync::Arc;
use tokio::fs::File;
use tokio::io::BufWriter;
use tokio_util::io::StreamReader;
use tracing;

// TODO: Update this trait to return the url for a given key
#[async_trait]
pub trait BlobStore {
async fn stream<A, B>(&self, reader: StreamReader<A, B>) -> Result<String, std::io::Error>
where
StreamReader<A, B>: tokio::io::AsyncRead + std::marker::Send;
}

pub trait DebugBlobStore: BlobStore + std::fmt::Debug {}

#[derive(Debug, Clone)]
pub struct LocalBlobStore {
pub root: String,
}

#[async_trait]
impl BlobStore for LocalBlobStore {
async fn stream<A, B>(&self, reader: StreamReader<A, B>) -> Result<String, std::io::Error>
where
StreamReader<A, B>: tokio::io::AsyncRead + std::marker::Send,
{
futures::pin_mut!(reader);

let name = create_temp_filename();
let path = std::path::Path::new(&self.root).join(name.clone());
let mut file = BufWriter::new(File::create(path).await?);

tokio::io::copy(&mut reader, &mut file).await?;

Ok(name)
}
}

impl DebugBlobStore for LocalBlobStore {}

fn create_temp_filename() -> String {
let mut key = [0u8; 16];
OsRng.fill_bytes(&mut key);
// URL must be used as files can't contain /
BASE64URL.encode(&key)
}

#[tracing::instrument]
pub async fn get_upload_url(server_addr: String) -> Json<GetUploadUrlResponse> {
let url = server_addr + "/blob";
Json(GetUploadUrlResponse { url })
}

#[tracing::instrument]
pub async fn create_blob(
mut multipart: Multipart,
_conn: Arc<DatabaseConnection>,
store: impl DebugBlobStore,
) -> (StatusCode, String) {
while let Ok(Some(field)) = multipart.next_field().await {
let body = field.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err));
match store.stream(StreamReader::new(body)).await {
// TODO: The blob should be inserted into the db instead of just printing the key
Ok(key) => println!("{:?}", key),
Err(msg) => return (StatusCode::INTERNAL_SERVER_ERROR, msg.to_string()),
}
}

(StatusCode::OK, "ok".into())
}
Loading

0 comments on commit d3217d9

Please sign in to comment.