@@ -7,7 +7,6 @@ use std::any::Any;
7
7
use std:: panic:: { AssertUnwindSafe , catch_unwind, resume_unwind} ;
8
8
9
9
use parking_lot:: Mutex ;
10
- use rayon:: iter:: { FromParallelIterator , IntoParallelIterator , ParallelIterator } ;
11
10
12
11
use crate :: FatalErrorMarker ;
13
12
use crate :: sync:: { DynSend , DynSync , FromDyn , IntoDynSyncSend , mode} ;
@@ -97,11 +96,11 @@ macro_rules! parallel {
97
96
// This function only works when `mode::is_dyn_thread_safe()`.
98
97
pub fn scope < ' scope , OP , R > ( op : OP ) -> R
99
98
where
100
- OP : FnOnce ( & rayon :: Scope < ' scope > ) -> R + DynSend ,
99
+ OP : FnOnce ( & rayon_core :: Scope < ' scope > ) -> R + DynSend ,
101
100
R : DynSend ,
102
101
{
103
102
let op = FromDyn :: from ( op) ;
104
- rayon :: scope ( |s| FromDyn :: from ( op. into_inner ( ) ( s) ) ) . into_inner ( )
103
+ rayon_core :: scope ( |s| FromDyn :: from ( op. into_inner ( ) ( s) ) ) . into_inner ( )
105
104
}
106
105
107
106
#[ inline]
@@ -114,7 +113,7 @@ where
114
113
let oper_a = FromDyn :: from ( oper_a) ;
115
114
let oper_b = FromDyn :: from ( oper_b) ;
116
115
let ( a, b) = parallel_guard ( |guard| {
117
- rayon :: join (
116
+ rayon_core :: join (
118
117
move || guard. run ( move || FromDyn :: from ( oper_a. into_inner ( ) ( ) ) ) ,
119
118
move || guard. run ( move || FromDyn :: from ( oper_b. into_inner ( ) ( ) ) ) ,
120
119
)
@@ -125,56 +124,99 @@ where
125
124
}
126
125
}
127
126
128
- pub fn par_for_each_in < I , T : IntoIterator < Item = I > + IntoParallelIterator < Item = I > > (
127
+ fn par_slice < I : DynSend > (
128
+ items : & mut [ I ] ,
129
+ guard : & ParallelGuard ,
130
+ for_each : impl Fn ( & mut I ) + DynSync + DynSend ,
131
+ ) {
132
+ struct State < ' a , F > {
133
+ for_each : FromDyn < F > ,
134
+ guard : & ' a ParallelGuard ,
135
+ group : usize ,
136
+ }
137
+
138
+ fn par_rec < I : DynSend , F : Fn ( & mut I ) + DynSync + DynSend > (
139
+ items : & mut [ I ] ,
140
+ state : & State < ' _ , F > ,
141
+ ) {
142
+ if items. len ( ) <= state. group {
143
+ for item in items {
144
+ state. guard . run ( || ( state. for_each ) ( item) ) ;
145
+ }
146
+ } else {
147
+ let ( left, right) = items. split_at_mut ( items. len ( ) / 2 ) ;
148
+ let mut left = state. for_each . derive ( left) ;
149
+ let mut right = state. for_each . derive ( right) ;
150
+ rayon_core:: join ( move || par_rec ( * left, state) , move || par_rec ( * right, state) ) ;
151
+ }
152
+ }
153
+
154
+ let state = State {
155
+ for_each : FromDyn :: from ( for_each) ,
156
+ guard,
157
+ group : std:: cmp:: max ( items. len ( ) / 128 , 1 ) ,
158
+ } ;
159
+ par_rec ( items, & state)
160
+ }
161
+
162
+ pub fn par_for_each_in < I : DynSend , T : IntoIterator < Item = I > > (
129
163
t : T ,
130
- for_each : impl Fn ( I ) + DynSync + DynSend ,
164
+ for_each : impl Fn ( & I ) + DynSync + DynSend ,
131
165
) {
132
166
parallel_guard ( |guard| {
133
167
if mode:: is_dyn_thread_safe ( ) {
134
- let for_each = FromDyn :: from ( for_each) ;
135
- t. into_par_iter ( ) . for_each ( |i| {
136
- guard. run ( || for_each ( i) ) ;
137
- } ) ;
168
+ let mut items: Vec < _ > = t. into_iter ( ) . collect ( ) ;
169
+ par_slice ( & mut items, guard, |i| for_each ( & * i) )
138
170
} else {
139
171
t. into_iter ( ) . for_each ( |i| {
140
- guard. run ( || for_each ( i) ) ;
172
+ guard. run ( || for_each ( & i) ) ;
141
173
} ) ;
142
174
}
143
175
} ) ;
144
176
}
145
177
146
- pub fn try_par_for_each_in <
147
- T : IntoIterator + IntoParallelIterator < Item = <T as IntoIterator >:: Item > ,
148
- E : Send ,
149
- > (
178
+ pub fn try_par_for_each_in < T : IntoIterator , E : DynSend > (
150
179
t : T ,
151
- for_each : impl Fn ( <T as IntoIterator >:: Item ) -> Result < ( ) , E > + DynSync + DynSend ,
152
- ) -> Result < ( ) , E > {
180
+ for_each : impl Fn ( & <T as IntoIterator >:: Item ) -> Result < ( ) , E > + DynSync + DynSend ,
181
+ ) -> Result < ( ) , E >
182
+ where
183
+ <T as IntoIterator >:: Item : DynSend ,
184
+ {
153
185
parallel_guard ( |guard| {
154
186
if mode:: is_dyn_thread_safe ( ) {
155
- let for_each = FromDyn :: from ( for_each) ;
156
- t. into_par_iter ( )
157
- . filter_map ( |i| guard. run ( || for_each ( i) ) )
158
- . reduce ( || Ok ( ( ) ) , Result :: and)
187
+ let mut items: Vec < _ > = t. into_iter ( ) . collect ( ) ;
188
+
189
+ let error = Mutex :: new ( None ) ;
190
+
191
+ par_slice ( & mut items, guard, |i| {
192
+ if let Err ( err) = for_each ( & * i) {
193
+ * error. lock ( ) = Some ( err) ;
194
+ }
195
+ } ) ;
196
+
197
+ if let Some ( err) = error. into_inner ( ) { Err ( err) } else { Ok ( ( ) ) }
159
198
} else {
160
- t. into_iter ( ) . filter_map ( |i| guard. run ( || for_each ( i) ) ) . fold ( Ok ( ( ) ) , Result :: and)
199
+ t. into_iter ( ) . filter_map ( |i| guard. run ( || for_each ( & i) ) ) . fold ( Ok ( ( ) ) , Result :: and)
161
200
}
162
201
} )
163
202
}
164
203
165
- pub fn par_map <
166
- I ,
167
- T : IntoIterator < Item = I > + IntoParallelIterator < Item = I > ,
168
- R : std:: marker:: Send ,
169
- C : FromIterator < R > + FromParallelIterator < R > ,
170
- > (
204
+ pub fn par_map < I : DynSend , T : IntoIterator < Item = I > , R : DynSend , C : FromIterator < R > > (
171
205
t : T ,
172
206
map : impl Fn ( I ) -> R + DynSync + DynSend ,
173
207
) -> C {
174
208
parallel_guard ( |guard| {
175
209
if mode:: is_dyn_thread_safe ( ) {
176
210
let map = FromDyn :: from ( map) ;
177
- t. into_par_iter ( ) . filter_map ( |i| guard. run ( || map ( i) ) ) . collect ( )
211
+
212
+ let mut items: Vec < ( Option < I > , Option < R > ) > =
213
+ t. into_iter ( ) . map ( |i| ( Some ( i) , None ) ) . collect ( ) ;
214
+
215
+ par_slice ( & mut items, guard, |i| {
216
+ i. 1 = Some ( map ( i. 0 . take ( ) . unwrap ( ) ) ) ;
217
+ } ) ;
218
+
219
+ items. into_iter ( ) . filter_map ( |i| i. 1 ) . collect ( )
178
220
} else {
179
221
t. into_iter ( ) . filter_map ( |i| guard. run ( || map ( i) ) ) . collect ( )
180
222
}
0 commit comments