Skip to content

Commit

Permalink
Address more comments
Browse files Browse the repository at this point in the history
Signed-off-by: Nicolas Belouin <[email protected]>
  • Loading branch information
diconico07 committed Apr 29, 2024
1 parent 39d346b commit 006f78a
Show file tree
Hide file tree
Showing 9 changed files with 42 additions and 52 deletions.
2 changes: 1 addition & 1 deletion agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ serde_derive = "1.0.104"
serde_json = "1.0.45"
serde_yaml = { version = "0.8.11", optional = true }
thiserror = "1.0.50"
tokio = { version = "1.0", features = ["rt-multi-thread", "time", "fs", "macros", "net", "signal"] }
tokio = { version = "1.0", features = ["rt-multi-thread", "time", "fs", "macros", "net"] }
tokio-stream = { version = "0.1", features = ["net", "sync"] }
tonic = "0.10"
tower = "0.4.8"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,14 +403,10 @@ impl DiscoveryHandlerRegistry for DHRegistryImpl {
let notifier_receiver = self.endpoint_notifier.subscribe();
let local_req = self.requests.clone();
tokio::spawn(async move {
let mut signal =
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
.unwrap();
select! {
_ = dh_req_ref
.watch_devices(notifier_receiver) => {},
_ = terminated.notified() => {},
_ = signal.recv() => {},
}
local_req.write().await.remove(&local_key);
});
Expand Down
3 changes: 0 additions & 3 deletions agent/src/discovery_handler_manager/embedded_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,9 @@ impl EmbeddedHandlerEndpoint {
sender: watch::Sender<Vec<Arc<DiscoveredDevice>>>,
mut stream: ReceiverStream<Result<DiscoverResponse, tonic::Status>>,
) {
let mut signal =
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()).unwrap();
loop {
let msg = select! {
_ = sender.closed() => return,
_ = signal.recv() => return,
msg = stream.try_next() => match msg {
Ok(Some(msg)) => msg,
Ok(None) => {
Expand Down
9 changes: 2 additions & 7 deletions agent/src/discovery_handler_manager/registration_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use akri_discovery_utils::discovery::v0::{
};
use akri_shared::uds::unix_stream;
use async_trait::async_trait;
use futures::{FutureExt, Stream, StreamExt, TryFutureExt};
use futures::{Stream, StreamExt, TryFutureExt};
use tokio::{select, sync::watch};
use tokio_stream::StreamExt as _;
use tonic::{transport::Channel, Request, Response, Status};
Expand Down Expand Up @@ -67,15 +67,12 @@ impl NetworkEndpoint {
sender: watch::Sender<Vec<Arc<DiscoveredDevice>>>,
mut stream: Pin<Box<dyn Stream<Item = Result<DiscoverResponse, tonic::Status>> + Send>>,
) {
let mut signal =
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()).unwrap();
loop {
let msg = select! {
// This means all queries for this endpoint must end.
_ = stopper.stopped() => return,
// This means all receiver dropped (i.e no one cares about this query anymore)
_ = sender.closed() => return,
_ = signal.recv() => return,
msg = stream.try_next() => match msg {
Ok(Some(msg)) => msg,
Ok(None) => {
Expand Down Expand Up @@ -216,8 +213,6 @@ pub async fn run_registration_server(
}
}
};
let mut signal =
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()).unwrap();
tonic::transport::Server::builder()
.add_service(
akri_discovery_utils::discovery::v0::registration_server::RegistrationServer::new(
Expand All @@ -227,7 +222,7 @@ pub async fn run_registration_server(
},
),
)
.serve_with_incoming_shutdown(incoming, signal.recv().map(|_| ()))
.serve_with_incoming(incoming)
.await?;
trace!(
"internal_run_registration_server - gracefully shutdown ... deleting socket {}",
Expand Down
9 changes: 5 additions & 4 deletions agent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>
),
);

let (instances_cache, task) = plugin_manager::device_plugin_instance_controller::start_dpm(
device_plugin_manager.clone(),
);
tasks.push(task);
let (instances_cache, device_plugin_controller_task) =
plugin_manager::device_plugin_instance_controller::start_dpm(
device_plugin_manager.clone(),
);
tasks.push(device_plugin_controller_task);

tasks.push(tokio::spawn(
plugin_manager::device_plugin_slot_reclaimer::start_reclaimer(device_plugin_manager),
Expand Down
28 changes: 11 additions & 17 deletions agent/src/plugin_manager/device_plugin_instance_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ use std::str::FromStr;
use std::{collections::HashMap, sync::Arc, time::Duration};

use akri_shared::{akri::instance::Instance, k8s::api::IntoApi};
use anyhow::Context;
use async_trait::async_trait;
use futures::StreamExt;
use itertools::Itertools;
use kube::api::{Patch, PatchParams};
use kube::core::{NotUsed, Object, ObjectMeta, TypeMeta};
use kube::ResourceExt;
use kube::{Resource, ResourceExt};
use kube_runtime::controller::Action;
use kube_runtime::reflector::Store;
use kube_runtime::Controller;
Expand All @@ -27,6 +28,8 @@ use super::device_plugin_runner::{
};
use super::v1beta1::{AllocateRequest, AllocateResponse, ListAndWatchResponse};

pub const DP_SLOT_PREFIX: &str = "akri.sh/";

#[derive(Error, Debug)]
pub enum DevicePluginError {
#[error("Slot already in use")]
Expand Down Expand Up @@ -223,8 +226,8 @@ impl InstanceDevicePlugin {
let patch = Patch::Apply(
serde_json::to_value(Object {
types: Some(TypeMeta {
api_version: "akri.sh/v0".to_owned(),
kind: "Instance".to_owned(),
api_version: Instance::api_version(&()).to_string(),
kind: Instance::kind(&()).to_string(),
}),
status: None::<NotUsed>,
spec: PartialInstanceSlotUsage { device_usage },
Expand All @@ -233,7 +236,7 @@ impl InstanceDevicePlugin {
..Default::default()
},
})
.unwrap(),
.context("Could not create instance patch")?,
);
api.raw_patch(
&self.instance_name,
Expand Down Expand Up @@ -280,8 +283,8 @@ impl InstanceDevicePlugin {
let patch = Patch::Apply(
serde_json::to_value(Object {
types: Some(TypeMeta {
api_version: "akri.sh/v0".to_owned(),
kind: "Instance".to_owned(),
api_version: Instance::api_version(&()).to_string(),
kind: Instance::kind(&()).to_string(),
}),
status: None::<NotUsed>,
spec: PartialInstanceSlotUsage { device_usage },
Expand All @@ -290,7 +293,7 @@ impl InstanceDevicePlugin {
..Default::default()
},
})
.unwrap(),
.context("Could not create instance patch")?,
);
api.raw_patch(
&self.instance_name,
Expand Down Expand Up @@ -487,8 +490,6 @@ impl ConfigurationDevicePlugin {
let instance_name = plugin.instance_name.clone();
let mut receiver = plugin.slots_status.lock().await.subscribe();
tokio::spawn(async move {
let mut signal =
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()).unwrap();
loop {
{
let (has_free, used_config_slots) = {
Expand Down Expand Up @@ -568,7 +569,6 @@ impl ConfigurationDevicePlugin {
break;
}
},
_ = signal.recv() => {break}
}
}
slots_ref.write().await.send_modify(|slots| {
Expand Down Expand Up @@ -779,7 +779,7 @@ impl DevicePluginManager {
.enumerate()
.filter_map(|(i, u)| match u {
DeviceUsage::Node(n) if *n == self.node_name => {
Some(format!("akri.sh/{}-{}", instance, i))
Some(format!("{}{}-{}", DP_SLOT_PREFIX, instance, i))
}
DeviceUsage::Configuration { vdev, node } if *node == self.node_name => {
Some(vdev.to_string())
Expand All @@ -798,12 +798,6 @@ pub fn start_dpm(dpm: Arc<DevicePluginManager>) -> (Store<Instance>, JoinHandle<
let store = controller.store();
let task = tokio::spawn(async {
controller
.graceful_shutdown_on(async {
let mut signal =
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
.unwrap();
signal.recv().await;
})
.run(reconcile, error_policy, dpm)
.for_each(|_| futures::future::ready(()))
.await
Expand Down
13 changes: 8 additions & 5 deletions agent/src/plugin_manager/device_plugin_slot_reclaimer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ use tokio::net::UnixStream;
use tonic::transport::{Endpoint, Uri};
use tower::service_fn;

use crate::plugin_manager::v1::ListPodResourcesRequest;
use crate::plugin_manager::{
device_plugin_instance_controller::DP_SLOT_PREFIX, v1::ListPodResourcesRequest,
};

use super::{
device_plugin_instance_controller::DevicePluginManager,
Expand All @@ -21,6 +23,10 @@ pub const KUBELET_SOCKET: &str = "/var/lib/kubelet/pod-resources/kubelet.sock";
const SLOT_GRACE_PERIOD: Duration = Duration::from_secs(20);
const SLOT_RECLAIM_INTERVAL: Duration = Duration::from_secs(10);

/// This function connects to kubelet's resource monitoring interface and extracts
/// the set of resources currently used by pods on the node.
/// It uses this Kubelet interface:
/// https://kubernetes.io/docs/concepts/extend-kubernetes/compute-storage-net/device-plugins/#grpc-endpoint-list
async fn get_used_slots() -> Result<HashSet<String>, anyhow::Error> {
// We will ignore this dummy uri because UDS does not use it.
// Some servers will check the uri content so the uri needs to
Expand Down Expand Up @@ -51,7 +57,7 @@ async fn get_used_slots() -> Result<HashSet<String>, anyhow::Error> {
.flat_map(|pr| {
pr.containers.into_iter().flat_map(|cr| {
cr.devices.into_iter().flat_map(|cd| {
if cd.resource_name.starts_with("akri.sh/") {
if cd.resource_name.starts_with(DP_SLOT_PREFIX) {
cd.device_ids
} else {
vec![]
Expand All @@ -66,8 +72,6 @@ async fn get_used_slots() -> Result<HashSet<String>, anyhow::Error> {

pub async fn start_reclaimer(dp_manager: Arc<DevicePluginManager>) {
let mut stalled_slots: HashMap<String, Instant> = HashMap::new();
let mut signal =
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()).unwrap();
loop {
trace!("reclaiming unused slots - start");
if let Ok(used_slots) = get_used_slots().await {
Expand Down Expand Up @@ -102,7 +106,6 @@ pub async fn start_reclaimer(dp_manager: Arc<DevicePluginManager>) {
}
tokio::select! {
_ = tokio::time::sleep(SLOT_RECLAIM_INTERVAL) => {},
_ = signal.recv() => return,
};
}
}
22 changes: 14 additions & 8 deletions agent/src/util/discovery_configuration_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,18 +58,24 @@ pub async fn start_controller(
let controller = Controller::new(api, Default::default());

controller
.graceful_shutdown_on(async {
let mut signal =
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()).unwrap();
signal.recv().await;
})
// Reconcile the Configuration when the discovery handler manager signals a change
.reconcile_on(tokio_stream::wrappers::ReceiverStream::new(rec))
.run(reconcile, error_policy, ctx)
.for_each(|_| futures::future::ready(()))
.await;
}

/// This function is the main Reconcile function for Configurations resources
/// This will get called every time a Configuration gets added or is changed, it will also be called
/// for every existing configuration on startup.
/// We also set-up discovery manager to trigger reconciliation upon discovery state change
///
/// Here the function will (in order):
/// - Check if Configuration awaits deletion, and if so terminate pending discovery, remove finalizer and return early
/// - Add finalizer if not here already
/// - Start discovery if not already started
/// - Get discovery results (empty list if just started)
/// - Create/Delete Instances according to discovery results
pub async fn reconcile(
dc: Arc<Configuration>,
ctx: Arc<ControllerContext>,
Expand Down Expand Up @@ -515,7 +521,7 @@ mod tests {
namespace: Some("namespace-a".to_string()),
name: Some("instance-1".to_string()),
owner_references: Some(vec![OwnerReference {
api_version: "akri.sh/v0".to_string(),
api_version: Instance::api_version(&()).to_string(),
block_owner_deletion: None,
controller: Some(true),
kind: "Configuration".to_string(),
Expand All @@ -539,7 +545,7 @@ mod tests {
namespace: Some("namespace-a".to_string()),
name: Some("instance-2".to_string()),
owner_references: Some(vec![OwnerReference {
api_version: "akri.sh/v0".to_string(),
api_version: Instance::api_version(&()).to_string(),
block_owner_deletion: None,
controller: Some(true),
kind: "Configuration".to_string(),
Expand All @@ -563,7 +569,7 @@ mod tests {
namespace: Some("namespace-a".to_string()),
name: Some("instance-3".to_string()),
owner_references: Some(vec![OwnerReference {
api_version: "akri.sh/v0".to_string(),
api_version: Instance::api_version(&()).to_string(),
block_owner_deletion: None,
controller: Some(true),
kind: "Configuration".to_string(),
Expand Down
4 changes: 1 addition & 3 deletions agent/src/util/stopper.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;

use futures::stream::{AbortHandle, Abortable};
use tokio::{signal::unix::SignalKind, sync::watch};
use tokio::sync::watch;

#[derive(Clone)]
pub struct Stopper {
Expand All @@ -16,10 +16,8 @@ impl Stopper {
};
let local_s = s.clone();
tokio::spawn(async move {
let mut signal = tokio::signal::unix::signal(SignalKind::terminate()).unwrap();
tokio::select! {
_ = local_s.stopped() => {},
_ = signal.recv() => local_s.stop()
}
});
s
Expand Down

0 comments on commit 006f78a

Please sign in to comment.