Skip to content

Commit

Permalink
Passing vetoes to scripts in pygrb results sub workflow (gwastro#4933)
Browse files Browse the repository at this point in the history
* Passing segments files and vdf to all results scripts from pycbc_pygrb_results_workflow

* Generalised functions involved in loading trigger/injection data in pygrb

* Updating pycbc_pygrb_plot_skygrid accordingly

* Typo in comment

* No f-string in logging

* Improved a logging message and 2 code comments

* Renaming variable that was forgotten

* Removed redundant variable

* Blending _load_triggers into load_data

* Blending _load_triggers into load_data
  • Loading branch information
pannarale authored and prayush committed Nov 21, 2024
1 parent 39325f2 commit ca8e4ac
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 35 deletions.
21 changes: 4 additions & 17 deletions bin/pygrb/pycbc_pygrb_plot_skygrid
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,6 @@ __version__ = pycbc.version.git_verbose_msg
__date__ = pycbc.version.date
__program__ = "pycbc_pygrb_plot_skygrid"

# =============================================================================
# Functions
# =============================================================================
# Load trigger data
def load_data(input_file, ifos, vetoes, injections=False):
    """Load data from a trigger/injection file.

    Thin wrapper around ``ppu.load_triggers`` that logs progress before
    delegating.  NOTE(review): the ``injections`` flag is accepted but never
    consulted here — presumably kept for signature parity with similar
    loaders; confirm before relying on it.
    """
    logging.info("Loading triggers...")
    return ppu.load_triggers(input_file, ifos, vetoes)

# =============================================================================
# Main script starts here
Expand All @@ -80,12 +68,11 @@ for outdir in outdirs:
if not os.path.isdir(outdir):
os.makedirs(outdir)

# Extract IFOs and vetoes
ifos, vetoes = ppu.extract_ifos_and_vetoes(trig_file, opts.veto_files,
opts.veto_category)
# Extract IFOs
ifos = ppu.extract_ifos(trig_file)

# Load trigger data
trig_data = load_data(trig_file, ifos, vetoes)
# Load trigger data: the sky-grid points are not time-slid in the plot
trig_data = ppu.load_data(trig_file, ifos, data_tag=None, slide_id=0)

#
# Generate sky grid plot
Expand Down
43 changes: 36 additions & 7 deletions bin/pygrb/pycbc_pygrb_results_workflow
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def labels_in_files_metadata(labels, rundir, file_paths):
# Function to retrieve the segments plot (produced in the preprocessing stage)
# that ensures its copy is saved in the output directory before returning to
# the original working directory. Appropriate meta data is added to the plot.
def display_seg_plot(output_dir, segments_dir):
def display_seg_plot(output_dir, segment_dir):
"""Return the File of the segments plot (which was already produced during
the pre-processing) after adding the appropriate metadata to it.
"""
Expand All @@ -90,7 +90,7 @@ def display_seg_plot(output_dir, segments_dir):
'GRB' + wflow.cp.get('workflow', 'trigger-name') + '_segments.png'
)
segments_plot = _workflow.resolve_url_to_file(
os.path.join(args.segment_dir, segments_plot_name)
os.path.join(segment_dir, segments_plot_name)
)
segments_plot_path = segments_plot.storage_path
im = Image.open(segments_plot_path)
Expand Down Expand Up @@ -134,23 +134,25 @@ parser.add_argument(
"-i",
"--inj-files",
action="store",
default=None,
nargs="+",
help="Location(s) of input injection results file(s)",
)
parser.add_argument(
"-b",
"--bank-file",
action="store",
default=None,
help="The location of the full template bank file",
)
parser.add_argument(
"--segment-dir",
action="store",
default=None,
help="The location of the segment files",
)
parser.add_argument(
"--veto-file",
help="File containing segments used to veto injections",
)

_workflow.add_workflow_command_line_group(parser)
_workflow.add_workflow_settings_cli(parser, include_subdax_opts=True)
args = parser.parse_args()
Expand Down Expand Up @@ -218,6 +220,12 @@ if not set(inj_sets).issubset(eff_secs):
inj_sets = [i.upper() for i in inj_sets]
inj_files = labels_in_files_metadata(inj_sets, start_rundir, args.inj_files)

# File instance of the veto file
veto_file = args.veto_file
if veto_file:
veto_file = os.path.join(start_rundir, args.veto_file)
veto_file = _workflow.resolve_url_to_file(veto_file)

# IFOs actually used: determined by data availability
ifos = extract_ifos(offsource_file.storage_path)
wflow.ifos = ifos
Expand All @@ -226,8 +234,6 @@ plotting_nodes = []
html_nodes = []

