Skip to content

Commit 9c37b8c

Browse files
committed
introducing the Submitter class: takes care of submitting jobs locally and to the cluster, enabling estimation of the injection scale prior to submission, progress on #28
1 parent e5283cc commit 9c37b8c

File tree

1 file changed

+328
-0
lines changed

1 file changed

+328
-0
lines changed

flarestack/cluster/submitter.py

+328
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,328 @@
1+
import os, subprocess, time, logging, shutil
2+
import numpy as np
3+
from flarestack.shared import fs_dir, log_dir, fs_scratch_dir, make_analysis_pickle, host_server, \
4+
inj_dir_name, name_pickle_output_dir
5+
from flarestack.core.multiprocess_wrapper import run_multiprocess
6+
from flarestack.core.minimisation import MinimisationHandler
7+
from flarestack.core.results import ResultsHandler
8+
9+
10+
logger = logging.getLogger(__name__)
11+
12+
13+
class Submitter(object):
    """Takes care of submitting trial calculations, locally or to a cluster.

    Subclasses implement the cluster-specific parts (``submit_cluster`` and
    ``wait_for_job``) and register themselves with
    ``Submitter.register_submitter_class``. The subclass matching the current
    machine is obtained via ``Submitter.get_submitter``.
    """

    # Maps a host-server name (e.g. "local", "DESY") to the Submitter
    # subclass registered for it via register_submitter_class().
    submitter_dict = dict()

    def __init__(self, mh_dict, use_cluster, n_cpu,
                 do_sensitivity_scale_estimation=False, remove_old_results=False,
                 **cluster_kwargs):
        """
        A class that takes care of submitting the trial calculations.
        Also can estimate the sensitivity scale before submitting.
        :param mh_dict: dict, MinimisationHandler dictionary
        :param use_cluster: bool, whether to run the trials locally or on the cluster
        :param n_cpu: int, number of cores to use
        :param do_sensitivity_scale_estimation: str, containing 'asimov', 'quick_injections' or both
        :param remove_old_results: bool, if True will delete directories containing injection values and pickled
                                   results from previous trials
        :param cluster_kwargs: keyword arguments used by the cluster
        """
        self.mh_dict = mh_dict
        self.use_cluster = use_cluster
        self.n_cpu = n_cpu
        # set by subclasses once a cluster job has been submitted
        self.job_id = None
        self.remove_old_results = remove_old_results
        self.do_sensitivity_scale_estimation = do_sensitivity_scale_estimation
        self.successful_guess_by_quick_injections = False
        self.cluster_kwargs = cluster_kwargs

    def submit_cluster(self, mh_dict):
        """Splits the trials into jobs and submits them to be calculated on the cluster"""
        raise NotImplementedError

    def submit_local(self, mh_dict):
        """Uses the MultiprocessWrapper to split the trials into jobs and run them locally"""
        # max CPU number is all but one
        make_analysis_pickle(mh_dict)
        n_cpu = min(self.n_cpu, os.cpu_count() - 1)
        run_multiprocess(n_cpu=n_cpu, mh_dict=mh_dict)

    def submit(self, mh_dict):
        """Submits the trials, optionally removing results of previous runs first.
        :param mh_dict: dict, MinimisationHandler dictionary to submit
        """
        if self.remove_old_results:
            self._clean_injection_values_and_pickled_results(self.mh_dict['name'])
        if self.use_cluster:
            self.submit_cluster(mh_dict)
        else:
            self.submit_local(mh_dict)

    def wait_for_job(self):
        """Waits until the cluster is finished processing the job with the ID self.job_id"""
        raise NotImplementedError

    @property
    def _quick_injections_name(self):
        """str: separate analysis name under which quick-injection trials are stored."""
        name = self.mh_dict['name']
        return f'{name if not name.endswith(os.sep) else name[:-1]}_quick_injection/'

    def run_quick_injections_to_estimate_sensitivity_scale(self):
        """
        Roughly estimates the injection scale in order to find a better scale range.
        The quick injection trials are run locally.
        Note that a scale still has to be given in the mh_dict as a first estimate.
        """
        logger.info('doing quick trials to estimate scale')

        # repeat the guessing until success:
        while not self.successful_guess_by_quick_injections:

            # The given scale will serve as an initial guess
            initial_guess = self.mh_dict['scale']

            # run a reduced set of trials under a separate name so the
            # real analysis output is not polluted
            quick_injections_mh_dict = dict(self.mh_dict)
            quick_injections_mh_dict['name'] = self._quick_injections_name
            quick_injections_mh_dict['background_ntrials_factor'] = 1
            quick_injections_mh_dict['n_trials'] = 20
            self.submit_local(quick_injections_mh_dict)

            # collect the quick injections
            quick_injections_rh = ResultsHandler(quick_injections_mh_dict, do_sens=False, do_disc=False)

            # guess the disc and sens scale
            disc_guess, sens_guess = quick_injections_rh.estimate_sens_disc_scale()

            if any((guess < 0) or (guess > initial_guess) for guess in [disc_guess, sens_guess]):
                logger.info(f'Could not perform scale guess because '
                            f'at least one guess outside [0, {initial_guess}]! '
                            f'Adjusting accordingly.')
                # widen the scale range above the larger of the two guesses
                self.mh_dict['scale'] = max(sens_guess, disc_guess) * 1.5

            elif initial_guess > 5 * disc_guess:
                logger.info(f'Could not perform scale guess because '
                            f'initial scale guess {initial_guess} much larger than '
                            f'disc scale guess {disc_guess}. '
                            f'Adjusting initial guess to {4 * disc_guess} and retry.')
                self.mh_dict['scale'] = 4 * disc_guess

            else:
                logger.info('Scale guess successful. Adjusting injection scale.')
                self.successful_guess_by_quick_injections = True
                self.mh_dict['scale'] = sens_guess

            # the quick-injection output is temporary: remove it whether or
            # not the guess succeeded
            self._clean_injection_values_and_pickled_results(quick_injections_rh.name)

    @staticmethod
    def _clean_injection_values_and_pickled_results(name):
        """Removes directories containing injection values and pickled results
        :param name: str, analysis name whose output directories are removed
        """
        directories = [name_pickle_output_dir(name), inj_dir_name(name)]
        for d in directories:
            if os.path.isdir(d):
                logger.debug(f'removing {d}')
                shutil.rmtree(d)
            else:
                logger.warning(f'Can not remove {d}! It is not a directory!')

    def do_asimov_scale_estimation(self):
        """estimate the injection scale using Asimov estimation"""
        logger.info('doing asimov estimation')
        mh = MinimisationHandler.create(self.mh_dict)
        scale_estimate = mh.guess_scale()
        logger.debug(f'estimated scale: {scale_estimate}')
        self.mh_dict['scale'] = scale_estimate

    def analyse(self):
        """Runs the requested scale estimation(s) and then submits the trials."""
        if self.do_sensitivity_scale_estimation:
            if 'asimov' in self.do_sensitivity_scale_estimation:
                self.do_asimov_scale_estimation()

            if 'quick_injections' in self.do_sensitivity_scale_estimation:
                self.run_quick_injections_to_estimate_sensitivity_scale()

        self.submit(self.mh_dict)

    @classmethod
    def register_submitter_class(cls, server_name):
        """Adds a new subclass of Submitter, with class name equal to "server_name"."""
        def decorator(subclass):
            cls.submitter_dict[server_name] = subclass
            return subclass
        return decorator

    @classmethod
    def get_submitter(cls, *args, **kwargs):
        """Returns an instance of the Submitter subclass registered for the
        current host server, falling back to the local submitter."""
        if host_server not in cls.submitter_dict:
            logger.warning(f'No submitter implemented for host server {host_server}! '
                           f'Using LocalSubmitter but you won\'t be able to use cluster operations!')
            return cls.submitter_dict['local'](*args, **kwargs)

        return cls.submitter_dict[host_server](*args, **kwargs)
