diff --git a/sorc/wxflow b/sorc/wxflow
index afbbf2da25..de88f84924 160000
--- a/sorc/wxflow
+++ b/sorc/wxflow
@@ -1 +1 @@
-Subproject commit afbbf2da25ed8f8e539fc379dff96b7956448f95
+Subproject commit de88f849241a7e5d7563d99d6543544f7faf785d
diff --git a/ush/python/pygfs/task/analysis.py b/ush/python/pygfs/task/analysis.py
index 4286b28f75..a8578a07de 100644
--- a/ush/python/pygfs/task/analysis.py
+++ b/ush/python/pygfs/task/analysis.py
@@ -3,7 +3,7 @@
 import os
 from logging import getLogger
 from typing import Any, Dict
 
-from wxflow import (AttrDict, Task,
+from wxflow import (AttrDict, Task, WorkflowException, Executable,
                     add_to_datetime, to_timedelta, to_isotime,
                     parse_j2yaml, logit)
@@ -50,6 +50,12 @@ def __init__(self, config: Dict[str, Any]):
         else:
             _da_prefix = 'gdas'
 
+        # Map ocean resolution to number of vertical levels
+        _ocnres_to_nlev = {'500': 25,
+                           '100': 75,
+                           '050': 75,
+                           '025': 75}
+
         # Extend task_config with variables that are repeatedly used across this class
         self.task_config.update(AttrDict(
             {
@@ -63,9 +69,9 @@ def __init__(self, config: Dict[str, Any]):
                 'APREFIX_ENS': f"enkf{self.task_config.RUN.replace('enkf', '')}.t{self.task_config.cyc:02d}z.",
                 'GPREFIX': f"{_da_prefix}.t{self.task_config.previous_cycle.hour:02d}z.",
                 'GPREFIX_ENS': f"enkf{_da_prefix}.t{self.task_config.previous_cycle.hour:02d}z.",
-                'OCNRES': f"{self.task_config.OCNRES:03d}",
                 'iau_times_iso': _iau_times_iso,
-                'snow_bkg_path': os.path.join('.', 'bkg/'),  # TODO: remove this line
+                'MOM6_LEVS': _ocnres_to_nlev[f"{self.task_config.OCNRES:03d}"],
+                'mom_domain_stack_size': 116640000,  # TODO: Make the stack size resolution dependent
             }
         ))
 
@@ -80,3 +86,23 @@ def finalize(self) -> None:
 
     def clean(self) -> None:
         super().clean()
+
+    @staticmethod
+    @logit(logger)
+    def run(exec_cmd: Executable) -> None:
+        """Run the executable command
+        This method will run the executable command
+        Parameters
+        ----------
+        exec_cmd: Executable
+            executable command to run
+        Returns
+        ----------
+        None
+        """
+
+        logger.info(f"Executing {exec_cmd}")
+        try:
+            exec_cmd()
+        except WorkflowException as e:
+            raise WorkflowException(f"An error occurred during execution of {exec_cmd}:\n{e}") from e
diff --git a/ush/python/pygfs/task/marine_analysis.py b/ush/python/pygfs/task/marine_analysis.py
index 04f6156b46..6f96014ed0 100644
--- a/ush/python/pygfs/task/marine_analysis.py
+++ b/ush/python/pygfs/task/marine_analysis.py
@@ -1,14 +1,15 @@
 #!/usr/bin/env python3
 
-from datetime import datetime
+from datetime import datetime, timedelta
+import dateutil.parser as dparser
+from netCDF4 import Dataset
 from logging import getLogger
 import os
-import pygfs.utils.marine_da_utils as mdau
 from pygfs.jedi import Jedi
 from pygfs.task.analysis import Analysis
 from wxflow import (AttrDict, FileHandler,
-                    to_timedelta, to_fv3time,
-                    parse_j2yaml,
+                    to_timedelta, to_fv3time, to_isotime,
+                    parse_j2yaml, parse_j2tmpl,
                     logit)
 
 logger = getLogger(__name__.split('.')[-1])
@@ -60,6 +61,19 @@ def __init__(self, config):
         _rst_date = to_fv3time(self.task_config.current_cycle)
         _cice_rst_date = to_fv3time(self.task_config.current_cycle)
 
+        # Generate list of pseudo model states
+        dt_pseudo = 3
+        fcst_hour_list = list(range(6, 10, dt_pseudo))
+        _marine_pseudo_model_states = []
+        bkg_date = self.task_config.WINDOW_BEGIN
+        for fcst_hour in fcst_hour_list:
+            bkg_date = bkg_date + timedelta(hours=dt_pseudo)
+            _marine_pseudo_model_states.append({'date': to_isotime(bkg_date),
+                                                'basename': './bkg/',
+                                                'ocn_filename': f"ocean.bkg.f{str(fcst_hour).zfill(3)}.nc",
+                                                'ice_filename': f"ice.bkg.f{str(fcst_hour).zfill(3)}.nc",
+                                                'read_from_file': 1})
+
         # Create a local dictionary that is repeatedly used across this class
         self.task_config.update(AttrDict(
             {
@@ -68,10 +82,7 @@ def __init__(self, config):
                 'berror_model': _berror_model,
                 'rst_date': _rst_date,
                 'cice_rst_date': _cice_rst_date,
-                'MOM6_LEVS': mdau.get_mom6_levels(str(self.task_config.OCNRES).zfill(3)),
-                'DOMAIN_STACK_SIZE': 116640000,  # TODO: Make the stack size resolution dependent
-                'marine_pseudo_model_states': mdau.gen_bkg_list(bkg_path='./bkg',
-                                                                window_begin=self.task_config.WINDOW_BEGIN)
+                'marine_pseudo_model_states': _marine_pseudo_model_states
             }
         ))
 
@@ -114,18 +125,21 @@ def initialize(self) -> None:
 
         # prepare the deterministic MOM6 input.nml
         logger.info(f"Preparing deterministic MOM6 input namelist")
-        mdau.prep_input_nml(self.task_config)
+        parse_j2tmpl(os.path.join(self.task_config.PARMmarine, 'mom_input.nml.j2'),
+                     self.task_config,
+                     output_file="mom_input.nml")
 
         # prepare the input.nml for the analysis geometry
         logger.info(f"Preparing analysis geometry input namelist")
-        mdau.prep_input_nml(self.task_config, output_nml="./anl_geom/mom_input.nml",
-                            simple_geom=True, mom_input="./anl_geom/MOM_input")
+        parse_j2tmpl(os.path.join(self.task_config.PARMmarine, 'mom_input_anlgeom.nml.j2'),
+                     self.task_config,
+                     output_file="./anl_geom/mom_input.nml")
 
         # assert that dates of the history files are correct
-        mdau.test_hist_date('./INPUT/MOM.res.nc', self.task_config.WINDOW_BEGIN)
+        test_hist_date('./INPUT/MOM.res.nc', self.task_config.WINDOW_BEGIN)
         for state in self.task_config.marine_pseudo_model_states:
-            mdau.test_hist_date(state['basename'] + state['ocn_filename'],
-                                datetime.strptime(state['date'], '%Y-%m-%dT%H:%M:%SZ'))
+            test_hist_date(state['basename'] + state['ocn_filename'],
+                           datetime.strptime(state['date'], '%Y-%m-%dT%H:%M:%SZ'))
 
         # initialize JEDI applications
         logger.info(f"Initializing JEDI applications")
@@ -225,3 +239,19 @@ def initialize_obs_stats(self) -> None:
         # Initialize the observation statistics
         logger.info(f"Initializing JEDI SOCA observation statistics application")
         self.jedi_dict['soca_diag_stats'].initialize(self.task_config)
+
+
+@logit(logger)
+def test_hist_date(histfile: str, ref_date: datetime) -> None:
+    """
+    Check that the date in the MOM6 history file is the expected one for the cycle.
+    TODO: Implement the same for seaice
+    """
+
+    ncf = Dataset(histfile, 'r')
+    hist_date = dparser.parse(ncf.variables['time'].units, fuzzy=True) + timedelta(hours=int(ncf.variables['time'][0]))
+    ncf.close()
+    logger.info(f"*** history file date: {hist_date} expected date: {ref_date}")
+
+    if hist_date != ref_date:
+        raise ValueError(f"FATAL ERROR: Inconsistent bkg date, Expected {ref_date}, {histfile} contains {hist_date}")
diff --git a/ush/python/pygfs/task/marine_bmat.py b/ush/python/pygfs/task/marine_bmat.py
index 057d2acef8..62adf797af 100644
--- a/ush/python/pygfs/task/marine_bmat.py
+++ b/ush/python/pygfs/task/marine_bmat.py
@@ -3,12 +3,11 @@
 import os
 import glob
 from logging import getLogger
-import pygfs.utils.marine_da_utils as mdau
 
 from pygfs.task.analysis import Analysis
 from wxflow import (AttrDict, FileHandler, Executable,
                     add_to_datetime, to_timedelta, to_isotime, chdir,
-                    parse_j2yaml, save_as_yaml,
+                    parse_j2yaml, parse_j2tmpl, save_as_yaml,
                     logit)
 from pygfs.jedi import Jedi
 
@@ -53,8 +52,6 @@ def __init__(self, config):
                 'CALC_SCALE_EXEC': _calc_scale_exec,
                 'ENSPERT_RELPATH': _enspert_relpath,
                 'CALC_SCALE_EXEC': _calc_scale_exec,
-                'MOM6_LEVS': mdau.get_mom6_levels(str(self.task_config.OCNRES)),
-                'DOMAIN_STACK_SIZE': 116640000,  # TODO: Make the stack size resolution dependent
             }
         ))
 
