Merge pull request #83 from jarq6c/update_gcp
Update default caching functionality in `gcp_client`
jarq6c authored May 21, 2021
2 parents c3c6547 + a8fe272 commit 9c0fb46
Showing 8 changed files with 153 additions and 41 deletions.
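For orientation, here is a minimal usage sketch of the updated interface, assuming the keyword names introduced in the diff below (cache_path, cache_group, cache_data); by default, retrieved model cycles are now cached to a local HDF5 file:

from hydrotools.gcp_client import gcp

# Caching options are keyword-only in the updated constructor
model_data_service = gcp.NWMDataService(
    cache_path="gcp_client.h5",   # default HDF5 cache file
    cache_group="gcp_client"      # default root group inside the cache file
)

# The first call downloads from Google Cloud Storage and writes the result
# to the local HDF5 cache; identical calls are then served from the cache
forecast_data = model_data_service.get(
    configuration="short_range",
    reference_time="20210101T01Z"
)

# Pass cache_data=False to bypass the local cache entirely
uncached_data = model_data_service.get(
    configuration="short_range",
    reference_time="20210101T01Z",
    cache_data=False
)
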
1 change: 1 addition & 0 deletions python/gcp_client/CONTRIBUTING.md
1 change: 1 addition & 0 deletions python/gcp_client/LICENSE
3 changes: 3 additions & 0 deletions python/gcp_client/MANIFEST.in
@@ -0,0 +1,3 @@
include LICENSE


1 change: 1 addition & 0 deletions python/gcp_client/SECURITY.md
1 change: 1 addition & 0 deletions python/gcp_client/TERMS.md
125 changes: 90 additions & 35 deletions python/gcp_client/hydrotools/gcp_client/gcp.py
@@ -14,6 +14,9 @@
"""

from pandas.core.indexing import convert_from_missing_indexer_tuple
from hydrotools.caches.hdf import HDFCache

from google.cloud import storage
from io import BytesIO
import xarray as xr
@@ -46,7 +49,10 @@ def __init__(
self,
bucket_name: str = 'national-water-model',
max_processes: int = None,
location_metadata_mapping: pd.DataFrame = None
*,
location_metadata_mapping: pd.DataFrame = None,
cache_path: Union[str, Path] = "gcp_client.h5",
cache_group: str = 'gcp_client'
):
"""Instantiate NWM Data Service.
@@ -59,6 +65,13 @@ def __init__(
location_metadata_mapping : pandas.DataFrame with nwm_feature_id Index and
columns of corresponding site metadata. Defaults to 7500+ usgs_site_code
used by the NWM for data assimilation.
cache_path : str or pathlib.Path, optional, default 'gcp_client.h5'
Path to HDF5 file used to store data locally.
cache_group : str, optional, default 'gcp_client'
Root group inside cache_path used to store HDF5 datasets.
Structure defaults to storing pandas.DataFrames in PyTable format.
Individual DataFrames can be accessed directly using key patterns
that look like '/{cache_group}/{configuration}/DT{reference_time}'
Returns
-------
@@ -93,8 +106,9 @@ def __init__(
).set_index('nwm_feature_id')[['usgs_site_code']])
self.crosswalk = pd.concat(dfs)

# Set default dataframe cache
self.cache = Path('gcp_cache.h5')
# Set caching options
self._cache_path = Path(cache_path)
self._cache_group = cache_group

# TODO find publicly available authoritative source of service
# compatible valid model configuration strings
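As a sketch of the cache layout documented above, each retrieved cycle is stored under a key of the form '/{cache_group}/{configuration}/DT{reference_time}', so a previously cached DataFrame can also be read back directly with pandas (key shown for the default cache_group; illustrative only):

import pandas as pd

# Key pattern from the docstring: /{cache_group}/{configuration}/DT{reference_time}
key = "/gcp_client/analysis_assim/DT20210101T01Z"

# Read a previously cached cycle straight from the default HDF5 cache file
with pd.HDFStore("gcp_client.h5") as store:
    if key in store:
        df = store[key]
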
@@ -269,7 +283,7 @@ def get_DataFrame(

# Rename columns
df = df.rename(columns={
'time': 'valid_time',
'time': 'value_time',
'feature_id': 'nwm_feature_id'
})

@@ -284,7 +298,7 @@
# Return DataFrame
return df

def get(
def get_cycle(
self,
configuration: str,
reference_time: str
@@ -316,18 +330,6 @@ def get(
... )
"""
# Check cache
# TODO Numpy complains about deprecated np.objects, downstream
# packages haven't caught up yet, in this case tables and/or pandas
key = f'{configuration}/DT{reference_time}'
if self.cache.exists():
with pd.HDFStore(self.cache) as store:
if key in store:
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=DeprecationWarning)

return store[key]

# Get list of blob names
blob_list = self.list_blobs(
configuration=configuration,
@@ -366,26 +368,87 @@ def get(

# Sort values
df = df.sort_values(
by=['nwm_feature_id', 'valid_time'],
by=['nwm_feature_id', 'value_time'],
ignore_index=True
)

# Cache
# TODO Remove warning when tables/pandas catches up to Numpy
with pd.HDFStore(self.cache) as store:
if key not in store:
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=DeprecationWarning)

store.put(key, value=df, format='table')

# Return all data
return df

def get(
self,
configuration: str,
reference_time: str,
cache_data: bool = True,
) -> pd.DataFrame:
"""Return streamflow data for a single model cycle in a pandas DataFrame.
Parameters
----------
configuration : str, required
Particular model simulation or forecast configuration. For a list
of available configurations see NWMDataService.configurations
reference_time : str, required
Model simulation or forecast issuance/reference time in
YYYYmmddTHHZ format.
cache_data : bool, optional, default True
If True use a local HDFStore to save retrieved data.
Returns
-------
df : pandas.DataFrame
Simulated or forecasted streamflow data associated with a single
run of the National Water Model.
Examples
--------
>>> from hydrotools.gcp_client import gcp
>>> model_data_service = gcp.NWMDataService()
>>> forecast_data = model_data_service.get(
... configuration = "short_range",
... reference_time = "20210101T01Z"
... )
"""
# Return with caching
if cache_data:
key = f"/{self.cache_group}/{configuration}/DT{reference_time}"
with HDFCache(
path=self.cache_path,
complevel=1,
complib='zlib',
fletcher32=True
) as cache:
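# Retrieve the DataFrame for this key from the HDF5 cache if present;
# otherwise build it with get_cycle and store the result under key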
return cache.get(
self.get_cycle,
key,
configuration=configuration,
reference_time=reference_time
)

# Return without caching
return self.get_cycle(configuration, reference_time)

@property
def bucket_name(self) -> str:
return self._bucket_name

@property
def cache_path(self) -> Path:
return self._cache_path

@cache_path.setter
def cache_path(self, path):
self._cache_path = Path(path)

@property
def cache_group(self) -> str:
return self._cache_group

@cache_group.setter
def cache_group(self, group):
self._cache_group = group

@property
def max_processes(self) -> int:
return self._max_procs
@@ -407,14 +470,6 @@ def crosswalk(self, mapping):

# Set crosswalk
self._crosswalk = mapping

@property
def cache(self) -> Path:
return self._cache

@cache.setter
def cache(self, filepath):
self._cache = Path(filepath)

@property
def configurations(self) -> list:
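The new cache_path and cache_group setters above allow the cache location to be changed after construction; a brief sketch (file and group names here are hypothetical):

from pathlib import Path
from hydrotools.gcp_client import gcp

model_data_service = gcp.NWMDataService()

# Redirect the cache using the new property setters
model_data_service.cache_path = Path("analysis_cache.h5")
model_data_service.cache_group = "analysis_runs"

# Subsequent retrievals are cached under the new file and group
df = model_data_service.get(
    configuration="analysis_assim",
    reference_time="20210101T01Z"
)
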
11 changes: 9 additions & 2 deletions python/gcp_client/setup.py
@@ -14,7 +14,7 @@
SUBPACKAGE_SLUG = f"{NAMESPACE_PACKAGE_NAME}.{SUBPACKAGE_NAME}"

# Subpackage version
VERSION = "2.3.4"
VERSION = "3.0.0"

# Package author information
AUTHOR = "Jason Regina"
@@ -28,7 +28,14 @@
LONG_DESCRIPTION = f.read()

# Package dependency requirements
REQUIREMENTS = ["pandas", "xarray", "google-cloud-storage", "h5netcdf", "tables", "numpy>=1.20.0"]
REQUIREMENTS = [
"pandas",
"xarray",
"google-cloud-storage",
"hydrotools.caches==0.1.1",
"h5netcdf",
"numpy>=1.20.0"
]

setup(
name=SUBPACKAGE_SLUG,
51 changes: 47 additions & 4 deletions python/gcp_client/tests/test_gcp.py
@@ -25,11 +25,17 @@ def test_crosswalk(setup_gcp):
with pytest.raises(Exception):
setup_gcp.crosswalk = pd.DataFrame()

def test_cache(setup_gcp):
assert str(setup_gcp.cache) == 'gcp_cache.h5'
def test_cache_path(setup_gcp):
assert str(setup_gcp.cache_path) == 'gcp_client.h5'

setup_gcp.cache = 'custom_cache.h5'
assert str(setup_gcp.cache) == 'custom_cache.h5'
setup_gcp.cache_path = 'custom_cache.h5'
assert str(setup_gcp.cache_path) == 'custom_cache.h5'

def test_cache_group(setup_gcp):
assert str(setup_gcp.cache_group) == 'gcp_client'

setup_gcp.cache_group = 'simulations'
assert str(setup_gcp.cache_group) == 'simulations'

@pytest.mark.slow
def test_list_blobs(setup_gcp):
@@ -88,6 +94,43 @@ def test_get_DataFrame(setup_gcp):
df = setup_gcp.get_DataFrame(blob_name, streamflow_only=False)
assert len(df.columns) > 4

@pytest.mark.slow
def test_get_cycle(setup_gcp):
# Test ANA
df = setup_gcp.get_cycle(
configuration="analysis_assim",
reference_time="20210101T01Z"
)
assert df['value_time'].unique().size == 3

@pytest.mark.slow
def test_cache_disable(setup_gcp):
setup_gcp.cache_path = 'disabled_cache.h5'

# Test ANA
df = setup_gcp.get(
configuration="analysis_assim",
reference_time="20210101T01Z",
cache_data=False
)
assert df['value_time'].unique().size == 3
assert not setup_gcp.cache_path.exists()

@pytest.mark.slow
def test_cache_key(setup_gcp):
# Test ANA
df1 = setup_gcp.get(
configuration="analysis_assim",
reference_time="20210101T02Z",
cache_data=True
)
first = df1['value_time'].unique().size

with pd.HDFStore(setup_gcp.cache_path) as store:
df2 = store[f"/{setup_gcp.cache_group}/analysis_assim/DT20210101T02Z"]
second = df2['value_time'].unique().size
assert first == second

@pytest.mark.slow
def test_get(setup_gcp):
# Test ANA
