Skip to content

Commit

Permalink
feat(hydroflow): add initial test using Hydro CLI from Hydroflow+
Browse files Browse the repository at this point in the history
This also required a change to Hydroflow core to make it possible to run the dataflow itself on a single thread (using a LocalSet), even if the surrounding runtime is not single-threaded (required to work around deadlocks because we can't use async APIs inside Hydroflow+). This requires us to spawn any Hydroflow tasks (only for `dest_sink` at the moment) right next to when we run the dataflow rather than when the Hydroflow graph is initialized. From a conceptual perspective, this seems _more right_, since now creating a Hydroflow program will not result in any actual tasks running.

In the third PR of this series, I aim to add a new Hydroflow+ operator that will automate the setup of a `dest_sink`/`source_stream` pair that span nodes.
  • Loading branch information
shadaj committed Dec 14, 2023
1 parent 8b7d87f commit 2a7d88c
Show file tree
Hide file tree
Showing 31 changed files with 269 additions and 56 deletions.
29 changes: 8 additions & 21 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,6 @@ jobs:
pip install maturin
maturin develop
- name: Configure core dumps
run: |
mkdir cli-core-dumps
echo "$PWD/cli-core-dumps/corefile-%e-%p-%t" | sudo tee /proc/sys/kernel/core_pattern
- name: Run Python tests
run: |
ulimit -c unlimited
Expand All @@ -229,24 +224,16 @@ jobs:
cd python_tests
pip install -r requirements.txt
RUST_BACKTRACE=1 pytest *.py
cd ../..
- name: Print backtraces
if: ${{ failure() }}
- name: Run Hydroflow+ Python tests
run: |
cd cli-core-dumps
sudo apt-get install gdb
for file in $(ls); do
echo "Backtrace for $file"
rust-gdb -ex "thread apply all bt" -batch $PWD/../.venv/bin/python $file
done
cd ..
- name: Upload core dumps
if: ${{ failure() }}
uses: actions/upload-artifact@master
with:
name: cli-core-dumps
path: ./cli-core-dumps/*
ulimit -c unlimited
cd hydro_cli
source .venv/bin/activate
cd ../hydroflow_plus_test/python_tests
pip install -r requirements.txt
RUST_BACKTRACE=1 pytest *.py
lints:
name: Lints
Expand Down
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions docs/docs/deploy/your-first-deploy.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ Let's open up `src/main.rs` in the generated project and write a new `main` func
```rust
#[hydroflow::main]
async fn main() {
let mut ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init().await;
}
```

Expand Down Expand Up @@ -72,7 +72,7 @@ use hydroflow::hydroflow_syntax;

#[hydroflow::main]
async fn main() {
let mut ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init().await;

let input_recv = ports
.port("input")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use hydroflow_datalog::datalog;

#[hydroflow::main]
async fn main() {
let mut ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init().await;
let vote_to_participant_port = ports
.port("vote_to_participant")
.connect::<ConnectedDemux<ConnectedDirect>>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use hydroflow_datalog::datalog;

#[hydroflow::main]
async fn main() {
let mut ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init().await;
let vote_to_participant_source = ports
.port("vote_to_participant")
.connect::<ConnectedDirect>()
Expand Down
2 changes: 1 addition & 1 deletion hydro_cli_examples/examples/dedalus_receiver/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use hydroflow_datalog::datalog;

#[hydroflow::main]
async fn main() {
let mut ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init().await;
let broadcast_recv = ports
.port("broadcast")
.connect::<ConnectedDirect>()
Expand Down
2 changes: 1 addition & 1 deletion hydro_cli_examples/examples/dedalus_sender/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use hydroflow_datalog::datalog;

#[hydroflow::main]
async fn main() {
let mut ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init().await;
let broadcast_port = ports
.port("broadcast")
.connect::<ConnectedDemux<ConnectedDirect>>()
Expand Down
2 changes: 1 addition & 1 deletion hydro_cli_examples/examples/dedalus_vote_leader/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use hydroflow_datalog::datalog;

#[hydroflow::main]
async fn main() {
let mut ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init().await;
let to_replica_port = ports
.port("to_replica")
.connect::<ConnectedDemux<ConnectedDirect>>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use hydroflow_datalog::datalog;

#[hydroflow::main]
async fn main() {
let mut ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init().await;
let to_replica_source = ports
.port("to_replica")
.connect::<ConnectedDirect>()
Expand Down
2 changes: 1 addition & 1 deletion hydro_cli_examples/examples/stdout_receiver/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use hydroflow::util::cli::{ConnectedDirect, ConnectedSource};

#[hydroflow::main]
async fn main() {
let mut ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init().await;
let echo_recv = ports
.port("echo")
.connect::<ConnectedDirect>()
Expand Down
2 changes: 1 addition & 1 deletion hydro_cli_examples/examples/tagged_stdout_receiver/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use hydroflow::util::cli::{ConnectedDirect, ConnectedSource, ConnectedTagged};

#[hydroflow::main]
async fn main() {
let mut ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init().await;
let echo_recv = ports
.port("echo")
.connect::<ConnectedTagged<ConnectedDirect>>()
Expand Down
2 changes: 1 addition & 1 deletion hydro_cli_examples/examples/ws_chat_server/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ struct ChatMessage {

#[hydroflow::main]
async fn main() {
let mut ports = hydroflow::util::cli::init().await;
let ports = hydroflow::util::cli::init().await;

let from_peer = ports
.port("from_peer")
Expand Down
21 changes: 15 additions & 6 deletions hydroflow/src/scheduled/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use std::any::Any;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;

use instant::Instant;
use tokio::sync::mpsc::UnboundedSender;
Expand Down Expand Up @@ -37,6 +38,8 @@ pub struct Context {
/// meaningless.
pub(crate) subgraph_id: SubgraphId,

pub(crate) tasks_to_spawn: Vec<Pin<Box<dyn Future<Output = ()> + 'static>>>,

/// Join handles for spawned tasks.
pub(crate) task_join_handles: Vec<JoinHandle<()>>,
}
Expand Down Expand Up @@ -155,23 +158,29 @@ impl Context {
.expect("StateHandle wrong type T for casting.")
}

/// Spawns an async task on the internal Tokio executor.
pub fn spawn_task<Fut>(&mut self, future: Fut)
/// Prepares an async task to be launched by [`Self::spawn_tasks`].
pub fn request_task<Fut>(&mut self, future: Fut)
where
Fut: Future<Output = ()> + 'static,
{
self.task_join_handles
.push(tokio::task::spawn_local(future));
self.tasks_to_spawn.push(Box::pin(future));
}

/// Launches all tasks requested with [`Self::request_task`] on the internal Tokio executor.
pub fn spawn_tasks(&mut self) {
for task in self.tasks_to_spawn.drain(..) {
self.task_join_handles.push(tokio::task::spawn_local(task));
}
}

/// Aborts all tasks spawned with [`Self::spawn_task`].
/// Aborts all tasks spawned with [`Self::spawn_tasks`].
pub fn abort_tasks(&mut self) {
for task in self.task_join_handles.drain(..) {
task.abort();
}
}

/// Waits for all tasks spawned with [`Self::spawn_task`] to complete.
/// Waits for all tasks spawned with [`Self::spawn_tasks`] to complete.
///
/// Will probably just hang.
pub async fn join_tasks(&mut self) {
Expand Down
8 changes: 5 additions & 3 deletions hydroflow/src/scheduled/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ impl<'a> Default for Hydroflow<'a> {

subgraph_id: SubgraphId(0),

tasks_to_spawn: Vec::new(),
task_join_handles: Vec::new(),
};
Self {
Expand Down Expand Up @@ -351,6 +352,7 @@ impl<'a> Hydroflow<'a> {
/// TODO(mingwei): Currently blockes forever, no notion of "completion."
#[tracing::instrument(level = "trace", skip(self), ret)]
pub async fn run_async(&mut self) -> Option<Never> {
self.context.spawn_tasks();
loop {
// Run any work which is immediately available.
self.run_available_async().await;
Expand Down Expand Up @@ -681,12 +683,12 @@ impl<'a> Hydroflow<'a> {
}

impl<'a> Hydroflow<'a> {
/// Alias for [`Context::spawn_task`].
pub fn spawn_task<Fut>(&mut self, future: Fut)
/// Alias for [`Context::request_task`].
pub fn request_task<Fut>(&mut self, future: Fut)
where
Fut: Future<Output = ()> + 'static,
{
self.context.spawn_task(future);
self.context.request_task(future);
}

/// Alias for [`Context::abort_tasks`].
Expand Down
20 changes: 15 additions & 5 deletions hydroflow/src/util/cli.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![allow(missing_docs)] // TODO(mingwei)

use std::cell::RefCell;
use std::collections::HashMap;

pub use hydroflow_cli_integration::*;
Expand All @@ -15,19 +16,28 @@ pub async fn launch_flow(mut flow: Hydroflow<'_>) {
stop.0.send(()).unwrap();
});

let local_set = tokio::task::LocalSet::new();
let flow = local_set.run_until(async move {
flow.run_async().await;
});

tokio::select! {
_ = stop.1 => {},
_ = flow.run_async() => {}
_ = flow => {}
}
}

pub struct HydroCLI {
ports: HashMap<String, ServerOrBound>,
ports: RefCell<HashMap<String, ServerOrBound>>,
}

impl HydroCLI {
pub fn port(&mut self, name: &str) -> ServerOrBound {
self.ports.remove(name).unwrap()
pub fn port(&self, name: &str) -> ServerOrBound {
self.ports
.try_borrow_mut()
.unwrap()
.remove(name)
.unwrap_or_else(|| panic!("port {} not found", name))
}
}

Expand Down Expand Up @@ -73,6 +83,6 @@ pub async fn init() -> HydroCLI {
println!("ack start");

HydroCLI {
ports: all_connected,
ports: RefCell::new(all_connected),
}
}
6 changes: 6 additions & 0 deletions hydroflow_cli_integration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,12 @@ impl ServerOrBound {
T::from_defn(self).await
}

pub fn connect_local_blocking<T: Connected>(self) -> T {
let handle = tokio::runtime::Handle::current();
let _guard = handle.enter();
futures::executor::block_on(T::from_defn(self))
}

pub async fn accept_tcp(&mut self) -> TcpStream {
if let ServerOrBound::Bound(BoundConnection::TcpPort(handle, _)) = self {
handle.recv().await.unwrap().unwrap()
Expand Down
4 changes: 2 additions & 2 deletions hydroflow_lang/src/graph/ops/dest_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use super::{
/// A `Sink` is a thing into which values can be sent, asynchronously. For example, sending items
/// into a bounded channel.
///
/// Note this operator must be used within a Tokio runtime.
/// Note this operator must be used within a Tokio runtime, and the Hydroflow program must be launched with `run_async`.
///
/// ```rustbook
/// # #[hydroflow::main]
Expand Down Expand Up @@ -139,7 +139,7 @@ pub const DEST_SINK: OperatorConstraints = OperatorConstraints {
}
}
#hydroflow
.spawn_task(sink_feed_flush(#recv_ident, #sink_arg));
.request_task(sink_feed_flush(#recv_ident, #sink_arg));
}
};

Expand Down
Loading

0 comments on commit 2a7d88c

Please sign in to comment.