Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
5a163ca
ruff changes
yawenzzzz Oct 17, 2024
7877e59
add api main
yawenzzzz Oct 17, 2024
c400985
change number of workers to 1
yawenzzzz Oct 17, 2024
8f05ca3
merge conflict
yawenzzzz Oct 17, 2024
7b53fdb
merge conflict
yawenzzzz Oct 17, 2024
2bfc38e
merge conflict
yawenzzzz Oct 17, 2024
19421b5
minor change
yawenzzzz Oct 16, 2024
02ab349
fastapi is working now
yawenzzzz Oct 17, 2024
8c2136b
ruff change
yawenzzzz Oct 17, 2024
f7081e4
ruff
yawenzzzz Oct 18, 2024
b64ab97
minor changes
yawenzzzz Oct 18, 2024
cbdeec9
update dockerfile
yawenzzzz Oct 18, 2024
9df535d
set use_initial_job = True
yawenzzzz Oct 18, 2024
a1b6d1e
move example to scripts folder
yawenzzzz Oct 19, 2024
8691f53
changes based on comments
yawenzzzz Oct 21, 2024
1d7d4d6
uncomment restore config
yawenzzzz Oct 21, 2024
164e895
Add test for Landsat prediction pipeline
favyen2 Oct 21, 2024
0babbbe
add aws credentials
favyen2 Oct 21, 2024
12d6a5b
fix
favyen2 Oct 21, 2024
67f29b5
makes path arguments as optional
yawenzzzz Oct 21, 2024
3f546bc
add gcs access
favyen2 Oct 21, 2024
2fd1847
make sure either scene_id or image_files are provided
yawenzzzz Oct 21, 2024
5a1149c
write crops only if specified path
yawenzzzz Oct 21, 2024
3a9cad1
Merge branch 'pbeukema/fastapi' of github.com:allenai/rslearn_project…
yawenzzzz Oct 21, 2024
ee16650
fix bug with b8_fname not defined and tmp_dir.cleanup not exist
favyen2 Oct 21, 2024
dd674b9
fix
uakfdotb Oct 22, 2024
ec987af
fix
uakfdotb Oct 22, 2024
d505f18
don't run slow test in CI
uakfdotb Oct 22, 2024
c0396a3
Merge remote-tracking branch 'origin/master' into pbeukema/fastapi
favyen2 Oct 22, 2024
e2790b7
fix lint issues
favyen2 Oct 22, 2024
eb0b5a5
minor change
yawenzzzz Oct 22, 2024
850429c
Merge branch 'pbeukema/fastapi' of github.com:allenai/rslearn_project…
yawenzzzz Oct 22, 2024
b1cea89
reduce batch_size to resolve the error: received 0 items of ancdata
yawenzzzz Oct 22, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion .github/workflows/build_test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,20 @@ jobs:
run: |
COMPOSE_DOCKER_CLI_BUILD=1 DOCKER_BUILDKIT=1 docker compose -f docker-compose.yaml build

- name: Authenticate into gcp
uses: "google-github-actions/auth@v2"
with:
credentials_json: ${{ secrets.GOOGLE_CREDENTIALS }}

- name: Run tests with Docker Compose
run: |
docker compose -f docker-compose.yaml run test pytest tests/
docker compose -f docker-compose.yaml run \
-e AWS_ACCESS_KEY_ID=${{ secrets.AWS_ACCESS_KEY_ID }} \
-e AWS_SECRET_ACCESS_KEY=${{ secrets.AWS_SECRET_ACCESS_KEY }} \
-v ${{env.GOOGLE_GHA_CREDS_PATH}}:/tmp/gcp-credentials.json:ro \
-e GOOGLE_APPLICATION_CREDENTIALS=/tmp/gcp-credentials.json \
-e RSLP_BUCKET=rslearn-eai \
test pytest tests/ --ignore tests/integration_slow/

- name: Clean up
if: always()
Expand Down
6 changes: 1 addition & 5 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
FROM pytorch/pytorch:2.4.0-cuda11.8-cudnn9-runtime@sha256:58a28ab734f23561aa146fbaf777fb319a953ca1e188832863ed57d510c9f197

# TEMPORARY Until RSLEARN Is Public
ARG GIT_USERNAME
ARG GIT_TOKEN

RUN apt update
RUN apt install -y libpq-dev ffmpeg libsm6 libxext6 git
RUN git clone https://${GIT_USERNAME}:${GIT_TOKEN}@github.com/allenai/rslearn.git /opt/rslearn_projects/rslearn
RUN git clone https://github.com/allenai/rslearn.git /opt/rslearn_projects/rslearn
RUN pip install -r /opt/rslearn_projects/rslearn/requirements.txt
RUN pip install -r /opt/rslearn_projects/rslearn/extra_requirements.txt
COPY requirements.txt /opt/rslearn_projects/requirements.txt
Expand Down
4 changes: 3 additions & 1 deletion landsat/recheck_landsat_labels/phase123_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,12 @@ data:
allow_invalid: true
skip_unknown_categories: true
prob_property: "prob"
positive_class: "correct"
positive_class_threshold: 0.85
input_mapping:
class:
label: "targets"
batch_size: 64
batch_size: 32
num_workers: 32
default_config:
transforms:
Expand Down
5 changes: 5 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
beaker-py
fastapi
interrogate
pydantic
pytest
python-dotenv
ruff
typing-extensions
uvicorn
12 changes: 12 additions & 0 deletions rslp/landsat_vessels/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Base image
FROM base-image:latest

