PR #596 finetune tests for cancel feature of job manager
Call public `run_jobs` instead of internal `_track_statuses`.
Use a fake backend that interacts with standard requests instead of mocking internals (see the sketch below).
Parameterize `cancel_after_seconds` instead of separate test functions.
soxofaan committed Sep 3, 2024
1 parent 712a6fa commit 3ee39de
Showing 3 changed files with 84 additions and 72 deletions.
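For context on the second bullet of the commit message: the fake backend in the test diff is built on requests_mock's dynamic responses, where a callable passed as `json=` is invoked per matched request and returns the payload. A minimal standalone sketch of that mechanism (the URL and job id here are illustrative, not taken from this PR):

import re

import requests
import requests_mock

job_status = {"job-123": "running"}  # illustrative in-memory state

def job_metadata(request, context):
    # requests_mock calls this with the matched request; return a JSON-serializable payload.
    job_id = request.path.split("/")[-1]
    return {"id": job_id, "status": job_status[job_id]}

with requests_mock.Mocker() as m:
    m.get(re.compile(r"^http://openeo\.test/jobs/[\w_-]*$"), json=job_metadata)
    resp = requests.get("http://openeo.test/jobs/job-123")
    assert resp.json() == {"id": "job-123", "status": "running"}

This is the same pattern the `FakeBackend` class below uses, so the tests exercise the real HTTP layer instead of patching job manager internals.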
1 change: 1 addition & 0 deletions openeo/extra/job_management.py
@@ -345,6 +345,7 @@ def run_jobs(

while (
df[
# TODO: risk on infinite loop if a backend reports a (non-standard) terminal status that is not covered here
(df.status != "finished")
& (df.status != "skipped")
& (df.status != "start_failed")
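To illustrate the TODO added above (purely a sketch, not code from this PR): the loop condition is a deny-list of known terminal statuses, so a backend reporting a non-standard terminal status would never satisfy it and the loop could spin forever. An allow-list of known active statuses avoids that failure mode; the status set here is assumed for illustration only:

import pandas as pd

ACTIVE_STATUSES = {"created", "queued", "running"}  # assumed set, for illustration only

def has_active_jobs(df: pd.DataFrame) -> bool:
    # Keep looping only while some job is in a known active state;
    # any unrecognized status is treated as terminal.
    return bool(df["status"].isin(ACTIVE_STATUSES).any())

df = pd.DataFrame({"status": ["finished", "some-nonstandard-terminal-status"]})
assert has_active_jobs(df) is False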
2 changes: 1 addition & 1 deletion setup.py
@@ -28,7 +28,7 @@
"matplotlib",
"geopandas",
"flake8>=5.0.0",
"time_machine",
"time_machine>=2.13.0",
"pyproj>=3.2.0", # Pyproj is an optional, best-effort runtime dependency
"dirty_equals>=0.6.0",
# (#578) On Python 3.7: avoid dirty_equals 0.7.1 which wrongly claims to be Python 3.7 compatible
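The bumped `time_machine` floor lines up with the rewritten test below, which drives the clock through the pytest `time_machine` fixture (`move_to(...)` at the start, `shift(...)` inside the mocked `sleep`). A minimal sketch of that fixture usage, under the assumption that the fixture's `shift()` is what requires 2.13.0:

import datetime

def test_clock_can_be_shifted(time_machine):
    # `time_machine` here is the pytest fixture provided by the time-machine package.
    time_machine.move_to(datetime.datetime(2024, 9, 1, 10, 0, tzinfo=datetime.timezone.utc), tick=False)
    time_machine.shift(60 * 60)  # jump one hour ahead without actually sleeping
    assert datetime.datetime.now(datetime.timezone.utc).hour == 11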
153 changes: 82 additions & 71 deletions tests/extra/test_job_management.py
@@ -1,10 +1,13 @@
import datetime
import json
import re
import textwrap
import threading
import time
from typing import Callable, Union
from unittest import mock

import dirty_equals
import geopandas

# TODO: can we avoid using httpretty?
@@ -22,6 +25,7 @@
import time_machine

import openeo
import openeo.extra.job_management
from openeo import BatchJob
from openeo.extra.job_management import (
MAX_RETRIES,
@@ -33,8 +37,47 @@
from openeo.util import rfc3339


class FakeBackend:
"""
Fake openEO backend with some basic job management functionality for testing job manager logic.
"""
def __init__(self, *, backend_root_url: str = "http://openeo.test", requests_mock):
self.url = backend_root_url.rstrip("/")
requests_mock.get(f"{self.url}/", json={"api_version": "1.1.0"})
self.job_db = {}
self.get_job_metadata_mock = requests_mock.get(
re.compile(rf"^{self.url}/jobs/[\w_-]*$"),
json=self._handle_get_job_metadata,
)
self.cancel_job_mock = requests_mock.delete(
re.compile(rf"^{self.url}/jobs/[\w_-]*/results$"),
json=self._handle_cancel_job,
)
requests_mock.get(re.compile(rf"^{self.url}/jobs/[\w_-]*/results"), json={"links": []})

def set_job_status(self, job_id: str, status: Union[str, Callable[[], str]]):
self.job_db.setdefault(job_id, {})["status"] = status

def get_job_status(self, job_id: str):
status = self.job_db[job_id]["status"]
if callable(status):
status = status()
return status

def _handle_get_job_metadata(self, request, context):
job_id = request.path.split("/")[-1]
return {"id": job_id, "status": self.get_job_status(job_id)}

def _handle_cancel_job(self, request, context):
job_id = request.path.split("/")[-2]
assert self.get_job_status(job_id) == "running"
self.set_job_status(job_id, "canceled")
context.status_code = 204


class TestMultiBackendJobManager:


@pytest.fixture
def sleep_mock(self):
with mock.patch("time.sleep") as sleep:
@@ -448,81 +491,49 @@ def start_job(row, connection_provider, connection, **kwargs):
assert set(result.status) == {"running"}
assert set(result.backend_name) == {"foo"}

def test_cancel_prolonged_job_exceeds_duration(self):
# Set up a sample DataFrame with job data
df = pd.DataFrame(
{
"id": ["job_1"],
"backend_name": ["foo"],
"status": ["running"],
"running_start_time": ["2020-01-01T00:00:00Z"],
}
)

# Initialize the manager with the cancel_running_job_after parameter
cancel_after_seconds = 12 * 60 * 60 # 12 hours
manager = MultiBackendJobManager(cancel_running_job_after=cancel_after_seconds)

# Mock the connection and job retrieval
mock_connection = mock.MagicMock()
mock_job = mock.MagicMock()
mock_job.describe.return_value = {"status": "running"}
manager._get_connection = mock.MagicMock(return_value=mock_connection)
mock_connection.job.return_value = mock_job

# Set up the running start time and future time
job_running_timestamp = datetime.datetime.strptime(df.loc[0, "running_start_time"], "%Y-%m-%dT%H:%M:%SZ")
future_time = (
job_running_timestamp + datetime.timedelta(seconds=cancel_after_seconds) + datetime.timedelta(seconds=1)
)

# Replace _cancel_prolonged_job with a mock to track its calls
manager._cancel_prolonged_job = mock.MagicMock()

# Travel to the future where the job has exceeded its allowed running time
with time_machine.travel(future_time, tick=False):
manager._track_statuses(df)

# Verify that the _cancel_prolonged_job method was called with the correct job and row
manager._cancel_prolonged_job.assert_called_once

def test_cancel_prolonged_job_within_duration(self):
# Set up a sample DataFrame with job data
df = pd.DataFrame(
{
"id": ["job_1"],
"backend_name": ["foo"],
"status": ["running"],
"running_start_time": ["2020-01-01T00:00:00Z"],
}
)

# Initialize the manager with the cancel_running_job_after parameter
cancel_after_seconds = 12 * 60 * 60 # 12 hours
manager = MultiBackendJobManager(cancel_running_job_after=cancel_after_seconds)

# Mock the connection and job retrieval
mock_connection = mock.MagicMock()
mock_job = mock.MagicMock()
mock_job.describe.return_value = {"status": "running"}
manager._get_connection = mock.MagicMock(return_value=mock_connection)
mock_connection.job.return_value = mock_job

# Set up the running start time and future time
job_running_timestamp = datetime.datetime.strptime(df.loc[0, "running_start_time"], "%Y-%m-%dT%H:%M:%SZ")
future_time = (
job_running_timestamp + datetime.timedelta(seconds=cancel_after_seconds) - datetime.timedelta(seconds=1)
@pytest.mark.parametrize(
["start_time", "end_time", "end_status", "cancel_after_seconds", "expected_status"],
[
("2024-09-01T10:00:00Z", "2024-09-01T20:00:00Z", "finished", 6 * 60 * 60, "canceled"),
("2024-09-01T10:00:00Z", "2024-09-01T20:00:00Z", "finished", 12 * 60 * 60, "finished"),
],
)
def test_automatic_cancel_of_too_long_running_jobs(
self,
requests_mock,
tmp_path,
time_machine,
start_time,
end_time,
end_status,
cancel_after_seconds,
expected_status,
):
fake_backend = FakeBackend(requests_mock=requests_mock)

# For simplicity, set up pre-existing job with status "running" (instead of job manager creating+starting it)
job_id = "job-123"
fake_backend.set_job_status(job_id, lambda: "running" if rfc3339.utcnow() < end_time else end_status)

manager = MultiBackendJobManager(root_dir=tmp_path, cancel_running_job_after=cancel_after_seconds)
manager.add_backend("foo", connection=openeo.connect(fake_backend.url))

# Initialize data frame with status "created" (to make sure the start of "running" state is recorded)
df = pd.DataFrame({"id": [job_id], "backend_name": ["foo"], "status": ["created"]})

time_machine.move_to(start_time)
# Mock sleep() to not actually sleep, but skip one hour at a time
with mock.patch.object(openeo.extra.job_management.time, "sleep", new=lambda s: time_machine.shift(60 * 60)):
manager.run_jobs(df=df, start_job=lambda **kwargs: None, job_db=tmp_path / "jobs.csv")

final_df = CsvJobDatabase(tmp_path / "jobs.csv").read()
assert final_df.iloc[0].to_dict() == dirty_equals.IsPartialDict(
id="job-123", status=expected_status, running_start_time="2024-09-01T10:00:00Z"
)

# Replace _cancel_prolonged_job with a mock to track its calls
manager._cancel_prolonged_job = mock.MagicMock()
assert fake_backend.cancel_job_mock.called == (expected_status == "canceled")

# Travel to the future where the job has exceeded its allowed running time
with time_machine.travel(future_time, tick=False):
manager._track_statuses(df)

# Verify that the _cancel_prolonged_job method was called with the correct job and row
manager._cancel_prolonged_job.assert_not_called


JOB_DB_DF_BASICS = pd.DataFrame(
