Skip to content

Commit

Permalink
editoast: bulk get cached projection
Browse files Browse the repository at this point in the history
  • Loading branch information
flomonster committed Jul 8, 2024
1 parent c3d986c commit 03edc77
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 24 deletions.
21 changes: 21 additions & 0 deletions editoast/src/redis_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,27 @@ impl RedisConnection {
}
}

/// Get a list of deserializable value from redis
#[tracing::instrument(name = "cache:get_bulk", skip(self), err)]
pub async fn json_get_bulk<T: DeserializeOwned, K: Debug + ToRedisArgs + Send + Sync>(
&mut self,
keys: K,
) -> Result<Vec<Option<T>>> {
let values: Vec<Option<String>> = self.mget(keys).await?;
values
.into_iter()
.map(|value| match value {
Some(v) => match serde_json::from_str::<T>(&v) {
Ok(value) => Ok(Some(value)),
Err(_) => {
Err(RedisError::from((ErrorKind::TypeError, "Expected valid json")).into())
}
},
None => Ok(None),
})
.collect()
}

/// Get a deserializable value from redis with expiry time
#[tracing::instrument(name = "cache:get_with_expiration", skip(self), err)]
pub async fn json_get_ex<T: DeserializeOwned, K: Debug + ToRedisArgs + Send + Sync>(
Expand Down
59 changes: 35 additions & 24 deletions editoast/src/views/v2/train_schedule/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,8 @@ async fn project_path(
.await?;

// 1. Retrieve cached projection
let mut hit_cache: HashMap<i64, CachedProjectPathTrainResult> = HashMap::new();
let mut miss_cache = HashMap::new();
let mut trains_hash_values = HashMap::new();
let mut trains_details = HashMap::new();

for (train, sim) in trains.iter().zip(simulations) {
let pathfinding_result = pathfinding_from_train(
Expand Down Expand Up @@ -245,15 +245,32 @@ async fn project_path(
&path_routes,
&path_blocks,
);
let projection: Option<CachedProjectPathTrainResult> = redis_conn
.json_get_ex(&hash, CACHE_PROJECTION_EXPIRATION)
.await?;
trains_hash_values.insert(train.id, hash);
trains_details.insert(train.id, train_details);
}
let cached_projections: Vec<Option<CachedProjectPathTrainResult>> = redis_conn
.json_get_bulk(&trains_hash_values.values().collect::<Vec<_>>())
.await?;

let mut hit_cache: HashMap<i64, CachedProjectPathTrainResult> = HashMap::new();
let mut miss_cache = HashMap::new();
for ((train_id, train_details), projection) in
trains_details.into_iter().zip(cached_projections)
{
if let Some(cached) = projection {
hit_cache.insert(train.id, cached);
hit_cache.insert(train_id, cached);
} else {
miss_cache.insert(train.id, train_details);
miss_cache.insert(train_id, train_details.clone());
}
}

// Reset expire time of hit cache
let mut pipe = redis::pipe();
for id in hit_cache.keys() {
pipe.expire(&trains_hash_values[id], CACHE_PROJECTION_EXPIRATION as i64);
}
pipe.query_async(&mut redis_conn).await?;

info!(
nb_hit = hit_cache.len(),
nb_miss = miss_cache.len(),
Expand All @@ -274,31 +291,25 @@ async fn project_path(
);
let signal_updates = signal_updates?;

// 3. Store the projection in the cache
for (id, train_details) in miss_cache {
let hash = train_projection_input_hash(
infra.id,
&infra.version,
&train_details,
&path_track_ranges,
&path_routes,
&path_blocks,
);
let cached = CachedProjectPathTrainResult {
// 3. Store the projection in the cache (using pipeline)
let mut pipe = redis::pipe();
for id in miss_cache.keys() {
let hash = &trains_hash_values[id];
let cached_value = CachedProjectPathTrainResult {
space_time_curves: space_time_curves
.get(&id)
.get(id)
.expect("Space time curves not availabe for train")
.clone(),
signal_updates: signal_updates
.get(&id)
.get(id)
.expect("Signal update not availabe for train")
.clone(),
};
redis_conn
.json_set_ex(&hash, &cached, CACHE_PROJECTION_EXPIRATION)
.await?;
hit_cache.insert(id, cached);
let str_value = serde_json::to_string(&cached_value).unwrap();
pipe.set_ex(hash, &str_value, CACHE_PROJECTION_EXPIRATION);
hit_cache.insert(*id, cached_value);
}
pipe.query_async(&mut redis_conn).await?;

let train_map: HashMap<i64, TrainSchedule> = trains.into_iter().map(|ts| (ts.id, ts)).collect();

Expand Down

0 comments on commit 03edc77

Please sign in to comment.