Skip to content

Commit

Permalink
Reference count senders and receivers so that we don't close down early.
Browse files Browse the repository at this point in the history
  • Loading branch information
Corey Schuhen committed Mar 9, 2025
1 parent 7c49f48 commit 424d207
Show file tree
Hide file tree
Showing 4 changed files with 199 additions and 69 deletions.
64 changes: 48 additions & 16 deletions embassy-stm32/src/can/bxcan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use self::registers::{Registers, RxFifo};
pub use super::common::{BufferedCanReceiver, BufferedCanSender};
use super::frame::{Envelope, Frame};
use super::util;
use crate::can::enums::{BusError, TryReadError};
use crate::can::enums::{BusError, InternalOperation, TryReadError};
use crate::gpio::{AfType, OutputType, Pull, Speed};
use crate::interrupt::typelevel::Interrupt;
use crate::rcc::{self, RccPeripheral};
Expand Down Expand Up @@ -685,22 +685,18 @@ impl<'d, const TX_BUF_SIZE: usize> BufferedCanTx<'d, TX_BUF_SIZE> {

/// Returns a sender that can be used for sending CAN frames.
pub fn writer(&self) -> BufferedCanSender {
(self.info.internal_operation)(InternalOperation::NotifySenderCreated);
BufferedCanSender {
tx_buf: self.tx_buf.sender().into(),
waker: self.info.tx_waker,
internal_operation: self.info.internal_operation,
}
}
}

impl<'d, const TX_BUF_SIZE: usize> Drop for BufferedCanTx<'d, TX_BUF_SIZE> {
fn drop(&mut self) {
critical_section::with(|_| {
let state = self.state as *const State;
unsafe {
let mut_state = state as *mut State;
(*mut_state).tx_mode = TxMode::NonBuffered(embassy_sync::waitqueue::AtomicWaker::new());
}
});
(self.info.internal_operation)(InternalOperation::NotifySenderDestroyed);
}
}

