# parallel_runner.py
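"""Parallel runner for training and testing tasks.

Builds one command per dataset/seed combination, dispatches the commands
across the configured GPUs through GPUAllocater, optionally sweeps
hyper-parameters via grid search, parses the finished runs into CSV result
files, and mails a success or failure report.
"""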
import argparse
import datetime
import os
import os.path as osp
import shutil

import numpy as np

from utils.gpu_allocater import GPUAllocater
from utils.logger import setup_logger, print
from utils.mail import MailClient
from utils.result_parser import ResultParser
from configs import get_config
from templates import get_command


class ParallelRunner(object):
    """Queues training/testing commands and runs them in parallel on the configured GPUs."""

    def __init__(self, cfg):
        self.cfg = cfg
        self.data_cfg = cfg['data']
        self.train_cfg = cfg['train']
        self.grid_search_cfg = cfg['grid_search']
        self.output_cfg = cfg['output']
        self.mail_cfg = cfg['mail']

        self.allocater = GPUAllocater(cfg['gpu_ids'])
        self.mail = MailClient(self.mail_cfg)

    def run(self):
        """Main entry point: runs all tasks, then mails the results or the failure report."""
        grid_search_cfg = self.grid_search_cfg
        output_cfg = self.output_cfg

        # Remove stale output directories listed in the config, then recreate them empty.
        remove_dirs = [output_cfg[name] for name in output_cfg['remove_dirs']]
        for dir_ in remove_dirs:
            if osp.exists(dir_):
                print(f'Remove directory >>> {dir_}')
                shutil.rmtree(dir_)
            os.makedirs(dir_)

        setup_logger(osp.join(output_cfg['root'], 'log.txt'), write_to_console=True)

        start_time = datetime.datetime.now()
        try:
            # Run every grid search combination, or a single task if grid search is disabled.
            if grid_search_cfg['enable']:
                result_paths = self.run_grid_search()
            else:
                result_paths = [self.run_single()]
        except Exception:
            # On failure, mail the contents of the exception log to the configured address.
            end_time = datetime.datetime.now()
            contents = [f'<b>Training tasks FAILED!</b> Time cost: {end_time - start_time}\n\n',
                        '<b>The exception follows:</b>\n']
            exception_path = osp.join(output_cfg['root'], 'exceptions.txt')
            with open(exception_path) as f:
                contents += f.readlines()
            print('Training tasks FAILED! Mail will be sent >>> {}'.format(self.mail_cfg['to']))
            self.mail.send('Training Tasks FAILED!', contents)
            return

        # After all tasks finish, mail the parsed results.
        end_time = datetime.datetime.now()
        contents = [f'<b>Training tasks FINISHED!</b> Time cost: {end_time - start_time}\n\n',
                    '<b>The results follow:</b>\n']
        for result_path in result_paths:
            contents += [f'\n<b>{result_path}</b>\n']
            with open(result_path) as f:
                contents += f.readlines()
        print('Training tasks FINISHED! Mail will be sent >>> {}'.format(self.mail_cfg['to']))
        self.mail.send('Training Tasks FINISHED!', contents)

    def run_grid_search(self):
        """Runs one task per grid search parameter combination; returns all result paths."""
        output_cfg = self.output_cfg
        root = output_cfg['root']

        # Parse the grid search params into per-task directory names and option lists.
        dirnames, opts_list = self.get_grid_search_opts()
        print('Grid search opts:')
        for opts in opts_list:
            print(opts)
        print()

        result_paths = []
        for idx, (dirname, opts) in enumerate(zip(dirnames, opts_list)):
            # Run a single task for each grid search parameter group, each
            # writing into its own subdirectory of the output root.
            print(f'[{idx + 1} / {len(dirnames)}] Running task {opts}\n')
            output_cfg['root'] = osp.join(root, dirname)
            self.dirname = dirname
            result_paths.append(self.run_single(opts))
        return result_paths

    def run_single(self, opts=None):
        """Builds and runs all commands for one task; returns the result CSV path."""
        cfg = self.cfg
        train_cfg = self.train_cfg
        grid_search_cfg = self.grid_search_cfg
        output_cfg = self.output_cfg
        # Copy the opts (avoiding a mutable default argument) so the command
        # builders never modify the caller's list.
        opts = list(opts) if opts is not None else []

        # Build the commands for the configured mode.
        if cfg['mode'] == 'b2n':
            commands = self.get_base_to_new_commands(opts)
        else:
            commands = self.get_cross_dataset_commands(opts)

        # Queue the commands, then run them in parallel on the available GPUs.
        for command in commands:
            self.allocater.add_command(command)
        self.allocater.run()

        # Parse the finished runs and save them as a CSV file.
        if not grid_search_cfg['enable']:
            filename = '{}-{}-{}.csv'.format(cfg['mode'], train_cfg['trainer'], train_cfg['cfg'])
        else:
            filename = '{}-{}-{}-{}.csv'.format(cfg['mode'], train_cfg['trainer'], train_cfg['cfg'], self.dirname)
        os.makedirs(output_cfg['result'], exist_ok=True)

        result_path = osp.join(output_cfg['result'], filename)
        print(f'Results will be saved >>> {result_path}')

        parser = ResultParser(cfg['mode'], output_cfg['root'], result_path)
        parser.parse_and_save()
        return result_path

    def get_base_to_new_commands(self, opts=None):
        """Builds the train and test commands for base-to-new (b2n) mode."""
        data_cfg = self.data_cfg
        train_cfg = self.train_cfg
        output_cfg = self.output_cfg

        data_root = data_cfg['root']
        datasets = data_cfg['datasets_base_to_new']
        trainer = train_cfg['trainer']
        cfg = train_cfg['cfg']
        seeds = train_cfg['seeds']
        loadep = train_cfg['loadep']
        shots = train_cfg['shots']
        # Concatenate instead of extending in place to avoid mutating the caller's list.
        opts = (opts or []) + train_cfg['opts']
        root = output_cfg['root']

        commands = []
        # Training on all datasets.
        for dataset in datasets:
            for seed in seeds:
                cmd = get_command(data_root, seed, trainer, dataset, cfg, root,
                                  shots, dataset, loadep, opts, mode='b2n', train=True)
                commands.append(cmd)
        # Testing on all datasets.
        for dataset in datasets:
            for seed in seeds:
                cmd = get_command(data_root, seed, trainer, dataset, cfg, root,
                                  shots, dataset, loadep, opts, mode='b2n', train=False)
                commands.append(cmd)
        return commands

    def get_cross_dataset_commands(self, opts=None):
        """Builds the commands for cross-dataset (xd) mode: train on ImageNet, test on the rest."""
        data_cfg = self.data_cfg
        train_cfg = self.train_cfg
        output_cfg = self.output_cfg

        data_root = data_cfg['root']
        datasets = data_cfg['datasets_cross_dataset']
        trainer = train_cfg['trainer']
        cfg = train_cfg['cfg']
        seeds = train_cfg['seeds']
        loadep = train_cfg['loadep']
        shots = train_cfg['shots']
        # Concatenate instead of extending in place to avoid mutating the caller's list.
        opts = (opts or []) + train_cfg['opts']
        root = output_cfg['root']

        commands = []
        # Training on ImageNet.
        load_dataset = 'imagenet'
        for seed in seeds:
            cmd = get_command(data_root, seed, trainer, load_dataset, cfg, root,
                              shots, load_dataset, loadep, opts, mode='xd', train=True)
            commands.append(cmd)
        # Testing on the other datasets.
        for dataset in datasets:
            for seed in seeds:
                cmd = get_command(data_root, seed, trainer, dataset, cfg, root,
                                  shots, load_dataset, loadep, opts, mode='xd', train=False)
                commands.append(cmd)
        return commands

    def get_grid_search_opts(self):
        """Expands the grid search config into per-task directory names and option lists."""
        grid_search_cfg = self.grid_search_cfg
        mode = grid_search_cfg['mode']
        params = grid_search_cfg['params']

        names = [param['name'] for param in params]
        aliases = [param['alias'] for param in params]
        values_list = [param['values'] for param in params]

        # In 'grid' mode, expand the per-parameter value lists into their full
        # Cartesian product; in any other mode they are paired position by position.
        if mode == 'grid' and len(names) > 1:
            values_list = [list(arr.flatten()) for arr in np.meshgrid(*values_list)]

        # Build a directory name (from the aliases) and an opts list
        # (name/value pairs) for each parameter combination.
        dirnames, grid_search_opts_list = [], []
        for i in range(len(values_list[0])):
            values = [values[i] for values in values_list]
            dirname, opts = [], []
            for name, alias, value in zip(names, aliases, values):
                dirname.append(f'{alias}{value}')
                opts += [name, value]
            dirname = '_'.join(dirname)
            dirnames.append(dirname)
            grid_search_opts_list.append(opts)
        return dirnames, grid_search_opts_list
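
# Illustration (hypothetical parameter names, not taken from any real config):
# given grid_search params such as
#     {'name': 'OPTIM.LR',        'alias': 'lr', 'values': [0.001, 0.01]}
#     {'name': 'OPTIM.MAX_EPOCH', 'alias': 'ep', 'values': [5, 10]}
# mode 'grid' expands them via np.meshgrid into 4 combinations, e.g. the
# directory name 'lr0.001_ep5' with opts ['OPTIM.LR', 0.001, 'OPTIM.MAX_EPOCH', 5];
# any other mode pairs the value lists positionally, yielding just
# 'lr0.001_ep5' and 'lr0.01_ep10'.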


def main(args):
    cfg = get_config(args.cfg)
    runner = ParallelRunner(cfg)
    runner.run()


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--cfg', type=str, help='name of the config passed to get_config')
    args = parser.parse_args()
    main(args)
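
# Usage sketch (the config name below is hypothetical; get_config in configs/
# defines what names are actually accepted):
#
#     python parallel_runner.py --cfg example_config
#
# Whatever get_config returns must provide the keys read above: 'gpu_ids',
# 'mode' ('b2n' for base-to-new, anything else for cross-dataset), 'data'
# ('root', 'datasets_base_to_new', 'datasets_cross_dataset'), 'train'
# ('trainer', 'cfg', 'seeds', 'loadep', 'shots', 'opts'), 'grid_search'
# ('enable', 'mode', 'params'), 'output' ('root', 'result', 'remove_dirs'),
# and 'mail' (including 'to').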