diff --git a/controller/src/main.rs b/controller/src/main.rs
index ff8d76f55..31aab21be 100644
--- a/controller/src/main.rs
+++ b/controller/src/main.rs
@@ -2,13 +2,15 @@
 extern crate lazy_static;
 mod util;
-use akri_shared::akri::{metrics::run_metrics_server, API_NAMESPACE};
+use akri_shared::{
+    akri::{metrics::run_metrics_server, API_NAMESPACE},
+    k8s::AKRI_CONFIGURATION_LABEL_NAME,
+};
+use futures::StreamExt;
+use kube::runtime::{watcher::Config, Controller};
 use prometheus::IntGaugeVec;
 use std::sync::Arc;
-use util::{
-    controller_ctx::{ControllerContext, CONTROLLER_FIELD_MANAGER_ID},
-    instance_action, node_watcher, pod_watcher,
-};
+use util::{controller_ctx::ControllerContext, instance_action, node_watcher, pod_watcher};
 /// Length of time to sleep between controller system validation checks
 pub const SYSTEM_CHECK_DELAY_SECS: u64 = 30;
@@ -34,33 +36,46 @@ async fn main() -> Result<(), Box
     );
     log::info!("{} Controller logging started", API_NAMESPACE);
-    let mut tasks = Vec::new();
     // Start server for prometheus metrics
     tokio::spawn(run_metrics_server());
-
-    let controller_ctx = Arc::new(ControllerContext::new(
-        Arc::new(kube::Client::try_default().await?),
-        CONTROLLER_FIELD_MANAGER_ID,
-    ));
-    let instance_watcher_ctx = controller_ctx.clone();
+    let client = Arc::new(kube::Client::try_default().await?);
+    let controller_ctx = Arc::new(ControllerContext::new(client.clone()));
     let node_watcher_ctx = controller_ctx.clone();
     let pod_watcher_ctx = controller_ctx.clone();
-    // Handle instance changes
-    tasks.push(tokio::spawn(async {
-        instance_action::run(instance_watcher_ctx).await;
-    }));
-    // Watch for node disappearance
-    tasks.push(tokio::spawn(async {
-        node_watcher::run(node_watcher_ctx).await;
-    }));
-    // Watch for broker Pod state changes
-    tasks.push(tokio::spawn(async {
-        pod_watcher::run(pod_watcher_ctx).await;
-    }));
+    node_watcher::check(client.clone()).await?;
+    let node_controller = Controller::new(
+        node_watcher_ctx.client.all().as_inner(),
+        Config::default().any_semantic(),
+    )
+    .shutdown_on_signal()
+    .run(
+        node_watcher::reconcile,
+        node_watcher::error_policy,
+        node_watcher_ctx,
+    )
+    .filter_map(|x| async move { std::result::Result::ok(x) })
+    .for_each(|_| futures::future::ready(()));
+
+    pod_watcher::check(client.clone()).await?;
+    let pod_controller = Controller::new(
+        pod_watcher_ctx.client.all().as_inner(),
+        Config::default().labels(AKRI_CONFIGURATION_LABEL_NAME),
+    )
+    .shutdown_on_signal()
+    .run(
+        pod_watcher::reconcile,
+        pod_watcher::error_policy,
+        pod_watcher_ctx,
+    )
+    .filter_map(|x| async move { std::result::Result::ok(x) })
+    .for_each(|_| futures::future::ready(()));
-    futures::future::try_join_all(tasks).await?;
+    tokio::select! {
+        _ = futures::future::join(node_controller, pod_controller) => {},
+        _ = instance_action::run(client) => {}
+    }
     log::info!("{} Controller end", API_NAMESPACE);
     Ok(())
diff --git a/controller/src/util/controller_ctx.rs b/controller/src/util/controller_ctx.rs
index 717855c5e..8534688c1 100644
--- a/controller/src/util/controller_ctx.rs
+++ b/controller/src/util/controller_ctx.rs
@@ -10,9 +10,6 @@
 use k8s_openapi::api::core::v1::{Node, Pod, Service};
 use tokio::sync::RwLock;
