diff --git a/control-plane/rest/service/src/health/core_state.rs b/control-plane/rest/service/src/health/core_state.rs
new file mode 100644
index 000000000..46c02df1e
--- /dev/null
+++ b/control-plane/rest/service/src/health/core_state.rs
@@ -0,0 +1,58 @@
+use crate::v0::core_grpc;
+use grpc::operations::node::traits::NodeOperations;
+use std::{
+    sync::RwLock,
+    time::{Duration, Instant},
+};
+
+/// A type to cache the liveness of the agent-core service.
+/// It is meant to be wrapped inside an `Arc` and shared across threads.
+pub struct CachedCoreState {
+    state: RwLock<ServerState>,
+    cache_duration: Duration,
+}
+
+/// Remembers a liveness state, and when that data was last refreshed.
+struct ServerState {
+    is_live: bool,
+    last_updated: Instant,
+}
+
+impl CachedCoreState {
+    /// Create a new cache for serving readiness health checks based on agent-core health.
+    pub async fn new(cache_duration: Duration) -> Self {
+        let agent_core_is_live = core_grpc().node().probe(None).await.unwrap_or(false);
+
+        CachedCoreState {
+            state: RwLock::new(ServerState {
+                is_live: agent_core_is_live,
+                last_updated: Instant::now(),
+            }),
+            cache_duration,
+        }
+    }
+
+    /// Get the cached state of the agent-core service, or assume it's unavailable if something
+    /// went wrong.
+    pub async fn get_or_assume_unavailable(&self) -> bool {
+        let should_update = {
+            let state = self.state.read().unwrap();
+            state.last_updated.elapsed() >= self.cache_duration
+        };
+
+        if should_update {
+            self.update_or_assume_unavailable().await;
+        }
+
+        self.state.read().unwrap().is_live
+    }
+
+    /// Update the state of the agent-core service, or assume it's unavailable if something
+    /// went wrong.
+    pub async fn update_or_assume_unavailable(&self) {
+        let new_value = core_grpc().node().probe(None).await.unwrap_or(false);
+        let mut state = self.state.write().unwrap();
+        state.is_live = new_value;
+        state.last_updated = Instant::now();
+    }
+}
diff --git a/control-plane/rest/service/src/health/handlers.rs b/control-plane/rest/service/src/health/handlers.rs
new file mode 100644
index 000000000..5aa2f1082
--- /dev/null
+++ b/control-plane/rest/service/src/health/handlers.rs
@@ -0,0 +1,28 @@
+use crate::CachedCoreState;
+use actix_web::{get, web::Data, HttpResponse, Responder};
+
+/// Liveness probe check. Failure will result in a Pod restart. 200 on success.
+#[get("/live")]
+async fn liveness(_cached_core_state: Data<CachedCoreState>) -> impl Responder {
+    HttpResponse::Ok()
+        .content_type("text/plain; charset=utf-8")
+        .insert_header(("X-Content-Type-Options", "nosniff"))
+        .body("live")
+}
+
+/// Readiness probe check. Failure will result in removal of the container from the Kubernetes
+/// service target pool. 200 on success, 503 on failure.
+#[get("/ready")]
+async fn readiness(cached_core_state: Data<CachedCoreState>) -> HttpResponse {
+    if cached_core_state.get_or_assume_unavailable().await {
+        return HttpResponse::Ok()
+            .content_type("text/plain; charset=utf-8")
+            .insert_header(("X-Content-Type-Options", "nosniff"))
+            .body("ready");
+    }
+
+    HttpResponse::ServiceUnavailable()
+        .content_type("text/plain; charset=utf-8")
+        .insert_header(("X-Content-Type-Options", "nosniff"))
+        .body("not ready")
+}
diff --git a/control-plane/rest/service/src/health/mod.rs b/control-plane/rest/service/src/health/mod.rs
new file mode 100644
index 000000000..da63f89a5
--- /dev/null
+++ b/control-plane/rest/service/src/health/mod.rs
@@ -0,0 +1,4 @@
+/// Tools to collect the liveness state of the agent-core service.
+pub mod core_state;
+/// Actix request handlers for health checks.
+pub mod handlers;
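Note (not part of the patch): `CachedCoreState` above is a small time-bounded cache. Reads are served from the `RwLock` until the entry is older than `cache_duration`, and only then is agent-core probed again. The same pattern, reduced to a standalone synchronous sketch; the `CachedFlag` name and the closure probe are illustrative only:

    use std::sync::RwLock;
    use std::time::{Duration, Instant};

    /// A TTL-cached boolean, mirroring the refresh logic of `CachedCoreState`.
    struct CachedFlag {
        state: RwLock<(bool, Instant)>,
        ttl: Duration,
    }

    impl CachedFlag {
        fn new(initial: bool, ttl: Duration) -> Self {
            Self {
                state: RwLock::new((initial, Instant::now())),
                ttl,
            }
        }

        /// Serve the cached value; re-run `probe` only once the entry is older than `ttl`.
        fn get(&self, probe: impl Fn() -> bool) -> bool {
            let stale = self.state.read().unwrap().1.elapsed() >= self.ttl;
            if stale {
                *self.state.write().unwrap() = (probe(), Instant::now());
            }
            self.state.read().unwrap().0
        }
    }

    fn main() {
        let flag = CachedFlag::new(true, Duration::from_secs(4));
        // Within the 4s window the cached value is served and the probe is not called.
        assert!(flag.get(|| false));
    }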
diff --git a/control-plane/rest/service/src/main.rs b/control-plane/rest/service/src/main.rs
index 4a1a68d57..7f2be800f 100644
--- a/control-plane/rest/service/src/main.rs
+++ b/control-plane/rest/service/src/main.rs
@@ -1,19 +1,28 @@
 mod authentication;
+mod health;
 mod v0;
 
-use crate::v0::{CORE_CLIENT, JSON_GRPC_CLIENT};
+use crate::{
+    health::{
+        core_state::CachedCoreState,
+        handlers::{liveness, readiness},
+    },
+    v0::{CORE_CLIENT, JSON_GRPC_CLIENT},
+};
 use actix_service::ServiceFactory;
 use actix_web::{
     body::MessageBody,
     dev::{ServiceRequest, ServiceResponse},
-    middleware, HttpServer,
+    middleware,
+    web::Data,
+    HttpServer,
 };
 use clap::Parser;
 use grpc::{client::CoreClient, operations::jsongrpc::client::JsonGrpcClient};
 use http::Uri;
 use rustls::{pki_types::PrivateKeyDer, ServerConfig};
 use rustls_pemfile::{certs, rsa_private_keys};
-use std::{fs::File, io::BufReader};
+use std::{fs::File, io::BufReader, time::Duration};
 use stor_port::transport_api::{RequestMinTimeout, TimeoutOptions};
 use utils::{
     tracing_telemetry::{FmtLayer, FmtStyle, KeyValue},
@@ -34,6 +43,10 @@ pub(crate) struct CliArgs {
     #[clap(long, short = 'z', default_value = DEFAULT_GRPC_CLIENT_ADDR)]
     core_grpc: Uri,
 
+    /// Set the frequency of probing the agent-core for a liveness check.
+    #[arg(long = "core-health-freq", value_parser = humantime::parse_duration, default_value = "2m")]
+    core_liveness_check_frequency: Duration,
+
     /// The json gRPC Server URL or address to connect to the service.
     #[clap(long, short = 'J')]
     json_grpc: Option<Uri>,
@@ -219,20 +232,26 @@ async fn main() -> anyhow::Result<()> {
         .with_tracing_tags(cli_args.tracing_tags.clone())
         .init("rest-server");
 
+    // Initialize the core client to be used in rest
+    CORE_CLIENT
+        .set(CoreClient::new(cli_args.core_grpc, timeout_opts()).await)
+        .ok()
+        .expect("Expect to be initialised only once");
+
+    let cached_core_state =
+        Data::new(CachedCoreState::new(cli_args.core_liveness_check_frequency).await);
+
     let app = move || {
         actix_web::App::new()
+            .app_data(cached_core_state.clone())
+            .service(liveness)
+            .service(readiness)
             .wrap(tracing_actix_web::TracingLogger::default())
             .wrap(middleware::Logger::default())
             .app_data(authentication::init(get_jwk_path()))
             .configure_api(&v0::configure_api)
     };
 
-    // Initialize the core client to be used in rest
-    CORE_CLIENT
-        .set(CoreClient::new(CliArgs::args().core_grpc, timeout_opts()).await)
-        .ok()
-        .expect("Expect to be initialised only once");
-
     // Initialize the json grpc client to be used in rest
     if let Some(json_grpc) = CliArgs::args().json_grpc {
         JSON_GRPC_CLIENT
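Note (not part of the patch): the new `--core-health-freq` flag is parsed by `humantime::parse_duration`, so it accepts human-readable values such as the `2m` default or the `4s` used by the BDD test below. A minimal sketch of the accepted syntax, assuming the `humantime` crate as a dependency:

    use std::time::Duration;

    fn main() {
        // "2m" is two minutes, "4s" is four seconds; units can be combined ("1m 30s").
        assert_eq!(humantime::parse_duration("2m").unwrap(), Duration::from_secs(120));
        assert_eq!(humantime::parse_duration("4s").unwrap(), Duration::from_secs(4));
        assert_eq!(humantime::parse_duration("1m 30s").unwrap(), Duration::from_secs(90));
    }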
diff --git a/deployer/src/infra/rest.rs b/deployer/src/infra/rest.rs
index 4e3bce43a..c4ba70254 100644
--- a/deployer/src/infra/rest.rs
+++ b/deployer/src/infra/rest.rs
@@ -44,6 +44,10 @@ impl ComponentAction for Rest {
             }
         }
 
+        if let Some(core_health_freq) = &options.rest_core_health_freq {
+            binary = binary.with_args(vec!["--core-health-freq", core_health_freq]);
+        }
+
         if cfg.container_exists("jaeger") {
             let jaeger_config = format!("jaeger.{}:4317", cfg.get_name());
             binary = binary.with_args(vec!["--jaeger", &jaeger_config])
diff --git a/deployer/src/lib.rs b/deployer/src/lib.rs
index ba6ea0f73..c92a87589 100644
--- a/deployer/src/lib.rs
+++ b/deployer/src/lib.rs
@@ -136,6 +136,10 @@ pub struct StartOptions {
     #[clap(long, conflicts_with = "no_rest")]
     pub rest_jwk: Option<String>,
 
+    /// Set the rest-to-core health probe frequency on the rest service.
+    #[arg(long)]
+    pub rest_core_health_freq: Option<String>,
+
     /// Use the following image pull policy when creating containers from images.
     #[clap(long, default_value = "ifnotpresent")]
     pub image_pull_policy: composer::ImagePullPolicy,
diff --git a/tests/bdd/common/deployer.py b/tests/bdd/common/deployer.py
index e278dd9d2..d129636b9 100644
--- a/tests/bdd/common/deployer.py
+++ b/tests/bdd/common/deployer.py
@@ -38,6 +38,7 @@ class StartOptions:
     no_min_timeouts: bool = False
     rust_log: str = None
     rust_log_silence: str = None
+    rest_core_health_freq: str = None
 
     def args(self):
         args = [
@@ -99,6 +100,9 @@ def args(self):
         if rust_log_silence is not None:
             args.append(f"--rust-log-silence={rust_log_silence}")
 
+        if self.rest_core_health_freq:
+            args.append(f"--rest-core-health-freq={self.rest_core_health_freq}")
+
         agent_arg = "--agents=Core"
         if self.ha_node_agent:
             agent_arg += ",HaNode"
@@ -106,6 +110,7 @@ def args(self):
             agent_arg += ",HaCluster"
         if self.ha_cluster_agent_fast is not None:
             args.append(f"--cluster-fast-requeue={self.ha_cluster_agent_fast}")
+
         args.append(agent_arg)
 
         return args
@@ -139,6 +144,7 @@ def start(
         no_min_timeouts=False,
         rust_log: str = None,
         rust_log_silence: str = None,
+        rest_core_health_freq: str = None,
     ):
         options = StartOptions(
             io_engines,
@@ -165,6 +171,7 @@ def start(
             no_min_timeouts=no_min_timeouts,
             rust_log=rust_log,
             rust_log_silence=rust_log_silence,
+            rest_core_health_freq=rest_core_health_freq,
         )
         pytest.deployer_options = options
         Deployer.start_with_opts(options)
diff --git a/tests/bdd/features/health_probes/readiness_probe.feature b/tests/bdd/features/health_probes/readiness_probe.feature
new file mode 100644
index 000000000..da8849379
--- /dev/null
+++ b/tests/bdd/features/health_probes/readiness_probe.feature
@@ -0,0 +1,12 @@
+Feature: Readiness Probe
+
+  Background:
+    Given a running agent-core service
+    And a running REST service with "--core-health-freq" set to "4s"
+
+  Scenario: The REST API /ready service should not update its readiness status more than once in 4s
+    Given the agent-core service is available
+    And the REST service returns a 200 status code for an HTTP GET request to the /ready endpoint
+    When the agent-core service is brought down forcefully
+    Then the REST service returns 200 for the /ready endpoint for 2 more seconds
+    And after a delay of 4s the REST service returns 503 for the /ready endpoint for the following 5s
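Note (not part of the patch): the timing in this scenario follows from the 4s refresh window. After agent-core is killed, `/ready` keeps serving the cached 200 until the cache entry expires and the re-probe fails, at which point it flips to 503. For a manual spot check against a running deployment, a sketch using the `reqwest` crate (the address mirrors the one used by the test below):

    // Cargo.toml: reqwest = { version = "0.11", features = ["blocking"] }
    fn main() -> Result<(), reqwest::Error> {
        let ready = reqwest::blocking::get("http://localhost:8081/ready")?;
        // Expect 200 while agent-core is reachable, 503 once the cached probe fails.
        println!("/ready -> {}", ready.status());
        Ok(())
    }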
diff --git a/tests/bdd/features/health_probes/test_readiness_probe.py b/tests/bdd/features/health_probes/test_readiness_probe.py
new file mode 100644
index 000000000..c0dfa4daa
--- /dev/null
+++ b/tests/bdd/features/health_probes/test_readiness_probe.py
@@ -0,0 +1,100 @@
+"""Readiness Probe feature tests."""
+
+import time
+
+import pytest
+from common.deployer import Deployer
+from common.docker import Docker
+from pytest_bdd import given, scenario, then, when
+from requests import get as http_get
+from retrying import retry
+
+READINESS_API_ENDPOINT = "http://localhost:8081/ready"
+
+
+@pytest.fixture(scope="module")
+def setup():
+    Deployer.start(io_engines=1, rest_core_health_freq="4s")
+    yield
+    Deployer.stop()
+
+
+@scenario(
+    "readiness_probe.feature",
+    "The REST API /ready service should not update its readiness status more than once in 4s",
+)
+def test_the_rest_api_ready_service_should_not_update_its_readiness_status_more_than_once_in_4s(
+    setup,
+):
+    """The REST API /ready service should not update its readiness status more than once in 4s."""
+
+
+@given('a running REST service with "--core-health-freq" set to "4s"')
+def a_running_rest_service(setup):
+    """a running REST service with "--core-health-freq" set to "4s"."""
+
+
+@given("a running agent-core service")
+def a_running_agent_core_service(setup):
+    """a running agent-core service."""
+
+
+@given("the agent-core service is available")
+def agent_core_service_is_available(setup):
+    """the agent-core service is available."""
+
+
+@given(
+    "the REST service returns a 200 status code for an HTTP GET request to the /ready endpoint"
+)
+def the_rest_service_returns_a_200_status_code_for_an_http_get_request_to_the_ready_endpoint(
+    setup,
+):
+    """the REST service returns a 200 status code for an HTTP GET request to the /ready endpoint."""
+
+    # Retry for up to 5 minutes (1500 attempts x 200ms).
+    @retry(
+        stop_max_attempt_number=1500,
+        wait_fixed=200,
+    )
+    def rest_is_ready():
+        response = http_get(READINESS_API_ENDPOINT)
+        assert response.status_code == 200
+
+    rest_is_ready()
+
+
+@when("the agent-core service is brought down forcefully")
+def the_agent_core_service_is_brought_down_forcefully(setup):
+    """the agent-core service is brought down forcefully."""
+    Docker.kill_container("core")
+
+
+@then("the REST service returns 200 for the /ready endpoint for 2 more seconds")
+def the_rest_service_returns_200_for_ready_endpoint_for_2_more_second(setup):
+    """the REST service returns 200 for the /ready endpoint for 2 more seconds."""
+    start_time = time.time()
+    while time.time() - start_time < 2:
+        response = http_get(READINESS_API_ENDPOINT)
+        if response.status_code != 200:
+            raise ValueError(
+                "Expected the readiness probe to return 200 for the whole 2s window"
+            )
+
+
+@then(
+    "after a delay of 4s the REST service returns 503 for the /ready endpoint for the following 5s"
+)
+def after_a_delay_of_4s_the_rest_service_returns_503_for_ready_endpoint_for_the_following_5s(
+    setup,
+):
+    """after a delay of 4s the REST service returns 503 for the /ready endpoint for the following 5s."""
+    time.sleep(4)
+
+    start_time = time.time()
+    while time.time() - start_time < 5:
+        response = http_get(READINESS_API_ENDPOINT)
+        if response.status_code != 503:
+            raise ValueError(
+                "Expected the readiness probe to return 503 for the whole 5s window"
+            )