Skip to content

Commit

Permalink
Tracking model statuses for restarted gordos (#143)
Browse files Browse the repository at this point in the history
* ModelStatus.revision

* Tracking model statuses for restarted gordos. Tracking models from split Argo workflows in the right way

* Typo in comment
  • Loading branch information
koropets authored Jun 17, 2020
1 parent b297192 commit 1dac103
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 45 deletions.
112 changes: 69 additions & 43 deletions src/crd/argo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use futures::future::join3;
use log::{error, info, warn};
use kube::api::Object;
use k8s_openapi::api::core::v1::{PodSpec, PodStatus};
use crate::crd::model::{Model, ModelPhase, ModelPodTerminatedStatus, patch_model_status};
use crate::crd::model::{Model, ModelPhase, ModelPodTerminatedStatus, patch_model_status, patch_model_with_default_status};
use crate::crd::pod::{POD_MATCH_LABELS, FAILED};
use crate::Controller;
use k8s_openapi::api::core::v1::ContainerStateTerminated;
Expand All @@ -16,6 +16,8 @@ pub const WF_MATCH_LABELS: &'static [&'static str] = &[
"applications.gordo.equinor.com/project-revision",
];

pub const WF_NUMBER_LABEL: &str = "applications.gordo.equinor.com/project-workflow";

