Skip to content

Commit

Permalink
feat(topolotree): perf improvements and better deploy logic
Browse files Browse the repository at this point in the history
  • Loading branch information
shadaj committed Dec 11, 2023
1 parent d8ca3d4 commit b058b6a
Show file tree
Hide file tree
Showing 20 changed files with 458 additions and 296 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions hydro_cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,6 @@ nix = "0.26.2"
hydroflow_cli_integration = { path = "../hydroflow_cli_integration", version = "^0.3.0" }
indicatif = "0.17.6"
cargo_metadata = "0.15.4"
async-once-cell = "0.5.3"

[dev-dependencies]
5 changes: 3 additions & 2 deletions hydro_cli/src/core/custom_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,15 @@ impl Service for CustomService {
}
}

async fn deploy(&mut self, resource_result: &Arc<ResourceResult>) {
async fn deploy(&mut self, resource_result: &Arc<ResourceResult>) -> Result<()> {
if self.launched_host.is_some() {
return;
return Ok(());
}

let mut host_write = self.on.write().await;
let launched = host_write.provision(resource_result);
self.launched_host = Some(launched.await);
Ok(())
}

async fn ready(&mut self) -> Result<()> {
Expand Down
34 changes: 19 additions & 15 deletions hydro_cli/src/core/deployment.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::{Arc, Weak};

use anyhow::Result;
use futures::{StreamExt, TryStreamExt};
use tokio::sync::RwLock;

use super::{progress, Host, ResourcePool, ResourceResult, Service};
Expand Down Expand Up @@ -62,22 +63,25 @@ impl Deployment {
.await;

progress::ProgressTracker::with_group("deploy", None, || {
let services_future =
self.services
.iter_mut()
.map(|service: &mut Weak<RwLock<dyn Service>>| async {
service
.upgrade()
.unwrap()
.write()
.await
.deploy(&result)
.await;
});

futures::future::join_all(services_future)
let services_future = self
.services
.iter_mut()
.map(|service: &mut Weak<RwLock<dyn Service>>| async {
service
.upgrade()
.unwrap()
.write()
.await
.deploy(&result)
.await
})
.collect::<Vec<_>>();

futures::stream::iter(services_future)
.buffer_unordered(8)
.try_fold((), |_, _| async { Ok(()) })
})
.await;
.await?;

progress::ProgressTracker::with_group("ready", None, || {
let all_services_ready =
Expand Down
7 changes: 4 additions & 3 deletions hydro_cli/src/core/gcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,11 @@ impl LaunchedSSHHost for LaunchedComputeEngine {
let mut config = SessionConfiguration::new();
config.set_compress(true);

let mut session =
AsyncSession::<TcpStream>::connect(target_addr, Some(config)).await?;

tokio::time::timeout(Duration::from_secs(15), async move {
let mut session =
AsyncSession::<TcpStream>::connect(target_addr, Some(config))
.await?;

session.handshake().await?;

session
Expand Down
10 changes: 5 additions & 5 deletions hydro_cli/src/core/hydroflow_crate/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use std::path::PathBuf;
use std::process::{Command, Stdio};
use std::sync::{Arc, Mutex};

use anyhow::{bail, Result};
use nanoid::nanoid;
use once_cell::sync::Lazy;
use tokio::sync::OnceCell;
Expand Down Expand Up @@ -34,7 +33,7 @@ pub async fn build_crate(
profile: Option<String>,
target_type: HostTargetType,
features: Option<Vec<String>>,
) -> Result<BuiltCrate> {
) -> Result<BuiltCrate, &'static str> {
let key = CacheKey {
src: src.clone(),
bin: bin.clone(),
Expand Down Expand Up @@ -130,12 +129,13 @@ pub async fn build_crate(
}

if spawned.wait().unwrap().success() {
bail!("cargo build succeeded but no binary was emitted")
Err("cargo build succeeded but no binary was emitted")
} else {
bail!("failed to build crate")
Err("failed to build crate")
}
})
.await?
.await
.map_err(|_| "failed to spawn blocking task")?
})
})
.await
Expand Down
55 changes: 33 additions & 22 deletions hydro_cli/src/core/hydroflow_crate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

use anyhow::{bail, Result};
use anyhow::{anyhow, bail, Result};
use async_channel::Receiver;
use async_trait::async_trait;
use futures_core::Future;
use hydroflow_cli_integration::ServerPort;
use tokio::sync::RwLock;
use tokio::task::JoinHandle;

use self::ports::{HydroflowPortConfig, HydroflowSink, SourcePath};
use super::progress::ProgressTracker;
Expand Down Expand Up @@ -38,7 +38,7 @@ pub struct HydroflowCrate {
/// Configuration for the ports that this service will listen on a port for.
port_to_bind: HashMap<String, ServerStrategy>,

built_binary: Option<JoinHandle<Result<BuiltCrate>>>,
built_binary: Arc<async_once_cell::OnceCell<Result<BuiltCrate, &'static str>>>,
launched_host: Option<Arc<dyn LaunchedHost>>,

/// A map of port names to config for how other services can connect to this one.
Expand Down Expand Up @@ -77,7 +77,7 @@ impl HydroflowCrate {
external_ports,
port_to_server: HashMap::new(),
port_to_bind: HashMap::new(),
built_binary: None,
built_binary: Arc::new(async_once_cell::OnceCell::new()),
launched_host: None,
server_defns: Arc::new(RwLock::new(HashMap::new())),
launched_binary: None,
Expand Down Expand Up @@ -163,23 +163,29 @@ impl HydroflowCrate {
.await
}

fn build(&mut self) -> JoinHandle<Result<BuiltCrate>> {
fn build(&self) -> impl Future<Output = Result<BuiltCrate, &'static str>> {
let src_cloned = self.src.canonicalize().unwrap();
let bin_cloned = self.bin.clone();
let example_cloned = self.example.clone();
let features_cloned = self.features.clone();
let host = self.on.clone();
let profile_cloned = self.profile.clone();
let target_type = host.try_read().unwrap().target_type();

tokio::task::spawn(build_crate(
src_cloned,
bin_cloned,
example_cloned,
profile_cloned,
target_type,
features_cloned,
))
let built_binary_cloned = self.built_binary.clone();

async move {
built_binary_cloned
.get_or_init(build_crate(
src_cloned,
bin_cloned,
example_cloned,
profile_cloned,
target_type,
features_cloned,
))
.await
.clone()
}
}
}

Expand All @@ -190,8 +196,7 @@ impl Service for HydroflowCrate {
return;
}

let built = self.build();
self.built_binary = Some(built);
tokio::task::spawn(self.build());

let mut host = self
.on
Expand All @@ -208,9 +213,9 @@ impl Service for HydroflowCrate {
}
}

async fn deploy(&mut self, resource_result: &Arc<ResourceResult>) {
async fn deploy(&mut self, resource_result: &Arc<ResourceResult>) -> Result<()> {
if self.launched_host.is_some() {
return;
return Ok(());
}

ProgressTracker::with_group(
Expand All @@ -220,12 +225,18 @@ impl Service for HydroflowCrate {
.unwrap_or_else(|| format!("service/{}", self.id)),
None,
|| async {
let built = self.build().await.clone().map_err(|e| anyhow!(e))?;

let mut host_write = self.on.write().await;
let launched = host_write.provision(resource_result);
self.launched_host = Some(launched.await);
let launched = host_write.provision(resource_result).await;

launched.copy_binary(built.clone()).await?;

self.launched_host = Some(launched);
Ok(())
},
)
.await;
.await
}

async fn ready(&mut self) -> Result<()> {
Expand All @@ -242,7 +253,7 @@ impl Service for HydroflowCrate {
|| async {
let launched_host = self.launched_host.as_ref().unwrap();

let built = self.built_binary.take().unwrap().await??.clone();
let built = self.build().await.clone().map_err(|e| anyhow!(e))?;
let args = self.args.as_ref().cloned().unwrap_or_default();

let binary = launched_host
Expand Down
18 changes: 18 additions & 0 deletions hydro_cli/src/core/localhost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,20 @@ pub fn create_broadcast<T: AsyncRead + Send + Unpin + 'static>(
break;
}
}

if let Some(cli_receivers) = weak_cli_receivers.upgrade() {
let cli_receivers = cli_receivers.write().await;
for r in cli_receivers.iter() {
r.close();
}
}

if let Some(receivers) = weak_receivers.upgrade() {
let receivers = receivers.write().await;
for r in receivers.iter() {
r.close();
}
}
});

(cli_receivers, receivers)
Expand Down Expand Up @@ -161,6 +175,10 @@ impl LaunchedHost for LaunchedLocalhost {
}
}

async fn copy_binary(&self, _binary: Arc<(String, Vec<u8>, PathBuf)>) -> Result<()> {
Ok(())
}

async fn launch_binary(
&self,
id: String,
Expand Down
4 changes: 3 additions & 1 deletion hydro_cli/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ pub trait LaunchedHost: Send + Sync {
/// to listen to network connections (such as the IP address to bind to).
fn server_config(&self, strategy: &ServerStrategy) -> ServerBindConfig;

async fn copy_binary(&self, binary: Arc<(String, Vec<u8>, PathBuf)>) -> Result<()>;

async fn launch_binary(
&self,
id: String,
Expand Down Expand Up @@ -186,7 +188,7 @@ pub trait Service: Send + Sync {
fn collect_resources(&mut self, resource_batch: &mut ResourceBatch);

/// Connects to the acquired resources and prepares the service to be launched.
async fn deploy(&mut self, resource_result: &Arc<ResourceResult>);
async fn deploy(&mut self, resource_result: &Arc<ResourceResult>) -> Result<()>;

/// Launches the service, which should start listening for incoming network
/// connections. The service should not start computing at this point.
Expand Down
23 changes: 17 additions & 6 deletions hydro_cli/src/core/ssh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,7 @@ impl<T: LaunchedSSHHost> LaunchedHost for T {
LaunchedSSHHost::server_config(self, bind_type)
}

async fn launch_binary(
&self,
id: String,
binary: Arc<(String, Vec<u8>, PathBuf)>,
args: &[String],
) -> Result<Arc<RwLock<dyn LaunchedBinary>>> {
async fn copy_binary(&self, binary: Arc<(String, Vec<u8>, PathBuf)>) -> Result<()> {
let session = self.open_ssh_session().await?;

let sftp = async_retry(
Expand Down Expand Up @@ -172,6 +167,22 @@ impl<T: LaunchedSSHHost> LaunchedHost for T {
}
drop(sftp);

Ok(())
}

async fn launch_binary(
&self,
id: String,
binary: Arc<(String, Vec<u8>, PathBuf)>,
args: &[String],
) -> Result<Arc<RwLock<dyn LaunchedBinary>>> {
let session = self.open_ssh_session().await?;

let unique_name = &binary.0;

let user = self.ssh_user();
let binary_path = PathBuf::from(format!("/home/{user}/hydro-{unique_name}"));

let channel = ProgressTracker::leaf(
format!("launching binary /home/{user}/hydro-{unique_name}"),
async {
Expand Down
Loading

0 comments on commit b058b6a

Please sign in to comment.