rsc: update read_job transaction hit path (#1671)
* rsc: made read_job transaction smaller

* rsc: added verification logic to hit path in read_job

* rsc: fix comment

* rsc: remove unused import

* rsc: get rid of verification queries, actually not needed

* rsc: made it so that we resolve all blobs in one query

* fix clang errors

* trying older version of clang

* using clang version 18.1.3

* rsc: chunk ids in query and apply review comments

* undo clang changes as we pinned version of ubuntu

* missed clang file

* rsc: add job_id to error message
AbrarQuazi authored Jan 21, 2025
1 parent d1e50b7 commit 6f23bbd
Showing 2 changed files with 161 additions and 144 deletions.
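The core of the new hit path is the `resolve_blobs` helper: rather than one query plus one awaited download-URL call per blob, it collects every blob id up front, fetches the rows in batched `is_in` queries (chunked so each query stays under Postgres's 65,535 bind-parameter limit), verifies every requested id came back, and then resolves download URLs concurrently with `join_all`. Below is a minimal sketch of that pattern, assuming only the `futures` and `tokio` crates; `Uuid` (aliased to `u128`), `BlobRow`, `download_url`, and the in-memory `table` are hypothetical stand-ins for `uuid::Uuid`, the `entity::blob` model, the blob stores, and the SeaORM query used in the diff.

```rust
use std::collections::HashMap;

use futures::future::join_all; // same combinator the diff uses

type Uuid = u128; // stand-in for uuid::Uuid to keep the sketch dependency-light

#[derive(Clone, Debug)]
struct ResolvedBlob {
    id: Uuid,
    url: String,
}

// Simplified stand-ins for the entity model and blob store in the diff.
#[derive(Clone)]
struct BlobRow {
    id: Uuid,
    key: String,
}

async fn download_url(key: &str) -> String {
    format!("https://blobs.example/{key}") // placeholder for store.download_url(key)
}

// Pretend batch query: in the real code this is one SeaORM `is_in` query per chunk.
async fn fetch_blob_rows(ids: &[Uuid], table: &HashMap<Uuid, BlobRow>) -> Vec<BlobRow> {
    ids.iter().filter_map(|id| table.get(id).cloned()).collect()
}

async fn resolve_blobs(
    ids: &[Uuid],
    table: &HashMap<Uuid, BlobRow>,
) -> Result<HashMap<Uuid, ResolvedBlob>, String> {
    // Stay under Postgres's 65,535 bind-parameter limit by chunking the id list.
    const CHUNK_SIZE: usize = 50_000;
    let mut resolved = HashMap::new();

    for chunk in ids.chunks(CHUNK_SIZE) {
        let rows: HashMap<Uuid, BlobRow> = fetch_blob_rows(chunk, table)
            .await
            .into_iter()
            .map(|r| (r.id, r))
            .collect();

        // Fail the whole lookup if any requested id is missing.
        for id in chunk {
            if !rows.contains_key(id) {
                return Err(format!("Unable to find blob {id} by id"));
            }
        }

        // Resolve download URLs concurrently, then collect into the result map.
        let futures = rows.into_values().map(|row| async move {
            let url = download_url(&row.key).await;
            (row.id, ResolvedBlob { id: row.id, url })
        });
        resolved.extend(join_all(futures).await);
    }

    Ok(resolved)
}

#[tokio::main]
async fn main() {
    let table: HashMap<Uuid, BlobRow> = (1..=3)
        .map(|id| (id, BlobRow { id, key: format!("key-{id}") }))
        .collect();
    let ids = vec![1, 2, 3];
    let blobs = resolve_blobs(&ids, &table).await.expect("all blobs resolve");
    println!("{blobs:?}");
}
```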
303 changes: 160 additions & 143 deletions rust/rsc/src/bin/rsc/read_job.rs
@@ -5,13 +5,14 @@ use crate::types::{
};
use axum::Json;
use entity::{job, job_use, output_dir, output_file, output_symlink};
use entity::prelude::Blob;
use futures::future::join_all;
use hyper::StatusCode;
use rand::{thread_rng, Rng};
use rsc::database;
use sea_orm::DatabaseTransaction;
use sea_orm::{
prelude::Uuid, ActiveModelTrait, ActiveValue::*, ColumnTrait, DatabaseConnection, DbErr,
EntityTrait, ModelTrait, QueryFilter, TransactionTrait,
EntityTrait, ModelTrait, QueryFilter, TransactionTrait, ConnectionTrait,
};
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
@@ -34,26 +35,55 @@ async fn record_miss(hash: String, conn: Arc<DatabaseConnection>) {
}

#[tracing::instrument(skip(db, stores))]
async fn resolve_blob(
id: Uuid,
db: &DatabaseTransaction,
async fn resolve_blobs<T: ConnectionTrait>(
ids: &Vec<Uuid>,
db: &T,
stores: &HashMap<Uuid, Arc<dyn blob::DebugBlobStore + Sync + Send>>,
) -> Result<ResolvedBlob, String> {
let Ok(Some(blob)) = entity::prelude::Blob::find_by_id(id).one(db).await else {
return Err(format!("Unable to find blob {} by id", id));
};
) -> Result<HashMap<Uuid, ResolvedBlob>, String> {
// Postgres has a 65,535 bind-parameter limit, so chunk the IDs to stay below it: https://www.postgresql.org/docs/current/limits.html
const CHUNK_SIZE: usize = 50_000;

let Some(store) = stores.get(&blob.store_id) else {
return Err(format!(
"Unable to find backing store {} for blob {}",
blob.store_id, id
));
};
let mut resolved_map = HashMap::new();

for chunk in ids.chunks(CHUNK_SIZE) {
// Fetch chunked blobs in a single query
let blob_map: HashMap<Uuid, entity::blob::Model> = Blob::find()
.filter(entity::blob::Column::Id.is_in(chunk.to_vec()))
.all(db)
.await
.map_err(|e| format!("Failed to query blobs, database error: {}", e))?
.into_iter()
.map(|b| (b.id, b))
.collect();

return Ok(ResolvedBlob {
id: blob.id,
url: store.download_url(blob.key).await,
});
// Ensure we have all requested blobs
for &id in chunk {
if !blob_map.contains_key(&id) {
return Err(format!("Unable to find blob {} by id", id));
}
}

// Resolve all download URLs in parallel
let futures = blob_map.iter().map(|(id, blob)| {
let store_opt = stores.get(&blob.store_id).cloned();
let key = blob.key.clone();

async move {
let store = store_opt.ok_or_else(|| {
format!("Unable to find backing store {} for blob {}", blob.store_id, id)
})?;
let url = store.download_url(key).await;
Ok::<(Uuid, ResolvedBlob), String>((*id, ResolvedBlob { id: *id, url }))
}
});

let results = join_all(futures).await;

let partial_map: HashMap<Uuid, ResolvedBlob> = results.into_iter().collect::<Result<_,_>>()?;
resolved_map.extend(partial_map);
}

Ok(resolved_map)
}

#[tracing::instrument(skip_all)]
@@ -63,152 +93,139 @@ pub async fn read_job(
blob_stores: HashMap<Uuid, Arc<dyn blob::DebugBlobStore + Sync + Send>>,
) -> (StatusCode, Json<ReadJobResponse>) {
let hash = payload.hash();
let hash_for_spawns = hash.clone();

// TODO: This transaction is quite large with a bunch of "serialized" queries. If read_job
// becomes a bottleneck it should be rewritten such that joining on promises is delayed for as
// long as possible. Another option would be to collect all blob ids ahead of time and make a
// single db query to list them all out instead of a query per blob id.
let result = conn
// Fetch the job and related entities in a single transaction
let fetch_result = conn
.as_ref()
.transaction::<_, (Option<Uuid>, ReadJobResponse), DbErr>(|txn| {
let hash = hash.clone();
.transaction::<_, Option<(job::Model, Vec<output_file::Model>, Vec<output_symlink::Model>, Vec<output_dir::Model>)>, DbErr>(|txn| {
Box::pin(async move {
let Some(matching_job) = job::Entity::find()
.filter(job::Column::Hash.eq(hash.clone()))
.one(txn)
.await?
else {
tracing::info!(%hash, "Miss");
return Ok((None, ReadJobResponse::NoMatch));
return Ok(None);
};

let output_files = matching_job.find_related(output_file::Entity).all(txn).await?;
let output_symlinks = matching_job.find_related(output_symlink::Entity).all(txn).await?;
let output_dirs = matching_job.find_related(output_dir::Entity).all(txn).await?;

tracing::info!(%hash, "Hit");
let output_files = matching_job
.find_related(output_file::Entity)
.all(txn)
.await?
.into_iter()
.map(|m| {
let stores_copy = blob_stores.clone();
async move {
let blob = resolve_blob(m.blob_id, txn, &stores_copy).await?;

Ok(ResolvedBlobFile {
path: m.path,
mode: m.mode,
blob,
})
}
});

let output_files: Result<Vec<ResolvedBlobFile>, String> =
futures::future::join_all(output_files)
.await
.into_iter()
.collect();

let output_files = match output_files {
Err(err) => {
tracing::error! {%err, "Failed to resolve all output files. Resolving job as a cache miss."};
return Ok((None, ReadJobResponse::NoMatch))
},
Ok(files) => files,
};
Ok(Some((matching_job, output_files, output_symlinks, output_dirs)))
})
})
.await;

let hash_copy = hash_for_spawns.clone();
let Some((matching_job, output_files, output_symlinks, output_dirs)) = fetch_result.ok().flatten() else {
tokio::spawn(async move {
record_miss(hash_copy, conn.clone()).await;
});
return (StatusCode::NOT_FOUND, Json(ReadJobResponse::NoMatch));
};

let output_symlinks = matching_job
.find_related(output_symlink::Entity)
.all(txn)
.await?
.into_iter()
.map(|m| Symlink {
path: m.path,
link: m.link,
})
.collect();

let output_dirs = matching_job
.find_related(output_dir::Entity)
.all(txn)
.await?
.into_iter()
.map(|m| Dir {
path: m.path,
mode: m.mode,
hidden: Some(m.hidden),
})
.collect();

let stdout_blob = match resolve_blob(matching_job.stdout_blob_id, txn, &blob_stores).await {
Err(err) => {
tracing::error! {%err, "Failed to resolve stdout blob. Resolving job as a cache miss."};
return Ok((None, ReadJobResponse::NoMatch))
},
Ok(blob) => blob,
};
// Collect all the blob IDs we need to resolve
let mut blob_ids: Vec<Uuid> = output_files.iter().map(|f| f.blob_id).collect();
blob_ids.push(matching_job.stdout_blob_id);
blob_ids.push(matching_job.stderr_blob_id);

let stderr_blob = match resolve_blob(matching_job.stderr_blob_id, txn, &blob_stores).await {
Err(err) => {
tracing::error! {%err, "Failed to resolve stderr blob. Resolving job as a cache miss."};
return Ok((None, ReadJobResponse::NoMatch))
},
Ok(blob) => blob,
};
// Resolve all needed blobs in one go
let resolved_blob_map = match resolve_blobs(&blob_ids, conn.as_ref(), &blob_stores).await {
Ok(map) => map,
Err(err) => {
tracing::error!(%err, "Failed to resolve blobs. Resolving job as a cache miss.");
return (StatusCode::NOT_FOUND, Json(ReadJobResponse::NoMatch));
}
};

Ok((
Some(matching_job.id),
ReadJobResponse::Match {
output_symlinks,
output_dirs,
output_files,
stdout_blob,
stderr_blob,
status: matching_job.status,
runtime: matching_job.runtime,
cputime: matching_job.cputime,
memory: matching_job.memory as u64,
ibytes: matching_job.i_bytes as u64,
obytes: matching_job.o_bytes as u64,
},
))
// Construct ResolvedBlobFile for each output file
let output_files = output_files
.into_iter()
.map(|m| {
let blob_id = m.blob_id;
let job_id = m.job_id;
let resolved_blob = resolved_blob_map.get(&blob_id).cloned().ok_or_else(|| {
format!("Missing resolved blob for {}, from job_id: {}", blob_id, job_id)
})?;
Ok(ResolvedBlobFile {
path: m.path,
mode: m.mode,
blob: resolved_blob,
})
})
.await;
.collect::<Result<Vec<_>, String>>();

match result {
Ok((Some(job_id), response)) => {
// If we get a match we want to record the use but we don't
// want to block sending the response on it so we spawn a task
// to go do that.
let mut status = StatusCode::NOT_FOUND;
if let ReadJobResponse::Match { .. } = response {
status = StatusCode::OK;
let shared_conn = conn.clone();
tokio::spawn(async move {
record_hit(job_id, hash, shared_conn).await;
});
}
(status, Json(response))
let output_files = match output_files {
Ok(files) => files,
Err(err) => {
tracing::error!(%err, "Failed to resolve all output files. Resolving job as a cache miss.");
return (StatusCode::NOT_FOUND, Json(ReadJobResponse::NoMatch));
}
Ok((None, _)) => {
let shared_conn = conn.clone();
tokio::spawn(async move {
record_miss(hash, shared_conn).await;
});
(StatusCode::NOT_FOUND, Json(ReadJobResponse::NoMatch))
};

// Collect other resolved entities
let output_symlinks: Vec<Symlink> = output_symlinks
.into_iter()
.map(|m| Symlink {
path: m.path,
link: m.link,
})
.collect();

let output_dirs: Vec<Dir> = output_dirs
.into_iter()
.map(|m| Dir {
path: m.path,
mode: m.mode,
hidden: Some(m.hidden),
})
.collect();

// Resolve stdout and stderr blobs from the map
let stdout_blob = match resolved_blob_map.get(&matching_job.stdout_blob_id) {
Some(blob) => blob.clone(),
None => {
tracing::error!("Failed to resolve stdout blob. Resolving job as a cache miss.");
return (StatusCode::NOT_FOUND, Json(ReadJobResponse::NoMatch));
}
Err(cause) => {
tracing::error! {
%cause,
"failed to read job"
};
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(ReadJobResponse::NoMatch),
)
};

let stderr_blob = match resolved_blob_map.get(&matching_job.stderr_blob_id) {
Some(blob) => blob.clone(),
None => {
tracing::error!("Failed to resolve stderr blob. Resolving job as a cache miss.");
return (StatusCode::NOT_FOUND, Json(ReadJobResponse::NoMatch));
}
}
};

// Construct response
let response = ReadJobResponse::Match {
output_symlinks,
output_dirs,
output_files,
stdout_blob,
stderr_blob,
status: matching_job.status,
runtime: matching_job.runtime,
cputime: matching_job.cputime,
memory: matching_job.memory as u64,
ibytes: matching_job.i_bytes as u64,
obytes: matching_job.o_bytes as u64,
};

let job_id = matching_job.id;
let hash_copy = hash_for_spawns.clone();
tracing::info!(%hash_copy, "Hit");
tokio::spawn(async move {
record_hit(job_id, hash_copy, conn.clone()).await;
});

(StatusCode::OK, Json(response))
}


#[tracing::instrument(skip_all)]
pub async fn allow_job(
Json(payload): Json<AllowJobPayload>,
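One behavior carried over from the old code path: recording a hit or miss never blocks the response. The handler clones the connection, hands the bookkeeping write to `tokio::spawn`, and returns immediately. Here is a minimal sketch of that fire-and-forget shape, assuming `tokio` with the rt/macros/time features; `Db`, the stubbed `record_hit`, and the string status are simplified stand-ins for `Arc<DatabaseConnection>`, the real recorder, and the `(StatusCode, Json<ReadJobResponse>)` response.

```rust
use std::sync::Arc;

// Stand-in for Arc<DatabaseConnection>; the real record_hit writes job_use rows.
struct Db;

async fn record_hit(job_id: u64, hash: String, _conn: Arc<Db>) {
    println!("recorded hit for job {job_id} (hash {hash})");
}

async fn read_job(hash: String, conn: Arc<Db>) -> &'static str {
    let job_id = 42; // pretend we found a matching job

    // Don't block the response on bookkeeping: spawn the write and return immediately.
    let shared_conn = conn.clone();
    tokio::spawn(async move {
        record_hit(job_id, hash, shared_conn).await;
    });

    "200 OK"
}

#[tokio::main]
async fn main() {
    let conn = Arc::new(Db);
    let status = read_job("abc123".to_string(), conn).await;
    println!("{status}");
    // Give the spawned task a moment to run before the runtime shuts down (demo only;
    // in the real server the runtime is long-lived).
    tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
```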
2 changes: 1 addition & 1 deletion rust/rsc/src/bin/rsc/types.rs
@@ -228,7 +228,7 @@ pub enum PostBlobResponse {
Ok { blobs: Vec<PostBlobResponsePart> },
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ResolvedBlob {
pub id: Uuid,
pub url: String,