Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async-std-runtime = ["async-std"]
attributes = ["pyo3-async-runtimes-macros"]
testing = ["clap", "inventory"]
tokio-runtime = ["tokio"]
unstable-streams = ["async-channel"]
unstable-streams = ["async-channel", "futures-util/sink", "futures-channel/sink"]
default = []

[package.metadata.docs.rs]
Expand Down Expand Up @@ -116,7 +116,8 @@ required-features = ["async-std-runtime", "testing"]
[dependencies]
async-channel = { version = "2.3", optional = true }
clap = { version = "4.5", optional = true }
futures = "0.3"
futures-channel = "0.3"
futures-util = "0.3"
inventory = { version = "0.3", optional = true }
once_cell = "1.14"
pin-project-lite = "0.2"
Expand Down
2 changes: 1 addition & 1 deletion pytests/test_async_std_asyncio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use pyo3::{
use pyo3_async_runtimes::TaskLocals;

#[cfg(feature = "unstable-streams")]
use futures::{StreamExt, TryStreamExt};
use futures_util::stream::{StreamExt, TryStreamExt};

#[pyfunction]
fn sleep<'p>(py: Python<'p>, secs: Bound<'p, PyAny>) -> PyResult<Bound<'p, PyAny>> {
Expand Down
3 changes: 2 additions & 1 deletion pytests/test_tokio_current_thread_asyncio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ fn main() -> pyo3::PyResult<()> {

pyo3_async_runtimes::tokio::init(builder);
std::thread::spawn(move || {
pyo3_async_runtimes::tokio::get_runtime().block_on(futures::future::pending::<()>());
pyo3_async_runtimes::tokio::get_runtime()
.block_on(futures_util::future::pending::<()>());
});

pyo3_async_runtimes::tokio::run(py, pyo3_async_runtimes::testing::main())
Expand Down
2 changes: 1 addition & 1 deletion pytests/test_tokio_current_thread_run_forever.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ fn main() {

pyo3_async_runtimes::tokio::init(builder);
std::thread::spawn(move || {
pyo3_async_runtimes::tokio::get_runtime().block_on(futures::future::pending::<()>());
pyo3_async_runtimes::tokio::get_runtime().block_on(futures_util::future::pending::<()>());
});

tokio_run_forever::test_main();
Expand Down
2 changes: 1 addition & 1 deletion pytests/test_tokio_current_thread_uvloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ fn main() -> pyo3::PyResult<()> {

pyo3_async_runtimes::tokio::init(builder);
std::thread::spawn(move || {
pyo3_async_runtimes::tokio::get_runtime().block_on(futures::future::pending::<()>());
pyo3_async_runtimes::tokio::get_runtime().block_on(futures_util::future::pending::<()>());
});

Python::attach(|py| {
Expand Down
2 changes: 1 addition & 1 deletion pytests/tokio_asyncio/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use pyo3::{
use pyo3_async_runtimes::TaskLocals;

#[cfg(feature = "unstable-streams")]
use futures::{StreamExt, TryStreamExt};
use futures_util::{StreamExt, TryStreamExt};

use crate::common;

Expand Down
22 changes: 11 additions & 11 deletions src/async_std.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
//! ```

use async_std::task;
use futures::FutureExt;
use futures_util::future::FutureExt;
use pyo3::prelude::*;
use std::{any::Any, cell::RefCell, future::Future, panic, panic::AssertUnwindSafe, pin::Pin};

Expand Down Expand Up @@ -481,8 +481,8 @@ where
///
/// This function converts the `awaitable` into a Python Task using `run_coroutine_threadsafe`. A
/// completion handler sends the result of this Task through a
/// `futures::channel::oneshot::Sender<PyResult<Py<PyAny>>>` and the future returned by this function
/// simply awaits the result through the `futures::channel::oneshot::Receiver<PyResult<Py<PyAny>>>`.
/// `futures_channel::oneshot::Sender<PyResult<Py<PyAny>>>` and the future returned by this function
/// simply awaits the result through the `futures_channel::oneshot::Receiver<PyResult<Py<PyAny>>>`.
///
/// # Arguments
/// * `awaitable` - The Python `awaitable` to be converted
Expand Down Expand Up @@ -544,7 +544,7 @@ pub fn into_future(
/// # Examples
/// ```
/// use pyo3::prelude::*;
/// use futures::{StreamExt, TryStreamExt};
/// use futures_util::stream::{StreamExt, TryStreamExt};
/// use std::ffi::CString;
///
/// const TEST_MOD: &str = r#"
Expand Down Expand Up @@ -585,7 +585,7 @@ pub fn into_future(
#[cfg(feature = "unstable-streams")]
pub fn into_stream_v1(
gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = PyResult<Py<PyAny>>> + 'static> {
) -> PyResult<impl futures_util::stream::Stream<Item = PyResult<Py<PyAny>>> + 'static> {
generic::into_stream_v1::<AsyncStdRuntime>(gen)
}

Expand All @@ -602,7 +602,7 @@ pub fn into_stream_v1(
/// # Examples
/// ```
/// use pyo3::prelude::*;
/// use futures::{StreamExt, TryStreamExt};
/// use futures_util::stream::{StreamExt, TryStreamExt};
/// use std::ffi::CString;
///
/// const TEST_MOD: &str = r#"
Expand Down Expand Up @@ -647,7 +647,7 @@ pub fn into_stream_v1(
pub fn into_stream_with_locals_v1(
locals: TaskLocals,
gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = PyResult<Py<PyAny>>> + 'static> {
) -> PyResult<impl futures_util::stream::Stream<Item = PyResult<Py<PyAny>>> + 'static> {
generic::into_stream_with_locals_v1::<AsyncStdRuntime>(locals, gen)
}

Expand All @@ -664,7 +664,7 @@ pub fn into_stream_with_locals_v1(
/// # Examples
/// ```
/// use pyo3::prelude::*;
/// use futures::{StreamExt, TryStreamExt};
/// use futures_util::stream::{StreamExt, TryStreamExt};
/// use std::ffi::CString;
///
/// const TEST_MOD: &str = r#"
Expand Down Expand Up @@ -709,7 +709,7 @@ pub fn into_stream_with_locals_v1(
pub fn into_stream_with_locals_v2(
locals: TaskLocals,
gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = Py<PyAny>> + 'static> {
) -> PyResult<impl futures_util::stream::Stream<Item = Py<PyAny>> + 'static> {
generic::into_stream_with_locals_v2::<AsyncStdRuntime>(locals, gen)
}

Expand All @@ -725,7 +725,7 @@ pub fn into_stream_with_locals_v2(
/// # Examples
/// ```
/// use pyo3::prelude::*;
/// use futures::{StreamExt, TryStreamExt};
/// use futures_util::stream::{StreamExt, TryStreamExt};
/// use std::ffi::CString;
///
/// const TEST_MOD: &str = r#"
Expand Down Expand Up @@ -766,6 +766,6 @@ pub fn into_stream_with_locals_v2(
#[cfg(feature = "unstable-streams")]
pub fn into_stream_v2(
gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = Py<PyAny>> + 'static> {
) -> PyResult<impl futures_util::stream::Stream<Item = Py<PyAny>> + 'static> {
generic::into_stream_v2::<AsyncStdRuntime>(gen)
}
22 changes: 12 additions & 10 deletions src/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ use crate::{
asyncio, call_soon_threadsafe, close, create_future, dump_err, err::RustPanic,
get_running_loop, into_future_with_locals, TaskLocals,
};
use futures::channel::oneshot;
#[cfg(feature = "unstable-streams")]
use futures::{channel::mpsc, SinkExt};
use futures_channel::mpsc;
use futures_channel::oneshot;
#[cfg(feature = "unstable-streams")]
use futures_util::SinkExt;
use pin_project_lite::pin_project;
use pyo3::prelude::*;
use pyo3::IntoPyObjectExt;
Expand Down Expand Up @@ -1307,7 +1309,7 @@ where
/// # }
///
/// use pyo3::prelude::*;
/// use futures::{StreamExt, TryStreamExt};
/// use futures_util::stream::{StreamExt, TryStreamExt};
/// use std::ffi::CString;
///
/// const TEST_MOD: &str = r#"
Expand Down Expand Up @@ -1349,7 +1351,7 @@ where
pub fn into_stream_with_locals_v1<R>(
locals: TaskLocals,
gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = PyResult<Py<PyAny>>> + 'static>
) -> PyResult<impl futures_util::stream::Stream<Item = PyResult<Py<PyAny>>> + 'static>
where
R: Runtime,
{
Expand Down Expand Up @@ -1461,7 +1463,7 @@ where
/// # }
///
/// use pyo3::prelude::*;
/// use futures::{StreamExt, TryStreamExt};
/// use futures_util::stream::{StreamExt, TryStreamExt};
/// use std::ffi::CString;
///
/// const TEST_MOD: &str = r#"
Expand Down Expand Up @@ -1498,7 +1500,7 @@ where
#[cfg(feature = "unstable-streams")]
pub fn into_stream_v1<R>(
gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = PyResult<Py<PyAny>>> + 'static>
) -> PyResult<impl futures_util::stream::Stream<Item = PyResult<Py<PyAny>>> + 'static>
where
R: Runtime + ContextExt,
{
Expand Down Expand Up @@ -1660,7 +1662,7 @@ async def forward(gen, sender):
/// # }
///
/// use pyo3::prelude::*;
/// use futures::{StreamExt, TryStreamExt};
/// use futures_util::stream::{StreamExt, TryStreamExt};
/// use std::ffi::CString;
///
/// const TEST_MOD: &str = r#"
Expand Down Expand Up @@ -1701,7 +1703,7 @@ async def forward(gen, sender):
pub fn into_stream_with_locals_v2<R>(
locals: TaskLocals,
gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = Py<PyAny>> + 'static>
) -> PyResult<impl futures_util::stream::Stream<Item = Py<PyAny>> + 'static>
where
R: Runtime + ContextExt,
{
Expand Down Expand Up @@ -1819,7 +1821,7 @@ where
/// # }
///
/// use pyo3::prelude::*;
/// use futures::{StreamExt, TryStreamExt};
/// use futures_util::stream::{StreamExt, TryStreamExt};
/// use std::ffi::CString;
///
/// const TEST_MOD: &str = r#"
Expand Down Expand Up @@ -1856,7 +1858,7 @@ where
#[cfg(feature = "unstable-streams")]
pub fn into_stream_v2<R>(
gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = Py<PyAny>> + 'static>
) -> PyResult<impl futures_util::stream::Stream<Item = Py<PyAny>> + 'static>
where
R: Runtime + ContextExt,
{
Expand Down
6 changes: 3 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ pub mod doc_test {
use std::future::Future;
use std::sync::Arc;

use futures::channel::oneshot;
use futures_channel::oneshot;
use pyo3::{call::PyCallArgs, prelude::*, sync::PyOnceLock, types::PyDict};

static ASYNCIO: PyOnceLock<Py<PyAny>> = PyOnceLock::new();
Expand Down Expand Up @@ -610,8 +610,8 @@ fn call_soon_threadsafe<'py>(
///
/// This function converts the `awaitable` into a Python Task using `run_coroutine_threadsafe`. A
/// completion handler sends the result of this Task through a
/// `futures::channel::oneshot::Sender<PyResult<Py<PyAny>>>` and the future returned by this function
/// simply awaits the result through the `futures::channel::oneshot::Receiver<PyResult<Py<PyAny>>>`.
/// `futures_channel::oneshot::Sender<PyResult<Py<PyAny>>>` and the future returned by this function
/// simply awaits the result through the `futures_channel::oneshot::Receiver<PyResult<Py<PyAny>>>`.
///
/// # Arguments
/// * `locals` - The Python event loop and context to be used for the provided awaitable
Expand Down
4 changes: 2 additions & 2 deletions src/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@
use std::{future::Future, pin::Pin};

use clap::{Arg, Command};
use futures::stream::{self, StreamExt};
use futures_util::stream::StreamExt;
use pyo3::prelude::*;

/// Args that should be provided to the test program
Expand Down Expand Up @@ -263,7 +263,7 @@ inventory::collect!(Test);

/// Run a sequence of tests while applying any necessary filtering from the `Args`
pub async fn test_harness(tests: Vec<Test>, args: Args) -> PyResult<()> {
stream::iter(tests)
futures_util::stream::iter(tests)
.for_each_concurrent(Some(4), |test| {
let mut ignore = false;

Expand Down
22 changes: 11 additions & 11 deletions src/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use crate::{
#[cfg(feature = "attributes")]
pub mod re_exports {
/// re-export pending to be used in tokio macros without additional dependency
pub use futures::future::pending;
pub use futures_util::future::pending;
/// re-export tokio::runtime to build runtimes in tokio macros without additional dependency
pub use tokio::runtime;
}
Expand Down Expand Up @@ -552,8 +552,8 @@ where
///
/// This function converts the `awaitable` into a Python Task using `run_coroutine_threadsafe`. A
/// completion handler sends the result of this Task through a
/// `futures::channel::oneshot::Sender<PyResult<Py<PyAny>>>` and the future returned by this function
/// simply awaits the result through the `futures::channel::oneshot::Receiver<PyResult<Py<PyAny>>>`.
/// `futures_channel::oneshot::Sender<PyResult<Py<PyAny>>>` and the future returned by this function
/// simply awaits the result through the `futures_channel::oneshot::Receiver<PyResult<Py<PyAny>>>`.
///
/// # Arguments
/// * `awaitable` - The Python `awaitable` to be converted
Expand Down Expand Up @@ -616,7 +616,7 @@ pub fn into_future(
/// # Examples
/// ```
/// use pyo3::prelude::*;
/// use futures::{StreamExt, TryStreamExt};
/// use futures_util::stream::{StreamExt, TryStreamExt};
/// use std::ffi::CString;
///
/// const TEST_MOD: &str = r#"
Expand Down Expand Up @@ -661,7 +661,7 @@ pub fn into_future(
pub fn into_stream_with_locals_v1(
locals: TaskLocals,
gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = PyResult<Py<PyAny>>> + 'static> {
) -> PyResult<impl futures_util::stream::Stream<Item = PyResult<Py<PyAny>>> + 'static> {
generic::into_stream_with_locals_v1::<TokioRuntime>(locals, gen)
}

Expand All @@ -677,7 +677,7 @@ pub fn into_stream_with_locals_v1(
/// # Examples
/// ```
/// use pyo3::prelude::*;
/// use futures::{StreamExt, TryStreamExt};
/// use futures_util::stream::{StreamExt, TryStreamExt};
/// use std::ffi::CString;
///
/// const TEST_MOD: &str = r#"
Expand Down Expand Up @@ -718,7 +718,7 @@ pub fn into_stream_with_locals_v1(
#[cfg(feature = "unstable-streams")]
pub fn into_stream_v1(
gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = PyResult<Py<PyAny>>> + 'static> {
) -> PyResult<impl futures_util::stream::Stream<Item = PyResult<Py<PyAny>>> + 'static> {
generic::into_stream_v1::<TokioRuntime>(gen)
}

Expand All @@ -735,7 +735,7 @@ pub fn into_stream_v1(
/// # Examples
/// ```
/// use pyo3::prelude::*;
/// use futures::{StreamExt, TryStreamExt};
/// use futures_util::stream::{StreamExt, TryStreamExt};
/// use std::ffi::CString;
///
/// const TEST_MOD: &str = r#"
Expand Down Expand Up @@ -780,7 +780,7 @@ pub fn into_stream_v1(
pub fn into_stream_with_locals_v2(
locals: TaskLocals,
gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = Py<PyAny>> + 'static> {
) -> PyResult<impl futures_util::stream::Stream<Item = Py<PyAny>> + 'static> {
generic::into_stream_with_locals_v2::<TokioRuntime>(locals, gen)
}

Expand All @@ -796,7 +796,7 @@ pub fn into_stream_with_locals_v2(
/// # Examples
/// ```
/// use pyo3::prelude::*;
/// use futures::{StreamExt, TryStreamExt};
/// use futures_util::stream::{StreamExt, TryStreamExt};
/// use std::ffi::CString;
///
/// const TEST_MOD: &str = r#"
Expand Down Expand Up @@ -837,6 +837,6 @@ pub fn into_stream_with_locals_v2(
#[cfg(feature = "unstable-streams")]
pub fn into_stream_v2(
gen: Bound<'_, PyAny>,
) -> PyResult<impl futures::Stream<Item = Py<PyAny>> + 'static> {
) -> PyResult<impl futures_util::stream::Stream<Item = Py<PyAny>> + 'static> {
generic::into_stream_v2::<TokioRuntime>(gen)
}
Loading