diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index af0c599..68cebd4 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -48,29 +48,37 @@ jobs: sudo kustomize build k8s/minikube - name: Install minikube + env: + MINIKUBE_VERSION: v1.16.0 + MINIKUBE_SHA256: "af29a48b2d79075f9d57be3a28724eef2cd628bb87283ed58dd72cbe1f8967c4" run: | + set -e + sudo apt-get update sudo apt-get install -y conntrack - sudo curl -L -o /usr/bin/minikube https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64 + sudo curl -L -o /usr/bin/minikube https://storage.googleapis.com/minikube/releases/${MINIKUBE_VERSION}/minikube-linux-amd64 + echo "${MINIKUBE_SHA256} /usr/bin/minikube" | sha256sum -c --status sudo chmod +x /usr/bin/minikube - name: Start minikube + env: + KUBERNETES_VERSION: v1.18.17 + MINIKUBE_HOME: /home/runner + CHANGE_MINIKUBE_NONE_USER: "true" + KUBECONFIG: /home/runner/.kube/config run: | - sudo minikube start --vm-driver=none + set -e + sudo -E /usr/bin/minikube start --kubernetes-version=${KUBERNETES_VERSION} --vm-driver=none sudo chown -R $USER $HOME/.minikube $HOME/.kube - chmod -R u+wrx $HOME/.minikube $HOME/.kube - - minikube update-context kubectl apply -k k8s/minikube -n default || echo "Skipping on Istio error" - bash ci/wait_gordo_controller.sh - name: Test CRDs run: | - sudo kubectl get gordos > /dev/null - sudo kubectl get models > /dev/null + kubectl get gordos > /dev/null + kubectl get models > /dev/null - name: Unit tests uses: actions-rs/cargo@v1 diff --git a/Cargo.lock b/Cargo.lock index 5370ab7..448d636 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -250,6 +250,17 @@ dependencies = [ "syn 1.0.12 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "actix-web-prom" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "actix-service 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", + "actix-web 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "prometheus 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "adler32" version = "1.0.4" @@ -771,6 +782,7 @@ version = "0.6.2" dependencies = [ "actix-rt 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "actix-web 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "actix-web-prom 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "chrono 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "envy 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -778,7 +790,9 @@ dependencies = [ "futures 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", "k8s-openapi 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "kube 0.26.0 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)", + "prometheus 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "reqwest 0.10.2 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.48 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1348,6 +1362,24 @@ dependencies = [ "unicode-xid 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "prometheus" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", + "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "protobuf 2.17.0 (registry+https://github.com/rust-lang/crates.io-index)", + "spin 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)", + "thiserror 1.0.10 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "protobuf" +version = "2.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "quick-error" version = "1.2.2" @@ -1687,6 +1719,11 @@ name = "sourcefile" version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "spin" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "syn" version = "1.0.12" @@ -2184,6 +2221,7 @@ dependencies = [ "checksum actix-utils 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "a31d6e44d044cbad9d599eaac4007cf5194621c514b1324ea5116863357b04d5" "checksum actix-web 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3158e822461040822f0dbf1735b9c2ce1f95f93b651d7a7aded00b1efbb1f635" "checksum actix-web-codegen 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "de0878b30e62623770a4713a6338329fd0119703bafc211d3e4144f4d4a7bdd5" +"checksum actix-web-prom 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "abf286ef30e1cf7f274845fb1270efa1bf949f280fc654e060f1252e4076d0ea" "checksum adler32 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "5d2e7343e7fc9de883d1b0341e0b13970f764c14101234857d2ddafa1cb1cac2" "checksum aho-corasick 0.7.6 (registry+https://github.com/rust-lang/crates.io-index)" = "58fb5e95d83b38284460a5fda7d6470aa0b8844d283a0b614b8535e880800d2d" "checksum anyhow 1.0.19 (registry+https://github.com/rust-lang/crates.io-index)" = "57114fc2a6cc374bce195d3482057c846e706d252ff3604363449695684d7a0d" @@ -2310,6 +2348,8 @@ dependencies = [ "checksum proc-macro-hack 0.5.11 (registry+https://github.com/rust-lang/crates.io-index)" = "ecd45702f76d6d3c75a80564378ae228a85f0b59d2f3ed43c91b4a69eb2ebfc5" "checksum proc-macro-nested 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "369a6ed065f249a159e06c45752c780bda2fb53c995718f9e484d08daa9eb42e" "checksum proc-macro2 1.0.7 (registry+https://github.com/rust-lang/crates.io-index)" = "0319972dcae462681daf4da1adeeaa066e3ebd29c69be96c6abb1259d2ee2bcc" +"checksum prometheus 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b0575e258dab62268e7236d7307caa38848acbda7ec7ab87bd9093791e999d20" +"checksum protobuf 2.17.0 (registry+https://github.com/rust-lang/crates.io-index)" = "cb14183cc7f213ee2410067e1ceeadba2a7478a59432ff0747a335202798b1e2" "checksum quick-error 1.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "9274b940887ce9addde99c4eee6b5c44cc494b182b97e73dc8ffdcb3397fd3f0" "checksum quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "053a8c8bcc71fcce321828dc897a98ab9760bef03a4fc36693c231e5b3216cfe" "checksum rand 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "3ae1b169243eaf61759b8475a998f0a385e42042370f3a7dbaf35246eacc8412" @@ -2348,6 +2388,7 @@ dependencies = [ "checksum smallvec 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "44e59e0c9fa00817912ae6e4e6e3c4fe04455e75699d06eedc7d85917ed8e8f4" "checksum socket2 0.3.11 (registry+https://github.com/rust-lang/crates.io-index)" = "e8b74de517221a2cb01a53349cf54182acdc31a074727d3079068448c0676d85" "checksum sourcefile 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "4bf77cb82ba8453b42b6ae1d692e4cdc92f9a47beaf89a847c8be83f4e328ad3" +"checksum spin 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" "checksum syn 1.0.12 (registry+https://github.com/rust-lang/crates.io-index)" = "ddc157159e2a7df58cd67b1cace10b8ed256a404fb0070593f137d8ba6bef4de" "checksum synstructure 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)" = "67656ea1dc1b41b1451851562ea232ec2e5a80242139f7e679ceccfb5d61f545" "checksum tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7a6e24d9338a0a5be79593e2fa15a648add6138caa803e2d5bc782c371732ca9" diff --git a/Cargo.toml b/Cargo.toml index 48390a2..f8c4b72 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,9 @@ serde = { version = "1.0", features = ["derive"]} serde_json = "1.0" tokio = { version = "0.2.12", features = ["full"] } futures = "0.3.4" +prometheus = "0.8.0" +actix-web-prom = "0.2.0" +lazy_static = "1.4.0" [dev-dependencies] serde_yaml = "0.8.11" diff --git a/ci/integration_tests.sh b/ci/integration_tests.sh index b86cf58..68bd75a 100644 --- a/ci/integration_tests.sh +++ b/ci/integration_tests.sh @@ -1,5 +1,8 @@ #!/bin/bash +export DEPLOY_IMAGE=docker.io/gordo/gordo-deploy +export DOCKER_REGISTRY=docker.io + SLEEP_TIMEOUT=10 cargo run & diff --git a/examples/query_server.rs b/examples/query_server.rs index 1dcc30f..e001553 100644 --- a/examples/query_server.rs +++ b/examples/query_server.rs @@ -38,6 +38,13 @@ async fn main() -> Result<(), Box> { .await?; assert!(resp.is_empty()); + let body = reqwest::get("http://0.0.0.0:8888/metrics") + .await? + .text() + .await?; + + assert!(body.contains("gordo_controller_http_requests_total")) + // Apply a Gordo and Model let gordo: Value = read_manifest("example-gordo.yaml"); let gordo: Gordo = serde_json::from_value(gordo).unwrap(); diff --git a/src/crd/argo/mod.rs b/src/crd/argo/mod.rs index 9091e35..9eaa3ef 100644 --- a/src/crd/argo/mod.rs +++ b/src/crd/argo/mod.rs @@ -5,9 +5,10 @@ use futures::future::join3; use log::{error, info, warn}; use kube::api::Object; use k8s_openapi::api::core::v1::{PodSpec, PodStatus}; -use crate::crd::model::{Model, ModelPhase, ModelPodTerminatedStatus, patch_model_status, patch_model_with_default_status}; +use crate::crd::model::{Model, ModelPhase, ModelPodTerminatedStatus, patch_model_status, patch_model_with_default_status, get_model_project}; use crate::crd::pod::{POD_MATCH_LABELS, FAILED}; use crate::Controller; +use crate::crd::metrics::{kube_error_happened, warning_happened, ModelPhasesMetrics, update_model_counts}; use k8s_openapi::api::core::v1::ContainerStateTerminated; use chrono::MIN_DATE; @@ -106,16 +107,21 @@ fn last_container_terminated_status(terminated_statuses: Vec<&ContainerStateTerm } pub async fn monitor_wf(controller: &Controller) -> () { + // TODO this function definitely need to be refactored let (workflows, models, pods) = join3(controller.wf_state(), controller.model_state(), controller.pod_state()).await; + let mut model_phases_metrics = ModelPhasesMetrics::new(None); for model in models { + let labels = &model.metadata.labels; + let mut current_phase: Option = None; + let current_project: Option = get_model_project(&model); match &model.status { Some(model_status) => { - let labels = &model.metadata.labels; let is_reapplied_model = match (&model_status.revision, labels.get("applications.gordo.equinor.com/project-revision")) { (Some(status_revision), Some(metadata_revision)) => status_revision != metadata_revision, _ => false, }; + current_phase = Some(model_status.phase.clone()); if !is_reapplied_model { match &model_status.phase { ModelPhase::InProgress | ModelPhase::Unknown => { @@ -147,7 +153,10 @@ pub async fn monitor_wf(controller: &Controller) -> () { new_model_status.message = terminated_status_message.message.clone(); new_model_status.traceback = terminated_status_message.traceback.clone(); }, - Err(err) => warn!("Got JSON error where parsing pod's terminated message for the model '{}': {:?}", model_name, err), + Err(err) => { + warn!("Got JSON error where parsing pod's terminated message for the model '{}': {:?}", model_name, err); + warning_happened("parse_terminated_message") + } } } } @@ -156,8 +165,17 @@ pub async fn monitor_wf(controller: &Controller) -> () { } if model_phase != model_status.phase { match patch_model_status(&controller.model_resource, &model.metadata.name, new_model_status).await { - Ok(new_model) => info!("Patching Model '{}' from status {:?} to {:?}", model.metadata.name, model.status, new_model.status), - Err(err) => error!( "Failed to patch status of Model '{}' - error: {:?}", model.metadata.name, err), + Ok(new_model) => { + info!("Patching Model '{}' from status {:?} to {:?}", model.metadata.name, model.status, new_model.status); + current_phase = match new_model.status { + Some(status) => Some(status.phase), + None => None, + } + } + Err(err) => { + error!( "Failed to patch status of Model '{}' - error: {:?}", model.metadata.name, err); + kube_error_happened("patch_model", err); + } } } } @@ -166,12 +184,28 @@ pub async fn monitor_wf(controller: &Controller) -> () { } } else { match patch_model_with_default_status(&controller.model_resource, &model).await { - Ok(new_model) => info!("Patching Model '{}' from status {:?} to default status {:?}", model.metadata.name, model.status, new_model.status), - Err(err) => error!( "Failed to patch status of Model '{}' with default status - error: {:?}", model.metadata.name, err), + Ok(new_model) => { + info!("Patching Model '{}' from status {:?} to default status {:?}", model.metadata.name, model.status, new_model.status); + current_phase = match new_model.status { + Some(status) => Some(status.phase), + None => None, + } + } + Err(err) => { + error!( "Failed to patch status of Model '{}' with default status - error: {:?}", model.metadata.name, err); + kube_error_happened("patch_model", err); + } } } } _ => (), - } + }; + match (current_project, current_phase) { + (Some(project), Some(phase)) => { + model_phases_metrics.inc_model_counts(project, phase); + }, + _ => (), + }; } + update_model_counts(&model_phases_metrics); } \ No newline at end of file diff --git a/src/crd/gordo/gordo.rs b/src/crd/gordo/gordo.rs index b86e488..0cd3b8c 100644 --- a/src/crd/gordo/gordo.rs +++ b/src/crd/gordo/gordo.rs @@ -9,6 +9,7 @@ use serde_json::{json, Value}; use std::collections::HashMap; use crate::{DeployJob, GordoEnvironmentConfig}; +use crate::crd::metrics::{kube_error_happened}; pub type GenerationNumber = Option; pub type Gordo = Object; @@ -112,7 +113,10 @@ pub async fn start_gordo_deploy_job( let serialized_job_manifest = serde_json::to_vec(&job).unwrap(); match jobs.create(&postparams, serialized_job_manifest).await { Ok(job) => info!("Submitted job: {:?}", job.metadata.name), - Err(e) => error!("Failed to submit job with error: {:?}", e), + Err(e) => { + error!("Failed to submit job with error: {:?}", e); + kube_error_happened("submit_job", e); + } } let mut status = GordoStatus::from(gordo); @@ -130,7 +134,10 @@ pub async fn start_gordo_deploy_job( .await { Ok(o) => info!("Patched status: {:?}", o.status), - Err(e) => error!("Failed to patch status: {:?}", e), + Err(e) => { + error!("Failed to patch status: {:?}", e); + kube_error_happened("patch_gordo", e); + } }; } @@ -165,16 +172,22 @@ pub async fn remove_gordo_deploy_jobs(gordo: &Gordo, client: &APIClient, namespa tokio::time::delay_for(std::time::Duration::from_secs(1)).await; } } - Err(err) => error!( + Err(err) => { + error!( "Failed to delete old gordo job: '{}' with error: {:?}", &job.metadata.name, err - ), + ); + kube_error_happened("delete_gordo", err); + } } } }), ) .await; } - Err(e) => error!("Failed to list jobs: {:?}", e), + Err(e) => { + error!("Failed to list jobs: {:?}", e); + kube_error_happened("list_gordo", e); + } } } diff --git a/src/crd/gordo/mod.rs b/src/crd/gordo/mod.rs index 568758d..5ab143e 100644 --- a/src/crd/gordo/mod.rs +++ b/src/crd/gordo/mod.rs @@ -1,8 +1,10 @@ use futures::future::join_all; use kube::{api::Api, client::APIClient}; use log::error; +use std::collections::{HashSet}; use crate::{Controller, GordoEnvironmentConfig}; +use crate::crd::metrics::{KUBE_ERRORS, update_gordo_projects}; pub mod gordo; pub use gordo::*; @@ -21,12 +23,19 @@ pub async fn monitor_gordos(controller: &Controller) -> () { })) .await; + let gordo_projects: HashSet = gordos.into_iter() + .map(|gordo| { gordo.metadata.name }) + .collect(); + // Log any errors in handling state results.iter().for_each(|result| { if let Err(err) = result { error!("{:?}", err); + KUBE_ERRORS.with_label_values(&["monitor_gordos", "unknown"]).inc_by(1); } }); + + update_gordo_projects(&gordo_projects); } async fn handle_gordo_state( diff --git a/src/crd/metrics/mod.rs b/src/crd/metrics/mod.rs new file mode 100644 index 0000000..64098c4 --- /dev/null +++ b/src/crd/metrics/mod.rs @@ -0,0 +1,166 @@ +use crate::crd::model::{ModelPhase, PHASES_COUNT}; + +use std::collections::{HashMap, HashSet}; +use std::sync::Mutex; +use prometheus::{Opts, IntCounterVec, IntGaugeVec, Registry}; +use lazy_static::lazy_static; +use kube::{Error}; + +pub const METRICS_NAMESPACE: &str = "gordo_controller"; + +lazy_static! { + pub static ref KUBE_ERRORS: IntCounterVec = IntCounterVec::new( + Opts::new("kube_errors", "gordo-controller k8s related errors") + .namespace(METRICS_NAMESPACE), + &["action", "kube_name"] + ).unwrap(); + pub static ref WARNINGS: IntCounterVec = IntCounterVec::new( + Opts::new("warnings", "gordo-controller warnings") + .namespace(METRICS_NAMESPACE), + &["name"] + ).unwrap(); + pub static ref MODEL_COUNTS: IntGaugeVec = IntGaugeVec::new( + Opts::new("model_counts", "Number of models per projects and phases") + .namespace(METRICS_NAMESPACE), + &["project", "phase"] + ).unwrap(); + pub static ref GORDO_PROJECTS: IntGaugeVec = IntGaugeVec::new( + Opts::new("gordo_projects", "One metric per gordo project") + .namespace(METRICS_NAMESPACE), + &["project"] + ).unwrap(); + pub static ref PROJECTS: Mutex> = Mutex::new(HashMap::new()); +} + +pub fn custom_metrics(registry: &Registry) { + registry.register(Box::new(KUBE_ERRORS.clone())).unwrap(); + registry.register(Box::new(WARNINGS.clone())).unwrap(); + registry.register(Box::new(MODEL_COUNTS.clone())).unwrap(); + registry.register(Box::new(GORDO_PROJECTS.clone())).unwrap(); +} + +pub fn kube_error_name<'a>(err: Error) -> &'a str { + match err { + Error::Api(_) => "api", + Error::ReqwestError(_) => "request_error", + Error::HttpError(_) => "http_error", + Error::SerdeError(_) => "serde_error", + Error::RequestBuild => "request_build", + Error::RequestSend => "request_send", + Error::RequestParse => "request_parse", + Error::InvalidMethod(_) => "request_method", + Error::RequestValidation(_) => "request_validation", + Error::KubeConfig(_) => "kube_config", + Error::SslError(_) => "ssl_error", + } +} + +pub fn kube_error_happened(action: &str, err: Error) { + KUBE_ERRORS.with_label_values(&[action, kube_error_name(err)]).inc_by(1); +} + +pub fn warning_happened(name: &str) { + WARNINGS.with_label_values(&[name]).inc_by(1); +} + + +//Number of phases +const INITIAL_PROJECT_COUNT: usize = 5; + +pub struct ModelPhasesMetrics { + projects: HashMap, + metrics: Vec, + next_index: usize, +} + +impl ModelPhasesMetrics { + + pub fn new(initial_projects_count: Option) -> Self { + let project_count = initial_projects_count.unwrap_or(INITIAL_PROJECT_COUNT as u32) as usize; + ModelPhasesMetrics { + projects: HashMap::with_capacity(project_count), + metrics: Vec::with_capacity(project_count * PHASES_COUNT), + next_index: 0, + } + } + + fn get_index(phase: ModelPhase) -> usize { + match phase { + ModelPhase::Unknown => 0, + ModelPhase::InProgress => 1, + ModelPhase::Succeeded => 2, + ModelPhase::Failed => 3, + } + } + + fn get_project_index(&mut self, project: String) -> usize { + match self.projects.get(&project) { + Some(index) => *index, + None => { + let index = self.next_index; + let next_index = index + PHASES_COUNT; + self.metrics.resize(next_index, 0); + self.projects.insert(project, index); + self.next_index = next_index; + index + } + } + } + + pub fn inc_model_counts(&mut self, project: String, phase: ModelPhase) { + let base_index = self.get_project_index(project); + let index = base_index + Self::get_index(phase); + self.metrics[index] = self.metrics[index] + 1; + } +} + +fn phase_labels<'a>() -> [(ModelPhase, &'a str); PHASES_COUNT] { + return [ + (ModelPhase::Unknown, "unknown"), + (ModelPhase::InProgress, "in_progress"), + (ModelPhase::Succeeded, "succeeded"), + (ModelPhase::Failed, "failed"), + ]; +} + +pub fn update_gordo_projects(gordo_projects: &HashSet) { + // TODO consider to return Result<...> from this function + let mut old_project = PROJECTS.lock().unwrap(); + for (project, exists) in old_project.iter_mut() { + let new_exists = gordo_projects.contains(project); + if !new_exists { + GORDO_PROJECTS.with_label_values(&[project]).set(0); + *exists = false; + } + } + for project in gordo_projects { + GORDO_PROJECTS.with_label_values(&[project]).set(1); + old_project.insert(project.clone(), true); + } +} + +pub fn update_model_counts(model_phases_metrics: &ModelPhasesMetrics) { + // TODO consider to return Result<...> from this function + let old_project = PROJECTS.lock().unwrap(); + let new_projects = &model_phases_metrics.projects; + let mut labels: [&str; 2] = ["", ""]; + let phase_labels = phase_labels(); + for (project, exists) in old_project.iter() { + labels[0] = project; + for (model_phase, phase_label) in &phase_labels { + labels[1] = phase_label; + let mut metric: i64 = 0; + if *exists { + // TODO move this part to ModelPhasesMetrics + metric = match new_projects.get(project) { + Some(base_index) => { + let index = base_index + ModelPhasesMetrics::get_index(model_phase.clone()); + model_phases_metrics.metrics[index] + } + None => 0 + } + } + MODEL_COUNTS.with_label_values(&labels).set(metric); + } + } +} \ No newline at end of file diff --git a/src/crd/mod.rs b/src/crd/mod.rs index fc51b2d..068a968 100644 --- a/src/crd/mod.rs +++ b/src/crd/mod.rs @@ -1,4 +1,5 @@ pub mod gordo; pub mod model; pub mod pod; -pub mod argo; \ No newline at end of file +pub mod argo; +pub mod metrics; \ No newline at end of file diff --git a/src/crd/model/mod.rs b/src/crd/model/mod.rs index 3e7ce82..3e1dd27 100644 --- a/src/crd/model/mod.rs +++ b/src/crd/model/mod.rs @@ -7,6 +7,7 @@ use serde_json::json; use crate::crd::gordo::GordoStatus; use crate::Controller; +use crate::crd::metrics::{kube_error_happened}; pub async fn patch_model_with_default_status<'a>(model_resource: &'a Api, model: &'a Model) -> Result{ let mut status = ModelStatus::default(); @@ -28,7 +29,10 @@ pub async fn monitor_models(controller: &Controller) -> () { info!("Unknown status for model {}", model.metadata.name); match patch_model_with_default_status(&controller.model_resource, &model).await { Ok(new_model) => info!("Patching Model '{}' from status {:?} to {:?}", model.metadata.name, model.status, new_model.status), - Err(err) => error!( "Failed to patch status of Model '{}' - error: {:?}", model.metadata.name, err), + Err(err) => { + error!( "Failed to patch status of Model '{}' - error: {:?}", model.metadata.name, err); + kube_error_happened("patch_gordo", err); + } } } } @@ -60,6 +64,7 @@ pub async fn monitor_models(controller: &Controller) -> () { "Failed to patch status of Gordo '{}' - error: {:?}", &gordo.metadata.name, err ); + kube_error_happened("patch_gordo", err); } } } diff --git a/src/crd/model/model.rs b/src/crd/model/model.rs index 47a1a06..64d4926 100644 --- a/src/crd/model/model.rs +++ b/src/crd/model/model.rs @@ -37,6 +37,7 @@ pub enum ModelPhase { #[serde(alias = "succeeded")] Succeeded, } +pub const PHASES_COUNT: usize = 4; impl Default for ModelPhase { fn default() -> Self { @@ -97,4 +98,13 @@ pub async fn patch_model_status<'a>(model_resource: &'a Api, model_name: let patch_params = PatchParams::default(); let patch = serde_json::to_vec(&json!({ "status": new_status })).unwrap(); model_resource.patch_status(model_name, &patch_params, patch).await +} + +pub fn get_model_project<'a>(model: &'a Model) -> Option { + for ownerReference in &model.metadata.ownerReferences { + if ownerReference.kind.eq("Gordo") { + return Some(ownerReference.name.clone()); + } + } + return None; } \ No newline at end of file diff --git a/src/crd/pod/mod.rs b/src/crd/pod/mod.rs index 886176e..fc47fb4 100644 --- a/src/crd/pod/mod.rs +++ b/src/crd/pod/mod.rs @@ -3,6 +3,7 @@ use kube::api::Api; use crate::Controller; use crate::crd::model::{Model, ModelStatus, ModelPhase, patch_model_status}; +use crate::crd::metrics::{kube_error_happened}; pub const PENDING: &str = "Pending"; pub const RUNNING: &str = "Running"; @@ -19,7 +20,10 @@ pub const POD_MATCH_LABELS: &'static [&'static str] = &[ async fn update_model_status(model_resource: &Api, model: &Model, new_status: ModelStatus) { match patch_model_status(model_resource, &model.metadata.name, new_status).await { Ok(new_model) => info!("Patching Model '{}' from status {:?} to {:?}", model.metadata.name, model.status, new_model.status), - Err(err) => error!( "Failed to patch status of Model '{}' - error: {:?}", model.metadata.name, err), + Err(err) => { + error!( "Failed to patch status of Model '{}' - error: {:?}", model.metadata.name, err); + kube_error_happened("faild_to_patch_model", err); + } } } diff --git a/src/lib.rs b/src/lib.rs index bc251bb..0f23d9b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -158,6 +158,7 @@ pub async fn controller_init( loop { if let Err(err) = c1.poll().await { error!("Controller polling encountered an error: {:?}", err); + crd::metrics::KUBE_ERRORS.with_label_values(&["controller_polling"]).inc_by(1); } } }); diff --git a/src/main.rs b/src/main.rs index 38d1170..9ed36b7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,7 @@ use actix_web::{middleware, web, App, HttpServer}; -use gordo_controller::{controller_init, views, GordoEnvironmentConfig}; +use gordo_controller::{controller_init, crd, views, GordoEnvironmentConfig}; +use actix_web_prom::PrometheusMetrics; +use prometheus::{Registry}; use kube::config; use log::info; @@ -22,10 +24,17 @@ async fn main() -> () { let controller = controller_init(kube_config, env_config).await.unwrap(); + let registry = Registry::new(); + crd::metrics::custom_metrics(®istry); + let prometheus = PrometheusMetrics::new_with_registry(registry, crd::metrics::METRICS_NAMESPACE, Some("/metrics"), None).unwrap(); + HttpServer::new(move || { App::new() .data(controller.clone()) - .wrap(middleware::Logger::default().exclude("/health")) + .wrap(prometheus.clone()) + .wrap(middleware::Logger::default() + .exclude("/health") + .exclude("/metrics")) .wrap(middleware::Compress::default()) .service(web::resource("/health").to(views::health)) .service(web::resource("/gordos").to(views::gordos))