Skip to content

Commit f4adf49

Browse files
committed
multi job with better matching
1 parent eaa539e commit f4adf49

8 files changed

Lines changed: 282 additions & 201 deletions

File tree

.env.example

Whitespace-only changes.

TODO

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
- make the layers rout without the pred name
2+
- auto0 ofc
Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,13 @@
11
"""Services for cloudcasting_backend."""
22

3-
from cloudcasting_backend.services.s3_downloader import start_download_thread
3+
from cloudcasting_backend.services.s3_downloader import (
4+
run_update_job,
5+
get_download_status,
6+
get_current_forecast_folder,
7+
)
48

5-
__all__ = ["start_download_thread"]
9+
__all__ = [
10+
"run_update_job",
11+
"get_download_status",
12+
"get_current_forecast_folder",
13+
]

cloudcasting_backend/services/s3_downloader.py

Lines changed: 218 additions & 159 deletions
Large diffs are not rendered by default.

cloudcasting_backend/settings.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ class Settings(BaseSettings):
6464
env_file=".env",
6565
env_prefix="CLOUDCASTING_BACKEND_",
6666
env_file_encoding="utf-8",
67+
extra="ignore", # Ignore extra fields from .env file
6768
)
6869

6970

cloudcasting_backend/web/api/cloudcasting/views.py

Lines changed: 49 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,16 @@
22

33
import os
44
from pathlib import Path
5-
from typing import List
5+
from typing import List, Optional
66

77
from fastapi import APIRouter, HTTPException, status
88
from fastapi.responses import FileResponse
99
from pydantic import BaseModel
1010

1111
from cloudcasting_backend.services.s3_downloader import (
12-
download_s3_folder,
12+
run_update_job,
13+
get_download_status,
1314
get_current_forecast_folder,
14-
convert_zarr_to_geotiffs,
1515
GEOTIFF_STORAGE_PATH,
1616
)
1717
from cloudcasting_backend.settings import settings
@@ -33,6 +33,22 @@ class AvailableLayersResponse(BaseModel):
3333
steps: List[int]
3434

3535

36+
class BackgroundTaskResponse(BaseModel):
37+
"""Response model for background tasks."""
38+
39+
task_id: str
40+
message: str
41+
42+
43+
class DownloadStatusResponse(BaseModel):
44+
"""Response model for download status."""
45+
46+
is_running: bool
47+
current_task: Optional[str] = None
48+
last_completed: Optional[str] = None
49+
error: Optional[str] = None
50+
51+
3652
@router.get("/status", response_model=CloudcastingResponse)
3753
async def get_cloudcasting_status() -> CloudcastingResponse:
3854
"""
@@ -61,48 +77,46 @@ async def get_cloudcasting_status() -> CloudcastingResponse:
6177
)
6278

6379

64-
@router.post("/trigger-download", response_model=CloudcastingResponse)
65-
async def trigger_download() -> CloudcastingResponse:
80+
@router.post("/trigger-download", response_model=BackgroundTaskResponse)
81+
async def trigger_download() -> BackgroundTaskResponse:
6682
"""
67-
Manually trigger a download of the latest cloudcasting data.
68-
Downloads the data and converts it to GeoTIFF format, then completes.
83+
Trigger a background download of the latest cloudcasting data.
84+
Downloads the data and converts it to GeoTIFF format in the background.
85+
Returns immediately with a task ID that can be used to check status.
6986
"""
7087
try:
71-
bucket_name = settings.s3_bucket_name
72-
s3_folder = get_current_forecast_folder()
73-
local_dir = settings.zarr_storage_path
74-
75-
# Ensure directory exists
76-
Path(local_dir).mkdir(parents=True, exist_ok=True)
77-
78-
# Download the data
79-
downloaded_zarr_path = download_s3_folder(bucket_name, s3_folder, local_dir)
80-
81-
# Convert to GeoTIFF if download was successful
82-
if downloaded_zarr_path:
83-
convert_zarr_to_geotiffs(downloaded_zarr_path, GEOTIFF_STORAGE_PATH)
84-
85-
# Check if data is available and return path
86-
latest_path = Path(local_dir) / "cloudcasting_forecast" / "latest.zarr"
87-
if latest_path.exists():
88-
relative_path = latest_path.relative_to(
89-
Path(settings.zarr_storage_path).parent,
90-
)
91-
data_path = f"/static/{relative_path}"
92-
else:
93-
data_path = ""
94-
95-
return CloudcastingResponse(
96-
message="Download and conversion completed successfully",
97-
data_path=data_path,
88+
# Directly call run_update_job to start the background process
89+
run_update_job()
90+
91+
# Get the current status to return task info
92+
status_info = get_download_status()
93+
task_id = status_info.get("current_task", "unknown")
94+
95+
return BackgroundTaskResponse(
96+
task_id=task_id,
97+
message=f"Background download started: {task_id}"
98+
)
99+
except RuntimeError as e:
100+
raise HTTPException(
101+
status_code=status.HTTP_409_CONFLICT,
102+
detail=str(e),
98103
)
99104
except Exception as e:
100105
raise HTTPException(
101106
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
102-
detail=f"Failed to download data: {e!s}",
107+
detail=f"Failed to start download: {e!s}",
103108
)
104109

105110

111+
@router.get("/download-status", response_model=DownloadStatusResponse)
112+
async def get_download_status_endpoint() -> DownloadStatusResponse:
113+
"""
114+
Get the current status of background download processes.
115+
"""
116+
status_info = get_download_status()
117+
return DownloadStatusResponse(**status_info)
118+
119+
106120
@router.get("/layers", response_model=AvailableLayersResponse)
107121
async def get_available_layers() -> AvailableLayersResponse:
108122
"""

cloudcasting_backend/web/lifespan.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
import os
12
from contextlib import asynccontextmanager
23
from typing import AsyncGenerator
34

45
from fastapi import FastAPI
6+
from loguru import logger as log
57
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
68
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
79
from opentelemetry.sdk.resources import (
@@ -14,7 +16,6 @@
1416
from opentelemetry.sdk.trace.export import BatchSpanProcessor
1517
from opentelemetry.trace import set_tracer_provider
1618

17-
from cloudcasting_backend.services import start_download_thread
1819
from cloudcasting_backend.settings import settings
1920

2021

@@ -93,10 +94,6 @@ async def lifespan_setup(
9394
setup_opentelemetry(app)
9495
app.middleware_stack = app.build_middleware_stack()
9596

96-
# Start the scheduled S3 download thread
97-
# Disabled: Auto-running thread removed, use /trigger-download endpoint instead
98-
# if settings.environment != "pytest": # Don't start in test environment
99-
# start_download_thread()
10097

10198
yield
10299
stop_opentelemetry(app)

tests/test_auth.py

Whitespace-only changes.

0 commit comments

Comments
 (0)