@@ -91,11 +88,14 @@ def initialize(self) -> None:
         FileHandler(self.task_config.data_in).sync()
 
         # prepare the deterministic MOM6 input.nml
-        mdau.prep_input_nml(self.task_config)
+        parse_j2tmpl(os.path.join(self.task_config.PARMmarine, 'mom_input.nml.j2'),
+                     self.task_config,
+                     output_file="mom_input.nml")
 
         # prepare the input.nml for the analysis geometry
-        mdau.prep_input_nml(self.task_config, output_nml="./anl_geom/mom_input.nml",
-                            simple_geom=True, mom_input="./anl_geom/MOM_input")
+        parse_j2tmpl(os.path.join(self.task_config.PARMmarine, 'mom_input_anlgeom.nml.j2'),
+                     self.task_config,
+                     output_file="./anl_geom/mom_input.nml")
 
         # initialize vtscales python script
         vtscales_config = self.jedi_dict['soca_parameters_diffusion_vt'].render_jcb_template('soca_vtscales')
@@ -147,7 +147,7 @@ def execute(self) -> None:
         exec_name = self.task_config.CALC_SCALE_EXEC
         exec_cmd.add_default_arg(exec_name)
         exec_cmd.add_default_arg('soca_vtscales.yaml')
-        mdau.run(exec_cmd)
+        self.run(exec_cmd)
 
         self.jedi_dict['soca_parameters_diffusion_vt'].execute()
 