# Convert the segments files to a FileList
seg_filenames = ['bufferSeg.txt', 'offSourceSeg.txt', 'onSourceSeg.txt']
seg_files = [os.path.join(args.segment_dir, f) for f in seg_filenames]
seg_files = _workflow.build_segment_filelist(args.segment_dir)

# Logfile of this workflow
Expand Down Expand Up @@ -320,6 +326,8 @@ for snr_type in timeseries:
trig_file=offsource_file,
inj_file=inj_file,
ifo=ifo,
seg_files=seg_files,
veto_file=veto_file,
tags=tags,
)
plotting_nodes.append(plot_node)
Expand Down Expand Up @@ -359,6 +367,8 @@ for veto in vetoes:
trig_file=offsource_file,
inj_file=inj_file,
ifo=ifo_arg,
seg_files=seg_files,
veto_file=veto_file,
tags=tags,
)
plotting_nodes.append(plot_node)
Expand Down Expand Up @@ -392,6 +402,8 @@ if wflow.cp.has_section('pygrb_plot_coh_ifosnr'):
trig_file=offsource_file,
inj_file=inj_file,
ifo=ifo,
seg_files=seg_files,
veto_file=veto_file,
tags=tags,
)
plotting_nodes.append(plot_node)
Expand Down Expand Up @@ -436,6 +448,8 @@ for nstat in nstats:
out_dir,
trig_file=offsource_file,
inj_file=inj_file,
seg_files=seg_files,
veto_file=veto_file,
tags=tags,
)
plotting_nodes.append(plot_node)
Expand Down Expand Up @@ -483,6 +497,8 @@ for inj_set in inj_sets:
trig_file=offsource_file,
ifo=ifo,
inj_file=inj_file,
seg_files=seg_files,
veto_file=veto_file,
tags=tags,
)
plotting_nodes.append(plot_node)
Expand All @@ -498,6 +514,7 @@ for inj_set in inj_sets:
offsource_file,
seg_files,
inj_file=inj_file,
veto_file=veto_file,
tags=inj_file.tags,
)
html_nodes.append(html_node)
Expand All @@ -520,6 +537,8 @@ for inj_set in inj_sets:
offsource_file,
'daxes',
out_dir,
seg_files=seg_files,
veto_file=veto_file,
tags=inj_file.tags + ['loudest_quiet_found_injs'],
)
logging.info('Leaving minifollowups')
Expand All @@ -540,6 +559,7 @@ for stat in stats:
trig_file=offsource_file,
inj_file=inj_file,
seg_files=seg_files,
veto_file=veto_file,
tags=[stat],
)
plotting_nodes.append(plot_node)
Expand Down Expand Up @@ -570,6 +590,8 @@ else:
offsource_file,
'daxes',
out_dir,
seg_files=seg_files,
veto_file=veto_file,
tags=['loudest_offsource_events'],
)
logging.info('Leaving minifollowups')
Expand Down Expand Up @@ -603,6 +625,7 @@ for inj_set in inj_sets:
inj_file=inj_file,
bank_file=bank_file,
seg_files=seg_files,
veto_file=veto_file,
tags=tags,
plot_bkgd=True,
)
Expand Down Expand Up @@ -631,6 +654,7 @@ for i, offtrial in enumerate(offtrials):
inj_file=inj_file,
bank_file=bank_file,
seg_files=seg_files,
veto_file=veto_file,
tags=tags,
plot_bkgd=False,
)
Expand Down Expand Up @@ -741,6 +765,8 @@ for snr_type in ['reweighted', 'coherent']:
'pygrb_plot_snr_timeseries',
out_dir,
trig_file=all_times_file,
seg_files=seg_files,
veto_file=veto_file,
tags=[snr_type, 'alltimes'],
)
plotting_nodes.append(plot_node)
Expand All @@ -763,6 +789,8 @@ else:
onsource_file,
'daxes',
out_dir,
seg_files=seg_files,
veto_file=veto_file,
tags=['loudest_onsource_event'],
)
logging.info("Leaving onsource minifollowups")
Expand All @@ -785,6 +813,7 @@ for inj_set in inj_sets:
inj_file=inj_file,
bank_file=bank_file,
seg_files=seg_files,
veto_file=veto_file,
tags=tags,
plot_bkgd=False,
)
Expand Down
45 changes: 37 additions & 8 deletions pycbc/results/pygrb_postprocessing_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,15 +285,31 @@ def _dataset_iterator(g, prefix=''):


