diff --git a/.vscode/settings.json b/.vscode/settings.json index c504f3ccd5..b72fe1e4f5 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -17,14 +17,15 @@ //"rust-analyzer.cargo.target": "thumbv8m.main-none-eabihf", "rust-analyzer.cargo.features": [ // Comment out these features when working on the examples. Most example crates do not have any cargo features. - "stm32f107rb", - "time-driver-any", - "unstable-pac", - "exti", - "rt", + // "stm32f107rb", + // "time-driver-any", + // "unstable-pac", + // "exti", + // "rt", + "rp2040" ], "rust-analyzer.linkedProjects": [ - "embassy-stm32/Cargo.toml", + "embassy-rp/Cargo.toml", // To work on the examples, comment the line above and all of the cargo.features lines, // then uncomment ONE line below to select the chip you want to work on. // This makes rust-analyzer work on the example crate and all its dependencies. diff --git a/embassy-rp/Cargo.toml b/embassy-rp/Cargo.toml index f6b0900f26..caf9663322 100644 --- a/embassy-rp/Cargo.toml +++ b/embassy-rp/Cargo.toml @@ -190,6 +190,7 @@ document-features = "0.2.10" sha2-const-stable = "0.1" rp-binary-info = { version = "0.1.0", optional = true } smart-leds = "0.4.0" +futures-core = { version = "0.3.31", default-features = false } [dev-dependencies] embassy-executor = { version = "0.9.0", path = "../embassy-executor", features = ["arch-std", "executor-thread"] } diff --git a/embassy-rp/src/dma/double_buffer.rs b/embassy-rp/src/dma/double_buffer.rs new file mode 100644 index 0000000000..99801b93f6 --- /dev/null +++ b/embassy-rp/src/dma/double_buffer.rs @@ -0,0 +1,294 @@ +//! double-buffered dma receive helpers +//! +//! overview: set up two dma channels that alternately write into two user-provided buffers. +//! the api exposes a stream-like interface: awaiting `next()` yields the next filled buffer. +//! dropping the yielded buffer guard re-queues that buffer for the next transfer. only rx is supported. + +use core::future::poll_fn; +use core::task::{Context, Poll}; + +use embassy_hal_internal::Peri; +use rp_pac::dma::regs::CtrlTrig; + +use crate::dma::{AnyChannel, Channel, CHANNEL_COUNT}; +use crate::pac; + +#[cfg(feature = "rp2040")] +use crate::pac::dma::vals::DataSize; +use crate::pac::dma::vals::TreqSel; + +/// which buffer/channel pair +#[derive(Copy, Clone, Eq, PartialEq, Debug)] +enum Which { + A, + B, +} + +/// guard returned to the user. on drop this re-queues the buffer for the next transfer. +pub struct RxBufView<'a, 'buf> { + state: &'a mut State, + buffers: &'a mut Buffers<'buf>, + which: Which, +} + +impl<'a, 'buf> core::ops::Deref for RxBufView<'a, 'buf> { + type Target = [u8]; + fn deref(&self) -> &Self::Target { + self.buffers.slice_for(self.which) + } +} + +impl<'a, 'buf> Drop for RxBufView<'a, 'buf> { + fn drop(&mut self) { + if self.state.borrowed == Some(self.which) { + self.state.borrowed = None; + } + } +} + +struct Info<'peri, C0: Channel, C1: Channel> { + ch_a: Peri<'peri, C0>, + ch_b: Peri<'peri, C1>, + from_ptr: *const u32, + dreq: TreqSel, +} + +struct Buffers<'buf> { + buf_a: &'buf mut [u8], + buf_b: &'buf mut [u8], +} + +impl<'buf> Buffers<'buf> { + fn slice_for<'a>(&'a self, which: Which) -> &'a [u8] { + match which { + Which::A => self.buf_a, + Which::B => self.buf_b, + } + } + + fn slice_for_mut<'a>(&'a mut self, which: Which) -> &'a mut [u8] { + match which { + Which::A => self.buf_a.as_mut(), + Which::B => self.buf_b.as_mut(), + } + } +} + +struct State { + /// tracks which buffer is currently being filled. none means that the + /// stream is closed and neither channel is running + filling: Option, + /// tracks which buffer is currently borrowed by the user + borrowed: Option, + /// tracks which buffer is ready to be yielded + ready: Option, + /// set to true if a transfer happens while a buffer is in use + overrun: bool, +} + +/// double-buffered dma rx stream +pub struct RxStream<'peri, 'buf, C0: Channel, C1: Channel> { + info: Info<'peri, C0, C1>, + buffers: Buffers<'buf>, + state: State, +} + +impl<'peri, 'buf, C0: Channel, C1: Channel> RxStream<'peri, 'buf, C0, C1> { + /// create a new rx stream for a peripheral register `from_ptr` and dreq. + pub fn new<'s>( + ch_a: Peri<'peri, C0>, + ch_b: Peri<'peri, C1>, + from_ptr: *const u32, + dreq: TreqSel, + buf_a: &'buf mut [u8], + buf_b: &'buf mut [u8], + ) -> Self { + let mut s = Self { + info: Info { + ch_a, + ch_b, + from_ptr, + dreq, + }, + state: State { + filling: Some(Which::A), + borrowed: None, + ready: None, + overrun: false, + }, + buffers: Buffers { buf_a, buf_b }, + }; + + // program both channels, chain to each other. start A only to kick off ping-pong. + unsafe { + Self::program_channel(&mut s.info, &mut s.buffers, Which::B, false); + Self::program_channel(&mut s.info, &mut s.buffers, Which::A, true); + } + + s + } + + /// async convenience that yields the next filled buffer. + pub async fn next<'s>(&'s mut self) -> Option> { + let info = &mut self.info; + let state = &mut self.state; + let buffers = &mut self.buffers; + + let which = poll_fn(|cx| Self::poll_next(cx, info, state, buffers)).await; + + match which { + Some(which) => Some(RxBufView { state, buffers, which }), + None => None, + } + } + + /// poll for next completed buffer. + fn poll_next<'cx, 'a>( + cx: &mut Context<'cx>, + info: &'a mut Info<'peri, C0, C1>, + state: &'a mut State, + buffers: &'a mut Buffers<'buf>, + ) -> Poll> { + // register wakers on both channels. any completion will wake us. + // safety: using the same waker for both is fine; irq wakes per-channel. + let a_idx = info.ch_a.number() as usize; + let b_idx = info.ch_b.number() as usize; + debug_assert!(a_idx < CHANNEL_COUNT && b_idx < CHANNEL_COUNT); + super::CHANNEL_WAKERS[a_idx].register(cx.waker()); + super::CHANNEL_WAKERS[b_idx].register(cx.waker()); + + match state.filling { + Some(Which::A) => { + if !info.ch_a.regs().ctrl_trig().read().busy() { + // reconfigure channel A's write address + unsafe { Self::program_channel(info, buffers, Which::A, false) }; + + // buffer A is ready + state.ready = Some(Which::A); + state.filling = Some(Which::B); + + return Poll::Ready(Some(Which::A)); + } + } + Some(Which::B) => { + if !info.ch_b.regs().ctrl_trig().read().busy() { + // reconfigure channel B's write address + unsafe { Self::program_channel(info, buffers, Which::B, false) }; + + // buffer B is ready + state.ready = Some(Which::B); + state.filling = Some(Which::A); + + return Poll::Ready(Some(Which::B)); + } + } + None => return Poll::Ready(None), + } + + Poll::Pending + } + + unsafe fn program_channel(info: &mut Info<'peri, C0, C1>, buffers: &mut Buffers<'buf>, which: Which, start: bool) { + let (ch_this, wptr, len, ch_other_num) = match which { + Which::A => ( + Peri::::from(info.ch_a.reborrow().into()), + buffers.buf_a.as_mut_ptr(), + buffers.buf_a.len(), + info.ch_b.number(), + ), + Which::B => ( + Peri::::from(info.ch_b.reborrow().into()), + buffers.buf_b.as_mut_ptr(), + buffers.buf_b.len(), + info.ch_a.number(), + ), + }; + + let p = ch_this.regs(); + p.read_addr().write_value(info.from_ptr as u32); + p.write_addr().write_value(wptr as u32); + + #[cfg(feature = "rp2040")] + p.trans_count().write(|w| { + *w = len as u32; + }); + + #[cfg(feature = "_rp235x")] + p.trans_count().write(|w| { + w.set_mode(0.into()); + w.set_count(len as u32); + }); + + // ensure previous stores are visible before enabling + core::sync::atomic::compiler_fence(core::sync::atomic::Ordering::SeqCst); + + if start { + p.ctrl_trig().write(|w| { + w.set_treq_sel(info.dreq); + #[cfg(feature = "rp2040")] + w.set_data_size(DataSize::SIZE_BYTE); + // rp235x encodes size in the fifo mapping; byte access by default + w.set_incr_read(false); + w.set_incr_write(true); + // chain to the other channel for continuous ping-pong + w.set_chain_to(ch_other_num); + w.set_en(true); + }); + } else { + // don't start right away + p.al1_ctrl().write(|w| { + let mut cw = CtrlTrig(*w); + cw.set_treq_sel(info.dreq); + #[cfg(feature = "rp2040")] + cw.set_data_size(DataSize::SIZE_BYTE); + // rp235x encodes size in the fifo mapping; byte access by default + cw.set_incr_read(false); + cw.set_incr_write(true); + // chain to the other channel for continuous ping-pong + cw.set_chain_to(ch_other_num); + cw.set_en(true); + *w = cw.0; + }); + } + + core::sync::atomic::compiler_fence(core::sync::atomic::Ordering::SeqCst); + } +} + +impl<'d, 'buf, C0: Channel, C1: Channel> Drop for RxStream<'d, 'buf, C0, C1> { + fn drop(&mut self) { + self.state.filling = None; + // abort both channels to stop transfers + + pac::DMA + .chan_abort() + .modify(|m| m.set_chan_abort((1 << self.info.ch_a.number()) | (1 << self.info.ch_b.number()))); + } +} + +// impl<'peri, 'buf, C0: Channel, C1: Channel> futures_core::stream::Stream for RxStream<'peri, 'buf, C0, C1> { +// type Item = RxBufView<'_, 'peri, 'buf, C0, C1>; + +// fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { +// // safety: we never move fields that are not Unpin; we only delegate to inner method +// let this = unsafe { self.get_unchecked_mut() }; + +// core::task::ready!(RxStream::<'peri, 'buf, C0, C1>::poll_next( +// cx, +// &mut this.info, +// &mut this.state, +// &mut this.buffers +// )); + +// Poll::Ready(this.get_current_buffer()) +// } +// } + +impl<'peri, 'buf, C0: Channel, C1: Channel> RxStream<'peri, 'buf, C0, C1> { + /// returns and clears the overrun flag. true means a buffer was overwritten while in use. + pub fn take_overrun(&mut self) -> bool { + let o = self.state.overrun; + self.state.overrun = false; + o + } +} diff --git a/embassy-rp/src/dma.rs b/embassy-rp/src/dma/mod.rs similarity index 99% rename from embassy-rp/src/dma.rs rename to embassy-rp/src/dma/mod.rs index d31d1e159d..bbe1fb19a5 100644 --- a/embassy-rp/src/dma.rs +++ b/embassy-rp/src/dma/mod.rs @@ -12,6 +12,9 @@ use crate::interrupt::InterruptExt; use crate::pac::dma::vals; use crate::{interrupt, pac, peripherals}; +pub mod double_buffer; +pub use double_buffer::{RxStream, RxBufView}; + #[cfg(feature = "rt")] #[interrupt] fn DMA_IRQ_0() { diff --git a/embassy-rp/src/uart/mod.rs b/embassy-rp/src/uart/mod.rs index 6f4e2ee079..c3145e18cc 100644 --- a/embassy-rp/src/uart/mod.rs +++ b/embassy-rp/src/uart/mod.rs @@ -19,7 +19,9 @@ use crate::pac::io::vals::{Inover, Outover}; use crate::{interrupt, pac, peripherals, RegExt}; mod buffered; +mod streaming; pub use buffered::{BufferedInterruptHandler, BufferedUart, BufferedUartRx, BufferedUartTx}; +pub use streaming::StreamingUartRx; /// Word length. #[derive(Clone, Copy, PartialEq, Eq, Debug)] @@ -1343,6 +1345,7 @@ pub struct Async; impl_mode!(Blocking); impl_mode!(Async); + /// UART instance. #[allow(private_bounds)] pub trait Instance: SealedInstance + PeripheralType { diff --git a/embassy-rp/src/uart/streaming.rs b/embassy-rp/src/uart/streaming.rs new file mode 100644 index 0000000000..0f8ba675d3 --- /dev/null +++ b/embassy-rp/src/uart/streaming.rs @@ -0,0 +1,125 @@ +//! streaming uart rx using double-buffered dma +//! +//! overview: lightweight helper to configure uart for rx dma and produce a +//! double-buffered dma `RxStream` fed directly from the uart data register. + +use embassy_hal_internal::Peri; + +use super::*; +use crate::dma::{self, Channel}; + +/// streaming uart rx handle +pub struct StreamingUartRx<'buf> { + pub(super) info: &'static Info, + pub(super) dma_state: &'static DmaState, + pub(super) inner: dma::RxStream<'static, 'buf, AnyChannel, AnyChannel>, +} + +impl<'buf> StreamingUartRx<'buf> { + /// Create a new streaming UART RX (no flow control). + pub fn new<'d, T: Instance, C0: Channel, C1: Channel>( + _uart: Peri<'d, T>, + rx: Peri<'d, impl RxPin>, + _irq: impl Binding>, // error irq wiring + ch_a: Peri<'static, C0>, + ch_b: Peri<'static, C1>, + buf_a: &'buf mut [u8], + buf_b: &'buf mut [u8], + uart_config: Config, + ) -> Self { + // configure pins/peripheral and enable irq handling in the parent module + super::Uart::::init(T::info(), None, Some(rx.into()), None, None, uart_config); + let info = T::info(); + + // disable all error interrupts initially + info.regs.uartimsc().write_set(|w| { + w.set_oeim(true); + w.set_beim(true); + w.set_peim(true); + w.set_feim(true); + }); + + info.interrupt.unpend(); + unsafe { info.interrupt.enable() }; + + // enable rx dma and abort transfers on uart error conditions + info.regs.uartdmacr().write_set(|reg| { + reg.set_rxdmae(true); + reg.set_dmaonerr(true); + }); + + Self { + inner: dma::RxStream::new( + ch_a.into(), + ch_b.into(), + info.regs.uartdr().as_ptr() as *const _, + info.rx_dreq.into(), + buf_a, + buf_b, + ), + dma_state: T::dma_state(), + info, + } + } + + /// Build a double-buffered DMA stream using the provided DMA channels and buffers. + pub async fn next(&mut self) -> Result>, Error> { + let transfer_result = select( + self.inner.next(), + poll_fn(|cx| { + self.dma_state.rx_err_waker.register(cx.waker()); + match self.dma_state.rx_errs.swap(0, Ordering::Relaxed) { + 0 => Poll::Pending, + e => Poll::Ready(Uartris(e as u32)), + } + }), + ) + .await; + + let (errors, buf_opt) = match transfer_result { + Either::First(buf_opt) => { + // We're here because the DMA finished, BUT if an error occurred on the LAST + // byte, then we may still need to grab the error state! + ( + Uartris(self.dma_state.rx_errs.swap(0, Ordering::Relaxed) as u32), + buf_opt, + ) + } + Either::Second(e) => { + // We're here because we errored, which means this is the error that + // was problematic. + (e, None) + } + }; + + // If we got no error, just return at this point + if errors.0 == 0 { + return Ok(buf_opt); + } + + // If we DID get an error, we need to figure out which one it was. + if errors.oeris() { + return Err(Error::Overrun); + } else if errors.beris() { + return Err(Error::Break); + } else if errors.peris() { + return Err(Error::Parity); + } else if errors.feris() { + return Err(Error::Framing); + } + + unreachable!("unrecognized rx error"); + } +} + +impl<'d> Drop for StreamingUartRx<'d> { + fn drop(&mut self) { + self.info.interrupt.disable(); + + // clear dma flags. irq handlers use these to disambiguate among themselves. + self.info.regs.uartdmacr().write_clear(|reg| { + reg.set_rxdmae(true); + reg.set_dmaonerr(true); + }); + } +}