Skip to content

Commit

Permalink
Wrap controller in Actix server (#67)
Browse files Browse the repository at this point in the history
  • Loading branch information
milesgranger authored Dec 4, 2019
1 parent 25bcf23 commit b926228
Show file tree
Hide file tree
Showing 8 changed files with 898 additions and 20 deletions.
601 changes: 593 additions & 8 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
actix-web = "2.0.0-alpha.2"
kube = { version = "0.20.0", features = ["openapi"] }
k8s-openapi = { version = "0.6.0", default-features = false, features = ["v1_15"] }
log = "0.4.8"
Expand All @@ -21,3 +22,4 @@ futures = "0.3.1"
[dev-dependencies]
serde_yaml = "0.8.11"
tokio-test = "0.2.0-alpha.6"
reqwest = "0.10.0-alpha.2"
1 change: 1 addition & 0 deletions examples/apply_gordo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,5 @@ async fn main() {
.delete("gordo-model-name", &DeleteParams::default())
.await
.is_ok());
std::thread::sleep(Duration::from_secs(15));
}
85 changes: 85 additions & 0 deletions examples/query_server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
use failure::_core::time::Duration;
use gordo_controller::{
crd::gordo::{load_gordo_resource, Gordo},
crd::model::{load_model_resource, Model},
load_kube_config,
};
use kube::api::{DeleteParams, PostParams};
use kube::client::APIClient;
use serde_json::Value;

#[tokio::main]
#[test]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Calling /gordos /models /gordos/<name> and /models/<name> will give back nothing before submitting
let resp: Vec<Gordo> = reqwest::get("http://0.0.0.0:8888/gordos").await?.json().await?;
assert!(resp.is_empty());

let resp: Vec<Model> = reqwest::get("http://0.0.0.0:8888/models").await?.json().await?;
assert!(resp.is_empty());

let resp: Option<Gordo> = reqwest::get("http://0.0.0.0:8888/gordos/test-project-name")
.await?
.json()
.await?;
assert!(resp.is_none());

let resp: Vec<Model> = reqwest::get("http://0.0.0.0:8888/models/test-project-name")
.await?
.json()
.await?;
assert!(resp.is_empty());

// Apply a Gordo and Model
let gordo: Value = read_manifest("example-gordo.yaml");
let model: Value = read_manifest("example-model.yaml");

let client = APIClient::new(load_kube_config().await);
let gordo_api = load_gordo_resource(&client, "default");
let model_api = load_model_resource(&client, "default");

// Create the Gordo and Model
gordo_api
.create(&PostParams::default(), serde_json::to_vec(&gordo).unwrap())
.await
.unwrap();
model_api
.create(&PostParams::default(), serde_json::to_vec(&model).unwrap())
.await
.unwrap();

// Wait for controller to pick up changes
std::thread::sleep(Duration::from_secs(20));

// Calling /gordos /models /gordos/<name> and /models/<name> will now give back stuff
let resp: Vec<Gordo> = reqwest::get("http://0.0.0.0:8888/gordos").await?.json().await?;
assert_eq!(resp.len(), 1);

let resp: Vec<Model> = reqwest::get("http://0.0.0.0:8888/models").await?.json().await?;
assert_eq!(resp.len(), 1);

let resp: Option<Gordo> = reqwest::get("http://0.0.0.0:8888/gordos/test-project-name")
.await?
.json()
.await?;
assert!(resp.is_some());

let resp: Vec<Model> = reqwest::get("http://0.0.0.0:8888/models/test-project-name")
.await?
.json()
.await?;
assert_eq!(resp.len(), 1);

// Clean up
gordo_api
.delete("test-project-name", &DeleteParams::default())
.await
.unwrap();

Ok(())
}

fn read_manifest(name: &str) -> Value {
let raw = std::fs::read_to_string(format!("{}/{}", env!("CARGO_MANIFEST_DIR"), name)).unwrap();
serde_yaml::from_str(&raw).unwrap()
}
101 changes: 100 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,32 @@
use futures::future::join;
use kube::config;
use kube::{api::Reflector, client::APIClient, config::Configuration};
use log::error;
use serde::Deserialize;

