Fixing a bug in thredds loader that limited crawling ability #44

Merged 4 commits on Jan 22, 2024
5 changes: 4 additions & 1 deletion Makefile
@@ -11,10 +11,13 @@ DOCKER_TAG := ghcr.io/crim-ca/stac-populator:$(APP_VERSION)
IMP_DIR := $(APP_NAME)/implementations
STAC_HOST ?= http://localhost:8880/stac
# CATALOG = https://pavics.ouranos.ca/twitcher/ows/proxy/thredds/catalog/birdhouse/testdata/xclim/cmip6/catalog.html
CATALOG = https://daccs.cs.toronto.edu/twitcher/ows/proxy/thredds/catalog/datasets/CMIP6/catalog.html
# CATALOG = https://daccs.cs.toronto.edu/twitcher/ows/proxy/thredds/catalog/datasets/CMIP6/catalog.html
# CATALOG = https://daccs.cs.toronto.edu/twitcher/ows/proxy/thredds/catalog/datasets/CMIP6/CMIP/NOAA-GFDL/catalog.html
# CATALOG = https://daccs.cs.toronto.edu/twitcher/ows/proxy/thredds/catalog/datasets/CMIP6/CMIP/AS-RCEC/catalog.html

# CATALOG = https://daccs.cs.toronto.edu/twitcher/ows/proxy/thredds/catalog/datasets/CMIP6/CMIP/NUIST/catalog.html
CATALOG = https://daccs.cs.toronto.edu/twitcher/ows/proxy/thredds/catalog/datasets/CMIP6/CMIP/MIROC/catalog.html

PYESSV_ARCHIVE_DIR ?= ~/.esdoc/pyessv-archive
PYESSV_ARCHIVE_REF ?= https://github.com/ES-DOC/pyessv-archive

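As a quick sanity check of whichever CATALOG is active, a sketch like the following can crawl it directly. This assumes the loader shown in the STACpopulator/input.py diff below is exposed as `THREDDSLoader`, and `depth=2` is an arbitrary example value, not a project default:

```python
from STACpopulator.input import THREDDSLoader  # assumed import path

# Sketch only: crawl the MIROC catalog selected above, limiting recursion depth.
loader = THREDDSLoader(
    "https://daccs.cs.toronto.edu/twitcher/ows/proxy/thredds/catalog/datasets/CMIP6/CMIP/MIROC/catalog.html",
    depth=2,  # hypothetical value; the diff below defaults to 1000
)

# As of this PR, iteration yields (name, url_path, attrs) triples.
for item_name, url_path, attrs in loader:
    print(item_name, url_path)
```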
@@ -1,6 +1,6 @@
import argparse
import os.path
from typing import NoReturn, Optional, MutableMapping, Any
from typing import Any, MutableMapping, NoReturn, Optional

from requests.sessions import Session

@@ -45,8 +45,9 @@ def make_parser() -> argparse.ArgumentParser:
parser.add_argument("directory", type=str, help="Path to a directory structure with STAC Collections and Items.")
parser.add_argument("--update", action="store_true", help="Update collection and its items.")
parser.add_argument(
"--prune", action="store_true",
help="Limit search of STAC Collections only to first top-most matches in the crawled directory structure."
"--prune",
action="store_true",
help="Limit search of STAC Collections only to first top-most matches in the crawled directory structure.",
)
add_request_options(parser)
return parser
@@ -57,7 +58,7 @@ def runner(ns: argparse.Namespace) -> Optional[int] | NoReturn:

with Session() as session:
apply_request_options(session, ns)
for collection_path, collection_json in STACDirectoryLoader(ns.directory, "collection", ns.prune):
for _, collection_path, collection_json in STACDirectoryLoader(ns.directory, "collection", ns.prune):
collection_dir = os.path.dirname(collection_path)
loader = STACDirectoryLoader(collection_dir, "item", prune=ns.prune)
populator = DirectoryPopulator(ns.stac_host, loader, ns.update, collection_json, session=session)
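Since the loader now yields 3-tuples instead of pairs, every consumer must unpack (and may discard) the leading name, as the updated runner does. A minimal sketch, assuming a hypothetical local `stac/` directory of Collections:

```python
from STACpopulator.input import STACDirectoryLoader

# The loader now yields (name, path, json) triples; the leading name is
# discarded with `_` here, mirroring the updated runner above.
for _, collection_path, collection_json in STACDirectoryLoader("stac/", "collection", prune=True):
    print(collection_path, collection_json.get("id"))
```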
40 changes: 28 additions & 12 deletions STACpopulator/input.py
@@ -60,6 +60,7 @@ class THREDDSCatalog(TDSCatalog):
Because of how :class:`TDSCatalog` automatically loads and parses right away from ``__init__`` call,
we need to hack around how the ``session`` attribute gets defined.
"""

def __init__(self, catalog_url: str, session: Optional[Session] = None) -> None:
self._session = session
super().__init__(catalog_url)
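Setting `self._session` before calling `super().__init__()` matters because `TDSCatalog` fetches and parses the catalog inside its constructor. A standalone sketch of the pattern, with hypothetical class names:

```python
from typing import Optional

class EagerParent:
    """Stand-in for TDSCatalog: does its network/parsing work in __init__."""

    def __init__(self, url: str) -> None:
        self.url = url
        self._load()  # runs immediately, before control returns to the subclass

    def _load(self) -> None:
        # Any attribute consumed here must already exist on the instance.
        print(f"loading {self.url} with session={getattr(self, '_session', None)}")

class ChildWithSession(EagerParent):
    def __init__(self, url: str, session: Optional[object] = None) -> None:
        self._session = session  # must be set *before* super().__init__()
        super().__init__(url)

ChildWithSession("https://example.com/catalog.xml", session="my-session")
```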
@@ -91,7 +92,8 @@ def __init__(
:type depth: int, optional
"""
super().__init__()
self._depth = depth if depth is not None else 1000
self._max_depth = depth if depth is not None else 1000
self._depth = 0

self.thredds_catalog_URL = self.validate_catalog_url(thredds_catalog_url)

@@ -134,18 +136,26 @@ def reset(self):
"""Reset the generator."""
self.catalog_head = self.catalog

def __iter__(self) -> Iterator[Tuple[str, MutableMapping[str, Any]]]:
"""Return a generator walking a THREDDS data catalog for datasets."""
def __iter__(self) -> Iterator[Tuple[str, str, MutableMapping[str, Any]]]:
"""Return a generator walking a THREDDS data catalog for datasets.

:yield: Returns three quantities: name of the item, location of the item, and its attributes
:rtype: Iterator[Tuple[str, str, MutableMapping[str, Any]]]
"""

if self._depth > self._max_depth:
return

if self.catalog_head.datasets.items():
for item_name, ds in self.catalog_head.datasets.items():
attrs = self.extract_metadata(ds)
yield item_name, attrs
yield item_name, ds.url_path, attrs

if self._depth > 0:
for name, ref in self.catalog_head.catalog_refs.items():
self.catalog_head = ref.follow()
self._depth -= 1
yield from self
for name, ref in self.catalog_head.catalog_refs.items():
self.catalog_head = ref.follow()
self._depth -= 1
yield from self
self._depth += 1

def __getitem__(self, dataset):
return self.catalog.datasets[dataset]
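A standalone sketch of the depth-limited recursive-generator pattern used above, with a hypothetical dict-based catalog rather than the project's class; note the sketch increments the counter on descent so the max-depth guard can actually fire:

```python
from typing import Any, Dict, Iterator, Tuple

class CatalogWalker:
    """Minimal sketch of a depth-limited recursive generator walk.

    A "catalog" here is a hypothetical dict with dataset name->url pairs
    under "datasets" and a list of child catalogs of the same shape under "refs".
    """

    def __init__(self, catalog: Dict[str, Any], max_depth: int = 1000) -> None:
        self.head = catalog
        self._max_depth = max_depth
        self._depth = 0

    def __iter__(self) -> Iterator[Tuple[str, str]]:
        if self._depth > self._max_depth:
            return  # too deep: stop this branch of the recursion
        for name, url in self.head.get("datasets", {}).items():
            yield name, url
        for child in self.head.get("refs", []):
            self.head = child
            self._depth += 1   # descend one level before recursing
            yield from self
            self._depth -= 1   # restore the depth on the way back up

# Usage: two nested levels, crawled fully under the default max depth.
root = {"datasets": {"a.nc": "data/a.nc"},
        "refs": [{"datasets": {"b.nc": "data/b.nc"}, "refs": []}]}
print(list(CatalogWalker(root)))  # [('a.nc', 'data/a.nc'), ('b.nc', 'data/b.nc')]
```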
@@ -192,7 +202,13 @@ def __init__(self, path: str, mode: Literal["collection", "item"], prune: bool =
self._collection_mode = mode == "collection"
self._collection_name = "collection.json"

def __iter__(self) -> Iterator[Tuple[str, MutableMapping[str, Any]]]:
def __iter__(self) -> Iterator[Tuple[str, str, MutableMapping[str, Any]]]:
"""Return a generator that walks through a directory structure looking for sTAC Collections or Items.

:yield: Returns three quantities: name of the item, location of the item, and its attributes
:rtype: Iterator[Tuple[str, str, MutableMapping[str, Any]]]
"""

is_root = True
for root, dirs, files in self.iter:
# since there can only ever be one 'collection' file name in the same directory
@@ -201,7 +217,7 @@ def __iter__(self) -> Iterator[Tuple[str, MutableMapping[str, Any]]]:
if self.prune: # stop recursive search if requested
del dirs[:]
col_path = os.path.join(root, self._collection_name)
yield col_path, self._load_json(col_path)
yield self._collection_name, col_path, self._load_json(col_path)
# if a collection is found deeper when not expected for items parsing
# drop the nested directories to avoid over-crawling nested collections
elif not self._collection_mode and not is_root and self._collection_name in files:
@@ -211,7 +227,7 @@ def __iter__(self) -> Iterator[Tuple[str, MutableMapping[str, Any]]]:
for name in files:
if not self._collection_mode and self._is_item(name):
item_path = os.path.join(root, name)
yield item_path, self._load_json(item_path)
yield self._collection_name, item_path, self._load_json(item_path)

def _is_item(self, path: Union[os.PathLike[str], str]) -> bool:
name = os.path.split(path)[-1]
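The prune behaviour leans on os.walk's contract that mutating `dirs` in place controls which subdirectories get visited. A standalone sketch of that mechanism, using a hypothetical helper rather than the project's loader:

```python
import json
import os
from typing import Any, Iterator, Tuple

def walk_collections(path: str, prune: bool = False) -> Iterator[Tuple[str, str, Any]]:
    """Yield (name, path, contents) for each collection.json under `path`."""
    for root, dirs, files in os.walk(path):
        if "collection.json" in files:
            if prune:
                # In-place clear: os.walk will not descend below this directory,
                # so only the top-most match on each branch is reported.
                del dirs[:]
            col_path = os.path.join(root, "collection.json")
            with open(col_path) as f:
                yield "collection.json", col_path, json.load(f)
```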
11 changes: 8 additions & 3 deletions STACpopulator/populator_base.py
@@ -17,7 +17,6 @@
from STACpopulator.models import AnyGeometry
from STACpopulator.stac_utils import get_logger, load_config, url_validate


LOGGER = get_logger(__name__)


@@ -144,9 +143,11 @@ def publish_stac_collection(self, collection_data: dict[str, Any]) -> None:
post_stac_collection(self.stac_host, collection_data, self.update, session=self._session)

def ingest(self) -> None:
counter = 0
LOGGER.info("Data ingestion")
for item_name, item_data in self._ingest_pipeline:
LOGGER.info(f"Creating STAC representation for {item_name}")
for item_name, item_loc, item_data in self._ingest_pipeline:
LOGGER.info(f"New data item: {item_name}")
LOGGER.info(f"Data location: {item_loc}")
stac_item = self.create_stac_item(item_name, item_data)
if stac_item:
post_stac_item(
@@ -157,3 +158,7 @@ def ingest(self) -> None:
update=self.update,
session=self._session,
)
counter += 1
LOGGER.info(f"Processed {counter} data items")
Collaborator: Should there be a warning log if the STAC item was not generated/returned? Possibly an unexpected error that was silently ignored?

Collaborator (Author): Yes, good point. I've been wanting to add some more error management code. For the moment, I have added an error log message here.

Collaborator: Can you add the item name and loc to the error log message so we can debug the one that failed appropriately?

else:
LOGGER.error("Failed to create STAC representation")
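A sketch of what the last review comment asks for, as a hypothetical helper rather than the merged code: the error log carries both the item name and its location so the failing item can be traced:

```python
import logging
from typing import Any, Callable, Optional

LOGGER = logging.getLogger(__name__)

def build_item(item_name: str, item_loc: str, item_data: dict,
               create_stac_item: Callable[[str, dict], Optional[Any]]) -> Optional[Any]:
    """Create a STAC item, logging enough context to debug a failure."""
    stac_item = create_stac_item(item_name, item_data)
    if stac_item is None:
        LOGGER.error("Failed to create STAC representation for %s at %s",
                     item_name, item_loc)
    return stac_item
```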