Skip to content

Commit

Permalink
chore(bors): merge pull request #900
Browse files Browse the repository at this point in the history
900: Readiness probe changes r=niladrih a=niladrih

Changes:
1. Makes the readiness probe test tighter
2. The readiness check API depends on the agent-core service's Node gRPC endpoint.
The current RwLock lets multiple tokio threads probe the agent-core. This is not
ideal. This change makes the get operation atomic, so that agent-core state cache
updates could be performed by only a single thread, and the updated cache should
feed the next few ready calls.

Co-authored-by: Niladri Halder <[email protected]>
  • Loading branch information
mayastor-bors and niladrih committed Dec 9, 2024
2 parents f6e9145 + 0e478f7 commit 41dac82
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 54 deletions.
1 change: 1 addition & 0 deletions control-plane/rest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ rustls = { version = "0.23.19", default-features = false }
rustls-pemfile = "2.2.0"
actix-web = { version = "4.9.0", features = ["rustls-0_23"] }
actix-service = "2.0.2"
tokio = { version = "1.41.0", features = ["sync"] }
opentelemetry = { version = "0.26.0" }
tracing-actix-web = { version = "0.7.14", features = ["opentelemetry_0_26"] }
tracing = "0.1.40"
Expand Down
40 changes: 18 additions & 22 deletions control-plane/rest/service/src/health/core_state.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
use crate::v0::core_grpc;
use grpc::operations::node::traits::NodeOperations;
use std::{
sync::RwLock,
time::{Duration, Instant},
};
use std::time::{Duration, Instant};
use tokio::sync::Mutex;

/// This is a type to cache the liveness of the agent-core service.
/// This is meant to be wrapped inside an Arc and used across threads.
pub struct CachedCoreState {
state: RwLock<ServerState>,
state: Mutex<ServerState>,
cache_duration: Duration,
}

Expand All @@ -18,13 +16,23 @@ struct ServerState {
last_updated: Instant,
}

impl ServerState {
/// Update the state of the agent-core service, or assume it's unavailable if something
/// went wrong.
async fn update_or_assume_unavailable(&mut self) {
let new_value = core_grpc().node().probe(None).await.unwrap_or(false);
self.is_live = new_value;
self.last_updated = Instant::now();
}
}

