From 6d8f353624f7acb0d9b39b3adf2d18afbe7b759d Mon Sep 17 00:00:00 2001 From: Ulf Lilleengen Date: Mon, 6 May 2024 21:27:31 +0200 Subject: [PATCH] improve credit handling * use per-link flow control * introduce grant type to more safely handle error situations --- host/src/adapter.rs | 73 ++++++----- host/src/channel_manager.rs | 221 ++++++++++++++++++++------------- host/src/connection_manager.rs | 104 ++++++++++++++++ host/src/types/l2cap.rs | 9 ++ 4 files changed, 293 insertions(+), 114 deletions(-) diff --git a/host/src/adapter.rs b/host/src/adapter.rs index ff35db1..bc38822 100644 --- a/host/src/adapter.rs +++ b/host/src/adapter.rs @@ -2,6 +2,7 @@ //! //! The adapter module contains the main entry point for the TrouBLE host. use core::future::poll_fn; +use core::task::Poll; use bt_hci::cmd::controller_baseband::{HostBufferSize, Reset, SetEventMask}; use bt_hci::cmd::le::{ @@ -22,10 +23,9 @@ use bt_hci::param::{ }; use bt_hci::{ControllerToHostPacket, FromHciBytes, WriteHci}; use embassy_futures::select::{select, Either}; -use embassy_sync::blocking_mutex::raw::{NoopRawMutex, RawMutex}; +use embassy_sync::blocking_mutex::raw::RawMutex; use embassy_sync::channel::Channel; use embassy_sync::once_lock::OnceLock; -use embassy_sync::semaphore::{GreedySemaphore, Semaphore as _}; use futures::pin_mut; use crate::advertise::{Advertisement, AdvertisementConfig, RawAdvertisement}; @@ -95,7 +95,6 @@ pub struct Adapter< pub(crate) channels: ChannelManager<'d, M, CHANNELS, L2CAP_MTU, L2CAP_TXQ, L2CAP_RXQ>, pub(crate) att_inbound: Channel), L2CAP_RXQ>, pub(crate) pool: &'d dyn DynamicPacketPool<'d>, - pub(crate) permits: GreedySemaphore, pub(crate) scanner: Channel, 1>, } @@ -132,7 +131,6 @@ where pool: &host_resources.pool, att_inbound: Channel::new(), scanner: Channel::new(), - permits: GreedySemaphore::new(0), } } @@ -589,6 +587,7 @@ where } let Some(mut p) = self.pool.alloc(AllocId::from_channel(header.channel)) else { + trace!("No memory for packets on channel {}", header.channel); return Err(Error::OutOfMemory); }; p.as_mut()[..data.len()].copy_from_slice(data); @@ -719,9 +718,14 @@ where .await?; let ret = LeReadBufferSize::new().exec(&self.controller).await?; - self.permits.set(ret.total_num_le_acl_data_packets as usize); - // TODO: Configure ACL max buffer size as well? + info!( + "[adapter] setting permits to {}", + ret.total_num_le_acl_data_packets as usize + ); + self.connections + .set_link_credits(ret.total_num_le_acl_data_packets as usize); + // TODO: Configure ACL max buffer size as well? let _ = self.initialized.init(()); loop { @@ -788,8 +792,17 @@ where let _ = self.channels.disconnected(e.handle); } Event::NumberOfCompletedPackets(c) => { - // info!("Confirmed {} packets sent", c.completed_packets.len()); - self.permits.release(c.completed_packets.len()); + // Explicitly ignoring for now + for entry in c.completed_packets.iter() { + match (entry.handle(), entry.num_completed_packets()) { + (Ok(handle), Ok(completed)) => { + let _ = self.connections.confirm_sent(handle, completed as usize); + } + _ => {} // Ignoring for now + } + + //c.completed_packets.len()); + } } Event::Vendor(vendor) => { if let Some(handler) = vendor_handler { @@ -821,38 +834,41 @@ where } } - pub(crate) fn hci(&self) -> HciController<'_, T> { + pub(crate) fn hci(&self) -> HciController<'_, M, T, CONNS> { HciController { controller: &self.controller, - permits: &self.permits, + connections: &self.connections, } } } -pub struct HciController<'d, T: Controller> { +pub struct HciController<'d, M: RawMutex, T: Controller, const CONNS: usize> { pub(crate) controller: &'d T, - pub(crate) permits: &'d GreedySemaphore, + pub(crate) connections: &'d ConnectionManager, } -impl<'d, T: Controller> Clone for HciController<'d, T> { +impl<'d, M: RawMutex, T: Controller, const CONNS: usize> Clone for HciController<'d, M, T, CONNS> { fn clone(&self) -> Self { Self { controller: self.controller, - permits: self.permits, + connections: self.connections, } } } -impl<'d, T: Controller> HciController<'d, T> { +impl<'d, M: RawMutex, T: Controller, const CONNS: usize> HciController<'d, M, T, CONNS> { pub(crate) fn try_send(&self, handle: ConnHandle, pdu: &[u8]) -> Result<(), AdapterError> where T: blocking::Controller, { - // info!("[try_send] permits: {}", self.permits.permits()); - let permit = self - .permits - .try_acquire(1) - .ok_or::>(Error::NoPermits.into())?; + let mut grant = match self.connections.poll_request_to_send(handle, 1, None) { + Poll::Ready(res) => res?, + Poll::Pending => { + warn!("[link][handle = {}]: not enough credits", handle); + return Err(Error::Busy.into()); + } + }; + let acl = AclPacket::new( handle, AclPacketBoundary::FirstNonFlushable, @@ -862,7 +878,7 @@ impl<'d, T: Controller> HciController<'d, T> { // info!("Sent ACL {:?}", acl); match self.controller.try_write_acl_data(&acl) { Ok(result) => { - permit.disarm(); + grant.confirm(1); Ok(result) } Err(blocking::TryError::Busy) => { @@ -874,12 +890,7 @@ impl<'d, T: Controller> HciController<'d, T> { } pub(crate) async fn send(&self, handle: ConnHandle, pdu: &[u8]) -> Result<(), AdapterError> { - // info!("[send] permits: {}", self.permits.permits()); - let permit = self - .permits - .acquire(1) - .await - .map_err(|_| AdapterError::Adapter(Error::NoPermits))?; + let mut grant = poll_fn(|cx| self.connections.poll_request_to_send(handle, 1, Some(cx))).await?; let acl = AclPacket::new( handle, AclPacketBoundary::FirstNonFlushable, @@ -890,7 +901,7 @@ impl<'d, T: Controller> HciController<'d, T> { .write_acl_data(&acl) .await .map_err(AdapterError::Controller)?; - permit.disarm(); + grant.confirm(1); Ok(()) } @@ -901,6 +912,12 @@ impl<'d, T: Controller> HciController<'d, T> { signal: &D, p_buf: &mut [u8], ) -> Result<(), AdapterError> { + trace!( + "[l2cap][conn = {}] sending control signal (req = {}) signal: {:?}", + handle, + identifier, + signal + ); let header = L2capSignalHeader { identifier, code: D::code(), diff --git a/host/src/channel_manager.rs b/host/src/channel_manager.rs index dc8d2eb..e138fcc 100644 --- a/host/src/channel_manager.rs +++ b/host/src/channel_manager.rs @@ -91,7 +91,6 @@ impl< ChannelState::Disconnecting if cid == storage.cid => { storage.state = ChannelState::Disconnected; storage.cid = 0; - let _ = self.inbound[idx].try_send(None); return Ok(storage.conn); } ChannelState::PeerConnecting(_) if cid == storage.cid => { @@ -112,6 +111,7 @@ impl< _ => {} } } + trace!("[l2cap][disconnect] channel {} not found", cid); Err(Error::NotFound) })?; Ok(ConnHandle::new(handle)) @@ -146,12 +146,13 @@ impl< Ok(()) } - fn alloc(&self, f: F) -> Result<(), Error> { + fn alloc(&self, conn: ConnHandle, f: F) -> Result<(), Error> { self.state.lock(|state| { let mut state = state.borrow_mut(); for (idx, storage) in state.channels.iter_mut().enumerate() { if let ChannelState::Disconnected = storage.state { let cid: u16 = BASE_ID + idx as u16; + storage.conn = conn.raw(); storage.cid = cid; f(storage); return Ok(()); @@ -161,14 +162,14 @@ impl< }) } - pub(crate) async fn accept( + pub(crate) async fn accept( &self, conn: ConnHandle, psm: &[u16], mtu: u16, credit_flow: CreditFlowPolicy, initial_credits: Option, - controller: &HciController<'_, T>, + controller: &HciController<'_, M, T, CONNS>, ) -> Result> { // Wait until we find a channel for our connection in the connecting state matching our PSM. let (req_id, mps, mtu, cid, credits) = poll_fn(|cx| { @@ -226,14 +227,14 @@ impl< Ok(cid) } - pub(crate) async fn create( + pub(crate) async fn create( &self, conn: ConnHandle, psm: u16, mtu: u16, credit_flow: CreditFlowPolicy, initial_credits: Option, - controller: &HciController<'_, T>, + controller: &HciController<'_, M, T, CONNS>, ) -> Result> { let req_id = self.next_request_id(); let mut credits = 0; @@ -241,13 +242,13 @@ impl< let mps = self.pool.mtu() as u16 - 4; // Allocate space for our new channel. - self.alloc(|storage| { + self.alloc(conn, |storage| { cid = storage.cid; credits = initial_credits.unwrap_or(self.pool.min_available(AllocId::from_channel(storage.cid)) as u16); + storage.psm = psm; storage.mps = mps; storage.mtu = mtu; storage.flow_control = CreditFlowControl::new(credit_flow, credits); - storage.state = ChannelState::Connecting(req_id); })?; @@ -303,7 +304,6 @@ impl< if chan > self.inbound.len() { return Err(Error::InvalidChannelId); } - trace!("[l2cap] inbound data packet for {}", header.channel); self.state.lock(|state| { let mut state = state.borrow_mut(); @@ -311,6 +311,8 @@ impl< match storage.state { ChannelState::Connected if header.channel == storage.cid => { if storage.flow_control.available() == 0 { + trace!("[l2cap][cid = {}] no credits available", header.channel); + self.dump_state(); return Err(Error::OutOfMemory); } storage.flow_control.received(1); @@ -330,7 +332,13 @@ impl< /// Handle incoming L2CAP signal pub(crate) async fn signal(&self, conn: ConnHandle, data: &[u8]) -> Result<(), Error> { let (header, data) = L2capSignalHeader::from_hci_bytes(data)?; - trace!("[l2cap] inbound signal code {:?}", header.code); + trace!( + "[l2cap][conn = {}] received signal (req {}) code {:?}", + conn, + header.identifier, + header.code + ); + self.dump_state(); match header.code { L2capSignalCode::LeCreditConnReq => { let req = LeCreditConnReq::from_hci_bytes_complete(data)?; @@ -342,7 +350,8 @@ impl< } L2capSignalCode::LeCreditFlowInd => { let req = LeCreditFlowInd::from_hci_bytes_complete(data)?; - self.handle_credit_flow(&req)?; + trace!("[l2cap] credit flow: {:?}", req); + self.handle_credit_flow(conn, &req)?; Ok(()) } L2capSignalCode::CommandRejectRes => { @@ -352,11 +361,13 @@ impl< } L2capSignalCode::DisconnectionReq => { let req = DisconnectionReq::from_hci_bytes_complete(data)?; + trace!("[l2cap][conn = {}, cid = {}] disconnect request", conn, req.dcid); self.disconnect(req.dcid)?; Ok(()) } L2capSignalCode::DisconnectionRes => { let res = DisconnectionRes::from_hci_bytes_complete(data)?; + trace!("[l2cap][conn = {}, cid = {}] disconnect response", conn, res.scid); self.handle_disconnect_response(&res) } _ => Err(Error::NotSupported), @@ -364,7 +375,7 @@ impl< } fn handle_connect_request(&self, conn: ConnHandle, identifier: u8, req: &LeCreditConnReq) -> Result<(), Error> { - self.alloc(|storage| { + self.alloc(conn, |storage| { storage.conn = conn.raw(); storage.psm = req.psm; storage.peer_cid = req.scid; @@ -399,6 +410,10 @@ impl< _ => {} } } + trace!( + "[l2cap][handle_connect_response] request with id {} not found", + identifier + ); Err(Error::NotFound) }) } @@ -409,12 +424,12 @@ impl< } } - fn handle_credit_flow(&self, req: &LeCreditFlowInd) -> Result<(), Error> { + fn handle_credit_flow(&self, conn: ConnHandle, req: &LeCreditFlowInd) -> Result<(), Error> { self.state.lock(|state| { let mut state = state.borrow_mut(); for (idx, storage) in state.channels.iter_mut().enumerate() { match storage.state { - ChannelState::Connected if storage.peer_cid == req.cid => { + ChannelState::Connected if storage.peer_cid == req.cid && conn.raw() == storage.conn => { storage.peer_credits += req.credits; state.credit_wakers[idx].wake(); return Ok(()); @@ -422,12 +437,13 @@ impl< _ => {} } } + trace!("[l2cap][handle_credit_flow] peer channel {} not found", req.cid); Err(Error::NotFound) }) } fn handle_disconnect_response(&self, res: &DisconnectionRes) -> Result<(), Error> { - let cid = res.dcid; + let cid = res.scid; self.state.lock(|state| { let mut state = state.borrow_mut(); for storage in state.channels.iter_mut() { @@ -458,11 +474,11 @@ impl< /// Receive data on a given channel and copy it into the buffer. /// /// The length provided buffer slice must be equal or greater to the agreed MTU. - pub(crate) async fn receive( + pub(crate) async fn receive( &self, cid: u16, buf: &mut [u8], - hci: &HciController<'_, T>, + hci: &HciController<'_, M, T, CONNS>, ) -> Result> { let idx = self.connected_channel_index(cid)?; @@ -507,15 +523,16 @@ impl< return Ok(idx); } } + trace!("[l2cap][connected_channel_index] channel {} not found", cid); Err(Error::NotFound) }) } - async fn receive_pdu( + async fn receive_pdu( &self, cid: u16, idx: usize, - hci: &HciController<'_, T>, + hci: &HciController<'_, M, T, CONNS>, ) -> Result, AdapterError> { match self.inbound[idx].receive().await { Some(pdu) => Ok(pdu), @@ -531,44 +548,35 @@ impl< /// The buffer will be segmented to the maximum payload size agreed in the opening handshake. /// /// If the channel has been closed or the channel id is not valid, an error is returned. - pub(crate) async fn send( + pub(crate) async fn send( &self, cid: u16, buf: &[u8], - hci: &HciController<'_, T>, + hci: &HciController<'_, M, T, CONNS>, ) -> Result<(), AdapterError> { let mut p_buf = [0u8; L2CAP_MTU]; let (conn, mps, peer_cid) = self.connected_channel_params(cid)?; // The number of packets we'll need to send for this payload let n_packets = 1 + ((buf.len() as u16).saturating_sub(mps - 2)).div_ceil(mps); - poll_fn(|cx| self.poll_request_to_send(cid, n_packets, Some(cx))).await?; + let mut grant = poll_fn(|cx| self.poll_request_to_send(cid, n_packets, Some(cx))).await?; - let mut unsent = n_packets; - let result: Result<(), AdapterError> = async { - // Segment using mps - let (first, remaining) = buf.split_at(buf.len().min(mps as usize - 2)); + // Segment using mps + let (first, remaining) = buf.split_at(buf.len().min(mps as usize - 2)); - let len = encode(first, &mut p_buf[..], peer_cid, Some(buf.len() as u16))?; - hci.send(conn, &p_buf[..len]).await?; - unsent -= 1; + let len = encode(first, &mut p_buf[..], peer_cid, Some(buf.len() as u16))?; + hci.send(conn, &p_buf[..len]).await?; + grant.confirm(1); - let chunks = remaining.chunks(mps as usize); - let num_chunks = chunks.len(); + let chunks = remaining.chunks(mps as usize); + let num_chunks = chunks.len(); - for (i, chunk) in chunks.enumerate() { - let len = encode(chunk, &mut p_buf[..], peer_cid, None)?; - hci.send(conn, &p_buf[..len]).await?; - unsent -= 1; - } - Ok(()) - } - .await; - if unsent > 0 { - warn!("Replenishing credits for {} unsent packets", unsent); - self.abort_send(cid, unsent)?; + for (i, chunk) in chunks.enumerate() { + let len = encode(chunk, &mut p_buf[..], peer_cid, None)?; + hci.send(conn, &p_buf[..len]).await?; + grant.confirm(1); } - result + Ok(()) } /// Send the provided buffer over a given l2cap channel. @@ -576,11 +584,11 @@ impl< /// The buffer will be segmented to the maximum payload size agreed in the opening handshake. /// /// If the channel has been closed or the channel id is not valid, an error is returned. - pub(crate) fn try_send( + pub(crate) fn try_send( &self, cid: u16, buf: &[u8], - hci: &HciController<'_, T>, + hci: &HciController<'_, M, T, CONNS>, ) -> Result<(), AdapterError> { let mut p_buf = [0u8; L2CAP_MTU]; let (conn, mps, peer_cid) = self.connected_channel_params(cid)?; @@ -588,39 +596,31 @@ impl< // The number of packets we'll need to send for this payload let n_packets = 1 + ((buf.len() as u16).saturating_sub(mps - 2)).div_ceil(mps); - match self.poll_request_to_send(cid, n_packets, None) { + let mut grant = match self.poll_request_to_send(cid, n_packets, None) { Poll::Ready(res) => res?, Poll::Pending => { - warn!("l2cap: not enough credits for {} packets", n_packets); + warn!("[l2cap][cid = {}]: not enough credits for {} packets", cid, n_packets); + self.dump_state(); return Err(Error::Busy.into()); } - } - - let mut unsent = n_packets; - let result: Result<(), AdapterError> = { - // Segment using mps - let (first, remaining) = buf.split_at(buf.len().min(mps as usize - 2)); + }; - let len = encode(first, &mut p_buf[..], peer_cid, Some(buf.len() as u16))?; - hci.try_send(conn, &p_buf[..len])?; - unsent -= 1; + // Segment using mps + let (first, remaining) = buf.split_at(buf.len().min(mps as usize - 2)); - let chunks = remaining.chunks(mps as usize); - let num_chunks = chunks.len(); + let len = encode(first, &mut p_buf[..], peer_cid, Some(buf.len() as u16))?; + hci.try_send(conn, &p_buf[..len])?; + grant.confirm(1); - for (i, chunk) in chunks.enumerate() { - let len = encode(chunk, &mut p_buf[..], peer_cid, None)?; - hci.try_send(conn, &p_buf[..len])?; - unsent -= 1; - } - Ok(()) - }; + let chunks = remaining.chunks(mps as usize); + let num_chunks = chunks.len(); - if unsent > 0 { - warn!("Replenishing credits for {} unsent packets", unsent); - self.abort_send(cid, unsent)?; + for (i, chunk) in chunks.enumerate() { + let len = encode(chunk, &mut p_buf[..], peer_cid, None)?; + hci.try_send(conn, &p_buf[..len])?; + grant.confirm(1); } - result + Ok(()) } fn connected_channel_params(&self, cid: u16) -> Result<(ConnHandle, u16, u16), Error> { @@ -634,16 +634,17 @@ impl< _ => {} } } + trace!("[l2cap][connected_channel_params] channel {} not found", cid); Err(Error::NotFound) }) } // Check the current state of flow control and send flow indications if // our policy says so. - async fn flow_control( + async fn flow_control( &self, cid: u16, - hci: &HciController<'_, T>, + hci: &HciController<'_, M, T, CONNS>, mut packet: Packet<'_>, ) -> Result<(), AdapterError> { let (conn, credits) = self.state.lock(|state| { @@ -656,6 +657,7 @@ impl< _ => {} } } + trace!("[l2cap][flow_control] channel {} not found", cid); Err(Error::NotFound) })?; @@ -670,10 +672,10 @@ impl< Ok(()) } - async fn confirm_disconnected( + async fn confirm_disconnected( &self, cid: u16, - hci: &HciController<'_, T>, + hci: &HciController<'_, M, T, CONNS>, ) -> Result<(), AdapterError> { let (handle, dcid, scid) = self.state.lock(|state| { let mut state = state.borrow_mut(); @@ -692,6 +694,7 @@ impl< _ => {} } } + trace!("[l2cap][confirm_disconnected] channel {} not found", cid); Err(Error::NotFound) })?; @@ -707,23 +710,23 @@ impl< Ok(()) } - fn abort_send(&self, cid: u16, credits: u16) -> Result<(), Error> { + fn dump_state(&self) { self.state.lock(|state| { - let mut state = state.borrow_mut(); - for (idx, storage) in state.channels.iter_mut().enumerate() { - match storage.state { - ChannelState::Connected if cid == storage.cid => { - storage.peer_credits += credits; - return Ok(()); - } - _ => {} + let state = state.borrow(); + for (idx, storage) in state.channels.iter().enumerate() { + if let ChannelState::Connected = storage.state { + trace!("[l2cap][idx = {}] state = {:?}", idx, storage); } } - Err(Error::NotFound) }) } - fn poll_request_to_send(&self, cid: u16, credits: u16, cx: Option<&mut Context<'_>>) -> Poll> { + fn poll_request_to_send( + &self, + cid: u16, + credits: u16, + cx: Option<&mut Context<'_>>, + ) -> Poll, Error>> { self.state.lock(|state| { let mut state = state.borrow_mut(); for (idx, storage) in state.channels.iter_mut().enumerate() { @@ -731,7 +734,7 @@ impl< ChannelState::Connected if cid == storage.cid => { if credits <= storage.peer_credits { storage.peer_credits -= credits; - return Poll::Ready(Ok(())); + return Poll::Ready(Ok(CreditGrant::new(&self.state, cid, credits))); } else { if let Some(cx) = cx { state.credit_wakers[idx].register(cx.waker()); @@ -742,6 +745,7 @@ impl< _ => {} } } + trace!("[l2cap][pool_request_to_send] channel {} not found", cid); Poll::Ready(Err(Error::NotFound)) }) } @@ -764,6 +768,8 @@ fn encode(data: &[u8], packet: &mut [u8], peer_cid: u16, header: Option) -> Ok(w.len()) } +#[derive(Debug)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] pub struct ChannelStorage { state: ChannelState, conn: u16, @@ -792,7 +798,8 @@ impl ChannelStorage { }; } -#[derive(PartialEq)] +#[derive(Debug, PartialEq)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] pub enum ChannelState { Disconnected, Connecting(u8), @@ -867,3 +874,45 @@ impl CreditFlowControl { } } } + +pub struct CreditGrant<'d, M: RawMutex, const CHANNELS: usize> { + state: &'d Mutex>>, + cid: u16, + credits: u16, +} + +impl<'d, M: RawMutex, const CHANNELS: usize> CreditGrant<'d, M, CHANNELS> { + fn new(state: &'d Mutex>>, cid: u16, credits: u16) -> Self { + Self { state, cid, credits } + } + + pub(crate) fn confirm(&mut self, sent: u16) { + self.credits = self.credits.saturating_sub(sent); + } + + fn done(&mut self) { + self.credits = 0; + } +} + +impl<'d, M: RawMutex, const CHANNELS: usize> Drop for CreditGrant<'d, M, CHANNELS> { + fn drop(&mut self) { + if self.credits > 0 { + self.state.lock(|state| { + let mut state = state.borrow_mut(); + for (idx, storage) in state.channels.iter_mut().enumerate() { + match storage.state { + ChannelState::Connected if self.cid == storage.cid => { + storage.peer_credits += self.credits; + state.credit_wakers[idx].wake(); + break; + } + _ => {} + } + } + // make it an assert? + trace!("[l2cap][credit grant drop] channel {} not found", self.cid); + }) + } + } +} diff --git a/host/src/connection_manager.rs b/host/src/connection_manager.rs index a3f114e..dd66e1d 100644 --- a/host/src/connection_manager.rs +++ b/host/src/connection_manager.rs @@ -15,6 +15,8 @@ struct State { connections: [ConnectionStorage; CONNS], accept_waker: WakerRegistration, disconnect_waker: WakerRegistration, + link_credit_wakers: [WakerRegistration; CONNS], + default_link_credits: usize, } pub(crate) struct ConnectionManager { @@ -23,12 +25,15 @@ pub(crate) struct ConnectionManager { } impl ConnectionManager { + const CREDIT_WAKER: WakerRegistration = WakerRegistration::new(); pub(crate) fn new() -> Self { Self { state: Mutex::new(RefCell::new(State { connections: [ConnectionStorage::DISCONNECTED; CONNS], accept_waker: WakerRegistration::new(), disconnect_waker: WakerRegistration::new(), + link_credit_wakers: [Self::CREDIT_WAKER; CONNS], + default_link_credits: 0, })), canceled: Signal::new(), } @@ -119,9 +124,11 @@ impl ConnectionManager { pub(crate) fn connect(&self, handle: ConnHandle, info: &LeConnectionComplete) -> Result<(), Error> { self.state.lock(|state| { let mut state = state.borrow_mut(); + let default_credits = state.default_link_credits; for storage in state.connections.iter_mut() { if let ConnectionState::Disconnected = storage.state { storage.state = ConnectionState::Connecting; + storage.link_credits = default_credits; storage.handle.replace(handle); storage.peer_addr_kind.replace(info.peer_addr_kind); storage.peer_addr.replace(info.peer_addr); @@ -170,6 +177,63 @@ impl ConnectionManager { pub(crate) async fn accept(&self, peers: &[(AddrKind, &BdAddr)]) -> ConnHandle { poll_fn(move |cx| self.poll_accept(peers, cx)).await } + + pub(crate) fn set_link_credits(&self, credits: usize) { + self.state.lock(|state| { + let mut state = state.borrow_mut(); + state.default_link_credits = credits; + for storage in state.connections.iter_mut() { + storage.link_credits = credits; + } + }); + } + + pub(crate) fn confirm_sent(&self, handle: ConnHandle, packets: usize) -> Result<(), Error> { + self.state.lock(|state| { + let mut state = state.borrow_mut(); + for (idx, storage) in state.connections.iter_mut().enumerate() { + match storage.state { + ConnectionState::Connected if handle == storage.handle.unwrap() => { + storage.link_credits += packets; + state.link_credit_wakers[idx].wake(); + return Ok(()); + } + _ => {} + } + } + trace!("[link][confirm_sent] connection {} not found", handle); + Err(Error::NotFound) + }) + } + + pub(crate) fn poll_request_to_send( + &self, + handle: ConnHandle, + packets: usize, + cx: Option<&mut Context<'_>>, + ) -> Poll, Error>> { + self.state.lock(|state| { + let mut state = state.borrow_mut(); + for (idx, storage) in state.connections.iter_mut().enumerate() { + match storage.state { + ConnectionState::Connected if storage.handle.unwrap() == handle => { + if packets <= storage.link_credits { + storage.link_credits -= packets; + return Poll::Ready(Ok(PacketGrant::new(&self.state, handle, packets))); + } else { + if let Some(cx) = cx { + state.link_credit_wakers[idx].register(cx.waker()); + } + return Poll::Pending; + } + } + _ => {} + } + } + trace!("[link][pool_request_to_send] connection {} not found", handle); + Poll::Ready(Err(Error::NotFound)) + }) + } } pub trait DynamicConnectionManager { @@ -229,6 +293,7 @@ pub struct ConnectionStorage { pub peer_addr_kind: Option, pub peer_addr: Option, pub att_mtu: u16, + pub link_credits: usize, } impl ConnectionStorage { @@ -239,6 +304,7 @@ impl ConnectionStorage { peer_addr_kind: None, peer_addr: None, att_mtu: 23, + link_credits: 0, }; } @@ -250,3 +316,41 @@ pub enum ConnectionState { Connecting, Connected, } + +pub struct PacketGrant<'d, M: RawMutex, const CONNS: usize> { + state: &'d Mutex>>, + handle: ConnHandle, + packets: usize, +} + +impl<'d, M: RawMutex, const CONNS: usize> PacketGrant<'d, M, CONNS> { + fn new(state: &'d Mutex>>, handle: ConnHandle, packets: usize) -> Self { + Self { state, handle, packets } + } + + pub(crate) fn confirm(&mut self, sent: usize) { + self.packets = self.packets.saturating_sub(sent); + } +} + +impl<'d, M: RawMutex, const CHANNELS: usize> Drop for PacketGrant<'d, M, CHANNELS> { + fn drop(&mut self) { + if self.packets > 0 { + self.state.lock(|state| { + let mut state = state.borrow_mut(); + for (idx, storage) in state.connections.iter_mut().enumerate() { + match storage.state { + ConnectionState::Connected if self.handle == storage.handle.unwrap() => { + storage.link_credits += self.packets; + state.link_credit_wakers[idx].wake(); + break; + } + _ => {} + } + } + // make it an assert? + trace!("[link] connection {} not found", self.handle); + }) + } + } +} diff --git a/host/src/types/l2cap.rs b/host/src/types/l2cap.rs index 6942ad3..4b37c62 100644 --- a/host/src/types/l2cap.rs +++ b/host/src/types/l2cap.rs @@ -35,6 +35,7 @@ unsafe impl FixedSizeValue for L2capSignalHeader { } } +#[cfg(not(feature = "defmt"))] pub trait L2capSignal: WriteHci + FixedSizeValue { fn channel() -> u16 { L2CAP_CID_LE_U_SIGNAL @@ -42,6 +43,14 @@ pub trait L2capSignal: WriteHci + FixedSizeValue { fn code() -> L2capSignalCode; } +#[cfg(feature = "defmt")] +pub trait L2capSignal: WriteHci + FixedSizeValue + defmt::Format { + fn channel() -> u16 { + L2CAP_CID_LE_U_SIGNAL + } + fn code() -> L2capSignalCode; +} + #[cfg_attr(feature = "defmt", derive(defmt::Format))] #[derive(Debug, Clone, Copy, PartialEq)] pub enum L2capSignalCode {