Expand Down Expand Up @@ -825,7 +821,11 @@ impl<'d, const RX_BUF_SIZE: usize> BufferedCanRx<'d, RX_BUF_SIZE> {

/// Returns a receiver that can be used for receiving CAN frames. Note, each CAN frame will only be received by one receiver.
pub fn reader(&self) -> BufferedCanReceiver {
self.rx_buf.receiver().into()
(self.info.internal_operation)(InternalOperation::NotifyReceiverCreated);
BufferedCanReceiver {
rx_buf: self.rx_buf.receiver().into(),
internal_operation: self.info.internal_operation,
}
}

/// Accesses the filter banks owned by this CAN peripheral.
Expand All @@ -839,13 +839,7 @@ impl<'d, const RX_BUF_SIZE: usize> BufferedCanRx<'d, RX_BUF_SIZE> {

impl<'d, const RX_BUF_SIZE: usize> Drop for BufferedCanRx<'d, RX_BUF_SIZE> {
fn drop(&mut self) {
critical_section::with(|_| {
let state = self.state as *const State;
unsafe {
let mut_state = state as *mut State;
(*mut_state).rx_mode = RxMode::NonBuffered(embassy_sync::waitqueue::AtomicWaker::new());
}
});
(self.info.internal_operation)(InternalOperation::NotifyReceiverDestroyed);
}
}

Expand Down Expand Up @@ -1048,6 +1042,8 @@ pub(crate) struct State {
pub(crate) rx_mode: RxMode,
pub(crate) tx_mode: TxMode,
pub err_waker: AtomicWaker,
receiver_instance_count: usize,
sender_instance_count: usize,
}

impl State {
Expand All @@ -1056,6 +1052,8 @@ impl State {
rx_mode: RxMode::NonBuffered(AtomicWaker::new()),
tx_mode: TxMode::NonBuffered(AtomicWaker::new()),
err_waker: AtomicWaker::new(),
receiver_instance_count: 1,
sender_instance_count: 1,
}
}
}
Expand All @@ -1067,6 +1065,7 @@ pub(crate) struct Info {
rx1_interrupt: crate::interrupt::Interrupt,
sce_interrupt: crate::interrupt::Interrupt,
tx_waker: fn(),
internal_operation: fn(InternalOperation),

/// The total number of filter banks available to the instance.
///
Expand All @@ -1079,6 +1078,7 @@ trait SealedInstance {
fn regs() -> crate::pac::can::Can;
fn state() -> &'static State;
unsafe fn mut_state() -> &'static mut State;
fn internal_operation(val: InternalOperation);
}

/// CAN instance trait.
Expand Down Expand Up @@ -1136,6 +1136,7 @@ foreach_peripheral!(
rx1_interrupt: crate::_generated::peripheral_interrupts::$inst::RX1::IRQ,
sce_interrupt: crate::_generated::peripheral_interrupts::$inst::SCE::IRQ,
tx_waker: crate::_generated::peripheral_interrupts::$inst::TX::pend,
internal_operation: peripherals::$inst::internal_operation,
num_filter_banks: peripherals::$inst::NUM_FILTER_BANKS,
};
&INFO
Expand All @@ -1151,6 +1152,37 @@ foreach_peripheral!(
fn state() -> &'static State {
unsafe { peripherals::$inst::mut_state() }
}


fn internal_operation(val: InternalOperation) {
critical_section::with(|_| {
//let state = self.state as *const State;
unsafe {
//let mut_state = state as *mut State;
let mut_state = peripherals::$inst::mut_state();
match val {
InternalOperation::NotifySenderCreated => {
mut_state.sender_instance_count += 1;
}
InternalOperation::NotifySenderDestroyed => {
mut_state.sender_instance_count -= 1;
if ( 0 == mut_state.sender_instance_count) {
(*mut_state).tx_mode = TxMode::NonBuffered(embassy_sync::waitqueue::AtomicWaker::new());
}
}
InternalOperation::NotifyReceiverCreated => {
mut_state.receiver_instance_count += 1;
}
InternalOperation::NotifyReceiverDestroyed => {
mut_state.receiver_instance_count -= 1;
if ( 0 == mut_state.receiver_instance_count) {
(*mut_state).rx_mode = RxMode::NonBuffered(embassy_sync::waitqueue::AtomicWaker::new());
}
}
}
}
});
}
}

impl Instance for peripherals::$inst {
Expand Down
86 changes: 79 additions & 7 deletions embassy-stm32/src/can/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,22 @@ pub(crate) struct FdBufferedTxInner {
}

/// Sender that can be used for sending CAN frames.
#[derive(Copy, Clone)]
pub struct BufferedCanSender {
pub(crate) tx_buf: embassy_sync::channel::DynamicSender<'static, Frame>,
pub struct BufferedSender<'ch, FRAME> {
pub(crate) tx_buf: embassy_sync::channel::DynamicSender<'ch, FRAME>,
pub(crate) waker: fn(),
pub(crate) internal_operation: fn(InternalOperation),
}

impl BufferedCanSender {
impl<'ch, FRAME> BufferedSender<'ch, FRAME> {
/// Async write frame to TX buffer.
pub fn try_write(&mut self, frame: Frame) -> Result<(), embassy_sync::channel::TrySendError<Frame>> {
pub fn try_write(&mut self, frame: FRAME) -> Result<(), embassy_sync::channel::TrySendError<FRAME>> {
self.tx_buf.try_send(frame)?;
(self.waker)();
Ok(())
}

/// Async write frame to TX buffer.
pub async fn write(&mut self, frame: Frame) {
pub async fn write(&mut self, frame: FRAME) {
self.tx_buf.send(frame).await;
(self.waker)();
}
Expand All @@ -48,5 +48,77 @@ impl BufferedCanSender {
}
}

impl<'ch, FRAME> Clone for BufferedSender<'ch, FRAME> {
fn clone(&self) -> Self {
(self.internal_operation)(InternalOperation::NotifySenderCreated);
Self {
tx_buf: self.tx_buf,
waker: self.waker,
internal_operation: self.internal_operation,
}
}
}

impl<'ch, FRAME> Drop for BufferedSender<'ch, FRAME> {
fn drop(&mut self) {
(self.internal_operation)(InternalOperation::NotifySenderDestroyed);
}
}

/// Sender that can be used for sending Classic CAN frames.
pub type BufferedCanSender = BufferedSender<'static, Frame>;

/// Receiver that can be used for receiving CAN frames. Note, each CAN frame will only be received by one receiver.
pub type BufferedCanReceiver = embassy_sync::channel::DynamicReceiver<'static, Result<Envelope, BusError>>;
pub struct BufferedReceiver<'ch, ENVELOPE> {
pub(crate) rx_buf: embassy_sync::channel::DynamicReceiver<'ch, Result<ENVELOPE, BusError>>,
pub(crate) internal_operation: fn(InternalOperation),
}

impl<'ch, ENVELOPE> BufferedReceiver<'ch, ENVELOPE> {
/// Receive the next frame.
///
/// See [`Channel::receive()`].
pub fn receive(&self) -> embassy_sync::channel::DynamicReceiveFuture<'_, Result<ENVELOPE, BusError>> {
self.rx_buf.receive()
}

/// Attempt to immediately receive the next frame.
///
/// See [`Channel::try_receive()`]
pub fn try_receive(&self) -> Result<Result<ENVELOPE, BusError>, embassy_sync::channel::TryReceiveError> {
self.rx_buf.try_receive()
}

/// Allows a poll_fn to poll until the channel is ready to receive
///
/// See [`Channel::poll_ready_to_receive()`]
pub fn poll_ready_to_receive(&self, cx: &mut core::task::Context<'_>) -> core::task::Poll<()> {
self.rx_buf.poll_ready_to_receive(cx)
}

/// Poll the channel for the next frame
///
/// See [`Channel::poll_receive()`]
pub fn poll_receive(&self, cx: &mut core::task::Context<'_>) -> core::task::Poll<Result<ENVELOPE, BusError>> {
self.rx_buf.poll_receive(cx)
}
}

impl<'ch, ENVELOPE> Clone for BufferedReceiver<'ch, ENVELOPE> {
fn clone(&self) -> Self {
(self.internal_operation)(InternalOperation::NotifyReceiverCreated);
Self {
rx_buf: self.rx_buf,
internal_operation: self.internal_operation,
}
}
}

impl<'ch, ENVELOPE> Drop for BufferedReceiver<'ch, ENVELOPE> {
fn drop(&mut self) {
(self.internal_operation)(InternalOperation::NotifyReceiverDestroyed);
}
}

/// A BufferedCanReceiver for Classic CAN frames.
pub type BufferedCanReceiver = BufferedReceiver<'static, Envelope>;
14 changes: 14 additions & 0 deletions embassy-stm32/src/can/enums.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,17 @@ pub enum TryReadError {
/// Receive buffer is empty
Empty,
}

/// Internal Operation
#[derive(Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum InternalOperation {
/// Notify receiver created
NotifyReceiverCreated,
/// Notify receiver destroyed
NotifyReceiverDestroyed,
/// Notify sender created
NotifySenderCreated,
/// Notify sender destroyed
NotifySenderDestroyed,
}
Loading

0 comments on commit 424d207

Please sign in to comment.