@@ -65,6 +65,25 @@ def compress_zarr_dataset(data, file_path, compression='lz4', clevel=5, start_id
65
65
return nan_detected
66
66
67
67
68
+ def check_for_moe (model_dir : str ):
69
+ """
70
+ Extracts the number of phases from the given model directory.
71
+ Returns true if mixture of experts is used.
72
+ The second return argument is the number of phases.
73
+ :param model_dir: Model directory, where either the model directly is stored or the number of phase directories.
74
+ :return: is_moe: bool, number_phases: int or None
75
+ """
76
+ number_phases = 0
77
+ is_moe = False
78
+ for entry in os .listdir (model_dir ):
79
+ if entry .startswith ("phase" ) and "None" not in entry :
80
+ number_phases += 1
81
+ is_moe = True
82
+ if not is_moe :
83
+ number_phases = None
84
+ return is_moe , number_phases
85
+
86
+
68
87
class FileIO :
69
88
"""
70
89
Class to facilitate creation of directories, reading of file
@@ -100,6 +119,17 @@ def __init__(self, orig_binary_name: str, binary_dir: str, uci_variant: str):
100
119
101
120
self .timestamp_format = "%Y-%m-%d-%H-%M-%S"
102
121
122
+ self .is_moe , self .number_phases = check_for_moe (self .model_dir )
123
+
124
+ # Whether to use Staged learning v2.0 for MoE training,
125
+ # i.e. first train on full data and then each phase separately
126
+ self .use_moe_staged_learning = True if os .path .isdir (self .model_dir + "phaseNone" ) else False
127
+
128
+ if self .is_moe :
129
+ logging .info (f"Mixture of experts detected with { self .number_phases } phases." )
130
+ logging .info (f"Use MoE staged learning is { self .use_moe_staged_learning } ." )
131
+ else :
132
+ logging .info ("No mixture of experts detected." )
103
133
self ._create_directories ()
104
134
105
135
# Adjust paths in main_config
@@ -124,14 +154,42 @@ def _create_directories(self):
124
154
create_dir (self .model_dir_archive )
125
155
create_dir (self .logs_dir_archive )
126
156
127
- def _include_data_from_replay_memory (self , nb_files : int , fraction_for_selection : float ):
157
+ if self .is_moe :
158
+ for directory in [self .export_dir_gen_data , self .train_dir , self .val_dir , self .train_dir_archive ,
159
+ self .val_dir_archive , self .model_contender_dir , self .model_dir_archive ]:
160
+ for phase_idx in range (self .number_phases ):
161
+ create_dir (directory + f"phase{ phase_idx } " )
162
+ if self .use_moe_staged_learning :
163
+ create_dir (self .model_contender_dir + "phaseNone" )
164
+ create_dir (self .model_dir_archive + "phaseNone" )
165
+
166
+ def _include_data_from_replay_memory_wrapper (self , nb_files : int , fraction_for_selection : float ):
128
167
"""
168
+ Wrapper for _include_data_from_replay_memory() which handles MoE and non MoE cases.
169
+ :param nb_files: Number of files to include from replay memory into training
170
+ :param fraction_for_selection: Proportion for selecting files from the replay memory
171
+ """
172
+
173
+ if not self .is_moe :
174
+ self ._include_data_from_replay_memory (self .train_dir_archive , self .train_dir , nb_files ,
175
+ fraction_for_selection )
176
+ else :
177
+ for phase_idx in range (self .number_phases ):
178
+ self ._include_data_from_replay_memory (self .train_dir_archive + f"phase{ phase_idx } /" ,
179
+ self .train_dir + f"phase{ phase_idx } /" , nb_files ,
180
+ fraction_for_selection )
181
+
182
+ def _include_data_from_replay_memory (self , from_dir : str , to_dir : str , nb_files : int , fraction_for_selection : float ):
183
+ """
184
+ Moves data from the from_dir directory to the to_dir directory.
185
+ :param from_dir: Usually train_dir_archive
186
+ :param to_dir: Usually train_dir
129
187
:param nb_files: Number of files to include from replay memory into training
130
188
:param fraction_for_selection: Proportion for selecting files from the replay memory
131
189
:return:
132
190
"""
133
191
# get all file/folder names ignoring hidden files
134
- folder_names = [folder_name for folder_name in os .listdir (self . train_dir_archive )
192
+ folder_names = [folder_name for folder_name in os .listdir (from_dir )
135
193
if not folder_name .startswith ('.' )]
136
194
137
195
if len (folder_names ) < nb_files :
@@ -153,90 +211,160 @@ def _include_data_from_replay_memory(self, nb_files: int, fraction_for_selection
153
211
154
212
# move selected files into train dir
155
213
for index in list (indices ):
156
- os .rename (self . train_dir_archive + folder_names [index ], self . train_dir + folder_names [index ])
214
+ os .rename (from_dir + folder_names [index ], to_dir + folder_names [index ])
157
215
158
216
def _move_generated_data_to_train_val (self ):
159
217
"""
160
218
Moves the generated samples, games (pgn format) and the number how many games have been generated to the given
161
219
training and validation directory
162
220
:return:
163
221
"""
164
- file_names = os .listdir (self .export_dir_gen_data )
222
+ if not self .is_moe :
223
+ file_names = os .listdir (self .export_dir_gen_data )
224
+
225
+ # move the last file into the validation directory
226
+ os .rename (self .export_dir_gen_data + file_names [- 1 ], self .val_dir + file_names [- 1 ])
227
+
228
+ # move the rest into the training directory
229
+ for file_name in file_names [:- 1 ]:
230
+ os .rename (self .export_dir_gen_data + file_name , self .train_dir + file_name )
231
+ else :
232
+ for phase_idx in range (self .number_phases ):
233
+ file_names = os .listdir (self .export_dir_gen_data + f"/phase{ phase_idx } " )
165
234
166
- # move the last file into the validation directory
167
- os .rename (self .export_dir_gen_data + file_names [- 1 ], self .val_dir + file_names [- 1 ])
235
+ # move the last file into the validation directory
236
+ os .rename (self .export_dir_gen_data + f"/phase{ phase_idx } /" + file_names [- 1 ],
237
+ self .val_dir + f"/phase{ phase_idx } /" + file_names [- 1 ])
168
238
169
- # move the rest into the training directory
170
- for file_name in file_names [:- 1 ]:
171
- os .rename (self .export_dir_gen_data + file_name , self .train_dir + file_name )
239
+ # move the rest into the training directory
240
+ for file_name in file_names [:- 1 ]:
241
+ os .rename (self .export_dir_gen_data + f"/phase{ phase_idx } /" + file_name ,
242
+ self .train_dir + f"/phase{ phase_idx } /" + file_name )
172
243
173
244
def _move_train_val_data_into_archive (self ):
174
245
"""
175
246
Moves files from training, validation dir into archive directory
176
247
:return:
177
248
"""
178
- move_all_files (self .train_dir , self .train_dir_archive )
179
- move_all_files (self .val_dir , self .val_dir_archive )
249
+ self . _move_all_files_wrapper (self .train_dir , self .train_dir_archive )
250
+ self . _move_all_files_wrapper (self .val_dir , self .val_dir_archive )
180
251
181
252
def _remove_files_in_weight_dir (self ):
182
253
"""
183
254
Removes all files in the weight directory.
184
255
:return:
185
256
"""
186
- file_list = glob .glob (os .path .join (self .weight_dir , "model-*" ))
187
- for file in file_list :
188
- os .remove (file )
257
+ if not self .is_moe :
258
+ file_list = glob .glob (os .path .join (self .weight_dir , "model-*" ))
259
+ for file in file_list :
260
+ os .remove (file )
261
+ else :
262
+ for phase_idx in range (self .number_phases ):
263
+ file_list = glob .glob (os .path .join (self .weight_dir , f"phase{ phase_idx } /model-*" ))
264
+ for file in file_list :
265
+ os .remove (file )
189
266
190
- def compress_dataset (self , device_name : str ):
267
+ def _compress_single_dataset (self , phase : str , device_name : str ):
191
268
"""
192
- Loads the uncompressed data file, selects all sample until the index specified in "startIdx.txt",
193
- compresses it and exports it.
194
- :param device_name: The currently active device name (context_device-id)
195
- :return:
269
+ Loads a single uncompressed data file, selects all samples, compresses it and exports it.
270
+ :param phase: Phase to use, e.g. "phase0/", "phase1". Is empty string for no phase ("").
271
+ :return: export_dir: str
196
272
"""
197
- data = zarr .load (self .binary_dir + "data_" + device_name + ".zarr" )
273
+ data = zarr .load (self .binary_dir + phase + "data_" + device_name + ".zarr" )
198
274
199
- export_dir , time_stamp = self .create_export_dir (device_name )
275
+ export_dir , time_stamp = self .create_export_dir (phase , device_name )
200
276
zarr_path = export_dir + time_stamp + ".zip"
201
- nan_detected = compress_zarr_dataset (data , zarr_path , start_idx = 0 )
277
+
278
+ end_idx = self ._retrieve_end_idx (data )
279
+
280
+ nan_detected = compress_zarr_dataset (data , zarr_path , start_idx = 0 , end_idx = end_idx )
202
281
if nan_detected is True :
203
282
logging .error ("NaN value detected in file %s.zip" % time_stamp )
204
283
new_export_dir = self .binary_dir + time_stamp
205
284
os .rename (export_dir , new_export_dir )
206
285
export_dir = new_export_dir
207
- self .move_game_data_to_export_dir (export_dir , device_name )
208
286
209
- def create_export_dir (self , device_name : str ) -> (str , str ):
287
+ return export_dir
288
+
289
+ def _retrieve_end_idx (self , data ):
290
+ """
291
+ Checks the y_policy sum in the data for is_moe is False and
292
+ returns the first occurence of only 0s.
293
+ An end_idx of 0 means the whole dataset will be used
294
+ :param data: Zarr data object
295
+ :return: end_idx
296
+ """
297
+ if self .is_moe is False :
298
+ return 0
299
+
300
+ sum_y_policy = data ['y_policy' ].sum (axis = 1 )
301
+ potential_end_idx = sum_y_policy .argmin ()
302
+ if sum_y_policy [potential_end_idx ] == 0 :
303
+ return potential_end_idx
304
+ return 0
305
+
306
+ def compress_dataset (self , device_name : str ):
307
+ """
308
+ Calls _compress_single_dataset() for each phase or a single time for no phases.
309
+ Also moves the game data to export directory.
310
+ :param device_name: The currently active device name (context_device-id)
311
+ :return:
312
+ """
313
+ if self .is_moe :
314
+ for phase_idx in range (self .number_phases ):
315
+ export_dir = self ._compress_single_dataset (f"phase{ phase_idx } /" , device_name )
316
+ if phase_idx == 0 :
317
+ self .move_game_data_to_export_dir (export_dir , device_name )
318
+ else :
319
+ export_dir = self ._compress_single_dataset ("" , device_name )
320
+ self .move_game_data_to_export_dir (export_dir , device_name )
321
+
322
+ def create_export_dir (self , phase : str , device_name : str ) -> (str , str ):
210
323
"""
211
324
Create a directory in the 'export_dir_gen_data' path,
212
325
where the name consists of the current date, time and device ID.
326
+ :param phase: Phase to use, e.g. "phase0/", "phase1". Is empty string for no phase ("").
213
327
:param device_name: The currently active device name (context_device-id)
214
328
:return: Path of the created directory; Time stamp used while creating
215
329
"""
216
330
# include current timestamp in dataset export file
217
331
time_stamp = datetime .datetime .fromtimestamp (time .time ()).strftime (self .timestamp_format )
218
- time_stamp_dir = f'{ self .export_dir_gen_data } { time_stamp } -{ device_name } /'
332
+ time_stamp_dir = f'{ self .export_dir_gen_data } { phase } { time_stamp } -{ device_name } /'
219
333
# create a directory of the current time_stamp
220
334
if not os .path .exists (time_stamp_dir ):
221
335
os .makedirs (time_stamp_dir )
222
336
223
337
return time_stamp_dir , time_stamp
224
338
225
- def get_current_model_tar_file (self ) -> str :
339
+ def get_current_model_tar_file (self , phase = None , base_dir = None ) -> str :
226
340
"""
341
+ :param phase: Phase to use. Should be "" if no MoE is used and otherwise e.g. "phase2".
342
+ :param base_dir: Should be self.model_dir in the normal case
343
+ For None default "phase0" or "" will be used.
227
344
Return the filename of the current active model weight (.tar) file for pytorch
228
345
"""
229
- model_params = glob .glob (self .model_dir + "/*.tar" )
346
+ if phase is None :
347
+ if self .is_moe :
348
+ phase = "phase0"
349
+ else :
350
+ phase = ""
351
+ if base_dir is None :
352
+ base_dir = self .model_dir
353
+ model_params = glob .glob (base_dir + phase + "/*.tar" )
230
354
if len (model_params ) == 0 :
231
355
raise FileNotFoundError (f'No model file found in { self .model_dir } ' )
232
356
return model_params [0 ]
233
357
234
358
def get_number_generated_files (self ) -> int :
235
359
"""
236
360
Returns the amount of file that have been generated since the last training run.
237
- :return:
361
+ :return: nb_training_files: int
238
362
"""
239
- return len (glob .glob (self .export_dir_gen_data + "**/*.zip" ))
363
+ if self .is_moe :
364
+ phase = "phase0/"
365
+ else :
366
+ phase = ""
367
+ return len (glob .glob (self .export_dir_gen_data + phase + "**/*.zip" ))
240
368
241
369
def move_game_data_to_export_dir (self , export_dir : str , device_name : str ):
242
370
"""
@@ -271,12 +399,12 @@ def prepare_data_for_training(self, rm_nb_files: int, rm_fraction_for_selection:
271
399
if did_contender_win :
272
400
self ._move_train_val_data_into_archive ()
273
401
# move last contender into archive
274
- move_all_files (self .model_contender_dir , self .model_dir_archive )
402
+ self . _move_all_files_wrapper (self .model_contender_dir , self .model_dir_archive )
275
403
276
404
self ._move_generated_data_to_train_val ()
277
405
# We don’t need them anymore; the last model from last training has already been saved
278
406
self ._remove_files_in_weight_dir ()
279
- self ._include_data_from_replay_memory (rm_nb_files , rm_fraction_for_selection )
407
+ self ._include_data_from_replay_memory_wrapper (rm_nb_files , rm_fraction_for_selection )
280
408
281
409
def remove_intermediate_weight_files (self ):
282
410
"""
@@ -288,10 +416,29 @@ def remove_intermediate_weight_files(self):
288
416
for f in files :
289
417
os .remove (f )
290
418
419
+ def _move_all_files_wrapper (self , from_dir , to_dir ):
420
+ """
421
+ Wrapper function for move_all_files(from_dir, to_dir).
422
+ In case of self.is_moe, all phases directories are moved as well.
423
+ :param from_dir: Origin directory where the files are located
424
+ :param to_dir: Destination directory where all files (including subdirectories directories) will be moved
425
+ :return:
426
+ """
427
+ if not self .is_moe :
428
+ move_all_files (from_dir , to_dir )
429
+ else :
430
+ for phase_idx in range (self .number_phases ):
431
+ move_all_files (from_dir + f"phase{ phase_idx } /" , to_dir + f"phase{ phase_idx } /" )
432
+
433
+ if self .use_moe_staged_learning :
434
+ from_dir_final = from_dir + "phaseNone/"
435
+ to_dir_final = to_dir + "phaseNone/"
436
+ if os .path .isdir (from_dir_final ) and os .path .isdir (to_dir_final ):
437
+ move_all_files (from_dir_final , to_dir_final )
438
+
291
439
def replace_current_model_with_contender (self ):
292
440
"""
293
441
Moves the previous model into archive directory and the model-contender into the model directory
294
442
"""
295
- move_all_files (self .model_dir , self .model_dir_archive )
296
- move_all_files (self .model_contender_dir , self .model_dir )
297
-
443
+ self ._move_all_files_wrapper (self .model_dir , self .model_dir_archive )
444
+ self ._move_all_files_wrapper (self .model_contender_dir , self .model_dir )
0 commit comments