2 changes: 1 addition & 1 deletion data/nws/preprocess_ahps_nws.py
@@ -110,7 +110,7 @@ def preprocess_nws(source_dir, destination, reference_raster):
# In multiple instances a USGS ID is given but then no USGS rating curve or in some cases no USGS datum is supplied.
select_by = 'nws_lid'
selector = code
stages, flows, threshold_count = get_thresholds(threshold_url, select_by, selector, threshold='all')
stages, flows, ___ = get_thresholds(threshold_url, select_by, selector, threshold='all')

# Make sure at least one valid threshold is supplied from WRDS.
threshold_categories = ['action', 'minor', 'moderate', 'major']
2 changes: 1 addition & 1 deletion data/usgs/preprocess_ahps_usgs.py
@@ -231,7 +231,7 @@ def preprocess_usgs(source_dir, destination, reference_raster):

select_by = 'nws_lid'
selector = code
stages, flows, threshold_count = get_thresholds(threshold_url, select_by, selector, threshold='all')
stages, flows, ___ = get_thresholds(threshold_url, select_by, selector, threshold='all')

# Make sure at least one valid threshold is supplied from WRDS.
threshold_categories = ['action', 'minor', 'moderate', 'major']
168 changes: 136 additions & 32 deletions tools/catfim/generate_categorical_fim.py
@@ -2,6 +2,7 @@

import argparse
import glob
import pickle
import math
import os
import shutil
@@ -15,7 +16,10 @@
import numpy as np
import pandas as pd
from dotenv import load_dotenv
from generate_categorical_fim_flows import generate_flows
from generate_categorical_fim_flows import (
generate_flows,
__load_thresholds,
)
from generate_categorical_fim_mapping import (
manage_catfim_mapping,
post_process_cat_fim_for_viz,
@@ -88,6 +92,7 @@ def process_generate_categorical_fim(
past_major_interval_cap,
step_num,
nwm_metafile,
threshold_file,
):

# ================================
@@ -127,6 +132,7 @@ def process_generate_categorical_fim(
output_flows_dir = os.path.join(output_catfim_dir, 'flows')
output_mapping_dir = os.path.join(output_catfim_dir, 'mapping')
attributes_dir = os.path.join(output_catfim_dir, 'attributes')
output_thresholds_dir = os.path.join(output_catfim_dir, 'thresholds')

# ================================
set_start_files_folders(
@@ -146,6 +152,7 @@
if file_ext[1].lower() != ".pkl":
raise Exception("The nwm_metadata (-me) file appears to be invalid. The extention is not pkl.")


# ================================
# Define default arguments. Modify these if necessary

@@ -202,6 +209,50 @@
' Verify that you have the correct input folder and if you used the -lh flag that it'
' is a valid matching HUC.'
)


# ================================
if threshold_file != "":
if not os.path.exists(threshold_file):
raise Exception("The threshold input file cannot be found. Please remove or fix pathing.")
file_ext = os.path.splitext(threshold_file)
if file_ext[1] == "":
raise Exception("The threshold input file appears to be invalid. It is missing an extension.")
if file_ext[1].lower() != ".pkl":
raise Exception("The threshold input file appears to be invalid. The extension is not pkl.")

# Read pickle file and get a list of unique HUCs
with open(threshold_file, 'rb') as f:
loaded_data = pickle.load(f)

hucs = loaded_data['huc'].unique().tolist()
threshold_hucs = [str(num).zfill(8) for num in hucs]

# If a HUC list is specified, check that the HUCs in the list are also in the threshold file
if 'all' not in lst_hucs:
missing_hucs = [huc for huc in valid_ahps_hucs if huc not in threshold_hucs]
if missing_hucs:
raise Exception(
f"The following HUCs from the input list are not present in the threshold file ({threshold_file}): "
f"{', '.join(missing_hucs)}"
)
else:
# If 'all' is specified, filter valid_ahps_hucs to only those present in the threshold file and warn about dropped HUCs
filtered_hucs = [huc for huc in valid_ahps_hucs if huc in threshold_hucs]
dropped_huc_lst = list(set(valid_ahps_hucs) - set(filtered_hucs))
if dropped_huc_lst:
FLOG.warning(
f"The following HUCs are present in the FIM run directory but not in the threshold file ({threshold_file}) and will be skipped: "
f"{', '.join(dropped_huc_lst)}"
)
valid_ahps_hucs = filtered_hucs
num_hucs = len(valid_ahps_hucs)
if num_hucs == 0:
raise ValueError(
f'After filtering, the number of valid HUCs compared to the output directory of {fim_run_dir} is zero.'
' Verify that you have the correct input folder and threshold file.'
)

# End of Validation and setup
# ================================
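
The validation above implies that the threshold file is a pickled pandas DataFrame with at least a 'huc' column whose values zero-pad to 8-digit HUC codes. A minimal sketch of building a compatible test file follows; every column other than 'huc' is an illustrative assumption:

import pickle

import pandas as pd

# Only 'huc' is required by the validation above. 'nws_lid', 'magnitude',
# and 'stage' are hypothetical columns shown for illustration.
thresholds_df = pd.DataFrame(
    {
        'huc': [12040101, 19020301],  # zero-padded to 8 digits on load
        'nws_lid': ['abcd1', 'efgh2'],  # hypothetical site codes
        'magnitude': ['action', 'minor'],
        'stage': [10.2, 7.5],
    }
)

with open('threshold_file.pkl', 'wb') as f:
    pickle.dump(thresholds_df, f)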

@@ -222,6 +273,11 @@
FLOG.warning('Listed HUCs not available in FIM run directory:')
FLOG.warning(dropped_huc_lst)

# Print number of available hucs in threshold_file
if threshold_file != "":
FLOG.lprint(f'Threshold file has data for {len(threshold_hucs)} HUC(s)')

# For API usage
load_dotenv(env_file)
API_BASE_URL = os.getenv('API_BASE_URL')
if API_BASE_URL is None:
@@ -276,6 +332,7 @@ def process_generate_categorical_fim(
past_major_interval_cap,
nwm_metafile,
df_restricted_sites,
threshold_file,
)
else:
FLOG.lprint("generate_stage_based_categorical_fim step skipped")
@@ -322,6 +379,7 @@ def process_generate_categorical_fim(
nwm_metafile,
FLOG.LOG_FILE_PATH,
df_restricted_sites,
threshold_file,
)
end = time.time()
elapsed_time = (end - start) / 60
@@ -347,6 +405,34 @@ def process_generate_categorical_fim(
FLOG.lprint("manage_catfim_mapping step skipped")
# end if else

output_pickle_path = os.path.join(output_thresholds_dir, 'thresholds.pkl')
threshold_csv_files = glob.glob(os.path.join(output_thresholds_dir, "*.csv"))

if threshold_file != "":
FLOG.lprint(f"Skipping threshold CSV compilation because supplied threshold file was used.")

else: # Check whether there are saved threshold CSVs and, if so, compile them
if len(threshold_csv_files) == 0:
FLOG.lprint("No threshold CSV files found to compile, threshold.pkl will not be created.")

else:
# Compile saved thresholds data into a single file
all_dataframes = []
for file in threshold_csv_files:
df = pd.read_csv(file)
all_dataframes.append(df)
os.remove(file) # Remove individual CSV after reading
thresholds_df = pd.concat(all_dataframes, ignore_index=True)

try:
with open(output_pickle_path, 'wb') as f:
pickle.dump(thresholds_df, f)
FLOG.lprint(f"Successfully combined {len(threshold_csv_files)} CSVs into {output_pickle_path}")
except Exception as e:
FLOG.lprint(f"Error saving pickle file {output_pickle_path}: {e}")

######
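
Once compiled, thresholds.pkl can be passed back to a later run via the new -tf flag so that WRDS is not queried again. A quick read-back sketch (the file name comes from the code above; the inspection itself is illustrative):

import pickle

with open('thresholds.pkl', 'rb') as f:
    thresholds_df = pickle.load(f)

# e.g. confirm which HUCs the compiled file covers
print(thresholds_df['huc'].unique())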

FLOG.lprint("")

# This is done for SB and FB
@@ -497,6 +583,7 @@ def iterate_through_huc_stage_based(
parent_log_output_file,
child_log_file_prefix,
progress_stmt,
threshold_file,
):
"""_summary_
This and its children will create stage based tifs and catfim data based on a huc
@@ -540,7 +627,7 @@
# -- If necessary files exist, continue -- #
# Yes, each lid gets a record no matter what, so we need some of these messages duplicated
# per lid record
if not os.path.exists(usgs_elev_table):
if not os.path.exists(usgs_elev_table): # TODO: Guam - Will we need to add Guam to this data table? Or include this data in our manual input?
msg = ":Internal Error: Missing key data from HUC record (usgs_elev_table missing)"
# all_messages.append(huc + msg)
MP_LOG.warning(huc + msg)
@@ -625,31 +712,27 @@
MP_LOG.warning(huc_lid_id + msg)
continue

# Get stages and flows for each threshold from the WRDS API. Priority given to USGS calculated flows.
thresholds, flows, threshold_count = get_thresholds(
threshold_url=threshold_url, select_by='nws_lid', selector=lid, threshold='all'
)

# temp debug
# MP_LOG.lprint(f"thresholds are {thresholds}")
# MP_LOG.lprint(f"flows are {flows}")
# Get thresholds from WRDS API or threshold file
thresholds, flows, status_msg = __load_thresholds(output_catfim_dir, threshold_url, lid, huc, threshold_file)

# If no thresholds are found, write message and exit.
# Many sites that used to have 'Error getting thresholds from WRDS API' should now
# have this more descriptive status message
if threshold_count == 0:
msg = ':No thresholds found on WRDS API'
all_messages.append(lid + msg)
MP_LOG.warning(huc_lid_id + msg)
continue
MP_LOG.ltrace(status_msg)

# If there are no thresholds but the threshold_count is greater than 0 or NA (unlikely).
# write message and exit.
# Sept 2025 - Removed threshold_count functionality for now. It was intended to differentiate API errors
# from cases where zero thresholds were available or the wrong thresholds were available, but in
# practice it was confusing.

# Update status if stage thresholds are not found
if thresholds is None or len(thresholds) == 0:
msg = ':Error getting thresholds from WRDS API'
all_messages.append(lid + msg)
MP_LOG.warning(huc_lid_id + msg)
continue
if "WRDS response sucessful." in status_msg:
msg = ':WRDS response sucessful but no stage values available'
all_messages.append(lid + msg)
MP_LOG.warning(huc_lid_id + msg)
continue
else:
msg = ':Error getting stage thresholds from WRDS API'
all_messages.append(lid + msg)
MP_LOG.warning(huc_lid_id + msg)
continue
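
The call above implies a three-value contract from __load_thresholds, which is defined in generate_categorical_fim_flows.py and not shown in this diff. The following is only a sketch of that contract under stated assumptions; the lookup helper, status wording, and use of output_catfim_dir are guesses from the call sites:

from tools_shared_functions import get_thresholds  # assumed import path

# Sketch only, not the real implementation of __load_thresholds.
def load_thresholds_sketch(output_catfim_dir, threshold_url, lid, huc, threshold_file):
    if threshold_file != "":
        # Assumed: select this site's rows from the pickled threshold DataFrame.
        thresholds, flows = lookup_from_pickle(threshold_file, lid, huc)  # hypothetical helper
        status_msg = f"Thresholds for {lid} loaded from file"  # illustrative wording
    else:
        # WRDS path; presumably also saves a per-site CSV under
        # {output_catfim_dir}/thresholds/ for the later thresholds.pkl compilation.
        thresholds, flows, _ = get_thresholds(
            threshold_url=threshold_url, select_by='nws_lid', selector=lid, threshold='all'
        )
        status_msg = "WRDS response sucessful."  # must contain the string checked above
    return thresholds, flows, status_msg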

# Check if stages are supplied, if not write message and exit.
# This message will occur if some thresholds are supplied, but not for the
@@ -670,18 +753,20 @@
if err_msg != "":
# The error message is already formatted correctly
all_messages.append(lid + err_msg)
MP_LOG.warning(huc_lid_id + msg)
MP_LOG.warning(huc_lid_id + err_msg)
continue

# Look for acceptable elevs
acceptable_usgs_elev_df = __create_acceptable_usgs_elev_df(usgs_elev_df, huc_lid_id)
acceptable_usgs_elev_df = __create_acceptable_usgs_elev_df(usgs_elev_df, huc_lid_id)
# TODO: Guam - Do we need to add Guam to this data? Or have it read in this data from the manual inputs?

if acceptable_usgs_elev_df is None or len(acceptable_usgs_elev_df) == 0:
msg = ":Unable to find gage data" # TODO: USGS Gage Method: Update this error message to be more descriptive
all_messages.append(lid + msg)
MP_LOG.warning(huc_lid_id + msg)
continue

# TODO: Guam - Do we need to add Guam to this data? Or have it read in this data from the manual inputs?
# Get the dem_adj_elevation value from usgs_elev_table.csv.
# Prioritize the value that is not from branch 0.
lid_usgs_elev, dem_eval_messages = __adj_dem_evalation_val(
@@ -1588,6 +1673,7 @@ def generate_stage_based_categorical_fim(
past_major_interval_cap,
nwm_metafile,
df_restricted_sites,
threshold_file,
):
'''
Sep 2024,
@@ -1605,11 +1691,8 @@
huc_messages_dir = os.path.join(output_mapping_dir, 'huc_messages')
os.makedirs(huc_messages_dir, exist_ok=True)

FLOG.lprint("Starting generate_flows (Stage Based)")

# If it is stage based, generate flows returns all of these objects.
# If flow based, generate flows returns only

FLOG.lprint("Starting generate_flows (Stage Based)")
# Generate flows is only using one of the incoming job number params
# so let's multiply -jh (huc) and -jn (inundate)
job_flows = job_number_huc * job_number_inundate
@@ -1620,7 +1703,7 @@
# stage based doesn't really need generated flow data
# But for flow based, it really does use it to generate flows.
#
(huc_dictionary, sites_gdf, ___, threshold_url, all_lists, nwm_flows_df, nwm_flows_alaska_df) = (
(huc_dictionary, sites_gdf, ___, threshold_url, all_lists, flows_df_dict) = (
generate_flows(
output_catfim_dir,
nwm_us_search,
@@ -1632,6 +1715,7 @@
nwm_metafile,
str(FLOG.LOG_FILE_PATH),
df_restricted_sites,
threshold_file,
)
)

@@ -1652,7 +1736,14 @@
if huc in lst_hucs:
# FLOG.lprint(f'Generating stage based catfim for : {huc}')

nwm_flows_region_df = nwm_flows_alaska_df if str(huc[:2]) == '19' else nwm_flows_df
if huc[:4] == '2201': # Guam
nwm_flows_region_df = flows_df_dict['nhd_flows_guam_df']
elif huc[:4] == '2203': # American Samoa
nwm_flows_region_df = flows_df_dict['nhd_flows_americansamoa_df']
elif huc[:2] == '19': # Alaska
nwm_flows_region_df = flows_df_dict['nwm_flows_alaska_df']
else: # CONUS + Hawaii + Puerto Rico
nwm_flows_region_df = flows_df_dict['nwm_flows_df']
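
The region selection above is the only place this diff reveals the keys of flows_df_dict, which is built inside generate_flows. If the prefix-to-region mapping needs reuse, an equivalent helper derived purely from that branch might look like:

def flows_key_for_huc(huc: str) -> str:
    """Mirror of the region selection above: map a HUC code to its flows key."""
    if huc[:4] == '2201':  # Guam
        return 'nhd_flows_guam_df'
    if huc[:4] == '2203':  # American Samoa
        return 'nhd_flows_americansamoa_df'
    if huc[:2] == '19':  # Alaska
        return 'nwm_flows_alaska_df'
    return 'nwm_flows_df'  # CONUS + Hawaii + Puerto Rico

# usage: nwm_flows_region_df = flows_df_dict[flows_key_for_huc(huc)]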

progress_stmt = f"index {huc_index + 1} of {num_hucs}"
executor.submit(
Expand All @@ -1671,6 +1762,7 @@ def generate_stage_based_categorical_fim(
str(FLOG.LOG_FILE_PATH),
child_log_file_prefix,
progress_stmt,
threshold_file,
)
huc_index += 1

@@ -1706,6 +1798,8 @@ def generate_stage_based_categorical_fim(
# Write to file
if len(all_csv_df) == 0:
raise Exception(f"no csv files found - missing attribute CSVs in {attributes_dir}")
# TODO: This error currently occurs if no sites are mapped (usually in a test).
# Make a test that catches this earlier and provides a more legible error/warning message.

all_csv_df.to_csv(os.path.join(attributes_dir, 'nws_lid_attributes.csv'), index=False)

@@ -1968,6 +2062,16 @@ def set_start_files_folders(
default="",
)

parser.add_argument(
'-tf',
'--threshold-file',
help='OPTIONAL: If you have a pre-existing threshold file, provide its path here. '
'Providing this manual input will prevent the WRDS API from being queried for thresholds. '
'e.g.: /data/catfim/threshold_file.pkl',
required=False,
default="",
)
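
A minimal, self-contained check of the new flag's parsing; only the flag names and the empty-string default come from this diff, the rest is illustrative:

import argparse

parser = argparse.ArgumentParser()
parser.add_argument('-tf', '--threshold-file', required=False, default="")

args = parser.parse_args(['-tf', '/data/catfim/threshold_file.pkl'])
assert args.threshold_file == '/data/catfim/threshold_file.pkl'

# Omitting -tf leaves the empty-string sentinel that the validation code checks:
assert parser.parse_args([]).threshold_file == ""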

parser.add_argument(
'-cv',
'--catfim-version',