Skip to content

Commit

Permalink
feat: add unit error and telementry
Browse files Browse the repository at this point in the history
  • Loading branch information
TroyKomodo committed Jul 13, 2024
1 parent b1bf1de commit e0eca0d
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 7 deletions.
35 changes: 30 additions & 5 deletions foundations/src/batcher/dataloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,30 @@ use super::{BatchOperation, Batcher, BatcherConfig, BatcherDataloader, BatcherEr
#[allow(type_alias_bounds)]
pub type LoaderOutput<L: Loader<S>, S: BuildHasher = RandomState> = Result<HashMap<L::Key, L::Value, S>, L::Error>;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd)]
pub struct UnitError;

impl std::error::Error for UnitError {}

impl std::fmt::Display for UnitError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "unknown")
}
}

impl From<()> for UnitError {
fn from(_: ()) -> Self {
Self
}
}

pub trait Loader<S: BuildHasher + Default = RandomState> {
type Key: Clone + Eq + std::hash::Hash + Send + Sync;
type Value: Clone + Send + Sync;
type Error: Clone + std::error::Error + Send + Sync;

const NAME: &'static str;

fn config(&self) -> BatcherConfig {
BatcherConfig {
concurrency: 10,
Expand All @@ -23,13 +42,15 @@ pub trait Loader<S: BuildHasher + Default = RandomState> {
fn load(&self, keys: Vec<Self::Key>) -> impl std::future::Future<Output = LoaderOutput<Self, S>> + Send;
}

pub struct DataLoader<L: Loader<S> + Send + Sync, S: BuildHasher + Default + Send + Sync = RandomState>(
Batcher<Wrapper<L, S>>,
);
pub struct DataLoader<L: Loader<S> + Send + Sync, S: BuildHasher + Default + Send + Sync = RandomState> {
batcher: Batcher<Wrapper<L, S>>,
}

impl<L: Loader<S> + Send + Sync + 'static, S: BuildHasher + Default + Send + Sync + 'static> DataLoader<L, S> {
pub fn new(loader: L) -> Self {
Self(Batcher::new(Wrapper(loader, PhantomData)))
Self {
batcher: Batcher::new(Wrapper(loader, PhantomData)),
}
}

pub async fn load(&self, key: L::Key) -> Result<Option<L::Value>, BatcherError<L::Error>> {
Expand All @@ -42,7 +63,7 @@ impl<L: Loader<S> + Send + Sync + 'static, S: BuildHasher + Default + Send + Syn
&self,
keys: impl IntoIterator<Item = L::Key>,
) -> Result<HashMap<L::Key, L::Value, S>, BatcherError<L::Error>> {
self.0.execute_many(keys).await
self.batcher.execute_many(keys).await
}
}

Expand All @@ -54,6 +75,8 @@ impl<L: Loader<S> + Send + Sync, S: BuildHasher + Default + Send + Sync> BatchOp
type Mode = BatcherDataloader<S>;
type Response = L::Value;

const NAME: &'static str = L::NAME;

fn config(&self) -> BatcherConfig {
self.0.config()
}
Expand Down Expand Up @@ -88,6 +111,8 @@ mod tests {
type Key = u64;
type Value = u64;

const NAME: &'static str = "test";

fn config(&self) -> BatcherConfig {
self.config.clone()
}
Expand Down
13 changes: 11 additions & 2 deletions foundations/src/batcher/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::sync::atomic::{AtomicU64, AtomicUsize};
use std::sync::Arc;

use tokio::sync::OnceCell;
use tracing::Instrument;

pub mod dataloader;

Expand Down Expand Up @@ -200,6 +201,8 @@ pub trait BatchOperation: Send + Sync {
type Error: Clone + std::fmt::Debug + Send + Sync;
type Mode: BatchMode<Self>;

const NAME: &'static str;

fn config(&self) -> BatcherConfig;

fn process(
Expand Down Expand Up @@ -283,10 +286,16 @@ impl<E: std::error::Error> From<E> for BatcherError<E> {
}

impl<T: BatchOperation + 'static> Batch<T> {
#[tracing::instrument(skip_all, fields(name = %T::NAME))]
async fn run(self, inner: Arc<BatcherInner<T>>) {
self.results
.get_or_init(|| async move {
let _ticket = inner.semaphore.acquire().await.map_err(|_| BatcherError::AcquireSemaphore)?;
let _ticket = inner
.semaphore
.acquire()
.instrument(tracing::debug_span!("Semaphore"))
.await
.map_err(|_| BatcherError::AcquireSemaphore)?;
Ok(inner.operation.process(self.ops).await.map_err(BatcherError::Batch)?)
})
.await;
Expand Down Expand Up @@ -405,7 +414,7 @@ impl<T: BatchOperation + 'static + Send + Sync> Batcher<T> {
T::Mode::output_item_to_result(iter.into_iter().next().ok_or(BatcherError::MissingResult)?)
}

#[tracing::instrument(skip_all)]
#[tracing::instrument(skip_all, fields(name = %T::NAME))]
pub async fn execute_many(
&self,
documents: impl IntoIterator<Item = T::Item>,
Expand Down

0 comments on commit e0eca0d

Please sign in to comment.