Commit 4f2fc7b
local debug
zzeppozz committed Jul 20, 2023
1 parent 699e85f commit 4f2fc7b
Showing 8 changed files with 15,499 additions and 238 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -24,7 +24,7 @@ repos:
additional_dependencies: [flake8-bugbear]

- repo: https://github.com/PyCQA/isort
- rev: 5.10.1
+ rev: 5.12.0
hooks:
- id: isort
args: [--sp=.github/linters/.isort.cfg, --diff]
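For reference, the isort hook entry after the version bump, reassembled from the hunk above (indentation approximate):

```yaml
- repo: https://github.com/PyCQA/isort
  rev: 5.12.0
  hooks:
    - id: isort
      args: [--sp=.github/linters/.isort.cfg, --diff]
```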
34 changes: 11 additions & 23 deletions bison/common/util.py
@@ -660,7 +660,7 @@ class BisonNameOp():
separator = "_"

@staticmethod
- def get_annotated_riis_filename(input_riis_filename, outpath):
+ def get_annotated_riis_filename(input_riis_filename, outpath=None):
"""Construct a filename for a chunk of CSV records.
Args:
@@ -671,7 +671,10 @@ def get_annotated_riis_filename(input_riis_filename, outpath):
Returns:
out_filename: full filename for the output file.
"""
- basename, ext = os.path.splitext(os.path.split(input_riis_filename)[1])
+ inpath, fname = os.path.split(input_riis_filename)
+ if outpath is None:
+ outpath = inpath
+ basename, _ = os.path.splitext(fname)
out_filename = os.path.join(outpath, f"{basename}_annotated.csv")
return out_filename
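The hunk above makes `outpath` optional: when omitted, the annotated file now lands next to the input. A minimal sketch of the new behavior, using the function body from this diff; the paths are hypothetical:

```python
import os

def get_annotated_riis_filename(input_riis_filename, outpath=None):
    # Split the input into its directory and filename.
    inpath, fname = os.path.split(input_riis_filename)
    # New default: fall back to the input file's own directory.
    if outpath is None:
        outpath = inpath
    basename, _ = os.path.splitext(fname)
    return os.path.join(outpath, f"{basename}_annotated.csv")

# Hypothetical paths, for illustration only:
print(get_annotated_riis_filename("/data/riis/US-RIIS.csv"))
# /data/riis/US-RIIS_annotated.csv
print(get_annotated_riis_filename("/data/riis/US-RIIS.csv", outpath="/tmp"))
# /tmp/US-RIIS_annotated.csv
```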

@@ -830,9 +833,9 @@ def get_process_report_filename(in_filename, output_path=None, step_or_process=N
"""
base_filename = BisonNameOp._get_process_base_filename(
in_filename, step_or_process=step_or_process)
- log_fname = os.path.join(output_path, f"{base_filename}.log")
+ rpt_fname = os.path.join(output_path, f"{base_filename}.rpt")

- return log_fname
+ return rpt_fname

# .............................................................................
@staticmethod
@@ -881,27 +884,11 @@ def parse_process_filename(filename):

# ...............................................
@classmethod
- def get_raw_summary_name(cls, csvfile):
- """Construct a filename for the summarized version of annotated csvfile.
- Args:
- csvfile (str): full filename used to construct an annotated filename for
- this data.
- Returns:
- outfname: output filename derived from the annotated GBIF DWC filename
- """
- basename, ext = os.path.splitext(csvfile)
- outfname = f"{basename}_summary{ext}"
- return outfname

# ...............................................
@classmethod
- def get_combined_summary_name(cls, csvfile, outpath=None):
+ def get_combined_summary_name(cls, summary_filename_list, outpath=None):
"""Construct a filename for the summarized version of annotated csvfile.
Args:
- csvfile (str): full filename of one subset summary file (of one or more) for
+ summary_filename_list (list): full filename(s) of subset summary files for
this data.
outpath (str): full directory path for output filename.
@@ -910,7 +897,8 @@ def get_combined_summary_name(cls, csvfile, outpath=None):
"""
sep = BisonNameOp.separator
path, basename, ext, chunk, postfix = BisonNameOp.parse_process_filename(
- csvfile)
+ summary_filename_list[0]
+ )
postfix = LMBISON_PROCESS.COMBINE["postfix"]
outbasename = f"{basename}{sep}{postfix}{ext}"
# If outpath is not provided, use the same path as the input file.
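`get_combined_summary_name` now takes the whole list of subset summary filenames and derives the combined name from the first entry. A hedged sketch of the new call shape; the filenames are hypothetical and the import path assumes the module location `bison/common/util.py`:

```python
from bison.common.util import BisonNameOp

# Hypothetical per-chunk summary files from earlier processing steps:
chunk_summaries = [
    "/data/out/occurrences_chunk-1_summary.csv",
    "/data/out/occurrences_chunk-2_summary.csv",
]
# The combined filename is parsed from the first list element; with the
# default outpath=None it stays in that input's directory.
combined_fname = BisonNameOp.get_combined_summary_name(chunk_summaries)
```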
110 changes: 53 additions & 57 deletions bison/process/aggregate.py
@@ -675,16 +675,16 @@ def _read_location_summaries(self, summary_filename_list):
for sum_fname in summary_filename_list:
self._read_location_summary(sum_fname)

- report = self._report_summary()
- return report
+ summary_report = self._report_summary()
+ return summary_report

# ...............................................
def _report_summary(self):
- report = {REPORT.REGION: {}}
+ summary = {}
# Report count of locations for each region type
for region_type, loc_dict in self._locations.items():
# Count unique locations for each region type (county, state, PAD, AIANNH)
- report[REPORT.REGION][region_type] = {
+ summary[region_type] = {
REPORT.LOCATION: len(loc_dict),
REPORT.SPECIES: {},
REPORT.OCCURRENCE: {}
@@ -695,46 +695,41 @@ def _report_summary(self):
for occ_count in sp_count.values():
occ_total += occ_count
# Record number of unique species
- report[REPORT.REGION][region_type][loc] = {
+ summary[region_type][loc] = {
REPORT.SPECIES: len(sp_count),
REPORT.OCCURRENCE: occ_total
}
- report[REPORT.SPECIES] = len(self._canonicals)
- report[REPORT.PROCESS] = LMBISON_PROCESS.SUMMARIZE["postfix"]
- return report
+ summary[REPORT.SPECIES] = len(self._acc_species_name)
+ return summary

# ...............................................
- def summarize_summaries(self, summary_filename_list, output_path):
+ def summarize_summaries(self, summary_filename_list, full_summary_filename):
"""Read summary files and combine summaries in self._locations.
Args:
summary_filename_list: list of full filenames of summary files.
- output_path (str): destination directory for combined summary file.
+ full_summary_filename (str): Full filename for combined output summary file.
Returns:
report: dictionary of metadata about the data and process
"""
- full_summary_filename = BisonNameOp.get_process_outfilename(
- summary_filename_list, outpath=output_path,
- step_or_process=LMBISON_PROCESS.AGGREGATE)
+ self._initialize_combine_summaries_io(full_summary_filename)
+ summary_report = self._read_location_summaries(summary_filename_list)
+ # Save to single file
+ self._write_raw_region_summary()
report = {
REPORT.PROCESS: LMBISON_PROCESS.SUMMARIZE["postfix"],
REPORT.INFILE: summary_filename_list,
- REPORT.OUTFILE: full_summary_filename
+ REPORT.OUTFILE: full_summary_filename,
+ REPORT.SUMMARY: summary_report
}
- self._initialize_combine_summaries_io(full_summary_filename)
- report[REPORT.SUMMARY] = self._read_location_summaries(summary_filename_list)
- report[REPORT.PROCESS] = LMBISON_PROCESS.SUMMARIZE["postfix"]
-
- # Save to single file
- self._write_raw_region_summary()
-
self.close()
return report

# ...............................................
def summarize_annotated_recs_by_location(
- self, annotated_filename, output_path, overwrite=True):
+ self, annotated_filename, process_path, overwrite=True):
"""Read an annotated file, summarize by species and location, write to csvfile.
Args:
Expand All @@ -752,15 +747,16 @@ def summarize_annotated_recs_by_location(
SPECIES_KEY, GBIF_TAXON_KEY, ASSESS_KEY, STATE_KEY, COUNTY_KEY, COUNT_KEY
"""
summary_filename = BisonNameOp.get_process_outfilename(
- annotated_filename, outpath=output_path,
+ annotated_filename, outpath=process_path,
step_or_process=LMBISON_PROCESS.SUMMARIZE)

self._initialize_summary_io(
annotated_filename, summary_filename, overwrite=overwrite)
# Summarize and write
file_report = self._summarize_annotations_by_region()
report = {
- REPORT.INFILE: summary_filename,
+ REPORT.INFILE: annotated_filename,
+ REPORT.OUTFILE: summary_filename,
REPORT.SUMMARY: file_report
}
self._write_raw_region_summary()
@@ -1010,40 +1006,40 @@ def summarize_annotations(annotated_filename, output_path, log_path):
return summary_filename


- # .............................................................................
- def summarize_summaries(annotated_filename, output_path, log_path):
- """Summarize data in one annotated GBIF DwC file by state, county, and RIIS.
- Args:
- annotated_filename (str): full filename of an annotated GBIF data file.
- output_path (str): destination directory for output files.
- log_path (str): destination directory for logfile
- Returns:
- summary_filename (str): full filename of a summary file
- Raises:
- FileNotFoundError: on missing input file
- """
- if not os.path.exists(annotated_filename):
- raise FileNotFoundError(annotated_filename)
-
- datapath, basefname = os.path.split(annotated_filename)
- refname = f"summarize_{basefname}"
- logger = Logger(refname, log_filename=os.path.join(log_path, f"{refname}.log"))
- logger.log(f"Submit {basefname} for summarizing.", refname=refname)
-
- logger.log(f"Start Time : {datetime.now()}", refname=refname)
- agg = Aggregator(logger)
- summary_filename = BisonNameOp.get_process_outfilename(
- annotated_filename, outpath=output_path, step_or_process=LMBISON_PROCESS.AGGREGATE)
-
- # Do not overwrite existing summary
- agg.summarize_annotated_recs_by_location(
- annotated_filename, summary_filename, overwrite=False)
-
- logger.log(f"End Time : {datetime.now()}", refname=refname)
- return summary_filename
+ # # .............................................................................
+ # def summarize_summaries(annotated_filename, output_path, log_path):
+ # """Summarize data in one annotated GBIF DwC file by state, county, and RIIS.
+ #
+ # Args:
+ # annotated_filename (str): full filename of an annotated GBIF data file.
+ # output_path (str): destination directory for output files.
+ # log_path (str): destination directory for logfile
+ #
+ # Returns:
+ # summary_filename (str): full filename of a summary file
+ #
+ # Raises:
+ # FileNotFoundError: on missing input file
+ # """
+ # if not os.path.exists(annotated_filename):
+ # raise FileNotFoundError(annotated_filename)
+ #
+ # datapath, basefname = os.path.split(annotated_filename)
+ # refname = f"summarize_{basefname}"
+ # logger = Logger(refname, log_filename=os.path.join(log_path, f"{refname}.log"))
+ # logger.log(f"Submit {basefname} for summarizing.", refname=refname)
+ #
+ # logger.log(f"Start Time : {datetime.now()}", refname=refname)
+ # agg = Aggregator(logger)
+ # summary_filename = BisonNameOp.get_process_outfilename(
+ # annotated_filename, outpath=output_path, step_or_process=LMBISON_PROCESS.AGGREGATE)
+ #
+ # # Do not overwrite existing summary
+ # agg.summarize_annotated_recs_by_location(
+ # annotated_filename, summary_filename, overwrite=False)
+ #
+ # logger.log(f"End Time : {datetime.now()}", refname=refname)
+ # return summary_filename


# .............................................................................
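Net effect in `aggregate.py`: `summarize_summaries` no longer derives its own output name, the caller passes `full_summary_filename` in, and the per-region counts now travel inside the report under `REPORT.SUMMARY`. A hedged sketch of the new caller-side pattern; the logger, paths, and the constants' import location are assumptions, while the `get_process_outfilename` call mirrors the lines removed above:

```python
from bison.common.constants import LMBISON_PROCESS, REPORT  # assumed module path
from bison.common.util import BisonNameOp
from bison.process.aggregate import Aggregator

summary_files = ["/data/out/chunk-1_summary.csv", "/data/out/chunk-2_summary.csv"]
# The caller now computes the combined filename (formerly done inside):
full_summary_filename = BisonNameOp.get_process_outfilename(
    summary_files, outpath="/data/out", step_or_process=LMBISON_PROCESS.AGGREGATE)

agg = Aggregator(logger)  # logger construction omitted; hypothetical
report = agg.summarize_summaries(summary_files, full_summary_filename)
# Per-region location/species counts are nested under REPORT.SUMMARY:
region_counts = report[REPORT.SUMMARY]
```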
94 changes: 50 additions & 44 deletions bison/process/annotate.py
@@ -264,12 +264,14 @@ def _annotate_one_record(self, dwcrec):
return dwcrec

# ...............................................
- def annotate_dwca_records(self, dwc_filename, output_path):
+ def annotate_dwca_records(self, dwc_filename, output_path, overwrite=False):
"""Resolve and append state, county, RIIS assessment and key to GBIF records.
Args:
dwc_filename: full filename of input file of DWC records.
output_path (str): destination directory for annotated occurrence file.
+ overwrite (bool): Flag indicating whether to overwrite existing annotated
+ file.
Returns:
report: dictionary of metadata about the data and process
@@ -285,52 +287,54 @@ def annotate_dwca_records(self, dwc_filename, output_path):

out_filename = BisonNameOp.get_process_outfilename(
dwc_filename, outpath=output_path, step_or_process=LMBISON_PROCESS.ANNOTATE)
- self.initialize_occurrences_io(dwc_filename, out_filename)
- self._log.log(
- f"Annotate {dwc_filename} to {out_filename}", refname=self.__class__.__name__)

report = {
REPORT.INFILE: dwc_filename,
- REPORT.OUTFILE: out_filename,
- REPORT.ANNOTATE_FAIL: []
+ REPORT.OUTFILE: out_filename
}
- try:
- # iterate over DwC records
- dwcrec = self._dwcdata.get_record()
- while dwcrec is not None:
- # Annotate
- dwcrec_ann = self._annotate_one_record(dwcrec)
- # Write
- try:
- self._csv_writer.writerow(dwcrec_ann)
- except Exception as e:
- self._log.log(
- f"Error {e} record, gbifID {dwcrec[GBIF.ID_FLD]}",
- refname=self.__class__.__name__, log_level=ERROR)
- report[REPORT.ANNOTATE_FAIL].append(dwcrec[GBIF.ID_FLD])
-
- if (self._dwcdata.recno % LOG.INTERVAL) == 0:
- self._log.log(
- f"*** Record number {self._dwcdata.recno}, gbifID: "
- f"{dwcrec[GBIF.ID_FLD]} ***", refname=self.__class__.__name__)
-
- # Get next
+ if not os.path.exists(out_filename) or overwrite is True:
+ self.initialize_occurrences_io(dwc_filename, out_filename)
+ self._log.log(
+ f"Annotate {dwc_filename} to {out_filename}",
+ refname=self.__class__.__name__)
+ summary = {REPORT.ANNOTATE_FAIL: []}
+ try:
+ # iterate over DwC records
+ dwcrec = self._dwcdata.get_record()
- except Exception as e:
- raise Exception(
- f"Unexpected error {e} reading {self._dwcdata.input_file} or "
- + f"writing {out_filename}")

- report[REPORT.RANK_FAIL] = list(self.bad_ranks)
- report[REPORT.RANK_FAIL_COUNT] = self.rank_filtered_records
- end = time.perf_counter()
+ while dwcrec is not None:
+ # Annotate
+ dwcrec_ann = self._annotate_one_record(dwcrec)
+ # Write
+ try:
+ self._csv_writer.writerow(dwcrec_ann)
+ except Exception as e:
+ self._log.log(
+ f"Error {e} record, gbifID {dwcrec[GBIF.ID_FLD]}",
+ refname=self.__class__.__name__, log_level=ERROR)
+ summary[REPORT.ANNOTATE_FAIL].append(dwcrec[GBIF.ID_FLD])
+
+ if (self._dwcdata.recno % LOG.INTERVAL) == 0:
+ self._log.log(
+ f"*** Record number {self._dwcdata.recno}, gbifID: "
+ f"{dwcrec[GBIF.ID_FLD]} ***", refname=self.__class__.__name__)
+
+ # Get next
+ dwcrec = self._dwcdata.get_record()
+ except Exception as e:
+ raise Exception(
+ f"Unexpected error {e} reading {self._dwcdata.input_file} or "
+ + f"writing {out_filename}")
+ else:
+ end = time.perf_counter()
+ summary[REPORT.RANK_FAIL] = list(self.bad_ranks)
+ summary[REPORT.RANK_FAIL_COUNT] = self.rank_filtered_records
+ report[REPORT.SUMMARY] = summary

self._log.log(
f"End Annotator.annotate_dwca_records: {time.asctime()}, {end - start} "
f"seconds elapsed", refname=self.__class__.__name__)
self._log.log(
f"Filtered out {self.rank_filtered_records} records of {self.bad_ranks}",
refname=self.__class__.__name__)

return report

@@ -383,7 +387,8 @@ def annotate_occurrence_file(

# .............................................................................
def parallel_annotate(
- dwc_filenames, riis_with_gbif_filename, geo_path, output_path, main_logger):
+ dwc_filenames, riis_with_gbif_filename, geo_path, output_path, main_logger,
+ overwrite):
"""Main method for parallel execution of DwC annotation script.
Args:
Expand All @@ -396,6 +401,7 @@ def parallel_annotate(
output_path (str): destination directory for output annotated occurrence files.
main_logger (logger): logger for the process that calls this function,
initiating subprocesses
+ overwrite (bool): Flag indicating whether to overwrite existing annotated file.
Returns:
reports (list of dict): metadata for the occurrence annotation data and process.
@@ -414,7 +420,7 @@ def parallel_annotate(
for dwc_fname in dwc_filenames:
out_fname = BisonNameOp.get_process_outfilename(
dwc_fname, outpath=output_path, step_or_process=LMBISON_PROCESS.ANNOTATE)
- if os.path.exists(out_fname):
+ if os.path.exists(out_fname) and overwrite is False:
msg = f"Annotations exist in {out_fname}."
main_logger.log(msg, refname=refname)
messages.append(msg)
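Both entry points now thread an `overwrite` flag through: `annotate_dwca_records` only opens and writes the annotated file when it is missing or `overwrite=True`, and `parallel_annotate` applies the same existence check before queueing each file. A hedged usage sketch; the filenames, paths, logger, and `Annotator` constructor arguments are placeholders:

```python
from bison.process.annotate import Annotator, parallel_annotate

# annotator = Annotator(...)  # constructor args omitted; hypothetical
# Re-running with the default overwrite=False leaves an existing annotated
# file untouched but still returns a report with INFILE/OUTFILE set:
report = annotator.annotate_dwca_records(
    "/data/in/occ_chunk-1.csv", "/data/out", overwrite=False)

# parallel_annotate now requires the flag explicitly:
reports = parallel_annotate(
    dwc_filenames, riis_with_gbif_filename, geo_path="/data/geo",
    output_path="/data/out", main_logger=main_logger, overwrite=False)
```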