diff --git a/control-plane/rest/Cargo.toml b/control-plane/rest/Cargo.toml index a3f26f862..950c770a4 100644 --- a/control-plane/rest/Cargo.toml +++ b/control-plane/rest/Cargo.toml @@ -20,6 +20,7 @@ rustls = { version = "0.23.19", default-features = false } rustls-pemfile = "2.2.0" actix-web = { version = "4.9.0", features = ["rustls-0_23"] } actix-service = "2.0.2" +tokio = { version = "1.41.0", features = ["sync"] } opentelemetry = { version = "0.26.0" } tracing-actix-web = { version = "0.7.14", features = ["opentelemetry_0_26"] } tracing = "0.1.40" diff --git a/control-plane/rest/service/src/health/core_state.rs b/control-plane/rest/service/src/health/core_state.rs index 46c02df1e..2aaadf19b 100644 --- a/control-plane/rest/service/src/health/core_state.rs +++ b/control-plane/rest/service/src/health/core_state.rs @@ -1,14 +1,12 @@ use crate::v0::core_grpc; use grpc::operations::node::traits::NodeOperations; -use std::{ - sync::RwLock, - time::{Duration, Instant}, -}; +use std::time::{Duration, Instant}; +use tokio::sync::Mutex; /// This is a type to cache the liveness of the agent-core service. /// This is meant to be wrapped inside an Arc and used across threads. pub struct CachedCoreState { - state: RwLock, + state: Mutex, cache_duration: Duration, } @@ -18,13 +16,23 @@ struct ServerState { last_updated: Instant, } +impl ServerState { + /// Update the state of the agent-core service, or assume it's unavailable if something + /// went wrong. + async fn update_or_assume_unavailable(&mut self) { + let new_value = core_grpc().node().probe(None).await.unwrap_or(false); + self.is_live = new_value; + self.last_updated = Instant::now(); + } +} + impl CachedCoreState { /// Create a new cache for serving readiness health checks based on agent-core health. pub async fn new(cache_duration: Duration) -> Self { let agent_core_is_live = core_grpc().node().probe(None).await.unwrap_or(false); CachedCoreState { - state: RwLock::new(ServerState { + state: Mutex::new(ServerState { is_live: agent_core_is_live, last_updated: Instant::now(), }), @@ -35,24 +43,12 @@ impl CachedCoreState { /// Get the cached state of the agent-core service, or assume it's unavailable if something /// went wrong. pub async fn get_or_assume_unavailable(&self) -> bool { - let should_update = { - let state = self.state.read().unwrap(); - state.last_updated.elapsed() >= self.cache_duration - }; + let mut state = self.state.lock().await; - if should_update { - self.update_or_assume_unavailable().await; + if state.last_updated.elapsed() >= self.cache_duration { + state.update_or_assume_unavailable().await; } - self.state.read().unwrap().is_live - } - - /// Update the state of the agent-core service, or assume it's unavailable if something - /// went wrong. - pub async fn update_or_assume_unavailable(&self) { - let new_value = core_grpc().node().probe(None).await.unwrap_or(false); - let mut state = self.state.write().unwrap(); - state.is_live = new_value; - state.last_updated = Instant::now(); + state.is_live } }