Issue #604/#644 UDPJobFactory: improve geometry support
soxofaan committed Oct 11, 2024
1 parent 04296c1 commit f8db877
Showing 6 changed files with 282 additions and 18 deletions.
65 changes: 52 additions & 13 deletions openeo/extra/job_management.py
@@ -16,11 +16,16 @@
import pandas as pd
import requests
import shapely.errors
+import shapely.geometry.base
import shapely.wkt
from requests.adapters import HTTPAdapter, Retry

from openeo import BatchJob, Connection
-from openeo.internal.processes.parse import Process, parse_remote_process_definition
+from openeo.internal.processes.parse import (
+    Parameter,
+    Process,
+    parse_remote_process_definition,
+)
from openeo.rest import OpenEoApiError
from openeo.util import deep_get, repr_truncate, rfc3339

@@ -943,11 +948,17 @@ class UDPJobFactory:
    """

    def __init__(
-        self, process_id: str, *, namespace: Union[str, None] = None, parameter_defaults: Optional[dict] = None
+        self,
+        process_id: str,
+        *,
+        namespace: Union[str, None] = None,
+        parameter_defaults: Optional[dict] = None,
+        parameter_column_map: Optional[dict] = None,
    ):
        self._process_id = process_id
        self._namespace = namespace
        self._parameter_defaults = parameter_defaults or {}
+        self._parameter_column_map = parameter_column_map

    def _get_process_definition(self, connection: Connection) -> Process:
        if isinstance(self._namespace, str) and re.match("https?://", self._namespace):
@@ -979,33 +990,38 @@ def start_job(self, row: pd.Series, connection: Connection, **_) -> BatchJob:

        process_definition = self._get_process_definition(connection=connection)
        parameters = process_definition.parameters or []

+        if self._parameter_column_map is None:
+            self._parameter_column_map = self._guess_parameter_column_map(parameters=parameters, row=row)
+
        arguments = {}
        for parameter in parameters:
-            name = parameter.name
-            schema = parameter.schema
-            if name in row.index:
-                # Higherst priority: value from dataframe row
-                value = row[name]
-            elif name in self._parameter_defaults:
+            param_name = parameter.name
+            column_name = self._parameter_column_map.get(param_name, param_name)
+            if column_name in row.index:
+                # Get value from dataframe row
+                value = row.loc[column_name]
+            elif param_name in self._parameter_defaults:
                # Fallback on default values from constructor
-                value = self._parameter_defaults[name]
+                value = self._parameter_defaults[param_name]
            elif parameter.has_default():
                # Explicitly use default value from parameter schema
                value = parameter.default
            elif parameter.optional:
                # Skip optional parameters without any fallback default value
                continue
            else:
-                raise ValueError(f"Missing required parameter {name!r} for process {self._process_id!r}")
+                raise ValueError(f"Missing required parameter {param_name!r} for process {self._process_id!r}")

            # TODO: validation or normalization based on schema?
-            # Some pandas/numpy data types need a bit of conversion for JSON encoding
+            # Prepare some values/dtypes for JSON encoding
            if isinstance(value, numpy.integer):
                value = int(value)
            elif isinstance(value, numpy.number):
                value = float(value)
+            elif isinstance(value, shapely.geometry.base.BaseGeometry):
+                value = shapely.geometry.mapping(value)

-            arguments[name] = value
+            arguments[param_name] = value

        cube = connection.datacube_from_process(process_id=self._process_id, namespace=self._namespace, **arguments)

@@ -1020,3 +1036,26 @@ def start_job(self, row: pd.Series, connection: Connection, **_) -> BatchJob:
    def __call__(self, *arg, **kwargs) -> BatchJob:
        """Syntactic sugar for calling `start_job` directly."""
        return self.start_job(*arg, **kwargs)
+
+    @staticmethod
+    def _guess_parameter_column_map(parameters: List[Parameter], row: pd.Series) -> dict:
+        """
+        Guess parameter-column mapping from given parameter list and dataframe row.
+        """
+        parameter_column_map = {}
+        # Geometry-based mapping: try to automatically map geometry columns to GeoJSON parameters
+        geojson_parameters = [p.name for p in parameters if p.schema.accepts_geojson()]
+        geometry_columns = [i for (i, v) in row.items() if isinstance(v, shapely.geometry.base.BaseGeometry)]
+        if geojson_parameters and geometry_columns:
+            if len(geojson_parameters) == 1 and len(geometry_columns) == 1:
+                # Most common case: one geometry parameter and one geometry column: can be mapped naively
+                parameter_column_map[geojson_parameters[0]] = geometry_columns[0]
+            elif all(p in geometry_columns for p in geojson_parameters):
+                # Each geometry parameter has a geometry column with the same name: easy to map
+                parameter_column_map.update((p, p) for p in geojson_parameters)
+            else:
+                raise RuntimeError(
+                    f"Problem with mapping geometry columns ({geometry_columns}) to process parameters ({geojson_parameters})"
+                )
+        _log.debug(f"Guessed parameter-column map: {parameter_column_map}")
+        return parameter_column_map
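
For illustration, a rough usage sketch of the geometry support added above. This is hedged: the process id "my_udp", the URL and the `connection` object are hypothetical placeholders, and the UDP is assumed to declare a single GeoJSON-typed parameter "polygons" plus a numeric "offset" parameter:

import geopandas
import shapely.geometry

from openeo.extra.job_management import CsvJobDatabase, MultiBackendJobManager, UDPJobFactory

# One job per dataframe row; each row's shapely geometry is serialized
# to GeoJSON via shapely.geometry.mapping() in start_job().
job_starter = UDPJobFactory(
    process_id="my_udp",  # hypothetical UDP
    namespace="https://example.test/my_udp.json",
    # Optional explicit parameter -> column mapping; when omitted,
    # _guess_parameter_column_map() maps the single geometry column
    # to the single GeoJSON-accepting parameter automatically.
    parameter_column_map={"polygons": "geometry"},
)

df = geopandas.GeoDataFrame(
    {"offset": [11, 22]},
    geometry=[shapely.geometry.Point(1.0, 2.0), shapely.geometry.Point(3.0, 4.0)],
)

job_manager = MultiBackendJobManager(root_dir="job_mgr_root")
job_manager.add_backend("backend", connection=connection)  # `connection` assumed to exist
job_db = CsvJobDatabase("jobs.csv").initialize_from_df(df)
stats = job_manager.run_jobs(job_db=job_db, start_job=job_starter)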
12 changes: 12 additions & 0 deletions openeo/internal/processes/parse.py
@@ -31,6 +31,18 @@ def is_process_graph(self) -> bool:
            and self.schema.get("subtype") == "process-graph"
        )

+    def accepts_geojson(self) -> bool:
+        """Does this schema accept inline GeoJSON objects?"""
+
+        def is_geojson_schema(schema) -> bool:
+            return isinstance(schema, dict) and schema.get("type") == "object" and schema.get("subtype") == "geojson"
+
+        if isinstance(self.schema, dict):
+            return is_geojson_schema(self.schema)
+        elif isinstance(self.schema, list):
+            return any(is_geojson_schema(s) for s in self.schema)
+        return False
+

_NO_DEFAULT = object()
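
A quick sketch of how this behaves for single versus union schemas (`Schema` as defined in this module):

from openeo.internal.processes.parse import Schema

assert Schema({"type": "object", "subtype": "geojson"}).accepts_geojson() is True
# A union of schemas (given as a list) accepts GeoJSON as soon as one member does:
assert Schema([{"type": "number"}, {"type": "object", "subtype": "geojson"}]).accepts_geojson() is True
# Plain objects or other subtypes do not:
assert Schema({"type": "object"}).accepts_geojson() is False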

36 changes: 33 additions & 3 deletions openeo/rest/_testing.py
@@ -1,6 +1,7 @@
+import collections
import json
import re
-from typing import Optional, Union
+from typing import Callable, Iterator, Optional, Sequence, Union

from openeo import Connection, DataCube
from openeo.rest.vectorcube import VectorCube
@@ -25,6 +26,7 @@ class DummyBackend:
        "validation_requests",
        "next_result",
        "next_validation_errors",
+        "job_status_updater",
    )

    # Default result (can serve both as JSON or binary data)
@@ -37,6 +39,13 @@ def __init__(self, requests_mock, connection: Connection):
        self.validation_requests = []
        self.next_result = self.DEFAULT_RESULT
        self.next_validation_errors = []
+
+        # Job status update hook:
+        # a callable that gets called when a job is started and when job metadata is fetched,
+        # allowing to dynamically change how the status of a job evolves.
+        # By default: immediately set the status to "finished" once the job is started.
+        self.job_status_updater = lambda job_id, current_status: "finished"
+
        requests_mock.post(
            connection.build_url("/result"),
            content=self._handle_post_result,
@@ -90,13 +99,19 @@ def _handle_post_job_results(self, request, context):
        """Handler of `POST /job/{job_id}/results` (start batch job)."""
        job_id = self._get_job_id(request)
        assert self.batch_jobs[job_id]["status"] == "created"
-        # TODO: support custom status sequence (instead of directly going to status "finished")?
-        self.batch_jobs[job_id]["status"] = "finished"
+        self.batch_jobs[job_id]["status"] = self.job_status_updater(
+            job_id=job_id, current_status=self.batch_jobs[job_id]["status"]
+        )
        context.status_code = 202

    def _handle_get_job(self, request, context):
        """Handler of `GET /job/{job_id}` (get batch job status and metadata)."""
        job_id = self._get_job_id(request)
+        # Allow updating the status with `job_status_updater` once the job got past status "created"
+        if self.batch_jobs[job_id]["status"] != "created":
+            self.batch_jobs[job_id]["status"] = self.job_status_updater(
+                job_id=job_id, current_status=self.batch_jobs[job_id]["status"]
+            )
        return {"id": job_id, "status": self.batch_jobs[job_id]["status"]}

    def _handle_get_job_results(self, request, context):
@@ -162,6 +177,21 @@ def execute(self, cube: Union[DataCube, VectorCube], process_id: Optional[str] =
        cube.execute()
        return self.get_pg(process_id=process_id)

+    def setup_simple_job_status_flow(self, *, queued: int = 1, running: int = 4, final: str = "finished"):
+        """
+        Set up a simple job status flow:
+        "queued" (a couple of times) -> "running" (a couple of times) -> "finished"/"error".
+        """
+        template = ["queued"] * queued + ["running"] * running + [final]
+        job_stacks = collections.defaultdict(template.copy)
+
+        def get_status(job_id: str, current_status: str) -> str:
+            stack = job_stacks[job_id]
+            # Pop the first item each time, but keep repeating the last one at the end
+            return stack.pop(0) if len(stack) > 1 else stack[0]
+
+        self.job_status_updater = get_status
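
In a test, this hook can be driven roughly as follows (a sketch; the `requests_mock` and `connection` fixtures are assumed to be set up as elsewhere in this diff):

dummy = DummyBackend(requests_mock=requests_mock, connection=connection)
# Simulate a realistic lifecycle: 2x "queued", then 3x "running", then "finished".
dummy.setup_simple_job_status_flow(queued=2, running=3, final="finished")

# Or plug in a fully custom updater, e.g. to fail a job on its first status poll:
dummy.job_status_updater = lambda job_id, current_status: "error"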


def build_capabilities(
    *,
109 changes: 107 additions & 2 deletions tests/extra/test_job_management.py
@@ -1011,7 +1011,9 @@ def test_create_job_db(tmp_path, filename, expected):
class TestUDPJobFactory:
    @pytest.fixture
    def dummy_backend(self, requests_mock, con120) -> DummyBackend:
-        return DummyBackend(requests_mock=requests_mock, connection=con120)
+        dummy = DummyBackend(requests_mock=requests_mock, connection=con120)
+        dummy.setup_simple_job_status_flow(queued=2, running=3, final="finished")
+        return dummy

    @pytest.fixture(autouse=True)
    def remote_process_definitions(self, requests_mock):
@@ -1043,6 +1045,31 @@ def remote_process_definitions(self, requests_mock):
                },
            },
        )
+        requests_mock.get(
+            "https://remote.test/offset_poplygon.json",
+            json={
+                "id": "offset_poplygon",
+                "parameters": [
+                    {"name": "data", "description": "data", "schema": {"type": "number"}},
+                    {
+                        "name": "polygons",
+                        "description": "polygons",
+                        "schema": {
+                            "title": "GeoJSON",
+                            "type": "object",
+                            "subtype": "geojson",
+                        },
+                    },
+                    {
+                        "name": "offset",
+                        "description": "Offset",
+                        "schema": {"type": "number"},
+                        "optional": True,
+                        "default": 0,
+                    },
+                ],
+            },
+        )

    def test_minimal(self, con120, dummy_backend):
        """Bare minimum: just start a job, no parameters/arguments"""
@@ -1124,7 +1151,7 @@ def test_basic_parameterization(self, con120, dummy_backend, parameter_defaults,
    @pytest.fixture
    def job_manager(self, tmp_path, dummy_backend) -> MultiBackendJobManager:
        job_manager = MultiBackendJobManager(root_dir=tmp_path / "job_mgr_root")
-        job_manager.add_backend("dummy", connection=dummy_backend.connection)
+        job_manager.add_backend("dummy", connection=dummy_backend.connection, parallel_jobs=1)
        return job_manager

    def test_udp_job_manager_basic(self, tmp_path, requests_mock, dummy_backend, job_manager, sleep_mock):
@@ -1143,6 +1170,8 @@ def test_udp_job_manager_basic(self, tmp_path, requests_mock, dummy_backend, job
                "sleep": dirty_equals.IsInt(gt=1),
                "start_job call": 3,
                "job start": 3,
+                "job started running": 3,
+                "job finished": 3,
            }
        )
        assert set(job_db.read().status) == {"finished"}
@@ -1244,6 +1273,7 @@ def test_udp_job_manager_parameter_handling(
                "sleep": dirty_equals.IsInt(gt=1),
                "start_job call": 3,
                "job start": 3,
+                "job finished": 3,
            }
        )
        assert set(job_db.read().status) == {"finished"}
@@ -1286,3 +1316,78 @@ def test_udp_job_manager_parameter_handling(
                "status": "finished",
            },
        }

+    def test_udp_job_manager_geometry(self, tmp_path, requests_mock, dummy_backend, job_manager, sleep_mock):
+        job_starter = UDPJobFactory(
+            process_id="offset_poplygon",
+            namespace="https://remote.test/offset_poplygon.json",
+            parameter_defaults={"data": 123},
+        )
+
+        df = geopandas.GeoDataFrame.from_features(
+            {
+                "type": "FeatureCollection",
+                "features": [
+                    {
+                        "type": "Feature",
+                        "id": "one",
+                        "properties": {"offset": 11},
+                        "geometry": {"type": "Point", "coordinates": (1.0, 2.0)},
+                    },
+                    {
+                        "type": "Feature",
+                        "id": "two",
+                        "properties": {"offset": 22},
+                        "geometry": {"type": "Point", "coordinates": (3.0, 4.0)},
+                    },
+                ],
+            }
+        )
+
+        job_db = CsvJobDatabase(tmp_path / "jobs.csv").initialize_from_df(df)
+
+        stats = job_manager.run_jobs(job_db=job_db, start_job=job_starter)
+        assert stats == dirty_equals.IsPartialDict(
+            {
+                "sleep": dirty_equals.IsInt(gt=1),
+                "start_job call": 2,
+                "job start": 2,
+                "job finished": 2,
+            }
+        )
+        assert set(job_db.read().status) == {"finished"}
+
+        assert dummy_backend.batch_jobs == {
+            "job-000": {
+                "job_id": "job-000",
+                "pg": {
+                    "offsetpoplygon1": {
+                        "process_id": "offset_poplygon",
+                        "namespace": "https://remote.test/offset_poplygon.json",
+                        "arguments": {
+                            "data": 123,
+                            "polygons": {"type": "Point", "coordinates": [1.0, 2.0]},
+                            "offset": 11,
+                        },
+                        "result": True,
+                    }
+                },
+                "status": "finished",
+            },
+            "job-001": {
+                "job_id": "job-001",
+                "pg": {
+                    "offsetpoplygon1": {
+                        "process_id": "offset_poplygon",
+                        "namespace": "https://remote.test/offset_poplygon.json",
+                        "arguments": {
+                            "data": 123,
+                            "polygons": {"type": "Point", "coordinates": [3.0, 4.0]},
+                            "offset": 22,
+                        },
+                        "result": True,
+                    }
+                },
+                "status": "finished",
+            },
+        }
14 changes: 14 additions & 0 deletions tests/internal/processes/test_parse.py
@@ -22,6 +22,20 @@ def test_schema_equality():
    assert Schema({"type": "number"}) != Schema({"type": "string"})


+@pytest.mark.parametrize(
+    ["schema", "expected"],
+    [
+        ({"type": "object", "subtype": "geojson"}, True),
+        ({"type": "object"}, False),
+        ({"subtype": "geojson"}, False),
+        ({"type": "object", "subtype": "vectorzz"}, False),
+    ],
+)
+def test_schema_accepts_geojson(schema, expected):
+    assert Schema(schema).accepts_geojson() == expected
+    assert Schema([{"type": "number"}, schema]).accepts_geojson() == expected
+
+
def test_parameter():
    p = Parameter.from_dict({
        "name": "foo",