Skip to content

Commit

Permalink
add py_udf operator [wip] (#792)
Browse files Browse the repository at this point in the history
* feat: add `py_udf` operator (#792)

feature-gates `py_udf` behind `python` feature, does not run in wasm tests

* feat: Minor extension to `py_udf` to pass multiple arguments (#792)

---------

Co-authored-by: Tiemo Bang <[email protected]>
  • Loading branch information
MingweiSamuel and Tiemo Bang authored Jul 1, 2023
1 parent 6323980 commit 7dbd5e2
Show file tree
Hide file tree
Showing 14 changed files with 363 additions and 6 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ jobs:
CARGO_TARGET_WASM32_UNKNOWN_UNKNOWN_RUNNER: wasm-bindgen-test-runner
with:
command: test
args: -p hydroflow --target wasm32-unknown-unknown --tests --no-fail-fast
args: -p hydroflow --target wasm32-unknown-unknown --tests --no-fail-fast --no-default-features --features __default_wasm

test-cli:
name: Test CLI
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 6 additions & 3 deletions hydroflow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ documentation = "https://docs.rs/hydroflow/"
description = "Hydro's low-level dataflow runtime and IR"

[features]
default = [ "async", "macros" , "nightly" ]
default = [ "macros" , "nightly", "python" ]
__default_wasm = [ "macros", "nightly" ]

nightly = [ "hydroflow_macro", "hydroflow_macro/diagnostics" ]
async = [ "dep:futures" ]
macros = [ "hydroflow_macro", "hydroflow_datalog" ]
hydroflow_macro = [ "dep:hydroflow_macro" ]
hydroflow_datalog = [ "dep:hydroflow_datalog" ]
cli_integration = [ "dep:hydroflow_cli_integration" ]
python = [ "dep:pyo3" ]

[[example]]
name = "kvs_bench"
Expand All @@ -24,14 +26,15 @@ required-features = [ "nightly" ]
bincode = "1.3"
byteorder = "1.4.3"
bytes = "1.1.0"
futures = { version = "0.3", optional = true }
futures = "0.3"
hydroflow_cli_integration = { optional = true, path = "../hydroflow_cli_integration", version = "^0.2.0" }
hydroflow_datalog = { optional = true, path = "../hydroflow_datalog", version = "^0.2.0" }
hydroflow_lang = { path = "../hydroflow_lang", version = "^0.2.0" }
hydroflow_macro = { optional = true, path = "../hydroflow_macro", version = "^0.2.0" }
itertools = "0.10"
lattices = { path = "../lattices", version = "^0.2.0", features = [ "serde" ] }
pusherator = { path = "../pusherator", version = "^0.0.1" }
pyo3 = { optional = true, version = "0.18" }
ref-cast = "1.0"
regex = "1.8.4"
rustc-hash = "1.1.0"
Expand Down
27 changes: 27 additions & 0 deletions hydroflow/examples/python_udf/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use hydroflow_macro::hydroflow_syntax;
use pyo3::{Py, PyAny, PyResult, Python};

#[hydroflow::main]
async fn main() {
eprintln!("Vec sender starting...");

let v = vec![1, 2, 3, 4, 5];

let mut df = hydroflow_syntax! {
source_iter(v) -> inspect(
|x| println!("input:\t{:?}", x)
)
// Map to tuples
-> map(|x| (x, 1))
-> py_udf(r#"
def add(a, b):
return a + 1
"#, "add")
-> map(|x: PyResult<Py<PyAny>>| -> i32 {Python::with_gil(|py| {
x.unwrap().extract(py).unwrap()
})})
-> for_each(|x| println!("output:\t{:?}", x));
};

df.run_available();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
---
source: hydroflow/tests/surface_python.rs
expression: hf.meta_graph().unwrap().to_dot()
---
digraph {
subgraph "cluster n1v1" {
fillcolor="#dddddd"
style=filled
label = "sg_1v1\nstratum 0"
n1v1 [label="(n1v1) source_iter(0..10)", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"]
n2v1 [label="(n2v1) map(|x| (x,))", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"]
n3v1 [label="(n3v1) py_udf(\l r#\"\ldef fib(n):\lif n < 2:\l return n\lelse:\l return fib(n - 2) + fib(n - 1)\l \"#,\l \"fib\",\l)\l", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"]
n4v1 [label="(n4v1) map(|x: PyResult<Py<PyAny>>| Python::with_gil(|py| {\l usize::extract(x.unwrap().as_ref(py)).unwrap()\l}))\l", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"]
n5v1 [label="(n5v1) assert([0, 1, 1, 2, 3, 5, 8, 13, 21, 34])", fontname=Monaco, shape=house, style = filled, color = "#ffff00"]
n1v1 -> n2v1
n2v1 -> n3v1
n3v1 -> n4v1
n4v1 -> n5v1
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
---
source: hydroflow/tests/surface_python.rs
expression: hf.meta_graph().unwrap().to_mermaid()
---
%%{init:{'theme':'base','themeVariables':{'clusterBkg':'#ddd','clusterBorder':'#888'}}}%%
flowchart TD
classDef pullClass fill:#8af,stroke:#000,text-align:left,white-space:pre
classDef pushClass fill:#ff8,stroke:#000,text-align:left,white-space:pre
linkStyle default stroke:#aaa,stroke-width:4px,color:red,font-size:1.5em;
subgraph sg_1v1 ["sg_1v1 stratum 0"]
1v1[\"(1v1) <code>source_iter(0..10)</code>"/]:::pullClass
2v1[\"(2v1) <code>map(|x| (x,))</code>"/]:::pullClass
3v1[\"<div style=text-align:center>(3v1)</div> <code>py_udf(<br> r&num;&quot;<br>def fib(n):<br>if n &lt; 2:<br> return n<br>else:<br> return fib(n - 2) + fib(n - 1)<br> &quot;&num;,<br> &quot;fib&quot;,<br>)</code>"/]:::pullClass
4v1[\"<div style=text-align:center>(4v1)</div> <code>map(|x: PyResult&lt;Py&lt;PyAny&gt;&gt;| Python::with_gil(|py| {<br> usize::extract(x.unwrap().as_ref(py)).unwrap()<br>}))</code>"/]:::pullClass
5v1[/"(5v1) <code>assert([0, 1, 1, 2, 3, 5, 8, 13, 21, 34])</code>"\]:::pushClass
1v1--->2v1
2v1--->3v1
3v1--->4v1
4v1--->5v1
end

Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
---
source: hydroflow/tests/surface_python.rs
expression: hf.meta_graph().unwrap().to_dot()
---
digraph {
subgraph "cluster n1v1" {
fillcolor="#dddddd"
style=filled
label = "sg_1v1\nstratum 0"
n1v1 [label="(n1v1) source_iter([(5,)])", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"]
n2v1 [label="(n2v1) py_udf(r#\"\ldef add(a, b):\lreturn a + b\l \"#, \"add\")\l", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"]
n3v1 [label="(n3v1) map(PyResult::<Py<PyAny>>::unwrap_err)", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"]
n4v1 [label="(n4v1) map(|py_err| py_err.to_string())", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"]
n5v1 [label="(n5v1) assert([\"TypeError: add() missing 1 required positional argument: 'b'\"])", fontname=Monaco, shape=house, style = filled, color = "#ffff00"]
n1v1 -> n2v1
n2v1 -> n3v1
n3v1 -> n4v1
n4v1 -> n5v1
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
---
source: hydroflow/tests/surface_python.rs
expression: hf.meta_graph().unwrap().to_mermaid()
---
%%{init:{'theme':'base','themeVariables':{'clusterBkg':'#ddd','clusterBorder':'#888'}}}%%
flowchart TD
classDef pullClass fill:#8af,stroke:#000,text-align:left,white-space:pre
classDef pushClass fill:#ff8,stroke:#000,text-align:left,white-space:pre
linkStyle default stroke:#aaa,stroke-width:4px,color:red,font-size:1.5em;
subgraph sg_1v1 ["sg_1v1 stratum 0"]
1v1[\"(1v1) <code>source_iter([(5,)])</code>"/]:::pullClass
2v1[\"<div style=text-align:center>(2v1)</div> <code>py_udf(r&num;&quot;<br>def add(a, b):<br>return a + b<br> &quot;&num;, &quot;add&quot;)</code>"/]:::pullClass
3v1[\"(3v1) <code>map(PyResult::&lt;Py&lt;PyAny&gt;&gt;::unwrap_err)</code>"/]:::pullClass
4v1[\"(4v1) <code>map(|py_err| py_err.to_string())</code>"/]:::pullClass
5v1[/"(5v1) <code>assert([&quot;TypeError: add() missing 1 required positional argument: 'b'&quot;])</code>"\]:::pushClass
1v1--->2v1
2v1--->3v1
3v1--->4v1
4v1--->5v1
end

Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
---
source: hydroflow/tests/surface_python.rs
expression: hf.meta_graph().unwrap().to_dot()
---
digraph {
subgraph "cluster n1v1" {
fillcolor="#dddddd"
style=filled
label = "sg_1v1\nstratum 0"
n1v1 [label="(n1v1) source_iter([(5, 1)])", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"]
n2v1 [label="(n2v1) py_udf(r#\"\ldef add(a, b):\lreturn a + b\l \"#, \"add\")\l", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"]
n3v1 [label="(n3v1) map(|x: PyResult<Py<PyAny>>| Python::with_gil(|py| {\l usize::extract(x.unwrap().as_ref(py)).unwrap()\l}))\l", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"]
n4v1 [label="(n4v1) assert([6])", fontname=Monaco, shape=house, style = filled, color = "#ffff00"]
n1v1 -> n2v1
n2v1 -> n3v1
n3v1 -> n4v1
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
---
source: hydroflow/tests/surface_python.rs
expression: hf.meta_graph().unwrap().to_mermaid()
---
%%{init:{'theme':'base','themeVariables':{'clusterBkg':'#ddd','clusterBorder':'#888'}}}%%
flowchart TD
classDef pullClass fill:#8af,stroke:#000,text-align:left,white-space:pre
classDef pushClass fill:#ff8,stroke:#000,text-align:left,white-space:pre
linkStyle default stroke:#aaa,stroke-width:4px,color:red,font-size:1.5em;
subgraph sg_1v1 ["sg_1v1 stratum 0"]
1v1[\"(1v1) <code>source_iter([(5, 1)])</code>"/]:::pullClass
2v1[\"<div style=text-align:center>(2v1)</div> <code>py_udf(r&num;&quot;<br>def add(a, b):<br>return a + b<br> &quot;&num;, &quot;add&quot;)</code>"/]:::pullClass
3v1[\"<div style=text-align:center>(3v1)</div> <code>map(|x: PyResult&lt;Py&lt;PyAny&gt;&gt;| Python::with_gil(|py| {<br> usize::extract(x.unwrap().as_ref(py)).unwrap()<br>}))</code>"/]:::pullClass
4v1[/"(4v1) <code>assert([6])</code>"\]:::pushClass
1v1--->2v1
2v1--->3v1
3v1--->4v1
end

62 changes: 62 additions & 0 deletions hydroflow/tests/surface_python.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#![cfg(feature = "python")]

use hydroflow::{assert_graphvis_snapshots, hydroflow_syntax};
use multiplatform_test::multiplatform_test;
use pyo3::prelude::*;

#[multiplatform_test(test)]
pub fn test_python_basic() {
let mut hf = hydroflow_syntax! {
source_iter(0..10)
-> map(|x| (x,))
-> py_udf(r#"
def fib(n):
if n < 2:
return n
else:
return fib(n - 2) + fib(n - 1)
"#, "fib")
-> map(|x: PyResult<Py<PyAny>>| Python::with_gil(|py| {
usize::extract(x.unwrap().as_ref(py)).unwrap()
}))
-> assert([0, 1, 1, 2, 3, 5, 8, 13, 21, 34]);
};
assert_graphvis_snapshots!(hf);

hf.run_available();
}

#[multiplatform_test(test)]
pub fn test_python_too_many_args() {
let mut hf = hydroflow_syntax! {
source_iter([(5,)])
-> py_udf(r#"
def add(a, b):
return a + b
"#, "add")
-> map(PyResult::<Py<PyAny>>::unwrap_err)
-> map(|py_err| py_err.to_string())
-> assert(["TypeError: add() missing 1 required positional argument: 'b'"]);
};
assert_graphvis_snapshots!(hf);

hf.run_available();
}

#[multiplatform_test(test)]
pub fn test_python_two_args() {
let mut hf = hydroflow_syntax! {
source_iter([(5,1)])
-> py_udf(r#"
def add(a, b):
return a + b
"#, "add")
-> map(|x: PyResult<Py<PyAny>>| Python::with_gil(|py| {
usize::extract(x.unwrap().as_ref(py)).unwrap()
}))
-> assert([6]);
};
assert_graphvis_snapshots!(hf);

hf.run_available();
}
4 changes: 2 additions & 2 deletions hydroflow_lang/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ fn generate_op_docs() -> Result<()> {

const DOCTEST_HYDROFLOW_PREFIX: &str = "\
```rust
# #[allow(unused_imports)] use hydroflow::{var_args, var_expr};
# #[allow(unused_imports)] use hydroflow::pusherator::Pusherator;
# #[allow(unused_imports)] use hydroflow::{var_args, var_expr, pusherator::Pusherator};
# #[cfg(feature = \"python\")] #[allow(unused_imports)] use pyo3::prelude::*;
# let __rt = hydroflow::tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
# __rt.block_on(async { hydroflow::tokio::task::LocalSet::new().run_until(async {
# let mut __hf = hydroflow::hydroflow_syntax! {";
Expand Down
1 change: 1 addition & 0 deletions hydroflow_lang/src/graph/ops/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ declare_ops![
persist::PERSIST,
persist_mut::PERSIST_MUT,
persist_mut_keyed::PERSIST_MUT_KEYED,
py_udf::PY_UDF,
reduce::REDUCE,
spin::SPIN,
sort::SORT,
Expand Down
Loading

0 comments on commit 7dbd5e2

Please sign in to comment.