Skip to content

Commit

Permalink
still testing
Browse files Browse the repository at this point in the history
  • Loading branch information
zzeppozz committed Mar 31, 2023
1 parent 8d9b020 commit 699e85f
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 36 deletions.
2 changes: 2 additions & 0 deletions bison/common/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ class REPORT:
RECORDS_OUTPUT = "records_output"
INFILE = "input_filename"
OUTFILE = "output_filename"
LOGFILE = "log_filename"
REPORTFILE = "report_filename"
SUMMARY = "summary"
REGION = "region"
LOCATION = "locations"
Expand Down
68 changes: 60 additions & 8 deletions bison/common/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -745,20 +745,19 @@ def get_process_outfilename(in_filename, outpath=None, step_or_process=None):
outfname = os.path.join(outpath, outbasename)
return outfname


# .............................................................................
@staticmethod
def get_process_logfilename(in_filename, log_path=None, step_or_process=None):
"""Construct output filename for the next processing step of the given file.
def _get_process_base_filename(in_filename, step_or_process=None):
"""Construct output base filename for this processing step of the given file.
Args:
in_filename (str): base or full filename of CSV data.
log_path (str): Destination directory for log files.
step_or_process (int or lmbison.common.constants.DWC_PROCESS):
stage of processing completed on the output file.
Returns:
logname: name for logger
log_filename: full filename for logging output
base_filename: base filename without path or extension.
Note:
The input filename is parsed for process step.
Expand All @@ -777,10 +776,63 @@ def get_process_logfilename(in_filename, log_path=None, step_or_process=None):
basename = f"{basename}{sep}{chunk}"

pname = step_or_process["postfix"]
logname = f"{basename}{sep}{pname}"
log_fname = os.path.join(log_path, f"{logname}.log")
base_filename = f"{basename}{sep}{pname}"

return base_filename

# .............................................................................
@staticmethod
def get_process_logfilename(in_filename, log_path=None, step_or_process=None):
    """Construct the logger name and log filename for processing the given file.

    Args:
        in_filename (str): base or full filename of CSV data.
        log_path (str): Destination directory for log files.
        step_or_process (int or lmbison.common.constants.DWC_PROCESS):
            stage of processing completed on the output file.

    Returns:
        base_filename (str): base filename (no path, no extension), suitable
            for use as the logger name.
        log_fname (str): full filename for logging output.

    Note:
        The input filename is parsed for process step.  File will always start
        with basename, followed by chunk, followed by process step completed
        (if any).
    """
    # Delegate basename construction so log and report filenames stay in sync.
    base_filename = BisonNameOp._get_process_base_filename(
        in_filename, step_or_process=step_or_process)
    log_fname = os.path.join(log_path, f"{base_filename}.log")

    return base_filename, log_fname

# .............................................................................
@staticmethod
def get_process_report_filename(in_filename, output_path=None, step_or_process=None):
    """Construct the report filename for this processing step of the given file.

    Args:
        in_filename (str): base or full filename of CSV data.
        output_path (str): Destination directory for report files.
        step_or_process (int or lmbison.common.constants.DWC_PROCESS):
            stage of processing completed on the output file.

    Returns:
        rpt_fname (str): full filename for the JSON report output.

    Note:
        The input filename is parsed for process step.  File will always start
        with basename, followed by chunk, followed by process step completed
        (if any).
    """
    # Delegate basename construction so log and report filenames stay in sync.
    base_filename = BisonNameOp._get_process_base_filename(
        in_filename, step_or_process=step_or_process)
    # Use a report-specific extension: the original ".log" (copy-pasted from
    # get_process_logfilename) would collide with the log file whenever
    # output_path == log_path, and callers write JSON to this file.
    # TODO(review): confirm ".rpt" matches the project's report convention.
    rpt_fname = os.path.join(output_path, f"{base_filename}.rpt")

    # Single return; the original `return logname, log_fname` referenced an
    # undefined name (NameError) and shadowed an unreachable second return.
    return rpt_fname