pub mod crd;
pub mod deploy_job;
pub mod views;

use crate::crd::{
gordo::{load_gordo_resource, monitor_gordos},
model::{load_model_resource, monitor_models, Model},
};
pub use crd::gordo::Gordo;
pub use deploy_job::DeployJob;

#[derive(Deserialize, Debug, Clone)]
pub struct GordoEnvironmentConfig {
deploy_image: String,
pub deploy_image: String,
pub server_port: u16,
pub server_host: String,
}
impl Default for GordoEnvironmentConfig {
fn default() -> Self {
GordoEnvironmentConfig {
deploy_image: "auroradevacr.azurecr.io/gordo-infrastructure/gordo-deploy".to_owned(),
server_port: 8888,
server_host: "0.0.0.0".to_owned(),
}
}
}
Expand All @@ -25,3 +39,88 @@ pub fn minor_version(deploy_version: &str) -> Option<u32> {
.map(|v| v.parse::<u32>().ok())
.unwrap_or(None)
}

/// Load the `kube::Configuration` giving priority to local, falling back to in-cluster config
pub async fn load_kube_config() -> Configuration {
config::load_kube_config()
.await
.unwrap_or_else(|_| config::incluster_config().expect("Failed to get local kube config and incluster config"))
}

#[derive(Clone)]
pub struct Controller {
client: APIClient,
namespace: String,
gordo_rf: Reflector<Gordo>,
model_rf: Reflector<Model>,
}

impl Controller {
/// Create a new instance of the Gordo Controller
pub async fn new(kube_config: Configuration) -> Self {
let namespace = kube_config.default_ns.to_owned();
let client = APIClient::new(kube_config);

let model_resource = load_model_resource(&client, &namespace);
let model_rf = Reflector::new(model_resource.clone()).timeout(15).init().await.unwrap();

let gordo_resource = load_gordo_resource(&client, &namespace);
let gordo_rf = Reflector::new(gordo_resource.clone()).timeout(15).init().await.unwrap();

Controller {
client,
namespace,
gordo_rf,
model_rf,
}
}

/// Poll the Gordo and Model reflectors
async fn poll(&self) -> Result<(), kube::Error> {
let (result1, result2) = join(self.gordo_rf.poll(), self.model_rf.poll()).await;

// Return any error, or return Ok
result1?;
result2?;
Ok(())
}

/// Current state of Gordos
pub async fn gordo_state(&self) -> Vec<Gordo> {
self.gordo_rf.read().unwrap_or_default()
}
/// Current state of Models
pub async fn model_state(&self) -> Vec<Model> {
self.model_rf.read().unwrap_or_default()
}
}

/// This returns a `Controller` and calls `poll` on it continuously.
/// While at the same time initializing the monitoring of `Gorod`s and `Model`s
pub async fn controller_init(
kube_config: Configuration,
env_config: GordoEnvironmentConfig,
) -> Result<Controller, kube::Error> {
let controller = Controller::new(kube_config).await;

// Continuously poll `Controller::poll` to keep the app state current
let c1 = controller.clone();
tokio::spawn(async move {
loop {
if let Err(err) = c1.poll().await {
error!("Failed polling Controller with error: {:?}", err);
};
}
});

// Start the normal monitoring of Gordos and Models to direct desired state changes
let c2 = controller.clone();
tokio::spawn(async move {
join(
monitor_gordos(&c2.client, &c2.namespace, &env_config),
monitor_models(&c2.client, &c2.namespace, &env_config),
)
.await;
});
Ok(controller)
}
36 changes: 25 additions & 11 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use futures::future::join;
use kube::{client::APIClient, config};
use actix_web::{middleware, web, App, HttpServer};
use gordo_controller::{controller_init, views, GordoEnvironmentConfig};
use kube::config;
use log::info;

use gordo_controller::GordoEnvironmentConfig;

