Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 80 additions & 7 deletions src/satctl/sources/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ def download_item(
"""
...

@abstractmethod
def save_item(
self,
item: Granule,
Expand All @@ -169,9 +168,58 @@ def save_item(
force (bool): If True, overwrite existing files. Defaults to False.

Returns:
dict[str, list]: Dictionary mapping granule_id to list of output paths
dict[str, list]: Dictionary mapping granule_id to list of output paths.
Empty list means all files were skipped (already exist).

Raises:
FileNotFoundError: If granule data not downloaded
ValueError: If invalid configuration
Exception: If processing fails (scene loading, resampling, writing)
"""
...
try:
# Validate inputs using base class helper
self._validate_save_inputs(item, params)

# Parse datasets using base class helper
datasets_dict = self._prepare_datasets(writer, params)

# Filter existing files using base class helper
datasets_dict = self._filter_existing_files(datasets_dict, destination, item.granule_id, writer, force)

# Early return if no datasets to process (all files already exist)
if not datasets_dict:
log.info("Skipping %s - all datasets already exist", item.granule_id)
return {item.granule_id: []}

# Load and resample scene
log.debug("Loading and resampling scene for %s", item.granule_id)
scene = self.load_scene(item, datasets=list(datasets_dict.values()))

# Define area using base class helper
area_def = self.define_area(
target_crs=params.target_crs_obj,
area=params.area_geometry,
scene=scene,
source_crs=params.source_crs_obj,
resolution=params.resolution,
)
scene = self.resample(scene, area_def=area_def)

# Write datasets using base class helper
result = self._write_scene_datasets(scene, datasets_dict, destination, item.granule_id, writer)

# Log success
num_files = len(result.get(item.granule_id, []))
log.info("Successfully processed %s - wrote %d file(s)", item.granule_id, num_files)
return result

except (FileNotFoundError, ValueError):
# Re-raise validation errors (these are user configuration issues)
raise
except Exception as e:
# Log processing errors and re-raise
log.error("Failed to process %s: %s", item.granule_id, str(e), exc_info=True)
raise

@property
def authenticator(self) -> Authenticator:
Expand Down Expand Up @@ -466,14 +514,16 @@ def save(
force (bool): If True, overwrite existing files. Defaults to False.

Returns:
tuple[list, list]: Tuple of (successful_items, failed_items)
tuple[list, list]: Tuple of (successful_items, failed_items).
Successful includes both processed and skipped items.
"""
if not isinstance(items, Iterable):
items = [items]
items = cast(list, items)

success = []
failure = []
skipped = [] # Track skipped items separately for logging
num_workers = num_workers or 1
batch_id = str(uuid.uuid4())
# this prevents pickle errors for unpicklable entities
Expand Down Expand Up @@ -504,10 +554,32 @@ def save(
}
for future in as_completed(future_to_item_map):
item = future_to_item_map[future]
if future.result():
success.append(item)
else:
try:
result = future.result()
# Check if files were actually written
files_written = result.get(item.granule_id, [])
if files_written:
# Files were written - successful processing
success.append(item)
else:
# Empty list = skipped (all files already existed)
success.append(item)
skipped.append(item)
except Exception as e:
# Worker raised an exception = processing failed
failure.append(item)
log.warning("Failed to process %s: %s", item.granule_id, str(e))

# Log summary
if skipped:
log.info(
"Batch complete: %d processed, %d skipped, %d failed",
len(success) - len(skipped),
len(skipped),
len(failure),
)
else:
log.info("Batch complete: %d processed, %d failed", len(success), len(failure))

emit_event(
ProgressEventType.BATCH_COMPLETED,
Expand All @@ -519,6 +591,7 @@ def save(
log.info("Interrupted, cleaning up...")
if executor:
executor.shutdown(wait=False, cancel_futures=True)
raise # Re-raise to allow outer handler to clean up
finally:
emit_event(
ProgressEventType.BATCH_COMPLETED,
Expand Down
58 changes: 1 addition & 57 deletions src/satctl/sources/earthdata.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@

from satctl.auth import AuthBuilder
from satctl.downloaders import DownloadBuilder, Downloader
from satctl.model import ConversionParams, Granule, ProductInfo, SearchParams
from satctl.model import Granule, ProductInfo, SearchParams
from satctl.sources import DataSource
from satctl.writers import Writer

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -421,61 +420,6 @@ def download_item(self, item: Granule, destination: Path, downloader: Downloader
item.to_file(granule_dir)
return True

def save_item(
    self,
    item: Granule,
    destination: Path,
    writer: Writer,
    params: ConversionParams,
    force: bool = False,
) -> dict[str, list]:
    """Process a downloaded granule and write its datasets to output files.

    Args:
        item (Granule): Granule to process
        destination (Path): Base destination directory
        writer (Writer): Writer instance for output
        params (ConversionParams): Conversion parameters
        force (bool): If True, overwrite existing files. Defaults to False.

    Returns:
        dict[str, list]: Dictionary mapping granule_id to list of output paths
    """
    # Input validation and dataset-name resolution via base-class helpers.
    self._validate_save_inputs(item, params)
    name_map = self._prepare_datasets(writer, params)

    # Drop datasets whose output files already exist (unless force=True).
    name_map = self._filter_existing_files(name_map, destination, item.granule_id, writer, force)
    if not name_map:
        # Nothing left to write — every requested output is already on disk.
        log.debug("All datasets already exist for %s, skipping", item.granule_id)
        return {item.granule_id: []}

    # Enumerate source files (logged for diagnostics only).
    granule_files = self.get_files(item)
    log.debug("Found %d files to process", len(granule_files))

    # Load the scene, project it onto the target area, and resample.
    log.debug("Loading and resampling scene")
    scene = self.load_scene(item, datasets=list(name_map.values()))
    area = self.define_area(
        target_crs=params.target_crs_obj,
        area=params.area_geometry,
        scene=scene,
        source_crs=params.source_crs_obj,
        resolution=params.resolution,
    )
    scene = self.resample(scene, area_def=area)

    # Hand off to the shared writer helper for the actual file output.
    return self._write_scene_datasets(scene, name_map, destination, item.granule_id, writer)

def validate(self, item: Granule) -> None:
"""Validate a granule.

Expand Down
75 changes: 52 additions & 23 deletions src/satctl/sources/mtg.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,32 +287,61 @@ def save_item(
force (bool): If True, overwrite existing files. Defaults to False.

Returns:
dict[str, list]: Dictionary mapping granule_id to list of output paths
dict[str, list]: Dictionary mapping granule_id to list of output paths.
Empty list means all files were skipped (already exist).

Raises:
FileNotFoundError: If granule data not downloaded
ValueError: If invalid configuration
Exception: If processing fails (scene loading, resampling, writing)
"""
self._validate_save_inputs(item, params)
datasets_dict = self._prepare_datasets(writer, params)
datasets_dict = self._filter_existing_files(datasets_dict, destination, item.granule_id, writer, force)

with self._netcdf_lock:
# Load and resample scene
log.debug("Loading and resampling scene")
scene = self.load_scene(item, datasets=list(datasets_dict.values()))

# Define area using base class helper
area_def = self.define_area(
target_crs=params.target_crs_obj,
area=params.area_geometry,
scene=scene,
source_crs=params.source_crs_obj,
resolution=params.resolution,
)
scene = scene.compute()
scene = self.resample(scene, area_def=area_def)
try:
# Validate inputs using base class helper
self._validate_save_inputs(item, params)

# Parse datasets using base class helper
datasets_dict = self._prepare_datasets(writer, params)

# Filter existing files using base class helper
datasets_dict = self._filter_existing_files(datasets_dict, destination, item.granule_id, writer, force)

# Early return if no datasets to process (all files already exist)
if not datasets_dict:
log.info("Skipping %s - all datasets already exist", item.granule_id)
return {item.granule_id: []}

# Load and resample scene (with thread-safe NetCDF locking)
with self._netcdf_lock:
log.debug("Loading and resampling scene for %s", item.granule_id)
scene = self.load_scene(item, datasets=list(datasets_dict.values()))

# Define area using base class helper
area_def = self.define_area(
target_crs=params.target_crs_obj,
area=params.area_geometry,
scene=scene,
source_crs=params.source_crs_obj,
resolution=params.resolution,
)
# Compute scene before resampling (MTG-specific)
scene = scene.compute()
scene = self.resample(scene, area_def=area_def)

# Write datasets using base class helper
res = self._write_scene_datasets(scene, datasets_dict, destination, item.granule_id, writer)
# Write datasets using base class helper
result = self._write_scene_datasets(scene, datasets_dict, destination, item.granule_id, writer)

return res
# Log success
num_files = len(result.get(item.granule_id, []))
log.info("Successfully processed %s - wrote %d file(s)", item.granule_id, num_files)
return result

except (FileNotFoundError, ValueError):
# Re-raise validation errors (these are user configuration issues)
raise
except Exception as e:
# Log processing errors and re-raise
log.error("Failed to process %s: %s", item.granule_id, str(e), exc_info=True)
raise

def _write_scene_datasets(
self,
Expand Down
86 changes: 1 addition & 85 deletions src/satctl/sources/sentinel1.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,17 @@
import logging
import re
from abc import abstractmethod
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import cast

from pydantic import BaseModel
from pystac_client import Client
from xarray import DataArray

from satctl.auth import AuthBuilder
from satctl.downloaders import DownloadBuilder, Downloader
from satctl.model import ConversionParams, Granule, ProductInfo, SearchParams
from satctl.model import Granule, ProductInfo, SearchParams
from satctl.sources import DataSource
from satctl.writers import Writer

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -342,87 +339,6 @@ def download_item(self, item: Granule, destination: Path, downloader: Downloader

return all_success

def save_item(
    self,
    item: Granule,
    destination: Path,
    writer: Writer,
    params: ConversionParams,
    force: bool = False,
) -> dict[str, list]:
    """Process and save Sentinel-1 granule to output files.

    Workflow:
        1. Validate inputs (local files exist, datasets specified)
        2. Skip work entirely if all outputs already exist (unless force=True)
        3. Load scene with SAR data using sar-c_safe reader
        4. Define target area (from params or full granule extent)
        5. Resample to target projection and resolution
        6. Write datasets to output files

    Args:
        item: Granule to process (must have local_path set)
        destination: Base destination directory for outputs
        writer: Writer instance for file output (GeoTIFF, NetCDF, etc.)
        params: Conversion parameters including:
            - datasets: List of datasets to process
            - area_geometry: Optional AOI for spatial subsetting
            - target_crs: Target coordinate reference system
            - resolution: Target spatial resolution
        force: If True, overwrite existing output files. Defaults to False.

    Returns:
        Dictionary mapping granule_id to list of output file paths.
        Empty list means all files were skipped (already exist).

    Raises:
        FileNotFoundError: If local_path doesn't exist
        ValueError: If datasets is None and no default composite is configured
    """
    # Validate that granule was downloaded
    if item.local_path is None or not item.local_path.exists():
        raise FileNotFoundError(f"Invalid source file or directory: {item.local_path}")

    # Ensure datasets are specified
    if params.datasets is None and self.default_composite is None:
        raise ValueError("Missing datasets or default composite for storage")

    # Parse dataset names and prepare output filenames
    datasets_dict = self._prepare_datasets(writer, params)

    # Filter existing files using base class helper
    datasets_dict = self._filter_existing_files(datasets_dict, destination, item.granule_id, writer, force)

    # Fix: early return when every output already exists. Previously the scene
    # was still loaded and resampled and an empty output directory created even
    # though nothing would be written; this also matches the behaviour of the
    # other sources' save_item implementations.
    if not datasets_dict:
        log.debug("All datasets already exist for %s, skipping", item.granule_id)
        return {item.granule_id: []}

    # Load scene with requested SAR datasets
    log.debug("Loading and resampling scene")
    scene = self.load_scene(item, datasets=list(datasets_dict.values()))

    # Define target area for resampling
    area_def = self.define_area(
        target_crs=params.target_crs_obj,
        area=params.area_geometry,
        scene=scene,
        source_crs=params.source_crs_obj,
        resolution=params.resolution,
    )

    # Resample to target area
    scene = self.resample(scene, area_def=area_def)

    # Write each dataset to output file
    paths: dict[str, list] = defaultdict(list)
    output_dir = destination / item.granule_id
    output_dir.mkdir(exist_ok=True, parents=True)

    for dataset_name, file_name in datasets_dict.items():
        output_path = output_dir / f"{file_name}.{writer.extension}"
        paths[item.granule_id].append(
            writer.write(
                dataset=cast(DataArray, scene[dataset_name]),
                output_path=output_path,
            )
        )

    return paths


class Sentinel1GRDSource(Sentinel1Source):
"""Source for Sentinel-1 Ground Range Detected (GRD) products.
Expand Down
Loading
Loading