Skip to content

Commit

Permalink
feat(hydroflow): auto-configure Hydro Deploy based on Hydroflow+ plans
Browse files Browse the repository at this point in the history
  • Loading branch information
shadaj committed Dec 15, 2023
1 parent f89f7ca commit d36d898
Show file tree
Hide file tree
Showing 22 changed files with 696 additions and 174 deletions.
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ members = [
"hydroflow_lang",
"hydroflow_macro",
"hydroflow_plus",
"hydroflow_plus_cli_integration",
"hydroflow_plus_test",
"hydroflow_plus_test_macro",
"lattices",
Expand Down
4 changes: 2 additions & 2 deletions hydro_cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ documentation = "https://docs.rs/hydro_cli/"
description = "Hydro Deploy Command Line Interface"

[lib]
name = "hydro"
name = "hydro_cli"
# "cdylib" is necessary to produce a shared library for Python to import from.
crate-type = ["cdylib"]
crate-type = ["lib", "cdylib"]

[dependencies]
tokio = { version = "1.16", features = [ "full" ] }
Expand Down
17 changes: 16 additions & 1 deletion hydro_cli/src/core/custom_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::{Arc, Weak};

use anyhow::{bail, Result};
use async_trait::async_trait;
use hydroflow_cli_integration::ServerPort;
use hydroflow_cli_integration::{ConnectedDirect, ServerPort};
use tokio::sync::RwLock;

use super::hydroflow_crate::ports::{
Expand Down Expand Up @@ -32,6 +32,10 @@ impl CustomService {
launched_host: None,
}
}

pub fn create_port(&self, self_arc: &Arc<RwLock<Self>>) -> CustomClientPort {
CustomClientPort::new(Arc::downgrade(self_arc))
}
}

#[async_trait]
Expand Down Expand Up @@ -95,6 +99,17 @@ impl CustomClientPort {
.load_instantiated(&|p| p)
.await
}

pub async fn connect(&self) -> ConnectedDirect {
self.client_port
.as_ref()
.unwrap()
.load_instantiated(&|p| p)
.await
.instantiate()
.connect::<ConnectedDirect>()
.await
}
}

