Skip to content

Commit

Permalink
Prometheus support (#206)
Browse files Browse the repository at this point in the history
* Prometheus metrics middleware

* Prometheus metrics middleware

* kube_errors metric

* Fixing "Start minikube" test

* Fixing "Start minikube" test second attempt

* Run minikube under root user

* Fixing unit-tests

* Second attempt for fixing unittests

* CHANGE_MINIKUBE_NONE_USER=true

* CHANGE_MINIKUBE_NONE_USER=true

* Yet another attempt to fix unit-tests

* Debug ls for ci.yaml

* Debugging ci.yaml

* Debugging ci.yaml

* Debugging ci.yaml

* cp -r for ci.yaml

* Additional env vars for minikube

* minikube profiles list

* -E for sudo

* Remove debugging from ci.yaml

* Remove set -x from ci.yaml

* RUST_BACKTRACE=1

* Run unit-tests as shell script

* ls $HOME/.kube/config

* minikube update-context for runner

* Revert changes in ci.yaml

* using minikube 1.17.1

* v1.16.0

* Formatting for sha256 sum file

* sha256 as stdin

* 1.15.1

* Downgrade kubernetes to v1.18.17

* sudo -E. Formatting for ci.yaml

* /home/runner

* Check k8s resources without sudo

* Debug .kube/config

* Revert "Debug .kube/config"

This reverts commit 3e02a47.

* Do not minikube update-context

* print $HOME/.kube/config

* Debugging ci.yaml

* Debugging ci.yaml

* Debugging ci.yaml

* I hope that this config will fix problems with CI

* Basic test for /metrics endpoint

* metric for parse_terminated_message

* error metrics and ModelPhasesMetrics

* src/metrics.rs -> src/crd/metrics/mod.rs

* Fix all errors

* ModelPhasesMetrics

* GORDO_PROJECTS

* register GORDO_PROJECTS

* Handling metrics for deleted projects

* Fix error: method `apply_to_gauges` not found for this

* Update unused mute

* Fix issue with old projects

* gordo_projects now calculated for gordos

* Unused import

* warning metrics
  • Loading branch information
koropets authored Jun 7, 2021
1 parent f455541 commit 82b8fff
Show file tree
Hide file tree
Showing 15 changed files with 340 additions and 26 deletions.
24 changes: 16 additions & 8 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,29 +48,37 @@ jobs:
sudo kustomize build k8s/minikube
- name: Install minikube
env:
MINIKUBE_VERSION: v1.16.0
MINIKUBE_SHA256: "af29a48b2d79075f9d57be3a28724eef2cd628bb87283ed58dd72cbe1f8967c4"
run: |
set -e
sudo apt-get update
sudo apt-get install -y conntrack
sudo curl -L -o /usr/bin/minikube https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64
sudo curl -L -o /usr/bin/minikube https://storage.googleapis.com/minikube/releases/${MINIKUBE_VERSION}/minikube-linux-amd64
echo "${MINIKUBE_SHA256} /usr/bin/minikube" | sha256sum -c --status
sudo chmod +x /usr/bin/minikube
- name: Start minikube
env:
KUBERNETES_VERSION: v1.18.17
MINIKUBE_HOME: /home/runner
CHANGE_MINIKUBE_NONE_USER: "true"
KUBECONFIG: /home/runner/.kube/config
run: |
sudo minikube start --vm-driver=none
set -e
sudo -E /usr/bin/minikube start --kubernetes-version=${KUBERNETES_VERSION} --vm-driver=none
sudo chown -R $USER $HOME/.minikube $HOME/.kube
chmod -R u+wrx $HOME/.minikube $HOME/.kube
minikube update-context
kubectl apply -k k8s/minikube -n default || echo "Skipping on Istio error"
bash ci/wait_gordo_controller.sh
- name: Test CRDs
run: |
sudo kubectl get gordos > /dev/null
sudo kubectl get models > /dev/null
kubectl get gordos > /dev/null
kubectl get models > /dev/null
- name: Unit tests
uses: actions-rs/cargo@v1
Expand Down
41 changes: 41 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ serde = { version = "1.0", features = ["derive"]}
serde_json = "1.0"
tokio = { version = "0.2.12", features = ["full"] }
futures = "0.3.4"
prometheus = "0.8.0"
actix-web-prom = "0.2.0"
lazy_static = "1.4.0"

[dev-dependencies]
serde_yaml = "0.8.11"
Expand Down
3 changes: 3 additions & 0 deletions ci/integration_tests.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
#!/bin/bash

export DEPLOY_IMAGE=docker.io/gordo/gordo-deploy
export DOCKER_REGISTRY=docker.io

SLEEP_TIMEOUT=10

cargo run &
Expand Down
7 changes: 7 additions & 0 deletions examples/query_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.await?;
assert!(resp.is_empty());

let body = reqwest::get("http://0.0.0.0:8888/metrics")
.await?
.text()
.await?;

assert!(body.contains("gordo_controller_http_requests_total"))

// Apply a Gordo and Model
let gordo: Value = read_manifest("example-gordo.yaml");
let gordo: Gordo = serde_json::from_value(gordo).unwrap();
Expand Down
50 changes: 42 additions & 8 deletions src/crd/argo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ use futures::future::join3;
use log::{error, info, warn};
use kube::api::Object;
use k8s_openapi::api::core::v1::{PodSpec, PodStatus};
use crate::crd::model::{Model, ModelPhase, ModelPodTerminatedStatus, patch_model_status, patch_model_with_default_status};
use crate::crd::model::{Model, ModelPhase, ModelPodTerminatedStatus, patch_model_status, patch_model_with_default_status, get_model_project};
use crate::crd::pod::{POD_MATCH_LABELS, FAILED};
use crate::Controller;
use crate::crd::metrics::{kube_error_happened, warning_happened, ModelPhasesMetrics, update_model_counts};
use k8s_openapi::api::core::v1::ContainerStateTerminated;
use chrono::MIN_DATE;

Expand Down Expand Up @@ -106,16 +107,21 @@ fn last_container_terminated_status(terminated_statuses: Vec<&ContainerStateTerm
}

pub async fn monitor_wf(controller: &Controller) -> () {
// TODO this function definitely need to be refactored
let (workflows, models, pods) = join3(controller.wf_state(), controller.model_state(), controller.pod_state()).await;
let mut model_phases_metrics = ModelPhasesMetrics::new(None);

for model in models {
let labels = &model.metadata.labels;
let mut current_phase: Option<ModelPhase> = None;
let current_project: Option<String> = get_model_project(&model);
match &model.status {
Some(model_status) => {
let labels = &model.metadata.labels;
let is_reapplied_model = match (&model_status.revision, labels.get("applications.gordo.equinor.com/project-revision")) {
(Some(status_revision), Some(metadata_revision)) => status_revision != metadata_revision,
_ => false,
};
current_phase = Some(model_status.phase.clone());
if !is_reapplied_model {
match &model_status.phase {
ModelPhase::InProgress | ModelPhase::Unknown => {
Expand Down Expand Up @@ -147,7 +153,10 @@ pub async fn monitor_wf(controller: &Controller) -> () {
new_model_status.message = terminated_status_message.message.clone();
new_model_status.traceback = terminated_status_message.traceback.clone();
},
Err(err) => warn!("Got JSON error where parsing pod's terminated message for the model '{}': {:?}", model_name, err),
Err(err) => {
warn!("Got JSON error where parsing pod's terminated message for the model '{}': {:?}", model_name, err);
warning_happened("parse_terminated_message")
}
}
}
}
Expand All @@ -156,8 +165,17 @@ pub async fn monitor_wf(controller: &Controller) -> () {
}
if model_phase != model_status.phase {
match patch_model_status(&controller.model_resource, &model.metadata.name, new_model_status).await {
Ok(new_model) => info!("Patching Model '{}' from status {:?} to {:?}", model.metadata.name, model.status, new_model.status),
Err(err) => error!( "Failed to patch status of Model '{}' - error: {:?}", model.metadata.name, err),
Ok(new_model) => {
info!("Patching Model '{}' from status {:?} to {:?}", model.metadata.name, model.status, new_model.status);
current_phase = match new_model.status {
Some(status) => Some(status.phase),
None => None,
}
}
Err(err) => {
error!( "Failed to patch status of Model '{}' - error: {:?}", model.metadata.name, err);
kube_error_happened("patch_model", err);
}
}
}
}
Expand All @@ -166,12 +184,28 @@ pub async fn monitor_wf(controller: &Controller) -> () {
}
} else {
match patch_model_with_default_status(&controller.model_resource, &model).await {
Ok(new_model) => info!("Patching Model '{}' from status {:?} to default status {:?}", model.metadata.name, model.status, new_model.status),
Err(err) => error!( "Failed to patch status of Model '{}' with default status - error: {:?}", model.metadata.name, err),
Ok(new_model) => {
info!("Patching Model '{}' from status {:?} to default status {:?}", model.metadata.name, model.status, new_model.status);
current_phase = match new_model.status {
Some(status) => Some(status.phase),
None => None,
}
}
Err(err) => {
error!( "Failed to patch status of Model '{}' with default status - error: {:?}", model.metadata.name, err);
kube_error_happened("patch_model", err);
}
}
}
}
_ => (),
}
};
match (current_project, current_phase) {
(Some(project), Some(phase)) => {
model_phases_metrics.inc_model_counts(project, phase);
},
_ => (),
};
}
update_model_counts(&model_phases_metrics);
}
23 changes: 18 additions & 5 deletions src/crd/gordo/gordo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use serde_json::{json, Value};
use std::collections::HashMap;

