Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* Support writing of stage-out STAC by notebook (#32)
* Make viewer work on non-default ports (#21)
* Improve dynamic example notebook
* Support NetCDF output (#28)

## Changes in 0.1.0

Expand Down
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,18 @@ parameters cell and make them available as command-line parameters for the
output script and container, and as workflow parameters for the application
package.

# Customizing output formats

An xcengine-generated script or image can automatically write all
`xarray.Dataset` objects from the notebook code to disk, for example to be
staged out as EO Application Package outputs. By default, Zarr format is
used, but this can be changed to NetCDF on a per-dataset basis by applying
an attribute:

```python
my_dataset.attrs["xcengine_output_format"] = "netcdf"
```

# xcetool usage

xcengine provides a command-line tool called `xcetool`, which has several
Expand Down
39 changes: 23 additions & 16 deletions test/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,15 @@ def test_clear_directory(tmp_path):
assert os.listdir(tmp_path) == []


@pytest.mark.parametrize("write_zarrs", [False, True])
def test_write_stac(tmp_path, dataset, write_zarrs):
datasets = {"ds1": dataset, "ds2": dataset}
if write_zarrs:
@pytest.mark.parametrize("write_datasets", [False, True])
def test_write_stac(tmp_path, dataset, write_datasets):
datasets = {"ds1": dataset, "ds2": dataset.copy()}
datasets["ds2"].attrs["xcengine_output_format"] = "netcdf"
if write_datasets:
output_path = tmp_path / "output"
output_path.mkdir()
for ds_id, ds in datasets.items():
ds.to_zarr(output_path / (ds_id + ".zarr"))
datasets["ds1"].to_zarr(output_path / ("ds1.zarr"))
datasets["ds2"].to_netcdf(output_path / ("ds2.nc"))

write_stac(datasets, tmp_path)
catalog = pystac.Catalog.from_file(tmp_path / "catalog.json")
Expand All @@ -57,21 +58,27 @@ def test_write_stac(tmp_path, dataset, write_zarrs):
for item in items
}
assert data_asset_hrefs == {
ds_id: [
str(Path(tmp_path / ds_id / f"{ds_id}.zarr").resolve(strict=False))
]
for ds_id in datasets.keys()
"ds1": [str((tmp_path / "ds1" / "ds1.zarr").resolve(strict=False))],
"ds2": [str((tmp_path / "ds2" / "ds2.nc").resolve(strict=False))],
}


@pytest.mark.parametrize("eoap_mode", [False, True])
def test_save_datasets(tmp_path, dataset, eoap_mode):
datasets = {"ds1": dataset, "ds2": dataset}
@pytest.mark.parametrize("ds2_format", [None, "zarr", "netcdf"])
def test_save_datasets(tmp_path, dataset, eoap_mode, ds2_format):
datasets = {"ds1": dataset, "ds2": dataset.copy()}
if ds2_format is not None:
datasets["ds2"].attrs["xcengine_output_format"] = ds2_format
save_datasets(datasets, tmp_path, eoap_mode)
for ds_id in datasets.keys():
assert (
tmp_path / (ds_id if eoap_mode else "output") / (ds_id + ".zarr")
).is_dir()
def outdir(ds_id):
return tmp_path / (ds_id if eoap_mode else "output")
assert (outdir("ds1") / "ds1.zarr").is_dir()
ds2_suffix = "nc" if ds2_format == "netcdf" else "zarr"
ds2_path = outdir("ds2") / f"ds2.{ds2_suffix}"
if ds2_format == "netcdf":
assert ds2_path.is_file()
else:
assert ds2_path.is_dir()
catalogue_path = tmp_path / "catalog.json"
if eoap_mode:
assert catalogue_path.is_file()
Expand Down
23 changes: 15 additions & 8 deletions xcengine/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,20 @@ def write_stac(
href=f"{catalog_path}",
)
for ds_name, ds in datasets.items():
zarr_name = ds_name + ".zarr"
zarr_path = stac_root / "output" / zarr_name
output_format = ds.attrs.get("xcengine_output_format", "zarr")
suffix = "nc" if output_format == "netcdf" else "zarr"
output_name = f"{ds_name}.{suffix}"
output_path = stac_root / "output" / output_name
asset_parent = stac_root / ds_name
asset_parent.mkdir(parents=True, exist_ok=True)
asset_path = asset_parent / zarr_name
if zarr_path.exists():
asset_path = asset_parent / output_name
if output_path.exists():
# If an output dataset (Zarr or NetCDF) for this asset is present
# in the output directory, move it into the corresponding STAC
# subdirectory. If not, we write the same STAC items with the same
# asset links anyway and assume that the caller will take care of
# actually writing the asset.
zarr_path.rename(asset_path)
output_path.rename(asset_path)
asset = pystac.Asset(
roles=["data", "visual"],
href=str(asset_path),
Expand All @@ -52,7 +54,7 @@ def write_stac(
# https://planetarycomputer.microsoft.com/api/stac/v1/collections/terraclimate
# uses the similar "application/vnd+zarr" but RFC 6838 mandates
# "." rather than "+".
media_type="application/vnd.zarr",
media_type="application/x-netcdf" if output_format == "netcdf" else "application/vnd.zarr",
title=ds.attrs.get("title", ds_name),
)
bb = namedtuple("Bounds", ["left", "bottom", "right", "top"])(
Expand Down Expand Up @@ -92,9 +94,14 @@ def save_datasets(
for ds_id, ds in datasets.items():
output_subpath = output_path / (ds_id if eoap_mode else "output")
output_subpath.mkdir(parents=True, exist_ok=True)
dataset_path = output_subpath / (ds_id + ".zarr")
output_format = ds.attrs.get("xcengine_output_format", "zarr")
suffix = "nc" if output_format == "netcdf" else "zarr"
dataset_path = output_subpath / f"{ds_id}.{suffix}"
saved_datasets[ds_id] = dataset_path
ds.to_zarr(dataset_path)
if output_format == "netcdf":
ds.to_netcdf(dataset_path)
else:
ds.to_zarr(dataset_path)
# The "finished" file is a flag to indicate to a runner when
# processing is complete, though the xcetool runner doesn't yet use it.
(output_path / "finished").touch()
Expand Down