rsc: chunk ids in query and apply review comments
AbrarQuazi committed Jan 16, 2025
1 parent 43858cf commit c33b2a7
Showing 1 changed file with 37 additions and 33 deletions.
rust/rsc/src/bin/rsc/read_job.rs: 70 changes (37 additions, 33 deletions)
@@ -36,47 +36,51 @@ async fn record_miss(hash: String, conn: Arc<DatabaseConnection>) {
 
 #[tracing::instrument(skip(db, stores))]
 async fn resolve_blobs<T: ConnectionTrait>(
-    ids: &[Uuid],
+    ids: &Vec<Uuid>,
     db: &T,
     stores: &HashMap<Uuid, Arc<dyn blob::DebugBlobStore + Sync + Send>>,
 ) -> Result<HashMap<Uuid, ResolvedBlob>, String> {
-    // Fetch all blobs in a single query
-    let blobs = Blob::find()
-        .filter(entity::blob::Column::Id.is_in(ids.to_vec()))
-        .all(db)
-        .await
-        .map_err(|e| format!("Failed to query blobs: {}", e))?;
-
-    // Build a map of blob_id -> blob model for quick lookup
-    let blob_map: HashMap<Uuid, entity::blob::Model> = blobs.into_iter().map(|b| (b.id, b)).collect();
+    //Postgres has a 65,535 parameter limit, ensuring we chunk ID's below that: https://www.postgresql.org/docs/current/limits.html
+    const CHUNK_SIZE: usize = 50_000;
 
-    // Ensure we have all requested blobs
-    for &id in ids {
-        if !blob_map.contains_key(&id) {
-            return Err(format!("Unable to find blob {} by id", id));
+    let mut resolved_map = HashMap::new();
+
+    for chunk in ids.chunks(CHUNK_SIZE) {
+        // Fetch chunked blobs in a single query
+        let blob_map: HashMap<Uuid, entity::blob::Model> = Blob::find()
+            .filter(entity::blob::Column::Id.is_in(chunk.to_vec()))
+            .all(db)
+            .await
+            .map_err(|e| format!("Failed to query blobs, database error: {}", e))?
+            .into_iter()
+            .map(|b| (b.id, b))
+            .collect();
+
+        // Ensure we have all requested blobs
+        for &id in chunk {
+            if !blob_map.contains_key(&id) {
+                return Err(format!("Unable to find blob {} by id", id));
+            }
         }
-    }
 
-    // Resolve all download URLs in parallel
-    let futures = blob_map.iter().map(|(id, blob)| {
-        let store_opt = stores.get(&blob.store_id).cloned();
-        let key = blob.key.clone();
-
-        async move {
-            let store = store_opt.ok_or_else(|| {
-                format!("Unable to find backing store {} for blob {}", blob.store_id, id)
-            })?;
-            let url = store.download_url(key).await;
-            Ok::<(Uuid, ResolvedBlob), String>((*id, ResolvedBlob { id: *id, url }))
-        }
-    });
+        // Resolve all download URLs in parallel
+        let futures = blob_map.iter().map(|(id, blob)| {
+            let store_opt = stores.get(&blob.store_id).cloned();
+            let key = blob.key.clone();
+
+            async move {
+                let store = store_opt.ok_or_else(|| {
+                    format!("Unable to find backing store {} for blob {}", blob.store_id, id)
+                })?;
+                let url = store.download_url(key).await;
+                Ok::<(Uuid, ResolvedBlob), String>((*id, ResolvedBlob { id: *id, url }))
+            }
+        });
 
-    let results = join_all(futures).await;
+        let results = join_all(futures).await;
 
-    let mut resolved_map = HashMap::new();
-    for res in results {
-        let (id, resolved_blob) = res?;
-        resolved_map.insert(id, resolved_blob);
+        let partial_map: HashMap<Uuid, ResolvedBlob> = results.into_iter().collect::<Result<_,_>>()?;
+        resolved_map.extend(partial_map);
     }
 
     Ok(resolved_map)
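
For context, a minimal standalone sketch of the chunking pattern this commit introduces: queries are issued per chunk so each one stays under Postgres's 65,535 bind-parameter limit. Names here (fetch_chunk, resolve_all, u64 ids) are illustrative only; the real code runs a SeaORM Blob::find().filter(Id.is_in(chunk.to_vec())) query per chunk.

    // Standalone sketch, not the rsc code.
    use std::collections::HashMap;

    const CHUNK_SIZE: usize = 50_000; // kept below Postgres's 65,535 parameter limit

    // Stand-in for one chunked database lookup.
    fn fetch_chunk(chunk: &[u64]) -> Result<HashMap<u64, String>, String> {
        Ok(chunk.iter().map(|id| (*id, format!("blob-{}", id))).collect())
    }

    fn resolve_all(ids: &[u64]) -> Result<HashMap<u64, String>, String> {
        let mut resolved = HashMap::new();
        for chunk in ids.chunks(CHUNK_SIZE) {
            let partial = fetch_chunk(chunk)?;
            // Fail fast if any requested id is missing, as resolve_blobs does.
            for id in chunk {
                if !partial.contains_key(id) {
                    return Err(format!("Unable to find blob {} by id", id));
                }
            }
            resolved.extend(partial);
        }
        Ok(resolved)
    }

    fn main() {
        let ids: Vec<u64> = (0..120_000).collect();
        println!("resolved {} blobs", resolve_all(&ids).unwrap().len());
    }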
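
Similarly, a small sketch of the collect-into-Result idiom that now builds partial_map: collecting Result<(K, V), E> items into Result<HashMap<K, V>, E> short-circuits on the first Err, which the `?` in the diff then propagates out of resolve_blobs. The values below are made up for illustration.

    use std::collections::HashMap;

    fn main() {
        let ok_results: Vec<Result<(u32, &str), String>> = vec![Ok((1, "a")), Ok((2, "b"))];
        let map: Result<HashMap<u32, &str>, String> = ok_results.into_iter().collect();
        assert_eq!(map.unwrap().len(), 2);

        let mixed: Vec<Result<(u32, &str), String>> = vec![Ok((1, "a")), Err("boom".to_string())];
        let failed: Result<HashMap<u32, &str>, String> = mixed.into_iter().collect();
        assert!(failed.is_err()); // one Err makes the whole collect an Err
    }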
