diff --git a/editoast/src/redis_utils.rs b/editoast/src/redis_utils.rs index 665625acccf..ffab5522d5e 100644 --- a/editoast/src/redis_utils.rs +++ b/editoast/src/redis_utils.rs @@ -77,6 +77,27 @@ impl RedisConnection { } } + /// Get a list of deserializable value from redis + #[tracing::instrument(name = "cache:get_bulk", skip(self), err)] + pub async fn json_get_bulk( + &mut self, + keys: K, + ) -> Result>> { + let values: Vec> = self.mget(keys).await?; + values + .into_iter() + .map(|value| match value { + Some(v) => match serde_json::from_str::(&v) { + Ok(value) => Ok(Some(value)), + Err(_) => { + Err(RedisError::from((ErrorKind::TypeError, "Expected valid json")).into()) + } + }, + None => Ok(None), + }) + .collect() + } + /// Get a deserializable value from redis with expiry time #[tracing::instrument(name = "cache:get_with_expiration", skip(self), err)] pub async fn json_get_ex( diff --git a/editoast/src/views/v2/train_schedule/projection.rs b/editoast/src/views/v2/train_schedule/projection.rs index 355faa52fda..f371951e8f1 100644 --- a/editoast/src/views/v2/train_schedule/projection.rs +++ b/editoast/src/views/v2/train_schedule/projection.rs @@ -194,8 +194,8 @@ async fn project_path( .await?; // 1. Retrieve cached projection - let mut hit_cache: HashMap = HashMap::new(); - let mut miss_cache = HashMap::new(); + let mut trains_hash_values = HashMap::new(); + let mut trains_details = HashMap::new(); for (train, sim) in trains.iter().zip(simulations) { let pathfinding_result = pathfinding_from_train( @@ -245,15 +245,32 @@ async fn project_path( &path_routes, &path_blocks, ); - let projection: Option = redis_conn - .json_get_ex(&hash, CACHE_PROJECTION_EXPIRATION) - .await?; + trains_hash_values.insert(train.id, hash); + trains_details.insert(train.id, train_details); + } + let cached_projections: Vec> = redis_conn + .json_get_bulk(&trains_hash_values.values().collect::>()) + .await?; + + let mut hit_cache: HashMap = HashMap::new(); + let mut miss_cache = HashMap::new(); + for ((train_id, train_details), projection) in + trains_details.into_iter().zip(cached_projections) + { if let Some(cached) = projection { - hit_cache.insert(train.id, cached); + hit_cache.insert(train_id, cached); } else { - miss_cache.insert(train.id, train_details); + miss_cache.insert(train_id, train_details.clone()); } } + + // Reset expire time of hit cache + let mut pipe = redis::pipe(); + for id in hit_cache.keys() { + pipe.expire(&trains_hash_values[id], CACHE_PROJECTION_EXPIRATION as i64); + } + pipe.query_async(&mut redis_conn).await?; + info!( nb_hit = hit_cache.len(), nb_miss = miss_cache.len(), @@ -274,31 +291,25 @@ async fn project_path( ); let signal_updates = signal_updates?; - // 3. Store the projection in the cache - for (id, train_details) in miss_cache { - let hash = train_projection_input_hash( - infra.id, - &infra.version, - &train_details, - &path_track_ranges, - &path_routes, - &path_blocks, - ); - let cached = CachedProjectPathTrainResult { + // 3. Store the projection in the cache (using pipeline) + let mut pipe = redis::pipe(); + for id in miss_cache.keys() { + let hash = &trains_hash_values[id]; + let cached_value = CachedProjectPathTrainResult { space_time_curves: space_time_curves - .get(&id) + .get(id) .expect("Space time curves not availabe for train") .clone(), signal_updates: signal_updates - .get(&id) + .get(id) .expect("Signal update not availabe for train") .clone(), }; - redis_conn - .json_set_ex(&hash, &cached, CACHE_PROJECTION_EXPIRATION) - .await?; - hit_cache.insert(id, cached); + let str_value = serde_json::to_string(&cached_value).unwrap(); + pipe.set_ex(hash, &str_value, CACHE_PROJECTION_EXPIRATION); + hit_cache.insert(*id, cached_value); } + pipe.query_async(&mut redis_conn).await?; let train_map: HashMap = trains.into_iter().map(|ts| (ts.id, ts)).collect();