Skip to content

Commit

Permalink
Add rustfmt.toml and reformat
Browse files Browse the repository at this point in the history
  • Loading branch information
milesgranger committed Nov 14, 2019
1 parent 4b9900f commit 214648a
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 132 deletions.
1 change: 1 addition & 0 deletions .rustfmt.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
max_width = 120
102 changes: 37 additions & 65 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ async fn main() -> ! {
GordoEnvironmentConfig::default()
});

let kube_config = config::load_kube_config().await.unwrap_or_else(|_| {
config::incluster_config().expect("Failed to get local kube config and incluster config")
});
let kube_config = config::load_kube_config()
.await
.unwrap_or_else(|_| config::incluster_config().expect("Failed to get local kube config and incluster config"));

let namespace = kube_config.default_ns.to_owned();
info!("Got default namespace of: {}", &namespace);
Expand All @@ -85,7 +85,6 @@ async fn main() -> ! {
launch_waiting_gordo_workflows(&resource, &client, &namespace, &env_config).await;

loop {

// Update state changes
informer
.poll()
Expand All @@ -95,8 +94,7 @@ async fn main() -> ! {
while let Some(event) = informer.pop() {
match event {
WatchEvent::Added(gordo) => {
start_gordo_deploy_job(&gordo, &client, &resource, &namespace, &env_config)
.await;
start_gordo_deploy_job(&gordo, &client, &resource, &namespace, &env_config).await;
}
WatchEvent::Modified(gordo) => {
info!(
Expand All @@ -109,29 +107,16 @@ async fn main() -> ! {
GordoStatus::Submitted(ref generation) => {
// If it's submitted, we only want to launch the job if the GenerationNumber has changed.
if generation != &gordo.metadata.generation.map(|v| v as u32) {
start_gordo_deploy_job(
&gordo,
&client,
&resource,
&namespace,
&env_config,
)
.await;
start_gordo_deploy_job(&gordo, &client, &resource, &namespace, &env_config)
.await;
}
}
}
}

// No Gordo status
None => {
start_gordo_deploy_job(
&gordo,
&client,
&resource,
&namespace,
&env_config,
)
.await;
start_gordo_deploy_job(&gordo, &client, &resource, &namespace, &env_config).await;
}
}
}
Expand Down Expand Up @@ -189,13 +174,7 @@ pub(crate) async fn launch_waiting_gordo_workflows(
.map(|gordo| {
// Submit this gordo resource.
info!("Submitting waiting Gordo: {}", &gordo.metadata.name);
start_gordo_deploy_job(
gordo,
&client,
&resource,
&namespace,
&env_config,
)
start_gordo_deploy_job(gordo, &client, &resource, &namespace, &env_config)
}),
)
.await;
Expand Down Expand Up @@ -262,52 +241,45 @@ async fn start_gordo_deploy_job(
}

/// Remove any gordo deploy jobs associated with this `Gordo`
pub(crate) async fn remove_gordo_deploy_jobs(
gordo: &Gordo,
client: &APIClient,
namespace: &str,
) -> () {
info!(
"Removing any gordo-deploy jobs for Gordo: '{}'",
&gordo.metadata.name
);
pub(crate) async fn remove_gordo_deploy_jobs(gordo: &Gordo, client: &APIClient, namespace: &str) -> () {
info!("Removing any gordo-deploy jobs for Gordo: '{}'", &gordo.metadata.name);

let jobs = Api::v1Job(client.clone()).within(&namespace);
match jobs.list(&ListParams::default()).await {
Ok(job_list) => {
join_all(job_list
.items
.into_iter()
.filter(|job| {
job.metadata.labels.get("gordoProjectName") == Some(&gordo.metadata.name)
})
.map(|job| async move {
let jobs_api = Api::v1Job(client.clone()).within(&namespace);
match jobs_api
.delete(&job.metadata.name, &DeleteParams::default())
.await
{
Ok(_) => {
info!(
"Successfully requested to delete job: {}, waiting for it to die.",
&job.metadata.name
);
join_all(
job_list
.items
.into_iter()
.filter(|job| job.metadata.labels.get("gordoProjectName") == Some(&gordo.metadata.name))
.map(|job| {
async move {
let jobs_api = Api::v1Job(client.clone()).within(&namespace);
match jobs_api.delete(&job.metadata.name, &DeleteParams::default()).await {
Ok(_) => {
info!(
"Successfully requested to delete job: {}, waiting for it to die.",
&job.metadata.name
);

// Keep trying to get the job, it will fail when it no longer exists.
while let Ok(job) = jobs_api.get(&job.metadata.name).await {
info!(
// Keep trying to get the job, it will fail when it no longer exists.
while let Ok(job) = jobs_api.get(&job.metadata.name).await {
info!(
"Got job resourceVersion: {:#?}, generation: {:#?} waiting for it to be deleted.",
job.metadata.resourceVersion, job.metadata.generation
);
std::thread::sleep(std::time::Duration::from_secs(1));
std::thread::sleep(std::time::Duration::from_secs(1));
}
}
Err(err) => error!(
"Failed to delete old gordo job: '{}' with error: {:?}",
&job.metadata.name, err
),
}
}
Err(err) => error!(
"Failed to delete old gordo job: '{}' with error: {:?}",
&job.metadata.name, err
),
}
})).await;
}),
)
.await;
}
Err(e) => error!("Failed to list jobs: {:?}", e),
}
Expand Down
19 changes: 6 additions & 13 deletions src/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ use crate::Gordo;

// Get the `APIClient` using current kube config
pub async fn client() -> APIClient {
let config = config::load_kube_config().await.unwrap_or_else(|_| {
config::incluster_config().expect("Failed to get local kube config and incluster config")
});
let config = config::load_kube_config()
.await
.unwrap_or_else(|_| config::incluster_config().expect("Failed to get local kube config and incluster config"));
APIClient::new(config)
}

Expand All @@ -24,13 +24,7 @@ pub fn gordo_custom_resource_api(client: APIClient) -> Api<Gordo> {

// Remove _all_ gordos.
pub async fn remove_gordos(gordos: &Api<Gordo>) {
for gordo in gordos
.list(&ListParams::default())
.await
.unwrap()
.items
.iter()
{
for gordo in gordos.list(&ListParams::default()).await.unwrap().items.iter() {
gordos
.delete(&gordo.metadata.name, &DeleteParams::default())
.await
Expand All @@ -40,8 +34,7 @@ pub async fn remove_gordos(gordos: &Api<Gordo>) {

// Get the repo's example `Gordo` config file
pub fn gordo_example_config() -> Value {
let config_str =
std::fs::read_to_string(format!("{}/example-gordo.yaml", env!("CARGO_MANIFEST_DIR")))
.expect("Failed to read config file");
let config_str = std::fs::read_to_string(format!("{}/example-gordo.yaml", env!("CARGO_MANIFEST_DIR")))
.expect("Failed to read config file");
serde_yaml::from_str(&config_str).expect("Unable to parse config file into yaml")
}
63 changes: 9 additions & 54 deletions src/tests/test_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,7 @@ fn test_create_gordo() {
helpers::remove_gordos(&gordos).await;

// Ensure there are no Gordos
assert_eq!(
gordos
.list(&ListParams::default())
.await
.unwrap()
.items
.len(),
0
);
assert_eq!(gordos.list(&ListParams::default()).await.unwrap().items.len(), 0);

// Apply the `gordo-example.yaml` file
let config = helpers::gordo_example_config();
Expand All @@ -37,34 +29,15 @@ fn test_create_gordo() {
};

// Ensure there are now one gordos
assert_eq!(
gordos
.list(&ListParams::default())
.await
.unwrap()
.items
.len(),
1
);
assert_eq!(gordos.list(&ListParams::default()).await.unwrap().items.len(), 1);

// Delete the gordo
if let Err(err) = gordos
.delete(&new_gordo.metadata.name, &DeleteParams::default())
.await
{
if let Err(err) = gordos.delete(&new_gordo.metadata.name, &DeleteParams::default()).await {
panic!("Failed to delete gordo with error: {:?}", err);
}

// Back to zero gordos
assert_eq!(
gordos
.list(&ListParams::default())
.await
.unwrap()
.items
.len(),
0
);
assert_eq!(gordos.list(&ListParams::default()).await.unwrap().items.len(), 0);
})
}

Expand All @@ -91,36 +64,21 @@ fn test_launch_waiting_gordos() {

// No jobs waiting after applying config.
let jobs = Api::v1Job(client.clone()).within("default");
assert_eq!(
jobs.list(&ListParams::default()).await.unwrap().items.len(),
0
);
assert_eq!(jobs.list(&ListParams::default()).await.unwrap().items.len(), 0);

// Launch the waiting config.
let resource = helpers::gordo_custom_resource_api(client.clone());
crate::launch_waiting_gordo_workflows(
&resource,
&client,
"default",
&GordoEnvironmentConfig::default(),
)
.await;
crate::launch_waiting_gordo_workflows(&resource, &client, "default", &GordoEnvironmentConfig::default()).await;

// Now we should have one job.
assert_eq!(
jobs.list(&ListParams::default()).await.unwrap().items.len(),
1
);
assert_eq!(jobs.list(&ListParams::default()).await.unwrap().items.len(), 1);

// Delete all jobs
crate::remove_gordo_deploy_jobs(&new_gordo, &client, "default").await;

// And finally, we should have zero jobs
std::thread::sleep(std::time::Duration::from_secs(5)); // Time for step above to finish
assert_eq!(
jobs.list(&ListParams::default()).await.unwrap().items.len(),
0
);
assert_eq!(jobs.list(&ListParams::default()).await.unwrap().items.len(), 0);
})
}

Expand All @@ -137,10 +95,7 @@ fn test_deploy_job_name() {

// Basic
let suffix = "some-suffix";
assert_eq!(
&DeployJob::deploy_job_name(prefix, suffix),
"gordo-dpl-some-suffix"
);
assert_eq!(&DeployJob::deploy_job_name(prefix, suffix), "gordo-dpl-some-suffix");

// Really long suffix
let mut suffix = std::iter::repeat("a").take(100).collect::<String>();
Expand Down

0 comments on commit 214648a

Please sign in to comment.