
Commit 9199c3a

Authored by aspiringmind-code (Vijay Chakravarty)
Task scheduling (#8676)
* adding task scheduling
* Initialising tw_name with 'any' to fix issue #8618
* Changing any to crab-prod-tw01:testing
* Final changes: fixing key names
* Removing configreq reference
* fixing EOL
* Lazy logging
* Change selectwork comment and make use of return value
* Adding an Adhoc section in the configuration and using it to set the selection_limit
* Removing hardcoded tw_name value from Task.py and making use of SQL bindings
* Adding logging capability in task scheduling
* Adding a dry_run option under Adhoc to make user and task count logging conditional
* Making round robin task scheduling as an option for dry_run. All waiting tasks are returned if dry_run is false.
* Change Adhoc to TaskScheduling and logging a table of username waiting and selected tasks
* Default dry_run for taskscheduling set to True

---------

Co-authored-by: Vijay Chakravarty <[email protected]>
Co-authored-by: Vijay Chakravarty <[email protected]>
Parent: 6063580

File tree: 6 files changed, +135 -5 lines

src/python/CRABInterface/DataWorkflow.py (+2 -1)

@@ -175,7 +175,7 @@ def submit(self, workflow, activity, jobtype, jobsw, jobarch, use_parent, second
         self.api.modify(self.Task.New_sql,
                         task_name = [workflow],
                         task_activity = [activity],
-                        task_status = ['NEW'],
+                        task_status = ['WAITING'],
                         task_command = ['SUBMIT'],
                         task_failure = [''],
                         job_sw = [jobsw],
@@ -212,6 +212,7 @@ def submit(self, workflow, activity, jobtype, jobsw, jobarch, use_parent, second
                         job_type = [jobtype],
                         arguments = [dbSerializer(arguments)],
                         save_logs = ['T' if savelogsflag else 'F'],
+                        tw_name = ['NotKnownYet'],
                         user_infiles = [dbSerializer(adduserfiles)],
                         maxjobruntime = [maxjobruntime],
                         numcores = [numcores],
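With this change a freshly submitted task enters the TaskDB in 'WAITING' rather than 'NEW', and the TaskWorker name is left as a placeholder until a worker picks the task up. As an orientation aid, here is my reading of the intended status progression across the diffs in this commit, written as a small annotated Python list (an illustration only, not code from the repository):

# Happy-path status flow assumed from this commit's diffs; QUEUED/SUBMITTED
# belong to the pre-existing TaskWorker handling and are unchanged here.
TASK_FLOW = [
    'WAITING',    # set by DataWorkflow.submit at task creation (this file)
    'NEW',        # set by MasterWorker._selectWork after round-robin selection
    'HOLDING',    # set by MasterWorker._lockWork when work is handed to the slaves
    'QUEUED',     # pre-existing flow
    'SUBMITTED',  # pre-existing flow
]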

src/python/CRABUtils/TaskUtils.py (+3 -1)

@@ -53,7 +53,9 @@ def updateTaskStatus(crabserver=None, taskName=None, status=None, logger=None):
     """ change task status in the DB """
     msg = f"Will set to {status} task {taskName}"
     logger.info(msg)
-    if status == 'NEW':
+    if status == 'WAITING':
+        command = 'SUBMIT'
+    elif status == 'NEW':
         command = 'SUBMIT'
     elif status == 'SUBMITREFUSED':
         command = 'SUBMIT'
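Both 'WAITING' and 'NEW' now translate to a 'SUBMIT' command. For the branches visible in this hunk, the chain behaves like a simple lookup; a sketch for illustration only (the real function keeps the if/elif chain and handles further statuses not shown here):

# Equivalent mapping for the statuses visible in this hunk.
STATUS_TO_COMMAND = {
    'WAITING': 'SUBMIT',
    'NEW': 'SUBMIT',
    'SUBMITREFUSED': 'SUBMIT',
}

status = 'WAITING'                   # example input
command = STATUS_TO_COMMAND[status]  # -> 'SUBMIT'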

src/python/Databases/TaskDB/Oracle/Task/Task.py (+2 -2)

@@ -50,7 +50,7 @@ class Task(object):
     tm_split_algo, tm_split_args, tm_totalunits, tm_user_sandbox, tm_debug_files, tm_cache_url, tm_username, tm_user_dn, \
     tm_user_vo, tm_user_role, tm_user_group, tm_publish_name, tm_asyncdest, tm_dbs_url, tm_publish_dbs_url, \
     tm_publication, tm_outfiles, tm_tfile_outfiles, tm_edm_outfiles, tm_job_type, tm_generator, tm_arguments, \
-    tm_save_logs, tm_user_infiles, tm_maxjobruntime, tm_numcores, tm_maxmemory, tm_priority, \
+    tm_save_logs, tw_name, tm_user_infiles, tm_maxjobruntime, tm_numcores, tm_maxmemory, tm_priority, \
     tm_scriptexe, tm_scriptargs, tm_extrajdl, tm_events_per_lumi, tm_collector, tm_schedd, tm_dry_run, \
     tm_user_files, tm_transfer_outputs, tm_output_lfn, tm_ignore_locality, tm_fail_limit, tm_one_event_mode, tm_submitter_ip_addr, tm_ignore_global_blacklist, \
     tm_user_config) \
@@ -59,7 +59,7 @@ class Task(object):
     :split_algo, :split_args, :total_units, :user_sandbox, :debug_files, :cache_url, :username, :user_dn, \
     :user_vo, :user_role, :user_group, :publish_name, :asyncdest, :dbs_url, :publish_dbs_url, \
     :publication, :outfiles, :tfile_outfiles, :edm_outfiles, :job_type, :generator, :arguments, \
-    :save_logs, :user_infiles, :maxjobruntime, :numcores, :maxmemory, :priority, \
+    :save_logs, :tw_name, :user_infiles, :maxjobruntime, :numcores, :maxmemory, :priority, \
     :scriptexe, :scriptargs, :extrajdl, :events_per_lumi, :collector, :schedd_name, :dry_run, \
     :user_files, :transfer_outputs, :output_lfn, :ignore_locality, :fail_limit, :one_event_mode, :submitter_ip_addr, :ignore_global_blacklist, \
     :user_config)"

src/python/ServerUtilities.py (+1 -1)

@@ -91,7 +91,7 @@
 MAX_TB_TO_RECALL_AT_A_SINGLE_SITE = 1000  # effectively no limit. See https://github.com/dmwm/CRABServer/issues/7610

 # These are all possible statuses of a task in the TaskDB.
-TASKDBSTATUSES_TMP = ['NEW', 'HOLDING', 'QUEUED', 'TAPERECALL', 'KILLRECALL']
+TASKDBSTATUSES_TMP = ['WAITING', 'NEW', 'HOLDING', 'QUEUED', 'TAPERECALL', 'KILLRECALL']
 TASKDBSTATUSES_FAILURES = ['SUBMITFAILED', 'KILLFAILED', 'RESUBMITFAILED', 'FAILED']
 TASKDBSTATUSES_FINAL = ['UPLOADED', 'SUBMITTED', 'KILLED'] + TASKDBSTATUSES_FAILURES
 TASKDBSTATUSES = TASKDBSTATUSES_TMP + TASKDBSTATUSES_FINAL

src/python/TaskWorker/Main.py (+3)

@@ -71,6 +71,9 @@ def main():
         raise ConfigException("Configuration not found")

     configuration = loadConfigurationFile(os.path.abspath(options.config))
+    configuration.section_('TaskScheduling')
+    configuration.TaskScheduling.selection_limit = 10
+    configuration.TaskScheduling.dry_run = True
     status, msg = validateConfig(configuration)
     if not status:
         raise ConfigException(msg)
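The commit message refers to a TaskScheduling section of the configuration; in this commit the two settings are injected with fixed values right after the configuration file is loaded, so these assignments determine the values actually used. For reference, a sketch of how the same section would look in a WMCore-style TaskWorker configuration file (the surrounding file layout is an assumption on my part):

from WMCore.Configuration import Configuration

config = Configuration()
config.section_('TaskScheduling')
config.TaskScheduling.selection_limit = 10  # max number of WAITING tasks fetched per cycle
config.TaskScheduling.dry_run = True        # True: return the round-robin selection; False: return all WAITING tasks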

src/python/TaskWorker/MasterWorker.py (+124)

@@ -23,6 +23,8 @@
 from TaskWorker.WorkerExceptions import ConfigException
 from TaskWorker.Actions.Recurring.BaseRecurringAction import handleRecurring
 from TaskWorker.Actions.Handler import handleResubmit, handleNewTask, handleKill
+from CRABUtils.TaskUtils import getTasks, updateTaskStatus
+import random

 ## NOW placing this here, then to be verified if going into Action.Handler, or TSM
 ## The meaning of the elements in the 3-tuples are as follows:
@@ -245,6 +247,123 @@ def getRecurringActionInst(self, actionName):
         return getattr(mod, actionName)(self.config.TaskWorker.logsDir)


+    def _externalScheduling(self, limit):
+        """
+        External scheduling method using round-robin algorithm to get tasks
+        in waiting status and consider resource utilization for fair share.
+        """
+        self.logger.info("Starting external scheduling.")
+
+        try:
+            # Retrieve tasks with 'WAITING' status
+            waiting_tasks = getTasks(crabserver=self.crabserver, status='WAITING', logger=self.logger, limit=limit)
+
+            if not waiting_tasks:
+                self.logger.info("No tasks in 'WAITING' status found.")
+                return []
+
+            # Organize tasks by user
+            tasks_by_user = {}
+            for task in waiting_tasks:
+                user = task['tm_username']
+                if user not in tasks_by_user:
+                    tasks_by_user[user] = []
+                tasks_by_user[user].append(task)
+
+            # Perform round-robin selection among users
+            users = list(tasks_by_user.keys())
+            random.shuffle(users)  # To ensure fair round-robin each time
+            selected_tasks = []
+
+            for user in users:
+                user_tasks = tasks_by_user[user]
+                selected_tasks.extend(user_tasks[:limit // len(users)])
+
+            # Create and populate task_count dictionary
+            task_count = {'selected': {}, 'waiting': {}}
+
+            for status, tasks in [('selected', selected_tasks), ('waiting', waiting_tasks)]:
+                for task in tasks:
+                    username = task['tm_username']
+                    task_count[status][username] = task_count[status].get(username, 0) + 1
+
+            # Prepare table headers and rows
+            headers = ['Username', 'Waiting', 'Selected']
+            rows = []
+
+            # Collect all usernames to ensure every user appears in the table
+            all_usernames = set(task_count['selected'].keys()).union(task_count['waiting'].keys())
+
+            for username in all_usernames:
+                waiting_count = task_count['waiting'].get(username, 0)
+                selected_count = task_count['selected'].get(username, 0)
+                rows.append([username, waiting_count, selected_count])
+
+            # Determine the width of each column for formatting
+            widths = [max(len(header) for header in headers)] + [max(len(str(row[i])) for row in rows) for i in range(1, len(headers))]
+
+            # Prepare formatted table string
+            table_header = ' | '.join(f'{header:<{width}}' for header, width in zip(headers, widths))
+            table_separator = '-|-'.join('-' * width for width in widths)
+            table_rows = '\n'.join(' | '.join(f'{str(cell):<{width}}' for cell, width in zip(row, widths)) for row in rows)
+
+            # Combine header, separator, and rows into one string
+            table = f"{table_header}\n{table_separator}\n{table_rows}"
+
+            # Log the formatted table
+            self.logger.info('\n%s', table)
+
+            if self.config.TaskScheduling.dry_run:
+                return selected_tasks  # dry_run True (with Task Scheduling)
+            else:
+                return waiting_tasks  # dry_run False (without Task Scheduling)
+
+        except Exception as e:
+            self.logger.exception("Exception occurred during external scheduling: %s", str(e))
+            return []
+
+    def _pruneTaskQueue(self):
+        self.logger.info("Pruning the queue if required...logic tbd")
+
+    def _reportQueueStatus(self):
+        self.logger.info("Report Queue status... logic tbd")
+
+
+    def _selectWork(self, limit):
+        """This function calls external scheduling and updates task status for the selected tasks"""
+        self.logger.info("Starting work selection process.")
+
+        # Call the external scheduling method
+        selected_tasks = self._externalScheduling(limit)
+
+        if not selected_tasks:
+            return False
+
+        try:
+            # Update the status of each selected task to 'NEW'
+            for task in selected_tasks:
+                task_name = task['tm_taskname']
+                updateTaskStatus(crabserver=self.crabserver, taskName=task_name, status='NEW', logger=self.logger)
+                self.logger.info("Task %s status updated to 'NEW'.", task_name)
+
+            # Prune the task queue if necessary
+            self._pruneTaskQueue()
+
+            # Report queue status
+            self._reportQueueStatus()
+
+        except HTTPException as hte:
+            msg = "HTTP Error during _selectWork: %s\n" % str(hte)
+            msg += "HTTP Headers are %s: " % hte.headers
+            self.logger.error(msg)
+            return False
+
+        except Exception:  # pylint: disable=broad-except
+            self.logger.exception("Server could not process the _selectWork request.")
+            return False
+
+        return True
+
     def _lockWork(self, limit, getstatus, setstatus):
         """Today this is always returning true, because we do not want the worker to die if
         the server endpoint is not avaialable.
@@ -400,6 +519,11 @@ def algorithm(self):
         self.restartQueuedTasks()
         self.logger.debug("Master Worker Starting Main Cycle.")
         while not self.STOP:
+            selection_limit = self.config.TaskScheduling.selection_limit
+            if not self._selectWork(limit=selection_limit):
+                self.logger.warning("Selection of work failed.")
+            else:
+                self.logger.info("Work selected successfully.")
             limit = self.slaves.queueableTasks()
             if not self._lockWork(limit=limit, getstatus='NEW', setstatus='HOLDING'):
                 time.sleep(self.config.TaskWorker.polling)
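To make the fair-share logic easier to follow, here is a self-contained sketch of the per-user round-robin that _externalScheduling applies to the WAITING tasks (the function name and sample data are mine; the real method works on the dictionaries returned by getTasks and additionally logs the per-user table shown above):

import random

def round_robin_select(waiting_tasks, limit):
    """Mimic the per-user selection done in MasterWorker._externalScheduling."""
    tasks_by_user = {}
    for task in waiting_tasks:
        tasks_by_user.setdefault(task['tm_username'], []).append(task)

    users = list(tasks_by_user)
    random.shuffle(users)           # vary the user order between cycles
    per_user = limit // len(users)  # integer share of the limit for each user
    selected = []
    for user in users:
        selected.extend(tasks_by_user[user][:per_user])
    return selected

# Example: three users with unequal backlogs and a limit of 6 -> at most 2 tasks each
waiting = ([{'tm_taskname': f'alice_task_{i}', 'tm_username': 'alice'} for i in range(5)]
           + [{'tm_taskname': f'bob_task_{i}', 'tm_username': 'bob'} for i in range(2)]
           + [{'tm_taskname': 'carol_task_0', 'tm_username': 'carol'}])
print([t['tm_taskname'] for t in round_robin_select(waiting, limit=6)])

Note that, as in the diff, the share is limit // number_of_users, so a cycle with more distinct users than the selection limit yields an empty selection.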
