Skip to content

Commit

Permalink
Change ReconcileAction to Action and add associated ctors (#851)
Browse files Browse the repository at this point in the history
* Change ReconcileAction to Action and add associated ctors

fixes #317

Signed-off-by: clux <[email protected]>

* clippy

Signed-off-by: clux <[email protected]>

* clippy + docs

Signed-off-by: clux <[email protected]>

* fix extraneous doc warnings in entry

Signed-off-by: clux <[email protected]>

* remove Default for Action

Signed-off-by: clux <[email protected]>

* fix secret-syncer

Signed-off-by: clux <[email protected]>

* fix remaining references to reconciler action

Signed-off-by: clux <[email protected]>
  • Loading branch information
clux authored Mar 17, 2022
1 parent d99a144 commit 931cd48
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 62 deletions.
6 changes: 3 additions & 3 deletions architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,9 @@ Once we have combined the stream of streams we essentially have a flattened supe
1. new input events get sent to the `scheduler`
2. scheduled events are then passed them through a `Runner` preventing duplicate parallel requests for the same object
3. when running, we send the affected object to the users `reconciler` fn and await that future
4. a) on success, prepare the users `ReconcilerAction` (generally a slow requeue several minutes from now)
4. b) on failure, prepare a `ReconcilerAction` based on the users error policy (generally a backoff'd requeue with shorter initial delay)
5. Map resulting `ReconcilerAction`s through an ad-hoc `scheduler` channel
4. a) on success, prepare the users `Action` (generally a slow requeue several minutes from now)
4. b) on failure, prepare a `Action` based on the users error policy (generally a backoff'd requeue with shorter initial delay)
5. Map resulting `Action`s through an ad-hoc `scheduler` channel
6. Resulting requeue requests through the channel are picked up at the top of `applier` and merged with input events in step 1.

Ideally, the process runs forever, and it minimises unnecessary reconcile calls (like users changing more than one related object while one reconcile is already happening).
Expand Down
17 changes: 5 additions & 12 deletions examples/configmapgen_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use futures::StreamExt;
use k8s_openapi::api::core::v1::ConfigMap;
use kube::{
api::{Api, ListParams, ObjectMeta, Patch, PatchParams, Resource},
runtime::controller::{Context, Controller, ReconcilerAction},
runtime::controller::{Action, Context, Controller},
Client, CustomResource,
};
use schemars::JsonSchema;
Expand All @@ -29,10 +29,7 @@ struct ConfigMapGeneratorSpec {
}

/// Controller triggers this whenever our main object or our children changed
async fn reconcile(
generator: Arc<ConfigMapGenerator>,
ctx: Context<Data>,
) -> Result<ReconcilerAction, Error> {
async fn reconcile(generator: Arc<ConfigMapGenerator>, ctx: Context<Data>) -> Result<Action, Error> {
log::info!("working hard");
tokio::time::sleep(Duration::from_secs(2)).await;
log::info!("hard work is done!");
Expand Down Expand Up @@ -70,16 +67,12 @@ async fn reconcile(
)
.await
.map_err(Error::ConfigMapCreationFailed)?;
Ok(ReconcilerAction {
requeue_after: Some(Duration::from_secs(300)),
})
Ok(Action::requeue(Duration::from_secs(300)))
}

/// The controller triggers this on reconcile errors
fn error_policy(_error: &Error, _ctx: Context<Data>) -> ReconcilerAction {
ReconcilerAction {
requeue_after: Some(Duration::from_secs(1)),
}
fn error_policy(_error: &Error, _ctx: Context<Data>) -> Action {
Action::requeue(Duration::from_secs(1))
}

// Data we want access to in error/reconcile calls
Expand Down
14 changes: 6 additions & 8 deletions examples/secret_syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use kube::{
api::{Api, DeleteParams, ListParams, ObjectMeta, Patch, PatchParams, Resource},
error::ErrorResponse,
runtime::{
controller::{Context, Controller, ReconcilerAction},
controller::{Action, Context, Controller},
finalizer::{finalizer, Event},
},
};
Expand All @@ -37,7 +37,7 @@ fn secret_name_for_configmap(cm: &ConfigMap) -> Result<String> {
))
}

async fn apply(cm: Arc<ConfigMap>, secrets: &kube::Api<Secret>) -> Result<ReconcilerAction> {
async fn apply(cm: Arc<ConfigMap>, secrets: &kube::Api<Secret>) -> Result<Action> {
println!("Reconciling {:?}", cm);
let secret_name = secret_name_for_configmap(&cm)?;
secrets
Expand All @@ -56,10 +56,10 @@ async fn apply(cm: Arc<ConfigMap>, secrets: &kube::Api<Secret>) -> Result<Reconc
)
.await
.map_err(Error::UpdateSecret)?;
Ok(ReconcilerAction { requeue_after: None })
Ok(Action::await_change())
}

async fn cleanup(cm: Arc<ConfigMap>, secrets: &kube::Api<Secret>) -> Result<ReconcilerAction> {
async fn cleanup(cm: Arc<ConfigMap>, secrets: &kube::Api<Secret>) -> Result<Action> {
println!("Cleaning up {:?}", cm);
secrets
.delete(&secret_name_for_configmap(&cm)?, &DeleteParams::default())
Expand All @@ -71,7 +71,7 @@ async fn cleanup(cm: Arc<ConfigMap>, secrets: &kube::Api<Secret>) -> Result<Reco
err => Err(err),
})
.map_err(Error::DeleteSecret)?;
Ok(ReconcilerAction { requeue_after: None })
Ok(Action::await_change())
}

