1
- # Copyright 2021 The StackStorm Authors.
1
+ # Copyright 2021-2023 The StackStorm Authors.
2
2
# Copyright 2019 Extreme Networks, Inc.
3
3
#
4
4
# Licensed under the Apache License, Version 2.0 (the "License");
14
14
# limitations under the License.
15
15
16
16
import logging
17
- import six
18
-
19
- from six .moves import queue
17
+ import queue
20
18
21
19
from orquesta import constants
22
20
from orquesta import events
@@ -119,7 +117,7 @@ def get_task_sequence(self, task_id, route):
119
117
task_id , route = q .get ()
120
118
121
119
for i , t in enumerate (self .sequence ):
122
- for k , v in six . iteritems ( t ["prev" ]):
120
+ for k , v in t ["prev" ]. items ( ):
123
121
p = self .sequence [v ]
124
122
if p ["id" ] == task_id and p ["route" ] == route :
125
123
seq .append ((i , t ))
@@ -590,7 +588,7 @@ def get_task(self, task_id, route):
590
588
if getattr (task_spec , "delay" , None ):
591
589
task_delay = task_spec .delay
592
590
593
- if isinstance (task_delay , six . string_types ):
591
+ if isinstance (task_delay , str ):
594
592
task_delay = expr_base .evaluate (task_delay , task_ctx )
595
593
596
594
if not isinstance (task_delay , int ):
@@ -790,7 +788,7 @@ def setup_retry_in_task_state(self, task_state_entry, in_ctx_idxs):
790
788
791
789
# Evaluate the retry delay value.
792
790
if "delay" in task_state_entry ["retry" ] and isinstance (
793
- task_state_entry ["retry" ]["delay" ], six . string_types
791
+ task_state_entry ["retry" ]["delay" ], str
794
792
):
795
793
delay_value = expr_base .evaluate (task_state_entry ["retry" ]["delay" ], in_ctx )
796
794
@@ -801,7 +799,7 @@ def setup_retry_in_task_state(self, task_state_entry, in_ctx_idxs):
801
799
802
800
# Evaluate the retry count value.
803
801
if "count" in task_state_entry ["retry" ] and isinstance (
804
- task_state_entry ["retry" ]["count" ], six . string_types
802
+ task_state_entry ["retry" ]["count" ], str
805
803
):
806
804
count_value = expr_base .evaluate (task_state_entry ["retry" ]["count" ], in_ctx )
807
805
@@ -1227,7 +1225,7 @@ def _collapse_task_rerun_requests(self, tasks=None):
1227
1225
# Only the index is required for further evaluation below.
1228
1226
result = {
1229
1227
k : [i [0 ] for i in self .workflow_state .get_task_sequence (t .task_id , t .route )]
1230
- for k , t in six . iteritems ( tasks )
1228
+ for k , t in tasks . items ( )
1231
1229
}
1232
1230
1233
1231
# If the list of task request is greater than one, then we have to check whether
@@ -1238,10 +1236,7 @@ def _collapse_task_rerun_requests(self, tasks=None):
1238
1236
# The for loops below identify task requests that have subsequent task sequences
1239
1237
# not in other task requests.
1240
1238
result = {
1241
- k : i
1242
- for k , i in six .iteritems (result )
1243
- for j in result .values ()
1244
- if len (set (i ) - set (j )) > 0
1239
+ k : i for k , i in result .items () for j in result .values () if len (set (i ) - set (j )) > 0
1245
1240
}
1246
1241
1247
1242
return result
@@ -1256,9 +1251,7 @@ def request_workflow_rerun(self, task_requests=None):
1256
1251
tasks = {t .task_state_entry_id : t for t in task_requests or []}
1257
1252
1258
1253
# If the list of tasks is provided, verify if task exist and rerunnable.
1259
- invalid_rerun_requests = [
1260
- t for k , t in six .iteritems (tasks ) if k not in self .workflow_state .tasks
1261
- ]
1254
+ invalid_rerun_requests = [t for k , t in tasks .items () if k not in self .workflow_state .tasks ]
1262
1255
1263
1256
if invalid_rerun_requests :
1264
1257
raise exc .InvalidTaskRerunRequest (invalid_rerun_requests )
@@ -1277,7 +1270,7 @@ def request_workflow_rerun(self, task_requests=None):
1277
1270
self ._get_task_state_idx (t .task_id , t .route ),
1278
1271
self .workflow_state .get_task (t .task_id , t .route ),
1279
1272
)
1280
- for k , t in six . iteritems ( tasks )
1273
+ for k , t in tasks . items ( )
1281
1274
if k in self ._collapse_task_rerun_requests (tasks )
1282
1275
}
1283
1276
@@ -1299,11 +1292,11 @@ def request_workflow_rerun(self, task_requests=None):
1299
1292
continuable_candidates = {
1300
1293
constants .TASK_STATE_ROUTE_FORMAT % (t ["id" ], str (t ["route" ])): t
1301
1294
for i , t in self .workflow_state .get_terminal_tasks ()
1302
- if len ([k for k , v in six . iteritems ( t ["next" ]) if v ]) > 0
1295
+ if len ([k for k , v in t ["next" ]. items ( ) if v ]) > 0
1303
1296
}
1304
1297
1305
1298
# Automatically resume all continuable candidates.
1306
- for _ , task in sorted (six . iteritems ( continuable_candidates ), key = lambda x : x [0 ]):
1299
+ for _ , task in sorted (continuable_candidates . items ( ), key = lambda x : x [0 ]):
1307
1300
# Reset terminal status for the continuable candidate.
1308
1301
task .pop ("term" , None )
1309
1302
0 commit comments