2727
2828from plasma .primitives .shots import ShotList
2929
30- from signal_names import *
30+ from signals import *
3131
3232#print("Importing numpy version"+np.__version__)
3333
@@ -45,14 +45,13 @@ def format_save_path(prepath,signal_path,shot_num):
4545 return prepath + signal_path + '/{}.txt' .format (shot_num )
4646
4747
48- def save_shot_signals (shot_num_queue ,complete_queue , c ,signals ,save_prepath ,machine ):
48+ def save_shot (shot_num_queue ,c ,signals ,save_prepath ,machine , sentinel = - 1 ):
4949 missing_values = 0
5050 # if machine == 'd3d':
5151 # reload(gadata) #reloads Gadata object with connection
5252 while True :
53- try :
54- shot_num = shot_num_queue .get (False )
55- except Queue .Empty :
53+ shot_num = shot_num_queue .get ()
54+ if shot_num == sentinel :
5655 break
5756 shot_complete = True
5857 for signal in signals :
@@ -61,12 +60,13 @@ def save_shot_signals(shot_num_queue,complete_queue,c,signals,save_prepath,machi
6160 success = False
6261 if os .path .isfile (save_path_full ):
6362 print ('-' ,end = '' )
63+ success = True
6464 else :
6565 try :
6666 try :
6767 time ,data ,mapping ,success = machine .fetch_data (signal ,shot_num ,c )
6868 except :
69- missing_values += 1
69+ # missing_values += 1
7070 print ('Signal {}, shot {} missing. Filling with zeros' .format (signal_path ,shot_num ))
7171 time ,data = create_missing_value_filler ()
7272
@@ -93,124 +93,82 @@ def save_shot_signals(shot_num_queue,complete_queue,c,signals,save_prepath,machi
9393 #only add shot to list if it was complete
9494 if shot_complete :
9595 print ('saved shot {}' .format (shot_num ))
96- complete_queue .put (shot_num )
96+ # complete_queue.put(shot_num)
9797 else :
9898 print ('shot {} not complete. removing from list.' .format (shot_num ))
9999 print ('Finished with {} missing values total' .format (missing_values ))
100+ return
101+
102+
103+ def download_shot_numbers (shot_numbers ,save_prepath ,machine ,max_cores = 2 ):
104+ sentinel = - 1
105+ fn = partial (save_shot ,signals = signals ,save_prepath = save_prepath ,machine = machine ,sentinel = sentinel )
106+ num_cores = min (mp .cpu_count (),max_cores ) #can only handle 8 connections at once :(
107+ queue = mp .Queue ()
108+ #complete_shots = Array('i',zeros(len(shot_numbers)))# = mp.Queue()
109+
110+ assert (len (shot_numbers ) < 30000 ) # mp.queue can't handle larger queues yet!
111+ for shot_num in shot_numbers :
112+ queue .put (shot_num )
113+ for i in range (num_cores ):
114+ queue .put (sentinel )
115+ connections = [Connection (machine .server ) for _ in range (num_cores )]
116+ processes = [mp .Process (target = fn ,args = (queue ,connections [i ])) for i in range (num_cores )]
117+
118+ print ('running in parallel on {} processes' .format (num_cores ))
119+
120+ for p in processes :
121+ p .start ()
122+ for p in processes :
123+ p .join ()
124+
125+
126+ def download_all_shot_numbers (prepath ,save_path ,shot_numbers_path ,shot_numbers_files ,machine ,max_cores ):
127+ max_len = 30000
128+ save_prepath = prepath + save_path + '/' + machine .name + '/'
129+ shot_numbers ,_ = ShotList .get_multiple_shots_and_disruption_times (prepath + shot_numbers_path ,shot_numbers_files )
130+ shot_numbers_chunks = [shot_numbers [i :i + max_len ] for i in xrange (0 ,len (shot_numbers ),max_len )]#can only use queue of max size 30000
131+ start_time = time .time ()
132+ for shot_numbers_chunk in shot_numbers_chunks :
133+ download_shot_numbers (shot_numbers_chunk ,save_prepath ,machine ,max_cores )
134+
135+ print ('Finished downloading {} shots in {} seconds' .format (len (shot_numbers ),time .time ()- start_time ))
136+
137+
138+
100139
101140
102141
103142prepath = '/cscratch/share/frnn/'
104143shot_numbers_path = 'shot_lists/'
105144save_path = 'signal_data/'
106145machine = d3d
107- signals = all_signals
108-
109-
110-
111-
112- save_prepath = prepath + save_path + '/' + machine + '/'
113-
114- shot_numbers ,_ = ShotList .get_multiple_shots_and_disruption_times (prepath + shot_numbers_path ,shot_numbers_files )
115-
116-
117- fn = partial (save_shot ,signals = signals ,save_prepath = save_prepath ,machine = machine )
118- num_cores = min (mp .cpu_count (),32 ) #can only handle 8 connections at once :(
119- queue = mp .Queue ()
120- complete_queue = mp .Queue ()
121- for shot_num in shot_numbers :
122- queue .put (shot_num )
123- connections = [Connection (server_path ) for _ in range (num_cores )]
124- processes = [mp .Process (target = fn ,args = (queue ,connections [i ])) for i in range (num_cores )]
125-
126- print ('running in parallel on {} processes' .format (num_cores ))
127- start_time = time .time ()
128-
129- for p in processes :
130- p .start ()
131- for p in processes :
132- p .join ()
133-
134- complete_shot_numbers = list (complete_queue )
135- print ('Finished downloading {} ({} complete) shots in {} seconds' .format (len (shot_numbers ),len (complete_shot_numbers ),time .time ()- start_time ))
136-
137-
138- # c = Connection(server_path)
139-
140- # pool = mp.Pool()
141- # for shot_num in shot_numbers:
142- # # save_shot(shot_num,signal_paths,save_prepath,machine,c)
143- # for (i,_) in enumerate(pool.imap_unordered(fn,shot_numbers)):
144- # print('{}/{}'.format(i,len(shot_numbers)))
145-
146- # pool.close()
147- # pool.join()
148-
149-
150-
151-
152-
153- # def save_shot(shot_num_queue,c,signal_paths,save_prepath,machine):
154- # missing_values = 0
155- # # if machine == 'd3d':
156- # # reload(gadata) #reloads Gadata object with connection
157- # while True:
158- # try:
159- # shot_num = shot_num_queue.get(False)
160- # except Queue.Empty:
161- # break
162- # for signal_path in signal_paths:
163- # save_path_full = format_save_path(save_prepath,signal_path,shot_num)
164- # if os.path.isfile(save_path_full):
165- # print('-',end='')
166- # else:
167- # try:
168- # if machine == 'nstx':
169- # tree,tag = get_tree_and_tag(signal_path)
170- # c.openTree(tree,shot_num)
171- # data = c.get(tag).data()
172- # time = c.get('dim_of('+tag+')').data()
173- # elif machine == 'jet':
174- # try:
175- # data = c.get('_sig=jet("{}/",{})'.format(signal_path,shot_num)).data()
176- # time = c.get('_sig=dim_of(jet("{}/",{}))'.format(signal_path,shot_num)).data()
177- # except:
178- # missing_values += 1
179- # print('Signal {}, shot {} missing. Filling with zeros'.format(signal_path,shot_num))
180- # time,data = create_missing_value_filler()
181- # elif machine == 'd3d':
182- # tree,tag = get_tree_and_tag_no_backslash(signal_path)
183- # try:
184- # ga1 = gadata.gadata('{}'.format(tag),shot_num,tree=tree,connection=c)
185- # if not ga1.found:
186- # raise
187- # # ga1 = gadata.gadata('\\{}'.format(signal_path),shot_num,tree='d3d',connection=c)
188- # data = ga1.zdata
189- # time = ga1.xdata
190- # except:
191- # missing_values += 1
192- # print('Signal {}, shot {} missing. Filling with zeros'.format(signal_path,shot_num))
193- # time,data = create_missing_value_filler()
194-
195- # data_two_column = np.vstack((np.atleast_2d(time),np.atleast_2d(data))).transpose()
196- # try: #can lead to race condition
197- # mkdirdepth(save_path_full)
198- # except OSError, e:
199- # if e.errno == errno.EEXIST:
200- # # File exists, and it's a directory, another process beat us to creating this dir, that's OK.
201- # pass
202- # else:
203- # # Our target dir exists as a file, or different error, reraise the error!
204- # raise
205- # np.savetxt(save_path_full,data_two_column,fmt = '%.5e')#fmt = '%f %f')
206- # print('.',end='')
207- # except:
208- # print('Could not save shot {}, signal {}'.format(shot_num,signal_path))
209- # print('Warning: Incomplete!!!')
210- # raise
211- # sys.stdout.flush()
212- # print('saved shot {}'.format(shot_num))
213- # print('Finished with {} missing values total'.format(missing_values))
146+ signals = d3d_signals
147+
148+ #nstx
149+ # shot_numbers_files = ['disrupt_nstx.txt'] #nstx
150+
151+ #d3d
152+ shot_numbers_files = ['shotlist_JaysonBarr_clear.txt' ]
153+ shot_numbers_files += ['shotlist_JaysonBarr_disrupt.txt' ]
154+ # #shot_numbers_files = ['d3d_short_clear.txt']# ,'d3d_clear.txt', 'd3d_disrupt.txt']
155+
156+ #jet
157+ # shot_numbers_files = ['CWall_clear.txt','CFC_unint.txt','BeWall_clear.txt','ILW_unint.txt']#jet
158+
159+ max_cores = 32
160+ download_all_shot_numbers (prepath ,save_path ,shot_numbers_path ,shot_numbers_files ,machine ,max_cores )
161+
162+
163+ #complete_shot_numbers = []
164+ #print(complete_queue)
165+ #print(complete_queue.qsize())
166+ #for i in range(len(complete_shots)):
167+ # if complete_shots[i]:
168+ # complete_shot_numbers.append(shot_numbers[i])
169+ #while not complete_queue.empty():
170+ # complete_shot_numbers.append(complete_queue.get(False))
171+
214172
215173
216174# if machine == 'nstx':
@@ -284,13 +242,3 @@ def save_shot_signals(shot_num_queue,complete_queue,c,signals,save_prepath,machi
284242# signal_paths += ['ppf/kk3/rc{:02d}'.format(i) for i in range(1,97)]
285243
286244
287-
288-
289-
290-
291-
292- # else:
293- # print('unkown machine. exiting')
294- # exit(1)
295-
296-
0 commit comments