# .............................................................................
@staticmethod
Expand Down
76 changes: 68 additions & 8 deletions bison/process/annotate.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Common classes for adding USGS RIIS info to GBIF occurrences."""
from concurrent.futures import ProcessPoolExecutor
import logging
from multiprocessing import Pool
import os
Expand Down Expand Up @@ -335,14 +336,15 @@ def annotate_dwca_records(self, dwc_filename, output_path):


# .............................................................................
def annotate_occurrence_file(dwc_filename, geo_path, riis, output_path, log_path):
def annotate_occurrence_file(
dwc_filename, riis_with_gbif_filename, geo_path, output_path, log_path):
"""Annotate GBIF records with census state and county, and RIIS key and assessment.
Args:
dwc_filename (str): full filename containing GBIF data for annotation.
riis_with_gbif_filename (str): full filename with RIIS records annotated with
gbif accepted taxa
geo_path (str): Base directory containing geospatial region files
riis (bison.provider.riis_data.RIIS): object containing RIIS records annotated
with gbif accepted taxa
output_path (str): destination directory for output annotated occurrence files.
log_path (object): destination directory for logging processing messages.
Expand All @@ -355,13 +357,27 @@ def annotate_occurrence_file(dwc_filename, geo_path, riis, output_path, log_path
if not os.path.exists(dwc_filename):
raise FileNotFoundError(dwc_filename)

rpt_filename = BisonNameOp.get_process_report_filename(
dwc_filename, output_path=output_path, step_or_process=LMBISON_PROCESS.ANNOTATE)
logname, log_fname = BisonNameOp.get_process_logfilename(
dwc_filename, log_path=log_path, step_or_process=LMBISON_PROCESS.ANNOTATE)
logger = Logger(logname, log_filename=log_fname, log_console=False)

ant = Annotator(logger, geo_path, riis=riis)
ant = Annotator(logger, geo_path, riis_with_gbif_filename=riis_with_gbif_filename)
report = ant.annotate_dwca_records(dwc_filename, output_path)

# Write individual output report
import json
try:
with open(rpt_filename, mode='wt') as out_file:
json.dump(report, out_file, indent=4)
except OSError:
raise
except IOError:
raise
logger.log(
f"Wrote report file to {rpt_filename}", refname="annotate_occurrence_file")

return report


Expand Down Expand Up @@ -391,9 +407,52 @@ def parallel_annotate(
output_path = os.path.dirname(dwc_filenames[0])
log_path = main_logger.log_directory

# Use the same resolvers and RIIS for all Annotators
riis = RIIS(riis_with_gbif_filename, main_logger)
riis.read_riis()
main_logger.log(
f"Parallel Annotation Start Time : {time.asctime()}", refname=refname)

with ProcessPoolExecutor() as executor:
for dwc_fname in dwc_filenames:
out_fname = BisonNameOp.get_process_outfilename(
dwc_fname, outpath=output_path, step_or_process=LMBISON_PROCESS.ANNOTATE)
if os.path.exists(out_fname):
msg = f"Annotations exist in {out_fname}."
main_logger.log(msg, refname=refname)
messages.append(msg)
else:
executor.submit(
annotate_occurrence_file, dwc_fname, riis_with_gbif_filename,
geo_path, output_path, log_path)

main_logger.log(
f"Parallel Annotation End Time : {time.asctime()}", refname=refname)



# .............................................................................
def parallel_annotate_new(
dwc_filenames, riis_with_gbif_filename, geo_path, output_path, main_logger):
"""Main method for parallel execution of DwC annotation script.
Args:
dwc_filenames (list): list of full filenames containing GBIF data for
annotation.
riis_with_gbif_filename (str): full filename with RIIS records annotated with
gbif accepted taxa
geo_path (str): input directory containing geospatial files for
geo-referencing occurrence points.
output_path (str): destination directory for output annotated occurrence files.
main_logger (logger): logger for the process that calls this function,
initiating subprocesses
Returns:
reports (list of dict): metadata for the occurrence annotation data and process.
"""
refname = "parallel_annotate"
messages = []
inputs = []
if output_path is None:
output_path = os.path.dirname(dwc_filenames[0])
log_path = main_logger.log_directory

# Process only needed files
for dwc_fname in dwc_filenames:
Expand All @@ -404,7 +463,8 @@ def parallel_annotate(
main_logger.log(msg, refname=refname)
messages.append(msg)
else:
inputs.append((dwc_fname, geo_path, riis, output_path, log_path))
inputs.append(
(dwc_fname, riis_with_gbif_filename, geo_path, output_path, log_path))

main_logger.log(
f"Parallel Annotation Start Time : {time.asctime()}", refname=refname)
Expand Down
70 changes: 50 additions & 20 deletions process_gbif.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
REGION, REPORT, RIIS_DATA)
from bison.common.util import BisonNameOp, Chunker, delete_file, get_csv_dict_reader
from bison.process.aggregate import Aggregator
from bison.process.annotate import (annotate_occurrence_file, parallel_annotate)
from bison.process.annotate import (Annotator, annotate_occurrence_file, parallel_annotate)
from bison.process.geoindex import (GeoResolver, GeoException)
from bison.process.sanity_check import Counter
from bison.provider.riis_data import RIIS
Expand Down Expand Up @@ -110,17 +110,22 @@ def a_resolve_riis_taxa(riis_filename, output_path, logger, overwrite=False):
# Update species data
try:
report = nnsl.resolve_riis_to_gbif_taxa(output_path, overwrite=overwrite)
except Exception as e:
logger.log(
f"Unexpected failure {e} in resolve_riis_taxa", refname=script_name,
log_level=ERROR)
else:
logger.log(
f"Found {report[REPORT.TAXA_RESOLVED]} names, "
f"{report[REPORT.RECORDS_UPDATED]} updated, "
f"{report[REPORT.RECORDS_OUTPUT]} written "
f"of total {report[REPORT.RIIS_IDENTIFIER]} from {riis_filename} "
f"to {report[REPORT.OUTFILE]}.", refname=script_name)
except Exception as e:
logger.log(
f"Unexpected failure {e} in resolve_riis_taxa", refname=script_name,
log_level=ERROR)
else:
report_filename = BisonNameOp.get_process_report_filename(
config["riis_filename"], output_path=config["output_path"],
step_or_process=LMBISON_PROCESS.RESOLVE)
report[REPORT.REPORTFILE] = report_filename

if report[REPORT.RECORDS_OUTPUT] != RIIS_DATA.SPECIES_GEO_DATA_COUNT:
logger.log(
f"Wrote {report[REPORT.RECORDS_OUTPUT]} RIIS records, expecting "
Expand Down Expand Up @@ -166,8 +171,10 @@ def b_annotate_occurrence_files(

# Add locality-intersections and RIIS determinations to GBIF DwC records
start = time.perf_counter()
rpt = annotate_occurrence_file(
occ_fname, geo_path, riis, output_path, log_path)
ant = Annotator(logger, geo_path, riis=riis)
rpt = ant.annotate_dwca_records(occ_fname, output_path)
# rpt = annotate_occurrence_file(
# occ_fname, geo_path, riis, output_path, log_path)
end = time.perf_counter()

reports["reports"].append(rpt)
Expand Down Expand Up @@ -215,6 +222,11 @@ def c_summarize_annotated_files(annotated_filenames, output_path, logger):
else:
report = rpt

report_filename = BisonNameOp.get_process_report_filename(
report[REPORT.OUTFILE], output_path=output_path,
step_or_process=LMBISON_PROCESS.SUMMARIZE)
report[REPORT.REPORTFILE] = report_filename

return report


Expand All @@ -241,6 +253,10 @@ def d_aggregate_summary_by_region(
agg = Aggregator(logger)
report = agg.aggregate_file_summary_for_regions(
summary_filename, resolved_riis_filename, output_path)
report_filename = BisonNameOp.get_process_report_filename(
summary_filename, output_path=output_path,
step_or_process=LMBISON_PROCESS.SUMMARIZE)
report[REPORT.REPORTFILE] = report_filename

return report

Expand Down Expand Up @@ -507,6 +523,8 @@ def execute_command(config, logger):
Raises:
FileNotFoundError: on missing input file.
"""
report = {}
step_or_process = None
(riis_annotated_filename, log_path,
raw_filenames, annotated_filenames, summary_filenames,
full_summary_filename) = _prepare_args(config)
Expand All @@ -518,26 +536,31 @@ def execute_command(config, logger):
log_list(logger, "Input filenames:", raw_filenames)

if config["command"] == "resolve":
step_or_process = LMBISON_PROCESS.RESOLVE
report = a_resolve_riis_taxa(
config["riis_filename"], riis_annotated_filename, logger, overwrite=False)
logger.log(f"Resolved RIIS filename: {report[REPORT.OUTFILE]}")

elif config["command"] == "annotate":
step_or_process = LMBISON_PROCESS.ANNOTATE
# Annotate DwC records with regions, and if found, RIIS determination
report = b_annotate_occurrence_files(
raw_filenames, riis_annotated_filename, config["geo_path"],
config["process_path"], logger, log_path, run_parallel=True)
log_list(
logger, "Newly annotated filenames:",
[rpt[REPORT.OUTFILE] for rpt in report["reports"]])

# log_list(
# logger, "Newly annotated filenames:",
# [rpt[REPORT.OUTFILE] for rpt in report["reports"]])

elif config["command"] == "summarize":
step_or_process = LMBISON_PROCESS.SUMMARIZE
# Summarize each annotated file by region, write summary to a file
report = c_summarize_annotated_files(
annotated_filenames, config["output_path"], logger)
logger.log("Summary of annotations", report[REPORT.OUTFILE])

elif config["command"] == "aggregate":
step_or_process = LMBISON_PROCESS.AGGREGATE
# Write summaries for each region to its own file
report = d_aggregate_summary_by_region(
full_summary_filename, config["output_path"], log_path, logger)
Expand Down Expand Up @@ -570,6 +593,12 @@ def execute_command(config, logger):
f"Unsupported command {config['command']}", refname=script_name,
log_level=ERROR)

if step_or_process is not None and report is not None:
report_filename = BisonNameOp.get_process_report_filename(
config["gbif_filename"], output_path=config["output_path"],
step_or_process=step_or_process)
report[REPORT.REPORTFILE] = report_filename

return report


Expand All @@ -587,14 +616,15 @@ def execute_command(config, logger):
report = execute_command(config, logger)

# Write output report
try:
with open(config["report_filename"], mode='wt') as out_file:
json.dump(report, out_file, indent=4)
except OSError:
raise
except IOError:
raise
logger.log(
f"Wrote report file to {config['report_filename']}", refname=script_name)
if report:
try:
with open(report[REPORT.REPORTFILE], mode='wt') as out_file:
json.dump(report, out_file, indent=4)
except OSError:
raise
except IOError:
raise
logger.log(
f"Wrote report file to {report[REPORT.REPORTFILE]}", refname=script_name)

logger.log(f"main end time: {time.asctime()}", refname=script_name)

0 comments on commit 699e85f

Please sign in to comment.