24 changes: 20 additions & 4 deletions tokio/src/sync/mpsc/bounded.rs
@@ -546,10 +546,18 @@ impl<T> Receiver<T> {
         self.chan.len()
     }
 
-    /// Returns the current capacity of the channel.
+    /// Returns the current number of reservations which can immediately be
+    /// returned from the channel.
     ///
     /// The capacity goes down when the sender sends a value by calling [`Sender::send`] or by reserving
-    /// capacity with [`Sender::reserve`]. The capacity goes up when values are received.
+    /// capacity with [`Sender::reserve`].
+    ///
+    /// The capacity goes up when values are received, unless there are
+    /// existing, non-cancelled calls to [`Sender::send`] or [`Sender::reserve`]
+    /// which have returned [`Poll::Pending`]. While those calls exist, reading
+    /// values from the [`Receiver`] gives access to a channel slot directly to
+    /// those callers, in FIFO order, without modifying the capacity.
+    ///
     /// This is distinct from [`max_capacity`], which always returns buffer capacity initially
     /// specified when calling [`channel`].
     ///
@@ -1502,10 +1510,18 @@ impl<T> Sender<T> {
         self.chan.same_channel(&other.chan)
     }
 
-    /// Returns the current capacity of the channel.
+    /// Returns the current number of reservations which can immediately be
+    /// returned from the channel.
     ///
     /// The capacity goes down when sending a value by calling [`send`] or by reserving capacity
-    /// with [`reserve`]. The capacity goes up when values are received by the [`Receiver`].
+    /// with [`reserve`].
+    ///
+    /// The capacity goes up when values are received, unless there are
+    /// existing, non-cancelled calls to [`Sender::send`] or [`Sender::reserve`]
+    /// which have returned [`Poll::Pending`]. While those calls exist, reading
+    /// values from the [`Receiver`] gives access to a channel slot directly to
+    /// those callers, in FIFO order, without modifying the capacity.
Contributor
I think this wording is too pedantic, and adding this would be confusing. If you receive a value and there is a call to reserve, then from the caller's perspective it just went up and then immediately went down again. The fact that we are able to avoid the up-then-down is an implementation detail.

The primary purpose of this section is to explain how capacity() and len() are different.

Author
I agree that the wording is a bit pedantic. To back up a second, I'm trying to cope with the following case:

let (tx, rx) = tokio::sync::mpsc::channel(1);

...

loop {
  tokio::select! {
    // Suppose this is queued first, but is only returning `Poll::Pending`
    _ = tx.send(...) => { return }
    // Suppose instead, this select! branch triggers...
    _ = yield_interval.tick() => {
      // ... Then this call to `send` will be stuck forever, queued behind the first tx.send,
      // which is no longer polled.
      let result = tx.send(...).await;
      if result == ... { return }
    }
  }
}

We've hit this issue in our codebase (oxidecomputer/omicron#9272).

This deadlock makes sense to me with the knowledge that:

  1. tokio::mpsc::Sender uses a semaphore queue for FIFO access to capacity, and callers are enqueued for access to this semaphore when they call send or reserve and get back Poll::Pending.
  2. When a tokio::mpsc::Receiver calls recv, if there are any pending waiters, it grants capacity directly to pending tokio::mpsc::Sender::send futures, if any exist, regardless of whether or not they are being polled. Granting capacity is distinct from granting access to a particular slot.
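
These two facts can be modeled with a minimal std-only toy sketch (all names and structure here are mine for illustration, not tokio's implementation): `recv` hands a freed slot directly to the oldest queued waiter, so the visible capacity can go from zero back to zero:

```rust
use std::collections::VecDeque;

// Toy model of the capacity-handoff rule under discussion. When a value is
// received while senders are queued, the freed slot is granted to the oldest
// waiter instead of raising `capacity`. Illustrative only, not tokio's code.
struct ToyChannel {
    capacity: usize,          // slots immediately available (what `capacity()` reports)
    queued: usize,            // values currently buffered in the channel
    waiters: VecDeque<usize>, // pending send/reserve calls, FIFO, by id
    granted: Vec<usize>,      // waiters that have been handed a slot directly
}

impl ToyChannel {
    fn new(cap: usize) -> Self {
        ToyChannel { capacity: cap, queued: 0, waiters: VecDeque::new(), granted: Vec::new() }
    }

    // A send: takes a slot if one is free, otherwise joins the FIFO queue
    // (the analogue of getting back `Poll::Pending`).
    fn send(&mut self, id: usize) -> bool {
        if self.capacity > 0 {
            self.capacity -= 1;
            self.queued += 1;
            true
        } else {
            self.waiters.push_back(id);
            false
        }
    }

    // A recv: frees a slot, but hands it to the oldest waiter if one exists,
    // bypassing `capacity` entirely.
    fn recv(&mut self) {
        assert!(self.queued > 0, "nothing to receive");
        self.queued -= 1;
        if let Some(w) = self.waiters.pop_front() {
            self.granted.push(w); // the freed slot never shows up in `capacity`
        } else {
            self.capacity += 1;
        }
    }
}

fn main() {
    let mut ch = ToyChannel::new(1);
    assert!(ch.send(0));        // fills the single slot
    assert!(!ch.send(1));       // sender 1 is now a queued waiter
    ch.recv();                  // frees a slot, granted straight to sender 1
    assert_eq!(ch.capacity, 0); // capacity went zero -> zero
    assert_eq!(ch.granted, vec![1]);
    println!("capacity after recv: {}", ch.capacity);
}
```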

There is a comment on send and reserve about this:

This channel uses a queue to ensure that calls to send and reserve complete in the order they were requested.

But I think there's important nuance in my aforementioned case: when we say "calls to send and reserve complete in the order they were requested", we're not saying "they will return Poll::Ok" in a particular order, nor are we saying "we will enforce that they send messages in the mpsc with an ordering of when they were first polled". Rather, this statement actually means:

  • Capacity from the channel will be given to callers of send and reserve in FIFO order (internally: permits from the semaphore queue), based on their calling order
  • Once permits are granted, the order of sending messages is based on when the caller next polls. It's possible for a future created by send to get a permit, yet not be re-polled for a while: it would not necessarily block the order of the mpsc by taking a specific slot, but it would be holding on to a portion of the total capacity.
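
The second bullet can be made concrete with another hypothetical std-only sketch (my own toy model, not tokio's code) that separates the FIFO order in which queued senders are *granted* permits from the order in which they actually *complete* by being polled again:

```rust
use std::collections::VecDeque;

// Toy model: a queued sender is first *granted* a permit (always FIFO), but
// only places its value when it is polled again, so completion order follows
// re-poll order. Illustrative only.
struct Toy {
    free: usize,
    buffered: usize,
    waiters: VecDeque<&'static str>, // queued senders, FIFO
    permits: Vec<&'static str>,      // granted a slot, awaiting re-poll
    completed: Vec<&'static str>,    // value actually placed in the channel
}

impl Toy {
    fn new(free: usize) -> Self {
        Toy { free, buffered: 0, waiters: VecDeque::new(), permits: Vec::new(), completed: Vec::new() }
    }

    // First poll of a send: take a free slot and complete immediately,
    // or join the FIFO queue (Poll::Pending).
    fn start_send(&mut self, who: &'static str) {
        if self.free > 0 {
            self.free -= 1;
            self.buffered += 1;
            self.completed.push(who);
        } else {
            self.waiters.push_back(who);
        }
    }

    // A recv frees one slot and grants it straight to the oldest waiter.
    fn recv(&mut self) {
        assert!(self.buffered > 0);
        self.buffered -= 1;
        if let Some(w) = self.waiters.pop_front() {
            self.permits.push(w);
        } else {
            self.free += 1;
        }
    }

    // A later poll of a send that holds a permit: the value is placed now.
    fn repoll(&mut self, who: &'static str) {
        if let Some(i) = self.permits.iter().position(|p| *p == who) {
            self.permits.remove(i);
            self.buffered += 1;
            self.completed.push(who);
        }
    }
}

fn main() {
    let mut ch = Toy::new(2);
    ch.start_send("x"); // completes immediately
    ch.start_send("y"); // completes immediately; channel now saturated
    ch.start_send("a"); // Pending, queued first
    ch.start_send("b"); // Pending, queued second
    ch.recv();          // takes "x", grants the freed slot to "a" (FIFO)
    ch.recv();          // takes "y", grants the next slot to "b"
    ch.repoll("b");     // but "b" happens to be re-polled first...
    ch.repoll("a");
    // ...so grants were FIFO (a, then b), yet completion order differs:
    assert_eq!(ch.completed, vec!["x", "y", "b", "a"]);
}
```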

I created a playground link for this case here: https://play.rust-lang.org/?version=stable&mode=debug&edition=2024&gist=c4f2c5b7b0616eee84eee4ba63193fdc

If you receive a value and there is a call to reserve, then from the caller's perspective it just went up and then immediately went down again. The fact that we are able to avoid the up-then-down is an implementation detail.

I'm struggling with treating this as an implementation detail - "how capacity works", along with the doc comment about "calls to send and reserve complete in the order they were requested", leave a few user-visible oddities:

  1. As shown in the playground link, a call to recv can cause the capacity to go from zero -> zero, even with no concurrent operations, just because a prior incomplete send/reserve future exists somewhere. This feels at odds with the comment that "The capacity goes up when values are received by the Receiver".
  2. Calling send/reserve and getting back a Poll::Pending future has a lot of implications for ordering, which are also user-visible: as long as the future remains non-cancelled, the guarantee provided here is that capacity is granted to the futures in FIFO order, not that the calls to send/reserve will actually complete in a particular order.

Both of these aspects combined led me to "wanting to clarify what capacity means, and how it is provided to callers, especially in the context of a saturated channel".

Member
@hawkw hawkw Oct 23, 2025
  1. As shown in the playground link, a call to recv can cause the capacity to go from zero -> zero, even with no concurrent operations, just because a prior incomplete send/reserve future exists somewhere. This feels at odds with the comment that "The capacity goes up when values are received by the Receiver".

At the risk of being even more pedantic (sorry), I don't agree that there are "no concurrent operations" in this case; I think "the existence of a prior incomplete send/reserve future" is reasonably considered a concurrent operation, even if it is not executing in parallel. When multiple futures are multiplexed into a single task using concurrency primitives like select!, FuturesUnordered, join!, et cetera, we generally think of this as a form of concurrency even if it is not parallelism.

I do, however, agree with @smklein that we should make sure it's clear to the user of the MPSC that capacity is always assigned to outstanding concurrent send operations in the order that they started to wait on send/reserve calls, even if those send/reserve calls do not complete in that order. And, we should make sure that it is clear to the caller that in situations where there are outstanding send/reserve calls, the capacity may not be visible in calls to capacity() if it has already been assigned to a waiting sender.

Author

On the concurrency note - valid point, especially with the example above using a timer. That's definitely concurrent, but not parallel. But I thiiiiiiink the playground example (https://play.rust-lang.org/?version=stable&mode=debug&edition=2024&gist=c4f2c5b7b0616eee84eee4ba63193fdc) is not concurrent, even though there is a biased select! operation in-play. I'm just using it as a shorthand for:

  • Poll the pinned send
  • Then go do the other branch

Which I expect it'll do in a deterministic ordering, because of the biased keyword.

Contributor

I'm happy to see the things you mentioned be documented. But I don't think the fn capacity() docs are the right place. Perhaps you could add a new section to the struct docs instead?

+    ///
     /// This is distinct from [`max_capacity`], which always returns buffer capacity initially
     /// specified when calling [`channel`]
     ///