use crate::{DeployJob, GordoEnvironmentConfig};
use crate::crd::metrics::{kube_error_happened};

pub type GenerationNumber = Option<u32>;
pub type Gordo = Object<GordoSpec, GordoStatus>;
Expand Down Expand Up @@ -112,7 +113,10 @@ pub async fn start_gordo_deploy_job(
let serialized_job_manifest = serde_json::to_vec(&job).unwrap();
match jobs.create(&postparams, serialized_job_manifest).await {
Ok(job) => info!("Submitted job: {:?}", job.metadata.name),
Err(e) => error!("Failed to submit job with error: {:?}", e),
Err(e) => {
error!("Failed to submit job with error: {:?}", e);
kube_error_happened("submit_job", e);
}
}

let mut status = GordoStatus::from(gordo);
Expand All @@ -130,7 +134,10 @@ pub async fn start_gordo_deploy_job(
.await
{
Ok(o) => info!("Patched status: {:?}", o.status),
Err(e) => error!("Failed to patch status: {:?}", e),
Err(e) => {
error!("Failed to patch status: {:?}", e);
kube_error_happened("patch_gordo", e);
}
};
}

Expand Down Expand Up @@ -165,16 +172,22 @@ pub async fn remove_gordo_deploy_jobs(gordo: &Gordo, client: &APIClient, namespa
tokio::time::delay_for(std::time::Duration::from_secs(1)).await;
}
}
Err(err) => error!(
Err(err) => {
error!(
"Failed to delete old gordo job: '{}' with error: {:?}",
&job.metadata.name, err
),
);
kube_error_happened("delete_gordo", err);
}
}
}
}),
)
.await;
}
Err(e) => error!("Failed to list jobs: {:?}", e),
Err(e) => {
error!("Failed to list jobs: {:?}", e);
kube_error_happened("list_gordo", e);
}
}
}
9 changes: 9 additions & 0 deletions src/crd/gordo/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use futures::future::join_all;
use kube::{api::Api, client::APIClient};
use log::error;
use std::collections::{HashSet};

use crate::{Controller, GordoEnvironmentConfig};
use crate::crd::metrics::{KUBE_ERRORS, update_gordo_projects};

pub mod gordo;
pub use gordo::*;
Expand All @@ -21,12 +23,19 @@ pub async fn monitor_gordos(controller: &Controller) -> () {
}))
.await;

let gordo_projects: HashSet<String> = gordos.into_iter()
.map(|gordo| { gordo.metadata.name })
.collect();

// Log any errors in handling state
results.iter().for_each(|result| {
if let Err(err) = result {
error!("{:?}", err);
KUBE_ERRORS.with_label_values(&["monitor_gordos", "unknown"]).inc_by(1);
}
});

update_gordo_projects(&gordo_projects);
}

async fn handle_gordo_state(
Expand Down
Loading

0 comments on commit 82b8fff

Please sign in to comment.