-// Identifier for the controller to be set as the field manager for server-side apply
-pub const CONTROLLER_FIELD_MANAGER_ID: &str = "akri.sh/controller";
-
 /// Pod states that BrokerPodWatcher is interested in
 ///
 /// PodState describes the various states that the controller can
@@ -91,16 +88,14 @@ pub struct ControllerContext {
     pub client: Arc,
     pub known_pods: Arc>>,
     pub known_nodes: Arc>>,
-    pub identifier: String,
 }
 impl ControllerContext {
-    pub fn new(client: Arc, identifier: &str) -> Self {
+    pub fn new(client: Arc) -> Self {
         ControllerContext {
             client,
             known_pods: Arc::new(RwLock::new(HashMap::new())),
             known_nodes: Arc::new(RwLock::new(HashMap::new())),
-            identifier: identifier.to_string(),
         }
     }
 }
diff --git a/controller/src/util/instance_action.rs b/controller/src/util/instance_action.rs
index 0b0e98d41..29d38b735 100644
--- a/controller/src/util/instance_action.rs
+++ b/controller/src/util/instance_action.rs
@@ -1,7 +1,6 @@
-use super::super::BROKER_POD_COUNT_METRIC;
-use super::{pod_action::PodAction, pod_action::PodActionInfo};
-use crate::util::controller_ctx::{ControllerContext, ControllerKubeClient};
-use crate::util::{ControllerError, Result};
+use crate::util::controller_ctx::ControllerKubeClient;
+use crate::util::{pod_action::PodAction, pod_action::PodActionInfo};
+use crate::BROKER_POD_COUNT_METRIC;
 use akri_shared::akri::configuration::Configuration;
 use akri_shared::k8s::api::Api;
 use akri_shared::{
@@ -12,17 +11,13 @@ use akri_shared::{
     },
 };
 use anyhow::Context;
-use futures::StreamExt;
+use futures::TryStreamExt;
 use k8s_openapi::api::batch::v1::{Job, JobSpec};
 use k8s_openapi::api::core::v1::{Pod, PodSpec};
 use kube::{
     api::{ListParams, ResourceExt},
-    runtime::{
-        controller::{Action, Controller},
-        finalizer::{finalizer, Event},
-        watcher::Config,
-    },
+    runtime::{controller::Action, watcher::watcher, watcher::Config, WatchStreamExt},
 };
 use log::{error, trace};
 use std::collections::HashMap;
@@ -32,71 +27,37 @@ use std::sync::Arc;
 pub const PENDING_POD_GRACE_PERIOD_MINUTES: i64 = 5;
 /// Length of time a Pod can be in an error state before we retry
 pub const FAILED_POD_GRACE_PERIOD_MINUTES: i64 = 0;
+// Identifier for the controller to be set as the field manager for server-side apply
+pub const CONTROLLER_FIELD_MANAGER_ID: &str = "akri.sh/controller";
-pub static INSTANCE_FINALIZER: &str = "instances.kube.rs";
-
-/// Initialize the instance controller
-/// TODO: consider passing state that is shared among controllers such as a metrics exporter
-pub async fn run(ctx: Arc) {
-    let api = ctx.client.all().as_inner();
+/// This function is the main Reconcile function for Instance resources
+/// This will get called every time an Instance gets added or is changed.
+pub async fn run(client: Arc) -> anyhow::Result<()> {
+    let api: Box> = client.all();
     if let Err(e) = api.list(&ListParams::default().limit(1)).await {
         error!("Instance CRD is not queryable; {e:?}. Is the CRD installed?");
Is the CRD installed?"); std::process::exit(1); } - Controller::new(api, Config::default().any_semantic()) - .shutdown_on_signal() - .run(reconcile, error_policy, ctx) - // TODO: needs update for tokio? - .filter_map(|x| async move { std::result::Result::ok(x) }) - .for_each(|_| futures::future::ready(())) - .await; -} - -fn error_policy( - _instance: Arc, - error: &ControllerError, - _ctx: Arc, -) -> Action { - log::warn!("reconcile failed: {:?}", error); - Action::requeue(std::time::Duration::from_secs(5 * 60)) -} - -/// Instance event types -/// -/// Instance actions describe the types of actions the Controller can -/// react to for Instances. -/// -/// This will determine what broker management actions to take (if any) -/// -/// | --> Instance Applied -/// | --> No broker => Do nothing -/// | --> => Deploy a Job if one does not exist -/// | --> => Ensure that each Node on Instance's `nodes` list (up to `capacity` total) have a Pod. -/// Deploy Pods as necessary - -/// This function is the main Reconcile function for Instance resources -/// This will get called every time an Instance gets added or is changed, it will also be called for every existing instance on startup. -pub async fn reconcile(instance: Arc, ctx: Arc) -> Result { - let ns = instance.namespace().unwrap(); // instance has namespace scope - trace!("Reconciling {} in {}", instance.name_any(), ns); - finalizer( - &ctx.client.clone().all().as_inner(), - INSTANCE_FINALIZER, - instance, - |event| reconcile_inner(event, ctx), - ) - .await - .map_err(|e| ControllerError::FinalizerError(Box::new(e))) -} -async fn reconcile_inner(event: Event, ctx: Arc) -> Result { - match event { - Event::Apply(instance) => handle_instance_change(&instance, ctx.clone()).await, - Event::Cleanup(_) => { - // Do nothing. OwnerReferences are attached to Jobs and Pods to automate cleanup - Ok(default_requeue_action()) - } + // First handle existing instances + let instances = api.list(&ListParams::default()).await?; + for instance in instances { + handle_instance_change(instance, client.clone()).await?; } + + watcher(api.as_inner(), Config::default()) + .applied_objects() + .try_for_each(move |instance| { + let client = client.clone(); + async move { + handle_instance_change(instance, client) + .await + .map_err(kube::runtime::watcher::Error::WatchFailed)?; + Ok(()) + } + }) + .await?; + Ok(()) } /// PodContext stores a set of details required to track/create/delete broker @@ -222,7 +183,6 @@ async fn handle_addition_work( pod: Pod, configuration_name: &str, new_node: &str, - field_manager: &str, ) -> anyhow::Result<()> { trace!( "handle_addition_work - Create new Pod for Node={:?}", @@ -230,7 +190,7 @@ async fn handle_addition_work( ); trace!("handle_addition_work - New pod spec={:?}", pod); - api.apply(pod, field_manager).await?; + api.apply(pod, CONTROLLER_FIELD_MANAGER_ID).await?; trace!("handle_addition_work - pod::create_pod succeeded",); BROKER_POD_COUNT_METRIC .with_label_values(&[configuration_name, new_node]) @@ -241,14 +201,18 @@ async fn handle_addition_work( /// Handle Instance change by /// 1) checking to make sure the Instance's Configuration exists -/// 2) calling the appropriate handler depending on the broker type (Pod or Job) if any +/// 2) taking the appropriate action depending on the broker type (Pod or Job) if any: +/// | --> No broker => Do nothing +/// | --> => Deploy a Job if one does not exist +/// | --> => Ensure that each Node on Instance's `nodes` list (up to `capacity` total) have a Pod. 
+/// Deploy Pods as necessary
 pub async fn handle_instance_change(
-    instance: &Instance,
-    ctx: Arc,
-) -> Result {
+    instance: Instance,
+    client: Arc,
+) -> kube::Result {
     trace!("handle_instance_change - enter");
     let instance_namespace = instance.namespace().unwrap();
-    let api: Box> = ctx.client.namespaced(&instance_namespace);
+    let api: Box> = client.namespaced(&instance_namespace);
     let Ok(Some(configuration)) = api.get(&instance.spec.configuration_name).await else {
         // In this scenario, a configuration has been deleted without the Akri Agent deleting the associated Instances.
         // Furthermore, Akri Agent is still modifying the Instances. This should not happen beacuse Agent
@@ -263,13 +227,13 @@
         return Ok(default_requeue_action());
     };
     let res = match broker_spec {
-        BrokerSpec::BrokerPodSpec(p) => handle_instance_change_pod(instance, p, ctx).await,
+        BrokerSpec::BrokerPodSpec(p) => handle_instance_change_pod(&instance, p, client).await,
         BrokerSpec::BrokerJobSpec(j) => {
             handle_instance_change_job(
-                instance,
+                &instance,
                 *configuration.metadata.generation.as_ref().unwrap(),
                 j,
-                ctx.client.clone(),
+                client.clone(),
             )
             .await
         }
@@ -329,7 +293,7 @@
 pub async fn handle_instance_change_pod(
     instance: &Instance,
     podspec: &PodSpec,
-    ctx: Arc,
+    client: Arc,
 ) -> anyhow::Result<()> {
     trace!("handle_instance_change_pod - enter");
     // Assume all nodes require PodAction::Add (reflect that there is no running Pod, unless we find one)
@@ -355,9 +319,7 @@
         AKRI_INSTANCE_LABEL_NAME,
         instance.name_unchecked()
     ));
-    let api = ctx
-        .client
-        .namespaced(&instance.namespace().context("no namespace")?);
+    let api = client.namespaced(&instance.namespace().context("no namespace")?);
     let instance_pods = api.list(&lp).await?;
     trace!(
         "handle_instance_change - found {} pods",
@@ -375,7 +337,7 @@
         "handle_instance_change - nodes tracked after querying existing pods={:?}",
         nodes_to_act_on
     );
-    do_pod_action_for_nodes(nodes_to_act_on, instance, podspec, api, &ctx.identifier).await?;
+    do_pod_action_for_nodes(nodes_to_act_on, instance, podspec, api).await?;
     trace!("handle_instance_change - exit");
     Ok(())
@@ -386,7 +348,6 @@ pub(crate) async fn do_pod_action_for_nodes(
     instance: &Instance,
     podspec: &PodSpec,
     api: Box>,
-    field_manager: &str,
 ) -> anyhow::Result<()> {
     trace!("do_pod_action_for_nodes - enter");
     // Iterate over nodes_to_act_on where value == (PodAction::Remove | PodAction::RemoveAndAdd)
@@ -439,7 +400,6 @@
                 new_pod,
                 &instance.spec.configuration_name,
                 &new_node,
-                field_manager,
             )
             .await?;
         }
@@ -466,16 +426,6 @@ mod handle_instance_tests {
     use chrono::Utc;
     use mockall::predicate::*;
-    #[derive(Clone, Debug, PartialEq)]
-    pub enum InstanceAction {
-        /// An Instance is added
-        Add,
-        /// An Instance is removed
-        Remove,
-        /// An Instance is updated
-        Update,
-    }
-
     #[derive(Clone)]
     struct HandleInstanceWork {
         find_pods_selector: &'static str,
@@ -568,14 +518,6 @@
         }
     }
-    fn configure_deletion_work_for_config_a_b494b6() -> HandleDeletionWork {
-        HandleDeletionWork {
-            broker_pod_names: vec!["config-a-b494b6-pod"],
-            // instance_svc_names: vec!["config-a-b494b6-svc"],
-            cleanup_namespaces: vec!["config-a-namespace"],
-        }
-    }
-
     fn configure_for_handle_deletion_work(mock: &mut MockApi, work: &HandleDeletionWork) {
         for i in 0..work.broker_pod_names.len() {
            let broker_pod_name = work.broker_pod_names[i];
@@ -631,22 +573,13 @@
     }
     async fn run_handle_instance_change_test(
-        ctx: Arc,
+        client: Arc,
         instance_file: &'static str,
-        action: &'static InstanceAction,
     ) {
         trace!("run_handle_instance_change_test enter");
-        let instance_json = file::read_file_to_string(instance_file);
+        let instance_json: String = file::read_file_to_string(instance_file);
         let instance: Instance = serde_json::from_str(&instance_json).unwrap();
-        reconcile_inner(
-            match action {
-                InstanceAction::Add | InstanceAction::Update => Event::Apply(Arc::new(instance)),
-                InstanceAction::Remove => Event::Cleanup(Arc::new(instance)),
-            },
-            ctx,
-        )
-        .await
-        .unwrap();
+        handle_instance_change(instance, client).await.unwrap();
         trace!("run_handle_instance_change_test exit");
     }
@@ -668,12 +601,7 @@
                 addition_work: Some(configure_add_local_config_a_b494b6(false)),
             },
         );
-        run_handle_instance_change_test(
-            Arc::new(ControllerContext::new(Arc::new(mock), "test")),
-            "../test/json/local-instance.json",
-            &InstanceAction::Add,
-        )
-        .await;
+        run_handle_instance_change_test(Arc::new(mock), "../test/json/local-instance.json").await;
     }
     #[tokio::test]
@@ -694,12 +622,7 @@
                 addition_work: Some(configure_add_local_config_a_b494b6(true)),
             },
         );
