Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 73 additions & 79 deletions core.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,33 @@
from data_io import read_lab_fea,open_or_fd,write_mat
from utils import shift

def run_nn_refac01(data_name,data_set,data_end_index,fea_dict,lab_dict,arch_dict,cfg_file,processed_first,next_config_file,dry_run=False):
def read_next_chunk_into_shared_list_with_subprocess(read_lab_fea, shared_list, cfg_file, is_production, output_folder, wait_for_process):
    """Start loading the next data chunk in the background.

    NOTE: despite the name, this launches a *thread*, not a subprocess.
    `read_lab_fea` is expected to append its results into `shared_list`.

    Args:
        read_lab_fea: callable invoked as (cfg_file, is_production, shared_list, output_folder).
        shared_list: list the loader fills in place with the chunk data.
        cfg_file: path of the chunk-specific config file to load.
        is_production: production-mode flag forwarded to the loader.
        output_folder: experiment output folder forwarded to the loader.
        wait_for_process: if True, block until loading finishes and return None;
            if False, return the running thread so the caller can join() later.

    Returns:
        The started threading.Thread when not waiting, otherwise None.
    """
    loader = threading.Thread(
        target=read_lab_fea,
        args=(cfg_file, is_production, shared_list, output_folder),
    )
    loader.start()
    if not wait_for_process:
        return loader
    loader.join()
    return None
def extract_data_from_shared_list(shared_list):
    """Unpack the seven items the chunk loader stored in ``shared_list``.

    The loader thread appends, in order: data name, feature end indices,
    label end indices, feature dict, label dict, architecture dict, and the
    data set itself. Returned in that same order as a tuple.
    """
    (data_name,
     data_end_index_fea,
     data_end_index_lab,
     fea_dict,
     lab_dict,
     arch_dict,
     data_set) = shared_list[:7]
    return data_name, data_end_index_fea, data_end_index_lab, fea_dict, lab_dict, arch_dict, data_set
def convert_numpy_to_torch(data_set_dict, save_gpumem, use_cuda):
    """Convert the loaded numpy chunk into float32 torch tensors.

    Args:
        data_set_dict: dict with numpy arrays under 'input' and 'ref'.
        save_gpumem: if True, keep tensors on the CPU even when CUDA is on.
        use_cuda: whether CUDA is enabled for this run.

    Returns:
        (inputs, refs) tensors; refs is reshaped to a column of shape (N, 1).
    """
    inputs = torch.from_numpy(data_set_dict['input']).float()
    refs = torch.from_numpy(data_set_dict['ref']).float()
    # Move to the GPU only when allowed to spend GPU memory on the data set.
    if use_cuda and not save_gpumem:
        inputs = inputs.cuda()
        refs = refs.cuda()
    refs = refs.view((refs.shape[0], 1))
    return inputs, refs
def run_nn_refac01(data_name,data_set,data_end_index,fea_dict,lab_dict,arch_dict,cfg_file,processed_first,next_config_file):
def _read_chunk_specific_config(cfg_file):
if not(os.path.exists(cfg_file)):
sys.stderr.write('ERROR: The config file %s does not exist!\n'%(cfg_file))
Expand All @@ -30,23 +56,6 @@ def _read_chunk_specific_config(cfg_file):
config = configparser.ConfigParser()
config.read(cfg_file)
return config
def _read_next_chunk_into_shared_list_with_subprocess(read_lab_fea, shared_list, cfg_file, is_production, output_folder, wait_for_process):
p=threading.Thread(target=read_lab_fea, args=(cfg_file,is_production,shared_list,output_folder,))
p.start()
if wait_for_process:
p.join()
return None
else:
return p
def _extract_data_from_shared_list(shared_list):
data_name = shared_list[0]
data_end_index_fea = shared_list[1]
data_end_index_lab = shared_list[2]
fea_dict = shared_list[3]
lab_dict = shared_list[4]
arch_dict = shared_list[5]
data_set = shared_list[6]
return data_name, data_end_index_fea, data_end_index_lab, fea_dict, lab_dict, arch_dict, data_set
def _get_batch_size_from_config(config, to_do):
if to_do=='train':
batch_size=int(config['batches']['batch_size_train'])
Expand All @@ -60,15 +69,6 @@ def _initialize_random_seed(config):
torch.manual_seed(seed)
random.seed(seed)
np.random.seed(seed)
def _convert_numpy_to_torch(data_set_dict, save_gpumem, use_cuda):
if not(save_gpumem) and use_cuda:
data_set_inp=torch.from_numpy(data_set_dict['input']).float().cuda()
data_set_ref=torch.from_numpy(data_set_dict['ref']).float().cuda()
else:
data_set_inp=torch.from_numpy(data_set_dict['input']).float()
data_set_ref=torch.from_numpy(data_set_dict['ref']).float()
data_set_ref = data_set_ref.view((data_set_ref.shape[0], 1))
return data_set_inp, data_set_ref
def _load_model_and_optimizer(fea_dict,model,config,arch_dict,use_cuda,multi_gpu,to_do):
inp_out_dict = fea_dict
nns, costs = model_init(inp_out_dict,model,config,arch_dict,use_cuda,multi_gpu,to_do)
Expand Down Expand Up @@ -221,16 +221,18 @@ def _get_dim_from_data_set(data_set_inp, data_set_ref):

