Skip to content

Commit

Permalink
Add backoff handling for watcher and Controller - for #577 (#703)
Browse files Browse the repository at this point in the history
* implement backoff for watcher - for #577

Signed-off-by: clux <[email protected]>

* move magic number into strategy

Signed-off-by: clux <[email protected]>

* expose backoff from watcher and semi-propagate into controller

awkward. will write a comment

Signed-off-by: clux <[email protected]>

* potential abstraction

Signed-off-by: clux <[email protected]>

* another builder layer; allow eliding ListParams

Signed-off-by: clux <[email protected]>

* forgot to add file

Signed-off-by: clux <[email protected]>

* easy parts of code review

Signed-off-by: clux <[email protected]>

* rewrite as a helper (take N)

jesus this stuff is hard.

Signed-off-by: clux <[email protected]>

* rename as suggested

Signed-off-by: clux <[email protected]>

* Reimplement watcher backoff as FSM (#720)

* Fix clippy warnings

Signed-off-by: Teo Klestrup Röijezon <[email protected]>

* Reimplement watch backoff as FSM

Signed-off-by: Teo Klestrup Röijezon <[email protected]>

* Remove useless lifetime bounds

Signed-off-by: Teo Klestrup Röijezon <[email protected]>

* Silence clippy size warning

Signed-off-by: Teo Klestrup Röijezon <[email protected]>

* Silence clippy properly this time around

Signed-off-by: Teo Klestrup Röijezon <[email protected]>

* Split StreamBackoff into a separate utils module

Signed-off-by: Teo Klestrup Röijezon <[email protected]>

* Backoff tests

Signed-off-by: Teo Klestrup Röijezon <[email protected]>

* Add stream close test

Signed-off-by: Teo Klestrup Röijezon <[email protected]>

* remove backoff pin, fix docs

Signed-off-by: clux <[email protected]>

* newline

Signed-off-by: clux <[email protected]>

* Add `Backoff` wrapper that implements client-go's reset timer behaviour (#729)

Signed-off-by: Teo Klestrup Röijezon <[email protected]>

* use new reset backoff and replicate client-go reflector values

Signed-off-by: clux <[email protected]>

* fix node watcher example

Signed-off-by: clux <[email protected]>

* Use released `backoff`

Signed-off-by: Teo Klestrup Röijezon <[email protected]>

* Factor out default `Backoff`

Signed-off-by: Teo Klestrup Röijezon <[email protected]>

* Add note to `watcher` about backoff

Signed-off-by: Teo Klestrup Röijezon <[email protected]>

* Added backoff to Controller

Signed-off-by: Teo Klestrup Röijezon <[email protected]>

* Changelog

Signed-off-by: Teo Klestrup Röijezon <[email protected]>

* Revert `Observer` for now

Signed-off-by: Teo Klestrup Röijezon <[email protected]>

* The clippyman comes for us all, eventually

And we must all pay our due respects, or pay the price.

Signed-off-by: Teo Klestrup Röijezon <[email protected]>

* Fix build warnings

Signed-off-by: Teo Klestrup Röijezon <[email protected]>

* remove backoff_watch

Signed-off-by: clux <[email protected]>

* doc tweaks

Signed-off-by: clux <[email protected]>

* sentence

Signed-off-by: clux <[email protected]>

* upgrading backoff is not actually breaking

Signed-off-by: clux <[email protected]>

Co-authored-by: Teo Klestrup Röijezon <[email protected]>
Co-authored-by: Teo Klestrup Röijezon <[email protected]>
  • Loading branch information
3 people authored Dec 21, 2021
1 parent 155859b commit 276719c
Show file tree
Hide file tree
Showing 22 changed files with 372 additions and 35 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ UNRELEASED
===================
* see https://github.com/kube-rs/kube-rs/compare/0.65.0...master
* Added `DeleteParams::background()`, `DeleteParams::foreground()`, `DeleteParams::orphan()` constructors - [#747](https://github.com/kube-rs/kube-rs/issues/747)
* Introduced `StreamBackoff` mechanism for backing off watchers - #703
* BREAKING: `Controller` now uses `backoff` for trigger watches by default, use `Controller::trigger_backoff` to override

0.65.0 / 2021-12-10
===================
Expand Down
1 change: 1 addition & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ tower = { version = "0.4.6", features = ["limit"] }
tower-http = { version = "0.2.0", features = ["trace", "decompression-gzip"] }
hyper = { version = "0.14.13", features = ["client", "http1", "stream", "tcp"] }
thiserror = "1.0.29"
backoff = "0.4.0"

[[example]]
name = "configmapgen_controller"
Expand Down
1 change: 0 additions & 1 deletion examples/dynamic_jsonpath.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use jsonpath_lib;
use k8s_openapi::api::core::v1::Pod;
use kube::{
api::{Api, ListParams},
Expand Down
1 change: 1 addition & 0 deletions examples/dynamic_pod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ async fn main() -> anyhow::Result<()> {
}
#[derive(Clone, Deserialize, Debug)]
struct ContainerSimple {
#[allow(dead_code)]
image: String,
}
type PodSimple = Object<PodSpecSimple, NotUsed>;
Expand Down
3 changes: 1 addition & 2 deletions examples/dynamic_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ async fn main() -> anyhow::Result<()> {
let api = Api::<DynamicObject>::all_with(client, &ar);

// Fully compatible with kube-runtime
let watcher = watcher(api, ListParams::default());
try_flatten_applied(watcher)
try_flatten_applied(watcher(api, ListParams::default()))
.try_for_each(|p| async move {
log::info!("Applied: {}", p.name());
Ok(())
Expand Down
5 changes: 3 additions & 2 deletions examples/event_watcher.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#[macro_use] extern crate log;
use futures::{StreamExt, TryStreamExt};
use futures::{pin_mut, TryStreamExt};
use k8s_openapi::api::core::v1::Event;
use kube::{
api::{Api, ListParams},
Expand All @@ -16,8 +16,9 @@ async fn main() -> anyhow::Result<()> {
let events: Api<Event> = Api::all(client);
let lp = ListParams::default();

let mut ew = try_flatten_applied(watcher(events, lp)).boxed();
let ew = try_flatten_applied(watcher(events, lp));

pin_mut!(ew);
while let Some(event) = ew.try_next().await? {
handle_event(event)?;
}
Expand Down
2 changes: 1 addition & 1 deletion examples/job_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use k8s_openapi::api::batch::v1::Job;
use serde_json::json;

use kube::{
api::{Api, DeleteParams, ListParams, PostParams, PropagationPolicy, ResourceExt, WatchEvent},
api::{Api, DeleteParams, ListParams, PostParams, ResourceExt, WatchEvent},
Client,
};

Expand Down
17 changes: 12 additions & 5 deletions examples/node_watcher.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
#[macro_use] extern crate log;
use futures::{StreamExt, TryStreamExt};
use backoff::ExponentialBackoff;
use futures::{pin_mut, TryStreamExt};
use k8s_openapi::api::core::v1::{Event, Node};
use kube::{
api::{Api, ListParams, ResourceExt},
runtime::{utils::try_flatten_applied, watcher},
runtime::{
utils::{try_flatten_applied, StreamBackoff},
watcher,
},
Client,
};

Expand All @@ -15,10 +19,13 @@ async fn main() -> anyhow::Result<()> {
let events: Api<Event> = Api::all(client.clone());
let nodes: Api<Node> = Api::all(client.clone());

let lp = ListParams::default().labels("beta.kubernetes.io/os=linux");
let obs = try_flatten_applied(StreamBackoff::new(
watcher(nodes, ListParams::default().labels("beta.kubernetes.io/os=linux")),
ExponentialBackoff::default(),
));

let mut apply_stream = try_flatten_applied(watcher(nodes, lp)).boxed();
while let Some(n) = apply_stream.try_next().await? {
pin_mut!(obs);
while let Some(n) = obs.try_next().await? {
check_for_node_failures(&events, n).await?;
}
Ok(())
Expand Down
4 changes: 2 additions & 2 deletions examples/pod_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ async fn main() -> anyhow::Result<()> {
let client = Client::try_default().await?;
let namespace = std::env::var("NAMESPACE").unwrap_or_else(|_| "default".into());
let api = Api::<Pod>::namespaced(client, &namespace);
let watcher = watcher(api, ListParams::default());
try_flatten_applied(watcher)

try_flatten_applied(watcher(api, ListParams::default()))
.try_for_each(|p| async move {
log::debug!("Applied: {}", p.name());
if let Some(unready_reason) = pod_unready(&p) {
Expand Down
1 change: 1 addition & 0 deletions kube-client/src/discovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ pub struct Discovery {
/// Builds an internal map of its cache
impl Discovery {
/// Construct a caching api discovery client
#[must_use]
pub fn new(client: Client) -> Self {
let groups = HashMap::new();
let mode = DiscoveryMode::Block(vec![]);
Expand Down
10 changes: 5 additions & 5 deletions kube-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,8 @@ mod test {
.timeout(15);
let mut stream = pods.watch(&lp, "0").await?.boxed();
while let Some(ev) = stream.try_next().await? {
let watch_debug = format!("we: {:?}", ev);
assert!(true, "can debug format watch event {}", watch_debug);
// can debug format watch event
let _ = format!("we: {:?}", ev);
match ev {
WatchEvent::Modified(o) => {
let s = o.status.as_ref().expect("status exists on pod");
Expand All @@ -237,7 +237,7 @@ mod test {
break;
}
}
WatchEvent::Error(e) => assert!(false, "watch error: {}", e),
WatchEvent::Error(e) => panic!("watch error: {}", e),
_ => {}
}
}
Expand Down Expand Up @@ -315,7 +315,7 @@ mod test {
break;
}
}
WatchEvent::Error(e) => assert!(false, "watch error: {}", e),
WatchEvent::Error(e) => panic!("watch error: {}", e),
_ => {}
}
}
Expand Down Expand Up @@ -428,7 +428,7 @@ mod test {
break;
}
}
WatchEvent::Error(e) => assert!(false, "watch error: {}", e),
WatchEvent::Error(e) => panic!("watch error: {}", e),
_ => {}
}
}
Expand Down
1 change: 1 addition & 0 deletions kube-core/src/dynamic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub struct DynamicObject {

impl DynamicObject {
/// Create a DynamicObject with minimal values set from ApiResource.
#[must_use]
pub fn new(name: &str, resource: &ApiResource) -> Self {
Self {
types: Some(TypeMeta {
Expand Down
2 changes: 2 additions & 0 deletions kube-core/src/object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,10 +296,12 @@ mod test {
// Replacing heavy type k8s_openapi::api::core::v1::PodSpec with:
#[derive(Clone)]
struct PodSpecSimple {
#[allow(dead_code)]
containers: Vec<ContainerSimple>,
}
#[derive(Clone, Debug, PartialEq)]
struct ContainerSimple {
#[allow(dead_code)]
image: String,
}
type PodSimple = Object<PodSpecSimple, NotUsed>;
Expand Down
1 change: 1 addition & 0 deletions kube-core/src/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ impl PatchParams {
}

/// Construct `PatchParams` for server-side apply
#[must_use]
pub fn apply(manager: &str) -> Self {
Self {
field_manager: Some(manager.into()),
Expand Down
3 changes: 2 additions & 1 deletion kube-core/src/subresource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ impl Default for AttachParams {
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
impl AttachParams {
    /// Default parameters for a tty exec with stdin and stdout
#[must_use]
pub fn interactive_tty() -> Self {
Self {
stdin: true,
Expand Down Expand Up @@ -348,7 +349,7 @@ impl Request {
#[cfg(test)]
mod test {
use crate::{request::Request, resource::Resource};
use k8s::{apps::v1 as appsv1, core::v1 as corev1};
use k8s::core::v1 as corev1;
use k8s_openapi::api as k8s;

use crate::subresource::LogParams;
Expand Down
1 change: 1 addition & 0 deletions kube-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ tracing = "0.1.29"
json-patch = "0.2.6"
serde_json = "1.0.68"
thiserror = "1.0.29"
backoff = "0.4.0"

[dependencies.k8s-openapi]
version = "0.13.1"
Expand Down
32 changes: 22 additions & 10 deletions kube-runtime/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ use crate::{
scheduler::{self, scheduler, ScheduleRequest},
utils::{
try_flatten_applied, try_flatten_touched, trystream_try_via, CancelableJoinHandle,
KubeRuntimeStreamExt,
KubeRuntimeStreamExt, StreamBackoff,
},
watcher::{self, watcher},
};
use backoff::backoff::Backoff;
use derivative::Derivative;
use futures::{
channel,
Expand Down Expand Up @@ -391,6 +392,7 @@ where
{
// NB: Need to Unpin for stream::select_all
trigger_selector: stream::SelectAll<BoxStream<'static, Result<ReconcileRequest<K>, watcher::Error>>>,
trigger_backoff: Box<dyn Backoff + Send>,
/// [`run`](crate::Controller::run) starts a graceful shutdown when any of these [`Future`]s complete,
/// refusing to start any new reconciliations but letting any existing ones finish.
graceful_shutdown_selector: Vec<BoxFuture<'static, ()>>,
Expand All @@ -406,7 +408,7 @@ where
impl<K> Controller<K>
where
K: Clone + Resource + DeserializeOwned + Debug + Send + Sync + 'static,
K::DynamicType: Eq + Hash + Clone + Default,
K::DynamicType: Eq + Hash + Clone,
{
/// Create a Controller on a type `K`
///
Expand All @@ -416,16 +418,13 @@ where
/// and receive reconcile events for.
/// For the full set of objects `K` in the given `Api` scope, you can use [`ListParams::default`].
#[must_use]
pub fn new(owned_api: Api<K>, lp: ListParams) -> Self {
pub fn new(owned_api: Api<K>, lp: ListParams) -> Self
where
K::DynamicType: Default,
{
Self::new_with(owned_api, lp, Default::default())
}
}

impl<K> Controller<K>
where
K: Clone + Resource + DeserializeOwned + Debug + Send + Sync + 'static,
K::DynamicType: Eq + Hash + Clone,
{
/// Create a Controller on a type `K`
///
/// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `K`.
Expand All @@ -452,6 +451,7 @@ where
trigger_selector.push(self_watcher);
Self {
trigger_selector,
trigger_backoff: Box::new(watcher::default_backoff()),
graceful_shutdown_selector: vec![
// Fallback future, ensuring that we never terminate if no additional futures are added to the selector
future::pending().boxed(),
Expand All @@ -465,6 +465,18 @@ where
}
}

/// Specify the backoff policy for "trigger" watches
///
    /// This includes the core watch, as well as auxiliary watches introduced by [`Self::owns`] and [`Self::watches`].
///
/// The [`default_backoff`](crate::watcher::default_backoff) follows client-go conventions,
/// but can be overridden by calling this method.
#[must_use]
pub fn trigger_backoff(mut self, backoff: impl Backoff + Send + 'static) -> Self {
self.trigger_backoff = Box::new(backoff);
self
}

/// Retrieve a copy of the reader before starting the controller
pub fn store(&self) -> Store<K> {
self.reader.clone()
Expand Down Expand Up @@ -757,7 +769,7 @@ where
error_policy,
context,
self.reader,
self.trigger_selector
StreamBackoff::new(self.trigger_selector, self.trigger_backoff)
.take_until(future::select_all(self.graceful_shutdown_selector)),
)
.take_until(futures::future::select_all(self.forceful_shutdown_selector))
Expand Down
86 changes: 86 additions & 0 deletions kube-runtime/src/utils/backoff_reset_timer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
use std::time::{Duration, Instant};

use backoff::{backoff::Backoff, Clock, SystemClock};

/// A [`Backoff`] wrapper that resets after a fixed duration has elapsed.
///
/// If `reset_duration` passes between two calls to `next_backoff`, the wrapped
/// backoff is reset before the next delay is produced (see the `Backoff` impl).
pub struct ResetTimerBackoff<B, C = SystemClock> {
    // The wrapped backoff strategy that produces the actual delays.
    backoff: B,
    // Time source; defaults to `SystemClock`, a custom clock is injected in tests.
    clock: C,
    // Instant of the most recent `next_backoff` call; `None` until first use.
    last_backoff: Option<Instant>,
    // If this much time elapses between `next_backoff` calls, `backoff` is reset.
    reset_duration: Duration,
}

impl<B: Backoff> ResetTimerBackoff<B> {
    /// Construct a [`ResetTimerBackoff`] around `backoff` that resets it whenever
    /// `reset_duration` elapses between calls to `next_backoff`, measuring time
    /// with the wall-clock [`SystemClock`].
    // `#[must_use]` for consistency with the other constructors in this commit
    // (e.g. `Discovery::new`, `DynamicObject::new`, `PatchParams::apply`):
    // constructing a backoff and dropping it is always a bug.
    #[must_use]
    pub fn new(backoff: B, reset_duration: Duration) -> Self {
        Self::new_with_custom_clock(backoff, reset_duration, SystemClock {})
    }
}

impl<B: Backoff, C: Clock> ResetTimerBackoff<B, C> {
    /// Internal constructor that additionally takes a custom `clock`.
    ///
    /// Kept private; used by tests to inject a controllable time source
    /// (see `TokioClock` in the test module) instead of `SystemClock`.
    fn new_with_custom_clock(backoff: B, reset_duration: Duration, clock: C) -> Self {
        Self {
            backoff,
            clock,
            // No call has happened yet, so there is nothing to measure against.
            last_backoff: None,
            reset_duration,
        }
    }
}

impl<B: Backoff, C: Clock> Backoff for ResetTimerBackoff<B, C> {
    /// Delegates to the wrapped backoff, first resetting it when more than
    /// `reset_duration` has passed since the previous `next_backoff` call.
    fn next_backoff(&mut self) -> Option<Duration> {
        match self.last_backoff {
            Some(last_backoff) if self.clock.now() > last_backoff + self.reset_duration => {
                tracing::debug!(
                    ?last_backoff,
                    reset_duration = ?self.reset_duration,
                    "Resetting backoff, since reset duration has expired"
                );
                self.backoff.reset();
            }
            _ => {}
        }
        // Record the time of this call as the new reference point.
        self.last_backoff = Some(self.clock.now());
        self.backoff.next_backoff()
    }

    fn reset(&mut self) {
        // Do not even bother trying to reset here, since `next_backoff` will take care of this when the timer expires.
    }
}

#[cfg(test)]
mod tests {
    use backoff::{backoff::Backoff, Clock};
    use tokio::time::advance;

    use super::ResetTimerBackoff;
    use crate::utils::stream_backoff::tests::LinearBackoff;
    use std::time::{Duration, Instant};

    // Verifies that the wrapped backoff keeps growing while calls stay within
    // the reset window, and restarts from the beginning once a gap longer than
    // the window occurs.
    #[tokio::test]
    async fn should_reset_when_timer_expires() {
        // Pause tokio's clock so `advance` controls time deterministically.
        tokio::time::pause();
        // Reset window of 60s around a linear backoff (asserted sequence below
        // shows it growing 2s -> 4s -> 6s per call).
        let mut backoff = ResetTimerBackoff::new_with_custom_clock(
            LinearBackoff::new(Duration::from_secs(2)),
            Duration::from_secs(60),
            TokioClock,
        );
        // 40s gaps are within the 60s window: no reset, delays keep growing.
        assert_eq!(backoff.next_backoff(), Some(Duration::from_secs(2)));
        advance(Duration::from_secs(40)).await;
        assert_eq!(backoff.next_backoff(), Some(Duration::from_secs(4)));
        advance(Duration::from_secs(40)).await;
        assert_eq!(backoff.next_backoff(), Some(Duration::from_secs(6)));
        // An 80s gap exceeds the window: the sequence restarts at 2s.
        advance(Duration::from_secs(80)).await;
        assert_eq!(backoff.next_backoff(), Some(Duration::from_secs(2)));
        advance(Duration::from_secs(80)).await;
        assert_eq!(backoff.next_backoff(), Some(Duration::from_secs(2)));
    }

    // Clock backed by tokio's pausable time source, letting `advance` move it.
    struct TokioClock;

    impl Clock for TokioClock {
        fn now(&self) -> Instant {
            tokio::time::Instant::now().into_std()
        }
    }
}
7 changes: 7 additions & 0 deletions kube-runtime/src/utils.rs → kube-runtime/src/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
//! Helpers for manipulating built-in streams
mod backoff_reset_timer;
mod stream_backoff;

pub use backoff_reset_timer::ResetTimerBackoff;
pub use stream_backoff::StreamBackoff;

use crate::watcher;
use futures::{
pin_mut,
Expand Down
Loading

0 comments on commit 276719c

Please sign in to comment.