Skip to content

Commit

Permalink
chore(bors): merge pull request #894
Browse files Browse the repository at this point in the history
894: feat(rest): add health API for readiness and liveness probes r=niladrih a=niladrih



Co-authored-by: Niladri Halder <[email protected]>
  • Loading branch information
mayastor-bors and niladrih committed Dec 6, 2024
2 parents 739940f + 5e5feea commit c3ae698
Show file tree
Hide file tree
Showing 9 changed files with 243 additions and 9 deletions.
58 changes: 58 additions & 0 deletions control-plane/rest/service/src/health/core_state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use crate::v0::core_grpc;
use grpc::operations::node::traits::NodeOperations;
use std::{
sync::RwLock,
time::{Duration, Instant},
};

/// This is a type to cache the liveness of the agent-core service.
/// This is meant to be wrapped inside an Arc and used across threads.
pub struct CachedCoreState {
state: RwLock<ServerState>,
cache_duration: Duration,
}

/// This type remembers a liveness state, and when this data was refreshed.
struct ServerState {
is_live: bool,
last_updated: Instant,
}

impl CachedCoreState {
/// Create a new cache for serving readiness health checks based on agent-core health.
pub async fn new(cache_duration: Duration) -> Self {
let agent_core_is_live = core_grpc().node().probe(None).await.unwrap_or(false);

CachedCoreState {
state: RwLock::new(ServerState {
is_live: agent_core_is_live,
last_updated: Instant::now(),
}),
cache_duration,
}
}

/// Get the cached state of the agent-core service, or assume it's unavailable if something
/// went wrong.
pub async fn get_or_assume_unavailable(&self) -> bool {
let should_update = {
let state = self.state.read().unwrap();
state.last_updated.elapsed() >= self.cache_duration
};

if should_update {
self.update_or_assume_unavailable().await;
}

self.state.read().unwrap().is_live
}

/// Update the state of the agent-core service, or assume it's unavailable if something
/// went wrong.
pub async fn update_or_assume_unavailable(&self) {
let new_value = core_grpc().node().probe(None).await.unwrap_or(false);
let mut state = self.state.write().unwrap();
state.is_live = new_value;
state.last_updated = Instant::now();
}
}
28 changes: 28 additions & 0 deletions control-plane/rest/service/src/health/handlers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use crate::CachedCoreState;
use actix_web::{get, web::Data, HttpResponse, Responder};

/// Liveness probe check. Failure will result in Pod restart. 200 on success.
#[get("/live")]
async fn liveness(_cached_core_state: Data<CachedCoreState>) -> impl Responder {
HttpResponse::Ok()
.content_type("text/plain; charset=utf-8")
.insert_header(("X-Content-Type-Options", "nosniff"))
.body("live")
}

/// Readiness probe check. Failure will result in removal of Container from Kubernetes service
/// target pool. 200 on success, 503 on failure.
#[get("/ready")]
async fn readiness(cached_core_state: Data<CachedCoreState>) -> HttpResponse {
if cached_core_state.get_or_assume_unavailable().await {
return HttpResponse::Ok()
.content_type("text/plain; charset=utf-8")
.insert_header(("X-Content-Type-Options", "nosniff"))
.body("ready");
}

HttpResponse::ServiceUnavailable()
.content_type("text/plain; charset=utf-8")
.insert_header(("X-Content-Type-Options", "nosniff"))
.body("not ready")
}
4 changes: 4 additions & 0 deletions control-plane/rest/service/src/health/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/// Has tools to collect the liveness state of the agent-core service.
pub mod core_state;
/// Actix request handlers for health checks.
pub mod handlers;
37 changes: 28 additions & 9 deletions control-plane/rest/service/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,28 @@
mod authentication;
mod health;
mod v0;

