Skip to content

Commit 746f595

Browse files
perf(sync): batch file uploads into a single HTTP request per batch (#2985)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
1 parent 542c008 commit 746f595

File tree

1 file changed

+39
-32
lines changed

1 file changed

+39
-32
lines changed

crates/forge_services/src/sync.rs

Lines changed: 39 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -167,9 +167,9 @@ impl<F: 'static + WorkspaceIndexRepository + FileReaderInfra, D: FileDiscovery +
167167
}
168168
}
169169

170-
// Pass 2: upload files — each file's content is read on-demand
171-
// immediately before upload so only one batch occupies memory at a time.
172-
let mut upload_stream = self.upload_files(sync_paths.upload);
170+
// Pass 2: upload files — files are grouped into batches of `batch_size`
171+
// and each batch is sent in a single HTTP request, sequentially.
172+
let mut upload_stream = Box::pin(self.upload_files(sync_paths.upload));
173173

174174
// Process uploads as they complete, updating progress incrementally
175175
while let Some(result) = upload_stream.next().await {
@@ -262,13 +262,15 @@ impl<F: 'static + WorkspaceIndexRepository + FileReaderInfra, D: FileDiscovery +
262262
Ok(files_to_delete.len())
263263
}
264264

265-
/// Uploads files in parallel, reading their content on-demand to keep
266-
/// memory bounded to a single batch at a time.
265+
/// Uploads files in batches, sending one HTTP request per batch of
266+
/// `batch_size` files.
267267
///
268-
/// Each path is read from disk immediately before its upload, so the peak
269-
/// memory footprint is `batch_size × avg_file_size` rather than the size
270-
/// of the entire upload set. The caller is responsible for processing the
271-
/// stream and tracking progress.
268+
/// Files within each batch are read from disk, collected into a single
269+
/// [`forge_domain::CodeBase`] payload, and uploaded in one request.
270+
/// Batches are processed sequentially — only one HTTP request is in-flight
271+
/// at a time — which keeps both memory usage and server concurrency
272+
/// bounded. The stream yields the number of files successfully uploaded
273+
/// per batch.
272274
fn upload_files(
273275
&self,
274276
paths: Vec<PathBuf>,
@@ -279,37 +281,42 @@ impl<F: 'static + WorkspaceIndexRepository + FileReaderInfra, D: FileDiscovery +
279281
let infra = self.infra.clone();
280282
let batch_size = self.batch_size;
281283

282-
futures::stream::iter(paths)
283-
.map(move |file_path| {
284-
let user_id = user_id.clone();
285-
let workspace_id = workspace_id.clone();
286-
let token = token.clone();
287-
let infra = infra.clone();
288-
async move {
284+
futures::stream::iter(paths).chunks(batch_size).then(move |batch| {
285+
let user_id = user_id.clone();
286+
let workspace_id = workspace_id.clone();
287+
let token = token.clone();
288+
let infra = infra.clone();
289+
async move {
290+
let mut files = Vec::with_capacity(batch.len());
291+
for file_path in &batch {
289292
info!(workspace_id = %workspace_id, path = %file_path.display(), "File sync started");
290-
// Read content on-demand — keeps only one batch in memory at a time
291293
let content = infra
292-
.read_utf8(&file_path)
294+
.read_utf8(file_path)
293295
.await
294296
.with_context(|| {
295297
format!("Failed to read file '{}' for upload", file_path.display())
296298
})?;
297-
let path_str = file_path.to_string_lossy().into_owned();
298-
let file = forge_domain::FileRead::new(path_str, content);
299-
let upload = forge_domain::CodeBase::new(
300-
user_id.clone(),
301-
workspace_id.clone(),
302-
vec![file],
303-
);
304-
infra
305-
.upload_files(&upload, &token)
306-
.await
307-
.context("Failed to upload files")?;
299+
files.push(forge_domain::FileRead::new(
300+
file_path.to_string_lossy().into_owned(),
301+
content,
302+
));
303+
}
304+
let count = files.len();
305+
let upload = forge_domain::CodeBase::new(
306+
user_id.clone(),
307+
workspace_id.clone(),
308+
files,
309+
);
310+
infra
311+
.upload_files(&upload, &token)
312+
.await
313+
.context("Failed to upload files")?;
314+
for file_path in &batch {
308315
info!(workspace_id = %workspace_id, path = %file_path.display(), "File sync completed");
309-
Ok::<_, anyhow::Error>(1)
310316
}
311-
})
312-
.buffer_unordered(batch_size)
317+
Ok::<_, anyhow::Error>(count)
318+
}
319+
})
313320
}
314321

315322
/// Discovers workspace files and streams their hashes without retaining

0 commit comments

Comments
 (0)