Skip to content

Commit 7fbf759

Browse files
committed
Rust: Implement a rust dispatcher connected to GCD
GCD (Grand Central Dispatch) is a runtime system for executing code concurrently. It's available on iOS and macOS by default as provided by the system, and available as libdispatch on other *nix systems. We used that before when our dispatcher was purely in Swift. We now go back to use it, but from Rust.
1 parent 62553ae commit 7fbf759

File tree

8 files changed

+465
-283
lines changed

8 files changed

+465
-283
lines changed

.circleci/config.yml

+7
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,13 @@ commands:
7676
if rustc --version | grep -q 'rustc 1.6'; then
7777
cargo run -p sample
7878
fi
79+
- run:
80+
name: Run Rust sample
81+
command: |
82+
sudo apt install --yes --no-install-recommends \
83+
libdispatch0
84+
85+
cargo test --verbose --jobs 6 --features native-dispatcher -- --nocapture
7986
- run:
8087
name: Upload coverage report
8188
command: |

Cargo.lock

+7
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

glean-core/Cargo.toml

+6
Original file line numberDiff line numberDiff line change
@@ -45,13 +45,17 @@ uniffi_macros = "0.21.0"
4545
time = "0.1.40"
4646
remove_dir_all = "0.5.3"
4747
env_logger = { version = "0.9.0", default-features = false, optional = true }
48+
dispatch = { version = "0.2.0", optional = true }
4849

4950
[target.'cfg(target_os = "android")'.dependencies]
5051
android_logger = { version = "0.11.0", default-features = false }
5152

5253
[target.'cfg(target_os = "ios")'.dependencies]
5354
oslog = { version = "0.1.0", default-features = false, features = ["logger"] }
5455

56+
[target.'cfg(any(target_os = "ios"))'.dependencies]
57+
dispatch = "0.2.0"
58+
5559
[dev-dependencies]
5660
env_logger = { version = "0.9.0", default-features = false, features = ["termcolor", "atty", "humantime"] }
5761
tempfile = "3.1.0"
@@ -66,3 +70,5 @@ uniffi_build = { version = "0.21.0", features = ["builtin-bindgen"] }
6670
preinit_million_queue = []
6771
# Enable `env_logger`. Only works on non-Android non-iOS targets.
6872
enable_env_logger = ["env_logger"]
73+
# Use libdispatch, default on iOS
74+
native-dispatcher = ["dispatch"]

glean-core/rlb/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -45,3 +45,4 @@ flate2 = "1.0.19"
4545

4646
[features]
4747
preinit_million_queue = ["glean-core/preinit_million_queue"]
48+
native-dispatcher = ["glean-core/native-dispatcher"]

glean-core/src/dispatcher/global.rs

+5-7
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@
22
// License, v. 2.0. If a copy of the MPL was not distributed with this
33
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
44

5-
use once_cell::sync::Lazy;
5+
use std::sync::Arc;
66
use std::sync::atomic::{AtomicBool, Ordering};
77
use std::sync::RwLock;
88

9+
use once_cell::sync::Lazy;
10+
911
use super::{DispatchError, DispatchGuard, Dispatcher};
1012

