diff --git a/pyglider/seaexplorer.py b/pyglider/seaexplorer.py
index 6c20c2d..2aa9073 100644
--- a/pyglider/seaexplorer.py
+++ b/pyglider/seaexplorer.py
@@ -7,6 +7,7 @@
 import glob
 import logging
 import os
+import warnings
 
 import numpy as np
 import polars as pl
@@ -280,7 +281,23 @@ def merge_parquet(indir, outdir, deploymentyaml, incremental=False, kind='raw'):
     if not files:
         _log.warning(f'No *gli*.parquet files found in {indir}')
         return False
-    gli = pl.read_parquet(indir + '/*.gli.sub.*.parquet')
+    # let's figure out what columns to read for situations where the number of columns changes:
+    gli0 = pl.read_parquet(files[0])
+    gli1 = pl.read_parquet(files[-1])
+    columns = list(set(gli0.columns) | set(gli1.columns))
+    columns = set([c for c in columns if len(c) > 0])
+    dfs = []
+    for f in files:
+        df = pl.read_parquet(f)
+        missing = columns - set(df.columns)
+        for col in missing:
+            df = df.with_columns(pl.lit(None).cast(pl.Float64).alias(col))
+
+        dfs.append(df.select(sorted(columns)))
+    gli = pl.concat(dfs, rechunk=True)
+
+    # gli = pl.read_parquet(indir + '/*.gli.sub.*.parquet', columns=columns,
+    #                       missing_columns='insert')
     gli = drop_pre_1971_samples(gli)
     gli.write_parquet(outgli)
     _log.info(f'Done writing {outgli}')
@@ -290,7 +307,22 @@ def merge_parquet(indir, outdir, deploymentyaml, incremental=False, kind='raw'):
     if not files:
         _log.warning(f'No *{kind}*.parquet files found in {indir}')
         return False
-    pld = pl.read_parquet(indir + '/*.pld1.' + kind + '.*.parquet')
+    gli0 = pl.read_parquet(files[0])
+    gli1 = pl.read_parquet(files[-1])
+    columns = list(set(gli0.columns) | set(gli1.columns))
+    columns = set([c for c in columns if len(c) > 0])
+    dfs = []
+    for f in files:
+        df = pl.read_parquet(f)
+        missing = columns - set(df.columns)
+        for col in missing:
+            df = df.with_columns(pl.lit(None).cast(pl.Float64).alias(col))
+
+        dfs.append(df.select(sorted(columns)))
+    pld = pl.concat(dfs, rechunk=True)
+
+    # pld = pl.read_parquet(indir + '/*.pld1.' + kind + '.*.parquet',
+    #                       missing_columns='insert', columns=columns)
     pld = drop_pre_1971_samples(pld)
     pld.write_parquet(outpld)
 
@@ -338,6 +370,29 @@ def _remove_fill_values(df, fill_value=9999):
     return df
 
 
+def _forward_fill(gli, todo='Lat'):
+    """Set repeated (stale) values in the *todo* column to NaN so only fresh fixes remain."""
+    gli = gli.with_columns([
+        pl.col(todo).fill_null(strategy="forward").alias("temp_fill")
+    ])
+    gli = gli.with_columns([
+        pl.when(
+            (pl.col(todo) == pl.col("temp_fill").shift(1)) & pl.col(todo).is_not_null()
+        ).then(np.nan).otherwise(pl.col(todo)).alias(todo)
+    ])
+    gli = gli.drop("temp_fill")
+    return gli
+
+
+def _drop_if(gli, todo='Lat', condit='DeadReckoning', value=1):
+    """Set *todo* to NaN wherever *condit* equals *value* (e.g. drop Lat while dead reckoning)."""
+    gli = gli.with_columns([
+        pl.when(pl.col(condit) == value).then(np.nan).otherwise(pl.col(todo)).alias(todo)
+    ])
+    return gli
+
+
+
 def raw_to_timeseries(
     indir,
     outdir,
@@ -348,12 +403,66 @@
     maxgap=10,
     interpolate=False,
     fnamesuffix='',
+    deadreckon=False,
+    replace_attrs=None
 ):
     """
-    A little different than above, for the 4-file version of the data set.
-
+    Convert raw SeaExplorer data to a timeseries netCDF file.
+
+    Parameters
+    ----------
+    indir : str
+        Directory where the raw files are kept.
+
+    outdir : str
+        Directory to write the matching ``*.nc`` files.
+
+    deploymentyaml : str
+        YAML text file with deployment information for this glider.
+
+    kind : 'raw' or 'sub'
+        The type of data to process. 'raw' is the full-resolution data, 'sub'
+        is the sub-sampled data. The default is 'raw'. Note that realtime data
+        is typically sub-sampled.
+
+    profile_filt_time : float
+        Time in seconds to use for filtering the profiles. Default is 100.
+
+    profile_min_time : float
+        Minimum time in seconds for a profile to be considered a valid profile.
+        Default is 300.
+
+    maxgap : float
+        Maximum gap in seconds to interpolate over. Default is 10.
+
+    interpolate : bool
+        If *True*, interpolate the data to fill in gaps. Default is False.
+
+    fnamesuffix : str
+        Suffix to add to the output file name. Default is ''.
+
+    deadreckon : bool
+        If *True*, use the dead-reckoned latitude and longitude reported by the glider.
+        If *False* (the default, and recommended), latitude and longitude are linearly
+        interpolated between surface fixes, which avoids unphysical underwater jumps.
+
+    replace_attrs : dict or None
+        Global attributes to replace in the metadata after the deployment file
+        has been read. Helpful when processing runs where only a couple of
+        attributes change.
+
+
+    Returns
+    -------
+    outname : str
+        Name of the output netCDF file.
+
+
     """
     deployment = utils._get_deployment(deploymentyaml)
+    if replace_attrs:
+        for att in replace_attrs:
+            deployment['metadata'][att] = replace_attrs[att]
 
     metadata = deployment['metadata']
     ncvar = deployment['netcdf_variables']
@@ -365,6 +474,25 @@
     sensor = pl.read_parquet(f'{indir}/{id}-{kind}pld.parquet')
     sensor = _remove_fill_values(sensor)
 
+    # don't use lat/lon fixes taken while dead reckoning:
+    if not deadreckon:
+        if not ncvar['latitude']['source'] == 'Lat':
+            warnings.warn("For deadreckon=False, it is suggested to use 'Lat' as the source for latitude.")
+        if not ncvar['longitude']['source'] == 'Lon':
+            warnings.warn("For deadreckon=False, it is suggested to use 'Lon' as the source for longitude.")
+        if 'DeadReckoning' in gli.columns:
+            _log.info('Not using deadreckoning; glider has DeadReckoning column')
+            gli = _drop_if(gli, todo='Lat', condit='DeadReckoning', value=1)
+            gli = _drop_if(gli, todo='Lon', condit='DeadReckoning', value=1)
+        else:
+            _log.info('Not using deadreckoning; glider does not have DeadReckoning column')
+            gli = _drop_if(gli, todo='Lat', condit='NavState', value=116)
+            gli = _drop_if(gli, todo='Lon', condit='NavState', value=116)
+        # drop a lat/lon fix if it just repeats the previous one, which
+        # happens when fixes are stale.
+        gli = _forward_fill(gli, todo='Lat')
+        gli = _forward_fill(gli, todo='Lon')
+
     # build a new data set based on info in `deploymentyaml.`
     # We will use ctd as the interpolant
     ds = xr.Dataset()
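Usage sketch: a minimal, hypothetical example of how the new deadreckon and
replace_attrs keywords might be called once this change is in place. The
directory names, deployment file, and the 'comment' attribute below are
assumptions, and the merged parquet files are assumed to already exist in the
input directory (for example, written by merge_parquet above).

    import pyglider.seaexplorer as seaexplorer

    # hypothetical paths; realtime ('sub') data already merged to parquet
    rawdir = 'realtime_rawnc'      # directory holding the merged parquet files
    l0dir = 'L0-timeseries'        # where the *.nc timeseries will be written
    deploymentyaml = 'deployment.yml'

    # build the timeseries from interpolated surface fixes (no dead reckoning)
    # and override one global attribute without editing the YAML file
    outname = seaexplorer.raw_to_timeseries(
        rawdir, l0dir, deploymentyaml, kind='sub',
        deadreckon=False,
        replace_attrs={'comment': 'reprocessed without dead-reckoned positions'},
    )
    print('wrote', outname)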