diff --git a/control-plane/rest/Cargo.toml b/control-plane/rest/Cargo.toml index a3f26f862..950c770a4 100644 --- a/control-plane/rest/Cargo.toml +++ b/control-plane/rest/Cargo.toml @@ -20,6 +20,7 @@ rustls = { version = "0.23.19", default-features = false } rustls-pemfile = "2.2.0" actix-web = { version = "4.9.0", features = ["rustls-0_23"] } actix-service = "2.0.2" +tokio = { version = "1.41.0", features = ["sync"] } opentelemetry = { version = "0.26.0" } tracing-actix-web = { version = "0.7.14", features = ["opentelemetry_0_26"] } tracing = "0.1.40" diff --git a/control-plane/rest/service/src/health/core_state.rs b/control-plane/rest/service/src/health/core_state.rs index 46c02df1e..2aaadf19b 100644 --- a/control-plane/rest/service/src/health/core_state.rs +++ b/control-plane/rest/service/src/health/core_state.rs @@ -1,14 +1,12 @@ use crate::v0::core_grpc; use grpc::operations::node::traits::NodeOperations; -use std::{ - sync::RwLock, - time::{Duration, Instant}, -}; +use std::time::{Duration, Instant}; +use tokio::sync::Mutex; /// This is a type to cache the liveness of the agent-core service. /// This is meant to be wrapped inside an Arc and used across threads. pub struct CachedCoreState { - state: RwLock, + state: Mutex, cache_duration: Duration, } @@ -18,13 +16,23 @@ struct ServerState { last_updated: Instant, } +impl ServerState { + /// Update the state of the agent-core service, or assume it's unavailable if something + /// went wrong. + async fn update_or_assume_unavailable(&mut self) { + let new_value = core_grpc().node().probe(None).await.unwrap_or(false); + self.is_live = new_value; + self.last_updated = Instant::now(); + } +} + impl CachedCoreState { /// Create a new cache for serving readiness health checks based on agent-core health. pub async fn new(cache_duration: Duration) -> Self { let agent_core_is_live = core_grpc().node().probe(None).await.unwrap_or(false); CachedCoreState { - state: RwLock::new(ServerState { + state: Mutex::new(ServerState { is_live: agent_core_is_live, last_updated: Instant::now(), }), @@ -35,24 +43,12 @@ impl CachedCoreState { /// Get the cached state of the agent-core service, or assume it's unavailable if something /// went wrong. pub async fn get_or_assume_unavailable(&self) -> bool { - let should_update = { - let state = self.state.read().unwrap(); - state.last_updated.elapsed() >= self.cache_duration - }; + let mut state = self.state.lock().await; - if should_update { - self.update_or_assume_unavailable().await; + if state.last_updated.elapsed() >= self.cache_duration { + state.update_or_assume_unavailable().await; } - self.state.read().unwrap().is_live - } - - /// Update the state of the agent-core service, or assume it's unavailable if something - /// went wrong. - pub async fn update_or_assume_unavailable(&self) { - let new_value = core_grpc().node().probe(None).await.unwrap_or(false); - let mut state = self.state.write().unwrap(); - state.is_live = new_value; - state.last_updated = Instant::now(); + state.is_live } } diff --git a/tests/bdd/features/health_probes/readiness_probe.feature b/tests/bdd/features/health_probes/readiness_probe.feature index da8849379..b059501f2 100644 --- a/tests/bdd/features/health_probes/readiness_probe.feature +++ b/tests/bdd/features/health_probes/readiness_probe.feature @@ -2,11 +2,11 @@ Feature: Readiness Probe Background: Given a running agent-core service - And a running REST service with "--core-health-freq" set to "4s" + And a running REST service with the cache refresh period set to "800ms" - Scenario: The REST API /ready service should not update its readiness status more than once in 4s + Scenario: The REST API /ready service should not update its readiness status more than once in the cache refresh period Given agent-core service is available And the REST service returns a 200 status code for an HTTP GET request to the /ready endpoint When the agent-core service is brought down forcefully - Then the REST service returns 200 for /ready endpoint for 2 more second - And after a delay of 4s the REST service returns 503 for /ready endpoint for the following 5s + Then the REST service return changes from 200 to 503 within double of the cache refresh period + And it keeps returning 503 at least for the cache refresh period diff --git a/tests/bdd/features/health_probes/test_readiness_probe.py b/tests/bdd/features/health_probes/test_readiness_probe.py index c0dfa4daa..c7572a006 100644 --- a/tests/bdd/features/health_probes/test_readiness_probe.py +++ b/tests/bdd/features/health_probes/test_readiness_probe.py @@ -1,37 +1,51 @@ """Readiness Probe feature tests.""" +import logging import time import pytest +import requests from common.deployer import Deployer from common.docker import Docker from pytest_bdd import given, scenario, then, when -from requests import get as http_get from retrying import retry -READINESS_API_ENDPOINT = "http://localhost:8081/ready" +logger = logging.getLogger(__name__) + + +def ready_http_get(context_msg_prefix: str): + try: + response = requests.get("http://localhost:8081/ready", timeout=(0.003, 0.010)) + logger.info( + f"{context_msg_prefix}: response.status_code: {response.status_code}" + ) + return response + except requests.exceptions.Timeout: + logger.error(f"{context_msg_prefix}: the request timed out") + except requests.exceptions.RequestException as e: + logger.error(f"{context_msg_prefix}: an error occurred: {e}") @pytest.fixture(scope="module") def setup(): - Deployer.start(io_engines=1, rest_core_health_freq="4s") + Deployer.start(io_engines=1, rest_core_health_freq="800ms", request_timeout="50ms") yield Deployer.stop() @scenario( "readiness_probe.feature", - "The REST API /ready service should not update its readiness status more than once in 4s", + "The REST API /ready service should not update its readiness status more than once in the cache refresh period", ) -def test_the_rest_api_ready_service_should_not_update_its_readiness_status_more_than_once_in_4s( +def test_the_rest_api_ready_service_should_not_update_its_readiness_status_more_than_once_in_the_cache_refresh_period( setup, ): - """The REST API /ready service should not update its readiness status more than once in 4s.""" + """The REST API /ready service should not update its readiness status more than once in the cache refresh period.""" -@given('a running REST service with "--core-health-freq" set to "4s"') +@given('a running REST service with the cache refresh period set to "800ms"') def a_running_rest_service(setup): - """a running REST service with "--core-health-freq" set to "4s".""" + """a running REST service with the cache refresh period set to "800ms".""" @given("a running agent-core service") @@ -51,6 +65,7 @@ def the_rest_service_returns_a_200_status_code_for_an_http_get_request_to_the_re setup, ): """the REST service returns a 200 status code for an HTTP GET request to the /ready endpoint.""" + logger.info(f"Initial request: expected status: 200") # 5 minute retry. @retry( @@ -58,7 +73,7 @@ def the_rest_service_returns_a_200_status_code_for_an_http_get_request_to_the_re wait_fixed=200, ) def rest_is_ready(): - response = http_get(READINESS_API_ENDPOINT) + response = ready_http_get(context_msg_prefix="Initial request") assert response.status_code == 200 rest_is_ready() @@ -70,31 +85,34 @@ def the_agent_core_service_is_brought_down_forcefully(setup): Docker.kill_container("core") -@then("the REST service returns 200 for /ready endpoint for 2 more second") -def the_rest_service_returns_200_for_ready_endpoint_for_2_more_second(setup): - """the REST service returns 200 for /ready endpoint for 2 more second.""" - start_time = time.time() - while time.time() - start_time < 2: - response = http_get(READINESS_API_ENDPOINT) - if response.status_code != 200: - raise ValueError( - "Expected Readiness probe to return 200 for this duration of 2s" - ) - - @then( - "after a delay of 4s the REST service returns 503 for /ready endpoint for the following 5s" + "the REST service return changes from 200 to 503 within double of the cache refresh period" ) -def after_a_delay_of_4s_the_rest_service_returns_503_for_ready_endpoint_for_the_following_5s( +def the_rest_service_return_changes_from_200_to_503_within_double_of_the_cache_refresh_period( + setup, +): + """the REST service return changes from 200 to 503 within double of the cache refresh period.""" + logger.info(f"Request after killing core: expected status: 503") + + @retry(wait_fixed=50, stop_max_delay=1600) + def rest_is_not_ready(): + response = ready_http_get(context_msg_prefix="Request after killing core") + assert response.status_code == 503 + + rest_is_not_ready() + + +@then("it keeps returning 503 at least for the cache refresh period") +def it_keeps_returning_503_at_least_for_the_cache_refresh_period( setup, ): - """after a delay of 4s the REST service returns 503 for /ready endpoint for the following 5s.""" - time.sleep(4) + """it keeps returning 503 at least for the cache refresh period.""" + logger.info(f"Request after cache refresh: expected status: 503") start_time = time.time() - while time.time() - start_time < 5: - response = http_get(READINESS_API_ENDPOINT) + while time.time() - start_time < 0.8: + response = ready_http_get(context_msg_prefix="Request after cache refresh") if response.status_code != 503: raise ValueError( - "Expected Readiness probe to return 503 for this duration of 5s" + "Expected Readiness probe to return 503 for this duration of 800ms" )