From 9685bc1ea8f4b9e6dfce4f4eb800db1a6ab06749 Mon Sep 17 00:00:00 2001 From: Eirik A Date: Thu, 24 Aug 2023 06:55:58 +0100 Subject: [PATCH] Fixup watcher docs, and watchlist doc examples (#1284) * Fixup watcher docs, and watchlist doc examples Lots of missing references in doc builds because many of these are warnings. Removed the `namespace_reflector` because it's a slowly changing thing to watch, and instead added a way to opt-in to `WATCHLIST=1` in pod and node watchers. Was trying to do this on event_watcher also but apparently events on k3s does not support sendInitialEvents on 1.27 yet so left it out there. While in the area i also updated event_watcher to use the new `Event` type to make the example better. Signed-off-by: clux * clippy Signed-off-by: clux --------- Signed-off-by: clux --- examples/Cargo.toml | 4 -- examples/README.md | 6 +++ examples/event_watcher.rs | 24 +++++++---- examples/namespace_reflector.rs | 49 ---------------------- examples/node_watcher.rs | 8 +++- examples/pod_watcher.rs | 9 +++- kube-core/src/params.rs | 6 +-- kube-runtime/src/controller/mod.rs | 4 +- kube-runtime/src/utils/mod.rs | 2 - kube-runtime/src/utils/stream_subscribe.rs | 2 - kube-runtime/src/watcher.rs | 24 +++++++---- 11 files changed, 56 insertions(+), 82 deletions(-) delete mode 100644 examples/namespace_reflector.rs diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 0ce18ea4d..985566972 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -213,7 +213,3 @@ path = "secret_syncer.rs" name = "pod_shell_crossterm" path = "pod_shell_crossterm.rs" required-features = ["ws"] - -[[example]] -name = "namespace_reflector" -path = "namespace_reflector.rs" diff --git a/examples/README.md b/examples/README.md index ca0db9419..9a778cc6a 100644 --- a/examples/README.md +++ b/examples/README.md @@ -87,6 +87,12 @@ cargo run --example node_watcher cargo run --example dynamic_watcher ``` +The `node_` and `pod_` watcher also allows using [Kubernetes 1.27 Streaming lists](https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists) via `WATCHLIST=1`: + +```sh +WATCHLIST=1 RUST_LOG=info,kube=debug cargo run --example pod_watcher +``` + ### controllers Main example requires you creating the custom resource first: diff --git a/examples/event_watcher.rs b/examples/event_watcher.rs index e48026ee8..20c7bca2f 100644 --- a/examples/event_watcher.rs +++ b/examples/event_watcher.rs @@ -1,11 +1,11 @@ use futures::{pin_mut, TryStreamExt}; -use k8s_openapi::api::core::v1::Event; +use k8s_openapi::api::{core::v1::ObjectReference, events::v1::Event}; use kube::{ api::Api, runtime::{watcher, WatchStreamExt}, Client, }; -use tracing::*; +use tracing::info; #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -13,9 +13,7 @@ async fn main() -> anyhow::Result<()> { let client = Client::try_default().await?; let events: Api = Api::all(client); - let wc = watcher::Config::default(); - - let ew = watcher(events, wc).applied_objects(); + let ew = watcher(events, watcher::Config::default()).applied_objects(); pin_mut!(ew); while let Some(event) = ew.try_next().await? { @@ -27,10 +25,18 @@ async fn main() -> anyhow::Result<()> { // This function lets the app handle an added/modified event from k8s fn handle_event(ev: Event) -> anyhow::Result<()> { info!( - "Event: \"{}\" via {} {}", - ev.message.unwrap().trim(), - ev.involved_object.kind.unwrap(), - ev.involved_object.name.unwrap() + "{}: {} ({})", + ev.regarding.map(fmt_obj_ref).unwrap_or_default(), + ev.reason.unwrap_or_default(), + ev.note.unwrap_or_default(), ); Ok(()) } + +fn fmt_obj_ref(oref: ObjectReference) -> String { + format!( + "{}/{}", + oref.kind.unwrap_or_default(), + oref.name.unwrap_or_default() + ) +} diff --git a/examples/namespace_reflector.rs b/examples/namespace_reflector.rs deleted file mode 100644 index 6d7af6385..000000000 --- a/examples/namespace_reflector.rs +++ /dev/null @@ -1,49 +0,0 @@ -use futures::TryStreamExt; -use k8s_openapi::api::core::v1::Namespace; -use kube::{ - api::Api, - runtime::{predicates, reflector, watcher, WatchStreamExt}, - Client, ResourceExt, -}; -use tracing::*; - -#[tokio::main] -async fn main() -> anyhow::Result<()> { - tracing_subscriber::fmt::init(); - let client = Client::try_default().await?; - - let api: Api = Api::all(client); - let (reader, writer) = reflector::store::(); - - tokio::spawn(async move { - // Show state every 5 seconds of watching - loop { - reader.wait_until_ready().await.unwrap(); - tokio::time::sleep(std::time::Duration::from_secs(5)).await; - info!("Current namespace count: {}", reader.state().len()); - // full information with debug logs - for p in reader.state() { - let yaml = serde_yaml::to_string(p.as_ref()).unwrap(); - debug!("Namespace {}: \n{}", p.name_any(), yaml); - } - } - }); - - let stream = watcher(api, watcher::Config::default().streaming_lists()) - .default_backoff() - .modify(|ns| { - // memory optimization for our store - we don't care about managed fields/annotations/status - ns.managed_fields_mut().clear(); - ns.annotations_mut().clear(); - ns.status = None; - }) - .reflect(writer) - .applied_objects() - .predicate_filter(predicates::resource_version); // NB: requires an unstable feature - - futures::pin_mut!(stream); - while let Some(ns) = stream.try_next().await? { - info!("saw {}", ns.name_any()); - } - Ok(()) -} diff --git a/examples/node_watcher.rs b/examples/node_watcher.rs index e5a079f34..1391abe11 100644 --- a/examples/node_watcher.rs +++ b/examples/node_watcher.rs @@ -14,7 +14,13 @@ async fn main() -> anyhow::Result<()> { let events: Api = Api::all(client.clone()); let nodes: Api = Api::all(client.clone()); - let wc = watcher::Config::default().labels("beta.kubernetes.io/arch=amd64"); + let use_watchlist = std::env::var("WATCHLIST").map(|s| s == "1").unwrap_or(false); + let wc = if use_watchlist { + // requires WatchList feature gate on 1.27 or later + watcher::Config::default().streaming_lists() + } else { + watcher::Config::default() + }; let obs = watcher(nodes, wc).default_backoff().applied_objects(); pin_mut!(obs); diff --git a/examples/pod_watcher.rs b/examples/pod_watcher.rs index e4e79fc4d..7a1f2dcc0 100644 --- a/examples/pod_watcher.rs +++ b/examples/pod_watcher.rs @@ -12,8 +12,15 @@ async fn main() -> anyhow::Result<()> { tracing_subscriber::fmt::init(); let client = Client::try_default().await?; let api = Api::::default_namespaced(client); + let use_watchlist = std::env::var("WATCHLIST").map(|s| s == "1").unwrap_or(false); + let wc = if use_watchlist { + // requires WatchList feature gate on 1.27 or later + watcher::Config::default().streaming_lists() + } else { + watcher::Config::default() + }; - watcher(api, watcher::Config::default()) + watcher(api, wc) .applied_objects() .default_backoff() .try_for_each(|p| async move { diff --git a/kube-core/src/params.rs b/kube-core/src/params.rs index 38e59e3d3..01136c669 100644 --- a/kube-core/src/params.rs +++ b/kube-core/src/params.rs @@ -106,7 +106,7 @@ impl ListParams { } else { // When there's a continue token, we don't want to set resourceVersion if let Some(rv) = &self.resource_version { - if rv != "0" || (rv == "0" && self.limit.is_none()) { + if rv != "0" || self.limit.is_none() { qp.append_pair("resourceVersion", rv.as_str()); match &self.version_match { @@ -189,7 +189,7 @@ impl ListParams { /// Sets an arbitary resource version match strategy /// - /// A non-default strategy such as `VersionMatch::Exact` or `VersionMatch::NotGreaterThan` + /// A non-default strategy such as `VersionMatch::Exact` or `VersionMatch::NotOlderThan` /// requires an explicit `resource_version` set to pass request validation. #[must_use] pub fn matching(mut self, version_match: VersionMatch) -> Self { @@ -472,7 +472,7 @@ impl WatchParams { /// Constructor for doing Kubernetes 1.27 Streaming List watches /// - /// Enables [`VersionMatch::NotGreaterThan`] semantics and [`WatchParams::send_initial_events`]. + /// Enables [`VersionMatch::NotOlderThan`] semantics and [`WatchParams::send_initial_events`]. pub fn streaming_lists() -> Self { Self { send_initial_events: true, diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs index ec254d4bc..457744035 100644 --- a/kube-runtime/src/controller/mod.rs +++ b/kube-runtime/src/controller/mod.rs @@ -428,8 +428,8 @@ pub struct Config { impl Config { /// The debounce duration used to deduplicate reconciliation requests. /// - /// When set to a non-zero duration, debouncing is enabled in the [`Scheduler`] resulting - /// in __trailing edge debouncing__ of reqonciler requests. + /// When set to a non-zero duration, debouncing is enabled in the [`scheduler`](crate::scheduler()) + /// resulting in __trailing edge debouncing__ of reqonciler requests. /// This option can help to reduce the amount of unnecessary reconciler calls /// when using multiple controller relations, or during rapid phase transitions. /// diff --git a/kube-runtime/src/utils/mod.rs b/kube-runtime/src/utils/mod.rs index 0ba7c4ece..d6c98b403 100644 --- a/kube-runtime/src/utils/mod.rs +++ b/kube-runtime/src/utils/mod.rs @@ -117,8 +117,6 @@ where S::Ok: Debug, S::Error: Debug, { - // `arc_with_non_send_sync` false positive: https://github.com/rust-lang/rust-clippy/issues/11076 - #[allow(clippy::arc_with_non_send_sync)] let stream = Arc::new(Mutex::new(stream.into_stream().peekable())); ( SplitCase { diff --git a/kube-runtime/src/utils/stream_subscribe.rs b/kube-runtime/src/utils/stream_subscribe.rs index 31f37747c..3391ee0cf 100644 --- a/kube-runtime/src/utils/stream_subscribe.rs +++ b/kube-runtime/src/utils/stream_subscribe.rs @@ -62,8 +62,6 @@ impl Stream for StreamSubscribe { match item { Poll::Ready(Some(item)) => { - // `arc_with_non_send_sync` false positive: https://github.com/rust-lang/rust-clippy/issues/11076 - #[allow(clippy::arc_with_non_send_sync)] let item = Arc::new(item); this.sender.send(Some(item.clone())).ok(); Poll::Ready(Some(item)) diff --git a/kube-runtime/src/watcher.rs b/kube-runtime/src/watcher.rs index 150ed0f75..0edfc11cb 100644 --- a/kube-runtime/src/watcher.rs +++ b/kube-runtime/src/watcher.rs @@ -200,12 +200,19 @@ pub enum ListSemantic { } /// Configurable watcher listwatch semantics + #[derive(Clone, Default, Debug, PartialEq)] pub enum InitialListStrategy { + /// List first, then watch from given resouce version + /// + /// This is the old and default way of watching. The watcher will do a paginated list call first before watching. + /// When using this mode, you can configure the page_size on the watcher. #[default] ListWatch, /// Kubernetes 1.27 Streaming Lists - /// https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists + /// + /// See [upstream documentation on streaming lists](https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists), + /// and the [KEP](https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#design-details). StreamingList, } @@ -233,10 +240,9 @@ pub struct Config { /// /// Configures re-list for performance vs. consistency. /// - /// NB: This option only has an effect for [`WatcherMode::ListWatch`]. + /// NB: This option only has an effect for [`InitialListStrategy::ListWatch`]. pub list_semantic: ListSemantic, - /// Kubernetes 1.27 Streaming Lists /// Control how the watcher fetches the initial list of objects. /// /// ListWatch: The watcher will fetch the initial list of objects using a list call. @@ -245,8 +251,8 @@ pub struct Config { /// StreamingList is more efficient than ListWatch, but it requires the server to support /// streaming list bookmarks (opt-in feature gate in Kubernetes 1.27). /// - /// See https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists - /// See https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#design-details + /// See [upstream documentation on streaming lists](https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists), + /// and the [KEP](https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#design-details). pub initial_list_strategy: InitialListStrategy, /// Maximum number of objects retrieved per list operation resyncs. @@ -256,7 +262,7 @@ pub struct Config { /// /// Defaults to 500. Note that `None` represents unbounded. /// - /// NB: This option only has an effect for [`WatcherMode::ListWatch`]. + /// NB: This option only has an effect for [`InitialListStrategy::ListWatch`]. pub page_size: Option, /// Enables watch events with type "BOOKMARK". @@ -325,7 +331,7 @@ impl Config { /// Sets list semantic to configure re-list performance and consistency /// - /// NB: This option only has an effect for [`WatcherMode::ListWatch`]. + /// NB: This option only has an effect for [`InitialListStrategy::ListWatch`]. #[must_use] pub fn list_semantic(mut self, semantic: ListSemantic) -> Self { self.list_semantic = semantic; @@ -334,7 +340,7 @@ impl Config { /// Sets list semantic to `Any` to improve list performance /// - /// NB: This option only has an effect for [`WatcherMode::ListWatch`]. + /// NB: This option only has an effect for [`InitialListStrategy::ListWatch`]. #[must_use] pub fn any_semantic(self) -> Self { self.list_semantic(ListSemantic::Any) @@ -355,7 +361,7 @@ impl Config { /// This can reduce the memory consumption during resyncs, at the cost of requiring more /// API roundtrips to complete. /// - /// NB: This option only has an effect for [`WatcherMode::ListWatch`]. + /// NB: This option only has an effect for [`InitialListStrategy::ListWatch`]. #[must_use] pub fn page_size(mut self, page_size: u32) -> Self { self.page_size = Some(page_size);