Add backoff handling for watcher and Controller #703

Merged 32 commits on Dec 21, 2021
c26e894
implement backoff for watcher - for #577
clux Nov 11, 2021
19e6b5b
move magic number into strategy
clux Nov 11, 2021
09ee2f3
expose backoff from watcher and semi-propagate into controller
clux Nov 11, 2021
dda1a7b
potential abstraction
clux Nov 12, 2021
f6b034e
another builder layer; allow eliding ListParams
clux Nov 12, 2021
f5063a7
forgot to add file
clux Nov 12, 2021
ff2cfe4
easy parts of code review
clux Nov 16, 2021
0fc7159
rewrite as a helper (take N)
clux Nov 19, 2021
344cb92
rename as suggested
clux Nov 19, 2021
b8407ab
Reimplement watcher backoff as FSM (#720)
nightkr Nov 19, 2021
62679a9
remove backoff pin, fix docs
clux Nov 19, 2021
3144791
newline
clux Nov 19, 2021
00e94f3
Merge branch 'master' into backoff-watcher
clux Nov 19, 2021
dbccd9b
Merge branch 'master' into backoff-watcher
clux Nov 19, 2021
be2dc1c
Add `Backoff` wrapper that implements client-go's reset timer behavio…
nightkr Nov 22, 2021
bc71e9e
Merge remote-tracking branch 'origin/master' into backoff-watcher
clux Nov 22, 2021
5c780f5
use new reset backoff and replicate client-go reflector values
clux Nov 22, 2021
0cde56e
fix node watcher example
clux Nov 22, 2021
927d630
Use released `backoff`
nightkr Dec 20, 2021
676f085
Factor out default `Backoff`
nightkr Dec 20, 2021
c142ca1
Add note to `watcher` about backoff
nightkr Dec 21, 2021
6aa0d79
Added backoff to Controller
nightkr Dec 21, 2021
2805d14
Changelog
nightkr Dec 21, 2021
0402c0b
Revert `Observer` for now
nightkr Dec 21, 2021
426a30a
The clippyman comes for us all, eventually
nightkr Dec 21, 2021
33083f3
Merge remote-tracking branch 'origin/master' into backoff-watcher-/cl…
nightkr Dec 21, 2021
4fb49a8
Fix build warnings
nightkr Dec 21, 2021
d02b1ec
Merge pull request #763 from teozkr/backoff-watcher-/cleanup
nightkr Dec 21, 2021
ee8face
remove backoff_watch
clux Dec 21, 2021
c8179f4
doc tweaks
clux Dec 21, 2021
c973255
sentence
clux Dec 21, 2021
fd24a0f
upgrading backoff is not actually breaking
clux Dec 21, 2021
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ UNRELEASED
===================
* see https://github.com/kube-rs/kube-rs/compare/0.65.0...master
* Added `DeleteParams::background()`, `DeleteParams::foreground()`, `DeleteParams::orphan()` constructors - [#747](https://github.com/kube-rs/kube-rs/issues/747)
* Introduced `StreamBackoff` mechanism for backing off watchers - #703
* BREAKING: `Controller` now uses `backoff` for trigger watches by default, use `Controller::trigger_backoff` to override

0.65.0 / 2021-12-10
===================
Expand Down
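The changelog entry above names `StreamBackoff` without describing its behavior. As a rough, std-only illustration of the general idea of backing off a fallible stream, the sketch below assumes a policy of pausing after every `Err` and resetting after every `Ok`. That policy, the `Backoff` trait stand-in, `Doubling`, and `pauses_after` are all hypothetical; they are not taken from this PR's `stream_backoff.rs`, which is not shown here.

```rust
use std::time::Duration;

/// Hypothetical stand-in for a backoff policy trait (std-only for this sketch).
trait Backoff {
    fn next_backoff(&mut self) -> Option<Duration>;
    fn reset(&mut self);
}

/// Illustrative policy: doubles the delay on every call, starting from `base`.
struct Doubling {
    base: Duration,
    current: Duration,
}

impl Doubling {
    fn new(base: Duration) -> Self {
        Self { base, current: base }
    }
}

impl Backoff for Doubling {
    fn next_backoff(&mut self) -> Option<Duration> {
        let delay = self.current;
        self.current *= 2;
        Some(delay)
    }

    fn reset(&mut self) {
        self.current = self.base;
    }
}

/// Reports how long a consumer would pause after each item.
/// Assumed policy: pause after every `Err`, reset after every `Ok`,
/// and stop consuming once the policy returns `None`.
fn pauses_after<T, E>(items: &[Result<T, E>], backoff: &mut impl Backoff) -> Vec<Duration> {
    let mut pauses = Vec::new();
    for item in items {
        match item {
            Ok(_) => {
                backoff.reset();
                pauses.push(Duration::ZERO);
            }
            Err(_) => match backoff.next_backoff() {
                Some(delay) => pauses.push(delay),
                None => break, // policy exhausted: give up on the stream
            },
        }
    }
    pauses
}
```

Under these assumptions, two consecutive errors produce growing pauses, and a single successful item brings the next pause back to the base delay.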
1 change: 1 addition & 0 deletions examples/Cargo.toml
Expand Up @@ -52,6 +52,7 @@ tower = { version = "0.4.6", features = ["limit"] }
tower-http = { version = "0.2.0", features = ["trace", "decompression-gzip"] }
hyper = { version = "0.14.13", features = ["client", "http1", "stream", "tcp"] }
thiserror = "1.0.29"
backoff = "0.4.0"

[[example]]
name = "configmapgen_controller"
Expand Down
1 change: 0 additions & 1 deletion examples/dynamic_jsonpath.rs
@@ -1,4 +1,3 @@
use jsonpath_lib;
use k8s_openapi::api::core::v1::Pod;
use kube::{
api::{Api, ListParams},
Expand Down
1 change: 1 addition & 0 deletions examples/dynamic_pod.rs
Expand Up @@ -18,6 +18,7 @@ async fn main() -> anyhow::Result<()> {
}
#[derive(Clone, Deserialize, Debug)]
struct ContainerSimple {
#[allow(dead_code)]
image: String,
}
type PodSimple = Object<PodSpecSimple, NotUsed>;
Expand Down
3 changes: 1 addition & 2 deletions examples/dynamic_watcher.rs
Expand Up @@ -28,8 +28,7 @@ async fn main() -> anyhow::Result<()> {
let api = Api::<DynamicObject>::all_with(client, &ar);

// Fully compatible with kube-runtime
let watcher = watcher(api, ListParams::default());
try_flatten_applied(watcher)
try_flatten_applied(watcher(api, ListParams::default()))
.try_for_each(|p| async move {
log::info!("Applied: {}", p.name());
Ok(())
Expand Down
5 changes: 3 additions & 2 deletions examples/event_watcher.rs
@@ -1,5 +1,5 @@
#[macro_use] extern crate log;
use futures::{StreamExt, TryStreamExt};
use futures::{pin_mut, TryStreamExt};
use k8s_openapi::api::core::v1::Event;
use kube::{
api::{Api, ListParams},
Expand All @@ -16,8 +16,9 @@ async fn main() -> anyhow::Result<()> {
let events: Api<Event> = Api::all(client);
let lp = ListParams::default();

let mut ew = try_flatten_applied(watcher(events, lp)).boxed();
let ew = try_flatten_applied(watcher(events, lp));

pin_mut!(ew);
while let Some(event) = ew.try_next().await? {
handle_event(event)?;
}
Expand Down
2 changes: 1 addition & 1 deletion examples/job_api.rs
Expand Up @@ -4,7 +4,7 @@ use k8s_openapi::api::batch::v1::Job;
use serde_json::json;

use kube::{
api::{Api, DeleteParams, ListParams, PostParams, PropagationPolicy, ResourceExt, WatchEvent},
api::{Api, DeleteParams, ListParams, PostParams, ResourceExt, WatchEvent},
Client,
};

Expand Down
17 changes: 12 additions & 5 deletions examples/node_watcher.rs
@@ -1,9 +1,13 @@
#[macro_use] extern crate log;
use futures::{StreamExt, TryStreamExt};
use backoff::ExponentialBackoff;
use futures::{pin_mut, TryStreamExt};
use k8s_openapi::api::core::v1::{Event, Node};
use kube::{
api::{Api, ListParams, ResourceExt},
runtime::{utils::try_flatten_applied, watcher},
runtime::{
utils::{try_flatten_applied, StreamBackoff},
watcher,
},
Client,
};

Expand All @@ -15,10 +19,13 @@ async fn main() -> anyhow::Result<()> {
let events: Api<Event> = Api::all(client.clone());
let nodes: Api<Node> = Api::all(client.clone());

let lp = ListParams::default().labels("beta.kubernetes.io/os=linux");
let obs = try_flatten_applied(StreamBackoff::new(
watcher(nodes, ListParams::default().labels("beta.kubernetes.io/os=linux")),
ExponentialBackoff::default(),
));

let mut apply_stream = try_flatten_applied(watcher(nodes, lp)).boxed();
while let Some(n) = apply_stream.try_next().await? {
pin_mut!(obs);
while let Some(n) = obs.try_next().await? {
check_for_node_failures(&events, n).await?;
}
Ok(())
Expand Down
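The node watcher example above hands `ExponentialBackoff::default()` to `StreamBackoff::new`. For intuition about the shape of delays an exponential policy produces, here is a jitter-free, std-only sketch. `exponential_delays` and its parameters are hypothetical, and the sketch does not reproduce the `backoff` crate's actual defaults or its randomization.

```rust
use std::time::Duration;

/// Jitter-free exponential delays: `initial`, `initial * multiplier`, ...
/// with every delay capped at `max`.
fn exponential_delays(
    initial: Duration,
    multiplier: f64,
    max: Duration,
    count: usize,
) -> Vec<Duration> {
    let mut delays = Vec::with_capacity(count);
    let mut current = initial;
    for _ in 0..count {
        delays.push(current);
        // Grow the interval, but never beyond the cap.
        current = current.mul_f64(multiplier).min(max);
    }
    delays
}
```

With an initial delay of 1s, a multiplier of 2.0, and a 5s cap, the first five delays are 1s, 2s, 4s, 5s, 5s. A real policy would typically add random jitter so that many watchers do not retry in lockstep.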
4 changes: 2 additions & 2 deletions examples/pod_watcher.rs
Expand Up @@ -13,8 +13,8 @@ async fn main() -> anyhow::Result<()> {
let client = Client::try_default().await?;
let namespace = std::env::var("NAMESPACE").unwrap_or_else(|_| "default".into());
let api = Api::<Pod>::namespaced(client, &namespace);
let watcher = watcher(api, ListParams::default());
try_flatten_applied(watcher)

try_flatten_applied(watcher(api, ListParams::default()))
.try_for_each(|p| async move {
log::debug!("Applied: {}", p.name());
if let Some(unready_reason) = pod_unready(&p) {
Expand Down
1 change: 1 addition & 0 deletions kube-client/src/discovery/mod.rs
Expand Up @@ -63,6 +63,7 @@ pub struct Discovery {
/// Builds an internal map of its cache
impl Discovery {
/// Construct a caching api discovery client
#[must_use]
pub fn new(client: Client) -> Self {
let groups = HashMap::new();
let mode = DiscoveryMode::Block(vec![]);
Expand Down
10 changes: 5 additions & 5 deletions kube-client/src/lib.rs
Expand Up @@ -227,8 +227,8 @@ mod test {
.timeout(15);
let mut stream = pods.watch(&lp, "0").await?.boxed();
while let Some(ev) = stream.try_next().await? {
let watch_debug = format!("we: {:?}", ev);
assert!(true, "can debug format watch event {}", watch_debug);
// can debug format watch event
let _ = format!("we: {:?}", ev);
match ev {
WatchEvent::Modified(o) => {
let s = o.status.as_ref().expect("status exists on pod");
Expand All @@ -237,7 +237,7 @@ mod test {
break;
}
}
WatchEvent::Error(e) => assert!(false, "watch error: {}", e),
WatchEvent::Error(e) => panic!("watch error: {}", e),
_ => {}
}
}
Expand Down Expand Up @@ -315,7 +315,7 @@ mod test {
break;
}
}
WatchEvent::Error(e) => assert!(false, "watch error: {}", e),
WatchEvent::Error(e) => panic!("watch error: {}", e),
_ => {}
}
}
Expand Down Expand Up @@ -428,7 +428,7 @@ mod test {
break;
}
}
WatchEvent::Error(e) => assert!(false, "watch error: {}", e),
WatchEvent::Error(e) => panic!("watch error: {}", e),
_ => {}
}
}
Expand Down
1 change: 1 addition & 0 deletions kube-core/src/dynamic.rs
Expand Up @@ -25,6 +25,7 @@ pub struct DynamicObject {

impl DynamicObject {
/// Create a DynamicObject with minimal values set from ApiResource.
#[must_use]
pub fn new(name: &str, resource: &ApiResource) -> Self {
Self {
types: Some(TypeMeta {
Expand Down
2 changes: 2 additions & 0 deletions kube-core/src/object.rs
Expand Up @@ -296,10 +296,12 @@ mod test {
// Replacing heavy type k8s_openapi::api::core::v1::PodSpec with:
#[derive(Clone)]
struct PodSpecSimple {
#[allow(dead_code)]
containers: Vec<ContainerSimple>,
}
#[derive(Clone, Debug, PartialEq)]
struct ContainerSimple {
#[allow(dead_code)]
image: String,
}
type PodSimple = Object<PodSpecSimple, NotUsed>;
Expand Down
1 change: 1 addition & 0 deletions kube-core/src/params.rs
Expand Up @@ -304,6 +304,7 @@ impl PatchParams {
}

/// Construct `PatchParams` for server-side apply
#[must_use]
pub fn apply(manager: &str) -> Self {
Self {
field_manager: Some(manager.into()),
Expand Down
3 changes: 2 additions & 1 deletion kube-core/src/subresource.rs
Expand Up @@ -190,6 +190,7 @@ impl Default for AttachParams {
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
impl AttachParams {
/// Default parameters for a tty exec with stdin and stdout
#[must_use]
pub fn interactive_tty() -> Self {
Self {
stdin: true,
Expand Down Expand Up @@ -348,7 +349,7 @@ impl Request {
#[cfg(test)]
mod test {
use crate::{request::Request, resource::Resource};
use k8s::{apps::v1 as appsv1, core::v1 as corev1};
use k8s::core::v1 as corev1;
use k8s_openapi::api as k8s;

use crate::subresource::LogParams;
Expand Down
1 change: 1 addition & 0 deletions kube-runtime/Cargo.toml
Expand Up @@ -33,6 +33,7 @@ tracing = "0.1.29"
json-patch = "0.2.6"
serde_json = "1.0.68"
thiserror = "1.0.29"
backoff = "0.4.0"

[dependencies.k8s-openapi]
version = "0.13.1"
Expand Down
32 changes: 22 additions & 10 deletions kube-runtime/src/controller/mod.rs
Expand Up @@ -10,10 +10,11 @@ use crate::{
scheduler::{self, scheduler, ScheduleRequest},
utils::{
try_flatten_applied, try_flatten_touched, trystream_try_via, CancelableJoinHandle,
KubeRuntimeStreamExt,
KubeRuntimeStreamExt, StreamBackoff,
},
watcher::{self, watcher},
};
use backoff::backoff::Backoff;
use derivative::Derivative;
use futures::{
channel,
Expand Down Expand Up @@ -391,6 +392,7 @@ where
{
// NB: Need to Unpin for stream::select_all
trigger_selector: stream::SelectAll<BoxStream<'static, Result<ReconcileRequest<K>, watcher::Error>>>,
trigger_backoff: Box<dyn Backoff + Send>,
/// [`run`](crate::Controller::run) starts a graceful shutdown when any of these [`Future`]s complete,
/// refusing to start any new reconciliations but letting any existing ones finish.
graceful_shutdown_selector: Vec<BoxFuture<'static, ()>>,
Expand All @@ -406,7 +408,7 @@ where
impl<K> Controller<K>
where
K: Clone + Resource + DeserializeOwned + Debug + Send + Sync + 'static,
K::DynamicType: Eq + Hash + Clone + Default,
K::DynamicType: Eq + Hash + Clone,
{
/// Create a Controller on a type `K`
///
Expand All @@ -416,16 +418,13 @@ where
/// and receive reconcile events for.
/// For the full set of objects `K` in the given `Api` scope, you can use [`ListParams::default`].
#[must_use]
pub fn new(owned_api: Api<K>, lp: ListParams) -> Self {
pub fn new(owned_api: Api<K>, lp: ListParams) -> Self
where
K::DynamicType: Default,
{
Self::new_with(owned_api, lp, Default::default())
}
}

impl<K> Controller<K>
where
K: Clone + Resource + DeserializeOwned + Debug + Send + Sync + 'static,
K::DynamicType: Eq + Hash + Clone,
{
/// Create a Controller on a type `K`
///
/// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `K`.
Expand All @@ -452,6 +451,7 @@ where
trigger_selector.push(self_watcher);
Self {
trigger_selector,
trigger_backoff: Box::new(watcher::default_backoff()),
graceful_shutdown_selector: vec![
// Fallback future, ensuring that we never terminate if no additional futures are added to the selector
future::pending().boxed(),
Expand All @@ -465,6 +465,18 @@ where
}
}

/// Specify the backoff policy for "trigger" watches
///
/// This includes the core watch, as well as auxiliary watches introduced by [`Self::owns`] and [`Self::watches`].
///
/// The [`default_backoff`](crate::watcher::default_backoff) follows client-go conventions,
/// but can be overridden by calling this method.
#[must_use]
pub fn trigger_backoff(mut self, backoff: impl Backoff + Send + 'static) -> Self {
self.trigger_backoff = Box::new(backoff);
self
}

/// Retrieve a copy of the reader before starting the controller
pub fn store(&self) -> Store<K> {
self.reader.clone()
Expand Down Expand Up @@ -757,7 +769,7 @@ where
error_policy,
context,
self.reader,
self.trigger_selector
StreamBackoff::new(self.trigger_selector, self.trigger_backoff)
.take_until(future::select_all(self.graceful_shutdown_selector)),
)
.take_until(futures::future::select_all(self.forceful_shutdown_selector))
Expand Down
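The `Controller` diff above stores the policy as `Box<dyn Backoff + Send>`, installs a default in the constructor, and lets callers replace it through a consuming builder method. The std-only sketch below isolates that pattern. The `Backoff` trait here is a simplified stand-in, and `Fixed`, `MiniController`, and `next_delay` are hypothetical names used only for illustration.

```rust
use std::time::Duration;

/// Simplified, hypothetical stand-in for a backoff policy trait.
trait Backoff {
    fn next_backoff(&mut self) -> Option<Duration>;
}

/// Illustrative policy that always waits the same amount of time.
struct Fixed(Duration);

impl Backoff for Fixed {
    fn next_backoff(&mut self) -> Option<Duration> {
        Some(self.0)
    }
}

/// Hypothetical miniature of a controller that owns one boxed policy.
struct MiniController {
    trigger_backoff: Box<dyn Backoff + Send>,
}

impl MiniController {
    fn new() -> Self {
        // A default policy is installed up front, so callers who never
        // override it still get backoff behavior.
        Self {
            trigger_backoff: Box::new(Fixed(Duration::from_secs(1))),
        }
    }

    /// Consuming builder method: accepts any policy type and erases it
    /// behind a trait object, so the struct needs no extra type parameter.
    #[must_use]
    fn trigger_backoff(mut self, backoff: impl Backoff + Send + 'static) -> Self {
        self.trigger_backoff = Box::new(backoff);
        self
    }

    fn next_delay(&mut self) -> Option<Duration> {
        self.trigger_backoff.next_backoff()
    }
}
```

Boxing the policy keeps the struct's type signature unchanged for existing users, at the cost of one allocation and dynamic dispatch per call; for a policy consulted only after watch errors, that overhead is unlikely to matter.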
86 changes: 86 additions & 0 deletions kube-runtime/src/utils/backoff_reset_timer.rs
@@ -0,0 +1,86 @@
use std::time::{Duration, Instant};

use backoff::{backoff::Backoff, Clock, SystemClock};

/// A [`Backoff`] wrapper that resets after a fixed duration has elapsed.
pub struct ResetTimerBackoff<B, C = SystemClock> {
backoff: B,
clock: C,
last_backoff: Option<Instant>,
reset_duration: Duration,
}

impl<B: Backoff> ResetTimerBackoff<B> {
pub fn new(backoff: B, reset_duration: Duration) -> Self {
Self::new_with_custom_clock(backoff, reset_duration, SystemClock {})
}
}

impl<B: Backoff, C: Clock> ResetTimerBackoff<B, C> {
fn new_with_custom_clock(backoff: B, reset_duration: Duration, clock: C) -> Self {
Self {
backoff,
clock,
last_backoff: None,
reset_duration,
}
}
}

impl<B: Backoff, C: Clock> Backoff for ResetTimerBackoff<B, C> {
fn next_backoff(&mut self) -> Option<Duration> {
if let Some(last_backoff) = self.last_backoff {
if self.clock.now() > last_backoff + self.reset_duration {
tracing::debug!(
?last_backoff,
reset_duration = ?self.reset_duration,
"Resetting backoff, since reset duration has expired"
);
self.backoff.reset();
}
}
self.last_backoff = Some(self.clock.now());
self.backoff.next_backoff()
}

fn reset(&mut self) {
// Do not even bother trying to reset here, since `next_backoff` will take care of this when the timer expires.
}
}

#[cfg(test)]
mod tests {
use backoff::{backoff::Backoff, Clock};
use tokio::time::advance;

use super::ResetTimerBackoff;
use crate::utils::stream_backoff::tests::LinearBackoff;
use std::time::{Duration, Instant};

#[tokio::test]
async fn should_reset_when_timer_expires() {
tokio::time::pause();
let mut backoff = ResetTimerBackoff::new_with_custom_clock(
LinearBackoff::new(Duration::from_secs(2)),
Duration::from_secs(60),
TokioClock,
);
assert_eq!(backoff.next_backoff(), Some(Duration::from_secs(2)));
advance(Duration::from_secs(40)).await;
assert_eq!(backoff.next_backoff(), Some(Duration::from_secs(4)));
advance(Duration::from_secs(40)).await;
assert_eq!(backoff.next_backoff(), Some(Duration::from_secs(6)));
advance(Duration::from_secs(80)).await;
assert_eq!(backoff.next_backoff(), Some(Duration::from_secs(2)));
advance(Duration::from_secs(80)).await;
assert_eq!(backoff.next_backoff(), Some(Duration::from_secs(2)));
}

struct TokioClock;

impl Clock for TokioClock {
fn now(&self) -> Instant {
tokio::time::Instant::now().into_std()
}
}
}
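The reset logic in `ResetTimerBackoff::next_backoff` can be followed without tokio by passing the current time in explicitly. The sketch below is a simplified, std-only restatement under that assumption: `Linear`, `ResetAfter`, and `replay` are hypothetical names, and time is modeled as a `Duration` offset from an arbitrary start rather than an `Instant` read from a `Clock`.

```rust
use std::time::Duration;

/// Hypothetical linear policy: yields step, 2 * step, 3 * step, ... until reset.
struct Linear {
    step: Duration,
    current: Duration,
}

impl Linear {
    fn new(step: Duration) -> Self {
        Self { step, current: Duration::ZERO }
    }

    fn next_backoff(&mut self) -> Duration {
        self.current += self.step;
        self.current
    }

    fn reset(&mut self) {
        self.current = Duration::ZERO;
    }
}

/// Wraps `Linear` and resets it when two consecutive calls are spaced
/// further apart than `reset_after`.
struct ResetAfter {
    inner: Linear,
    reset_after: Duration,
    last_call: Option<Duration>,
}

impl ResetAfter {
    fn new(inner: Linear, reset_after: Duration) -> Self {
        Self { inner, reset_after, last_call: None }
    }

    /// `now` is the caller-supplied time of this call.
    fn next_backoff(&mut self, now: Duration) -> Duration {
        if let Some(last) = self.last_call {
            if now > last + self.reset_after {
                self.inner.reset();
            }
        }
        self.last_call = Some(now);
        self.inner.next_backoff()
    }
}

/// Replays calls made at the given times (in seconds) against a 2s linear
/// policy with a 60s reset window, returning each delay in seconds.
fn replay(call_times_secs: &[u64]) -> Vec<u64> {
    let mut backoff = ResetAfter::new(
        Linear::new(Duration::from_secs(2)),
        Duration::from_secs(60),
    );
    call_times_secs
        .iter()
        .map(|&t| backoff.next_backoff(Duration::from_secs(t)).as_secs())
        .collect()
}
```

Replaying calls at 0, 40, 80, 160, and 240 seconds yields 2, 4, 6, 2, 2: the same sequence the test above asserts, since only the last two gaps exceed the 60-second window.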
7 changes: 7 additions & 0 deletions kube-runtime/src/utils.rs → kube-runtime/src/utils/mod.rs
@@ -1,4 +1,11 @@
//! Helpers for manipulating built-in streams

mod backoff_reset_timer;
mod stream_backoff;

pub use backoff_reset_timer::ResetTimerBackoff;
pub use stream_backoff::StreamBackoff;

use crate::watcher;
use futures::{
pin_mut,
Expand Down