use crate::v0::{CORE_CLIENT, JSON_GRPC_CLIENT};
use crate::{
health::{
core_state::CachedCoreState,
handlers::{liveness, readiness},
},
v0::{CORE_CLIENT, JSON_GRPC_CLIENT},
};
use actix_service::ServiceFactory;
use actix_web::{
body::MessageBody,
dev::{ServiceRequest, ServiceResponse},
middleware, HttpServer,
middleware,
web::Data,
HttpServer,
};
use clap::Parser;
use grpc::{client::CoreClient, operations::jsongrpc::client::JsonGrpcClient};
use http::Uri;
use rustls::{pki_types::PrivateKeyDer, ServerConfig};
use rustls_pemfile::{certs, rsa_private_keys};
use std::{fs::File, io::BufReader};
use std::{fs::File, io::BufReader, time::Duration};
use stor_port::transport_api::{RequestMinTimeout, TimeoutOptions};
use utils::{
tracing_telemetry::{FmtLayer, FmtStyle, KeyValue},
Expand All @@ -34,6 +43,10 @@ pub(crate) struct CliArgs {
#[clap(long, short = 'z', default_value = DEFAULT_GRPC_CLIENT_ADDR)]
core_grpc: Uri,

/// Set the frequency of probing the agent-core for a liveness check.
#[arg(long = "core-health-freq", value_parser = humantime::parse_duration, default_value = "2m")]
core_liveness_check_frequency: Duration,

/// The json gRPC Server URL or address to connect to the service.
#[clap(long, short = 'J')]
json_grpc: Option<Uri>,
Expand Down Expand Up @@ -219,20 +232,26 @@ async fn main() -> anyhow::Result<()> {
.with_tracing_tags(cli_args.tracing_tags.clone())
.init("rest-server");

// Initialize the core client to be used in rest
CORE_CLIENT
.set(CoreClient::new(cli_args.core_grpc, timeout_opts()).await)
.ok()
.expect("Expect to be initialised only once");

let cached_core_state =
Data::new(CachedCoreState::new(cli_args.core_liveness_check_frequency).await);

let app = move || {
actix_web::App::new()
.app_data(cached_core_state.clone())
.service(liveness)
.service(readiness)
.wrap(tracing_actix_web::TracingLogger::default())
.wrap(middleware::Logger::default())
.app_data(authentication::init(get_jwk_path()))
.configure_api(&v0::configure_api)
};

// Initialize the core client to be used in rest
CORE_CLIENT
.set(CoreClient::new(CliArgs::args().core_grpc, timeout_opts()).await)
.ok()
.expect("Expect to be initialised only once");

// Initialize the json grpc client to be used in rest
if let Some(json_grpc) = CliArgs::args().json_grpc {
JSON_GRPC_CLIENT
Expand Down
4 changes: 4 additions & 0 deletions deployer/src/infra/rest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ impl ComponentAction for Rest {
}
}

if let Some(core_health_freq) = &options.rest_core_health_freq {
binary = binary.with_args(vec!["--core-health-freq", core_health_freq]);
}

if cfg.container_exists("jaeger") {
let jaeger_config = format!("jaeger.{}:4317", cfg.get_name());
binary = binary.with_args(vec!["--jaeger", &jaeger_config])
Expand Down
4 changes: 4 additions & 0 deletions deployer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ pub struct StartOptions {
#[clap(long, conflicts_with = "no_rest")]
pub rest_jwk: Option<String>,

/// Set the rest-to-core health probe frequency on the rest.
#[arg(long)]
pub rest_core_health_freq: Option<String>,

/// Use the following image pull policy when creating containers from images.
#[clap(long, default_value = "ifnotpresent")]
pub image_pull_policy: composer::ImagePullPolicy,
Expand Down
7 changes: 7 additions & 0 deletions tests/bdd/common/deployer.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class StartOptions:
no_min_timeouts: bool = False
rust_log: str = None
rust_log_silence: str = None
rest_core_health_freq: str = None

def args(self):
args = [
Expand Down Expand Up @@ -99,13 +100,17 @@ def args(self):
if rust_log_silence is not None:
args.append(f"--rust-log-silence={rust_log_silence}")

if self.rest_core_health_freq:
args.append(f"--rest-core-health-freq={self.rest_core_health_freq}")

agent_arg = "--agents=Core"
if self.ha_node_agent:
agent_arg += ",HaNode"
if self.ha_cluster_agent:
agent_arg += ",HaCluster"
if self.ha_cluster_agent_fast is not None:
args.append(f"--cluster-fast-requeue={self.ha_cluster_agent_fast}")

args.append(agent_arg)

return args
Expand Down Expand Up @@ -139,6 +144,7 @@ def start(
no_min_timeouts=False,
rust_log: str = None,
rust_log_silence: str = None,
rest_core_health_freq: str = None,
):
options = StartOptions(
io_engines,
Expand All @@ -165,6 +171,7 @@ def start(
no_min_timeouts=no_min_timeouts,
rust_log=rust_log,
rust_log_silence=rust_log_silence,
rest_core_health_freq=rest_core_health_freq,
)
pytest.deployer_options = options
Deployer.start_with_opts(options)
Expand Down
12 changes: 12 additions & 0 deletions tests/bdd/features/health_probes/readiness_probe.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
Feature: Readiness Probe

Background:
Given a running agent-core service
And a running REST service with "--core-health-freq" set to "10ms"

Scenario: The REST API /ready service should not update its readiness status more than once in 10 milliseconds
Given agent-core service is available
And the REST service returns a 200 status code for an HTTP GET request to the /ready endpoint
When the agent-core service is brought down forcefully
Then the REST service returns 200 for /ready endpoint for 5 more milliseconds
And after a delay of 5 seconds the REST service returns 503 for /ready endpoint for the following 12 milliseconds
98 changes: 98 additions & 0 deletions tests/bdd/features/health_probes/test_readiness_probe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
"""Readiness Probe feature tests."""

import time

import pytest
from common.deployer import Deployer
from common.docker import Docker
from pytest_bdd import given, scenario, then, when
from requests import get as http_get
from retrying import retry

READINESS_API_ENDPOINT = "http://localhost:8081/ready"


@pytest.fixture(scope="module")
def setup():
Deployer.start(io_engines=1, rest_core_health_freq="10ms")
yield
Deployer.stop()


@scenario(
"readiness_probe.feature",
"The REST API /ready service should not update its readiness status more than once in 10 milliseconds",
)
def test_the_rest_api_ready_service_should_not_update_its_readiness_status_more_than_once_in_10_milliseconds():
"""The REST API /ready service should not update its readiness status more than once in 10 milliseconds."""


@given('a running REST service with "--core-health-freq" set to "10ms"')
def a_running_rest_service(setup):
"""a running REST service with "--core-health-freq" set to "10ms"."""


@given("a running agent-core service")
def a_running_agent_core_service(setup):
"""a running agent-core service."""


@given("agent-core service is available")
def agent_core_service_is_available(setup):
"""agent-core service is available."""


@given(
"the REST service returns a 200 status code for an HTTP GET request to the /ready endpoint"
)
def the_rest_service_returns_a_200_status_code_for_an_http_get_request_to_the_ready_endpoint(
setup,
):
"""the REST service returns a 200 status code for an HTTP GET request to the /ready endpoint."""

# 5 minute retry.
@retry(
stop_max_attempt_number=1500,
wait_fixed=200,
)
def rest_is_ready():
response = http_get(READINESS_API_ENDPOINT)
assert response.status_code == 200

rest_is_ready()


@when("the agent-core service is brought down forcefully")
def the_agent_core_service_is_brought_down_forcefully(setup):
"""the agent-core service is brought down forcefully."""
Docker.kill_container("core")


@then("the REST service returns 200 for /ready endpoint for 5 more milliseconds")
def the_rest_service_returns_200_for_ready_endpoint_for_5_more_milliseconds(setup):
"""the REST service returns 200 for /ready endpoint for 5 more milliseconds."""
start_time = time.time()
while time.time() - start_time < 5:
response = http_get(READINESS_API_ENDPOINT)
if response.status_code != 200:
raise ValueError(
"Expected Readiness probe to return 200 for this duration of 5 milliseconds"
)


@then(
"after a delay of 5 seconds the REST service returns 503 for /ready endpoint for the following 12 milliseconds"
)
def after_a_delay_of_5_seconds_the_rest_service_returns_503_for_ready_endpoint_for_the_following_12_milliseconds(
setup,
):
"""after a delay of 5 seconds the REST service returns 503 for /ready endpoint for the following 12 milliseconds."""
time.sleep(5)

start_time = time.time()
while time.time() - start_time < 12:
response = http_get(READINESS_API_ENDPOINT)
if response.status_code != 503:
raise ValueError(
"Expected Readiness probe to return 503 for this duration of 12 milliseconds"
)

0 comments on commit c3ae698

Please sign in to comment.