diff --git a/src/satctl/sources/base.py b/src/satctl/sources/base.py index bbafd1c..6e953a0 100644 --- a/src/satctl/sources/base.py +++ b/src/satctl/sources/base.py @@ -150,7 +150,6 @@ def download_item( """ ... - @abstractmethod def save_item( self, item: Granule, @@ -169,9 +168,58 @@ def save_item( force (bool): If True, overwrite existing files. Defaults to False. Returns: - dict[str, list]: Dictionary mapping granule_id to list of output paths + dict[str, list]: Dictionary mapping granule_id to list of output paths. + Empty list means all files were skipped (already exist). + + Raises: + FileNotFoundError: If granule data not downloaded + ValueError: If invalid configuration + Exception: If processing fails (scene loading, resampling, writing) """ - ... + try: + # Validate inputs using base class helper + self._validate_save_inputs(item, params) + + # Parse datasets using base class helper + datasets_dict = self._prepare_datasets(writer, params) + + # Filter existing files using base class helper + datasets_dict = self._filter_existing_files(datasets_dict, destination, item.granule_id, writer, force) + + # Early return if no datasets to process (all files already exist) + if not datasets_dict: + log.info("Skipping %s - all datasets already exist", item.granule_id) + return {item.granule_id: []} + + # Load and resample scene + log.debug("Loading and resampling scene for %s", item.granule_id) + scene = self.load_scene(item, datasets=list(datasets_dict.values())) + + # Define area using base class helper + area_def = self.define_area( + target_crs=params.target_crs_obj, + area=params.area_geometry, + scene=scene, + source_crs=params.source_crs_obj, + resolution=params.resolution, + ) + scene = self.resample(scene, area_def=area_def) + + # Write datasets using base class helper + result = self._write_scene_datasets(scene, datasets_dict, destination, item.granule_id, writer) + + # Log success + num_files = len(result.get(item.granule_id, [])) + 
log.info("Successfully processed %s - wrote %d file(s)", item.granule_id, num_files) + return result + + except (FileNotFoundError, ValueError): + # Re-raise validation errors (these are user configuration issues) + raise + except Exception as e: + # Log processing errors and re-raise + log.error("Failed to process %s: %s", item.granule_id, str(e), exc_info=True) + raise @property def authenticator(self) -> Authenticator: @@ -466,7 +514,8 @@ def save( force (bool): If True, overwrite existing files. Defaults to False. Returns: - tuple[list, list]: Tuple of (successful_items, failed_items) + tuple[list, list]: Tuple of (successful_items, failed_items). + Successful includes both processed and skipped items. """ if not isinstance(items, Iterable): items = [items] @@ -474,6 +523,7 @@ def save( success = [] failure = [] + skipped = [] # Track skipped items separately for logging num_workers = num_workers or 1 batch_id = str(uuid.uuid4()) # this prevents pickle errors for unpicklable entities @@ -504,10 +554,32 @@ def save( } for future in as_completed(future_to_item_map): item = future_to_item_map[future] - if future.result(): - success.append(item) - else: + try: + result = future.result() + # Check if files were actually written + files_written = result.get(item.granule_id, []) + if files_written: + # Files were written - successful processing + success.append(item) + else: + # Empty list = skipped (all files already existed) + success.append(item) + skipped.append(item) + except Exception as e: + # Worker raised an exception = processing failed failure.append(item) + log.warning("Failed to process %s: %s", item.granule_id, str(e)) + + # Log summary + if skipped: + log.info( + "Batch complete: %d processed, %d skipped, %d failed", + len(success) - len(skipped), + len(skipped), + len(failure), + ) + else: + log.info("Batch complete: %d processed, %d failed", len(success), len(failure)) emit_event( ProgressEventType.BATCH_COMPLETED, @@ -519,6 +591,7 @@ def save( 
log.info("Interrupted, cleaning up...") if executor: executor.shutdown(wait=False, cancel_futures=True) + raise # Re-raise to allow outer handler to clean up finally: emit_event( ProgressEventType.BATCH_COMPLETED, diff --git a/src/satctl/sources/earthdata.py b/src/satctl/sources/earthdata.py index c32aefa..545c885 100644 --- a/src/satctl/sources/earthdata.py +++ b/src/satctl/sources/earthdata.py @@ -11,9 +11,8 @@ from satctl.auth import AuthBuilder from satctl.downloaders import DownloadBuilder, Downloader -from satctl.model import ConversionParams, Granule, ProductInfo, SearchParams +from satctl.model import Granule, ProductInfo, SearchParams from satctl.sources import DataSource -from satctl.writers import Writer log = logging.getLogger(__name__) @@ -421,61 +420,6 @@ def download_item(self, item: Granule, destination: Path, downloader: Downloader item.to_file(granule_dir) return True - def save_item( - self, - item: Granule, - destination: Path, - writer: Writer, - params: ConversionParams, - force: bool = False, - ) -> dict[str, list]: - """Save granule item to output files after processing. - - Args: - item (Granule): Granule to process - destination (Path): Base destination directory - writer (Writer): Writer instance for output - params (ConversionParams): Conversion parameters - force (bool): If True, overwrite existing files. Defaults to False. 
- - Returns: - dict[str, list]: Dictionary mapping granule_id to list of output paths - """ - # Validate inputs using base class helper - self._validate_save_inputs(item, params) - - # Parse datasets using base class helper - datasets_dict = self._prepare_datasets(writer, params) - - # Filter existing files using base class helper - datasets_dict = self._filter_existing_files(datasets_dict, destination, item.granule_id, writer, force) - - # Early return if no datasets to process - if not datasets_dict: - log.debug("All datasets already exist for %s, skipping", item.granule_id) - return {item.granule_id: []} - - # Get files for processing - files = self.get_files(item) - log.debug("Found %d files to process", len(files)) - - # Load and resample scene - log.debug("Loading and resampling scene") - scene = self.load_scene(item, datasets=list(datasets_dict.values())) - - # Define area using base class helper - area_def = self.define_area( - target_crs=params.target_crs_obj, - area=params.area_geometry, - scene=scene, - source_crs=params.source_crs_obj, - resolution=params.resolution, - ) - scene = self.resample(scene, area_def=area_def) - - # Write datasets using base class helper - return self._write_scene_datasets(scene, datasets_dict, destination, item.granule_id, writer) - def validate(self, item: Granule) -> None: """Validate a granule. diff --git a/src/satctl/sources/mtg.py b/src/satctl/sources/mtg.py index 80d94ee..c06150a 100644 --- a/src/satctl/sources/mtg.py +++ b/src/satctl/sources/mtg.py @@ -287,32 +287,61 @@ def save_item( force (bool): If True, overwrite existing files. Defaults to False. Returns: - dict[str, list]: Dictionary mapping granule_id to list of output paths + dict[str, list]: Dictionary mapping granule_id to list of output paths. + Empty list means all files were skipped (already exist). 
+ + Raises: + FileNotFoundError: If granule data not downloaded + ValueError: If invalid configuration + Exception: If processing fails (scene loading, resampling, writing) """ - self._validate_save_inputs(item, params) - datasets_dict = self._prepare_datasets(writer, params) - datasets_dict = self._filter_existing_files(datasets_dict, destination, item.granule_id, writer, force) - - with self._netcdf_lock: - # Load and resample scene - log.debug("Loading and resampling scene") - scene = self.load_scene(item, datasets=list(datasets_dict.values())) - - # Define area using base class helper - area_def = self.define_area( - target_crs=params.target_crs_obj, - area=params.area_geometry, - scene=scene, - source_crs=params.source_crs_obj, - resolution=params.resolution, - ) - scene = scene.compute() - scene = self.resample(scene, area_def=area_def) + try: + # Validate inputs using base class helper + self._validate_save_inputs(item, params) + + # Parse datasets using base class helper + datasets_dict = self._prepare_datasets(writer, params) + + # Filter existing files using base class helper + datasets_dict = self._filter_existing_files(datasets_dict, destination, item.granule_id, writer, force) + + # Early return if no datasets to process (all files already exist) + if not datasets_dict: + log.info("Skipping %s - all datasets already exist", item.granule_id) + return {item.granule_id: []} + + # Load and resample scene (with thread-safe NetCDF locking) + with self._netcdf_lock: + log.debug("Loading and resampling scene for %s", item.granule_id) + scene = self.load_scene(item, datasets=list(datasets_dict.values())) + + # Define area using base class helper + area_def = self.define_area( + target_crs=params.target_crs_obj, + area=params.area_geometry, + scene=scene, + source_crs=params.source_crs_obj, + resolution=params.resolution, + ) + # Compute scene before resampling (MTG-specific) + scene = scene.compute() + scene = self.resample(scene, area_def=area_def) - # Write 
datasets using base class helper - res = self._write_scene_datasets(scene, datasets_dict, destination, item.granule_id, writer) + # Write datasets using base class helper + result = self._write_scene_datasets(scene, datasets_dict, destination, item.granule_id, writer) - return res + # Log success + num_files = len(result.get(item.granule_id, [])) + log.info("Successfully processed %s - wrote %d file(s)", item.granule_id, num_files) + return result + + except (FileNotFoundError, ValueError): + # Re-raise validation errors (these are user configuration issues) + raise + except Exception as e: + # Log processing errors and re-raise + log.error("Failed to process %s: %s", item.granule_id, str(e), exc_info=True) + raise def _write_scene_datasets( self, diff --git a/src/satctl/sources/sentinel1.py b/src/satctl/sources/sentinel1.py index ed70c5d..100accc 100644 --- a/src/satctl/sources/sentinel1.py +++ b/src/satctl/sources/sentinel1.py @@ -1,20 +1,17 @@ import logging import re from abc import abstractmethod -from collections import defaultdict from datetime import datetime, timezone from pathlib import Path from typing import cast from pydantic import BaseModel from pystac_client import Client -from xarray import DataArray from satctl.auth import AuthBuilder from satctl.downloaders import DownloadBuilder, Downloader -from satctl.model import ConversionParams, Granule, ProductInfo, SearchParams +from satctl.model import Granule, ProductInfo, SearchParams from satctl.sources import DataSource -from satctl.writers import Writer log = logging.getLogger(__name__) @@ -342,87 +339,6 @@ def download_item(self, item: Granule, destination: Path, downloader: Downloader return all_success - def save_item( - self, - item: Granule, - destination: Path, - writer: Writer, - params: ConversionParams, - force: bool = False, - ) -> dict[str, list]: - """Process and save Sentinel-1 granule to output files. - - Workflow: - 1. Validate inputs (local files exist, datasets specified) - 2. 
Load scene with SAR data using sar-c_safe reader - 3. Define target area (from params or full granule extent) - 4. Resample to target projection and resolution - 5. Write datasets to output files - - Args: - item: Granule to process (must have local_path set) - destination: Base destination directory for outputs - writer: Writer instance for file output (GeoTIFF, NetCDF, etc.) - params: Conversion parameters including: - - datasets: List of datasets to process - - area_geometry: Optional AOI for spatial subsetting - - target_crs: Target coordinate reference system - - resolution: Target spatial resolution - force: If True, overwrite existing output files. Defaults to False. - - Returns: - Dictionary mapping granule_id to list of output file paths - - Raises: - FileNotFoundError: If local_path doesn't exist - ValueError: If datasets is None and no default composite is configured - """ - # Validate that granule was downloaded - if item.local_path is None or not item.local_path.exists(): - raise FileNotFoundError(f"Invalid source file or directory: {item.local_path}") - - # Ensure datasets are specified - if params.datasets is None and self.default_composite is None: - raise ValueError("Missing datasets or default composite for storage") - - # Parse dataset names and prepare output filenames - datasets_dict = self._prepare_datasets(writer, params) - - # Filter existing files using base class helper - datasets_dict = self._filter_existing_files(datasets_dict, destination, item.granule_id, writer, force) - - # Load scene with requested SAR datasets - log.debug("Loading and resampling scene") - scene = self.load_scene(item, datasets=list(datasets_dict.values())) - - # Define target area for resampling - area_def = self.define_area( - target_crs=params.target_crs_obj, - area=params.area_geometry, - scene=scene, - source_crs=params.source_crs_obj, - resolution=params.resolution, - ) - - # Resample to target area - scene = self.resample(scene, area_def=area_def) - - # Write 
each dataset to output file - paths: dict[str, list] = defaultdict(list) - output_dir = destination / item.granule_id - output_dir.mkdir(exist_ok=True, parents=True) - - for dataset_name, file_name in datasets_dict.items(): - output_path = output_dir / f"{file_name}.{writer.extension}" - paths[item.granule_id].append( - writer.write( - dataset=cast(DataArray, scene[dataset_name]), - output_path=output_path, - ) - ) - - return paths - class Sentinel1GRDSource(Sentinel1Source): """Source for Sentinel-1 Ground Range Detected (GRD) products. diff --git a/src/satctl/sources/sentinel2.py b/src/satctl/sources/sentinel2.py index 9126509..20106c0 100644 --- a/src/satctl/sources/sentinel2.py +++ b/src/satctl/sources/sentinel2.py @@ -11,9 +11,8 @@ from satctl.auth import AuthBuilder from satctl.downloaders import DownloadBuilder, Downloader -from satctl.model import ConversionParams, Granule, ProductInfo, SearchParams +from satctl.model import Granule, ProductInfo, SearchParams from satctl.sources import DataSource -from satctl.writers import Writer log = logging.getLogger(__name__) @@ -345,47 +344,6 @@ def download_item(self, item: Granule, destination: Path, downloader: Downloader log.warning("Failed to download all required assets for: %s", item.granule_id) return all_success - def save_item( - self, - item: Granule, - destination: Path, - writer: Writer, - params: ConversionParams, - force: bool = False, - ) -> dict[str, list]: - """Save granule item to output files after processing. - - Args: - item (Granule): Granule to process - destination (Path): Base destination directory - writer (Writer): Writer instance for output - params (ConversionParams): Conversion parameters - force (bool): If True, overwrite existing files. Defaults to False. 
- - Returns: - dict[str, list]: Dictionary mapping granule_id to list of output paths - """ - # Validate inputs using base class helper - self._validate_save_inputs(item, params) - # Parse datasets using base class helper - datasets_dict = self._prepare_datasets(writer, params) - # Filter existing files using base class helper - datasets_dict = self._filter_existing_files(datasets_dict, destination, item.granule_id, writer, force) - # Load and resample scene - log.debug("Loading and resampling scene") - scene = self.load_scene(item, datasets=list(datasets_dict.values())) - # Define area using base class helper - area_def = self.define_area( - target_crs=params.target_crs_obj, - area=params.area_geometry, - scene=scene, - source_crs=params.source_crs_obj, - resolution=params.resolution, - ) - scene = self.resample(scene, area_def=area_def) - # Write datasets using base class helper - return self._write_scene_datasets(scene, datasets_dict, destination, item.granule_id, writer) - class Sentinel2L2ASource(Sentinel2Source): """Source for Sentinel-2 MSI L2A product.""" diff --git a/src/satctl/sources/sentinel3.py b/src/satctl/sources/sentinel3.py index c5a4bb8..688e063 100644 --- a/src/satctl/sources/sentinel3.py +++ b/src/satctl/sources/sentinel3.py @@ -230,36 +230,66 @@ def save_item( force (bool): If True, overwrite existing files. Defaults to False. Returns: - dict[str, list]: Dictionary mapping granule_id to list of output paths + dict[str, list]: Dictionary mapping granule_id to list of output paths. + Empty list means all files were skipped (already exist). 
+ + Raises: + FileNotFoundError: If granule data not downloaded + ValueError: If invalid configuration + Exception: If processing fails (scene loading, resampling, writing) """ - self._validate_save_inputs(item, params) - datasets_dict = self._prepare_datasets(writer, params) - datasets_dict = self._filter_existing_files(datasets_dict, destination, item.granule_id, writer, force) - - # Load and resample scene - log.debug("Loading and resampling scene") - - # workaround patch to fix broker SLSTR reader - # see https://github.com/pytroll/satpy/issues/3251 - # TLDR: SLSTR revision 004 switches band F1 from stripe i to f - # current satpy reader does not allow for missing files - custom_reader = None - if item.info.instrument == "slstr" and item.granule_id.endswith("004"): - custom_reader = f"{self.reader}_rev4" - scene = self.load_scene(item, reader=custom_reader, datasets=list(datasets_dict.values())) - - # Define area using base class helper - area_def = self.define_area( - target_crs=params.target_crs_obj, - area=params.area_geometry, - scene=scene, - source_crs=params.source_crs_obj, - resolution=params.resolution, - ) - scene = self.resample(scene, area_def=area_def) + try: + # Validate inputs using base class helper + self._validate_save_inputs(item, params) + + # Parse datasets using base class helper + datasets_dict = self._prepare_datasets(writer, params) + + # Filter existing files using base class helper + datasets_dict = self._filter_existing_files(datasets_dict, destination, item.granule_id, writer, force) + + # Early return if no datasets to process (all files already exist) + if not datasets_dict: + log.info("Skipping %s - all datasets already exist", item.granule_id) + return {item.granule_id: []} + + # Load and resample scene + log.debug("Loading and resampling scene for %s", item.granule_id) + + # workaround patch to fix broken SLSTR reader + # see https://github.com/pytroll/satpy/issues/3251 + # TLDR: SLSTR revision 004 switches band F1 from stripe 
i to f + # current satpy reader does not allow for missing files + custom_reader = None + if item.info.instrument == "slstr" and item.granule_id.endswith("004"): + custom_reader = f"{self.reader}_rev4" + scene = self.load_scene(item, reader=custom_reader, datasets=list(datasets_dict.values())) + + # Define area using base class helper + area_def = self.define_area( + target_crs=params.target_crs_obj, + area=params.area_geometry, + scene=scene, + source_crs=params.source_crs_obj, + resolution=params.resolution, + ) + scene = self.resample(scene, area_def=area_def) + + # Write datasets using base class helper + result = self._write_scene_datasets(scene, datasets_dict, destination, item.granule_id, writer) - # Write datasets using base class helper - return self._write_scene_datasets(scene, datasets_dict, destination, item.granule_id, writer) + # Log success + num_files = len(result.get(item.granule_id, [])) + log.info("Successfully processed %s - wrote %d file(s)", item.granule_id, num_files) + return result + + except (FileNotFoundError, ValueError): + # Re-raise validation errors (these are user configuration issues) + raise + except Exception as e: + # Log processing errors and re-raise + log.error("Failed to process %s: %s", item.granule_id, str(e), exc_info=True) + raise class SLSTRSource(Sentinel3Source):