Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 131 additions & 3 deletions pyglider/seaexplorer.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import glob
import logging
import os
import warnings

import numpy as np
import polars as pl
Expand Down Expand Up @@ -280,7 +281,23 @@ def merge_parquet(indir, outdir, deploymentyaml, incremental=False, kind='raw'):
if not files:
_log.warning(f'No *gli*.parquet files found in {indir}')
return False
gli = pl.read_parquet(indir + '/*.gli.sub.*.parquet')
# let's figure out which columns to read, for situations where the number of columns changes between files:
gli0 = pl.read_parquet(files[0])
gli1 = pl.read_parquet(files[-1])
columns = list(set(gli0.columns) | set(gli1.columns))
columns = set([c for c in columns if len(c) > 0])
dfs = []
for f in files:
df = pl.read_parquet(f)
missing = columns - set(df.columns)
for col in missing:
df = df.with_columns(pl.lit(None).cast(pl.Float64).alias(col))

dfs.append(df.select(sorted(columns)))
gli = pl.concat(dfs, rechunk=True)

#gli = pl.read_parquet(indir + '/*.gli.sub.*.parquet', columns=columns,
# missing_columns='insert')
gli = drop_pre_1971_samples(gli)
gli.write_parquet(outgli)
_log.info(f'Done writing {outgli}')
Expand All @@ -290,7 +307,22 @@ def merge_parquet(indir, outdir, deploymentyaml, incremental=False, kind='raw'):
if not files:
_log.warning(f'No *{kind}*.parquet files found in {indir}')
return False
pld = pl.read_parquet(indir + '/*.pld1.' + kind + '.*.parquet')
gli0 = pl.read_parquet(files[0])
gli1 = pl.read_parquet(files[-1])
columns = list(set(gli0.columns) | set(gli1.columns))
columns = set([c for c in columns if len(c) > 0])
dfs = []
for f in files:
df = pl.read_parquet(f)
missing = columns - set(df.columns)
for col in missing:
df = df.with_columns(pl.lit(None).cast(pl.Float64).alias(col))

dfs.append(df.select(sorted(columns)))
pld = pl.concat(dfs, rechunk=True)

# pld = pl.read_parquet(indir + '/*.pld1.' + kind + '.*.parquet',
# missing_columns='insert', columns=columns)
pld = drop_pre_1971_samples(pld)
pld.write_parquet(outpld)

Expand Down Expand Up @@ -338,6 +370,29 @@ def _remove_fill_values(df, fill_value=9999):
return df


def _forward_fill(gli, todo='Lat'):
    """NaN-out stale repeated values in column *todo*.

    A value is replaced with NaN when it equals the previous row's
    forward-filled value and is itself non-null — i.e. it is a stale
    duplicate of the last good fix. Nulls and genuinely new values are
    left untouched.
    """
    # Last good (non-null) value as of each row, computed on the
    # original column before any replacement happens.
    last_good = pl.col(todo).fill_null(strategy="forward")
    # A row is stale when it repeats the previous row's last-good value.
    is_stale = (pl.col(todo) == last_good.shift(1)) & pl.col(todo).is_not_null()
    return gli.with_columns(
        pl.when(is_stale).then(np.nan).otherwise(pl.col(todo)).alias(todo)
    )


def _drop_if(gli, todo='Lat', condit='DeadReckoning', value=1):
    """Replace column *todo* with NaN on rows where *condit* equals *value*.

    E.g. with the defaults, drop Lat wherever DeadReckoning is 1.
    """
    keep_or_nan = (
        pl.when(pl.col(condit) == value)
        .then(np.nan)
        .otherwise(pl.col(todo))
        .alias(todo)
    )
    return gli.with_columns(keep_or_nan)



def raw_to_timeseries(
indir,
outdir,
Expand All @@ -348,12 +403,66 @@ def raw_to_timeseries(
maxgap=10,
interpolate=False,
fnamesuffix='',
deadreckon=False,
replace_attrs=None
):
"""
A little different than above, for the 4-file version of the data set.
Convert raw seaexplorer data to a timeseries netcdf file.

Parameters
----------
indir : str
Directory with the raw files are kept.

outdir : str
Directory to write the matching ``*.nc`` files.

deploymentyaml : str
YAML text file with deployment information for this glider.

kind : 'raw' or 'sub'
The type of data to process. 'raw' is the full resolution data, 'sub'
is the sub-sampled data. The default is 'raw'. Note that realtime data is
typically sub-sampled.

profile_filt_time : float
Time in seconds to use for filtering the profiles. Default is 100.

profile_min_time : float
Minimum time in seconds for a profile to be considered a valid profile.
Default is 300.

maxgap : float
Maximum gap in seconds to interpolate over. Default is 10.

interpolate : bool
If *True*, interpolate the data to fill in gaps. Default is False.

fnamesuffix : str
Suffix to add to the output file name. Default is ''.

deadreckon : bool
    If *True*, use the dead-reckoned latitude and longitude data from the glider.
    The default, *False*, is recommended: latitude and longitude are linearly
    interpolated between surface fixes, avoiding unphysical underwater jumps.

replace_attrs : dict or None
replace global attributes in the metadata after reading the metadata
file in. Helpful when processing runs with only a couple things that
change.


Returns
-------
outname : str
Name of the output netcdf file.

"""

deployment = utils._get_deployment(deploymentyaml)
if replace_attrs:
for att in replace_attrs:
deployment['metadata'][att] = replace_attrs[att]

metadata = deployment['metadata']
ncvar = deployment['netcdf_variables']
Expand All @@ -365,6 +474,25 @@ def raw_to_timeseries(
sensor = pl.read_parquet(f'{indir}/{id}-{kind}pld.parquet')
sensor = _remove_fill_values(sensor)

# don't use lat/lon if deadreckoned:
if not deadreckon:
if not ncvar['latitude']['source'] == 'Lat':
warnings.warn("For deadreckon=False, it is suggested to use 'Lat' as the source for latitude.")
if not ncvar['longitude']['source'] == 'Lon':
warnings.warn("For deadreckon=False, it is suggested to use 'Lon' as the source for longitude.")
if 'DeadReckoning' in gli.columns:
_log.info('Not using deadreckoning; glider has DeadReckoning column')
gli = _drop_if(gli, todo='Lat', condit='DeadReckoning', value=1)
gli = _drop_if(gli, todo='Lon', condit='DeadReckoning', value=1)
else:
_log.info('Not using deadreckoning; glider does not have DeadReckoning column')
gli = _drop_if(gli, todo='Lat', condit='NavState', value=116)
gli = _drop_if(gli, todo='Lon', condit='NavState', value=116)
# drop a lat/lon if it is not unique. Happens when there
# are stale fixes.
gli = _forward_fill(gli, todo='Lat')
gli = _forward_fill(gli, todo='Lon')

# build a new data set based on info in `deploymentyaml.`
# We will use ctd as the interpolant
ds = xr.Dataset()
Expand Down
Loading