Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ async-std-runtime = ["async-std"]
attributes = ["pyo3-async-runtimes-macros"]
testing = ["clap", "inventory"]
tokio-runtime = ["tokio"]
unstable-streams = ["async-channel"]
unstable-streams = ["async-channel", "futures-util/sink", "futures-channel/sink"]
default = []

[package.metadata.docs.rs]
Expand Down Expand Up @@ -118,7 +118,8 @@ required-features = ["async-std-runtime", "testing"]
[dependencies]
async-channel = { version = "2.3", optional = true }
clap = { version = "4.5", optional = true }
futures = "0.3"
futures-channel = "0.3"
futures-util = "0.3"
inventory = { version = "0.3", optional = true }
once_cell = "1.14"
pin-project-lite = "0.2"
Expand Down
2 changes: 1 addition & 1 deletion pytests/test_async_std_asyncio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use pyo3::{
use pyo3_async_runtimes::TaskLocals;

#[cfg(feature = "unstable-streams")]
use futures::{StreamExt, TryStreamExt};
use futures_util::stream::{StreamExt, TryStreamExt};

#[pyfunction]
fn sleep<'p>(py: Python<'p>, secs: Bound<'p, PyAny>) -> PyResult<Bound<'p, PyAny>> {
Expand Down
3 changes: 2 additions & 1 deletion pytests/test_tokio_current_thread_asyncio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ fn main() -> pyo3::PyResult<()> {

pyo3_async_runtimes::tokio::init(builder);
std::thread::spawn(move || {
pyo3_async_runtimes::tokio::get_runtime().block_on(futures::future::pending::<()>());
pyo3_async_runtimes::tokio::get_runtime()
.block_on(futures_util::future::pending::<()>());
});

pyo3_async_runtimes::tokio::run(py, pyo3_async_runtimes::testing::main())
Expand Down
2 changes: 1 addition & 1 deletion pytests/test_tokio_current_thread_run_forever.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ fn main() {

pyo3_async_runtimes::tokio::init(builder);
std::thread::spawn(move || {
pyo3_async_runtimes::tokio::get_runtime().block_on(futures::future::pending::<()>());
pyo3_async_runtimes::tokio::get_runtime().block_on(futures_util::future::pending::<()>());
});

tokio_run_forever::test_main();
Expand Down
2 changes: 1 addition & 1 deletion pytests/test_tokio_current_thread_uvloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ fn main() -> pyo3::PyResult<()> {

pyo3_async_runtimes::tokio::init(builder);
std::thread::spawn(move || {
pyo3_async_runtimes::tokio::get_runtime().block_on(futures::future::pending::<()>());
pyo3_async_runtimes::tokio::get_runtime().block_on(futures_util::future::pending::<()>());
});

Python::attach(|py| {
Expand Down
2 changes: 1 addition & 1 deletion pytests/tokio_asyncio/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use pyo3::{
use pyo3_async_runtimes::TaskLocals;

#[cfg(feature = "unstable-streams")]
use futures::{StreamExt, TryStreamExt};
use futures_util::{StreamExt, TryStreamExt};

use crate::common;

Expand Down
22 changes: 11 additions & 11 deletions src/async_std.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
//! ```

use async_std::task;
use futures::FutureExt;
use futures_util::future::FutureExt;
use pyo3::prelude::*;
use std::{any::Any, cell::RefCell, future::Future, panic, panic::AssertUnwindSafe, pin::Pin};

Expand Down Expand Up @@ -481,8 +481,8 @@ where
///
/// This function converts the `awaitable` into a Python Task using `run_coroutine_threadsafe`. A
/// completion handler sends the result of this Task through a
/// `futures::channel::oneshot::Sender<PyResult<Py<PyAny>>>` and the future returned by this function
/// simply awaits the result through the `futures::channel::oneshot::Receiver<PyResult<Py<PyAny>>>`.
/// `futures_channel::oneshot::Sender<PyResult<Py<PyAny>>>` and the future returned by this function
/// simply awaits the result through the `futures_channel::oneshot::Receiver<PyResult<Py<PyAny>>>`.
///
/// # Arguments
/// * `awaitable` - The Python `awaitable` to be converted
Expand Down Expand Up @@ -544,7 +544,7 @@ pub fn into_future(
/// # Examples
/// ```
/// use pyo3::prelude::*;
/// use futures::{StreamExt, TryStreamExt};
/// use futures_util::stream::{StreamExt, TryStreamExt};
/// use std::ffi::CString;
///
/// const TEST_MOD: &str = r#"
Expand Down Expand Up @@ -585,7 +585,7 @@ pub fn into_future(
#[cfg(feature = "unstable-streams")]
pub fn into_stream_v1(
gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = PyResult<Py<PyAny>>> + 'static> {
) -> PyResult<impl futures_util::stream::Stream<Item = PyResult<Py<PyAny>>> + 'static> {
generic::into_stream_v1::<AsyncStdRuntime>(gen)
}

Expand All @@ -602,7 +602,7 @@ pub fn into_stream_v1(
/// # Examples
/// ```
/// use pyo3::prelude::*;
/// use futures::{StreamExt, TryStreamExt};
/// use futures_util::stream::{StreamExt, TryStreamExt};
/// use std::ffi::CString;
///
/// const TEST_MOD: &str = r#"
Expand Down Expand Up @@ -647,7 +647,7 @@ pub fn into_stream_v1(
pub fn into_stream_with_locals_v1(
locals: TaskLocals,
gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = PyResult<Py<PyAny>>> + 'static> {
) -> PyResult<impl futures_util::stream::Stream<Item = PyResult<Py<PyAny>>> + 'static> {
generic::into_stream_with_locals_v1::<AsyncStdRuntime>(locals, gen)
}

Expand All @@ -664,7 +664,7 @@ pub fn into_stream_with_locals_v1(
/// # Examples
/// ```
/// use pyo3::prelude::*;
/// use futures::{StreamExt, TryStreamExt};
/// use futures_util::stream::{StreamExt, TryStreamExt};
/// use std::ffi::CString;
///
/// const TEST_MOD: &str = r#"
Expand Down Expand Up @@ -709,7 +709,7 @@ pub fn into_stream_with_locals_v1(
pub fn into_stream_with_locals_v2(
locals: TaskLocals,
gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = Py<PyAny>> + 'static> {
) -> PyResult<impl futures_util::stream::Stream<Item = Py<PyAny>> + 'static> {
generic::into_stream_with_locals_v2::<AsyncStdRuntime>(locals, gen)
}

Expand All @@ -725,7 +725,7 @@ pub fn into_stream_with_locals_v2(
/// # Examples
/// ```
/// use pyo3::prelude::*;
/// use futures::{StreamExt, TryStreamExt};
/// use futures_util::stream::{StreamExt, TryStreamExt};
/// use std::ffi::CString;
///
/// const TEST_MOD: &str = r#"
Expand Down Expand Up @@ -766,6 +766,6 @@ pub fn into_stream_with_locals_v2(
#[cfg(feature = "unstable-streams")]
pub fn into_stream_v2(
gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = Py<PyAny>> + 'static> {
) -> PyResult<impl futures_util::stream::Stream<Item = Py<PyAny>> + 'static> {
generic::into_stream_v2::<AsyncStdRuntime>(gen)
}
22 changes: 12 additions & 10 deletions src/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ use crate::{
asyncio, call_soon_threadsafe, close, create_future, dump_err, err::RustPanic,
get_running_loop, into_future_with_locals, TaskLocals,
};
use futures::channel::oneshot;
#[cfg(feature = "unstable-streams")]
use futures::{channel::mpsc, SinkExt};
use futures_channel::mpsc;
use futures_channel::oneshot;
#[cfg(feature = "unstable-streams")]
use futures_util::SinkExt;
use pin_project_lite::pin_project;
use pyo3::prelude::*;
use pyo3::IntoPyObjectExt;
Expand Down Expand Up @@ -1307,7 +1309,7 @@ where
/// # }
///
/// use pyo3::prelude::*;
/// use futures::{StreamExt, TryStreamExt};
/// use futures_util::stream::{StreamExt, TryStreamExt};
/// use std::ffi::CString;
///
/// const TEST_MOD: &str = r#"
Expand Down Expand Up @@ -1349,7 +1351,7 @@ where
pub fn into_stream_with_locals_v1<R>(
locals: TaskLocals,
gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = PyResult<Py<PyAny>>> + 'static>
) -> PyResult<impl futures_util::stream::Stream<Item = PyResult<Py<PyAny>>> + 'static>
where
R: Runtime,
{
Expand Down Expand Up @@ -1461,7 +1463,7 @@ where
/// # }
///
/// use pyo3::prelude::*;
/// use futures::{StreamExt, TryStreamExt};
/// use futures_util::stream::{StreamExt, TryStreamExt};
/// use std::ffi::CString;
///
/// const TEST_MOD: &str = r#"
Expand Down Expand Up @@ -1498,7 +1500,7 @@ where
#[cfg(feature = "unstable-streams")]
pub fn into_stream_v1<R>(
gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = PyResult<Py<PyAny>>> + 'static>
) -> PyResult<impl futures_util::stream::Stream<Item = PyResult<Py<PyAny>>> + 'static>
where
R: Runtime + ContextExt,
{
Expand Down Expand Up @@ -1660,7 +1662,7 @@ async def forward(gen, sender):
/// # }
///
/// use pyo3::prelude::*;
/// use futures::{StreamExt, TryStreamExt};
/// use futures_util::stream::{StreamExt, TryStreamExt};
/// use std::ffi::CString;
///
/// const TEST_MOD: &str = r#"
Expand Down Expand Up @@ -1701,7 +1703,7 @@ async def forward(gen, sender):
pub fn into_stream_with_locals_v2<R>(
locals: TaskLocals,
gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = Py<PyAny>> + 'static>
) -> PyResult<impl futures_util::stream::Stream<Item = Py<PyAny>> + 'static>
where
R: Runtime + ContextExt,
{
Expand Down Expand Up @@ -1819,7 +1821,7 @@ where
/// # }
///
/// use pyo3::prelude::*;
/// use futures::{StreamExt, TryStreamExt};
/// use futures_util::stream::{StreamExt, TryStreamExt};
/// use std::ffi::CString;
///
/// const TEST_MOD: &str = r#"
Expand Down Expand Up @@ -1856,7 +1858,7 @@ where
#[cfg(feature = "unstable-streams")]
pub fn into_stream_v2<R>(
gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = Py<PyAny>> + 'static>
) -> PyResult<impl futures_util::stream::Stream<Item = Py<PyAny>> + 'static>
where
R: Runtime + ContextExt,
{
Expand Down
6 changes: 3 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ pub mod doc_test {
use std::future::Future;
use std::sync::Arc;

use futures::channel::oneshot;
use futures_channel::oneshot;
use pyo3::{call::PyCallArgs, prelude::*, sync::PyOnceLock, types::PyDict};

static ASYNCIO: PyOnceLock<Py<PyAny>> = PyOnceLock::new();
Expand Down Expand Up @@ -610,8 +610,8 @@ fn call_soon_threadsafe<'py>(
///
/// This function converts the `awaitable` into a Python Task using `run_coroutine_threadsafe`. A
/// completion handler sends the result of this Task through a
/// `futures::channel::oneshot::Sender<PyResult<Py<PyAny>>>` and the future returned by this function
/// simply awaits the result through the `futures::channel::oneshot::Receiver<PyResult<Py<PyAny>>>`.
/// `futures_channel::oneshot::Sender<PyResult<Py<PyAny>>>` and the future returned by this function
/// simply awaits the result through the `futures_channel::oneshot::Receiver<PyResult<Py<PyAny>>>`.
///
/// # Arguments
/// * `locals` - The Python event loop and context to be used for the provided awaitable
Expand Down
4 changes: 2 additions & 2 deletions src/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@
use std::{future::Future, pin::Pin};

use clap::{Arg, Command};
use futures::stream::{self, StreamExt};
use futures_util::stream::StreamExt;
use pyo3::prelude::*;

/// Args that should be provided to the test program
Expand Down Expand Up @@ -263,7 +263,7 @@ inventory::collect!(Test);

/// Run a sequence of tests while applying any necessary filtering from the `Args`
pub async fn test_harness(tests: Vec<Test>, args: Args) -> PyResult<()> {
stream::iter(tests)
futures_util::stream::iter(tests)
.for_each_concurrent(Some(4), |test| {
let mut ignore = false;

Expand Down
22 changes: 11 additions & 11 deletions src/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use crate::{
#[cfg(feature = "attributes")]
pub mod re_exports {
/// re-export pending to be used in tokio macros without additional dependency
pub use futures::future::pending;
pub use futures_util::future::pending;
/// re-export tokio::runtime to build runtimes in tokio macros without additional dependency
pub use tokio::runtime;
}
Expand Down Expand Up @@ -552,8 +552,8 @@ where
///
/// This function converts the `awaitable` into a Python Task using `run_coroutine_threadsafe`. A
/// completion handler sends the result of this Task through a
/// `futures::channel::oneshot::Sender<PyResult<Py<PyAny>>>` and the future returned by this function
/// simply awaits the result through the `futures::channel::oneshot::Receiver<PyResult<Py<PyAny>>>`.
/// `futures_channel::oneshot::Sender<PyResult<Py<PyAny>>>` and the future returned by this function
/// simply awaits the result through the `futures_channel::oneshot::Receiver<PyResult<Py<PyAny>>>`.
///
/// # Arguments
/// * `awaitable` - The Python `awaitable` to be converted
Expand Down Expand Up @@ -616,7 +616,7 @@ pub fn into_future(
/// # Examples
/// ```
/// use pyo3::prelude::*;
/// use futures::{StreamExt, TryStreamExt};
/// use futures_util::stream::{StreamExt, TryStreamExt};
/// use std::ffi::CString;
///
/// const TEST_MOD: &str = r#"
Expand Down Expand Up @@ -661,7 +661,7 @@ pub fn into_future(
pub fn into_stream_with_locals_v1(
locals: TaskLocals,
gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = PyResult<Py<PyAny>>> + 'static> {
) -> PyResult<impl futures_util::stream::Stream<Item = PyResult<Py<PyAny>>> + 'static> {
generic::into_stream_with_locals_v1::<TokioRuntime>(locals, gen)
}

Expand All @@ -677,7 +677,7 @@ pub fn into_stream_with_locals_v1(
/// # Examples
/// ```
/// use pyo3::prelude::*;
/// use futures::{StreamExt, TryStreamExt};
/// use futures_util::stream::{StreamExt, TryStreamExt};
/// use std::ffi::CString;
///
/// const TEST_MOD: &str = r#"
Expand Down Expand Up @@ -718,7 +718,7 @@ pub fn into_stream_with_locals_v1(
#[cfg(feature = "unstable-streams")]
pub fn into_stream_v1(
gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = PyResult<Py<PyAny>>> + 'static> {
) -> PyResult<impl futures_util::stream::Stream<Item = PyResult<Py<PyAny>>> + 'static> {
generic::into_stream_v1::<TokioRuntime>(gen)
}

Expand All @@ -735,7 +735,7 @@ pub fn into_stream_v1(
/// # Examples
/// ```
/// use pyo3::prelude::*;
/// use futures::{StreamExt, TryStreamExt};
/// use futures_util::stream::{StreamExt, TryStreamExt};
/// use std::ffi::CString;
///
/// const TEST_MOD: &str = r#"
Expand Down Expand Up @@ -780,7 +780,7 @@ pub fn into_stream_v1(
pub fn into_stream_with_locals_v2(
locals: TaskLocals,
gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = Py<PyAny>> + 'static> {
) -> PyResult<impl futures_util::stream::Stream<Item = Py<PyAny>> + 'static> {
generic::into_stream_with_locals_v2::<TokioRuntime>(locals, gen)
}

Expand All @@ -796,7 +796,7 @@ pub fn into_stream_with_locals_v2(
/// # Examples
/// ```
/// use pyo3::prelude::*;
/// use futures::{StreamExt, TryStreamExt};
/// use futures_util::stream::{StreamExt, TryStreamExt};
/// use std::ffi::CString;
///
/// const TEST_MOD: &str = r#"
Expand Down Expand Up @@ -837,6 +837,6 @@ pub fn into_stream_with_locals_v2(
#[cfg(feature = "unstable-streams")]
pub fn into_stream_v2(
gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = Py<PyAny>> + 'static> {
) -> PyResult<impl futures_util::stream::Stream<Item = Py<PyAny>> + 'static> {
generic::into_stream_v2::<TokioRuntime>(gen)
}