Skip to content

Commit 5d3573f

Browse files
authored
Merge pull request #20 from xcube-dev/pont-19-restructure-stac
Integrate output datasets with EOAP STAC catalogue
2 parents ff84e56 + a21d4d0 commit 5d3573f

File tree

8 files changed

+145
-42
lines changed

8 files changed

+145
-42
lines changed
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
name: publish-docs
2+
on:
3+
push:
4+
branches:
5+
- main
6+
permissions:
7+
contents: write
8+
jobs:
9+
deploy:
10+
runs-on: ubuntu-latest
11+
steps:
12+
- uses: actions/checkout@v4
13+
- name: Configure Git Credentials
14+
run: |
15+
git config user.name github-actions[bot]
16+
git config user.email 41898282+github-actions[bot]@users.noreply.github.com
17+
- uses: actions/setup-python@v5
18+
with:
19+
python-version: 3.x
20+
- run: echo "cache_id=$(date --utc '+%V')" >> $GITHUB_ENV
21+
- uses: actions/cache@v4
22+
with:
23+
key: mkdocs-material-${{ env.cache_id }}
24+
path: .cache
25+
restore-keys: |
26+
mkdocs-material-
27+
- run: pip install mkdocs-material
28+
- run: mkdocs gh-deploy --force

docs/index.md

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# xcengine
2+
3+
xcube tools for compute engines
4+
5+
## Overview
6+
7+
xcengine provides a command `xcetool` which converts a Jupyter Python notebook
8+
to a compute engine. A compute engine consists of:
9+
10+
- A docker container image packaging the notebook code, a Python environment,
11+
and an [xcube](https://xcube.readthedocs.io/) server component, letting
12+
the notebook results be served over multiple supported APIs and explored
13+
in the interactive xcube viewer.
14+
- An accompanying CWL file defining an OGC
15+
[Earth Observation Application Package](https://docs.ogc.org/bp/20-089r1.html) using
16+
the container image. This lets your notebook code run as part of a processing
17+
workflow on any EOAP platform.
18+
19+
In the conversion process, xcengine tries to maximize convenience for the user
20+
by requiring as little extra configuration and input as possible. Input
21+
variables and their types can be defined by tagging a notebook cell
22+
(similarly to [papermill](https://papermill.readthedocs.io/)), and output
23+
datasets are automatically extracted from the notebook’s environment.
24+
Some user configuration is unavoidable, but xcengine automates much of the
25+
boilerplate required to create an EOAP.

mkdocs.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
site_name: xcengine
2+
site_url: https://xcube-dev.github.io/xcengine
3+
theme:
4+
name: material

test/test_util.py

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import pytest
66
import xarray as xr
77

8-
from xcengine.util import clear_directory, write_stac
8+
from xcengine.util import clear_directory, write_stac, save_datasets
99

1010

1111
@pytest.fixture
@@ -38,23 +38,42 @@ def test_clear_directory(tmp_path):
3838
assert os.listdir(tmp_path) == []
3939

4040

41-
def test_write_stac(tmp_path, dataset):
42-
write_stac({"ds1": dataset, "ds2": dataset}, tmp_path)
41+
@pytest.mark.parametrize("write_zarrs", [False, True])
42+
def test_write_stac(tmp_path, dataset, write_zarrs):
43+
datasets = {"ds1": dataset, "ds2": dataset}
44+
if write_zarrs:
45+
output_path = tmp_path / "output"
46+
output_path.mkdir()
47+
for ds_id, ds in datasets.items():
48+
ds.to_zarr(output_path / (ds_id + ".zarr"))
49+
50+
write_stac(datasets, tmp_path)
4351
catalog = pystac.Catalog.from_file(tmp_path / "catalog.json")
4452
items = set(catalog.get_items(recursive=True))
45-
assert {item.id for item in items} == {"ds1", "ds2"}
53+
assert {item.id for item in items} == datasets.keys()
4654
catalog.make_all_asset_hrefs_absolute()
4755
data_asset_hrefs = {
48-
item.id: [
49-
a.href # (Path(item.self_href) / a.href).resolve(strict=False)
50-
for a in item.assets.values()
51-
if "data" in a.roles
52-
]
56+
item.id: [a.href for a in item.assets.values() if "data" in a.roles]
5357
for item in items
5458
}
5559
assert data_asset_hrefs == {
56-
ds: [
57-
str(Path(tmp_path / "output" / f"{ds}.zarr").resolve(strict=False))
60+
ds_id: [
61+
str(Path(tmp_path / ds_id / f"{ds_id}.zarr").resolve(strict=False))
5862
]
59-
for ds in {"ds1", "ds2"}
63+
for ds_id in datasets.keys()
6064
}
65+
66+
67+
@pytest.mark.parametrize("eoap_mode", [False, True])
def test_save_datasets(tmp_path, dataset, eoap_mode):
    """Check where save_datasets puts its Zarrs and whether a catalogue appears.

    With EOAP mode on, each dataset is expected in a subdirectory named after
    its identifier and a ``catalog.json`` must be present in the root. With
    EOAP mode off, all datasets are expected in a shared ``output``
    subdirectory and no ``catalog.json`` may exist.
    """
    datasets = {"ds1": dataset, "ds2": dataset}
    save_datasets(datasets, tmp_path, eoap_mode)

    for ds_id in datasets.keys():
        # The parent directory is per-dataset in EOAP mode, shared otherwise.
        parent_name = ds_id if eoap_mode else "output"
        zarr_dir = tmp_path / parent_name / (ds_id + ".zarr")
        assert zarr_dir.is_dir()

    catalogue_path = tmp_path / "catalog.json"
    if not eoap_mode:
        assert not catalogue_path.exists()
    else:
        assert catalogue_path.is_file()

test/test_wrapper.py

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,23 @@
11
import os
2-
from unittest.mock import patch
3-
import sys
2+
from unittest.mock import patch, Mock
3+
import pytest
44

55

6-
@patch("sys.argv", ["wrapper.py", "--verbose"])
7-
def test_wrapper(tmp_path, monkeypatch):
6+
@pytest.mark.parametrize("cli_args", [["--verbose"], ["--batch"]])
7+
def test_wrapper(tmp_path, monkeypatch, cli_args):
88
import xcengine
99

10-
for path in xcengine.__path__:
11-
monkeypatch.syspath_prepend(path)
12-
user_code_path = tmp_path / "user_code.py"
13-
user_code_path.touch()
14-
os.environ["XC_USER_CODE_PATH"] = str(user_code_path)
15-
from xcengine import wrapper
10+
with patch("sys.argv", ["wrapper.py"] + cli_args):
11+
for path in xcengine.__path__:
12+
monkeypatch.syspath_prepend(path)
13+
user_code_path = tmp_path / "user_code.py"
14+
user_code_path.touch()
15+
os.environ["XC_USER_CODE_PATH"] = str(user_code_path)
16+
from xcengine import wrapper
1617

17-
xcengine.wrapper.main()
18+
with patch("util.save_datasets", save_datasets_mock := Mock()):
19+
xcengine.wrapper.main()
20+
21+
assert save_datasets_mock.call_count == (
22+
1 if "--batch" in cli_args else 0
23+
)

xcengine/core.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ def create_cwl(self, image_tag: str) -> dict[str, Any]:
148148
"python3",
149149
"/home/mambauser/execute.py",
150150
],
151-
"arguments": ["--batch"],
151+
"arguments": ["--batch", "--eoap"],
152152
# TODO: Handle stage-in and stage-out properly
153153
"inputs": self.nb_params.get_cwl_commandline_inputs(),
154154
"outputs": {

xcengine/util.py

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,21 @@ def write_stac(
2727
href=f"{stac_root}/catalog.json",
2828
)
2929
for ds_name, ds in datasets.items():
30-
asset_path = str(stac_root / "output" / (ds_name + ".zarr"))
30+
zarr_name = ds_name + ".zarr"
31+
zarr_path = stac_root / "output" / zarr_name
32+
asset_parent = stac_root / ds_name
33+
asset_parent.mkdir(parents=True, exist_ok=True)
34+
asset_path = asset_parent / zarr_name
35+
if zarr_path.exists():
36+
# If a Zarr for this asset is present in the output directory,
37+
# move it into the corresponding STAC subdirectory. If not,
38+
# we write the same STAC items with the same asset links anyway
39+
# and assume that the caller will take care of actually writing
40+
# the asset.
41+
zarr_path.rename(asset_path)
3142
asset = pystac.Asset(
3243
roles=["data", "visual"],
33-
href=asset_path,
44+
href=str(asset_path),
3445
# No official media type for Zarr yet, but "application/vnd.zarr"
3546
# https://github.com/radiantearth/stac-spec/issues/713 and listed in
3647
# https://humanbrainproject.github.io/openMINDS/v3/core/v4/data/contentType.html
@@ -65,3 +76,24 @@ def write_stac(
6576
catalog.add_item(item)
6677
catalog.make_all_asset_hrefs_relative()
6778
catalog.save(catalog_type=pystac.CatalogType.SELF_CONTAINED)
79+
80+
81+
def save_datasets(
    datasets, output_path: pathlib.Path, eoap_mode: bool
) -> dict[str, pathlib.Path]:
    """Write every dataset to a Zarr store below *output_path*.

    In EOAP mode each dataset is written to its own subdirectory, i.e.
    ``<output_path>/<ds_id>/<ds_id>.zarr``, and ``write_stac`` is then called
    with the same datasets and output path. Otherwise all datasets are
    written to the shared directory ``<output_path>/output`` and
    ``write_stac`` is not called.

    A file named ``finished`` is always created directly in *output_path*
    once all datasets have been written.

    Args:
        datasets: mapping from dataset identifier to an object providing a
            ``to_zarr(path)`` method (presumably ``xarray.Dataset``
            instances; confirm against callers).
        output_path: root directory for all output. It is created, including
            any missing parents, if it does not exist yet.
        eoap_mode: whether to use the per-dataset directory layout and write
            the STAC catalogue.

    Returns:
        A mapping from dataset identifier to the path of the Zarr store that
        was written for that dataset.
    """
    # Create the root up front so that the "finished" flag below can be
    # written even if there are no datasets (in which case no subdirectory,
    # and hence no parent, would otherwise be created).
    output_path.mkdir(parents=True, exist_ok=True)

    saved_datasets = {}
    # EOAP doesn't require an "output" subdirectory (output can go anywhere
    # in the CWD) but it's used by xcetool's built-in runner.
    # Note that EOAP runners typically override the image-specified CWD.
    for ds_id, ds in datasets.items():
        output_subpath = output_path / (ds_id if eoap_mode else "output")
        output_subpath.mkdir(parents=True, exist_ok=True)
        dataset_path = output_subpath / (ds_id + ".zarr")
        saved_datasets[ds_id] = dataset_path
        ds.to_zarr(dataset_path)

    # The "finished" file is a flag to indicate to a runner when
    # processing is complete, though the xcetool runner doesn't yet use it.
    (output_path / "finished").touch()

    if eoap_mode:
        write_stac(datasets, output_path)
    return saved_datasets

xcengine/wrapper.py

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,11 @@
99
import os
1010
import pathlib
1111
import sys
12+
import util
1213

1314
print("CWD", os.getcwd())
1415

1516
import parameters
16-
import util
1717

1818
LOGGER = logging.getLogger(__name__)
1919
logging.basicConfig(level=logging.INFO)
@@ -54,6 +54,7 @@ def main():
5454
parser.add_argument("--batch", action="store_true")
5555
parser.add_argument("--server", action="store_true")
5656
parser.add_argument("--from-saved", action="store_true")
57+
parser.add_argument("--eoap", action="store_true")
5758
parser.add_argument("-v", "--verbose", action="count", default=0)
5859
args, _ = parser.parse_known_args()
5960
if args.verbose > 0:
@@ -69,21 +70,9 @@ def main():
6970
saved_datasets = {}
7071

7172
if args.batch:
72-
# TODO: Implement EOAP-compliant stage-in and stage-out
73-
# EOAP doesn't require an "output" subdirectory (output can go anywhere
74-
# in the CWD) but it's used by xcetool's built-in runner.
75-
# Note that EOAP runners typically override the image-specified CWD.
76-
output_path = pathlib.Path.cwd()
77-
output_subpath = output_path / "output"
78-
output_subpath.mkdir(parents=True, exist_ok=True)
79-
for name, dataset in datasets.items():
80-
dataset_path = output_subpath / (name + ".zarr")
81-
saved_datasets[name] = dataset_path
82-
dataset.to_zarr(dataset_path)
83-
# The "finished" file is a flag to indicate to a runner when
84-
# processing is complete, though the xcetool runner doesn't yet use it.
85-
(output_path / "finished").touch()
86-
util.write_stac(datasets, output_path)
73+
saved_datasets = util.save_datasets(
74+
datasets, pathlib.Path.cwd(), args.eoap
75+
)
8776

8877
if args.server:
8978
xcube.util.plugin.init_plugins()

0 commit comments

Comments
 (0)