Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(stats): improve indexing status err handling #1114

Merged
merged 7 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion stats/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ by enabling word wrapping
| `STATS__CONCURRENT_​START_UPDATES` | | Number of concurrent chart updates on start | `3` |
| `STATS__​DEFAULT_​SCHEDULE` | | Schedule used for update groups with no config | `"0 0 1 * * * *"` |
| `STATS__LIMITS__REQUESTED_​POINTS_LIMIT` | | Maximum allowed number of requested points | `182500` |
| `STATS__BLOCKSCOUT_API_URL` | Required unless `STATS__​IGNORE_​​BLOCKSCOUT_​API_​ABSENCE` is set to `true`. | URL to Blockscout API. | `null` |
| `STATS__BLOCKSCOUT_API_URL` | Required unless `STATS__​IGNORE_​​BLOCKSCOUT_​API_​ABSENCE` is set to `true`. | URL to Blockscout API. Used for [conditional update start](#conditional-start). | `null` |
| `STATS__CONDITIONAL_​START__CHECK_PERIOD_SECS` | | Time between start condition checks (while the conditions are not satisfied) | `5` |
| `STATS__CONDITIONAL_​START__BLOCKS_RATIO__​ENABLED` | | Enable `blocks_​ratio` threshold | `true` |
| `STATS__CONDITIONAL_​START__BLOCKS_RATIO__​THRESHOLD` | | Value for `blocks_​ratio` threshold | `0.98` |
Expand All @@ -95,6 +95,11 @@ by enabling word wrapping

[anchor]: <> (anchors.envs.end.service)

##### Conditional start
To prevent incorrect statistics from being collected, there is an option to automatically delay chart updates. This is controlled by the `STATS__CONDITIONAL_START__*` environment variables.

The service will periodically check the enabled start conditions and start updating charts once they are satisfied.

<details><summary>Server settings</summary>
<p>

Expand Down
72 changes: 29 additions & 43 deletions stats/stats-server/src/blockscout_waiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,10 @@ use crate::settings::{Settings, StartConditionSettings, ToggleableThreshold};

use anyhow::Context;
use blockscout_service_launcher::launcher::ConfigSettings;
use reqwest::StatusCode;
use tokio::time::sleep;
use tracing::{info, warn};

/// Returns `true` when `status_code` is one of the HTTP statuses this module
/// treats as retryable: 500 Internal Server Error, 503 Service Unavailable,
/// 504 Gateway Timeout, 429 Too Many Requests, or 418 I'm a teapot.
/// Any other status yields `false`.
fn is_retryable_code(status_code: &reqwest::StatusCode) -> bool {
matches!(
*status_code,
StatusCode::INTERNAL_SERVER_ERROR
| StatusCode::SERVICE_UNAVAILABLE
| StatusCode::GATEWAY_TIMEOUT
| StatusCode::TOO_MANY_REQUESTS
| StatusCode::IM_A_TEAPOT
)
}
const RETRIES: u64 = 10;

fn is_threshold_passed(
threshold: &ToggleableThreshold,
Expand Down Expand Up @@ -57,6 +47,7 @@ pub async fn wait_for_blockscout_indexing(
api_config: blockscout_client::Configuration,
wait_config: StartConditionSettings,
) -> Result<(), anyhow::Error> {
let mut consecutive_errors = 0;
loop {
match blockscout_client::apis::main_page_api::get_indexing_status(&api_config).await {
Ok(result)
Expand All @@ -75,16 +66,20 @@ pub async fn wait_for_blockscout_indexing(
info!("Blockscout indexing threshold passed");
return Ok(());
}
Ok(_) => {}
Err(blockscout_client::Error::ResponseError(r)) if is_retryable_code(&r.status) => {
warn!("Error from indexing status endpoint: {r:?}");
Ok(_) => {
info!("Blockscout indexing threshold is not passed");
consecutive_errors = 0;
}
Err(e) => {
return Err(e).context("Requesting indexing status");
if consecutive_errors >= RETRIES {
return Err(e).context("Requesting indexing status");
}
warn!("Error ({consecutive_errors}/{RETRIES}) requesting indexing status: {e:?}");
consecutive_errors += 1;
}
}
info!(
"Blockscout is not indexed enough. Checking again in {} secs",
"Rechecking indexing status in {} secs",
wait_config.check_period_secs
);
sleep(Duration::from_secs(wait_config.check_period_secs.into())).await;
Expand Down Expand Up @@ -139,11 +134,12 @@ mod tests {

async fn test_wait_indexing(
wait_config: StartConditionSettings,
timeout: Option<Duration>,
response: ResponseTemplate,
) -> Result<Result<(), anyhow::Error>, Elapsed> {
let server = mock_indexing_status(response).await;
tokio::time::timeout(
Duration::from_millis(500),
timeout.unwrap_or(Duration::from_millis(500)),
wait_for_blockscout_indexing(
blockscout_client::Configuration::new(Url::from_str(&server.uri()).unwrap()),
wait_config,
Expand All @@ -156,11 +152,12 @@ mod tests {
fn wait_config(
#[default(0.9)] blocks: f64,
#[default(0.9)] internal_transactions: f64,
#[default(0)] check_period_secs: u32,
) -> StartConditionSettings {
StartConditionSettings {
blocks_ratio: ToggleableThreshold::enabled(blocks),
internal_transactions_ratio: ToggleableThreshold::enabled(internal_transactions),
check_period_secs: 0,
check_period_secs,
}
}

Expand All @@ -171,6 +168,7 @@ mod tests {
) {
test_wait_indexing(
wait_config.clone(),
None,
ResponseTemplate::new(200).set_body_string(
r#"{
"finished_indexing": true,
Expand All @@ -186,6 +184,7 @@ mod tests {

test_wait_indexing(
wait_config,
None,
ResponseTemplate::new(200).set_body_string(
r#"{
"finished_indexing": false,
Expand All @@ -206,6 +205,7 @@ mod tests {
) {
test_wait_indexing(
wait_config,
None,
ResponseTemplate::new(200)
.set_body_string(
r#"{
Expand All @@ -229,6 +229,7 @@ mod tests {
) {
test_wait_indexing(
wait_config,
None,
ResponseTemplate::new(200)
.set_body_string(
r#"{
Expand All @@ -247,38 +248,23 @@ mod tests {
#[rstest]
#[tokio::test]
async fn wait_for_blockscout_indexing_retries_with_error_codes(
wait_config: StartConditionSettings,
#[with(0.9, 0.9, 1)] wait_config: StartConditionSettings,
) {
let timeout = Some(Duration::from_millis(1500));
let mut error_servers = JoinSet::from_iter([
test_wait_indexing(wait_config.clone(), ResponseTemplate::new(429)),
test_wait_indexing(wait_config.clone(), ResponseTemplate::new(500)),
test_wait_indexing(wait_config.clone(), ResponseTemplate::new(503)),
test_wait_indexing(wait_config.clone(), ResponseTemplate::new(504)),
test_wait_indexing(wait_config.clone(), timeout, ResponseTemplate::new(429)),
test_wait_indexing(wait_config.clone(), timeout, ResponseTemplate::new(500)),
test_wait_indexing(wait_config.clone(), timeout, ResponseTemplate::new(503)),
test_wait_indexing(wait_config.clone(), timeout, ResponseTemplate::new(504)),
test_wait_indexing(wait_config.clone(), timeout, ResponseTemplate::new(400)),
test_wait_indexing(wait_config.clone(), timeout, ResponseTemplate::new(403)),
test_wait_indexing(wait_config.clone(), timeout, ResponseTemplate::new(404)),
test_wait_indexing(wait_config.clone(), timeout, ResponseTemplate::new(405)),
]);
#[allow(for_loops_over_fallibles)]
for server in error_servers.join_next().await {
let test_result = server.unwrap();
test_result.expect_err("must time out");
}
}

/// Checks that client-error responses (400, 403, 404, 405) from the mocked
/// indexing-status endpoint make the wait finish with an error instead of
/// running into the timeout applied by `test_wait_indexing`.
#[rstest]
#[tokio::test]
async fn wait_for_blockscout_indexing_fails_with_error_codes(
wait_config: StartConditionSettings,
) {
// One wait per status code, all collected into the same `JoinSet`.
let mut error_servers = JoinSet::from_iter([
test_wait_indexing(wait_config.clone(), ResponseTemplate::new(400)),
test_wait_indexing(wait_config.clone(), ResponseTemplate::new(403)),
test_wait_indexing(wait_config.clone(), ResponseTemplate::new(404)),
test_wait_indexing(wait_config.clone(), ResponseTemplate::new(405)),
]);
// NOTE(review): `join_next().await` yields an `Option`, so this `for` body runs
// at most once and only the first task to finish is checked; a
// `while let Some(..)` loop would check all of them. Confirm which is intended.
#[allow(for_loops_over_fallibles)]
for server in error_servers.join_next().await {
// `unwrap` here is on the join result of the task, not on the wait outcome.
let test_result = server.unwrap();
// Outer `Result` comes from the timeout wrapper; the inner one is the
// result of the wait itself.
test_result
.expect("must fail immediately")
.expect_err("must report error");
}
}
}
21 changes: 17 additions & 4 deletions stats/stats-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,14 @@ pub async fn stats(mut settings: Settings) -> Result<(), anyhow::Error> {
let update_service =
Arc::new(UpdateService::new(db.clone(), blockscout, charts.clone()).await?);

tokio::spawn(async move {
let update_service_handle = tokio::spawn(async move {
// Wait for blockscout to index, if necessary.
if let Some(config) = blockscout_api_config {
if let Err(e) = wait_for_blockscout_indexing(config, settings.conditional_start).await {
tracing::error!(error =? e, "Error starting update service. Failed while waiting for blockscout indexing");
return;
let error_msg =
"Error starting update service. Failed while waiting for blockscout indexing";
tracing::error!(error =? e, error_msg);
panic!("{}. {:?}", error_msg, e);
}
}

Expand All @@ -126,6 +128,7 @@ pub async fn stats(mut settings: Settings) -> Result<(), anyhow::Error> {
settings.force_update_on_start,
)
.await;
Ok(())
});

if settings.metrics.enabled {
Expand All @@ -148,5 +151,15 @@ pub async fn stats(mut settings: Settings) -> Result<(), anyhow::Error> {
metrics: settings.metrics,
};

launcher::launch(&launch_settings, http_router, grpc_router).await
let futures = vec![
update_service_handle,
tokio::spawn(
async move { launcher::launch(&launch_settings, http_router, grpc_router).await },
),
];
let (res, _, others) = futures::future::select_all(futures).await;
for future in others.into_iter() {
future.abort()
}
res?
}
Loading