From 1b4810805ccbce52ad9d1a27a94190f2f839f1c8 Mon Sep 17 00:00:00 2001 From: Pontus Lurcock Date: Fri, 25 Apr 2025 19:07:41 +0200 Subject: [PATCH 1/9] Integrate output datasets with EOAP STAC catalogue The wrapper script now supports an --eoap command-line flag, which determines whether a STAC catalogue is written. If a STAC catalogue is written, the output is also moved to the same directories as the STAC items, as specified in Issue #19. --- xcengine/core.py | 2 +- xcengine/util.py | 9 +++++++-- xcengine/wrapper.py | 4 +++- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/xcengine/core.py b/xcengine/core.py index 6f61108..b034418 100755 --- a/xcengine/core.py +++ b/xcengine/core.py @@ -148,7 +148,7 @@ def create_cwl(self, image_tag: str) -> dict[str, Any]: "python3", "/home/mambauser/execute.py", ], - "arguments": ["--batch"], + "arguments": ["--batch", "--eoap"], # TODO: Handle stage-in and stage-out properly "inputs": self.nb_params.get_cwl_commandline_inputs(), "outputs": { diff --git a/xcengine/util.py b/xcengine/util.py index 6f33b13..97328ed 100644 --- a/xcengine/util.py +++ b/xcengine/util.py @@ -27,10 +27,15 @@ def write_stac( href=f"{stac_root}/catalog.json", ) for ds_name, ds in datasets.items(): - asset_path = str(stac_root / "output" / (ds_name + ".zarr")) + zarr_name = (ds_name + ".zarr") + zarr_path = stac_root / "output" / zarr_name + asset_parent = stac_root / ds_name + asset_parent.mkdir(parents=True, exist_ok=True) + asset_path = asset_parent / zarr_name + zarr_path.rename(asset_path) asset = pystac.Asset( roles=["data", "visual"], - href=asset_path, + href=str(asset_path), # No official media type for Zarr yet, but "application/vnd.zarr" # https://github.com/radiantearth/stac-spec/issues/713 and listed in # https://humanbrainproject.github.io/openMINDS/v3/core/v4/data/contentType.html diff --git a/xcengine/wrapper.py b/xcengine/wrapper.py index 2025733..1ce044a 100644 --- a/xcengine/wrapper.py +++ b/xcengine/wrapper.py @@ -54,6 +54,7 @@ def main(): parser.add_argument("--batch", action="store_true") parser.add_argument("--server", action="store_true") parser.add_argument("--from-saved", action="store_true") + parser.add_argument("--eoap", action="store_true") parser.add_argument("-v", "--verbose", action="count", default=0) args, _ = parser.parse_known_args() if args.verbose > 0: @@ -83,7 +84,8 @@ def main(): # The "finished" file is a flag to indicate to a runner when # processing is complete, though the xcetool runner doesn't yet use it. (output_path / "finished").touch() - util.write_stac(datasets, output_path) + if args.eoap: + util.write_stac(datasets, output_path) if args.server: xcube.util.plugin.init_plugins() From 992871d7383c3c255e8f1f1523a3a62a94881d4a Mon Sep 17 00:00:00 2001 From: Pontus Lurcock Date: Mon, 28 Apr 2025 17:05:49 +0200 Subject: [PATCH 2/9] Only move output data if present; fix failing test - In EOAP mode, check whether an output Zarr is actually present before trying to move it into the STAC hierarchy. - Fix a unit test that hadn't been updated to cope with the new STAC output structure. --- test/test_util.py | 11 ++++++----- xcengine/util.py | 8 +++++++- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/test/test_util.py b/test/test_util.py index ae89b50..379276c 100644 --- a/test/test_util.py +++ b/test/test_util.py @@ -39,10 +39,11 @@ def test_clear_directory(tmp_path): def test_write_stac(tmp_path, dataset): - write_stac({"ds1": dataset, "ds2": dataset}, tmp_path) + datasets = {"ds1": dataset, "ds2": dataset} + write_stac(datasets, tmp_path) catalog = pystac.Catalog.from_file(tmp_path / "catalog.json") items = set(catalog.get_items(recursive=True)) - assert {item.id for item in items} == {"ds1", "ds2"} + assert {item.id for item in items} == datasets.keys() catalog.make_all_asset_hrefs_absolute() data_asset_hrefs = { item.id: [ @@ -53,8 +54,8 @@ def test_write_stac(tmp_path, dataset): for item in items } assert data_asset_hrefs == { - ds: [ - str(Path(tmp_path / "output" / f"{ds}.zarr").resolve(strict=False)) + ds_id: [ + str(Path(tmp_path / ds_id / f"{ds_id}.zarr").resolve(strict=False)) ] - for ds in {"ds1", "ds2"} + for ds_id in datasets.keys() } diff --git a/xcengine/util.py b/xcengine/util.py index 97328ed..a76d4b4 100644 --- a/xcengine/util.py +++ b/xcengine/util.py @@ -32,7 +32,13 @@ def write_stac( asset_parent = stac_root / ds_name asset_parent.mkdir(parents=True, exist_ok=True) asset_path = asset_parent / zarr_name - zarr_path.rename(asset_path) + if zarr_path.exists(): + # If a Zarr for this asset is present in the output directory, + # move it into the corresponding STAC subdirectory. If not, + # we write the same STAC items with the same asset links anyway + # and assume that the caller will take care of actually writing + # the asset. + zarr_path.rename(asset_path) asset = pystac.Asset( roles=["data", "visual"], href=str(asset_path), From 2f9035b5a520c6e89aa7046bdc07a4adc14334e5 Mon Sep 17 00:00:00 2001 From: Pontus Lurcock Date: Mon, 28 Apr 2025 17:47:34 +0200 Subject: [PATCH 3/9] Improve test coverage of util.py --- test/test_util.py | 15 +++++++++------ xcengine/wrapper.py | 1 - 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/test/test_util.py b/test/test_util.py index 379276c..f3f1708 100644 --- a/test/test_util.py +++ b/test/test_util.py @@ -38,19 +38,22 @@ def test_clear_directory(tmp_path): assert os.listdir(tmp_path) == [] -def test_write_stac(tmp_path, dataset): +@pytest.mark.parametrize("write_zarrs", [False, True]) +def test_write_stac(tmp_path, dataset, write_zarrs): datasets = {"ds1": dataset, "ds2": dataset} + if write_zarrs: + output_path = tmp_path / "output" + output_path.mkdir() + for ds_id, ds in datasets.items(): + ds.to_zarr(output_path / (ds_id + ".zarr")) + write_stac(datasets, tmp_path) catalog = pystac.Catalog.from_file(tmp_path / "catalog.json") items = set(catalog.get_items(recursive=True)) assert {item.id for item in items} == datasets.keys() catalog.make_all_asset_hrefs_absolute() data_asset_hrefs = { - item.id: [ - a.href # (Path(item.self_href) / a.href).resolve(strict=False) - for a in item.assets.values() - if "data" in a.roles - ] + item.id: [a.href for a in item.assets.values() if "data" in a.roles] for item in items } assert data_asset_hrefs == { diff --git a/xcengine/wrapper.py b/xcengine/wrapper.py index 1ce044a..d94a19e 100644 --- a/xcengine/wrapper.py +++ b/xcengine/wrapper.py @@ -70,7 +70,6 @@ def main(): saved_datasets = {} if args.batch: - # TODO: Implement EOAP-compliant stage-in and stage-out # EOAP doesn't require an "output" subdirectory (output can go anywhere # in the CWD) but it's used by xcetool's built-in runner. # Note that EOAP runners typically override the image-specified CWD. From 88af78334c8e18ff5de4ef3afea64e75e8134d0a Mon Sep 17 00:00:00 2001 From: Pontus Lurcock Date: Tue, 29 Apr 2025 12:39:11 +0200 Subject: [PATCH 4/9] Refactor dataset writing; improve test coverage --- test/test_util.py | 15 ++++++++++++++- xcengine/util.py | 19 +++++++++++++++++++ xcengine/wrapper.py | 19 +++---------------- 3 files changed, 36 insertions(+), 17 deletions(-) diff --git a/test/test_util.py b/test/test_util.py index f3f1708..2f1f677 100644 --- a/test/test_util.py +++ b/test/test_util.py @@ -5,7 +5,7 @@ import pytest import xarray as xr -from xcengine.util import clear_directory, write_stac +from xcengine.util import clear_directory, write_stac, save_datasets @pytest.fixture @@ -62,3 +62,16 @@ def test_write_stac(tmp_path, dataset, write_zarrs): ] for ds_id in datasets.keys() } + + +@pytest.mark.parametrize("eoap_mode", [False, True]) +def test_save_datasets(tmp_path, dataset, eoap_mode): + datasets = {"ds1": dataset, "ds2": dataset} + save_datasets(datasets, tmp_path, eoap_mode) + for ds_id in datasets.keys(): + assert (tmp_path / (ds_id if eoap_mode else "output") / (ds_id + ".zarr")).is_dir() + catalogue_path = (tmp_path / "catalog.json") + if eoap_mode: + assert catalogue_path.is_file() + else: + assert not catalogue_path.exists() diff --git a/xcengine/util.py b/xcengine/util.py index a76d4b4..d242802 100644 --- a/xcengine/util.py +++ b/xcengine/util.py @@ -76,3 +76,22 @@ def write_stac( catalog.add_item(item) catalog.make_all_asset_hrefs_relative() catalog.save(catalog_type=pystac.CatalogType.SELF_CONTAINED) + + +def save_datasets(datasets, output_path: pathlib.Path, eoap_mode: bool) -> dict[str, xr.Dataset]: + saved_datasets = {} + # EOAP doesn't require an "output" subdirectory (output can go anywhere + # in the CWD) but it's used by xcetool's built-in runner. + # Note that EOAP runners typically override the image-specified CWD. + for ds_id, ds in datasets.items(): + output_subpath = output_path / (ds_id if eoap_mode else "output") + output_subpath.mkdir(parents=True, exist_ok=True) + dataset_path = output_subpath / (ds_id + ".zarr") + saved_datasets[ds_id] = dataset_path + ds.to_zarr(dataset_path) + # The "finished" file is a flag to indicate to a runner when + # processing is complete, though the xcetool runner doesn't yet use it. + (output_path / "finished").touch() + if eoap_mode: + write_stac(datasets, output_path) + return saved_datasets diff --git a/xcengine/wrapper.py b/xcengine/wrapper.py index d94a19e..c371419 100644 --- a/xcengine/wrapper.py +++ b/xcengine/wrapper.py @@ -10,10 +10,11 @@ import pathlib import sys +from xcengine.util import save_datasets + print("CWD", os.getcwd()) import parameters -import util LOGGER = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO) @@ -70,21 +71,7 @@ def main(): saved_datasets = {} if args.batch: - # EOAP doesn't require an "output" subdirectory (output can go anywhere - # in the CWD) but it's used by xcetool's built-in runner. - # Note that EOAP runners typically override the image-specified CWD. - output_path = pathlib.Path.cwd() - output_subpath = output_path / "output" - output_subpath.mkdir(parents=True, exist_ok=True) - for name, dataset in datasets.items(): - dataset_path = output_subpath / (name + ".zarr") - saved_datasets[name] = dataset_path - dataset.to_zarr(dataset_path) - # The "finished" file is a flag to indicate to a runner when - # processing is complete, though the xcetool runner doesn't yet use it. - (output_path / "finished").touch() - if args.eoap: - util.write_stac(datasets, output_path) + saved_datasets = save_datasets(datasets, pathlib.Path.cwd(), args.eoap) if args.server: xcube.util.plugin.init_plugins() From 96cae383a85a5436c547284f21bfa905f9a781f9 Mon Sep 17 00:00:00 2001 From: Pontus Lurcock Date: Tue, 29 Apr 2025 12:43:56 +0200 Subject: [PATCH 5/9] Reformat code --- test/test_util.py | 6 ++++-- xcengine/util.py | 6 ++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/test/test_util.py b/test/test_util.py index 2f1f677..98090e3 100644 --- a/test/test_util.py +++ b/test/test_util.py @@ -69,8 +69,10 @@ def test_save_datasets(tmp_path, dataset, eoap_mode): datasets = {"ds1": dataset, "ds2": dataset} save_datasets(datasets, tmp_path, eoap_mode) for ds_id in datasets.keys(): - assert (tmp_path / (ds_id if eoap_mode else "output") / (ds_id + ".zarr")).is_dir() - catalogue_path = (tmp_path / "catalog.json") + assert ( + tmp_path / (ds_id if eoap_mode else "output") / (ds_id + ".zarr") + ).is_dir() + catalogue_path = tmp_path / "catalog.json" if eoap_mode: assert catalogue_path.is_file() else: diff --git a/xcengine/util.py b/xcengine/util.py index d242802..3312c4b 100644 --- a/xcengine/util.py +++ b/xcengine/util.py @@ -27,7 +27,7 @@ def write_stac( href=f"{stac_root}/catalog.json", ) for ds_name, ds in datasets.items(): - zarr_name = (ds_name + ".zarr") + zarr_name = ds_name + ".zarr" zarr_path = stac_root / "output" / zarr_name asset_parent = stac_root / ds_name asset_parent.mkdir(parents=True, exist_ok=True) @@ -78,7 +78,9 @@ def write_stac( catalog.save(catalog_type=pystac.CatalogType.SELF_CONTAINED) -def save_datasets(datasets, output_path: pathlib.Path, eoap_mode: bool) -> dict[str, xr.Dataset]: +def save_datasets( + datasets, output_path: pathlib.Path, eoap_mode: bool +) -> dict[str, xr.Dataset]: saved_datasets = {} # EOAP doesn't require an "output" subdirectory (output can go anywhere # in the CWD) but it's used by xcetool's built-in runner. From c9bcd1116e23ae7725d378305454a450edcbf564 Mon Sep 17 00:00:00 2001 From: Pontus Lurcock Date: Tue, 29 Apr 2025 17:06:19 +0200 Subject: [PATCH 6/9] Improve test coverage --- test/test_wrapper.py | 28 +++++++++++++++++----------- xcengine/wrapper.py | 7 ++++--- 2 files changed, 21 insertions(+), 14 deletions(-) diff --git a/test/test_wrapper.py b/test/test_wrapper.py index 9a7757a..37ebb72 100644 --- a/test/test_wrapper.py +++ b/test/test_wrapper.py @@ -1,17 +1,23 @@ import os -from unittest.mock import patch -import sys +from unittest.mock import patch, Mock +import pytest -@patch("sys.argv", ["wrapper.py", "--verbose"]) -def test_wrapper(tmp_path, monkeypatch): +@patch("xcengine.util.save_datasets") +@pytest.mark.parametrize("cli_args", [["--verbose"], ["--batch"]]) +def test_wrapper(save_datasets_mock, tmp_path, monkeypatch, cli_args): import xcengine - for path in xcengine.__path__: - monkeypatch.syspath_prepend(path) - user_code_path = tmp_path / "user_code.py" - user_code_path.touch() - os.environ["XC_USER_CODE_PATH"] = str(user_code_path) - from xcengine import wrapper + with patch("sys.argv", ["wrapper.py"] + cli_args): + for path in xcengine.__path__: + monkeypatch.syspath_prepend(path) + user_code_path = tmp_path / "user_code.py" + user_code_path.touch() + os.environ["XC_USER_CODE_PATH"] = str(user_code_path) + from xcengine import wrapper - xcengine.wrapper.main() + xcengine.wrapper.main() + + assert save_datasets_mock.call_count == ( + 1 if "--batch" in cli_args else 0 + ) diff --git a/xcengine/wrapper.py b/xcengine/wrapper.py index c371419..813c8a5 100644 --- a/xcengine/wrapper.py +++ b/xcengine/wrapper.py @@ -9,8 +9,7 @@ import os import pathlib import sys - -from xcengine.util import save_datasets +import xcengine.util print("CWD", os.getcwd()) @@ -71,7 +70,9 @@ def main(): saved_datasets = {} if args.batch: - saved_datasets = save_datasets(datasets, pathlib.Path.cwd(), args.eoap) + saved_datasets = xcengine.util.save_datasets( + datasets, pathlib.Path.cwd(), args.eoap + ) if args.server: xcube.util.plugin.init_plugins() From 36d6f2334cfb2db88a798f5dcea02e37a3e6aa1f Mon Sep 17 00:00:00 2001 From: Pontus Lurcock Date: Fri, 9 May 2025 16:48:13 +0200 Subject: [PATCH 7/9] Add minimal documentation and mkdocs configuration --- docs/index.md | 25 +++++++++++++++++++++++++ mkdocs.yml | 4 ++++ 2 files changed, 29 insertions(+) create mode 100644 docs/index.md create mode 100644 mkdocs.yml diff --git a/docs/index.md b/docs/index.md new file mode 100644 index 0000000..97f17cf --- /dev/null +++ b/docs/index.md @@ -0,0 +1,25 @@ +# xcengine + +xcube tools for compute engines + +## Overview + +xcengine provides a command `xcetool` which converts a Jupyter Python notebook +to a compute engine. A compute engine consists of: + +- A docker container image packaging the notebook code, a Python environment, + and an [xcube](https://xcube.readthedocs.io/) server component, letting + the notebook results be served over multiple supported APIs and explored + in the interactive xcube viewer. +- An accompanying CWL file defining an OGC + [Earth Observation Application Package](docs.ogc.org/bp/20-089r1.html) using + the container image. This lets your notebook code run as part of a processing + workflow on any EOAP platform. + +In the conversion process, xcengine tries to maximize convenience for the user +by requiring as little extra configuration and input as possible. Input +variables and their types can be defined by tagging a notebook cell +(similarly to [papermill](https://papermill.readthedocs.io/)), and output +datasets are automatically extracted from the notebook’s environment. +Some user configuration is unavoidable, but xcengine automates much of the +boilerplate required to create an EOAP. diff --git a/mkdocs.yml b/mkdocs.yml new file mode 100644 index 0000000..b1c401f --- /dev/null +++ b/mkdocs.yml @@ -0,0 +1,4 @@ +site_name: xcengine +site_url: https://xcube-dev.github.io/xcengine +theme: + name: material From 872f904ba394824916b1598a8352b67c21165db8 Mon Sep 17 00:00:00 2001 From: Pontus Lurcock Date: Fri, 9 May 2025 16:50:45 +0200 Subject: [PATCH 8/9] Add GitHub workflow to publish docs --- .github/workflows/publish-docs.yaml | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) create mode 100644 .github/workflows/publish-docs.yaml diff --git a/.github/workflows/publish-docs.yaml b/.github/workflows/publish-docs.yaml new file mode 100644 index 0000000..c6eb026 --- /dev/null +++ b/.github/workflows/publish-docs.yaml @@ -0,0 +1,28 @@ +name: publish-docs +on: + push: + branches: + - main +permissions: + contents: write +jobs: + deploy: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Configure Git Credentials + run: | + git config user.name github-actions[bot] + git config user.email 41898282+github-actions[bot]@users.noreply.github.com + - uses: actions/setup-python@v5 + with: + python-version: 3.x + - run: echo "cache_id=$(date --utc '+%V')" >> $GITHUB_ENV + - uses: actions/cache@v4 + with: + key: mkdocs-material-${{ env.cache_id }} + path: .cache + restore-keys: | + mkdocs-material- + - run: pip install mkdocs-material + - run: mkdocs gh-deploy --force From a21d4d028832a3f58ef89479a600c8f86ad6cd5e Mon Sep 17 00:00:00 2001 From: Pontus Lurcock Date: Fri, 9 May 2025 18:03:51 +0200 Subject: [PATCH 9/9] Fix bug in wrapper script Import path for util module was incorrect. --- test/test_wrapper.py | 6 +++--- xcengine/wrapper.py | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/test/test_wrapper.py b/test/test_wrapper.py index 37ebb72..93919c0 100644 --- a/test/test_wrapper.py +++ b/test/test_wrapper.py @@ -3,9 +3,8 @@ import pytest -@patch("xcengine.util.save_datasets") @pytest.mark.parametrize("cli_args", [["--verbose"], ["--batch"]]) -def test_wrapper(save_datasets_mock, tmp_path, monkeypatch, cli_args): +def test_wrapper(tmp_path, monkeypatch, cli_args): import xcengine with patch("sys.argv", ["wrapper.py"] + cli_args): @@ -16,7 +15,8 @@ def test_wrapper(save_datasets_mock, tmp_path, monkeypatch, cli_args): os.environ["XC_USER_CODE_PATH"] = str(user_code_path) from xcengine import wrapper - xcengine.wrapper.main() + with patch("util.save_datasets", save_datasets_mock := Mock()): + xcengine.wrapper.main() assert save_datasets_mock.call_count == ( 1 if "--batch" in cli_args else 0 diff --git a/xcengine/wrapper.py b/xcengine/wrapper.py index 813c8a5..f91d3dd 100644 --- a/xcengine/wrapper.py +++ b/xcengine/wrapper.py @@ -9,7 +9,7 @@ import os import pathlib import sys -import xcengine.util +import util print("CWD", os.getcwd()) @@ -70,7 +70,7 @@ def main(): saved_datasets = {} if args.batch: - saved_datasets = xcengine.util.save_datasets( + saved_datasets = util.save_datasets( datasets, pathlib.Path.cwd(), args.eoap )