# Environment variables
ENV PYTHONPATH="/opt/rslearn_projects:${PYTHONPATH}"
ENV LANDSAT_PORT=5555

# Make port 5555 available to the world outside this container
EXPOSE $LANDSAT_PORT

# Run app.py when the container launches
CMD ["python3", "rslp/landsat_vessels/api_main.py"]
98 changes: 98 additions & 0 deletions rslp/landsat_vessels/api_main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
"""Landsat Vessel Detection Service."""

from __future__ import annotations

import logging
import multiprocessing
import os

import uvicorn
from fastapi import FastAPI, Response
from pydantic import BaseModel

from rslp.landsat_vessels.predict_pipeline import FormattedPrediction, predict_pipeline

app = FastAPI()

# Set up the logger
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

LANDSAT_HOST = "0.0.0.0"
LANDSAT_PORT = 5555


class LandsatResponse(BaseModel):
"""Response object for vessel detections."""

status: list[str]
predictions: list[FormattedPrediction]


class LandsatRequest(BaseModel):
"""Request object for vessel detections."""

scene_id: str | None = None
image_files: dict[str, str] | None = None
crop_path: str | None = None
scratch_path: str | None = None
json_path: str | None = None


@app.on_event("startup")
async def rslp_init() -> None:
"""Landsat Vessel Service Initialization."""
logger.info("Initializing")
multiprocessing.set_start_method("forkserver", force=True)
multiprocessing.set_forkserver_preload(
[
"rslp.utils.rslearn.materialize_dataset",
"rslp.utils.rslearn.run_model_predict",
]
)


@app.get("/")
async def home() -> dict:
"""Returns a simple message to indicate the service is running."""
return {"message": "Landsat Detections App"}


@app.post("/detections", response_model=LandsatResponse)
async def get_detections(info: LandsatRequest, response: Response) -> LandsatResponse:
"""Returns vessel detections Response object for a given Request object."""
# Ensure that either scene_id or image_files is specified.
if info.scene_id is None and info.image_files is None:
raise ValueError("Either scene_id or image_files must be specified.")

try:
if info.scene_id is not None:
logger.info(f"Received request with scene_id: {info.scene_id}")
elif info.image_files is not None:
logger.info("Received request with image_files")
json_data = predict_pipeline(
crop_path=info.crop_path,
scene_id=info.scene_id,
image_files=info.image_files,
scratch_path=info.scratch_path,
json_path=info.json_path,
)
return LandsatResponse(
status=["success"],
predictions=[pred for pred in json_data],
)
except ValueError as e:
logger.error(f"Value error during prediction pipeline: {e}")
return LandsatResponse(status=["error"], predictions=[])
except Exception as e:
logger.error(f"Unexpected error during prediction pipeline: {e}")
return LandsatResponse(status=["error"], predictions=[])


if __name__ == "__main__":
uvicorn.run(
"api_main:app",
host=os.getenv("LANDSAT_HOST", default="0.0.0.0"),
port=int(os.getenv("LANDSAT_PORT", default=5555)),
proxy_headers=True,
)
33 changes: 33 additions & 0 deletions rslp/landsat_vessels/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
version: "3.9"

services:
# Define the base image
base-image:
build:
context: ../..
dockerfile: Dockerfile
image: base-image:latest # Tag it as "base-image"

# Define the landsat-vessels service
landsat-vessels:
build:
context: .
dockerfile: Dockerfile
shm_size: '10G' # This adds the shared memory size
depends_on:
- base-image
ports:
- "5555:5555"
environment:
- RSLP_BUCKET
- S3_ACCESS_KEY_ID
- S3_SECRET_ACCESS_KEY
- AWS_ACCESS_KEY_ID
- AWS_SECRET_ACCESS_KEY
- NVIDIA_VISIBLE_DEVICES=all # Make all GPUs visible
deploy:
resources:
reservations:
devices:
- capabilities: [gpu] # Ensure this service can access GPUs
runtime: nvidia # Use the NVIDIA runtime
90 changes: 72 additions & 18 deletions rslp/landsat_vessels/predict_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"""Landsat vessel prediction pipeline."""

import json
import tempfile
import time
from datetime import datetime, timedelta

import numpy as np
Expand All @@ -14,6 +16,7 @@
from rslearn.dataset import Dataset, Window
from rslearn.utils import Projection, STGeometry
from rslearn.utils.get_utm_ups_crs import get_utm_ups_projection
from typing_extensions import TypedDict
from upath import UPath