diff --git a/ush/python/pygfs/task/marine_letkf.py b/ush/python/pygfs/task/marine_letkf.py
index 8acd81493a..7732d5f1d4 100644
--- a/ush/python/pygfs/task/marine_letkf.py
+++ b/ush/python/pygfs/task/marine_letkf.py
@@ -1,13 +1,12 @@
 #!/usr/bin/env python3
 
 import os
-import pygfs.utils.marine_da_utils as mdau
 from logging import getLogger
 from pygfs.task.analysis import Analysis
 from pygfs.jedi import Jedi
 from typing import Dict
 from wxflow import (AttrDict, Executable, FileHandler,
-                    parse_j2yaml, save_as_yaml,
+                    parse_j2yaml, parse_j2tmpl, save_as_yaml,
                     to_timedelta, to_YMDH,
                     logit)
 
@@ -51,7 +50,6 @@ def __init__(self, config: Dict) -> None:
                 'ENSPERT_RELPATH': _enspert_relpath,
                 'letkf_app': 'true',
                 'DIST_HALO_SIZE': 3500000,
-                'DOMAIN_STACK_SIZE': 116640000,  # TODO: Make the stack size resolution dependent
             }
         ))
 
@@ -91,7 +89,9 @@ def initialize(self):
 
         # prepare the ensemble MOM6 input.nml
         logger.info(f"Preparing ensemble MOM6 input namelist")
-        mdau.prep_input_nml(self.task_config)
+        parse_j2tmpl(os.path.join(self.task_config.PARMmarine, 'mom_input.nml.j2'),
+                     self.task_config,
+                     output_file="mom_input.nml")
 
         # initialize JEDI applications
         logger.info(f"Initializing JEDI applications")
diff --git a/ush/python/pygfs/task/marine_recenter.py b/ush/python/pygfs/task/marine_recenter.py
index 5039960c9b..418aafb463 100644
--- a/ush/python/pygfs/task/marine_recenter.py
+++ b/ush/python/pygfs/task/marine_recenter.py
@@ -3,12 +3,11 @@
 from logging import getLogger
 import os
 from typing import Dict
-import pygfs.utils.marine_da_utils as mdau
 from pygfs.jedi import Jedi
 from pygfs.task.analysis import Analysis
 from wxflow import (AttrDict, FileHandler,
                     add_to_datetime, to_timedelta, to_fv3time, to_isotime,
-                    parse_j2yaml,
+                    parse_j2yaml, parse_j2tmpl,
                     logit)
 
 logger = getLogger(__name__.split('.')[-1])
@@ -55,7 +54,6 @@ def __init__(self, config: Dict) -> None:
                 'PARMmarine': os.path.join(self.task_config.PARMgfs, 'gdas', 'marine'),
                 'ENSPERT_RELPATH': _enspert_relpath,
                 'cice_rst_date': _cice_rst_date,
-                'DOMAIN_STACK_SIZE': 116640000,  # TODO: Make the stack size resolution dependent
             }
         ))
 
@@ -89,7 +87,9 @@ def initialize(self):
         FileHandler(self.task_config.data_in).sync()
 
         # prepare the MOM6 input.nml
-        mdau.prep_input_nml(self.task_config)
+        parse_j2tmpl(os.path.join(self.task_config.PARMmarine, 'mom_input.nml.j2'),
+                     self.task_config,
+                     output_file="mom_input.nml")
 
         # initialize JEDI applications
         logger.info(f"Initializing JEDI applications")