Skip to content

Commit

Permalink
Write flow at gage locations to netCDF (CHANOBS) file (#505)
Browse files Browse the repository at this point in the history
* create chanobs draft

* move writing sequence to function in nhd_io

* yaml documentation

Co-authored-by: awlostowski-noaa <[email protected]>
  • Loading branch information
awlostowski-noaa and awlostowski-noaa authored Dec 9, 2021
1 parent c74cf3f commit 51dc687
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 3 deletions.
16 changes: 15 additions & 1 deletion doc/v3_doc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -488,8 +488,22 @@ compute_parameters:
# parameters controlling model outputs
output_parameters:
# ---------------
# chanobs writing parameters
# "CHANOBS" netCDF files contain timeseries of t-route simulated routed flow results
# at gage locations only.
# optional, only needed if writing data to CHANOBS (netCDF) files
chanobs_output:
# ---------------
# path to directory where chanobs files will be written
# (!!) mandatory if writing chanobs files
chanobs_output_directory:
# ---------------
# name of netCDF file where chanobs data will be written
# (!!) mandatory if writng chanobs files
chanobs_filepath:
# ---------------
# csv writing parameters
# (!!) mandatory
# optional, only needed if writing data to CSV
# TODO: remove output dependencies on csv_output field, here
csv_output:
# ---------------
Expand Down
23 changes: 21 additions & 2 deletions src/nwm_routing/src/nwm_routing/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import numpy as np
import pandas as pd
from pathlib import Path
from datetime import datetime
from datetime import datetime, timedelta
import troute.nhd_io as nhd_io
from build_tests import parity_check
import logging
Expand Down Expand Up @@ -73,6 +73,7 @@ def nwm_output_generator(
rsrto = output_parameters.get("hydro_rst_output", None)
chrto = output_parameters.get("chrtout_output", None)
test = output_parameters.get("test_output", None)
chano = output_parameters.get("chanobs_output", None)

if csv_output:
csv_output_folder = output_parameters["csv_output"].get(
Expand All @@ -81,7 +82,7 @@ def nwm_output_generator(
csv_output_segments = csv_output.get("csv_output_segments", None)


if csv_output_folder or rsrto or chrto or test:
if csv_output_folder or rsrto or chrto or chano or test:

start = time.time()
qvd_columns = pd.MultiIndex.from_product(
Expand Down Expand Up @@ -213,6 +214,24 @@ def nwm_output_generator(
LOG.debug("writing CSV file took %s seconds." % (time.time() - start))
# usgs_df_filtered = usgs_df[usgs_df.index.isin(csv_output_segments)]
# usgs_df_filtered.to_csv(output_path.joinpath("usgs_df.csv"))

if chano:

LOG.info("- writing t-route flow results at gage locations to CHANOBS file")
start = time.time()

nhd_io.write_chanobs(
Path(chano['chanobs_output_directory'] + chano['chanobs_filepath']),
flowveldepth,
link_gage_df,
t0,
dt,
nts,
# TODO allow user to pass a list of segments at which they would like to print results
# rather than just printing at gages.
)

LOG.debug("writing flow data to CHANOBS took %s seconds." % (time.time() - start))

# Write out LastObs as netcdf.
# Assumed that this capability is only needed for AnA simulations
Expand Down
153 changes: 153 additions & 0 deletions src/python_framework_v02/troute/nhd_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import time
import logging
from joblib import delayed, Parallel
from cftime import date2num



Expand Down Expand Up @@ -345,6 +346,158 @@ def get_ql_from_wrf_hydro_mf(
def drop_all_coords(ds):
return ds.reset_coords(drop=True)

def write_chanobs(
chanobs_filepath,
flowveldepth,
link_gage_df,
t0,
dt,
nts
):

'''
Write results at gage locations to netcdf.
If the user specified file does not exist, create it.
If the user specified file already exiss, append it.
Arguments
-------------
chanobs_filepath (Path or string) -
flowveldepth (DataFrame) - t-route flow velocity and depth results
link_gage_df (DataFrame) - linkIDs of gages in network
t0 (datetime) - initial time
dt (int) - timestep duration (seconds)
nts (int) - number of timesteps in simulation
Returns
-------------
'''

# array of segment linkIDs at gage locations. Results from these segments will be written
gage_feature_id = link_gage_df.index.to_numpy(dtype = "int32")

# array of simulated flow data at gage locations
gage_flow_data = flowveldepth.loc[link_gage_df.index].iloc[:,::3].to_numpy(dtype="float32")

# array of simulation time
gage_flow_time = [t0 + timedelta(seconds = (i+1) * dt) for i in range(nts)]

if not chanobs_filepath.is_file():

# if no chanobs file exists, create a new one
# open netCDF4 Dataset in write mode
with netCDF4.Dataset(
filename = chanobs_filepath,
mode = 'w',
format = "NETCDF4"
) as f:

# =========== DIMENSIONS ===============
_ = f.createDimension("time", None)
_ = f.createDimension("feature_id", len(gage_feature_id))
_ = f.createDimension("reference_time", 1)

# =========== time VARIABLE ===============
TIME = f.createVariable(
varname = "time",
datatype = 'int32',
dimensions = ("time",),
)
TIME[:] = date2num(
gage_flow_time,
units = "minutes since 1970-01-01 00:00:00 UTC",
calendar = "gregorian"
)
f['time'].setncatts(
{
'long_name': 'model initialization time',
'standard_name': 'forecast_reference_time',
'units': 'minutes since 1970-01-01 00:00:00 UTC'
}
)

# =========== reference_time VARIABLE ===============
REF_TIME = f.createVariable(
varname = "reference_time",
datatype = 'int32',
dimensions = ("reference_time",),
)
REF_TIME[:] = date2num(
t0,
units = "minutes since 1970-01-01 00:00:00 UTC",
calendar = "gregorian"
)
f['reference_time'].setncatts(
{
'long_name': 'vaild output time',
'standard_name': 'time',
'units': 'minutes since 1970-01-01 00:00:00 UTC'
}
)

# =========== feature_id VARIABLE ===============
FEATURE_ID = f.createVariable(
varname = "feature_id",
datatype = 'int32',
dimensions = ("feature_id",),
)
FEATURE_ID[:] = gage_feature_id
f['feature_id'].setncatts(
{
'long_name': 'Reach ID',
'comment': 'NHDPlusv2 ComIDs within CONUS, arbitrary Reach IDs outside of CONUS',
'cf_role:': 'timeseries_id'
}
)

# =========== streamflow VARIABLE ===============
y = f.createVariable(
varname = "streamflow",
datatype = "f4",
dimensions = ("time", "feature_id"),
fill_value = np.nan
)
y[:] = gage_flow_data.reshape(
len(gage_flow_time),
len(gage_feature_id)
)

# =========== GLOBAL ATTRIBUTES ===============
f.setncatts(
{
'model_initialization_time': t0.strftime('%Y-%m-%d_%H:%M:%S'),
'model_output_valid_time': gage_flow_time[0].strftime('%Y-%m-%d_%H:%M:%S'),
}
)

else:

# append data to chanobs file
# open netCDF4 Dataset in r+ mode to append
with netCDF4.Dataset(
filename = chanobs_filepath,
mode = 'r+',
format = "NETCDF4"
) as f:

# =========== format variable data to be appended ===============
time_new = date2num(
gage_flow_time,
units = "minutes since 1970-01-01 00:00:00 UTC",
calendar = "gregorian"
)

flow_new = gage_flow_data.reshape(
len(gage_flow_time),
len(gage_feature_id)
)

# =========== append new flow data ===============
tshape = len(f.dimensions['time'])
f['time'][tshape:(tshape+nts)] = time_new
f['streamflow'][tshape:(tshape+nts)] = flow_new

def write_to_netcdf(f, variables, datatype = 'f4'):

'''
Expand Down

0 comments on commit 51dc687

Please sign in to comment.