From b1c148d4545fe9ac9dbfac8af96fd23a82c16156 Mon Sep 17 00:00:00 2001 From: Stefano Belforte Date: Thu, 9 Jan 2025 15:12:32 +0100 Subject: [PATCH 01/20] add support for --jobId arg to CMSRunAnalysis.py --- scripts/CMSRunAnalysis.py | 28 ++++++++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/scripts/CMSRunAnalysis.py b/scripts/CMSRunAnalysis.py index c10673b21d..6ed9f79a72 100644 --- a/scripts/CMSRunAnalysis.py +++ b/scripts/CMSRunAnalysis.py @@ -286,6 +286,7 @@ def handleException(exitAcronym, exitCode, exitMsg): def parseArgs(): parser = PassThroughOptionParser() + parser.add_option('--jobId', dest='jobId', type='string') parser.add_option('--json', dest='jsonArgFile', type='string') parser.add_option('-a', dest='archiveJob', type='string') parser.add_option('-o', dest='outFiles', type='string') @@ -319,8 +320,31 @@ def parseArgs(): if value == 'None': setattr(opts, name, None) - # allow for most input arguments to be passed via a JSON file - # in this case only -r and --JobNumber need to be present as arguments + # allow for arguments simply be the jobId (a string because automtic splitting has format like N-M + if getattr(opts, 'jobId', None): + with open('input_args.json', 'r', encoding='UTF-8') as fh: + allArgs = json.load(fh) # read file prepared by DagmanCreator + for arguments in allArgs: + if arguments['CRAB_Id'] == opts.jobId: + break # pick the arguments for this job + # TODO: fail with error if jobId is not found in input_args.json (useful when using via preparelocal) + for key, value in arguments.items(): + setattr(opts, key, value) + + # remap key in input_args.json to the argument names required by CMSRunAnalysis.py + # use as : value_of_argument_name = inputArgs[argMap[argument_name]] + # to ease transition to cleaner code the new key are only added if missing + argMap = { + 'archiveJob': 'CRAB_Archive', 'outFiles': 'CRAB_AdditionalOutputFiles', + 'sourceURL': 'CRAB_ISB', 'cmsswVersion': 'CRAB_JobSW', + 'scramArch': 'CRAB_JobArch', 'runAndLumis': 'runAndLumiMask', + 'inputFile' : 'inputFiles', 'lheInputFiles': 'lheInputFiles' + } + for key, value in argMap.items(): + if not getattr(opts, key, None): + setattr(opts, key, arguments[value]) # assign to our variables + + # allow for most input arguments to be passed via a (job specific) JSON file if getattr(opts, 'jsonArgFile', None): with open(opts.jsonArgFile, 'r', encoding='UTF-8') as fh: arguments = json.load(fh) From cbb60c94c6d637084dae756a66e731297171b324 Mon Sep 17 00:00:00 2001 From: Stefano Belforte Date: Fri, 10 Jan 2025 18:10:01 +0100 Subject: [PATCH 02/20] transfer input_args.json to WN --- src/python/TaskWorker/Actions/DagmanCreator.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/python/TaskWorker/Actions/DagmanCreator.py b/src/python/TaskWorker/Actions/DagmanCreator.py index ba68b46a47..68782d6c5c 100644 --- a/src/python/TaskWorker/Actions/DagmanCreator.py +++ b/src/python/TaskWorker/Actions/DagmanCreator.py @@ -565,6 +565,7 @@ def makeJobSubmit(self, task): else: raise TaskWorkerException(f"Cannot find TaskManagerRun.tar.gz inside the cwd: {os.getcwd()}") info['additional_input_file'] += ", sandbox.tar.gz" # it will be present on SPOOL_DIR after dab_bootstrap + info['additional_input_file'] += ", input_args.json" info['additional_input_file'] += ", run_and_lumis.tar.gz" info['additional_input_file'] += ", input_files.tar.gz" info['additional_input_file'] += ", submit_env.sh" From bfb919ca72d787329582efdbd8d6e4a1941f0250 Mon Sep 17 00:00:00 2001 From: Stefano Belforte 
Date: Wed, 15 Jan 2025 17:50:29 +0100 Subject: [PATCH 03/20] make it work also for automatic splitting --- .../TaskWorker/Actions/DagmanCreator.py | 164 +++++++++++------- 1 file changed, 105 insertions(+), 59 deletions(-) diff --git a/src/python/TaskWorker/Actions/DagmanCreator.py b/src/python/TaskWorker/Actions/DagmanCreator.py index 68782d6c5c..3d010eb1e1 100644 --- a/src/python/TaskWorker/Actions/DagmanCreator.py +++ b/src/python/TaskWorker/Actions/DagmanCreator.py @@ -56,7 +56,12 @@ SCRIPT DEFER 4 1800 POST Job{count} dag_bootstrap.sh POSTJOB $JOBID $RETURN $RETRY $MAX_RETRIES {taskname} {count} {tempDest} {outputDest} cmsRun_{count}.log.tar.gz {stage} {remoteOutputFiles} #PRE_SKIP Job{count} 3 RETRY Job{count} {maxretries} UNLESS-EXIT 2 -VARS Job{count} count="{count}" runAndLumiMask="job_lumis_{count}.json" lheInputFiles="{lheInputFiles}" firstEvent="{firstEvent}" firstLumi="{firstLumi}" lastEvent="{lastEvent}" firstRun="{firstRun}" maxRuntime="{maxRuntime}" eventsPerLumi="{eventsPerLumi}" seeding="{seeding}" inputFiles="job_input_file_list_{count}.txt" scriptExe="{scriptExe}" scriptArgs="{scriptArgs}" +CRAB_localOutputFiles="\\"{localOutputFiles}\\"" +CRAB_DataBlock="\\"{block}\\"" +CRAB_Destination="\\"{destination}\\"" +VARS Job{count} count="{count}" +# following 3 classAds could possibly be moved to Job.submit but as they are job-dependent +# would need to be done in the PreJob... doing it here is a bit ugly, but simpler +VARS Job{count} My.CRAB_localOutputFiles="\\"{localOutputFiles}\\"" +VARS Job{count} My.CRAB_DataBlock="\\"{block}\\"" +VARS Job{count} My.CRAB_Destination="\\"{destination}\\"" ABORT-DAG-ON Job{count} 3 """ @@ -146,7 +151,7 @@ Log = job_log # args changed... -Arguments = "-a $(CRAB_Archive) --sourceURL=$(CRAB_ISB) --jobNumber=$(CRAB_Id) --cmsswVersion=$(CRAB_JobSW) --scramArch=$(CRAB_JobArch) '--inputFile=$(inputFiles)' '--runAndLumis=$(runAndLumiMask)' --lheInputFiles=$(lheInputFiles) --firstEvent=$(firstEvent) --firstLumi=$(firstLumi) --lastEvent=$(lastEvent) --firstRun=$(firstRun) --seeding=$(seeding) --scriptExe=$(scriptExe) --eventsPerLumi=$(eventsPerLumi) --maxRuntime=$(maxRuntime) '--scriptArgs=$(scriptArgs)' -o $(CRAB_AdditionalOutputFiles)" +Arguments = "--jobId=$(CRAB_Id)" transfer_input_files = CMSRunAnalysis.sh, cmscp.py%(additional_input_file)s transfer_output_files = jobReport.json.$(count), WMArchiveReport.json.$(count) @@ -371,6 +376,7 @@ def __init__(self, config, crabserver, procnum=-1, rucioClient=None): """ need a comment line here """ TaskAction.__init__(self, config, crabserver, procnum) self.rucioClient = rucioClient + self.runningInTW = crabserver is not None def populateGlideinMatching(self, info): """ actually simply set the required arch and microarch """ @@ -403,7 +409,6 @@ def populateGlideinMatching(self, info): self.logger.error(f"Not supported microarch: {min_micro_arch}. Ignore it") info['required_minimum_microarch'] = 'any' - def getDashboardTaskType(self, task): """ Get the dashboard activity name for the task. """ @@ -448,7 +453,6 @@ def isGlobalBlacklistIgnored(self, kwargs): return kwargs['task']['tm_ignore_global_blacklist'] == 'T' - def makeJobSubmit(self, task): """ Create the submit file. This is reused by all jobs in the task; differences @@ -702,14 +706,15 @@ def makeDagSpecs(self, task, siteinfo, jobgroup, block, availablesites, datasite return dagSpecs, i - def prepareLocal(self, dagSpecs, info, kw, inputFiles, subdags): - """ Prepare a file named "input_args.json" with all the input parameters of each jobs. 
It is a list - with a dictionary for each job. The dictionary key/value pairs are the arguments of gWMS-CMSRunAnalysis.sh - N.B.: in the JDL: "Executable = gWMS-CMSRunAnalysis.sh" and "Arguments = $(CRAB_Archive) --sourceURL=$(CRAB_ISB) ..." - where each argument of each job is set in "input_args.json". - Also, this prepareLocal method prepare a single "InputFiles.tar.gz" file with all the inputs files moved - from the TW to the schedd. - This is used by the client preparelocal command. + def prepareJobArguments(self, dagSpecs, task): + """ Prepare an object with all the input parameters of each jobs. It is a list + with a dictionary for each job. The dictionary key/value pairs are the variables needed in CMSRunAnalysis.py + This will be save in "input_args*.json", a differnt json file for the main DAG and each subdags + Inputs: + dagSpecs : list of dictionaries with information for each DAG job + task: dictionary, the "standard" task dictionary with info from the DataBase TASK table + Returns: + argdicts : list of dictionaries, one per job, with the args needeed by CMSRunAnalysis.py """ argdicts = [] @@ -718,35 +723,66 @@ def prepareLocal(self, dagSpecs, info, kw, inputFiles, subdags): argDict['inputFiles'] = f"job_input_file_list_{dagspec['count']}.txt" #'job_input_file_list_1.txt' argDict['runAndLumiMask'] = f"job_lumis_{dagspec['count']}.json" argDict['CRAB_Id'] = dagspec['count'] #'1' - argDict['lheInputFiles'] = dagspec['lheInputFiles'] #False - argDict['firstEvent'] = dagspec['firstEvent'] #'None' - argDict['lastEvent'] = dagspec['lastEvent'] #'None' - argDict['firstLumi'] = dagspec['firstLumi'] #'None' - argDict['firstRun'] = dagspec['firstRun'] #'None' - argDict['CRAB_Archive'] = info['cachefilename_flatten'] #'sandbox.tar.gz' - argDict['CRAB_ISB'] = info['cacheurl_flatten'] #u'https://cmsweb.cern.ch/crabcache' - argDict['CRAB_JobSW'] = info['jobsw_flatten'] #u'CMSSW_9_2_5' - argDict['CRAB_JobArch'] = info['jobarch_flatten'] #u'slc6_amd64_gcc530' + argDict['lheInputFiles'] = dagspec['lheInputFiles'] # False + argDict['firstEvent'] = dagspec['firstEvent'] # 'None' + argDict['lastEvent'] = dagspec['lastEvent'] # 'None' + argDict['firstLumi'] = dagspec['firstLumi'] # 'None' + argDict['firstRun'] = dagspec['firstRun'] # 'None' + argDict['CRAB_Archive'] = task['tm_user_sandbox'] #'sandbox.tar.gz' + argDict['CRAB_ISB'] = task['tm_cache_url'] # 'https://cmsweb.cern.ch/crabcache' + argDict['CRAB_JobSW'] = task['tm_job_sw'] # 'CMSSW_9_2_5' + argDict['CRAB_JobArch'] = task['tm_job_arch'] # 'slc6_amd64_gcc530' argDict['seeding'] = 'AutomaticSeeding' - argDict['scriptExe'] = kw['task']['tm_scriptexe'] # - argDict['eventsPerLumi'] = kw['task']['tm_events_per_lumi'] # - argDict['maxRuntime'] = kw['task']['max_runtime'] #-1 - argDict['scriptArgs'] = kw['task']['tm_scriptargs'] - argDict['CRAB_AdditionalOutputFiles'] = info['addoutputfiles_flatten'] - #The following two are for fixing up job.submit files - argDict['CRAB_localOutputFiles'] = dagspec['localOutputFiles'] - argDict['CRAB_Destination'] = dagspec['destination'] + argDict['scriptExe'] = task['tm_scriptexe'] # + argDict['eventsPerLumi'] = task['tm_events_per_lumi'] # + argDict['maxRuntime'] = dagspec['maxRuntime'] # -1 + argDict['scriptArgs'] = task['tm_scriptargs'] + argDict['CRAB_AdditionalOutputFiles'] = "{}" + # The following two are for fixing up job.submit files + # SB argDict['CRAB_localOutputFiles'] = dagspec['localOutputFiles'] + # SB argDict['CRAB_Destination'] = dagspec['destination'] argdicts.append(argDict) + return argdicts 
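# Illustrative sketch, not part of the patch: the argdicts list built by
# prepareJobArguments() above is what CMSRunAnalysis.py (patch 01) reads back on the
# worker node -- it loads input_args.json and picks the entry whose CRAB_Id matches the
# new --jobId argument. A minimal self-contained version of that lookup follows; the
# helper name is hypothetical, while the file name and the CRAB_Id key come from this
# patch series.
import json

def loadJobArgs(jobId, argFile='input_args.json'):
    """Return the argument dict for this job, or raise if jobId is not in the file."""
    with open(argFile, 'r', encoding='UTF-8') as fh:
        allArgs = json.load(fh)  # list of per-job dicts written by DagmanCreator.prepareJobArguments()
    for args in allArgs:
        if args['CRAB_Id'] == jobId:  # jobId is a string, e.g. '3', or 'N-M' for automatic splitting
            return args
    raise KeyError(f"jobId {jobId} not found in {argFile}")
# end of sketch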
- with open('input_args.json', 'w', encoding='utf-8') as fd: - json.dump(argdicts, fd) + def prepareTarballForSched(self, filesForSched, subdags): + """ prepare a single "InputFiles.tar.gz" file with all the files to be moved + from the TW to the schedd. + This file will also be used by by the client preparelocal command. + """ with tarfile.open('InputFiles.tar.gz', mode='w:gz') as tf: - for ifname in inputFiles + subdags + ['input_args.json']: + for ifname in filesForSched + subdags: tf.add(ifname) def createSubdag(self, splitterResult, **kwargs): - """ beware the "Sub" in the name ! This is used also for Main DAG """ + """ beware the "Sub" in the name ! This is used also for Main DAG + Does the actual DAG file creation and writes out relevant files + Handles both conventional tasks (only one DAG created in the TW) and + automatic splitting (multiple subdags which will be added in the scheduler by + the PreDag.py script which calls this DagmanCreator + Returns: + info : dictionary : passes info to next action (DagmanSubmitter) + splitterResult : object : this is the output of previous action (Splitter) and is part of input + arguments to DagmanCreator ! As far as Stefano can tell returning it + here is a "perverse" way to pass it also to DagmanSubmitter + subdags : list : list of subdags files created which will need to be sent to scheduler + Stefano does not understans why it is needed since the subdags will be + overwritten by PreDag in the scheduler, maybe DAGMAN requires that the subdag + file indicated in the DAG exists, even if empty and eventually filled by the PreDag + e.g. the probe stage DAG ends with: + SUBDAG EXTERNAL Job0SubJobs RunJobs0.subdag NOOP + SCRIPT DEFER 4 300 PRE Job0SubJobs dag_bootstrap.sh PREDAG processing 5 0 + + Side effects - writes these files to cwd: + RunJobs.dag : the initial DAGMAN which will be submitted by DagmanSubmitter + RunJobs{0,1,2,3}.subdag : the DAGMAN description for processing and tail subdags + input_args.json : the arguments needed by CMSRunAnalysis.py for each job + datadiscovery.pkl : the object returned in output from DataDiscovery action, PreDag will need it + in order to call Splitter and create the correct subdags + taskinformation.pkl : the content of the task dictionary + taskworkerconfig.pkl : the content of the TaskWorkerConfig object + site.ad.json : sites assigned to jobs in each job group (info from Splitter) + """ startjobid = kwargs.get('startjobid', 0) parent = kwargs.get('parent', None) @@ -1076,35 +1112,44 @@ def getBlacklistMsg(): shutil.rmtree(tempDir2) if stage in ('probe', 'conventional'): - name = "RunJobs.dag" - ## Cache data discovery + dagFileName = "RunJobs.dag" with open("datadiscovery.pkl", "wb") as fd: - pickle.dump(splitterResult[1], fd) - - ## Cache task information + pickle.dump(splitterResult[1], fd) # Cache data discovery with open("taskinformation.pkl", "wb") as fd: - pickle.dump(kwargs['task'], fd) - - ## Cache TaskWorker configuration + pickle.dump(kwargs['task'], fd) # Cache task information with open("taskworkerconfig.pkl", "wb") as fd: - pickle.dump(self.config, fd) + pickle.dump(self.config, fd) # Cache TaskWorker configuration elif stage == 'processing': - name = "RunJobs0.subdag" + dagFileName = "RunJobs0.subdag" else: - name = f"RunJobs{parent}.subdag" + dagFileName = f"RunJobs{parent}.subdag" + argFileName = "input_args.json" ## Cache site information with open("site.ad.json", "w", encoding='utf-8') as fd: json.dump(siteinfo, fd) + ## Save the DAG into a file. 
- with open(name, "w", encoding='utf-8') as fd: + with open(dagFileName, "w", encoding='utf-8') as fd: fd.write(dag) kwargs['task']['jobcount'] = len(dagSpecs) info = self.makeJobSubmit(kwargs['task']) + # list of input arguments needed for each jobs + argdicts = self.prepareJobArguments(dagSpecs, kwargs['task']) + # save the input arguments to each job's CMSRunAnalysis.py in input_args.json file + if stage in ['processing', 'tail']: # add to argument list from previous stage + with open(argFileName, 'r', encoding='utf-8') as fd: + oldArgs = json.load(fd) + argdicts = oldArgs + argdicts + # no worry of overwriting, even in automatic splitting multiple DagmanCreator + # is executed inside PreDag which is wrapped with a lock + with open(argFileName, 'w', encoding='utf-8') as fd: + json.dump(argdicts, fd) + maxidle = getattr(self.config.TaskWorker, 'maxIdle', MAX_IDLE_JOBS) if maxidle == -1: maxidle = info['jobcount'] @@ -1128,7 +1173,7 @@ def getBlacklistMsg(): elif info.get('faillimit') < 0: info['faillimit'] = -1 - return info, splitterResult, subdags, dagSpecs + return info, splitterResult, subdags def getHighPrioUsers(self, userProxy, workflow, egroups): """ get the list of high priority users """ @@ -1170,7 +1215,7 @@ def executeInternal(self, *args, **kw): sandboxTarBall = 'sandbox.tar.gz' # Bootstrap the ISB if we are running in the TW - if self.crabserver: + if self.runningInTW: username = kw['task']['tm_username'] taskname = kw['task']['tm_taskname'] sandboxName = kw['task']['tm_user_sandbox'] @@ -1195,19 +1240,20 @@ def executeInternal(self, *args, **kw): kw['task']['dbinstance'] = self.crabserver.getDbInstance() params = {} - inputFiles = ['gWMS-CMSRunAnalysis.sh', 'submit_env.sh', 'CMSRunAnalysis.sh', 'cmscp.py', 'cmscp.sh', 'RunJobs.dag', 'Job.submit', 'dag_bootstrap.sh', - 'AdjustSites.py', 'site.ad.json', 'datadiscovery.pkl', 'taskinformation.pkl', 'taskworkerconfig.pkl', - 'run_and_lumis.tar.gz', 'input_files.tar.gz'] + # files to be transferred to remove WN's via Job.submit, could pack most in a tarball + filesForWN = ['submit_env.sh', 'CMSRunAnalysis.sh', 'cmscp.py', 'cmscp.sh', 'CMSRunAnalysis.tar.gz', + 'run_and_lumis.tar.gz', 'input_files.tar.gz', 'input_args.json'] + # files to be transferred to the scheduler by fDagmanSubmitter (these will all be placed in InputFiles.tar.gz) + filesForSched = filesForWN + \ + ['gWMS-CMSRunAnalysis.sh', 'RunJobs.dag', 'Job.submit', 'dag_bootstrap.sh', + 'AdjustSites.py', 'site.ad.json', 'TaskManagerRun.tar.gz', + 'datadiscovery.pkl', 'taskinformation.pkl', 'taskworkerconfig.pkl',] - if os.path.exists("CMSRunAnalysis.tar.gz"): - inputFiles.append("CMSRunAnalysis.tar.gz") - if os.path.exists("TaskManagerRun.tar.gz"): - inputFiles.append("TaskManagerRun.tar.gz") if kw['task']['tm_input_dataset']: - inputFiles.append("input_dataset_lumis.json") - inputFiles.append("input_dataset_duplicate_lumis.json") + filesForSched.append("input_dataset_lumis.json") + filesForSched.append("input_dataset_duplicate_lumis.json") - info, splitterResult, subdags, dagSpecs = self.createSubdag(*args, **kw) + info, splitterResult, subdags = self.createSubdag(*args, **kw) # as splitter summary is useful for dryrun, let's add it to the InputFiles tarball jobGroups = splitterResult[0] # the first returned value of Splitter action is the splitterFactory output @@ -1216,9 +1262,9 @@ def executeInternal(self, *args, **kw): jobs = jobgroup.getJobs() splittingSummary.addJobs(jobs) splittingSummary.dump('splitting-summary.json') - 
inputFiles.append('splitting-summary.json') + filesForSched.append('splitting-summary.json') - self.prepareLocal(dagSpecs, info, kw, inputFiles, subdags) + self.prepareTarballForSched(filesForSched, subdags) return info, params, ["InputFiles.tar.gz"], splitterResult From 7ae4331bcd08972be810667cf1053de3d91f4ff2 Mon Sep 17 00:00:00 2001 From: Stefano Belforte Date: Fri, 17 Jan 2025 14:56:02 +0100 Subject: [PATCH 04/20] pylint: avoid undef. vars in CMSRunAnalysis.py --- scripts/CMSRunAnalysis.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/scripts/CMSRunAnalysis.py b/scripts/CMSRunAnalysis.py index 6ed9f79a72..e5d88006ca 100644 --- a/scripts/CMSRunAnalysis.py +++ b/scripts/CMSRunAnalysis.py @@ -322,12 +322,15 @@ def parseArgs(): # allow for arguments simply be the jobId (a string because automtic splitting has format like N-M if getattr(opts, 'jobId', None): + arguments = {} with open('input_args.json', 'r', encoding='UTF-8') as fh: allArgs = json.load(fh) # read file prepared by DagmanCreator - for arguments in allArgs: - if arguments['CRAB_Id'] == opts.jobId: - break # pick the arguments for this job - # TODO: fail with error if jobId is not found in input_args.json (useful when using via preparelocal) + for args in allArgs: + if args['CRAB_Id'] == opts.jobId: + arguments = args # pick the arguments for this job + break + if not arguments: + raise Exception("input jobId not found in input_args.json") for key, value in arguments.items(): setattr(opts, key, value) @@ -346,6 +349,7 @@ def parseArgs(): # allow for most input arguments to be passed via a (job specific) JSON file if getattr(opts, 'jsonArgFile', None): + arguments = {} with open(opts.jsonArgFile, 'r', encoding='UTF-8') as fh: arguments = json.load(fh) for key, value in arguments.items(): From 63e4ec0fed5422d1bd828f896e0262baaa98c055 Mon Sep 17 00:00:00 2001 From: Stefano Belforte Date: Fri, 17 Jan 2025 16:34:56 +0100 Subject: [PATCH 05/20] remove unused outFiles and sourceURL from CMSRunAnalysis --- scripts/CMSRunAnalysis.py | 22 +++------------------- 1 file changed, 3 insertions(+), 19 deletions(-) diff --git a/scripts/CMSRunAnalysis.py b/scripts/CMSRunAnalysis.py index e5d88006ca..c8004c5fcf 100644 --- a/scripts/CMSRunAnalysis.py +++ b/scripts/CMSRunAnalysis.py @@ -289,9 +289,7 @@ def parseArgs(): parser.add_option('--jobId', dest='jobId', type='string') parser.add_option('--json', dest='jsonArgFile', type='string') parser.add_option('-a', dest='archiveJob', type='string') - parser.add_option('-o', dest='outFiles', type='string') parser.add_option('--inputFile', dest='inputFile', type='string') - parser.add_option('--sourceURL', dest='sourceURL', type='string') parser.add_option('--jobNumber', dest='jobNumber', type='string') parser.add_option('--cmsswVersion', dest='cmsswVersion', type='string') parser.add_option('--scramArch', dest='scramArch', type='string') @@ -338,8 +336,8 @@ def parseArgs(): # use as : value_of_argument_name = inputArgs[argMap[argument_name]] # to ease transition to cleaner code the new key are only added if missing argMap = { - 'archiveJob': 'CRAB_Archive', 'outFiles': 'CRAB_AdditionalOutputFiles', - 'sourceURL': 'CRAB_ISB', 'cmsswVersion': 'CRAB_JobSW', + 'archiveJob': 'CRAB_Archive', + 'cmsswVersion': 'CRAB_JobSW', 'scramArch': 'CRAB_JobArch', 'runAndLumis': 'runAndLumiMask', 'inputFile' : 'inputFiles', 'lheInputFiles': 'lheInputFiles' } @@ -367,12 +365,10 @@ def parseArgs(): try: print(f"==== Parameters Dump at {UTCNow()} ===") print("archiveJob: ", opts.archiveJob) 
- print("sourceURL: ", opts.sourceURL) print("jobNumber: ", opts.jobNumber) print("cmsswVersion: ", opts.cmsswVersion) print("scramArch: ", opts.scramArch) print("inputFile ", opts.inputFile) - print("outFiles: ", opts.outFiles) print("runAndLumis: ", opts.runAndLumis) print("lheInputFiles: ", opts.lheInputFiles) print("firstEvent: ", opts.firstEvent) @@ -901,19 +897,7 @@ def compareBrachListWithReference(branchList, tier): mintime() sys.exit(EC_ReportHandlingErr) - # rename output files. Doing this after checksums otherwise outfile is not found. - if jobExitCode == 0: - try: - oldName = 'UNKNOWN' - newName = 'UNKNOWN' - for oldName, newName in literal_eval(options.outFiles).items(): - os.rename(oldName, newName) - except Exception as ex: # pylint: disable=broad-except - handleException("FAILED", EC_MoveOutErr, f"Exception while renaming file {oldName} to {newName}.") - mintime() - sys.exit(EC_MoveOutErr) - else: - mintime() + mintime() print(f"==== CMSRunAnalysis.py FINISHED at {UTCNow()} ====") print(f"Local time : {time.ctime()}") From 23ce7be7f795deffc269802b8732f9f97b1b8d93 Mon Sep 17 00:00:00 2001 From: Stefano Belforte Date: Fri, 17 Jan 2025 17:14:19 +0100 Subject: [PATCH 06/20] use same args name in input_args.json and CMSRunAnalysis --- scripts/CMSRunAnalysis.py | 14 +++++--------- scripts/TweakPSet.py | 4 ++-- src/python/TaskWorker/Actions/DagmanCreator.py | 10 +++++----- 3 files changed, 12 insertions(+), 16 deletions(-) diff --git a/scripts/CMSRunAnalysis.py b/scripts/CMSRunAnalysis.py index c8004c5fcf..2fe5e05c5a 100644 --- a/scripts/CMSRunAnalysis.py +++ b/scripts/CMSRunAnalysis.py @@ -288,8 +288,8 @@ def parseArgs(): parser = PassThroughOptionParser() parser.add_option('--jobId', dest='jobId', type='string') parser.add_option('--json', dest='jsonArgFile', type='string') - parser.add_option('-a', dest='archiveJob', type='string') - parser.add_option('--inputFile', dest='inputFile', type='string') + parser.add_option('--userSandbox', dest='userSandbox', type='string') + parser.add_option('--inputFileList', dest='inputFileList', type='string') parser.add_option('--jobNumber', dest='jobNumber', type='string') parser.add_option('--cmsswVersion', dest='cmsswVersion', type='string') parser.add_option('--scramArch', dest='scramArch', type='string') @@ -336,10 +336,6 @@ def parseArgs(): # use as : value_of_argument_name = inputArgs[argMap[argument_name]] # to ease transition to cleaner code the new key are only added if missing argMap = { - 'archiveJob': 'CRAB_Archive', - 'cmsswVersion': 'CRAB_JobSW', - 'scramArch': 'CRAB_JobArch', 'runAndLumis': 'runAndLumiMask', - 'inputFile' : 'inputFiles', 'lheInputFiles': 'lheInputFiles' } for key, value in argMap.items(): if not getattr(opts, key, None): @@ -364,11 +360,11 @@ def parseArgs(): try: print(f"==== Parameters Dump at {UTCNow()} ===") - print("archiveJob: ", opts.archiveJob) + print("userSandbox: ", opts.userSandbox) print("jobNumber: ", opts.jobNumber) print("cmsswVersion: ", opts.cmsswVersion) print("scramArch: ", opts.scramArch) - print("inputFile ", opts.inputFile) + print("inputFileList ", opts.inputFileList) print("runAndLumis: ", opts.runAndLumis) print("lheInputFiles: ", opts.lheInputFiles) print("firstEvent: ", opts.firstEvent) @@ -693,7 +689,7 @@ def compareBrachListWithReference(branchList, tier): print(f"==== SCRAM Obj INITIALIZED at {UTCNow()} ====") print("==== Extract user sandbox in top and CMSSW directory ====") - extractUserSandbox(options.archiveJob) + extractUserSandbox(options.userSandbox) #Multi-microarch env: 
Setting cmssw env after extracting the sandbox print(f"==== SCRAM runtime environment CREATED at {UTCNow()} ====") diff --git a/scripts/TweakPSet.py b/scripts/TweakPSet.py index 756a3d38ee..734cb9b8c4 100644 --- a/scripts/TweakPSet.py +++ b/scripts/TweakPSet.py @@ -121,8 +121,8 @@ def createScriptLines(opts, pklIn): if opts.runAndLumis: runAndLumis = readFileFromTarball(opts.runAndLumis, 'run_and_lumis.tar.gz') inputFiles = {} - if opts.inputFile: - inputFiles = readFileFromTarball(opts.inputFile, 'input_files.tar.gz') + if opts.inputFileList: + inputFiles = readFileFromTarball(opts.inputFileList, 'input_files.tar.gz') # build a tweak object with the needed changes to be applied to PSet tweak = PSetTweak() diff --git a/src/python/TaskWorker/Actions/DagmanCreator.py b/src/python/TaskWorker/Actions/DagmanCreator.py index 3d010eb1e1..2e7941b3d8 100644 --- a/src/python/TaskWorker/Actions/DagmanCreator.py +++ b/src/python/TaskWorker/Actions/DagmanCreator.py @@ -720,18 +720,18 @@ def prepareJobArguments(self, dagSpecs, task): argdicts = [] for dagspec in dagSpecs: argDict = {} - argDict['inputFiles'] = f"job_input_file_list_{dagspec['count']}.txt" #'job_input_file_list_1.txt' - argDict['runAndLumiMask'] = f"job_lumis_{dagspec['count']}.json" + argDict['inputFileList'] = f"job_input_file_list_{dagspec['count']}.txt" #'job_input_file_list_1.txt' + argDict['runAndLumis'] = f"job_lumis_{dagspec['count']}.json" argDict['CRAB_Id'] = dagspec['count'] #'1' argDict['lheInputFiles'] = dagspec['lheInputFiles'] # False argDict['firstEvent'] = dagspec['firstEvent'] # 'None' argDict['lastEvent'] = dagspec['lastEvent'] # 'None' argDict['firstLumi'] = dagspec['firstLumi'] # 'None' argDict['firstRun'] = dagspec['firstRun'] # 'None' - argDict['CRAB_Archive'] = task['tm_user_sandbox'] #'sandbox.tar.gz' + argDict['userSandbox'] = task['tm_user_sandbox'] #'sandbox.tar.gz' argDict['CRAB_ISB'] = task['tm_cache_url'] # 'https://cmsweb.cern.ch/crabcache' - argDict['CRAB_JobSW'] = task['tm_job_sw'] # 'CMSSW_9_2_5' - argDict['CRAB_JobArch'] = task['tm_job_arch'] # 'slc6_amd64_gcc530' + argDict['cmsswVersion'] = task['tm_job_sw'] # 'CMSSW_9_2_5' + argDict['scramArch'] = task['tm_job_arch'] # 'slc6_amd64_gcc530' argDict['seeding'] = 'AutomaticSeeding' argDict['scriptExe'] = task['tm_scriptexe'] # argDict['eventsPerLumi'] = task['tm_events_per_lumi'] # From 74be0c52938c0804a41587a384c8f91a4e319b5d Mon Sep 17 00:00:00 2001 From: Stefano Belforte Date: Fri, 17 Jan 2025 17:29:31 +0100 Subject: [PATCH 07/20] cleanup a bit DagmanCreator --- .../TaskWorker/Actions/DagmanCreator.py | 22 +++++++------------ .../TaskWorker/Actions/DagmanSubmitter.py | 1 - 2 files changed, 8 insertions(+), 15 deletions(-) diff --git a/src/python/TaskWorker/Actions/DagmanCreator.py b/src/python/TaskWorker/Actions/DagmanCreator.py index 2e7941b3d8..5133e9500b 100644 --- a/src/python/TaskWorker/Actions/DagmanCreator.py +++ b/src/python/TaskWorker/Actions/DagmanCreator.py @@ -83,7 +83,6 @@ +CRAB_PublishName = %(publishname)s +CRAB_Publish = %(publication)s +CRAB_PublishDBSURL = %(publishdbsurl)s -+CRAB_ISB = %(cacheurl)s +CRAB_AdditionalOutputFiles = %(addoutputfiles)s +CRAB_EDMOutputFiles = %(edmoutfiles)s +CRAB_TFileOutputFiles = %(tfileoutfiles)s @@ -100,11 +99,9 @@ +CRAB_DbInstance = %(dbinstance)s +CRAB_NumAutomJobRetries = %(numautomjobretries)s CRAB_Attempt = %(attempt)d -CRAB_ISB = %(cacheurl_flatten)s -CRAB_AdditionalOutputFiles = %(addoutputfiles_flatten)s -CRAB_JobSW = %(jobsw_flatten)s -CRAB_JobArch = %(jobarch_flatten)s -CRAB_Archive = 
%(cachefilename_flatten)s +CRAB_AdditionalOutputFiles = %(addoutputfiles)s +CRAB_JobSW = %(jobsw)s +CRAB_JobArch = %(jobarch)s CRAB_Id = $(count) +CRAB_Id = "$(count)" +CRAB_JobCount = %(jobcount)d @@ -300,7 +297,7 @@ def transform_strings(data): """ info = {} for var in 'workflow', 'jobtype', 'jobsw', 'jobarch', 'inputdata', 'primarydataset', 'splitalgo', 'algoargs', \ - 'cachefilename', 'cacheurl', 'userhn', 'publishname', 'asyncdest', 'dbsurl', 'publishdbsurl', \ + 'userhn', 'publishname', 'asyncdest', 'dbsurl', 'publishdbsurl', \ 'userdn', 'requestname', 'oneEventMode', 'tm_user_vo', 'tm_user_role', 'tm_user_group', \ 'tm_maxmemory', 'tm_numcores', 'tm_maxjobruntime', 'tm_priority', \ 'stageoutpolicy', 'taskType', 'worker_name', 'cms_wmtool', 'cms_tasktype', 'cms_type', \ @@ -331,10 +328,10 @@ def transform_strings(data): info['algoargs'] = '"' + json.dumps({'halt_job_on_file_boundaries': False, 'splitOnRun': False, splitArgName : data['algoargs']}).replace('"', r'\"') + '"' info['attempt'] = 0 - for var in ["cacheurl", "jobsw", "jobarch", "cachefilename", "asyncdest", "requestname"]: - info[var+"_flatten"] = data[var] + for var in ["jobsw", "jobarch", "asyncdest", "requestname"]: + info[var] = data[var] - info["addoutputfiles_flatten"] = '{}' + info["addoutputfiles"] = '{}' temp_dest, dest = makeLFNPrefixes(data) info["temp_dest"] = temp_dest @@ -482,8 +479,6 @@ def makeJobSubmit(self, task): #info['primarydataset'] = info['tm_primary_dataset'] info['splitalgo'] = info['tm_split_algo'] info['algoargs'] = info['tm_split_args'] - info['cachefilename'] = info['tm_user_sandbox'] - info['cacheurl'] = info['tm_cache_url'] info['userhn'] = info['tm_username'] info['publishname'] = info['tm_publish_name'] info['asyncdest'] = info['tm_asyncdest'] @@ -550,7 +545,7 @@ def makeJobSubmit(self, task): info['accelerator_jdl'] += f"\n+CUDARuntime={classad.quote(cudaRuntime)}" else: info['accelerator_jdl'] = '' - arch = info['jobarch_flatten'].split("_")[0] # extracts "slc7" from "slc7_amd64_gcc10" + arch = info['jobarch'].split("_")[0] # extracts "slc7" from "slc7_amd64_gcc10" required_os_list = ARCH_TO_OS.get(arch) if not required_os_list: raise SubmissionRefusedException(f"Unsupported architecture {arch}") @@ -729,7 +724,6 @@ def prepareJobArguments(self, dagSpecs, task): argDict['firstLumi'] = dagspec['firstLumi'] # 'None' argDict['firstRun'] = dagspec['firstRun'] # 'None' argDict['userSandbox'] = task['tm_user_sandbox'] #'sandbox.tar.gz' - argDict['CRAB_ISB'] = task['tm_cache_url'] # 'https://cmsweb.cern.ch/crabcache' argDict['cmsswVersion'] = task['tm_job_sw'] # 'CMSSW_9_2_5' argDict['scramArch'] = task['tm_job_arch'] # 'slc6_amd64_gcc530' argDict['seeding'] = 'AutomaticSeeding' diff --git a/src/python/TaskWorker/Actions/DagmanSubmitter.py b/src/python/TaskWorker/Actions/DagmanSubmitter.py index f0a0458379..334cc712d2 100644 --- a/src/python/TaskWorker/Actions/DagmanSubmitter.py +++ b/src/python/TaskWorker/Actions/DagmanSubmitter.py @@ -44,7 +44,6 @@ ('+CRAB_Publish', 'publication'), ('+CRAB_PublishDBSURL', 'publishdbsurl'), ('+CRAB_PrimaryDataset', 'primarydataset'), - ('+CRAB_ISB', 'cacheurl'), ('+CRAB_AdditionalOutputFiles', 'addoutputfiles'), ('+CRAB_EDMOutputFiles', 'edmoutfiles'), ('+CRAB_TFileOutputFiles', 'tfileoutfiles'), From 83e02b3af297a8c51a3176b582b74884a12519e4 Mon Sep 17 00:00:00 2001 From: Stefano Belforte Date: Fri, 17 Jan 2025 18:07:26 +0100 Subject: [PATCH 08/20] fixes --- src/python/TaskWorker/Actions/DagmanCreator.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 
deletions(-) diff --git a/src/python/TaskWorker/Actions/DagmanCreator.py b/src/python/TaskWorker/Actions/DagmanCreator.py index 5133e9500b..c3b3c19aaa 100644 --- a/src/python/TaskWorker/Actions/DagmanCreator.py +++ b/src/python/TaskWorker/Actions/DagmanCreator.py @@ -307,10 +307,12 @@ def transform_strings(data): if val is None: info[var] = 'undefined' else: + # should better handle double quotes when filling JDL's and remove this ! + # now it is a mess since some things in info get their double quote here, some in the JOB_SUBMIT string info[var] = json.dumps(val) - for var in 'accounting_group', 'accounting_group_user': - info[var] = data[var] + #for var in 'accounting_group', 'accounting_group_user': + # info[var] = data[var] for var in 'savelogsflag', 'blacklistT1', 'retry_aso', 'aso_timeout', 'publication', 'saveoutput', 'numautomjobretries', 'jobcount': info[var] = int(data[var]) @@ -328,10 +330,10 @@ def transform_strings(data): info['algoargs'] = '"' + json.dumps({'halt_job_on_file_boundaries': False, 'splitOnRun': False, splitArgName : data['algoargs']}).replace('"', r'\"') + '"' info['attempt'] = 0 - for var in ["jobsw", "jobarch", "asyncdest", "requestname"]: - info[var] = data[var] + #for var in ["jobsw", "jobarch", "asyncdest", "requestname"]: + # info[var] = data[var] - info["addoutputfiles"] = '{}' + # info["addoutputfiles"] = '{}' temp_dest, dest = makeLFNPrefixes(data) info["temp_dest"] = temp_dest From 7c3b2985d1e2b58c2b940742ef7ae3cdf9a05b4d Mon Sep 17 00:00:00 2001 From: Stefano Belforte Date: Mon, 20 Jan 2025 18:14:16 +0100 Subject: [PATCH 09/20] WIP --- .../TaskWorker/Actions/DagmanCreator.py | 246 ++++++++++-------- 1 file changed, 141 insertions(+), 105 deletions(-) diff --git a/src/python/TaskWorker/Actions/DagmanCreator.py b/src/python/TaskWorker/Actions/DagmanCreator.py index c3b3c19aaa..de72caa68e 100644 --- a/src/python/TaskWorker/Actions/DagmanCreator.py +++ b/src/python/TaskWorker/Actions/DagmanCreator.py @@ -72,61 +72,61 @@ JOB_SUBMIT = \ """ -+CRAB_ReqName = %(requestname)s -+CRAB_Workflow = %(workflow)s -+CMS_JobType = %(jobtype)s -+CRAB_JobSW = %(jobsw)s -+CRAB_JobArch = %(jobarch)s -+CRAB_DBSURL = %(dbsurl)s ++CRAB_ReqName = "%(requestname)s" ++CRAB_Workflow = "%(workflow)s" ++CMS_JobType = "%(jobtype)s" ++CRAB_JobSW = "%(jobsw)s" ++CRAB_JobArch = "%(jobarch)s" ++CRAB_DBSURL = "%(dbsurl)s" +CRAB_PostJobStatus = "NOT RUN" +CRAB_PostJobLastUpdate = 0 -+CRAB_PublishName = %(publishname)s -+CRAB_Publish = %(publication)s -+CRAB_PublishDBSURL = %(publishdbsurl)s ++CRAB_PublishName = "%(publishname)s" ++CRAB_Publish = "%(publication)s" ++CRAB_PublishDBSURL = "%(publishdbsurl)s" +CRAB_AdditionalOutputFiles = %(addoutputfiles)s +CRAB_EDMOutputFiles = %(edmoutfiles)s +CRAB_TFileOutputFiles = %(tfileoutfiles)s -+CRAB_UserDN = %(userdn)s -+CRAB_UserHN = %(userhn)s -+CRAB_AsyncDest = %(asyncdest)s -+CRAB_StageoutPolicy = %(stageoutpolicy)s -+CRAB_UserRole = %(tm_user_role)s -+CRAB_UserGroup = %(tm_user_group)s -+CRAB_TaskWorker = %(worker_name)s -+CRAB_RetryOnASOFailures = %(retry_aso)s ++CRAB_UserDN = "%(userdn)s" ++CRAB_UserHN = "%(userhn)s" ++CRAB_AsyncDest = "%(asyncdest)s" ++CRAB_StageoutPolicy = "%(stageoutpolicy)s" ++CRAB_UserRole = "%(tm_user_role)s" ++CRAB_UserGroup = "%(tm_user_group)s" ++CRAB_TaskWorker = "%(worker_name)s" ++CRAB_RetryOnASOFailures = "%(retry_aso)s" +CRAB_ASOTimeout = %(aso_timeout)s -+CRAB_RestHost = %(resthost)s -+CRAB_DbInstance = %(dbinstance)s ++CRAB_RestHost = "%(resthost)s" ++CRAB_DbInstance = "%(dbinstance)s" 
+CRAB_NumAutomJobRetries = %(numautomjobretries)s CRAB_Attempt = %(attempt)d CRAB_AdditionalOutputFiles = %(addoutputfiles)s -CRAB_JobSW = %(jobsw)s -CRAB_JobArch = %(jobarch)s +CRAB_JobSW = "%(jobsw)s" +CRAB_JobArch = "%(jobarch)s" CRAB_Id = $(count) +CRAB_Id = "$(count)" +CRAB_JobCount = %(jobcount)d +CRAB_OutTempLFNDir = "%(temp_dest)s" +CRAB_OutLFNDir = "%(output_dest)s" -+CRAB_oneEventMode = %(oneEventMode)s -+CRAB_PrimaryDataset = %(primarydataset)s ++CRAB_oneEventMode = "%(oneEventMode)s" ++CRAB_PrimaryDataset = "%(primarydataset)s" +CRAB_DAGType = "Job" -accounting_group = %(accounting_group)s -accounting_group_user = %(accounting_group_user)s -+CRAB_SubmitterIpAddr = %(submitter_ip_addr)s +accounting_group = "%(accounting_group)s" +accounting_group_user = "%(accounting_group_user)s" ++CRAB_SubmitterIpAddr = "%(submitter_ip_addr)s" +CRAB_TaskLifetimeDays = %(task_lifetime_days)s +CRAB_TaskEndTime = %(task_endtime)s -+CRAB_SplitAlgo = %(splitalgo)s ++CRAB_SplitAlgo = "%(splitalgo)s" +CRAB_AlgoArgs = %(algoargs)s -+CMS_WMTool = %(cms_wmtool)s -+CMS_TaskType = %(cms_tasktype)s ++CMS_WMTool = "%(cms_wmtool)s" ++CMS_TaskType = "%(cms_tasktype)s" +CMS_SubmissionTool = "CRAB" -+CMS_Type = %(cms_type)s ++CMS_Type = "%(cms_type)s" # These attributes help gWMS decide what platforms this job can run on; see https://twiki.cern.ch/twiki/bin/view/CMSPublic/CompOpsMatchArchitecture -+REQUIRED_ARCH = %(required_arch)s -+REQUIRED_MINIMUM_MICROARCH = %(required_minimum_microarch)s -+DESIRED_CMSDataset = %(inputdata)s ++REQUIRED_ARCH = "%(required_arch)s" ++REQUIRED_MINIMUM_MICROARCH = "%(required_minimum_microarch)s" ++DESIRED_CMSDataset = "%(inputdata)s" +JOBGLIDEIN_CMSSite = "$$([ifThenElse(GLIDEIN_CMSSite is undefined, \\"Unknown\\", GLIDEIN_CMSSite)])" job_ad_information_attrs = MATCH_EXP_JOBGLIDEIN_CMSSite, JOBGLIDEIN_CMSSite, RemoteSysCpu, RemoteUserCpu @@ -290,6 +290,27 @@ def validateUserLFNs(path, outputFiles): msg += "\n and therefore can not be handled in our DataBase" raise SubmissionRefusedException(msg) +def createJobSubmit(data): + """ + create a JobSubmit object template which will be persisted as Job.submit file and send to + scheduler, where it will be used to submit individual jobs after PreJob.py fills in the job-specific valued + """ + jobSubmit = htcondor.Submit() + for var in 'workflow', 'jobtype', 'jobsw', 'jobarch', 'inputdata', 'primarydataset', 'splitalgo', 'algoargs', \ + 'userhn', 'publishname', 'asyncdest', 'dbsurl', 'publishdbsurl', \ + 'userdn', 'requestname', 'oneEventMode', 'tm_user_vo', 'tm_user_role', 'tm_user_group', \ + 'tm_maxmemory', 'tm_numcores', 'tm_maxjobruntime', 'tm_priority', \ + 'stageoutpolicy', 'taskType', 'worker_name', 'cms_wmtool', 'cms_tasktype', 'cms_type', \ + 'required_arch', 'required_minimum_microarch', 'resthost', 'dbinstance', 'submitter_ip_addr', \ + 'task_lifetime_days', 'task_endtime', 'maxproberuntime', 'maxtailruntime': + val = data.get(var, None) + if val is None: + jobSubmit[var] = 'undefined' + else: + # should better handle double quotes when filling JDL's and remove this ! 
+ # now it is a mess since some things in info get their double quote here, some in the JOB_SUBMIT string + jobSubmit[var] = val + def transform_strings(data): """ Converts the arguments in the data dictionary to the arguments necessary @@ -330,8 +351,9 @@ def transform_strings(data): info['algoargs'] = '"' + json.dumps({'halt_job_on_file_boundaries': False, 'splitOnRun': False, splitArgName : data['algoargs']}).replace('"', r'\"') + '"' info['attempt'] = 0 - #for var in ["jobsw", "jobarch", "asyncdest", "requestname"]: - # info[var] = data[var] + # SB these must not be "json dumped" here ! So revert what doen at line 312 ... oh my my my ..... + for var in ["jobsw", "jobarch", "asyncdest", "requestname"]: + info[var] = data[var] # info["addoutputfiles"] = '{}' @@ -454,77 +476,92 @@ def isGlobalBlacklistIgnored(self, kwargs): def makeJobSubmit(self, task): """ - Create the submit file. This is reused by all jobs in the task; differences - between the jobs are taken care of in the makeDagSpecs. - Any key defined in the dictionary passed to transform_strings - is deleted unless accounted for in the transform_strings method. + Prepare an HTCondor Submit object which will serve as template for + all job submissions. It will be persisted in Job.submiy JDL file and + customized by PreJob.py for each job using info from DAG + Differences between the jobs are taken care of in the makeDagSpecs and + propagated to the scheduler via the DAG description files """ if os.path.exists("Job.submit"): - info = {'jobcount': int(task['jobcount'])} - return info - - # From here on out, we convert from tm_* names to the DataWorkflow names - info = dict(task) - - info['workflow'] = task['tm_taskname'] - info['jobtype'] = 'Analysis' - info['jobsw'] = info['tm_job_sw'] - info['jobarch'] = info['tm_job_arch'] - info['inputdata'] = info['tm_input_dataset'] - ## The 1st line below is for backward compatibility with entries in the TaskDB - ## made by CRAB server < 3.3.1511, where tm_primary_dataset = null - ## and tm_input_dataset always contains at least one '/'. Once we don't - ## care about backward compatibility anymore, remove the 1st line and - ## uncomment the 2nd line. 
- info['primarydataset'] = info['tm_primary_dataset'] if info['tm_primary_dataset'] else info['tm_input_dataset'].split('/')[1] - #info['primarydataset'] = info['tm_primary_dataset'] - info['splitalgo'] = info['tm_split_algo'] - info['algoargs'] = info['tm_split_args'] - info['userhn'] = info['tm_username'] - info['publishname'] = info['tm_publish_name'] - info['asyncdest'] = info['tm_asyncdest'] - info['dbsurl'] = info['tm_dbs_url'] - info['publishdbsurl'] = info['tm_publish_dbs_url'] - info['publication'] = 1 if info['tm_publication'] == 'T' else 0 - info['userdn'] = info['tm_user_dn'] - info['requestname'] = task['tm_taskname'].replace('"', '') - info['savelogsflag'] = 1 if info['tm_save_logs'] == 'T' else 0 # Note: this must always be 0 for probe jobs, is taken care of in PostJob.py - info['blacklistT1'] = 0 - info['siteblacklist'] = task['tm_site_blacklist'] - info['sitewhitelist'] = task['tm_site_whitelist'] - info['addoutputfiles'] = task['tm_outfiles'] - info['tfileoutfiles'] = task['tm_tfile_outfiles'] - info['edmoutfiles'] = task['tm_edm_outfiles'] - info['oneEventMode'] = 1 if info['tm_one_event_mode'] == 'T' else 0 - info['taskType'] = self.getDashboardTaskType(task) - info['worker_name'] = getattr(self.config.TaskWorker, 'name', 'unknown') - info['retry_aso'] = 1 if getattr(self.config.TaskWorker, 'retryOnASOFailures', True) else 0 - if task['tm_output_lfn'].startswith('/store/user/rucio') or \ - task['tm_output_lfn'].startswith('/store/group/rucio'): - info['aso_timeout'] = getattr(self.config.TaskWorker, 'ASORucioTimeout', 0) - else: - info['aso_timeout'] = getattr(self.config.TaskWorker, 'ASOTimeout', 0) - info['submitter_ip_addr'] = task['tm_submitter_ip_addr'] - info['cms_wmtool'] = self.setCMS_WMTool(task) - info['cms_tasktype'] = self.setCMS_TaskType(task) - info['cms_type'] = self.setCMS_Type(task) + return None + #info = {'jobcount': int(task['jobcount'])} + #return info + + jobSbumit = htcondor.Submit() + # these are classAds that we want to be added to each grid job + # Note that argument to classad.quote can only be string or None + jobSubmit['My.CRAB_Reqname'] = classad.quote(task['tm_taskname']) + jobSubmit['My.CRAB_workflow'] = classad.quote(task['tm_taskname']) + jobSubmit['My.CMS_JobtYpe'] = classad.quote('Analysis') + jobSubmit['My.CRAB_JobSw'] = classad.quote(task['tm_job_sw']) + jobSubmit['My.CRAB_JobArch'] = classad.quote(task['tm_job_arch']) + jobSubmit['My.CRAB_DBSURL'] = classad.quote(task['tm_dbs_url']) + jobSubmit['My.CRAB_PostJobStatus'] = classad.quote("NOT RUN") + jobSubmit['My.CRAB_PostJobLastUpdate'] = classad.quote("0") + jobSubmit['My.CRAB_PublishName'] = %(publishname) + jobSubmit['My.CRAB_Publish = %(publication) + jobSubmit['My.CRAB_PublishDBSURL'] = classad.quote(task['tm_publish_dbs_url']) + jobSubmit['My.CRAB_ISB'] = classad.quote(task['tm_cache_url']) + + def pythonListToClassAdValue(aList): + # python lists need special handling to become the string '{"a","b",...,"c"}' + quotedItems = json.dumps(aList) # from [s1, s2] to the string '["s1","s2"]' + quotedItems = quotedItems.lstrip('[').rstrip(']') # remove square brackets [ ] + value = "{" + quotedItems + "}" # make final string adding the curly brackets { } + return value + jobSubmit['My.CRAB_AdditionalOutputFiles'] = pythonListToClassAdValue(task['tm_outfiles']) + jobSubmit['My.CRAB_EDMOutputFiles'] = pythonListToClassAdValue(task['tm_edm_outfiles']) + jobSubmit['My.CRAB_TFileOutputFiles'] = pythonListToClassAdValue(task['tm_outfiles']) + jobSubmit['My.CRAB_UserDN'] = 
classad.quote(task['tm_user_dn']) + jobSubmit['My.CRAB_UserHN'] = classad.quote(task['tm_username']) + jobSubmit['My.CRAB_AsyncDest'] = classad.quote(task['asyncdest']) + jobSubmit['My.CRAB_StageoutPolicy'] = classad.quote(task['stageoutpolicy']) + jobSubmit['My.CRAB_UserRole'] = classad.quote(task['tm_user_role']) + jobSubmit['My.CRAB_UserGroup'] = classad.quote(task['tm_user_group']) + jobSubmit['My.CRAB_TaskWorker'] = classad.quote(getattr(self.config.TaskWorker, 'name', 'unknown')) + retry_aso = 1 if getattr(self.config.TaskWorker, 'retryOnASOFailures', True) else 0 + jobSubmit['My.CRAB_RetryOnASOFailures'] = classad.quote(str(retry_aso)) + jobSubmit['My.CRAB_ASOTimeout'] = classad.quote(str(getattr(self.config.TaskWorker, 'ASORucioTimeout', 0))) + jobSubmit['My.CRAB_RestHost'] = classad.quote(task['resthost']) + jobSubmit['My.CRAB_DbInstance'] = classad.quote(task['dbinstance']) + jobSubmit['My.CRAB_NumAutomJobRetries'] = classad.quote(str(task['numautomjobretries'])) + jobSubmit['My.CRAB_Id'] = "$(count)" # count macro will be defined via VARS line in the DAG description file + jobSubmit['My.CRAB_JobCount'] = classad.quote(str(task['jobcount'])) + temp_dest, dest = makeLFNPrefixes(task) + jobSubmit['My.CRAB_OutTempLFNDir'] = classad.quote(temp_dest) + jobSubmit['My.CRAB_OutLFNDir'] = classad.quote(dest) + oneEventMode = 1 if info['tm_one_event_mode'] == 'T' else 0 + jobSubmit['My.CRAB_oneEventMode'] = classad.quote(str(oneEventMode)) + jobSubmit['My.CRAB_PrimaryDataset'] = classad.quote(task['tm_primary_dataset']) + jobSubmit['My.CRAB_DAGType'] = classad.quote("Job") + jobSubmit['My.CRAB_SubmitterIpAddr'] = classad.quote(task['tm_submitter_ip_addr']) + jobSubmit['My.CRAB_TaskLifetimeDays'] = classad.quote(str(TASKLIFETIME // 24 // 60 // 60)) + jobSubmit['My.CRAB_TaskEndTime'] = classad.quote(str(int(task["tm_start_time"]) + TASKLIFETIME)) + jobSubmit['My.CRAB_SplitAlgo'] = info['tm_split_algo'] + jobSubmit['My.CRAB_AlgoArgs'] = info['tm_split_args'] + jobSubmit['My.CMS_WMTool'] = classad.quote(self.setCMS_WMTool(task)) + jobSubmit['My.CMS_TaskType'] = classad.quote(self.setCMS_TaskType(task)) + jobSubmit['My.CMS_SubmissionTool'] = classad.quote("CRAB") + jobSubmit['My.CMS_Type'] = classad.quote(self.setCMS_Type(task)) + + transferOutputs = "1" if task['tm_transfer_outputs'] == 'T' else "0" # Note: this must always be 0 for probe jobs, is taken care of in PostJob.py + jobSbumit['.My.CRAB_TransferOutputs'] = classad.quote(transferOutputs) + + + # now actual HTC Job Submission commands, here right hand side can be simply strings - #Classads for task lifetime management, see https://github.com/dmwm/CRABServer/issues/5505 - info['task_lifetime_days'] = TASKLIFETIME // 24 // 60 // 60 - info['task_endtime'] = int(task["tm_start_time"]) + TASKLIFETIME + egroups = getattr(self.config.TaskWorker, 'highPrioEgroups', []) + if egroups and task['tm_username'] in self.getHighPrioUsers(info['user_proxy'], info['workflow'], egroups): + info['accounting_group'] = 'highprio' + else: + info['accounting_group'] = 'analysis' + jobSbumit['accounting_group_user'] = task['tm_username'] self.populateGlideinMatching(info) info['runs'] = [] info['lumis'] = [] - info['saveoutput'] = 1 if info['tm_transfer_outputs'] == 'T' else 0 # Note: this must always be 0 for probe jobs, is taken care of in PostJob.py - egroups = getattr(self.config.TaskWorker, 'highPrioEgroups', []) - if egroups and info['userhn'] in self.getHighPrioUsers(info['user_proxy'], info['workflow'], egroups): - info['accounting_group'] = 'highprio' 
- else: - info['accounting_group'] = 'analysis' - info['accounting_group_user'] = info['userhn'] info = transform_strings(info) info['faillimit'] = task['tm_fail_limit'] # tm_extrajdl and tm_user_config['acceleratorparams'] contain list of k=v @@ -565,14 +602,14 @@ def makeJobSubmit(self, task): info['additional_environment_options'] += ' CRAB_TASKMANAGER_TARBALL=local' else: raise TaskWorkerException(f"Cannot find TaskManagerRun.tar.gz inside the cwd: {os.getcwd()}") - info['additional_input_file'] += ", sandbox.tar.gz" # it will be present on SPOOL_DIR after dab_bootstrap - info['additional_input_file'] += ", input_args.json" - info['additional_input_file'] += ", run_and_lumis.tar.gz" - info['additional_input_file'] += ", input_files.tar.gz" - info['additional_input_file'] += ", submit_env.sh" - info['additional_input_file'] += ", cmscp.sh" + jobSubmit['additional_input_file'] += ", sandbox.tar.gz" # it will be present on SPOOL_DIR after dab_bootstrap + jobSubmit['additional_input_file'] += ", input_args.json" + jobSubmit['additional_input_file'] += ", run_and_lumis.tar.gz" + jobSubmit['additional_input_file'] += ", input_files.tar.gz" + jobSubmit['additional_input_file'] += ", submit_env.sh" + jobSubmit['additional_input_file'] += ", cmscp.sh" - info['max_disk_space'] = MAX_DISK_SPACE + jobSbumit['max_disk_space'] = MAX_DISK_SPACE with open("Job.submit", "w", encoding='utf-8') as fd: fd.write(JOB_SUBMIT % info) @@ -1125,7 +1162,6 @@ def getBlacklistMsg(): with open("site.ad.json", "w", encoding='utf-8') as fd: json.dump(siteinfo, fd) - ## Save the DAG into a file. with open(dagFileName, "w", encoding='utf-8') as fd: fd.write(dag) From 56de12a8ec67056fe01590eac403ffbe96cdd11b Mon Sep 17 00:00:00 2001 From: Stefano Belforte Date: Tue, 21 Jan 2025 10:31:29 +0100 Subject: [PATCH 10/20] WIP --- .../TaskWorker/Actions/DagmanCreator.py | 47 +++++++++++++------ 1 file changed, 32 insertions(+), 15 deletions(-) diff --git a/src/python/TaskWorker/Actions/DagmanCreator.py b/src/python/TaskWorker/Actions/DagmanCreator.py index de72caa68e..d59e140788 100644 --- a/src/python/TaskWorker/Actions/DagmanCreator.py +++ b/src/python/TaskWorker/Actions/DagmanCreator.py @@ -399,11 +399,12 @@ def __init__(self, config, crabserver, procnum=-1, rucioClient=None): self.rucioClient = rucioClient self.runningInTW = crabserver is not None - def populateGlideinMatching(self, info): + def populateGlideinMatching(self, task): """ actually simply set the required arch and microarch """ - scram_arch = info['tm_job_arch'] + matchInfo = {} + scram_arch = task['tm_job_arch'] # required_arch, set default - info['required_arch'] = "X86_64" + matchInfo['required_arch'] = "X86_64" # The following regex matches a scram arch into four groups # for example el9_amd64_gcc10 is matched as (el)(9)_(amd64)_(gcc10) # later, only the third group is returned, the one corresponding to the arch. @@ -413,22 +414,24 @@ def populateGlideinMatching(self, info): if arch not in SCRAM_TO_ARCH: msg = f"Job configured for non-supported ScramArch '{arch}'" raise SubmissionRefusedException(msg) - info['required_arch'] = SCRAM_TO_ARCH.get(arch) + matchInfo['required_arch'] = SCRAM_TO_ARCH.get(arch) # required minimum micro_arch may need to be handled differently in the future (arm, risc, ...) 
# and may need different classAd(s) in the JDL, so try to be general here - min_micro_arch = info['tm_job_min_microarch'] + min_micro_arch = task['tm_job_min_microarch'] if not min_micro_arch: - info['required_minimum_microarch'] = '2' # the current default for CMSSW + matchInfo['required_minimum_microarch'] = '2' # the current default for CMSSW return if min_micro_arch == 'any': - info['required_minimum_microarch'] = 0 + matchInfo['required_minimum_microarch'] = 0 return if min_micro_arch.startswith('x86-64-v'): - info['required_minimum_microarch'] = int(min_micro_arch.split('v')[-1]) + matchInfo['required_minimum_microarch'] = int(min_micro_arch.split('v')[-1]) return self.logger.error(f"Not supported microarch: {min_micro_arch}. Ignore it") - info['required_minimum_microarch'] = 'any' + matchInfo['required_minimum_microarch'] = 'any' + + return matchInfo def getDashboardTaskType(self, task): """ Get the dashboard activity name for the task. @@ -549,16 +552,30 @@ def pythonListToClassAdValue(aList): jobSbumit['.My.CRAB_TransferOutputs'] = classad.quote(transferOutputs) + matchInfo = self.populateGlideinMatching(task) + + # These attributes help gWMS decide what platforms this job can run on; see https://twiki.cern.ch/twiki/bin/view/CMSPublic/CompOpsMatchArchitecture + jobSbumit['My.REQUIRED_ARCH'] = matchInfo['required_arch'] + jobSbumit['My.REQUIRED_MINIMUM_MICROARCH'] = matchInfo['required_minimum_microarch'] + jobSbumit['My.DESIRED_CMSDataset'] = classad.quote(task['tm_input_dataset']) + + # Stefano is not sure why we need this, i.e. whether we can replace its use with GLIDEIN_CMSSite + jobSbumit['My.JOBGLIDEIN_CMSSite'] = classad.quote("$$([ifThenElse(GLIDEIN_CMSSite is undefined, \\"Unknown\\", GLIDEIN_CMSSite)])") + # now actual HTC Job Submission commands, here right hand side can be simply strings egroups = getattr(self.config.TaskWorker, 'highPrioEgroups', []) - if egroups and task['tm_username'] in self.getHighPrioUsers(info['user_proxy'], info['workflow'], egroups): - info['accounting_group'] = 'highprio' + if egroups and task['tm_username'] in self.getHighPrioUsers(egroups): + jobSbumit['accounting_group'] = 'highprio' else: - info['accounting_group'] = 'analysis' + jobSbumit['accounting_group'] = 'analysis' jobSbumit['accounting_group_user'] = task['tm_username'] - self.populateGlideinMatching(info) + job_ad_information_attrs = MATCH_EXP_JOBGLIDEIN_CMSSite, JOBGLIDEIN_CMSSite, RemoteSysCpu, RemoteUserCpu + + # Recover job output and logs on eviction events; make sure they aren't spooled + # This allows us to return stdout to users when they hit memory limits (which triggers PeriodicRemove). + WhenToTransferOutput = ON_EXIT_OR_EVICT info['runs'] = [] info['lumis'] = [] @@ -1207,7 +1224,7 @@ def getBlacklistMsg(): return info, splitterResult, subdags - def getHighPrioUsers(self, userProxy, workflow, egroups): + def getHighPrioUsers(self, egroups): """ get the list of high priority users """ highPrioUsers = set() @@ -1218,7 +1235,7 @@ def getHighPrioUsers(self, userProxy, workflow, egroups): msg = "Error when getting the high priority users list." \ " Will ignore the high priority list and continue normally." 
\ f" Error reason: {ex}" - self.uploadWarning(msg, userProxy, workflow) + self.logger.error(msg) return [] return highPrioUsers From c16181742c902b69121f259be0b0601c2810350d Mon Sep 17 00:00:00 2001 From: Stefano Belforte Date: Tue, 21 Jan 2025 21:54:43 +0100 Subject: [PATCH 11/20] WIP --- .../TaskWorker/Actions/DagmanCreator.py | 137 ++++++++++++++---- 1 file changed, 106 insertions(+), 31 deletions(-) diff --git a/src/python/TaskWorker/Actions/DagmanCreator.py b/src/python/TaskWorker/Actions/DagmanCreator.py index d59e140788..919b930769 100644 --- a/src/python/TaskWorker/Actions/DagmanCreator.py +++ b/src/python/TaskWorker/Actions/DagmanCreator.py @@ -491,7 +491,7 @@ def makeJobSubmit(self, task): #info = {'jobcount': int(task['jobcount'])} #return info - jobSbumit = htcondor.Submit() + jobSubmit = htcondor.Submit() # these are classAds that we want to be added to each grid job # Note that argument to classad.quote can only be string or None jobSubmit['My.CRAB_Reqname'] = classad.quote(task['tm_taskname']) @@ -523,66 +523,141 @@ def pythonListToClassAdValue(aList): jobSubmit['My.CRAB_UserRole'] = classad.quote(task['tm_user_role']) jobSubmit['My.CRAB_UserGroup'] = classad.quote(task['tm_user_group']) jobSubmit['My.CRAB_TaskWorker'] = classad.quote(getattr(self.config.TaskWorker, 'name', 'unknown')) - retry_aso = 1 if getattr(self.config.TaskWorker, 'retryOnASOFailures', True) else 0 - jobSubmit['My.CRAB_RetryOnASOFailures'] = classad.quote(str(retry_aso)) - jobSubmit['My.CRAB_ASOTimeout'] = classad.quote(str(getattr(self.config.TaskWorker, 'ASORucioTimeout', 0))) + retry_aso = "1" if getattr(self.config.TaskWorker, 'retryOnASOFailures', True) else "0" + jobSubmit['My.CRAB_RetryOnASOFailures'] = retry_aso + jobSubmit['My.CRAB_ASOTimeout'] = str(getattr(self.config.TaskWorker, 'ASORucioTimeout', 0)) jobSubmit['My.CRAB_RestHost'] = classad.quote(task['resthost']) jobSubmit['My.CRAB_DbInstance'] = classad.quote(task['dbinstance']) - jobSubmit['My.CRAB_NumAutomJobRetries'] = classad.quote(str(task['numautomjobretries'])) + jobSubmit['My.CRAB_NumAutomJobRetries'] = str(task['numautomjobretries']) jobSubmit['My.CRAB_Id'] = "$(count)" # count macro will be defined via VARS line in the DAG description file - jobSubmit['My.CRAB_JobCount'] = classad.quote(str(task['jobcount'])) + jobSubmit['My.CRAB_JobCount'] = str(task['jobcount']) temp_dest, dest = makeLFNPrefixes(task) jobSubmit['My.CRAB_OutTempLFNDir'] = classad.quote(temp_dest) jobSubmit['My.CRAB_OutLFNDir'] = classad.quote(dest) - oneEventMode = 1 if info['tm_one_event_mode'] == 'T' else 0 - jobSubmit['My.CRAB_oneEventMode'] = classad.quote(str(oneEventMode)) + oneEventMode = "1" if info['tm_one_event_mode'] == 'T' else "0" + jobSubmit['My.CRAB_oneEventMode'] = oneEventMode jobSubmit['My.CRAB_PrimaryDataset'] = classad.quote(task['tm_primary_dataset']) jobSubmit['My.CRAB_DAGType'] = classad.quote("Job") jobSubmit['My.CRAB_SubmitterIpAddr'] = classad.quote(task['tm_submitter_ip_addr']) - jobSubmit['My.CRAB_TaskLifetimeDays'] = classad.quote(str(TASKLIFETIME // 24 // 60 // 60)) - jobSubmit['My.CRAB_TaskEndTime'] = classad.quote(str(int(task["tm_start_time"]) + TASKLIFETIME)) - jobSubmit['My.CRAB_SplitAlgo'] = info['tm_split_algo'] - jobSubmit['My.CRAB_AlgoArgs'] = info['tm_split_args'] + jobSubmit['My.CRAB_TaskLifetimeDays'] = str(TASKLIFETIME // 24 // 60 // 60) + jobSubmit['My.CRAB_TaskEndTime'] = str(int(task["tm_start_time"]) + TASKLIFETIME) + jobSubmit['My.CRAB_SplitAlgo'] = classad.quote(task['tm_split_algo']) + 
jobSubmit['My.CRAB_AlgoArgs'] = classad.quote(task['tm_split_args']) jobSubmit['My.CMS_WMTool'] = classad.quote(self.setCMS_WMTool(task)) jobSubmit['My.CMS_TaskType'] = classad.quote(self.setCMS_TaskType(task)) jobSubmit['My.CMS_SubmissionTool'] = classad.quote("CRAB") jobSubmit['My.CMS_Type'] = classad.quote(self.setCMS_Type(task)) - transferOutputs = "1" if task['tm_transfer_outputs'] == 'T' else "0" # Note: this must always be 0 for probe jobs, is taken care of in PostJob.py - jobSbumit['.My.CRAB_TransferOutputs'] = classad.quote(transferOutputs) - - - matchInfo = self.populateGlideinMatching(task) + jobSubmit['.My.CRAB_TransferOutputs'] = transferOutputs # These attributes help gWMS decide what platforms this job can run on; see https://twiki.cern.ch/twiki/bin/view/CMSPublic/CompOpsMatchArchitecture - jobSbumit['My.REQUIRED_ARCH'] = matchInfo['required_arch'] - jobSbumit['My.REQUIRED_MINIMUM_MICROARCH'] = matchInfo['required_minimum_microarch'] - jobSbumit['My.DESIRED_CMSDataset'] = classad.quote(task['tm_input_dataset']) + matchInfo = self.populateGlideinMatching(task) + jobSubmit['My.REQUIRED_ARCH'] = matchInfo['required_arch'] + jobSubmit['My.REQUIRED_MINIMUM_MICROARCH'] = matchInfo['required_minimum_microarch'] + jobSubmit['My.DESIRED_CMSDataset'] = classad.quote(task['tm_input_dataset']) # Stefano is not sure why we need this, i.e. whether we can replace its use with GLIDEIN_CMSSite - jobSbumit['My.JOBGLIDEIN_CMSSite'] = classad.quote("$$([ifThenElse(GLIDEIN_CMSSite is undefined, \\"Unknown\\", GLIDEIN_CMSSite)])") + # but that would require changes elsewhere in the code base as well + jobSubmit['My.JOBGLIDEIN_CMSSite'] = classad.quote("$$([ifThenElse(GLIDEIN_CMSSite is undefined, \\"Unknown\\", GLIDEIN_CMSSite)])") + # # now actual HTC Job Submission commands, here right hand side can be simply strings + # egroups = getattr(self.config.TaskWorker, 'highPrioEgroups', []) if egroups and task['tm_username'] in self.getHighPrioUsers(egroups): - jobSbumit['accounting_group'] = 'highprio' + jobSubmit['accounting_group'] = 'highprio' else: - jobSbumit['accounting_group'] = 'analysis' - jobSbumit['accounting_group_user'] = task['tm_username'] + jobSubmit['accounting_group'] = 'analysis' + jobSubmit['accounting_group_user'] = task['tm_username'] - job_ad_information_attrs = MATCH_EXP_JOBGLIDEIN_CMSSite, JOBGLIDEIN_CMSSite, RemoteSysCpu, RemoteUserCpu + jobSubmit['job_ad_information_attrs'] = "MATCH_EXP_JOBGLIDEIN_CMSSite, JOBGLIDEIN_CMSSite, RemoteSysCpu, RemoteUserCpu" # Recover job output and logs on eviction events; make sure they aren't spooled # This allows us to return stdout to users when they hit memory limits (which triggers PeriodicRemove). 
- WhenToTransferOutput = ON_EXIT_OR_EVICT + jobSubmit['WhenToTransferOutput'] = "ON_EXIT_OR_EVICT" + # old code had this line in Job.submit, but I can't find it used nor documented anywhere + # +SpoolOnEvict = false + + # Keep job in the queue upon completion long enough for the postJob to run, + # allowing the monitoring script to fetch the postJob status and job exit-code updated by the postJob + jobSubmit['LeaveJobInQueue'] = "ifThenElse((JobStatus=?=4 || JobStatus=?=3)" + \ + "&& (time() - EnteredCurrentStatus < 30 * 60*60), true, false)" + + jobSubmit['universe'] = "vanilla" + jobSubmit['Executable'] = "gWMS-CMSRunAnalysis.sh" + jobSubmit['Output'] = "job_out.$(count)" + jobSubmit['Error'] = "job_err.$(count)" + jobSubmit['Log'] = "job_log" + + jobSubmit['Arguments'] = "--jobNumber=$(CRAB_Id)" + jobSubmit['transfer_output_files'] = "jobReport.json.$(count), WMArchiveReport.json.$(count)" + additional_input_files = ..... + jobSubmit['transfer_input_files'] = f"CMSRunAnalysis.sh, cmscp.py{additional_input_file}" + # make sure coredump (if any) is not added to output files ref: https://lists.cs.wisc.edu/archive/htcondor-users/2022-September/msg00052.shtml + jobSubmit['coresize'] = "0" + # TODO: fold this into the config file instead of hardcoding things. + additional_environment_options = ..... + jobSubmit['Environment'] = f"SCRAM_ARCH=$(CRAB_JobArch) {additional_environment_options}" + jobSubmit['should_transfer_files'] = "YES" + jobSubmit['use_x509userproxy'] = "true" + + arch = task['tm_job_arch'].split("_")[0] # extracts "slc7" from "slc7_amd64_gcc10" + required_os_list = ARCH_TO_OS.get(arch) + if not required_os_list: + raise SubmissionRefusedException(f"Unsupported architecture {arch}") + # ARCH_TO_OS.get("slc7") gives a list with one item only: ['rhel7'] + jobSubmit['My.REQUIRED_OS'] = required_os_list[0] + jobSubmit['Requirements'] = "stringListMember(TARGET.Arch, REQUIRED_ARCH)" + + # Ref: https://htcondor.readthedocs.io/en/latest/classad-attributes/job-classad-attributes.html#HoldReasonCode + jobSubmit['periodic_release'] = "(HoldReasonCode == 28) || (HoldReasonCode == 30) " + \ + " || (HoldReasonCode == 13) || HoldReasonCode == 6)" + + # Remove if + # a) job is in the 'held' status for more than 7 minutes + # b) job is idle more than 7 days + # c) job is running and one of: + # 1) Over memory use + # 2) Over wall clock limit + # 3) Over disk usage of N GB, which is set in ServerUtilities + # d) the taks EndTime has been reached + # e) job is idle and users proxy expired 1 day ago. + # (P.S. why 1 day ago? because there is recurring action which is updating user proxy and lifetime.) + # ** If New periodic remove expression is added, also it should have Periodic Remove Reason. ** + # ** Otherwise message will not be clear and it is hard to debug ** + periodicRemove = "( (JobStatus =?= 5) && (time() - EnteredCurrentStatus > 7*60) )" # a) + periodicRemove += "|| ( (JobStatus =?= 1) && (time() - EnteredCurrentStatus > 7*24*60*60) )" # b) + periodicRemove += "|| ( (JobStatus =?= 2) && ( " # c) + periodicRemove += "(MemoryUsage = != UNDEFINED && MemoryUsage > RequestMemory)" # c) 1) + periodicRemove += "|| (MaxWallTimeMinsRun * 60 < time() - EnteredCurrentStatus)" # c) 2) + periodicRemove += f"|| (DiskUsage > {MAX_DISK_SPACE})" # c) 3) + periodicRemove += "))" # these parentheses close the "if running" condition, i.e. 
JobStatus==2 + periodicRemove += "|| (time() > CRAB_TaskEndTime)" # d) + periodicRemove += "|| ( (JobStatus =?= 1) && (time() > (x509UserProxyExpiration + 86400)))""" # e) + jobSubmit['periodic_remove'] = periodicRemove + + # remove reasons are "ordered" in the following big IF starting from the less-conditial ones + # order is relevant and getting it right is "an art" + periodicRemoveReason = "ifThenElse(" + periodicRemoveReason += "time() - EnteredCurrentStatus > 7 * 24 * 60 * 60 && isUndefined(MemoryUsage)," + periodicRemoveReason += "\"Removed due to idle time limit\"," # set this reasons. Else + periodicRemoveReason += "ifThenElse(time() > x509UserProxyExpiration, \"Removed job due to proxy expiration\"," + periodicRemoveReason += "ifThenElse(MemoryUsage > RequestMemory, \"Removed due to memory use\"," + periodicRemoveReason += "ifThenElse(MaxWallTimeMinsRun * 60 < time() - EnteredCurrentStatus, \"Removed due to wall clock limit\"," + periodicRemoveReason += f"ifThenElse(DiskUsage > {MAX_DISK_SPACE}, \"Removed due to disk usage\"," + periodicRemoveReason += "ifThenElse(time() > CRAB_TaskEndTime, \"Removed due to reached CRAB_TaskEndTime\"," + periodicRemoveReason += "\"Removed due to job being held\"))))))" # one closed ")" for each "ifThenElse(" + jobSubmit['My.PeriodicRemoveReason'] = periodicRemoveReason + + %(accelerator_jdl) - info['runs'] = [] - info['lumis'] = [] - info = transform_strings(info) - info['faillimit'] = task['tm_fail_limit'] # tm_extrajdl and tm_user_config['acceleratorparams'] contain list of k=v # assignements to be turned into classAds, so here we turn them from a python list of strings to + for extraJdl in task['tm_extra_jdl']: + k,v = extraJdl.split('=',1) + jobSubmit[k] = v + # a single string with k=v separated by \n which can be pasted into the Job.submit JDL info['extra_jdl'] = '\n'.join(literal_eval(task['tm_extrajdl'])) if task['tm_user_config']['requireaccelerator']: @@ -626,7 +701,7 @@ def pythonListToClassAdValue(aList): jobSubmit['additional_input_file'] += ", submit_env.sh" jobSubmit['additional_input_file'] += ", cmscp.sh" - jobSbumit['max_disk_space'] = MAX_DISK_SPACE + jobSubmit['max_disk_space'] = MAX_DISK_SPACE with open("Job.submit", "w", encoding='utf-8') as fd: fd.write(JOB_SUBMIT % info) From 12b1b4cfe1f844713bffbb5b3568505ae00f5bbe Mon Sep 17 00:00:00 2001 From: Stefano Belforte Date: Wed, 22 Jan 2025 17:28:20 +0100 Subject: [PATCH 12/20] src/WIP --- src/python/TaskWorker/Actions/DagmanCreator.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/python/TaskWorker/Actions/DagmanCreator.py b/src/python/TaskWorker/Actions/DagmanCreator.py index 919b930769..d3a3e33c28 100644 --- a/src/python/TaskWorker/Actions/DagmanCreator.py +++ b/src/python/TaskWorker/Actions/DagmanCreator.py @@ -502,8 +502,8 @@ def makeJobSubmit(self, task): jobSubmit['My.CRAB_DBSURL'] = classad.quote(task['tm_dbs_url']) jobSubmit['My.CRAB_PostJobStatus'] = classad.quote("NOT RUN") jobSubmit['My.CRAB_PostJobLastUpdate'] = classad.quote("0") - jobSubmit['My.CRAB_PublishName'] = %(publishname) - jobSubmit['My.CRAB_Publish = %(publication) + ?jobSubmit['My.CRAB_PublishName'] = %(publishname) + ?jobSubmit['My.CRAB_Publish = %(publication) jobSubmit['My.CRAB_PublishDBSURL'] = classad.quote(task['tm_publish_dbs_url']) jobSubmit['My.CRAB_ISB'] = classad.quote(task['tm_cache_url']) From f1c6c36bb068591328ccdf189753ba6159240075 Mon Sep 17 00:00:00 2001 From: Stefano Belforte Date: Wed, 22 Jan 2025 23:24:08 +0100 Subject: [PATCH 13/20] a DagmanCreator which 
compiles --- .../TaskWorker/Actions/DagmanCreator.py | 125 +++++++++--------- 1 file changed, 60 insertions(+), 65 deletions(-) diff --git a/src/python/TaskWorker/Actions/DagmanCreator.py b/src/python/TaskWorker/Actions/DagmanCreator.py index d3a3e33c28..eef541e2f8 100644 --- a/src/python/TaskWorker/Actions/DagmanCreator.py +++ b/src/python/TaskWorker/Actions/DagmanCreator.py @@ -36,8 +36,10 @@ from WMCore.WMRuntime.Tools.Scram import ARCH_TO_OS, SCRAM_TO_ARCH if 'useHtcV2' in os.environ: + import htcondor2 as htcondor import classad2 as classad else: + import htcondor import classad DAG_HEADER = """ @@ -421,13 +423,13 @@ def populateGlideinMatching(self, task): min_micro_arch = task['tm_job_min_microarch'] if not min_micro_arch: matchInfo['required_minimum_microarch'] = '2' # the current default for CMSSW - return + return matchInfo if min_micro_arch == 'any': matchInfo['required_minimum_microarch'] = 0 - return + return matchInfo if min_micro_arch.startswith('x86-64-v'): matchInfo['required_minimum_microarch'] = int(min_micro_arch.split('v')[-1]) - return + return matchInfo self.logger.error(f"Not supported microarch: {min_micro_arch}. Ignore it") matchInfo['required_minimum_microarch'] = 'any' @@ -486,12 +488,12 @@ def makeJobSubmit(self, task): propagated to the scheduler via the DAG description files """ + jobSubmit = htcondor.Submit() + if os.path.exists("Job.submit"): - return None - #info = {'jobcount': int(task['jobcount'])} - #return info + jobSubmit['jobcount'] = str(task['jobcount']) + return jobSubmit - jobSubmit = htcondor.Submit() # these are classAds that we want to be added to each grid job # Note that argument to classad.quote can only be string or None jobSubmit['My.CRAB_Reqname'] = classad.quote(task['tm_taskname']) @@ -502,8 +504,8 @@ def makeJobSubmit(self, task): jobSubmit['My.CRAB_DBSURL'] = classad.quote(task['tm_dbs_url']) jobSubmit['My.CRAB_PostJobStatus'] = classad.quote("NOT RUN") jobSubmit['My.CRAB_PostJobLastUpdate'] = classad.quote("0") - ?jobSubmit['My.CRAB_PublishName'] = %(publishname) - ?jobSubmit['My.CRAB_Publish = %(publication) + jobSubmit['My.CRAB_PublishName'] = classad.quote(task['tm_publish_name']) + jobSubmit['My.CRAB_Publish'] = "1" if task['tm_publication'] == 'T' else "0" jobSubmit['My.CRAB_PublishDBSURL'] = classad.quote(task['tm_publish_dbs_url']) jobSubmit['My.CRAB_ISB'] = classad.quote(task['tm_cache_url']) @@ -534,7 +536,7 @@ def pythonListToClassAdValue(aList): temp_dest, dest = makeLFNPrefixes(task) jobSubmit['My.CRAB_OutTempLFNDir'] = classad.quote(temp_dest) jobSubmit['My.CRAB_OutLFNDir'] = classad.quote(dest) - oneEventMode = "1" if info['tm_one_event_mode'] == 'T' else "0" + oneEventMode = "1" if task['tm_one_event_mode'] == 'T' else "0" jobSubmit['My.CRAB_oneEventMode'] = oneEventMode jobSubmit['My.CRAB_PrimaryDataset'] = classad.quote(task['tm_primary_dataset']) jobSubmit['My.CRAB_DAGType'] = classad.quote("Job") @@ -558,7 +560,7 @@ def pythonListToClassAdValue(aList): # Stefano is not sure why we need this, i.e. 
whether we can replace its use with GLIDEIN_CMSSite # but that would require changes elsewhere in the code base as well - jobSubmit['My.JOBGLIDEIN_CMSSite'] = classad.quote("$$([ifThenElse(GLIDEIN_CMSSite is undefined, \\"Unknown\\", GLIDEIN_CMSSite)])") + jobSubmit['My.JOBGLIDEIN_CMSSite'] = classad.quote('$$([ifThenElse(GLIDEIN_CMSSite is undefined, "Unknown", GLIDEIN_CMSSite)])') # # now actual HTC Job Submission commands, here right hand side can be simply strings @@ -592,12 +594,28 @@ def pythonListToClassAdValue(aList): jobSubmit['Arguments'] = "--jobNumber=$(CRAB_Id)" jobSubmit['transfer_output_files'] = "jobReport.json.$(count), WMArchiveReport.json.$(count)" - additional_input_files = ..... + + additional_input_file = "" + additional_environment_options = "" + if os.path.exists("CMSRunAnalysis.tar.gz"): + additional_environment_options += ' CRAB_RUNTIME_TARBALL=local' + additional_input_file += ", CMSRunAnalysis.tar.gz" + else: + raise TaskWorkerException(f"Cannot find CMSRunAnalysis.tar.gz inside the cwd: {os.getcwd()}") + if os.path.exists("TaskManagerRun.tar.gz"): + additional_environment_options += ' CRAB_TASKMANAGER_TARBALL=local' + else: + raise TaskWorkerException(f"Cannot find TaskManagerRun.tar.gz inside the cwd: {os.getcwd()}") + additional_input_file += ", sandbox.tar.gz" # it will be present on SPOOL_DIR after dab_bootstrap + additional_input_file += ", run_and_lumis.tar.gz" + additional_input_file += ", input_files.tar.gz" + additional_input_file += ", submit_env.sh" + additional_input_file += ", cmscp.sh" jobSubmit['transfer_input_files'] = f"CMSRunAnalysis.sh, cmscp.py{additional_input_file}" # make sure coredump (if any) is not added to output files ref: https://lists.cs.wisc.edu/archive/htcondor-users/2022-September/msg00052.shtml jobSubmit['coresize'] = "0" - # TODO: fold this into the config file instead of hardcoding things. - additional_environment_options = ..... + + # we should fold this into the config file instead of hardcoding things. 
jobSubmit['Environment'] = f"SCRAM_ARCH=$(CRAB_JobArch) {additional_environment_options}" jobSubmit['should_transfer_files'] = "YES" jobSubmit['use_x509userproxy'] = "true" @@ -650,50 +668,27 @@ def pythonListToClassAdValue(aList): periodicRemoveReason += "\"Removed due to job being held\"))))))" # one closed ")" for each "ifThenElse(" jobSubmit['My.PeriodicRemoveReason'] = periodicRemoveReason - %(accelerator_jdl) - # tm_extrajdl and tm_user_config['acceleratorparams'] contain list of k=v # assignements to be turned into classAds, so here we turn them from a python list of strings to for extraJdl in task['tm_extra_jdl']: k,v = extraJdl.split('=',1) jobSubmit[k] = v - # a single string with k=v separated by \n which can be pasted into the Job.submit JDL - info['extra_jdl'] = '\n'.join(literal_eval(task['tm_extrajdl'])) if task['tm_user_config']['requireaccelerator']: # hardcoding accelerator to GPU (SI currently only have nvidia GPU) - info['accelerator_jdl'] = '+RequiresGPU=1\nrequest_GPUs=1' + jobSubmit['My.RequiresGPU'] = "1" + jobSubmit['request_GPUs'] = "1" if task['tm_user_config']['acceleratorparams']: gpuMemoryMB = task['tm_user_config']['acceleratorparams'].get('GPUMemoryMB', None) cudaCapabilities = task['tm_user_config']['acceleratorparams'].get('CUDACapabilities', None) cudaRuntime = task['tm_user_config']['acceleratorparams'].get('CUDARuntime', None) if gpuMemoryMB: - info['accelerator_jdl'] += f"\n+GPUMemoryMB={gpuMemoryMB}" + jobSubmit['My.GPUMemoryMB'] = classad.quote(gpuMemoryMB) if cudaCapabilities: cudaCapability = ','.join(sorted(cudaCapabilities)) - info['accelerator_jdl'] += f"\n+CUDACapability={classad.quote(cudaCapability)}" + jobSubmit['My.CUDACapability'] = classad.quote(cudaCapability) if cudaRuntime: - info['accelerator_jdl'] += f"\n+CUDARuntime={classad.quote(cudaRuntime)}" - else: - info['accelerator_jdl'] = '' - arch = info['jobarch'].split("_")[0] # extracts "slc7" from "slc7_amd64_gcc10" - required_os_list = ARCH_TO_OS.get(arch) - if not required_os_list: - raise SubmissionRefusedException(f"Unsupported architecture {arch}") - # ARCH_TO_OS.get("slc7") gives a list with one item only: ['rhel7'] - info['opsys_req'] = f'+REQUIRED_OS="{required_os_list[0]}"' - - info.setdefault("additional_environment_options", '') - info.setdefault("additional_input_file", "") - if os.path.exists("CMSRunAnalysis.tar.gz"): - info['additional_environment_options'] += 'CRAB_RUNTIME_TARBALL=local' - info['additional_input_file'] += ", CMSRunAnalysis.tar.gz" - else: - raise TaskWorkerException(f"Cannot find CMSRunAnalysis.tar.gz inside the cwd: {os.getcwd()}") - if os.path.exists("TaskManagerRun.tar.gz"): - info['additional_environment_options'] += ' CRAB_TASKMANAGER_TARBALL=local' - else: - raise TaskWorkerException(f"Cannot find TaskManagerRun.tar.gz inside the cwd: {os.getcwd()}") + jobSubmit['My.CUDARuntime'] = classad.quote(cudaRuntime) jobSubmit['additional_input_file'] += ", sandbox.tar.gz" # it will be present on SPOOL_DIR after dab_bootstrap jobSubmit['additional_input_file'] += ", input_args.json" jobSubmit['additional_input_file'] += ", run_and_lumis.tar.gz" @@ -704,9 +699,9 @@ def pythonListToClassAdValue(aList): jobSubmit['max_disk_space'] = MAX_DISK_SPACE with open("Job.submit", "w", encoding='utf-8') as fd: - fd.write(JOB_SUBMIT % info) + print(jobSubmit, file=fd) - return info + return jobSubmit def getPreScriptDefer(self, task, jobid): @@ -886,7 +881,7 @@ def createSubdag(self, splitterResult, **kwargs): automatic splitting (multiple subdags which will be added in 
the scheduler by the PreDag.py script which calls this DagmanCreator Returns: - info : dictionary : passes info to next action (DagmanSubmitter) + jobSubmit : HTCondor submit object : passes the Job.submit template to next action (DagmanSubmitter) splitterResult : object : this is the output of previous action (Splitter) and is part of input arguments to DagmanCreator ! As far as Stefano can tell returning it here is a "perverse" way to pass it also to DagmanSubmitter @@ -1260,7 +1255,7 @@ def getBlacklistMsg(): kwargs['task']['jobcount'] = len(dagSpecs) - info = self.makeJobSubmit(kwargs['task']) + jobSubmit = self.makeJobSubmit(kwargs['task']) # list of input arguments needed for each jobs argdicts = self.prepareJobArguments(dagSpecs, kwargs['task']) @@ -1276,28 +1271,28 @@ def getBlacklistMsg(): maxidle = getattr(self.config.TaskWorker, 'maxIdle', MAX_IDLE_JOBS) if maxidle == -1: - maxidle = info['jobcount'] + maxidle = kwargs['task']['jobcount'] elif maxidle == 0: - maxidle = int(max(MAX_IDLE_JOBS, info['jobcount']*.1)) - info['maxidle'] = maxidle + maxidle = int(max(MAX_IDLE_JOBS, kwargs['task']['jobcount']*.1)) + jobSubmit['maxidle'] = str(maxidle) maxpost = getattr(self.config.TaskWorker, 'maxPost', MAX_POST_JOBS) if maxpost == -1: - maxpost = info['jobcount'] + maxpost = kwargs['task']['jobcount'] elif maxpost == 0: - maxpost = int(max(MAX_POST_JOBS, info['jobcount']*.1)) - info['maxpost'] = maxpost + maxpost = int(max(MAX_POST_JOBS, kwargs['task']['jobcount']*.1)) + jobSubmit['maxpost'] = str(maxpost) - if info.get('faillimit') is None: - info['faillimit'] = -1 - #if info['jobcount'] > 200 - # info['faillimit'] = 100 + if not 'faillimit' in jobSubmit: + jobSubmit['faillimit'] = "-1" + #if jobSubmit['jobcount'] > 200 + # jobSubmit['faillimit'] = 100 #else: - # info['faillimit'] = -1 - elif info.get('faillimit') < 0: - info['faillimit'] = -1 + # jobSubmit['faillimit'] = -1 + elif int(jobSubmit['faillimit']) < 0: + jobSubmit['faillimit'] = "-1" - return info, splitterResult, subdags + return jobSubmit, splitterResult, subdags def getHighPrioUsers(self, egroups): """ get the list of high priority users """ @@ -1371,13 +1366,13 @@ def executeInternal(self, *args, **kw): filesForSched = filesForWN + \ ['gWMS-CMSRunAnalysis.sh', 'RunJobs.dag', 'Job.submit', 'dag_bootstrap.sh', 'AdjustSites.py', 'site.ad.json', 'TaskManagerRun.tar.gz', - 'datadiscovery.pkl', 'taskinformation.pkl', 'taskworkerconfig.pkl',] + 'datadiscovery.pkl', 'taskjobSubmitrmation.pkl', 'taskworkerconfig.pkl',] if kw['task']['tm_input_dataset']: filesForSched.append("input_dataset_lumis.json") filesForSched.append("input_dataset_duplicate_lumis.json") - info, splitterResult, subdags = self.createSubdag(*args, **kw) + jobSubmit, splitterResult, subdags = self.createSubdag(*args, **kw) # as splitter summary is useful for dryrun, let's add it to the InputFiles tarball jobGroups = splitterResult[0] # the first returned value of Splitter action is the splitterFactory output @@ -1390,7 +1385,7 @@ def executeInternal(self, *args, **kw): self.prepareTarballForSched(filesForSched, subdags) - return info, params, ["InputFiles.tar.gz"], splitterResult + return jobSubmit, params, ["InputFiles.tar.gz"], splitterResult def execute(self, *args, **kw): @@ -1398,7 +1393,7 @@ def execute(self, *args, **kw): cwd = os.getcwd() try: os.chdir(kw['tempDir']) - info, params, inputFiles, splitterResult = self.executeInternal(*args, **kw) - return TaskWorker.DataObjects.Result.Result(task=kw['task'], result=(info, params, inputFiles, 
splitterResult)) + jobSubmit, params, inputFiles, splitterResult = self.executeInternal(*args, **kw) + return TaskWorker.DataObjects.Result.Result(task=kw['task'], result=(jobSubmit, params, inputFiles, splitterResult)) finally: os.chdir(cwd) From 8a76e73f5321055b48e39512649644561427c52d Mon Sep 17 00:00:00 2001 From: Stefano Belforte Date: Thu, 23 Jan 2025 12:42:19 +0100 Subject: [PATCH 14/20] a DagmanCreator which compiles --- .../TaskWorker/Actions/DagmanCreator.py | 85 +------------------ 1 file changed, 3 insertions(+), 82 deletions(-) diff --git a/src/python/TaskWorker/Actions/DagmanCreator.py b/src/python/TaskWorker/Actions/DagmanCreator.py index eef541e2f8..876668cebf 100644 --- a/src/python/TaskWorker/Actions/DagmanCreator.py +++ b/src/python/TaskWorker/Actions/DagmanCreator.py @@ -30,7 +30,6 @@ from RucioUtils import getWritePFN from CMSGroupMapper import get_egroup_users -import WMCore.WMSpec.WMTask from WMCore import Lexicon from WMCore.Services.CRIC.CRIC import CRIC from WMCore.WMRuntime.Tools.Scram import ARCH_TO_OS, SCRAM_TO_ARCH @@ -240,6 +239,7 @@ def makeLFNPrefixes(task): return temp_dest, dest + def validateLFNs(path, outputFiles): """ validate against standard Lexicon the LFN's that this task will try to publish in DBS @@ -266,6 +266,7 @@ def validateLFNs(path, outputFiles): msg += "\n and therefore can not be handled in our DataBase" raise SubmissionRefusedException(msg) + def validateUserLFNs(path, outputFiles): """ validate against standard Lexicon a user-defined LFN which will not go in DBS, but still needs to be sane @@ -292,82 +293,6 @@ def validateUserLFNs(path, outputFiles): msg += "\n and therefore can not be handled in our DataBase" raise SubmissionRefusedException(msg) -def createJobSubmit(data): - """ - create a JobSubmit object template which will be persisted as Job.submit file and send to - scheduler, where it will be used to submit individual jobs after PreJob.py fills in the job-specific valued - """ - jobSubmit = htcondor.Submit() - for var in 'workflow', 'jobtype', 'jobsw', 'jobarch', 'inputdata', 'primarydataset', 'splitalgo', 'algoargs', \ - 'userhn', 'publishname', 'asyncdest', 'dbsurl', 'publishdbsurl', \ - 'userdn', 'requestname', 'oneEventMode', 'tm_user_vo', 'tm_user_role', 'tm_user_group', \ - 'tm_maxmemory', 'tm_numcores', 'tm_maxjobruntime', 'tm_priority', \ - 'stageoutpolicy', 'taskType', 'worker_name', 'cms_wmtool', 'cms_tasktype', 'cms_type', \ - 'required_arch', 'required_minimum_microarch', 'resthost', 'dbinstance', 'submitter_ip_addr', \ - 'task_lifetime_days', 'task_endtime', 'maxproberuntime', 'maxtailruntime': - val = data.get(var, None) - if val is None: - jobSubmit[var] = 'undefined' - else: - # should better handle double quotes when filling JDL's and remove this ! - # now it is a mess since some things in info get their double quote here, some in the JOB_SUBMIT string - jobSubmit[var] = val - -def transform_strings(data): - """ - Converts the arguments in the data dictionary to the arguments necessary - for the job submit file string. 
- """ - info = {} - for var in 'workflow', 'jobtype', 'jobsw', 'jobarch', 'inputdata', 'primarydataset', 'splitalgo', 'algoargs', \ - 'userhn', 'publishname', 'asyncdest', 'dbsurl', 'publishdbsurl', \ - 'userdn', 'requestname', 'oneEventMode', 'tm_user_vo', 'tm_user_role', 'tm_user_group', \ - 'tm_maxmemory', 'tm_numcores', 'tm_maxjobruntime', 'tm_priority', \ - 'stageoutpolicy', 'taskType', 'worker_name', 'cms_wmtool', 'cms_tasktype', 'cms_type', \ - 'required_arch', 'required_minimum_microarch', 'resthost', 'dbinstance', 'submitter_ip_addr', \ - 'task_lifetime_days', 'task_endtime', 'maxproberuntime', 'maxtailruntime': - val = data.get(var, None) - if val is None: - info[var] = 'undefined' - else: - # should better handle double quotes when filling JDL's and remove this ! - # now it is a mess since some things in info get their double quote here, some in the JOB_SUBMIT string - info[var] = json.dumps(val) - - #for var in 'accounting_group', 'accounting_group_user': - # info[var] = data[var] - - for var in 'savelogsflag', 'blacklistT1', 'retry_aso', 'aso_timeout', 'publication', 'saveoutput', 'numautomjobretries', 'jobcount': - info[var] = int(data[var]) - - for var in 'siteblacklist', 'sitewhitelist', 'addoutputfiles', 'tfileoutfiles', 'edmoutfiles': - val = data[var] - if val is None: - info[var] = "{}" - else: - info[var] = "{" + json.dumps(val)[1:-1] + "}" - - info['lumimask'] = '"' + json.dumps(WMCore.WMSpec.WMTask.buildLumiMask(data['runs'], data['lumis'])).replace(r'"', r'\"') + '"' - - splitArgName = SPLIT_ARG_MAP[data['splitalgo']] - info['algoargs'] = '"' + json.dumps({'halt_job_on_file_boundaries': False, 'splitOnRun': False, splitArgName : data['algoargs']}).replace('"', r'\"') + '"' - info['attempt'] = 0 - - # SB these must not be "json dumped" here ! So revert what doen at line 312 ... oh my my my ..... - for var in ["jobsw", "jobarch", "asyncdest", "requestname"]: - info[var] = data[var] - - # info["addoutputfiles"] = '{}' - - temp_dest, dest = makeLFNPrefixes(data) - info["temp_dest"] = temp_dest - info["output_dest"] = dest - info['x509up_file'] = os.path.split(data['user_proxy'])[-1] - info['user_proxy'] = data['user_proxy'] - info['scratch'] = data['scratch'] - - return info - def getLocation(default_name): """ Get the location of the runtime code (job wrapper, postjob, anything executed on the schedd @@ -703,7 +628,6 @@ def pythonListToClassAdValue(aList): return jobSubmit - def getPreScriptDefer(self, task, jobid): """ Return the string to be used for deferring prejobs If the extrajdl CRAB_JobReleaseTimeout is not set in the client it returns @@ -723,7 +647,6 @@ def getPreScriptDefer(self, task, jobid): prescriptDeferString = '' return prescriptDeferString - def makeDagSpecs(self, task, siteinfo, jobgroup, block, availablesites, datasites, outfiles, startjobid, parent=None, stage='conventional'): """ need a comment line here """ dagSpecs = [] @@ -830,7 +753,7 @@ def makeDagSpecs(self, task, siteinfo, jobgroup, block, availablesites, datasite def prepareJobArguments(self, dagSpecs, task): """ Prepare an object with all the input parameters of each jobs. It is a list with a dictionary for each job. 
The dictionary key/value pairs are the variables needed in CMSRunAnalysis.py - This will be save in "input_args*.json", a differnt json file for the main DAG and each subdags + This will be save in "input_args.json" by the caller, adding to existing list in ther if any Inputs: dagSpecs : list of dictionaries with information for each DAG job task: dictionary, the "standard" task dictionary with info from the DataBase TASK table @@ -1309,7 +1232,6 @@ def getHighPrioUsers(self, egroups): return [] return highPrioUsers - def executeInternal(self, *args, **kw): """ all real work is done here """ transform_location = getLocation('CMSRunAnalysis.sh') @@ -1387,7 +1309,6 @@ def executeInternal(self, *args, **kw): return jobSubmit, params, ["InputFiles.tar.gz"], splitterResult - def execute(self, *args, **kw): """ entry point called by Hanlder """ cwd = os.getcwd() From 075c21a05c50126d78daf5e2aa2656308a73460f Mon Sep 17 00:00:00 2001 From: Stefano Belforte Date: Thu, 23 Jan 2025 13:37:43 +0100 Subject: [PATCH 15/20] fixes --- .../TaskWorker/Actions/DagmanCreator.py | 20 +++++++------------ 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/src/python/TaskWorker/Actions/DagmanCreator.py b/src/python/TaskWorker/Actions/DagmanCreator.py index 876668cebf..193c9e8ea3 100644 --- a/src/python/TaskWorker/Actions/DagmanCreator.py +++ b/src/python/TaskWorker/Actions/DagmanCreator.py @@ -445,7 +445,7 @@ def pythonListToClassAdValue(aList): jobSubmit['My.CRAB_TFileOutputFiles'] = pythonListToClassAdValue(task['tm_outfiles']) jobSubmit['My.CRAB_UserDN'] = classad.quote(task['tm_user_dn']) jobSubmit['My.CRAB_UserHN'] = classad.quote(task['tm_username']) - jobSubmit['My.CRAB_AsyncDest'] = classad.quote(task['asyncdest']) + jobSubmit['My.CRAB_AsyncDest'] = classad.quote(task['tm_asyncdest']) jobSubmit['My.CRAB_StageoutPolicy'] = classad.quote(task['stageoutpolicy']) jobSubmit['My.CRAB_UserRole'] = classad.quote(task['tm_user_role']) jobSubmit['My.CRAB_UserGroup'] = classad.quote(task['tm_user_group']) @@ -469,7 +469,7 @@ def pythonListToClassAdValue(aList): jobSubmit['My.CRAB_TaskLifetimeDays'] = str(TASKLIFETIME // 24 // 60 // 60) jobSubmit['My.CRAB_TaskEndTime'] = str(int(task["tm_start_time"]) + TASKLIFETIME) jobSubmit['My.CRAB_SplitAlgo'] = classad.quote(task['tm_split_algo']) - jobSubmit['My.CRAB_AlgoArgs'] = classad.quote(task['tm_split_args']) + jobSubmit['My.CRAB_AlgoArgs'] = classad.quote(str(task['tm_split_args'])) # from dict to str before quoting jobSubmit['My.CMS_WMTool'] = classad.quote(self.setCMS_WMTool(task)) jobSubmit['My.CMS_TaskType'] = classad.quote(self.setCMS_TaskType(task)) jobSubmit['My.CMS_SubmissionTool'] = classad.quote("CRAB") @@ -593,9 +593,11 @@ def pythonListToClassAdValue(aList): periodicRemoveReason += "\"Removed due to job being held\"))))))" # one closed ")" for each "ifThenElse(" jobSubmit['My.PeriodicRemoveReason'] = periodicRemoveReason - # tm_extrajdl and tm_user_config['acceleratorparams'] contain list of k=v - # assignements to be turned into classAds, so here we turn them from a python list of strings to - for extraJdl in task['tm_extra_jdl']: + # tm_extrajdl and tm_user_config['acceleratorparams'] contain list of k=v assignements to be turned into classAds + # also special handling is needed because is retrieved from DB not as a python list, but as a string + # with format "['a=b','c=d'...]" (a change in RESTWorkerWorkflow would be needed to get a python list + # so we use here the same literal_eval which is used in RESTWorkerWorkflow.py ) + for extraJdl in 
literal_eval(task['tm_extrajdl']): k,v = extraJdl.split('=',1) jobSubmit[k] = v @@ -614,14 +616,6 @@ def pythonListToClassAdValue(aList): jobSubmit['My.CUDACapability'] = classad.quote(cudaCapability) if cudaRuntime: jobSubmit['My.CUDARuntime'] = classad.quote(cudaRuntime) - jobSubmit['additional_input_file'] += ", sandbox.tar.gz" # it will be present on SPOOL_DIR after dab_bootstrap - jobSubmit['additional_input_file'] += ", input_args.json" - jobSubmit['additional_input_file'] += ", run_and_lumis.tar.gz" - jobSubmit['additional_input_file'] += ", input_files.tar.gz" - jobSubmit['additional_input_file'] += ", submit_env.sh" - jobSubmit['additional_input_file'] += ", cmscp.sh" - - jobSubmit['max_disk_space'] = MAX_DISK_SPACE with open("Job.submit", "w", encoding='utf-8') as fd: print(jobSubmit, file=fd) From 1714e59e6e209c06139a633c41173befb7332944 Mon Sep 17 00:00:00 2001 From: Stefano Belforte Date: Thu, 23 Jan 2025 13:44:03 +0100 Subject: [PATCH 16/20] fixes --- src/python/TaskWorker/Actions/DagmanCreator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/python/TaskWorker/Actions/DagmanCreator.py b/src/python/TaskWorker/Actions/DagmanCreator.py index 193c9e8ea3..37436b68d7 100644 --- a/src/python/TaskWorker/Actions/DagmanCreator.py +++ b/src/python/TaskWorker/Actions/DagmanCreator.py @@ -1282,7 +1282,7 @@ def executeInternal(self, *args, **kw): filesForSched = filesForWN + \ ['gWMS-CMSRunAnalysis.sh', 'RunJobs.dag', 'Job.submit', 'dag_bootstrap.sh', 'AdjustSites.py', 'site.ad.json', 'TaskManagerRun.tar.gz', - 'datadiscovery.pkl', 'taskjobSubmitrmation.pkl', 'taskworkerconfig.pkl',] + 'datadiscovery.pkl', 'taskinformation.pkl', 'taskworkerconfig.pkl',] if kw['task']['tm_input_dataset']: filesForSched.append("input_dataset_lumis.json") From 1238a24d420373bb868eeb1c53c0b5a909091240 Mon Sep 17 00:00:00 2001 From: Stefano Belforte Date: Thu, 23 Jan 2025 15:05:47 +0100 Subject: [PATCH 17/20] fixes --- .../TaskWorker/Actions/DagmanCreator.py | 28 +++++++++++-------- 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/src/python/TaskWorker/Actions/DagmanCreator.py b/src/python/TaskWorker/Actions/DagmanCreator.py index 37436b68d7..f83ec0f57d 100644 --- a/src/python/TaskWorker/Actions/DagmanCreator.py +++ b/src/python/TaskWorker/Actions/DagmanCreator.py @@ -424,11 +424,11 @@ def makeJobSubmit(self, task): jobSubmit['My.CRAB_Reqname'] = classad.quote(task['tm_taskname']) jobSubmit['My.CRAB_workflow'] = classad.quote(task['tm_taskname']) jobSubmit['My.CMS_JobtYpe'] = classad.quote('Analysis') - jobSubmit['My.CRAB_JobSw'] = classad.quote(task['tm_job_sw']) + jobSubmit['My.CRAB_JobSW'] = classad.quote(task['tm_job_sw']) jobSubmit['My.CRAB_JobArch'] = classad.quote(task['tm_job_arch']) jobSubmit['My.CRAB_DBSURL'] = classad.quote(task['tm_dbs_url']) jobSubmit['My.CRAB_PostJobStatus'] = classad.quote("NOT RUN") - jobSubmit['My.CRAB_PostJobLastUpdate'] = classad.quote("0") + jobSubmit['My.CRAB_PostJobLastUpdate'] = "0" jobSubmit['My.CRAB_PublishName'] = classad.quote(task['tm_publish_name']) jobSubmit['My.CRAB_Publish'] = "1" if task['tm_publication'] == 'T' else "0" jobSubmit['My.CRAB_PublishDBSURL'] = classad.quote(task['tm_publish_dbs_url']) @@ -447,8 +447,12 @@ def pythonListToClassAdValue(aList): jobSubmit['My.CRAB_UserHN'] = classad.quote(task['tm_username']) jobSubmit['My.CRAB_AsyncDest'] = classad.quote(task['tm_asyncdest']) jobSubmit['My.CRAB_StageoutPolicy'] = classad.quote(task['stageoutpolicy']) - jobSubmit['My.CRAB_UserRole'] = 
classad.quote(task['tm_user_role']) - jobSubmit['My.CRAB_UserGroup'] = classad.quote(task['tm_user_group']) + # for VOMS role and group, PostJob and RenewRemoteProxies code want the undefined value, not " + userRole = task['tm_user_role']" + jobSubmit['My.CRAB_UserRole'] = classad.quote(userRole) if userRole else 'undefined' + userGroup = task['tm_user_group'] + jobSubmit['My.CRAB_UserRole'] = classad.quote(userGroup) if userGroup else 'undefined' + jobSubmit['My.CRAB_UserGroup'] = classad.quote(userGroup) jobSubmit['My.CRAB_TaskWorker'] = classad.quote(getattr(self.config.TaskWorker, 'name', 'unknown')) retry_aso = "1" if getattr(self.config.TaskWorker, 'retryOnASOFailures', True) else "0" jobSubmit['My.CRAB_RetryOnASOFailures'] = retry_aso @@ -456,7 +460,7 @@ def pythonListToClassAdValue(aList): jobSubmit['My.CRAB_RestHost'] = classad.quote(task['resthost']) jobSubmit['My.CRAB_DbInstance'] = classad.quote(task['dbinstance']) jobSubmit['My.CRAB_NumAutomJobRetries'] = str(task['numautomjobretries']) - jobSubmit['My.CRAB_Id'] = "$(count)" # count macro will be defined via VARS line in the DAG description file + jobSubmit['My.CRAB_Id'] = classad.quote("$(count)") # count macro defined via VARS line in the DAG file jobSubmit['My.CRAB_JobCount'] = str(task['jobcount']) temp_dest, dest = makeLFNPrefixes(task) jobSubmit['My.CRAB_OutTempLFNDir'] = classad.quote(temp_dest) @@ -475,12 +479,12 @@ def pythonListToClassAdValue(aList): jobSubmit['My.CMS_SubmissionTool'] = classad.quote("CRAB") jobSubmit['My.CMS_Type'] = classad.quote(self.setCMS_Type(task)) transferOutputs = "1" if task['tm_transfer_outputs'] == 'T' else "0" # Note: this must always be 0 for probe jobs, is taken care of in PostJob.py - jobSubmit['.My.CRAB_TransferOutputs'] = transferOutputs + jobSubmit['My.CRAB_TransferOutputs'] = transferOutputs # These attributes help gWMS decide what platforms this job can run on; see https://twiki.cern.ch/twiki/bin/view/CMSPublic/CompOpsMatchArchitecture matchInfo = self.populateGlideinMatching(task) - jobSubmit['My.REQUIRED_ARCH'] = matchInfo['required_arch'] - jobSubmit['My.REQUIRED_MINIMUM_MICROARCH'] = matchInfo['required_minimum_microarch'] + jobSubmit['My.REQUIRED_ARCH'] = classad.quote(matchInfo['required_arch']) + jobSubmit['My.REQUIRED_MINIMUM_MICROARCH'] = classad.quote(matchInfo['required_minimum_microarch']) jobSubmit['My.DESIRED_CMSDataset'] = classad.quote(task['tm_input_dataset']) # Stefano is not sure why we need this, i.e. whether we can replace its use with GLIDEIN_CMSSite @@ -508,7 +512,7 @@ def pythonListToClassAdValue(aList): # Keep job in the queue upon completion long enough for the postJob to run, # allowing the monitoring script to fetch the postJob status and job exit-code updated by the postJob - jobSubmit['LeaveJobInQueue'] = "ifThenElse((JobStatus=?=4 || JobStatus=?=3)" + \ + jobSubmit['LeaveJobInQueue'] = "ifThenElse((JobStatus=?=4 || JobStatus=?=3) " + \ "&& (time() - EnteredCurrentStatus < 30 * 60*60), true, false)" jobSubmit['universe'] = "vanilla" @@ -541,7 +545,7 @@ def pythonListToClassAdValue(aList): jobSubmit['coresize'] = "0" # we should fold this into the config file instead of hardcoding things. 
- jobSubmit['Environment'] = f"SCRAM_ARCH=$(CRAB_JobArch) {additional_environment_options}" + jobSubmit['Environment'] = classad.quote(f"SCRAM_ARCH=$(CRAB_JobArch) {additional_environment_options}") jobSubmit['should_transfer_files'] = "YES" jobSubmit['use_x509userproxy'] = "true" @@ -550,12 +554,12 @@ def pythonListToClassAdValue(aList): if not required_os_list: raise SubmissionRefusedException(f"Unsupported architecture {arch}") # ARCH_TO_OS.get("slc7") gives a list with one item only: ['rhel7'] - jobSubmit['My.REQUIRED_OS'] = required_os_list[0] + jobSubmit['My.REQUIRED_OS'] = classad.quote(required_os_list[0]) jobSubmit['Requirements'] = "stringListMember(TARGET.Arch, REQUIRED_ARCH)" # Ref: https://htcondor.readthedocs.io/en/latest/classad-attributes/job-classad-attributes.html#HoldReasonCode jobSubmit['periodic_release'] = "(HoldReasonCode == 28) || (HoldReasonCode == 30) " + \ - " || (HoldReasonCode == 13) || HoldReasonCode == 6)" + "|| (HoldReasonCode == 13) || (HoldReasonCode == 6)" # Remove if # a) job is in the 'held' status for more than 7 minutes From 119b1c2a21fe7ccb012d6e557d132fb8531a0d79 Mon Sep 17 00:00:00 2001 From: Stefano Belforte Date: Thu, 23 Jan 2025 15:06:11 +0100 Subject: [PATCH 18/20] fixes --- src/python/TaskWorker/Actions/DagmanCreator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/python/TaskWorker/Actions/DagmanCreator.py b/src/python/TaskWorker/Actions/DagmanCreator.py index f83ec0f57d..53414ee8cf 100644 --- a/src/python/TaskWorker/Actions/DagmanCreator.py +++ b/src/python/TaskWorker/Actions/DagmanCreator.py @@ -448,7 +448,7 @@ def pythonListToClassAdValue(aList): jobSubmit['My.CRAB_AsyncDest'] = classad.quote(task['tm_asyncdest']) jobSubmit['My.CRAB_StageoutPolicy'] = classad.quote(task['stageoutpolicy']) # for VOMS role and group, PostJob and RenewRemoteProxies code want the undefined value, not " - userRole = task['tm_user_role']" + userRole = task['tm_user_role'] jobSubmit['My.CRAB_UserRole'] = classad.quote(userRole) if userRole else 'undefined' userGroup = task['tm_user_group'] jobSubmit['My.CRAB_UserRole'] = classad.quote(userGroup) if userGroup else 'undefined' From e32caeda4d10b781d8e8fc158200a6f93a6a15aa Mon Sep 17 00:00:00 2001 From: Stefano Belforte Date: Thu, 23 Jan 2025 15:28:39 +0100 Subject: [PATCH 19/20] fixes --- src/python/TaskWorker/Actions/DagmanCreator.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/python/TaskWorker/Actions/DagmanCreator.py b/src/python/TaskWorker/Actions/DagmanCreator.py index 53414ee8cf..275c23b85a 100644 --- a/src/python/TaskWorker/Actions/DagmanCreator.py +++ b/src/python/TaskWorker/Actions/DagmanCreator.py @@ -451,8 +451,7 @@ def pythonListToClassAdValue(aList): userRole = task['tm_user_role'] jobSubmit['My.CRAB_UserRole'] = classad.quote(userRole) if userRole else 'undefined' userGroup = task['tm_user_group'] - jobSubmit['My.CRAB_UserRole'] = classad.quote(userGroup) if userGroup else 'undefined' - jobSubmit['My.CRAB_UserGroup'] = classad.quote(userGroup) + jobSubmit['My.CRAB_UserGroup'] = classad.quote(userGroup) if userGroup else 'undefined' jobSubmit['My.CRAB_TaskWorker'] = classad.quote(getattr(self.config.TaskWorker, 'name', 'unknown')) retry_aso = "1" if getattr(self.config.TaskWorker, 'retryOnASOFailures', True) else "0" jobSubmit['My.CRAB_RetryOnASOFailures'] = retry_aso @@ -478,7 +477,7 @@ def pythonListToClassAdValue(aList): jobSubmit['My.CMS_TaskType'] = classad.quote(self.setCMS_TaskType(task)) jobSubmit['My.CMS_SubmissionTool'] = 
classad.quote("CRAB") jobSubmit['My.CMS_Type'] = classad.quote(self.setCMS_Type(task)) - transferOutputs = "1" if task['tm_transfer_outputs'] == 'T' else "0" # Note: this must always be 0 for probe jobs, is taken care of in PostJob.py + transferOutputs = "1" if task['tm_transfer_outputs'] == 'T' else "0" # Note: this must always be 0 for probe jobs, is taken care of in PreJob.py jobSubmit['My.CRAB_TransferOutputs'] = transferOutputs # These attributes help gWMS decide what platforms this job can run on; see https://twiki.cern.ch/twiki/bin/view/CMSPublic/CompOpsMatchArchitecture @@ -545,7 +544,7 @@ def pythonListToClassAdValue(aList): jobSubmit['coresize'] = "0" # we should fold this into the config file instead of hardcoding things. - jobSubmit['Environment'] = classad.quote(f"SCRAM_ARCH=$(CRAB_JobArch) {additional_environment_options}") + jobSubmit['Environment'] = classad.quote(f"SCRAM_ARCH=$(CRAB_JobArch){additional_environment_options}") jobSubmit['should_transfer_files'] = "YES" jobSubmit['use_x509userproxy'] = "true" From f49b5bc6360ea09760e7b7f28f66f46cd5292410 Mon Sep 17 00:00:00 2001 From: Stefano Belforte Date: Thu, 23 Jan 2025 15:31:02 +0100 Subject: [PATCH 20/20] fixes --- src/python/TaskWorker/Actions/DagmanCreator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/python/TaskWorker/Actions/DagmanCreator.py b/src/python/TaskWorker/Actions/DagmanCreator.py index 275c23b85a..7427dad325 100644 --- a/src/python/TaskWorker/Actions/DagmanCreator.py +++ b/src/python/TaskWorker/Actions/DagmanCreator.py @@ -423,7 +423,7 @@ def makeJobSubmit(self, task): # Note that argument to classad.quote can only be string or None jobSubmit['My.CRAB_Reqname'] = classad.quote(task['tm_taskname']) jobSubmit['My.CRAB_workflow'] = classad.quote(task['tm_taskname']) - jobSubmit['My.CMS_JobtYpe'] = classad.quote('Analysis') + jobSubmit['My.CMS_JobType'] = classad.quote('Analysis') jobSubmit['My.CRAB_JobSW'] = classad.quote(task['tm_job_sw']) jobSubmit['My.CRAB_JobArch'] = classad.quote(task['tm_job_arch']) jobSubmit['My.CRAB_DBSURL'] = classad.quote(task['tm_dbs_url'])