Skip to content

Commit

Permalink
feat: add unit error and telemetry
Browse files Browse the repository at this point in the history
  • Loading branch information
TroyKomodo committed Jul 13, 2024
1 parent b1bf1de commit c08f4d5
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 7 deletions.
35 changes: 30 additions & 5 deletions foundations/src/batcher/dataloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,31 @@ use super::{BatchOperation, Batcher, BatcherConfig, BatcherDataloader, BatcherEr
/// Shorthand for the result of a loader's batch load: on success a `HashMap`
/// from `L::Key` to `L::Value` built with hasher `S`, on failure `L::Error`.
// The bounds written on the alias parameters trip the `type_alias_bounds` lint,
// hence the `allow`. NOTE(review): the `L: Loader<S>` bound appears to be kept so
// that the `L::Key` / `L::Value` / `L::Error` shorthand paths resolve — confirm
// before removing it.
#[allow(type_alias_bounds)]
pub type LoaderOutput<L: Loader<S>, S: BuildHasher = RandomState> = Result<HashMap<L::Key, L::Value, S>, L::Error>;

/// Zero-sized error type that carries no information about what went wrong.
///
/// It can be created from the unit value `()` through its [`From`] impl and
/// always renders as the text `unknown`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct UnitError;

impl std::fmt::Display for UnitError {
    /// Writes the fixed message `unknown` to the formatter.
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        out.write_str("unknown")
    }
}

impl std::error::Error for UnitError {}

impl From<()> for UnitError {
    /// Turns the unit value into a `UnitError`.
    fn from((): ()) -> Self {
        UnitError
    }
}

pub trait Loader<S: BuildHasher + Default = RandomState> {
type Key: Clone + Eq + std::hash::Hash + Send + Sync;
type Value: Clone + Send + Sync;
type Error: Clone + std::error::Error + Send + Sync;

fn config(&self) -> BatcherConfig {
BatcherConfig {
name: std::any::type_name::<Self>().to_string(),
concurrency: 10,
max_batch_size: 1000,
sleep_duration: std::time::Duration::from_millis(5),
Expand All @@ -23,13 +41,15 @@ pub trait Loader<S: BuildHasher + Default = RandomState> {
fn load(&self, keys: Vec<Self::Key>) -> impl std::future::Future<Output = LoaderOutput<Self, S>> + Send;
}

pub struct DataLoader<L: Loader<S> + Send + Sync, S: BuildHasher + Default + Send + Sync = RandomState>(
Batcher<Wrapper<L, S>>,
);
pub struct DataLoader<L: Loader<S> + Send + Sync, S: BuildHasher + Default + Send + Sync = RandomState> {
batcher: Batcher<Wrapper<L, S>>,
}

impl<L: Loader<S> + Send + Sync + 'static, S: BuildHasher + Default + Send + Sync + 'static> DataLoader<L, S> {
pub fn new(loader: L) -> Self {
Self(Batcher::new(Wrapper(loader, PhantomData)))
Self {
batcher: Batcher::new(Wrapper(loader, PhantomData)),
}
}

pub async fn load(&self, key: L::Key) -> Result<Option<L::Value>, BatcherError<L::Error>> {
Expand All @@ -42,7 +62,7 @@ impl<L: Loader<S> + Send + Sync + 'static, S: BuildHasher + Default + Send + Syn
&self,
keys: impl IntoIterator<Item = L::Key>,
) -> Result<HashMap<L::Key, L::Value, S>, BatcherError<L::Error>> {
self.0.execute_many(keys).await
self.batcher.execute_many(keys).await
}
}

Expand Down Expand Up @@ -119,6 +139,7 @@ mod tests {
results
}),
config: BatcherConfig {
name: "test".to_string(),
concurrency: 10,
max_batch_size: 1000,
sleep_duration: std::time::Duration::from_millis(5),
Expand Down Expand Up @@ -158,6 +179,7 @@ mod tests {
results
}),
config: BatcherConfig {
name: "test".to_string(),
concurrency: 10,
max_batch_size: 1000,
sleep_duration: std::time::Duration::from_millis(5),
Expand Down Expand Up @@ -192,6 +214,7 @@ mod tests {
results
}),
config: BatcherConfig {
name: "test".to_string(),
concurrency: 10,
max_batch_size: 3000,
sleep_duration: std::time::Duration::from_millis(5),
Expand Down Expand Up @@ -226,6 +249,7 @@ mod tests {
results
}),
config: BatcherConfig {
name: "test".to_string(),
concurrency: 10,
max_batch_size: 1000,
sleep_duration: std::time::Duration::from_millis(100),
Expand Down Expand Up @@ -261,6 +285,7 @@ mod tests {
}
}),
config: BatcherConfig {
name: "test".to_string(),
concurrency: 10,
max_batch_size: 1000,
sleep_duration: std::time::Duration::from_millis(5),
Expand Down
14 changes: 12 additions & 2 deletions foundations/src/batcher/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::sync::atomic::{AtomicU64, AtomicUsize};
use std::sync::Arc;

use tokio::sync::OnceCell;
use tracing::Instrument;

pub mod dataloader;

Expand Down Expand Up @@ -228,6 +229,7 @@ struct BatcherInner<T: BatchOperation> {
batch_id: AtomicU64,
max_batch_size: AtomicUsize,
operation: T,
name: String,
active_batch: tokio::sync::RwLock<Option<Batch<T>>>,
}

Expand Down Expand Up @@ -283,10 +285,16 @@ impl<E: std::error::Error> From<E> for BatcherError<E> {
}

impl<T: BatchOperation + 'static> Batch<T> {
#[tracing::instrument(skip_all, fields(name = %inner.name))]
async fn run(self, inner: Arc<BatcherInner<T>>) {
self.results
.get_or_init(|| async move {
let _ticket = inner.semaphore.acquire().await.map_err(|_| BatcherError::AcquireSemaphore)?;
let _ticket = inner
.semaphore
.acquire()
.instrument(tracing::debug_span!("Semaphore"))
.await
.map_err(|_| BatcherError::AcquireSemaphore)?;
Ok(inner.operation.process(self.ops).await.map_err(BatcherError::Batch)?)
})
.await;
Expand All @@ -295,6 +303,7 @@ impl<T: BatchOperation + 'static> Batch<T> {

#[derive(Clone)]
pub struct BatcherConfig {
pub name: String,
pub concurrency: usize,
pub max_batch_size: usize,
pub sleep_duration: std::time::Duration,
Expand Down Expand Up @@ -371,6 +380,7 @@ impl<T: BatchOperation + 'static + Send + Sync> Batcher<T> {
sleep_duration: AtomicU64::new(config.sleep_duration.as_nanos() as u64),
max_batch_size: AtomicUsize::new(config.max_batch_size),
operation,
name: config.name,
});

Self {
Expand Down Expand Up @@ -405,7 +415,7 @@ impl<T: BatchOperation + 'static + Send + Sync> Batcher<T> {
T::Mode::output_item_to_result(iter.into_iter().next().ok_or(BatcherError::MissingResult)?)
}

#[tracing::instrument(skip_all)]
#[tracing::instrument(skip_all, fields(name = %self.inner.name))]
pub async fn execute_many(
&self,
documents: impl IntoIterator<Item = T::Item>,
Expand Down

0 comments on commit c08f4d5

Please sign in to comment.