
Feat/import huggingface #496

Open · wants to merge 5 commits into main

Conversation

@Lilianxr (Collaborator) commented Jan 2, 2025

Summary by CodeRabbit

  • New Features

    • Added support for streaming file uploads to workspaces.
    • Introduced a new endpoint for handling file streams with gzip encoding support.
    • Enabled file upload capabilities for large files with improved chunk processing.
  • Improvements

    • Enhanced file upload error handling with specific messages.
    • Added a more robust file streaming mechanism.


coderabbitai bot commented Jan 2, 2025

Walkthrough

This pull request introduces a new asynchronous function add_stream for handling file uploads via a streaming payload in the workspace file handling system. It includes support for gzip-encoded uploads, validates custom headers for filename and total size, and processes incoming data in chunks. Additionally, a new POST route /file_stream/{path:.*} is added to the workspace service to facilitate these streaming uploads. The existing add function remains unchanged, ensuring backward compatibility.
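
For orientation, a client-side sketch of calling the new endpoint (reqwest is an assumed dependency; the base URL is a placeholder, and the real route likely nests under repo and workspace segments not shown in this summary):

use std::error::Error;

// Upload a local file to the streaming endpoint, setting the custom headers
// the handler validates before reading the payload.
fn upload_file_stream(base_url: &str, dir: &str, local: &std::path::Path) -> Result<(), Box<dyn Error>> {
    let data = std::fs::read(local)?;
    let filename = local
        .file_name()
        .ok_or("path has no file name")?
        .to_string_lossy()
        .into_owned();

    let resp = reqwest::blocking::Client::new()
        .post(format!("{base_url}/file_stream/{dir}"))
        .header("X-Filename", filename)
        .header("X-Total-Size", data.len().to_string())
        // Optionally gzip the body and send Content-Encoding: gzip instead
        .body(data)
        .send()?;

    println!("upload finished with status {}", resp.status());
    Ok(())
}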

Changes

File | Change Summary
src/server/src/controllers/workspaces/files.rs | Added an add_stream() handler for streaming file uploads and a save_stream() helper function.
src/server/src/services/workspaces.rs | Added a POST route /file_stream/{path:.*} to support the streaming file upload endpoint.

Sequence Diagram

sequenceDiagram
    participant Client
    participant Server
    participant FileSystem
    Client->>Server: POST /file_stream/{path} with streaming payload
    Server->>Server: Validate headers (filename, total size)
    Server->>Server: Check for gzip encoding
    loop Process File Chunks
        Client->>Server: Send file chunk
        Server->>Server: Decompress if gzip
        Server->>FileSystem: Write chunk to file
    end
    Server-->>Client: Return upload completion response

Poem

🐰 Streaming files, chunk by chunk,
Gzip dancing, data's spunk!
Headers checked with rabbit's might,
Uploads flowing smooth and tight,
File transfer, a coding delight! 🚀



coderabbitai bot left a comment


Actionable comments posted: 3

🧹 Nitpick comments (3)
src/server/src/controllers/workspaces/files.rs (2)

24-24: Consider making the buffer size configurable

The buffer size threshold is hardcoded. Consider making this configurable through environment variables or configuration files to allow for runtime tuning based on system resources and requirements.

-const BUFFER_SIZE_THRESHOLD: usize = 262144; // 256kb
+const DEFAULT_BUFFER_SIZE_THRESHOLD: usize = 262144; // 256kb
+
+fn get_buffer_size() -> usize {
+    std::env::var("OXEN_UPLOAD_BUFFER_SIZE")
+        .ok()
+        .and_then(|v| v.parse().ok())
+        .unwrap_or(DEFAULT_BUFFER_SIZE_THRESHOLD)
+}

303-324: Enhance error handling in save_stream function

The save_stream function could benefit from improved error handling and logging.

 async fn save_stream(filepath: &PathBuf, chunk: Vec<u8>) -> Result<&PathBuf, Error> {
-    log::debug!(
+    log::trace!(
         "workspace::files::save_stream writing {} bytes to file",
         chunk.len()
     );

     let filepath_cpy = filepath.clone();

-    let mut file = web::block(move || {
-        std::fs::OpenOptions::new()
-            .create(true)
-            .append(true)
-            .open(filepath_cpy)
-    })
-    .await??;
+    let mut file = web::block(move || -> std::io::Result<std::fs::File> {
+        let file = std::fs::OpenOptions::new()
+            .create(true)
+            .append(true)
+            .open(&filepath_cpy)?;
+
+        // Fail fast if the file is not writable
+        if file.metadata()?.permissions().readonly() {
+            return Err(std::io::Error::new(
+                std::io::ErrorKind::PermissionDenied,
+                "file is read-only",
+            ));
+        }
+
+        Ok(file)
+    })
+    .await
+    .map_err(|e| {
+        log::error!("Failed to open file {}: {}", filepath.display(), e);
+        e
+    })??;

-    log::debug!("workspace::files::save_stream is writing file");
-
-    web::block(move || file.write_all(&chunk).map(|_| file)).await??;
+    web::block(move || file.write_all(&chunk))
+        .await
+        .map_err(|e| {
+            log::error!("Failed to write to file: {}", e);
+            e
+        })??;

     Ok(filepath)
 }
src/server/src/services/workspaces.rs (1)

34-37: Consider rate limiting for file uploads

The new file streaming endpoint has no rate limiting, which leaves it open to abuse. Consider adding a middleware that limits requests per client IP or authentication token. The sketch below is illustrative pseudocode: RateLimiter, SimpleMemoryStore, and RateLimiterConfig are placeholder types, not an existing actix-web API.

use std::time::Duration;

// Example rate limiting middleware configuration (placeholder types)
let file_upload_config = web::scope("/file_stream")
    .wrap(
        RateLimiter::new(
            SimpleMemoryStore::new(),
            RateLimiterConfig {
                max_requests: 10,
                interval: Duration::from_secs(60),
            },
        )
    )
    .route("/{path:.*}", web::post().to(controllers::workspaces::files::add_stream));
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2f90d54 and 0d19b18.

📒 Files selected for processing (2)
  • src/server/src/controllers/workspaces/files.rs (3 hunks)
  • src/server/src/services/workspaces.rs (1 hunks)
🧰 Additional context used
🪛 GitHub Check: Clippy
src/server/src/controllers/workspaces/files.rs

[failure] 135-135:
the borrowed expression implements the required traits

src/server/src/controllers/workspaces/files.rs: outdated review comment (resolved)
Comment on lines +87 to +110
pub async fn add_stream(
req: HttpRequest,
mut payload: web::Payload,
) -> Result<HttpResponse, OxenHttpError> {
// whether the file chunk is gzip-compressed
let is_gzip_encoded = req
.headers()
.get("Content-Encoding")
.map_or(false, |v| v == "gzip");

let filename = req
.headers()
.get("X-Filename")
.and_then(|h| h.to_str().ok())
.ok_or_else(|| OxenHttpError::BadRequest("Missing X-Filename header".into()))?;

let total_size = req
.headers()
.get("X-Total-Size")
.and_then(|h| h.to_str().ok())
.and_then(|s| s.parse::<u64>().ok())
.ok_or_else(|| {
OxenHttpError::BadRequest("Missing or invalid X-Total-Size header".into())
})?;


🛠️ Refactor suggestion

Enhance header validation and error handling

The header validation could be improved in several ways:

  1. Add size limit validation
  2. Validate filename for security
  3. Provide more descriptive error messages
-    let filename = req
-        .headers()
-        .get("X-Filename")
-        .and_then(|h| h.to_str().ok())
-        .ok_or_else(|| OxenHttpError::BadRequest("Missing X-Filename header".into()))?;
+    let filename = req
+        .headers()
+        .get("X-Filename")
+        .and_then(|h| h.to_str().ok())
+        .ok_or_else(|| OxenHttpError::BadRequest("X-Filename header is required".into()))?;
+
+    // Validate filename for security
+    if filename.contains("..") || filename.starts_with('/') {
+        return Err(OxenHttpError::BadRequest("Invalid filename".into()));
+    }

-    let total_size = req
-        .headers()
-        .get("X-Total-Size")
-        .and_then(|h| h.to_str().ok())
-        .and_then(|s| s.parse::<u64>().ok())
-        .ok_or_else(|| {
-            OxenHttpError::BadRequest("Missing or invalid X-Total-Size header".into())
-        })?;
+    const MAX_FILE_SIZE: u64 = 1024 * 1024 * 1024; // 1GB
+
+    let total_size = req
+        .headers()
+        .get("X-Total-Size")
+        .and_then(|h| h.to_str().ok())
+        .and_then(|s| s.parse::<u64>().ok())
+        .ok_or_else(|| {
+            OxenHttpError::BadRequest("X-Total-Size header must be a valid number".into())
+        })?;
+
+    if total_size > MAX_FILE_SIZE {
+        return Err(OxenHttpError::BadRequest(format!(
+            "File size {} exceeds maximum allowed size of {}",
+            total_size, MAX_FILE_SIZE
+        ).into()));
+    }

Comment on lines +34 to +37
.route(
"/file_stream/{path:.*}",
web::post().to(controllers::workspaces::files::add_stream),
)


💡 Codebase verification

Content type validation needed for file streaming endpoint

The add_stream endpoint in src/server/src/controllers/workspaces/files.rs currently lacks content type validation. The endpoint only checks:

  • Content-Encoding for gzip compression
  • X-Filename header presence
  • X-Total-Size for file size validation

To prevent security risks (a minimal sketch follows this list):

  • Add content type validation using the Content-Type header
  • Define and enforce a whitelist of allowed MIME types
  • Return a 415 Unsupported Media Type response for disallowed content types
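
A minimal sketch of such a check (the whitelist contents and the helper name are assumptions, not existing Oxen code):

use actix_web::{HttpRequest, HttpResponse};

// Hypothetical whitelist; the real set of allowed MIME types is a policy decision.
const ALLOWED_CONTENT_TYPES: &[&str] = &["application/octet-stream", "text/csv", "application/json"];

fn validate_content_type(req: &HttpRequest) -> Result<(), HttpResponse> {
    let content_type = req
        .headers()
        .get("Content-Type")
        .and_then(|h| h.to_str().ok())
        .unwrap_or("application/octet-stream");

    if ALLOWED_CONTENT_TYPES.iter().any(|t| content_type.starts_with(t)) {
        Ok(())
    } else {
        // 415 Unsupported Media Type for anything outside the whitelist
        Err(HttpResponse::UnsupportedMediaType()
            .body(format!("Content-Type {content_type} is not allowed")))
    }
}
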
🔗 Analysis chain

Verify content type restrictions

The file streaming endpoint should validate the Content-Type header to ensure only allowed file types are uploaded.


Let me analyze the code further to verify content type restrictions.


Let me analyze the implementation of the file streaming endpoint.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for any existing content type validations in the codebase
rg -A 5 'Content-Type|content_type|MIME|mime'

# Search for file extension validations
ast-grep --pattern 'if $file.ends_with($ext) { $$$  }'

Length of output: 52593


Script:

#!/bin/bash
# Search for file upload validation in the controllers
rg -A 10 'add_stream|upload|file_stream' src/server/src/controllers/

# Search for multipart form data handling
rg -A 10 'multipart.*form-data' src/server/src/controllers/

Length of output: 30229


Script:

#!/bin/bash
# Search for the file streaming endpoint implementation
rg -A 20 'pub async fn add_stream' src/server/src/controllers/workspaces/files.rs

# Search for any file type validation in the files module
rg -A 10 'validate|check.*file|allowed.*type' src/server/src/controllers/workspaces/files.rs

Length of output: 1306


coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (2)
src/server/src/controllers/workspaces/files.rs (2)

92-110: ⚠️ Potential issue

Enhance header validation for security and robustness

The current header validation could be improved:

  1. Missing size limit validation
  2. Missing filename security validation
  3. Headers could provide more descriptive error messages

This is a duplicate of a previous review comment. The suggested improvements remain valid and should be implemented.


151-202: 🛠️ Refactor suggestion

Improve streaming implementation robustness

The current streaming implementation needs several improvements:

  1. Missing progress tracking
  2. Missing proper error handling for buffer overflow
  3. Missing timeout handling
  4. Missing cancellation support

This is a duplicate of a previous review comment. The suggested improvements remain valid and should be implemented.
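
Of those, timeout handling can be sketched briefly (the helper name and the 30-second budget are illustrative assumptions, not project settings):

use std::time::Duration;
use actix_web::web;
use futures_util::StreamExt;
use tokio::time::timeout;

// Wrap each chunk read in a timeout so a stalled client cannot hold the
// connection open indefinitely.
const CHUNK_READ_TIMEOUT: Duration = Duration::from_secs(30);

async fn next_chunk_with_timeout(
    payload: &mut web::Payload,
) -> Result<Option<web::Bytes>, actix_web::Error> {
    match timeout(CHUNK_READ_TIMEOUT, payload.next()).await {
        // Timer fired before the client sent another chunk
        Err(_elapsed) => Err(actix_web::error::ErrorRequestTimeout(
            "timed out waiting for next upload chunk",
        )),
        // Stream ended cleanly
        Ok(None) => Ok(None),
        // Got a chunk, or a payload error to propagate
        Ok(Some(chunk)) => chunk.map(Some).map_err(actix_web::Error::from),
    }
}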

🧹 Nitpick comments (1)
src/server/src/controllers/workspaces/files.rs (1)

303-324: Enhance error context in save_stream

While the error handling is present, it could provide more context about what failed during the file operations.

Apply this diff:

 async fn save_stream(filepath: &PathBuf, chunk: Vec<u8>) -> Result<&PathBuf, Error> {
     log::debug!(
         "workspace::files::save_stream writing {} bytes to file",
         chunk.len()
     );

     let filepath_cpy = filepath.clone();

     let mut file = web::block(move || {
         std::fs::OpenOptions::new()
             .create(true)
             .append(true)
             .open(filepath_cpy)
-    })
-    .await??;
+    })
+    .await
+    .map_err(|e| actix_web::error::ErrorInternalServerError(
+        format!("Failed to open file {}: {}", filepath.display(), e)
+    ))??;

     log::debug!("workspace::files::save_stream is writing file");

-    web::block(move || file.write_all(&chunk).map(|_| file)).await??;
+    let chunk_len = chunk.len(); // capture before `chunk` moves into the closure
+    web::block(move || file.write_all(&chunk))
+        .await
+        .map_err(|e| actix_web::error::ErrorInternalServerError(
+            format!("Failed to write {} bytes to {}: {}",
+                chunk_len, filepath.display(), e)
+        ))??;

     Ok(filepath)
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0d19b18 and 2f74bb4.

📒 Files selected for processing (1)
  • src/server/src/controllers/workspaces/files.rs (3 hunks)
🔇 Additional comments (2)
src/server/src/controllers/workspaces/files.rs (2)

17-24: LGTM! Well-structured imports and reasonable buffer size.

The new imports are properly scoped for the streaming functionality, and the 256kb buffer size is a reasonable choice for balancing memory usage and performance.


87-210: Add tests for streaming functionality

The new streaming implementation would benefit from comprehensive tests covering:

  1. Success cases with various file sizes
  2. Error cases (invalid headers, network issues)
  3. Edge cases (empty files, maximum size files)
  4. Concurrent upload scenarios

Would you like me to help create a test suite for this new functionality?
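
For instance, a minimal test of the save_stream helper might look like this (a sketch assuming the tempfile crate as a dev-dependency):

#[actix_web::test]
async fn test_save_stream_appends_chunks() {
    // save_stream uses web::block internally, so it needs an actix runtime
    let dir = tempfile::tempdir().unwrap();
    let filepath = dir.path().join("upload.bin");

    save_stream(&filepath, b"hello ".to_vec()).await.unwrap();
    save_stream(&filepath, b"world".to_vec()).await.unwrap();

    // Chunks are appended in order, so the file holds both writes
    let contents = std::fs::read(&filepath).unwrap();
    assert_eq!(contents, b"hello world");
}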

Comment on lines +131 to +133
if !full_dir.exists() {
std::fs::create_dir_all(&full_dir)?;
}


⚠️ Potential issue

Add path traversal protection

The directory creation lacks path traversal protection. Malicious input could potentially create directories outside the workspace.

Apply this diff:

 if !full_dir.exists() {
+    // Ensure full_dir stays within workspace_dir. Path::starts_with is a
+    // lexical check, so also reject ".." components explicitly.
+    if full_dir.components().any(|c| matches!(c, std::path::Component::ParentDir))
+        || !full_dir.starts_with(&workspace_dir)
+    {
+        return Err(OxenHttpError::BadRequest("Invalid directory path".into()));
+    }
     std::fs::create_dir_all(&full_dir)?;
 }

Committable suggestion skipped: line range outside the PR's diff.


coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (3)
src/server/src/controllers/workspaces/files.rs (3)

92-110: ⚠️ Potential issue

Enhance header validation for security

The header validation needs improvement for security and better error handling.

Apply this diff to enhance header validation:

+    const MAX_FILE_SIZE: u64 = 1024 * 1024 * 1024; // 1GB
+
     let filename = req
         .headers()
         .get("X-Filename")
         .and_then(|h| h.to_str().ok())
-        .ok_or_else(|| OxenHttpError::BadRequest("Missing X-Filename header".into()))?;
+        .ok_or_else(|| OxenHttpError::BadRequest("X-Filename header is required".into()))?;
+
+    // Validate filename for security
+    if filename.contains("..") || filename.starts_with('/') {
+        return Err(OxenHttpError::BadRequest("Invalid filename".into()));
+    }

     let total_size = req
         .headers()
         .get("X-Total-Size")
         .and_then(|h| h.to_str().ok())
         .and_then(|s| s.parse::<u64>().ok())
         .ok_or_else(|| {
-            OxenHttpError::BadRequest("Missing or invalid X-Total-Size header".into())
+            OxenHttpError::BadRequest("X-Total-Size header must be a valid number".into())
         })?;
+
+    if total_size > MAX_FILE_SIZE {
+        return Err(OxenHttpError::BadRequest(format!(
+            "File size {} exceeds maximum allowed size of {}",
+            total_size, MAX_FILE_SIZE
+        ).into()));
+    }

131-133: ⚠️ Potential issue

Add path traversal protection

The directory creation lacks path traversal protection.

Apply this diff:

 if !full_dir.exists() {
+    // Ensure full_dir stays within workspace_dir. Path::starts_with is a
+    // lexical check, so also reject ".." components explicitly.
+    if full_dir.components().any(|c| matches!(c, std::path::Component::ParentDir))
+        || !full_dir.starts_with(&workspace_dir)
+    {
+        return Err(OxenHttpError::BadRequest("Invalid directory path".into()));
+    }
     std::fs::create_dir_all(&full_dir)?;
 }

159-206: 🛠️ Refactor suggestion

Improve streaming implementation with better progress tracking and error handling

The streaming implementation could be enhanced with better buffer management and error handling.

Key improvements needed:

  1. Track processed bytes for accurate progress
  2. Add buffer overflow protection
  3. Improve progress logging
  4. Add proper error handling for the streaming process

Apply this diff:

+    let mut bytes_processed = 0u64;
+    let mut last_logged = 0u64;
     let mut buffer = web::BytesMut::new();

     while let Some(chunk) = payload.next().await {
         let chunk = chunk.map_err(|_| OxenHttpError::BadRequest("Error reading payload".into()))?;
+        
+        // Check if we've exceeded the total size
+        if bytes_processed + (chunk.len() as u64) > total_size {
+            return Err(OxenHttpError::BadRequest("Upload exceeds declared size".into()));
+        }

         // Process chunk...
         let processed_chunk = if is_gzip_encoded {
             // ... gzip processing ...
         } else {
             chunk.to_vec()
         };
+        bytes_processed += processed_chunk.len() as u64;
         buffer.extend_from_slice(&processed_chunk);

         if buffer.len() > BUFFER_SIZE_THRESHOLD {
             save_stream(&filepath, buffer.split().freeze().to_vec()).await?;
+            
+            // Log progress roughly every 5MB of processed data; an exact
+            // modulus rarely aligns with arbitrary chunk sizes
+            if bytes_processed - last_logged >= 5 * 1024 * 1024 {
+                last_logged = bytes_processed;
+                log::debug!(
+                    "Upload progress: {:.1}% ({}/{} bytes)",
+                    (bytes_processed as f64 / total_size as f64) * 100.0,
+                    bytes_processed,
+                    total_size
+                );
+            }
         }
     }
🧹 Nitpick comments (2)
src/server/src/controllers/workspaces/files.rs (2)

24-24: Consider making BUFFER_SIZE_THRESHOLD configurable

The buffer size threshold could be made configurable through environment variables or configuration files to allow tuning based on system resources and requirements.

-const BUFFER_SIZE_THRESHOLD: usize = 262144; // 256kb
+// env lookups cannot run in a `const` initializer, so use a lazily
+// initialized static instead (callers deref with `*BUFFER_SIZE_THRESHOLD`)
+static BUFFER_SIZE_THRESHOLD: std::sync::LazyLock<usize> = std::sync::LazyLock::new(|| {
+    std::env::var("OXEN_BUFFER_SIZE_THRESHOLD")
+        .ok()
+        .and_then(|v| v.parse().ok())
+        .unwrap_or(262144) // 256kb default
+});

311-332: Enhance error context in save_stream function

While the implementation is correct, the error handling could be more descriptive.

Apply this diff:

 async fn save_stream(filepath: &PathBuf, chunk: Vec<u8>) -> Result<&PathBuf, Error> {
     log::debug!(
         "workspace::files::save_stream writing {} bytes to file",
         chunk.len()
     );

     let filepath_cpy = filepath.clone();

     let mut file = web::block(move || {
         std::fs::OpenOptions::new()
             .create(true)
             .append(true)
             .open(filepath_cpy)
-    })
-    .await??;
+    })
+    .await
+    .map_err(|e| actix_web::error::ErrorInternalServerError(
+        format!("Failed to open file: {}", e)
+    ))??;

     log::debug!("workspace::files::save_stream is writing file");

-    web::block(move || file.write_all(&chunk).map(|_| file)).await??;
+    web::block(move || file.write_all(&chunk).map(|_| file))
+        .await
+        .map_err(|e| actix_web::error::ErrorInternalServerError(
+            format!("Failed to write to file: {}", e)
+        ))??;

     Ok(filepath)
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2f74bb4 and 00562d3.

📒 Files selected for processing (1)
  • src/server/src/controllers/workspaces/files.rs (3 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (3)
  • GitHub Check: Test Suite Windows
  • GitHub Check: Test Suite MacOS
  • GitHub Check: Clippy

Comment on lines +87 to +218
let workspace = repositories::workspaces::get(&repo, &workspace_id)?;

log::debug!("workspace::files::add_stream Got uploaded file name: {filename:?}");

let workspace_dir = workspace.dir();

log::debug!("workspace::files::add_stream Got workspace dir: {workspace_dir:?}");

let full_dir = workspace_dir.join(directory);

log::debug!("workspace::files::add_stream Got full dir: {full_dir:?}");

if !full_dir.exists() {
std::fs::create_dir_all(&full_dir)?;
}

let filepath = full_dir.join(filename);

log::debug!("workspace::files::add_stream Got filepath: {:?}", filepath);

let mut files = vec![];

let bytes_written = if filepath.exists() {
std::fs::metadata(&filepath)?.len()
} else {
0
};

// Log progress every 10MB
if bytes_written % (10 * 1024 * 1024) == 0 {
log::debug!(
"workspace::files::add_stream file upload progress: {:.1}% ({}/{} bytes)",
(bytes_written as f64 / total_size as f64) * 100.0,
bytes_written,
total_size
);
}

let mut buffer = web::BytesMut::new();

while let Some(chunk) = payload.next().await {
let chunk = chunk.map_err(|_| OxenHttpError::BadRequest("Error reading payload".into()))?;

// check if received eof signal
if chunk.len() == 1 && chunk[0] == 0 {
// validate file size match
if bytes_written == total_size {
log::info!("add_stream upload completed: {} bytes", total_size);

files.push(filepath.clone());

let path = repositories::workspaces::files::add(&workspace, filepath)?;
log::debug!("add_stream ✅ success! staged file {:?}", path);

return Ok(HttpResponse::Ok().json(FilePathsResponse {
status: StatusMessage::resource_created(),
paths: files,
}));
} else {
log::error!(
"Upload stream incomplete. Expected {} bytes but received {} bytes",
total_size,
bytes_written
);
return Ok(HttpResponse::InternalServerError()
.json(StatusMessage::internal_server_error()));
}
} else {
// not eof, save stream to file
let processed_chunk = if is_gzip_encoded {
let mut decoder = GzDecoder::new(&chunk[..]);
let mut decompressed = Vec::new();
decoder.read_to_end(&mut decompressed).map_err(|e| {
OxenHttpError::BadRequest(
format!("Failed to decompress gzip data: {}", e).into(),
)
})?;
decompressed
} else {
chunk.to_vec()
};
buffer.extend_from_slice(&processed_chunk);

if buffer.len() > BUFFER_SIZE_THRESHOLD {
save_stream(&filepath, buffer.split().freeze().to_vec()).await?;
}
}
}

if !buffer.is_empty() {
save_stream(&filepath, buffer.freeze().to_vec()).await?;
}

files.push(filepath.clone());

Ok(HttpResponse::Ok().json(FilePathsResponse {
status: StatusMessage::resource_created(),
paths: files,
}))
}


💡 Codebase verification

⚠️ Incomplete upload handling needs improvement

The current implementation lacks cleanup mechanisms for incomplete uploads, which could lead to:

  • Orphaned partial files on connection failures
  • Inconsistent workspace state on premature upload termination
  • No automatic cleanup of incomplete uploads

Suggested improvements (sketched after this list):

  • Implement temporary file staging with atomic move on completion
  • Add cleanup routine for partial uploads
  • Consider implementing upload resume capability
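
A minimal sketch of the temporary-file staging idea (the .partial suffix and helper names are illustrative, not project code):

use std::path::{Path, PathBuf};

// Stream into "<name>.partial" and only rename to the final path once the
// received byte count matches X-Total-Size.
fn partial_path(final_path: &Path) -> PathBuf {
    let mut p = final_path.as_os_str().to_owned();
    p.push(".partial");
    PathBuf::from(p)
}

// fs::rename is atomic on POSIX filesystems when source and destination are
// on the same mount, so readers never observe a half-written file.
fn finalize_upload(final_path: &Path) -> std::io::Result<()> {
    std::fs::rename(partial_path(final_path), final_path)
}

// Cleanup routine for a failed or disconnected upload.
fn abort_upload(final_path: &Path) -> std::io::Result<()> {
    std::fs::remove_file(partial_path(final_path))
}
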
🔗 Analysis chain

Verify cleanup of incomplete uploads and endpoint usage

Please verify:

  1. Cleanup mechanism for incomplete uploads in case of failures
  2. Usage of the new streaming endpoint in the codebase
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check for cleanup handling in error cases
rg -l 'cleanup|remove|delete.*incomplete' 

# Check for usage of the new streaming endpoint
rg -l 'file_stream.*path'

# Check for any existing error handling patterns
ast-grep --pattern 'try {
  $$$
  file_stream
  $$$
} catch (e) {
  $$$
}'

Length of output: 4366


Script:

#!/bin/bash
# Check workspace file handling implementation
rg -A 10 'fn save_stream' src/server/src/controllers/workspaces/files.rs

# Check workspace file cleanup implementations
rg -A 10 'fn (cleanup|remove|delete).*workspace.*file' src/lib/src/repositories/workspaces/

# Check error handling in workspace files
ast-grep --pattern 'fn add($$$) -> Result<$$$> {
  $$$
}'

Length of output: 569


Script:

#!/bin/bash
# Check for transaction patterns in workspace operations
rg -A 5 'transaction|atomic|rollback' src/lib/src/repositories/workspaces/

# Check if there's any workspace file validation
rg -A 5 'validate.*file|check.*file' src/lib/src/repositories/workspaces/

# Look for workspace file error handling
ast-grep --pattern 'match $_ {
  Ok($_) => $$$,
  Err($_) => $$$
}'  src/server/src/controllers/workspaces/files.rs

Length of output: 269

@Lilianxr requested a review from jcelliott on January 6, 2025 at 21:34
Labels: none yet · Projects: none yet · 1 participant