diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a5cc12129b45..4f987e5bd8ad 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -178,7 +178,7 @@ jobs: CARGO_TARGET_WASM32_UNKNOWN_UNKNOWN_RUNNER: wasm-bindgen-test-runner with: command: test - args: -p hydroflow --target wasm32-unknown-unknown --tests --no-fail-fast + args: -p hydroflow --target wasm32-unknown-unknown --tests --no-fail-fast --no-default-features --features __default_wasm test-cli: name: Test CLI diff --git a/Cargo.lock b/Cargo.lock index 1ef04c3c098d..5e1fc63be2b3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1363,6 +1363,7 @@ dependencies = [ "lattices", "multiplatform_test", "pusherator", + "pyo3", "rand 0.8.5", "rand_distr", "ref-cast", diff --git a/hydroflow/Cargo.toml b/hydroflow/Cargo.toml index ecc30b8e01ea..398c3ff1357f 100644 --- a/hydroflow/Cargo.toml +++ b/hydroflow/Cargo.toml @@ -8,13 +8,15 @@ documentation = "https://docs.rs/hydroflow/" description = "Hydro's low-level dataflow runtime and IR" [features] -default = [ "async", "macros" , "nightly" ] +default = [ "macros" , "nightly", "python" ] +__default_wasm = [ "macros", "nightly" ] + nightly = [ "hydroflow_macro", "hydroflow_macro/diagnostics" ] -async = [ "dep:futures" ] macros = [ "hydroflow_macro", "hydroflow_datalog" ] hydroflow_macro = [ "dep:hydroflow_macro" ] hydroflow_datalog = [ "dep:hydroflow_datalog" ] cli_integration = [ "dep:hydroflow_cli_integration" ] +python = [ "dep:pyo3" ] [[example]] name = "kvs_bench" @@ -24,7 +26,7 @@ required-features = [ "nightly" ] bincode = "1.3" byteorder = "1.4.3" bytes = "1.1.0" -futures = { version = "0.3", optional = true } +futures = "0.3" hydroflow_cli_integration = { optional = true, path = "../hydroflow_cli_integration", version = "^0.2.0" } hydroflow_datalog = { optional = true, path = "../hydroflow_datalog", version = "^0.2.0" } hydroflow_lang = { path = "../hydroflow_lang", version = "^0.2.0" } @@ -32,6 +34,7 @@ hydroflow_macro = { optional = true, path = "../hydroflow_macro", version = "^0. itertools = "0.10" lattices = { path = "../lattices", version = "^0.2.0", features = [ "serde" ] } pusherator = { path = "../pusherator", version = "^0.0.1" } +pyo3 = { optional = true, version = "0.18" } ref-cast = "1.0" regex = "1.8.4" rustc-hash = "1.1.0" diff --git a/hydroflow/examples/python_udf/main.rs b/hydroflow/examples/python_udf/main.rs new file mode 100644 index 000000000000..c9f2a195aca8 --- /dev/null +++ b/hydroflow/examples/python_udf/main.rs @@ -0,0 +1,27 @@ +use hydroflow_macro::hydroflow_syntax; +use pyo3::{Py, PyAny, PyResult, Python}; + +#[hydroflow::main] +async fn main() { + eprintln!("Vec sender starting..."); + + let v = vec![1, 2, 3, 4, 5]; + + let mut df = hydroflow_syntax! { + source_iter(v) -> inspect( + |x| println!("input:\t{:?}", x) + ) + // Map to tuples + -> map(|x| (x, 1)) + -> py_udf(r#" +def add(a, b): + return a + 1 + "#, "add") + -> map(|x: PyResult>| -> i32 {Python::with_gil(|py| { + x.unwrap().extract(py).unwrap() + })}) + -> for_each(|x| println!("output:\t{:?}", x)); + }; + + df.run_available(); +} diff --git a/hydroflow/tests/snapshots/surface_python__python_basic@graphvis_dot.snap b/hydroflow/tests/snapshots/surface_python__python_basic@graphvis_dot.snap new file mode 100644 index 000000000000..0bf385b65a70 --- /dev/null +++ b/hydroflow/tests/snapshots/surface_python__python_basic@graphvis_dot.snap @@ -0,0 +1,21 @@ +--- +source: hydroflow/tests/surface_python.rs +expression: hf.meta_graph().unwrap().to_dot() +--- +digraph { + subgraph "cluster n1v1" { + fillcolor="#dddddd" + style=filled + label = "sg_1v1\nstratum 0" + n1v1 [label="(n1v1) source_iter(0..10)", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"] + n2v1 [label="(n2v1) map(|x| (x,))", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"] + n3v1 [label="(n3v1) py_udf(\l r#\"\ldef fib(n):\lif n < 2:\l return n\lelse:\l return fib(n - 2) + fib(n - 1)\l \"#,\l \"fib\",\l)\l", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"] + n4v1 [label="(n4v1) map(|x: PyResult>| Python::with_gil(|py| {\l usize::extract(x.unwrap().as_ref(py)).unwrap()\l}))\l", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"] + n5v1 [label="(n5v1) assert([0, 1, 1, 2, 3, 5, 8, 13, 21, 34])", fontname=Monaco, shape=house, style = filled, color = "#ffff00"] + n1v1 -> n2v1 + n2v1 -> n3v1 + n3v1 -> n4v1 + n4v1 -> n5v1 + } +} + diff --git a/hydroflow/tests/snapshots/surface_python__python_basic@graphvis_mermaid.snap b/hydroflow/tests/snapshots/surface_python__python_basic@graphvis_mermaid.snap new file mode 100644 index 000000000000..20830defd26f --- /dev/null +++ b/hydroflow/tests/snapshots/surface_python__python_basic@graphvis_mermaid.snap @@ -0,0 +1,21 @@ +--- +source: hydroflow/tests/surface_python.rs +expression: hf.meta_graph().unwrap().to_mermaid() +--- +%%{init:{'theme':'base','themeVariables':{'clusterBkg':'#ddd','clusterBorder':'#888'}}}%% +flowchart TD +classDef pullClass fill:#8af,stroke:#000,text-align:left,white-space:pre +classDef pushClass fill:#ff8,stroke:#000,text-align:left,white-space:pre +linkStyle default stroke:#aaa,stroke-width:4px,color:red,font-size:1.5em; +subgraph sg_1v1 ["sg_1v1 stratum 0"] + 1v1[\"(1v1) source_iter(0..10)"/]:::pullClass + 2v1[\"(2v1) map(|x| (x,))"/]:::pullClass + 3v1[\"
(3v1)
py_udf(
r#"
def fib(n):
if n < 2:
return n
else:
return fib(n - 2) + fib(n - 1)
"#,
"fib",
)
"/]:::pullClass + 4v1[\"
(4v1)
map(|x: PyResult<Py<PyAny>>| Python::with_gil(|py| {
usize::extract(x.unwrap().as_ref(py)).unwrap()
}))
"/]:::pullClass + 5v1[/"(5v1) assert([0, 1, 1, 2, 3, 5, 8, 13, 21, 34])"\]:::pushClass + 1v1--->2v1 + 2v1--->3v1 + 3v1--->4v1 + 4v1--->5v1 +end + diff --git a/hydroflow/tests/snapshots/surface_python__python_too_many_args@graphvis_dot.snap b/hydroflow/tests/snapshots/surface_python__python_too_many_args@graphvis_dot.snap new file mode 100644 index 000000000000..771d8fac5691 --- /dev/null +++ b/hydroflow/tests/snapshots/surface_python__python_too_many_args@graphvis_dot.snap @@ -0,0 +1,21 @@ +--- +source: hydroflow/tests/surface_python.rs +expression: hf.meta_graph().unwrap().to_dot() +--- +digraph { + subgraph "cluster n1v1" { + fillcolor="#dddddd" + style=filled + label = "sg_1v1\nstratum 0" + n1v1 [label="(n1v1) source_iter([(5,)])", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"] + n2v1 [label="(n2v1) py_udf(r#\"\ldef add(a, b):\lreturn a + b\l \"#, \"add\")\l", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"] + n3v1 [label="(n3v1) map(PyResult::>::unwrap_err)", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"] + n4v1 [label="(n4v1) map(|py_err| py_err.to_string())", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"] + n5v1 [label="(n5v1) assert([\"TypeError: add() missing 1 required positional argument: 'b'\"])", fontname=Monaco, shape=house, style = filled, color = "#ffff00"] + n1v1 -> n2v1 + n2v1 -> n3v1 + n3v1 -> n4v1 + n4v1 -> n5v1 + } +} + diff --git a/hydroflow/tests/snapshots/surface_python__python_too_many_args@graphvis_mermaid.snap b/hydroflow/tests/snapshots/surface_python__python_too_many_args@graphvis_mermaid.snap new file mode 100644 index 000000000000..a81b4827769a --- /dev/null +++ b/hydroflow/tests/snapshots/surface_python__python_too_many_args@graphvis_mermaid.snap @@ -0,0 +1,21 @@ +--- +source: hydroflow/tests/surface_python.rs +expression: hf.meta_graph().unwrap().to_mermaid() +--- +%%{init:{'theme':'base','themeVariables':{'clusterBkg':'#ddd','clusterBorder':'#888'}}}%% +flowchart TD +classDef pullClass fill:#8af,stroke:#000,text-align:left,white-space:pre +classDef pushClass fill:#ff8,stroke:#000,text-align:left,white-space:pre +linkStyle default stroke:#aaa,stroke-width:4px,color:red,font-size:1.5em; +subgraph sg_1v1 ["sg_1v1 stratum 0"] + 1v1[\"(1v1) source_iter([(5,)])"/]:::pullClass + 2v1[\"
(2v1)
py_udf(r#"
def add(a, b):
return a + b
"#, "add")
"/]:::pullClass + 3v1[\"(3v1) map(PyResult::<Py<PyAny>>::unwrap_err)"/]:::pullClass + 4v1[\"(4v1) map(|py_err| py_err.to_string())"/]:::pullClass + 5v1[/"(5v1) assert(["TypeError: add() missing 1 required positional argument: 'b'"])"\]:::pushClass + 1v1--->2v1 + 2v1--->3v1 + 3v1--->4v1 + 4v1--->5v1 +end + diff --git a/hydroflow/tests/snapshots/surface_python__python_two_args@graphvis_dot.snap b/hydroflow/tests/snapshots/surface_python__python_two_args@graphvis_dot.snap new file mode 100644 index 000000000000..e3d31d3e0a76 --- /dev/null +++ b/hydroflow/tests/snapshots/surface_python__python_two_args@graphvis_dot.snap @@ -0,0 +1,19 @@ +--- +source: hydroflow/tests/surface_python.rs +expression: hf.meta_graph().unwrap().to_dot() +--- +digraph { + subgraph "cluster n1v1" { + fillcolor="#dddddd" + style=filled + label = "sg_1v1\nstratum 0" + n1v1 [label="(n1v1) source_iter([(5, 1)])", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"] + n2v1 [label="(n2v1) py_udf(r#\"\ldef add(a, b):\lreturn a + b\l \"#, \"add\")\l", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"] + n3v1 [label="(n3v1) map(|x: PyResult>| Python::with_gil(|py| {\l usize::extract(x.unwrap().as_ref(py)).unwrap()\l}))\l", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"] + n4v1 [label="(n4v1) assert([6])", fontname=Monaco, shape=house, style = filled, color = "#ffff00"] + n1v1 -> n2v1 + n2v1 -> n3v1 + n3v1 -> n4v1 + } +} + diff --git a/hydroflow/tests/snapshots/surface_python__python_two_args@graphvis_mermaid.snap b/hydroflow/tests/snapshots/surface_python__python_two_args@graphvis_mermaid.snap new file mode 100644 index 000000000000..50749ed5aa29 --- /dev/null +++ b/hydroflow/tests/snapshots/surface_python__python_two_args@graphvis_mermaid.snap @@ -0,0 +1,19 @@ +--- +source: hydroflow/tests/surface_python.rs +expression: hf.meta_graph().unwrap().to_mermaid() +--- +%%{init:{'theme':'base','themeVariables':{'clusterBkg':'#ddd','clusterBorder':'#888'}}}%% +flowchart TD +classDef pullClass fill:#8af,stroke:#000,text-align:left,white-space:pre +classDef pushClass fill:#ff8,stroke:#000,text-align:left,white-space:pre +linkStyle default stroke:#aaa,stroke-width:4px,color:red,font-size:1.5em; +subgraph sg_1v1 ["sg_1v1 stratum 0"] + 1v1[\"(1v1) source_iter([(5, 1)])"/]:::pullClass + 2v1[\"
(2v1)
py_udf(r#"
def add(a, b):
return a + b
"#, "add")
"/]:::pullClass + 3v1[\"
(3v1)
map(|x: PyResult<Py<PyAny>>| Python::with_gil(|py| {
usize::extract(x.unwrap().as_ref(py)).unwrap()
}))
"/]:::pullClass + 4v1[/"(4v1) assert([6])"\]:::pushClass + 1v1--->2v1 + 2v1--->3v1 + 3v1--->4v1 +end + diff --git a/hydroflow/tests/surface_python.rs b/hydroflow/tests/surface_python.rs new file mode 100644 index 000000000000..5b208dc612bf --- /dev/null +++ b/hydroflow/tests/surface_python.rs @@ -0,0 +1,62 @@ +#![cfg(feature = "python")] + +use hydroflow::{assert_graphvis_snapshots, hydroflow_syntax}; +use multiplatform_test::multiplatform_test; +use pyo3::prelude::*; + +#[multiplatform_test(test)] +pub fn test_python_basic() { + let mut hf = hydroflow_syntax! { + source_iter(0..10) + -> map(|x| (x,)) + -> py_udf(r#" +def fib(n): + if n < 2: + return n + else: + return fib(n - 2) + fib(n - 1) + "#, "fib") + -> map(|x: PyResult>| Python::with_gil(|py| { + usize::extract(x.unwrap().as_ref(py)).unwrap() + })) + -> assert([0, 1, 1, 2, 3, 5, 8, 13, 21, 34]); + }; + assert_graphvis_snapshots!(hf); + + hf.run_available(); +} + +#[multiplatform_test(test)] +pub fn test_python_too_many_args() { + let mut hf = hydroflow_syntax! { + source_iter([(5,)]) + -> py_udf(r#" +def add(a, b): + return a + b + "#, "add") + -> map(PyResult::>::unwrap_err) + -> map(|py_err| py_err.to_string()) + -> assert(["TypeError: add() missing 1 required positional argument: 'b'"]); + }; + assert_graphvis_snapshots!(hf); + + hf.run_available(); +} + +#[multiplatform_test(test)] +pub fn test_python_two_args() { + let mut hf = hydroflow_syntax! { + source_iter([(5,1)]) + -> py_udf(r#" +def add(a, b): + return a + b + "#, "add") + -> map(|x: PyResult>| Python::with_gil(|py| { + usize::extract(x.unwrap().as_ref(py)).unwrap() + })) + -> assert([6]); + }; + assert_graphvis_snapshots!(hf); + + hf.run_available(); +} diff --git a/hydroflow_lang/build.rs b/hydroflow_lang/build.rs index b40c47abf9ce..df87caa77ac9 100644 --- a/hydroflow_lang/build.rs +++ b/hydroflow_lang/build.rs @@ -93,8 +93,8 @@ fn generate_op_docs() -> Result<()> { const DOCTEST_HYDROFLOW_PREFIX: &str = "\ ```rust -# #[allow(unused_imports)] use hydroflow::{var_args, var_expr}; -# #[allow(unused_imports)] use hydroflow::pusherator::Pusherator; +# #[allow(unused_imports)] use hydroflow::{var_args, var_expr, pusherator::Pusherator}; +# #[cfg(feature = \"python\")] #[allow(unused_imports)] use pyo3::prelude::*; # let __rt = hydroflow::tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap(); # __rt.block_on(async { hydroflow::tokio::task::LocalSet::new().run_until(async { # let mut __hf = hydroflow::hydroflow_syntax! {"; diff --git a/hydroflow_lang/src/graph/ops/mod.rs b/hydroflow_lang/src/graph/ops/mod.rs index 59497d290b75..9bc68ec8eb28 100644 --- a/hydroflow_lang/src/graph/ops/mod.rs +++ b/hydroflow_lang/src/graph/ops/mod.rs @@ -264,6 +264,7 @@ declare_ops![ persist::PERSIST, persist_mut::PERSIST_MUT, persist_mut_keyed::PERSIST_MUT_KEYED, + py_udf::PY_UDF, reduce::REDUCE, spin::SPIN, sort::SORT, diff --git a/hydroflow_lang/src/graph/ops/py_udf.rs b/hydroflow_lang/src/graph/ops/py_udf.rs new file mode 100644 index 000000000000..310a5bdf273e --- /dev/null +++ b/hydroflow_lang/src/graph/ops/py_udf.rs @@ -0,0 +1,141 @@ +use proc_macro2::Literal; +use quote::quote_spanned; + +use super::{ + FlowProperties, FlowPropertyVal, OperatorCategory, OperatorConstraints, OperatorInstance, + OperatorWriteOutput, WriteContextArgs, RANGE_0, RANGE_1, +}; + +/// > Arguments: First, the source code for a python module, second, the name of a unary function +/// > defined within the module source code. +/// +/// **Requires the "python" feature to be enabled.** +/// +/// An operator which allows you to run a python udf. Input arguments must be a stream of tuples +/// whose items implement [`IntoPy`](https://docs.rs/pyo3/latest/pyo3/conversion/trait.IntoPy.html). +/// See the [relevant pyo3 docs here](https://pyo3.rs/latest/conversions/tables#mapping-of-rust-types-to-python-types). +/// +/// Output items are of type `PyResult>`. Rust native types can be extracted using +/// `.extract()`, see the [relevant pyo3 docs here](https://pyo3.rs/latest/conversions/traits#extract-and-the-frompyobject-trait) +/// or the examples below. +/// +/// ```hydroflow +/// source_iter(0..10) +/// -> map(|x| (x,)) +/// -> py_udf(r#" +/// def fib(n): +/// if n < 2: +/// return n +/// else: +/// return fib(n - 2) + fib(n - 1) +/// "#, "fib") +/// -> map(|x: PyResult>| Python::with_gil(|py| { +/// usize::extract(x.unwrap().as_ref(py)).unwrap() +/// })) +/// -> assert([0, 1, 1, 2, 3, 5, 8, 13, 21, 34]); +/// ``` +/// +/// ```hydroflow +/// source_iter([(5,1)]) +/// -> py_udf(r#" +/// def add(a, b): +/// return a + b +/// "#, "add") +/// -> map(|x: PyResult>| Python::with_gil(|py| { +/// usize::extract(x.unwrap().as_ref(py)).unwrap() +/// })) +/// -> assert([6]); +/// ``` +pub const PY_UDF: OperatorConstraints = OperatorConstraints { + name: "py_udf", + categories: &[OperatorCategory::Map], + hard_range_inn: RANGE_1, + soft_range_inn: RANGE_1, + hard_range_out: RANGE_1, + soft_range_out: RANGE_1, + num_args: 2, + persistence_args: RANGE_0, + type_args: RANGE_0, + is_external_input: false, + ports_inn: None, + ports_out: None, + properties: FlowProperties { + deterministic: FlowPropertyVal::DependsOnArgs, + monotonic: FlowPropertyVal::DependsOnArgs, + inconsistency_tainted: false, + }, + input_delaytype_fn: |_| None, + write_fn: |wc @ &WriteContextArgs { + root, + op_span, + context, + hydroflow, + ident, + inputs, + outputs, + is_pull, + op_name, + op_inst: OperatorInstance { arguments, .. }, + .. + }, + _| { + let py_src = &arguments[0]; + let py_func_name = &arguments[1]; + + let py_func_ident = wc.make_ident("py_func"); + + let err_lit = Literal::string(&*format!( + "Hydroflow 'python' feature must be enabled to use `{}`", + op_name + )); + + let write_prologue = quote_spanned! {op_span=> + #[cfg(feature = "python")] + let #py_func_ident = { + ::pyo3::prepare_freethreaded_python(); + let func = ::pyo3::Python::with_gil::<_, ::pyo3::PyResult<::pyo3::Py<::pyo3::PyAny>>>(|py| { + Ok(::pyo3::types::PyModule::from_code( + py, + #py_src, + "_filename", + "_modulename", + )? + .getattr(#py_func_name)? + .into()) + }).expect("Failed to compile python."); + #hydroflow.add_state(func) + }; + #[cfg(not(feature = "python"))] + ::std::compiler_error!(#err_lit); + }; + let closure = quote_spanned! {op_span=> + |x| { + #[cfg(feature = "python")] + { + // TODO(mingwei): maybe this can be outside the closure? + let py_func = #context.state_ref(#py_func_ident); + //::pyo3::Python::with_gil(|py| py_func.call1(py, (x,))) + ::pyo3::Python::with_gil(|py| py_func.call1(py, x)) + } + #[cfg(not(feature = "python"))] + panic!() + } + }; + let write_iterator = if is_pull { + let input = &inputs[0]; + quote_spanned! {op_span=> + let #ident = #input.map(#closure); + } + } else { + let output = &outputs[0]; + quote_spanned! {op_span=> + let #ident = #root::pusherator::map::Map::new(#closure, #output); + } + }; + Ok(OperatorWriteOutput { + write_prologue, + write_iterator, + ..Default::default() + }) + }, +};