if processed_first:
shared_list = list()
p = _read_next_chunk_into_shared_list_with_subprocess(read_lab_fea, shared_list, cfg_file, is_production, output_folder, wait_for_process=True)
data_name, data_end_index_fea, data_end_index_lab, fea_dict, lab_dict, arch_dict, data_set_dict = _extract_data_from_shared_list(shared_list)
data_set_inp, data_set_ref = _convert_numpy_to_torch(data_set_dict, save_gpumem, use_cuda)
p = read_next_chunk_into_shared_list_with_subprocess(read_lab_fea, shared_list, cfg_file, is_production, output_folder, wait_for_process=True)
data_name, data_end_index_fea, data_end_index_lab, fea_dict, lab_dict, arch_dict, data_set_dict = extract_data_from_shared_list(shared_list)
data_set_inp, data_set_ref = convert_numpy_to_torch(data_set_dict, save_gpumem, use_cuda)
else:
data_set_inp = data_set['input']
data_set_ref = data_set['ref']
data_end_index_fea = data_end_index['fea']
data_end_index_lab = data_end_index['lab']
shared_list = list()
data_loading_process = _read_next_chunk_into_shared_list_with_subprocess(read_lab_fea, shared_list, next_config_file, is_production, output_folder, wait_for_process=False)
data_loading_process = None
if not next_config_file is None:
data_loading_process = read_next_chunk_into_shared_list_with_subprocess(read_lab_fea, shared_list, next_config_file, is_production, output_folder, wait_for_process=False)
nns, costs, optimizers, inp_out_dict = _load_model_and_optimizer(fea_dict,model,config,arch_dict,use_cuda,multi_gpu,to_do)
if to_do=='forward':
post_file = _open_forward_output_files_and_get_file_handles(forward_outs, require_decodings, info_file, output_folder)
Expand All @@ -253,25 +255,22 @@ def _get_dim_from_data_set(data_set_inp, data_set_ref):
start_time = time.time()
for i in range(N_batches):
inp, ref, max_len_fea, max_len_lab, snt_len_fea, snt_len_lab, beg_snt_fea, beg_snt_lab, snt_index = _prepare_input(snt_index, batch_size, data_set_inp_dim, data_set_ref_dim, beg_snt_fea, beg_snt_lab, data_end_index_fea, data_end_index_lab, beg_batch, end_batch, seq_model, arr_snt_len_fea, arr_snt_len_lab, data_set_inp, data_set_ref, use_cuda)
if dry_run:
outs_dict = dict()
if to_do=='train':
outs_dict = forward_model(fea_dict, lab_dict, arch_dict, model, nns, costs, inp, ref, inp_out_dict, max_len_fea, max_len_lab, batch_size, to_do, forward_outs)
_optimization_step(optimizers, outs_dict, config, arch_dict)
else:
if to_do=='train':
with torch.no_grad():
outs_dict = forward_model(fea_dict, lab_dict, arch_dict, model, nns, costs, inp, ref, inp_out_dict, max_len_fea, max_len_lab, batch_size, to_do, forward_outs)
_optimization_step(optimizers, outs_dict, config, arch_dict)
else:
with torch.no_grad():
outs_dict = forward_model(fea_dict, lab_dict, arch_dict, model, nns, costs, inp, ref, inp_out_dict, max_len_fea, max_len_lab, batch_size, to_do, forward_outs)
if to_do == 'forward':
for out_id in range(len(forward_outs)):
out_save = outs_dict[forward_outs[out_id]].data.cpu().numpy()
if forward_normalize_post[out_id]:
counts = load_counts(forward_count_files[out_id])
out_save=out_save-np.log(counts/np.sum(counts))
write_mat(output_folder,post_file[forward_outs[out_id]], out_save, data_name[i])
else:
loss_sum=loss_sum+outs_dict['loss_final'].detach()
err_sum=err_sum+outs_dict['err_final'].detach()
if to_do == 'forward':
for out_id in range(len(forward_outs)):
out_save = outs_dict[forward_outs[out_id]].data.cpu().numpy()
if forward_normalize_post[out_id]:
counts = load_counts(forward_count_files[out_id])
out_save=out_save-np.log(counts/np.sum(counts))
write_mat(output_folder,post_file[forward_outs[out_id]], out_save, data_name[i])
else:
loss_sum=loss_sum+outs_dict['loss_final'].detach()
err_sum=err_sum+outs_dict['err_final'].detach()
beg_batch=end_batch
end_batch=beg_batch+batch_size
_update_progress_bar(to_do, i, N_batches, loss_sum)
Expand All @@ -286,13 +285,16 @@ def _get_dim_from_data_set(data_set_inp, data_set_ref):
_write_info_file(info_file, to_do, loss_tot, err_tot, elapsed_time_chunk)
if not data_loading_process is None:
data_loading_process.join()
data_name, data_end_index_fea, data_end_index_lab, fea_dict, lab_dict, arch_dict, data_set_dict = _extract_data_from_shared_list(shared_list)
data_set_inp, data_set_ref = _convert_numpy_to_torch(data_set_dict, save_gpumem, use_cuda)
data_set = {'input': data_set_inp, 'ref': data_set_ref}
data_end_index = {'fea': data_end_index_fea,'lab': data_end_index_lab}
return [data_name,data_set,data_end_index,fea_dict,lab_dict,arch_dict]
data_name, data_end_index_fea, data_end_index_lab, fea_dict, lab_dict, arch_dict, data_set_dict = extract_data_from_shared_list(shared_list)
data_set_inp, data_set_ref = convert_numpy_to_torch(data_set_dict, save_gpumem, use_cuda)
data_set = {'input': data_set_inp, 'ref': data_set_ref}
data_end_index = {'fea': data_end_index_fea,'lab': data_end_index_lab}
return [data_name,data_set,data_end_index,fea_dict,lab_dict,arch_dict]
else:
return [None,None,None,None,None,None]


