From b0af34302e321ada670c931cfa0c2f0c1efc168f Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Wed, 20 Mar 2024 19:53:44 +0100 Subject: [PATCH] android: switch from futures-channel to async-channel. `futures-channel` doesn't support blocking send, so in the callbacks from java (which are not in an async context) we're forced to do `try_send`, which will lose data if the channel is full because the Rust side is too busy. This is not a big deal for scanning, but is for stuff like gatt/l2cap responses that are coming next. --- Cargo.toml | 2 +- src/android/adapter.rs | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 4a0e7b2..11c8d82 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,7 +56,7 @@ tokio = { version = "1.20.1", features = ["rt-multi-thread"] } [target.'cfg(target_os = "android")'.dependencies] java-spaghetti = "0.1.0" -futures-channel = "0.3.24" +async-channel = "2.2.0" [target.'cfg(any(target_os = "macos", target_os = "ios"))'.dependencies] async-broadcast = "0.5.1" diff --git a/src/android/adapter.rs b/src/android/adapter.rs index faf31a5..5fe1620 100644 --- a/src/android/adapter.rs +++ b/src/android/adapter.rs @@ -4,7 +4,7 @@ use std::sync::atomic::{AtomicI32, Ordering}; use std::sync::{Arc, Mutex}; use std::task::{Context, Poll}; -use futures_channel::mpsc::{Receiver, Sender}; +use async_channel::{Receiver, Sender}; use futures_core::Stream; use futures_lite::{stream, StreamExt}; use java_spaghetti::{Arg, ByteArray, Env, Global, Local, PrimitiveArray, VM}; @@ -96,7 +96,7 @@ impl AdapterImpl { }); }); - Ok(receiver.map(move |x| { + Ok(Box::pin(receiver).map(move |x| { let _guard = &guard; x })) @@ -174,7 +174,7 @@ impl CallbackRouter { fn allocate(&'static self) -> CallbackReceiver { let id = self.next_id.fetch_add(1, Ordering::Relaxed); - let (sender, receiver) = futures_channel::mpsc::channel(16); + let (sender, receiver) = async_channel::bounded(16); self.map .lock() .unwrap() @@ -190,7 +190,7 @@ impl CallbackRouter { fn callback(&'static self, id: i32, val: T) { if let Some(sender) = self.map.lock().unwrap().as_mut().unwrap().get_mut(&id) { - if let Err(e) = sender.try_send(val) { + if let Err(e) = sender.send_blocking(val) { warn!("failed to send scan callback: {:?}", e) } }