-        run_handle_instance_change_test(
-            Arc::new(ControllerContext::new(Arc::new(mock), "test")),
-            "../test/json/local-instance.json",
-            &InstanceAction::Add,
-        )
-        .await;
+        run_handle_instance_change_test(Arc::new(mock), "../test/json/local-instance.json").await;
     }
     #[tokio::test]
@@ -722,12 +645,7 @@
                 )),
             },
         );
-        run_handle_instance_change_test(
-            Arc::new(ControllerContext::new(Arc::new(mock), "test")),
-            "../test/json/shared-instance.json",
-            &InstanceAction::Add,
-        )
-        .await;
+        run_handle_instance_change_test(Arc::new(mock), "../test/json/shared-instance.json").await;
     }
     #[tokio::test]
@@ -750,12 +668,8 @@
                 )),
             },
         );
-        run_handle_instance_change_test(
-            Arc::new(ControllerContext::new(Arc::new(mock), "test")),
-            "../test/json/shared-instance-update.json",
-            &InstanceAction::Update,
-        )
-        .await;
+        run_handle_instance_change_test(Arc::new(mock), "../test/json/shared-instance-update.json")
+            .await;
     }
     #[tokio::test]
@@ -806,17 +720,11 @@
                 )),
             },
         );
-        run_handle_instance_change_test(
-            Arc::new(ControllerContext::new(Arc::new(mock), "test")),
-            instance_file,
-            &InstanceAction::Update,
-        )
-        .await;
+        run_handle_instance_change_test(Arc::new(mock), instance_file).await;
     }
     /// Checks that the BROKER_POD_COUNT_METRIC is appropriately incremented
