From 3435a8038aa052a8270723ff72992e08fd928a58 Mon Sep 17 00:00:00 2001 From: Serhii Koropets <33310880+koropets@users.noreply.github.com> Date: Wed, 2 Oct 2024 09:44:06 +0300 Subject: [PATCH] Add support for cargo fmt (#242) * Add support for cargo fmt * Fix ci.yaml on. Fix README.md --- .github/workflows/ci.yaml | 27 +++++++ Cargo.lock | 2 +- Cargo.toml | 2 +- README.md | 2 +- examples/query_server.rs | 5 +- src/crd/argo/argo.rs | 5 +- src/crd/argo/mod.rs | 148 +++++++++++++++++++++++--------------- src/crd/gordo/gordo.rs | 33 ++++----- src/crd/gordo/mod.rs | 4 +- src/crd/metrics/mod.rs | 56 +++++++-------- src/crd/mod.rs | 4 +- src/crd/model/mod.rs | 44 ++++++------ src/crd/model/model.rs | 46 ++++++++---- src/crd/pod/mod.rs | 62 ++++++++-------- src/deploy_job.rs | 72 +++++++++---------- src/errors.rs | 2 +- src/lib.rs | 110 +++++++++++++++++----------- src/main.rs | 26 +++---- src/utils.rs | 33 ++++----- src/views.rs | 33 +++++---- tests/helpers.rs | 8 +-- tests/test_controller.rs | 23 +++--- tests/test_server.rs | 4 +- 23 files changed, 416 insertions(+), 335 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 932771f..b251dac 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -6,7 +6,14 @@ on: push: branches: - master + paths-ignore: + - '**.md' pull_request: + types: + - opened + - synchronize + paths-ignore: + - '**.md' jobs: build-docker: @@ -57,6 +64,26 @@ jobs: rm -rf /tmp/.buildx-cache mv /tmp/.buildx-cache-new /tmp/.buildx-cache + fmt: + name: Check format ${{ matrix.rust }} + runs-on: ubuntu-latest + strategy: + matrix: + rust: [stable] + + steps: + - name: Checkout code + uses: actions/checkout@v3 + + - name: Setup Rust toolchain + uses: dtolnay/rust-toolchain@stable + with: + toolchain: ${{ matrix.rust }} + + - name: cargo fmt + run: | + cargo fmt --check + test: name: Test ${{ matrix.rust }} on ${{ matrix.os }} needs: build-docker diff --git a/Cargo.lock b/Cargo.lock index 003dced..dfef8cf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -832,7 +832,7 @@ checksum = "32085ea23f3234fc7846555e85283ba4de91e21016dc0455a16286d87a292d64" [[package]] name = "gordo-controller" -version = "2.1.1" +version = "2.1.3" dependencies = [ "actix-rt", "actix-web", diff --git a/Cargo.toml b/Cargo.toml index 17e80e4..f0ca195 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "gordo-controller" -version = "2.1.2" +version = "2.1.3" authors = ["Miles Granger ", "Serhii Koropets "] edition = "2018" diff --git a/README.md b/README.md index 0f03cb6..6161cbd 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ # gordo-controller -Gordo controller +[Gordo](https://github.com/equinor/gordo) controller [![CI](https://github.com/equinor/gordo-controller/workflows/CI/badge.svg)](https://github.com/equinor/gordo-controller/actions) diff --git a/examples/query_server.rs b/examples/query_server.rs index d7d06e2..3efdb17 100644 --- a/examples/query_server.rs +++ b/examples/query_server.rs @@ -37,10 +37,7 @@ async fn main() -> Result<(), Box> { .await?; assert!(resp.is_empty()); - let body = reqwest::get("http://0.0.0.0:8888/metrics") - .await? - .text() - .await?; + let body = reqwest::get("http://0.0.0.0:8888/metrics").await?.text().await?; assert!(body.contains("gordo_controller_http_requests_total")); diff --git a/src/crd/argo/argo.rs b/src/crd/argo/argo.rs index 5fe3151..7ce9796 100644 --- a/src/crd/argo/argo.rs +++ b/src/crd/argo/argo.rs @@ -1,6 +1,6 @@ use kube::CustomResource; -use serde::{Deserialize, Serialize}; use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; // Origin here https://github.com/argoproj/argo/blob/master/pkg/apis/workflow/v1alpha1/workflow_types.go#L34 #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, JsonSchema)] @@ -30,8 +30,7 @@ impl Default for ArgoWorkflowPhase { #[kube(group = "argoproj.io", version = "v1alpha1", kind = "Workflow", namespaced)] #[kube(shortname = "wf")] #[kube(status = "ArgoWorkflowStatus")] -pub struct ArgoWorkflowSpec { -} +pub struct ArgoWorkflowSpec {} #[derive(Serialize, Deserialize, Clone, Debug, Default, JsonSchema)] pub struct ArgoWorkflowStatus { diff --git a/src/crd/argo/mod.rs b/src/crd/argo/mod.rs index 143c75f..2a0da4e 100644 --- a/src/crd/argo/mod.rs +++ b/src/crd/argo/mod.rs @@ -1,48 +1,46 @@ pub mod argo; pub use argo::*; -use log::{error, info, warn}; -use crate::crd::model::{Model, ModelPhase, ModelPodTerminatedStatus, patch_model_status, patch_model_with_default_status}; -use crate::crd::pod::{POD_MATCH_LABELS, FAILED}; use crate::crd::metrics::warning_happened; -use k8s_openapi::api::core::v1::ContainerStateTerminated; -use chrono::{DateTime, Utc}; -use k8s_openapi::{ - api::core::v1::Pod, +use crate::crd::model::{ + patch_model_status, patch_model_with_default_status, Model, ModelPhase, ModelPodTerminatedStatus, }; +use crate::crd::pod::{FAILED, POD_MATCH_LABELS}; +use chrono::{DateTime, Utc}; +use k8s_openapi::api::core::v1::ContainerStateTerminated; +use k8s_openapi::api::core::v1::Pod; use kube::api::Api; +use log::{error, info, warn}; pub const WF_MATCH_LABELS: &'static [&'static str] = &[ - "applications.gordo.equinor.com/project-name", - "applications.gordo.equinor.com/project-revision", + "applications.gordo.equinor.com/project-name", + "applications.gordo.equinor.com/project-revision", ]; pub const WF_NUMBER_LABEL: &str = "applications.gordo.equinor.com/project-workflow"; fn some_of_workflows_in_phases(workflows: &Vec<&Workflow>, phases: Vec) -> bool { - workflows.iter() - .any(|wf| match &wf.status { - Some(status) => match &status.phase { - Some(status_phase) => (&phases).into_iter().find(|phase| &status_phase == phase).is_some(), - None => false, - }, - _ => false, - }) + workflows.iter().any(|wf| match &wf.status { + Some(status) => match &status.phase { + Some(status_phase) => (&phases).into_iter().find(|phase| &status_phase == phase).is_some(), + None => false, + }, + _ => false, + }) } fn all_of_workflows_in_phases(workflows: &Vec<&Workflow>, phases: Vec) -> bool { - workflows.iter() - .all(|wf| match &wf.status { - Some(status) => match &status.phase { - Some(status_phase) => (&phases).into_iter().find(|phase| &status_phase == phase).is_some(), - None => false, - }, - _ => false, - }) + workflows.iter().all(|wf| match &wf.status { + Some(status) => match &status.phase { + Some(status_phase) => (&phases).into_iter().find(|phase| &status_phase == phase).is_some(), + None => false, + }, + _ => false, + }) } fn find_model_workflows<'a>(model: &'a Model, workflows: &'a [Workflow]) -> Vec<&'a Workflow> { - //TODO for performance reason we supposed to reimplement this algorithm with BTreeMap + //TODO for performance reason we supposed to reimplement this algorithm with BTreeMap workflows .iter() .filter(|workflow| { @@ -75,25 +73,23 @@ fn find_model_workflows<'a>(model: &'a Model, workflows: &'a [Workflow]) -> Vec< fn failed_pods_terminated_statuses<'a>(model: &'a Model, pods: &'a Vec) -> Vec<&'a ContainerStateTerminated> { pods.iter() - .filter(|pod| { - match &pod.status { - Some(status) => match &status.phase { - Some(phase) => phase == FAILED, - None => false, - }, + .filter(|pod| match &pod.status { + Some(status) => match &status.phase { + Some(phase) => phase == FAILED, None => false, - } + }, + None => false, }) .filter(|pod| { let pod_labels = &pod.metadata.labels; let model_labels = &model.metadata.labels; POD_MATCH_LABELS .iter() - .all(|&label_name| { - match (model_labels, pod_labels) { - (Some(model_labels), Some(pod_labels)) => model_labels.get(label_name) == pod_labels.get(label_name), - _ => false, + .all(|&label_name| match (model_labels, pod_labels) { + (Some(model_labels), Some(pod_labels)) => { + model_labels.get(label_name) == pod_labels.get(label_name) } + _ => false, }) }) .flat_map(|pod| pod.status.as_ref()) @@ -104,10 +100,13 @@ fn failed_pods_terminated_statuses<'a>(model: &'a Model, pods: &'a Vec) -> .collect() } -fn last_container_terminated_status(terminated_statuses: Vec<&ContainerStateTerminated>) -> Option<&ContainerStateTerminated> { +fn last_container_terminated_status( + terminated_statuses: Vec<&ContainerStateTerminated>, +) -> Option<&ContainerStateTerminated> { if terminated_statuses.len() > 0 { let min_date_time = DateTime::::MIN_UTC.clone(); - let last_terminated_state_ind = terminated_statuses.iter() + let last_terminated_state_ind = terminated_statuses + .iter() .enumerate() .max_by_key(|(_, terminated_state)| match &terminated_state.finished_at { Some(finished_at) => finished_at.0, @@ -129,7 +128,7 @@ pub async fn monitor_wf(model_api: &Api, workflows: &Vec, model None => { warn!("Model labels field is empty"); continue; - }, + } }; let model_name = match &model.metadata.name { Some(model_name) => model_name, @@ -139,17 +138,27 @@ pub async fn monitor_wf(model_api: &Api, workflows: &Vec, model } }; match &model.status { - Some(model_status) => { - let is_reapplied_model = match (&model_status.revision, labels.get("applications.gordo.equinor.com/project-revision")) { + Some(model_status) => { + let is_reapplied_model = match ( + &model_status.revision, + labels.get("applications.gordo.equinor.com/project-revision"), + ) { (Some(status_revision), Some(metadata_revision)) => status_revision != metadata_revision, _ => false, }; - if !is_reapplied_model { + if !is_reapplied_model { match &model_status.phase { ModelPhase::InProgress | ModelPhase::Unknown => { let found_workflows = find_model_workflows(&model, &workflows); let mut new_model_phase: Option = None; - if some_of_workflows_in_phases(&found_workflows, vec![ArgoWorkflowPhase::Error, ArgoWorkflowPhase::Failed, ArgoWorkflowPhase::Skipped]) { + if some_of_workflows_in_phases( + &found_workflows, + vec![ + ArgoWorkflowPhase::Error, + ArgoWorkflowPhase::Failed, + ArgoWorkflowPhase::Skipped, + ], + ) { new_model_phase = Some(ModelPhase::Failed); } else if all_of_workflows_in_phases(&found_workflows, vec![ArgoWorkflowPhase::Succeeded]) { new_model_phase = Some(ModelPhase::Succeeded); @@ -162,22 +171,31 @@ pub async fn monitor_wf(model_api: &Api, workflows: &Vec, model if let Some(model_name) = labels.get("applications.gordo.equinor.com/model-name") { let terminated_statuses = failed_pods_terminated_statuses(&model, &pods); info!("Found {} failed pods in terminated status which is relates to the model '{}'", terminated_statuses.len(), model_name); - if let Some(terminated_status) = last_container_terminated_status(terminated_statuses) { + if let Some(terminated_status) = + last_container_terminated_status(terminated_statuses) + { new_model_status.code = Some(terminated_status.exit_code); if let Some(message) = &terminated_status.message { let trimmed_message = message.trim_end(); if !trimmed_message.is_empty() { - let result: serde_json::Result = serde_json::from_str(&trimmed_message); + let result: serde_json::Result = + serde_json::from_str(&trimmed_message); match result { Ok(terminated_status_message) => { - info!("Last terminated status message {:?} for model '{}'", terminated_status_message, model_name); - new_model_status.error_type = terminated_status_message.error_type.clone(); - new_model_status.message = terminated_status_message.message.clone(); - new_model_status.traceback = terminated_status_message.traceback.clone(); - }, + info!( + "Last terminated status message {:?} for model '{}'", + terminated_status_message, model_name + ); + new_model_status.error_type = + terminated_status_message.error_type.clone(); + new_model_status.message = + terminated_status_message.message.clone(); + new_model_status.traceback = + terminated_status_message.traceback.clone(); + } Err(err) => { - warn!("Got JSON error where parsing pod's terminated message for the model '{}': {:?}", model_name, err); - warning_happened("parse_terminated_message") + warn!("Got JSON error where parsing pod's terminated message for the model '{}': {:?}", model_name, err); + warning_happened("parse_terminated_message") } } } @@ -188,24 +206,36 @@ pub async fn monitor_wf(model_api: &Api, workflows: &Vec, model if model_phase != model_status.phase { match patch_model_status(&model_api, &model_name, &new_model_status).await { Ok(new_model) => { - info!("Patching Model '{}' from status {:?} to {:?}", model_name, model.status, new_model.status); + info!( + "Patching Model '{}' from status {:?} to {:?}", + model_name, model.status, new_model.status + ); } Err(err) => { - error!( "Failed to patch status of Model '{}' - error: {:?}", model_name, err); + error!( + "Failed to patch status of Model '{}' - error: {:?}", + model_name, err + ); } } } } - }, + } _ => (), } } else { match patch_model_with_default_status(&model_api, &model).await { Ok(new_model) => { - info!("Patching Model '{}' from status {:?} to default status {:?}", model_name, model.status, new_model.status); + info!( + "Patching Model '{}' from status {:?} to default status {:?}", + model_name, model.status, new_model.status + ); } Err(err) => { - error!( "Failed to patch status of Model '{}' with default status - error: {:?}", model_name, err); + error!( + "Failed to patch status of Model '{}' with default status - error: {:?}", + model_name, err + ); } } } @@ -213,4 +243,4 @@ pub async fn monitor_wf(model_api: &Api, workflows: &Vec, model _ => (), }; } -} \ No newline at end of file +} diff --git a/src/crd/gordo/gordo.rs b/src/crd/gordo/gordo.rs index 32049be..b3c4aad 100644 --- a/src/crd/gordo/gordo.rs +++ b/src/crd/gordo/gordo.rs @@ -1,21 +1,19 @@ use futures::future::join_all; +use k8s_openapi::api::batch::v1::Job; use kube::{ - api::{Api, DeleteParams, ListParams, PatchParams, PostParams, Patch}, + api::{Api, DeleteParams, ListParams, Patch, PatchParams, PostParams}, client::Client, CustomResource, }; -use k8s_openapi::{ - api::batch::v1::Job, -}; -use log::{error, info, debug}; +use log::{debug, error, info}; +use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; use std::collections::HashMap; -use schemars::JsonSchema; -use crate::{create_deploy_job, Config}; use crate::crd::metrics::KUBE_ERRORS; use crate::utils::get_revision; +use crate::{create_deploy_job, Config}; pub type GenerationNumber = Option; @@ -28,7 +26,13 @@ pub struct GordoConfig { } #[derive(CustomResource, Debug, Clone, Deserialize, Serialize, JsonSchema)] -#[kube(group = "equinor.com", version = "v1", kind = "Gordo", status="GordoStatus", namespaced)] +#[kube( + group = "equinor.com", + version = "v1", + kind = "Gordo", + status = "GordoStatus", + namespaced +)] #[kube(shortname = "gd")] pub struct ConfigMapGeneratorSpec { #[serde(rename = "deploy-version")] @@ -105,7 +109,7 @@ pub async fn start_gordo_deploy_job( Some(job) => job, None => { error!("Job is None"); - return + return; } }; @@ -121,7 +125,7 @@ pub async fn start_gordo_deploy_job( match jobs.create(&postparams, &job).await { Ok(job) => info!("Submitted job: {:?}", job.metadata.name), Err(e) => { - error!("Failed to submit job with error: {:?}", e); + error!("Failed to submit job with error: {:?}", e); } } @@ -129,10 +133,7 @@ pub async fn start_gordo_deploy_job( status.project_revision = revision; // Update the status of this job - info!( - "Setting status of this gordo '{}' to '{:?}'", - &gordo_name, &status - ); + info!("Setting status of this gordo '{}' to '{:?}'", &gordo_name, &status); let patch = json!({ "status": status }); match resource .patch_status(&gordo_name, &PatchParams::default(), &Patch::Merge(&patch)) @@ -140,7 +141,7 @@ pub async fn start_gordo_deploy_job( { Ok(o) => info!("Patched status: {:?}", o.status), Err(e) => { - error!("Failed to patch status: {:?}", e); + error!("Failed to patch status: {:?}", e); } }; } @@ -201,7 +202,7 @@ pub async fn remove_gordo_deploy_jobs(gordo: &Gordo, client: &Client, namespace: .await; } Err(e) => { - error!("Failed to list jobs: {:?}", e); + error!("Failed to list jobs: {:?}", e); } } } diff --git a/src/crd/gordo/mod.rs b/src/crd/gordo/mod.rs index f59492f..a5a6d19 100644 --- a/src/crd/gordo/mod.rs +++ b/src/crd/gordo/mod.rs @@ -1,9 +1,9 @@ use kube::{api::Api, client::Client}; -use crate::{Config}; +use crate::Config; pub mod gordo; -pub use gordo::{Gordo, GordoSubmissionStatus, start_gordo_deploy_job}; +pub use gordo::{start_gordo_deploy_job, Gordo, GordoSubmissionStatus}; pub async fn handle_gordo_state( gordo: &Gordo, diff --git a/src/crd/metrics/mod.rs b/src/crd/metrics/mod.rs index 2ba4adc..70f5eea 100644 --- a/src/crd/metrics/mod.rs +++ b/src/crd/metrics/mod.rs @@ -1,44 +1,44 @@ -use prometheus::{Opts, IntCounterVec, Registry}; use lazy_static::lazy_static; +use prometheus::{IntCounterVec, Opts, Registry}; pub const METRICS_NAMESPACE: &str = "gordo_controller"; lazy_static! { pub static ref KUBE_ERRORS: IntCounterVec = IntCounterVec::new( - Opts::new("kube_errors", "gordo-controller k8s related errors") - .namespace(METRICS_NAMESPACE), - &["action", "kube_name"] - ).unwrap(); + Opts::new("kube_errors", "gordo-controller k8s related errors").namespace(METRICS_NAMESPACE), + &["action", "kube_name"] + ) + .unwrap(); pub static ref WARNINGS: IntCounterVec = IntCounterVec::new( - Opts::new("warnings", "gordo-controller warnings") - .namespace(METRICS_NAMESPACE), - &["name"] - ).unwrap(); + Opts::new("warnings", "gordo-controller warnings").namespace(METRICS_NAMESPACE), + &["name"] + ) + .unwrap(); pub static ref RECONCILE_GORDO_COUNT: IntCounterVec = IntCounterVec::new( - Opts::new("reconcile_gordo_count", "Gordo reconcile count") - .namespace(METRICS_NAMESPACE), - &["gordo_name"] - ).unwrap(); + Opts::new("reconcile_gordo_count", "Gordo reconcile count").namespace(METRICS_NAMESPACE), + &["gordo_name"] + ) + .unwrap(); pub static ref RECONCILE_GORDO_SUCCEDED: IntCounterVec = IntCounterVec::new( - Opts::new("reconcile_gordo_succeded", "Reconcile Gordo succeded") - .namespace(METRICS_NAMESPACE), - &[] - ).unwrap(); + Opts::new("reconcile_gordo_succeded", "Reconcile Gordo succeded").namespace(METRICS_NAMESPACE), + &[] + ) + .unwrap(); pub static ref RECONCILE_GORDO_ERROR: IntCounterVec = IntCounterVec::new( - Opts::new("reconcile_gordo_error", "Reconcile Gordo errors") - .namespace(METRICS_NAMESPACE), - &[] - ).unwrap(); + Opts::new("reconcile_gordo_error", "Reconcile Gordo errors").namespace(METRICS_NAMESPACE), + &[] + ) + .unwrap(); } pub fn custom_metrics(registry: &Registry) { - registry.register(Box::new(KUBE_ERRORS.clone())).unwrap(); - registry.register(Box::new(WARNINGS.clone())).unwrap(); - registry.register(Box::new(RECONCILE_GORDO_COUNT.clone())).unwrap(); - registry.register(Box::new(RECONCILE_GORDO_SUCCEDED.clone())).unwrap(); - registry.register(Box::new(RECONCILE_GORDO_ERROR.clone())).unwrap(); + registry.register(Box::new(KUBE_ERRORS.clone())).unwrap(); + registry.register(Box::new(WARNINGS.clone())).unwrap(); + registry.register(Box::new(RECONCILE_GORDO_COUNT.clone())).unwrap(); + registry.register(Box::new(RECONCILE_GORDO_SUCCEDED.clone())).unwrap(); + registry.register(Box::new(RECONCILE_GORDO_ERROR.clone())).unwrap(); } pub fn warning_happened(name: &str) { - WARNINGS.with_label_values(&[name]).inc_by(1); -} \ No newline at end of file + WARNINGS.with_label_values(&[name]).inc_by(1); +} diff --git a/src/crd/mod.rs b/src/crd/mod.rs index 068a968..af61e9b 100644 --- a/src/crd/mod.rs +++ b/src/crd/mod.rs @@ -1,5 +1,5 @@ +pub mod argo; pub mod gordo; +pub mod metrics; pub mod model; pub mod pod; -pub mod argo; -pub mod metrics; \ No newline at end of file diff --git a/src/crd/model/mod.rs b/src/crd/model/mod.rs index 3bd6383..f7467be 100644 --- a/src/crd/model/mod.rs +++ b/src/crd/model/mod.rs @@ -1,14 +1,17 @@ pub mod model; pub use model::*; -use kube::api::{Api, PatchParams, Patch}; +use kube::api::{Api, Patch, PatchParams}; use log::{error, info, warn}; use serde_json::json; use crate::crd::gordo::gordo::{Gordo, GordoStatus}; use crate::errors::Error; -pub async fn patch_model_with_default_status<'a>(model_resource: &'a Api, model: &'a Model) -> Result{ +pub async fn patch_model_with_default_status<'a>( + model_resource: &'a Api, + model: &'a Model, +) -> Result { let mut status = ModelStatus::default(); status.revision = match model.metadata.labels.to_owned() { Some(labels) => match labels.get("applications.gordo.equinor.com/project-revision") { @@ -18,16 +21,19 @@ pub async fn patch_model_with_default_status<'a>(model_resource: &'a Api, None => None, }; match model.metadata.name.to_owned() { - Some(name) => patch_model_status( - model_resource, - &name, - &status - ).await.map_err(Error::KubeError), + Some(name) => patch_model_status(model_resource, &name, &status) + .await + .map_err(Error::KubeError), None => Err(Error::MissingKey(".metadata.name")), } } -pub async fn monitor_models(model_api: &Api, gordo_api: &Api, models: &Vec, gordos: &Vec) -> () { +pub async fn monitor_models( + model_api: &Api, + gordo_api: &Api, + models: &Vec, + gordos: &Vec, +) -> () { for model in models.iter() { if let None = model.status { //TODO Update state here @@ -36,14 +42,17 @@ pub async fn monitor_models(model_api: &Api, gordo_api: &Api, mode Some(name) => name, None => { warn!("Model does not have a name"); - continue + continue; } }; info!("Unknown status for model {}", name); match patch_model_with_default_status(model_api, &model).await { - Ok(new_model) => info!("Patching Model '{}' from status {:?} to {:?}", name, model.status, new_model.status), + Ok(new_model) => info!( + "Patching Model '{}' from status {:?} to {:?}", + name, model.status, new_model.status + ), Err(err) => { - error!( "Failed to patch status of Model '{}' - error: {:?}", name, err); + error!("Failed to patch status of Model '{}' - error: {:?}", name, err); } } } @@ -71,17 +80,12 @@ pub async fn monitor_models(model_api: &Api, gordo_api: &Api, mode Some(name) => name, None => { warn!("Gordo does not have a name"); - continue + continue; } }; - if let Err(err) = gordo_api - .patch_status(&name, &pp, &Patch::Merge(&patch)) - .await - { - error!( - "Failed to patch status of Gordo '{}' - error: {:?}", name, err - ); + if let Err(err) = gordo_api.patch_status(&name, &pp, &Patch::Merge(&patch)).await { + error!("Failed to patch status of Gordo '{}' - error: {:?}", name, err); } } } -} \ No newline at end of file +} diff --git a/src/crd/model/model.rs b/src/crd/model/model.rs index 89968b3..90a5ead 100644 --- a/src/crd/model/model.rs +++ b/src/crd/model/model.rs @@ -1,14 +1,20 @@ use crate::crd::gordo::Gordo; -use kube::api::{Api, PatchParams, Patch}; +use kube::api::{Api, Patch, PatchParams}; use kube::CustomResource; +use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -use serde_json::Value; use serde_json::json; -use schemars::JsonSchema; +use serde_json::Value; /// Represents the 'spec' field of a Model custom resource definition #[derive(CustomResource, Serialize, Deserialize, Clone, Debug, JsonSchema)] -#[kube(group = "equinor.com", version = "v1", status="ModelStatus", kind = "Model", namespaced)] +#[kube( + group = "equinor.com", + version = "v1", + status = "ModelStatus", + kind = "Model", + namespaced +)] #[kube(shortname = "gm")] pub struct ModelSpec { #[serde(rename = "gordo-version")] @@ -62,7 +68,10 @@ pub fn filter_models_on_gordo<'a>(gordo: &'a Gordo, models: &'a [Model]) -> impl .iter() // Filter on OwnerReference .filter(move |model| { - match (model.metadata.owner_references.to_owned(), gordo.metadata.name.to_owned()) { + match ( + model.metadata.owner_references.to_owned(), + gordo.metadata.name.to_owned(), + ) { (Some(owner_references), Some(name)) => owner_references.iter().any(|owner_ref| owner_ref.name == name), _ => false, } @@ -71,11 +80,14 @@ pub fn filter_models_on_gordo<'a>(gordo: &'a Gordo, models: &'a [Model]) -> impl .filter(move |model| match gordo.status.as_ref() { Some(status) => { match model.metadata.labels.to_owned() { - Some(labels) => labels - .get("applications.gordo.equinor.com/project-revision") - // TODO: Here for compatibility when gordo-components <= 0.46.0 used 'project-version' to refer to 'project-revision' - // TODO: can remove when people have >= 0.47.0 of gordo - .or_else(|| { labels.get("applications.gordo.equinor.com/project-version") }) == Some(&status.project_revision), + Some(labels) => { + labels + .get("applications.gordo.equinor.com/project-revision") + // TODO: Here for compatibility when gordo-components <= 0.46.0 used 'project-version' to refer to 'project-revision' + // TODO: can remove when people have >= 0.47.0 of gordo + .or_else(|| labels.get("applications.gordo.equinor.com/project-version")) + == Some(&status.project_revision) + } None => false, } } @@ -83,10 +95,16 @@ pub fn filter_models_on_gordo<'a>(gordo: &'a Gordo, models: &'a [Model]) -> impl }) } -pub async fn patch_model_status<'a>(model_resource: &'a Api, model_name: &'a str, new_status: &ModelStatus) -> kube::Result { +pub async fn patch_model_status<'a>( + model_resource: &'a Api, + model_name: &'a str, + new_status: &ModelStatus, +) -> kube::Result { let patch_params = PatchParams::default(); let patch = json!({ "status": new_status }); - model_resource.patch_status(model_name, &patch_params, &Patch::Merge(&patch)).await + model_resource + .patch_status(model_name, &patch_params, &Patch::Merge(&patch)) + .await } pub fn get_model_project<'a>(model: &'a Model) -> Option { @@ -98,7 +116,7 @@ pub fn get_model_project<'a>(model: &'a Model) -> Option { } } None - }, + } _ => None, } -} \ No newline at end of file +} diff --git a/src/crd/pod/mod.rs b/src/crd/pod/mod.rs index 9d557fa..e1e4dca 100644 --- a/src/crd/pod/mod.rs +++ b/src/crd/pod/mod.rs @@ -1,10 +1,8 @@ -use log::{error, info, warn}; use kube::api::Api; +use log::{error, info, warn}; -use k8s_openapi::{ - api::core::v1::Pod, -}; -use crate::crd::model::{Model, ModelStatus, ModelPhase, patch_model_status}; +use crate::crd::model::{patch_model_status, Model, ModelPhase, ModelStatus}; +use k8s_openapi::api::core::v1::Pod; pub const PENDING: &str = "Pending"; pub const RUNNING: &str = "Running"; @@ -13,9 +11,9 @@ pub const FAILED: &str = "Failed"; pub const UNKNOWN: &str = "Unknown"; pub const POD_MATCH_LABELS: &'static [&'static str] = &[ - "applications.gordo.equinor.com/project-name", - "applications.gordo.equinor.com/project-revision", - "applications.gordo.equinor.com/model-name" + "applications.gordo.equinor.com/project-name", + "applications.gordo.equinor.com/project-revision", + "applications.gordo.equinor.com/model-name", ]; async fn update_model_status(model_resource: &Api, model: &Model, new_status: &ModelStatus) { @@ -23,31 +21,36 @@ async fn update_model_status(model_resource: &Api, model: &Model, new_sta Some(name) => name, None => { error!("Model metadata.name is empty"); - return + return; } }; match patch_model_status(model_resource, name, &new_status).await { - Ok(new_model) => info!("Patching Model '{}' from status {:?} to {:?}", name, model.status, new_model.status), + Ok(new_model) => info!( + "Patching Model '{}' from status {:?} to {:?}", + name, model.status, new_model.status + ), Err(err) => { - error!( "Failed to patch status of Model '{}' - error: {:?}", name, err); + error!("Failed to patch status of Model '{}' - error: {:?}", name, err); } } } pub async fn monitor_pods(model_api: &Api, models: &Vec, pods: &Vec) -> () { //Filtering only active models - let actual_models: Vec<_> = models.into_iter() + let actual_models: Vec<_> = models + .into_iter() .filter(|model| match &model.status { Some(status) => status.phase == ModelPhase::Unknown || status.phase == ModelPhase::InProgress, None => true, }) .collect(); if actual_models.is_empty() { - return + return; } //TODO to perform the models-pods matching in O(1) makes sense to do collect into some sort of HashMap here - let actual_pods_labels: Vec<_> = pods.iter() + let actual_pods_labels: Vec<_> = pods + .iter() .filter(|pod| match pod.metadata.labels.to_owned() { Some(labels) => labels.get("applications.gordo.equinor.com/model-name").is_some(), None => false, @@ -60,10 +63,10 @@ pub async fn monitor_pods(model_api: &Api, models: &Vec, pods: &Ve } else { None } - }, - _ => None - } - _ => None + } + _ => None, + }, + _ => None, }) .collect(); @@ -79,14 +82,13 @@ pub async fn monitor_pods(model_api: &Api, models: &Vec, pods: &Ve let new_model_status = match &model.status { Some(status) => { let pods_labels = &actual_pods_labels; - let pods_phases: Vec<_> = pods_labels.into_iter() - .filter(|(_, labels)| { - match &model.metadata.labels { - Some(model_labels) => POD_MATCH_LABELS. - iter(). - all(|&label_name| model_labels.get(label_name) == labels.get(label_name)), - None => false, - } + let pods_phases: Vec<_> = pods_labels + .into_iter() + .filter(|(_, labels)| match &model.metadata.labels { + Some(model_labels) => POD_MATCH_LABELS + .iter() + .all(|&label_name| model_labels.get(label_name) == labels.get(label_name)), + None => false, }) .map(|(phase, _)| phase) .collect(); @@ -108,15 +110,11 @@ pub async fn monitor_pods(model_api: &Api, models: &Vec, pods: &Ve } else { None } - }, + } None => None, }; if let Some(new_status) = new_model_status { - update_model_status( - model_api, - &model, - &new_status, - ).await; + update_model_status(model_api, &model, &new_status).await; } } } diff --git a/src/deploy_job.rs b/src/deploy_job.rs index 64a2a24..aa30d6b 100644 --- a/src/deploy_job.rs +++ b/src/deploy_job.rs @@ -1,19 +1,16 @@ use crate::{ - Gordo, - Config, - utils::{object_to_owner_reference, env_var}, + utils::{env_var, object_to_owner_reference}, + Config, Gordo, }; -use k8s_openapi::api::core::v1::{Container, EnvVar, PodSpec, PodTemplateSpec, - ResourceRequirements}; use k8s_openapi::api::batch::v1::{Job, JobSpec}; -use k8s_openapi::api::core::v1::{SecurityContext, Volume, VolumeMount, EmptyDirVolumeSource}; +use k8s_openapi::api::core::v1::{Container, EnvVar, PodSpec, PodTemplateSpec, ResourceRequirements}; +use k8s_openapi::api::core::v1::{EmptyDirVolumeSource, SecurityContext, Volume, VolumeMount}; use k8s_openapi::apimachinery::pkg::api::resource::Quantity; use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta as OpenApiObjectMeta; use kube::api::ObjectMeta; +use log::{info, warn}; use std::collections::BTreeMap; use std::iter::FromIterator; -use log::{warn, info}; - // TODO builder @@ -62,27 +59,25 @@ fn deploy_container(gordo: &Gordo, environment: Vec, config: &Config) -> ("memory".to_owned(), Quantity("1000Mi".to_owned())), ("cpu".to_owned(), Quantity("2000m".to_string())), ] - .into_iter(), + .into_iter(), )), requests: Some(BTreeMap::from_iter( vec![ ("memory".to_owned(), Quantity("500Mi".to_owned())), ("cpu".to_owned(), Quantity("250m".to_string())), ] - .into_iter(), + .into_iter(), )), }); let mut security_context = SecurityContext::default(); security_context.run_as_non_root = Some(true); if config.deploy_job_ro_fs { security_context.read_only_root_filesystem = Some(true); - container.volume_mounts = Some(vec![ - VolumeMount { - name: "tmp".to_string(), - mount_path: "/tmp".to_string(), - ..VolumeMount::default() - } - ]); + container.volume_mounts = Some(vec![VolumeMount { + name: "tmp".to_string(), + mount_path: "/tmp".to_string(), + ..VolumeMount::default() + }]); } container.security_context = Some(security_context); container @@ -93,15 +88,13 @@ fn deploy_pod_spec(containers: Vec, config: &Config) -> PodSpec { pod_spec.containers = containers; pod_spec.restart_policy = Some("Never".to_string()); if config.deploy_job_ro_fs { - pod_spec.volumes = Some(vec![ - Volume { - name: "tmp".to_string(), - empty_dir: Some(EmptyDirVolumeSource::default()), - ..Volume::default() - } - ]); + pod_spec.volumes = Some(vec![Volume { + name: "tmp".to_string(), + empty_dir: Some(EmptyDirVolumeSource::default()), + ..Volume::default() + }]); } - pod_spec.service_account = config.argo_service_account.as_ref().map(|v| { v.to_string() }); + pod_spec.service_account = config.argo_service_account.as_ref().map(|v| v.to_string()); pod_spec } @@ -139,18 +132,12 @@ pub fn create_deploy_job(gordo: &Gordo, config: &Config) -> Option { return None; } }; - let job_name_suffix = format!( - "{}-{}", - name, - &gordo.metadata.generation.unwrap_or(0) - ); + let job_name_suffix = format!("{}-{}", name, &gordo.metadata.generation.unwrap_or(0)); let job_name = deploy_job_name("gordo-dpl-", &job_name_suffix); info!("Creating job \"{}\" for Gordo \"{}\"", job_name, name); - let owner_references_result = object_to_owner_reference::( - gordo.metadata.clone() - ); + let owner_references_result = object_to_owner_reference::(gordo.metadata.clone()); let owner_references = match owner_references_result { Ok(owner_references) => owner_references, Err(_) => { @@ -182,8 +169,14 @@ pub fn create_deploy_job(gordo: &Gordo, config: &Config) -> Option { initial_environment.insert("WORKFLOW_GENERATOR_PROJECT_REVISION".into(), project_revision.clone()); // TODO: Backward compat. Until all have moved >=0.47.0 of gordo-components initial_environment.insert("WORKFLOW_GENERATOR_PROJECT_VERSION".into(), project_revision); - initial_environment.insert("WORKFLOW_GENERATOR_DOCKER_REGISTRY".into(), config.docker_registry.clone()); - initial_environment.insert("WORKFLOW_GENERATOR_GORDO_VERSION".into(), gordo.spec.deploy_version.clone()); + initial_environment.insert( + "WORKFLOW_GENERATOR_DOCKER_REGISTRY".into(), + config.docker_registry.clone(), + ); + initial_environment.insert( + "WORKFLOW_GENERATOR_GORDO_VERSION".into(), + gordo.spec.deploy_version.clone(), + ); initial_environment.insert("WORKFLOW_GENERATOR_RESOURCE_LABELS".into(), resources_labels); initial_environment.insert("DEBUG_SHOW_WORKFLOW".into(), debug_show_workflow.into()); @@ -201,7 +194,10 @@ pub fn create_deploy_job(gordo: &Gordo, config: &Config) -> Option { initial_environment.insert("ARGO_SERVICE_ACCOUNT".into(), argo_service_account.into()); } - initial_environment.insert("ARGO_VERSION_NUMBER".into(), config.argo_version_number.map_or("".into(), |v| v.to_string())); + initial_environment.insert( + "ARGO_VERSION_NUMBER".into(), + config.argo_version_number.map_or("".into(), |v| v.to_string()), + ); let resources_labels = &config.resources_labels; @@ -214,8 +210,8 @@ pub fn create_deploy_job(gordo: &Gordo, config: &Config) -> Option { let mut environment: Vec = vec![]; initial_environment.iter().for_each(|(key, value)| { - environment.push(env_var(key, value)); - }); + environment.push(env_var(key, value)); + }); let container = deploy_container(&gordo, environment, config); let pod_spec = deploy_pod_spec(vec![container], config); diff --git a/src/errors.rs b/src/errors.rs index 4b70d3c..ca6701d 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -17,7 +17,7 @@ pub enum ConfigError { #[error("Failed to load environment config: {0}")] Environment(#[source] envy::Error), #[error("Faild to load '{0}' config field: {1}")] - Field(&'static str, String) + Field(&'static str, String), } impl Error { diff --git a/src/lib.rs b/src/lib.rs index ef4507b..5ef6aac 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,36 +1,34 @@ -use std::result::Result; -use std::sync::Arc; -use serde::Deserialize; -use serde_json; +use crate::crd::metrics::{RECONCILE_GORDO_COUNT, RECONCILE_GORDO_ERROR, RECONCILE_GORDO_SUCCEDED}; +use crate::errors::ConfigError; use futures::StreamExt; +use k8s_openapi::api::core::v1::Pod; +use kube::runtime::controller::{Action, Context, Controller}; use kube::{ api::{Api, ListParams}, client::Client, }; -use kube::runtime::controller::{Context, Controller, Action}; -use k8s_openapi::{ - api::core::v1::Pod, -}; -use log::{info, warn, debug}; +use log::{debug, info, warn}; +use serde::Deserialize; +use serde_json; +use std::result::Result; +use std::sync::Arc; use tokio::time::Duration; -use crate::crd::metrics::{RECONCILE_GORDO_COUNT, RECONCILE_GORDO_SUCCEDED, RECONCILE_GORDO_ERROR}; -use crate::errors::ConfigError; pub mod crd; pub mod deploy_job; -pub mod views; -pub mod utils; pub mod errors; +pub mod utils; +pub mod views; use crate::crd::{ - gordo::{Gordo, handle_gordo_state}, - model::{monitor_models, Model}, - pod::{monitor_pods}, argo::{monitor_wf, Workflow}, + gordo::{handle_gordo_state, Gordo}, + model::{monitor_models, Model}, + pod::monitor_pods, }; pub use deploy_job::create_deploy_job; -use std::collections::{HashMap, BTreeMap}; use errors::Error; +use std::collections::{BTreeMap, HashMap}; fn default_deploy_repository() -> String { "".to_string() @@ -51,16 +49,16 @@ fn default_deploy_ro_fs() -> bool { #[derive(Deserialize, Debug, Clone)] pub struct GordoEnvironmentConfig { pub deploy_image: String, - #[serde(default="default_deploy_repository")] + #[serde(default = "default_deploy_repository")] pub deploy_repository: String, - #[serde(default="default_server_port")] + #[serde(default = "default_server_port")] pub server_port: u16, - #[serde(default="default_server_host")] + #[serde(default = "default_server_host")] pub server_host: String, pub docker_registry: String, pub default_deploy_environment: String, pub resources_labels: String, - #[serde(default="default_deploy_ro_fs")] + #[serde(default = "default_deploy_ro_fs")] pub deploy_job_ro_fs: bool, pub argo_service_account: Option, pub argo_version_number: Option, @@ -82,9 +80,9 @@ pub struct Config { } impl Config { - pub fn from_envs(envs: Iter) -> Result - where Iter: Iterator + where + Iter: Iterator, { let mut workflow_generator_envs: Vec<(String, String)> = vec![]; let mut other_envs: Vec<(String, String)> = vec![]; @@ -95,20 +93,23 @@ impl Config { other_envs.push((key, value)) } } - let env_config: GordoEnvironmentConfig = envy::from_iter::<_, _>(other_envs.into_iter()) - .map_err(|err| ConfigError::Environment(err) )?; + let env_config: GordoEnvironmentConfig = + envy::from_iter::<_, _>(other_envs.into_iter()).map_err(|err| ConfigError::Environment(err))?; debug!("WORKFLOW_GENERATOR environments: {:?}", workflow_generator_envs); debug!("Environment config: {:?}", &env_config); - let default_deploy_environment: Option> = Config::load_from_json(&env_config.default_deploy_environment) - .map_err(|err| ConfigError::Field("DEFAULT_DEPLOY_ENVIRONMENT", err))?; - let resources_labels: Option> = Config::load_from_json(&env_config.resources_labels) - .map_err(|err| ConfigError::Field("RESOURCES_LABELS", err))?; + let default_deploy_environment: Option> = + Config::load_from_json(&env_config.default_deploy_environment) + .map_err(|err| ConfigError::Field("DEFAULT_DEPLOY_ENVIRONMENT", err))?; + let resources_labels: Option> = + Config::load_from_json(&env_config.resources_labels) + .map_err(|err| ConfigError::Field("RESOURCES_LABELS", err))?; let argo_version_number = match env_config.argo_version_number { - Some(value) => { - let result = value.parse::() + Some(value) => { + let result = value + .parse::() .map_err(|err| ConfigError::Field("ARGO_VERSION_NUMBER", err.to_string()))?; Some(result) - }, + } None => None, }; Ok(Config { @@ -126,7 +127,10 @@ impl Config { }) } - pub fn load_from_json<'a, T>(json_value: &'a str) -> Result, String> where T: Deserialize<'a> { + pub fn load_from_json<'a, T>(json_value: &'a str) -> Result, String> + where + T: Deserialize<'a>, + { if json_value.is_empty() { return Ok(None); } @@ -142,7 +146,7 @@ impl Config { return match serde_json::to_string(resources_labels) { Ok(value) => Ok(value), Err(err) => Err(err.to_string()), - } + }; } Ok("".to_string()) } @@ -170,7 +174,6 @@ struct Data { config: Config, } - #[warn(unused_variables)] async fn reconcile_gordo(gordo: Arc, ctx: Context) -> Result { let namespace = gordo @@ -178,7 +181,11 @@ async fn reconcile_gordo(gordo: Arc, ctx: Context) -> Result, ctx: Context) -> Result = Api::namespaced(client.clone(), namespace); let models_obj_list = model_api.list(&lp).await.map_err(Error::KubeError)?; let models: Vec<_> = models_obj_list.into_iter().collect(); let names = utils::resource_names(&models); let current_gordo = (*gordo).clone(); - debug!("Reconcile {} {}{}", models.len(), utils::plural_str(models.len(), "models", Some(": ".to_string())), names); + debug!( + "Reconcile {} {}{}", + models.len(), + utils::plural_str(models.len(), "models", Some(": ".to_string())), + names + ); monitor_models(&model_api, &gordo_api, &models, &vec![current_gordo]).await; let workflow_api: Api = Api::namespaced(client.clone(), namespace); let workflows_obj_list = workflow_api.list(&lp).await.map_err(Error::KubeError)?; let workflows: Vec<_> = workflows_obj_list.into_iter().collect(); let names = utils::resource_names(&workflows); - debug!("Reconcile {} {}{}", workflows.len(), utils::plural_str(workflows.len(), "workflows", Some(": ".to_string())), names); + debug!( + "Reconcile {} {}{}", + workflows.len(), + utils::plural_str(workflows.len(), "workflows", Some(": ".to_string())), + names + ); let pod_api: Api = Api::namespaced(client.clone(), namespace); let pod_obj_list = pod_api.list(&lp).await.map_err(Error::KubeError)?; let pods: Vec<_> = pod_obj_list.into_iter().collect(); let names = utils::resource_names(&pods); - debug!("Reconcile {} {}{}", pods.len(), utils::plural_str(pods.len(), "pods", Some(": ".to_string())), names); + debug!( + "Reconcile {} {}{}", + pods.len(), + utils::plural_str(pods.len(), "pods", Some(": ".to_string())), + names + ); monitor_wf(&model_api, &workflows, &models, &pods).await; monitor_pods(&model_api, &models, &pods).await; @@ -218,7 +242,6 @@ async fn reconcile_gordo(gordo: Arc, ctx: Context) -> Result) -> Action { Action::requeue(Duration::from_secs(30)) } @@ -244,7 +267,8 @@ pub async fn init_gordo_controller(client: Client, config: Config) { Err(e) => { warn!("Reconcile failed: {:?}", e); RECONCILE_GORDO_ERROR.with_label_values(&[]).inc(); - }, + } } - }).await; + }) + .await; } diff --git a/src/main.rs b/src/main.rs index bce2427..79be140 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,21 +1,17 @@ use actix_web::{middleware, web, App, HttpServer}; -use gordo_controller::{init_gordo_controller, crd, views, Config, errors}; -use kube::{ - client::Client, -}; use actix_web_prom::PrometheusMetricsBuilder; -use prometheus::Registry; -use log::{info, warn}; use errors::Error; +use gordo_controller::{crd, errors, init_gordo_controller, views, Config}; +use kube::client::Client; +use log::{info, warn}; +use prometheus::Registry; use std::env::vars; - #[actix_rt::main] async fn main() -> Result<(), errors::Error> { //TODO do not forget about RUST_LOG env in all deployment scripts env_logger::init(); - let gordo_config = Config::from_envs(vars()).unwrap(); info!("Starting with config: {:?}", gordo_config); let bind_address = format!("{}:{}", &gordo_config.server_host, gordo_config.server_port); @@ -33,22 +29,18 @@ async fn main() -> Result<(), errors::Error> { let server = HttpServer::new(move || { App::new() - .app_data(web::Data::new(views::AppState{ - client: client.clone(), - })) + .app_data(web::Data::new(views::AppState { client: client.clone() })) .wrap(prometheus.clone()) - .wrap(middleware::Logger::default() - .exclude("/health") - .exclude("/metrics")) + .wrap(middleware::Logger::default().exclude("/health").exclude("/metrics")) .wrap(middleware::Compress::default()) .service(web::resource("/health").to(views::health)) .service(web::resource("/gordos").to(views::gordos)) .service(web::resource("/gordos/{name}").to(views::get_gordo)) .service(web::resource("/models").to(views::models)) .service(web::resource("/models/{gordo_name}").to(views::models_by_gordo)) - }) - .bind(&bind_address) - .expect(&format!("Could not bind to '{}'", &bind_address)); + }) + .bind(&bind_address) + .expect(&format!("Could not bind to '{}'", &bind_address)); tokio::select! { _ = server.run() => { diff --git a/src/utils.rs b/src/utils.rs index 52d08e4..1a42f00 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,15 +1,11 @@ +use crate::errors::Error; use k8s_openapi::{ + api::core::v1::EnvVar, apimachinery::pkg::apis::meta::v1::{ObjectMeta, OwnerReference}, - api::core::v1::{EnvVar}, -}; -use kube::{ - api::{Resource}, }; -use crate::errors::Error; +use kube::api::Resource; -pub fn object_to_owner_reference>( - meta: ObjectMeta, -) -> Result { +pub fn object_to_owner_reference>(meta: ObjectMeta) -> Result { Ok(OwnerReference { api_version: K::api_version(&()).to_string(), kind: K::kind(&()).to_string(), @@ -19,25 +15,20 @@ pub fn object_to_owner_reference>( }) } -pub fn resource_names>(resource: &Vec) -> String { +pub fn resource_names>(resource: &Vec) -> String { // TODO intersperse - let words: Vec = resource.iter() - .map(|resource| { - match &resource.meta().name { - Some(name) => format!("\"{}\"", name), - None => "".into(), - } + let words: Vec = resource + .iter() + .map(|resource| match &resource.meta().name { + Some(name) => format!("\"{}\"", name), + None => "".into(), }) .collect(); words.join(", ") } pub fn plural_str(length: usize, word: &str, suffix: Option) -> String { - let result = if length == 1 { - word.trim_end_matches('s') - } else { - word - }; + let result = if length == 1 { word.trim_end_matches('s') } else { word }; match (suffix, length > 0) { (Some(suffix_str), true) => { let mut suffix_owned = suffix_str.to_owned(); @@ -58,4 +49,4 @@ pub fn env_var(name: &str, value: &str) -> EnvVar { pub fn get_revision() -> String { chrono::Utc::now().timestamp_millis().to_string() -} \ No newline at end of file +} diff --git a/src/views.rs b/src/views.rs index ef1b441..e820779 100644 --- a/src/views.rs +++ b/src/views.rs @@ -1,10 +1,10 @@ use crate::crd::model::{filter_models_on_gordo, Model}; +use crate::errors::Error; use crate::Gordo; -use actix_web::{http::StatusCode, error, web, HttpResponseBuilder, http, HttpRequest, HttpResponse}; -use kube::{Client, Api}; +use actix_web::{error, http, http::StatusCode, web, HttpRequest, HttpResponse, HttpResponseBuilder}; use kube::api::ListParams; +use kube::{Api, Client}; use serde::Serialize; -use crate::errors::Error; pub struct AppState { pub client: Client, @@ -12,15 +12,14 @@ pub struct AppState { #[derive(Serialize)] struct ErrorResponse { - error: String + error: String, } impl error::ResponseError for Error { fn error_response(&self) -> HttpResponse { - HttpResponseBuilder::new(self.status_code()) - .json(ErrorResponse { - error: self.to_string() - }) + HttpResponseBuilder::new(self.status_code()).json(ErrorResponse { + error: self.to_string(), + }) } fn status_code(&self) -> http::StatusCode { @@ -38,24 +37,27 @@ pub async fn gordos(data: web::Data, _req: HttpRequest) -> actix_web:: let gordo_api: Api = Api::default_namespaced(data.client.clone()); let lp = ListParams::default(); - let gordo_list= gordo_api.list(&lp).await.map_err(Error::KubeError)?; + let gordo_list = gordo_api.list(&lp).await.map_err(Error::KubeError)?; let gordos: Vec = gordo_list.into_iter().collect(); Ok(web::Json(gordos)) } // Get a gordo by name -pub async fn get_gordo(data: web::Data, name: web::Path) -> actix_web::Result, Error> { +pub async fn get_gordo( + data: web::Data, + name: web::Path, +) -> actix_web::Result, Error> { let gordo_api: Api = Api::default_namespaced(data.client.clone()); let lp = ListParams::default(); let name_str = name.as_str(); - let gordo_list= gordo_api.list(&lp).await.map_err(Error::KubeError)?; + let gordo_list = gordo_api.list(&lp).await.map_err(Error::KubeError)?; let gordo: Option = gordo_list .into_iter() .filter(|gordo| gordo.metadata.name == Some(name_str.to_string())) .nth(0); match gordo { - Some(item) => Ok(web::Json(item)), + Some(item) => Ok(web::Json(item)), None => Err(Error::NotFound("gordo")), } } @@ -71,13 +73,16 @@ pub async fn models(data: web::Data, _req: HttpRequest) -> actix_web:: } // List current models belonging to a specific Gordo at the same project revision number -pub async fn models_by_gordo(data: web::Data, gordo_name: web::Path) -> actix_web::Result>, Error> { +pub async fn models_by_gordo( + data: web::Data, + gordo_name: web::Path, +) -> actix_web::Result>, Error> { let gordo_api: Api = Api::default_namespaced(data.client.clone()); let model_api: Api = Api::default_namespaced(data.client.clone()); let lp = ListParams::default(); let name_str = gordo_name.as_str(); - let gordo_list= gordo_api.list(&lp).await.map_err(Error::KubeError)?; + let gordo_list = gordo_api.list(&lp).await.map_err(Error::KubeError)?; // Get the gordo by name, can result in None let gordo_by_name: Option = gordo_list .into_iter() diff --git a/tests/helpers.rs b/tests/helpers.rs index de611cd..08ec0de 100644 --- a/tests/helpers.rs +++ b/tests/helpers.rs @@ -15,16 +15,14 @@ pub async fn client() -> Client { pub async fn remove_gordos(gordos: &Api) { for gordo in gordos.list(&ListParams::default()).await.unwrap().items.iter() { let name = gordo.metadata.name.clone().expect("gordo.metadata.name is empty"); - gordos - .delete(&name, &DeleteParams::default()) - .await - .unwrap(); + gordos.delete(&name, &DeleteParams::default()).await.unwrap(); } } // Get the repo's example `Gordo` config file pub fn deserialize_config(name: &str) -> T - where T: DeserializeOwned +where + T: DeserializeOwned, { let config_str = std::fs::read_to_string(format!("{}/{}", env!("CARGO_MANIFEST_DIR"), name)) .expect("Failed to read config file"); diff --git a/tests/test_controller.rs b/tests/test_controller.rs index 42860fb..d4ffc11 100644 --- a/tests/test_controller.rs +++ b/tests/test_controller.rs @@ -4,11 +4,11 @@ use kube::api::{DeleteParams, ListParams, PostParams}; mod helpers; -use gordo_controller::crd::gordo::Gordo; use gordo_controller::crd::gordo::gordo::GordoStatus; +use gordo_controller::crd::gordo::Gordo; use gordo_controller::crd::model::{filter_models_on_gordo, Model}; -use gordo_controller::{GordoEnvironmentConfig, Config}; -use gordo_controller::deploy_job::{deploy_job_name, create_deploy_job}; +use gordo_controller::deploy_job::{create_deploy_job, deploy_job_name}; +use gordo_controller::{Config, GordoEnvironmentConfig}; // We can create a gordo using the `example-gordo.yaml` file in the repo. #[tokio::test] @@ -24,10 +24,7 @@ async fn test_create_gordo() { // Apply the `gordo-example.yaml` file let gordo: Gordo = helpers::deserialize_config("example-gordo.yaml"); - let new_gordo = match gordos - .create(&PostParams::default(), &gordo) - .await - { + let new_gordo = match gordos.create(&PostParams::default(), &gordo).await { Ok(new_gordo) => new_gordo, Err(err) => panic!("Failed to create gordo with error: {:?}", err), }; @@ -74,10 +71,13 @@ fn test_deploy_job_injects_project_version() { gordo.metadata.uid = Some("6571b980-8824-4b4f-b87c-639c40ef91e3".to_string()); let envs: Vec<(String, String)> = vec![ - ("DEPLOY_IMAGE".to_string(), "ghcr.io/equinor/gordo-base:latest".to_string()), + ( + "DEPLOY_IMAGE".to_string(), + "ghcr.io/equinor/gordo-base:latest".to_string(), + ), ("DOCKER_REGISTRY".to_string(), "ghcr.io".to_string()), ("DEFAULT_DEPLOY_ENVIRONMENT".to_string(), "{}".to_string()), - ("RESOURCES_LABELS".to_string(), "{}".to_string()) + ("RESOURCES_LABELS".to_string(), "{}".to_string()), ]; let config = Config::from_envs(envs.into_iter()).unwrap(); @@ -113,7 +113,10 @@ fn test_filter_models_on_gordo() { // Change one of the models to have a revision matching the Gordo let mut new_labels = models[0].metadata.labels.clone().unwrap_or(BTreeMap::new()); - new_labels.insert("applications.gordo.equinor.com/project-version".to_owned(), project_revision); + new_labels.insert( + "applications.gordo.equinor.com/project-version".to_owned(), + project_revision, + ); models[0].metadata.labels = Some(new_labels); assert_eq!(filter_models_on_gordo(&gordo, &models).count(), 1); diff --git a/tests/test_server.rs b/tests/test_server.rs index bbaee8d..3962d5a 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -35,7 +35,5 @@ async fn test_view_models() { // Helper for just this module: loading app state for testing async fn app_state() -> web::Data { let client = helpers::client().await; - web::Data::new(views::AppState { - client: client.clone(), - }) + web::Data::new(views::AppState { client: client.clone() }) }