@@ -2,7 +2,7 @@
 // License, v. 2.0. If a copy of the MPL was not distributed with this
 // file, You can obtain one at https://mozilla.org/MPL/2.0/.
 
-use std::sync::atomic::{AtomicU8, Ordering};
+use std::sync::atomic::{AtomicU8, AtomicUsize, Ordering};
 use std::sync::Arc;
 use std::sync::Mutex;
 
@@ -38,12 +38,13 @@ impl Dispatcher {
     pub fn new(max_queue_size: usize) -> Self {
         let queue = Queue::create("glean.dispatcher", QueueAttribute::Serial);
         let preinit_queue = Mutex::new(Vec::with_capacity(10));
+        let overflow_count = Arc::new(AtomicUsize::new(0));
 
         let guard = DispatchGuard {
             queue: Some(queue),
-
             flushed: AtomicU8::new(QueueStatus::NotFlushed as u8),
             max_queue_size,
+            overflow_count,
             preinit_queue,
         };
 
@@ -60,15 +61,11 @@ impl Dispatcher {
     ///
     /// You need to call `shutdown` to initiate a shutdown of the queue.
     pub fn join(&mut self) -> Result<(), DispatchError> {
-        self.guard.shutdown().ok();
-        if let Some(queue) = self.guard.queue.as_ref() {
-            queue.exec_sync(|| {
-                // intentionally left empty
-            });
-        }
-
         if let Some(guard) = Arc::get_mut(&mut self.guard) {
             if let Some(queue) = guard.queue.take() {
+                queue.exec_sync(|| {
+                    // intentionally left empty
+                });
                 drop(queue);
             }
         }
@@ -78,9 +75,21 @@ impl Dispatcher {
 
 /// A clonable guard for a dispatch queue.
 pub struct DispatchGuard {
+    /// The queue to run on
     queue: Option<Queue>,
+
+    /// Status of the queue. One of `QueueStatus`
     flushed: AtomicU8,
+
+    /// The maximum pre-init queue size
     max_queue_size: usize,
+
+    /// The number of items that were added to the queue after it filled up
+    overflow_count: Arc<AtomicUsize>,
+
+    /// The pre-init queue
+    ///
+    /// Collects tasks before `flush_init` is called up until `max_queue_size`.
     preinit_queue: Mutex<Vec<Box<dyn FnOnce() + Send + 'static>>>,
 }
 
@@ -95,12 +104,20 @@ impl DispatchGuard {
     pub fn launch(&self, task: impl FnOnce() + Send + 'static) -> Result<(), DispatchError> {
         if self.flushed.load(Ordering::SeqCst) == QueueStatus::IsFlushed as u8 {
             self.queue().exec_async(task);
+            Ok(())
         } else {
             let mut queue = self.preinit_queue.lock().unwrap();
-            queue.push(Box::new(task))
+            if queue.len() < self.max_queue_size {
+                queue.push(Box::new(task));
+                Ok(())
+            } else {
+                self.overflow_count.fetch_add(1, Ordering::SeqCst);
+                // Instead of using a bounded queue, we are handling the bounds
+                // checking ourselves. If a bounded queue were full, we would return
+                // a QueueFull DispatchError, so we do the same here.
+                Err(DispatchError::QueueFull)
+            }
         }
-
-        Ok(())
     }
 
     /// Shut down the dispatch queue.
@@ -147,32 +164,23 @@ impl DispatchGuard {
             return Err(DispatchError::AlreadyFlushed);
         }
 
-        let over;
-        let queue_size;
         {
             let mut queue = self.preinit_queue.lock().unwrap();
-            queue_size = queue.len();
-            over = queue_size.saturating_sub(self.max_queue_size);
-            let end = std::cmp::min(queue_size, self.max_queue_size);
-
-            for task in queue.drain(..end) {
+            for task in queue.drain(..) {
                 self.queue().exec_sync(task);
             }
         }
-        // give others a chance to get the lock
-        {
-            let mut queue = self.preinit_queue.lock().unwrap();
-            if queue.len() > over {
-                let start = queue.len() - over;
-                for task in queue.drain(start..) {
-                    self.queue().exec_sync(task);
-                }
-            }
-        }
+
+        let overflow_count = self.overflow_count.load(Ordering::SeqCst);
 
         self.flushed
             .store(QueueStatus::IsFlushed as u8, Ordering::SeqCst);
-        Ok(over)
+
+        if overflow_count > 0 {
+            Ok(overflow_count)
+        } else {
+            Ok(0)
+        }
     }
 
     pub fn kill(&self) -> Result<(), DispatchError> {
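
The diff replaces the old two-pass drain with a bounded pre-init buffer plus an overflow counter: `launch` rejects tasks once the buffer is full, and `flush_init` reports how many launches were rejected. Below is a minimal, self-contained sketch of that behaviour that can be run outside the Glean code base; the names `Guard`, `Task` and the `AtomicBool` flushed flag are illustrative simplifications, while the real dispatcher runs tasks on a GCD serial queue and tracks its state in an `AtomicU8` `QueueStatus`.

use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Mutex;

#[derive(Debug, PartialEq)]
enum DispatchError {
    QueueFull,
    AlreadyFlushed,
}

type Task = Box<dyn FnOnce() + Send + 'static>;

struct Guard {
    flushed: AtomicBool,
    max_queue_size: usize,
    overflow_count: AtomicUsize,
    preinit_queue: Mutex<Vec<Task>>,
}

impl Guard {
    fn new(max_queue_size: usize) -> Self {
        Guard {
            flushed: AtomicBool::new(false),
            max_queue_size,
            overflow_count: AtomicUsize::new(0),
            preinit_queue: Mutex::new(Vec::with_capacity(10)),
        }
    }

    // Run immediately once flushed; otherwise buffer the task,
    // rejecting (and counting) anything past `max_queue_size`.
    fn launch(&self, task: impl FnOnce() + Send + 'static) -> Result<(), DispatchError> {
        if self.flushed.load(Ordering::SeqCst) {
            task();
            return Ok(());
        }
        let mut queue = self.preinit_queue.lock().unwrap();
        if queue.len() < self.max_queue_size {
            queue.push(Box::new(task));
            Ok(())
        } else {
            self.overflow_count.fetch_add(1, Ordering::SeqCst);
            Err(DispatchError::QueueFull)
        }
    }

    // Drain the buffered tasks, mark the guard as flushed and report
    // how many submissions were rejected while the buffer was full.
    fn flush_init(&self) -> Result<usize, DispatchError> {
        if self.flushed.load(Ordering::SeqCst) {
            return Err(DispatchError::AlreadyFlushed);
        }
        let tasks: Vec<Task> = self.preinit_queue.lock().unwrap().drain(..).collect();
        for task in tasks {
            task();
        }
        self.flushed.store(true, Ordering::SeqCst);
        Ok(self.overflow_count.load(Ordering::SeqCst))
    }
}

fn main() {
    let guard = Guard::new(2);
    for i in 0..4 {
        // The third and fourth submissions exceed the bound and are rejected.
        let _ = guard.launch(move || println!("pre-init task {i}"));
    }
    let dropped = guard.flush_init().expect("first flush succeeds");
    assert_eq!(dropped, 2);
    println!("overflowed submissions: {dropped}");
}

Keeping `overflow_count` as a separate atomic lets the flush report how many launches were dropped without ever letting the pre-init buffer grow past its bound.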