-    /// and decremented when an instance is added and deleted (and pods are
-    /// created and deleted). Cannot be run in parallel with other tests
+    /// when an instance is added. Cannot be run in parallel with other tests
     /// due to the metric being a global variable and modified unpredictably by
    /// other tests.
    /// Run with: cargo test -- test_broker_pod_count_metric --ignored
@@ -842,47 +750,6 @@
                 addition_work: Some(configure_add_local_config_a_b494b6(false)),
             },
         );
-        run_handle_instance_change_test(
-            Arc::new(ControllerContext::new(Arc::new(mock), "test")),
-            "../test/json/local-instance.json",
-            &InstanceAction::Add,
-        )
-        .await;
-
-        // Check that broker pod count metric has been incremented to include new pod for this instance
-        assert_eq!(
-            BROKER_POD_COUNT_METRIC
-                .with_label_values(&["config-a", "node-a"])
-                .get(),
-            1
-        );
-        let mut mock = MockControllerKubeClient::default();
-        configure_for_handle_instance_change(
-            &mut mock,
-            &HandleInstanceWork {
-                find_pods_selector: "akri.sh/instance=config-a-b494b6",
-                find_pods_result: "../test/json/running-pod-list-for-config-a-local.json",
-                find_pods_phase: None,
-                find_pods_start_time: None,
-                find_pods_delete_start_time: false,
-                config_work: get_config_work(),
-                deletion_work: Some(configure_deletion_work_for_config_a_b494b6()),
-                addition_work: None,
-            },
-        );
-        run_handle_instance_change_test(
-            Arc::new(ControllerContext::new(Arc::new(mock), "test")),
-            "../test/json/local-instance.json",
-            &InstanceAction::Remove,
-        )
-        .await;
-
-        // Check that broker pod count metric has been decremented to reflect deleted instance and pod
-        assert_eq!(
-            BROKER_POD_COUNT_METRIC
-                .with_label_values(&["config-a", "node-a"])
-                .get(),
-            0
-        );
+        run_handle_instance_change_test(Arc::new(mock), "../test/json/local-instance.json").await;
     }
 }
