Skip to content

Commit

Permalink
feat(topolotree): perf improvements and better deploy logic
Browse files Browse the repository at this point in the history
  • Loading branch information
shadaj committed Dec 11, 2023
1 parent d8ca3d4 commit 09966d2
Show file tree
Hide file tree
Showing 17 changed files with 429 additions and 277 deletions.
5 changes: 3 additions & 2 deletions hydro_cli/src/core/custom_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,15 @@ impl Service for CustomService {
}
}

async fn deploy(&mut self, resource_result: &Arc<ResourceResult>) {
async fn deploy(&mut self, resource_result: &Arc<ResourceResult>) -> Result<()> {
if self.launched_host.is_some() {
return;
return Ok(());
}

let mut host_write = self.on.write().await;
let launched = host_write.provision(resource_result);
self.launched_host = Some(launched.await);
Ok(())
}

async fn ready(&mut self) -> Result<()> {
Expand Down
34 changes: 19 additions & 15 deletions hydro_cli/src/core/deployment.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::{Arc, Weak};

use anyhow::Result;
use futures::{StreamExt, TryStreamExt};
use tokio::sync::RwLock;

use super::{progress, Host, ResourcePool, ResourceResult, Service};
Expand Down Expand Up @@ -62,22 +63,25 @@ impl Deployment {
.await;

progress::ProgressTracker::with_group("deploy", None, || {
let services_future =
self.services
.iter_mut()
.map(|service: &mut Weak<RwLock<dyn Service>>| async {
service
.upgrade()
.unwrap()
.write()
.await
.deploy(&result)
.await;
});

futures::future::join_all(services_future)
let services_future = self
.services
.iter_mut()
.map(|service: &mut Weak<RwLock<dyn Service>>| async {
service
.upgrade()
.unwrap()
.write()
.await
.deploy(&result)
.await
})
.collect::<Vec<_>>();

futures::stream::iter(services_future)
.buffer_unordered(8)
.try_fold((), |_, _| async { Ok(()) })
})
.await;
.await?;

progress::ProgressTracker::with_group("ready", None, || {
let all_services_ready =
Expand Down
7 changes: 4 additions & 3 deletions hydro_cli/src/core/gcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,11 @@ impl LaunchedSSHHost for LaunchedComputeEngine {
let mut config = SessionConfiguration::new();
config.set_compress(true);

let mut session =
AsyncSession::<TcpStream>::connect(target_addr, Some(config)).await?;

tokio::time::timeout(Duration::from_secs(15), async move {
let mut session =
AsyncSession::<TcpStream>::connect(target_addr, Some(config))
.await?;

session.handshake().await?;

session
Expand Down
25 changes: 17 additions & 8 deletions hydro_cli/src/core/hydroflow_crate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ pub struct HydroflowCrate {
/// Configuration for the ports that this service will listen on a port for.
port_to_bind: HashMap<String, ServerStrategy>,

built_binary: Option<JoinHandle<Result<BuiltCrate>>>,
building_binary: Option<JoinHandle<Result<BuiltCrate>>>,
built_binary: Option<BuiltCrate>,
launched_host: Option<Arc<dyn LaunchedHost>>,

/// A map of port names to config for how other services can connect to this one.
Expand Down Expand Up @@ -77,6 +78,7 @@ impl HydroflowCrate {
external_ports,
port_to_server: HashMap::new(),
port_to_bind: HashMap::new(),
building_binary: None,
built_binary: None,
launched_host: None,
server_defns: Arc::new(RwLock::new(HashMap::new())),
Expand Down Expand Up @@ -191,7 +193,7 @@ impl Service for HydroflowCrate {
}

let built = self.build();
self.built_binary = Some(built);
self.building_binary = Some(built);

let mut host = self
.on
Expand All @@ -208,9 +210,9 @@ impl Service for HydroflowCrate {
}
}

async fn deploy(&mut self, resource_result: &Arc<ResourceResult>) {
async fn deploy(&mut self, resource_result: &Arc<ResourceResult>) -> Result<()> {
if self.launched_host.is_some() {
return;
return Ok(());
}

ProgressTracker::with_group(
Expand All @@ -221,11 +223,18 @@ impl Service for HydroflowCrate {
None,
|| async {
let mut host_write = self.on.write().await;
let launched = host_write.provision(resource_result);
self.launched_host = Some(launched.await);
let launched = host_write.provision(resource_result).await;

let built = self.building_binary.take().unwrap().await??.clone();

launched.copy_binary(built.clone()).await?;

self.built_binary = Some(built);
self.launched_host = Some(launched);
Ok(())
},
)
.await;
.await
}

async fn ready(&mut self) -> Result<()> {
Expand All @@ -242,7 +251,7 @@ impl Service for HydroflowCrate {
|| async {
let launched_host = self.launched_host.as_ref().unwrap();

let built = self.built_binary.take().unwrap().await??.clone();
let built = self.built_binary.as_ref().unwrap().clone();
let args = self.args.as_ref().cloned().unwrap_or_default();

let binary = launched_host
Expand Down
18 changes: 18 additions & 0 deletions hydro_cli/src/core/localhost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,20 @@ pub fn create_broadcast<T: AsyncRead + Send + Unpin + 'static>(
break;
}
}

if let Some(cli_receivers) = weak_cli_receivers.upgrade() {
let cli_receivers = cli_receivers.write().await;
for r in cli_receivers.iter() {
r.close();
}
}

if let Some(receivers) = weak_receivers.upgrade() {
let receivers = receivers.write().await;
for r in receivers.iter() {
r.close();
}
}
});

(cli_receivers, receivers)
Expand Down Expand Up @@ -161,6 +175,10 @@ impl LaunchedHost for LaunchedLocalhost {
}
}

async fn copy_binary(&self, _binary: Arc<(String, Vec<u8>, PathBuf)>) -> Result<()> {
Ok(())
}

async fn launch_binary(
&self,
id: String,
Expand Down
4 changes: 3 additions & 1 deletion hydro_cli/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ pub trait LaunchedHost: Send + Sync {
/// to listen to network connections (such as the IP address to bind to).
fn server_config(&self, strategy: &ServerStrategy) -> ServerBindConfig;

async fn copy_binary(&self, binary: Arc<(String, Vec<u8>, PathBuf)>) -> Result<()>;

async fn launch_binary(
&self,
id: String,
Expand Down Expand Up @@ -186,7 +188,7 @@ pub trait Service: Send + Sync {
fn collect_resources(&mut self, resource_batch: &mut ResourceBatch);

/// Connects to the acquired resources and prepares the service to be launched.
async fn deploy(&mut self, resource_result: &Arc<ResourceResult>);
async fn deploy(&mut self, resource_result: &Arc<ResourceResult>) -> Result<()>;

/// Launches the service, which should start listening for incoming network
/// connections. The service should not start computing at this point.
Expand Down
23 changes: 17 additions & 6 deletions hydro_cli/src/core/ssh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,7 @@ impl<T: LaunchedSSHHost> LaunchedHost for T {
LaunchedSSHHost::server_config(self, bind_type)
}

async fn launch_binary(
&self,
id: String,
binary: Arc<(String, Vec<u8>, PathBuf)>,
args: &[String],
) -> Result<Arc<RwLock<dyn LaunchedBinary>>> {
async fn copy_binary(&self, binary: Arc<(String, Vec<u8>, PathBuf)>) -> Result<()> {
let session = self.open_ssh_session().await?;

let sftp = async_retry(
Expand Down Expand Up @@ -172,6 +167,22 @@ impl<T: LaunchedSSHHost> LaunchedHost for T {
}
drop(sftp);

Ok(())
}

async fn launch_binary(
&self,
id: String,
binary: Arc<(String, Vec<u8>, PathBuf)>,
args: &[String],
) -> Result<Arc<RwLock<dyn LaunchedBinary>>> {
let session = self.open_ssh_session().await?;

let unique_name = &binary.0;

let user = self.ssh_user();
let binary_path = PathBuf::from(format!("/home/{user}/hydro-{unique_name}"));

let channel = ProgressTracker::leaf(
format!("launching binary /home/{user}/hydro-{unique_name}"),
async {
Expand Down
12 changes: 8 additions & 4 deletions hydro_cli/src/core/terraform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ impl TerraformPool {
.current_dir(deployment_folder.path())
.arg("apply")
.arg("-auto-approve")
.arg("-no-color");
.arg("-no-color")
.arg("-parallelism=128");

#[cfg(unix)]
{
Expand Down Expand Up @@ -276,7 +277,7 @@ impl TerraformApply {
}
}

fn destroy_deployment(deployment_folder: &TempDir) {
fn destroy_deployment(deployment_folder: TempDir) {
println!(
"Destroying terraform deployment at {}",
deployment_folder.path().display()
Expand All @@ -288,6 +289,7 @@ fn destroy_deployment(deployment_folder: &TempDir) {
.arg("destroy")
.arg("-auto-approve")
.arg("-no-color")
.arg("-parallelism=128")
.stdout(Stdio::piped());

#[cfg(unix)]
Expand All @@ -306,6 +308,8 @@ fn destroy_deployment(deployment_folder: &TempDir) {
.expect("Failed to destroy terraform deployment")
.success()
{
// prevent the folder from being deleted
let _ = deployment_folder.into_path();
eprintln!("WARNING: failed to destroy terraform deployment");
}
}
Expand All @@ -329,7 +333,7 @@ impl Drop for TerraformApply {
}
}

if let Some(deployment_folder) = &self.deployment_folder {
if let Some(deployment_folder) = self.deployment_folder.take() {
destroy_deployment(deployment_folder);
}
}
Expand Down Expand Up @@ -360,7 +364,7 @@ pub struct TerraformResult {

impl Drop for TerraformResult {
fn drop(&mut self) {
if let Some(deployment_folder) = &self.deployment_folder {
if let Some(deployment_folder) = self.deployment_folder.take() {
destroy_deployment(deployment_folder);
}
}
Expand Down
1 change: 1 addition & 0 deletions topolotree/.gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
*.csv
*.pdf
9 changes: 9 additions & 0 deletions topolotree/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
To collect memory/latency versus cluster size:
```bash
$ hydro deploy topolotree_latency.hydro.py -- gcp pn,pn_delta,topolo 2,3,4,5,6,7,8 1 1
```

To collect latency vs throughput:
```bash
$ hydro deploy topolotree_latency.hydro.py -- gcp pn,pn_delta,topolo 6 1/1,2/1,4/1,8/1,16/1,32/1,64/1,128/1,256/1,512/1,1024/1,1024/2,1024/4,1024/8
```
88 changes: 88 additions & 0 deletions topolotree/plot_latency_vs_throughput.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# See https://stackoverflow.com/a/19521297/3187068
import matplotlib
matplotlib.use('pdf')
font = {'size': 16}
matplotlib.rc('font', **font)

from typing import Any, List
import argparse
import matplotlib.pyplot as plt
import numpy as np
import os
import pandas as pd
import re

markers = ["o", "^", "s", "D", "p", "P", "X", "d"]

def plot_lt(throughput_rows: pd.DataFrame, latency_rows: pd.DataFrame, ax: plt.Axes, marker: str, label: str) -> None:
throughput = throughput_rows["mean"]# / 1000
throughput_std = throughput_rows["std"]# / 1000
latency = latency_rows["percentile_50"] / 1000
line = ax.plot(throughput, latency, marker, label=label, linewidth=2)[0]
ax.fill_betweenx(latency,
throughput - throughput_std,
throughput + throughput_std,
color = line.get_color(),
alpha=0.25)


def main(args) -> None:
fig, ax = plt.subplots(1, 1, figsize=(8, 4))
ax.set_ylim(top=25)
ax.set_xscale('log')
ax.get_xaxis().set_major_formatter(matplotlib.ticker.ScalarFormatter())

dfs = pd.read_csv(args.results)

# Abbreviate fields.
for i, df in enumerate([dfs.groupby(["protocol"])]):
for protocol, group in df:
throughput_rows = group[group["kind"] == "total_throughput"]
latency_rows = group[group["kind"] == "latency"]

if protocol == "pn":
protocol = "PN-Counter"
elif protocol == "pn_delta":
protocol = "\"Delta-PN\""
elif protocol == "topolo":
protocol = "OnceTree"

plot_lt(throughput_rows, latency_rows, ax, markers[i] + "-", protocol)

ax.set_title('')
ax.set_xlabel('Throughput (ops / second)')
ax.set_ylabel('Median Latency (ms)')
ax.legend(loc='upper right')
ax.grid()
fig.savefig(args.output, bbox_inches='tight')
print(f'Wrote plot to {args.output}.')

fig_leg = plt.figure(figsize=(len(args.title)*3, 0.5))
ax_leg = fig_leg.add_subplot(111)
# add the legend from the previous axes
ax_leg.legend(*ax.get_legend_handles_labels(), loc='center', ncol=len(args.title))
# hide the axes frame and the x/y labels
ax_leg.axis('off')
# fig_leg.savefig('legend.pdf')


def get_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser()

parser.add_argument('--results',
type=argparse.FileType('r'),
help='results.csv file')
parser.add_argument('--title',
type=str,
help='Title for each experiment')
parser.add_argument('--output',
type=str,
default='compartmentalized_lt.pdf',
help='Output filename')

return parser


if __name__ == '__main__':
parser = get_parser()
main(parser.parse_args())
Loading

0 comments on commit 09966d2

Please sign in to comment.