#[tokio::main]
Expand Down Expand Up @@ -103,9 +103,7 @@ async fn main() -> anyhow::Result<()> {
.await
}
},
|_err, _| ReconcilerAction {
requeue_after: Some(Duration::from_secs(2)),
},
|_err, _| Action::requeue(Duration::from_secs(2)),
Context::new(()),
)
.for_each(|msg| async move { println!("Reconciled: {:?}", msg) })
Expand Down
17 changes: 8 additions & 9 deletions kube-client/src/api/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@
#[allow(unused_imports)] use std::collections::HashMap;
use std::fmt::Debug;

use crate::{Api, Error, Result};
use kube_core::{params::PostParams, Resource};
use serde::{de::DeserializeOwned, Serialize};

use crate::{Api, Error, Result};

impl<K: Resource + Clone + DeserializeOwned + Debug> Api<K> {
/// Gets a given object's "slot" on the Kubernetes API, designed for "get-or-create" and "get-and-modify" patterns
///
Expand Down Expand Up @@ -210,7 +209,7 @@ impl<'a, K> OccupiedEntry<'a, K> {

/// Validate that [`Self::object`] is valid, and refers to the same object as the original [`Api::entry`] call
///
/// Defaults [`ObjectMeta::name`] and [`ObjectMeta::namespace`] if unset.
/// Defaults `ObjectMeta::name` and `ObjectMeta::namespace` if unset.
fn prepare_for_commit(&mut self) -> Result<(), CommitValidationError>
where
K: Resource,
Expand Down Expand Up @@ -260,26 +259,26 @@ pub enum CommitError {
#[derive(Debug, thiserror::Error)]
/// Pre-commit validation errors
pub enum CommitValidationError {
/// [`ObjectMeta::name`] does not match the name passed to [`Api::entry`]
/// `ObjectMeta::name` does not match the name passed to [`Api::entry`]
#[error(".metadata.name does not match the name passed to Api::entry (got: {object_name:?}, expected: {expected:?})")]
NameMismatch {
/// The name of the object ([`ObjectMeta::name`])
/// The name of the object (`ObjectMeta::name`)
object_name: String,
/// The name passed to [`Api::entry`]
expected: String,
},
/// [`ObjectMeta::namespace`] does not match the namespace of the [`Api`]
/// `ObjectMeta::namespace` does not match the namespace of the [`Api`]
#[error(".metadata.namespace does not match the namespace of the Api (got: {object_namespace:?}, expected: {expected:?})")]
NamespaceMismatch {
/// The name of the object ([`ObjectMeta::namespace`])
/// The name of the object (`ObjectMeta::namespace`)
object_namespace: Option<String>,
/// The namespace of the [`Api`]
expected: Option<String>,
},
/// [`ObjectMeta::generate_name`] must not be set
/// `ObjectMeta::generate_name` must not be set
#[error(".metadata.generate_name must not be set (got: {object_generate_name:?})")]
GenerateName {
/// The set name generation template of the object ([`ObjectMeta::generate_name`])
/// The set name generation template of the object (`ObjectMeta::generate_name`)
object_generate_name: String,
},
}
Expand Down
74 changes: 49 additions & 25 deletions kube-runtime/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,40 @@ pub enum Error<ReconcilerErr: std::error::Error + 'static, QueueErr: std::error:

/// Results of the reconciliation attempt
#[derive(Debug, Clone)]
pub struct ReconcilerAction {
pub struct Action {
/// Whether (and when) to next trigger the reconciliation if no external watch triggers hit
///
/// For example, use this to query external systems for updates, expire time-limited resources, or
/// (in your `error_policy`) retry after errors.
pub requeue_after: Option<Duration>,
requeue_after: Option<Duration>,
}

impl Action {
/// Action to to the reconciliation at this time even if no external watch triggers hit
///
/// This is the best-practice action that ensures eventual consistency of your controller
/// even in the case of missed changes (which can happen).
///
/// Watch events are not normally missed, so running this once per hour (`Default`) as a fallback is reasonable.
#[must_use]
pub fn requeue(duration: Duration) -> Self {
Self {
requeue_after: Some(duration),
}
}

/// Do nothing until a change is detected
///
/// This stops the controller periodically reconciling this object until a relevant watch event
/// was **detected**.
///
/// **Warning**: If you have watch desyncs, it is possible to miss changes entirely.
/// It is therefore not recommended to disable requeuing this way, unless you have
/// frequent changes to the underlying object, or some other hook to retain eventual consistency.
#[must_use]
pub fn await_change() -> Self {
Self { requeue_after: None }
}
}

/// Helper for building custom trigger filters, see the implementations of [`trigger_self`] and [`trigger_owners`] for some examples.
Expand Down Expand Up @@ -219,15 +247,15 @@ impl Display for ReconcileReason {
/// (such as triggering from arbitrary [`Stream`]s), at the cost of being a bit more verbose.
pub fn applier<K, QueueStream, ReconcilerFut, T>(
mut reconciler: impl FnMut(Arc<K>, Context<T>) -> ReconcilerFut,
mut error_policy: impl FnMut(&ReconcilerFut::Error, Context<T>) -> ReconcilerAction,
mut error_policy: impl FnMut(&ReconcilerFut::Error, Context<T>) -> Action,
context: Context<T>,
store: Store<K>,
queue: QueueStream,
) -> impl Stream<Item = Result<(ObjectRef<K>, ReconcilerAction), Error<ReconcilerFut::Error, QueueStream::Error>>>
) -> impl Stream<Item = Result<(ObjectRef<K>, Action), Error<ReconcilerFut::Error, QueueStream::Error>>>
where
K: Clone + Resource + 'static,
K::DynamicType: Debug + Eq + Hash + Clone + Unpin,
ReconcilerFut: TryFuture<Ok = ReconcilerAction> + Unpin,
ReconcilerFut: TryFuture<Ok = Action> + Unpin,
ReconcilerFut::Error: std::error::Error + 'static,
QueueStream: TryStream,
QueueStream::Ok: Into<ReconcileRequest<K>>,
Expand Down Expand Up @@ -283,7 +311,7 @@ where
.on_complete(async { tracing::debug!("applier runner-merge terminated") })
// finally, for each completed reconcile call:
.and_then(move |(obj_ref, reconciler_result, reconciler_span)| {
let (ReconcilerAction { requeue_after }, requeue_reason) = match &reconciler_result {
let (Action { requeue_after }, requeue_reason) = match &reconciler_result {
Ok(action) =>
// do what user told us
(action.clone(), ReconcileReason::ReconcilerRequestedRetry),
Expand Down Expand Up @@ -327,7 +355,7 @@ where
/// use kube::{
/// Client, CustomResource,
/// api::{Api, ListParams},
/// runtime::controller::{Context, Controller, ReconcilerAction}
/// runtime::controller::{Context, Controller, Action}
/// };
/// use serde::{Deserialize, Serialize};
/// use tokio::time::Duration;
Expand All @@ -348,18 +376,14 @@ where
/// }
///
/// /// The reconciler that will be called when either object change
/// async fn reconcile(g: Arc<ConfigMapGenerator>, _ctx: Context<()>) -> Result<ReconcilerAction, Error> {
/// async fn reconcile(g: Arc<ConfigMapGenerator>, _ctx: Context<()>) -> Result<Action, Error> {
/// // .. use api here to reconcile a child ConfigMap with ownerreferences
/// // see configmapgen_controller example for full info
/// Ok(ReconcilerAction {
/// requeue_after: Some(Duration::from_secs(300)),
/// })
/// Ok(Action::requeue(Duration::from_secs(300)))
/// }
/// /// an error handler that will be called when the reconciler fails
/// fn error_policy(_error: &Error, _ctx: Context<()>) -> ReconcilerAction {
/// ReconcilerAction {
/// requeue_after: Some(Duration::from_secs(60)),
/// }
/// fn error_policy(_error: &Error, _ctx: Context<()>) -> Action {
/// Action::requeue(Duration::from_secs(60))
/// }
///
/// /// something to drive the controller
Expand Down Expand Up @@ -594,7 +618,7 @@ where
/// use kube::{
/// Client,
/// api::{ListParams, Api, ResourceExt},
/// runtime::{controller::{Context, Controller, ReconcilerAction}},
/// runtime::{controller::{Context, Controller, Action}},
/// };
/// use std::{convert::Infallible, io::BufRead};
/// let (mut reload_tx, reload_rx) = futures::channel::mpsc::channel(0);
Expand All @@ -613,7 +637,7 @@ where
/// .run(
/// |o, _| async move {
/// println!("Reconciling {}", o.name());
/// Ok(ReconcilerAction { requeue_after: None })
/// Ok(Action::await_change())
/// },
/// |err: &Infallible, _| Err(err).unwrap(),
/// Context::new(()),
Expand Down Expand Up @@ -658,7 +682,7 @@ where
/// use futures::future::FutureExt;
/// use k8s_openapi::api::core::v1::ConfigMap;
/// use kube::{api::ListParams, Api, Client, ResourceExt};
/// use kube_runtime::controller::{Context, Controller, ReconcilerAction};
/// use kube_runtime::controller::{Context, Controller, Action};
/// use std::convert::Infallible;
/// Controller::new(
/// Api::<ConfigMap>::all(Client::try_default().await.unwrap()),
Expand All @@ -668,7 +692,7 @@ where
/// .run(
/// |o, _| async move {
/// println!("Reconciling {}", o.name());
/// Ok(ReconcilerAction { requeue_after: None })
/// Ok(Action::await_change())
/// },
/// |err: &Infallible, _| Err(err).unwrap(),
/// Context::new(()),
Expand Down Expand Up @@ -748,12 +772,12 @@ where
pub fn run<ReconcilerFut, T>(
self,
mut reconciler: impl FnMut(Arc<K>, Context<T>) -> ReconcilerFut,
error_policy: impl FnMut(&ReconcilerFut::Error, Context<T>) -> ReconcilerAction,
error_policy: impl FnMut(&ReconcilerFut::Error, Context<T>) -> Action,
context: Context<T>,
) -> impl Stream<Item = Result<(ObjectRef<K>, ReconcilerAction), Error<ReconcilerFut::Error, watcher::Error>>>
) -> impl Stream<Item = Result<(ObjectRef<K>, Action), Error<ReconcilerFut::Error, watcher::Error>>>
where
K::DynamicType: Debug + Unpin,
ReconcilerFut: TryFuture<Ok = ReconcilerAction> + Send + 'static,
ReconcilerFut: TryFuture<Ok = Action> + Send + 'static,
ReconcilerFut::Error: std::error::Error + Send + 'static,
{
applier(
Expand All @@ -775,7 +799,7 @@ where

#[cfg(test)]
mod tests {
use super::{Context, ReconcilerAction};
use super::{Action, Context};
use crate::Controller;
use k8s_openapi::api::core::v1::ConfigMap;
use kube_client::Api;
Expand All @@ -795,8 +819,8 @@ mod tests {
fn test_controller_should_be_send() {
assert_send(
Controller::new(mock_type::<Api<ConfigMap>>(), Default::default()).run(
|_, _| async { Ok(mock_type::<ReconcilerAction>()) },
|_: &std::io::Error, _| mock_type::<ReconcilerAction>(),
|_, _| async { Ok(mock_type::<Action>()) },
|_: &std::io::Error, _| mock_type::<Action>(),
Context::new(()),
),
);
Expand Down
10 changes: 5 additions & 5 deletions kube-runtime/src/finalizer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//! Finalizer helper for [`Controller`](crate::Controller) reconcilers
use crate::controller::ReconcilerAction;
use crate::controller::Action;
use futures::{TryFuture, TryFutureExt};
use json_patch::{AddOperation, PatchOperation, RemoveOperation, TestOperation};
use kube_client::{
Expand Down Expand Up @@ -103,10 +103,10 @@ pub async fn finalizer<K, ReconcileFut>(
finalizer_name: &str,
obj: Arc<K>,
reconcile: impl FnOnce(Event<K>) -> ReconcileFut,
) -> Result<ReconcilerAction, Error<ReconcileFut::Error>>
) -> Result<Action, Error<ReconcileFut::Error>>
where
K: Resource + Clone + DeserializeOwned + Serialize + Debug,
ReconcileFut: TryFuture<Ok = ReconcilerAction>,
ReconcileFut: TryFuture<Ok = Action>,
ReconcileFut::Error: StdError + 'static,
{
match FinalizerState::for_object(&*obj, finalizer_name) {
Expand Down Expand Up @@ -178,14 +178,14 @@ where
.await
.map_err(Error::AddFinalizer)?;
// No point applying here, since the patch will cause a new reconciliation
Ok(ReconcilerAction { requeue_after: None })
Ok(Action::await_change())
}
FinalizerState {
finalizer_index: None,
is_deleting: true,
} => {
// Our work here is done
Ok(ReconcilerAction { requeue_after: None })
Ok(Action::await_change())
}
}
}
Expand Down

0 comments on commit 931cd48

Please sign in to comment.