fn some_of_workflows_in_phases(workflows: &Vec<&ArgoWorkflow>, phases: Vec<ArgoWorkflowPhase>) -> bool {
workflows.iter()
.any(|wf| match &wf.status {
Expand All @@ -39,14 +41,23 @@ fn all_of_workflows_in_phases(workflows: &Vec<&ArgoWorkflow>, phases: Vec<ArgoWo
}

/// Selects the workflows from `workflows` that belong to `model`.
///
/// A workflow is kept when, for every label name in `WF_MATCH_LABELS`, looking the
/// label up on the workflow and on the model yields equal results (a label absent
/// on both sides therefore also compares equal). If both the workflow and the model
/// additionally carry `WF_NUMBER_LABEL`, those two values must be equal as well;
/// when either side lacks that label, the `WF_MATCH_LABELS` comparison alone decides.
fn find_model_workflows<'a>(model: &'a Model, workflows: &'a [ArgoWorkflow]) -> Vec<&'a ArgoWorkflow> {
// TODO: for performance reasons this algorithm should be reimplemented with a BTreeMap
workflows
.iter()
.filter(|workflow| {
let workflow_labels = &workflow.metadata.labels;
let model_labels = &model.metadata.labels;
WF_MATCH_LABELS
let equal_labels = WF_MATCH_LABELS
.iter()
.all(move |&label_name| workflow_labels.get(label_name) == model_labels.get(label_name))
.all(move |&label_name| workflow_labels.get(label_name) == model_labels.get(label_name));
let mut result = equal_labels;
if equal_labels {
// Only narrow the match further when the base labels already agree.
result = match (workflow_labels.get(WF_NUMBER_LABEL), model_labels.get(WF_NUMBER_LABEL)) {
(Some(workflow_wf_number), Some(model_wf_number)) => workflow_wf_number == model_wf_number,
// At least one side has no workflow-number label: keep the base result.
_ => equal_labels,
}
}
result
})
.collect()
}
Expand Down Expand Up @@ -99,52 +110,67 @@ pub async fn monitor_wf(controller: &Controller) -> () {

for model in models {
match &model.status {
Some(model_status) => match &model_status.phase {
ModelPhase::InProgress | ModelPhase::Unknown => {
let found_workflows = find_model_workflows(&model, &workflows);
let mut new_model_phase: Option<ModelPhase> = None;
if some_of_workflows_in_phases(&found_workflows, vec![ArgoWorkflowPhase::Error, ArgoWorkflowPhase::Failed, ArgoWorkflowPhase::Skipped]) {
new_model_phase = Some(ModelPhase::Failed);
} else if all_of_workflows_in_phases(&found_workflows, vec![ArgoWorkflowPhase::Succeeded]) {
new_model_phase = Some(ModelPhase::Succeeded);
}
if let Some(model_phase) = new_model_phase {
let mut new_model_status = model_status.clone();
new_model_status.phase = model_phase.clone();
info!("New phase for the model '{}' will be {:?}", model.metadata.name, model_status);
if model_phase == ModelPhase::Failed {
if let Some(model_name) = model.metadata.labels.get("applications.gordo.equinor.com/model-name") {
let terminated_statuses = failed_pods_terminated_statuses(&model, &pods);
info!("Found {} failed pods in terminated status which is relates to the model '{}'", terminated_statuses.len(), model.metadata.name);
if let Some(terminated_status) = last_container_terminated_status(terminated_statuses) {
new_model_status.code = Some(terminated_status.exit_code);
if let Some(message) = &terminated_status.message {
let trimmed_message = message.trim_end();
if !trimmed_message.is_empty() {
let result: serde_json::Result<ModelPodTerminatedStatus> = serde_json::from_str(&trimmed_message);
match result {
Ok(terminated_status_message) => {
info!("Last terminated status message {:?} for model '{}'", terminated_status_message, model_name);
new_model_status.error_type = terminated_status_message.error_type.clone();
new_model_status.message = terminated_status_message.message.clone();
new_model_status.traceback = terminated_status_message.traceback.clone();
},
Err(err) => warn!("Got JSON error where parsing pod's terminated message for the model '{}': {:?}", model_name, err),
Some(model_status) => {
let labels = &model.metadata.labels;
let is_reapplied_model = match (&model_status.revision, labels.get("applications.gordo.equinor.com/project-revision")) {
(Some(status_revision), Some(metadata_revision)) => status_revision != metadata_revision,
_ => false,
};
if !is_reapplied_model {
match &model_status.phase {
ModelPhase::InProgress | ModelPhase::Unknown => {
let found_workflows = find_model_workflows(&model, &workflows);
let mut new_model_phase: Option<ModelPhase> = None;
if some_of_workflows_in_phases(&found_workflows, vec![ArgoWorkflowPhase::Error, ArgoWorkflowPhase::Failed, ArgoWorkflowPhase::Skipped]) {
new_model_phase = Some(ModelPhase::Failed);
} else if all_of_workflows_in_phases(&found_workflows, vec![ArgoWorkflowPhase::Succeeded]) {
new_model_phase = Some(ModelPhase::Succeeded);
}
if let Some(model_phase) = new_model_phase {
let mut new_model_status = model_status.clone();
new_model_status.phase = model_phase.clone();
info!("New phase for the model '{}' will be {:?}", model.metadata.name, model_status);
if model_phase == ModelPhase::Failed {
if let Some(model_name) = model.metadata.labels.get("applications.gordo.equinor.com/model-name") {
let terminated_statuses = failed_pods_terminated_statuses(&model, &pods);
info!("Found {} failed pods in terminated status which is relates to the model '{}'", terminated_statuses.len(), model.metadata.name);
if let Some(terminated_status) = last_container_terminated_status(terminated_statuses) {
new_model_status.code = Some(terminated_status.exit_code);
if let Some(message) = &terminated_status.message {
let trimmed_message = message.trim_end();
if !trimmed_message.is_empty() {
let result: serde_json::Result<ModelPodTerminatedStatus> = serde_json::from_str(&trimmed_message);
match result {
Ok(terminated_status_message) => {
info!("Last terminated status message {:?} for model '{}'", terminated_status_message, model_name);
new_model_status.error_type = terminated_status_message.error_type.clone();
new_model_status.message = terminated_status_message.message.clone();
new_model_status.traceback = terminated_status_message.traceback.clone();
},
Err(err) => warn!("Got JSON error where parsing pod's terminated message for the model '{}': {:?}", model_name, err),
}
}
}
}
}
}
if model_phase != model_status.phase {
match patch_model_status(&controller.model_resource, &model.metadata.name, new_model_status).await {
Ok(new_model) => info!("Patching Model '{}' from status {:?} to {:?}", model.metadata.name, model.status, new_model.status),
Err(err) => error!( "Failed to patch status of Model '{}' - error: {:?}", model.metadata.name, err),
}
}
}
}
if model_phase != model_status.phase {
match patch_model_status(&controller.model_resource, &model.metadata.name, new_model_status).await {
Ok(new_model) => info!("Patching Model '{}' from status {:?} to {:?}", model.metadata.name, model.status, new_model.status),
Err(err) => error!( "Failed to patch status of Model '{}' - error: {:?}", model.metadata.name, err), }
}
},
_ => (),
}
},
_ => (),
},
} else {
match patch_model_with_default_status(&controller.model_resource, &model).await {
Ok(new_model) => info!("Patching Model '{}' from status {:?} to default status {:?}", model.metadata.name, model.status, new_model.status),
Err(err) => error!( "Failed to patch status of Model '{}' with default status - error: {:?}", model.metadata.name, err),
}
}
}
_ => (),
}
}
Expand Down
13 changes: 11 additions & 2 deletions src/crd/model/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
pub mod model;
pub use model::*;

use kube::api::PatchParams;
use kube::api::{Api, PatchParams};
use log::{error, info};
use serde_json::json;

use crate::crd::gordo::GordoStatus;
use crate::Controller;

pub async fn patch_model_with_default_status<'a>(model_resource: &'a Api<Model>, model: &'a Model) -> Result<Model, kube::Error>{
let mut status = ModelStatus::default();
status.revision = match model.metadata.labels.get("applications.gordo.equinor.com/project-revision") {
Some(revision) => Some(revision.to_string()),
None => None,
};
patch_model_status(model_resource, &model.metadata.name, status.clone()).await
}

pub async fn monitor_models(controller: &Controller) -> () {
let models = controller.model_state().await;
let gordos = controller.gordo_state().await;
Expand All @@ -17,7 +26,7 @@ pub async fn monitor_models(controller: &Controller) -> () {
//TODO Update state here
//let name = model.spec.config["name"].as_str().unwrap_or("unknown");
info!("Unknown status for model {}", model.metadata.name);
match patch_model_status(&controller.model_resource, &model.metadata.name, ModelStatus::default()).await {
match patch_model_with_default_status(&controller.model_resource, &model).await {
Ok(new_model) => info!("Patching Model '{}' from status {:?} to {:?}", model.metadata.name, model.status, new_model.status),
Err(err) => error!( "Failed to patch status of Model '{}' - error: {:?}", model.metadata.name, err),
}
Expand Down
1 change: 1 addition & 0 deletions src/crd/model/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub struct ModelStatus {
pub error_type: Option<String>,
pub message: Option<String>,
pub traceback: Option<String>,
pub revision: Option<String>,
}

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
Expand Down

0 comments on commit 1dac103

Please sign in to comment.