diff --git a/pyproject.toml b/pyproject.toml
index 31fd81a..5704e90 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -17,14 +17,13 @@ dependencies = [
     "envyaml>=1.10.211231",
     "eumdac>=3.0.0",
     "geojson-pydantic>=2.1.0",
-    "h5netcdf==1.7.3",
-    "h5py==3.14.0",
-    "netcdf4==1.7.1",
+    "h5netcdf>=1.7.3",
+    "h5py>=3.15.1",
+    "netcdf4>=1.7.2",
     "odc-stac>=0.4.0",
     "pydantic>=2.11.9",
     "pydantic-settings>=2.10.1",
     "pyhdf>=0.11.6",
-    "pyorbital==1.10.2",
     "pyproj>=3.7.1",
     "pyresample>=1.31.0",
     "pyspectral>=0.13.6",
diff --git a/src/satctl/sources/mtg.py b/src/satctl/sources/mtg.py
index 4f300eb..8ebfecd 100644
--- a/src/satctl/sources/mtg.py
+++ b/src/satctl/sources/mtg.py
@@ -5,6 +5,7 @@
 from pathlib import Path
 from typing import Any, cast
 
+import dask.config
 import numpy as np
 from eumdac.datastore import DataStore
 from pydantic import BaseModel
@@ -13,7 +14,7 @@
 from satctl.auth import AuthBuilder
 from satctl.auth.eumetsat import EUMETSATAuthenticator
 from satctl.downloaders import DownloadBuilder, Downloader
-from satctl.model import Granule, ProductInfo, SearchParams
+from satctl.model import ConversionParams, Granule, ProductInfo, SearchParams
 from satctl.sources import DataSource
 from satctl.utils import extract_zip
 from satctl.writers import Writer
@@ -264,6 +265,40 @@ def download_item(self, item: Granule, destination: Path, downloader: Downloader
             log.warning("Failed to download: %s", item.granule_id)
         return result
 
+    def save_item(
+        self,
+        item: Granule,
+        destination: Path,
+        writer: Writer,
+        params: ConversionParams,
+        force: bool = False,
+    ) -> dict[str, list]:
+        """Override to use synchronous dask scheduler for MTG FCI NetCDF processing.
+
+        MTG FCI data is stored in NetCDF4 format, which **currently** is not thread-safe when used
+        with dask's default threaded scheduler. This override forces synchronous
+        execution to prevent race conditions during scene loading and computation.
+
+        Args:
+            item (Granule): Granule to process
+            destination (Path): Base destination directory
+            writer (Writer): Writer instance for output
+            params (ConversionParams): Conversion parameters
+            force (bool): If True, overwrite existing files. Defaults to False.
+
+        Returns:
+            dict[str, list]: Dictionary mapping granule_id to list of output paths.
+                Empty list means all files were skipped (already exist).
+
+        Raises:
+            FileNotFoundError: If granule data not downloaded
+            ValueError: If invalid configuration
+            Exception: If processing fails (scene loading, resampling, writing)
+        """
+
+        with dask.config.set(scheduler="synchronous"):
+            return super().save_item(item, destination, writer, params, force)
+
     def _write_scene_datasets(
         self,
         scene: Scene,