diff --git a/hydro_cli/src/core/custom_service.rs b/hydro_cli/src/core/custom_service.rs index a55eecb4d702..3f5695f3a90a 100644 --- a/hydro_cli/src/core/custom_service.rs +++ b/hydro_cli/src/core/custom_service.rs @@ -51,14 +51,15 @@ impl Service for CustomService { } } - async fn deploy(&mut self, resource_result: &Arc) { + async fn deploy(&mut self, resource_result: &Arc) -> Result<()> { if self.launched_host.is_some() { - return; + return Ok(()); } let mut host_write = self.on.write().await; let launched = host_write.provision(resource_result); self.launched_host = Some(launched.await); + Ok(()) } async fn ready(&mut self) -> Result<()> { diff --git a/hydro_cli/src/core/deployment.rs b/hydro_cli/src/core/deployment.rs index cab890d1dd90..7cc7a1442239 100644 --- a/hydro_cli/src/core/deployment.rs +++ b/hydro_cli/src/core/deployment.rs @@ -1,6 +1,7 @@ use std::sync::{Arc, Weak}; use anyhow::Result; +use futures::{StreamExt, TryStreamExt}; use tokio::sync::RwLock; use super::{progress, Host, ResourcePool, ResourceResult, Service}; @@ -62,22 +63,25 @@ impl Deployment { .await; progress::ProgressTracker::with_group("deploy", None, || { - let services_future = - self.services - .iter_mut() - .map(|service: &mut Weak>| async { - service - .upgrade() - .unwrap() - .write() - .await - .deploy(&result) - .await; - }); - - futures::future::join_all(services_future) + let services_future = self + .services + .iter_mut() + .map(|service: &mut Weak>| async { + service + .upgrade() + .unwrap() + .write() + .await + .deploy(&result) + .await + }) + .collect::>(); + + futures::stream::iter(services_future) + .buffer_unordered(8) + .try_fold((), |_, _| async { Ok(()) }) }) - .await; + .await?; progress::ProgressTracker::with_group("ready", None, || { let all_services_ready = diff --git a/hydro_cli/src/core/gcp.rs b/hydro_cli/src/core/gcp.rs index 1dd250f838b6..8c236e84bcf0 100644 --- a/hydro_cli/src/core/gcp.rs +++ b/hydro_cli/src/core/gcp.rs @@ -107,10 +107,11 @@ impl LaunchedSSHHost for LaunchedComputeEngine { let mut config = SessionConfiguration::new(); config.set_compress(true); - let mut session = - AsyncSession::::connect(target_addr, Some(config)).await?; - tokio::time::timeout(Duration::from_secs(15), async move { + let mut session = + AsyncSession::::connect(target_addr, Some(config)) + .await?; + session.handshake().await?; session diff --git a/hydro_cli/src/core/hydroflow_crate/mod.rs b/hydro_cli/src/core/hydroflow_crate/mod.rs index 19f1d6f034e8..247cea86af1b 100644 --- a/hydro_cli/src/core/hydroflow_crate/mod.rs +++ b/hydro_cli/src/core/hydroflow_crate/mod.rs @@ -38,7 +38,8 @@ pub struct HydroflowCrate { /// Configuration for the ports that this service will listen on a port for. port_to_bind: HashMap, - built_binary: Option>>, + building_binary: Option>>, + built_binary: Option, launched_host: Option>, /// A map of port names to config for how other services can connect to this one. @@ -77,6 +78,7 @@ impl HydroflowCrate { external_ports, port_to_server: HashMap::new(), port_to_bind: HashMap::new(), + building_binary: None, built_binary: None, launched_host: None, server_defns: Arc::new(RwLock::new(HashMap::new())), @@ -191,7 +193,7 @@ impl Service for HydroflowCrate { } let built = self.build(); - self.built_binary = Some(built); + self.building_binary = Some(built); let mut host = self .on @@ -208,9 +210,9 @@ impl Service for HydroflowCrate { } } - async fn deploy(&mut self, resource_result: &Arc) { + async fn deploy(&mut self, resource_result: &Arc) -> Result<()> { if self.launched_host.is_some() { - return; + return Ok(()); } ProgressTracker::with_group( @@ -221,11 +223,18 @@ impl Service for HydroflowCrate { None, || async { let mut host_write = self.on.write().await; - let launched = host_write.provision(resource_result); - self.launched_host = Some(launched.await); + let launched = host_write.provision(resource_result).await; + + let built = self.building_binary.take().unwrap().await??.clone(); + + launched.copy_binary(built.clone()).await?; + + self.built_binary = Some(built); + self.launched_host = Some(launched); + Ok(()) }, ) - .await; + .await } async fn ready(&mut self) -> Result<()> { @@ -242,7 +251,7 @@ impl Service for HydroflowCrate { || async { let launched_host = self.launched_host.as_ref().unwrap(); - let built = self.built_binary.take().unwrap().await??.clone(); + let built = self.built_binary.as_ref().unwrap().clone(); let args = self.args.as_ref().cloned().unwrap_or_default(); let binary = launched_host diff --git a/hydro_cli/src/core/localhost.rs b/hydro_cli/src/core/localhost.rs index d2546d49200b..090969e24d4d 100644 --- a/hydro_cli/src/core/localhost.rs +++ b/hydro_cli/src/core/localhost.rs @@ -126,6 +126,20 @@ pub fn create_broadcast( break; } } + + if let Some(cli_receivers) = weak_cli_receivers.upgrade() { + let cli_receivers = cli_receivers.write().await; + for r in cli_receivers.iter() { + r.close(); + } + } + + if let Some(receivers) = weak_receivers.upgrade() { + let receivers = receivers.write().await; + for r in receivers.iter() { + r.close(); + } + } }); (cli_receivers, receivers) @@ -161,6 +175,10 @@ impl LaunchedHost for LaunchedLocalhost { } } + async fn copy_binary(&self, _binary: Arc<(String, Vec, PathBuf)>) -> Result<()> { + Ok(()) + } + async fn launch_binary( &self, id: String, diff --git a/hydro_cli/src/core/mod.rs b/hydro_cli/src/core/mod.rs index 9b6fafdbe43d..207f04e4a227 100644 --- a/hydro_cli/src/core/mod.rs +++ b/hydro_cli/src/core/mod.rs @@ -90,6 +90,8 @@ pub trait LaunchedHost: Send + Sync { /// to listen to network connections (such as the IP address to bind to). fn server_config(&self, strategy: &ServerStrategy) -> ServerBindConfig; + async fn copy_binary(&self, binary: Arc<(String, Vec, PathBuf)>) -> Result<()>; + async fn launch_binary( &self, id: String, @@ -186,7 +188,7 @@ pub trait Service: Send + Sync { fn collect_resources(&mut self, resource_batch: &mut ResourceBatch); /// Connects to the acquired resources and prepares the service to be launched. - async fn deploy(&mut self, resource_result: &Arc); + async fn deploy(&mut self, resource_result: &Arc) -> Result<()>; /// Launches the service, which should start listening for incoming network /// connections. The service should not start computing at this point. diff --git a/hydro_cli/src/core/ssh.rs b/hydro_cli/src/core/ssh.rs index b5cef3f255dd..99f8099d0681 100644 --- a/hydro_cli/src/core/ssh.rs +++ b/hydro_cli/src/core/ssh.rs @@ -104,12 +104,7 @@ impl LaunchedHost for T { LaunchedSSHHost::server_config(self, bind_type) } - async fn launch_binary( - &self, - id: String, - binary: Arc<(String, Vec, PathBuf)>, - args: &[String], - ) -> Result>> { + async fn copy_binary(&self, binary: Arc<(String, Vec, PathBuf)>) -> Result<()> { let session = self.open_ssh_session().await?; let sftp = async_retry( @@ -172,6 +167,22 @@ impl LaunchedHost for T { } drop(sftp); + Ok(()) + } + + async fn launch_binary( + &self, + id: String, + binary: Arc<(String, Vec, PathBuf)>, + args: &[String], + ) -> Result>> { + let session = self.open_ssh_session().await?; + + let unique_name = &binary.0; + + let user = self.ssh_user(); + let binary_path = PathBuf::from(format!("/home/{user}/hydro-{unique_name}")); + let channel = ProgressTracker::leaf( format!("launching binary /home/{user}/hydro-{unique_name}"), async { diff --git a/hydro_cli/src/core/terraform.rs b/hydro_cli/src/core/terraform.rs index f89af014df69..9703c1a67d49 100644 --- a/hydro_cli/src/core/terraform.rs +++ b/hydro_cli/src/core/terraform.rs @@ -37,7 +37,8 @@ impl TerraformPool { .current_dir(deployment_folder.path()) .arg("apply") .arg("-auto-approve") - .arg("-no-color"); + .arg("-no-color") + .arg("-parallelism=128"); #[cfg(unix)] { @@ -276,7 +277,7 @@ impl TerraformApply { } } -fn destroy_deployment(deployment_folder: &TempDir) { +fn destroy_deployment(deployment_folder: TempDir) { println!( "Destroying terraform deployment at {}", deployment_folder.path().display() @@ -288,6 +289,7 @@ fn destroy_deployment(deployment_folder: &TempDir) { .arg("destroy") .arg("-auto-approve") .arg("-no-color") + .arg("-parallelism=128") .stdout(Stdio::piped()); #[cfg(unix)] @@ -306,6 +308,8 @@ fn destroy_deployment(deployment_folder: &TempDir) { .expect("Failed to destroy terraform deployment") .success() { + // prevent the folder from being deleted + let _ = deployment_folder.into_path(); eprintln!("WARNING: failed to destroy terraform deployment"); } } @@ -329,7 +333,7 @@ impl Drop for TerraformApply { } } - if let Some(deployment_folder) = &self.deployment_folder { + if let Some(deployment_folder) = self.deployment_folder.take() { destroy_deployment(deployment_folder); } } @@ -360,7 +364,7 @@ pub struct TerraformResult { impl Drop for TerraformResult { fn drop(&mut self) { - if let Some(deployment_folder) = &self.deployment_folder { + if let Some(deployment_folder) = self.deployment_folder.take() { destroy_deployment(deployment_folder); } } diff --git a/topolotree/.gitignore b/topolotree/.gitignore index afed0735dc96..5919a65fa502 100644 --- a/topolotree/.gitignore +++ b/topolotree/.gitignore @@ -1 +1,2 @@ *.csv +*.pdf diff --git a/topolotree/README.md b/topolotree/README.md new file mode 100644 index 000000000000..28142a1e2c04 --- /dev/null +++ b/topolotree/README.md @@ -0,0 +1,9 @@ +To collect memory/latency versus cluster size: +```bash +$ hydro deploy topolotree_latency.hydro.py -- gcp pn,pn_delta,topolo 2,3,4,5,6,7,8 1 1 +``` + +To collect latency vs throughput: +```bash +$ hydro deploy topolotree_latency.hydro.py -- gcp pn,pn_delta,topolo 6 1/1,2/1,4/1,8/1,16/1,32/1,64/1,128/1,256/1,512/1,1024/1,1024/2,1024/4,1024/8 +``` \ No newline at end of file diff --git a/topolotree/plot_latency_vs_throughput.py b/topolotree/plot_latency_vs_throughput.py new file mode 100644 index 000000000000..b26cd354480d --- /dev/null +++ b/topolotree/plot_latency_vs_throughput.py @@ -0,0 +1,88 @@ +# See https://stackoverflow.com/a/19521297/3187068 +import matplotlib +matplotlib.use('pdf') +font = {'size': 16} +matplotlib.rc('font', **font) + +from typing import Any, List +import argparse +import matplotlib.pyplot as plt +import numpy as np +import os +import pandas as pd +import re + +markers = ["o", "^", "s", "D", "p", "P", "X", "d"] + +def plot_lt(throughput_rows: pd.DataFrame, latency_rows: pd.DataFrame, ax: plt.Axes, marker: str, label: str) -> None: + throughput = throughput_rows["mean"]# / 1000 + throughput_std = throughput_rows["std"]# / 1000 + latency = latency_rows["percentile_50"] / 1000 + line = ax.plot(throughput, latency, marker, label=label, linewidth=2)[0] + ax.fill_betweenx(latency, + throughput - throughput_std, + throughput + throughput_std, + color = line.get_color(), + alpha=0.25) + + +def main(args) -> None: + fig, ax = plt.subplots(1, 1, figsize=(8, 4)) + ax.set_ylim(top=25) + ax.set_xscale('log') + ax.get_xaxis().set_major_formatter(matplotlib.ticker.ScalarFormatter()) + + dfs = pd.read_csv(args.results) + + # Abbreviate fields. + for i, df in enumerate([dfs.groupby(["protocol"])]): + for protocol, group in df: + throughput_rows = group[group["kind"] == "total_throughput"] + latency_rows = group[group["kind"] == "latency"] + + if protocol == "pn": + protocol = "PN-Counter" + elif protocol == "pn_delta": + protocol = "\"Delta-PN\"" + elif protocol == "topolo": + protocol = "OnceTree" + + plot_lt(throughput_rows, latency_rows, ax, markers[i] + "-", protocol) + + ax.set_title('') + ax.set_xlabel('Throughput (ops / second)') + ax.set_ylabel('Median Latency (ms)') + ax.legend(loc='upper right') + ax.grid() + fig.savefig(args.output, bbox_inches='tight') + print(f'Wrote plot to {args.output}.') + + fig_leg = plt.figure(figsize=(len(args.title)*3, 0.5)) + ax_leg = fig_leg.add_subplot(111) + # add the legend from the previous axes + ax_leg.legend(*ax.get_legend_handles_labels(), loc='center', ncol=len(args.title)) + # hide the axes frame and the x/y labels + ax_leg.axis('off') + # fig_leg.savefig('legend.pdf') + + +def get_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser() + + parser.add_argument('--results', + type=argparse.FileType('r'), + help='results.csv file') + parser.add_argument('--title', + type=str, + help='Title for each experiment') + parser.add_argument('--output', + type=str, + default='compartmentalized_lt.pdf', + help='Output filename') + + return parser + + +if __name__ == '__main__': + parser = get_parser() + main(parser.parse_args()) \ No newline at end of file diff --git a/topolotree/plot_memory_latency.py b/topolotree/plot_memory_latency.py new file mode 100644 index 000000000000..636b8fb6727f --- /dev/null +++ b/topolotree/plot_memory_latency.py @@ -0,0 +1,101 @@ +# See https://stackoverflow.com/a/19521297/3187068 +import matplotlib +matplotlib.use('pdf') +font = {'size': 16} +matplotlib.rc('font', **font) + +from typing import Any, List +import argparse +import matplotlib.pyplot as plt +import numpy as np +import os +import pandas as pd +import re + +markers = ["o", "^", "s", "D", "p", "P", "X", "d"] + +def plot_lt(rows: pd.DataFrame, ax: plt.Axes, marker: str, label: str, scale: int) -> None: + # throughput_std = rows["std"] / scale + # throughput_mean = rows["mean"] / scale + throughput_med = rows["percentile_50"] / scale + throughput_low = rows["percentile_25"] / scale + throughput_high = rows["percentile_75"] / scale + cluster_depth = [2 ** v for v in rows["tree_depth"]] + line = ax.plot(cluster_depth, throughput_med, marker, label=label, linewidth=2)[0] + ax.fill_between(cluster_depth, + throughput_low, + throughput_high, + color = line.get_color(), + alpha=0.25) + + +def plot_lt_min(rows: pd.DataFrame, ax: plt.Axes, marker: str, label: str, scale: int) -> None: + throughput_med = rows["min"] / scale + cluster_depth = rows["tree_depth"] + line = ax.plot(cluster_depth, throughput_med, marker, label=label, linewidth=2)[0] + +def main(args) -> None: + fig, ax = plt.subplots(1, 1, figsize=(8, 4)) + # ax.set_yscale('log') + + if args.key == "memory_delta": + ax.set_ylim(top=400) + ax.set_xscale('log', base=2) + + dfs = pd.read_csv(args.results) + + + # Abbreviate fields. + for i, df in enumerate([dfs.groupby(["protocol"])]): + for protocol, group in df: + if protocol == "pn": + protocol = "PN-Counter" + elif protocol == "pn_delta": + protocol = "\"Delta-PN\"" + elif protocol == "topolo": + protocol = "OnceTree" + if args.key == "memory_delta": + plot_lt(group[group["kind"] == args.key], ax, markers[i] + "-", protocol, 1000 if args.key == 'latency' else 1000000) + else: + plot_lt(group[group["kind"] == args.key], ax, markers[i] + "-", protocol, 1000 if args.key == 'latency' else 1000000) + + ax.set_title('') + ax.set_xlabel('# of nodes (log scale)') + ax.set_ylabel('Latency (ms)' if args.key == 'latency' else 'Memory (MB)') + ax.legend(loc='upper left') + ax.grid() + fig.savefig(args.output, bbox_inches='tight') + print(f'Wrote plot to {args.output}.') + + fig_leg = plt.figure(figsize=(len(args.title)*3, 0.5)) + ax_leg = fig_leg.add_subplot(111) + # add the legend from the previous axes + ax_leg.legend(*ax.get_legend_handles_labels(), loc='center', ncol=len(args.title)) + # hide the axes frame and the x/y labels + ax_leg.axis('off') + # fig_leg.savefig('legend.pdf') + + +def get_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser() + + parser.add_argument('--results', + type=argparse.FileType('r'), + help='results.csv file') + parser.add_argument('--title', + type=str, + help='Title for each experiment') + parser.add_argument('--key', + type=str, + help='Key') + parser.add_argument('--output', + type=str, + default='compartmentalized_lt.pdf', + help='Output filename') + + return parser + + +if __name__ == '__main__': + parser = get_parser() + main(parser.parse_args()) \ No newline at end of file diff --git a/topolotree/src/latency_measure.rs b/topolotree/src/latency_measure.rs index 37b9b09ccf1f..6384207ae4e3 100644 --- a/topolotree/src/latency_measure.rs +++ b/topolotree/src/latency_measure.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; -use std::sync::atomic::AtomicU64; +use std::io::Write; +use std::sync::atomic::{AtomicBool, AtomicU64}; use std::sync::{mpsc, Arc}; use std::thread; use std::time::Instant; @@ -34,10 +35,12 @@ async fn main() { let atomic_counter = Arc::new(AtomicU64::new(0)); let atomic_borrow = atomic_counter.clone(); + let atomic_keep_running = Arc::new(AtomicBool::new(true)); + let atomic_keep_running_clone = atomic_keep_running.clone(); let (latency_sender, latency_receiver) = mpsc::channel::(); - thread::spawn(move || { + let printer_thread = thread::spawn(move || { let mut last_instant = Instant::now(); - loop { + while atomic_keep_running_clone.load(std::sync::atomic::Ordering::Relaxed) { thread::sleep(std::time::Duration::from_millis(100)); let now = Instant::now(); let counter = atomic_borrow.swap(0, std::sync::atomic::Ordering::Relaxed); @@ -48,6 +51,8 @@ async fn main() { while let Ok(latency) = latency_receiver.try_recv() { println!("latency,{}", latency); } + + std::io::stdout().flush().unwrap() } }); @@ -68,19 +73,17 @@ async fn main() { let inc_sender = inc_sender.clone(); let latency_sender = latency_sender.clone(); let atomic_counter = atomic_counter.clone(); + let keep_running = atomic_keep_running.clone(); tokio::spawn(async move { #[cfg(debug_assertions)] let mut count_tracker = HashMap::new(); let mut next_base: u64 = 0; - loop { - let id = ((((next_base % keys_per_partition) - + (partition_n * keys_per_partition)) - / (num_clients)) - * num_clients) - + i; - next_base += 1; + while keep_running.load(std::sync::atomic::Ordering::Relaxed) { + let id = (partition_n * keys_per_partition) + + ((((next_base % keys_per_partition) / num_clients) * num_clients) + i); + next_base = next_base.wrapping_add(1); let increment = rand::random::(); let change = if increment { 1 } else { -1 }; let start = Instant::now(); @@ -93,10 +96,12 @@ async fn main() { { let count = count_tracker.entry(id).or_insert(0); *count += change; - assert!(*count == received); + assert_eq!(*count, received); } - latency_sender.send(start.elapsed().as_micros()).unwrap(); + if next_base % 100 == 0 { + latency_sender.send(start.elapsed().as_micros()).unwrap(); + } atomic_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed); } @@ -113,7 +118,7 @@ async fn main() { continue; } - if queues[(updated.key % num_clients) as usize] + if queues[((updated.key % keys_per_partition) % num_clients) as usize] .send(updated.value) .is_err() { @@ -125,5 +130,11 @@ async fn main() { let mut line = String::new(); std::io::stdin().read_line(&mut line).unwrap(); assert!(line.starts_with("stop")); + + atomic_keep_running.store(false, std::sync::atomic::Ordering::Relaxed); + printer_thread.join().unwrap(); + + println!("end"); + std::process::exit(0); } diff --git a/topolotree/src/main.rs b/topolotree/src/main.rs index 339ea192418f..7908f18de948 100644 --- a/topolotree/src/main.rs +++ b/topolotree/src/main.rs @@ -2,7 +2,7 @@ mod tests; use std::cell::RefCell; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::fmt::{Debug, Display}; use std::io; use std::rc::Rc; @@ -32,9 +32,11 @@ impl Display for NodeID { type PostNeighborJoin = (((u64, Option), (i64, usize)), NodeID); +type ContributionAgg = + Rc, (Timestamped, usize)>>>>; + fn run_topolotree( - self_id: u32, - init_neighbors: Vec, + neighbors: Vec, input_recv: impl Stream> + Unpin + 'static, increment_requests: impl Stream> + Unpin + 'static, output_send: tokio::sync::mpsc::UnboundedSender<(u32, Bytes)>, @@ -59,128 +61,105 @@ fn run_topolotree( parsed_input = source_stream(input_recv) -> map(Result::unwrap) -> map(|(src, x)| (NodeID(src), deserialize_from_bytes::(&x).unwrap())) - -> demux(|(src, msg), var_args!(payload, ping, pong, neighbor_of_neighbor)| { + -> demux(|(src, msg), var_args!(payload, ping, pong)| { match msg { TopolotreeMessage::Payload(p) => payload.give((src, p)), TopolotreeMessage::Ping() => ping.give((src, ())), TopolotreeMessage::Pong() => pong.give((src, ())), - TopolotreeMessage::NeighborOfNeighbor(its_neighbor, add) => neighbor_of_neighbor.give((src, (NodeID(its_neighbor), add))) } }); - from_neighbors = parsed_input[payload]; + from_neighbors = parsed_input[payload] -> tee(); pings = parsed_input[ping] -> tee(); pongs = parsed_input[pong] -> tee(); - neighbor_of_neighbor_ops = parsed_input[neighbor_of_neighbor] -> tee(); - - neighbor_of_neighbor = - neighbor_of_neighbor_ops - -> map(|(src, (neighbor, add))| (src, (neighbor, add))) - -> fold_keyed::<'static>(HashSet::new, |acc: &mut HashSet, (neighbor, add)| { - if add { - acc.insert(neighbor); - } else { - acc.remove(&neighbor); - } - }) - -> flat_map(|(src, acc)| acc.into_iter().map(move |neighbor| (src, neighbor))) - -> tee(); pings -> map(|(src, _)| (src, TopolotreeMessage::Pong())) -> output; // generate a ping every second neighbors -> [0]ping_generator; source_interval(Duration::from_secs(1)) -> [1]ping_generator; - ping_generator = cross_join() + ping_generator = cross_join_multiset() -> map(|(src, _)| (src, TopolotreeMessage::Ping())) -> output; - pongs -> dead_maybe_neighbors; - pings -> dead_maybe_neighbors; - new_neighbors -> map(|neighbor| (neighbor, ())) -> dead_maybe_neighbors; // fake pong - dead_maybe_neighbors = union() -> fold_keyed::<'static>(Instant::now, |acc: &mut Instant, _| { + pongs -> dead_neighbors; + pings -> dead_neighbors; + new_neighbors -> map(|neighbor| (neighbor, ())) -> dead_neighbors; // fake pong + dead_neighbors = union() -> fold_keyed::<'static>(Instant::now, |acc: &mut Instant, _| { *acc = Instant::now(); }) -> filter_map(|(node_id, acc)| { - if acc.elapsed().as_secs() >= 5 { + if acc.elapsed().as_secs() > 5 { Some(node_id) } else { None } - }) - -> map(|n| (n, ())) - -> [0]dead_neighbors; + }) -> tee(); - neighbors -> map(|n| (n, ())) -> [1]dead_neighbors; - dead_neighbors = join() - -> map(|(n, _)| n) - -> tee(); + from_neighbors + -> map(|(_, payload): (NodeID, Payload)| payload.key) + -> touched_keys; - // TODO(shadaj): only remove when we get an ack from the new leader - dead_neighbors -> removed_neighbors; - - dead_neighbors -> map(|n| (n, ())) -> [0]min_neighbor_of_dead_neighbor; - neighbor_of_neighbor -> [1]min_neighbor_of_dead_neighbor; - min_neighbor_of_dead_neighbor = join() - -> map(|(dead, ((), neighbor))| (dead, neighbor)) - -> filter(|(_, neighbor)| neighbor.0 != self_id) - -> reduce_keyed(|acc: &mut NodeID, n: NodeID| { - if n.0 < acc.0 { - *acc = n; - } - }) - -> tee(); + operations + -> map(|op| op.key) + -> touched_keys; + + touched_keys = union() -> unique() -> [0]from_neighbors_unfiltered; from_neighbors - -> map(|(src, payload): (NodeID, Payload)| ((payload.key, src), (payload.key, payload.contents))) - -> fold_keyed::<'static>(|| (Timestamped { timestamp: -1, data: Default::default() }, 0), |acc: &mut (Timestamped, usize), (key, val): (u64, Timestamped)| { - if val.timestamp > acc.0.timestamp { - acc.0 = val; + -> map(|(src, payload): (NodeID, Payload)| (src, (payload.key, payload.contents))) + -> fold::<'static>(|| Rc::new(RefCell::new(HashMap::new())), |acc: &mut ContributionAgg, (source, (key, val)): (NodeID, (u64, Timestamped))| { + let mut acc = acc.borrow_mut(); + let key_entry = acc.entry(key).or_default(); + let src_entry = key_entry.entry(Some(source)).or_insert((Timestamped { timestamp: -1, data: 0 }, 0)); + if val.timestamp > src_entry.0.timestamp { + src_entry.0 = val; *self_timestamp1.borrow_mut().entry(key).or_insert(0) += 1; - acc.1 = context.current_tick(); + src_entry.1 = context.current_tick(); } }) - -> map(|((key, src), (payload, change_tick))| (src, ((key, Some(src)), (payload.data, change_tick)))) - -> [1]from_actual_neighbors; - - neighbors -> map(|n| (n, ())) -> [0]from_actual_neighbors; - from_actual_neighbors = join() - -> map(|(_, (_, payload))| payload) - -> from_neighbors_or_local; - - local_value = source_stream(increment_requests) + -> from_neighbors_to_filter; + + from_neighbors_to_filter = union() -> [1]from_neighbors_unfiltered; + from_neighbors_unfiltered = + cross_join() -> + flat_map(|(key, hashmap)| { + let hashmap = hashmap.borrow(); + hashmap.get(&key).iter().flat_map(|v| v.iter()).map(|t| ((key, *t.0), (t.1.0.data, t.1.1))).collect::>().into_iter() + }) -> + from_neighbors_or_local; + + operations = source_stream(increment_requests) -> map(|x| deserialize_from_bytes::(&x.unwrap()).unwrap()) + -> tee(); + local_values = operations -> inspect(|change| { *self_timestamp2.borrow_mut().entry(change.key).or_insert(0) += 1; }) -> map(|change_payload: OperationPayload| (change_payload.key, (change_payload.change, context.current_tick()))) - -> reduce_keyed::<'static>(|agg: &mut (i64, usize), change: (i64, usize)| { - agg.0 += change.0; - agg.1 = std::cmp::max(agg.1, change.1); + -> fold::<'static>(|| Rc::new(RefCell::new(HashMap::new())), |agg: &mut ContributionAgg, change: (u64, (i64, usize))| { + let mut agg = agg.borrow_mut(); + let agg_key = agg.entry(change.0).or_default(); + let agg_key = agg_key.entry(None).or_insert((Timestamped { timestamp: 0, data: 0 }, 0)); + + agg_key.0.data += change.1.0; + agg_key.1 = change.1.1; }); - local_value -> map(|(key, data)| ((key, None), data)) -> from_neighbors_or_local; + local_values -> from_neighbors_to_filter; - from_neighbors_or_local = union() -> tee(); + from_neighbors_or_local = tee(); from_neighbors_or_local -> [0]all_neighbor_data; - new_neighbors = source_iter(init_neighbors) + new_neighbors = source_iter(neighbors) -> map(NodeID) -> tee(); - new_neighbors -> map(|n| (n, true)) -> neighbors; - removed_neighbors = map(|n| (n, false)) -> neighbors; - neighbors = union() - -> map(|(neighbor, add)| (neighbor, !add)) - -> sort_by_key(|(_, remove)| remove) // process adds first - -> fold::<'static>(Vec::new, |acc: &mut Vec, (neighbor, remove): (NodeID, bool)| { - if remove { - acc.retain(|x| *x != neighbor); - } else { - acc.push(neighbor); - } - }) - -> flatten() + new_neighbors + -> persist() + -> [pos]neighbors; + dead_neighbors -> [neg]neighbors; + neighbors = difference() -> tee(); neighbors -> [1]all_neighbor_data; @@ -234,7 +213,7 @@ fn run_topolotree( #[hydroflow::main] async fn main() { let mut args = std::env::args().skip(1); - let self_id = args.next().unwrap().parse().unwrap(); + let _self_id: u32 = args.next().unwrap().parse().unwrap(); let neighbors: Vec = args.map(|x| x.parse().unwrap()).collect(); let mut ports = hydroflow::util::cli::init().await; @@ -269,25 +248,26 @@ async fn main() { tokio::task::spawn_local(async move { while let Some(msg) = chan_rx.recv().await { - output_send.send(msg).await.unwrap(); + output_send.feed(msg).await.unwrap(); + while let Ok(msg) = chan_rx.try_recv() { + output_send.feed(msg).await.unwrap(); + } + output_send.flush().await.unwrap(); } }); let (query_tx, mut query_rx) = tokio::sync::mpsc::unbounded_channel(); tokio::task::spawn_local(async move { while let Some(msg) = query_rx.recv().await { - query_responses.send(msg).await.unwrap(); + query_responses.feed(msg).await.unwrap(); + while let Ok(msg) = query_rx.try_recv() { + query_responses.feed(msg).await.unwrap(); + } + query_responses.flush().await.unwrap(); } }); - let flow = run_topolotree( - self_id, - neighbors, - input_recv, - operations_send, - chan_tx, - query_tx, - ); + let flow = run_topolotree(neighbors, input_recv, operations_send, chan_tx, query_tx); let f1 = async move { #[cfg(target_os = "linux")] diff --git a/topolotree/src/protocol.rs b/topolotree/src/protocol.rs index 4ff9aeca6e48..d8a480f59c5d 100644 --- a/topolotree/src/protocol.rs +++ b/topolotree/src/protocol.rs @@ -13,7 +13,6 @@ pub enum TopolotreeMessage { Payload(Payload), Ping(), Pong(), - NeighborOfNeighbor(u32, bool), } #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)] diff --git a/topolotree/src/tests.rs b/topolotree/src/tests.rs index 8f998ce8c3ef..ac4112deff96 100644 --- a/topolotree/src/tests.rs +++ b/topolotree/src/tests.rs @@ -69,7 +69,6 @@ async fn simple_payload_test() { simulate_input(&mut input_send, (1, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 1, data: 2 } }))).unwrap(); let mut flow = run_topolotree( - 0, neighbors, input_recv, operations_rx, @@ -110,7 +109,6 @@ async fn idempotence_test() { }; let mut flow = run_topolotree( - 0, neighbors, input_recv, operations_rx, @@ -151,7 +149,6 @@ async fn backwards_in_time_test() { }; let mut flow = run_topolotree( - 0, neighbors, input_recv, operations_rx, @@ -192,7 +189,6 @@ async fn multiple_input_sources_test() { }; let mut flow = run_topolotree( - 0, neighbors, input_recv, operations_rx, @@ -228,7 +224,6 @@ async fn operations_across_ticks() { let (query_send, mut query_recv) = unbounded_channel::(); let mut flow = run_topolotree( - 0, neighbors, input_recv, operations_rx, @@ -290,7 +285,6 @@ async fn operations_multiple_keys() { let (query_send, mut query_recv) = unbounded_channel::(); let mut flow = run_topolotree( - 0, neighbors, input_recv, operations_rx, @@ -376,7 +370,6 @@ async fn gossip_multiple_keys() { let (query_send, mut query_recv) = unbounded_channel::(); let mut flow = run_topolotree( - 0, neighbors, input_recv, operations_rx, @@ -445,7 +438,6 @@ async fn ping_pongs() { let (query_send, mut _query_recv) = unbounded_channel::(); let mut flow = run_topolotree( - 0, neighbors, input_recv, operations_rx, @@ -476,99 +468,3 @@ async fn ping_pongs() { (1, TopolotreeMessage::Ping()) ])); } - -#[hydroflow::test(start_paused = true)] -async fn no_gossip_if_dead() { - let neighbors: Vec = vec![1, 2, 3]; - - let (mut _operations_tx, operations_rx) = unbounded_channel::>(); - let (mut input_send, input_recv) = unbounded_channel::>(); - let (output_send, mut output_recv) = unbounded_channel::<(u32, Bytes)>(); - let (query_send, mut _query_recv) = unbounded_channel::(); - - let mut flow = run_topolotree( - 0, - neighbors, - input_recv, - operations_rx, - output_send, - query_send, - ); - - #[rustfmt::skip] - { - simulate_input(&mut input_send, (1, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 0, data: 5 } }))).unwrap(); - simulate_input(&mut input_send, (2, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 0, data: 5 } }))).unwrap(); - }; - - flow.run_tick(); - - #[rustfmt::skip] - assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([ - (1, TopolotreeMessage::Ping()), - (2, TopolotreeMessage::Ping()), - (3, TopolotreeMessage::Ping()), - (1, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 2, data: 5 } })), - (2, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 2, data: 5 } })), - (3, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 2, data: 10 } })), - ])); - - tokio::time::advance(Duration::from_millis(500)).await; - - #[rustfmt::skip] - { - simulate_input(&mut input_send, (1, TopolotreeMessage::Pong())).unwrap(); - simulate_input(&mut input_send, (2, TopolotreeMessage::Pong())).unwrap(); - simulate_input(&mut input_send, (3, TopolotreeMessage::Pong())).unwrap(); - }; - - flow.run_tick(); - - tokio::time::advance(Duration::from_millis(500)).await; - - #[rustfmt::skip] - { - simulate_input(&mut input_send, (1, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 2, data: 6 } }))).unwrap(); - simulate_input(&mut input_send, (2, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 2, data: 6 } }))).unwrap(); - }; - - flow.run_tick(); - - #[rustfmt::skip] - assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([ - (1, TopolotreeMessage::Ping()), - (2, TopolotreeMessage::Ping()), - (3, TopolotreeMessage::Ping()), - (1, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 4, data: 6 } })), - (2, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 4, data: 6 } })), - (3, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 4, data: 12 } })), - ])); - - tokio::time::advance(Duration::from_secs(5)).await; - - #[rustfmt::skip] - { - simulate_input(&mut input_send, (1, TopolotreeMessage::Pong())).unwrap(); - simulate_input(&mut input_send, (3, TopolotreeMessage::Pong())).unwrap(); - simulate_input(&mut input_send, (1, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 3, data: 7 } }))).unwrap(); - }; - - flow.run_tick(); - - // at this point, node 2 should be marked as dead - - #[rustfmt::skip] - assert_eq!(read_all(&mut output_recv).await, HashMultiSet::from_iter([ - (1, TopolotreeMessage::Ping()), - (1, TopolotreeMessage::Ping()), - (1, TopolotreeMessage::Ping()), - (1, TopolotreeMessage::Ping()), - (1, TopolotreeMessage::Ping()), - (3, TopolotreeMessage::Ping()), - (3, TopolotreeMessage::Ping()), - (3, TopolotreeMessage::Ping()), - (3, TopolotreeMessage::Ping()), - (3, TopolotreeMessage::Ping()), - (3, TopolotreeMessage::Payload(Payload { key: 123, contents: Timestamped { timestamp: 5, data: 7 } })), - ])); -} diff --git a/topolotree/topolotree_latency.hydro.py b/topolotree/topolotree_latency.hydro.py index cfd445445f5f..11a7f7cfba8a 100644 --- a/topolotree/topolotree_latency.hydro.py +++ b/topolotree/topolotree_latency.hydro.py @@ -61,7 +61,7 @@ async def run_experiment( num_partitions = int(partitions_arg) - print(f"Launching benchmark with protocol {tree_arg}, {num_replicas} replicas, and {num_clients} clients") + print(f"Launching benchmark with protocol {tree_arg}, {num_replicas} replicas, and {num_clients} clients on {num_partitions} partitions") currently_deployed = [] @@ -74,7 +74,7 @@ def create_machine(): if is_gcp: out = deployment.GCPComputeEngineHost( project="hydro-chrisdouglas", - machine_type="n2-standard-2", + machine_type="n2-standard-4", image="debian-cloud/debian-11", region="us-west1-a", network=gcp_vpc, @@ -137,7 +137,8 @@ def create_machine(): src=str(Path(__file__).parent.absolute()), profile=profile, bin="latency_measure", - args=[str(num_clients), str(i), str(1024)], + # 2 ** 14 for memory / latency experiments + args=[str(num_clients), str(i), str((2 ** 10) // num_partitions)], on=create_machine(), ) for i in range(num_partitions)] @@ -191,23 +192,29 @@ async def latency_plotter(): elif line_split[0] == "latency": number = int(line_split[1]) # microseconds latency_per_driver[driver_idx].append(number) + elif line_split[0] == "end": + break except asyncio.CancelledError: return latency_plotter_task = asyncio.create_task(latency_plotter()) await deployment.start() - print("Started! Please wait 15 seconds to collect data.") + print("Started! Please wait 60 seconds to collect data.") - await asyncio.sleep(15) + await asyncio.sleep(60) - memory_plotter_task.cancel() - await memory_plotter_task + print("Stopping all drivers") + ignore_stderrs = await asyncio.gather(*[node.stderr() for node in all_nodes]) - latency_plotter_task.cancel() + await asyncio.gather(*[node.stop() for node in drivers]) + + print("Collecting latency logs") await latency_plotter_task - await asyncio.gather(*[node.stop() for node in drivers]) + memory_plotter_task.cancel() + await memory_plotter_task + await asyncio.gather(*[node.stop() for node in all_nodes]) def summarize(v, kind): @@ -225,6 +232,7 @@ def summarize(v, kind): summaries_file.write(tree_arg + ",") summaries_file.write(str(tree_depth) + ",") summaries_file.write(str(num_clients) + ",") + summaries_file.write(str(num_partitions) + ",") summaries_file.write(kind + ",") summaries_file.write(str(np.mean(v)) + ",") summaries_file.write(str(np.std(v)) + ",") @@ -248,8 +256,9 @@ def summarize(v, kind): print("latency:") summarize(all_latencies, "latency") - print("throughput per driver:") - summarize(all_throughputs, "throughput_per_driver") + print("total throughput:") + print(throughput_per_driver) + summarize([v * num_partitions for v in all_throughputs], "total_throughput") memory_delta = [memory[-1] - memory[0] for memory in memory_per_node] print("memory delta:") @@ -262,6 +271,8 @@ def summarize(v, kind): + str(tree_depth) + "_num_clients_" + str(num_clients) + + "_num_partitions_" + + str(num_partitions) + "_" + experiment_id + ".csv", @@ -275,6 +286,8 @@ def summarize(v, kind): + str(tree_depth) + "_num_clients_" + str(num_clients) + + "_num_partitions_" + + str(num_partitions) + "_" + experiment_id + ".csv", @@ -288,6 +301,8 @@ def summarize(v, kind): + str(tree_depth) + "_num_clients_" + str(num_clients) + + "_num_partitions_" + + str(num_partitions) + "_" + experiment_id + ".csv", @@ -308,7 +323,7 @@ async def main(args): summaries_file = open(f"summaries_{experiment_id}.csv", "w") summaries_file.write( - "protocol,tree_depth,num_clients,kind,mean,std,min,max,percentile_99,percentile_75,percentile_50,percentile_25,percentile_1" + "protocol,tree_depth,num_clients,num_partitions,kind,mean,std,min,max,percentile_99,percentile_75,percentile_50,percentile_25,percentile_1" ) deployment = hydro.Deployment() @@ -324,23 +339,25 @@ async def main(args): ) for depth_arg in args[2].split(","): - for tree_arg in args[1].split(","): - for num_clients_arg in args[3].split(","): - for num_partitions_arg in args[4].split(","): - await run_experiment( - deployment, - localhost, - "dev" if args[0] == "local" else None, - pool, - experiment_id, - summaries_file, - tree_arg, - depth_arg, - num_clients_arg, - num_partitions_arg, - args[0] == "gcp", - network, - ) + for clients_partitions in args[3].split(","): + for tree_arg in args[1].split(","): + split = clients_partitions.split("/") + num_clients_arg = split[0] + num_partitions_arg = split[1] + await run_experiment( + deployment, + localhost, + "dev" if args[0] == "local" else None, + pool, + experiment_id, + summaries_file, + tree_arg, + depth_arg, + num_clients_arg, + num_partitions_arg, + args[0] == "gcp", + network, + ) summaries_file.close()