#[tokio::main]
async fn main() -> () {
std::env::set_var("RUST_LOG", "info,kube=info");
Expand All @@ -16,12 +15,27 @@ async fn main() -> () {
.await
.unwrap_or_else(|_| config::incluster_config().expect("Failed to get local kube config and incluster config"));

let namespace = kube_config.default_ns.to_owned();
let client = APIClient::new(kube_config);
let bind_address = format!("{}:{}", &env_config.server_host, env_config.server_port);

let controller = controller_init(kube_config, env_config).await.unwrap();

// Launch in new thread b/c HttpServer starts own async executor
let handle = std::thread::spawn(move || {
HttpServer::new(move || {
App::new()
.data(controller.clone())
.wrap(middleware::Logger::default().exclude("/health"))
.service(web::resource("/health").to(views::health))
.service(web::resource("/gordos").to(views::gordos))
.service(web::resource("/gordos/{name}").to(views::get_gordo))
.service(web::resource("/models").to(views::models))
.service(web::resource("/models/{gordo_name}").to(views::models_by_gordo))
})
.bind(&bind_address)
.expect(&format!("Could not bind to '{}'", &bind_address))
.run()
.unwrap();
});

join(
gordo_controller::crd::gordo::monitor_gordos(&client, &namespace, &env_config),
gordo_controller::crd::model::monitor_models(&client, &namespace, &env_config),
)
.await;
handle.join().unwrap()
}
46 changes: 46 additions & 0 deletions src/views.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use crate::crd::model::Model;
use crate::{Controller, Gordo};
use actix_web::{http::StatusCode, web, HttpRequest, HttpResponse};

// Simple health check endpoint
pub async fn health(_req: HttpRequest) -> HttpResponse {
HttpResponse::new(StatusCode::OK)
}

// List current gordos
pub async fn gordos(data: web::Data<Controller>, _req: HttpRequest) -> web::Json<Vec<Gordo>> {
web::Json(data.gordo_state().await)
}

// Get a gordo by name
pub async fn get_gordo(data: web::Data<Controller>, name: web::Path<String>) -> web::Json<Option<Gordo>> {
let gordo = data
.gordo_state()
.await
.into_iter()
.filter(|gordo| gordo.metadata.name == name.as_str())
.nth(0);
web::Json(gordo)
}

// List current models
pub async fn models(data: web::Data<Controller>, _req: HttpRequest) -> web::Json<Vec<Model>> {
web::Json(data.model_state().await)
}

// List current models belonging to a specific Gordo
pub async fn models_by_gordo(data: web::Data<Controller>, gordo_name: web::Path<String>) -> web::Json<Vec<Model>> {
let models = data
.model_state()
.await
.into_iter()
.filter(|model| {
model
.metadata
.ownerReferences
.iter()
.any(|owner_ref| owner_ref.name == gordo_name.as_str())
})
.collect();
web::Json(models)
}
46 changes: 46 additions & 0 deletions tests/test_server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use gordo_controller::{controller_init, load_kube_config, views, Controller, Gordo, GordoEnvironmentConfig};
use tokio_test::block_on;

use actix_web::web::Json;
use actix_web::{http::StatusCode, test, web};
use gordo_controller::crd::model::Model;

#[test]
fn test_view_health() {
block_on(async {
let req = test::TestRequest::default().to_http_request();
let resp = views::health(req).await;
assert_eq!(resp.status(), StatusCode::OK);
})
}

#[test]
fn test_view_gordos() {
block_on(async {
let data = app_state().await;

let req = test::TestRequest::default().to_http_request();
let resp: Json<Vec<Gordo>> = views::gordos(data, req).await;
assert_eq!(resp.0.len(), 0);
})
}

#[test]
fn test_view_models() {
block_on(async {
let data = app_state().await;

let req = test::TestRequest::default().to_http_request();
let resp: Json<Vec<Model>> = views::models(data, req).await;
assert_eq!(resp.0.len(), 0);
})
}

// Helper for just this module: loading app state for testing
async fn app_state() -> web::Data<Controller> {
let kube_config = load_kube_config().await;
let controller = controller_init(kube_config, GordoEnvironmentConfig::default())
.await
.unwrap();
web::Data::new(controller)
}

0 comments on commit b926228

Please sign in to comment.