# =============================================================================
# Functions to load triggers
# Function to load trigger/injection data
# =============================================================================
def load_triggers(input_file, ifos, vetoes, rw_snr_threshold=None,
slide_id=None):
"""Loads triggers from PyGRB output file, returning a dictionary"""
def load_data(input_file, ifos, rw_snr_threshold=None, data_tag=None,
slide_id=None):
"""Load data from a trigger/injection PyGRB output file, returning a
dictionary. If the input_file is None, None is returned. data_tag enables
logging information about the number of triggers/injections found, so the
user should not set it to 'trigs'/'injs' when processing the onsource."""

if not input_file:
return None

trigs = HFile(input_file, 'r')
rw_snr = trigs['network/reweighted_snr'][:]
net_ids = trigs['network/event_id'][:]

# Output the number of items loaded only upon a request by the user who is
# expected not to set data_tag to 'trigs'or 'injs' when processing the
# onsource
if data_tag=='trigs':
logging.info("%d triggers loaded.", len(rw_snr))
elif data_tag=='injs':
logging.info("%d injections loaded.", len(rw_snr))
else:
logging.info("Loading triggers.")
ifo_ids = {}
for ifo in ifos:
ifo_ids[ifo] = trigs[ifo+'/event_id'][:]
Expand All @@ -307,6 +323,16 @@ def load_triggers(input_file, ifos, vetoes, rw_snr_threshold=None,
above_thresh = rw_snr > 0
num_orig_pts = len(above_thresh)

# Output the number of items surviging vetoes with the same logic as above
msg = ""
if data_tag=='trigs':
msg += f"{sum(above_thresh)} triggers "
elif data_tag=='injs':
msg = f"{sum(above_thresh)} injections "
if msg:
msg += f"surviving reweighted SNR cut at {rw_snr_threshold}."
logging.info(msg)

# Do not assume that IFO and network datasets are sorted the same way:
# find where each surviving network/event_id is placed in the IFO/event_id
ifo_ids_above_thresh_locations = {}
Expand All @@ -315,10 +341,10 @@ def load_triggers(input_file, ifos, vetoes, rw_snr_threshold=None,
numpy.array([numpy.where(ifo_ids[ifo] == net_id)[0][0]
for net_id in net_ids[above_thresh]])

# Apply the cut on all the data by remove points with reweighted SNR = 0
# Apply the cut on all the data by removing points with reweighted SNR = 0
trigs_dict = {}
with HFile(input_file, "r") as trigs:
for (path, dset) in dataset_iterator(trigs):
for (path, dset) in _dataset_iterator(trigs):
# The dataset contains information other than trig/inj properties:
# just copy it
if len(dset) != num_orig_pts:
Expand All @@ -336,12 +362,15 @@ def load_triggers(input_file, ifos, vetoes, rw_snr_threshold=None,
trigs_dict[path] = dset[above_thresh]

if trigs_dict[path].size == trigs['network/slide_id'][:].size:
trigs_dict[path] = slide_filter(trigs, trigs_dict[path],
slide_id=slide_id)
trigs_dict[path] = _slide_filter(trigs, trigs_dict[path],
slide_id=slide_id)

return trigs_dict





# =============================================================================
# Detector utils:
# * Function to calculate the antenna response F+^2 + Fx^2
Expand Down
4 changes: 1 addition & 3 deletions pycbc/workflow/jobsetup.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,15 +276,13 @@ def multi_ifo_coherent_job_setup(workflow, out_files, curr_exe_job,
data_seg, job_valid_seg = curr_exe_job.get_valid_times()
curr_out_files = FileList([])
ipn_sky_points = None
veto_file = None
bank_veto = None
input_files = FileList(datafind_outs)
for f in datafind_outs:
if 'IPN_SKY_POINTS' in f.description:
ipn_sky_points = f
input_files.remove(f)
elif 'vetoes' in f.description:
veto_file = f
input_files.remove(f)
elif 'INPUT_BANK_VETO_BANK' in f.description:
bank_veto = f
Expand All @@ -311,7 +309,7 @@ def multi_ifo_coherent_job_setup(workflow, out_files, curr_exe_job,
tag.append(split_bank.tag_str)
node = curr_exe_job.create_node(data_seg, job_valid_seg,
parent=split_bank, inj_file=inj_file, tags=tag,
dfParents=frame_files, bankVetoBank=bank_veto,
dfParents=input_files, bankVetoBank=bank_veto,
ipn_file=ipn_sky_points)
workflow.add_node(node)
split_bank_counter += 1
Expand Down

0 comments on commit ca8e4ac

Please sign in to comment.