Skip to content

Commit

Permalink
Use Watcher API for instance reconciliation in controller
Browse files (browse the repository at this point in the history)
Signed-off-by: Kate Goldenring <[email protected]>
  • Loading branch information
kate-goldenring committed Oct 2, 2024
1 parent d9689ae commit 97470b0
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 284 deletions.
65 changes: 40 additions & 25 deletions controller/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@
extern crate lazy_static;
mod util;

use akri_shared::akri::{metrics::run_metrics_server, API_NAMESPACE};
use akri_shared::{
akri::{metrics::run_metrics_server, API_NAMESPACE},
k8s::AKRI_CONFIGURATION_LABEL_NAME,
};
use futures::StreamExt;
use kube::runtime::{watcher::Config, Controller};
use prometheus::IntGaugeVec;
use std::sync::Arc;
use util::{
controller_ctx::{ControllerContext, CONTROLLER_FIELD_MANAGER_ID},
instance_action, node_watcher, pod_watcher,
};
use util::{controller_ctx::ControllerContext, instance_action, node_watcher, pod_watcher};

/// Length of time to sleep between controller system validation checks
pub const SYSTEM_CHECK_DELAY_SECS: u64 = 30;
Expand All @@ -34,33 +36,46 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>
);

log::info!("{} Controller logging started", API_NAMESPACE);
let mut tasks = Vec::new();

// Start server for prometheus metrics
tokio::spawn(run_metrics_server());

let controller_ctx = Arc::new(ControllerContext::new(
Arc::new(kube::Client::try_default().await?),
CONTROLLER_FIELD_MANAGER_ID,
));
let instance_watcher_ctx = controller_ctx.clone();
let client = Arc::new(kube::Client::try_default().await?);
let controller_ctx = Arc::new(ControllerContext::new(client.clone()));
let node_watcher_ctx = controller_ctx.clone();
let pod_watcher_ctx = controller_ctx.clone();

// Handle instance changes
tasks.push(tokio::spawn(async {
instance_action::run(instance_watcher_ctx).await;
}));
// Watch for node disappearance
tasks.push(tokio::spawn(async {
node_watcher::run(node_watcher_ctx).await;
}));
// Watch for broker Pod state changes
tasks.push(tokio::spawn(async {
pod_watcher::run(pod_watcher_ctx).await;
}));
node_watcher::check(client.clone()).await?;
let node_controller = Controller::new(
node_watcher_ctx.client.all().as_inner(),
Config::default().any_semantic(),
)
.shutdown_on_signal()
.run(
node_watcher::reconcile,
node_watcher::error_policy,
node_watcher_ctx,
)
.filter_map(|x| async move { std::result::Result::ok(x) })
.for_each(|_| futures::future::ready(()));

pod_watcher::check(client.clone()).await?;
let pod_controller = Controller::new(
pod_watcher_ctx.client.all().as_inner(),
Config::default().labels(AKRI_CONFIGURATION_LABEL_NAME),
)
.shutdown_on_signal()
.run(
pod_watcher::reconcile,
pod_watcher::error_policy,
pod_watcher_ctx,
)
.filter_map(|x| async move { std::result::Result::ok(x) })
.for_each(|_| futures::future::ready(()));

futures::future::try_join_all(tasks).await?;
tokio::select! {
_ = futures::future::join(node_controller, pod_controller) => {},
_ = instance_action::run(client) => {}
}

log::info!("{} Controller end", API_NAMESPACE);
Ok(())
Expand Down
7 changes: 1 addition & 6 deletions controller/src/util/controller_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@ use k8s_openapi::api::core::v1::{Node, Pod, Service};

use tokio::sync::RwLock;

// Identifier for the controller to be set as the field manager for server-side apply
pub const CONTROLLER_FIELD_MANAGER_ID: &str = "akri.sh/controller";

/// Pod states that BrokerPodWatcher is interested in
///
/// PodState describes the various states that the controller can
Expand Down Expand Up @@ -91,16 +88,14 @@ pub struct ControllerContext {
pub client: Arc<dyn ControllerKubeClient>,
pub known_pods: Arc<RwLock<HashMap<String, PodState>>>,
pub known_nodes: Arc<RwLock<HashMap<String, NodeState>>>,
pub identifier: String,
}

impl ControllerContext {
pub fn new(client: Arc<dyn ControllerKubeClient>, identifier: &str) -> Self {
pub fn new(client: Arc<dyn ControllerKubeClient>) -> Self {
ControllerContext {
client,
known_pods: Arc::new(RwLock::new(HashMap::new())),
known_nodes: Arc::new(RwLock::new(HashMap::new())),
identifier: identifier.to_string(),
}
}
}
Loading

0 comments on commit 97470b0

Please sign in to comment.