Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@ dependencies = [
"envyaml>=1.10.211231",
"eumdac>=3.0.0",
"geojson-pydantic>=2.1.0",
"h5netcdf==1.7.3",
"h5py==3.14.0",
"netcdf4==1.7.1",
"h5netcdf>=1.7.3",
"h5py>=3.15.1",
"netcdf4>=1.7.2",
"odc-stac>=0.4.0",
"pydantic>=2.11.9",
"pydantic-settings>=2.10.1",
"pyhdf>=0.11.6",
"pyorbital==1.10.2",
"pyproj>=3.7.1",
"pyresample>=1.31.0",
"pyspectral>=0.13.6",
Expand Down
37 changes: 36 additions & 1 deletion src/satctl/sources/mtg.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from pathlib import Path
from typing import Any, cast

import dask.config
import numpy as np
from eumdac.datastore import DataStore
from pydantic import BaseModel
Expand All @@ -13,7 +14,7 @@
from satctl.auth import AuthBuilder
from satctl.auth.eumetsat import EUMETSATAuthenticator
from satctl.downloaders import DownloadBuilder, Downloader
from satctl.model import Granule, ProductInfo, SearchParams
from satctl.model import ConversionParams, Granule, ProductInfo, SearchParams
from satctl.sources import DataSource
from satctl.utils import extract_zip
from satctl.writers import Writer
Expand Down Expand Up @@ -264,6 +265,40 @@ def download_item(self, item: Granule, destination: Path, downloader: Downloader
log.warning("Failed to download: %s", item.granule_id)
return result

def save_item(
    self,
    item: Granule,
    destination: Path,
    writer: Writer,
    params: ConversionParams,
    force: bool = False,
) -> dict[str, list]:
    """Process an MTG granule with dask forced onto the synchronous scheduler.

    The MTG FCI product ships as NetCDF4, and the NetCDF4 stack is **currently**
    not thread-safe under dask's default threaded scheduler. Running the parent
    implementation inside a synchronous-scheduler context avoids race conditions
    while the scene is loaded and computed.

    Args:
        item (Granule): Granule to process
        destination (Path): Base destination directory
        writer (Writer): Writer instance for output
        params (ConversionParams): Conversion parameters
        force (bool): If True, overwrite existing files. Defaults to False.

    Returns:
        dict[str, list]: Dictionary mapping granule_id to list of output paths.
            An empty list means every file was skipped (already exists).

    Raises:
        FileNotFoundError: If granule data not downloaded
        ValueError: If invalid configuration
        Exception: If processing fails (scene loading, resampling, writing)
    """
    # Scope the scheduler override to this call only; dask.config.set
    # restores the previous scheduler when the context exits.
    sync_ctx = dask.config.set(scheduler="synchronous")
    with sync_ctx:
        outputs = super().save_item(item, destination, writer, params, force)
    return outputs

def _write_scene_datasets(
self,
scene: Scene,
Expand Down