diff --git a/Cargo.lock b/Cargo.lock
index 8e4f1316a..4c8e456ea 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -103,7 +103,7 @@ dependencies = [
  "actix-utils",
  "futures-core",
  "futures-util",
- "mio",
+ "mio 0.8.11",
  "num_cpus",
  "socket2 0.4.9",
  "tokio",
@@ -189,7 +189,7 @@ dependencies = [
  "serde_json",
  "serde_urlencoded",
  "smallvec",
- "socket2 0.5.3",
+ "socket2 0.5.8",
  "time",
  "url",
 ]
@@ -2236,9 +2236,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
 
 [[package]]
 name = "libc"
-version = "0.2.149"
+version = "0.2.168"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a08173bc88b7955d1b3145aa561539096c421ac8debde8cbc3612ec635fee29b"
+checksum = "5aaeb2981e0606ca11d79718f8bb01164f1d6ed75080182d3abf017e6d244b6d"
 
 [[package]]
 name = "libloading"
@@ -2394,6 +2394,17 @@ dependencies = [
  "windows-sys 0.48.0",
 ]
 
+[[package]]
+name = "mio"
+version = "1.0.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd"
+dependencies = [
+ "libc",
+ "wasi",
+ "windows-sys 0.52.0",
+]
+
 [[package]]
 name = "multimap"
 version = "0.8.3"
@@ -3769,12 +3780,12 @@ dependencies = [
 
 [[package]]
 name = "socket2"
-version = "0.5.3"
+version = "0.5.8"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2538b18701741680e0322a2302176d3253a35388e2e62f172f64f4f16605f877"
+checksum = "c970269d99b64e60ec3bd6ad27270092a5394c4e309314b18ae3fe575695fbe8"
 dependencies = [
  "libc",
- "windows-sys 0.48.0",
+ "windows-sys 0.52.0",
 ]
 
 [[package]]
@@ -4063,21 +4074,20 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
 
 [[package]]
 name = "tokio"
-version = "1.33.0"
+version = "1.42.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4f38200e3ef7995e5ef13baec2f432a6da0aa9ac495b2c0e8f3b7eec2c92d653"
+checksum = "5cec9b21b0450273377fc97bd4c33a8acffc8c996c987a7c5b319a0083707551"
 dependencies = [
  "backtrace",
  "bytes",
  "libc",
- "mio",
- "num_cpus",
+ "mio 1.0.3",
  "parking_lot",
  "pin-project-lite",
  "signal-hook-registry",
- "socket2 0.5.3",
+ "socket2 0.5.8",
  "tokio-macros",
- "windows-sys 0.48.0",
+ "windows-sys 0.52.0",
 ]
 
 [[package]]
@@ -4092,9 +4102,9 @@ dependencies = [
 
 [[package]]
 name = "tokio-macros"
-version = "2.1.0"
+version = "2.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e"
+checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752"
 dependencies = [
  "proc-macro2",
  "quote",
diff --git a/control-plane/rest/Cargo.toml b/control-plane/rest/Cargo.toml
index d9e014c06..79b0b85cc 100644
--- a/control-plane/rest/Cargo.toml
+++ b/control-plane/rest/Cargo.toml
@@ -21,6 +21,7 @@ rustls = "0.21.12"
 rustls-pemfile = "1.0.3"
 actix-web = { version = "4.4.0", features = ["rustls-0_21"] }
 actix-service = "2.0.2"
+tokio = { version = "1.41.0", features = ["sync"] }
 opentelemetry = { version = "0.22.0" }
 actix-web-opentelemetry = "0.17.0"
 tracing = "0.1.37"
diff --git a/control-plane/rest/service/src/health/core_state.rs b/control-plane/rest/service/src/health/core_state.rs
new file mode 100644
index 000000000..2aaadf19b
--- /dev/null
+++ b/control-plane/rest/service/src/health/core_state.rs
@@ -0,0 +1,54 @@
+use crate::v0::core_grpc;
+use grpc::operations::node::traits::NodeOperations;
+use std::time::{Duration, Instant};
+use tokio::sync::Mutex;
+
+/// This is a type to cache the liveness of the agent-core service.
+/// This is meant to be wrapped inside an Arc and used across threads.
+pub struct CachedCoreState {
+    state: Mutex<ServerState>,
+    cache_duration: Duration,
+}
+
+/// This type remembers a liveness state, and when this data was refreshed.
+struct ServerState {
+    is_live: bool,
+    last_updated: Instant,
+}
+
+impl ServerState {
+    /// Update the state of the agent-core service, or assume it's unavailable if something
+    /// went wrong.
+    async fn update_or_assume_unavailable(&mut self) {
+        let new_value = core_grpc().node().probe(None).await.unwrap_or(false);
+        self.is_live = new_value;
+        self.last_updated = Instant::now();
+    }
+}
+
+impl CachedCoreState {
+    /// Create a new cache for serving readiness health checks based on agent-core health.
+    pub async fn new(cache_duration: Duration) -> Self {
+        let agent_core_is_live = core_grpc().node().probe(None).await.unwrap_or(false);
+
+        CachedCoreState {
+            state: Mutex::new(ServerState {
+                is_live: agent_core_is_live,
+                last_updated: Instant::now(),
+            }),
+            cache_duration,
+        }
+    }
+
+    /// Get the cached state of the agent-core service, or assume it's unavailable if something
+    /// went wrong.
+    pub async fn get_or_assume_unavailable(&self) -> bool {
+        let mut state = self.state.lock().await;
+
+        if state.last_updated.elapsed() >= self.cache_duration {
+            state.update_or_assume_unavailable().await;
+        }
+
+        state.is_live
+    }
+}
diff --git a/control-plane/rest/service/src/health/handlers.rs b/control-plane/rest/service/src/health/handlers.rs
new file mode 100644
index 000000000..5aa2f1082
--- /dev/null
+++ b/control-plane/rest/service/src/health/handlers.rs
@@ -0,0 +1,28 @@
+use crate::CachedCoreState;
+use actix_web::{get, web::Data, HttpResponse, Responder};
+
+/// Liveness probe check. Failure will result in Pod restart. 200 on success.
+#[get("/live")]
+async fn liveness(_cached_core_state: Data<CachedCoreState>) -> impl Responder {
+    HttpResponse::Ok()
+        .content_type("text/plain; charset=utf-8")
+        .insert_header(("X-Content-Type-Options", "nosniff"))
+        .body("live")
+}
+
+/// Readiness probe check. Failure will result in removal of Container from Kubernetes service
+/// target pool. 200 on success, 503 on failure.
+#[get("/ready")]
+async fn readiness(cached_core_state: Data<CachedCoreState>) -> HttpResponse {
+    if cached_core_state.get_or_assume_unavailable().await {
+        return HttpResponse::Ok()
+            .content_type("text/plain; charset=utf-8")
+            .insert_header(("X-Content-Type-Options", "nosniff"))
+            .body("ready");
+    }
+
+    HttpResponse::ServiceUnavailable()
+        .content_type("text/plain; charset=utf-8")
+        .insert_header(("X-Content-Type-Options", "nosniff"))
+        .body("not ready")
+}
diff --git a/control-plane/rest/service/src/health/mod.rs b/control-plane/rest/service/src/health/mod.rs
new file mode 100644
index 000000000..da63f89a5
--- /dev/null
+++ b/control-plane/rest/service/src/health/mod.rs
@@ -0,0 +1,4 @@
+/// Has tools to collect the liveness state of the agent-core service.
+pub mod core_state;
+/// Actix request handlers for health checks.
+pub mod handlers;
diff --git a/control-plane/rest/service/src/main.rs b/control-plane/rest/service/src/main.rs
index c07f3b485..df59d36bf 100644
--- a/control-plane/rest/service/src/main.rs
+++ b/control-plane/rest/service/src/main.rs
@@ -1,16 +1,25 @@
 mod authentication;
+mod health;
 mod v0;
 
-use crate::v0::{CORE_CLIENT, JSON_GRPC_CLIENT};
+use crate::{
+    health::{
+        core_state::CachedCoreState,
+        handlers::{liveness, readiness},
+    },
+    v0::{CORE_CLIENT, JSON_GRPC_CLIENT},
+};
 use actix_service::ServiceFactory;
 use actix_web::{
     body::MessageBody,
     dev::{ServiceRequest, ServiceResponse},
-    middleware, App, HttpServer,
+    middleware,
+    web::Data,
+    App, HttpServer,
 };
 use rustls::{Certificate, PrivateKey, ServerConfig};
 use rustls_pemfile::{certs, rsa_private_keys};
-use std::{fs::File, io::BufReader};
+use std::{fs::File, io::BufReader, time::Duration};
 use utils::DEFAULT_GRPC_CLIENT_ADDR;
 
 #[derive(Debug, Parser)]
@@ -28,6 +37,10 @@ pub(crate) struct CliArgs {
     #[clap(long, short = 'z', default_value = DEFAULT_GRPC_CLIENT_ADDR)]
     core_grpc: Uri,
 
+    /// Set the frequency of probing the agent-core for a liveness check.
+    #[arg(long = "core-health-freq", value_parser = humantime::parse_duration, default_value = "2m")]
+    core_liveness_check_frequency: Duration,
+
     /// The json gRPC Server URL or address to connect to the service.
     #[clap(long, short = 'J')]
     json_grpc: Option<Uri>,
@@ -222,24 +235,30 @@ async fn main() -> anyhow::Result<()> {
         .with_tracing_tags(cli_args.tracing_tags.clone())
         .init("rest-server");
 
+    // Initialize the core client to be used in rest
+    CORE_CLIENT
+        .set(CoreClient::new(cli_args.core_grpc, timeout_opts()).await)
+        .ok()
+        .expect("Expect to be initialised only once");
+
+    let cached_core_state =
+        Data::new(CachedCoreState::new(cli_args.core_liveness_check_frequency).await);
+
     let app = move || {
         App::new()
+            .app_data(cached_core_state.clone())
+            .service(liveness)
+            .service(readiness)
             .wrap(RequestTracing::new())
             .wrap(middleware::Logger::default())
             .app_data(authentication::init(get_jwk_path()))
             .configure_api(&v0::configure_api)
     };
 
-    // Initialise the core client to be used in rest
-    CORE_CLIENT
-        .set(CoreClient::new(CliArgs::args().core_grpc, timeout_opts()).await)
-        .ok()
-        .expect("Expect to be initialised only once");
-
-    // Initialise the json grpc client to be used in rest
-    if CliArgs::args().json_grpc.is_some() {
+    // Initialize the json grpc client to be used in rest
+    if let Some(json_grpc) = CliArgs::args().json_grpc {
         JSON_GRPC_CLIENT
-            .set(JsonGrpcClient::new(CliArgs::args().json_grpc.unwrap(), timeout_opts()).await)
+            .set(JsonGrpcClient::new(json_grpc, timeout_opts()).await)
             .ok()
             .expect("Expect to be initialised only once");
     }
diff --git a/deployer/src/infra/rest.rs b/deployer/src/infra/rest.rs
index 4e3bce43a..c4ba70254 100644
--- a/deployer/src/infra/rest.rs
+++ b/deployer/src/infra/rest.rs
@@ -44,6 +44,10 @@ impl ComponentAction for Rest {
             }
         }
 
+        if let Some(core_health_freq) = &options.rest_core_health_freq {
+            binary = binary.with_args(vec!["--core-health-freq", core_health_freq]);
+        }
+
         if cfg.container_exists("jaeger") {
             let jaeger_config = format!("jaeger.{}:4317", cfg.get_name());
             binary = binary.with_args(vec!["--jaeger", &jaeger_config])
diff --git a/deployer/src/lib.rs b/deployer/src/lib.rs
index baff0bf06..1220955f1 100644
--- a/deployer/src/lib.rs
+++ b/deployer/src/lib.rs
@@ -136,6 +136,10 @@ pub struct StartOptions {
     #[clap(long, conflicts_with = "no_rest")]
     pub rest_jwk: Option<String>,
 
+    /// Set the rest-to-core health probe frequency on the rest.
+    #[arg(long)]
+    pub rest_core_health_freq: Option<String>,
+
     /// Use the following image pull policy when creating containers from images.
     #[clap(long, default_value = "ifnotpresent")]
     pub image_pull_policy: composer::ImagePullPolicy,
diff --git a/tests/bdd/common/deployer.py b/tests/bdd/common/deployer.py
index 1c91c0293..b5bae59f7 100644
--- a/tests/bdd/common/deployer.py
+++ b/tests/bdd/common/deployer.py
@@ -1,11 +1,10 @@
 import os
 import subprocess
-from datetime import datetime
-
-import pytest
 from dataclasses import dataclass
+from datetime import datetime
 
 import common
+import pytest
 from common.docker import Docker
 from common.nvme import nvme_disconnect_allours_wait
 
@@ -36,6 +35,9 @@ class StartOptions:
     io_engine_devices: [str] = ()
     request_timeout: str = ""
    no_min_timeouts: bool = False
+    rust_log: str = None
+    rust_log_silence: str = None
+    rest_core_health_freq: str = None
 
     def args(self):
         args = [
@@ -84,6 +86,9 @@ def args(self):
         if self.no_min_timeouts:
             args.append(f"--no-min-timeouts")
 
+        if self.rest_core_health_freq:
+            args.append(f"--rest-core-health-freq={self.rest_core_health_freq}")
+
         agent_arg = "--agents=Core"
         if self.ha_node_agent:
             agent_arg += ",HaNode"
@@ -91,6 +96,7 @@ def args(self):
             agent_arg += ",HaCluster"
         if self.ha_cluster_agent_fast is not None:
             args.append(f"--cluster-fast-requeue={self.ha_cluster_agent_fast}")
+
         args.append(agent_arg)
         return args
 
@@ -122,6 +128,9 @@ def start(
         io_engine_devices=[],
         request_timeout="",
         no_min_timeouts=False,
+        rust_log: str = None,
+        rust_log_silence: str = None,
+        rest_core_health_freq: str = None,
     ):
         options = StartOptions(
             io_engines,
@@ -146,6 +155,9 @@ def start(
             io_engine_devices=io_engine_devices,
             request_timeout=request_timeout,
             no_min_timeouts=no_min_timeouts,
+            rust_log=rust_log,
+            rust_log_silence=rust_log_silence,
+            rest_core_health_freq=rest_core_health_freq,
         )
         pytest.deployer_options = options
         Deployer.start_with_opts(options)
diff --git a/tests/bdd/features/health_probes/readiness_probe.feature b/tests/bdd/features/health_probes/readiness_probe.feature
new file mode 100644
index 000000000..b059501f2
--- /dev/null
+++ b/tests/bdd/features/health_probes/readiness_probe.feature
@@ -0,0 +1,12 @@
+Feature: Readiness Probe
+
+  Background:
+    Given a running agent-core service
+    And a running REST service with the cache refresh period set to "800ms"
+
+  Scenario: The REST API /ready service should not update its readiness status more than once in the cache refresh period
+    Given agent-core service is available
+    And the REST service returns a 200 status code for an HTTP GET request to the /ready endpoint
+    When the agent-core service is brought down forcefully
+    Then the REST service return changes from 200 to 503 within double of the cache refresh period
+    And it keeps returning 503 at least for the cache refresh period
diff --git a/tests/bdd/features/health_probes/test_readiness_probe.py b/tests/bdd/features/health_probes/test_readiness_probe.py
new file mode 100644
index 000000000..c7572a006
--- /dev/null
+++ b/tests/bdd/features/health_probes/test_readiness_probe.py
@@ -0,0 +1,118 @@
+"""Readiness Probe feature tests."""
+
+import logging
+import time
+
+import pytest
+import requests
+from common.deployer import Deployer
+from common.docker import Docker
+from pytest_bdd import given, scenario, then, when
+from retrying import retry
+
+logger = logging.getLogger(__name__)
+
+
+def ready_http_get(context_msg_prefix: str):
+    try:
+        response = requests.get("http://localhost:8081/ready", timeout=(0.003, 0.010))
+        logger.info(
+            f"{context_msg_prefix}: response.status_code: {response.status_code}"
+        )
+        return response
+    except requests.exceptions.Timeout:
+        logger.error(f"{context_msg_prefix}: the request timed out")
+    except requests.exceptions.RequestException as e:
+        logger.error(f"{context_msg_prefix}: an error occurred: {e}")
+
+
+@pytest.fixture(scope="module")
+def setup():
+    Deployer.start(io_engines=1, rest_core_health_freq="800ms", request_timeout="50ms")
+    yield
+    Deployer.stop()
+
+
+@scenario(
+    "readiness_probe.feature",
+    "The REST API /ready service should not update its readiness status more than once in the cache refresh period",
+)
+def test_the_rest_api_ready_service_should_not_update_its_readiness_status_more_than_once_in_the_cache_refresh_period(
+    setup,
+):
+    """The REST API /ready service should not update its readiness status more than once in the cache refresh period."""
+
+
+@given('a running REST service with the cache refresh period set to "800ms"')
+def a_running_rest_service(setup):
+    """a running REST service with the cache refresh period set to "800ms"."""
+
+
+@given("a running agent-core service")
+def a_running_agent_core_service(setup):
+    """a running agent-core service."""
+
+
+@given("agent-core service is available")
+def agent_core_service_is_available(setup):
+    """agent-core service is available."""
+
+
+@given(
+    "the REST service returns a 200 status code for an HTTP GET request to the /ready endpoint"
+)
+def the_rest_service_returns_a_200_status_code_for_an_http_get_request_to_the_ready_endpoint(
+    setup,
+):
+    """the REST service returns a 200 status code for an HTTP GET request to the /ready endpoint."""
+    logger.info(f"Initial request: expected status: 200")
+
+    # 5 minute retry.
+    @retry(
+        stop_max_attempt_number=1500,
+        wait_fixed=200,
+    )
+    def rest_is_ready():
+        response = ready_http_get(context_msg_prefix="Initial request")
+        assert response.status_code == 200
+
+    rest_is_ready()
+
+
+@when("the agent-core service is brought down forcefully")
+def the_agent_core_service_is_brought_down_forcefully(setup):
+    """the agent-core service is brought down forcefully."""
+    Docker.kill_container("core")
+
+
+@then(
+    "the REST service return changes from 200 to 503 within double of the cache refresh period"
+)
+def the_rest_service_return_changes_from_200_to_503_within_double_of_the_cache_refresh_period(
+    setup,
+):
+    """the REST service return changes from 200 to 503 within double of the cache refresh period."""
+    logger.info(f"Request after killing core: expected status: 503")
+
+    @retry(wait_fixed=50, stop_max_delay=1600)
+    def rest_is_not_ready():
+        response = ready_http_get(context_msg_prefix="Request after killing core")
+        assert response.status_code == 503
+
+    rest_is_not_ready()
+
+
+@then("it keeps returning 503 at least for the cache refresh period")
+def it_keeps_returning_503_at_least_for_the_cache_refresh_period(
+    setup,
+):
+    """it keeps returning 503 at least for the cache refresh period."""
+    logger.info(f"Request after cache refresh: expected status: 503")
+
+    start_time = time.time()
+    while time.time() - start_time < 0.8:
+        response = ready_http_get(context_msg_prefix="Request after cache refresh")
+        if response.status_code != 503:
+            raise ValueError(
+                "Expected Readiness probe to return 503 for this duration of 800ms"
+            )
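Note: the readiness caching introduced in `core_state.rs` above amounts to a "probe at most once per cache window" pattern. The following is a minimal, self-contained sketch of that pattern, not part of the change itself; `probe()` here is a hypothetical stand-in for `core_grpc().node().probe(None)`, and running it assumes tokio's `rt-multi-thread` and `macros` features in addition to the `sync` feature added to `control-plane/rest/Cargo.toml`.

```rust
use std::time::{Duration, Instant};
use tokio::sync::Mutex;

/// Hypothetical stand-in for the gRPC liveness probe; always reports "live" here.
async fn probe() -> bool {
    true
}

/// Caches the last probe result together with the time it was taken.
struct CachedLiveness {
    state: Mutex<(bool, Instant)>,
    cache_duration: Duration,
}

impl CachedLiveness {
    async fn new(cache_duration: Duration) -> Self {
        Self {
            state: Mutex::new((probe().await, Instant::now())),
            cache_duration,
        }
    }

    /// Returns the cached value, refreshing it at most once per `cache_duration`.
    async fn is_live(&self) -> bool {
        let mut state = self.state.lock().await;
        if state.1.elapsed() >= self.cache_duration {
            *state = (probe().await, Instant::now());
        }
        state.0
    }
}

#[tokio::main]
async fn main() {
    let cache = CachedLiveness::new(Duration::from_millis(800)).await;
    // Calls within the same 800ms window reuse the cached value instead of re-probing.
    assert!(cache.is_live().await);
    assert!(cache.is_live().await);
}
```

Because the refresh happens lazily inside the read path, the `/ready` handler never triggers a probe more often than once per cache period, which is what the BDD scenario above exercises: after the agent-core is killed, the reported status flips from 200 to 503 within at most two cache periods and then stays at 503 for at least one full period.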