Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions .github/workflows/publish-docs.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
name: publish-docs
on:
push:
branches:
- main
permissions:
contents: write
jobs:
deploy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Configure Git Credentials
run: |
git config user.name github-actions[bot]
git config user.email 41898282+github-actions[bot]@users.noreply.github.com
- uses: actions/setup-python@v5
with:
python-version: 3.x
- run: echo "cache_id=$(date --utc '+%V')" >> $GITHUB_ENV
- uses: actions/cache@v4
with:
key: mkdocs-material-${{ env.cache_id }}
path: .cache
restore-keys: |
mkdocs-material-
- run: pip install mkdocs-material
- run: mkdocs gh-deploy --force
25 changes: 25 additions & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# xcengine

xcube tools for compute engines

## Overview

xcengine provides a command `xcetool` which converts a Jupyter Python notebook
to a compute engine. A compute engine consists of:

- A docker container image packaging the notebook code, a Python environment,
and an [xcube](https://xcube.readthedocs.io/) server component, letting
the notebook results be served over multiple supported APIs and explored
in the interactive xcube viewer.
- An accompanying CWL file defining an OGC
[Earth Observation Application Package](docs.ogc.org/bp/20-089r1.html) using
the container image. This lets your notebook code run as part of a processing
workflow on any EOAP platform.

In the conversion process, xcengine tries to maximize convenience for the user
by requiring as little extra configuration and input as possible. Input
variables and their types can be defined by tagging a notebook cell
(similarly to [papermill](https://papermill.readthedocs.io/)), and output
datasets are automatically extracted from the notebook’s environment.
Some user configuration is unavoidable, but xcengine automates much of the
boilerplate required to create an EOAP.
4 changes: 4 additions & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
site_name: xcengine
site_url: https://xcube-dev.github.io/xcengine
theme:
name: material
43 changes: 31 additions & 12 deletions test/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import pytest
import xarray as xr

from xcengine.util import clear_directory, write_stac
from xcengine.util import clear_directory, write_stac, save_datasets


@pytest.fixture
Expand Down Expand Up @@ -38,23 +38,42 @@ def test_clear_directory(tmp_path):
assert os.listdir(tmp_path) == []


def test_write_stac(tmp_path, dataset):
write_stac({"ds1": dataset, "ds2": dataset}, tmp_path)
@pytest.mark.parametrize("write_zarrs", [False, True])
def test_write_stac(tmp_path, dataset, write_zarrs):
datasets = {"ds1": dataset, "ds2": dataset}
if write_zarrs:
output_path = tmp_path / "output"
output_path.mkdir()
for ds_id, ds in datasets.items():
ds.to_zarr(output_path / (ds_id + ".zarr"))

write_stac(datasets, tmp_path)
catalog = pystac.Catalog.from_file(tmp_path / "catalog.json")
items = set(catalog.get_items(recursive=True))
assert {item.id for item in items} == {"ds1", "ds2"}
assert {item.id for item in items} == datasets.keys()
catalog.make_all_asset_hrefs_absolute()
data_asset_hrefs = {
item.id: [
a.href # (Path(item.self_href) / a.href).resolve(strict=False)
for a in item.assets.values()
if "data" in a.roles
]
item.id: [a.href for a in item.assets.values() if "data" in a.roles]
for item in items
}
assert data_asset_hrefs == {
ds: [
str(Path(tmp_path / "output" / f"{ds}.zarr").resolve(strict=False))
ds_id: [
str(Path(tmp_path / ds_id / f"{ds_id}.zarr").resolve(strict=False))
]
for ds in {"ds1", "ds2"}
for ds_id in datasets.keys()
}


@pytest.mark.parametrize("eoap_mode", [False, True])
def test_save_datasets(tmp_path, dataset, eoap_mode):
datasets = {"ds1": dataset, "ds2": dataset}
save_datasets(datasets, tmp_path, eoap_mode)
for ds_id in datasets.keys():
assert (
tmp_path / (ds_id if eoap_mode else "output") / (ds_id + ".zarr")
).is_dir()
catalogue_path = tmp_path / "catalog.json"
if eoap_mode:
assert catalogue_path.is_file()
else:
assert not catalogue_path.exists()
28 changes: 17 additions & 11 deletions test/test_wrapper.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
import os
from unittest.mock import patch
import sys
from unittest.mock import patch, Mock
import pytest


@patch("sys.argv", ["wrapper.py", "--verbose"])
def test_wrapper(tmp_path, monkeypatch):
@pytest.mark.parametrize("cli_args", [["--verbose"], ["--batch"]])
def test_wrapper(tmp_path, monkeypatch, cli_args):
import xcengine

for path in xcengine.__path__:
monkeypatch.syspath_prepend(path)
user_code_path = tmp_path / "user_code.py"
user_code_path.touch()
os.environ["XC_USER_CODE_PATH"] = str(user_code_path)
from xcengine import wrapper
with patch("sys.argv", ["wrapper.py"] + cli_args):
for path in xcengine.__path__:
monkeypatch.syspath_prepend(path)
user_code_path = tmp_path / "user_code.py"
user_code_path.touch()
os.environ["XC_USER_CODE_PATH"] = str(user_code_path)
from xcengine import wrapper

xcengine.wrapper.main()
with patch("util.save_datasets", save_datasets_mock := Mock()):
xcengine.wrapper.main()

assert save_datasets_mock.call_count == (
1 if "--batch" in cli_args else 0
)
2 changes: 1 addition & 1 deletion xcengine/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def create_cwl(self, image_tag: str) -> dict[str, Any]:
"python3",
"/home/mambauser/execute.py",
],
"arguments": ["--batch"],
"arguments": ["--batch", "--eoap"],
# TODO: Handle stage-in and stage-out properly
"inputs": self.nb_params.get_cwl_commandline_inputs(),
"outputs": {
Expand Down
36 changes: 34 additions & 2 deletions xcengine/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,21 @@ def write_stac(
href=f"{stac_root}/catalog.json",
)
for ds_name, ds in datasets.items():
asset_path = str(stac_root / "output" / (ds_name + ".zarr"))
zarr_name = ds_name + ".zarr"
zarr_path = stac_root / "output" / zarr_name
asset_parent = stac_root / ds_name
asset_parent.mkdir(parents=True, exist_ok=True)
asset_path = asset_parent / zarr_name
if zarr_path.exists():
# If a Zarr for this asset is present in the output directory,
# move it into the corresponding STAC subdirectory. If not,
# we write the same STAC items with the same asset links anyway
# and assume that the caller will take care of actually writing
# the asset.
zarr_path.rename(asset_path)
asset = pystac.Asset(
roles=["data", "visual"],
href=asset_path,
href=str(asset_path),
# No official media type for Zarr yet, but "application/vnd.zarr"
# https://github.com/radiantearth/stac-spec/issues/713 and listed in
# https://humanbrainproject.github.io/openMINDS/v3/core/v4/data/contentType.html
Expand Down Expand Up @@ -65,3 +76,24 @@ def write_stac(
catalog.add_item(item)
catalog.make_all_asset_hrefs_relative()
catalog.save(catalog_type=pystac.CatalogType.SELF_CONTAINED)


def save_datasets(
datasets, output_path: pathlib.Path, eoap_mode: bool
) -> dict[str, xr.Dataset]:
saved_datasets = {}
# EOAP doesn't require an "output" subdirectory (output can go anywhere
# in the CWD) but it's used by xcetool's built-in runner.
# Note that EOAP runners typically override the image-specified CWD.
for ds_id, ds in datasets.items():
output_subpath = output_path / (ds_id if eoap_mode else "output")
output_subpath.mkdir(parents=True, exist_ok=True)
dataset_path = output_subpath / (ds_id + ".zarr")
saved_datasets[ds_id] = dataset_path
ds.to_zarr(dataset_path)
# The "finished" file is a flag to indicate to a runner when
# processing is complete, though the xcetool runner doesn't yet use it.
(output_path / "finished").touch()
if eoap_mode:
write_stac(datasets, output_path)
return saved_datasets
21 changes: 5 additions & 16 deletions xcengine/wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
import os
import pathlib
import sys
import util

print("CWD", os.getcwd())

import parameters
import util

LOGGER = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
Expand Down Expand Up @@ -54,6 +54,7 @@ def main():
parser.add_argument("--batch", action="store_true")
parser.add_argument("--server", action="store_true")
parser.add_argument("--from-saved", action="store_true")
parser.add_argument("--eoap", action="store_true")
parser.add_argument("-v", "--verbose", action="count", default=0)
args, _ = parser.parse_known_args()
if args.verbose > 0:
Expand All @@ -69,21 +70,9 @@ def main():
saved_datasets = {}

if args.batch:
# TODO: Implement EOAP-compliant stage-in and stage-out
# EOAP doesn't require an "output" subdirectory (output can go anywhere
# in the CWD) but it's used by xcetool's built-in runner.
# Note that EOAP runners typically override the image-specified CWD.
output_path = pathlib.Path.cwd()
output_subpath = output_path / "output"
output_subpath.mkdir(parents=True, exist_ok=True)
for name, dataset in datasets.items():
dataset_path = output_subpath / (name + ".zarr")
saved_datasets[name] = dataset_path
dataset.to_zarr(dataset_path)
# The "finished" file is a flag to indicate to a runner when
# processing is complete, though the xcetool runner doesn't yet use it.
(output_path / "finished").touch()
util.write_stac(datasets, output_path)
saved_datasets = util.save_datasets(
datasets, pathlib.Path.cwd(), args.eoap
)

if args.server:
xcube.util.plugin.init_plugins()
Expand Down