[v8.0] Introduce Scout Agent and Optimizer #7251

Open
wants to merge 20 commits into
base: integration
Changes from 19 commits
228 changes: 228 additions & 0 deletions src/DIRAC/WorkloadManagementSystem/Agent/ScoutingJobStatusAgent.py
@@ -0,0 +1,228 @@
""" Agent for scout job framework to monitor scout status
and update main job status according to scout status.
"""
from DIRAC import S_OK, S_ERROR
from DIRAC.Core.Base.AgentModule import AgentModule
from DIRAC.WorkloadManagementSystem.Client import JobStatus
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.WorkloadManagementSystem.Client.JobStateUpdateClient import JobStateUpdateClient
from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB


class ScoutingJobStatusAgent(AgentModule):
"""
This agent checks for jobs with Scouting status
and manages the job status in relation to associated scout jobs
"""

def __init__(self, *args, **kwargs):
""" c'tor
"""
AgentModule.__init__(self, *args, **kwargs)

self.jobDB = None
self.logDB = None

#############################################################################
def initialize(self):
"""Sets defaults
"""

self.jobDB = JobStateUpdateClient()
Contributor

Maybe I was not clear, but this can't be a 1-to-1 substitution of JobStateUpdateClient for JobDB, as the two don't expose the same methods. For example, JobStateUpdateClient does not expose the selectJobs method used below.

Author

Ah I see. Does JobStateUpdateClient access only the methods found in JobStateUpdateHandler, or are there others?

Contributor

Only those.

Contributor

In general, I would do this, for added clarity:

Suggested change
- self.jobDB = JobStateUpdateClient()
+ self.jobStateUpdate = JobStateUpdateClient()

Then you will also need, at least:

        self.jobMonitoring = JobMonitoringClient()

self.logDB = JobLoggingDB()

return S_OK()

#############################################################################
def beginExecution(self):

self.totalScoutJobs = Operations().getValue('WorkloadManagement/Scouting/totalScoutJobs', 10)
self.criteriaFailedRate = Operations().getValue('WorkloadManagement/Scouting/criteriaFailedRate', 0.5)
self.criteriaSucceededRate = Operations().getValue('WorkloadManagement/Scouting/criteriaSucceededRate', 0.3)
self.criteriaStalledRate = Operations().getValue('WorkloadManagement/Scouting/criteriaStalledRate', 1.0)
self.criteriaFailed = Operations().getValue('WorkloadManagement/Scouting/criteriaFailed',
int(self.totalScoutJobs * self.criteriaFailedRate))
self.criteriaSucceeded = Operations().getValue('WorkloadManagement/Scouting/criteriaSucceeded',
int(self.totalScoutJobs * self.criteriaSucceededRate))
self.criteriaStalled = Operations().getValue('WorkloadManagement/Scouting/criteriaStalled',
int(self.totalScoutJobs * self.criteriaStalledRate))
Comment on lines +39 to +48
Contributor

Can this go via the Agent options, or is this used also in other places?

Contributor

In any case, you have to add the agent to the ConfigTemplate file.
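For reference, the defaults in the block under review derive each absolute threshold as `int(total * rate)`. The sketch below reproduces that arithmetic in plain Python, wherever the values end up being read from. The helper names `derive_threshold` and `derive_rate` are hypothetical and do not exist in the PR; the numbers are the defaults from the diff.

```python
def derive_threshold(total: int, rate: float) -> int:
    """Default absolute threshold: the rate applied to the scout count, truncated."""
    return int(total * rate)


def derive_rate(threshold: int, total: int) -> float:
    """Fraction of scouts that a threshold represents (true division, stays a float)."""
    return threshold / total


total = 10  # default of totalScoutJobs in the diff
failed = derive_threshold(total, 0.5)     # criteriaFailedRate default
succeeded = derive_threshold(total, 0.3)  # criteriaSucceededRate default
stalled = derive_threshold(total, 1.0)    # criteriaStalledRate default
print(failed, succeeded, stalled)

# Going back from a threshold to a rate needs true division.
print(derive_rate(succeeded, total))
# Wrapping the same division in int() truncates any rate below 1 to 0.
print(int(succeeded / total))
```

A rate recomputed from an overridden threshold only stays meaningful if it is kept as a float; once truncated to 0, a later `int(n * rate)` is always 0 as well.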


# Keep the rates as fractions: int() would truncate any rate below 1 to 0.
if int(self.totalScoutJobs * self.criteriaFailedRate) > self.criteriaFailed:
self.criteriaFailedRate = self.criteriaFailed / self.totalScoutJobs
if int(self.totalScoutJobs * self.criteriaSucceededRate) > self.criteriaSucceeded:
self.criteriaSucceededRate = self.criteriaSucceeded / self.totalScoutJobs
if int(self.totalScoutJobs * self.criteriaStalledRate) > self.criteriaStalled:
self.criteriaStalledRate = self.criteriaStalled / self.totalScoutJobs

self.log.info(f'Scouting parameters: Total: {self.totalScoutJobs}, '
f'Succeeded: {self.criteriaSucceeded}({self.criteriaSucceededRate}), '
f'Failed: {self.criteriaFailed}({self.criteriaFailedRate}), '
f'Stalled: {self.criteriaStalled}({self.criteriaStalledRate})')

return S_OK()

def execute(self):
"""The ScoutingJobStatus execution method.
"""
result = self.jobDB.selectJobs({'Status': 'Scouting'})
Author

JobStateUpdateClient doesn't seem to have a method that does what we want here. However, it seems that getJobs() in JobMonitoringHandler might have this functionality. Would you recommend using getJobs, or implementing this in JobStateUpdateHandler?

Contributor

Use getJobs.
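A minimal sketch of what that selection could look like, written against stand-ins so it runs without DIRAC. `FakeJobMonitoringClient`, its one-argument `getJobs`, and the helper `select_scouting_jobs` are hypothetical; the real call signature and the type of the returned job IDs are assumptions here, not confirmed by this thread. The local `S_OK`/`S_ERROR` only mimic the result-dictionary convention used in the diff.

```python
def S_OK(value=None):
    """Stand-in for DIRAC's S_OK: a success result dictionary."""
    return {"OK": True, "Value": value}


def S_ERROR(message=""):
    """Stand-in for DIRAC's S_ERROR: a failure result dictionary."""
    return {"OK": False, "Message": message}


class FakeJobMonitoringClient:
    """Hypothetical stub; a real client would query the monitoring service."""

    def __init__(self, jobs, fail=False):
        self._jobs = jobs  # {jobID: status}
        self._fail = fail

    def getJobs(self, attrDict):
        if self._fail:
            return S_ERROR("service unavailable")
        wanted = attrDict.get("Status")
        # Assumption: job IDs come back as strings.
        return S_OK([str(job) for job, status in sorted(self._jobs.items()) if status == wanted])


def select_scouting_jobs(client):
    """Return S_OK(list of int job IDs in Scouting status) or the error result."""
    result = client.getJobs({"Status": "Scouting"})
    if not result["OK"]:
        return result
    return S_OK([int(job) for job in result["Value"]])


client = FakeJobMonitoringClient({11: "Scouting", 12: "Running", 13: "Scouting"})
print(select_scouting_jobs(client))
```

The agent would hold such a read-only client next to the state-update client, so that reads and writes go through the services that actually expose them.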

if not result['OK']:
return result

joblist = result['Value']
if not joblist:
self.log.info('No Jobs with scouting status. Skipping this cycle')
return S_OK()

self.log.info(f'Check {len(joblist)} scouting jobs')
self.log.debug('joblist: ', joblist)

scoutIDdict = {}
for jobID in joblist:
result = self.jobDB.getJobParameters(int(jobID), ['ScoutID']) # <lowest jobID>:<Highest jobID>
if not result['OK']:
self.log.warn(result['Message'])
continue
if not result['Value'].get(int(jobID)):
continue

scoutID = result['Value'][int(jobID)]['ScoutID']
scoutStatus = scoutIDdict.get(scoutID)
if not scoutStatus:
result = self.__getScoutStatus(scoutID)
if not result['OK']:
self.log.warn(result['Message'])
continue
scoutStatus = result['Value']

scoutIDdict[scoutID] = scoutStatus
if scoutStatus['Status'] == 'NotComplete':
self.log.verbose(f"{jobID}: skipping since the corresponding scout has not completed yet.")
continue
else:
result = self.__updateJobStatus(jobID, status=scoutStatus['Status'],
minorstatus=scoutStatus['MinorStatus'],
appstatus=scoutStatus['appstatus'])
if not result['OK']:
self.log.warn(result['Message'])

self.log.info('final scoutIDdict:%s' % scoutIDdict)
return S_OK()

def __getScoutStatus(self, scoutid):
ids = scoutid.split(':')
scoutjoblist = list(range(int(ids[0]), int(ids[1]) + 1))

result = self.jobDB.getJobsAttributes(scoutjoblist, ['Status', 'Site'])
if not result['OK']:
return result

donejoblist = []
donesitelist = []
failedjoblist = []
failedsitelist = []
stalledjoblist = []
scoutjobs = result['Value'].keys()
Contributor

Beware: this is Python 3 code, not Python 2. In Python 3, keys() returns a view object rather than a list.

Contributor

This is safer:

Suggested change
- scoutjobs = result['Value'].keys()
+ scoutjobs = list(result['Value'])
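The difference can be seen in a few lines of plain Python. This is an illustrative sketch with made-up job IDs, not code from the PR: a keys view tracks later changes to the dictionary and cannot be indexed, while `list(...)` takes an independent snapshot.

```python
# Made-up result dictionary in the shape used by the diff.
result = {"Value": {101: {"Status": "Done"}, 102: {"Status": "Failed"}}}

keys_view = result["Value"].keys()  # dynamic view in Python 3
keys_list = list(result["Value"])   # independent snapshot of the keys

# Mutating the dictionary afterwards changes the view but not the list.
result["Value"][103] = {"Status": "Stalled"}
print(len(keys_view), len(keys_list))

# A view also does not support indexing, unlike the Python 2 list.
try:
    keys_view[0]
except TypeError as exc:
    print("TypeError:", exc)
```

For the loop and the `len(scoutjobs)` calls in this method both forms behave the same; the list form simply removes any dependence on view semantics.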

for scoutjob in scoutjobs:
status = result['Value'][scoutjob]['Status']
site = result['Value'][scoutjob]['Site']
jobid = scoutjob
if status == JobStatus.DONE:
donejoblist.append(jobid)
donesitelist.append(site)
elif status == JobStatus.FAILED:
failedjoblist.append(jobid)
failedsitelist.append(site)
elif status == JobStatus.STALLED:
stalledjoblist.append(jobid)

if self.criteriaSucceeded > len(scoutjobs):
criteriaSucceeded = max(int(len(scoutjobs) * self.criteriaSucceededRate), 1)
self.log.verbose(f'criteriaSucceeded = {criteriaSucceeded}')
else:
criteriaSucceeded = self.criteriaSucceeded
self.log.debug(f'criteriaSucceeded = {self.criteriaSucceeded}')

if self.criteriaFailed > len(scoutjobs):
criteriaFailed = max(int(len(scoutjobs) * self.criteriaFailedRate), 1)
self.log.verbose(f'criteriaFailed = {criteriaFailed}')
else:
criteriaFailed = self.criteriaFailed
self.log.debug(f'criteriaFailed = {self.criteriaFailed}')

if self.criteriaStalled > len(scoutjobs):
criteriaStalled = max(int(len(scoutjobs) * self.criteriaStalledRate), 1)
self.log.verbose(f'criteriaStalled = {criteriaStalled}')
else:
criteriaStalled = self.criteriaStalled
self.log.debug(f'criteriaStalled = {self.criteriaStalled}')

if len(donejoblist) >= criteriaSucceeded:
self.log.verbose(f'Scout (ID = {scoutid}) are done.')
scoutStatus = {'Status': 'Checking', 'MinorStatus': 'Scouting', 'appstatus': 'Scout Complete'}
Contributor

Suggested change
- scoutStatus = {'Status': 'Checking', 'MinorStatus': 'Scouting', 'appstatus': 'Scout Complete'}
+ scoutStatus = {'Status': JobStatus.CHECKING, 'MinorStatus': 'Scouting', 'appstatus': 'Scout Complete'}

and similar in the lines below.
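Applied to every branch, the idea could look like the sketch below. It is a standalone illustration, not the PR's code: the module-level constants are local stand-ins that reuse the status strings from the diff (in the agent they would come from `JobStatus`), and `decide_scout_status` is a hypothetical helper name.

```python
# Local stand-ins for the JobStatus constants, using the strings from the diff.
DONE, FAILED, STALLED, CHECKING = "Done", "Failed", "Stalled", "Checking"


def decide_scout_status(statuses, need_done, need_failed, need_stalled):
    """Map {scoutJobID: status} to the status dict applied to the main job."""
    done = [job for job, status in statuses.items() if status == DONE]
    failed = [job for job, status in statuses.items() if status == FAILED]
    stalled = [job for job, status in statuses.items() if status == STALLED]

    # Same precedence as the diff: success first, then failure, then stall.
    if len(done) >= need_done:
        return {"Status": CHECKING, "MinorStatus": "Scouting",
                "appstatus": "Scout Complete"}
    if len(failed) >= need_failed:
        return {"Status": FAILED, "MinorStatus": "Failed in scouting",
                "appstatus": "Failed scout job " + str(failed)}
    if len(stalled) >= need_stalled:
        return {"Status": STALLED, "MinorStatus": "Stalled in scouting",
                "appstatus": "Stalled scout job " + str(stalled)}
    return {"Status": "NotComplete"}


print(decide_scout_status({1: "Done", 2: "Done", 3: "Failed"}, 2, 2, 3))
```

With named constants, a typo in a status string fails at import time as an attribute error instead of silently writing an unknown status.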


elif len(failedjoblist) >= criteriaFailed:
self.log.verbose(f'Scout (ID = {scoutid}) has failed.')
msg = 'Failed scout job ' + str(failedjoblist)
scoutStatus = {'Status': 'Failed', 'MinorStatus': 'Failed in scouting', 'appstatus': msg}

elif len(stalledjoblist) >= criteriaStalled:
self.log.verbose(f'Scout (ID = {scoutid}) are stalled.')
msg = 'Stalled scout job ' + str(stalledjoblist)
scoutStatus = {'Status': 'Stalled', 'MinorStatus': 'Stalled in scouting', 'appstatus': msg}

else:
self.log.verbose(f'Scout (ID = {scoutid}) has not completed.')
scoutStatus = {'Status': 'NotComplete'}

return S_OK(scoutStatus)

def __updateJobStatus(self, job, status=None, minorstatus=None, appstatus=None):
""" This method updates the job status in the JobDB.
"""
self.log.info(f'Job {job} set Status="{status}", MinorStatus="{minorstatus}", ApplicationStatus="{appstatus}".')
if not self.am_getOption('Enable', True):
return S_OK('DisabledMode')

# Update ApplicationStatus
if not appstatus:
result = self.jobDB.getJobAttributes(job, ['ApplicationStatus'])
if result['OK']:
appstatus = result['Value']['ApplicationStatus']

self.log.verbose(f"self.jobDB.setJobAttribute({job},'ApplicationStatus','{appstatus}',update=True)")
result = self.jobDB.setJobAttribute(job, 'ApplicationStatus', appstatus, update=True)
if not result['OK']:
return result

# Update MinorStatus
if not minorstatus:
result = self.jobDB.getJobAttributes(job, ['MinorStatus'])
if result['OK']:
minorstatus = result['Value']['MinorStatus']

self.log.verbose(f"self.jobDB.setJobAttribute({job},'MinorStatus','{minorstatus}',update=True)")
result = self.jobDB.setJobAttribute(job, 'MinorStatus', minorstatus, update=True)
if not result['OK']:
return result
qdcampagna marked this conversation as resolved.

# Update ScoutFlag
result = self.jobDB.setJobParameter(int(job), 'ScoutFlag', 1)
if not result['OK']:
return result

# Update Status
if not status: # Retain last minor status for stalled jobs
result = self.jobDB.getJobAttributes(job, ['Status'])
if result['OK']:
status = result['Value']['Status']

self.log.verbose(f"self.jobDB.setJobAttribute({job},'Status','{status}',update=True)")
result = self.jobDB.setJobAttribute(job, 'Status', status, update=True)
if not result['OK']:
return result

logStatus = status
result = self.logDB.addLoggingRecord(job, status=logStatus, minor=minorstatus,
source='ScoutingJobStatusAgent')
if not result['OK']:
self.log.warn(result['Message'])

return result
132 changes: 132 additions & 0 deletions src/DIRAC/WorkloadManagementSystem/Executor/Scouting.py
@@ -0,0 +1,132 @@
""" Executor to set status "Scouting" for a main job which has scout jobs
"""

from DIRAC import S_OK, S_ERROR

from DIRAC.WorkloadManagementSystem.Executor.Base.OptimizerExecutor import OptimizerExecutor
from DIRAC.WorkloadManagementSystem.Client import JobStatus
from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus
from DIRAC.WorkloadManagementSystem.Client.JobStateUpdateClient import JobStateUpdateClient


class Scouting(OptimizerExecutor):
"""
The specific Optimizer must provide the following methods:
- optimizeJob() - the main method called for each job
and it can provide:
- initializeOptimizer() before each execution cycle
"""

@classmethod
def initializeOptimizer(cls):
""" Initialization of the optimizer.
"""
cls.__jobDB = JobStateUpdateClient()
return S_OK()

def optimizeJob(self, jid, jobState):
self.jobLog.info('Getting scoutparams from JobParameters')

result = self.__jobDB.getJobParameters(jid, ['ScoutFlag', 'ScoutID'])
if not result['OK']:
return result

rCounter = 0
if result['Value']:
scoutparams = result['Value'].get(jid)
self.jobLog.info('scoutparams: %s' % scoutparams)
if not scoutparams:
self.jobLog.info('Skipping optimizer, since scoutparams are abnormal')
return self.setNextOptimizer(jobState)

scoutID, scoutFlag = self.__getIDandFlag(scoutparams)
fstagni marked this conversation as resolved.
if not scoutID:
self.jobLog.info('Skipping optimizers, since this job has not enough scoutparams.')
return self.setNextOptimizer(jobState)

else:
result = jobState.getManifest()
if not result['OK']:
return result
jobManifest = result['Value']
scoutID = jobManifest.getOption('ScoutID', None)
if not scoutID:
self.jobLog.info('Skipping optimizer, since no scout \
corresponding to this job group')
Comment on lines +54 to +55
Contributor

This would print a lot of whitespace, because the backslash continuation keeps the next line's leading indentation inside the string. Better to do:

Suggested change
- self.jobLog.info('Skipping optimizer, since no scout \
- corresponding to this job group')
+ self.jobLog.info('Skipping optimizer, since no scout '
+ 'corresponding to this job group')
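The effect is easy to reproduce outside the optimizer. In this sketch the two function names are hypothetical and the message text is the one from the diff; the indentation of the continuation line is part of the first string literal, while adjacent literals are joined with nothing in between.

```python
def backslash_message():
    # The backslash joins the physical lines, so the leading spaces of the
    # second line become part of the string.
    return 'Skipping optimizer, since no scout \
            corresponding to this job group'


def concatenated_message():
    # Adjacent string literals are concatenated at compile time.
    return ('Skipping optimizer, since no scout '
            'corresponding to this job group')


print(repr(backslash_message()))
print(repr(concatenated_message()))
```

The implicit concatenation also keeps the code indentation free to change without altering the logged text.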

return self.setNextOptimizer(jobState)

scoutFlag = 0
result = jobState.getAttribute('RescheduleCounter')
if not result['OK']:
return S_ERROR('Could not retrieve RescheduleCounter')
qdcampagna marked this conversation as resolved.
if result['Value'] == None:
Contributor

Suggested change
- if result['Value'] == None:
+ if result['Value'] is None:
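The reason behind this suggestion: `==` dispatches to the operand's `__eq__`, which a class may override, whereas `is` compares identity and cannot be overridden. A small illustration with a hypothetical class, unrelated to the PR's types:

```python
class AlwaysEqual:
    """Hypothetical class whose equality check accepts anything."""

    def __eq__(self, other):
        return True


value = AlwaysEqual()

equal_to_none = (value == None)  # noqa: E711 - deliberate, to show the pitfall
identical_to_none = (value is None)

print(equal_to_none)      # True: decided by AlwaysEqual.__eq__
print(identical_to_none)  # False: value is not the None singleton
```

For a plain counter value both checks give the same answer, so the change is about robustness and convention (PEP 8) rather than a behavior fix here.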

return S_ERROR('Reschedule Counter not found')

rCounter = result['Value']
if int(rCounter) > 0:
qdcampagna marked this conversation as resolved.
rCycle = int(rCounter) - 1
result = self.__jobDB.getAtticJobParameters(jid, ['"ScoutFlag"'],
rescheduleCounter=rCycle)
self.jobLog.info("From AtticJobParameter: %s" % result)
fstagni marked this conversation as resolved.
if result['OK']:
try:
scoutFlag = result['Value'].get(rCycle).get('ScoutFlag', 0)
except (AttributeError, TypeError):
pass
else:
self.jobLog.info(result['Message'])
self.jobLog.info('Setting scoutparams (ID:%s, Flag:%s) to JobParameters'
% (scoutID, scoutFlag))
result = self.__setScoutparamsInJobParameters(jid, scoutID, scoutFlag, jobState)
if not result['OK']:
self.jobLog.info('Skipping, since failed in setting scoutparams of JobParameters.')
return self.setNextOptimizer(jobState)

if int(scoutFlag) == 1:
self.jobLog.info('Skipping optimizer, since corresponding scout jobs complete '
'(ScoutFlag = %s)' % scoutFlag)
return self.setNextOptimizer(jobState)

self.jobLog.info('Job %s set scouting status' % jid)
return self.__setScoutingStatus(jobState)

def __getIDandFlag(self, scoutparams):

scoutID = scoutparams.get('ScoutID')
scoutFlag = scoutparams.get('ScoutFlag')
return scoutID, scoutFlag

def __setScoutparamsInJobParameters(self, jid, scoutID, scoutFlag, jobState=None):

if not jobState:
jobState = self.__jobData.jobState

paramList = []
paramList.append(('ScoutID', scoutID))
paramList.append(('ScoutFlag', scoutFlag))
result = self.__jobDB.setJobParameters(jid, paramList)
if not result['OK']:
self.jobLog.info('Skipping, since failed in recovering scoutparams of JobParameters.')

return result

def __setScoutingStatus(self, jobState=None):

if not jobState:
jobState = self.__jobData.jobState

result = jobState.getStatus()
if not result['OK']:
return result

opName = self.ex_optimizerName()
result = jobState.setStatus(self.ex_getOption('WaitingStatus', JobStatus.SCOUTING),
minorStatus=self.ex_getOption('WaitingMinorStatus',
'Waiting for Scout Job Completion'),
appStatus="Unknown",
source=opName)
if not result['OK']:
return result

return S_OK()
