From 8245991d96a958f0f7ede053563288cdb800e562 Mon Sep 17 00:00:00 2001 From: Niladri Halder Date: Mon, 9 Dec 2024 06:41:56 +0000 Subject: [PATCH] fix(rest): make readiness check atomic The readiness check API depends on the agent-core service's Node gRPC endpoint. The current RwLock lets multiple tokio threads probe the agent-core. This is not ideal. This change makes the get operation atomic, so that agent-core state cache updates could be performed by only a single thread, and the updated cache should feed the next few ready calls. Signed-off-by: Niladri Halder --- control-plane/rest/Cargo.toml | 1 + .../rest/service/src/health/core_state.rs | 40 +++++++++---------- 2 files changed, 19 insertions(+), 22 deletions(-) diff --git a/control-plane/rest/Cargo.toml b/control-plane/rest/Cargo.toml index a3f26f862..950c770a4 100644 --- a/control-plane/rest/Cargo.toml +++ b/control-plane/rest/Cargo.toml @@ -20,6 +20,7 @@ rustls = { version = "0.23.19", default-features = false } rustls-pemfile = "2.2.0" actix-web = { version = "4.9.0", features = ["rustls-0_23"] } actix-service = "2.0.2" +tokio = { version = "1.41.0", features = ["sync"] } opentelemetry = { version = "0.26.0" } tracing-actix-web = { version = "0.7.14", features = ["opentelemetry_0_26"] } tracing = "0.1.40" diff --git a/control-plane/rest/service/src/health/core_state.rs b/control-plane/rest/service/src/health/core_state.rs index 46c02df1e..2aaadf19b 100644 --- a/control-plane/rest/service/src/health/core_state.rs +++ b/control-plane/rest/service/src/health/core_state.rs @@ -1,14 +1,12 @@ use crate::v0::core_grpc; use grpc::operations::node::traits::NodeOperations; -use std::{ - sync::RwLock, - time::{Duration, Instant}, -}; +use std::time::{Duration, Instant}; +use tokio::sync::Mutex; /// This is a type to cache the liveness of the agent-core service. /// This is meant to be wrapped inside an Arc and used across threads. pub struct CachedCoreState { - state: RwLock, + state: Mutex, cache_duration: Duration, } @@ -18,13 +16,23 @@ struct ServerState { last_updated: Instant, } +impl ServerState { + /// Update the state of the agent-core service, or assume it's unavailable if something + /// went wrong. + async fn update_or_assume_unavailable(&mut self) { + let new_value = core_grpc().node().probe(None).await.unwrap_or(false); + self.is_live = new_value; + self.last_updated = Instant::now(); + } +} + impl CachedCoreState { /// Create a new cache for serving readiness health checks based on agent-core health. pub async fn new(cache_duration: Duration) -> Self { let agent_core_is_live = core_grpc().node().probe(None).await.unwrap_or(false); CachedCoreState { - state: RwLock::new(ServerState { + state: Mutex::new(ServerState { is_live: agent_core_is_live, last_updated: Instant::now(), }), @@ -35,24 +43,12 @@ impl CachedCoreState { /// Get the cached state of the agent-core service, or assume it's unavailable if something /// went wrong. pub async fn get_or_assume_unavailable(&self) -> bool { - let should_update = { - let state = self.state.read().unwrap(); - state.last_updated.elapsed() >= self.cache_duration - }; + let mut state = self.state.lock().await; - if should_update { - self.update_or_assume_unavailable().await; + if state.last_updated.elapsed() >= self.cache_duration { + state.update_or_assume_unavailable().await; } - self.state.read().unwrap().is_live - } - - /// Update the state of the agent-core service, or assume it's unavailable if something - /// went wrong. - pub async fn update_or_assume_unavailable(&self) { - let new_value = core_grpc().node().probe(None).await.unwrap_or(false); - let mut state = self.state.write().unwrap(); - state.is_live = new_value; - state.last_updated = Instant::now(); + state.is_live } }