1113
#[cfg(feature = "preinit_million_queue")]
@@ -26,7 +28,7 @@ pub fn is_test_mode() -> bool {
2628
///
2729
/// A dispatcher is cheap to create, so we create one on every access instead of caching it.
2830
/// This avoids troubles for tests where the global dispatcher _can_ change.
29-
fn guard() -> DispatchGuard {
31+
fn guard() -> Arc<DispatchGuard> {
3032
GLOBAL_DISPATCHER
3133
.read()
3234
.unwrap()
@@ -89,11 +91,7 @@ fn join_dispatcher_thread() -> Result<(), DispatchError> {
8991
let mut lock = GLOBAL_DISPATCHER.write().unwrap();
9092
let dispatcher = lock.as_mut().expect("Global dispatcher has gone missing");
9193

92-
if let Some(worker) = dispatcher.worker.take() {
93-
return worker.join().map_err(|_| DispatchError::WorkerPanic);
94-
}
95-
96-
Ok(())
94+
dispatcher.join()
9795
}
9896

9997
/// Kill the blocked dispatcher without processing the queue.

glean-core/src/dispatcher/imp.rs

+281
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,281 @@
1+
// This Source Code Form is subject to the terms of the Mozilla Public
2+
// License, v. 2.0. If a copy of the MPL was not distributed with this
3+
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
5+
use std::{
6+
mem,
7+
sync::{
8+
atomic::{AtomicBool, AtomicUsize, Ordering},
9+
Arc,
10+
},
11+
thread::{self, JoinHandle},
12+
};
13+
14+
use crossbeam_channel::{bounded, unbounded, Sender};
15+
16+
use super::DispatchError;
17+
18+
/// Command received while blocked from further work.
19+
enum Blocked {
20+
/// Shutdown immediately without processing the queue.
21+
Shutdown,
22+
/// Unblock and continue with work as normal.
23+
Continue,
24+
}
25+
26+
/// The command a worker should execute.
27+
enum Command {
28+
/// A task is a user-defined function to run.
29+
Task(Box<dyn FnOnce() + Send>),
30+
31+
/// Swap the channel
32+
Swap(Sender<()>),
33+
34+
/// Signal the worker to finish work and shut down.
35+
Shutdown,
36+
}
37+
38+
/// A clonable guard for a dispatch queue.
39+
#[derive(Clone)]
40+
pub struct DispatchGuard {
41+
/// Whether to queue on the preinit buffer or on the unbounded queue
42+
queue_preinit: Arc<AtomicBool>,
43+
44+
/// The number of items that were added to the queue after it filled up.
45+
overflow_count: Arc<AtomicUsize>,
46+
47+
/// The maximum pre-init queue size
48+
max_queue_size: usize,
49+
50+
/// Used to unblock the worker thread initially.
51+
block_sender: Sender<Blocked>,
52+
53+
/// Sender for the preinit queue.
54+
preinit_sender: Sender<Command>,
55+
56+
/// Sender for the unbounded queue.
57+
sender: Sender<Command>,
58+
}
59+
60+
impl DispatchGuard {
61+
pub fn launch(&self, task: impl FnOnce() + Send + 'static) -> Result<(), DispatchError> {
62+
let task = Command::Task(Box::new(task));
63+
self.send(task)
64+
}
65+
66+
pub fn shutdown(&self) -> Result<(), DispatchError> {
67+
// Need to flush in order for the thread to actually process anything,
68+
// including the shutdown command.
69+
self.flush_init().ok();
70+
self.send(Command::Shutdown)
71+
}
72+
73+
fn send(&self, task: Command) -> Result<(), DispatchError> {
74+
if self.queue_preinit.load(Ordering::SeqCst) {
75+
if self.preinit_sender.len() < self.max_queue_size {
76+
self.preinit_sender.send(task)?;
77+
Ok(())
78+
} else {
79+
self.overflow_count.fetch_add(1, Ordering::SeqCst);
80+
// Instead of using a bounded queue, we are handling the bounds
81+
// checking ourselves. If a bounded queue were full, we would return
82+
// a QueueFull DispatchError, so we do the same here.
83+
Err(DispatchError::QueueFull)
84+
}
85+
} else {
86+
self.sender.send(task)?;
87+
Ok(())
88+
}
89+
}
90+
91+
pub fn block_on_queue(&self) {
92+
let (tx, rx) = crossbeam_channel::bounded(0);
93+
94+
// We explicitly don't use `self.launch` here.
95+
// We always put this task on the unbounded queue.
96+
// The pre-init queue might be full before its flushed, in which case this would panic.
97+
// Blocking on the queue can only work if it is eventually flushed anyway.
98+
99+
let task = Command::Task(Box::new(move || {
100+
tx.send(())
101+
.expect("(worker) Can't send message on single-use channel");
102+
}));
103+
self.sender
104+
.send(task)
105+
.expect("Failed to launch the blocking task");
106+
107+
rx.recv()
108+
.expect("Failed to receive message on single-use channel");
109+
}
110+
111+
pub fn kill(&self) -> Result<(), DispatchError> {
112+
// We immediately stop queueing in the pre-init buffer.
113+
let old_val = self.queue_preinit.swap(false, Ordering::SeqCst);
114+
if !old_val {
115+
return Err(DispatchError::AlreadyFlushed);
116+
}
117+
118+
// Unblock the worker thread exactly once.
119+
self.block_sender.send(Blocked::Shutdown)?;
120+
Ok(())
121+
}
122+
123+
/// Flushes the pre-init buffer.
124+
///
125+
/// This function blocks until tasks queued prior to this call are finished.
126+
/// Once the initial queue is empty the dispatcher will wait for new tasks to be launched.
127+
///
128+
/// Returns an error if called multiple times.
129+
pub fn flush_init(&self) -> Result<usize, DispatchError> {
130+
// We immediately stop queueing in the pre-init buffer.
131+
let old_val = self.queue_preinit.swap(false, Ordering::SeqCst);
132+
if !old_val {
133+
return Err(DispatchError::AlreadyFlushed);
134+
}
135+
136+
// Unblock the worker thread exactly once.
137+
self.block_sender.send(Blocked::Continue)?;
138+
139+
// Single-use channel to communicate with the worker thread.
140+
let (swap_sender, swap_receiver) = bounded(0);
141+
142+
// Send final command and block until it is sent.
143+
self.preinit_sender
144+
.send(Command::Swap(swap_sender))
145+
.map_err(|_| DispatchError::SendError)?;
146+
147+
// Now wait for the worker thread to do the swap and inform us.
148+
// This blocks until all tasks in the preinit buffer have been processed.
149+
swap_receiver.recv()?;
150+
151+
// We're not queueing anymore.
152+
super::global::QUEUE_TASKS.store(false, Ordering::SeqCst);
153+
154+
let overflow_count = self.overflow_count.load(Ordering::SeqCst);
155+
if overflow_count > 0 {
156+
Ok(overflow_count)
157+
} else {
158+
Ok(0)
159+
}
160+
}
161+
}
162+
163+
/// A dispatcher.
164+
///
165+
/// Run expensive processing tasks sequentially off the main thread.
166+
/// Tasks are processed in a single separate thread in the order they are submitted.
167+
/// The dispatch queue will enqueue tasks while not flushed, up to the maximum queue size.
168+
/// Processing will start after flushing once, processing already enqueued tasks first, then
169+
/// waiting for further tasks to be enqueued.
170+
pub struct Dispatcher {
171+
/// Guard used for communication with the worker thread.
172+
guard: DispatchGuard,
173+
174+
/// Handle to the worker thread, allows to wait for it to finish.
175+
pub worker: Option<JoinHandle<()>>,
176+
}
177+
178+
impl Dispatcher {
179+
/// Creates a new dispatcher with a maximum queue size.
180+
///
181+
/// Launched tasks won't run until [`flush_init`] is called.
182+
///
183+
/// [`flush_init`]: #method.flush_init
184+
pub fn new(max_queue_size: usize) -> Self {
185+
let (block_sender, block_receiver) = bounded(1);
186+
let (preinit_sender, preinit_receiver) = unbounded();
187+
let (sender, mut unbounded_receiver) = unbounded();
188+
189+
let queue_preinit = Arc::new(AtomicBool::new(true));
190+
let overflow_count = Arc::new(AtomicUsize::new(0));
191+
192+
let worker = thread::Builder::new()
193+
.name("glean.dispatcher".into())
194+
.spawn(move || {
195+
match block_receiver.recv() {
196+
Err(_) => {
197+
// The other side was disconnected.
198+
// There's nothing the worker thread can do.
199+
log::error!("The task producer was disconnected. Worker thread will exit.");
200+
return;
201+
}
202+
Ok(Blocked::Shutdown) => {
203+
// The other side wants us to stop immediately
204+
return;
205+
}
206+
Ok(Blocked::Continue) => {
207+
// Queue is unblocked, processing continues as normal.
208+
}
209+
}
210+
211+
let mut receiver = preinit_receiver;
212+
loop {
213+
use Command::*;
214+
215+
match receiver.recv() {
216+
Ok(Shutdown) => {
217+
break;
218+
}
219+
220+
Ok(Task(f)) => {
221+
(f)();
222+
}
223+
224+
Ok(Swap(swap_done)) => {
225+
// A swap should only occur exactly once.
226+
// This is upheld by `flush_init`, which errors out if the preinit buffer
227+
// was already flushed.
228+
229+
// We swap the channels we listen on for new tasks.
230+
// The next iteration will continue with the unbounded queue.
231+
mem::swap(&mut receiver, &mut unbounded_receiver);
232+
233+
// The swap command MUST be the last one received on the preinit buffer,
234+
// so by the time we run this we know all preinit tasks were processed.
235+
// We can notify the other side.
236+
swap_done
237+
.send(())
238+
.expect("The caller of `flush_init` has gone missing");
239+
}
240+
241+
// Other side was disconnected.
242+
Err(_) => {
243+
log::error!(
244+
"The task producer was disconnected. Worker thread will exit."
245+
);
246+
return;
247+
}
248+
}
249+
}
250+
})
251+
.expect("Failed to spawn Glean's dispatcher thread");
252+
253+
let guard = DispatchGuard {
254+
queue_preinit,
255+
overflow_count,
256+
max_queue_size,
257+
block_sender,
258+
preinit_sender,
259+
sender,
260+
};
261+
262+
Dispatcher {
263+
guard,
264+
worker: Some(worker),
265+
}
266+
}
267+
268+
pub fn guard(&self) -> Arc<DispatchGuard> {
269+
Arc::new(self.guard.clone())
270+
}
271+
272+
/// Waits for the worker thread to finish and finishes the dispatch queue.
273+
///
274+
/// You need to call `shutdown` to initiate a shutdown of the queue.
275+
pub fn join(&mut self) -> Result<(), DispatchError> {
276+
if let Some(worker) = self.worker.take() {
277+
worker.join().map_err(|_| DispatchError::WorkerPanic)?;
278+
}
279+
Ok(())
280+
}
281+
}

0 commit comments

Comments
 (0)