@@ -170,13 +170,19 @@ def get_worker(self, nowait=False):
170
170
while self .running :
171
171
best_score = 99999999
172
172
best_worker = None
173
+ good_worker = 0
173
174
idle_num = 0
174
175
now = time .time ()
175
176
for worker in self .workers :
176
- if not worker . accept_task or worker .is_life_end ():
177
+ if worker .is_life_end ():
177
178
# self.logger.debug("not accept")
178
179
continue
179
180
181
+ good_worker += 1
182
+
183
+ if not worker .accept_task :
184
+ continue
185
+
180
186
if worker .version == "1.1" :
181
187
idle_num += 1
182
188
else :
@@ -189,7 +195,7 @@ def get_worker(self, nowait=False):
189
195
best_score = score
190
196
best_worker = worker
191
197
192
- if len ( self . workers ) < self .config .dispather_max_workers and \
198
+ if good_worker < self .config .dispather_max_workers and \
193
199
(best_worker is None or
194
200
idle_num < self .config .dispather_min_idle_workers or
195
201
len (self .workers ) < self .config .dispather_min_workers or
@@ -268,11 +274,15 @@ def request(self, method, host, path, headers, body, url=b"", timeout=60):
268
274
if response and response .status == 200 :
269
275
self .success_num += 1
270
276
self .continue_fail_num = 0
277
+ task .worker .continue_fail_tasks = 0
271
278
else :
272
279
self .logger .warn ("task %s %s %s timeout" , method , host , path )
273
280
self .fail_num += 1
274
281
self .continue_fail_num += 1
275
282
self .last_fail_time = time .time ()
283
+ task .worker .continue_fail_tasks += 1
284
+ if task .worker .continue_fail_tasks > self .config .dispather_worker_max_continue_fail :
285
+ self .trigger_create_worker_cv .notify ()
276
286
277
287
task .set_state ("get_response" )
278
288
return response
0 commit comments