160+
161+
162+
@Submitter.register_submitter_class("local")
class LocalSubmitter(Submitter):
    """Submitter for running trials on the local machine only.

    Refuses cluster submission outright: requesting ``use_cluster`` raises
    ``NotImplementedError`` before any state is initialised.
    """

    def __init__(self, mh_dict, use_cluster, n_cpu, do_sensitivity_scale_estimation=False, **cluster_kwargs):
        """
        :param mh_dict: dict, MinimisationHandler dictionary
        :param use_cluster: bool, must be False for this submitter
        :param n_cpu: int, number of cores to use
        :param do_sensitivity_scale_estimation: str, containing 'asimov', 'quick_injections' or both
        :param cluster_kwargs: forwarded to the Submitter base class
        """
        # guard clause: this submitter has no cluster backend
        if use_cluster:
            raise NotImplementedError('No cluster operation implemented because you are using the LocalSubmitter!')

        super(LocalSubmitter, self).__init__(
            mh_dict,
            use_cluster,
            n_cpu,
            do_sensitivity_scale_estimation,
            **cluster_kwargs,
        )
172+
173+
174+
@Submitter.register_submitter_class("DESY")
class DESYSubmitter(Submitter):
    """Submitter for the DESY SGE cluster.

    Writes a ``qsub`` submission script next to this module, submits the
    trials as an SGE array job, and can poll ``qstat`` until all tasks of the
    submitted job have finished.
    """

    # directory containing this module; the generated submission script is
    # written alongside it
    cluster_dir = os.path.dirname(os.path.realpath(__file__)) + "/"
    submit_file = cluster_dir + "SubmitDESY.sh"

    def __init__(self, mh_dict, use_cluster, n_cpu, **cluster_kwargs):
        """
        :param mh_dict: dict, MinimisationHandler dictionary
        :param use_cluster: bool, whether to run the trials locally or on the cluster
        :param n_cpu: int, number of cores to use
        :param cluster_kwargs: cluster settings; recognised keys are
            "h_cpu" (wall-time request), "trials_per_task",
            "cluster_cpu" and "ram_per_core"
        """
        super(DESYSubmitter, self).__init__(mh_dict, use_cluster, n_cpu, **cluster_kwargs)

        # extract information that will be used by the cluster script
        self.h_cpu = self.cluster_kwargs.get("h_cpu", "23:59:00")
        self.trials_per_task = self.cluster_kwargs.get("trials_per_task", 1)
        self.cluster_cpu = self.cluster_kwargs.get('cluster_cpu', n_cpu)
        # default per-core RAM request: 6 GB shared across cores plus 2 GB each
        self.ram_per_core = self.cluster_kwargs.get(
            "ram_per_core",
            "{0:.1f}G".format(6. / float(self.cluster_cpu) + 2.)
        )

        # cluster user name, used to query the queue and label the job
        self.username = os.path.basename(os.environ['HOME'])
        self.status_cmd = f'qstat -u {self.username}'
        self.submit_cmd = 'qsub '
        self.root_dir = os.path.dirname(fs_dir[:-1])

    @staticmethod
    def _qstat_output(qstat_command):
        """return the output of the qstat_command"""
        # start a subprocess to query the cluster
        process = subprocess.Popen(qstat_command, stdout=subprocess.PIPE, shell=True)
        # read the output
        tmp = process.stdout.read().decode()
        return str(tmp)

    def _ntasks_from_qstat_command(self, qstat_command):
        """Returns the number of tasks from the output of qstat_command"""
        # get the output of qstat_command
        st = self._qstat_output(qstat_command)
        # If the output is an empty string there are no tasks left
        if st == '':
            return 0
        else:
            # Extract the number of tasks with my job_id
            # NOTE(review): assumes qstat prints two header lines, one row per
            # task, and the job id as the third space-separated token of each
            # row — confirm against the DESY qstat output format
            ids = np.array([int(s.split(' ')[2]) for s in st.split('\n')[2:-1]])
            return len(ids[ids == self.job_id])

    @property
    def ntasks_total(self):
        """Returns the total number of tasks"""
        return self._ntasks_from_qstat_command(self.status_cmd)

    @property
    def ntasks_running(self):
        """Returns the number of running tasks"""
        # "-s r" restricts the qstat output to running tasks
        return self._ntasks_from_qstat_command(self.status_cmd + " -s r")

    def wait_for_job(self):
        """
        Runs the command cmd, which queries the status of the job on the
        cluster, and reads the output. While the output is not an empty
        string (indicating job completion), the cluster is re-queried
        every 30 seconds. Occasionally outputs the number of remaining sub-tasks
        on cluster, and outputs full table result every ~ 8 minutes. On
        completion of job, terminates function process and allows the script to
        continue.
        """
        time.sleep(10)
        # i counts 30 s polls since the last summary line; starting above the
        # threshold forces a summary on the first loop iteration
        i = 31
        # j counts summary lines until the full qstat table is dumped
        j = 6
        while self.ntasks_total != 0:
            if i > 3:
                logger.info(f'{time.asctime(time.localtime())} - Job{self.job_id}:'
                            f' {self.ntasks_total} entries in queue. '
                            f'Of these, {self.ntasks_running} are running tasks, and '
                            f'{self.ntasks_total - self.ntasks_running} are tasks still waiting to be executed.')
                i = 0
                j += 1

            if j > 7:
                # every ~8 minutes print the complete qstat table
                logger.info(self._qstat_output(self.status_cmd))
                j = 0

            time.sleep(30)
            i += 1

    def make_cluster_submission_script(self):
        """Writes the SGE submission script to DESYSubmitter.submit_file and
        makes it executable. The script sets up the icecube cvmfs environment,
        runs the multiprocess wrapper on the pickled analysis file ($1) with
        $2 cores, and copies stdout/stderr logs to the log directory."""
        flarestack_scratch_dir = os.path.dirname(fs_scratch_dir[:-1]) + "/"

        text = "#!/bin/zsh \n" \
               "## \n" \
               "##(otherwise the default shell would be used) \n" \
               "#$ -S /bin/zsh \n" \
               "## \n" \
               "##(the running time for this job) \n" \
               f"#$ -l h_cpu={self.h_cpu} \n" \
               "#$ -l h_rss=" + str(self.ram_per_core) + "\n" \
               "## \n" \
               "## \n" \
               "##(send mail on job's abort) \n" \
               "#$ -m a \n" \
               "## \n" \
               "##(stderr and stdout are merged together to stdout) \n" \
               "#$ -j y \n" \
               "## \n" \
               "## name of the job \n" \
               "## -N Flarestack script " + self.username + " \n" \
               "## \n" \
               "##(redirect output to:) \n" \
               "#$ -o /dev/null \n" \
               "## \n" \
               "sleep $(( ( RANDOM % 60 ) + 1 )) \n" \
               'exec > "$TMPDIR"/${JOB_ID}_stdout.txt ' \
               '2>"$TMPDIR"/${JOB_ID}_stderr.txt \n' \
               'eval $(/cvmfs/icecube.opensciencegrid.org/py3-v4.1.0/setup.sh) \n' \
               'export PYTHONPATH=' + self.root_dir + '/ \n' \
               'export FLARESTACK_SCRATCH_DIR=' + flarestack_scratch_dir + " \n" \
               'python ' + fs_dir + 'core/multiprocess_wrapper.py -f $1 -n $2 \n' \
               'cp $TMPDIR/${JOB_ID}_stdout.txt ' + log_dir + '\n' \
               'cp $TMPDIR/${JOB_ID}_stderr.txt ' + log_dir + '\n '

        logger.info("Creating file at {0}".format(DESYSubmitter.submit_file))

        with open(DESYSubmitter.submit_file, "w") as f:
            f.write(text)

        logger.debug("Bash file created: \n {0}".format(text))

        cmd = "chmod +x " + DESYSubmitter.submit_file
        os.system(cmd)

    def submit_cluster(self, mh_dict):
        """Submits the job to the cluster"""
        # Get the number of tasks that will have to be submitted in order to get ntrials
        # NOTE(review): integer division truncates, so ntrials that are not a
        # multiple of trials_per_task run fewer trials than requested — confirm
        # whether callers always pass a multiple
        ntrials = mh_dict['n_trials']
        n_tasks = int(ntrials / self.trials_per_task)
        logger.debug(f'running {ntrials} trials in {n_tasks} tasks')

        # The mh_dict will be submitted n_task times and will perform mh_dict['n_trials'] each time.
        # Therefore we have to adjust mh_dict['n_trials'] in order to actually perform the number
        # specified in self.mh_dict['n_trials']
        mh_dict['n_trials'] = self.trials_per_task
        path = make_analysis_pickle(mh_dict)

        # assemble the submit command
        submit_cmd = self.submit_cmd
        if self.cluster_cpu > 1:
            # request a multicore parallel environment with slot reservation
            submit_cmd += " -pe multicore {0} -R y ".format(self.cluster_cpu)
        # "-t 1-N:1" submits an array job with N tasks; the script receives the
        # pickle path and the core count as $1 and $2
        submit_cmd += f"-t 1-{n_tasks}:1 {DESYSubmitter.submit_file} {path} {self.cluster_cpu}"
        logger.debug(f"Ram per core: {self.ram_per_core}")
        logger.info(f"{time.asctime(time.localtime())}: {submit_cmd}")

        self.make_cluster_submission_script()

        process = subprocess.Popen(submit_cmd, stdout=subprocess.PIPE, shell=True)
        msg = process.stdout.read().decode()
        logger.info(str(msg))
        # parse the job id out of qsub's "Your job-array <id>.<range> ..." reply
        self.job_id = int(str(msg).split('job-array')[1].split('.')[0])

0 commit comments

Comments
 (0)