Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(hydroflow): add initial test using Hydro CLI from Hydroflow+ #978

Merged
merged 1 commit into from
Dec 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 8 additions & 21 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,6 @@ jobs:
pip install maturin
maturin develop

- name: Configure core dumps
run: |
mkdir cli-core-dumps
echo "$PWD/cli-core-dumps/corefile-%e-%p-%t" | sudo tee /proc/sys/kernel/core_pattern

- name: Run Python tests
run: |
ulimit -c unlimited
Expand All @@ -229,24 +224,16 @@ jobs:
cd python_tests
pip install -r requirements.txt
RUST_BACKTRACE=1 pytest *.py
cd ../..

- name: Print backtraces
if: ${{ failure() }}
- name: Run Hydroflow+ Python tests
run: |
cd cli-core-dumps
sudo apt-get install gdb
for file in $(ls); do
echo "Backtrace for $file"
rust-gdb -ex "thread apply all bt" -batch $PWD/../.venv/bin/python $file
done
cd ..

- name: Upload core dumps
if: ${{ failure() }}
uses: actions/upload-artifact@master
with:
name: cli-core-dumps
path: ./cli-core-dumps/*
ulimit -c unlimited
cd hydro_cli
source .venv/bin/activate
cd ../hydroflow_plus_test/python_tests
pip install -r requirements.txt
RUST_BACKTRACE=1 pytest *.py

lints:
name: Lints
Expand Down
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions docs/docs/deploy/your-first-deploy.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ Let's open up `src/main.rs` in the generated project and write a new `main` func
```rust
#[hydroflow::main]
async fn main() {
let mut ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init().await;
}
```

Expand Down Expand Up @@ -72,7 +72,7 @@ use hydroflow::hydroflow_syntax;

#[hydroflow::main]
async fn main() {
let mut ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init().await;

let input_recv = ports
.port("input")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use hydroflow_datalog::datalog;

#[hydroflow::main]
async fn main() {
let mut ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init().await;
let vote_to_participant_port = ports
.port("vote_to_participant")
.connect::<ConnectedDemux<ConnectedDirect>>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use hydroflow_datalog::datalog;

#[hydroflow::main]
async fn main() {
let mut ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init().await;
let vote_to_participant_source = ports
.port("vote_to_participant")
.connect::<ConnectedDirect>()
Expand Down
2 changes: 1 addition & 1 deletion hydro_cli_examples/examples/dedalus_receiver/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use hydroflow_datalog::datalog;

#[hydroflow::main]
async fn main() {
let mut ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init().await;
let broadcast_recv = ports
.port("broadcast")
.connect::<ConnectedDirect>()
Expand Down
2 changes: 1 addition & 1 deletion hydro_cli_examples/examples/dedalus_sender/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use hydroflow_datalog::datalog;

#[hydroflow::main]
async fn main() {
let mut ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init().await;
let broadcast_port = ports
.port("broadcast")
.connect::<ConnectedDemux<ConnectedDirect>>()
Expand Down
2 changes: 1 addition & 1 deletion hydro_cli_examples/examples/dedalus_vote_leader/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use hydroflow_datalog::datalog;

#[hydroflow::main]
async fn main() {
let mut ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init().await;
let to_replica_port = ports
.port("to_replica")
.connect::<ConnectedDemux<ConnectedDirect>>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use hydroflow_datalog::datalog;

#[hydroflow::main]
async fn main() {
let mut ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init().await;
let to_replica_source = ports
.port("to_replica")
.connect::<ConnectedDirect>()
Expand Down
2 changes: 1 addition & 1 deletion hydro_cli_examples/examples/stdout_receiver/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use hydroflow::util::cli::{ConnectedDirect, ConnectedSource};

#[hydroflow::main]
async fn main() {
let mut ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init().await;
let echo_recv = ports
.port("echo")
.connect::<ConnectedDirect>()
Expand Down
2 changes: 1 addition & 1 deletion hydro_cli_examples/examples/tagged_stdout_receiver/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use hydroflow::util::cli::{ConnectedDirect, ConnectedSource, ConnectedTagged};

#[hydroflow::main]
async fn main() {
let mut ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init().await;
let echo_recv = ports
.port("echo")
.connect::<ConnectedTagged<ConnectedDirect>>()
Expand Down
2 changes: 1 addition & 1 deletion hydro_cli_examples/examples/ws_chat_server/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ struct ChatMessage {

#[hydroflow::main]
async fn main() {
let mut ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init().await;

let from_peer = ports
.port("from_peer")
Expand Down
21 changes: 15 additions & 6 deletions hydroflow/src/scheduled/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use std::any::Any;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;

use instant::Instant;
use tokio::sync::mpsc::UnboundedSender;
Expand Down Expand Up @@ -37,6 +38,8 @@ pub struct Context {
/// meaningless.
pub(crate) subgraph_id: SubgraphId,

pub(crate) tasks_to_spawn: Vec<Pin<Box<dyn Future<Output = ()> + 'static>>>,

/// Join handles for spawned tasks.
pub(crate) task_join_handles: Vec<JoinHandle<()>>,
}
Expand Down Expand Up @@ -155,23 +158,29 @@ impl Context {
.expect("StateHandle wrong type T for casting.")
}

/// Spawns an async task on the internal Tokio executor.
pub fn spawn_task<Fut>(&mut self, future: Fut)
/// Prepares an async task to be launched by [`Self::spawn_tasks`].
pub fn request_task<Fut>(&mut self, future: Fut)
where
Fut: Future<Output = ()> + 'static,
{
self.task_join_handles
.push(tokio::task::spawn_local(future));
self.tasks_to_spawn.push(Box::pin(future));
}

/// Launches all tasks requested with [`Self::request_task`] on the internal Tokio executor.
pub fn spawn_tasks(&mut self) {
for task in self.tasks_to_spawn.drain(..) {
self.task_join_handles.push(tokio::task::spawn_local(task));
}
}

/// Aborts all tasks spawned with [`Self::spawn_task`].
/// Aborts all tasks spawned with [`Self::spawn_tasks`].
pub fn abort_tasks(&mut self) {
for task in self.task_join_handles.drain(..) {
task.abort();
}
}

/// Waits for all tasks spawned with [`Self::spawn_task`] to complete.
/// Waits for all tasks spawned with [`Self::spawn_tasks`] to complete.
///
/// Will probably just hang.
pub async fn join_tasks(&mut self) {
Expand Down
8 changes: 5 additions & 3 deletions hydroflow/src/scheduled/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ impl<'a> Default for Hydroflow<'a> {

subgraph_id: SubgraphId(0),

tasks_to_spawn: Vec::new(),
task_join_handles: Vec::new(),
};
Self {
Expand Down Expand Up @@ -351,6 +352,7 @@ impl<'a> Hydroflow<'a> {
/// TODO(mingwei): Currently blockes forever, no notion of "completion."
#[tracing::instrument(level = "trace", skip(self), ret)]
pub async fn run_async(&mut self) -> Option<Never> {
self.context.spawn_tasks();
shadaj marked this conversation as resolved.
Show resolved Hide resolved
loop {
// Run any work which is immediately available.
self.run_available_async().await;
Expand Down Expand Up @@ -681,12 +683,12 @@ impl<'a> Hydroflow<'a> {
}

impl<'a> Hydroflow<'a> {
/// Alias for [`Context::spawn_task`].
pub fn spawn_task<Fut>(&mut self, future: Fut)
/// Alias for [`Context::request_task`].
pub fn request_task<Fut>(&mut self, future: Fut)
where
Fut: Future<Output = ()> + 'static,
{
self.context.spawn_task(future);
self.context.request_task(future);
}

/// Alias for [`Context::abort_tasks`].
Expand Down
20 changes: 15 additions & 5 deletions hydroflow/src/util/cli.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![allow(missing_docs)] // TODO(mingwei)

use std::cell::RefCell;
use std::collections::HashMap;

pub use hydroflow_cli_integration::*;
Expand All @@ -15,19 +16,28 @@ pub async fn launch_flow(mut flow: Hydroflow<'_>) {
stop.0.send(()).unwrap();
});

let local_set = tokio::task::LocalSet::new();
let flow = local_set.run_until(async move {
flow.run_async().await;
});

tokio::select! {
_ = stop.1 => {},
_ = flow.run_async() => {}
_ = flow => {}
}
}

pub struct HydroCLI {
ports: HashMap<String, ServerOrBound>,
ports: RefCell<HashMap<String, ServerOrBound>>,
}

impl HydroCLI {
pub fn port(&mut self, name: &str) -> ServerOrBound {
self.ports.remove(name).unwrap()
pub fn port(&self, name: &str) -> ServerOrBound {
self.ports
.try_borrow_mut()
.unwrap()
.remove(name)
.unwrap_or_else(|| panic!("port {} not found", name))
}
}

Expand Down Expand Up @@ -73,6 +83,6 @@ pub async fn init() -> HydroCLI {
println!("ack start");

HydroCLI {
ports: all_connected,
ports: RefCell::new(all_connected),
}
}
6 changes: 6 additions & 0 deletions hydroflow_cli_integration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,12 @@ impl ServerOrBound {
T::from_defn(self).await
}

pub fn connect_local_blocking<T: Connected>(self) -> T {
let handle = tokio::runtime::Handle::current();
let _guard = handle.enter();
futures::executor::block_on(T::from_defn(self))
}

pub async fn accept_tcp(&mut self) -> TcpStream {
if let ServerOrBound::Bound(BoundConnection::TcpPort(handle, _)) = self {
handle.recv().await.unwrap().unwrap()
Expand Down
4 changes: 2 additions & 2 deletions hydroflow_lang/src/graph/ops/dest_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use super::{
/// A `Sink` is a thing into which values can be sent, asynchronously. For example, sending items
/// into a bounded channel.
///
/// Note this operator must be used within a Tokio runtime.
/// Note this operator must be used within a Tokio runtime, and the Hydroflow program must be launched with `run_async`.
///
/// ```rustbook
/// # #[hydroflow::main]
Expand Down Expand Up @@ -139,7 +139,7 @@ pub const DEST_SINK: OperatorConstraints = OperatorConstraints {
}
}
#hydroflow
.spawn_task(sink_feed_flush(#recv_ident, #sink_arg));
.request_task(sink_feed_flush(#recv_ident, #sink_arg));
}
};

Expand Down
17 changes: 17 additions & 0 deletions hydroflow_plus/src/stream.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::hash::Hash;
use std::marker::PhantomData;

use hydroflow::futures::Sink;
use proc_macro2::Span;
use stageleft::{IntoQuotedMut, Quoted};
use syn::parse_quote;
Expand Down Expand Up @@ -332,6 +333,22 @@ impl<'a, T> HfStream<'a, T> {
#ident = #self_ident -> for_each(#f);
});
}

pub fn dest_sink<S: Unpin + Sink<T> + 'a>(&self, sink: impl Quoted<'a, S>) {
let sink = sink.splice();
let self_ident = &self.ident;

self.graph
.builders
.borrow_mut()
.as_mut()
.unwrap()
.entry(self.node_id)
.or_default()
.add_statement(parse_quote! {
#self_ident -> dest_sink(#sink);
});
}
}

impl<'a, K, V1> HfStream<'a, (K, V1)> {
Expand Down
2 changes: 2 additions & 0 deletions hydroflow_plus_test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ version = "0.0.0"
edition = "2021"

[dependencies]
hydroflow = { path = "../hydroflow", version = "^0.5.0", features = [ "cli_integration" ] }
hydroflow_plus = { path = "../hydroflow_plus", version = "^0.5.0" }
tokio = { version = "1.16", features = [ "full" ] }
stageleft = { path = "../stageleft", version = "^0.1.0" }
regex = "1"
serde = "1"
Expand Down
12 changes: 12 additions & 0 deletions hydroflow_plus_test/examples/networked_basic.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
use hydroflow_plus_test::*;

// cannot use hydroflow::main because connect_local_blocking causes a deadlock
#[tokio::main]
async fn main() {
let node_id: usize = std::env::args().nth(1).unwrap().parse().unwrap();
let ports = hydroflow::util::cli::init().await;

let joined = hydroflow_plus_test::networked::networked_basic!(&ports, node_id);

hydroflow::util::cli::launch_flow(joined).await;
}
Loading
Loading