From f4b15dc6db678213f2e606e2be9e56ce8f817201 Mon Sep 17 00:00:00 2001 From: EmilyDeardorff-NOAA Date: Wed, 24 Sep 2025 18:43:14 +0000 Subject: [PATCH 01/13] Add threshold download functionality and scaffold Guam. --- tools/catfim/generate_categorical_fim.py | 138 +++++++++++++++--- .../catfim/generate_categorical_fim_flows.py | 112 ++++++++++++-- .../generate_categorical_fim_mapping.py | 2 +- 3 files changed, 221 insertions(+), 31 deletions(-) diff --git a/tools/catfim/generate_categorical_fim.py b/tools/catfim/generate_categorical_fim.py index d42bedbb8..613739a09 100755 --- a/tools/catfim/generate_categorical_fim.py +++ b/tools/catfim/generate_categorical_fim.py @@ -2,6 +2,7 @@ import argparse import glob +import pickle import math import os import shutil @@ -15,7 +16,10 @@ import numpy as np import pandas as pd from dotenv import load_dotenv -from generate_categorical_fim_flows import generate_flows +from generate_categorical_fim_flows import ( + generate_flows, + __load_thresholds, +) from generate_categorical_fim_mapping import ( manage_catfim_mapping, post_process_cat_fim_for_viz, @@ -88,6 +92,7 @@ def process_generate_categorical_fim( past_major_interval_cap, step_num, nwm_metafile, + threshold_file, ): # ================================ @@ -127,6 +132,7 @@ def process_generate_categorical_fim( output_flows_dir = os.path.join(output_catfim_dir, 'flows') output_mapping_dir = os.path.join(output_catfim_dir, 'mapping') attributes_dir = os.path.join(output_catfim_dir, 'attributes') + output_thresholds_dir = os.path.join(output_catfim_dir, 'thresholds') # ================================ set_start_files_folders( @@ -146,6 +152,7 @@ def process_generate_categorical_fim( if file_ext[1].lower() != ".pkl": raise Exception("The nwm_metadata (-me) file appears to be invalid. The extention is not pkl.") + # ================================ # Define default arguments. Modify these if necessary @@ -202,6 +209,50 @@ def process_generate_categorical_fim( ' Verify that you have the correct input folder and if you used the -lh flag that it' ' is a valid matching HUC.' ) + + + # ================================ + if threshold_file != "": + if os.path.exists(threshold_file) == False: + raise Exception("The threshold input file can not be found. Please remove or fix pathing.") + file_ext = os.path.splitext(threshold_file) + if file_ext.count == 0: + raise Exception("The threshold input file appears to be invalid. It is missing an extension.") + if file_ext[1].lower() != ".pkl": + raise Exception("The threshold input file appears to be invalid. 
The extention is not pkl.") + + # Read pickle file and get a list of unique HUCs + with open(threshold_file, 'rb') as f: + loaded_data = pickle.load(f) + + hucs = loaded_data['huc'].unique().tolist() + threshold_hucs= [str(num).zfill(8) for num in hucs] + + # If a HUC list is specified, check that the HUCs in the list are also in the threshold file + if 'all' not in lst_hucs: + missing_hucs = [huc for huc in valid_ahps_hucs if huc not in threshold_hucs] + if missing_hucs: + raise Exception( + f"The following HUCs from the input list are not present in the threshold file ({threshold_file}): " + f"{', '.join(missing_hucs)}" + ) + else: + # If 'all' is specified, filter valid_ahps_hucs to only those present in the threshold file and warn about dropped HUCs + filtered_hucs = [huc for huc in valid_ahps_hucs if huc in threshold_hucs] + dropped_huc_lst = list(set(valid_ahps_hucs) - set(filtered_hucs)) + if dropped_huc_lst: + FLOG.warning( + f"The following HUCs are present in the FIM run directory but not in the threshold file ({threshold_file}) and will be skipped: " + f"{', '.join(dropped_huc_lst)}" + ) + valid_ahps_hucs = filtered_hucs + num_hucs = len(valid_ahps_hucs) + if num_hucs == 0: + raise ValueError( + f'After filtering, the number of valid HUCs compared to the output directory of {fim_run_dir} is zero.' + ' Verify that you have the correct input folder and threshold file.' + ) + # End of Validation and setup # ================================ @@ -222,6 +273,11 @@ def process_generate_categorical_fim( FLOG.warning('Listed HUCs not available in FIM run directory:') FLOG.warning(dropped_huc_lst) + # Print available hucs in threshold_file ## TEMP DEBUG + if threshold_file != "": ## TEMP DEBUG + FLOG.lprint(f'Threshold file included with data for the following HUCs: {threshold_hucs}') ## TEMP DEBUG + + # For API usage load_dotenv(env_file) API_BASE_URL = os.getenv('API_BASE_URL') if API_BASE_URL is None: @@ -276,6 +332,7 @@ def process_generate_categorical_fim( past_major_interval_cap, nwm_metafile, df_restricted_sites, + threshold_file, ) else: FLOG.lprint("generate_stage_based_categorical_fim step skipped") @@ -322,6 +379,7 @@ def process_generate_categorical_fim( nwm_metafile, FLOG.LOG_FILE_PATH, df_restricted_sites, + threshold_file, ) end = time.time() elapsed_time = (end - start) / 60 @@ -347,6 +405,35 @@ def process_generate_categorical_fim( FLOG.lprint("manage_catfim_mapping step skipped") # end if else + output_pickle_path = os.path.join(output_thresholds_dir, 'thresholds.pkl') + threshold_csv_files = glob.glob(os.path.join(output_thresholds_dir, "*.csv")) + + # Check whether there are saved thresholds and, if so, compile them + if threshold_file != "": + FLOG.lprint(f"Skipping threshold CSV compilation because supplied threshold file was used.") + + else: + if len(threshold_csv_files) == 0: + FLOG.lprint("No threshold CSV files found to compile, threshold.pkl will not be created.") + + else: + # Compile saved thresholds data into a single file + all_dataframes = [] + for file in threshold_csv_files: + df = pd.read_csv(file) + all_dataframes.append(df) + os.remove(file) # Remove individual CSV after reading + thresholds_df = pd.concat(all_dataframes, ignore_index=True) + + try: + with open(output_pickle_path, 'wb') as f: + pickle.dump(thresholds_df, f) + FLOG.lprint(f"Successfully combined {len(threshold_csv_files)} CSVs into {output_pickle_path}") + except Exception as e: + FLOG.lprint(f"Error saving pickle file {output_pickle_path}: {e}") + + ###### + FLOG.lprint("") # This is 
done for SB and FB @@ -497,6 +584,7 @@ def iterate_through_huc_stage_based( parent_log_output_file, child_log_file_prefix, progress_stmt, + threshold_file, ): """_summary_ This and its children will create stage based tifs and catfim data based on a huc @@ -540,7 +628,7 @@ def iterate_through_huc_stage_based( # -- If necessary files exist, continue -- # # Yes, each lid gets a record no matter what, so we need some of these messages duplicated # per lid record - if not os.path.exists(usgs_elev_table): + if not os.path.exists(usgs_elev_table): # TODO: Guam - Will we need to add Guam to this data table? Or include this data in our manual input? msg = ":Internal Error: Missing key data from HUC record (usgs_elev_table missing)" # all_messages.append(huc + msg) MP_LOG.warning(huc + msg) @@ -625,14 +713,10 @@ def iterate_through_huc_stage_based( MP_LOG.warning(huc_lid_id + msg) continue - # Get stages and flows for each threshold from the WRDS API. Priority given to USGS calculated flows. - thresholds, flows, threshold_count = get_thresholds( - threshold_url=threshold_url, select_by='nws_lid', selector=lid, threshold='all' - ) + # Get thresholds from WRDS API or threshold file + thresholds, flows, threshold_count = __load_thresholds(output_catfim_dir, threshold_url, lid, huc, threshold_file) - # temp debug - # MP_LOG.lprint(f"thresholds are {thresholds}") - # MP_LOG.lprint(f"flows are {flows}") + # TODO: Simplify these status messages # If no thresholds are found, write message and exit. # Many sites that used to have 'Error getting thresholds from WRDS API' should now @@ -660,6 +744,7 @@ def iterate_through_huc_stage_based( MP_LOG.warning(huc_lid_id + msg) continue + # Read stage values and calculate thresholds # The error and warning message is already formatted correctly if applicable # Hold the warning_msg to the end @@ -670,11 +755,12 @@ def iterate_through_huc_stage_based( if err_msg != "": # The error message is already formatted correctly all_messages.append(lid + err_msg) - MP_LOG.warning(huc_lid_id + msg) + MP_LOG.warning(huc_lid_id + err_msg) continue # Look for acceptable elevs - acceptable_usgs_elev_df = __create_acceptable_usgs_elev_df(usgs_elev_df, huc_lid_id) + acceptable_usgs_elev_df = __create_acceptable_usgs_elev_df(usgs_elev_df, huc_lid_id) + # TODO: Guam - Do we need to add Guam to this data? Or have it read in this data from the manual inputs? if acceptable_usgs_elev_df is None or len(acceptable_usgs_elev_df) == 0: msg = ":Unable to find gage data" # TODO: USGS Gage Method: Update this error message to be more descriptive @@ -682,6 +768,7 @@ def iterate_through_huc_stage_based( MP_LOG.warning(huc_lid_id + msg) continue + # TODO: Guam - Do we need to add Guam to this data? Or have it read in this data from the manual inputs? # Get the dem_adj_elevation value from usgs_elev_table.csv. # Prioritize the value that is not from branch 0. lid_usgs_elev, dem_eval_messages = __adj_dem_evalation_val( @@ -1588,6 +1675,7 @@ def generate_stage_based_categorical_fim( past_major_interval_cap, nwm_metafile, df_restricted_sites, + threshold_file, ): ''' Sep 2024, @@ -1605,11 +1693,8 @@ def generate_stage_based_categorical_fim( huc_messages_dir = os.path.join(output_mapping_dir, 'huc_messages') os.makedirs(huc_messages_dir, exist_ok=True) - FLOG.lprint("Starting generate_flows (Stage Based)") - - # If it is stage based, generate flows returns all of these objects. 
- # If flow based, generate flows returns only + FLOG.lprint("Starting generate_flows (Stage Based)") # Generate flows is only using one of the incoming job number params # so let's multiply -jh (huc) and -jn (inundate) job_flows = job_number_huc * job_number_inundate @@ -1620,7 +1705,7 @@ def generate_stage_based_categorical_fim( # stage based doesn't really need generated flow data # But for flow based, it really does use it to generate flows. # - (huc_dictionary, sites_gdf, ___, threshold_url, all_lists, nwm_flows_df, nwm_flows_alaska_df) = ( + (huc_dictionary, sites_gdf, ___, threshold_url, all_lists, nwm_flows_df, nwm_flows_alaska_df) = ( # TODO: Guam - Add pacific islands flow df generate_flows( output_catfim_dir, nwm_us_search, @@ -1632,6 +1717,7 @@ def generate_stage_based_categorical_fim( nwm_metafile, str(FLOG.LOG_FILE_PATH), df_restricted_sites, + threshold_file, ) ) @@ -1652,7 +1738,12 @@ def generate_stage_based_categorical_fim( if huc in lst_hucs: # FLOG.lprint(f'Generating stage based catfim for : {huc}') - nwm_flows_region_df = nwm_flows_alaska_df if str(huc[:2]) == '19' else nwm_flows_df + if huc[:2] == '19': # Alaska + nwm_flows_region_df = nwm_flows_alaska_df + elif huc[:2] == '22': # Pacific Islands + nwm_flows_region_df = nwm_flows_df # TODO: Guam - change this to nwm_flows_pacific_df when available + else: # CONUS + Hawaii + Puerto Rico + nwm_flows_region_df = nwm_flows_df progress_stmt = f"index {huc_index + 1} of {num_hucs}" executor.submit( @@ -1671,6 +1762,7 @@ def generate_stage_based_categorical_fim( str(FLOG.LOG_FILE_PATH), child_log_file_prefix, progress_stmt, + threshold_file, ) huc_index += 1 @@ -1706,6 +1798,8 @@ def generate_stage_based_categorical_fim( # Write to file if len(all_csv_df) == 0: raise Exception(f"no csv files found - missing attribute CSVs in {attributes_dir}") + # TODO: This error currently occurs if no sites are mapped (usually in a test). + # Make a test that catches this earlier and provides a more legible error/warning message. all_csv_df.to_csv(os.path.join(attributes_dir, 'nws_lid_attributes.csv'), index=False) @@ -1968,6 +2062,16 @@ def set_start_files_folders( default="", ) + parser.add_argument( + '-tf', + '--threshold-file', + help='OPTIONAL: If you have a pre-existing threshold file, you can path to it here. ' + 'Providing this manual input will prevent the WRDS API from being queried for thresholds.' + ' e.g.: /data/catfim/threshold_file.pkl', + required=False, + default="", + ) + parser.add_argument( '-cv', '--catfim-version', diff --git a/tools/catfim/generate_categorical_fim_flows.py b/tools/catfim/generate_categorical_fim_flows.py index 1772ea51a..75646442f 100755 --- a/tools/catfim/generate_categorical_fim_flows.py +++ b/tools/catfim/generate_categorical_fim_flows.py @@ -2,6 +2,7 @@ import argparse import copy +import csv import glob import os import pickle @@ -65,6 +66,8 @@ def generate_flows_for_huc( parent_log_output_file, child_log_file_prefix, df_restricted_sites, + output_catfim_dir, + threshold_file, ): try: @@ -143,11 +146,16 @@ def generate_flows_for_huc( # Careful, for "all_message.append" the syntax into it must be f'{lid}: (whever messages) # this is gets parsed and logic used against it. 
- # for Stage based, is uses stage values from threshold data supplied by WRDS - # but here (for flow), it uses the values from the flows data from WRDS - stages, flows, threshold_count = get_thresholds( - threshold_url=threshold_url, select_by='nws_lid', selector=lid, threshold='all' - ) + # # TODO: Guam - insert processing for if thresholds are already downloaded (similar to NWM metafile processing) + + # # for Stage based, is uses stage values from threshold data supplied by WRDS + # # but here (for flow), it uses the values from the flows data from WRDS + # stages, flows, threshold_count = get_thresholds( + # threshold_url=threshold_url, select_by='nws_lid', selector=lid, threshold='all' + # ) + + stages, flows, threshold_count = __load_thresholds(output_catfim_dir, threshold_url, lid, huc, threshold_file) + # Yes.. stages, we will handle missing flows later even though we don't use the stage here if stages is None or len(stages) == 0: @@ -374,6 +382,7 @@ def generate_flows( nwm_metafile, log_output_file, df_restricted_sites, + threshold_file, ): # TODO; Most docstrings like this are now very outdated and need updating @@ -429,12 +438,14 @@ def generate_flows( start_dt = datetime.now(timezone.utc) # Open NWM flows geopackages - # TODO: Jul 2025: These shoudl be changed to a bash_variable or something other than + # TODO: Jul 2025: These should be changed to a bash_variable or something other than # hardcoding. Granted.. we don't have new ones but might someday nwm_flows_gpkg = r'/data/inputs/nwm_hydrofabric/nwm_flows.gpkg' nwm_flows_df = gpd.read_file(nwm_flows_gpkg) nwm_flows_alaska_gpkg = r'/data/inputs/nwm_hydrofabric/nwm_flows_alaska_nwmV3_ID.gpkg' nwm_flows_alaska_df = gpd.read_file(nwm_flows_alaska_gpkg) + # nwm_flows_pacific_gpkg = '' # TODO: Guam - Route this correctly r'/data/inputs/nwm_hydrofabric/nwm_flows_pacific_nwmV3_ID.gpkg' + # nwm_flows_pacific_df = gpd.read_file(nwm_flows_pacific_gpkg) # nwm_metafile might be an empty string # maybe ensure all projections are changed to one standard output of 3857 (see shared_variables) as the come out @@ -443,7 +454,7 @@ def generate_flows( # Filter the meta list to just HUCs in the fim run output or huc if sent in as a param all_meta_lists = __load_nwm_metadata( output_catfim_dir, metadata_url, nwm_us_search, nwm_ds_search, nwm_metafile - ) + ) # TODO: Guam - will use nwm_metafile option end_dt = datetime.now(timezone.utc) time_duration = end_dt - start_dt @@ -485,7 +496,7 @@ def generate_flows( threshold_url, all_meta_lists, nwm_flows_df, - nwm_flows_alaska_df, + nwm_flows_alaska_df,# TODO: Guam - Add pacific islands flow df ) # only flow based needs the "flow" dir @@ -501,10 +512,12 @@ def generate_flows( with ProcessPoolExecutor(max_workers=job_number_huc) as executor: for huc in huc_dictionary: - # nwm_flows_region_df = nwm_flows_df # To exclude Alaska - nwm_flows_region_df = ( - nwm_flows_alaska_df if huc[:2] == '19' else nwm_flows_df - ) # To include Alaska + if huc[:2] == '19': # Alaska + nwm_flows_region_df = nwm_flows_alaska_df + elif huc[:2] == '22': # Pacific Islands + nwm_flows_region_df = nwm_flows_df # TODO: Guam - change this to nwm_flows_pacific_df when available + else: # CONUS + Hawaii + Puerto Rico + nwm_flows_region_df = nwm_flows_df # Deep copy that speed up Multi-Proc a little as all_meta_lists # is a huge object. 
Need to figure out how to filter that down somehow @@ -523,6 +536,8 @@ def generate_flows( log_output_file, child_log_file_prefix, df_restricted_sites, + output_catfim_dir, + threshold_file, ) # end ProcessPoolExecutor @@ -650,6 +665,68 @@ def generate_flows( FLOG.lprint(f"End Wrapping up flows generation Duration: {str(all_time_duration).split('.')[0]}") print() +def __load_thresholds(output_catfim_dir, threshold_url, lid, huc, threshold_file): + + if os.path.isfile(threshold_file) == True: + FLOG.lprint(f"Threshold file already downloaded and exists at {threshold_file}") # TODO: Guam - this option will be used + + # Read pickle file and get the stages and flows dictionary for the site + with open(threshold_file, 'rb') as f: + loaded_data = pickle.load(f) + site_data = loaded_data[loaded_data['nws_lid'] == lid.upper()] # TODO: Check whether we need an upper or lower case conversion + + # Error if site_data is empty + if site_data.empty: + FLOG.error(f"No threshold data found for LID {lid} in the provided threshold file.") + return None, None, 0 + + # Make output dictionaries for stages and flows + # Assuming there's only one record per threshold_type per lid + stages = site_data.loc[site_data['threshold_type'] == 'stages'].to_dict(orient='records')[0] + del stages['threshold_type'] + del stages['huc'] + + flows = site_data.loc[site_data['threshold_type'] == 'flows'].to_dict(orient='records')[0] + del flows['threshold_type'] + del flows['huc'] + + # Print out the stages and flows for debugging + FLOG.lprint(f"Stages for LID {lid}: {stages}") ## TEMP DEBUG + FLOG.lprint(f"Flows for LID {lid}: {flows}") ## TEMP DEBUG + + # Count how many thresholds are not None in stages + threshold_count = 1 ## TEMP DEBUG #sum(1 for key in stages if key in ['action', 'minor', 'moderate', 'major', 'record'] and stages[key] is not None) + # TODO: Guam - Decide what I want to do about the threshold count... remove or fix? + + else: + # Get thresholds from the WRDS API + stages, flows, threshold_count = get_thresholds( + threshold_url=threshold_url, select_by='nws_lid', selector=lid, threshold='all' + ) + + # Save thresholds to output_thresholds_dir (if the file of that LID doesn't already exist) + output_thresholds_dir = os.path.join(output_catfim_dir, "thresholds") + if not os.path.exists(output_thresholds_dir): + os.mkdir(output_thresholds_dir) + + lid_thresholds_csv = os.path.join(output_thresholds_dir, f"thresholds_{lid.lower()}.csv") + + if not os.path.isfile(lid_thresholds_csv): + thresholds = [{'threshold_type': 'stages', 'huc': huc, **stages}, # TODO: Add HUC + {'threshold_type': 'flows', 'huc': huc, **flows}] # TODO: Add HUC + + with open(lid_thresholds_csv, mode='w', newline='') as file: + writer = csv.DictWriter(file, fieldnames=thresholds[1].keys()) + writer.writeheader() + writer.writerows(thresholds) + + + # TODO: In CatFIM post-processing, add a functionality to append all those files together into one thresholds.pkl + # so we can rerun CatFIM without hitting the WRDS API (like we did with __load_nwm_metadata) + # Will need to double check that the memory usage isn't insane with this new idea. + + + return stages, flows, threshold_count # local script calls __load_nwm_metadata so FLOG is already setup def __load_nwm_metadata(output_catfim_dir, metadata_url, nwm_us_search, nwm_ds_search, nwm_metafile): @@ -662,7 +739,7 @@ def __load_nwm_metadata(output_catfim_dir, metadata_url, nwm_us_search, nwm_ds_s # WRDS unless we need a smaller or modified version. This one likely has all nws_lid data. 
if os.path.isfile(nwm_metafile) == True: - FLOG.lprint(f"Meta file already downloaded and exists at {nwm_metafile}") + FLOG.lprint(f"Meta file already downloaded and exists at {nwm_metafile}") # TODO: Guam - this option will be used with open(nwm_metafile, "rb") as p_handle: output_meta_list = pickle.load(p_handle) @@ -819,6 +896,15 @@ def __load_nwm_metadata(output_catfim_dir, metadata_url, nwm_us_search, nwm_ds_s default="", ) + parser.add_argument( + '-tf', + '--threshold-file', + help='OPTIONAL: Path to the pre-made pickle file that already holds the thresholds', + required=False, + type=str, + default="", + ) + args = vars(parser.parse_args()) # Run get_env_paths and static_flow_lids diff --git a/tools/catfim/generate_categorical_fim_mapping.py b/tools/catfim/generate_categorical_fim_mapping.py index 3c808909a..b29b9cbbe 100755 --- a/tools/catfim/generate_categorical_fim_mapping.py +++ b/tools/catfim/generate_categorical_fim_mapping.py @@ -663,7 +663,7 @@ def post_process_huc( # There may not necessarily be an attributes.csv for this lid, depending on how flow processing went # lots of lids fall out in the attributes or flow steps. if os.path.exists(nws_lid_attributes_filename) == False: - MP_LOG.warning(f"{ahps_lid} has no attributes file which may perfectly fine.") + MP_LOG.warning(f"{ahps_lid} has no attributes file (which may be perfectly fine)") continue # We are going to do an MP in MP. From 45df3e62e6d383066fadfa047e052892bcff89ad Mon Sep 17 00:00:00 2001 From: EmilyDeardorff-NOAA Date: Thu, 25 Sep 2025 18:08:34 +0000 Subject: [PATCH 02/13] Add functionality to handle multiple flow files better. --- tools/catfim/generate_categorical_fim.py | 23 +++--- .../catfim/generate_categorical_fim_flows.py | 80 ++++++++++++------- 2 files changed, 62 insertions(+), 41 deletions(-) diff --git a/tools/catfim/generate_categorical_fim.py b/tools/catfim/generate_categorical_fim.py index 613739a09..dfb58e8ff 100755 --- a/tools/catfim/generate_categorical_fim.py +++ b/tools/catfim/generate_categorical_fim.py @@ -273,9 +273,9 @@ def process_generate_categorical_fim( FLOG.warning('Listed HUCs not available in FIM run directory:') FLOG.warning(dropped_huc_lst) - # Print available hucs in threshold_file ## TEMP DEBUG - if threshold_file != "": ## TEMP DEBUG - FLOG.lprint(f'Threshold file included with data for the following HUCs: {threshold_hucs}') ## TEMP DEBUG + # Print number of available hucs in threshold_file + if threshold_file != "": + FLOG.lprint(f'Threshold file has data for {len(threshold_hucs)} HUC(s)') # For API usage load_dotenv(env_file) @@ -408,11 +408,10 @@ def process_generate_categorical_fim( output_pickle_path = os.path.join(output_thresholds_dir, 'thresholds.pkl') threshold_csv_files = glob.glob(os.path.join(output_thresholds_dir, "*.csv")) - # Check whether there are saved thresholds and, if so, compile them if threshold_file != "": FLOG.lprint(f"Skipping threshold CSV compilation because supplied threshold file was used.") - else: + else: # Check whether there are saved threshold CSVs and, if so, compile them if len(threshold_csv_files) == 0: FLOG.lprint("No threshold CSV files found to compile, threshold.pkl will not be created.") @@ -1705,7 +1704,7 @@ def generate_stage_based_categorical_fim( # stage based doesn't really need generated flow data # But for flow based, it really does use it to generate flows. 
# - (huc_dictionary, sites_gdf, ___, threshold_url, all_lists, nwm_flows_df, nwm_flows_alaska_df) = ( # TODO: Guam - Add pacific islands flow df + (huc_dictionary, sites_gdf, ___, threshold_url, all_lists, flows_df_dict) = ( generate_flows( output_catfim_dir, nwm_us_search, @@ -1738,12 +1737,14 @@ def generate_stage_based_categorical_fim( if huc in lst_hucs: # FLOG.lprint(f'Generating stage based catfim for : {huc}') - if huc[:2] == '19': # Alaska - nwm_flows_region_df = nwm_flows_alaska_df - elif huc[:2] == '22': # Pacific Islands - nwm_flows_region_df = nwm_flows_df # TODO: Guam - change this to nwm_flows_pacific_df when available + if huc[:4] == '2201': # Guam + nwm_flows_region_df = flows_df_dict['nhd_flows_guam_df'] + elif huc[:4] == '2203': # American Samoa + nwm_flows_region_df = flows_df_dict['nhd_flows_americansamoa_df'] + elif huc[:2] == '19': # Alaska + nwm_flows_region_df = flows_df_dict['nwm_flows_alaska_df'] else: # CONUS + Hawaii + Puerto Rico - nwm_flows_region_df = nwm_flows_df + nwm_flows_region_df = flows_df_dict['nwm_flows_df'] progress_stmt = f"index {huc_index + 1} of {num_hucs}" executor.submit( diff --git a/tools/catfim/generate_categorical_fim_flows.py b/tools/catfim/generate_categorical_fim_flows.py index 75646442f..d4dcae66c 100755 --- a/tools/catfim/generate_categorical_fim_flows.py +++ b/tools/catfim/generate_categorical_fim_flows.py @@ -62,7 +62,7 @@ def generate_flows_for_huc( output_flows_dir, attributes_dir, huc_messages_dir, - nwm_flows_df, + flows_df_dict, #nwm_flows_df, # TODO: Do we need to update this to be region specific df? or the dict of dfs? parent_log_output_file, child_log_file_prefix, df_restricted_sites, @@ -146,18 +146,10 @@ def generate_flows_for_huc( # Careful, for "all_message.append" the syntax into it must be f'{lid}: (whever messages) # this is gets parsed and logic used against it. - # # TODO: Guam - insert processing for if thresholds are already downloaded (similar to NWM metafile processing) - - # # for Stage based, is uses stage values from threshold data supplied by WRDS - # # but here (for flow), it uses the values from the flows data from WRDS - # stages, flows, threshold_count = get_thresholds( - # threshold_url=threshold_url, select_by='nws_lid', selector=lid, threshold='all' - # ) - stages, flows, threshold_count = __load_thresholds(output_catfim_dir, threshold_url, lid, huc, threshold_file) - # Yes.. stages, we will handle missing flows later even though we don't use the stage here + # TODO: Decide if this is still good status messaging, maybe update if stages is None or len(stages) == 0: msg = ':Error getting stages values from WRDS API' all_messages.append(lid + msg) @@ -200,7 +192,18 @@ def generate_flows_for_huc( desired_order = metadata['nwm_feature_data']['stream_order'] # Filter segments to be of like stream order. 
- segments = filter_nwm_segments_by_stream_order(unfiltered_segments, desired_order, nwm_flows_df) + # TODO: Guam - test this update + if huc[:4] == '2201': # Guam + nwm_flows_region_df = flows_df_dict['nhd_flows_guam_df'] + elif huc[:4] == '2203': # American Samoa + nwm_flows_region_df = flows_df_dict['nhd_flows_americansamoa_df'] + elif huc[:2] == '19': # Alaska + nwm_flows_region_df = flows_df_dict['nwm_flows_alaska_df'] + else: # CONUS + Hawaii + Puerto Rico + nwm_flows_region_df = flows_df_dict['nwm_flows_df'] + + segments = filter_nwm_segments_by_stream_order(unfiltered_segments, desired_order, nwm_flows_region_df) + # Previous input was nwm_flows_df, but now it is region specific df (9/25/25) # If there are no segments, write message and exit out if not segments or len(segments) == 0: @@ -425,8 +428,11 @@ def generate_flows( API_BASE_URL, WBD_LAYER = get_env_paths(env_file) nwm_us_search = int(nwm_us_search) nwm_ds_search = int(nwm_ds_search) - metadata_url = f'{API_BASE_URL}/metadata' - threshold_url = f'{API_BASE_URL}/nws_threshold' + # metadata_url = f'{API_BASE_URL}/metadata' ## TEMP DEBUG - TODO: Guam - add back in after testing + # threshold_url = f'{API_BASE_URL}/nws_threshold' ## TEMP DEBUG - TODO: Guam - add back in after testing + + metadata_url = f'{API_BASE_URL}/metadataBROKEN' ## TEMP DEBUG + threshold_url = f'{API_BASE_URL}/nws_thresholdBROKEN' ## TEMP DEBUG ################################################################### # Create HUC message directory to store messages that will be read and joined after multiprocessing @@ -438,14 +444,27 @@ def generate_flows( start_dt = datetime.now(timezone.utc) # Open NWM flows geopackages - # TODO: Jul 2025: These should be changed to a bash_variable or something other than - # hardcoding. Granted.. 
we don't have new ones but might someday - nwm_flows_gpkg = r'/data/inputs/nwm_hydrofabric/nwm_flows.gpkg' + # TODO: Pull from bash_variables.env once we switch from using catfim.env to bash_variables.env + nwm_flows_gpkg = r'/data/inputs/nwm_hydrofabric/nwm_flows.gpkg' # TODO: Get from bash_variables.env + nwm_flows_alaska_gpkg = r'/data/inputs/nwm_hydrofabric/nwm_flows_alaska_nwmV3_ID.gpkg' # TODO: Get from bash_variables.env + input_nhd_flows_Guam = r'/data/inputs/nhdplus/Guam_6637/NHDFlowline_Guam_6637.gpkg' # TODO: Get from bash_variables.env + input_nhd_flows_AmericanSamoa = r'/data/inputs/nhdplus/AmericanSamoa_32702/NHDFlowline_AmericanSamoa_32702.gpkg' # TODO: Get from bash_variables.env + nwm_flows_df = gpd.read_file(nwm_flows_gpkg) - nwm_flows_alaska_gpkg = r'/data/inputs/nwm_hydrofabric/nwm_flows_alaska_nwmV3_ID.gpkg' nwm_flows_alaska_df = gpd.read_file(nwm_flows_alaska_gpkg) - # nwm_flows_pacific_gpkg = '' # TODO: Guam - Route this correctly r'/data/inputs/nwm_hydrofabric/nwm_flows_pacific_nwmV3_ID.gpkg' - # nwm_flows_pacific_df = gpd.read_file(nwm_flows_pacific_gpkg) + nhd_flows_guam_df = gpd.read_file(input_nhd_flows_Guam) + nhd_flows_americansamoa_df = gpd.read_file(input_nhd_flows_AmericanSamoa) + + # TODO: Guam - pass this dictionary into functions instead of all four dfs + + # Add the dfs to a dictionary for easy access later + flows_df_dict = { + 'nwm_flows_df': nwm_flows_df, + 'nwm_flows_alaska_df': nwm_flows_alaska_df, + 'nhd_flows_guam_df': nhd_flows_guam_df, + 'nhd_flows_americansamoa_df': nhd_flows_americansamoa_df + } + # nwm_metafile might be an empty string # maybe ensure all projections are changed to one standard output of 3857 (see shared_variables) as the come out @@ -495,8 +514,7 @@ def generate_flows( metadata_url, threshold_url, all_meta_lists, - nwm_flows_df, - nwm_flows_alaska_df,# TODO: Guam - Add pacific islands flow df + flows_df_dict, ) # only flow based needs the "flow" dir @@ -512,12 +530,14 @@ def generate_flows( with ProcessPoolExecutor(max_workers=job_number_huc) as executor: for huc in huc_dictionary: - if huc[:2] == '19': # Alaska - nwm_flows_region_df = nwm_flows_alaska_df - elif huc[:2] == '22': # Pacific Islands - nwm_flows_region_df = nwm_flows_df # TODO: Guam - change this to nwm_flows_pacific_df when available + if huc[:4] == '2201': # Guam + nwm_flows_region_df = flows_df_dict['nhd_flows_guam_df'] + elif huc[:4] == '2203': # American Samoa + nwm_flows_region_df = flows_df_dict['nhd_flows_americansamoa_df'] + elif huc[:2] == '19': # Alaska + nwm_flows_region_df = flows_df_dict['nwm_flows_alaska_df'] else: # CONUS + Hawaii + Puerto Rico - nwm_flows_region_df = nwm_flows_df + nwm_flows_region_df = flows_df_dict['nwm_flows_df'] # Deep copy that speed up Multi-Proc a little as all_meta_lists # is a huge object. 
Need to figure out how to filter that down somehow @@ -668,7 +688,7 @@ def generate_flows( def __load_thresholds(output_catfim_dir, threshold_url, lid, huc, threshold_file): if os.path.isfile(threshold_file) == True: - FLOG.lprint(f"Threshold file already downloaded and exists at {threshold_file}") # TODO: Guam - this option will be used + # FLOG.lprint(f"Threshold file already downloaded and exists at {threshold_file}") # TODO: Guam - this option will be used # Read pickle file and get the stages and flows dictionary for the site with open(threshold_file, 'rb') as f: @@ -690,9 +710,9 @@ def __load_thresholds(output_catfim_dir, threshold_url, lid, huc, threshold_file del flows['threshold_type'] del flows['huc'] - # Print out the stages and flows for debugging - FLOG.lprint(f"Stages for LID {lid}: {stages}") ## TEMP DEBUG - FLOG.lprint(f"Flows for LID {lid}: {flows}") ## TEMP DEBUG + # # Print out the stages and flows for debugging ## TEMP DEBUG + # FLOG.lprint(f"Stages for LID {lid}: {stages}") ## TEMP DEBUG + # FLOG.lprint(f"Flows for LID {lid}: {flows}") ## TEMP DEBUG # Count how many thresholds are not None in stages threshold_count = 1 ## TEMP DEBUG #sum(1 for key in stages if key in ['action', 'minor', 'moderate', 'major', 'record'] and stages[key] is not None) From 92739d76b7115c23d2eff1b814185a88db97f0a1 Mon Sep 17 00:00:00 2001 From: EmilyDeardorff-NOAA Date: Thu, 25 Sep 2025 18:52:27 +0000 Subject: [PATCH 03/13] Update status messaging for get_thresholds function. --- data/nws/preprocess_ahps_nws.py | 2 +- data/usgs/preprocess_ahps_usgs.py | 2 +- tools/catfim/generate_categorical_fim.py | 31 ++++++++++--------- .../catfim/generate_categorical_fim_flows.py | 12 +++---- tools/tools_shared_functions.py | 22 +++++++++---- 5 files changed, 41 insertions(+), 28 deletions(-) diff --git a/data/nws/preprocess_ahps_nws.py b/data/nws/preprocess_ahps_nws.py index 503048de2..42feaad45 100644 --- a/data/nws/preprocess_ahps_nws.py +++ b/data/nws/preprocess_ahps_nws.py @@ -110,7 +110,7 @@ def preprocess_nws(source_dir, destination, reference_raster): # In multiple instances a USGS ID is given but then no USGS rating curve or in some cases no USGS datum is supplied. select_by = 'nws_lid' selector = code - stages, flows, threshold_count = get_thresholds(threshold_url, select_by, selector, threshold='all') + stages, flows, ___ = get_thresholds(threshold_url, select_by, selector, threshold='all') # Make sure at least one valid threshold is supplied from WRDS. threshold_categories = ['action', 'minor', 'moderate', 'major'] diff --git a/data/usgs/preprocess_ahps_usgs.py b/data/usgs/preprocess_ahps_usgs.py index ff0e02221..d5b480218 100755 --- a/data/usgs/preprocess_ahps_usgs.py +++ b/data/usgs/preprocess_ahps_usgs.py @@ -231,7 +231,7 @@ def preprocess_usgs(source_dir, destination, reference_raster): select_by = 'nws_lid' selector = code - stages, flows, threshold_count = get_thresholds(threshold_url, select_by, selector, threshold='all') + stages, flows, ___ = get_thresholds(threshold_url, select_by, selector, threshold='all') # Make sure at least one valid threshold is supplied from WRDS. 
threshold_categories = ['action', 'minor', 'moderate', 'major'] diff --git a/tools/catfim/generate_categorical_fim.py b/tools/catfim/generate_categorical_fim.py index dfb58e8ff..3909e3e5c 100755 --- a/tools/catfim/generate_categorical_fim.py +++ b/tools/catfim/generate_categorical_fim.py @@ -713,20 +713,23 @@ def iterate_through_huc_stage_based( continue # Get thresholds from WRDS API or threshold file - thresholds, flows, threshold_count = __load_thresholds(output_catfim_dir, threshold_url, lid, huc, threshold_file) - - # TODO: Simplify these status messages - - # If no thresholds are found, write message and exit. - # Many sites that used to have 'Error getting thresholds from WRDS API' should now - # have this more descriptive status message - if threshold_count == 0: - msg = ':No thresholds found on WRDS API' - all_messages.append(lid + msg) - MP_LOG.warning(huc_lid_id + msg) - continue - - # If there are no thresholds but the threshold_count is greater than 0 or NA (unlikely). + thresholds, flows, status_msg = __load_thresholds(output_catfim_dir, threshold_url, lid, huc, threshold_file) + + MP_LOG.ltrace(status_msg) # TODO: Make this into a more specific site status? + # We need a status that says, 'API call worked but the correct thresholds were not available' + + # Sept 2025 - Removed threshold_count functionality for now. It had the intended functionality of differentiating API errors + # versus zero thresholds being available versus the incorrect thresholds being available, but I think + # it was actually confusing. + + # Count the number of stages available + # if threshold_count == 0: + # msg = ':No thresholds found on WRDS API' + # all_messages.append(lid + msg) + # MP_LOG.warning(huc_lid_id + msg) + # continue + + # If there are no thresholds. # write message and exit. if thresholds is None or len(thresholds) == 0: msg = ':Error getting thresholds from WRDS API' diff --git a/tools/catfim/generate_categorical_fim_flows.py b/tools/catfim/generate_categorical_fim_flows.py index d4dcae66c..eb630bef5 100755 --- a/tools/catfim/generate_categorical_fim_flows.py +++ b/tools/catfim/generate_categorical_fim_flows.py @@ -146,7 +146,9 @@ def generate_flows_for_huc( # Careful, for "all_message.append" the syntax into it must be f'{lid}: (whever messages) # this is gets parsed and logic used against it. - stages, flows, threshold_count = __load_thresholds(output_catfim_dir, threshold_url, lid, huc, threshold_file) + stages, flows, status_msg = __load_thresholds(output_catfim_dir, threshold_url, lid, huc, threshold_file) + + MP_LOG.trace(status_msg) # TODO: Guam - decide how I want this status communicated # Yes.. stages, we will handle missing flows later even though we don't use the stage here # TODO: Decide if this is still good status messaging, maybe update @@ -714,13 +716,11 @@ def __load_thresholds(output_catfim_dir, threshold_url, lid, huc, threshold_file # FLOG.lprint(f"Stages for LID {lid}: {stages}") ## TEMP DEBUG # FLOG.lprint(f"Flows for LID {lid}: {flows}") ## TEMP DEBUG - # Count how many thresholds are not None in stages - threshold_count = 1 ## TEMP DEBUG #sum(1 for key in stages if key in ['action', 'minor', 'moderate', 'major', 'record'] and stages[key] is not None) - # TODO: Guam - Decide what I want to do about the threshold count... remove or fix? + status_msg = 'Thresholds loaded from .pkl file.' 
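+        # A minimal sketch of the assumed threshold-file layout (inferred from the reader
+        # above and from the per-LID CSV writer in the WRDS branch below; the column set and
+        # the values shown here are illustrative only, not from the original patch):
+        #   nws_lid  threshold_type  huc       action  minor  moderate  major  record  units
+        #   XXXX1    stages          01010001  5.0     10.0   14.0      18.0   21.0    FT
+        #   XXXX1    flows           01010001  1200.0  3000.0 5200.0    8000.0 11000.0 CFS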
else: # Get thresholds from the WRDS API - stages, flows, threshold_count = get_thresholds( + stages, flows, status_msg = get_thresholds( threshold_url=threshold_url, select_by='nws_lid', selector=lid, threshold='all' ) @@ -746,7 +746,7 @@ def __load_thresholds(output_catfim_dir, threshold_url, lid, huc, threshold_file # Will need to double check that the memory usage isn't insane with this new idea. - return stages, flows, threshold_count + return stages, flows, status_msg # local script calls __load_nwm_metadata so FLOG is already setup def __load_nwm_metadata(output_catfim_dir, metadata_url, nwm_us_search, nwm_ds_search, nwm_metafile): diff --git a/tools/tools_shared_functions.py b/tools/tools_shared_functions.py index 3525f849e..d98a2eb37 100755 --- a/tools/tools_shared_functions.py +++ b/tools/tools_shared_functions.py @@ -1058,14 +1058,17 @@ def get_thresholds(threshold_url, select_by, selector, threshold='all'): Dictionary of stages at each threshold. flows : DICT Dictionary of flows at each threshold. - threshold_count : INT - Number of thresholds available for the site. + status_msg : STR + Status of API call and data availability. ''' params = {} params['threshold'] = threshold url = f'{threshold_url}/{select_by}/{selector}' + # Initialize status message + status_msg = f"Selector: {selector}: " + # response = requests.get(url, params=params, verify=False) # Call the API @@ -1081,9 +1084,12 @@ def get_thresholds(threshold_url, select_by, selector, threshold='all'): if response.status_code == 200: thresholds_json = response.json() + # Get metadata thresholds_info = thresholds_json['value_set'] threshold_count = thresholds_json['_metrics']['threshold_count'] + status_msg += f"WRDS response sucessful. {threshold_count} threshold types available. " + # Initialize stages/flows dictionaries stages = {} flows = {} @@ -1104,6 +1110,8 @@ def get_thresholds(threshold_url, select_by, selector, threshold='all'): threshold_data = thresholds_info[0] # Get stages and flows for each threshold if threshold_data: + status_msg += "Thresholds available. " + stages = threshold_data['stage_values'] flows = threshold_data['calc_flow_values'] # Add source information to stages and flows. Flows source inside a nested dictionary. Remove key once source assigned to flows. @@ -1120,12 +1128,14 @@ def get_thresholds(threshold_url, select_by, selector, threshold='all'): flows['usgs_site_code'] = threshold_data.get('metadata').get('usgs_site_code') stages['units'] = threshold_data.get('metadata').get('stage_units') flows['units'] = threshold_data.get('metadata').get('calc_flow_units') - return stages, flows, threshold_count + return stages, flows, status_msg else: - print("WRDS response error: ") - + status_msg += "WRDS response error." + print(status_msg) + stages = None + flows = None -# print(response) + return stages, flows, status_msg ######################################################################## From e469cd78d2d15a45904d1e658057854247a1f486 Mon Sep 17 00:00:00 2001 From: EmilyDeardorff-NOAA Date: Thu, 25 Sep 2025 19:03:21 +0000 Subject: [PATCH 04/13] Update WRDS API call status messing for flow CatFIM. 
--- .../catfim/generate_categorical_fim_flows.py | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/tools/catfim/generate_categorical_fim_flows.py b/tools/catfim/generate_categorical_fim_flows.py index eb630bef5..ebaf51084 100755 --- a/tools/catfim/generate_categorical_fim_flows.py +++ b/tools/catfim/generate_categorical_fim_flows.py @@ -148,17 +148,21 @@ def generate_flows_for_huc( stages, flows, status_msg = __load_thresholds(output_catfim_dir, threshold_url, lid, huc, threshold_file) - MP_LOG.trace(status_msg) # TODO: Guam - decide how I want this status communicated - - # Yes.. stages, we will handle missing flows later even though we don't use the stage here - # TODO: Decide if this is still good status messaging, maybe update - if stages is None or len(stages) == 0: - msg = ':Error getting stages values from WRDS API' - all_messages.append(lid + msg) - MP_LOG.warning(huc_lid_id + msg) - continue + MP_LOG.lprint(status_msg) # TEMP DEBUG + + # Update status if flows are not found + if flows is None or len(flows) == 0: # Changed to flows Sept' 25 + if "WRDS response sucessful." in status_msg: + msg = ':WRDS response sucessful but no flow values available' + all_messages.append(lid + msg) + MP_LOG.warning(huc_lid_id + msg) + continue - # MP_LOG.lprint(f"Thresholds for {huc_lid_id} are : {thresholds}") + else: + msg = ':Error getting flows values from WRDS API' + all_messages.append(lid + msg) + MP_LOG.warning(huc_lid_id + msg) + continue # Check if stages are supplied, if not write message and exit. if all(stages.get(category, None) is None for category in categories): From 671633436045b07e93c768f3e1bb1afd681536eb Mon Sep 17 00:00:00 2001 From: EmilyDeardorff-NOAA Date: Thu, 25 Sep 2025 19:32:46 +0000 Subject: [PATCH 05/13] Updated WRDS status messaging for stage-based. --- tools/catfim/generate_categorical_fim.py | 28 ++++++++++-------------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/tools/catfim/generate_categorical_fim.py b/tools/catfim/generate_categorical_fim.py index 3909e3e5c..4c12e3abd 100755 --- a/tools/catfim/generate_categorical_fim.py +++ b/tools/catfim/generate_categorical_fim.py @@ -715,27 +715,24 @@ def iterate_through_huc_stage_based( # Get thresholds from WRDS API or threshold file thresholds, flows, status_msg = __load_thresholds(output_catfim_dir, threshold_url, lid, huc, threshold_file) - MP_LOG.ltrace(status_msg) # TODO: Make this into a more specific site status? - # We need a status that says, 'API call worked but the correct thresholds were not available' + MP_LOG.ltrace(status_msg) # Sept 2025 - Removed threshold_count functionality for now. It had the intended functionality of differentiating API errors # versus zero thresholds being available versus the incorrect thresholds being available, but I think # it was actually confusing. - # Count the number of stages available - # if threshold_count == 0: - # msg = ':No thresholds found on WRDS API' - # all_messages.append(lid + msg) - # MP_LOG.warning(huc_lid_id + msg) - # continue - - # If there are no thresholds. - # write message and exit. + # Update status if stage thresholds are not found if thresholds is None or len(thresholds) == 0: - msg = ':Error getting thresholds from WRDS API' - all_messages.append(lid + msg) - MP_LOG.warning(huc_lid_id + msg) - continue + if "WRDS response sucessful." 
in status_msg: + msg = ':WRDS response sucessful but no stage values available' + all_messages.append(lid + msg) + MP_LOG.warning(huc_lid_id + msg) + continue + else: + msg = ':Error getting stage thresholds from WRDS API' + all_messages.append(lid + msg) + MP_LOG.warning(huc_lid_id + msg) + continue # Check if stages are supplied, if not write message and exit. # This message will occur if some thresholds are supplied, but not for the @@ -746,7 +743,6 @@ def iterate_through_huc_stage_based( MP_LOG.warning(huc_lid_id + msg) continue - # Read stage values and calculate thresholds # The error and warning message is already formatted correctly if applicable # Hold the warning_msg to the end From fa768129e29328554f09c25062d48526c5f9e327 Mon Sep 17 00:00:00 2001 From: EmilyDeardorff-NOAA Date: Mon, 29 Sep 2025 21:09:29 +0000 Subject: [PATCH 06/13] Add a function to predownload all threshold data. --- .../catfim/generate_categorical_fim_flows.py | 3 +- tools/tools_shared_functions.py | 113 ++++++++++++++++++ 2 files changed, 114 insertions(+), 2 deletions(-) diff --git a/tools/catfim/generate_categorical_fim_flows.py b/tools/catfim/generate_categorical_fim_flows.py index ebaf51084..1b26a2feb 100755 --- a/tools/catfim/generate_categorical_fim_flows.py +++ b/tools/catfim/generate_categorical_fim_flows.py @@ -157,7 +157,6 @@ def generate_flows_for_huc( all_messages.append(lid + msg) MP_LOG.warning(huc_lid_id + msg) continue - else: msg = ':Error getting flows values from WRDS API' all_messages.append(lid + msg) @@ -186,7 +185,7 @@ def generate_flows_for_huc( ) # Sept 2024: Should we skip these functions that are seen in stage based? Yes - # Flow doesn't need all of the evalation stuff + # Flow doesn't need all of the elevation stuff # acceptable_usgs_elev_df = __create_acceptable_usgs_elev_df(usgs_elev_df, huc_lid_id) # lid_usgs_elev, dem_eval_messages = __adj_dem_evalation_val( # lid_altitude = metadata['usgs_data']['altitude'] diff --git a/tools/tools_shared_functions.py b/tools/tools_shared_functions.py index d98a2eb37..6c5f8608e 100755 --- a/tools/tools_shared_functions.py +++ b/tools/tools_shared_functions.py @@ -5,6 +5,7 @@ import json import logging import os +import csv import pathlib import traceback from pathlib import Path @@ -12,6 +13,7 @@ import geopandas as gpd import numpy as np import pandas as pd +import pickle import rasterio import rasterio.crs import rasterio.shutil @@ -1137,6 +1139,117 @@ def get_thresholds(threshold_url, select_by, selector, threshold='all'): return stages, flows, status_msg +def download_all_thresholds(threshold_url, metadata_pkl_file, output_folder): + """ + Download all thresholds from the WRDS API for a list of LIDs and save them as CSV files. + Combine all CSV files into a single pickle file. + + This function can be run in a Jupyter notebook or as a standalone script in a Python environment + to predownload thresholds for all sites in the metadata pickle file. + + Parameters: + - threshold_url: str, URL of the WRDS API endpoint for thresholds. + - metadata_pkl_file: str, path to the metadata pickle file containing site information. + - output_folder: str, path to the folder where output files will be saved. + + Returns: + - None + + Outputs: + - CSV files for each LID in a subfolder 'threshold_download' within the output_folder. + - A combined pickle file 'all_thresholds.pkl' containing all thresholds. 
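+
+    Example (a sketch only; the paths below are placeholders, and threshold_url is normally
+    built from API_BASE_URL in the CatFIM .env file as f'{API_BASE_URL}/nws_threshold'):
+        download_all_thresholds(
+            threshold_url='https://<API_BASE_URL>/nws_threshold',
+            metadata_pkl_file='/data/catfim/nwm_metafile.pkl',
+            output_folder='/data/catfim/outputs',
+        )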
+ """ + + # Check that the inputs exist + if not os.path.exists(metadata_pkl_file): + raise FileNotFoundError(f"Metadata pickle file not found: {metadata_pkl_file}") + + if not os.path.exists(output_folder): + raise FileNotFoundError(f"Output folder not found: {output_folder}") + + # Make an intermediate folder in the output folder + thresholds_folder = os.path.join(output_folder, 'threshold_download') + try: + os.makedirs(thresholds_folder, exist_ok=True) + except Exception as e: + raise OSError(f"Error creating directory {thresholds_folder}: {e}") + + # Get list of all LIDs and their corresponding HUC from the metadata.pkl file + with open(metadata_pkl_file, 'rb') as f: + metadata = pickle.load(f) + + print('Start threshold downloads.') + + lid_list = [] + huc_lid_dict = {} + + # Iterate through the metadata to get a list of LIDs and HUCs + for site_entry in metadata: + lid_i = site_entry['identifiers']['nws_lid'] + + huc_nws_i = site_entry['nws_preferred']['huc'] + huc_usgs_i = site_entry['usgs_preferred']['huc'] + + huc_i = huc_usgs_i if huc_nws_i is None else huc_nws_i + + lid_list.append(lid_i) + huc_lid_dict[lid_i] = huc_i + + print('HUC/LID list assembled.') + + # Iterate through LIDs in huc_lid_dict and get thresholds from the WRDS API + for lid, huc in huc_lid_dict.items(): + + try: + stages, flows, status = get_thresholds( + threshold_url=threshold_url, select_by='nws_lid', selector=lid, threshold='all' + ) + except Exception as e: + print(f"Error retrieving thresholds for LID {lid}: {e}") + print(status) + continue + + # Specify the CSV file name and path + csv_filename = f"thresholds_{lid.lower()}.csv" + csv_filepath = os.path.join(thresholds_folder, csv_filename) + + # Combine and label thresholds + thresholds = [{'threshold_type': 'stages', 'huc': huc, **stages}, + {'threshold_type': 'flows', 'huc': huc, **flows}] + + # Open the CSV file and write threshold data + with open(csv_filepath, mode='w', newline='') as file: + writer = csv.DictWriter(file, fieldnames=thresholds[1].keys()) + writer.writeheader() + writer.writerows(thresholds) + + output_pickle_path = os.path.join(thresholds_folder, 'all_thresholds.pkl') + + print('Saved threshold CSVs for sites.') + + + # Get all threshold CSVs + csv_files = glob.glob(os.path.join(thresholds_folder, "*.csv")) + + # Compile thresholds into a dataframe + all_dataframes = [] + for file in csv_files: + df = pd.read_csv(file) + all_dataframes.append(df) + os.remove(file) # Remove individual CSV after reading + + + thresholds_df = pd.concat(all_dataframes, ignore_index=True) + + # Save the combined DataFrame to a pickle file + try: + with open(output_pickle_path, 'wb') as f: + pickle.dump(thresholds_df, f) + print(f"Successfully combined {len(csv_files)} CSVs into {output_pickle_path}") + except Exception as e: + print(f"Error saving pickle file {output_pickle_path}: {e}") + + print('Thresholds compilation complete.') ######################################################################## # Function to write flow file From b9f5a4e238973b66c3c966891dc5ea79b02b6b84 Mon Sep 17 00:00:00 2001 From: EmilyDeardorff-NOAA Date: Tue, 7 Oct 2025 23:11:54 +0000 Subject: [PATCH 07/13] Update tools. 
--- tools/tools_shared_functions.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/tools/tools_shared_functions.py b/tools/tools_shared_functions.py index 6c5f8608e..6a7e37743 100755 --- a/tools/tools_shared_functions.py +++ b/tools/tools_shared_functions.py @@ -6,6 +6,7 @@ import logging import os import csv +import glob import pathlib import traceback from pathlib import Path @@ -1177,9 +1178,9 @@ def download_all_thresholds(threshold_url, metadata_pkl_file, output_folder): # Get list of all LIDs and their corresponding HUC from the metadata.pkl file with open(metadata_pkl_file, 'rb') as f: metadata = pickle.load(f) - - print('Start threshold downloads.') + print('Assembling HUC/LID list from metadata file.') + lid_list = [] huc_lid_dict = {} @@ -1195,11 +1196,11 @@ def download_all_thresholds(threshold_url, metadata_pkl_file, output_folder): lid_list.append(lid_i) huc_lid_dict[lid_i] = huc_i - print('HUC/LID list assembled.') + print('Start threshold downloads.') + print(f'Total sites to download: {len(lid_list)}') # Iterate through LIDs in huc_lid_dict and get thresholds from the WRDS API for lid, huc in huc_lid_dict.items(): - try: stages, flows, status = get_thresholds( threshold_url=threshold_url, select_by='nws_lid', selector=lid, threshold='all' @@ -1223,10 +1224,9 @@ def download_all_thresholds(threshold_url, metadata_pkl_file, output_folder): writer.writeheader() writer.writerows(thresholds) - output_pickle_path = os.path.join(thresholds_folder, 'all_thresholds.pkl') - print('Saved threshold CSVs for sites.') + output_pickle_path = os.path.join(thresholds_folder, 'all_thresholds.pkl') # Get all threshold CSVs csv_files = glob.glob(os.path.join(thresholds_folder, "*.csv")) @@ -1238,7 +1238,6 @@ def download_all_thresholds(threshold_url, metadata_pkl_file, output_folder): all_dataframes.append(df) os.remove(file) # Remove individual CSV after reading - thresholds_df = pd.concat(all_dataframes, ignore_index=True) # Save the combined DataFrame to a pickle file From ee0042a7ee07f4bbe3fd10530670353b1bbefe20 Mon Sep 17 00:00:00 2001 From: EmilyDeardorff-NOAA Date: Thu, 9 Oct 2025 19:46:43 +0000 Subject: [PATCH 08/13] Adjust CatFIM to get Guam processing correct. --- tools/catfim/generate_categorical_fim.py | 132 ++++++++++-------- .../catfim/generate_categorical_fim_flows.py | 20 ++- 2 files changed, 84 insertions(+), 68 deletions(-) diff --git a/tools/catfim/generate_categorical_fim.py b/tools/catfim/generate_categorical_fim.py index 4c12e3abd..e5af3e5d4 100755 --- a/tools/catfim/generate_categorical_fim.py +++ b/tools/catfim/generate_categorical_fim.py @@ -210,8 +210,11 @@ def process_generate_categorical_fim( ' is a valid matching HUC.' ) + # Set default data source to WRDS + data_source = 'WRDS' # ================================ + if threshold_file != "": if os.path.exists(threshold_file) == False: raise Exception("The threshold input file can not be found. 
Please remove or fix pathing.") @@ -226,8 +229,11 @@ def process_generate_categorical_fim( loaded_data = pickle.load(f) hucs = loaded_data['huc'].unique().tolist() - threshold_hucs= [str(num).zfill(8) for num in hucs] + threshold_hucs= [str(num).zfill(8) for num in hucs] + # Get the source (since it might be Manual_Input) + data_source = loaded_data['source'].tolist()[0] + # If a HUC list is specified, check that the HUCs in the list are also in the threshold file if 'all' not in lst_hucs: missing_hucs = [huc for huc in valid_ahps_hucs if huc not in threshold_hucs] @@ -277,6 +283,8 @@ def process_generate_categorical_fim( if threshold_file != "": FLOG.lprint(f'Threshold file has data for {len(threshold_hucs)} HUC(s)') + FLOG.lprint(f'Data source: {data_source}') # TEMP DEBUG + # For API usage load_dotenv(env_file) API_BASE_URL = os.getenv('API_BASE_URL') @@ -293,8 +301,6 @@ def process_generate_categorical_fim( if not os.path.exists(fim_inputs_csv_path): raise ValueError(f"{fim_inputs_csv_path} not found. Verify that you have the correct input files.") - # print() - # FLOG.lprint("Filtering out HUCs that do not have related ahps site in them.") # valid_ahps_hucs = __filter_hucs_to_ahps(lst_hucs) @@ -333,6 +339,7 @@ def process_generate_categorical_fim( nwm_metafile, df_restricted_sites, threshold_file, + data_source, ) else: FLOG.lprint("generate_stage_based_categorical_fim step skipped") @@ -584,6 +591,7 @@ def iterate_through_huc_stage_based( child_log_file_prefix, progress_stmt, threshold_file, + data_source, ): """_summary_ This and its children will create stage based tifs and catfim data based on a huc @@ -627,7 +635,8 @@ def iterate_through_huc_stage_based( # -- If necessary files exist, continue -- # # Yes, each lid gets a record no matter what, so we need some of these messages duplicated # per lid record - if not os.path.exists(usgs_elev_table): # TODO: Guam - Will we need to add Guam to this data table? Or include this data in our manual input? + + if data_source != 'Manual_Input' and not os.path.exists(usgs_elev_table): msg = ":Internal Error: Missing key data from HUC record (usgs_elev_table missing)" # all_messages.append(huc + msg) MP_LOG.warning(huc + msg) @@ -642,7 +651,8 @@ def iterate_through_huc_stage_based( categories = ['action', 'minor', 'moderate', 'major', 'record'] if skip_lid_process == False: # else skip to message processing - usgs_elev_df = pd.read_csv(usgs_elev_table) + if data_source != 'Manual_Input': # Manual input data does not need usgs_elev_table + usgs_elev_df = pd.read_csv(usgs_elev_table) df_cols = { "nws_lid": pd.Series(dtype='str'), @@ -694,8 +704,6 @@ def iterate_through_huc_stage_based( found_restrict_lid = df_restricted_sites.loc[df_restricted_sites['nws_lid'] == lid.upper()] - # print(found_restrict_lid) - # Assume only one rec for now, fix later if len(found_restrict_lid) > 0: reason = found_restrict_lid.iloc[ @@ -714,13 +722,8 @@ def iterate_through_huc_stage_based( # Get thresholds from WRDS API or threshold file thresholds, flows, status_msg = __load_thresholds(output_catfim_dir, threshold_url, lid, huc, threshold_file) + MP_LOG.trace(status_msg) - MP_LOG.ltrace(status_msg) - - # Sept 2025 - Removed threshold_count functionality for now. It had the intended functionality of differentiating API errors - # versus zero thresholds being available versus the incorrect thresholds being available, but I think - # it was actually confusing. 
- # Update status if stage thresholds are not found if thresholds is None or len(thresholds) == 0: if "WRDS response sucessful." in status_msg: @@ -756,29 +759,6 @@ def iterate_through_huc_stage_based( MP_LOG.warning(huc_lid_id + err_msg) continue - # Look for acceptable elevs - acceptable_usgs_elev_df = __create_acceptable_usgs_elev_df(usgs_elev_df, huc_lid_id) - # TODO: Guam - Do we need to add Guam to this data? Or have it read in this data from the manual inputs? - - if acceptable_usgs_elev_df is None or len(acceptable_usgs_elev_df) == 0: - msg = ":Unable to find gage data" # TODO: USGS Gage Method: Update this error message to be more descriptive - all_messages.append(lid + msg) - MP_LOG.warning(huc_lid_id + msg) - continue - - # TODO: Guam - Do we need to add Guam to this data? Or have it read in this data from the manual inputs? - # Get the dem_adj_elevation value from usgs_elev_table.csv. - # Prioritize the value that is not from branch 0. - lid_usgs_elev, dem_eval_messages = __adj_dem_evalation_val( - acceptable_usgs_elev_df, lid, huc_lid_id - ) - all_messages = all_messages + dem_eval_messages - if len(dem_eval_messages) > 0: - continue - - # Initialize nested dict for lid attributes - stage_based_att_dict.update({lid: {}}) - # Find lid metadata from master list of metadata dictionaries. metadata = next( (item for item in all_lists if item['identifiers']['nws_lid'] == lid.upper()), False @@ -790,27 +770,63 @@ def iterate_through_huc_stage_based( MP_LOG.warning(huc_lid_id + msg) continue - # Filter out sites that don't have "good" data ## TODO: USGS Gage Method: It doens't seem like the below error messages are performing as expected.... - try: - if not metadata['usgs_data']['alt_method_code'] in acceptable_alt_meth_code_list: - MP_LOG.warning(f"{huc_lid_id}: Not in acceptable alt method codes") + if data_source != 'Manual_Input': # Check elevation data if not manual input + + # Look for acceptable elevations + acceptable_usgs_elev_df = __create_acceptable_usgs_elev_df(usgs_elev_df, huc_lid_id) + + if acceptable_usgs_elev_df is None or len(acceptable_usgs_elev_df) == 0: + msg = ":Unable to find gage data" # TODO: USGS Gage Method: Update this error message to be more descriptive + all_messages.append(lid + msg) + MP_LOG.warning(huc_lid_id + msg) continue - if not metadata['usgs_data']['site_type'] in acceptable_site_type_list: - MP_LOG.warning(f"{huc_lid_id}: Not in acceptable site type codes") + + # Get the dem_adj_elevation value from usgs_elev_table.csv. + # Prioritize the value that is not from branch 0. + lid_usgs_elev, dem_eval_messages = __adj_dem_evalation_val( + acceptable_usgs_elev_df, lid, huc_lid_id + ) + all_messages = all_messages + dem_eval_messages + if len(dem_eval_messages) > 0: continue - if not float(metadata['usgs_data']['alt_accuracy_code']) <= acceptable_alt_acc_thresh: - MP_LOG.warning(f"{huc_lid_id}: Not in acceptable threshold range") + + # Filter out sites that don't have "good" data ## TODO: USGS Gage Method: It doens't seem like the below error messages are performing as expected.... 
+ try: + if not metadata['usgs_data']['alt_method_code'] in acceptable_alt_meth_code_list: + MP_LOG.warning(f"{huc_lid_id}: Not in acceptable alt method codes") + continue + if not metadata['usgs_data']['site_type'] in acceptable_site_type_list: + MP_LOG.warning(f"{huc_lid_id}: Not in acceptable site type codes") + continue + if not float(metadata['usgs_data']['alt_accuracy_code']) <= acceptable_alt_acc_thresh: + MP_LOG.warning(f"{huc_lid_id}: Not in acceptable threshold range") + continue + except Exception: + MP_LOG.error(f"{huc_lid_id}: Filtering out 'bad' data in the usgs data") + MP_LOG.error(traceback.format_exc()) continue - except Exception: - MP_LOG.error(f"{huc_lid_id}: Filtering out 'bad' data in the usgs data") - MP_LOG.error(traceback.format_exc()) - continue - datum_adj_ft, datum_messages = __adjust_datum_ft(flows, metadata, lid, huc_lid_id) - all_messages = all_messages + datum_messages - if datum_adj_ft is None: - MP_LOG.warning(f"{huc_lid_id}: datum_adj_ft is None") - continue + ## TODO: Guam - not running this for Guam, may need to adjust if results are bad + # Adjust datum of HAND grid based on elevation data from usgs_elev_table.csv. + datum_adj_ft, datum_messages = __adjust_datum_ft(flows, metadata, lid, huc_lid_id) + all_messages = all_messages + datum_messages + if datum_adj_ft is None: + MP_LOG.warning(f"{huc_lid_id}: datum_adj_ft is None") + continue + + else: # if source is manual input, we skip the above elevation filtering + MP_LOG.lprint(f"{huc_lid_id}: Skipping elevation checks and datum adjustment for Manual Input source") + + lid_altitude = float(lid_altitude) # LID altitude is expected to be in meters + lid_usgs_elev = (lid_altitude * 0.3048) + # lid_altitude is now in meters to match non-manual input units # TODO: Automate conversion? + + datum_adj_ft = 0 # no datum adjustment for manual input # TODO: Confirm that this is correct + + ## TODO: Guam - is there anything else we need to do to the altitude for manual input? + + # Initialize nested dict for lid attributes + stage_based_att_dict.update({lid: {}}) # Get mainstem segments of LID by intersecting LID segments with known mainstem segments. unfiltered_segments = list(set(get_nwm_segs(metadata))) @@ -829,7 +845,8 @@ def iterate_through_huc_stage_based( continue # Check for large discrepancies between the elevation values from WRDS and HAND. - # Otherwise this causes bad mapping. + # Otherwise this causes bad mapping. + # Manual_Input will have no elev disparity because it's from the the same value. elevation_diff = lid_usgs_elev - (lid_altitude * 0.3048) diff_rounded = round(elevation_diff, 2) @@ -860,9 +877,6 @@ def iterate_through_huc_stage_based( # For each flood category / magnitude MP_LOG.lprint(f"{huc_lid_id}: About to process flood categories") - # child_log_file_prefix_tifs = MP_LOG.MP_calc_prefix_name( - # MP_LOG.LOG_FILE_PATH, child_log_file_prefix + "_MP_tifs" - # ) # Make mapping lid_directory. mapping_lid_directory = os.path.join(mapping_huc_directory, lid) @@ -1392,6 +1406,8 @@ def load_restricted_sites(is_stage_based): def __adjust_datum_ft(flows, metadata, lid, huc_lid_id): + # The purpose of this function is to determine the vertical datum adjustment + # to convert the datum of the rating curve to NAVD88. # TODO: Aug 2024: This whole parts needs revisiting. Lots of lid data has changed and this # is all likely very old. 
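A minimal sketch of the Manual_Input elevation handling added in this hunk, assuming lid_altitude arrives in feet (the 0.3048 factor is the standard feet-to-meters conversion); manual_input_elevation is a hypothetical helper used only to illustrate the unit conversion and the zero datum adjustment, not part of the patch.

FT_TO_M = 0.3048

def manual_input_elevation(lid_altitude_ft):
    # Convert the reported altitude to meters so it matches the units used by the
    # non-manual path, and skip the datum adjustment entirely for manual input.
    lid_usgs_elev = float(lid_altitude_ft) * FT_TO_M
    datum_adj_ft = 0
    return lid_usgs_elev, datum_adj_ft

lid_usgs_elev, datum_adj_ft = manual_input_elevation(42.5)
# The later disparity check computes lid_usgs_elev - (lid_altitude * 0.3048),
# which is exactly 0.0 here, so Manual_Input sites never trip that warning.
assert lid_usgs_elev - 42.5 * FT_TO_M == 0.0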
@@ -1674,6 +1690,7 @@ def generate_stage_based_categorical_fim( nwm_metafile, df_restricted_sites, threshold_file, + data_source, ): ''' Sep 2024, @@ -1763,6 +1780,7 @@ def generate_stage_based_categorical_fim( child_log_file_prefix, progress_stmt, threshold_file, + data_source, ) huc_index += 1 diff --git a/tools/catfim/generate_categorical_fim_flows.py b/tools/catfim/generate_categorical_fim_flows.py index 1b26a2feb..bc97c4785 100755 --- a/tools/catfim/generate_categorical_fim_flows.py +++ b/tools/catfim/generate_categorical_fim_flows.py @@ -433,11 +433,8 @@ def generate_flows( API_BASE_URL, WBD_LAYER = get_env_paths(env_file) nwm_us_search = int(nwm_us_search) nwm_ds_search = int(nwm_ds_search) - # metadata_url = f'{API_BASE_URL}/metadata' ## TEMP DEBUG - TODO: Guam - add back in after testing - # threshold_url = f'{API_BASE_URL}/nws_threshold' ## TEMP DEBUG - TODO: Guam - add back in after testing - - metadata_url = f'{API_BASE_URL}/metadataBROKEN' ## TEMP DEBUG - threshold_url = f'{API_BASE_URL}/nws_thresholdBROKEN' ## TEMP DEBUG + metadata_url = f'{API_BASE_URL}/metadata' + threshold_url = f'{API_BASE_URL}/nws_threshold' ################################################################### # Create HUC message directory to store messages that will be read and joined after multiprocessing @@ -450,10 +447,10 @@ def generate_flows( # Open NWM flows geopackages # TODO: Pull from bash_variables.env once we switch from using catfim.env to bash_variables.env - nwm_flows_gpkg = r'/data/inputs/nwm_hydrofabric/nwm_flows.gpkg' # TODO: Get from bash_variables.env - nwm_flows_alaska_gpkg = r'/data/inputs/nwm_hydrofabric/nwm_flows_alaska_nwmV3_ID.gpkg' # TODO: Get from bash_variables.env - input_nhd_flows_Guam = r'/data/inputs/nhdplus/Guam_6637/NHDFlowline_Guam_6637.gpkg' # TODO: Get from bash_variables.env - input_nhd_flows_AmericanSamoa = r'/data/inputs/nhdplus/AmericanSamoa_32702/NHDFlowline_AmericanSamoa_32702.gpkg' # TODO: Get from bash_variables.env + nwm_flows_gpkg = r'/data/inputs/nwm_hydrofabric/nwm_flows.gpkg' + nwm_flows_alaska_gpkg = r'/data/inputs/nwm_hydrofabric/nwm_flows_alaska_nwmV3_ID.gpkg' + input_nhd_flows_Guam = r'/home/emily.deardorff/projects/catfim_guam/input_data/NHDFlowline_Guam_6637.gpkg' # TODO: GUAM - move to correct inputs folder + input_nhd_flows_AmericanSamoa = r'/home/emily.deardorff/projects/catfim_guam/input_data/NHDFlowline_AmericanSamoa_32702.gpkg' # TODO: GUAM - move to correct inputs folder nwm_flows_df = gpd.read_file(nwm_flows_gpkg) nwm_flows_alaska_df = gpd.read_file(nwm_flows_alaska_gpkg) @@ -469,7 +466,6 @@ def generate_flows( 'nhd_flows_guam_df': nhd_flows_guam_df, 'nhd_flows_americansamoa_df': nhd_flows_americansamoa_df } - # nwm_metafile might be an empty string # maybe ensure all projections are changed to one standard output of 3857 (see shared_variables) as the come out @@ -501,7 +497,9 @@ def generate_flows( # Drop list fields if invalid out_gdf = out_gdf.drop(['downstream_nwm_features'], axis=1, errors='ignore') out_gdf = out_gdf.drop(['upstream_nwm_features'], axis=1, errors='ignore') - out_gdf = out_gdf.astype({'metadata_sources': str}) + + if 'metadata_sources' in out_gdf.columns: # TODO: Is this column needed/used? 
Changed to accomodate Guam + out_gdf = out_gdf.astype({'metadata_sources': str}) FLOG.lprint("+++++++++++++") FLOG.lprint("Start Flow Generation") From 24c427fb5fc35b590fa326f519959f90992723d3 Mon Sep 17 00:00:00 2001 From: EmilyDeardorff-NOAA Date: Thu, 16 Oct 2025 22:39:59 +0000 Subject: [PATCH 09/13] Add docstrings to all functions. --- tools/catfim/generate_categorical_fim.py | 504 ++++++++++++++---- .../catfim/generate_categorical_fim_flows.py | 212 ++++++-- .../generate_categorical_fim_mapping.py | 368 +++++++++++-- 3 files changed, 898 insertions(+), 186 deletions(-) diff --git a/tools/catfim/generate_categorical_fim.py b/tools/catfim/generate_categorical_fim.py index e5af3e5d4..e5aca8094 100755 --- a/tools/catfim/generate_categorical_fim.py +++ b/tools/catfim/generate_categorical_fim.py @@ -29,7 +29,6 @@ filter_nwm_segments_by_stream_order, get_datum, get_nwm_segs, - get_thresholds, ngvd_to_navd_ft, ) from tools_shared_variables import ( @@ -43,11 +42,6 @@ import utils.fim_logger as fl from utils.shared_variables import VIZ_PROJECTION - -# from itertools import repeat -# from pathlib import Path - - # global RLOG FLOG = fl.FIM_logger() # the non mp version MP_LOG = fl.FIM_logger() # the Multi Proc version @@ -55,18 +49,21 @@ gpd.options.io_engine = "pyogrio" -# TODO: Aug 2024: This script was upgraded significantly with lots of misc TODO's embedded. -# Lots of inline documenation needs updating as well - - """ -Jun 17, 2024 +Jun 2024 This system is continuing to mature over time. It has a number of optimizations that can still be applied in areas such as logic, performance and error handling. In the interium there is still a consider amount of debug lines and tools embedded in that can be commented on/off as required. +Aug 2024 +This script was upgraded significantly with lots of misc TODO's embedded. +Lots of inline documenation needs updating as well. + +Oct 2025 +Doc strings and improved documentation was added. + NOTE: For now.. all logs roll up to the parent log file. ie) catfim_2024_07_09-22-20-12.log This creates a VERY large final log file, but the warnings and errors file should be manageable. @@ -94,13 +91,80 @@ def process_generate_categorical_fim( nwm_metafile, threshold_file, ): - - # ================================ - # Step System - # This system allows us to to skip steps. - # Steps that are skipped are assumed to have the valid files that are needed - # When a number is submitted, ie) 2, it means skip steps 1 and start at 2 ''' + Orchestrates the generation of CatFIM products for a set of Hydrologic Unit Codes (HUCs), + supporting both stage-based and flow-based methodologies. Handles validation, setup, filtering, and multi-step processing + including flow generation, mapping, post-processing, and status updates. + + Parameters + ---------- + fim_run_dir : str + Path to the FIM run directory containing HUC folders and input files. + env_file : str + Path to the .env file containing API and environment configuration. + job_number_huc : int + Number of parallel jobs to use for HUC-level processing. + job_number_inundate : int + Number of parallel jobs to use for inundation-level processing. + is_stage_based : bool + If True, runs stage-based CatFIM workflow; otherwise, runs flow-based workflow. + output_folder : str + Base output folder for CatFIM results. + overwrite : bool + If True, allows overwriting existing output files and folders. + search : int or float + Upstream and downstream search distance in miles for site selection. 
+ lst_hucs : str + Space-separated list of HUCs to process, or 'all' to process all available HUCs. + catfim_version : str + CatFIM version string (e.g., '1.0'). + model_version : str + HAND model version string (e.g., '2.1.5.2'). + job_number_intervals : int + Number of parallel jobs for interval-based processing. + past_major_interval_cap : int + Cap for major interval processing (used in stage-based workflow). + step_num : int + Step number to start processing from (optional, allows skipping earlier steps). + nwm_metafile : str + Path to the NWM metadata pickle file (optional, defaults to "" if not included). + threshold_file : str + Path to the threshold pickle file for manual input thresholds (optional, defaults to "" if not included). + + Raises + ------ + Exception + If required files or directories are missing or invalid. + ValueError + If input parameters are inconsistent or result in zero valid HUCs. + + Returns + ------- + None + Results are written to output directories and files; function does not return a value. + + Workflow Steps + ------------- + 1. Validation and setup of input directories, files, and parameters. + 2. Filtering and selection of valid HUCs based on input and threshold files. + 3. Stage-based or flow-based CatFIM processing, including flow generation, mapping, and post-processing. + 4. Compilation of threshold data and cleanup of intermediate files. + 5. Updating mapping status for processed sites. + 6. Logging of progress, warnings, and summary information. + + Notes + ----- + - Supports skipping steps via the `step_num` parameter. + - Handles both manual and automated threshold input via `threshold_file`. + - Uses environment variables for API access and configuration. + - Designed for parallel processing and scalable workflows. + + Step System + ----------- + This system allows us to to skip steps. + Steps that are skipped are assumed to have the valid files that are needed + When a number is submitted, ie) 2, it means skip steps 1 and start at 2 + Step number usage: 0 = cover all (it is changed to 999 so all steps are covered) flow: @@ -111,6 +175,7 @@ def process_generate_categorical_fim( 1 = start at generate_flows and tifs 2 = start at creation of gpkgs 3 = start at update mapping status + ''' # ================================ @@ -467,12 +532,28 @@ def process_generate_categorical_fim( def get_list_ahps_with_library_gpkgs(output_mapping_dir): + ''' + Used in both stage- and flow-based CatFIM. + + Scans the specified output mapping directory for GeoPackage (.gpkg) files within the 'gpkg' subdirectory, + extracts unique AHPS IDs from the filenames, and returns a list of these IDs. + + The function assumes that the AHPS ID is the second segment in the filename when split by underscores. + Only files with at least two underscore-separated segments in their names are considered. + + Args: + output_mapping_dir (str): Path to the directory containing the 'gpkg' subdirectory with .gpkg files. + + Returns: + ahps_ids_with_gpkgs (list): A list of unique AHPS IDs (as strings) extracted from the .gpkg filenames. + + Used to check whether AHPS LID is 5 characters, but no longer does (as of Aug '25) + because LID lengths above 5 characters are probably invalid but we are not checking that here. 
+ ''' - # as it is a set it will dig out unique files ahps_ids_with_gpkgs = [] - # gpkg_file_names = file_pattern = os.path.join(output_mapping_dir, "gpkg") + '/*.gpkg' - # print(file_pattern) + for file_path in glob.glob(file_pattern): file_name = os.path.basename(file_path) file_name_segs = file_name.split("_") @@ -480,25 +561,45 @@ def get_list_ahps_with_library_gpkgs(output_mapping_dir): continue ahps_id = file_name_segs[1] - # if len(ahps_id) == 5: 7/17/25 - Removed this logic - # because LID lengths above 5 characters are probably - # invalid but we are not checking that here. - if ahps_id not in ahps_ids_with_gpkgs: ahps_ids_with_gpkgs.append(ahps_id) return ahps_ids_with_gpkgs -# This is used by both Stage Based and Flow Based def update_sites_mapping_status(output_mapping_dir, catfim_sites_file_path, catfim_version, model_version): ''' - Overview: - - Gets a list of valid ahps that have at least one gkpg file. If we have at least one, then the site mapped something - - We use that list compared to the original sites gpkg (or csv) file name to update the rows for the mapped column - By this point, most shoudl have had status messages until something failed in inundation or creating the gpkg. - - We also use a convention where if a status messsage starts with ---, then remove the ---. It is reserved for showing - that some magnitudes exists and some failed. + Used in both stage- and flow-based CatFIM. + + Updates the mapping status and status messages for CatFIM sites based on the presence of valid inundation GeoPackage files. + + This function reads a GeoPackage or CSV file containing CatFIM site information, checks which sites have valid inundation + mapping outputs, and updates the 'mapped' and 'status' columns accordingly. Gets a list of valid ahps that have at least + one gkpg file. If we have at least one, then the site mapped something. + + It also adds 'model_version' and 'product_version' columns, and saves the updated data back to the original file and as a CSV. + + By this point, most should have had status messages until something failed in inundation or creating the gpkg. + + Args: + output_mapping_dir (str): Directory containing the output mapping files, including inundation GeoPackages. + catfim_sites_file_path (str): Path to the CatFIM sites GeoPackage or CSV file to be updated. + catfim_version (str): The product version string to be recorded in the output. + model_version (str): The model version string to be recorded in the output. + + Raises: + SystemExit: If the input sites file does not exist, is empty, or no valid inundation files are found. + + Side Effects: + - Updates the 'mapped' and 'status' columns in the input sites file. + - Adds 'model_version' and 'product_version' columns. + - Saves the updated file in both GeoPackage and CSV formats. + - Logs critical errors and warnings using FLOG. + + Notes: + - Sites without valid inundation files are marked as 'mapped' = 'no' and their status is updated. + - Sites with valid inundation files are marked as 'mapped' = 'yes' and their status is set to 'Good' if empty. + - If a status message starts with "---", it is removed to indicate a warning rather than an error. 
''' # Import geopackage output from flows creation @@ -516,7 +617,6 @@ def update_sites_mapping_status(output_mapping_dir, catfim_sites_file_path, catf sys.exit(1) try: - valid_ahps_ids = get_list_ahps_with_library_gpkgs(output_mapping_dir) if len(valid_ahps_ids) == 0: FLOG.critical(f"No valid ahps gpkg files found in {output_mapping_dir}/gpkg") @@ -574,7 +674,6 @@ def update_sites_mapping_status(output_mapping_dir, catfim_sites_file_path, catf # This is always called as part of Multi-processing so uses MP_LOG variable and # creates it's own logging object. -# This does flow files and mapping in the same function by HUC def iterate_through_huc_stage_based( output_catfim_dir, huc, @@ -593,9 +692,72 @@ def iterate_through_huc_stage_based( threshold_file, data_source, ): - """_summary_ - This and its children will create stage based tifs and catfim data based on a huc - """ + ''' + Processes a single HUC to generate stage-based CatFIM. + + The function iterates through all NWS LIDs (locations) within the HUC, performing data validation, + threshold extraction, elevation adjustment, and mapping for each flood category and interval. + It handles logging, error reporting, and output file generation for each site. + + Does flow files and mapping in the same function by HUC. + + Parameters + ---------- + output_catfim_dir : str + Directory where CatFIM outputs will be saved. + huc : str + Hydrologic Unit Code to process. + fim_dir : str + Directory containing FIM input data for the HUC. + huc_dictionary : dict + Dictionary mapping HUCs to lists of NWS LIDs. + threshold_url : str + URL for WRDS API to fetch flood stage thresholds. + all_lists : list + List of metadata dictionaries for all sites. + past_major_interval_cap : int or float + Maximum interval value for stages past 'major' category. + job_number_inundate : int + Number of parallel jobs for inundation mapping. + job_number_intervals : int + Number of parallel jobs for interval mapping. + nwm_flows_region_df : pandas.DataFrame + DataFrame containing NWM flow data for the region. + df_restricted_sites : pandas.DataFrame + DataFrame listing restricted NWS LIDs and reasons. + parent_log_output_file : str + Path to the parent log file for multiprocessing logging. + child_log_file_prefix : str + Prefix for child log files in multiprocessing. + progress_stmt : str + Statement describing current progress for logging. + threshold_file : str + Path to local threshold file (if not using WRDS API). + data_source : str + Source of input data ('Manual_Input' or other). + + Returns + ------- + None + + Side Effects + ------------ + - Creates output directories and files for mapping and attributes. + - Writes log messages and error reports to log files. + - Generates stage-based and interval-based inundation TIFFs. + - Exports site attribute CSV files for each processed LID. + - Writes status and error messages to a HUC-specific messages file. + + Exceptions + ---------- + - Handles and logs exceptions during processing, continuing to next site or exiting on critical errors. + + Notes + ----- + - This function is designed to be multiprocessing-safe and may be called within a multiprocessing context. + - Extensive logging is performed for debugging and status tracking. + - Sites with missing or invalid data are skipped, and reasons are logged. 
+ ''' try: # This is setting up logging for this function to go up to the parent @@ -770,7 +932,8 @@ def iterate_through_huc_stage_based( MP_LOG.warning(huc_lid_id + msg) continue - if data_source != 'Manual_Input': # Check elevation data if not manual input + # If not manual input, check elevation data and get datum adjustment + if data_source != 'Manual_Input': # Look for acceptable elevations acceptable_usgs_elev_df = __create_acceptable_usgs_elev_df(usgs_elev_df, huc_lid_id) @@ -790,7 +953,8 @@ def iterate_through_huc_stage_based( if len(dem_eval_messages) > 0: continue - # Filter out sites that don't have "good" data ## TODO: USGS Gage Method: It doens't seem like the below error messages are performing as expected.... + # Filter out sites that don't have "good" data + # TODO: USGS Gage Method: It doens't seem like the below error messages are performing as expected.... try: if not metadata['usgs_data']['alt_method_code'] in acceptable_alt_meth_code_list: MP_LOG.warning(f"{huc_lid_id}: Not in acceptable alt method codes") @@ -806,7 +970,6 @@ def iterate_through_huc_stage_based( MP_LOG.error(traceback.format_exc()) continue - ## TODO: Guam - not running this for Guam, may need to adjust if results are bad # Adjust datum of HAND grid based on elevation data from usgs_elev_table.csv. datum_adj_ft, datum_messages = __adjust_datum_ft(flows, metadata, lid, huc_lid_id) all_messages = all_messages + datum_messages @@ -818,12 +981,10 @@ def iterate_through_huc_stage_based( MP_LOG.lprint(f"{huc_lid_id}: Skipping elevation checks and datum adjustment for Manual Input source") lid_altitude = float(lid_altitude) # LID altitude is expected to be in meters - lid_usgs_elev = (lid_altitude * 0.3048) - # lid_altitude is now in meters to match non-manual input units # TODO: Automate conversion? - - datum_adj_ft = 0 # no datum adjustment for manual input # TODO: Confirm that this is correct + lid_usgs_elev = (lid_altitude * 0.3048) # lid_altitude is now in meters to match non-manual input units + # TODO: Automate conversion? - ## TODO: Guam - is there anything else we need to do to the altitude for manual input? + datum_adj_ft = 0 # no datum adjustment for manual input # Initialize nested dict for lid attributes stage_based_att_dict.update({lid: {}}) @@ -1197,26 +1358,32 @@ def iterate_through_huc_stage_based( def __calc_stage_values(categories, thresholds): ''' - Overview: - Calculates values for each of the five stages. Any that do not have a threshold or have invalid values. - Parameters: - - categories: all 5 of the names ("action", "moderate" etc) - - thresholds: values from WRDS for the stages (anywhere from 0 to 5 stages) - - huc_lid_id: a string of the huc and lid values just for logging or display (not logic) - - returns: - - a dataframe with rows for each five stages, defaulted to -1. - - a list of categories names with valid values - - A warning message if some stages are missing values (stages inc record) - - An error message if one exists (including if all five stages are invalid) + Used in stage-based CatFIM. + + Calculates stage values for flood categories based on provided thresholds. + + Args: + categories (list): List of stage names (e.g., "action", "minor", "moderate", "major", "record"). + thresholds (dict): Dictionary mapping stage names to their threshold values (anywhere from 0 to 5 stages). + + Returns: + stage_values_df (pandas.DataFrame): DataFrame with rows for each stage and + their corresponding values (defaulted to -1 if missing or invalid). 
+ valid_stage_names (list): List of stage names with valid threshold values. + warning_msg (str): Warning message if some stages are missing valid values. + err_msg (str): Error message if all stages are missing or invalid. + + Notes: + - Stages with missing or invalid threshold values are assigned -1. + - If all five stages are invalid, returns None for the DataFrame and an error message. + - Warning messages are formatted with "---" to indicate missing stage data. + ''' + # Set default values err_msg = "" warning_msg = "" - - # default values default_stage_data = [['action', -1], ['minor', -1], ['moderate', -1], ['major', -1], ['record', -1]] - valid_stage_names = [] # Setting up a default df (not counting record) @@ -1253,14 +1420,26 @@ def __calc_stage_values(categories, thresholds): def __calc_stage_intervals(non_rec_stage_values_df, past_major_interval_cap, huc_lid_id): ''' - Return: - interval_recs is a list of list [["action", 21], ["action", 22],....] - This represents stage names and depths to inundate against to generate all interval recs we need + Used in stage-based CatFIM. + + Calculate stage intervals for inundation mapping based on non-recurrent stage values. + This function generates a list of intervals between stage values, rounding up to the next whole number + where necessary, and ensures that intervals are unique and in order. For each stage, it determines the + range of integer depths to be used for inundation calculations, up to the next stage or a specified cap + for the last stage. + + Args: + non_rec_stage_values_df (pd.DataFrame): DataFrame containing stage names and their corresponding stage values. + Must have columns "stage_name" and "stage_value". + past_major_interval_cap (int): The number of intervals to add beyond the last stage value. + huc_lid_id (str): Identifier used for logging and tracing. + + Returns: + list: A list of lists, where each sublist contains a stage name and an integer interval value, + e.g., [["action", 21], ["action", 22], ...]. This represents the stage names and depths + to be used for inundation mapping. ''' - - interval_recs = [] - - stage_values_claimed = [] + interval_recs, stage_values_claimed = [], [] MP_LOG.trace( f"{huc_lid_id}: Calculating intervals for non_rec_stage_values_df is {non_rec_stage_values_df}" @@ -1274,14 +1453,13 @@ def __calc_stage_intervals(non_rec_stage_values_df, past_major_interval_cap, huc for idx in non_rec_stage_values_df.index: row = non_rec_stage_values_df.loc[idx] - - # MP_LOG.trace(f"{huc_lid_id}: interval calcs - non_rec_stage_value is idx: {idx}; {row}") - cur_stage_name = row["stage_name"] cur_stage_val = row["stage_value"] - # calc the intervals between the cur and the next stage - # for the cur, we need to round up, but the curr and the next + # MP_LOG.trace(f"{huc_lid_id}: interval calcs - non_rec_stage_value is idx: {idx}; {row}") + + # calc the intervals between the current and the next stage + # for the current, we need to round up, but the current and the next # to stay at full integers. We do this as it is possible for stages to be decimals # ie) action is 2.4, and mod is 4.6, we want intervals at 3 and 4. 
# The highest value of the interval_list is not included @@ -1295,7 +1473,7 @@ def __calc_stage_intervals(non_rec_stage_values_df, past_major_interval_cap, huc min_interval_val = int(cur_stage_val) + 1 else: - # round up to nextg whole number + # round up to next whole number min_interval_val = math.ceil(cur_stage_val) + 1 if idx < len(non_rec_stage_values_df) - 1: # not the last record @@ -1326,15 +1504,19 @@ def __calc_stage_intervals(non_rec_stage_values_df, past_major_interval_cap, huc def load_restricted_sites(is_stage_based): - """ - Previously, only stage based used this. It is now being used by stage-based and flow-based (1/24/25) + ''' + Used in both stage- and flow-based CatFIM. - The 'catfim_type' column can have three different values: 'stage', 'flow', and 'both'. This determines - whether the site should be filtered out for stage-based CatFIM, flow-based CatFIM, or both of them. + The 'catfim_type' arg is used to determine whether the site should be filtered out + for stage-based CatFIM, flow-based CatFIM, or both. - Returns: a dataframe for the restricted lid and the reason why: - 'nws_lid', 'restricted_reason', 'catfim_type' - """ + Args: + catfim_type (str): Can have three different values: 'stage', 'flow', or 'both'. + + Returns: + df_restricted_sites (pandas.DataFrame): A dataframe for the restricted lid and the reason why. + Columns: 'nws_lid', 'restricted_reason', 'catfim_type' + ''' file_name = "ahps_restricted_sites.csv" current_script_folder = os.path.dirname(__file__) @@ -1358,10 +1540,6 @@ def load_restricted_sites(is_stage_based): df_restricted_sites['nws_lid'] = df_restricted_sites['nws_lid'].str.upper() - # There are enough conditions and a low number of rows that it is easier to - # test / change them via a for loop - # indexs_for_recs_to_be_removed_from_list = [] -> Removed 7/17/25 - # Clean up dataframe for ind, row in df_restricted_sites.iterrows(): nws_lid = row['nws_lid'] @@ -1383,7 +1561,6 @@ def load_restricted_sites(is_stage_based): df_restricted_sites.at[ind, 'restricted_reason'] = restricted_reason FLOG.warning(f"{restricted_reason}. Lid is '{nws_lid}'") continue - # end loop # Invalid records in CSV (not dropping, just completely invalid recs from the csv) # Could be just blank rows from the csv @@ -1406,11 +1583,36 @@ def load_restricted_sites(is_stage_based): def __adjust_datum_ft(flows, metadata, lid, huc_lid_id): - # The purpose of this function is to determine the vertical datum adjustment - # to convert the datum of the rating curve to NAVD88. - - # TODO: Aug 2024: This whole parts needs revisiting. Lots of lid data has changed and this - # is all likely very old. + ''' + Used in stage-based CatFIM. + + Determines the vertical datum adjustment (in feet) to convert the datum of the + rating curve to NAVD88. + + Uses the rating curve source and metadata to get the correct vertical datum and CRS. + + It applies custom workarounds for known sites with special datum or CRS requirements, + and attempts to compute the adjustment using the NOAA VDatum service when necessary. + + Args: + flows (dict): Dictionary containing flow information, including the source of the rating curve. + metadata (dict): Dictionary containing site metadata, including datum and CRS information. + lid (str): Location identifier for the site. + huc_lid_id (str): Combined HUC and location identifier for logging and messaging. 
+ Returns: + tuple: + - datum_adj_ft (float or None): The vertical datum adjustment in feet to convert to NAVD88, + or None if adjustment could not be determined. + - all_messages (list of str): List of messages and warnings generated during processing. + Notes: + - Special handling is included for sites with known datum or CRS issues. + - If the datum is already NAVD88 or equivalent, the adjustment is 0.0. + - If the datum is NGVD29 or similar, an adjustment is attempted using the NOAA VDatum service. + - If errors occur during adjustment, appropriate messages are logged and returned. + + TODO: Aug 2024: This whole parts needs revisiting. Lots of lid data has changed and this + is all likely very old. + ''' # Jul 2024: For now, we will duplicate messages via all_messsages and via the logging system. all_messages = [] @@ -1538,6 +1740,27 @@ def __adjust_datum_ft(flows, metadata, lid, huc_lid_id): def __create_acceptable_usgs_elev_df(usgs_elev_df, huc_lid_id): + ''' + Used in stage-based CatFIM. + + Creates an updated USGS elevation table with a descriptive USGS exclusion status column. + + The function checks each row of the input DataFrame for: + - Acceptable USGS data altitude method code + - Acceptable USGS site type + - Acceptable USGS altitude accuracy threshold + + For each criterion not met, a corresponding message is appended to the 'usgs_exclusion_status' column. + If all criteria are met, the status is set to 'acceptable'. + In case of missing columns or errors, the original DataFrame is returned and errors are logged. + + Args: + usgs_elev_df (pd.DataFrame): DataFrame containing USGS elevation data with required columns. + huc_lid_id (str): Identifier for the HUC/LID, used for logging. + + Returns: + pd.DataFrame: DataFrame with an added 'usgs_exclusion_status' column indicating acceptability. + ''' acceptable_usgs_elev_df = None try: acceptable_msg = '' @@ -1590,6 +1813,28 @@ def __create_acceptable_usgs_elev_df(usgs_elev_df, huc_lid_id): def __adj_dem_evalation_val(acceptable_usgs_elev_df, lid, huc_lid_id): + ''' + Used in stage-based CatFIM. + + Retrieves the DEM-adjusted elevation value for a given USGS gage site (LID) from the provided DataFrame, + and checks for exclusion criteria or data issues. + + Args: + acceptable_usgs_elev_df (pd.DataFrame): DataFrame containing USGS gage information, including + 'nws_lid', 'levpa_id', 'dem_adj_elevation', and 'usgs_exclusion_status' columns. + lid (str): The NWS LID to look up. + huc_lid_id (str): Combined HUC and LID identifier for logging purposes. + + Returns: + tuple: + - lid_usgs_elev (float): The DEM-adjusted elevation value for the specified LID, or 0 if not found or excluded. + - all_messages (list of str): List of warning or error messages encountered during the lookup process. + + Notes: + - If the LID is not found, excluded, or has an elevation of 0, appropriate messages are logged and returned. + - If multiple entries exist for the LID, the one with a non-zero 'levpa_id' is used. + - Exclusion status other than 'acceptable' will result in an early return with a message. + ''' # MP_LOG.trace(locals()) @@ -1651,12 +1896,23 @@ def __adj_dem_evalation_val(acceptable_usgs_elev_df, lid, huc_lid_id): def __calculate_category_key(category, stage_value, is_interval_stage): + ''' + Used in stage-based CatFIM. + + Calcuates the category key which becomes part of a file name + Changes to an int if whole number only. ie.. 
we don't want 22.00 but 22.0, but keep 22.15 + category_key comes things like this: action, action_24.0ft, or action_24.6ft - # Calcuate the category key which becomes part of a file name - # Change to an int if whole number only. ie.. we don't want 22.00 but 22.0, but keep 22.15 - # category_key comes things like this: action, action_24.0ft, or action_24.6ft - # and yes... this needs a better answer. + Args: + category (str): The flood category (e.g., 'action', 'minor', etc.). + stage_value (float): The stage value for the category. + is_interval_stage (bool): Whether this is an interval stage (True) or a main stage (False). + Returns: + category_key (str): Category and stage value string for file naming. + + TODO: yes... this needs a better answer. + ''' category_key = category + "_" # ie) action_ if float(stage_value) % 1 == 0: # then we have a whole number @@ -1692,6 +1948,47 @@ def generate_stage_based_categorical_fim( threshold_file, data_source, ): + ''' + Generates stage-based CatFIM for a list of HUCs. + + This function orchestrates the workflow for producing stage-based CatFIM outputs, including: + - Generating necessary flow data and site attributes. + - Parallel processing of HUCs to create inundation mapping and attribute files. + - Aggregating and merging results from parallel tasks. + - Compiling a comprehensive GeoPackage and CSV of all candidate sites, indicating mapping status and reasons for unmapped sites. + - Logging and error handling throughout the process. + + Parameters + ---------- + output_catfim_dir (str): Directory where CatFIM outputs will be written. + fim_run_dir (str): Directory containing FIM run data. + nwm_us_search (str): Path or identifier for upstream NWM search data. + nwm_ds_search (str): Path or identifier for downstream NWM search data. + env_file (str): Path to the environment file for configuration. + job_number_inundate (int): Number of parallel jobs for inundation processing. + job_number_huc (int): Number of parallel jobs for HUC processing. + lst_hucs (list of str): List of HUCs to process. + job_number_intervals (int): Number of parallel jobs for interval processing. + past_major_interval_cap (int): Cap for past major intervals. + nwm_metafile (str): Path to the NWM metafile. + df_restricted_sites (pd.DataFrame): DataFrame of restricted sites to exclude from processing. + threshold_file (str): Path to the threshold file for mapping. + data_source (str): Identifier for the data source being used. + + Outputs + ---------- + - Attribute CSVs for each mapped site. + - A merged attribute CSV (`nws_lid_attributes.csv`) in the attributes directory. + - A GeoPackage (`stage_based_catfim_sites.gpkg`) and CSV summarizing all candidate sites and their mapping status. + - Log files documenting the process and any issues encountered. + + Note: The function assumes the existence of several external utilities and global variables (e.g., FLOG, VIZ_PROJECTION, acceptable_* lists). + + Raises + ---------- + Exception: If no attribute CSV files are found or if other critical errors occur during processing. + ''' + ''' Sep 2024, I believe this can be radically simplied, but just startign with a dataframe for each ahps and populate what we @@ -1927,6 +2224,27 @@ def generate_stage_based_categorical_fim( def set_start_files_folders( step_num, output_catfim_dir, output_mapping_dir, output_flows_dir, attributes_dir, overwrite ): + ''' + Used in both stage- and flow-based CatFIM. + + Sets up and manages the initial folder structure and log files. 
+ + Depending on the step number and overwrite flag, this function will: + - Create the main output directory if it does not exist. + - Check for the existence of the output mapping directory as a proxy for all output folders. + - If the mapping directory exists and overwrite is False, raises an Exception to prevent accidental data loss. + - If overwrite is True, deletes and recreates the mapping, flows, and attributes directories. + - Always creates the flows, mapping, and attributes directories if they do not exist. + - Ensures a logs directory exists and sets up logging for the process. + + Args: + step_num (int): The current step number in the workflow. Only performs folder cleaning if step_num == 0. + output_catfim_dir (str): Path to the main output directory for the Categorical FIM process. + output_mapping_dir (str): Path to the output directory for mapping results. + output_flows_dir (str): Path to the output directory for flow results. + attributes_dir (str): Path to the output directory for attribute results. + overwrite (bool): If True, existing output directories will be deleted and recreated. + ''' # ================================ # Folder cleaning based on step system diff --git a/tools/catfim/generate_categorical_fim_flows.py b/tools/catfim/generate_categorical_fim_flows.py index bc97c4785..fccdccf52 100755 --- a/tools/catfim/generate_categorical_fim_flows.py +++ b/tools/catfim/generate_categorical_fim_flows.py @@ -31,8 +31,16 @@ from utils.shared_variables import VIZ_PROJECTION -# TODO: Aug 2024: This script was upgraded significantly with lots of misc TODO's embedded. -# Lots of inline documenation needs updating as well +''' + +Aug 2024 +This script was upgraded significantly with lots of misc TODO's embedded. +Lots of inline documenation needs updating as well + +Oct 2025 +Doc strings and improved documentation was added. + +''' # will become global once initiallized FLOG = fl.FIM_logger() @@ -42,7 +50,18 @@ def get_env_paths(env_file): + ''' + Loads environment variables from a .env file. + Expects the .env file to contain API_BASE_URL and WBD_LAYER variables. + + Parameters + ---------- + env_file (str): Path to the .env file. + Returns + ------- + tuple: (API_BASE_URL (str), WBD_LAYER (str)) + ''' if os.path.exists(env_file) == False: raise Exception(f"The environment file of {env_file} does not seem to exist") @@ -62,13 +81,60 @@ def generate_flows_for_huc( output_flows_dir, attributes_dir, huc_messages_dir, - flows_df_dict, #nwm_flows_df, # TODO: Do we need to update this to be region specific df? or the dict of dfs? + nwm_flows_region_df, parent_log_output_file, child_log_file_prefix, df_restricted_sites, output_catfim_dir, threshold_file, ): + ''' + Only runs for flow-based CatFIM. + + Generates categorical flow files and attribute CSVs for a given HUC + using metadata, thresholds, and NWM stream segments. + + For each NWS site (lid) in the specified HUC: + - Checks for restricted sites and skips them if necessary. + - Loads threshold stage and flow data from WRDS or local files. + - Validates the presence of required stage and flow data for flood categories. + - Filters NWM stream segments by stream order and region. + - Writes flow CSV files for each valid flood category. + - Compiles site attributes and writes an attribute CSV. + - Logs messages and warnings for missing or invalid data. + - Writes a summary message file for the HUC. + + Parameters + ---------- + huc (str): Hydrologic Unit Code to process. 
+ huc_dictionary (dict): Dictionary mapping HUCs to lists of NWS site identifiers (lids). + threshold_url (str): URL for retrieving threshold data from WRDS. + all_meta_lists (list of dict): List of metadata dictionaries for all NWS sites. + output_flows_dir (str): Directory to write output flow CSV files. + attributes_dir (str): Directory to write output attribute CSV files. + huc_messages_dir (str): Directory to write HUC-specific message files. + nwm_flows_region_df (pandas.DataFrame): DataFrame containing NWM stream segment data for the region. + parent_log_output_file (str): Path to the parent log file for multiprocessing logging. + child_log_file_prefix (str): Prefix for child log files created by this function. + df_restricted_sites (pandas.DataFrame): DataFrame listing restricted NWS sites and reasons. + output_catfim_dir (str): Directory for CATFIM output files. + threshold_file (str): Path to local threshold file (if not using WRDS). + + Returns + ------- + None + + Side Effects + ------------ + - Writes flow CSV files for each site and flood category. + - Writes attribute CSV files for each site. + - Writes HUC-specific message text files. + - Logs progress, warnings, and errors to the specified log files. + + Exceptions + ---------- + Logs and handles exceptions, writing error details to the log file. + ''' try: # Note: child_log_file_prefix is "MP_process_gen_flows", meaning all logs created by this function start @@ -196,17 +262,7 @@ def generate_flows_for_huc( unfiltered_segments = list(set(get_nwm_segs(metadata))) desired_order = metadata['nwm_feature_data']['stream_order'] - # Filter segments to be of like stream order. - # TODO: Guam - test this update - if huc[:4] == '2201': # Guam - nwm_flows_region_df = flows_df_dict['nhd_flows_guam_df'] - elif huc[:4] == '2203': # American Samoa - nwm_flows_region_df = flows_df_dict['nhd_flows_americansamoa_df'] - elif huc[:2] == '19': # Alaska - nwm_flows_region_df = flows_df_dict['nwm_flows_alaska_df'] - else: # CONUS + Hawaii + Puerto Rico - nwm_flows_region_df = flows_df_dict['nwm_flows_df'] - + # Filter segments to be of like stream order. segments = filter_nwm_segments_by_stream_order(unfiltered_segments, desired_order, nwm_flows_region_df) # Previous input was nwm_flows_df, but now it is region specific df (9/25/25) @@ -392,30 +448,53 @@ def generate_flows( df_restricted_sites, threshold_file, ): - - # TODO; Most docstrings like this are now very outdated and need updating ''' - This will create static flow files for all nws_lids and save to the - workspace directory with the following format: - huc code - nws_lid_code - threshold (action/minor/moderate/major if they exist/are defined by WRDS) - flow file (ahps_{lid code}_huc_{huc 8 code}_flows_{threshold}.csv) - - This will use the WRDS API to get the nwm segments as well as the flow - values for each threshold at each nws_lid and then create the necessary - flow file to use for inundation mapping. + Runs for both stage- and flow-based CatFIM (but with different outputs/endpoints). + + Generates static flow files for all NWS LIDs and saves them to the specified workspace directory. + The function supports both stage-based and flow-based inundation mapping workflows. + For each HUC, the function: + - Loads NWM flow metadata and region-specific flowline data. + - Aggregates sites by HUC using spatial joins. + - Generates flow files for each threshold (action/minor/moderate/major) using WRDS API data. + - Handles multiprocessing for HUC-level flow generation. 
+ - Merges logs and attributes from child processes. + - Produces summary CSV and GeoPackage files for mapped sites and their statuses. + Parameters ---------- - output_catfim_dir : STR - root catfim dir for the particular run. ie) fim_4_3_3_4_stage_based - nwm_us_search : STR - Upstream distance (in miles) for walking up NWM network. - nwm_ds_search : STR - Downstream distance (in miles) for walking down NWM network. - wbd_path : STR - Location of HUC geospatial data (geopackage). - + output_catfim_dir (str): Root directory for the CATFIM run output (e.g., 'fim_4_3_3_4_stage_based'). + nwm_us_search (int or str): Upstream search distance (in miles) for traversing the NWM network. + nwm_ds_search (int or str): Downstream search distance (in miles) for traversing the NWM network. + env_file (str): Path to the environment file containing API and layer configuration. + job_number_huc (int): Number of parallel jobs to run for HUC-level processing. + is_stage_based (bool): If True, runs in stage-based mode and returns early with relevant objects. + lst_hucs (list of str): List of HUC codes to process. + nwm_metafile (str): Path to NWM metadata file (may be empty). + log_output_file (str): Path to the log output file for logging process information. + df_restricted_sites (pandas.DataFrame): DataFrame of restricted sites to exclude from processing. + threshold_file (str): Path to file containing threshold definitions for mapping. + + Returns + ------- + If is_stage_based is True, returns a tuple: + (huc_dictionary, out_gdf, metadata_url, threshold_url, all_meta_lists, flows_df_dict) + Otherwise, returns None (results are written to disk). + + + Side Effects + ------------ + - Writes flow files, attribute CSVs, and GeoPackages to output directories. + Saves to the workspace directory with the following format: + ///flow file (ahps_{lid code}_huc_{huc 8 code}_flows_{threshold}.csv) + - Logs process information and errors. + - Merges and finalizes mapping results for visualization and downstream use. + + Notes + ----- + - Handles special regions (Guam, American Samoa, Alaska) with region-specific flowline data. + - Uses multiprocessing for efficient HUC-level flow generation. + - Produces summary files for mapped and unmapped sites, including status messages. 
''' FLOG.setup(log_output_file) # reusing the parent logs @@ -457,8 +536,6 @@ def generate_flows( nhd_flows_guam_df = gpd.read_file(input_nhd_flows_Guam) nhd_flows_americansamoa_df = gpd.read_file(input_nhd_flows_AmericanSamoa) - # TODO: Guam - pass this dictionary into functions instead of all four dfs - # Add the dfs to a dictionary for easy access later flows_df_dict = { 'nwm_flows_df': nwm_flows_df, @@ -474,7 +551,7 @@ def generate_flows( # Filter the meta list to just HUCs in the fim run output or huc if sent in as a param all_meta_lists = __load_nwm_metadata( output_catfim_dir, metadata_url, nwm_us_search, nwm_ds_search, nwm_metafile - ) # TODO: Guam - will use nwm_metafile option + ) end_dt = datetime.now(timezone.utc) time_duration = end_dt - start_dt @@ -533,6 +610,7 @@ def generate_flows( with ProcessPoolExecutor(max_workers=job_number_huc) as executor: for huc in huc_dictionary: + # Get the correct nwm_flows_region_df based on the HUC if huc[:4] == '2201': # Guam nwm_flows_region_df = flows_df_dict['nhd_flows_guam_df'] elif huc[:4] == '2203': # American Samoa @@ -555,7 +633,7 @@ def generate_flows( output_flows_dir, attributes_dir, huc_messages_dir, - nwm_flows_region_df, + nwm_flows_region_df, log_output_file, child_log_file_prefix, df_restricted_sites, @@ -689,10 +767,27 @@ def generate_flows( print() def __load_thresholds(output_catfim_dir, threshold_url, lid, huc, threshold_file): + ''' + Runs for both stage- and flow-based CatFIM. - if os.path.isfile(threshold_file) == True: - # FLOG.lprint(f"Threshold file already downloaded and exists at {threshold_file}") # TODO: Guam - this option will be used + Loads threshold stage and flow data for a given site (LID) either from a local pickle file or from the WRDS API. + Parameters + ---------- + output_catfim_dir (str): Directory path where output threshold files should be saved. + threshold_url (str): URL for the WRDS API to fetch threshold data. + lid (str): NWS Location Identifier (LID) for the site. + huc (str): Hydrologic Unit Code associated with the site. + threshold_file (str): Path to the local pickle file containing threshold data. + + Returns: + ---------- + stages (dict or None): Dictionary of stage thresholds for the site, or None if not found. + flows (dict or None): Dictionary of flow thresholds for the site, or None if not found. + status_msg (str): Status message indicating the source of the loaded thresholds or error information. + ''' + + if os.path.isfile(threshold_file) == True: # Read pickle file and get the stages and flows dictionary for the site with open(threshold_file, 'rb') as f: loaded_data = pickle.load(f) @@ -741,16 +836,29 @@ def __load_thresholds(output_catfim_dir, threshold_url, lid, huc, threshold_file writer.writeheader() writer.writerows(thresholds) - - # TODO: In CatFIM post-processing, add a functionality to append all those files together into one thresholds.pkl - # so we can rerun CatFIM without hitting the WRDS API (like we did with __load_nwm_metadata) - # Will need to double check that the memory usage isn't insane with this new idea. - - return stages, flows, status_msg + # local script calls __load_nwm_metadata so FLOG is already setup def __load_nwm_metadata(output_catfim_dir, metadata_url, nwm_us_search, nwm_ds_search, nwm_metafile): + ''' + Runs for both stage and flow. Loads and filters NWM metadata. + + This function checks if a local metadata pickle file exists. If it does, the metadata is loaded from the file. 
+ Otherwise, it downloads metadata from the specified URL using two API calls: one for all forecast points and + another for all points in OCONUS regions (HI, PR, AK). The results are combined, and duplicate or None-valued + NWS LIDs are filtered out. The filtered metadata is then saved to a pickle file for future use. + + Args: + output_catfim_dir (str): Directory where output files, including the metadata pickle, are stored. + metadata_url (str): URL endpoint for retrieving NWM metadata. + nwm_us_search (int): Upstream trace distance for metadata search. + nwm_ds_search (int): Downstream trace distance for metadata search. + nwm_metafile (str): Path to the local metadata pickle file. + + Returns: + list: Filtered list of metadata dictionaries, each representing a unique NWS LID site. + ''' FLOG.trace(metadata_url) @@ -760,7 +868,7 @@ def __load_nwm_metadata(output_catfim_dir, metadata_url, nwm_us_search, nwm_ds_s # WRDS unless we need a smaller or modified version. This one likely has all nws_lid data. if os.path.isfile(nwm_metafile) == True: - FLOG.lprint(f"Meta file already downloaded and exists at {nwm_metafile}") # TODO: Guam - this option will be used + FLOG.lprint(f"Meta file already downloaded and exists at {nwm_metafile}") with open(nwm_metafile, "rb") as p_handle: output_meta_list = pickle.load(p_handle) @@ -770,12 +878,6 @@ def __load_nwm_metadata(output_catfim_dir, metadata_url, nwm_us_search, nwm_ds_s FLOG.lprint(f"Meta file will be downloaded and saved at {meta_file}") - # Jan 2025: Removed lid_to_run functionality, so it is no longer needed as an input. - - # Dec 2024: Running two API calls: one to get all forecast points, and another - # to get all points (non-forecast and forecast) for the OCONUS regions. Then, - # duplicate LIDs are removed. - # Get all forecast points forecast_point_meta_list, ___ = get_metadata( metadata_url, @@ -827,7 +929,7 @@ def __load_nwm_metadata(output_catfim_dir, metadata_url, nwm_us_search, nwm_ds_s output_meta_list.append(site) FLOG.lprint(f'{len(duplicate_lids)} duplicate points removed.') - FLOG.lprint(f'Duplicate point LIDs: {duplicate_lids}') + # FLOG.lprint(f'Duplicate point LIDs: {duplicate_lids}') FLOG.lprint(f'{len(nonelid_metadata_list)} points with value of None for nws_lid removed.') FLOG.lprint(f'Filtered metadatada downloaded for {len(output_meta_list)} points.') diff --git a/tools/catfim/generate_categorical_fim_mapping.py b/tools/catfim/generate_categorical_fim_mapping.py index b29b9cbbe..ba700f0a9 100755 --- a/tools/catfim/generate_categorical_fim_mapping.py +++ b/tools/catfim/generate_categorical_fim_mapping.py @@ -18,7 +18,7 @@ from inundate_gms import Inundate_gms from mosaic_inundation import Mosaic_inundation from rasterio.features import shapes -from rasterio.warp import Resampling, calculate_default_transform, reproject +from rasterio.warp import Resampling, reproject from shapely.geometry.multipolygon import MultiPolygon from shapely.geometry.polygon import Polygon from tools_shared_functions import mask_out_lakes @@ -26,11 +26,18 @@ import utils.fim_logger as fl from utils.shared_functions import getDriver -from utils.shared_variables import ALASKA_CRS, PREP_PROJECTION, VIZ_PROJECTION +from utils.shared_variables import VIZ_PROJECTION +''' -# TODO: Aug 2024: This script was upgraded significantly with lots of misc TODO's embedded. -# Lots of inline documenation needs updating as well +Aug 2024 +This script was upgraded significantly with lots of misc TODO's embedded. 
+Lots of inline documenation needs updating as well + +Oct 2025 +Doc strings and improved documentation was added. + +''' # will become global once initiallized @@ -59,6 +66,66 @@ def produce_stage_based_lid_tifs( mp_parent_log_file, child_log_file_prefix, ): + ''' + Only used for stage-based CatFIM. + + Generates stage-based inundation TIFF files for a given LID (Location ID) and category, mosaics branch-level inundation maps, + and removes intermediary files. Handles multi-processing across branches and logs progress and warnings. + + Parameters + ---------- + stage_val : float + The stage value to use for inundation mapping. + datum_adj_ft : float + Datum adjustment in feet to be applied to the stage value. + branch_dir : str + Directory containing branch subdirectories for processing. + lid_usgs_elev : float + USGS elevation for the LID gage, used to calculate HAND stage. + lid_altitude : float + Altitude adjustment for the LID location. + fim_dir : str + Base directory for HAND FIM data. + segments : list + List of NWM segment feature IDs to process for inundation. + lid : str + Location ID string. + huc : str + Hydrologic Unit Code for the watershed. + lid_directory : str + Directory to store output TIFF files for the LID. + category : str + Category name for the inundation mapping (e.g., "action", "minor"). + category_key : str + Key string representing the category and magnitude. + number_of_jobs : int + Number of parallel jobs to use for multi-processing. + mp_parent_log_file : str + Path to the parent log file for multi-processing logging. + child_log_file_prefix : str + Prefix for child log files in multi-processing. + + Returns + ------- + messages : list + List of warning or status messages generated during processing. + hand_stage : int + Computed HAND stage in millimeters. + datum_adj_wse : float + Datum-adjusted water surface elevation in feet. + datum_adj_wse_m : float + Datum-adjusted water surface elevation in meters. + + Notes + ----- + - The function performs multi-processing across branches to generate inundation maps. + - Branch-level TIFFs are mosaicked into a single extent TIFF for the LID and category. + - Lakes are masked out from the final inundation array. + - Intermediary branch TIFFs are deleted after merging to save space. + - Logging is performed throughout for traceability and error handling. + - If negative HAND stage or missing segments are detected, processing is skipped for those cases. + + ''' MP_LOG.MP_Log_setup(mp_parent_log_file, child_log_file_prefix) @@ -285,9 +352,6 @@ def produce_stage_based_lid_tifs( # This is part of an MP call and needs MP_LOG -# This is a form of inundation which we are doing ourselves -# as we only have one flow value and our normal inundation tools -# are looking for files not single values def produce_inundated_branch_tif( rem_path, catchments_path, @@ -301,13 +365,61 @@ def produce_inundated_branch_tif( parent_log_output_file, child_log_file_prefix, ): - """ - # Open rem_path and catchment_path using rasterio. - - Note: category can have different formats, depending if it is an interval or not or int or float - If it has a stage number it, it is an interval value - ie) action, action_24ft, action_24.6, or action_24.6ft - """ + ''' + Only used in stage-based CatFIM. + + Generates a binary inundation raster (GeoTIFF) for a given branch and stage, + indicating inundated areas within specified catchments. 
+ + This function reads a REM (Raster Elevation Model) and catchments raster, applies + a stage threshold, and masks the result to catchments matching the provided hydroid + list. The output is a raster where inundated cells are marked as 1 and non-inundated + cells as 0. + + This is a form of inundation which is specific to CatFIM because we only have one + flow value and the other FIM inundation tools are looking for flow files not single + values. + + Parameters + ---------- + rem_path : str + Path to the REM raster file. + catchments_path : str + Path to the catchments raster file. + hydroid_list : list of int + List of hydroid identifiers to include in the mask. + hand_stage : float or int + Stage threshold value for inundation. + lid_directory : str + Directory where the output raster will be saved. + category_key : str + Category key used in the output file name. + huc : str + Hydrologic Unit Code for the region. + lid : str + Location identifier for the site. + branch : str + Branch identifier for the mapping. + parent_log_output_file : str + Path to the parent log output file for logging. + child_log_file_prefix : str + Prefix for child log files. + + Returns + ------- + None + The function saves the output raster to disk and does not return any value. + + Notes + ----- + - The output raster is only generated if at least one cell is inundated. + - Logging is set up for error and trace reporting. + - Handles hydroid values by clipping to the last 4 digits for matching. + - Both input rasters are expected to have a nodata value of 0. + - A category can have different formats, depending if it is an interval or not or int or float. + If it has a stage number it, it is an interval value, ie) action, action_24ft, action_24.6, or action_24.6ft + + ''' try: # This is setting up logging for this function to go up to the parent @@ -386,10 +498,35 @@ def produce_inundated_branch_tif( # This is not part of an MP process, but needs to have FLOG carried over so this file can see it -# Used for Flow only? -def run_catfim_inundation( +def run_catfim_inundation( fim_run_dir, output_flows_dir, output_mapping_dir, job_number_huc, job_number_inundate, log_output_file ): + ''' + Only used in flow-based CatFIM. + + Executes the inundation and mosaicking process for CatFIM mapping across multiple HUCs and AHPS sites. + + This function coordinates the parallel execution of inundation mapping tasks using a process pool, handling + multiple HUCs and AHPS sites. It sets up logging, identifies valid HUC directories, and processes each AHPS site + and magnitude threshold within those HUCs. For each combination, it submits an inundation mapping job to the + process pool executor. Errors are logged and handled gracefully, and log files from child processes are merged + into the parent log file upon completion. + + Args: + fim_run_dir (str): Path to the directory containing HAND FIM run outputs for HUCs. + output_flows_dir (str): Path to the directory containing flow outputs for valid HUCs. + output_mapping_dir (str): Path to the directory where inundation mapping results will be saved. + job_number_huc (int): Number of parallel jobs to run for HUC-level processing. + job_number_inundate (int): Number of parallel jobs to run for inundation mapping within each HUC. + log_output_file (str): Path to the log file for recording process output and errors. + + Returns: + None + + Raises: + SystemExit: If a critical error occurs during processing, the function logs the error and exits the program. 
+ ''' + # Adding a pointer in this file coming from generate_categorial_fim so they can share the same log file FLOG.setup(log_output_file) @@ -522,10 +659,6 @@ def run_catfim_inundation( # This is part of an MP Pool -# It is used for flow-based -# It inundates each set based on the ahps/mangnitude list and for each segment in the -# the branch hydrotable -# Then each is inundated per branch and mosiaked for the ahps def run_inundation( magnitude_flows_csv, huc, @@ -538,10 +671,45 @@ def run_inundation( parent_log_output_file, child_log_file_prefix, ): + ''' + Only used in flow-based CatFIM. + + Runs the inundation mapping workflow for a given HUC and magnitude, including logging, + inundation raster generation, mosaicking, and lake masking. + + Inundates each set based on the ahps/mangnitude list and for each segment in the the branch hydrotable. + Then each set is inundated per branch and mosiaked for the AHPS site. + + Parameters: + magnitude_flows_csv (str): Path to the CSV file containing magnitude flows for the forecast. + huc (str): Hydrologic Unit Code (HUC) identifier for the region to process. + output_huc_site_mapping_dir (str): Directory to store output HUC-site mapping files. + output_extent_tif (str): Path to the output inundation extent raster (GeoTIFF). + ahps_site (str): AHPS site identifier associated with the forecast. + magnitude (str): Magnitude value for the forecast (e.g., flood stage). + fim_run_dir (str): Directory containing FIM run data and hydrofabric. + job_number_inundate (int): Number of worker processes to use for inundation generation. + parent_log_output_file (str): Path to the parent log file for logging output. + child_log_file_prefix (str): Prefix for child log files created by this function. + + Returns: + None + + Side Effects: + - Generates inundation extent raster and saves to output_extent_tif. + - Logs progress and errors to the specified log files. + - Masks lakes from the inundation raster. + - Performs mosaicking of inundation rasters. + - Returns early if errors occur or output raster is not created. + + Exceptions: + Logs any exceptions encountered during processing and returns None. + ''' + # Note: child_log_file_prefix is "MP_run_ind", meaning all logs created by this function start # with the phrase "MP_run_ind" # They will be rolled up into the parent_log_output_file - # This is setting up logging for this function to go up to the parent\ + # This is setting up logging for this function to go up to the parent MP_LOG.MP_Log_setup(parent_log_output_file, child_log_file_prefix) # MP_LOG.trace(locals()) @@ -625,6 +793,36 @@ def post_process_huc( child_log_file_prefix, progress_stmt, ): + ''' + Used in both flow-based and stage-based CatFIM. + + Post-processes inundation mapping results for a given HUC. + + This function performs post-processing tasks for a specified HUC, including: + - Setting up logging for the process. + - Iterating through AHPS site directories. + - Identifying and processing rolled-up extent TIFF files for each AHPS site. + - Checking for the existence of corresponding attributes CSV files. + - Reformatting inundation maps using site-specific attributes. + - Handling interval and non-interval stage-based TIFF files. + - Logging warnings and errors as appropriate. + + Args: + output_catfim_dir (str): Directory containing CatFIM output files. + ahps_dir_list (list): List of AHPS site identifiers to process. + huc_dir (str): Directory for the current HUC's data. + gpkg_dir (str): Directory to store GeoPackage outputs. 
+ huc (str): Hydrologic Unit Code being processed. + parent_log_output_file (str): Path to the parent log file for logging output. + child_log_file_prefix (str): Prefix for child log files created by this function. + progress_stmt (str): Statement describing the current progress for logging. + + Returns: + None + + Raises: + Exception: Logs and handles any exceptions that occur during processing. + ''' # Note: child_log_file_prefix is "MP_post_process_{huc}", meaning all logs created by this function start # with the phrase "MP_post_process_{huc}". This one rollups up to the master catfim log @@ -747,6 +945,37 @@ def post_process_huc( def post_process_cat_fim_for_viz( catfim_method, output_catfim_dir, job_huc_ahps, catfim_version, model_version, log_output_file ): + ''' + Used in both flow-based and stage-based CatFIM. + + Post-processes CatFIM outputs for visualization. + + This function performs the following steps: + 1. Sets up logging and prepares output directories. + 2. Identifies HUC/AHPS directories to process. + 3. Uses a process pool to post-process each HUC directory in parallel, converting TIF extents to polygons and saving as GeoPackage files. + 4. Merges all generated GeoPackage layers into a single GeoDataFrame. + 5. Optionally dissolves merged layers by AHPS and magnitude for flow-based methods. + 6. Cleans up unnecessary columns from the merged data. + 7. Adds model and product version metadata. + 8. Saves the final merged layer as both GeoPackage and CSV files for visualization. + 9. Rolls up logs from child processes into the main log file. + + Args: + catfim_method (str): The method used for Categorical FIM (e.g., "flow_based"). + output_catfim_dir (str): Directory where Categorical FIM outputs are stored. + job_huc_ahps (int): Number of parallel jobs for processing HUC/AHPS directories. + catfim_version (str): Version identifier for the Categorical FIM product. + model_version (str): Version identifier for the underlying model. + log_output_file (str): Path to the log file for recording process output. + + Raises: + Exception: If no HUC/AHPS directories are found or if no GeoPackage files are generated. + + Returns: + None + ''' + # Adding a pointer in this file coming from generate_categorial_fim so they can share the same log file FLOG.setup(log_output_file) @@ -875,7 +1104,6 @@ def post_process_cat_fim_for_viz( merged_layers_gdf["model_version"] = model_version merged_layers_gdf["product_version"] = catfim_version - # TODO: Aug 2024: gpkg are not opening in qgis now? project, wkt, non defined geometry columns? gpkg_file_path = os.path.join(output_mapping_dir, f'{output_file_name}.gpkg') FLOG.lprint(f"Saving catfim library gpkg version to {gpkg_file_path}") merged_layers_gdf.to_file(gpkg_file_path, driver='GPKG', engine="fiona") @@ -902,12 +1130,59 @@ def reformat_inundation_maps( parent_log_output_file, child_log_file_prefix, ): - """_summary_ - Turns inundated tifs into dissolved polys gpkg with more attributes - - """ - # interval stage might come in as null and that is ok - + ''' + Used in both flow- and stage-based CatFIM. + + Converts an inundation raster (GeoTIFF) to a dissolved polygon GeoPackage with enriched attributes. + + This function reads an inundation raster file, extracts inundated areas as polygons, dissolves them into a single multipolygon, + and joins additional attributes from a CSV file. The resulting GeoDataFrame is projected to Web Mercator and saved as a GeoPackage. 
+ Logging is performed throughout the process, and special handling is included for interval stages and empty rasters. + + Parameters + ---------- + ahps_lid : str + The AHPS location identifier for the site. + tif_to_process : str + Path to the inundation raster (GeoTIFF) file to process. + gpkg_dir : str + Directory where the output GeoPackage file will be saved. + huc : str + Hydrologic Unit Code (HUC) for the watershed. + magnitude : str or int + Magnitude or recurrence interval for the inundation scenario. + nws_lid_attributes_filename : str + Path to the CSV file containing NWS LID attributes for joining. + interval_stage : float or None + Stage value for interval scenarios; may be None. + is_interval : bool + Flag indicating whether the scenario is an interval. + parent_log_output_file : str + Path to the parent log file for logging output. + child_log_file_prefix : str + Prefix for child log files created by this function. + + Returns + ------- + None + The function saves the output GeoPackage file and logs messages, but does not return a value. + + Raises + ------ + ValueError + If assigning CRS to a GeoDataFrame without a geometry column. + Exception + For any other errors encountered during processing. + + Notes + ----- + - If no inundated areas are found in the raster, a log message is written and the function returns early. + - The output GeoPackage contains dissolved polygons with joined attributes and is projected to Web Mercator. + - For interval scenarios, the 'stage_uncorrected' attribute is set to None to avoid confusion. + - The interval stage might come in as null and that is ok + + ''' + # Note: child_log_file_prefix is "MP_reformat_tifs_{huc}", meaning all logs created by this # function start with the phrase will rollup to the master catfim logs @@ -1013,9 +1288,7 @@ def reformat_inundation_maps( # This is not part of an MP progress and simply needs the # pointer of FLOG carried over here so it can use it directly. - # TODO: Aug, 2024. We need re-evaluate job numbers, see usage of job numbers below -# Used for Flow only def manage_catfim_mapping( fim_run_dir, output_flows_dir, @@ -1028,6 +1301,33 @@ def manage_catfim_mapping( log_output_file, step_number=1, ): + ''' + Only used in flow-based CatFIM. + + Manages the workflow for generating categorical FIM (Flood Inundation Mapping) outputs, + including running inundation mapping and post-processing for visualization. + + Parameters: + fim_run_dir (str): Directory containing the FIM run data. + output_flows_dir (str): Directory where flow outputs are stored. + output_catfim_dir (str): Directory for storing categorical FIM outputs. + catfim_method (str): Method used for categorical FIM generation. + catfim_version (str): Version identifier for the categorical FIM process. + model_version (str): Version identifier for the underlying model. + job_number_huc (int): Number of jobs for HUC (Hydrologic Unit Code) processing. + job_number_inundate (int): Number of jobs for inundation processing. + log_output_file (str): Path to the log file for output messages. + step_number (int, optional): Workflow step to execute. Defaults to 1. + + Returns: + None + + Notes: + - Initializes logging and manages workflow steps. + - Runs inundation mapping if step_number <= 1. + - Performs post-processing for visualization using multiple jobs. + - Logs the elapsed time for the mapping process. 
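+
+    Example:
+        Illustrative sketch only; the paths and version strings below are
+        placeholders, not project defaults.
+
+            manage_catfim_mapping(
+                fim_run_dir='/outputs/example_fim_run',
+                output_flows_dir='/data/catfim/example_catfim/flows',
+                output_catfim_dir='/data/catfim/example_catfim',
+                catfim_method='flow_based',
+                catfim_version='example_catfim_version',
+                model_version='example_model_version',
+                job_number_huc=2,
+                job_number_inundate=4,
+                log_output_file='/data/catfim/example_catfim/logs/catfim.log',
+                step_number=1,
+            )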
+ ''' # Adding a pointer in this file coming from generate_categorial_fim so they can share the same log file FLOG.setup(log_output_file) @@ -1116,13 +1416,6 @@ def manage_catfim_mapping( default=1, type=int, ) - # parser.add_argument( - # '-depthtif', - # '--write-depth-tiff', - # help='Using this option will write depth TIFFs.', - # required=False, - # action='store_true', - # ) parser.add_argument( '-step', @@ -1140,7 +1433,6 @@ def manage_catfim_mapping( output_catfim_dir = args['output_catfim_dir'] job_number_huc = int(args['job_number_huc']) job_number_inundate = int(args['job_number_inundate']) - # depthtif = args['write_depth_tiff'] step_num = args['step_number'] log_dir = os.path.join(output_catfim_dir, "logs") From 3b1b54db50a6cbcda83c2e8f4c2be2139c8707f2 Mon Sep 17 00:00:00 2001 From: EmilyDeardorff-NOAA Date: Fri, 31 Oct 2025 18:00:36 +0000 Subject: [PATCH 10/13] Add new wrds processing script to data dir. --- data/wrds/download_process_wrds.py | 535 ++++++++++++++++++ tools/catfim/generate_categorical_fim.py | 2 +- .../catfim/generate_categorical_fim_flows.py | 6 +- 3 files changed, 539 insertions(+), 4 deletions(-) create mode 100644 data/wrds/download_process_wrds.py diff --git a/data/wrds/download_process_wrds.py b/data/wrds/download_process_wrds.py new file mode 100644 index 000000000..3284aab8b --- /dev/null +++ b/data/wrds/download_process_wrds.py @@ -0,0 +1,535 @@ +#!/usr/bin/env python3 +import argparse +import os +import urllib3 + +# import urllib +# import urllib.request +# from collections import defaultdict +# from pathlib import Path +import pickle +import pandas as pd +import requests +from datetime import date +from dotenv import load_dotenv + +from urllib3.exceptions import InsecureRequestWarning +from urllib3.util.retry import Retry +from requests.adapters import HTTPAdapter + + +# ------------------------------------- +# Name: download_process_wrds_data.py +# Script Location: data/wrds ? +# ------------------------------------- + +# moved over from inundation-mapping/tools/tools_shared_functions.py +# TODO: Remove this function from the tools shared functions file +# TODO: Re-route all files that use this function to get it from here +def get_metadata( + metadata_url, + select_by, + selector, + must_include=None, + upstream_trace_distance=None, + downstream_trace_distance=None, +): + ''' + Retrieve metadata for a site or list of sites. + + Parameters + ---------- + metadata_url : STR + metadata base URL. + select_by : STR + Location search option. Options include: 'state', TODO: test 'nws_lid' + selector : LIST + Value to match location data against. Supplied as a LIST. + must_include : STR, optional + What attributes are required to be valid response. The default is None. + upstream_trace_distance : INT, optional + Distance in miles upstream of site to trace NWM network. The default is None. + downstream_trace_distance : INT, optional + Distance in miles downstream of site to trace NWM network. The default is None. + + Returns + ------- + metadata_list : LIST + Dictionary or list of dictionaries containing metadata at each site. + metadata_dataframe : Pandas DataFrame + Dataframe of metadata for each site. 
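+
+    Examples
+    --------
+    Illustrative only. This mirrors the forecast-point call made in
+    download_all_metadata(); API_BASE_URL is assumed to come from the
+    environment file, and the trace distances are placeholder values.
+
+    >>> from dotenv import load_dotenv
+    >>> import os
+    >>> load_dotenv('/data/config/fim_enviro_values.env')
+    >>> API_BASE_URL = os.getenv('API_BASE_URL')
+    >>> metadata_list, metadata_df = get_metadata(
+    ...     metadata_url=f'{API_BASE_URL}/metadata',
+    ...     select_by='nws_lid',
+    ...     selector=['all'],
+    ...     must_include='nws_data.rfc_forecast_point',
+    ...     upstream_trace_distance=5,
+    ...     downstream_trace_distance=5,
+    ... )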
+ + ''' + + # Format selector variable in case multiple selectors supplied + format_selector = '%2C'.join(selector) + # Define the url + url = f'{metadata_url}/{select_by}/{format_selector}/' + # Assign optional parameters to a dictionary + params = {} + params['must_include'] = must_include + params['upstream_trace_distance'] = upstream_trace_distance + params['downstream_trace_distance'] = downstream_trace_distance + # Suppress Insecure Request Warning + requests.packages.urllib3.disable_warnings(InsecureRequestWarning) + + # Request data from url + response = requests.get(url, params=params, verify=False) + # print(response) + # print(url) + if response.ok: + # Convert data response to a json + metadata_json = response.json() + # Get the count of returned records + location_count = metadata_json['_metrics']['location_count'] + # Get metadata + metadata_list = metadata_json['locations'] + # Add timestamp of WRDS retrieval + timestamp = response.headers['Date'] + # Add timestamp of sources retrieval + timestamp_list = metadata_json['data_sources']['metadata_sources'] + + # Default timestamps to "Not available" and overwrite with real values if possible. + nwis_timestamp, nrldb_timestamp = "Not available", "Not available" + for timestamp in timestamp_list: + if "NWIS" in timestamp: + nwis_timestamp = timestamp + if "NRLDB" in timestamp: + nrldb_timestamp = timestamp + + # nrldb_timestamp, nwis_timestamp = metadata_json['data_sources']['metadata_sources'] + # get crosswalk info (always last dictionary in list) + crosswalk_info = metadata_json['data_sources'] + # Update each dictionary with timestamp and crosswalk info also save to DataFrame. + for metadata in metadata_list: + metadata.update({"wrds_timestamp": timestamp}) + metadata.update({"nrldb_timestamp": nrldb_timestamp}) + metadata.update({"nwis_timestamp": nwis_timestamp}) + metadata.update(crosswalk_info) + metadata_dataframe = pd.json_normalize(metadata_list) + # Replace all periods with underscores in column names + metadata_dataframe.columns = metadata_dataframe.columns.astype(str).str.replace('.', '_') + else: + # if request was not succesful, print error message. + # TODO: Output this as a status string because the print is getting suppressed + print(f'Code: {response.status_code}\nMessage: {response.reason}\nURL: {response.url}') + # Return empty outputs + metadata_list = [] + metadata_dataframe = pd.DataFrame() + return metadata_list, metadata_dataframe + + +# moved over from inundation-mapping/tools/tools_shared_functions.py +# TODO: Remove this function from the tools shared functions file +# TODO: Re-route all files that use this function to get it from here +def get_thresholds(threshold_url, select_by, selector, threshold='all'): + ''' + Get nws_lid threshold stages and flows (i.e. bankfull, action, minor, + moderate, major). Returns a dictionary for stages and one for flows. + + Parameters + ---------- + threshold_url : STR + WRDS threshold API. + select_by : STR + Type of site (nws_lid, usgs_site_code etc). + selector : STR + Site for selection. Must be a single site. + threshold : STR, optional + Threshold option. The default is 'all'. + + Returns + ------- + stages : DICT + Dictionary of stages at each threshold. + flows : DICT + Dictionary of flows at each threshold. + status_msg : STR + Status of API call and data availability. 
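+
+    Examples
+    --------
+    Illustrative only; the LID below is a placeholder, and threshold_url is
+    assumed to be built from the API_BASE_URL loaded from the .env file
+    (as in the get_metadata example above).
+
+    >>> stages, flows, status_msg = get_thresholds(
+    ...     threshold_url=f'{API_BASE_URL}/nws_threshold',
+    ...     select_by='nws_lid',
+    ...     selector='xxxx1',
+    ...     threshold='all',
+    ... )
+    >>> action_stage = stages.get('action') if stages else None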
+ + ''' + params = {} + params['threshold'] = threshold + url = f'{threshold_url}/{select_by}/{selector}' + + # Initialize status message + status_msg = f"Selector: {selector}: " + + # response = requests.get(url, params=params, verify=False) + + # Call the API + session = requests.Session() + + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + + retry = Retry(connect=3, backoff_factor=0.5) + adapter = HTTPAdapter(max_retries=retry) + session.mount('http://', adapter) + + response = session.get(url, params=params, verify=False) + + if response.status_code == 200: + thresholds_json = response.json() + + # Get metadata + thresholds_info = thresholds_json['value_set'] + threshold_count = thresholds_json['_metrics']['threshold_count'] + status_msg += f"WRDS response sucessful. {threshold_count} threshold types available. " + + # Initialize stages/flows dictionaries + stages = {} + flows = {} + # Check if thresholds information is populated. If site is non-existent thresholds info is blank + if thresholds_info: + # Get all rating sources and corresponding indexes in a dictionary + rating_sources = { + i.get('calc_flow_values').get('rating_curve').get('source'): index + for index, i in enumerate(thresholds_info) + } + # Get threshold data use USGS Rating Depot (priority) otherwise NRLDB. + if 'USGS Rating Depot' in rating_sources: + threshold_data = thresholds_info[rating_sources['USGS Rating Depot']] + elif 'NRLDB' in rating_sources: + threshold_data = thresholds_info[rating_sources['NRLDB']] + # If neither USGS or NRLDB is available use first dictionary to get stage values. + else: + threshold_data = thresholds_info[0] + # Get stages and flows for each threshold + if threshold_data: + status_msg += "Thresholds available. " + + stages = threshold_data['stage_values'] + flows = threshold_data['calc_flow_values'] + # Add source information to stages and flows. Flows source inside a nested dictionary. Remove key once source assigned to flows. + stages['source'] = threshold_data.get('metadata').get('threshold_source') + flows['source'] = flows.get('rating_curve', {}).get('source') + flows.pop('rating_curve', None) + # Add timestamp WRDS data was retrieved. + stages['wrds_timestamp'] = response.headers['Date'] + flows['wrds_timestamp'] = response.headers['Date'] + # Add Site information + stages['nws_lid'] = threshold_data.get('metadata').get('nws_lid') + flows['nws_lid'] = threshold_data.get('metadata').get('nws_lid') + stages['usgs_site_code'] = threshold_data.get('metadata').get('usgs_site_code') + flows['usgs_site_code'] = threshold_data.get('metadata').get('usgs_site_code') + stages['units'] = threshold_data.get('metadata').get('stage_units') + flows['units'] = threshold_data.get('metadata').get('calc_flow_units') + return stages, flows, status_msg + else: + status_msg += "WRDS response error." 
+ print(status_msg) + stages = None + flows = None + + return stages, flows, status_msg + + +# ----- + +# TODO: Remove this function from the generate_categorical_fim_flows.py and replace it with code that can read it in +# was __load_nwm_metadata +def download_all_metadata(workspace, metadata_url, search, get_metadata, label_with_date): + + nwm_us_search, nwm_ds_search = search, search + output_meta_list = [] + + # Get all forecast points + forecast_point_meta_list, ___ = get_metadata( + metadata_url, + select_by='nws_lid', + selector=['all'], + must_include='nws_data.rfc_forecast_point', + upstream_trace_distance=nwm_us_search, + downstream_trace_distance=nwm_ds_search, + ) + + # Get all points for OCONUS regions (HI, PR, and AK) + oconus_meta_list, ___ = get_metadata( + metadata_url, + select_by='state', + selector=['HI', 'PR', 'AK'], + must_include=None, + upstream_trace_distance=nwm_us_search, + downstream_trace_distance=nwm_ds_search, + ) + + # Append the lists + unfiltered_meta_list = forecast_point_meta_list + oconus_meta_list + + # Filter the metadata list + output_meta_list = [] + unique_lids_list, duplicate_lids_list = [], [] + duplicate_meta_list, nonelid_metadata_list = [], [] + + for i, site in enumerate(unfiltered_meta_list): + nws_lid = site['identifiers']['nws_lid'] + + if nws_lid is None: + # No LID available + nonelid_metadata_list.append(site) + + elif nws_lid in unique_lids_list: + # Duplicate LID + duplicate_lids_list.append(nws_lid) + duplicate_meta_list.append(site) + + else: + # Unique/unseen LID that's not None + unique_lids_list.append(nws_lid) + output_meta_list.append(site) + + if get_metadata == True: + filename = f'nwm_metafile{label_with_date}.pkl' + meta_filepath = os.path.join(workspace, filename) + print(f"Meta file will be downloaded and saved at {meta_filepath}") + + with open(meta_filepath, "wb") as p_handle: + pickle.dump(output_meta_list, p_handle, protocol=pickle.HIGHEST_PROTOCOL) + + return output_meta_list + + +# moved over from inundation-mapping/tools/tools_shared_functions.py +# TODO: Remove this function from the tools shared functions file +# TODO: Re-route all files that use this function to get it from here +def download_all_thresholds(workspace, threshold_url, metadata_list, label_with_date): + """ + Download all thresholds from the WRDS API for a list of LIDs and save them as CSV files. + Combine all CSV files into a single pickle file. + + This function can be run in a Jupyter notebook or as a standalone script in a Python environment + to predownload thresholds for all sites in the metadata pickle file. + + Parameters: + - threshold_url: str, URL of the WRDS API endpoint for thresholds. + - output_folder: str, path to the folder where output files will be saved. + - metadata_list: + + Returns: + - None + + Outputs: + - CSV files for each LID in a subfolder 'threshold_download' within the output_folder. + - A combined pickle file 'all_thresholds.pkl' containing all thresholds. 
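+
+    Example:
+        Illustrative sketch only; the workspace path and label are placeholders,
+        threshold_url is assumed to come from the API_BASE_URL in the .env file,
+        and metadata_list is assumed to be the list returned by
+        download_all_metadata().
+
+            download_all_thresholds(
+                workspace='/data/wrds/example_workspace',
+                threshold_url=f'{API_BASE_URL}/nws_threshold',
+                metadata_list=output_meta_list,
+                label_with_date='example_24092025',
+            )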
+ """ + + filename = f'thresholds{label_with_date}.pkl' + thresholds_filepath = os.path.join(workspace, filename) + print(f"Thresholds will be downloaded and saved at {thresholds_filepath}") + + lid_list = [] + huc_lid_dict = {} + + # Iterate through the metadata to get a list of LIDs and HUCs + for site_entry in metadata_list: + lid_i = site_entry['identifiers']['nws_lid'] + huc_nws_i = site_entry['nws_preferred']['huc'] + huc_usgs_i = site_entry['usgs_preferred']['huc'] + + huc_i = huc_usgs_i if huc_nws_i is None else huc_nws_i + + lid_list.append(lid_i) + huc_lid_dict[lid_i] = huc_i + + print('Start threshold downloads.') + print(f'Total sites to download: {len(lid_list)}') + + # Iterate through LIDs in huc_lid_dict and get thresholds from the WRDS API + list_threshold_dfs = [] + for lid, huc in huc_lid_dict.items(): + try: + stages, flows, status = get_thresholds( + threshold_url=threshold_url, select_by='nws_lid', selector=lid, threshold='all' + ) + except Exception as e: + print(f"Error retrieving thresholds for LID {lid}: {e}") + print(status) + continue + + # Combine and label thresholds + thresholds_dict = [{'threshold_type': 'stages', 'huc': huc, **stages}, + {'threshold_type': 'flows', 'huc': huc, **flows}] + + # Format into a dataframe and add to the df list + thresholds_df = pd.DataFrame(thresholds_dict) + list_threshold_dfs.append(thresholds_df) + + # Combine all the DataFrames in the list into a single, final DataFrame + all_thresholds_df = pd.concat(list_threshold_dfs, ignore_index=True) + + # Save the combined DataFrame to a pickle file + try: + with open(thresholds_filepath, 'wb') as f: + pickle.dump(all_thresholds_df, f) + except Exception as e: + print(f"Error saving pickle file {thresholds_filepath}: {e}") + + print('Thresholds compilation complete.') + + # return huc_lid_dict and safe as file? TODO: add in, add a note that it's just for tracing + +# def emulate_wrds_data(data_csv) # actually I think that emulate wrds data really might need to be its own file in the wrds folder... # TODO + +# this function will also be able to be run from within CatFIM +def obtain_wrds_data(env_file, + workspace, + label, + lst_hucs, + search, + get_metadata, + get_thresholds): + + print('Starting processing to obtain WRDS data...') + + # Validate workspace + if not os.path.exists(workspace): + raise ValueError(f'Workspace path {workspace} does not exist. Please provide a valid path.') + + + if get_metadata == False and get_thresholds == False: + raise ValueError('At least one of -m (get metadata) or -t (get thresholds) must be specified as True.') + elif get_metadata == True and get_thresholds == True: + print('Both metadata and thresholds will be saved.') + elif get_metadata == True and get_thresholds == False: + print('Only metadata will be saved.') + elif get_thresholds == True and get_metadata == False: + print('Only threshold data will be saved.') + + # For API usage + load_dotenv(env_file) + API_BASE_URL = os.getenv('API_BASE_URL') + if API_BASE_URL is None: + raise ValueError( + 'API base url not found. Ensure inundation_mapping/tools/ has an .env file with the API_BASE_URL.' 
+        )
+
+    metadata_url = f'{API_BASE_URL}/metadata'
+    threshold_url = f'{API_BASE_URL}/nws_threshold'
+
+    lst_hucs = lst_hucs.split()
+
+    # If a list of HUCs is provided, add 'subset' to the label
+    subset = '' if 'all' in lst_hucs else '_subset'
+
+    date_formatted = date.today().strftime("%d%m%Y")
+    label_with_date = f'{label}{subset}_{date_formatted}'
+
+    # Download metadata and save it to a pkl file
+    # (If thresholds_only == True, metadata will be downloaded but not saved)
+    output_meta_list = download_all_metadata(workspace, metadata_url, search, get_metadata, label_with_date)  # TODO: Add lst_hucs filtering
+
+    if get_thresholds == True:
+        # Download thresholds
+        download_all_thresholds(workspace, threshold_url, output_meta_list, label_with_date)  # TODO: add lst_hucs filtering
+        # TODO: add a note about why .pkl is used for thresholds
+
+    print('Processing complete.')  # TODO: add runtime tracking. If runtime is very long, add multiprocessing?
+
+    # TODO: figure out timing; check how large the output files are
+    # TODO: bolt in Ali's new logging system
+
+
+if __name__ == '__main__':
+    # Parse arguments
+    parser = argparse.ArgumentParser(description='PLACEHOLDER.')  # TODO: Add description
+
+    parser.add_argument(
+        '-e',
+        '--env-file',
+        help='OPTIONAL: Docker mount path to the environment file. '
+        'Default = /data/config/fim_enviro_values.env',
+        required=False,
+        default='/data/config/fim_enviro_values.env',
+    )
+
+    parser.add_argument(
+        '-w',
+        '--workspace',
+        help='OPTIONAL: Workspace where all outputs will be saved.',
+        required=False,
+        default='/data/wrds ?',  # TODO: Add default data folder path
+    )
+
+    parser.add_argument(
+        '-l',
+        '--label',
+        help='OPTIONAL: Label for filenames. Structure will be metadata_