from rslp.utils.rslearn import materialize_dataset, run_model_predict
Expand Down Expand Up @@ -54,6 +57,16 @@ def __init__(
self.crop_window_dir = crop_window_dir


class FormattedPrediction(TypedDict):
"""Formatted prediction for a single vessel detection."""

latitude: float
longitude: float
score: float
rgb_fname: str
b8_fname: str


def get_vessel_detections(
ds_path: UPath,
projection: Projection,
Expand Down Expand Up @@ -180,12 +193,12 @@ def run_classifier(


def predict_pipeline(
scratch_path: str,
json_path: str,
crop_path: str,
crop_path: str | None = None,
scratch_path: str | None = None,
json_path: str | None = None,
image_files: dict[str, str] | None = None,
scene_id: str | None = None,
) -> None:
) -> list[FormattedPrediction]:
"""Run the Landsat vessel prediction pipeline.

This inputs a Landsat scene (consisting of per-band GeoTIFFs) and produces the
Expand All @@ -201,6 +214,15 @@ def predict_pipeline(
scene_id: Landsat scene ID. Exactly one of image_files or scene_id should be
specified.
"""
start_time = time.time() # Start the timer
time_profile = {}

if scratch_path is None:
tmp_dir = tempfile.TemporaryDirectory()
scratch_path = tmp_dir.name
else:
tmp_dir = None

ds_path = UPath(scratch_path)
ds_path.mkdir(parents=True, exist_ok=True)

Expand Down Expand Up @@ -259,18 +281,29 @@ def predict_pipeline(
dst_geom.time_range[1] + timedelta(minutes=30),
)

time_profile["setup"] = time.time() - start_time

# Run pipeline.
step_start_time = time.time()
print("run detector")
detections = get_vessel_detections(
ds_path,
projection,
scene_bounds, # type: ignore
time_range=time_range,
)
time_profile["get_vessel_detections"] = time.time() - step_start_time

step_start_time = time.time()
print("run classifier")
detections = run_classifier(ds_path, detections, time_range=time_range)
time_profile["run_classifier"] = time.time() - step_start_time

# Write JSON and crops.
json_upath = UPath(json_path)
crop_upath = UPath(crop_path)
step_start_time = time.time()
if crop_path:
crop_upath = UPath(crop_path)
crop_upath.mkdir(parents=True, exist_ok=True)

json_data = []
for idx, detection in enumerate(detections):
Expand Down Expand Up @@ -304,13 +337,17 @@ def predict_pipeline(
[images["B4_sharp"], images["B3_sharp"], images["B2_sharp"]], axis=2
)

rgb_fname = crop_upath / f"{idx}_rgb.png"
with rgb_fname.open("wb") as f:
Image.fromarray(rgb).save(f, format="PNG")
if crop_path:
rgb_fname = crop_upath / f"{idx}_rgb.png"
with rgb_fname.open("wb") as f:
Image.fromarray(rgb).save(f, format="PNG")

b8_fname = crop_upath / f"{idx}_b8.png"
with b8_fname.open("wb") as f:
Image.fromarray(images["B8"]).save(f, format="PNG")
b8_fname = crop_upath / f"{idx}_b8.png"
with b8_fname.open("wb") as f:
Image.fromarray(images["B8"]).save(f, format="PNG")
else:
rgb_fname = ""
b8_fname = ""

# Get longitude/latitude.
src_geom = STGeometry(
Expand All @@ -321,14 +358,31 @@ def predict_pipeline(
lat = dst_geom.shp.y

json_data.append(
dict(
FormattedPrediction(
longitude=lon,
latitude=lat,
score=detection.score,
rgb_fname=str(rgb_fname),
b8_fname=str(b8_fname),
)
rgb_fname=rgb_fname,
b8_fname=b8_fname,
),
)

with json_upath.open("w") as f:
json.dump(json_data, f)
time_profile["write_json_and_crops"] = time.time() - step_start_time

elapsed_time = time.time() - start_time # Calculate elapsed time
time_profile["total"] = elapsed_time

# Clean up any temporary directories.
if tmp_dir:
tmp_dir.cleanup()

if json_path:
json_upath = UPath(json_path)
with json_upath.open("w") as f:
json.dump(json_data, f)

print(f"Prediction pipeline completed in {elapsed_time:.2f} seconds")
for step, duration in time_profile.items():
print(f"{step} took {duration:.2f} seconds")

return json_data
2 changes: 2 additions & 0 deletions rslp/utils/rslearn.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@ def materialize_dataset(
dataset,
workers=workers,
group=group,
use_initial_job=False,
)
apply_on_windows(
MaterializeHandler(),
dataset,
workers=workers,
group=group,
use_initial_job=False,
)


Expand Down
17 changes: 17 additions & 0 deletions tests/integration_slow/landsat_vessels/test_fastapi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from fastapi.testclient import TestClient

from rslp.landsat_vessels.api_main import app

client = TestClient(app)


def test_singapore_dense_scene() -> None:
# LC08_L1TP_125059_20240913_20240920_02_T1 is a scene that includes southeast coast
# of Singapore where there are hundreds of vessels.
response = client.post(
"/detections", json={"scene_id": "LC08_L1TP_125059_20240913_20240920_02_T1"}
)
assert response.status_code == 200
predictions = response.json()["predictions"]
# There are many correct vessels in this scene.
assert len(predictions) >= 100
Loading