Skip to content

Commit

Permalink
sync: add `{Receiver,UnboundedReceiver}::{sender_strong_count,sender_…
Browse files Browse the repository at this point in the history
…weak_count}` (#6661)
  • Loading branch information
Rustin170506 authored Jul 2, 2024
1 parent dff4ecd commit fe7285d
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 10 deletions.
10 changes: 10 additions & 0 deletions tokio/src/sync/mpsc/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,16 @@ impl<T> Receiver<T> {
) -> Poll<usize> {
self.chan.recv_many(cx, buffer, limit)
}

/// Returns the number of [`Sender`] handles.
pub fn sender_strong_count(&self) -> usize {
self.chan.sender_strong_count()
}

/// Returns the number of [`WeakSender`] handles.
pub fn sender_weak_count(&self) -> usize {
self.chan.sender_weak_count()
}
}

impl<T> fmt::Debug for Receiver<T> {
Expand Down
8 changes: 8 additions & 0 deletions tokio/src/sync/mpsc/chan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,14 @@ impl<T, S: Semaphore> Rx<T, S> {
pub(super) fn semaphore(&self) -> &S {
&self.inner.semaphore
}

pub(super) fn sender_strong_count(&self) -> usize {
self.inner.tx_count.load(Acquire)
}

pub(super) fn sender_weak_count(&self) -> usize {
self.inner.tx_weak_count.load(Relaxed)
}
}

impl<T, S: Semaphore> Drop for Rx<T, S> {
Expand Down
12 changes: 11 additions & 1 deletion tokio/src/sync/mpsc/unbounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ impl<T> UnboundedReceiver<T> {
/// assert!(!rx.is_closed());
///
/// rx.close();
///
///
/// assert!(rx.is_closed());
/// }
/// ```
Expand Down Expand Up @@ -498,6 +498,16 @@ impl<T> UnboundedReceiver<T> {
) -> Poll<usize> {
self.chan.recv_many(cx, buffer, limit)
}

/// Returns the number of [`UnboundedSender`] handles.
pub fn sender_strong_count(&self) -> usize {
self.chan.sender_strong_count()
}

/// Returns the number of [`WeakUnboundedSender`] handles.
pub fn sender_weak_count(&self) -> usize {
self.chan.sender_weak_count()
}
}

impl<T> UnboundedSender<T> {
Expand Down
33 changes: 24 additions & 9 deletions tokio/tests/sync_mpsc_weak.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,12 +532,13 @@ async fn test_rx_unbounded_is_closed_when_dropping_all_senders_except_weak_sende

#[tokio::test]
async fn sender_strong_count_when_cloned() {
let (tx, _rx) = mpsc::channel::<()>(1);
let (tx, rx) = mpsc::channel::<()>(1);

let tx2 = tx.clone();

assert_eq!(tx.strong_count(), 2);
assert_eq!(tx2.strong_count(), 2);
assert_eq!(rx.sender_strong_count(), 2);
}

#[tokio::test]
Expand All @@ -552,29 +553,31 @@ async fn sender_weak_count_when_downgraded() {

#[tokio::test]
async fn sender_strong_count_when_dropped() {
let (tx, _rx) = mpsc::channel::<()>(1);
let (tx, rx) = mpsc::channel::<()>(1);

let tx2 = tx.clone();

drop(tx2);

assert_eq!(tx.strong_count(), 1);
assert_eq!(rx.sender_strong_count(), 1);
}

#[tokio::test]
async fn sender_weak_count_when_dropped() {
let (tx, _rx) = mpsc::channel::<()>(1);
let (tx, rx) = mpsc::channel::<()>(1);

let weak = tx.downgrade();

drop(weak);

assert_eq!(tx.weak_count(), 0);
assert_eq!(rx.sender_weak_count(), 0);
}

#[tokio::test]
async fn sender_strong_and_weak_conut() {
let (tx, _rx) = mpsc::channel::<()>(1);
let (tx, rx) = mpsc::channel::<()>(1);

let tx2 = tx.clone();

Expand All @@ -585,67 +588,75 @@ async fn sender_strong_and_weak_conut() {
assert_eq!(tx2.strong_count(), 2);
assert_eq!(weak.strong_count(), 2);
assert_eq!(weak2.strong_count(), 2);
assert_eq!(rx.sender_strong_count(), 2);

assert_eq!(tx.weak_count(), 2);
assert_eq!(tx2.weak_count(), 2);
assert_eq!(weak.weak_count(), 2);
assert_eq!(weak2.weak_count(), 2);
assert_eq!(rx.sender_weak_count(), 2);

drop(tx2);
drop(weak2);

assert_eq!(tx.strong_count(), 1);
assert_eq!(weak.strong_count(), 1);
assert_eq!(rx.sender_strong_count(), 1);

assert_eq!(tx.weak_count(), 1);
assert_eq!(weak.weak_count(), 1);
assert_eq!(rx.sender_weak_count(), 1);
}

#[tokio::test]
async fn unbounded_sender_strong_count_when_cloned() {
let (tx, _rx) = mpsc::unbounded_channel::<()>();
let (tx, rx) = mpsc::unbounded_channel::<()>();

let tx2 = tx.clone();

assert_eq!(tx.strong_count(), 2);
assert_eq!(tx2.strong_count(), 2);
assert_eq!(rx.sender_strong_count(), 2);
}

#[tokio::test]
async fn unbounded_sender_weak_count_when_downgraded() {
let (tx, _rx) = mpsc::unbounded_channel::<()>();
let (tx, rx) = mpsc::unbounded_channel::<()>();

let weak = tx.downgrade();

assert_eq!(tx.weak_count(), 1);
assert_eq!(weak.weak_count(), 1);
assert_eq!(rx.sender_weak_count(), 1);
}

#[tokio::test]
async fn unbounded_sender_strong_count_when_dropped() {
let (tx, _rx) = mpsc::unbounded_channel::<()>();
let (tx, rx) = mpsc::unbounded_channel::<()>();

let tx2 = tx.clone();

drop(tx2);

assert_eq!(tx.strong_count(), 1);
assert_eq!(rx.sender_strong_count(), 1);
}

#[tokio::test]
async fn unbounded_sender_weak_count_when_dropped() {
let (tx, _rx) = mpsc::unbounded_channel::<()>();
let (tx, rx) = mpsc::unbounded_channel::<()>();

let weak = tx.downgrade();

drop(weak);

assert_eq!(tx.weak_count(), 0);
assert_eq!(rx.sender_weak_count(), 0);
}

#[tokio::test]
async fn unbounded_sender_strong_and_weak_conut() {
let (tx, _rx) = mpsc::unbounded_channel::<()>();
let (tx, rx) = mpsc::unbounded_channel::<()>();

let tx2 = tx.clone();

Expand All @@ -656,18 +667,22 @@ async fn unbounded_sender_strong_and_weak_conut() {
assert_eq!(tx2.strong_count(), 2);
assert_eq!(weak.strong_count(), 2);
assert_eq!(weak2.strong_count(), 2);
assert_eq!(rx.sender_strong_count(), 2);

assert_eq!(tx.weak_count(), 2);
assert_eq!(tx2.weak_count(), 2);
assert_eq!(weak.weak_count(), 2);
assert_eq!(weak2.weak_count(), 2);
assert_eq!(rx.sender_weak_count(), 2);

drop(tx2);
drop(weak2);

assert_eq!(tx.strong_count(), 1);
assert_eq!(weak.strong_count(), 1);
assert_eq!(rx.sender_strong_count(), 1);

assert_eq!(tx.weak_count(), 1);
assert_eq!(weak.weak_count(), 1);
assert_eq!(rx.sender_weak_count(), 1);
}

0 comments on commit fe7285d

Please sign in to comment.