osrdyne: using timeout_allowance, various clippy fixes
ElysaSrc committed Jul 17, 2024
1 parent 8e0ee10 commit 3ca6e69
Showing 10 changed files with 362 additions and 262 deletions.
1 change: 1 addition & 0 deletions docker-compose.yml
@@ -54,6 +54,7 @@ services:
retries: 3

core:
# This is a dummy container to build the core image
image: ghcr.io/openrailassociation/osrd-edge/osrd-core:${TAG-dev}
container_name: osrd-core-dummy
build:
35 changes: 23 additions & 12 deletions osrdyne/src/api.rs
@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::{env, sync::Arc};

use axum::{extract::State, routing::get, Json, Router};
use log::info;
@@ -9,21 +9,21 @@ use crate::drivers::worker_driver::WorkerMetadata;

#[derive(Clone)]
struct AppState {
known_cores: Arc<Mutex<Arc<Vec<WorkerMetadata>>>>,
known_workers: Arc<Mutex<Arc<Vec<WorkerMetadata>>>>,
}

pub async fn create_server(
addr: String,
known_cores: tokio::sync::watch::Receiver<Arc<Vec<WorkerMetadata>>>,
known_workers: tokio::sync::watch::Receiver<Arc<Vec<WorkerMetadata>>>,
) {
let app_state = AppState {
known_cores: Arc::new(Mutex::new(Arc::new(vec![]))),
known_workers: Arc::new(Mutex::new(Arc::new(vec![]))),
};

tokio::spawn(app_state_updater(app_state.clone(), known_cores));
tokio::spawn(app_state_updater(app_state.clone(), known_workers));

let app = Router::new()
.route("/", get(|| async { "Hello, World!" }))
.route("/version", get(version))
.route("/health", get(health_check))
.route("/status", get(list_cores))
.with_state(app_state);
@@ -43,11 +43,11 @@ pub async fn create_server(

async fn app_state_updater(
state: AppState,
mut known_cores: tokio::sync::watch::Receiver<Arc<Vec<WorkerMetadata>>>,
mut known_workers: tokio::sync::watch::Receiver<Arc<Vec<WorkerMetadata>>>,
) {
while known_cores.changed().await.is_ok() {
let mut state_known_cores = state.known_cores.lock().await;
*state_known_cores = known_cores.borrow().clone();
while known_workers.changed().await.is_ok() {
let mut state_known_workers = state.known_workers.lock().await;
*state_known_workers = known_workers.borrow().clone();
}
}

@@ -66,8 +66,19 @@ struct ListCoresResponse {
}

async fn list_cores(State(state): State<AppState>) -> Json<ListCoresResponse> {
let latest_known_cores = state.known_cores.lock().await;
let latest_known_workers = state.known_workers.lock().await;
Json(ListCoresResponse {
cores: (**latest_known_cores).clone(),
cores: (**latest_known_workers).clone(),
})
}

#[derive(Serialize)]
pub struct Version {
git_describe: Option<String>,
}

async fn version() -> Json<Version> {
Json(Version {
git_describe: env::var("OSRD_GIT_DESCRIBE").ok(),
})
}
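
> Note: the api.rs change renames the shared state from `known_cores` to `known_workers`, keeps it fed from a `tokio::sync::watch` channel via `app_state_updater`, and adds a `/version` route reporting `OSRD_GIT_DESCRIBE`. Below is a minimal, self-contained sketch of that watch-to-state fan-in; `WorkerMetadata` is reduced to a single hypothetical field and the HTTP layer is omitted.

```rust
use std::sync::Arc;
use std::time::Duration;

use tokio::sync::{watch, Mutex};

// Hypothetical stand-in for the real WorkerMetadata in drivers::worker_driver.
#[derive(Debug, Clone)]
struct WorkerMetadata {
    worker_id: String,
}

#[derive(Clone)]
struct AppState {
    known_workers: Arc<Mutex<Arc<Vec<WorkerMetadata>>>>,
}

// Same shape as app_state_updater: copy each snapshot published on the watch
// channel into the mutex-guarded state that the HTTP handlers read.
async fn app_state_updater(state: AppState, mut rx: watch::Receiver<Arc<Vec<WorkerMetadata>>>) {
    while rx.changed().await.is_ok() {
        let mut known_workers = state.known_workers.lock().await;
        *known_workers = rx.borrow().clone();
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = watch::channel(Arc::new(Vec::new()));
    let state = AppState {
        known_workers: Arc::new(Mutex::new(Arc::new(vec![]))),
    };
    tokio::spawn(app_state_updater(state.clone(), rx));

    // Simulate a worker driver publishing an updated worker list.
    let _ = tx.send(Arc::new(vec![WorkerMetadata {
        worker_id: "worker-1".into(),
    }]));

    // Give the updater a moment to observe the change, then read it back.
    tokio::time::sleep(Duration::from_millis(50)).await;
    println!("{:?}", state.known_workers.lock().await);
}
```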
4 changes: 1 addition & 3 deletions osrdyne/src/drivers/kubernetes.rs
@@ -178,7 +178,6 @@ impl WorkerDriver for KubernetesDriver {
tolerations: self.options.kube_deployment_options.tolerations.clone(),
..Default::default()
}),
..Default::default()
},
..Default::default()
}),
@@ -212,7 +211,6 @@ impl WorkerDriver for KubernetesDriver {
target_cpu_utilization_percentage: Some(
autoscaling.target_cpu_utilization_percentage,
),
..Default::default()
}
}),
..Default::default()
@@ -261,7 +259,7 @@ impl WorkerDriver for KubernetesDriver {
.map_err(super::worker_driver::DriverError::KubernetesError)?;

// Delete the autoscaler
if let Some(_) = &self.options.autoscaling {
if self.options.autoscaling.is_some() {
kube::api::Api::<HorizontalPodAutoscaler>::namespaced(
self.client.clone(),
&self.options.namespace,
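
> Note: the kubernetes.rs hunks are pure clippy cleanups: a struct literal that already spells out every field drops its redundant `..Default::default()` (needless_update), and a pattern match whose binding is ignored becomes `is_some()` (redundant_pattern_matching). A tiny stand-alone illustration of the second one, with a hypothetical stand-in for `self.options.autoscaling`:

```rust
fn main() {
    // Hypothetical stand-in for the driver's autoscaling option.
    let autoscaling: Option<u32> = Some(80);

    // Before: `if let Some(_) = &autoscaling { ... }` — clippy flags this.
    // After: the intent (checking presence only) is stated directly.
    if autoscaling.is_some() {
        println!("autoscaler configured, it must be deleted as well");
    }
}
```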
4 changes: 2 additions & 2 deletions osrdyne/src/management_client.rs
@@ -80,13 +80,13 @@ impl ManagementClient {
let port = config.management_port;

let vhost = match parsed_uri.path() {
s if s.len() == 0 => "%2f", // that's the default
"" => "%2f", // that's the default
s => &s[1..],
};

let password = parsed_uri.password().unwrap_or("guest");
let user = match parsed_uri.username() {
username if username.len() == 0 => "guest",
"" => "guest",
username => username,
};

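
> Note: the management_client.rs change swaps `len() == 0` match guards for direct empty-string patterns (the style clippy's `len_zero`-family lints push toward). A minimal sketch of the same vhost extraction, using a hypothetical helper name:

```rust
// Mirrors the vhost handling in ManagementClient: an empty URI path means the
// default vhost; otherwise drop the leading '/'.
fn vhost_from_path(path: &str) -> &str {
    match path {
        "" => "%2f", // RabbitMQ's default vhost "/", URL-encoded
        s => &s[1..],
    }
}

fn main() {
    assert_eq!(vhost_from_path(""), "%2f");
    assert_eq!(vhost_from_path("/my_vhost"), "my_vhost");
    println!("ok");
}
```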
4 changes: 2 additions & 2 deletions osrdyne/src/pool.rs
@@ -399,7 +399,7 @@ async fn activity_processor(
last_activities.insert(key.clone(), now);

// Update the state tracker
let _ = client.require_worker_group(key, extra_lifetime);
let _ = client.require_worker_group(key, extra_lifetime).await;
}
Ok(())
}
@@ -440,7 +440,7 @@ async fn worker_control_loop(

// Remove unwanted groups
for worker_key in current_worker_keys {
if !wanted_worker_keys.contains(&worker_key) {
if !wanted_worker_keys.contains(worker_key) {
if let Err(e) = driver.destroy_worker_group(worker_key.clone()).await {
log::error!(
"Failed to destroy worker group: {:?}. Aborting current loop iteration.",
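
> Note: the substantive pool.rs fix is the added `.await` on `require_worker_group`: calling an `async fn` only builds a lazy future, and the previous `let _ = ...` dropped it unawaited, so the call never ran. A minimal sketch with a hypothetical stand-in for the `client.require_worker_group` call:

```rust
// Hypothetical stand-in for the worker-group requirement call.
async fn require_worker_group(key: &str) {
    println!("extending lifetime of worker group {key}");
}

#[tokio::main]
async fn main() {
    // Future is created and immediately dropped: prints nothing.
    let _ = require_worker_group("key-1");

    // With `.await` the body actually runs.
    let _ = require_worker_group("key-2").await;
}
```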
119 changes: 90 additions & 29 deletions osrdyne/src/queue_controller.rs
@@ -1,15 +1,17 @@
use std::collections::HashMap;
use std::fmt::Display;
use std::fmt::Formatter;
use std::sync::Arc;

use lapin::options::QueueBindOptions;
use lapin::options::QueueDeclareOptions;
use lapin::options::QueueDeleteOptions;
use lapin::types::FieldTable;
use lapin::Channel;
use log::debug;
use tokio::sync::watch;
use tokio::sync::oneshot;
use tokio::select;
use lapin::Channel;
use tokio::sync::oneshot;
use tokio::sync::watch;
use tokio::task::AbortHandle;
use tokio::task::JoinSet;

@@ -19,7 +21,6 @@ use crate::target_tracker::TargetUpdate;
use crate::Key;
use crate::Pool;


#[derive(Debug)]
pub struct QueuesState {
pub target_generation: GenerationId,
Expand All @@ -31,7 +32,7 @@ pub async fn queues_control_loop(
chan: Channel,
initial_keys: Vec<Key>,
mut expected_state: watch::Receiver<TargetUpdate>,
res: oneshot::Sender<watch::Receiver<QueuesState>>
res: oneshot::Sender<watch::Receiver<QueuesState>>,
) {
let chan = Arc::new(chan);

@@ -50,24 +51,32 @@

// remove queues which aren't supposed to be there
if !target.queues.contains_key(&queue) {
let job = jobs.spawn(update_queue(pool.clone(), chan.clone(), queue.clone(), None));
let job = jobs.spawn(update_queue(
pool.clone(),
chan.clone(),
queue.clone(),
None,
));
jobs_by_key.insert(queue, job);
}
}

// set all queues to the target state
for (queue, queue_status) in target.queues.iter() {
let job = jobs.spawn(update_queue(pool.clone(), chan.clone(), queue.clone(), Some(*queue_status)));
let job = jobs.spawn(update_queue(
pool.clone(),
chan.clone(),
queue.clone(),
Some(*queue_status),
));
jobs_by_key.insert(queue.clone(), job);
}

// send back the initial state
let (tx, rx) = watch::channel(
QueuesState {
target_generation: target.generation,
queues: init_state,
}
);
let (tx, rx) = watch::channel(QueuesState {
target_generation: target.generation,
queues: init_state,
});
let _ = res.send(rx);

// two concurrent events can happen:
@@ -98,7 +107,7 @@ pub async fn queues_control_loop(
// a new target state arrived
changed = expected_state.changed() => {
// stop if the sender was dropped
if let Err(_) = changed {
if changed.is_err() {
break 'outer;
}

@@ -132,46 +141,98 @@ pub async fn queues_control_loop(
}
}


#[derive(Debug)]
enum QueueUpdateError {
QueueNotEmpty(Key),
LapinError(lapin::Error),
}

impl Display for QueueUpdateError {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
match self {
QueueUpdateError::QueueNotEmpty(key) => {
write!(f, "queue {:?} is not empty", key)
}
QueueUpdateError::LapinError(err) => {
write!(f, "lapin error: {:?}", err)
}
}
}
}

impl From<lapin::Error> for QueueUpdateError {
fn from(value: lapin::Error) -> Self {
Self::LapinError(value)
}
}

async fn update_queue(pool: Arc<Pool>, chan: Arc<Channel>, key: Key, new_state: Option<QueueStatus>) -> Result<(Key, Option<QueueStatus>), QueueUpdateError> {
async fn update_queue(
pool: Arc<Pool>,
chan: Arc<Channel>,
key: Key,
new_state: Option<QueueStatus>,
) -> Result<(Key, Option<QueueStatus>), QueueUpdateError> {
let queue_name = pool.key_queue_name(&key);
match new_state {
Some(QueueStatus::Active) => {
debug!("declaring and binding queue {:?}", &key);
chan.queue_declare(&queue_name, QueueDeclareOptions::default(), FieldTable::default()).await?;
chan.queue_bind(&queue_name, &pool.request_xchg, &key.encode(), QueueBindOptions::default(), FieldTable::default()).await?;
},
chan.queue_declare(
&queue_name,
QueueDeclareOptions::default(),
FieldTable::default(),
)
.await?;
chan.queue_bind(
&queue_name,
&pool.request_xchg,
&key.encode(),
QueueBindOptions::default(),
FieldTable::default(),
)
.await?;
}
Some(QueueStatus::Unbound) => {
debug!("declaring and unbinding queue {:?}", &key);
chan.queue_declare(&queue_name, QueueDeclareOptions::default(), FieldTable::default()).await?;
chan.queue_unbind(&queue_name, &pool.request_xchg, &key.encode(), FieldTable::default()).await?;
},
chan.queue_declare(
&queue_name,
QueueDeclareOptions::default(),
FieldTable::default(),
)
.await?;
chan.queue_unbind(
&queue_name,
&pool.request_xchg,
&key.encode(),
FieldTable::default(),
)
.await?;
}
None => {
debug!("deleting queue {:?}", &key);
chan.queue_unbind(&queue_name, &pool.request_xchg, &key.encode(), FieldTable::default()).await?;
match chan.queue_delete(&queue_name, QueueDeleteOptions {
if_empty: true,
..Default::default()
}).await {
chan.queue_unbind(
&queue_name,
&pool.request_xchg,
&key.encode(),
FieldTable::default(),
)
.await?;
match chan
.queue_delete(
&queue_name,
QueueDeleteOptions {
if_empty: true,
..Default::default()
},
)
.await
{
Err(lapin::Error::ProtocolError(err)) => {
debug!("got protocol error (assuming non empty queue): {:?}", err);
return Err(QueueUpdateError::QueueNotEmpty(key));
},
}
res => res?,
};
},
}
}
Ok((key, new_state))
}
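
> Note: besides rustfmt reflowing the lapin calls, queue_controller.rs gains hand-written `Display` and `From<lapin::Error>` implementations for `QueueUpdateError`. A condensed, dependency-free sketch of that pattern, with `Key` simplified to a `String` and `std::io::Error` standing in for `lapin::Error`:

```rust
use std::fmt::{Display, Formatter};

#[derive(Debug)]
enum QueueUpdateError {
    QueueNotEmpty(String),
    Transport(std::io::Error), // stands in for lapin::Error in this sketch
}

impl Display for QueueUpdateError {
    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
        match self {
            QueueUpdateError::QueueNotEmpty(key) => write!(f, "queue {key:?} is not empty"),
            QueueUpdateError::Transport(err) => write!(f, "transport error: {err:?}"),
        }
    }
}

impl From<std::io::Error> for QueueUpdateError {
    fn from(value: std::io::Error) -> Self {
        Self::Transport(value)
    }
}

fn main() {
    let err: QueueUpdateError =
        std::io::Error::new(std::io::ErrorKind::Other, "broker unreachable").into();
    println!("{err}");
    println!("{}", QueueUpdateError::QueueNotEmpty("core-infra-1".into()));
}
```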