def run_nn(data_name,data_set,data_end_index,fea_dict,lab_dict,arch_dict,cfg_file,processed_first,next_config_file,dry_run=False):
def run_nn(data_name,data_set,data_end_index,fea_dict,lab_dict,arch_dict,cfg_file,processed_first,next_config_file):

# This function processes the current chunk using the information in cfg_file. In parallel, the next chunk is load into the CPU memory

Expand Down Expand Up @@ -479,17 +481,13 @@ def run_nn(data_name,data_set,data_end_index,fea_dict,lab_dict,arch_dict,cfg_fil

if to_do=='train':
# Forward input, with autograd graph active
if not dry_run:
outs_dict=forward_model(fea_dict,lab_dict,arch_dict,model,nns,costs,inp,inp_out_dict,max_len,batch_size,to_do,forward_outs)
else:
outs_dict = dict()
outs_dict=forward_model(fea_dict,lab_dict,arch_dict,model,nns,costs,inp,inp_out_dict,max_len,batch_size,to_do,forward_outs)

for opt in optimizers.keys():
optimizers[opt].zero_grad()


if not dry_run:
outs_dict['loss_final'].backward()
outs_dict['loss_final'].backward()

# Gradient Clipping (th 0.1)
#for net in nns.keys():
Expand All @@ -501,28 +499,24 @@ def run_nn(data_name,data_set,data_end_index,fea_dict,lab_dict,arch_dict,cfg_fil
optimizers[opt].step()
else:
with torch.no_grad(): # Forward input without autograd graph (save memory)
if not dry_run:
outs_dict=forward_model(fea_dict,lab_dict,arch_dict,model,nns,costs,inp,inp_out_dict,max_len,batch_size,to_do,forward_outs)
else:
outs_dict = dict()
outs_dict=forward_model(fea_dict,lab_dict,arch_dict,model,nns,costs,inp,inp_out_dict,max_len,batch_size,to_do,forward_outs)


if not dry_run:
if to_do=='forward':
for out_id in range(len(forward_outs)):

out_save=outs_dict[forward_outs[out_id]].data.cpu().numpy()
if to_do=='forward':
for out_id in range(len(forward_outs)):

out_save=outs_dict[forward_outs[out_id]].data.cpu().numpy()

if forward_normalize_post[out_id]:
# read the config file
counts = load_counts(forward_count_files[out_id])
out_save=out_save-np.log(counts/np.sum(counts))

if forward_normalize_post[out_id]:
# read the config file
counts = load_counts(forward_count_files[out_id])
out_save=out_save-np.log(counts/np.sum(counts))

# save the output
write_mat(output_folder,post_file[forward_outs[out_id]], out_save, data_name[i])
else:
loss_sum=loss_sum+outs_dict['loss_final'].detach()
err_sum=err_sum+outs_dict['err_final'].detach()
# save the output
write_mat(output_folder,post_file[forward_outs[out_id]], out_save, data_name[i])
else:
loss_sum=loss_sum+outs_dict['loss_final'].detach()
err_sum=err_sum+outs_dict['err_final'].detach()

# update it to the next batch
beg_batch=end_batch
Expand Down
66 changes: 39 additions & 27 deletions run_exp.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,22 @@
from utils import check_cfg,create_lists,create_configs, compute_avg_performance, \
read_args_command_line, run_shell,compute_n_chunks, get_all_archs,cfg_item2sec, \
dump_epoch_results, create_curves,change_lr_cfg,expand_str_ep
from data_io import read_lab_fea_refac01 as read_lab_fea
from shutil import copyfile
from core import read_next_chunk_into_shared_list_with_subprocess, extract_data_from_shared_list, convert_numpy_to_torch
import re
from distutils.util import strtobool
import importlib
import math
import multiprocessing

def _run_forwarding_in_subprocesses(config):
use_cuda=strtobool(config['exp']['use_cuda'])
if use_cuda:
return False
else:
return True

# Reading global cfg file (first argument-mandatory file)
cfg_file=sys.argv[1]
if not(os.path.exists(cfg_file)):
Expand Down Expand Up @@ -309,7 +318,8 @@
N_ck_forward=compute_n_chunks(out_folder,forward_data,ep,N_ep_str_format,'forward')
N_ck_str_format='0'+str(max(math.ceil(np.log10(N_ck_forward)),1))+'d'

kwargs_list = list()
processes = list()
info_files = list()
for ck in range(N_ck_forward):

if not is_production:
Expand All @@ -331,36 +341,38 @@
next_config_file=cfg_file_list[op_counter]

# run chunk processing
#[data_name,data_set,data_end_index,fea_dict,lab_dict,arch_dict]=run_nn(data_name,data_set,data_end_index,fea_dict,lab_dict,arch_dict,config_chunk_file,processed_first,next_config_file)
kwargs = dict()
for e in ['data_name','data_set','data_end_index','fea_dict','lab_dict','arch_dict','config_chunk_file','processed_first','next_config_file']:
if e == "config_chunk_file":
kwargs['cfg_file'] = eval(e)
else:
kwargs[e] = eval(e)
kwargs_list.append(kwargs)
[data_name,data_set,data_end_index,fea_dict,lab_dict,arch_dict]=run_nn(data_name,data_set,data_end_index,fea_dict,lab_dict,arch_dict,config_chunk_file,processed_first,next_config_file,dry_run=True)

if _run_forwarding_in_subprocesses(config):
shared_list = list()
output_folder = config['exp']['out_folder']
save_gpumem = strtobool(config['exp']['save_gpumem'])
use_cuda=strtobool(config['exp']['use_cuda'])
p = read_next_chunk_into_shared_list_with_subprocess(read_lab_fea, shared_list, config_chunk_file, is_production, output_folder, wait_for_process=True)
data_name, data_end_index_fea, data_end_index_lab, fea_dict, lab_dict, arch_dict, data_set_dict = extract_data_from_shared_list(shared_list)
data_set_inp, data_set_ref = convert_numpy_to_torch(data_set_dict, save_gpumem, use_cuda)
data_set = {'input': data_set_inp, 'ref': data_set_ref}
data_end_index = {'fea': data_end_index_fea,'lab': data_end_index_lab}
p = multiprocessing.Process(target=run_nn, kwargs={'data_name': data_name, 'data_set': data_set, 'data_end_index': data_end_index, 'fea_dict': fea_dict, 'lab_dict': lab_dict, 'arch_dict': arch_dict, 'cfg_file': config_chunk_file, 'processed_first': False, 'next_config_file': None})
processes.append(p)
p.start()
else:
[data_name,data_set,data_end_index,fea_dict,lab_dict,arch_dict]=run_nn(data_name,data_set,data_end_index,fea_dict,lab_dict,arch_dict,config_chunk_file,processed_first,next_config_file)
processed_first=False
if not(os.path.exists(info_file)):
sys.stderr.write("ERROR: forward chunk %i of dataset %s not done! File %s does not exist.\nSee %s \n" % (ck,forward_data,info_file,log_file))
sys.exit(0)

# update the first_processed variable
processed_first=False

if not(os.path.exists(info_file)):
sys.stderr.write("ERROR: forward chunk %i of dataset %s not done! File %s does not exist.\nSee %s \n" % (ck,forward_data,info_file,log_file))
sys.exit(0)

info_files.append(info_file)

# update the operation counter
op_counter+=1
processes = list()
for kwargs in kwargs_list:
p = multiprocessing.Process(target=run_nn, kwargs=kwargs)
processes.append(p)
p.start()
for process in processes:
process.join()


if _run_forwarding_in_subprocesses(config):
for process in processes:
process.join()
for info_file in info_files:
if not(os.path.exists(info_file)):
sys.stderr.write("ERROR: File %s does not exist. Forwarding did not suceed.\nSee %s \n" % (info_file,log_file))
sys.exit(0)



# --------DECODING--------#
Expand Down