Feat/import huggingface #496

Open · wants to merge 5 commits into main
161 changes: 161 additions & 0 deletions src/server/src/controllers/workspaces/files.rs
@@ -14,10 +14,15 @@ use actix_web::{web, HttpRequest, HttpResponse};

use actix_multipart::Multipart;
use actix_web::Error;
use flate2::read::GzDecoder;
use futures::StreamExt;
use futures_util::TryStreamExt as _;
use std::io::Read;
use std::io::Write;
use std::path::{Path, PathBuf};

const BUFFER_SIZE_THRESHOLD: usize = 262144; // 256 KiB

pub async fn get(
req: HttpRequest,
query: web::Query<ImgResize>,
@@ -79,6 +84,139 @@ pub async fn add(req: HttpRequest, payload: Multipart) -> Result<HttpResponse, OxenHttpError>
}))
}

pub async fn add_stream(
req: HttpRequest,
mut payload: web::Payload,
) -> Result<HttpResponse, OxenHttpError> {
// whether the uploaded chunks are gzip-compressed
let is_gzip_encoded = req
.headers()
.get("Content-Encoding")
.map_or(false, |v| v == "gzip");

let filename = req
.headers()
.get("X-Filename")
.and_then(|h| h.to_str().ok())
.ok_or_else(|| OxenHttpError::BadRequest("Missing X-Filename header".into()))?;

let total_size = req
.headers()
.get("X-Total-Size")
.and_then(|h| h.to_str().ok())
.and_then(|s| s.parse::<u64>().ok())
.ok_or_else(|| {
OxenHttpError::BadRequest("Missing or invalid X-Total-Size header".into())
})?;
Comment on lines +87 to +110
🛠️ Refactor suggestion

Enhance header validation and error handling

The header validation could be improved in several ways:

  1. Add size limit validation
  2. Validate filename for security
  3. Provide more descriptive error messages
-    let filename = req
-        .headers()
-        .get("X-Filename")
-        .and_then(|h| h.to_str().ok())
-        .ok_or_else(|| OxenHttpError::BadRequest("Missing X-Filename header".into()))?;
+    let filename = req
+        .headers()
+        .get("X-Filename")
+        .and_then(|h| h.to_str().ok())
+        .ok_or_else(|| OxenHttpError::BadRequest("X-Filename header is required".into()))?;
+
+    // Validate filename for security
+    if filename.contains("..") || filename.starts_with('/') {
+        return Err(OxenHttpError::BadRequest("Invalid filename".into()));
+    }

-    let total_size = req
-        .headers()
-        .get("X-Total-Size")
-        .and_then(|h| h.to_str().ok())
-        .and_then(|s| s.parse::<u64>().ok())
-        .ok_or_else(|| {
-            OxenHttpError::BadRequest("Missing or invalid X-Total-Size header".into())
-        })?;
+    const MAX_FILE_SIZE: u64 = 1024 * 1024 * 1024; // 1GB
+
+    let total_size = req
+        .headers()
+        .get("X-Total-Size")
+        .and_then(|h| h.to_str().ok())
+        .and_then(|s| s.parse::<u64>().ok())
+        .ok_or_else(|| {
+            OxenHttpError::BadRequest("X-Total-Size header must be a valid number".into())
+        })?;
+
+    if total_size > MAX_FILE_SIZE {
+        return Err(OxenHttpError::BadRequest(format!(
+            "File size {} exceeds maximum allowed size of {}",
+            total_size, MAX_FILE_SIZE
+        ).into()));
+    }


let app_data = app_data(&req)?;
let namespace = path_param(&req, "namespace")?;
let repo_name = path_param(&req, "repo_name")?;
let workspace_id = path_param(&req, "workspace_id")?;
let repo = get_repo(&app_data.path, namespace, &repo_name)?;
let directory = PathBuf::from(path_param(&req, "path")?);

let workspace = repositories::workspaces::get(&repo, &workspace_id)?;

log::debug!("workspace::files::add_stream Got uploaded file name: {filename:?}");

let workspace_dir = workspace.dir();

log::debug!("workspace::files::add_stream Got workspace dir: {workspace_dir:?}");

let full_dir = workspace_dir.join(directory);

log::debug!("workspace::files::add_stream Got full dir: {full_dir:?}");

if !full_dir.exists() {
std::fs::create_dir_all(&full_dir)?;
}
Comment on lines +131 to +133
⚠️ Potential issue

Add path traversal protection

The directory creation lacks path traversal protection. Malicious input could potentially create directories outside the workspace.

Apply this diff:

 if !full_dir.exists() {
+    // Ensure the full_dir is within the workspace_dir
+    if !full_dir.starts_with(&workspace_dir) {
+        return Err(OxenHttpError::BadRequest("Invalid directory path".into()));
+    }
     std::fs::create_dir_all(&full_dir)?;
 }

Committable suggestion skipped: line range outside the PR's diff.
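
Worth noting that Path::starts_with is a purely lexical comparison, so a joined path that still contains ".." components can pass the check before the filesystem ever resolves it. Below is a minimal sketch of a lexical guard applied to the user-supplied directory parameter instead; the helper name is illustrative and not part of this PR.

use std::path::{Component, Path, PathBuf};

// Reject user-supplied directories that are absolute or contain ".."
// components before joining them onto the workspace directory. The check is
// lexical, so nothing needs to exist on disk before it runs.
fn join_within_workspace(workspace_dir: &Path, user_dir: &Path) -> Option<PathBuf> {
    let is_safe = user_dir
        .components()
        .all(|c| matches!(c, Component::Normal(_) | Component::CurDir));
    if is_safe {
        Some(workspace_dir.join(user_dir))
    } else {
        None
    }
}

Canonicalizing both paths with std::fs::canonicalize and re-checking containment would also work, but requires the directories to already exist on disk.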


let filepath = full_dir.join(filename);

log::debug!("workspace::files::add_stream Got filepath: {:?}", filepath);

let mut files = vec![];

let bytes_written = if filepath.exists() {
std::fs::metadata(&filepath)?.len()
} else {
0
};

// Log progress every 10 MB
if bytes_written % (10 * 1024 * 1024) == 0 {
log::debug!(
"workspace::files::add_stream file upload progress: {:.1}% ({}/{} bytes)",
(bytes_written as f64 / total_size as f64) * 100.0,
bytes_written,
total_size
);
}

let mut buffer = web::BytesMut::new();

while let Some(chunk) = payload.next().await {
let chunk = chunk.map_err(|_| OxenHttpError::BadRequest("Error reading payload".into()))?;

// check if received eof signal
if chunk.len() == 1 && chunk[0] == 0 {
// validate file size match
if bytes_written == total_size {
log::info!("add_stream upload completed: {} bytes", total_size);

files.push(filepath.clone());

let path = repositories::workspaces::files::add(&workspace, filepath)?;
log::debug!("add_stream ✅ success! staged file {:?}", path);

return Ok(HttpResponse::Ok().json(FilePathsResponse {
status: StatusMessage::resource_created(),
paths: files,
}));
} else {
log::error!(
"Upload stream incomplete. Expected {} bytes but received {} bytes",
total_size,
bytes_written
);
return Ok(HttpResponse::InternalServerError()
.json(StatusMessage::internal_server_error()));
}
} else {
// not EOF: decompress if gzip-encoded, buffer, and flush to disk at the threshold
let processed_chunk = if is_gzip_encoded {
let mut decoder = GzDecoder::new(&chunk[..]);
let mut decompressed = Vec::new();
decoder.read_to_end(&mut decompressed).map_err(|e| {
OxenHttpError::BadRequest(
format!("Failed to decompress gzip data: {}", e).into(),
)
})?;
decompressed
} else {
chunk.to_vec()
};
buffer.extend_from_slice(&processed_chunk);

if buffer.len() > BUFFER_SIZE_THRESHOLD {
save_stream(&filepath, buffer.split().freeze().to_vec()).await?;
}
}
}

if !buffer.is_empty() {
save_stream(&filepath, buffer.freeze().to_vec()).await?;
}

files.push(filepath.clone());

Ok(HttpResponse::Ok().json(FilePathsResponse {
status: StatusMessage::resource_created(),
paths: files,
}))
}
Comment on lines +87 to +218
💡 Codebase verification

⚠️ Incomplete upload handling needs improvement

The current implementation lacks cleanup mechanisms for incomplete uploads, which could lead to:

  • Orphaned partial files on connection failures
  • Inconsistent workspace state on premature upload termination
  • No automatic cleanup of incomplete uploads

Suggested improvements:

  • Implement temporary file staging with atomic move on completion (see the sketch after this list)
  • Add cleanup routine for partial uploads
  • Consider implementing upload resume capability
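
A rough sketch of the temp-file staging idea, assuming the final commit step remains the existing repositories::workspaces::files::add call; the helper names below are illustrative and not part of this PR.

use std::io::Write;
use std::path::{Path, PathBuf};

// Derive the staging path by appending ".partial" to the final filename.
fn temp_path(final_path: &Path) -> PathBuf {
    let mut p = final_path.as_os_str().to_owned();
    p.push(".partial");
    PathBuf::from(p)
}

// Append each incoming chunk to the staging file only.
fn write_chunk(final_path: &Path, chunk: &[u8]) -> std::io::Result<()> {
    let mut file = std::fs::OpenOptions::new()
        .create(true)
        .append(true)
        .open(temp_path(final_path))?;
    file.write_all(chunk)
}

// On the EOF signal, atomically move the staging file into place. A crashed
// or abandoned upload only ever leaves a .partial file behind, which a
// periodic cleanup task can delete without touching committed data.
fn finalize(final_path: &Path) -> std::io::Result<()> {
    std::fs::rename(temp_path(final_path), final_path)
}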
🔗 Analysis chain

Verify cleanup of incomplete uploads and endpoint usage

Please verify:

  1. Cleanup mechanism for incomplete uploads in case of failures
  2. Usage of the new streaming endpoint in the codebase
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check for cleanup handling in error cases
rg -l 'cleanup|remove|delete.*incomplete' 

# Check for usage of the new streaming endpoint
rg -l 'file_stream.*path'

# Check for any existing error handling patterns
ast-grep --pattern 'try {
  $$$
  file_stream
  $$$
} catch (e) {
  $$$
}'

Length of output: 4366


Script:

#!/bin/bash
# Check workspace file handling implementation
rg -A 10 'fn save_stream' src/server/src/controllers/workspaces/files.rs

# Check workspace file cleanup implementations
rg -A 10 'fn (cleanup|remove|delete).*workspace.*file' src/lib/src/repositories/workspaces/

# Check error handling in workspace files
ast-grep --pattern 'fn add($$$) -> Result<$$$> {
  $$$
}'

Length of output: 569


Script:

#!/bin/bash
# Check for transaction patterns in workspace operations
rg -A 5 'transaction|atomic|rollback' src/lib/src/repositories/workspaces/

# Check if there's any workspace file validation
rg -A 5 'validate.*file|check.*file' src/lib/src/repositories/workspaces/

# Look for workspace file error handling
ast-grep --pattern 'match $_ {
  Ok($_) => $$$,
  Err($_) => $$$
}'  src/server/src/controllers/workspaces/files.rs

Length of output: 269


pub async fn delete(req: HttpRequest) -> Result<HttpResponse, OxenHttpError> {
let app_data = app_data(&req)?;
let namespace = path_param(&req, "namespace")?;
@@ -169,3 +307,26 @@ async fn save_parts(

Ok(files)
}

async fn save_stream(filepath: &PathBuf, chunk: Vec<u8>) -> Result<&PathBuf, Error> {
log::debug!(
"workspace::files::save_stream writing {} bytes to file",
chunk.len()
);

let filepath_cpy = filepath.clone();

let mut file = web::block(move || {
std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(filepath_cpy)
})
.await??;

log::debug!("workspace::files::save_stream is writing file");

web::block(move || file.write_all(&chunk).map(|_| file)).await??;

Ok(filepath)
}
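
For reference, a hypothetical client for the new endpoint, inferred from the handler above: each request body carries one chunk (optionally gzip-compressed with Content-Encoding: gzip), and a final request whose body is a single zero byte signals end of stream. The URL, chunk size, and use of reqwest's blocking client are assumptions, not part of this PR.

use std::fs::File;
use std::io::Read;

// Requires reqwest with the "blocking" feature enabled.
fn upload(url: &str, local_path: &str, filename: &str) -> Result<(), Box<dyn std::error::Error>> {
    let client = reqwest::blocking::Client::new();
    let total_size = std::fs::metadata(local_path)?.len();
    let mut file = File::open(local_path)?;
    let mut buf = vec![0u8; 256 * 1024]; // mirror the server-side 256 KiB flush threshold

    loop {
        let n = file.read(&mut buf)?;
        if n == 0 {
            break;
        }
        // One chunk per request; the server appends it to the target file.
        client
            .post(url)
            .header("X-Filename", filename)
            .header("X-Total-Size", total_size.to_string())
            .body(buf[..n].to_vec())
            .send()?
            .error_for_status()?;
    }

    // A single zero byte tells the server the stream is complete and the
    // file can be staged into the workspace.
    client
        .post(url)
        .header("X-Filename", filename)
        .header("X-Total-Size", total_size.to_string())
        .body(vec![0u8])
        .send()?
        .error_for_status()?;

    Ok(())
}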
4 changes: 4 additions & 0 deletions src/server/src/services/workspaces.rs
@@ -31,6 +31,10 @@ pub fn workspace() -> Scope {
"/files/{path:.*}",
web::post().to(controllers::workspaces::files::add),
)
.route(
"/file_stream/{path:.*}",
web::post().to(controllers::workspaces::files::add_stream),
)
Comment on lines +34 to +37
💡 Codebase verification


Content type validation needed for file streaming endpoint

The add_stream endpoint in src/server/src/controllers/workspaces/files.rs currently lacks content type validation. The endpoint only checks:

  • Content-Encoding for gzip compression
  • X-Filename header presence
  • X-Total-Size for file size validation

To prevent security risks:

  • Add content type validation using the Content-Type header
  • Define and enforce a whitelist of allowed MIME types (see the sketch after this list)
  • Return a 415 Unsupported Media Type response for disallowed content types
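
A minimal sketch of the suggested whitelist check, assuming it would run near the top of add_stream before any bytes are written; the allowed list and helper name are illustrative.

use actix_web::{HttpRequest, HttpResponse};

const ALLOWED_CONTENT_TYPES: &[&str] = &[
    "application/octet-stream",
    "text/csv",
    "application/json",
    "image/png",
    "image/jpeg",
];

// Returns an early 415 response when the declared media type is not allowed.
fn check_content_type(req: &HttpRequest) -> Result<(), HttpResponse> {
    let content_type = req
        .headers()
        .get("Content-Type")
        .and_then(|v| v.to_str().ok())
        .unwrap_or("application/octet-stream");

    // Ignore parameters such as "; charset=utf-8" when comparing.
    let media_type = content_type.split(';').next().unwrap_or("").trim();
    if ALLOWED_CONTENT_TYPES.contains(&media_type) {
        Ok(())
    } else {
        Err(HttpResponse::UnsupportedMediaType()
            .body(format!("Content-Type {media_type} is not allowed")))
    }
}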
🔗 Analysis chain

Verify content type restrictions

The file streaming endpoint should validate the Content-Type header to ensure only allowed file types are uploaded.


Let me analyze the code further to verify content type restrictions.


Let me analyze the implementation of the file streaming endpoint.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for any existing content type validations in the codebase
rg -A 5 'Content-Type|content_type|MIME|mime'

# Search for file extension validations
ast-grep --pattern 'if $file.ends_with($ext) { $$$  }'

Length of output: 52593


Script:

#!/bin/bash
# Search for file upload validation in the controllers
rg -A 10 'add_stream|upload|file_stream' src/server/src/controllers/

# Search for multipart form data handling
rg -A 10 'multipart.*form-data' src/server/src/controllers/

Length of output: 30229


Script:

#!/bin/bash
# Search for the file streaming endpoint implementation
rg -A 20 'pub async fn add_stream' src/server/src/controllers/workspaces/files.rs

# Search for any file type validation in the files module
rg -A 10 'validate|check.*file|allowed.*type' src/server/src/controllers/workspaces/files.rs

Length of output: 1306

.route(
"/files/{path:.*}",
web::delete().to(controllers::workspaces::files::delete),