Skip to content

Commit

Permalink
feat(rest): add health API for readiness and liveness probes
Browse files Browse the repository at this point in the history
Signed-off-by: Niladri Halder <[email protected]>
  • Loading branch information
niladrih committed Dec 3, 2024
1 parent 02aa5aa commit 2817ab5
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 12 deletions.
1 change: 1 addition & 0 deletions control-plane/rest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ path = "./src/lib.rs"
rustls = { version = "0.23.19", default-features = false }
rustls-pemfile = "2.2.0"
actix-web = { version = "4.9.0", features = ["rustls-0_23"] }
tokio = { version = "1.41.0", default-features = false, features = ["macros"] }
actix-service = "2.0.2"
opentelemetry = { version = "0.26.0" }
tracing-actix-web = { version = "0.7.14", features = ["opentelemetry_0_26"] }
Expand Down
58 changes: 58 additions & 0 deletions control-plane/rest/service/src/health/core_state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use crate::v0::core_grpc;
use grpc::operations::node::traits::NodeOperations;
use std::{
sync::RwLock,
time::{Duration, Instant},
};

/// This is a type to cache the liveness of the agent-core service.
/// This is meant to wrapped inside an Arc and used across threads.
pub struct CachedCoreState {
state: RwLock<ServerState>,
cache_duration: Duration,
}

/// This type remembers a liveness state, and when this data was refreshed.
struct ServerState {
is_live: bool,
last_updated: Instant,
}

impl CachedCoreState {
/// Create a new cache for serving readiness health checks based on agent-core health.
pub async fn new(cache_duration: Duration) -> Self {
let agent_core_is_live = core_grpc().node().probe(None).await.unwrap_or(false);

CachedCoreState {
state: RwLock::new(ServerState {
is_live: agent_core_is_live,
last_updated: Instant::now(),
}),
cache_duration,
}
}

/// Get the cached state of the agent-core service, or assume it's unavailable if something
/// went wrong.
pub async fn get_or_assume_unavailable(&self) -> bool {
let should_update = {
let state = self.state.read().unwrap();
state.last_updated.elapsed() >= self.cache_duration
};

if should_update {
self.update_or_assume_unavailable().await;
}

self.state.read().unwrap().is_live
}

/// Update the state of the agent-core service, or assume it's unavailable if something
/// went wrong.
pub async fn update_or_assume_unavailable(&self) {
let new_value = core_grpc().node().probe(None).await.unwrap_or(false);
let mut state = self.state.write().unwrap();
state.is_live = new_value;
state.last_updated = Instant::now();
}
}
28 changes: 28 additions & 0 deletions control-plane/rest/service/src/health/handlers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use crate::CachedCoreState;
use actix_web::{get, web::Data, HttpResponse, Responder};

/// Liveness probe check. Failure will result in Pod restart. 200 on success.
#[get("/live")]
async fn liveness(_cached_core_state: Data<CachedCoreState>) -> impl Responder {
HttpResponse::Ok()
.content_type("text/plain; charset=utf-8")
.insert_header(("X-Content-Type-Options", "nosniff"))
.body("live")
}

/// Readiness probe check. Failure will result in removal of Container from Kubernetes service
/// target pool. 200 on success, 503 on failure.
#[get("/ready")]
async fn readiness(cached_core_state: Data<CachedCoreState>) -> HttpResponse {
if cached_core_state.get_or_assume_unavailable().await {
return HttpResponse::Ok()
.content_type("text/plain; charset=utf-8")
.insert_header(("X-Content-Type-Options", "nosniff"))
.body("ready");
}

HttpResponse::ServiceUnavailable()
.content_type("text/plain; charset=utf-8")
.insert_header(("X-Content-Type-Options", "nosniff"))
.body("not ready")
}
4 changes: 4 additions & 0 deletions control-plane/rest/service/src/health/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/// Has tools to collect the liveness state of the agent-core service.
pub mod core_state;
/// Actix request handlers for health checks.
pub mod handlers;
62 changes: 50 additions & 12 deletions control-plane/rest/service/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,30 @@
mod authentication;
mod health;
mod v0;

