From 71ef0e7658b104ab247b346280c3f619687d6157 Mon Sep 17 00:00:00 2001 From: Ulf Lilleengen Date: Wed, 10 Apr 2024 21:13:15 +0200 Subject: [PATCH 1/3] update nrf-sdc version --- examples/nrf-sdc/src/bin/ble_bas_peripheral.rs | 4 ++-- examples/nrf-sdc/src/bin/ble_l2cap_central.rs | 4 ++-- examples/nrf-sdc/src/bin/ble_l2cap_peripheral.rs | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/examples/nrf-sdc/src/bin/ble_bas_peripheral.rs b/examples/nrf-sdc/src/bin/ble_bas_peripheral.rs index c91ac24..19bbdc3 100644 --- a/examples/nrf-sdc/src/bin/ble_bas_peripheral.rs +++ b/examples/nrf-sdc/src/bin/ble_bas_peripheral.rs @@ -57,8 +57,8 @@ const PACKET_POOL_SIZE: usize = 10; fn build_sdc<'d, const N: usize>( p: nrf_sdc::Peripherals<'d>, - rng: &'d RngPool<'d>, - mpsl: &'d MultiprotocolServiceLayer<'d>, + rng: &'d RngPool, + mpsl: &'d MultiprotocolServiceLayer, mem: &'d mut sdc::Mem, ) -> Result, nrf_sdc::Error> { sdc::Builder::new()? diff --git a/examples/nrf-sdc/src/bin/ble_l2cap_central.rs b/examples/nrf-sdc/src/bin/ble_l2cap_central.rs index 944313e..93ce822 100644 --- a/examples/nrf-sdc/src/bin/ble_l2cap_central.rs +++ b/examples/nrf-sdc/src/bin/ble_l2cap_central.rs @@ -64,8 +64,8 @@ const PACKET_POOL_SIZE: usize = (L2CAP_TXQ + L2CAP_RXQ) as usize; fn build_sdc<'d, const N: usize>( p: nrf_sdc::Peripherals<'d>, - rng: &'d RngPool<'d>, - mpsl: &'d MultiprotocolServiceLayer<'d>, + rng: &'d RngPool, + mpsl: &'d MultiprotocolServiceLayer, mem: &'d mut sdc::Mem, ) -> Result, nrf_sdc::Error> { sdc::Builder::new()? diff --git a/examples/nrf-sdc/src/bin/ble_l2cap_peripheral.rs b/examples/nrf-sdc/src/bin/ble_l2cap_peripheral.rs index eb6b141..acb119f 100644 --- a/examples/nrf-sdc/src/bin/ble_l2cap_peripheral.rs +++ b/examples/nrf-sdc/src/bin/ble_l2cap_peripheral.rs @@ -63,8 +63,8 @@ const PACKET_POOL_SIZE: usize = (L2CAP_TXQ + L2CAP_RXQ) as usize; fn build_sdc<'d, const N: usize>( p: nrf_sdc::Peripherals<'d>, - rng: &'d RngPool<'d>, - mpsl: &'d MultiprotocolServiceLayer<'d>, + rng: &'d RngPool, + mpsl: &'d MultiprotocolServiceLayer, mem: &'d mut sdc::Mem, ) -> Result, nrf_sdc::Error> { sdc::Builder::new()? From dab2c20886e5669040a7de5d2bde38969f41e339 Mon Sep 17 00:00:00 2001 From: Ulf Lilleengen Date: Wed, 10 Apr 2024 21:14:10 +0200 Subject: [PATCH 2/3] improve scanning and connection handling * Fix a bug issuing credits with the wrong channel id * Add flow control policies to allow more configurability * Use filter_accept_list rather than scanning and specifying peer for connecting. * Support reassembly of ACL packets. * Indicate to controller size of buffers. * Allow accepting multiple l2cap PSMs --- examples/nrf-sdc/src/bin/ble_l2cap_central.rs | 2 +- .../nrf-sdc/src/bin/ble_l2cap_peripheral.rs | 2 +- host/Cargo.toml | 2 +- host/src/adapter.rs | 254 ++++++------ host/src/channel_manager.rs | 363 ++++++++++++++---- host/src/connection.rs | 40 +- host/src/connection_manager.rs | 24 +- host/src/l2cap.rs | 152 ++++++-- host/src/lib.rs | 2 + host/src/packet_pool.rs | 24 +- host/src/scan.rs | 3 - host/tests/l2cap.rs | 4 +- 12 files changed, 634 insertions(+), 238 deletions(-) diff --git a/examples/nrf-sdc/src/bin/ble_l2cap_central.rs b/examples/nrf-sdc/src/bin/ble_l2cap_central.rs index 93ce822..e48687d 100644 --- a/examples/nrf-sdc/src/bin/ble_l2cap_central.rs +++ b/examples/nrf-sdc/src/bin/ble_l2cap_central.rs @@ -142,7 +142,7 @@ async fn main(spawner: Spawner) { info!("Connected, creating l2cap channel"); const PAYLOAD_LEN: usize = 27; let mut ch1: L2capChannel<'_, '_, _, PAYLOAD_LEN> = - unwrap!(L2capChannel::create(&adapter, &conn, 0x2349, PAYLOAD_LEN as u16).await); + unwrap!(L2capChannel::create(&adapter, &conn, 0x2349, PAYLOAD_LEN as u16, Default::default()).await); info!("New l2cap channel created, sending some data!"); for i in 0..10 { let tx = [i; PAYLOAD_LEN]; diff --git a/examples/nrf-sdc/src/bin/ble_l2cap_peripheral.rs b/examples/nrf-sdc/src/bin/ble_l2cap_peripheral.rs index acb119f..ee061e8 100644 --- a/examples/nrf-sdc/src/bin/ble_l2cap_peripheral.rs +++ b/examples/nrf-sdc/src/bin/ble_l2cap_peripheral.rs @@ -152,7 +152,7 @@ async fn main(spawner: Spawner) { info!("Connection established"); let mut ch1: L2capChannel<'_, '_, _, PAYLOAD_LEN> = - unwrap!(L2capChannel::accept(&adapter, &conn, 0x2349, PAYLOAD_LEN as u16).await); + unwrap!(L2capChannel::accept(&adapter, &conn, &[0x2349], PAYLOAD_LEN as u16, Default::default()).await); info!("L2CAP channel accepted"); diff --git a/host/Cargo.toml b/host/Cargo.toml index 1883f89..8159ab1 100644 --- a/host/Cargo.toml +++ b/host/Cargo.toml @@ -15,7 +15,7 @@ categories = [ resolver = "2" [dependencies] -bt-hci = { version = "0.1.0" } +bt-hci = { version = "0.1.0", features = ["embassy-time"] } embedded-io-async = { version = "0.6" } embassy-sync = "0.5" embassy-time = "0.3" diff --git a/host/src/adapter.rs b/host/src/adapter.rs index 8bc54a3..50f3709 100644 --- a/host/src/adapter.rs +++ b/host/src/adapter.rs @@ -3,14 +3,14 @@ use crate::channel_manager::ChannelManager; use crate::connection::{ConnectConfig, Connection}; use crate::connection_manager::{ConnectionInfo, ConnectionManager}; use crate::cursor::{ReadCursor, WriteCursor}; -use crate::l2cap::{L2capPacket, L2CAP_CID_ATT, L2CAP_CID_DYN_START, L2CAP_CID_LE_U_SIGNAL}; -use crate::packet_pool::{DynamicPacketPool, PacketPool, Qos}; +use crate::l2cap::{L2capHeader, PacketReassembly, L2CAP_CID_ATT, L2CAP_CID_DYN_START, L2CAP_CID_LE_U_SIGNAL}; +use crate::packet_pool::{AllocId, DynamicPacketPool, PacketPool, Qos}; use crate::pdu::Pdu; use crate::scan::{PhySet, ScanConfig, ScanReport}; use crate::types::l2cap::L2capLeSignal; use crate::Address; use crate::{AdapterError, Error}; -use bt_hci::cmd::controller_baseband::{Reset, SetEventMask}; +use bt_hci::cmd::controller_baseband::{HostBufferSize, Reset, SetEventMask}; use bt_hci::cmd::le::{ LeAddDeviceToFilterAcceptList, LeClearAdvSets, LeClearFilterAcceptList, LeCreateConn, LeCreateConnParams, LeExtCreateConn, LeReadBufferSize, LeSetAdvSetRandomAddr, LeSetEventMask, LeSetExtAdvData, LeSetExtAdvEnable, @@ -18,7 +18,6 @@ use bt_hci::cmd::le::{ LeSetScanEnable, LeSetScanParams, }; use bt_hci::cmd::link_control::{Disconnect, DisconnectParams}; -use bt_hci::cmd::status::ReadRssi; use bt_hci::cmd::{AsyncCmd, SyncCmd}; use bt_hci::controller::Controller; use bt_hci::controller::{ControllerCmdAsync, ControllerCmdSync}; @@ -67,6 +66,7 @@ pub struct Adapter< pub(crate) address: Option
, pub(crate) controller: T, pub(crate) connections: ConnectionManager, + pub(crate) reassembly: PacketReassembly<'d, CONNS>, pub(crate) channels: ChannelManager<'d, M, CHANNELS, L2CAP_TXQ, L2CAP_RXQ>, pub(crate) att_inbound: Channel), L2CAP_RXQ>, pub(crate) pool: &'d dyn DynamicPacketPool<'d>, @@ -102,6 +102,7 @@ where address: None, controller, connections: ConnectionManager::new(), + reassembly: PacketReassembly::new(), channels: ChannelManager::new(&host_resources.pool), pool: &host_resources.pool, att_inbound: Channel::new(), @@ -136,12 +137,22 @@ where Ok(()) } - pub(crate) async fn read_rssi(&self, conn: ConnHandle) -> Result> + pub async fn command(&self, cmd: C) -> Result> where - T: ControllerCmdSync, + C: SyncCmd, + T: ControllerCmdSync, { - let val = ReadRssi::new(conn).exec(&self.controller).await?; - Ok(val.rssi) + let ret = cmd.exec(&self.controller).await?; + Ok(ret) + } + + pub async fn async_command(&self, cmd: C) -> Result<(), AdapterError> + where + C: AsyncCmd, + T: ControllerCmdAsync, + { + let ret = cmd.exec(&self.controller).await?; + Ok(ret) } pub(crate) async fn connect(&self, config: &ConnectConfig<'_>) -> Result, AdapterError> @@ -158,73 +169,56 @@ where if config.scan_config.filter_accept_list.is_empty() { return Err(Error::InvalidValue.into()); } - - self.start_scan(&config.scan_config).await?; - loop { - let Some(report) = self.scanner.receive().await else { - return Err(Error::Timeout.into()); + self.set_accept_filter(config.scan_config.filter_accept_list).await?; + + if config.scan_config.extended { + let initiating = InitiatingPhy { + scan_interval: config.scan_config.interval.into(), + scan_window: config.scan_config.window.into(), + conn_interval_min: config.connect_params.min_connection_interval.into(), + conn_interval_max: config.connect_params.max_connection_interval.into(), + max_latency: config.connect_params.max_latency, + supervision_timeout: config.connect_params.supervision_timeout.into(), + min_ce_len: config.connect_params.event_length.into(), + max_ce_len: config.connect_params.event_length.into(), }; - if config.scan_config.extended { - if let Some(entry) = report.iter_ext().next() { - self.stop_scan(&config.scan_config).await?; - let entry = entry.map_err(Error::HciDecode)?; - let initiating = InitiatingPhy { - scan_interval: bt_hci::param::Duration::from_micros(config.scan_config.interval.as_micros()), - scan_window: bt_hci::param::Duration::from_micros(config.scan_config.window.as_micros()), - conn_interval_min: bt_hci::param::Duration::from_micros( - config.connect_params.min_connection_interval.as_micros(), - ), - conn_interval_max: bt_hci::param::Duration::from_micros( - config.connect_params.max_connection_interval.as_micros(), - ), - max_latency: config.connect_params.max_latency, - supervision_timeout: bt_hci::param::Duration::from_micros( - config.connect_params.supervision_timeout.as_micros(), - ), - min_ce_len: bt_hci::param::Duration::from_millis(0), - max_ce_len: bt_hci::param::Duration::from_millis(0), - }; - let phy_params = Self::create_phy_params(initiating, config.scan_config.phys); - LeExtCreateConn::new( - true, - self.address.map(|a| a.kind).unwrap_or(AddrKind::RANDOM), - entry.addr_kind, - entry.addr, - phy_params, - ) - .exec(&self.controller) - .await?; - let info = self.connections.accept(Some(entry.addr)).await; - return Ok(Connection { - info, - control: self.control.sender().into(), - }); - } - } else if let Some(entry) = report.iter().next() { - self.stop_scan(&config.scan_config).await?; - let entry = entry.map_err(Error::HciDecode)?; - LeCreateConn::new( - bt_hci::param::Duration::from_micros(config.scan_config.interval.as_micros()), - bt_hci::param::Duration::from_micros(config.scan_config.window.as_micros()), - true, - entry.addr_kind, - entry.addr, - self.address.map(|a| a.kind).unwrap_or(AddrKind::RANDOM), - bt_hci::param::Duration::from_micros(config.connect_params.min_connection_interval.as_micros()), - bt_hci::param::Duration::from_micros(config.connect_params.max_connection_interval.as_micros()), - config.connect_params.max_latency, - bt_hci::param::Duration::from_micros(config.connect_params.supervision_timeout.as_micros()), - bt_hci::param::Duration::from_millis(0), - bt_hci::param::Duration::from_millis(0), - ) - .exec(&self.controller) - .await?; - let info = self.connections.accept(Some(entry.addr)).await; - return Ok(Connection { - info, - control: self.control.sender().into(), - }); - } + let phy_params = Self::create_phy_params(initiating, config.scan_config.phys); + LeExtCreateConn::new( + true, + self.address.map(|a| a.kind).unwrap_or(AddrKind::RANDOM), + AddrKind::RANDOM, + BdAddr::default(), + phy_params, + ) + .exec(&self.controller) + .await?; + let info = self.connections.accept(config.scan_config.filter_accept_list).await; + return Ok(Connection { + info, + control: self.control.sender().into(), + }); + } else { + LeCreateConn::new( + config.scan_config.interval.into(), + config.scan_config.window.into(), + true, + AddrKind::RANDOM, + BdAddr::default(), + self.address.map(|a| a.kind).unwrap_or(AddrKind::RANDOM), + config.connect_params.min_connection_interval.into(), + config.connect_params.max_connection_interval.into(), + config.connect_params.max_latency, + config.connect_params.supervision_timeout.into(), + config.connect_params.event_length.into(), + config.connect_params.event_length.into(), + ) + .exec(&self.controller) + .await?; + let info = self.connections.accept(config.scan_config.filter_accept_list).await; + return Ok(Connection { + info, + control: self.control.sender().into(), + }); } } @@ -260,8 +254,8 @@ where if config.extended { let scanning = ScanningPhy { active_scan: config.active, - scan_interval: bt_hci::param::Duration::from_micros(config.interval.as_micros()), - scan_window: bt_hci::param::Duration::from_micros(config.window.as_micros()), + scan_interval: config.interval.into(), + scan_window: config.window.into(), }; let phy_params = Self::create_phy_params(scanning, config.phys); LeSetExtScanParams::new( @@ -278,7 +272,7 @@ where LeSetExtScanEnable::new( true, FilterDuplicates::Disabled, - bt_hci::param::Duration::from_micros(config.timeout.as_micros()), + config.timeout.into(), bt_hci::param::Duration::from_secs(0), ) .exec(&self.controller) @@ -290,8 +284,8 @@ where } else { bt_hci::param::LeScanKind::Passive }, - bt_hci::param::Duration::from_micros(config.interval.as_micros()), - bt_hci::param::Duration::from_micros(config.interval.as_micros()), + config.interval.into(), + config.interval.into(), bt_hci::param::AddrKind::RANDOM, if config.filter_accept_list.is_empty() { bt_hci::param::ScanningFilterPolicy::BasicUnfiltered @@ -313,8 +307,8 @@ where if config.extended { LeSetExtScanEnable::new( false, - FilterDuplicates::Enabled, - bt_hci::param::Duration::from_micros(config.timeout.as_micros()), + FilterDuplicates::Disabled, + bt_hci::param::Duration::from_secs(0), bt_hci::param::Duration::from_secs(0), ) .exec(&self.controller) @@ -370,7 +364,7 @@ where let mut params = params.into(); let timeout = config .timeout - .map(|m| bt_hci::param::Duration::from_micros(m.as_micros())) + .map(|m| m.into()) .unwrap_or(bt_hci::param::Duration::from_secs(0)); let max_events = config.max_events.unwrap_or(0); @@ -384,8 +378,8 @@ where LeSetExtAdvParams::new( handle, params.props, - bt_hci::param::ExtDuration::from_micros(config.interval_min.as_micros()), - bt_hci::param::ExtDuration::from_micros(config.interval_min.as_micros()), + config.interval_min.into(), + config.interval_min.into(), config.channel_map, self.address.map(|a| a.kind).unwrap_or(AddrKind::RANDOM), peer.kind, @@ -457,33 +451,59 @@ where } async fn handle_acl(&self, acl: AclPacket<'_>) -> Result<(), Error> { - let (conn, packet) = L2capPacket::decode(acl)?; - match packet.channel { - L2CAP_CID_ATT => { - #[cfg(feature = "gatt")] - if let Some(mut p) = self.pool.alloc(crate::packet_pool::ATT_ID) { - let len = packet.payload.len(); - p.as_mut()[..len].copy_from_slice(packet.payload); - self.att_inbound.send((conn, Pdu { packet: p, len })).await; + let (header, packet) = match acl.boundary_flag() { + AclPacketBoundary::FirstFlushable => { + let (header, data) = L2capHeader::decode(&acl)?; + + // Avoids using the packet buffer for signalling packets + if header.channel == L2CAP_CID_LE_U_SIGNAL { + assert!(data.len() == header.length as usize); + let mut r = ReadCursor::new(data); + let signal: L2capLeSignal = r.read()?; + self.channels.control(acl.handle(), signal).await?; + return Ok(()); + } + + let Some(mut p) = self.pool.alloc(AllocId::from_channel(header.channel)) else { + return Err(Error::OutOfMemory); + }; + p.as_mut()[..data.len()].copy_from_slice(data); + + if header.length as usize != data.len() { + self.reassembly.init(acl.handle(), header, p, data.len())?; + return Ok(()); + } + (header, p) + } + // Next (potentially last) in a fragment + AclPacketBoundary::Continuing => { + // Get the existing fragment + if let Some((header, p)) = self.reassembly.update(acl.handle(), acl.data())? { + (header, p) } else { - // TODO: Signal back + // Do not process yet + return Ok(()); } + } + other => { + warn!("Unexpected boundary flag: {:?}!", other); + return Err(Error::NotSupported); + } + }; + match header.channel { + L2CAP_CID_ATT => { + #[cfg(feature = "gatt")] + self.att_inbound + .send((acl.handle(), Pdu::new(packet, header.length as usize))) + .await; #[cfg(not(feature = "gatt"))] return Err(Error::NotSupported); } L2CAP_CID_LE_U_SIGNAL => { - let mut r = ReadCursor::new(packet.payload); - let signal: L2capLeSignal = r.read()?; - match self.channels.control(conn, signal).await { - Ok(_) => {} - Err(_) => { - return Err(Error::Other); - } - } + panic!("le signalling channel was fragmented, impossible!"); } - - other if other >= L2CAP_CID_DYN_START => match self.channels.dispatch(packet).await { + other if other >= L2CAP_CID_DYN_START => match self.channels.dispatch(header, packet).await { Ok(_) => {} Err(e) => { warn!("Error dispatching l2cap packet to channel: {:?}", e); @@ -501,20 +521,21 @@ where T: ControllerCmdSync + ControllerCmdSync + ControllerCmdSync + + ControllerCmdSync + ControllerCmdSync + ControllerCmdAsync // + ControllerCmdSync // + ControllerCmdSync + ControllerCmdSync, { + const MAX_HCI_PACKET_LEN: usize = 259; self.control.send(ControlCommand::Init).await; - + let mut disconnects = 0; loop { // Task handling receiving data from the controller. let rx_fut = async { - let mut rx = [0u8; 259]; + let mut rx = [0u8; MAX_HCI_PACKET_LEN]; match self.controller.read(&mut rx).await { - // info!("Incoming event: {:?}", result); Ok(ControllerToHostPacket::Acl(acl)) => match self.handle_acl(acl).await { Ok(_) => {} Err(e) => { @@ -530,6 +551,7 @@ where handle: e.handle, status: e.status, role: e.role, + peer_addr_kind: e.peer_addr_kind, peer_address: e.peer_addr, interval: e.conn_interval.as_u16(), latency: e.peripheral_latency, @@ -557,12 +579,14 @@ where .await; } _ => { - warn!("Unknown event: {:?}", event); + error!("Unknown event: {:?}", event); } }, Event::DisconnectionComplete(e) => { - info!("Disconnected: {:?}", e); + disconnects += 1; + info!("Disconnected (total {}): {:?}", disconnects, e); let _ = self.connections.disconnect(e.handle); + let _ = self.channels.disconnected_connection(e.handle); } Event::NumberOfCompletedPackets(c) => { // trace!("Confirmed {} packets sent", c.completed_packets.len()); @@ -615,6 +639,15 @@ where } ControlCommand::Init => { Reset::new().exec(&self.controller).await?; + info!("Informing controller we have buffer size of {}", self.pool.mtu()); + HostBufferSize::new( + self.pool.mtu() as u16, + self.pool.mtu() as u8, + L2CAP_RXQ as u16, + L2CAP_RXQ as u16, + ) + .exec(&self.controller) + .await?; SetEventMask::new( EventMask::new() .enable_le_meta(true) @@ -629,6 +662,7 @@ where LeSetEventMask::new( LeEventMask::new() .enable_le_conn_complete(true) + .enable_le_conn_update_complete(true) .enable_le_adv_report(true) .enable_le_scan_timeout(true) .enable_le_ext_adv_report(true), @@ -677,13 +711,14 @@ impl<'d, T: Controller> HciController<'d, T> { let permit = self .permits .try_acquire(1) - .ok_or::>(Error::Busy.into())?; + .ok_or::>(Error::NoPermits.into())?; let acl = AclPacket::new( handle, AclPacketBoundary::FirstNonFlushable, AclBroadcastFlag::PointToPoint, pdu, ); + // info!("Sent ACL {:?}", acl); let fut = self.controller.write_acl_data(&acl); match embassy_futures::poll_once(fut) { Poll::Ready(result) => result.map_err(AdapterError::Controller), @@ -712,6 +747,7 @@ impl<'d, T: Controller> HciController<'d, T> { response: L2capLeSignal, ) -> Result<(), AdapterError> { // TODO: Refactor signal to avoid encode/decode + // info!("[{}] sending signal: {:?}", handle, response); let mut tx = [0; 32]; let mut w = WriteCursor::new(&mut tx); let (mut header, mut body) = w.split(4)?; diff --git a/host/src/channel_manager.rs b/host/src/channel_manager.rs index 82ec13e..2dd543f 100644 --- a/host/src/channel_manager.rs +++ b/host/src/channel_manager.rs @@ -13,8 +13,8 @@ use embassy_sync::{ use crate::{ adapter::HciController, - l2cap::L2capPacket, - packet_pool::{AllocId, DynamicPacketPool}, + l2cap::L2capHeader, + packet_pool::{AllocId, DynamicPacketPool, Packet}, pdu::Pdu, types::l2cap::{ L2capLeSignal, L2capLeSignalData, LeCreditConnReq, LeCreditConnRes, LeCreditConnResultCode, LeCreditFlowInd, @@ -25,6 +25,7 @@ use crate::{ const BASE_ID: u16 = 0x40; struct State { + next_req_id: u8, channels: [ChannelState; CHANNELS], accept_waker: WakerRegistration, create_waker: WakerRegistration, @@ -41,8 +42,9 @@ pub struct ChannelManager<'d, M: RawMutex, const CHANNELS: usize, const L2CAP_TX pub trait DynamicChannelManager<'d> { fn poll_request_to_send(&self, cid: u16, credits: usize, cx: Option<&mut Context<'_>>) -> Poll>; - fn confirm_received(&self, cid: u16, credits: usize) -> Result<(ConnHandle, L2capLeSignal), Error>; + fn flow_control(&self, cid: u16) -> Result, Error>; fn confirm_disconnected(&self, cid: u16) -> Result<(), Error>; + fn disconnect(&self, cid: u16) -> Result<(), Error>; } impl<'d, M: RawMutex, const CHANNELS: usize, const L2CAP_TXQ: usize, const L2CAP_RXQ: usize> @@ -56,6 +58,7 @@ impl<'d, M: RawMutex, const CHANNELS: usize, const L2CAP_TXQ: usize, const L2CAP Self { pool, state: Mutex::new(RefCell::new(State { + next_req_id: 0, channels: [Self::DISCONNECTED; CHANNELS], accept_waker: WakerRegistration::new(), create_waker: WakerRegistration::new(), @@ -65,11 +68,28 @@ impl<'d, M: RawMutex, const CHANNELS: usize, const L2CAP_TXQ: usize, const L2CAP } } - async fn disconnect(&self, cid: u16) -> Result<(), Error> { + fn next_request_id(&self) -> u8 { + self.state.lock(|state| { + let mut state = state.borrow_mut(); + let next = state.next_req_id; + state.next_req_id = state.next_req_id.wrapping_add(1); + next + }) + } + + fn disconnect(&self, cid: u16) -> Result<(), Error> { let idx = self.state.lock(|state| { let mut state = state.borrow_mut(); for (idx, storage) in state.channels.iter_mut().enumerate() { match storage { + ChannelState::Disconnecting(state) if cid == state.cid => { + *storage = ChannelState::Disconnected; + return Ok(idx); + } + ChannelState::PeerConnecting(state) if cid == state.cid => { + *storage = ChannelState::Disconnecting(DisconnectingState { conn: state.conn, cid }); + return Ok(idx); + } ChannelState::Connecting(state) if cid == state.cid => { *storage = ChannelState::Disconnecting(DisconnectingState { conn: state.conn, cid }); return Ok(idx); @@ -83,7 +103,7 @@ impl<'d, M: RawMutex, const CHANNELS: usize, const L2CAP_TXQ: usize, const L2CAP } Err(Error::NotFound) })?; - self.inbound[idx].send(None).await; + let _ = self.inbound[idx].try_send(None); Ok(()) } @@ -92,12 +112,20 @@ impl<'d, M: RawMutex, const CHANNELS: usize, const L2CAP_TXQ: usize, const L2CAP let mut state = state.borrow_mut(); for storage in state.channels.iter_mut() { match storage { - ChannelState::Connecting(state) if cid == state.cid => { + ChannelState::Disconnecting(state) if cid == state.cid => { *storage = ChannelState::Disconnected; break; } + ChannelState::PeerConnecting(state) if cid == state.cid => { + *storage = ChannelState::Disconnecting(DisconnectingState { conn: state.conn, cid }); + break; + } + ChannelState::Connecting(state) if cid == state.cid => { + *storage = ChannelState::Disconnecting(DisconnectingState { conn: state.conn, cid }); + break; + } ChannelState::Connected(state) if cid == state.cid => { - *storage = ChannelState::Disconnected; + *storage = ChannelState::Disconnecting(DisconnectingState { conn: state.conn, cid }); break; } _ => {} @@ -107,16 +135,62 @@ impl<'d, M: RawMutex, const CHANNELS: usize, const L2CAP_TXQ: usize, const L2CAP }) } - fn connect(&self, mut req: ConnectingState) -> Result<(usize, u16), Error> { + pub fn disconnected_connection(&self, conn: ConnHandle) -> Result<(), Error> { + self.state.lock(|state| { + let mut state = state.borrow_mut(); + for (idx, storage) in state.channels.iter_mut().enumerate() { + match storage { + ChannelState::PeerConnecting(state) if conn == state.conn => { + *storage = ChannelState::Disconnecting(DisconnectingState { conn, cid: state.cid }); + let _ = self.inbound[idx].try_send(None); + } + ChannelState::Connecting(state) if conn == state.conn => { + *storage = ChannelState::Disconnecting(DisconnectingState { conn, cid: state.cid }); + let _ = self.inbound[idx].try_send(None); + } + ChannelState::Connected(state) if conn == state.conn => { + *storage = ChannelState::Disconnecting(DisconnectingState { conn, cid: state.cid }); + let _ = self.inbound[idx].try_send(None); + } + _ => {} + } + } + state.accept_waker.wake(); + state.create_waker.wake(); + for w in state.credit_wakers.iter_mut() { + w.wake(); + } + }); + Ok(()) + } + + fn peer_connect PeerConnectingState>(&self, f: F) -> Result<(), Error> { + self.state.lock(|state| { + let mut state = state.borrow_mut(); + for (idx, storage) in state.channels.iter_mut().enumerate() { + if let ChannelState::Disconnected = storage { + let cid: u16 = BASE_ID + idx as u16; + let mut req = f(idx, cid); + req.cid = cid; + *storage = ChannelState::PeerConnecting(req); + state.accept_waker.wake(); + return Ok(()); + } + } + Err(Error::NoChannelAvailable) + }) + } + + fn connect ConnectingState>(&self, f: F) -> Result<(), Error> { self.state.lock(|state| { let mut state = state.borrow_mut(); for (idx, storage) in state.channels.iter_mut().enumerate() { if let ChannelState::Disconnected = storage { let cid: u16 = BASE_ID + idx as u16; + let mut req = f(idx, cid); req.cid = cid; *storage = ChannelState::Connecting(req); - state.accept_waker.wake(); - return Ok((idx, cid)); + return Ok(()); } } Err(Error::NoChannelAvailable) @@ -134,6 +208,7 @@ impl<'d, M: RawMutex, const CHANNELS: usize, const L2CAP_TXQ: usize, const L2CAP match storage { ChannelState::Connecting(req) if request_id == req.request_id => { let res = f(idx, req); + // info!("Connection created, properties: {:?}", res); *storage = ChannelState::Connected(res); state.create_waker.wake(); return Ok(()); @@ -162,10 +237,10 @@ impl<'d, M: RawMutex, const CHANNELS: usize, const L2CAP_TXQ: usize, const L2CAP }) } - fn poll_accept ConnectedState>( + fn poll_accept ConnectedState>( &self, conn: ConnHandle, - psm: u16, + psm: &[u16], cx: &mut Context<'_>, f: F, ) -> Poll<(usize, ConnectedState)> { @@ -173,7 +248,7 @@ impl<'d, M: RawMutex, const CHANNELS: usize, const L2CAP_TXQ: usize, const L2CAP let mut state = state.borrow_mut(); for (idx, storage) in state.channels.iter_mut().enumerate() { match storage { - ChannelState::Connecting(req) if req.conn == conn && req.psm == psm => { + ChannelState::PeerConnecting(req) if req.conn == conn && psm.contains(&req.psm) => { let state = f(idx, req); let cid = state.cid; *storage = ChannelState::Connected(state.clone()); @@ -190,8 +265,9 @@ impl<'d, M: RawMutex, const CHANNELS: usize, const L2CAP_TXQ: usize, const L2CAP pub(crate) async fn accept( &self, conn: ConnHandle, - psm: u16, + psm: &[u16], mut mtu: u16, + credit_flow: CreditFlowPolicy, controller: &HciController<'_, T>, ) -> Result<(ConnectedState, DynamicReceiver<'_, Option>>), AdapterError> { let mut req_id = 0; @@ -201,13 +277,13 @@ impl<'d, M: RawMutex, const CHANNELS: usize, const L2CAP_TXQ: usize, const L2CAP let mps = req.mps.min(self.pool.mtu() as u16 - 4); mtu = req.mtu.min(mtu); let credits = self.pool.min_available(AllocId::dynamic(idx)) as u16; - info!("Accept, initial credits: {}", credits); + //info!("Accept, initial credits: {}", credits); ConnectedState { conn: req.conn, cid: req.cid, psm: req.psm, - credits: self.pool.min_available(AllocId::dynamic(idx)) as u16, - peer_credits: req.initial_credits, + flow_control: CreditFlowControl::new(credit_flow, credits), + peer_credits: req.offered_credits, peer_cid: req.peer_cid, pool_id: AllocId::dynamic(idx), mps, @@ -223,7 +299,7 @@ impl<'d, M: RawMutex, const CHANNELS: usize, const L2CAP_TXQ: usize, const L2CAP mps: state.mps, dcid: state.cid, mtu, - credits: state.credits, + credits: 0, result: LeCreditConnResultCode::Success, }), ); @@ -231,6 +307,22 @@ impl<'d, M: RawMutex, const CHANNELS: usize, const L2CAP_TXQ: usize, const L2CAP // info!("Responding to create: {:?}", response); controller.signal(conn, response).await?; + + // Send initial credits + let next_req_id = self.next_request_id(); + controller + .signal( + conn, + L2capLeSignal::new( + next_req_id, + L2capLeSignalData::LeCreditFlowInd(LeCreditFlowInd { + cid: state.cid, + credits: state.flow_control.available(), + }), + ), + ) + .await?; + Ok((state, self.inbound[idx].receiver().into())) } @@ -239,39 +331,52 @@ impl<'d, M: RawMutex, const CHANNELS: usize, const L2CAP_TXQ: usize, const L2CAP conn: ConnHandle, psm: u16, mtu: u16, + credit_flow: CreditFlowPolicy, controller: &HciController<'_, T>, ) -> Result<(ConnectedState, DynamicReceiver<'_, Option>>), AdapterError> { - let state = ConnectingState { - conn, - cid: 0, - request_id: 0, - psm, - peer_cid: 0, - initial_credits: 0, - mps: self.pool.mtu() as u16, - mtu, - }; - let (idx, cid) = self.connect(state)?; + let req_id = self.next_request_id(); + let mut credits = 0; + let mut cid: u16 = 0; + self.connect(|i, c| { + cid = c; + credits = self.pool.min_available(AllocId::dynamic(i)) as u16; + ConnectingState { + conn, + cid, + request_id: req_id, + psm, + initial_credits: credits, + flow_control_policy: credit_flow, + mps: self.pool.mtu() as u16 - 4, + mtu, + } + })?; + //info!("Created connect state with idx cid {}", cid); let command = L2capLeSignal::new( - 0, + req_id, L2capLeSignalData::LeCreditConnReq(LeCreditConnReq { psm, mps: self.pool.mtu() as u16 - 4, scid: cid, mtu, - credits: self.pool.min_available(AllocId::dynamic(idx)) as u16, + credits: 0, }), ); + //info!("Signal packet to remote: {:?}", command); controller.signal(conn, command).await?; + // info!("Sent signal packet to remote, awaiting response"); let (idx, state) = poll_fn(|cx| { self.state.lock(|state| { let mut state = state.borrow_mut(); for (idx, storage) in state.channels.iter_mut().enumerate() { match storage { + ChannelState::Disconnecting(req) if req.conn == conn && req.cid == cid => { + return Poll::Ready(Err(Error::Disconnected)); + } ChannelState::Connected(req) if req.conn == conn && req.cid == cid => { - return Poll::Ready((idx, req.clone())); + return Poll::Ready(Ok((idx, req.clone()))); } _ => {} } @@ -280,20 +385,38 @@ impl<'d, M: RawMutex, const CHANNELS: usize, const L2CAP_TXQ: usize, const L2CAP Poll::Pending }) }) - .await; + .await?; + + // info!("Peer setup cid {} Sending initial credits", state.peer_cid); + + // Send initial credits + let next_req_id = self.next_request_id(); + controller + .signal( + conn, + L2capLeSignal::new( + next_req_id, + L2capLeSignalData::LeCreditFlowInd(LeCreditFlowInd { + cid: state.cid, + credits, + }), + ), + ) + .await?; + + // info!("Done!"); - // let tx = self.inbound[idx].sender().into(); let rx = self.inbound[idx].receiver().into(); Ok((state, rx)) } - pub async fn dispatch(&self, packet: L2capPacket<'_>) -> Result<(), Error> { - if packet.channel < BASE_ID { + pub async fn dispatch(&self, header: L2capHeader, packet: Packet<'d>) -> Result<(), Error> { + if header.channel < BASE_ID { return Err(Error::InvalidChannelId); } - let chan = (packet.channel - BASE_ID) as usize; + let chan = (header.channel - BASE_ID) as usize; if chan > self.inbound.len() { return Err(Error::InvalidChannelId); } @@ -302,12 +425,12 @@ impl<'d, M: RawMutex, const CHANNELS: usize, const L2CAP_TXQ: usize, const L2CAP let mut state = state.borrow_mut(); for (idx, storage) in state.channels.iter_mut().enumerate() { match storage { - ChannelState::Connected(state) if packet.channel == state.cid => { - if state.credits > 0 { - state.credits -= 1; - } else { + ChannelState::Connected(state) if header.channel == state.cid => { + if state.flow_control.available() == 0 { + // info!("No credits available on channel {}", state.cid); return Err(Error::OutOfMemory); } + state.flow_control.received(1); } _ => {} } @@ -315,35 +438,30 @@ impl<'d, M: RawMutex, const CHANNELS: usize, const L2CAP_TXQ: usize, const L2CAP Ok(()) })?; - let chan_alloc = AllocId::dynamic(chan); - if let Some(mut p) = self.pool.alloc(chan_alloc) { - let len = packet.payload.len(); - p.as_mut()[..len].copy_from_slice(packet.payload); - self.inbound[chan].send(Some(Pdu::new(p, len))).await; - Ok(()) - } else { - warn!("No memory for channel {} (id {})", packet.channel, chan); - Err(Error::OutOfMemory) - } + self.inbound[chan] + .send(Some(Pdu::new(packet, header.length as usize))) + .await; + Ok(()) } pub async fn control(&self, conn: ConnHandle, signal: L2capLeSignal) -> Result<(), Error> { // info!("Inbound signal: {:?}", signal); match signal.data { L2capLeSignalData::LeCreditConnReq(req) => { - self.connect(ConnectingState { + self.peer_connect(|i, c| PeerConnectingState { conn, - cid: 0, + cid: c, psm: req.psm, request_id: signal.id, peer_cid: req.scid, - initial_credits: req.credits, + offered_credits: req.credits, mps: req.mps, mtu: req.mtu, })?; Ok(()) } L2capLeSignalData::LeCreditConnRes(res) => { + // info!("Got response to create request: {:?}", res); match res.result { LeCreditConnResultCode::Success => { // Must be a response of a previous request which should already by allocated a channel for @@ -351,7 +469,7 @@ impl<'d, M: RawMutex, const CHANNELS: usize, const L2CAP_TXQ: usize, const L2CAP conn: req.conn, cid: req.cid, psm: req.psm, - credits: 0, + flow_control: CreditFlowControl::new(req.flow_control_policy, req.initial_credits), peer_credits: res.credits, peer_cid: res.dcid, pool_id: AllocId::dynamic(idx), @@ -376,7 +494,7 @@ impl<'d, M: RawMutex, const CHANNELS: usize, const L2CAP_TXQ: usize, const L2CAP } L2capLeSignalData::DisconnectionReq(req) => { info!("Disconnect request: {:?}!", req); - self.disconnect(req.dcid).await?; + self.disconnect(req.dcid)?; Ok(()) } L2capLeSignalData::DisconnectionRes(res) => { @@ -391,33 +509,36 @@ impl<'d, M: RawMutex, const CHANNELS: usize, const L2CAP_TXQ: usize, const L2CAP impl<'d, M: RawMutex, const CHANNELS: usize, const L2CAP_TXQ: usize, const L2CAP_RXQ: usize> DynamicChannelManager<'d> for ChannelManager<'d, M, CHANNELS, L2CAP_TXQ, L2CAP_RXQ> { - fn confirm_received(&self, cid: u16, credits: usize) -> Result<(ConnHandle, L2capLeSignal), Error> { - let (conn, signal) = self.state.lock(|state| { + fn flow_control(&self, cid: u16) -> Result, Error> { + let next_req_id = self.next_request_id(); + let ret = self.state.lock(|state| { let mut state = state.borrow_mut(); for (idx, storage) in state.channels.iter_mut().enumerate() { match storage { ChannelState::Connected(state) if cid == state.cid => { - // Don't set credits higher than what we can promise - let increment = self.pool.min_available(AllocId::dynamic(idx)).min(credits); - state.credits += increment as u16; - // info!("Credits: {}. Increment {}", state.credits, increment); - return Ok(( - state.conn, - L2capLeSignal::new( - (cid % 255) as u8, - L2capLeSignalData::LeCreditFlowInd(LeCreditFlowInd { - cid: state.peer_cid, - credits: increment as u16, - }), - ), - )); + return Ok(state.flow_control.process().map(|credits| { + ( + state.conn, + L2capLeSignal::new( + next_req_id, + L2capLeSignalData::LeCreditFlowInd(LeCreditFlowInd { + cid: state.cid, + credits, + }), + ), + ) + })); } _ => {} } } Err(Error::NotFound) })?; - Ok((conn, signal)) + Ok(ret) + } + + fn disconnect(&self, cid: u16) -> Result<(), Error> { + ChannelManager::disconnect(self, cid) } fn confirm_disconnected(&self, cid: u16) -> Result<(), Error> { @@ -463,31 +584,117 @@ impl<'d, M: RawMutex, const CHANNELS: usize, const L2CAP_TXQ: usize, const L2CAP pub enum ChannelState { Disconnected, Connecting(ConnectingState), + PeerConnecting(PeerConnectingState), Connected(ConnectedState), Disconnecting(DisconnectingState), } -#[derive(Clone)] +/// Control how credits are issued by the receiving end. +#[derive(Clone, Copy, Debug)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub enum CreditFlowPolicy { + /// Issue credits for every N messages received + Every(u16), + /// Issue credits when below a threshold + MinThreshold(u16), +} + +impl Default for CreditFlowPolicy { + fn default() -> Self { + Self::Every(1) + } +} + +#[derive(Clone, Debug)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub(crate) struct CreditFlowControl { + policy: CreditFlowPolicy, + credits: u16, + received: u16, +} + +impl CreditFlowControl { + fn new(policy: CreditFlowPolicy, initial_credits: u16) -> Self { + Self { + policy, + credits: initial_credits, + received: 0, + } + } + + fn available(&self) -> u16 { + self.credits + } + + fn received(&mut self, n: u16) { + self.credits = self.credits.saturating_sub(n); + self.received = self.received.saturating_add(n); + } + + fn process(&mut self) -> Option { + let flow = match self.policy { + CreditFlowPolicy::Every(count) => { + if self.received >= count { + let amount = self.received; + self.received = 0; + self.credits += amount; + Some(amount) + } else { + None + } + } + CreditFlowPolicy::MinThreshold(threshold) => { + if self.credits <= threshold { + let amount = self.received; + self.received = 0; + self.credits += amount; + Some(amount) + } else { + None + } + } + }; + flow + } +} + +#[derive(Clone, Debug)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] pub struct ConnectingState { pub(crate) conn: ConnHandle, pub(crate) cid: u16, pub(crate) request_id: u8, + pub(crate) flow_control_policy: CreditFlowPolicy, pub(crate) psm: u16, - pub(crate) peer_cid: u16, pub(crate) initial_credits: u16, pub(crate) mps: u16, pub(crate) mtu: u16, } -#[derive(Clone)] +#[derive(Clone, Debug)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub struct PeerConnectingState { + pub(crate) conn: ConnHandle, + pub(crate) cid: u16, + pub(crate) request_id: u8, + + pub(crate) psm: u16, + pub(crate) peer_cid: u16, + pub(crate) offered_credits: u16, + pub(crate) mps: u16, + pub(crate) mtu: u16, +} + +#[derive(Clone, Debug)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] pub struct ConnectedState { pub(crate) conn: ConnHandle, pub(crate) cid: u16, pub(crate) psm: u16, pub(crate) mps: u16, pub(crate) mtu: u16, - pub(crate) credits: u16, + pub(crate) flow_control: CreditFlowControl, pub(crate) peer_cid: u16, pub(crate) peer_credits: u16, @@ -495,6 +702,8 @@ pub struct ConnectedState { pub(crate) pool_id: AllocId, } +#[derive(Clone, Debug)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] pub struct DisconnectingState { pub(crate) conn: ConnHandle, pub(crate) cid: u16, diff --git a/host/src/connection.rs b/host/src/connection.rs index 0222251..aab0c86 100644 --- a/host/src/connection.rs +++ b/host/src/connection.rs @@ -1,8 +1,8 @@ use bt_hci::{ cmd::{ le::{ - LeAddDeviceToFilterAcceptList, LeClearFilterAcceptList, LeCreateConn, LeExtCreateConn, LeSetExtScanEnable, - LeSetExtScanParams, LeSetScanEnable, LeSetScanParams, + LeAddDeviceToFilterAcceptList, LeClearFilterAcceptList, LeConnUpdate, LeCreateConn, LeExtCreateConn, + LeSetExtScanEnable, LeSetExtScanParams, LeSetScanEnable, LeSetScanParams, }, link_control::DisconnectParams, status::ReadRssi, @@ -34,6 +34,7 @@ pub struct ConnectParams { pub min_connection_interval: Duration, pub max_connection_interval: Duration, pub max_latency: u16, + pub event_length: Duration, pub supervision_timeout: Duration, } @@ -43,6 +44,7 @@ impl Default for ConnectParams { min_connection_interval: Duration::from_millis(80), max_connection_interval: Duration::from_millis(80), max_latency: 0, + event_length: Duration::from_secs(0), supervision_timeout: Duration::from_secs(8), } } @@ -63,7 +65,7 @@ impl<'d> Connection<'d> { >( adapter: &'d Adapter<'_, M, T, CONNS, CHANNELS, L2CAP_TXQ, L2CAP_RXQ>, ) -> Self { - let info = adapter.connections.accept(None).await; + let info = adapter.connections.accept(&[]).await; Connection { info, control: adapter.control.sender().into(), @@ -101,7 +103,37 @@ impl<'d> Connection<'d> { where T: ControllerCmdSync, { - adapter.read_rssi(self.info.handle).await + let ret = adapter.command(ReadRssi::new(self.info.handle)).await?; + Ok(ret.rssi) + } + + pub async fn set_connection_params< + M: RawMutex, + T, + const CONNS: usize, + const CHANNELS: usize, + const L2CAP_TXQ: usize, + const L2CAP_RXQ: usize, + >( + &self, + adapter: &'d Adapter<'_, M, T, CONNS, CHANNELS, L2CAP_TXQ, L2CAP_RXQ>, + params: ConnectParams, + ) -> Result<(), AdapterError> + where + T: ControllerCmdAsync, + { + let ret = adapter + .async_command(LeConnUpdate::new( + self.info.handle, + params.min_connection_interval.into(), + params.max_connection_interval.into(), + params.max_latency, + params.supervision_timeout.into(), + bt_hci::param::Duration::from_secs(0), + bt_hci::param::Duration::from_secs(0), + )) + .await?; + Ok(()) } pub async fn connect< diff --git a/host/src/connection_manager.rs b/host/src/connection_manager.rs index aaf8193..05e7128 100644 --- a/host/src/connection_manager.rs +++ b/host/src/connection_manager.rs @@ -4,7 +4,7 @@ use core::{ task::{Context, Poll}, }; -use bt_hci::param::{BdAddr, ConnHandle, LeConnRole, Status}; +use bt_hci::param::{AddrKind, BdAddr, ConnHandle, LeConnRole, Status}; use embassy_sync::{ blocking_mutex::{raw::RawMutex, Mutex}, waitqueue::WakerRegistration, @@ -64,16 +64,18 @@ impl ConnectionManager { }) } - pub fn poll_accept(&self, peer: Option, cx: &mut Context<'_>) -> Poll { + pub fn poll_accept(&self, peers: &[(AddrKind, &BdAddr)], cx: &mut Context<'_>) -> Poll { self.state.lock(|state| { let mut state = state.borrow_mut(); for storage in state.connections.iter_mut() { if let ConnectionState::Connecting(handle, info) = storage { - if let Some(peer) = peer { - if info.peer_address == peer { - let i = *info; - *storage = ConnectionState::Connected(*handle, *info); - return Poll::Ready(i); + if !peers.is_empty() { + for peer in peers.iter() { + if info.peer_addr_kind == peer.0 && &info.peer_address == peer.1 { + let i = *info; + *storage = ConnectionState::Connected(*handle, *info); + return Poll::Ready(i); + } } } else { let i = *info; @@ -87,8 +89,8 @@ impl ConnectionManager { }) } - pub async fn accept(&self, peer: Option) -> ConnectionInfo { - poll_fn(move |cx| self.poll_accept(peer, cx)).await + pub async fn accept(&self, peers: &[(AddrKind, &BdAddr)]) -> ConnectionInfo { + poll_fn(move |cx| self.poll_accept(peers, cx)).await } pub fn info(&self, handle: ConnHandle) -> Result { @@ -149,11 +151,13 @@ impl DynamicConnectionManager for ConnectionMan } } -#[derive(Clone, Copy)] +#[derive(Clone, Copy, Debug)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] pub struct ConnectionInfo { pub handle: ConnHandle, pub status: Status, pub role: LeConnRole, + pub peer_addr_kind: AddrKind, pub peer_address: BdAddr, pub interval: u16, pub latency: u16, diff --git a/host/src/l2cap.rs b/host/src/l2cap.rs index 33b51f4..7170028 100644 --- a/host/src/l2cap.rs +++ b/host/src/l2cap.rs @@ -1,3 +1,4 @@ +use core::cell::RefCell; use core::future::poll_fn; use crate::adapter::{Adapter, ControlCommand, HciController}; @@ -5,7 +6,7 @@ use crate::channel_manager::DynamicChannelManager; use crate::codec; use crate::connection::Connection; use crate::cursor::{ReadCursor, WriteCursor}; -use crate::packet_pool::{AllocId, DynamicPacketPool}; +use crate::packet_pool::{AllocId, DynamicPacketPool, Packet}; use crate::pdu::Pdu; use crate::{AdapterError, Error}; use bt_hci::cmd::link_control::DisconnectParams; @@ -18,36 +19,124 @@ use embassy_sync::blocking_mutex::raw::RawMutex; use embassy_sync::channel::DynamicReceiver; use embassy_sync::channel::DynamicSender; +pub use crate::channel_manager::CreditFlowPolicy; + pub(crate) const L2CAP_CID_ATT: u16 = 0x0004; pub(crate) const L2CAP_CID_LE_U_SIGNAL: u16 = 0x0005; pub(crate) const L2CAP_CID_DYN_START: u16 = 0x0040; +pub struct AssembledPacket<'d> { + packet: Packet<'d>, + written: usize, +} + +impl<'d> AssembledPacket<'d> { + pub fn new(packet: Packet<'d>, initial: usize) -> Self { + Self { + packet, + written: initial, + } + } + + pub fn write(&mut self, data: &[u8]) -> Result<(), Error> { + if self.written + data.len() > self.packet.len() { + return Err(Error::InsufficientSpace); + } + self.packet.as_mut()[self.written..self.written + data.len()].copy_from_slice(data); + self.written += data.len(); + Ok(()) + } + + pub fn len(&self) -> usize { + self.written + } + + pub fn finalize(self, header: L2capHeader) -> Result<(L2capHeader, Packet<'d>), Error> { + if header.length as usize != self.written { + return Err(Error::InvalidValue); + } + Ok((header, self.packet)) + } +} + +// Handles reassembling of packets +pub struct PacketReassembly<'d, const CONNS: usize> { + handles: RefCell<[Option<(ConnHandle, L2capHeader, AssembledPacket<'d>)>; CONNS]>, +} + +impl<'d, const CONNS: usize> PacketReassembly<'d, CONNS> { + const EMPTY: Option<(ConnHandle, L2capHeader, AssembledPacket<'d>)> = None; + pub fn new() -> Self { + Self { + handles: RefCell::new([Self::EMPTY; CONNS]), + } + } + + /// Initializes a reassembly for a given connection + /// + /// Returns InvalidState if there is already an ongoing reassembly for this connection + /// Returns InsufficientSpace if there is no space for this reassembly + pub fn init(&self, handle: ConnHandle, header: L2capHeader, p: Packet<'d>, initial: usize) -> Result<(), Error> { + let mut state = self.handles.borrow_mut(); + + // Sanity check + for entry in state.iter() { + if let Some(entry) = entry { + if entry.0 == handle { + return Err(Error::InvalidState); + } + } + } + + // Sanity check + for entry in state.iter_mut() { + if entry.is_none() { + entry.replace((handle, header, AssembledPacket::new(p, initial))); + return Ok(()); + } + } + Err(Error::InsufficientSpace) + } + + /// Updates any in progress packet assembly for the connection + /// + /// If the reassembly is complete, the l2cap header + packet is returned. + pub fn update(&self, handle: ConnHandle, data: &[u8]) -> Result)>, Error> { + let mut state = self.handles.borrow_mut(); + + for entry in state.iter_mut() { + match entry { + Some((conn, header, packet)) if *conn == handle => { + let (conn, header, mut packet) = entry.take().unwrap(); + packet.write(data)?; + if packet.len() == header.length as usize { + return Ok(Some(packet.finalize(header)?)); + } else { + entry.replace((conn, header, packet)); + return Ok(None); + } + } + _ => {} + } + } + Err(Error::NotFound) + } +} + #[cfg_attr(feature = "defmt", derive(defmt::Format))] #[derive(Debug)] -pub struct L2capPacket<'d> { +pub struct L2capHeader { + pub length: u16, pub channel: u16, - pub payload: &'d [u8], } -impl<'d> L2capPacket<'d> { - pub fn decode(packet: AclPacket<'_>) -> Result<(bt_hci::param::ConnHandle, L2capPacket), codec::Error> { - let handle = packet.handle(); +impl L2capHeader { + pub fn decode<'m>(packet: &AclPacket<'m>) -> Result<(L2capHeader, &'m [u8]), codec::Error> { let data = packet.data(); - let mut r = ReadCursor::new(data); let length: u16 = r.read()?; let channel: u16 = r.read()?; - let payload = r.consume(length as usize)?; - - Ok((handle, L2capPacket { channel, payload })) - } - - pub fn encode(&self, dest: &mut [u8]) -> Result { - let mut w = WriteCursor::new(dest); - w.write(self.payload.len() as u16)?; - w.write(self.channel)?; - w.append(self.payload)?; - Ok(w.len()) + Ok((Self { length, channel }, &packet.data()[4..])) } } @@ -107,6 +196,7 @@ impl<'a, 'd, T: Controller, const L2CAP_MTU: usize> L2capChannel<'a, 'd, T, L2CA assert!(p_buf.len() >= self.mps + 4); // The number of packets we'll need to send for this payload let n_packets = 1 + (buf.len().saturating_sub(self.mps - 2)).div_ceil(self.mps); + // info!("Sending data of len {} into {} packets", buf.len(), n_packets); poll_fn(|cx| self.manager.poll_request_to_send(self.cid, n_packets, Some(cx))).await?; @@ -133,6 +223,7 @@ impl<'a, 'd, T: Controller, const L2CAP_MTU: usize> L2capChannel<'a, 'd, T, L2CA // The number of packets we'll need to send for this payload let n_packets = 1 + (buf.len().saturating_sub(self.mps - 2)).div_ceil(self.mps); + //info!("Sending data of len {} into {} packets", buf.len(), n_packets); match self.manager.poll_request_to_send(self.cid, n_packets, None) { Poll::Ready(res) => res?, @@ -185,7 +276,7 @@ impl<'a, 'd, T: Controller, const L2CAP_MTU: usize> L2capChannel<'a, 'd, T, L2CA let mut remaining = remaining as usize - data.len(); drop(packet); - self.issue_credits(1).await?; + self.flow_control().await?; //info!( // "Total size of PDU is {}, read buffer size is {} remaining; {}", // len, @@ -203,16 +294,17 @@ impl<'a, 'd, T: Controller, const L2CAP_MTU: usize> L2capChannel<'a, 'd, T, L2CA } remaining -= packet.len; drop(packet); - self.issue_credits(1).await?; + self.flow_control().await?; } // info!("Total reserved {} bytes", pos); Ok(pos) } - async fn issue_credits(&mut self, credits: usize) -> Result<(), AdapterError> { - let (handle, response) = self.manager.confirm_received(self.cid, credits)?; - self.tx.signal(handle, response).await?; + async fn flow_control(&mut self) -> Result<(), AdapterError> { + if let Some((handle, response)) = self.manager.flow_control(self.cid)? { + self.tx.signal(handle, response).await?; + } Ok(()) } @@ -225,13 +317,16 @@ impl<'a, 'd, T: Controller, const L2CAP_MTU: usize> L2capChannel<'a, 'd, T, L2CA >( adapter: &'a Adapter<'d, M, T, CONNS, CHANNELS, L2CAP_TXQ, L2CAP_RXQ>, connection: &Connection<'a>, - psm: u16, + psm: &[u16], mtu: u16, + flow_policy: CreditFlowPolicy, ) -> Result, AdapterError> { let connections = &adapter.connections; let channels = &adapter.channels; - let (state, rx) = channels.accept(connection.handle(), psm, mtu, &adapter.hci()).await?; + let (state, rx) = channels + .accept(connection.handle(), psm, mtu, flow_policy, &adapter.hci()) + .await?; Ok(Self { conn: connection.handle(), @@ -248,7 +343,7 @@ impl<'a, 'd, T: Controller, const L2CAP_MTU: usize> L2capChannel<'a, 'd, T, L2CA } pub fn disconnect(&self, close_connection: bool) -> Result<(), AdapterError> { - self.manager.confirm_disconnected(self.cid)?; + self.manager.disconnect(self.cid)?; if close_connection { self.control .try_send(ControlCommand::Disconnect(DisconnectParams { @@ -271,13 +366,12 @@ impl<'a, 'd, T: Controller, const L2CAP_MTU: usize> L2capChannel<'a, 'd, T, L2CA connection: &Connection<'a>, psm: u16, mtu: u16, + flow_policy: CreditFlowPolicy, ) -> Result> where { - // TODO: Use unique signal ID to ensure no collision of signal messages - // let (state, rx) = adapter .channels - .create(connection.handle(), psm, mtu, &adapter.hci()) + .create(connection.handle(), psm, mtu, flow_policy, &adapter.hci()) .await?; Ok(Self { diff --git a/host/src/lib.rs b/host/src/lib.rs index be8d1bc..4444065 100644 --- a/host/src/lib.rs +++ b/host/src/lib.rs @@ -68,11 +68,13 @@ pub enum Error { InvalidChannelId, NoChannelAvailable, NotFound, + InvalidState, OutOfMemory, NotSupported, ChannelClosed, Timeout, Busy, + NoPermits, Disconnected, Other, } diff --git a/host/src/packet_pool.rs b/host/src/packet_pool.rs index ca7ab7c..4a5d5ae 100644 --- a/host/src/packet_pool.rs +++ b/host/src/packet_pool.rs @@ -2,11 +2,14 @@ use core::cell::{RefCell, UnsafeCell}; use embassy_sync::blocking_mutex::{raw::RawMutex, Mutex}; +use crate::l2cap::{L2CAP_CID_ATT, L2CAP_CID_DYN_START}; + // Generic client ID used by ATT PDU #[cfg(feature = "gatt")] pub(crate) const ATT_ID: AllocId = AllocId(0); -#[derive(Clone, Copy)] +#[derive(Clone, Copy, Debug)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] pub struct AllocId(usize); impl AllocId { @@ -17,6 +20,19 @@ impl AllocId { #[cfg(not(feature = "gatt"))] return AllocId(idx); } + + pub fn from_channel(cid: u16) -> AllocId { + match cid { + L2CAP_CID_ATT => { + #[cfg(feature = "gatt")] + return ATT_ID; + #[cfg(not(feature = "gatt"))] + panic!("gatt feature must be enabled to support gatt"); + } + cid if cid >= L2CAP_CID_DYN_START => Self::dynamic((cid - L2CAP_CID_DYN_START) as usize), + _ => unimplemented!(), + } + } } struct PacketBuf { @@ -211,6 +227,12 @@ pub struct Packet<'d> { pool: &'d dyn DynamicPacketPool<'d>, } +impl<'d> Packet<'d> { + pub fn len(&self) -> usize { + self.as_ref().len() + } +} + impl<'d> Drop for Packet<'d> { fn drop(&mut self) { if let Some(r) = self.p_ref.take() { diff --git a/host/src/scan.rs b/host/src/scan.rs index 048e990..f21202a 100644 --- a/host/src/scan.rs +++ b/host/src/scan.rs @@ -1,4 +1,3 @@ -use crate::advertise::TxPower; use bt_hci::{ param::{AddrKind, BdAddr, LeAdvReport, LeExtAdvReport}, FromHciBytes, FromHciBytesError, @@ -15,7 +14,6 @@ pub struct ScanConfig<'d> { pub interval: Duration, pub window: Duration, pub timeout: Duration, - pub tx_power: TxPower, } impl<'d> Default for ScanConfig<'d> { @@ -28,7 +26,6 @@ impl<'d> Default for ScanConfig<'d> { interval: Duration::from_secs(1), window: Duration::from_secs(1), timeout: Duration::from_secs(0), - tx_power: TxPower::ZerodBm, } } } diff --git a/host/tests/l2cap.rs b/host/tests/l2cap.rs index e2d6376..f50295e 100644 --- a/host/tests/l2cap.rs +++ b/host/tests/l2cap.rs @@ -100,7 +100,7 @@ async fn l2cap_connection_oriented_channels() { println!("[peripheral] connected"); let mut ch1: L2capChannel<'_, '_, _, 27> = - L2capChannel::accept(&adapter, &conn, 0x2349, PAYLOAD_LEN as u16).await?; + L2capChannel::accept(&adapter, &conn, &[0x2349], PAYLOAD_LEN as u16, Default::default()).await?; println!("[peripheral] channel created"); @@ -170,7 +170,7 @@ async fn l2cap_connection_oriented_channels() { println!("[central] connected"); const PAYLOAD_LEN: usize = 27; let mut ch1: L2capChannel<'_, '_, _, 27> = - L2capChannel::create(&adapter, &conn, 0x2349, PAYLOAD_LEN as u16).await?; + L2capChannel::create(&adapter, &conn, 0x2349, PAYLOAD_LEN as u16, Default::default()).await?; println!("[central] channel created"); for i in 0..10 { let tx = [i; PAYLOAD_LEN]; From 8fd359a5340b23b8138a5f5e85d448c8eb1c0583 Mon Sep 17 00:00:00 2001 From: Ulf Lilleengen Date: Wed, 10 Apr 2024 21:26:28 +0200 Subject: [PATCH 3/3] fix clippy --- host/src/adapter.rs | 4 ++-- host/src/channel_manager.rs | 5 ++--- host/src/connection.rs | 2 +- host/src/l2cap.rs | 21 ++++++++++++--------- host/tests/l2cap.rs | 4 ++-- 5 files changed, 19 insertions(+), 17 deletions(-) diff --git a/host/src/adapter.rs b/host/src/adapter.rs index 50f3709..7e10374 100644 --- a/host/src/adapter.rs +++ b/host/src/adapter.rs @@ -151,8 +151,8 @@ where C: AsyncCmd, T: ControllerCmdAsync, { - let ret = cmd.exec(&self.controller).await?; - Ok(ret) + cmd.exec(&self.controller).await?; + Ok(()) } pub(crate) async fn connect(&self, config: &ConnectConfig<'_>) -> Result, AdapterError> diff --git a/host/src/channel_manager.rs b/host/src/channel_manager.rs index 2dd543f..3064e50 100644 --- a/host/src/channel_manager.rs +++ b/host/src/channel_manager.rs @@ -632,7 +632,7 @@ impl CreditFlowControl { } fn process(&mut self) -> Option { - let flow = match self.policy { + match self.policy { CreditFlowPolicy::Every(count) => { if self.received >= count { let amount = self.received; @@ -653,8 +653,7 @@ impl CreditFlowControl { None } } - }; - flow + } } } diff --git a/host/src/connection.rs b/host/src/connection.rs index aab0c86..3d208e6 100644 --- a/host/src/connection.rs +++ b/host/src/connection.rs @@ -122,7 +122,7 @@ impl<'d> Connection<'d> { where T: ControllerCmdAsync, { - let ret = adapter + adapter .async_command(LeConnUpdate::new( self.info.handle, params.min_connection_interval.into(), diff --git a/host/src/l2cap.rs b/host/src/l2cap.rs index 7170028..3f7348a 100644 --- a/host/src/l2cap.rs +++ b/host/src/l2cap.rs @@ -31,14 +31,14 @@ pub struct AssembledPacket<'d> { } impl<'d> AssembledPacket<'d> { - pub fn new(packet: Packet<'d>, initial: usize) -> Self { + pub(crate) fn new(packet: Packet<'d>, initial: usize) -> Self { Self { packet, written: initial, } } - pub fn write(&mut self, data: &[u8]) -> Result<(), Error> { + pub(crate) fn write(&mut self, data: &[u8]) -> Result<(), Error> { if self.written + data.len() > self.packet.len() { return Err(Error::InsufficientSpace); } @@ -47,11 +47,11 @@ impl<'d> AssembledPacket<'d> { Ok(()) } - pub fn len(&self) -> usize { + pub(crate) fn len(&self) -> usize { self.written } - pub fn finalize(self, header: L2capHeader) -> Result<(L2capHeader, Packet<'d>), Error> { + pub(crate) fn finalize(self, header: L2capHeader) -> Result<(L2capHeader, Packet<'d>), Error> { if header.length as usize != self.written { return Err(Error::InvalidValue); } @@ -63,6 +63,11 @@ impl<'d> AssembledPacket<'d> { pub struct PacketReassembly<'d, const CONNS: usize> { handles: RefCell<[Option<(ConnHandle, L2capHeader, AssembledPacket<'d>)>; CONNS]>, } +impl<'d, const CONNS: usize> Default for PacketReassembly<'d, CONNS> { + fn default() -> Self { + Self::new() + } +} impl<'d, const CONNS: usize> PacketReassembly<'d, CONNS> { const EMPTY: Option<(ConnHandle, L2capHeader, AssembledPacket<'d>)> = None; @@ -80,11 +85,9 @@ impl<'d, const CONNS: usize> PacketReassembly<'d, CONNS> { let mut state = self.handles.borrow_mut(); // Sanity check - for entry in state.iter() { - if let Some(entry) = entry { - if entry.0 == handle { - return Err(Error::InvalidState); - } + for entry in state.iter().flatten() { + if entry.0 == handle { + return Err(Error::InvalidState); } } diff --git a/host/tests/l2cap.rs b/host/tests/l2cap.rs index f50295e..30702bf 100644 --- a/host/tests/l2cap.rs +++ b/host/tests/l2cap.rs @@ -196,8 +196,8 @@ async fn l2cap_connection_oriented_channels() { match tokio::time::timeout(Duration::from_secs(30), local).await { Ok(_) => { - let _ = central.await.unwrap().unwrap(); - let _ = peripheral.await.unwrap().unwrap(); + central.await.unwrap().unwrap(); + peripheral.await.unwrap().unwrap(); println!("Test completed successfully"); } Err(e) => {