From a794199cff0c4fab00f90c857d493abf04f1ddc5 Mon Sep 17 00:00:00 2001 From: Niladri Halder Date: Thu, 5 Dec 2024 05:49:56 +0000 Subject: [PATCH] feat(rest): add health API for readiness and liveness probes Signed-off-by: Niladri Halder --- .../rest/service/src/health/core_state.rs | 58 +++++++++++++++++++ .../rest/service/src/health/handlers.rs | 28 +++++++++ control-plane/rest/service/src/health/mod.rs | 4 ++ control-plane/rest/service/src/main.rs | 37 +++++++++--- 4 files changed, 118 insertions(+), 9 deletions(-) create mode 100644 control-plane/rest/service/src/health/core_state.rs create mode 100644 control-plane/rest/service/src/health/handlers.rs create mode 100644 control-plane/rest/service/src/health/mod.rs diff --git a/control-plane/rest/service/src/health/core_state.rs b/control-plane/rest/service/src/health/core_state.rs new file mode 100644 index 000000000..46c02df1e --- /dev/null +++ b/control-plane/rest/service/src/health/core_state.rs @@ -0,0 +1,58 @@ +use crate::v0::core_grpc; +use grpc::operations::node::traits::NodeOperations; +use std::{ + sync::RwLock, + time::{Duration, Instant}, +}; + +/// This is a type to cache the liveness of the agent-core service. +/// This is meant to be wrapped inside an Arc and used across threads. +pub struct CachedCoreState { + state: RwLock<ServerState>, + cache_duration: Duration, +} + +/// This type remembers a liveness state, and when this data was refreshed. +struct ServerState { + is_live: bool, + last_updated: Instant, +} + +impl CachedCoreState { + /// Create a new cache for serving readiness health checks based on agent-core health. 
+ pub async fn new(cache_duration: Duration) -> Self { + let agent_core_is_live = core_grpc().node().probe(None).await.unwrap_or(false); + + CachedCoreState { + state: RwLock::new(ServerState { + is_live: agent_core_is_live, + last_updated: Instant::now(), + }), + cache_duration, + } + } + + /// Get the cached state of the agent-core service, or assume it's unavailable if something + /// went wrong. + pub async fn get_or_assume_unavailable(&self) -> bool { + let should_update = { + let state = self.state.read().unwrap(); + state.last_updated.elapsed() >= self.cache_duration + }; + + if should_update { + self.update_or_assume_unavailable().await; + } + + self.state.read().unwrap().is_live + } + + /// Update the state of the agent-core service, or assume it's unavailable if something + /// went wrong. + pub async fn update_or_assume_unavailable(&self) { + let new_value = core_grpc().node().probe(None).await.unwrap_or(false); + let mut state = self.state.write().unwrap(); + state.is_live = new_value; + state.last_updated = Instant::now(); + } +} diff --git a/control-plane/rest/service/src/health/handlers.rs b/control-plane/rest/service/src/health/handlers.rs new file mode 100644 index 000000000..5aa2f1082 --- /dev/null +++ b/control-plane/rest/service/src/health/handlers.rs @@ -0,0 +1,28 @@ +use crate::CachedCoreState; +use actix_web::{get, web::Data, HttpResponse, Responder}; + +/// Liveness probe check. Failure will result in Pod restart. 200 on success. +#[get("/live")] +async fn liveness(_cached_core_state: Data<CachedCoreState>) -> impl Responder { + HttpResponse::Ok() + .content_type("text/plain; charset=utf-8") + .insert_header(("X-Content-Type-Options", "nosniff")) + .body("live") +} + +/// Readiness probe check. Failure will result in removal of Container from Kubernetes service +/// target pool. 200 on success, 503 on failure. 
+#[get("/ready")] +async fn readiness(cached_core_state: Data<CachedCoreState>) -> HttpResponse { + if cached_core_state.get_or_assume_unavailable().await { + return HttpResponse::Ok() + .content_type("text/plain; charset=utf-8") + .insert_header(("X-Content-Type-Options", "nosniff")) + .body("ready"); + } + + HttpResponse::ServiceUnavailable() + .content_type("text/plain; charset=utf-8") + .insert_header(("X-Content-Type-Options", "nosniff")) + .body("not ready") +} diff --git a/control-plane/rest/service/src/health/mod.rs b/control-plane/rest/service/src/health/mod.rs new file mode 100644 index 000000000..da63f89a5 --- /dev/null +++ b/control-plane/rest/service/src/health/mod.rs @@ -0,0 +1,4 @@ +/// Has tools to collect the liveness state of the agent-core service. +pub mod core_state; +/// Actix request handlers for health checks. +pub mod handlers; diff --git a/control-plane/rest/service/src/main.rs b/control-plane/rest/service/src/main.rs index 4a1a68d57..7f2be800f 100644 --- a/control-plane/rest/service/src/main.rs +++ b/control-plane/rest/service/src/main.rs @@ -1,19 +1,28 @@ mod authentication; +mod health; mod v0; -use crate::v0::{CORE_CLIENT, JSON_GRPC_CLIENT}; +use crate::{ + health::{ + core_state::CachedCoreState, + handlers::{liveness, readiness}, + }, + v0::{CORE_CLIENT, JSON_GRPC_CLIENT}, +}; use actix_service::ServiceFactory; use actix_web::{ body::MessageBody, dev::{ServiceRequest, ServiceResponse}, - middleware, HttpServer, + middleware, + web::Data, + HttpServer, }; use clap::Parser; use grpc::{client::CoreClient, operations::jsongrpc::client::JsonGrpcClient}; use http::Uri; use rustls::{pki_types::PrivateKeyDer, ServerConfig}; use rustls_pemfile::{certs, rsa_private_keys}; -use std::{fs::File, io::BufReader}; +use std::{fs::File, io::BufReader, time::Duration}; use stor_port::transport_api::{RequestMinTimeout, TimeoutOptions}; use utils::{ tracing_telemetry::{FmtLayer, FmtStyle, KeyValue}, @@ -34,6 +43,10 @@ pub(crate) struct CliArgs { #[clap(long, short = 'z', 
default_value = DEFAULT_GRPC_CLIENT_ADDR)] core_grpc: Uri, + /// Set the frequency of probing the agent-core for a liveness check. + #[arg(long = "core-health-freq", value_parser = humantime::parse_duration, default_value = "2m")] + core_liveness_check_frequency: Duration, + /// The json gRPC Server URL or address to connect to the service. #[clap(long, short = 'J')] json_grpc: Option<Uri>, @@ -219,20 +232,26 @@ async fn main() -> anyhow::Result<()> { .with_tracing_tags(cli_args.tracing_tags.clone()) .init("rest-server"); + // Initialize the core client to be used in rest + CORE_CLIENT + .set(CoreClient::new(cli_args.core_grpc, timeout_opts()).await) + .ok() + .expect("Expect to be initialised only once"); + + let cached_core_state = + Data::new(CachedCoreState::new(cli_args.core_liveness_check_frequency).await); + let app = move || { actix_web::App::new() + .app_data(cached_core_state.clone()) + .service(liveness) + .service(readiness) .wrap(tracing_actix_web::TracingLogger::default()) .wrap(middleware::Logger::default()) .app_data(authentication::init(get_jwk_path())) .configure_api(&v0::configure_api) }; - // Initialize the core client to be used in rest - CORE_CLIENT - .set(CoreClient::new(CliArgs::args().core_grpc, timeout_opts()).await) - .ok() - .expect("Expect to be initialised only once"); - // Initialize the json grpc client to be used in rest if let Some(json_grpc) = CliArgs::args().json_grpc { JSON_GRPC_CLIENT