use crate::v0::{CORE_CLIENT, JSON_GRPC_CLIENT};
use crate::{
health::{
core_state::CachedCoreState,
handlers::{liveness, readiness},
},
v0::{CORE_CLIENT, JSON_GRPC_CLIENT},
};
use actix_service::ServiceFactory;
use actix_web::{
body::MessageBody,
dev::{ServiceRequest, ServiceResponse},
middleware, HttpServer,
middleware,
web::Data,
HttpServer,
};
use clap::Parser;
use grpc::{client::CoreClient, operations::jsongrpc::client::JsonGrpcClient};
use http::Uri;
use rustls::{pki_types::PrivateKeyDer, ServerConfig};
use rustls_pemfile::{certs, rsa_private_keys};
use std::{fs::File, io::BufReader};
use std::{fs::File, io::BufReader, net::SocketAddr, time::Duration};
use stor_port::transport_api::{RequestMinTimeout, TimeoutOptions};
use tokio::try_join;
use utils::{
tracing_telemetry::{FmtLayer, FmtStyle, KeyValue},
DEFAULT_GRPC_CLIENT_ADDR,
Expand All @@ -30,10 +40,18 @@ pub(crate) struct CliArgs {
#[clap(long)]
http: Option<String>,

/// The bind address for the health REST server.
#[clap(long, default_value = "[::]:9091")]
health_endpoint: SocketAddr,

/// The CORE gRPC Server URL or address to connect to the services.
#[clap(long, short = 'z', default_value = DEFAULT_GRPC_CLIENT_ADDR)]
core_grpc: Uri,

/// Set the maximum frequency of probing the agent-core for a liveness check.
#[arg(value_parser = humantime::parse_duration, default_value = "2m")]
core_liveness_check_frequency: Duration,

/// The json gRPC Server URL or address to connect to the service.
#[clap(long, short = 'J')]
json_grpc: Option<Uri>,
Expand Down Expand Up @@ -78,6 +96,10 @@ pub(crate) struct CliArgs {
#[clap(long, short, default_value_t = num_cpus::get_physical())]
workers: usize,

/// Set the number of health service workers. Uses a minimum of 1 worker.
#[clap(long, default_value_t = 1)]
health_workers: usize,

/// Set the max number of workers to start.
/// The value 0 means the number of available physical CPUs is used.
#[clap(long, short, default_value = utils::DEFAULT_REST_MAX_WORKER_THREADS)]
Expand Down Expand Up @@ -229,7 +251,7 @@ async fn main() -> anyhow::Result<()> {

// Initialize the core client to be used in rest
CORE_CLIENT
.set(CoreClient::new(CliArgs::args().core_grpc, timeout_opts()).await)
.set(CoreClient::new(cli_args.core_grpc, timeout_opts()).await)
.ok()
.expect("Expect to be initialised only once");

Expand All @@ -241,18 +263,34 @@ async fn main() -> anyhow::Result<()> {
.expect("Expect to be initialised only once");
}

let server =
let main_server =
HttpServer::new(app).bind_rustls_0_23(CliArgs::args().https, get_certificates()?)?;
let result = if let Some(http) = CliArgs::args().http {
server.bind(http).map_err(anyhow::Error::from)?
let main_server = if let Some(http) = CliArgs::args().http {
main_server.bind(http).map_err(anyhow::Error::from)?
} else {
server
main_server
}
.workers(workers(&CliArgs::args()))
.run()
.await;
.workers(workers(&CliArgs::args()));

let cached_core_state =
Data::new(CachedCoreState::new(CliArgs::args().core_liveness_check_frequency).await);
let health_server = HttpServer::new(move || {
actix_web::App::new()
.app_data(cached_core_state.clone())
.service(liveness)
.service(readiness)
.wrap(tracing_actix_web::TracingLogger::default())
.wrap(middleware::Logger::default())
})
.bind(CliArgs::args().health_endpoint)?
// Use a minimum of 1 worker.
.workers(CliArgs::args().health_workers.max(1));

let result = try_join!(main_server.run(), health_server.run());

utils::tracing_telemetry::flush_traces();

result.map_err(|e| e.into())
result?;

Ok(())
}

0 comments on commit 2817ab5

Please sign in to comment.