diff --git a/osrdyne/configuration_templates/process-compose-driver.yaml b/osrdyne/configuration_templates/process-compose-driver.yaml new file mode 100644 index 00000000000..d5b5299b624 --- /dev/null +++ b/osrdyne/configuration_templates/process-compose-driver.yaml @@ -0,0 +1,11 @@ +worker_driver: + type: ProcessComposeDriver + address: localhost + port: 8080 # process-compose sets it to 8080 by default, better change it... + process: core # the process name to scale + + # Files to write the content of the osrdyne environment variables + comm_files: + worker_id: /tmp/worker_id # WORKER_ID + worker_key: /tmp/worker_key # WORKER_KEY + amqp_uri: /tmp/worker_amqp_uri # WORKER_AMQP_URI diff --git a/osrdyne/configuration_templates/process-compose.yaml b/osrdyne/configuration_templates/process-compose.yaml new file mode 100644 index 00000000000..95b5c7faddf --- /dev/null +++ b/osrdyne/configuration_templates/process-compose.yaml @@ -0,0 +1,37 @@ +version: "0.5" +is_strict: true + +environment: + - "CORE_EDITOAST_URL=http://localhost:8090" + - "JAVA_TOOL_OPTIONS=-javaagent:$HOME/opentelemetry-javaagent.jar" # edit this path + - "CORE_MONITOR_TYPE=opentelemetry" + - "OTEL_EXPORTER_OTLP_TRACES_PROTOCOL=grpc" + - "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://localhost:4317" + - "OTEL_METRICS_EXPORTER=none" + - "OTEL_LOGS_EXPORTER=none" + +processes: + core: + # Disabled by default (started by osrdyne, only the templating matters). + # The osrdyne env variables are read from the files specified in the driver configuration. 
+ command: | + ( + export WORKER_ID=$(cat /tmp/worker_id); + export WORKER_KEY=$(cat /tmp/worker_key); + export WORKER_AMQP_URI="$(cat /tmp/worker_amqp_uri)"; + java $$JAVA_OPTS -ea -jar build/libs/osrd-all.jar worker + ) + working_dir: "$OSRD_PATH/core" + disabled: true + gateway: + command: cargo run + working_dir: "$OSRD_PATH/gateway" + editoast: + command: cargo run runserver + working_dir: "$OSRD_PATH/editoast" + front: + command: yarn start + working_dir: "$OSRD_PATH/front" + osrdyne: + command: cargo run + working_dir: "$OSRD_PATH/osrdyne" diff --git a/osrdyne/src/config.rs b/osrdyne/src/config.rs index 16c642bfc27..86ea0769f91 100644 --- a/osrdyne/src/config.rs +++ b/osrdyne/src/config.rs @@ -1,6 +1,9 @@ use std::time::Duration; -use crate::drivers::{docker::DockerDriverOptions, kubernetes::KubernetesDriverOptions}; +use crate::drivers::{ + docker::DockerDriverOptions, kubernetes::KubernetesDriverOptions, + process_compose::PCDriverOptions, +}; use serde::{Deserialize, Serialize}; use figment::{ @@ -14,6 +17,7 @@ pub enum WorkerDriverConfig { Noop, DockerDriver(DockerDriverOptions), KubernetesDriver(KubernetesDriverOptions), + ProcessComposeDriver(PCDriverOptions), } #[derive(Debug, Deserialize, Serialize)] diff --git a/osrdyne/src/drivers.rs b/osrdyne/src/drivers.rs index 91e28b04dd4..8a9fc9454d6 100644 --- a/osrdyne/src/drivers.rs +++ b/osrdyne/src/drivers.rs @@ -1,6 +1,7 @@ pub mod docker; pub mod kubernetes; pub mod noop; +pub mod process_compose; pub mod worker_driver; const LABEL_MANAGED_BY: &str = "osrd/managed_by"; diff --git a/osrdyne/src/drivers/docker.rs b/osrdyne/src/drivers/docker.rs index 565cedc39ba..2b8206f7b1a 100644 --- a/osrdyne/src/drivers/docker.rs +++ b/osrdyne/src/drivers/docker.rs @@ -56,7 +56,7 @@ impl DockerDriver { impl WorkerDriver for DockerDriver { fn get_or_create_worker_group( - &self, + &mut self, worker_key: Key, ) -> Pin> + Send + '_>> { Box::pin(async move { @@ -140,7 +140,7 @@ } fn 
destroy_worker_group( - &self, + &mut self, worker_key: Key, ) -> Pin> + Send + '_>> { Box::pin(async move { diff --git a/osrdyne/src/drivers/kubernetes.rs b/osrdyne/src/drivers/kubernetes.rs index 80d692e0458..229ab51690d 100644 --- a/osrdyne/src/drivers/kubernetes.rs +++ b/osrdyne/src/drivers/kubernetes.rs @@ -98,7 +98,7 @@ impl KubernetesDriver { impl WorkerDriver for KubernetesDriver { fn get_or_create_worker_group( - &self, + &mut self, worker_key: Key, ) -> Pin> + Send + '_>> { Box::pin(async move { @@ -245,7 +245,7 @@ impl WorkerDriver for KubernetesDriver { } fn destroy_worker_group( - &self, + &mut self, worker_key: Key, ) -> Pin> + Send + '_>> { Box::pin(async move { diff --git a/osrdyne/src/drivers/noop.rs b/osrdyne/src/drivers/noop.rs index bcf0443ab2d..27304f47d4c 100644 --- a/osrdyne/src/drivers/noop.rs +++ b/osrdyne/src/drivers/noop.rs @@ -18,14 +18,14 @@ impl NoopDriver { impl WorkerDriver for NoopDriver { fn get_or_create_worker_group( - &self, + &mut self, _worker_key: Key, ) -> Pin> + Send + '_>> { Box::pin(async move { Ok(self.fixed_pool_id) }) } fn destroy_worker_group( - &self, + &mut self, _worker_key: Key, ) -> Pin> + Send + '_>> { Box::pin(async move { Ok(()) }) diff --git a/osrdyne/src/drivers/process_compose.rs b/osrdyne/src/drivers/process_compose.rs new file mode 100644 index 00000000000..24579e37621 --- /dev/null +++ b/osrdyne/src/drivers/process_compose.rs @@ -0,0 +1,394 @@ +//! A driver to manage workers using [process-compose](https://github.com/F1bonacc1/process-compose) REST API. +//! +//! # DISCLAIMER +//! +//! THIS DRIVER IS INTENDED FOR DEVELOPMENT PURPOSES ONLY. IT WILL NOT WORK IN PRODUCTION AND IS EXPECTED +//! TO BREAK. USE AT YOUR OWN RISK. +//! +//! Multiple process-compose instances are not supported. +//! +//! # Implementation details +//! +//! ## Spawning workers +//! +//! `process-compose` configuration is read at startup and cannot be modified while `process-compose` is running. +//! 
This has several unfortunate consequences on how the driver works, especially on the scaling part. Indeed, +//! we can't just edit the YAML in the driver as there's no way to reload the configuration. +//! +//! process-compose allows a process to be scaled (`sh$ process-compose process scale `). However, +//! we cannot set the configuration of a replicated process. Each one of them has to follow the configuration defined +//! in `process-compose.yaml`. **Including environment variables**. This is particularly problematic for the driver +//! which has to set a different WORKER_{ID,KEY,AMQP_URI} for each process. +//! +//! We overcome this by reading the values of the WORKER_* variables from tmp files when the process starts. These files +//! are written by the driver just before a process starts. +//! +//! ## Identifying workers +//! +//! Unlike Docker or Kube, process-compose doesn't let us tag each process individually. We have to store in the driver +//! a mapping {PID => (Key, Uuid)} to keep track of which process is assigned to which worker key. +//! +//! ## Dealing with `process-compose process scale` behaviour +//! +//! process-compose scaling may rename the process being scaled. For example, if when starting PC we have: +//! +//! ``` +//! core +//! editoast +//! front +//! # ... +//! ``` +//! +//! and we scale `core` to 3 (`process-compose process scale core 3`), we will have: +//! +//! ``` +//! core-0 +//! core-1 +//! core-2 +//! editoast +//! front +//! ``` +//! +//! Note how the original `core` process is now named `core-0`. +//! Now, if we want to scale even more, `process-compose process scale core 5`, we will fail since `core` doesn't exist anymore. +//! We have to run `process-compose process scale core-0 5` instead. +//! +//! The reverse applies when scaling down: `process-compose process scale core-0 1` will rename `core-0` to `core`. +//! +//! # Known limitations +//! +//! 
* Restarting the osrdyne process but not the other will spawn new workers for each message queue (the driver lost +//! its internal mapping). +//! * The driver doesn't cleanup disabled or errored workers. + +use std::{fmt::Debug, future::Future, pin::Pin}; + +use anyhow::{bail, Context}; +use im::HashMap; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +use crate::Key; + +use super::worker_driver::{ + DriverError, DriverError::ProcessComposeError, WorkerDriver, WorkerMetadata, +}; + +type Pid = u64; + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct PCDriverOptions { + process: String, + address: String, + port: u64, + comm_files: CommFiles, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct CommFiles { + worker_id: String, + worker_key: String, + amqp_uri: String, +} + +pub struct PCDriver { + options: PCDriverOptions, + amqp_uri: String, + workers: HashMap, + /// A lock on the files from which are read the worker environment variables. + /// Prevents races when spawning multiple workers rapidly (eg. when osrdyne is rescheduled). 
+ spawn_lock: tokio::sync::Mutex<()>, + pc: PCClient, +} + +impl WorkerDriver for PCDriver { + fn get_or_create_worker_group( + &mut self, + worker_key: Key, + ) -> Pin> + Send + '_>> { + Box::pin(async move { + if let Some((_, id)) = self.workers.get(&worker_key) { + return Ok(*id); + } + + self.start_worker(worker_key.clone()) + .await + .map_err(ProcessComposeError)?; + + Ok(self + .workers + .get(&worker_key) + .expect( + "process should have been created or an error should have been raised before", + ) + .1) + }) + } + + fn destroy_worker_group( + &mut self, + worker_key: Key, + ) -> Pin> + Send + '_>> { + Box::pin(async move { + if self.workers.contains_key(&worker_key) { + self.stop_worker(worker_key.clone()) + .await + .map_err(ProcessComposeError)?; + } + assert!(!self.workers.contains_key(&worker_key)); + Ok(()) + }) + } + + fn list_worker_groups( + &self, + ) -> Pin, DriverError>> + Send + '_>> { + Box::pin(async move { + let workers = self + .workers + .iter() + .map(|(key, (info, id))| (info.pid, (key, id))) + .collect::>(); + + Ok(self + .pc + .process_info() + .await + .map_err(ProcessComposeError)? + .into_iter() + .filter_map(|p| { + (p.name.starts_with(&self.options.process) + && matches!(p.status, ProcessStatus::Running)) + .then_some(p) + }) + .filter_map(|ProcessInfo { name, pid, .. 
}| { + let Some((key, id)) = workers.get(&pid) else { + log::warn!("unexpected unmanaged worker {name}:{pid}"); + return None; + }; + Some(WorkerMetadata { + external_id: pid.to_string(), + worker_id: **id, + worker_key: (*key).clone(), + }) + }) + .collect()) + }) + } +} + +impl PCDriver { + pub fn new(options: PCDriverOptions, amqp_uri: String) -> Self { + let pc = PCClient::new(options.address.clone(), options.port); + Self { + options, + amqp_uri, + workers: HashMap::new(), + spawn_lock: Default::default(), + pc, + } + } + + async fn scan_workers(&self) -> anyhow::Result> { + let processes = self.pc.process_info().await?; + Ok(processes + .into_iter() + .filter(|p| p.name.starts_with(&self.options.process)) + .collect()) + } + + async fn start_worker(&mut self, key: Key) -> anyhow::Result<()> { + let processes = self.scan_workers().await?; + let pc_proc_name_id = if processes.is_empty() { + &self.options.process + } else { + &processes[0].name + }; + + let id = { + let _guard = self.spawn_lock.lock().await; // bind the guard: `let _ =` would drop the lock immediately and the comm-file writes below would race + let id = Uuid::new_v4(); + let CommFiles { + worker_id, + worker_key, + amqp_uri, + } = &self.options.comm_files; + std::fs::write(worker_id, id.to_string())?; + std::fs::write(worker_key, key.to_string())?; + std::fs::write(amqp_uri, self.amqp_uri.clone())?; + self.pc + .process_scale(pc_proc_name_id, processes.len() + 1) + .await?; + id + }; + + self.sync_processes(Some((key, id))).await + } + + async fn stop_worker(&mut self, key: Key) -> anyhow::Result<()> { + let processes = self.scan_workers().await?; + let pc_proc_name_id = if processes.is_empty() { + &self.options.process + } else { + &processes[0].name + }; + + let stop = self + .workers + .remove(&key) + .expect("process to stop should exist") + .0 + .name; + self.pc.process_stop(&stop).await?; + self.pc + .process_scale(pc_proc_name_id, processes.len() - 1) + .await?; + + self.sync_processes(None).await + } + + async fn sync_processes(&mut self, mut new_worker: Option<(Key, Uuid)>) -> 
anyhow::Result<()> { + let processes = self.scan_workers().await?; + let mut old_state = std::mem::replace(&mut self.workers, HashMap::new()) + .into_iter() + .map(|(key, (info, id))| (info.pid, (key, id))) + .collect::>(); + + for p @ ProcessInfo { pid, status, .. } in processes { + match status { + ProcessStatus::Disabled | ProcessStatus::Completed => continue, + ProcessStatus::Running => { + if let Some((key, id)) = old_state.remove(&pid) { + // yup, the process is still there + self.workers.insert(key, (p, id)); + } else { + let Some((key, id)) = new_worker.take() else { + log::warn!("process {pid} with status {status:?} cannot be given a worker key - was the process started manually?"); + continue; + }; + log::info!("attached worker {key} to process {}:{pid}", p.name); + self.workers.insert(key, (p, id)); + } + } + status => { + log::warn!( + "unexpected non-running worker process {pid} with status: {status:?}" + ); + continue; + } + } + } + + if let Some((key, _)) = new_worker { + bail!("worker with key={key} did not start successfully"); + } + + Ok(()) + } +} + +struct PCClient { + address: String, + port: u64, + client: reqwest::Client, +} + +#[derive(Debug, Clone, Deserialize)] +struct ProcessInfo { + name: String, + status: ProcessStatus, + pid: Pid, + // more fields are omitted +} + +#[derive(Debug, Deserialize)] +#[serde(untagged)] +enum PCResult { + Ok(T), + Err { error: String }, +} + +impl PCResult { + fn into_result(self) -> anyhow::Result { + match self { + Self::Ok(ok) => Ok(ok), + Self::Err { error } => bail!("process-compose error: {error}"), + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize)] +enum ProcessStatus { + Disabled, + Foreground, + Pending, + Running, + Launching, + Launched, + Restarting, + Terminating, + Completed, + Skipped, + Error, +} + +impl PCClient { + fn new(address: String, port: u64) -> Self { + let client = reqwest::Client::builder().build().unwrap(); + Self { + address, + port, + client, + } + } + + fn 
url(&self, route: &str) -> String { + format!("http://{}:{}/{}", self.address, self.port, route) + } + + async fn process_info(&self) -> anyhow::Result> { + #[derive(Deserialize)] + struct Data { + data: Vec, + } + + self.client + .get(self.url("processes")) + .send() + .await + .context("could not send request to process-compose")? + .json::>() + .await + .context("could not parse response from process-compose")? + .into_result() + .map(|data| data.data) + } + + async fn process_scale(&self, process: &str, count: usize) -> anyhow::Result<()> { + log::debug!("scaling process {process} to {count}"); + self.client + .patch(self.url(&format!("process/scale/{process}/{count}"))) + .send() + .await + .context("could not send request to process-compose")? + .json::>() + .await + .context("could not parse response from process-compose")? + .into_result()?; + Ok(()) + } + + async fn process_stop(&self, process: &str) -> anyhow::Result<()> { + log::info!("stopping process {process}"); + self.client + .post(self.url(&format!("process/stop/{process}"))) + .send() + .await + .context("could not send request to process-compose")? + .json::>() + .await + .context("could not parse response from process-compose")? 
+ .into_result()?; + Ok(()) + } +} diff --git a/osrdyne/src/drivers/worker_driver.rs b/osrdyne/src/drivers/worker_driver.rs index 71a9d2099df..8eb2adafcdb 100644 --- a/osrdyne/src/drivers/worker_driver.rs +++ b/osrdyne/src/drivers/worker_driver.rs @@ -20,11 +20,14 @@ pub struct WorkerMetadata { } #[derive(Debug)] +#[allow(clippy::enum_variant_names)] pub enum DriverError { /// Docker error DockerError(bollard::errors::Error), /// Kubernetes error KubernetesError(kube::Error), + /// Process-compose error + ProcessComposeError(anyhow::Error), } impl Display for DriverError { @@ -32,6 +35,7 @@ impl Display for DriverError { match self { DriverError::DockerError(e) => write!(f, "Docker error: {}", e), DriverError::KubernetesError(e) => write!(f, "Kubernetes error: {}", e), + DriverError::ProcessComposeError(e) => write!(f, "process-compose error: {}", e), } } } @@ -41,13 +45,13 @@ pub trait WorkerDriver: Send { /// If the worker is already scheduled, nothing happens. /// Returns the internal UUID of the worker. fn get_or_create_worker_group( - &self, + &mut self, worker_key: Key, ) -> Pin> + Send + '_>>; /// Unschedules a worker from the given group. 
fn destroy_worker_group( - &self, + &mut self, worker_key: Key, ) -> Pin> + Send + '_>>; diff --git a/osrdyne/src/main.rs b/osrdyne/src/main.rs index 0d479c0c659..81e1e98eabf 100644 --- a/osrdyne/src/main.rs +++ b/osrdyne/src/main.rs @@ -20,6 +20,7 @@ use crate::config::WorkerDriverConfig; use crate::drivers::docker::DockerDriver; use crate::drivers::kubernetes::KubernetesDriver; use crate::drivers::noop::NoopDriver; +use crate::drivers::process_compose::PCDriver; use crate::drivers::worker_driver::WorkerDriver; mod api; @@ -116,6 +117,11 @@ async fn main() -> Result<(), anyhow::Error> { ) } + WorkerDriverConfig::ProcessComposeDriver(opts) => { + info!("Using process-compose driver"); + Box::new(PCDriver::new(opts, config.amqp_uri.clone())) + } + WorkerDriverConfig::Noop => { info!("Using Noop driver"); Box::new(NoopDriver::new()) diff --git a/osrdyne/src/pool.rs b/osrdyne/src/pool.rs index 197f5652302..38fa83e3d3b 100644 --- a/osrdyne/src/pool.rs +++ b/osrdyne/src/pool.rs @@ -414,7 +414,7 @@ async fn activity_processor( async fn worker_control_loop( expected_state: tokio::sync::watch::Receiver, running_workers_watch: tokio::sync::watch::Sender>>, - driver: Box, + mut driver: Box, sleep_interval: Duration, ) { loop {