Skip to content

Commit

Permalink
Change ReconcileAction to Action and add associated ctors
Browse files Browse the repository at this point in the history
fixes #317

Signed-off-by: clux <[email protected]>
  • Loading branch information
clux committed Mar 17, 2022
1 parent d99a144 commit 90dbd17
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 40 deletions.
17 changes: 5 additions & 12 deletions examples/configmapgen_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use futures::StreamExt;
use k8s_openapi::api::core::v1::ConfigMap;
use kube::{
api::{Api, ListParams, ObjectMeta, Patch, PatchParams, Resource},
runtime::controller::{Context, Controller, ReconcilerAction},
runtime::controller::{Action, Context, Controller},
Client, CustomResource,
};
use schemars::JsonSchema;
Expand All @@ -29,10 +29,7 @@ struct ConfigMapGeneratorSpec {
}

/// Controller triggers this whenever our main object or our children changed
async fn reconcile(
generator: Arc<ConfigMapGenerator>,
ctx: Context<Data>,
) -> Result<ReconcilerAction, Error> {
async fn reconcile(generator: Arc<ConfigMapGenerator>, ctx: Context<Data>) -> Result<Action, Error> {
log::info!("working hard");
tokio::time::sleep(Duration::from_secs(2)).await;
log::info!("hard work is done!");
Expand Down Expand Up @@ -70,16 +67,12 @@ async fn reconcile(
)
.await
.map_err(Error::ConfigMapCreationFailed)?;
Ok(ReconcilerAction {
requeue_after: Some(Duration::from_secs(300)),
})
Ok(Action::requeue(Duration::from_secs(300)))
}

/// The controller triggers this on reconcile errors
fn error_policy(_error: &Error, _ctx: Context<Data>) -> ReconcilerAction {
ReconcilerAction {
requeue_after: Some(Duration::from_secs(1)),
}
fn error_policy(_error: &Error, _ctx: Context<Data>) -> Action {
Action::requeue(Duration::from_secs(1))
}

// Data we want access to in error/reconcile calls
Expand Down
78 changes: 55 additions & 23 deletions kube-runtime/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,44 @@ pub enum Error<ReconcilerErr: std::error::Error + 'static, QueueErr: std::error:

/// Results of the reconciliation attempt
#[derive(Debug, Clone)]
pub struct ReconcilerAction {
pub struct Action {
/// Whether (and when) to next trigger the reconciliation if no external watch triggers hit
///
/// For example, use this to query external systems for updates, expire time-limited resources, or
/// (in your `error_policy`) retry after errors.
pub requeue_after: Option<Duration>,
requeue_after: Option<Duration>,
}

impl Action {
/// Action to to the reconciliation at this time even if no external watch triggers hit
///
/// This is the best-practice action that ensures eventual consistency of your controller
/// even in the case of missed changes (which can happen).
///
/// Watch events are not normally missed, so running this once per hour (`Default`) as a fallback is reasonable.
pub fn requeue(duration: Duration) -> Self {
Self {
requeue_after: Some(duration),
}
}

/// Do nothing until a change is detected
///
/// This stops the controller periodically reconciling this object until a relevant watch event
/// was **detected**.
///
/// **Warning**: If you have watch desyncs, it is possible to miss changes entirely.
/// It is therefore not recommended to disable requeuing this way, unless you have
/// frequent changes to the underlying object, or some other hook to retain eventual consistency.
pub fn await_change() -> Self {
Self { requeue_after: None }
}
}

impl Default for Action {
fn default() -> Self {
Action::requeue(Duration::from_secs(3600))
}
}

/// Helper for building custom trigger filters, see the implementations of [`trigger_self`] and [`trigger_owners`] for some examples.
Expand Down Expand Up @@ -219,15 +251,15 @@ impl Display for ReconcileReason {
/// (such as triggering from arbitrary [`Stream`]s), at the cost of being a bit more verbose.
pub fn applier<K, QueueStream, ReconcilerFut, T>(
mut reconciler: impl FnMut(Arc<K>, Context<T>) -> ReconcilerFut,
mut error_policy: impl FnMut(&ReconcilerFut::Error, Context<T>) -> ReconcilerAction,
mut error_policy: impl FnMut(&ReconcilerFut::Error, Context<T>) -> Action,
context: Context<T>,
store: Store<K>,
queue: QueueStream,
) -> impl Stream<Item = Result<(ObjectRef<K>, ReconcilerAction), Error<ReconcilerFut::Error, QueueStream::Error>>>
) -> impl Stream<Item = Result<(ObjectRef<K>, Action), Error<ReconcilerFut::Error, QueueStream::Error>>>
where
K: Clone + Resource + 'static,
K::DynamicType: Debug + Eq + Hash + Clone + Unpin,
ReconcilerFut: TryFuture<Ok = ReconcilerAction> + Unpin,
ReconcilerFut: TryFuture<Ok = Action> + Unpin,
ReconcilerFut::Error: std::error::Error + 'static,
QueueStream: TryStream,
QueueStream::Ok: Into<ReconcileRequest<K>>,
Expand Down Expand Up @@ -283,7 +315,7 @@ where
.on_complete(async { tracing::debug!("applier runner-merge terminated") })
// finally, for each completed reconcile call:
.and_then(move |(obj_ref, reconciler_result, reconciler_span)| {
let (ReconcilerAction { requeue_after }, requeue_reason) = match &reconciler_result {
let (Action { requeue_after }, requeue_reason) = match &reconciler_result {
Ok(action) =>
// do what user told us
(action.clone(), ReconcileReason::ReconcilerRequestedRetry),
Expand Down Expand Up @@ -327,7 +359,7 @@ where
/// use kube::{
/// Client, CustomResource,
/// api::{Api, ListParams},
/// runtime::controller::{Context, Controller, ReconcilerAction}
/// runtime::controller::{Context, Controller, Action}
/// };
/// use serde::{Deserialize, Serialize};
/// use tokio::time::Duration;
Expand All @@ -348,17 +380,17 @@ where
/// }
///
/// /// The reconciler that will be called when either object change
/// async fn reconcile(g: Arc<ConfigMapGenerator>, _ctx: Context<()>) -> Result<ReconcilerAction, Error> {
/// async fn reconcile(g: Arc<ConfigMapGenerator>, _ctx: Context<()>) -> Result<Action, Error> {
/// // .. use api here to reconcile a child ConfigMap with ownerreferences
/// // see configmapgen_controller example for full info
/// Ok(ReconcilerAction {
/// requeue_after: Some(Duration::from_secs(300)),
/// Ok(Action {
/// requeue: Some(Duration::from_secs(300)),
/// })
/// }
/// /// an error handler that will be called when the reconciler fails
/// fn error_policy(_error: &Error, _ctx: Context<()>) -> ReconcilerAction {
/// ReconcilerAction {
/// requeue_after: Some(Duration::from_secs(60)),
/// fn error_policy(_error: &Error, _ctx: Context<()>) -> Action {
/// Action {
/// requeue: Some(Duration::from_secs(60)),
/// }
/// }
///
Expand Down Expand Up @@ -594,7 +626,7 @@ where
/// use kube::{
/// Client,
/// api::{ListParams, Api, ResourceExt},
/// runtime::{controller::{Context, Controller, ReconcilerAction}},
/// runtime::{controller::{Context, Controller, Action}},
/// };
/// use std::{convert::Infallible, io::BufRead};
/// let (mut reload_tx, reload_rx) = futures::channel::mpsc::channel(0);
Expand All @@ -613,7 +645,7 @@ where
/// .run(
/// |o, _| async move {
/// println!("Reconciling {}", o.name());
/// Ok(ReconcilerAction { requeue_after: None })
/// Ok(Action { requeue: None })
/// },
/// |err: &Infallible, _| Err(err).unwrap(),
/// Context::new(()),
Expand Down Expand Up @@ -658,7 +690,7 @@ where
/// use futures::future::FutureExt;
/// use k8s_openapi::api::core::v1::ConfigMap;
/// use kube::{api::ListParams, Api, Client, ResourceExt};
/// use kube_runtime::controller::{Context, Controller, ReconcilerAction};
/// use kube_runtime::controller::{Context, Controller, Action};
/// use std::convert::Infallible;
/// Controller::new(
/// Api::<ConfigMap>::all(Client::try_default().await.unwrap()),
Expand All @@ -668,7 +700,7 @@ where
/// .run(
/// |o, _| async move {
/// println!("Reconciling {}", o.name());
/// Ok(ReconcilerAction { requeue_after: None })
/// Ok(Action { requeue: None })
/// },
/// |err: &Infallible, _| Err(err).unwrap(),
/// Context::new(()),
Expand Down Expand Up @@ -748,12 +780,12 @@ where
pub fn run<ReconcilerFut, T>(
self,
mut reconciler: impl FnMut(Arc<K>, Context<T>) -> ReconcilerFut,
error_policy: impl FnMut(&ReconcilerFut::Error, Context<T>) -> ReconcilerAction,
error_policy: impl FnMut(&ReconcilerFut::Error, Context<T>) -> Action,
context: Context<T>,
) -> impl Stream<Item = Result<(ObjectRef<K>, ReconcilerAction), Error<ReconcilerFut::Error, watcher::Error>>>
) -> impl Stream<Item = Result<(ObjectRef<K>, Action), Error<ReconcilerFut::Error, watcher::Error>>>
where
K::DynamicType: Debug + Unpin,
ReconcilerFut: TryFuture<Ok = ReconcilerAction> + Send + 'static,
ReconcilerFut: TryFuture<Ok = Action> + Send + 'static,
ReconcilerFut::Error: std::error::Error + Send + 'static,
{
applier(
Expand All @@ -775,7 +807,7 @@ where

#[cfg(test)]
mod tests {
use super::{Context, ReconcilerAction};
use super::{Action, Context};
use crate::Controller;
use k8s_openapi::api::core::v1::ConfigMap;
use kube_client::Api;
Expand All @@ -795,8 +827,8 @@ mod tests {
fn test_controller_should_be_send() {
assert_send(
Controller::new(mock_type::<Api<ConfigMap>>(), Default::default()).run(
|_, _| async { Ok(mock_type::<ReconcilerAction>()) },
|_: &std::io::Error, _| mock_type::<ReconcilerAction>(),
|_, _| async { Ok(mock_type::<Action>()) },
|_: &std::io::Error, _| mock_type::<Action>(),
Context::new(()),
),
);
Expand Down
10 changes: 5 additions & 5 deletions kube-runtime/src/finalizer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//! Finalizer helper for [`Controller`](crate::Controller) reconcilers
use crate::controller::ReconcilerAction;
use crate::controller::Action;
use futures::{TryFuture, TryFutureExt};
use json_patch::{AddOperation, PatchOperation, RemoveOperation, TestOperation};
use kube_client::{
Expand Down Expand Up @@ -103,10 +103,10 @@ pub async fn finalizer<K, ReconcileFut>(
finalizer_name: &str,
obj: Arc<K>,
reconcile: impl FnOnce(Event<K>) -> ReconcileFut,
) -> Result<ReconcilerAction, Error<ReconcileFut::Error>>
) -> Result<Action, Error<ReconcileFut::Error>>
where
K: Resource + Clone + DeserializeOwned + Serialize + Debug,
ReconcileFut: TryFuture<Ok = ReconcilerAction>,
ReconcileFut: TryFuture<Ok = Action>,
ReconcileFut::Error: StdError + 'static,
{
match FinalizerState::for_object(&*obj, finalizer_name) {
Expand Down Expand Up @@ -178,14 +178,14 @@ where
.await
.map_err(Error::AddFinalizer)?;
// No point applying here, since the patch will cause a new reconciliation
Ok(ReconcilerAction { requeue_after: None })
Ok(Action::await_change())
}
FinalizerState {
finalizer_index: None,
is_deleting: true,
} => {
// Our work here is done
Ok(ReconcilerAction { requeue_after: None })
Ok(Action::await_change())
}
}
}
Expand Down

0 comments on commit 90dbd17

Please sign in to comment.