diff --git a/Cargo.toml b/Cargo.toml index e31979e..97bb0f9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,7 @@ async-std-runtime = ["async-std"] attributes = ["pyo3-async-runtimes-macros"] testing = ["clap", "inventory"] tokio-runtime = ["tokio"] -unstable-streams = ["async-channel"] +unstable-streams = ["async-channel", "futures-util/sink", "futures-channel/sink"] default = [] [package.metadata.docs.rs] @@ -118,7 +118,8 @@ required-features = ["async-std-runtime", "testing"] [dependencies] async-channel = { version = "2.3", optional = true } clap = { version = "4.5", optional = true } -futures = "0.3" +futures-channel = "0.3" +futures-util = "0.3" inventory = { version = "0.3", optional = true } once_cell = "1.14" pin-project-lite = "0.2" diff --git a/pytests/test_async_std_asyncio.rs b/pytests/test_async_std_asyncio.rs index 8319c29..e9a9af9 100644 --- a/pytests/test_async_std_asyncio.rs +++ b/pytests/test_async_std_asyncio.rs @@ -16,7 +16,7 @@ use pyo3::{ use pyo3_async_runtimes::TaskLocals; #[cfg(feature = "unstable-streams")] -use futures::{StreamExt, TryStreamExt}; +use futures_util::stream::{StreamExt, TryStreamExt}; #[pyfunction] fn sleep<'p>(py: Python<'p>, secs: Bound<'p, PyAny>) -> PyResult> { diff --git a/pytests/test_tokio_current_thread_asyncio.rs b/pytests/test_tokio_current_thread_asyncio.rs index ffc1cda..acb8fdf 100644 --- a/pytests/test_tokio_current_thread_asyncio.rs +++ b/pytests/test_tokio_current_thread_asyncio.rs @@ -12,7 +12,8 @@ fn main() -> pyo3::PyResult<()> { pyo3_async_runtimes::tokio::init(builder); std::thread::spawn(move || { - pyo3_async_runtimes::tokio::get_runtime().block_on(futures::future::pending::<()>()); + pyo3_async_runtimes::tokio::get_runtime() + .block_on(futures_util::future::pending::<()>()); }); pyo3_async_runtimes::tokio::run(py, pyo3_async_runtimes::testing::main()) diff --git a/pytests/test_tokio_current_thread_run_forever.rs b/pytests/test_tokio_current_thread_run_forever.rs index 0858724..38f5356 100644 --- a/pytests/test_tokio_current_thread_run_forever.rs +++ b/pytests/test_tokio_current_thread_run_forever.rs @@ -10,7 +10,7 @@ fn main() { pyo3_async_runtimes::tokio::init(builder); std::thread::spawn(move || { - pyo3_async_runtimes::tokio::get_runtime().block_on(futures::future::pending::<()>()); + pyo3_async_runtimes::tokio::get_runtime().block_on(futures_util::future::pending::<()>()); }); tokio_run_forever::test_main(); diff --git a/pytests/test_tokio_current_thread_uvloop.rs b/pytests/test_tokio_current_thread_uvloop.rs index d9e2e09..d2ca760 100644 --- a/pytests/test_tokio_current_thread_uvloop.rs +++ b/pytests/test_tokio_current_thread_uvloop.rs @@ -9,7 +9,7 @@ fn main() -> pyo3::PyResult<()> { pyo3_async_runtimes::tokio::init(builder); std::thread::spawn(move || { - pyo3_async_runtimes::tokio::get_runtime().block_on(futures::future::pending::<()>()); + pyo3_async_runtimes::tokio::get_runtime().block_on(futures_util::future::pending::<()>()); }); Python::attach(|py| { diff --git a/pytests/tokio_asyncio/mod.rs b/pytests/tokio_asyncio/mod.rs index 1a3b9c8..ba5fb21 100644 --- a/pytests/tokio_asyncio/mod.rs +++ b/pytests/tokio_asyncio/mod.rs @@ -13,7 +13,7 @@ use pyo3::{ use pyo3_async_runtimes::TaskLocals; #[cfg(feature = "unstable-streams")] -use futures::{StreamExt, TryStreamExt}; +use futures_util::{StreamExt, TryStreamExt}; use crate::common; diff --git a/src/async_std.rs b/src/async_std.rs index 66f4454..9824eab 100644 --- a/src/async_std.rs +++ b/src/async_std.rs @@ -15,7 +15,7 @@ //! ``` use async_std::task; -use futures::FutureExt; +use futures_util::future::FutureExt; use pyo3::prelude::*; use std::{any::Any, cell::RefCell, future::Future, panic, panic::AssertUnwindSafe, pin::Pin}; @@ -481,8 +481,8 @@ where /// /// This function converts the `awaitable` into a Python Task using `run_coroutine_threadsafe`. A /// completion handler sends the result of this Task through a -/// `futures::channel::oneshot::Sender>>` and the future returned by this function -/// simply awaits the result through the `futures::channel::oneshot::Receiver>>`. +/// `futures_channel::oneshot::Sender>>` and the future returned by this function +/// simply awaits the result through the `futures_channel::oneshot::Receiver>>`. /// /// # Arguments /// * `awaitable` - The Python `awaitable` to be converted @@ -544,7 +544,7 @@ pub fn into_future( /// # Examples /// ``` /// use pyo3::prelude::*; -/// use futures::{StreamExt, TryStreamExt}; +/// use futures_util::stream::{StreamExt, TryStreamExt}; /// use std::ffi::CString; /// /// const TEST_MOD: &str = r#" @@ -585,7 +585,7 @@ pub fn into_future( #[cfg(feature = "unstable-streams")] pub fn into_stream_v1( gen: Bound<'_, PyAny>, -) -> PyResult>> + 'static> { +) -> PyResult>> + 'static> { generic::into_stream_v1::(gen) } @@ -602,7 +602,7 @@ pub fn into_stream_v1( /// # Examples /// ``` /// use pyo3::prelude::*; -/// use futures::{StreamExt, TryStreamExt}; +/// use futures_util::stream::{StreamExt, TryStreamExt}; /// use std::ffi::CString; /// /// const TEST_MOD: &str = r#" @@ -647,7 +647,7 @@ pub fn into_stream_v1( pub fn into_stream_with_locals_v1( locals: TaskLocals, gen: Bound<'_, PyAny>, -) -> PyResult>> + 'static> { +) -> PyResult>> + 'static> { generic::into_stream_with_locals_v1::(locals, gen) } @@ -664,7 +664,7 @@ pub fn into_stream_with_locals_v1( /// # Examples /// ``` /// use pyo3::prelude::*; -/// use futures::{StreamExt, TryStreamExt}; +/// use futures_util::stream::{StreamExt, TryStreamExt}; /// use std::ffi::CString; /// /// const TEST_MOD: &str = r#" @@ -709,7 +709,7 @@ pub fn into_stream_with_locals_v1( pub fn into_stream_with_locals_v2( locals: TaskLocals, gen: Bound<'_, PyAny>, -) -> PyResult> + 'static> { +) -> PyResult> + 'static> { generic::into_stream_with_locals_v2::(locals, gen) } @@ -725,7 +725,7 @@ pub fn into_stream_with_locals_v2( /// # Examples /// ``` /// use pyo3::prelude::*; -/// use futures::{StreamExt, TryStreamExt}; +/// use futures_util::stream::{StreamExt, TryStreamExt}; /// use std::ffi::CString; /// /// const TEST_MOD: &str = r#" @@ -766,6 +766,6 @@ pub fn into_stream_with_locals_v2( #[cfg(feature = "unstable-streams")] pub fn into_stream_v2( gen: Bound<'_, PyAny>, -) -> PyResult> + 'static> { +) -> PyResult> + 'static> { generic::into_stream_v2::(gen) } diff --git a/src/generic.rs b/src/generic.rs index df141fd..faa8880 100644 --- a/src/generic.rs +++ b/src/generic.rs @@ -24,9 +24,11 @@ use crate::{ asyncio, call_soon_threadsafe, close, create_future, dump_err, err::RustPanic, get_running_loop, into_future_with_locals, TaskLocals, }; -use futures::channel::oneshot; #[cfg(feature = "unstable-streams")] -use futures::{channel::mpsc, SinkExt}; +use futures_channel::mpsc; +use futures_channel::oneshot; +#[cfg(feature = "unstable-streams")] +use futures_util::SinkExt; use pin_project_lite::pin_project; use pyo3::prelude::*; use pyo3::IntoPyObjectExt; @@ -1307,7 +1309,7 @@ where /// # } /// /// use pyo3::prelude::*; -/// use futures::{StreamExt, TryStreamExt}; +/// use futures_util::stream::{StreamExt, TryStreamExt}; /// use std::ffi::CString; /// /// const TEST_MOD: &str = r#" @@ -1349,7 +1351,7 @@ where pub fn into_stream_with_locals_v1( locals: TaskLocals, gen: Bound<'_, PyAny>, -) -> PyResult>> + 'static> +) -> PyResult>> + 'static> where R: Runtime, { @@ -1461,7 +1463,7 @@ where /// # } /// /// use pyo3::prelude::*; -/// use futures::{StreamExt, TryStreamExt}; +/// use futures_util::stream::{StreamExt, TryStreamExt}; /// use std::ffi::CString; /// /// const TEST_MOD: &str = r#" @@ -1498,7 +1500,7 @@ where #[cfg(feature = "unstable-streams")] pub fn into_stream_v1( gen: Bound<'_, PyAny>, -) -> PyResult>> + 'static> +) -> PyResult>> + 'static> where R: Runtime + ContextExt, { @@ -1660,7 +1662,7 @@ async def forward(gen, sender): /// # } /// /// use pyo3::prelude::*; -/// use futures::{StreamExt, TryStreamExt}; +/// use futures_util::stream::{StreamExt, TryStreamExt}; /// use std::ffi::CString; /// /// const TEST_MOD: &str = r#" @@ -1701,7 +1703,7 @@ async def forward(gen, sender): pub fn into_stream_with_locals_v2( locals: TaskLocals, gen: Bound<'_, PyAny>, -) -> PyResult> + 'static> +) -> PyResult> + 'static> where R: Runtime + ContextExt, { @@ -1819,7 +1821,7 @@ where /// # } /// /// use pyo3::prelude::*; -/// use futures::{StreamExt, TryStreamExt}; +/// use futures_util::stream::{StreamExt, TryStreamExt}; /// use std::ffi::CString; /// /// const TEST_MOD: &str = r#" @@ -1856,7 +1858,7 @@ where #[cfg(feature = "unstable-streams")] pub fn into_stream_v2( gen: Bound<'_, PyAny>, -) -> PyResult> + 'static> +) -> PyResult> + 'static> where R: Runtime + ContextExt, { diff --git a/src/lib.rs b/src/lib.rs index 97e368a..4ba5961 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -397,7 +397,7 @@ pub mod doc_test { use std::future::Future; use std::sync::Arc; -use futures::channel::oneshot; +use futures_channel::oneshot; use pyo3::{call::PyCallArgs, prelude::*, sync::PyOnceLock, types::PyDict}; static ASYNCIO: PyOnceLock> = PyOnceLock::new(); @@ -610,8 +610,8 @@ fn call_soon_threadsafe<'py>( /// /// This function converts the `awaitable` into a Python Task using `run_coroutine_threadsafe`. A /// completion handler sends the result of this Task through a -/// `futures::channel::oneshot::Sender>>` and the future returned by this function -/// simply awaits the result through the `futures::channel::oneshot::Receiver>>`. +/// `futures_channel::oneshot::Sender>>` and the future returned by this function +/// simply awaits the result through the `futures_channel::oneshot::Receiver>>`. /// /// # Arguments /// * `locals` - The Python event loop and context to be used for the provided awaitable diff --git a/src/testing.rs b/src/testing.rs index cc594b5..678c4c8 100644 --- a/src/testing.rs +++ b/src/testing.rs @@ -182,7 +182,7 @@ use std::{future::Future, pin::Pin}; use clap::{Arg, Command}; -use futures::stream::{self, StreamExt}; +use futures_util::stream::StreamExt; use pyo3::prelude::*; /// Args that should be provided to the test program @@ -263,7 +263,7 @@ inventory::collect!(Test); /// Run a sequence of tests while applying any necessary filtering from the `Args` pub async fn test_harness(tests: Vec, args: Args) -> PyResult<()> { - stream::iter(tests) + futures_util::stream::iter(tests) .for_each_concurrent(Some(4), |test| { let mut ignore = false; diff --git a/src/tokio.rs b/src/tokio.rs index 0c09261..9b01e5f 100644 --- a/src/tokio.rs +++ b/src/tokio.rs @@ -35,7 +35,7 @@ use crate::{ #[cfg(feature = "attributes")] pub mod re_exports { /// re-export pending to be used in tokio macros without additional dependency - pub use futures::future::pending; + pub use futures_util::future::pending; /// re-export tokio::runtime to build runtimes in tokio macros without additional dependency pub use tokio::runtime; } @@ -552,8 +552,8 @@ where /// /// This function converts the `awaitable` into a Python Task using `run_coroutine_threadsafe`. A /// completion handler sends the result of this Task through a -/// `futures::channel::oneshot::Sender>>` and the future returned by this function -/// simply awaits the result through the `futures::channel::oneshot::Receiver>>`. +/// `futures_channel::oneshot::Sender>>` and the future returned by this function +/// simply awaits the result through the `futures_channel::oneshot::Receiver>>`. /// /// # Arguments /// * `awaitable` - The Python `awaitable` to be converted @@ -616,7 +616,7 @@ pub fn into_future( /// # Examples /// ``` /// use pyo3::prelude::*; -/// use futures::{StreamExt, TryStreamExt}; +/// use futures_util::stream::{StreamExt, TryStreamExt}; /// use std::ffi::CString; /// /// const TEST_MOD: &str = r#" @@ -661,7 +661,7 @@ pub fn into_future( pub fn into_stream_with_locals_v1( locals: TaskLocals, gen: Bound<'_, PyAny>, -) -> PyResult>> + 'static> { +) -> PyResult>> + 'static> { generic::into_stream_with_locals_v1::(locals, gen) } @@ -677,7 +677,7 @@ pub fn into_stream_with_locals_v1( /// # Examples /// ``` /// use pyo3::prelude::*; -/// use futures::{StreamExt, TryStreamExt}; +/// use futures_util::stream::{StreamExt, TryStreamExt}; /// use std::ffi::CString; /// /// const TEST_MOD: &str = r#" @@ -718,7 +718,7 @@ pub fn into_stream_with_locals_v1( #[cfg(feature = "unstable-streams")] pub fn into_stream_v1( gen: Bound<'_, PyAny>, -) -> PyResult>> + 'static> { +) -> PyResult>> + 'static> { generic::into_stream_v1::(gen) } @@ -735,7 +735,7 @@ pub fn into_stream_v1( /// # Examples /// ``` /// use pyo3::prelude::*; -/// use futures::{StreamExt, TryStreamExt}; +/// use futures_util::stream::{StreamExt, TryStreamExt}; /// use std::ffi::CString; /// /// const TEST_MOD: &str = r#" @@ -780,7 +780,7 @@ pub fn into_stream_v1( pub fn into_stream_with_locals_v2( locals: TaskLocals, gen: Bound<'_, PyAny>, -) -> PyResult> + 'static> { +) -> PyResult> + 'static> { generic::into_stream_with_locals_v2::(locals, gen) } @@ -796,7 +796,7 @@ pub fn into_stream_with_locals_v2( /// # Examples /// ``` /// use pyo3::prelude::*; -/// use futures::{StreamExt, TryStreamExt}; +/// use futures_util::stream::{StreamExt, TryStreamExt}; /// use std::ffi::CString; /// /// const TEST_MOD: &str = r#" @@ -837,6 +837,6 @@ pub fn into_stream_with_locals_v2( #[cfg(feature = "unstable-streams")] pub fn into_stream_v2( gen: Bound<'_, PyAny>, -) -> PyResult> + 'static> { +) -> PyResult> + 'static> { generic::into_stream_v2::(gen) }