impl CachedCoreState {
/// Create a new cache for serving readiness health checks based on agent-core health.
pub async fn new(cache_duration: Duration) -> Self {
let agent_core_is_live = core_grpc().node().probe(None).await.unwrap_or(false);

CachedCoreState {
state: RwLock::new(ServerState {
state: Mutex::new(ServerState {
is_live: agent_core_is_live,
last_updated: Instant::now(),
}),
Expand All @@ -35,24 +43,12 @@ impl CachedCoreState {
/// Get the cached state of the agent-core service, or assume it's unavailable if something
/// went wrong.
pub async fn get_or_assume_unavailable(&self) -> bool {
let should_update = {
let state = self.state.read().unwrap();
state.last_updated.elapsed() >= self.cache_duration
};
let mut state = self.state.lock().await;

if should_update {
self.update_or_assume_unavailable().await;
if state.last_updated.elapsed() >= self.cache_duration {
state.update_or_assume_unavailable().await;
}

self.state.read().unwrap().is_live
}

/// Update the state of the agent-core service, or assume it's unavailable if something
/// went wrong.
pub async fn update_or_assume_unavailable(&self) {
let new_value = core_grpc().node().probe(None).await.unwrap_or(false);
let mut state = self.state.write().unwrap();
state.is_live = new_value;
state.last_updated = Instant::now();
state.is_live
}
}
8 changes: 4 additions & 4 deletions tests/bdd/features/health_probes/readiness_probe.feature
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ Feature: Readiness Probe

Background:
Given a running agent-core service
And a running REST service with "--core-health-freq" set to "4s"
And a running REST service with the cache refresh period set to "800ms"

Scenario: The REST API /ready service should not update its readiness status more than once in 4s
Scenario: The REST API /ready service should not update its readiness status more than once in the cache refresh period
Given agent-core service is available
And the REST service returns a 200 status code for an HTTP GET request to the /ready endpoint
When the agent-core service is brought down forcefully
Then the REST service returns 200 for /ready endpoint for 2 more second
And after a delay of 4s the REST service returns 503 for /ready endpoint for the following 5s
Then the REST service return changes from 200 to 503 within double of the cache refresh period
And it keeps returning 503 at least for the cache refresh period
74 changes: 46 additions & 28 deletions tests/bdd/features/health_probes/test_readiness_probe.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,51 @@
"""Readiness Probe feature tests."""

import logging
import time

import pytest
import requests
from common.deployer import Deployer
from common.docker import Docker
from pytest_bdd import given, scenario, then, when
from requests import get as http_get
from retrying import retry

READINESS_API_ENDPOINT = "http://localhost:8081/ready"
logger = logging.getLogger(__name__)


def ready_http_get(context_msg_prefix: str):
try:
response = requests.get("http://localhost:8081/ready", timeout=(0.003, 0.010))
logger.info(
f"{context_msg_prefix}: response.status_code: {response.status_code}"
)
return response
except requests.exceptions.Timeout:
logger.error(f"{context_msg_prefix}: the request timed out")
except requests.exceptions.RequestException as e:
logger.error(f"{context_msg_prefix}: an error occurred: {e}")


@pytest.fixture(scope="module")
def setup():
Deployer.start(io_engines=1, rest_core_health_freq="4s")
Deployer.start(io_engines=1, rest_core_health_freq="800ms", request_timeout="50ms")
yield
Deployer.stop()


@scenario(
"readiness_probe.feature",
"The REST API /ready service should not update its readiness status more than once in 4s",
"The REST API /ready service should not update its readiness status more than once in the cache refresh period",
)
def test_the_rest_api_ready_service_should_not_update_its_readiness_status_more_than_once_in_4s(
def test_the_rest_api_ready_service_should_not_update_its_readiness_status_more_than_once_in_the_cache_refresh_period(
setup,
):
"""The REST API /ready service should not update its readiness status more than once in 4s."""
"""The REST API /ready service should not update its readiness status more than once in the cache refresh period."""


@given('a running REST service with "--core-health-freq" set to "4s"')
@given('a running REST service with the cache refresh period set to "800ms"')
def a_running_rest_service(setup):
"""a running REST service with "--core-health-freq" set to "4s"."""
"""a running REST service with the cache refresh period set to "800ms"."""


@given("a running agent-core service")
Expand All @@ -51,14 +65,15 @@ def the_rest_service_returns_a_200_status_code_for_an_http_get_request_to_the_re
setup,
):
"""the REST service returns a 200 status code for an HTTP GET request to the /ready endpoint."""
logger.info(f"Initial request: expected status: 200")

# 5 minute retry.
@retry(
stop_max_attempt_number=1500,
wait_fixed=200,
)
def rest_is_ready():
response = http_get(READINESS_API_ENDPOINT)
response = ready_http_get(context_msg_prefix="Initial request")
assert response.status_code == 200

rest_is_ready()
Expand All @@ -70,31 +85,34 @@ def the_agent_core_service_is_brought_down_forcefully(setup):
Docker.kill_container("core")


@then("the REST service returns 200 for /ready endpoint for 2 more second")
def the_rest_service_returns_200_for_ready_endpoint_for_2_more_second(setup):
"""the REST service returns 200 for /ready endpoint for 2 more second."""
start_time = time.time()
while time.time() - start_time < 2:
response = http_get(READINESS_API_ENDPOINT)
if response.status_code != 200:
raise ValueError(
"Expected Readiness probe to return 200 for this duration of 2s"
)


@then(
"after a delay of 4s the REST service returns 503 for /ready endpoint for the following 5s"
"the REST service return changes from 200 to 503 within double of the cache refresh period"
)
def after_a_delay_of_4s_the_rest_service_returns_503_for_ready_endpoint_for_the_following_5s(
def the_rest_service_return_changes_from_200_to_503_within_double_of_the_cache_refresh_period(
setup,
):
"""the REST service return changes from 200 to 503 within double of the cache refresh period."""
logger.info(f"Request after killing core: expected status: 503")

@retry(wait_fixed=50, stop_max_delay=1600)
def rest_is_not_ready():
response = ready_http_get(context_msg_prefix="Request after killing core")
assert response.status_code == 503

rest_is_not_ready()


@then("it keeps returning 503 at least for the cache refresh period")
def it_keeps_returning_503_at_least_for_the_cache_refresh_period(
setup,
):
"""after a delay of 4s the REST service returns 503 for /ready endpoint for the following 5s."""
time.sleep(4)
"""it keeps returning 503 at least for the cache refresh period."""
logger.info(f"Request after cache refresh: expected status: 503")

start_time = time.time()
while time.time() - start_time < 5:
response = http_get(READINESS_API_ENDPOINT)
while time.time() - start_time < 0.8:
response = ready_http_get(context_msg_prefix="Request after cache refresh")
if response.status_code != 503:
raise ValueError(
"Expected Readiness probe to return 503 for this duration of 5s"
"Expected Readiness probe to return 503 for this duration of 800ms"
)

0 comments on commit 41dac82

Please sign in to comment.