diff --git a/data/nws/preprocess_ahps_nws.py b/data/nws/preprocess_ahps_nws.py index 503048de2..42feaad45 100644 --- a/data/nws/preprocess_ahps_nws.py +++ b/data/nws/preprocess_ahps_nws.py @@ -110,7 +110,7 @@ def preprocess_nws(source_dir, destination, reference_raster): # In multiple instances a USGS ID is given but then no USGS rating curve or in some cases no USGS datum is supplied. select_by = 'nws_lid' selector = code - stages, flows, threshold_count = get_thresholds(threshold_url, select_by, selector, threshold='all') + stages, flows, ___ = get_thresholds(threshold_url, select_by, selector, threshold='all') # Make sure at least one valid threshold is supplied from WRDS. threshold_categories = ['action', 'minor', 'moderate', 'major'] diff --git a/data/usgs/preprocess_ahps_usgs.py b/data/usgs/preprocess_ahps_usgs.py index ff0e02221..d5b480218 100755 --- a/data/usgs/preprocess_ahps_usgs.py +++ b/data/usgs/preprocess_ahps_usgs.py @@ -231,7 +231,7 @@ def preprocess_usgs(source_dir, destination, reference_raster): select_by = 'nws_lid' selector = code - stages, flows, threshold_count = get_thresholds(threshold_url, select_by, selector, threshold='all') + stages, flows, ___ = get_thresholds(threshold_url, select_by, selector, threshold='all') # Make sure at least one valid threshold is supplied from WRDS. threshold_categories = ['action', 'minor', 'moderate', 'major'] diff --git a/tools/catfim/generate_categorical_fim.py b/tools/catfim/generate_categorical_fim.py index d42bedbb8..4c12e3abd 100755 --- a/tools/catfim/generate_categorical_fim.py +++ b/tools/catfim/generate_categorical_fim.py @@ -2,6 +2,7 @@ import argparse import glob +import pickle import math import os import shutil @@ -15,7 +16,10 @@ import numpy as np import pandas as pd from dotenv import load_dotenv -from generate_categorical_fim_flows import generate_flows +from generate_categorical_fim_flows import ( + generate_flows, + __load_thresholds, +) from generate_categorical_fim_mapping import ( manage_catfim_mapping, post_process_cat_fim_for_viz, @@ -88,6 +92,7 @@ def process_generate_categorical_fim( past_major_interval_cap, step_num, nwm_metafile, + threshold_file, ): # ================================ @@ -127,6 +132,7 @@ def process_generate_categorical_fim( output_flows_dir = os.path.join(output_catfim_dir, 'flows') output_mapping_dir = os.path.join(output_catfim_dir, 'mapping') attributes_dir = os.path.join(output_catfim_dir, 'attributes') + output_thresholds_dir = os.path.join(output_catfim_dir, 'thresholds') # ================================ set_start_files_folders( @@ -146,6 +152,7 @@ def process_generate_categorical_fim( if file_ext[1].lower() != ".pkl": raise Exception("The nwm_metadata (-me) file appears to be invalid. The extention is not pkl.") + # ================================ # Define default arguments. Modify these if necessary @@ -202,6 +209,50 @@ def process_generate_categorical_fim( ' Verify that you have the correct input folder and if you used the -lh flag that it' ' is a valid matching HUC.' ) + + + # ================================ + if threshold_file != "": + if not os.path.exists(threshold_file): + raise Exception("The threshold input file cannot be found. Please remove the argument or fix the path.") + file_ext = os.path.splitext(threshold_file) + if file_ext[1] == "": + raise Exception("The threshold input file appears to be invalid. It is missing an extension.") + if file_ext[1].lower() != ".pkl": + raise Exception("The threshold input file appears to be invalid.
The extention is not pkl.") + + # Read pickle file and get a list of unique HUCs + with open(threshold_file, 'rb') as f: + loaded_data = pickle.load(f) + + hucs = loaded_data['huc'].unique().tolist() + threshold_hucs= [str(num).zfill(8) for num in hucs] + + # If a HUC list is specified, check that the HUCs in the list are also in the threshold file + if 'all' not in lst_hucs: + missing_hucs = [huc for huc in valid_ahps_hucs if huc not in threshold_hucs] + if missing_hucs: + raise Exception( + f"The following HUCs from the input list are not present in the threshold file ({threshold_file}): " + f"{', '.join(missing_hucs)}" + ) + else: + # If 'all' is specified, filter valid_ahps_hucs to only those present in the threshold file and warn about dropped HUCs + filtered_hucs = [huc for huc in valid_ahps_hucs if huc in threshold_hucs] + dropped_huc_lst = list(set(valid_ahps_hucs) - set(filtered_hucs)) + if dropped_huc_lst: + FLOG.warning( + f"The following HUCs are present in the FIM run directory but not in the threshold file ({threshold_file}) and will be skipped: " + f"{', '.join(dropped_huc_lst)}" + ) + valid_ahps_hucs = filtered_hucs + num_hucs = len(valid_ahps_hucs) + if num_hucs == 0: + raise ValueError( + f'After filtering, the number of valid HUCs compared to the output directory of {fim_run_dir} is zero.' + ' Verify that you have the correct input folder and threshold file.' + ) + # End of Validation and setup # ================================ @@ -222,6 +273,11 @@ def process_generate_categorical_fim( FLOG.warning('Listed HUCs not available in FIM run directory:') FLOG.warning(dropped_huc_lst) + # Print number of available hucs in threshold_file + if threshold_file != "": + FLOG.lprint(f'Threshold file has data for {len(threshold_hucs)} HUC(s)') + + # For API usage load_dotenv(env_file) API_BASE_URL = os.getenv('API_BASE_URL') if API_BASE_URL is None: @@ -276,6 +332,7 @@ def process_generate_categorical_fim( past_major_interval_cap, nwm_metafile, df_restricted_sites, + threshold_file, ) else: FLOG.lprint("generate_stage_based_categorical_fim step skipped") @@ -322,6 +379,7 @@ def process_generate_categorical_fim( nwm_metafile, FLOG.LOG_FILE_PATH, df_restricted_sites, + threshold_file, ) end = time.time() elapsed_time = (end - start) / 60 @@ -347,6 +405,34 @@ def process_generate_categorical_fim( FLOG.lprint("manage_catfim_mapping step skipped") # end if else + output_pickle_path = os.path.join(output_thresholds_dir, 'thresholds.pkl') + threshold_csv_files = glob.glob(os.path.join(output_thresholds_dir, "*.csv")) + + if threshold_file != "": + FLOG.lprint(f"Skipping threshold CSV compilation because supplied threshold file was used.") + + else: # Check whether there are saved threshold CSVs and, if so, compile them + if len(threshold_csv_files) == 0: + FLOG.lprint("No threshold CSV files found to compile, threshold.pkl will not be created.") + + else: + # Compile saved thresholds data into a single file + all_dataframes = [] + for file in threshold_csv_files: + df = pd.read_csv(file) + all_dataframes.append(df) + os.remove(file) # Remove individual CSV after reading + thresholds_df = pd.concat(all_dataframes, ignore_index=True) + + try: + with open(output_pickle_path, 'wb') as f: + pickle.dump(thresholds_df, f) + FLOG.lprint(f"Successfully combined {len(threshold_csv_files)} CSVs into {output_pickle_path}") + except Exception as e: + FLOG.lprint(f"Error saving pickle file {output_pickle_path}: {e}") + + ###### + FLOG.lprint("") # This is done for SB and FB @@ -497,6 +583,7 @@ def 
iterate_through_huc_stage_based( parent_log_output_file, child_log_file_prefix, progress_stmt, + threshold_file, ): """_summary_ This and its children will create stage based tifs and catfim data based on a huc @@ -540,7 +627,7 @@ def iterate_through_huc_stage_based( # -- If necessary files exist, continue -- # # Yes, each lid gets a record no matter what, so we need some of these messages duplicated # per lid record - if not os.path.exists(usgs_elev_table): + if not os.path.exists(usgs_elev_table): # TODO: Guam - Will we need to add Guam to this data table? Or include this data in our manual input? msg = ":Internal Error: Missing key data from HUC record (usgs_elev_table missing)" # all_messages.append(huc + msg) MP_LOG.warning(huc + msg) @@ -625,31 +712,27 @@ def iterate_through_huc_stage_based( MP_LOG.warning(huc_lid_id + msg) continue - # Get stages and flows for each threshold from the WRDS API. Priority given to USGS calculated flows. - thresholds, flows, threshold_count = get_thresholds( - threshold_url=threshold_url, select_by='nws_lid', selector=lid, threshold='all' - ) - - # temp debug - # MP_LOG.lprint(f"thresholds are {thresholds}") - # MP_LOG.lprint(f"flows are {flows}") + # Get thresholds from WRDS API or threshold file + thresholds, flows, status_msg = __load_thresholds(output_catfim_dir, threshold_url, lid, huc, threshold_file) - # If no thresholds are found, write message and exit. # Many sites that used to have 'Error getting thresholds from WRDS API' should now # have this more descriptive status message - if threshold_count == 0: - msg = ':No thresholds found on WRDS API' - all_messages.append(lid + msg) - MP_LOG.warning(huc_lid_id + msg) - continue + MP_LOG.ltrace(status_msg) - # If there are no thresholds but the threshold_count is greater than 0 or NA (unlikely). - # write message and exit. + # Sept 2025 - Removed threshold_count functionality for now. It was intended to differentiate API errors + # from cases where zero thresholds or the wrong thresholds were available, but in practice + # it was confusing. + + # Update status if stage thresholds are not found if thresholds is None or len(thresholds) == 0: - msg = ':Error getting thresholds from WRDS API' - all_messages.append(lid + msg) - MP_LOG.warning(huc_lid_id + msg) - continue + if "WRDS response successful." in status_msg: + msg = ':WRDS response successful but no stage values available' + all_messages.append(lid + msg) + MP_LOG.warning(huc_lid_id + msg) + continue + else: + msg = ':Error getting stage thresholds from WRDS API' + all_messages.append(lid + msg) + MP_LOG.warning(huc_lid_id + msg) + continue # Check if stages are supplied, if not write message and exit. # This message will occur if some thresholds are supplied, but not for the @@ -670,11 +753,12 @@ iterate_through_huc_stage_based( if err_msg != "": # The error message is already formatted correctly all_messages.append(lid + err_msg) - MP_LOG.warning(huc_lid_id + msg) + MP_LOG.warning(huc_lid_id + err_msg) continue # Look for acceptable elevs - acceptable_usgs_elev_df = __create_acceptable_usgs_elev_df(usgs_elev_df, huc_lid_id) + acceptable_usgs_elev_df = __create_acceptable_usgs_elev_df(usgs_elev_df, huc_lid_id) + # TODO: Guam - Do we need to add Guam to this data? Or have it read in this data from the manual inputs?
if acceptable_usgs_elev_df is None or len(acceptable_usgs_elev_df) == 0: msg = ":Unable to find gage data" # TODO: USGS Gage Method: Update this error message to be more descriptive @@ -682,6 +766,7 @@ def iterate_through_huc_stage_based( MP_LOG.warning(huc_lid_id + msg) continue + # TODO: Guam - Do we need to add Guam to this data? Or have it read in this data from the manual inputs? # Get the dem_adj_elevation value from usgs_elev_table.csv. # Prioritize the value that is not from branch 0. lid_usgs_elev, dem_eval_messages = __adj_dem_evalation_val( @@ -1588,6 +1673,7 @@ def generate_stage_based_categorical_fim( past_major_interval_cap, nwm_metafile, df_restricted_sites, + threshold_file, ): ''' Sep 2024, @@ -1605,11 +1691,8 @@ def generate_stage_based_categorical_fim( huc_messages_dir = os.path.join(output_mapping_dir, 'huc_messages') os.makedirs(huc_messages_dir, exist_ok=True) - FLOG.lprint("Starting generate_flows (Stage Based)") - - # If it is stage based, generate flows returns all of these objects. - # If flow based, generate flows returns only + FLOG.lprint("Starting generate_flows (Stage Based)") # Generate flows is only using one of the incoming job number params # so let's multiply -jh (huc) and -jn (inundate) job_flows = job_number_huc * job_number_inundate @@ -1620,7 +1703,7 @@ def generate_stage_based_categorical_fim( # stage based doesn't really need generated flow data # But for flow based, it really does use it to generate flows. # - (huc_dictionary, sites_gdf, ___, threshold_url, all_lists, nwm_flows_df, nwm_flows_alaska_df) = ( + (huc_dictionary, sites_gdf, ___, threshold_url, all_lists, flows_df_dict) = ( generate_flows( output_catfim_dir, nwm_us_search, @@ -1632,6 +1715,7 @@ def generate_stage_based_categorical_fim( nwm_metafile, str(FLOG.LOG_FILE_PATH), df_restricted_sites, + threshold_file, ) ) @@ -1652,7 +1736,14 @@ def generate_stage_based_categorical_fim( if huc in lst_hucs: # FLOG.lprint(f'Generating stage based catfim for : {huc}') - nwm_flows_region_df = nwm_flows_alaska_df if str(huc[:2]) == '19' else nwm_flows_df + if huc[:4] == '2201': # Guam + nwm_flows_region_df = flows_df_dict['nhd_flows_guam_df'] + elif huc[:4] == '2203': # American Samoa + nwm_flows_region_df = flows_df_dict['nhd_flows_americansamoa_df'] + elif huc[:2] == '19': # Alaska + nwm_flows_region_df = flows_df_dict['nwm_flows_alaska_df'] + else: # CONUS + Hawaii + Puerto Rico + nwm_flows_region_df = flows_df_dict['nwm_flows_df'] progress_stmt = f"index {huc_index + 1} of {num_hucs}" executor.submit( @@ -1671,6 +1762,7 @@ def generate_stage_based_categorical_fim( str(FLOG.LOG_FILE_PATH), child_log_file_prefix, progress_stmt, + threshold_file, ) huc_index += 1 @@ -1706,6 +1798,8 @@ def generate_stage_based_categorical_fim( # Write to file if len(all_csv_df) == 0: raise Exception(f"no csv files found - missing attribute CSVs in {attributes_dir}") + # TODO: This error currently occurs if no sites are mapped (usually in a test). + # Make a test that catches this earlier and provides a more legible error/warning message. all_csv_df.to_csv(os.path.join(attributes_dir, 'nws_lid_attributes.csv'), index=False) @@ -1968,6 +2062,16 @@ def set_start_files_folders( default="", ) + parser.add_argument( + '-tf', + '--threshold-file', + help='OPTIONAL: If you have a pre-existing threshold file, you can path to it here. ' + 'Providing this manual input will prevent the WRDS API from being queried for thresholds.' 
+ ' e.g.: /data/catfim/threshold_file.pkl', + required=False, + default="", + ) + parser.add_argument( '-cv', '--catfim-version', diff --git a/tools/catfim/generate_categorical_fim_flows.py b/tools/catfim/generate_categorical_fim_flows.py index 1772ea51a..1b26a2feb 100755 --- a/tools/catfim/generate_categorical_fim_flows.py +++ b/tools/catfim/generate_categorical_fim_flows.py @@ -2,6 +2,7 @@ import argparse import copy +import csv import glob import os import pickle @@ -61,10 +62,12 @@ def generate_flows_for_huc( output_flows_dir, attributes_dir, huc_messages_dir, - nwm_flows_df, + flows_df_dict,  # Formerly nwm_flows_df; now a dictionary of region-specific flow DataFrames parent_log_output_file, child_log_file_prefix, df_restricted_sites, + output_catfim_dir, + threshold_file, ): try: @@ -143,20 +146,22 @@ def generate_flows_for_huc( # Careful, for "all_message.append" the syntax into it must be f'{lid}: (whever messages) # this is gets parsed and logic used against it. - # for Stage based, is uses stage values from threshold data supplied by WRDS - # but here (for flow), it uses the values from the flows data from WRDS - stages, flows, threshold_count = get_thresholds( - threshold_url=threshold_url, select_by='nws_lid', selector=lid, threshold='all' - ) + stages, flows, status_msg = __load_thresholds(output_catfim_dir, threshold_url, lid, huc, threshold_file) - # Yes.. stages, we will handle missing flows later even though we don't use the stage here - if stages is None or len(stages) == 0: - msg = ':Error getting stages values from WRDS API' - all_messages.append(lid + msg) - MP_LOG.warning(huc_lid_id + msg) - continue + MP_LOG.lprint(status_msg) # TEMP DEBUG - # MP_LOG.lprint(f"Thresholds for {huc_lid_id} are : {thresholds}") + # Update status if flows are not found + if flows is None or len(flows) == 0: # Changed to flows Sept 2025 + if "WRDS response successful." in status_msg: + msg = ':WRDS response successful but no flow values available' + all_messages.append(lid + msg) + MP_LOG.warning(huc_lid_id + msg) + continue + else: + msg = ':Error getting flow values from WRDS API' + all_messages.append(lid + msg) + MP_LOG.warning(huc_lid_id + msg) + continue # Check if stages are supplied, if not write message and exit. if all(stages.get(category, None) is None for category in categories): @@ -180,7 +185,7 @@ def generate_flows_for_huc( ) # Sept 2024: Should we skip these functions that are seen in stage based? Yes - # Flow doesn't need all of the evalation stuff + # Flow doesn't need all of the elevation stuff # acceptable_usgs_elev_df = __create_acceptable_usgs_elev_df(usgs_elev_df, huc_lid_id) # lid_usgs_elev, dem_eval_messages = __adj_dem_evalation_val( # lid_altitude = metadata['usgs_data']['altitude'] @@ -192,7 +197,18 @@ def generate_flows_for_huc( desired_order = metadata['nwm_feature_data']['stream_order'] # Filter segments to be of like stream order.
- segments = filter_nwm_segments_by_stream_order(unfiltered_segments, desired_order, nwm_flows_df) + # TODO: Guam - test this update + if huc[:4] == '2201': # Guam + nwm_flows_region_df = flows_df_dict['nhd_flows_guam_df'] + elif huc[:4] == '2203': # American Samoa + nwm_flows_region_df = flows_df_dict['nhd_flows_americansamoa_df'] + elif huc[:2] == '19': # Alaska + nwm_flows_region_df = flows_df_dict['nwm_flows_alaska_df'] + else: # CONUS + Hawaii + Puerto Rico + nwm_flows_region_df = flows_df_dict['nwm_flows_df'] + + segments = filter_nwm_segments_by_stream_order(unfiltered_segments, desired_order, nwm_flows_region_df) + # Previous input was nwm_flows_df, but now it is a region-specific df (9/25/25) # If there are no segments, write message and exit out if not segments or len(segments) == 0: @@ -374,6 +390,7 @@ def generate_flows( nwm_metafile, log_output_file, df_restricted_sites, + threshold_file, ): # TODO; Most docstrings like this are now very outdated and need updating @@ -416,8 +433,11 @@ def generate_flows( API_BASE_URL, WBD_LAYER = get_env_paths(env_file) nwm_us_search = int(nwm_us_search) nwm_ds_search = int(nwm_ds_search) metadata_url = f'{API_BASE_URL}/metadata' threshold_url = f'{API_BASE_URL}/nws_threshold' ################################################################### # Create HUC message directory to store messages that will be read and joined after multiprocessing @@ -429,12 +449,27 @@ def generate_flows( start_dt = datetime.now(timezone.utc) # Open NWM flows geopackages - # TODO: Jul 2025: These shoudl be changed to a bash_variable or something other than - hardcoding. Granted..
we don't have new ones but might someday - nwm_flows_gpkg = r'/data/inputs/nwm_hydrofabric/nwm_flows.gpkg' + # TODO: Pull from bash_variables.env once we switch from using catfim.env to bash_variables.env + nwm_flows_gpkg = r'/data/inputs/nwm_hydrofabric/nwm_flows.gpkg' # TODO: Get from bash_variables.env + nwm_flows_alaska_gpkg = r'/data/inputs/nwm_hydrofabric/nwm_flows_alaska_nwmV3_ID.gpkg' # TODO: Get from bash_variables.env + input_nhd_flows_Guam = r'/data/inputs/nhdplus/Guam_6637/NHDFlowline_Guam_6637.gpkg' # TODO: Get from bash_variables.env + input_nhd_flows_AmericanSamoa = r'/data/inputs/nhdplus/AmericanSamoa_32702/NHDFlowline_AmericanSamoa_32702.gpkg' # TODO: Get from bash_variables.env + nwm_flows_df = gpd.read_file(nwm_flows_gpkg) - nwm_flows_alaska_gpkg = r'/data/inputs/nwm_hydrofabric/nwm_flows_alaska_nwmV3_ID.gpkg' nwm_flows_alaska_df = gpd.read_file(nwm_flows_alaska_gpkg) + nhd_flows_guam_df = gpd.read_file(input_nhd_flows_Guam) + nhd_flows_americansamoa_df = gpd.read_file(input_nhd_flows_AmericanSamoa) + + # TODO: Guam - pass this dictionary into functions instead of all four dfs + + # Add the dfs to a dictionary for easy access later + flows_df_dict = { + 'nwm_flows_df': nwm_flows_df, + 'nwm_flows_alaska_df': nwm_flows_alaska_df, + 'nhd_flows_guam_df': nhd_flows_guam_df, + 'nhd_flows_americansamoa_df': nhd_flows_americansamoa_df + } + # nwm_metafile might be an empty string # maybe ensure all projections are changed to one standard output of 3857 (see shared_variables) as the come out @@ -443,7 +478,7 @@ def generate_flows( # Filter the meta list to just HUCs in the fim run output or huc if sent in as a param all_meta_lists = __load_nwm_metadata( output_catfim_dir, metadata_url, nwm_us_search, nwm_ds_search, nwm_metafile - ) + ) # TODO: Guam - will use nwm_metafile option end_dt = datetime.now(timezone.utc) time_duration = end_dt - start_dt @@ -484,8 +519,7 @@ def generate_flows( metadata_url, threshold_url, all_meta_lists, - nwm_flows_df, - nwm_flows_alaska_df, + flows_df_dict, ) # only flow based needs the "flow" dir @@ -501,10 +535,14 @@ def generate_flows( with ProcessPoolExecutor(max_workers=job_number_huc) as executor: for huc in huc_dictionary: - # nwm_flows_region_df = nwm_flows_df # To exclude Alaska - nwm_flows_region_df = ( - nwm_flows_alaska_df if huc[:2] == '19' else nwm_flows_df - ) # To include Alaska + if huc[:4] == '2201': # Guam + nwm_flows_region_df = flows_df_dict['nhd_flows_guam_df'] + elif huc[:4] == '2203': # American Samoa + nwm_flows_region_df = flows_df_dict['nhd_flows_americansamoa_df'] + elif huc[:2] == '19': # Alaska + nwm_flows_region_df = flows_df_dict['nwm_flows_alaska_df'] + else: # CONUS + Hawaii + Puerto Rico + nwm_flows_region_df = flows_df_dict['nwm_flows_df'] # Deep copy that speed up Multi-Proc a little as all_meta_lists # is a huge object. 
Need to figure out how to filter that down somehow @@ -523,6 +561,8 @@ def generate_flows( log_output_file, child_log_file_prefix, df_restricted_sites, + output_catfim_dir, + threshold_file, ) # end ProcessPoolExecutor @@ -650,6 +690,66 @@ def generate_flows( FLOG.lprint(f"End Wrapping up flows generation Duration: {str(all_time_duration).split('.')[0]}") print() +def __load_thresholds(output_catfim_dir, threshold_url, lid, huc, threshold_file): + + if os.path.isfile(threshold_file): + # FLOG.lprint(f"Threshold file already downloaded and exists at {threshold_file}") # TODO: Guam - this option will be used + + # Read pickle file and get the stages and flows dictionary for the site + with open(threshold_file, 'rb') as f: + loaded_data = pickle.load(f) + site_data = loaded_data[loaded_data['nws_lid'] == lid.upper()] # TODO: Check whether we need an upper or lower case conversion + + # Error if site_data is empty + if site_data.empty: + FLOG.error(f"No threshold data found for LID {lid} in the provided threshold file.") + return None, None, f"No threshold data found for LID {lid} in the provided threshold file." + + # Make output dictionaries for stages and flows + # Assuming there's only one record per threshold_type per lid + stages = site_data.loc[site_data['threshold_type'] == 'stages'].to_dict(orient='records')[0] + del stages['threshold_type'] + del stages['huc'] + + flows = site_data.loc[site_data['threshold_type'] == 'flows'].to_dict(orient='records')[0] + del flows['threshold_type'] + del flows['huc'] + + # # Print out the stages and flows for debugging ## TEMP DEBUG + # FLOG.lprint(f"Stages for LID {lid}: {stages}") ## TEMP DEBUG + # FLOG.lprint(f"Flows for LID {lid}: {flows}") ## TEMP DEBUG + + status_msg = 'Thresholds loaded from .pkl file.' + + else: + # Get thresholds from the WRDS API + stages, flows, status_msg = get_thresholds( + threshold_url=threshold_url, select_by='nws_lid', selector=lid, threshold='all' + ) + + # Save thresholds to output_thresholds_dir (if the file of that LID doesn't already exist) + output_thresholds_dir = os.path.join(output_catfim_dir, "thresholds") + if not os.path.exists(output_thresholds_dir): + os.mkdir(output_thresholds_dir) + + lid_thresholds_csv = os.path.join(output_thresholds_dir, f"thresholds_{lid.lower()}.csv") + + # Only cache the thresholds when the WRDS call actually returned data + if stages is not None and flows is not None and not os.path.isfile(lid_thresholds_csv): + thresholds = [{'threshold_type': 'stages', 'huc': huc, **stages}, + {'threshold_type': 'flows', 'huc': huc, **flows}] + + with open(lid_thresholds_csv, mode='w', newline='') as file: + writer = csv.DictWriter(file, fieldnames=thresholds[1].keys()) + writer.writeheader() + writer.writerows(thresholds) + + + # TODO: In CatFIM post-processing, add functionality to append all those files together into one thresholds.pkl + # so we can rerun CatFIM without hitting the WRDS API (like we did with __load_nwm_metadata) + # Will need to double-check that the memory usage isn't insane with this new idea. + + + return stages, flows, status_msg # local script calls __load_nwm_metadata so FLOG is already setup def __load_nwm_metadata(output_catfim_dir, metadata_url, nwm_us_search, nwm_ds_search, nwm_metafile): @@ -662,7 +762,7 @@ def __load_nwm_metadata(output_catfim_dir, metadata_url, nwm_us_search, nwm_ds_s # WRDS unless we need a smaller or modified version. This one likely has all nws_lid data.
if os.path.isfile(nwm_metafile) == True: - FLOG.lprint(f"Meta file already downloaded and exists at {nwm_metafile}") + FLOG.lprint(f"Meta file already downloaded and exists at {nwm_metafile}") # TODO: Guam - this option will be used with open(nwm_metafile, "rb") as p_handle: output_meta_list = pickle.load(p_handle) @@ -819,6 +919,15 @@ def __load_nwm_metadata(output_catfim_dir, metadata_url, nwm_us_search, nwm_ds_s default="", ) + parser.add_argument( + '-tf', + '--threshold-file', + help='OPTIONAL: Path to the pre-made pickle file that already holds the thresholds', + required=False, + type=str, + default="", + ) + args = vars(parser.parse_args()) # Run get_env_paths and static_flow_lids diff --git a/tools/catfim/generate_categorical_fim_mapping.py b/tools/catfim/generate_categorical_fim_mapping.py index 3c808909a..b29b9cbbe 100755 --- a/tools/catfim/generate_categorical_fim_mapping.py +++ b/tools/catfim/generate_categorical_fim_mapping.py @@ -663,7 +663,7 @@ def post_process_huc( # There may not necessarily be an attributes.csv for this lid, depending on how flow processing went # lots of lids fall out in the attributes or flow steps. if os.path.exists(nws_lid_attributes_filename) == False: - MP_LOG.warning(f"{ahps_lid} has no attributes file which may perfectly fine.") + MP_LOG.warning(f"{ahps_lid} has no attributes file (which may be perfectly fine)") continue # We are going to do an MP in MP. diff --git a/tools/tools_shared_functions.py b/tools/tools_shared_functions.py index 3525f849e..6c5f8608e 100755 --- a/tools/tools_shared_functions.py +++ b/tools/tools_shared_functions.py @@ -5,6 +5,7 @@ import json import logging import os +import csv import pathlib import traceback from pathlib import Path @@ -12,6 +13,7 @@ import geopandas as gpd import numpy as np import pandas as pd +import pickle import rasterio import rasterio.crs import rasterio.shutil @@ -1058,14 +1060,17 @@ def get_thresholds(threshold_url, select_by, selector, threshold='all'): Dictionary of stages at each threshold. flows : DICT Dictionary of flows at each threshold. - threshold_count : INT - Number of thresholds available for the site. + status_msg : STR + Status of API call and data availability. ''' params = {} params['threshold'] = threshold url = f'{threshold_url}/{select_by}/{selector}' + # Initialize status message + status_msg = f"Selector: {selector}: " + # response = requests.get(url, params=params, verify=False) # Call the API @@ -1081,9 +1086,12 @@ def get_thresholds(threshold_url, select_by, selector, threshold='all'): if response.status_code == 200: thresholds_json = response.json() + # Get metadata thresholds_info = thresholds_json['value_set'] threshold_count = thresholds_json['_metrics']['threshold_count'] + status_msg += f"WRDS response successful. {threshold_count} threshold types available. " + # Initialize stages/flows dictionaries stages = {} flows = {} @@ -1104,6 +1112,8 @@ def get_thresholds(threshold_url, select_by, selector, threshold='all'): threshold_data = thresholds_info[0] # Get stages and flows for each threshold if threshold_data: + status_msg += "Thresholds available. " + stages = threshold_data['stage_values'] flows = threshold_data['calc_flow_values'] # Add source information to stages and flows. Flows source inside a nested dictionary. Remove key once source assigned to flows.
@@ -1120,13 +1130,126 @@ def get_thresholds(threshold_url, select_by, selector, threshold='all'): flows['usgs_site_code'] = threshold_data.get('metadata').get('usgs_site_code') stages['units'] = threshold_data.get('metadata').get('stage_units') flows['units'] = threshold_data.get('metadata').get('calc_flow_units') - return stages, flows, threshold_count + return stages, flows, status_msg else: - print("WRDS response error: ") + status_msg += "WRDS response error." + print(status_msg) + stages = None + flows = None + + return stages, flows, status_msg + +def download_all_thresholds(threshold_url, metadata_pkl_file, output_folder): + """ + Download all thresholds from the WRDS API for a list of LIDs and save them as CSV files. + Combine all CSV files into a single pickle file. + + This function can be run in a Jupyter notebook or as a standalone script in a Python environment + to pre-download thresholds for all sites in the metadata pickle file. + + Parameters: + - threshold_url: str, URL of the WRDS API endpoint for thresholds. + - metadata_pkl_file: str, path to the metadata pickle file containing site information. + - output_folder: str, path to the folder where output files will be saved. + + Returns: + - None + + Outputs: + - CSV files for each LID in a subfolder 'threshold_download' within the output_folder. + - A combined pickle file 'all_thresholds.pkl' containing all thresholds. + """ + + # Check that the inputs exist + if not os.path.exists(metadata_pkl_file): + raise FileNotFoundError(f"Metadata pickle file not found: {metadata_pkl_file}") + + if not os.path.exists(output_folder): + raise FileNotFoundError(f"Output folder not found: {output_folder}") + + # Make an intermediate folder in the output folder + thresholds_folder = os.path.join(output_folder, 'threshold_download') + try: + os.makedirs(thresholds_folder, exist_ok=True) + except Exception as e: + raise OSError(f"Error creating directory {thresholds_folder}: {e}") + + # Get list of all LIDs and their corresponding HUC from the metadata.pkl file + with open(metadata_pkl_file, 'rb') as f: + metadata = pickle.load(f) + + print('Start threshold downloads.') + + lid_list = [] + huc_lid_dict = {} + + # Iterate through the metadata to get a list of LIDs and HUCs + for site_entry in metadata: + lid_i = site_entry['identifiers']['nws_lid'] + huc_nws_i = site_entry['nws_preferred']['huc'] + huc_usgs_i = site_entry['usgs_preferred']['huc'] + + huc_i = huc_usgs_i if huc_nws_i is None else huc_nws_i -# print(response) + lid_list.append(lid_i) + huc_lid_dict[lid_i] = huc_i + print('HUC/LID list assembled.') + + # Iterate through LIDs in huc_lid_dict and get thresholds from the WRDS API + for lid, huc in huc_lid_dict.items(): + + try: + stages, flows, status = get_thresholds( + threshold_url=threshold_url, select_by='nws_lid', selector=lid, threshold='all' + ) + except Exception as e: + print(f"Error retrieving thresholds for LID {lid}: {e}") + continue + + # Skip sites where WRDS returned no stage/flow data + if stages is None or flows is None: + print(f"No threshold data returned for LID {lid}; skipping.") + continue + + # Specify the CSV file name and path + csv_filename = f"thresholds_{lid.lower()}.csv" + csv_filepath = os.path.join(thresholds_folder, csv_filename) + + # Combine and label thresholds + thresholds = [{'threshold_type': 'stages', 'huc': huc, **stages}, + {'threshold_type': 'flows', 'huc': huc, **flows}] + + # Open the CSV file and write threshold data + with open(csv_filepath, mode='w', newline='') as file: + writer = csv.DictWriter(file, fieldnames=thresholds[1].keys()) + writer.writeheader() + writer.writerows(thresholds) + + output_pickle_path =
os.path.join(thresholds_folder, 'all_thresholds.pkl') + + print('Saved threshold CSVs for sites.') + + + # Get all threshold CSVs + csv_files = glob.glob(os.path.join(thresholds_folder, "*.csv")) + + # Compile thresholds into a dataframe + all_dataframes = [] + for file in csv_files: + df = pd.read_csv(file) + all_dataframes.append(df) + os.remove(file) # Remove individual CSV after reading + + + thresholds_df = pd.concat(all_dataframes, ignore_index=True) + + # Save the combined DataFrame to a pickle file + try: + with open(output_pickle_path, 'wb') as f: + pickle.dump(thresholds_df, f) + print(f"Successfully combined {len(csv_files)} CSVs into {output_pickle_path}") + except Exception as e: + print(f"Error saving pickle file {output_pickle_path}: {e}") + + print('Thresholds compilation complete.') ######################################################################## # Function to write flow file
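A minimal sketch of the pre-download workflow, assuming download_all_thresholds is importable from tools/tools_shared_functions.py (as get_thresholds already is) and using placeholder paths for the env file, metadata pickle, and output folder:

    import os
    from dotenv import load_dotenv
    from tools_shared_functions import download_all_thresholds

    # Load API_BASE_URL from an env file (placeholder path) and build the WRDS thresholds endpoint
    load_dotenv('/data/config/catfim.env')
    threshold_url = f"{os.getenv('API_BASE_URL')}/nws_threshold"

    # Query WRDS once for every site in a previously saved metadata pickle (placeholder path);
    # per-LID CSVs and all_thresholds.pkl are written to <output_folder>/threshold_download
    download_all_thresholds(
        threshold_url=threshold_url,
        metadata_pkl_file='/data/catfim/nwm_metafile.pkl',
        output_folder='/data/catfim',
    )

The resulting threshold_download/all_thresholds.pkl could then be passed to generate_categorical_fim.py through the new -tf/--threshold-file argument so that thresholds are read from the pickle instead of being requested from the WRDS API.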