Skip to content

Commit

Permalink
Make rtc::Closed, chmux::SenderSink and chmux::ListenerStream Sync.
Browse files Browse the repository at this point in the history
Allows them to be used in more places.
  • Loading branch information
surban committed Sep 27, 2024
1 parent bfd1248 commit 07bdcbc
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 13 deletions.
9 changes: 4 additions & 5 deletions remoc/src/chmux/listener.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use futures::{
future::BoxFuture,
ready,
stream::Stream,
task::{Context, Poll},
FutureExt,
};
use std::{error::Error, fmt, pin::Pin, sync::Arc};
use tokio::sync::{mpsc, oneshot, Mutex};
use tokio_util::sync::ReusableBoxFuture;

use super::{
mux::PortEvt,
Expand Down Expand Up @@ -268,7 +267,7 @@ impl Drop for Listener {
pub struct ListenerStream {
server: Arc<Mutex<Listener>>,
#[allow(clippy::type_complexity)]
accept_fut: Option<BoxFuture<'static, Option<Result<(Sender, Receiver), ListenerError>>>>,
accept_fut: Option<ReusableBoxFuture<'static, Option<Result<(Sender, Receiver), ListenerError>>>>,
}

impl ListenerStream {
Expand All @@ -283,11 +282,11 @@ impl ListenerStream {

fn poll_next(&mut self, cx: &mut Context) -> Poll<Option<Result<(Sender, Receiver), ListenerError>>> {
if self.accept_fut.is_none() {
self.accept_fut = Some(Self::accept(self.server.clone()).boxed());
self.accept_fut = Some(ReusableBoxFuture::new(Self::accept(self.server.clone())));
}

let accept_fut = self.accept_fut.as_mut().unwrap();
let res = ready!(accept_fut.as_mut().poll(cx));
let res = ready!(accept_fut.poll(cx));

self.accept_fut = None;
Poll::Ready(res)
Expand Down
10 changes: 5 additions & 5 deletions remoc/src/chmux/sender.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use bytes::Bytes;
use futures::{
future::{self, BoxFuture},
ready,
future, ready,
sink::Sink,
task::{Context, Poll},
Future, FutureExt,
Expand All @@ -17,6 +16,7 @@ use std::{
},
};
use tokio::sync::{mpsc, oneshot, Mutex};
use tokio_util::sync::ReusableBoxFuture;

use super::{
client::ConnectResponse,
Expand Down Expand Up @@ -563,7 +563,7 @@ impl<'a> ChunkSender<'a> {
/// A sink sending byte data over a channel.
pub struct SenderSink {
sender: Option<Arc<Mutex<Sender>>>,
send_fut: Option<BoxFuture<'static, Result<(), SendError>>>,
send_fut: Option<ReusableBoxFuture<'static, Result<(), SendError>>>,
}

impl SenderSink {
Expand All @@ -583,7 +583,7 @@ impl SenderSink {

match self.sender.clone() {
Some(sender) => {
self.send_fut = Some(Self::send(sender, data).boxed());
self.send_fut = Some(ReusableBoxFuture::new(Self::send(sender, data)));
Ok(())
}
None => panic!("start_send after sink has been closed"),
Expand All @@ -593,7 +593,7 @@ impl SenderSink {
fn poll_send(&mut self, cx: &mut Context) -> Poll<Result<(), SendError>> {
match &mut self.send_fut {
Some(fut) => {
let res = ready!(fut.as_mut().poll(cx));
let res = ready!(fut.poll(cx));
self.send_fut = None;
Poll::Ready(res)
}
Expand Down
7 changes: 4 additions & 3 deletions remoc/src/rtc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,14 +221,15 @@
//! ```
//!
use futures::{future::BoxFuture, Future, FutureExt};
use futures::{Future, FutureExt};
use std::{
error::Error,
fmt,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use tokio_util::sync::ReusableBoxFuture;

use crate::{
chmux,
Expand Down Expand Up @@ -404,7 +405,7 @@ pub trait Client {
/// or the connection between them has been lost.
///
/// This can be obtained via [Client::closed].
pub struct Closed(BoxFuture<'static, ()>);
pub struct Closed(ReusableBoxFuture<'static, ()>);

impl fmt::Debug for Closed {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
Expand All @@ -415,7 +416,7 @@ impl fmt::Debug for Closed {
impl Closed {
#[doc(hidden)]
pub fn new(fut: impl Future<Output = ()> + Send + 'static) -> Self {
Self(fut.boxed())
Self(ReusableBoxFuture::new(fut))
}
}

Expand Down

0 comments on commit 07bdcbc

Please sign in to comment.