diff --git a/controller/src/util/mod.rs b/controller/src/util/mod.rs
index a3ee3ab2f..95f322b63 100644
--- a/controller/src/util/mod.rs
+++ b/controller/src/util/mod.rs
@@ -17,6 +17,9 @@ pub enum ControllerError {
     // so boxing this error to break cycles
     FinalizerError(#[source] Box>),
+    #[error("Watcher Error: {0}")]
+    WatcherError(#[from] kube::runtime::watcher::Error),
+
     #[error(transparent)]
     Other(#[from] anyhow::Error),
 }
diff --git a/controller/src/util/node_watcher.rs b/controller/src/util/node_watcher.rs
index 446aa1566..474e863b2 100644
--- a/controller/src/util/node_watcher.rs
+++ b/controller/src/util/node_watcher.rs
@@ -9,11 +9,9 @@ use crate::util::{
     controller_ctx::{ControllerContext, NodeState},
     ControllerError, Result,
 };
-use akri_shared::k8s::api::Api;
-
 use akri_shared::akri::instance::{device_usage::NodeUsage, Instance};
+use akri_shared::k8s::api::Api;
 use anyhow::Context;
-use futures::StreamExt;
 use k8s_openapi::api::core::v1::{Node, NodeStatus};
 use kube::{
     api::{
@@ -21,36 +19,32 @@ use kube::{
         TypeMeta,
     },
     runtime::{
-        controller::{Action, Controller},
+        controller::Action,
         finalizer::{finalizer, Event},
         reflector::Lookup,
-        watcher::Config,
     },
 };
-use log::{error, info, trace};
+use log::{info, trace};
 use std::str::FromStr;
 use std::{collections::HashMap, sync::Arc};
-pub static NODE_FINALIZER: &str = "nodes.kube.rs";
+use super::controller_ctx::ControllerKubeClient;
-/// Initialize the instance controller
-/// TODO: consider passing state that is shared among controllers such as a metrics exporter
-pub async fn run(ctx: Arc) {
-    let api = ctx.client.all().as_inner();
+pub static NODE_FINALIZER: &str = "akri-node-watcher.kube.rs";
+
+pub async fn check(client: Arc) -> anyhow::Result<()> {
+    let api: Box> = client.all();
     if let Err(e) = api.list(&ListParams::default().limit(1)).await {
-        error!("Nodes are not queryable; {e:?}");
-        std::process::exit(1);
+        anyhow::bail!("Nodes are not queryable; {e:?}")
     }
-    Controller::new(api, Config::default().any_semantic())
-        .shutdown_on_signal()
-        .run(reconcile, error_policy, ctx)
-        // TODO: needs update for tokio?
-        .filter_map(|x| async move { std::result::Result::ok(x) })
-        .for_each(|_| futures::future::ready(()))
-        .await;
+    Ok(())
 }
-fn error_policy(_node: Arc, error: &ControllerError, _ctx: Arc) -> Action {
+pub fn error_policy(
+    _node: Arc,
+    error: &ControllerError,
+    _ctx: Arc,
+) -> Action {
     log::warn!("reconcile failed: {:?}", error);
     Action::requeue(std::time::Duration::from_secs(5 * 60))
 }
@@ -290,7 +284,7 @@ mod tests {
         mock.node
             .expect_all()
             .return_once(|| Box::new(MockApi::new()));
-        let ctx = Arc::new(ControllerContext::new(Arc::new(mock), "test"));
+        let ctx = Arc::new(ControllerContext::new(Arc::new(mock)));
         reconcile_inner(Event::Apply(Arc::new(node)), ctx.clone())
             .await
             .unwrap();
@@ -311,7 +305,7 @@
         mock.node
             .expect_all()
             .return_once(|| Box::new(MockApi::new()));
-        let ctx = Arc::new(ControllerContext::new(Arc::new(mock), "test"));
+        let ctx = Arc::new(ControllerContext::new(Arc::new(mock)));
         reconcile_inner(Event::Apply(Arc::new(node)), ctx.clone())
             .await
             .unwrap();
@@ -332,7 +326,7 @@
         mock.node
             .expect_all()
             .return_once(|| Box::new(MockApi::new()));
-        let ctx = Arc::new(ControllerContext::new(Arc::new(mock), "test"));
+        let ctx = Arc::new(ControllerContext::new(Arc::new(mock)));
         ctx.known_nodes
             .write()
             .await
@@ -379,7 +373,7 @@
         mock.instance
             .expect_all()
             .return_once(move || Box::new(instance_api_mock));
-        let ctx = Arc::new(ControllerContext::new(Arc::new(mock), "test"));
+        let ctx = Arc::new(ControllerContext::new(Arc::new(mock)));
         ctx.known_nodes
             .write()
             .await
@@ -426,7 +420,7 @@
         mock.instance
             .expect_all()
             .return_once(move || Box::new(instance_api_mock));
-        let ctx = Arc::new(ControllerContext::new(Arc::new(mock), "test"));
+        let ctx = Arc::new(ControllerContext::new(Arc::new(mock)));
         ctx.known_nodes
             .write()
             .await
@@ -469,7 +463,7 @@
         mock.instance
             .expect_all()
             .return_once(move || Box::new(instance_api_mock));
-        let ctx = Arc::new(ControllerContext::new(Arc::new(mock), "test"));
+        let ctx = Arc::new(ControllerContext::new(Arc::new(mock)));
         reconcile_inner(Event::Cleanup(Arc::new(node)), ctx.clone())
             .await
             .unwrap();
diff --git a/controller/src/util/pod_watcher.rs b/controller/src/util/pod_watcher.rs
index fca284368..15a21c2a0 100644
--- a/controller/src/util/pod_watcher.rs
+++ b/controller/src/util/pod_watcher.rs
@@ -8,8 +8,6 @@ use akri_shared::{
     },
 };
-use futures::StreamExt;
-
 use k8s_openapi::api::core::v1::Pod;
 use k8s_openapi::{
     api::core::v1::{Service, ServiceSpec},
@@ -20,16 +18,15 @@ use kube::api::ObjectList;
 use kube::{
     api::{ListParams, ObjectMeta, ResourceExt},
     runtime::{
-        controller::{Action, Controller},
+        controller::Action,
         finalizer::{finalizer, Event},
-        watcher::Config,
     },
 };
-use log::{error, info, trace};
+use log::{info, trace};
 use std::future::Future;
 use std::{collections::BTreeMap, sync::Arc};
-pub static POD_FINALIZER: &str = "pods.kube.rs";
+pub static POD_FINALIZER: &str = "akri-pod-watcher.kube.rs";
 /// The `kind` of a broker Pod's controlling OwnerReference
 ///
@@ -68,22 +65,21 @@ fn get_broker_pod_owner_kind(pod: Arc) -> BrokerPodOwnerKind {
     }
 }
-/// Run the Pod reconciler
-pub async fn run(ctx: Arc) {
-    let api = ctx.client.all().as_inner();
+pub async fn check(
+    client: Arc,
+) -> anyhow::Result<()> {
+    let api: Box> = client.all();
     if let Err(e) = api.list(&ListParams::default().limit(1)).await {
-        error!("Pods are not queryable; {e:?}");
-        std::process::exit(1);
-    }
-    Controller::new(api, Config::default().labels(AKRI_CONFIGURATION_LABEL_NAME))
-        .shutdown_on_signal()
-        .run(reconcile, error_policy, ctx)
-        .filter_map(|x| async move { std::result::Result::ok(x) })
-        .for_each(|_| futures::future::ready(()))
-        .await;
+        anyhow::bail!("Pods are not queryable; {e:?}")
+    }
+    Ok(())
 }
-fn error_policy(_node: Arc, error: &ControllerError, _ctx: Arc) -> Action {
+pub fn error_policy(
+    _pod: Arc,
+    error: &ControllerError,
+    _ctx: Arc,
+) -> Action {
     log::warn!("reconcile failed: {:?}", error);
     Action::requeue(std::time::Duration::from_secs(5 * 60))
 }
@@ -252,7 +248,7 @@ async fn handle_non_running_pod(pod: Arc, ctx: Arc) -> a
     // Only redeploy Pods that are managed by the Akri Controller (controlled by an Instance OwnerReference)
     if get_broker_pod_owner_kind(pod) == BrokerPodOwnerKind::Instance {
         if let Ok(Some(instance)) = ctx.client.namespaced(&namespace).get(&instance_id).await {
-            super::instance_action::handle_instance_change(&instance, ctx).await?;
+            super::instance_action::handle_instance_change(instance, ctx.client.clone()).await?;
         }
     }
     Ok(())
@@ -371,7 +367,7 @@ async fn add_instance_and_configuration_services(
         instance_service_spec,
         labels,
     )?;
-        api.apply(instance_svc, &ctx.identifier).await?;
+        api.apply(instance_svc, POD_FINALIZER).await?;
     }
     if let Some(configuration_service_spec) = &configuration.spec.configuration_service_spec {
         let configuration_uid = configuration.uid().unwrap();
@@ -393,8 +389,8 @@
             configuration_service_spec,
             labels,
         )?;
-        // TODO: handle already exists error
-        api.apply(config_svc, &ctx.identifier).await?;
+        // TODO: use patch instead of apply
+        api.apply(config_svc, POD_FINALIZER).await?;
     }
     Ok(())
 }
@@ -710,10 +706,9 @@
         let pod =
            make_pod_with_owners_and_phase("instance_name", "copnfig_name", "Unknown", "Instance");
         let pod_name = pod.name_unchecked();
-        let ctx = Arc::new(ControllerContext::new(
-            Arc::new(MockControllerKubeClient::default()),
-            "test",
-        ));
+        let ctx = Arc::new(ControllerContext::new(Arc::new(
+            MockControllerKubeClient::default(),
+        )));
         reconcile_inner(Event::Apply(Arc::new(pod)), ctx.clone())
             .await
             .unwrap();
@@ -729,10 +724,9 @@
         let pod =
            make_pod_with_owners_and_phase("instance_name", "config_name", "Pending", "Instance");
         let pod_name = pod.name_unchecked();
-        let ctx = Arc::new(ControllerContext::new(
-            Arc::new(MockControllerKubeClient::default()),
-            "test",
-        ));
+        let ctx = Arc::new(ControllerContext::new(Arc::new(
+            MockControllerKubeClient::default(),
+        )));
         reconcile_inner(Event::Apply(Arc::new(pod)), ctx.clone())
             .await
             .unwrap();
@@ -749,10 +743,9 @@
         let pod =
            make_pod_with_owners_and_phase("instance_name", "config_name", "Running", "Instance");
         let pod_name = pod.name_unchecked();
-        let ctx = Arc::new(ControllerContext::new(
-            Arc::new(MockControllerKubeClient::default()),
-            "test",
-        ));
+        let ctx = Arc::new(ControllerContext::new(Arc::new(
+            MockControllerKubeClient::default(),
+        )));
         ctx.known_pods
             .write()
             .await
@@ -820,7 +813,7 @@
             .return_once(|_| Box::new(mock_svc_api))
             .with(mockall::predicate::eq("test-ns"));
-        let ctx = Arc::new(ControllerContext::new(Arc::new(mock), "test"));
+        let ctx = Arc::new(ControllerContext::new(Arc::new(mock)));
         reconcile_inner(Event::Apply(Arc::new(pod)), ctx.clone())
             .await
@@ -885,7 +878,7 @@
             .expect_namespaced()
             .return_once(|_| Box::new(mock_svc_api))
             .with(mockall::predicate::eq("test-ns"));
-        ControllerContext::new(Arc::new(mock), "test")
"test") + ControllerContext::new(Arc::new(mock)) } async fn test_reconcile_applied_terminated_phases(phase: &str) {