Skip to content

Commit

Permalink
forgot to add file
Browse files Browse the repository at this point in the history
Signed-off-by: clux <[email protected]>
  • Loading branch information
clux committed Nov 12, 2021
1 parent fdcfbca commit 5a2bd69
Showing 1 changed file with 114 additions and 0 deletions.
114 changes: 114 additions & 0 deletions kube-runtime/src/observer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
use backoff::ExponentialBackoff;
use futures::{future::ready, Stream, StreamExt};
use kube_client::{
api::{Api, ListParams},
core::Resource,
};
use serde::de::DeserializeOwned;
use std::fmt::Debug;

use crate::{
utils,
watcher::{watcher, Error, Event, Result},
};


/// A simple observer around a watcher with error handling and retry backoff
///
/// # Error handling
///
/// An `Observer` sets a sensible default backoff policy for all watch events and will retry
/// (with expotential backoff) from transient `watcher` failures until the retry policy is breached.
///
/// To configure the retry policy use `Observer::backoff`.
///
/// Note that while all errors will be retried, some represent the need for user action to recover:
///
/// - 404 `ErrorResponse`(watching invalid / missing api kind/group for `K`)
/// - 403 `ErrorResponse` (missing list + watch rbac verbs for `K`)
pub struct Observer<K>
where
K: Clone + Resource + Send + Sync + 'static,
{
// temporary builder params
api: Api<K>,
listparams: Option<ListParams>,
backoff: Option<ExponentialBackoff>,
}

impl<K> From<Api<K>> for Observer<K>
where
K: Clone + Resource + DeserializeOwned + Debug + Send + Sync + 'static,
{
fn from(api: Api<K>) -> Observer<K> {
Observer::new(api)
}
}

impl<K> Observer<K>
where
K: Clone + Resource + DeserializeOwned + Debug + Send + Sync + 'static,
{
/// Create a Observer on a reflector on a type `K`
///
/// Takes an [`Api`] object that determines how the `Observer` listens for changes to the `K`.
///
/// The [`ListParams`] controls to the possible subset of objects of `K` that you want to cache.
/// For the full set of objects `K` in the given `Api` scope, you can use [`ListParams::default`].
#[must_use]
pub fn new(api: Api<K>) -> Self {
Self {
api,
listparams: None,
backoff: None,
}
}

// start the watcher and filter out backoff errors from the stream for a while
fn start(self) -> impl Stream<Item = Result<Event<K>>> {
let backoff = self.backoff.unwrap_or_else(|| backoff::ExponentialBackoff {
max_elapsed_time: Some(std::time::Duration::from_secs(60 * 10)),
..ExponentialBackoff::default()
});
let lp = self.listparams.unwrap_or_else(|| ListParams::default());
watcher(self.api, lp, backoff).filter(|r| ready(!std::matches!(r, Err(Error::BackoffRetriable))))
}

/// Set the backoff policy
pub fn backoff(mut self, backoff: ExponentialBackoff) -> Self {
self.backoff = Some(backoff);
self
}

/// Set the parameters for the watch
pub fn params(mut self, lp: ListParams) -> Self {
self.listparams = Some(lp);
self
}

/// Run the watcher and produce an information stream of watch events (modified/added)
///
/// This stream will emit only `Ok` events until the error policy is breached
///
/// # Errors
///
/// If a [`watcher::Error`] was encountered for longer than what the
/// [`ExponentialBackoff`](backoff::ExponentialBackoff) policy allows, then
/// that error is considered irrecoverable and propagated in a stream item here.
pub fn watch_applies(self) -> impl Stream<Item = Result<K, Error>> {
utils::try_flatten_applied(self.start())
}

/// Run the watcher, and produce an informational stream of watch events (modified/added/deleted)
///
/// This stream will emit only `Ok` events until the error policy is breached
///
/// # Errors
///
/// If a [`watcher::Error`] was encountered for longer than what the
/// [`ExponentialBackoff`](backoff::ExponentialBackoff) policy allows, then
/// that error is considered irrecoverable and propagated in a stream item here.
pub fn watch_touches(self) -> impl Stream<Item = Result<K, Error>> {
utils::try_flatten_touched(self.start())
}
}

0 comments on commit 5a2bd69

Please sign in to comment.