Skip to content

Commit

Permalink
feat(hydroflow): add initial test using Hydro CLI from Hydroflow+
Browse files Browse the repository at this point in the history
This also required a change to Hydroflow core to make it possible to run the dataflow itself on a single thread (using a LocalSet), even if the surrounding runtime is not single-threaded (required to work around deadlocks because we can't use async APIs inside Hydroflow+). This requires us to spawn any Hydroflow tasks (only for `dest_sink` at the moment) right next to when we run the dataflow rather than when the Hydroflow graph is initialized. From a conceptual perspective, this seems _more right_, since now creating a Hydroflow program will not result in any actual tasks running.

In the third PR of this series, I aim to add a new Hydroflow+ operator that will automate the setup of a `dest_sink`/`source_stream` pair that span nodes.
  • Loading branch information
shadaj committed Dec 12, 2023
1 parent 8b7d87f commit a1b31b4
Show file tree
Hide file tree
Showing 31 changed files with 204 additions and 54 deletions.
29 changes: 8 additions & 21 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,6 @@ jobs:
pip install maturin
maturin develop
- name: Configure core dumps
run: |
mkdir cli-core-dumps
echo "$PWD/cli-core-dumps/corefile-%e-%p-%t" | sudo tee /proc/sys/kernel/core_pattern
- name: Run Python tests
run: |
ulimit -c unlimited
Expand All @@ -229,24 +224,16 @@ jobs:
cd python_tests
pip install -r requirements.txt
RUST_BACKTRACE=1 pytest *.py
cd ../..
- name: Print backtraces
if: ${{ failure() }}
- name: Run Hydroflow+ Python tests
run: |
cd cli-core-dumps
sudo apt-get install gdb
for file in $(ls); do
echo "Backtrace for $file"
rust-gdb -ex "thread apply all bt" -batch $PWD/../.venv/bin/python $file
done
cd ..
- name: Upload core dumps
if: ${{ failure() }}
uses: actions/upload-artifact@master
with:
name: cli-core-dumps
path: ./cli-core-dumps/*
ulimit -c unlimited
cd hydro_cli
source .venv/bin/activate
cd ../hydroflow_plus_test/python_tests
pip install -r requirements.txt
RUST_BACKTRACE=1 pytest *.py
lints:
name: Lints
Expand Down
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions docs/docs/deploy/your-first-deploy.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ Let's open up `src/main.rs` in the generated project and write a new `main` func
```rust
#[hydroflow::main]
async fn main() {
let mut ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init().await;
}
```

Expand Down Expand Up @@ -72,7 +72,7 @@ use hydroflow::hydroflow_syntax;

#[hydroflow::main]
async fn main() {
let mut ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init().await;

let input_recv = ports
.port("input")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use hydroflow_datalog::datalog;

#[hydroflow::main]
async fn main() {
let mut ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init().await;
let vote_to_participant_port = ports
.port("vote_to_participant")
.connect::<ConnectedDemux<ConnectedDirect>>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use hydroflow_datalog::datalog;

#[hydroflow::main]
async fn main() {
let mut ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init().await;
let vote_to_participant_source = ports
.port("vote_to_participant")
.connect::<ConnectedDirect>()
Expand Down
2 changes: 1 addition & 1 deletion hydro_cli_examples/examples/dedalus_receiver/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use hydroflow_datalog::datalog;

#[hydroflow::main]
async fn main() {
let mut ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init().await;
let broadcast_recv = ports
.port("broadcast")
.connect::<ConnectedDirect>()
Expand Down
2 changes: 1 addition & 1 deletion hydro_cli_examples/examples/dedalus_sender/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use hydroflow_datalog::datalog;

#[hydroflow::main]
async fn main() {
let mut ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init().await;
let broadcast_port = ports
.port("broadcast")
.connect::<ConnectedDemux<ConnectedDirect>>()
Expand Down
2 changes: 1 addition & 1 deletion hydro_cli_examples/examples/dedalus_vote_leader/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use hydroflow_datalog::datalog;

#[hydroflow::main]
async fn main() {
let mut ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init().await;
let to_replica_port = ports
.port("to_replica")
.connect::<ConnectedDemux<ConnectedDirect>>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use hydroflow_datalog::datalog;

#[hydroflow::main]
async fn main() {
let mut ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init().await;
let to_replica_source = ports
.port("to_replica")
.connect::<ConnectedDirect>()
Expand Down
2 changes: 1 addition & 1 deletion hydro_cli_examples/examples/stdout_receiver/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use hydroflow::util::cli::{ConnectedDirect, ConnectedSource};

#[hydroflow::main]
async fn main() {
let mut ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init().await;
let echo_recv = ports
.port("echo")
.connect::<ConnectedDirect>()
Expand Down
2 changes: 1 addition & 1 deletion hydro_cli_examples/examples/tagged_stdout_receiver/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use hydroflow::util::cli::{ConnectedDirect, ConnectedSource, ConnectedTagged};

#[hydroflow::main]
async fn main() {
let mut ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init().await;
let echo_recv = ports
.port("echo")
.connect::<ConnectedTagged<ConnectedDirect>>()
Expand Down
2 changes: 1 addition & 1 deletion hydro_cli_examples/examples/ws_chat_server/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ struct ChatMessage {

#[hydroflow::main]
async fn main() {
let mut ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init().await;

let from_peer = ports
.port("from_peer")
Expand Down
21 changes: 15 additions & 6 deletions hydroflow/src/scheduled/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use std::any::Any;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;

use instant::Instant;
use tokio::sync::mpsc::UnboundedSender;
Expand Down Expand Up @@ -37,6 +38,8 @@ pub struct Context {
/// meaningless.
pub(crate) subgraph_id: SubgraphId,

pub(crate) tasks_to_spawn: Vec<Pin<Box<dyn Future<Output = ()> + 'static>>>,

/// Join handles for spawned tasks.
pub(crate) task_join_handles: Vec<JoinHandle<()>>,
}
Expand Down Expand Up @@ -155,23 +158,29 @@ impl Context {
.expect("StateHandle wrong type T for casting.")
}

/// Spawns an async task on the internal Tokio executor.
pub fn spawn_task<Fut>(&mut self, future: Fut)
/// Prepares an async task to be launched by [`Self::spawn_tasks`].
pub fn request_task<Fut>(&mut self, future: Fut)
where
Fut: Future<Output = ()> + 'static,
{
self.task_join_handles
.push(tokio::task::spawn_local(future));
self.tasks_to_spawn.push(Box::pin(future));
}

/// Launches all tasks requested with [`Self::request_task`] on the internal Tokio executor.
pub fn spawn_tasks(&mut self) {
for task in self.tasks_to_spawn.drain(..) {
self.task_join_handles.push(tokio::task::spawn_local(task));
}
}

/// Aborts all tasks spawned with [`Self::spawn_task`].
/// Aborts all tasks spawned with [`Self::spawn_tasks`].
pub fn abort_tasks(&mut self) {
for task in self.task_join_handles.drain(..) {
task.abort();
}
}

/// Waits for all tasks spawned with [`Self::spawn_task`] to complete.
/// Waits for all tasks spawned with [`Self::spawn_tasks`] to complete.
///
/// Will probably just hang.
pub async fn join_tasks(&mut self) {
Expand Down
8 changes: 5 additions & 3 deletions hydroflow/src/scheduled/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ impl<'a> Default for Hydroflow<'a> {

subgraph_id: SubgraphId(0),

tasks_to_spawn: Vec::new(),
task_join_handles: Vec::new(),
};
Self {
Expand Down Expand Up @@ -351,6 +352,7 @@ impl<'a> Hydroflow<'a> {
/// TODO(mingwei): Currently blockes forever, no notion of "completion."
#[tracing::instrument(level = "trace", skip(self), ret)]
pub async fn run_async(&mut self) -> Option<Never> {
self.context.spawn_tasks();
loop {
// Run any work which is immediately available.
self.run_available_async().await;
Expand Down Expand Up @@ -681,12 +683,12 @@ impl<'a> Hydroflow<'a> {
}

impl<'a> Hydroflow<'a> {
/// Alias for [`Context::spawn_task`].
pub fn spawn_task<Fut>(&mut self, future: Fut)
/// Alias for [`Context::request_task`].
pub fn request_task<Fut>(&mut self, future: Fut)
where
Fut: Future<Output = ()> + 'static,
{
self.context.spawn_task(future);
self.context.request_task(future);
}

/// Alias for [`Context::abort_tasks`].
Expand Down
20 changes: 15 additions & 5 deletions hydroflow/src/util/cli.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![allow(missing_docs)] // TODO(mingwei)

use std::cell::RefCell;
use std::collections::HashMap;

pub use hydroflow_cli_integration::*;
Expand All @@ -15,19 +16,28 @@ pub async fn launch_flow(mut flow: Hydroflow<'_>) {
stop.0.send(()).unwrap();
});

let local_set = tokio::task::LocalSet::new();
let flow = local_set.run_until(async move {
flow.run_async().await;
});

tokio::select! {
_ = stop.1 => {},
_ = flow.run_async() => {}
_ = flow => {}
}
}

pub struct HydroCLI {
ports: HashMap<String, ServerOrBound>,
ports: RefCell<HashMap<String, ServerOrBound>>,
}

impl HydroCLI {
pub fn port(&mut self, name: &str) -> ServerOrBound {
self.ports.remove(name).unwrap()
pub fn port(&self, name: &str) -> ServerOrBound {
self.ports
.try_borrow_mut()
.unwrap()
.remove(name)
.unwrap_or_else(|| panic!("port {} not found", name))
}
}

Expand Down Expand Up @@ -73,6 +83,6 @@ pub async fn init() -> HydroCLI {
println!("ack start");

HydroCLI {
ports: all_connected,
ports: RefCell::new(all_connected),
}
}
6 changes: 6 additions & 0 deletions hydroflow_cli_integration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,12 @@ impl ServerOrBound {
T::from_defn(self).await
}

pub fn connect_local_blocking<T: Connected>(self) -> T {
let handle = tokio::runtime::Handle::current();
let _guard = handle.enter();
futures::executor::block_on(T::from_defn(self))
}

pub async fn accept_tcp(&mut self) -> TcpStream {
if let ServerOrBound::Bound(BoundConnection::TcpPort(handle, _)) = self {
handle.recv().await.unwrap().unwrap()
Expand Down
2 changes: 1 addition & 1 deletion hydroflow_lang/src/graph/ops/dest_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ pub const DEST_SINK: OperatorConstraints = OperatorConstraints {
}
}
#hydroflow
.spawn_task(sink_feed_flush(#recv_ident, #sink_arg));
.request_task(sink_feed_flush(#recv_ident, #sink_arg));
}
};

Expand Down
17 changes: 17 additions & 0 deletions hydroflow_plus/src/stream.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::hash::Hash;
use std::marker::PhantomData;

use hydroflow::futures::Sink;
use proc_macro2::Span;
use stageleft::{IntoQuotedMut, Quoted};
use syn::parse_quote;
Expand Down Expand Up @@ -332,6 +333,22 @@ impl<'a, T> HfStream<'a, T> {
#ident = #self_ident -> for_each(#f);
});
}

pub fn dest_sink<S: Unpin + Sink<T> + 'a>(&self, sink: impl Quoted<'a, S>) {
let sink = sink.splice();
let self_ident = &self.ident;

self.graph
.builders
.borrow_mut()
.as_mut()
.unwrap()
.entry(self.node_id)
.or_default()
.add_statement(parse_quote! {
#self_ident -> dest_sink(#sink);
});
}
}

impl<'a, K, V1> HfStream<'a, (K, V1)> {
Expand Down
2 changes: 2 additions & 0 deletions hydroflow_plus_test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ version = "0.0.0"
edition = "2021"

[dependencies]
hydroflow = { path = "../hydroflow", version = "^0.5.0", features = [ "cli_integration" ] }
hydroflow_plus = { path = "../hydroflow_plus", version = "^0.5.0" }
tokio = { version = "1.16", features = [ "full" ] }
stageleft = { path = "../stageleft", version = "^0.1.0" }
regex = "1"
serde = "1"
Expand Down
12 changes: 12 additions & 0 deletions hydroflow_plus_test/examples/networked_basic.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
use hydroflow_plus_test::*;

// cannot use hydroflow::main because connect_local_blocking causes a deadlock
#[tokio::main]
async fn main() {
let node_id: usize = std::env::args().nth(1).unwrap().parse().unwrap();
let ports = hydroflow::util::cli::init().await;

let joined = hydroflow_plus_test::networked::networked_basic!(&ports, node_id);

hydroflow::util::cli::launch_flow(joined).await;
}
Loading

0 comments on commit a1b31b4

Please sign in to comment.