impl HydroflowSource for CustomClientPort {
Expand Down
99 changes: 62 additions & 37 deletions hydro_cli/src/core/deployment.rs
Original file line number Diff line number Diff line change
@@ -1,40 +1,79 @@
use std::sync::{Arc, Weak};
use std::path::PathBuf;
use std::sync::Arc;

use anyhow::Result;
use futures::{StreamExt, TryStreamExt};
use tokio::sync::RwLock;

use super::{progress, Host, ResourcePool, ResourceResult, Service};
use super::{
progress, CustomService, Host, HydroflowCrate, LocalhostHost, ResourcePool, ResourceResult,
Service,
};

#[derive(Default)]
pub struct Deployment {
pub hosts: Vec<Arc<RwLock<dyn Host>>>,
pub services: Vec<Weak<RwLock<dyn Service>>>,
pub services: Vec<Arc<RwLock<dyn Service>>>,
pub resource_pool: ResourcePool,
last_resource_result: Option<Arc<ResourceResult>>,
next_host_id: usize,
next_service_id: usize,
}

impl Deployment {
pub fn new() -> Self {
Self::default()
}

#[allow(non_snake_case)]
pub fn Localhost(&mut self) -> Arc<RwLock<LocalhostHost>> {
self.add_host(LocalhostHost::new)
}

#[allow(non_snake_case)]
pub fn CustomService(
&mut self,
on: Arc<RwLock<dyn Host>>,
external_ports: Vec<u16>,
) -> Arc<RwLock<CustomService>> {
self.add_service(|id| CustomService::new(id, on, external_ports))
}

#[allow(non_snake_case, clippy::too_many_arguments)]
pub fn HydroflowCrate(
&mut self,
src: impl Into<PathBuf>,
on: Arc<RwLock<dyn Host>>,
bin: Option<String>,
example: Option<String>,
profile: Option<String>,
features: Option<Vec<String>>,
args: Option<Vec<String>>,
display_id: Option<String>,
external_ports: Vec<u16>,
) -> Arc<RwLock<HydroflowCrate>> {
self.add_service(|id| {
crate::core::HydroflowCrate::new(
id,
src.into(),
on,
bin,
example,
profile,
features,
args,
display_id,
external_ports,
)
})
}

pub async fn deploy(&mut self) -> Result<()> {
progress::ProgressTracker::with_group("deploy", None, || async {
let mut resource_batch = super::ResourceBatch::new();
let active_services = self
.services
.iter()
.filter(|service| service.upgrade().is_some())
.cloned()
.collect::<Vec<_>>();
self.services = active_services;

for service in self.services.iter_mut() {
service
.upgrade()
.unwrap()
.write()
.await
.collect_resources(&mut resource_batch);
service.write().await.collect_resources(&mut resource_batch);
}

for host in self.hosts.iter_mut() {
Expand Down Expand Up @@ -66,14 +105,8 @@ impl Deployment {
let services_future = self
.services
.iter_mut()
.map(|service: &mut Weak<RwLock<dyn Service>>| async {
service
.upgrade()
.unwrap()
.write()
.await
.deploy(&result)
.await
.map(|service: &mut Arc<RwLock<dyn Service>>| async {
service.write().await.deploy(&result).await
})
.collect::<Vec<_>>();

Expand All @@ -87,8 +120,8 @@ impl Deployment {
let all_services_ready =
self.services
.iter()
.map(|service: &Weak<RwLock<dyn Service>>| async {
service.upgrade().unwrap().write().await.ready().await?;
.map(|service: &Arc<RwLock<dyn Service>>| async {
service.write().await.ready().await?;
Ok(()) as Result<()>
});

Expand All @@ -102,20 +135,12 @@ impl Deployment {
}

pub async fn start(&mut self) -> Result<()> {
let active_services = self
.services
.iter()
.filter(|service| service.upgrade().is_some())
.cloned()
.collect::<Vec<_>>();
self.services = active_services;

progress::ProgressTracker::with_group("start", None, || {
let all_services_start =
self.services
.iter()
.map(|service: &Weak<RwLock<dyn Service>>| async {
service.upgrade().unwrap().write().await.start().await?;
.map(|service: &Arc<RwLock<dyn Service>>| async {
service.write().await.start().await?;
Ok(()) as Result<()>
});

Expand Down Expand Up @@ -144,7 +169,7 @@ impl Deployment {
self.next_service_id += 1;

let dyn_arc: Arc<RwLock<dyn Service>> = arc.clone();
self.services.push(Arc::downgrade(&dyn_arc));
self.services.push(dyn_arc);
arc
}
}
8 changes: 4 additions & 4 deletions hydro_cli/src/core/localhost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,15 +128,15 @@ pub fn create_broadcast<T: AsyncRead + Send + Unpin + 'static>(
}

if let Some(cli_receivers) = weak_cli_receivers.upgrade() {
let cli_receivers = cli_receivers.write().await;
for r in cli_receivers.iter() {
let mut cli_receivers = cli_receivers.write().await;
for r in cli_receivers.drain(..) {
r.close();
}
}

if let Some(receivers) = weak_receivers.upgrade() {
let receivers = receivers.write().await;
for r in receivers.iter() {
let mut receivers = receivers.write().await;
for r in receivers.drain(..) {
r.close();
}
}
Expand Down
37 changes: 16 additions & 21 deletions hydro_cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,7 @@ impl Deployment {

#[allow(non_snake_case)]
fn Localhost(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
let arc = self
.underlying
.blocking_write()
.add_host(crate::core::LocalhostHost::new);
let arc = self.underlying.blocking_write().Localhost();

Ok(Py::new(
py,
Expand Down Expand Up @@ -214,9 +211,10 @@ impl Deployment {
on: &Host,
external_ports: Vec<u16>,
) -> PyResult<Py<PyAny>> {
let service = self.underlying.blocking_write().add_service(|id| {
crate::core::CustomService::new(id, on.underlying.clone(), external_ports)
});
let service = self
.underlying
.blocking_write()
.CustomService(on.underlying.clone(), external_ports);

Ok(Py::new(
py,
Expand Down Expand Up @@ -244,20 +242,17 @@ impl Deployment {
display_id: Option<String>,
external_ports: Option<Vec<u16>>,
) -> PyResult<Py<PyAny>> {
let service = self.underlying.blocking_write().add_service(|id| {
crate::core::HydroflowCrate::new(
id,
src.into(),
on.underlying.clone(),
bin,
example,
profile,
features,
args,
display_id,
external_ports.unwrap_or_default(),
)
});
let service = self.underlying.blocking_write().HydroflowCrate(
src,
on.underlying.clone(),
bin,
example,
profile,
features,
args,
display_id,
external_ports.unwrap_or_default(),
);

Ok(Py::new(
py,
Expand Down
6 changes: 6 additions & 0 deletions hydroflow_cli_integration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ pub enum ServerPort {
Null,
}

impl ServerPort {
pub fn instantiate(&self) -> ServerOrBound {
ServerOrBound::Server(self.into())
}
}

#[derive(Debug)]
pub enum RealizedServerPort {
UnixSocket(JoinHandle<io::Result<UnixStream>>),
Expand Down
Loading

0 comments on commit d36d898

Please sign in to comment.