Skip to content

Commit

Permalink
fix: add health-check
Browse files Browse the repository at this point in the history
  • Loading branch information
TroyKomodo committed May 1, 2024
1 parent bb04414 commit ba1a5b5
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 28 deletions.
2 changes: 0 additions & 2 deletions foundations/src/dataloader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::time::Duration;

use futures::FutureExt;

use self::batch_loader::BatchLoader;
pub use self::types::LoaderOutput;
use self::types::{BatchState, DataLoaderInner};
Expand Down
52 changes: 26 additions & 26 deletions foundations/src/telementry/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,46 +160,46 @@ async fn metrics(
}

#[cfg(feature = "health-check")]
pub fn register_health_check<F, Fut>(check: F) -> usize
where
F: Fn() -> Fut + Send + Sync + 'static,
Fut: std::future::Future<Output = bool> + Send + Sync + 'static,
{
register_health_check_boxed(Box::new(move || Box::pin(check())))
}

#[cfg(feature = "health-check")]
pub fn register_health_check_boxed(check: health_check::BoxHealthCheck) -> usize {
health_check::register(check)
}

#[cfg(feature = "health-check")]
pub fn unregister_health_check(id: usize) {
health_check::unregister(id);
}
pub use health_check::{register as register_health_check, unregister as unregister_health_check, HealthCheck};

#[cfg(feature = "health-check")]
mod health_check {
use std::pin::Pin;
use std::sync::atomic::AtomicUsize;

use futures::Future;
use scc::HashMap;

type BoxFut = Pin<Box<dyn std::future::Future<Output = bool> + Send + Sync>>;
pub(super) type BoxHealthCheck = Box<dyn Fn() -> BoxFut + Send + Sync>;
pub trait HealthCheck: Send + Sync + 'static {
fn check(&self) -> Pin<Box<dyn Future<Output = bool> + Send + '_>>;
}

impl<F, Fut> HealthCheck for F
where
F: Fn() -> Fut + Send + Sync + 'static,
Fut: Future<Output = bool> + Send + Sync + 'static,
{
fn check(&self) -> Pin<Box<dyn Future<Output = bool> + Send + '_>> {
Box::pin((self)())
}
}

#[derive(Default)]
struct HealthCheck {
struct HealthChecker {
id: AtomicUsize,
health_checks: HashMap<usize, BoxHealthCheck>,
health_checks: HashMap<usize, Box<dyn HealthCheck>>,
}

static HEALTH_CHECK: once_cell::sync::Lazy<HealthCheck> =
once_cell::sync::Lazy::<HealthCheck>::new(|| HealthCheck::default());
static HEALTH_CHECK: once_cell::sync::Lazy<HealthChecker> =
once_cell::sync::Lazy::<HealthChecker>::new(|| HealthChecker::default());

pub fn register(check: BoxHealthCheck) -> usize {
pub fn register(check: impl HealthCheck) -> usize {
let id = HEALTH_CHECK.id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
HEALTH_CHECK.health_checks.insert(id, check).ok().expect("id already exists");
HEALTH_CHECK
.health_checks
.insert(id, Box::new(check))
.ok()
.expect("id already exists");
id
}

Expand All @@ -211,7 +211,7 @@ mod health_check {
let mut o_entry = HEALTH_CHECK.health_checks.first_entry_async().await;

while let Some(entry) = o_entry {
if (entry.get())().await {
if (entry.get()).check().await {
return false;
}

Expand Down

0 comments on commit ba1a5b5

Please sign in to comment.