Skip to content

Commit

Permalink
Use API to get specifications (#343)
Browse files Browse the repository at this point in the history
* Initial commit for specifications

* Formatting and linting commit

* Add specification tests

* Formatting and linting commit

* Give the e2e tests a first go over

* Fix MARI rules tests

* Fix Specifications _rule_old

* Update tests.yml

* Adjust e2e to stop requests being made to the FIA-API

* Adjust e2e to stop requests being made to the FIA-API

* Further adjust test files

* Create a fake api for e2e tests to use instead of using the real one

* Fix ruff linting error

* Fix mypy errors

* Ensure that order does not matter as rules that should be last self declare

* Formatting and linting commit

* Fix pytest error

* Review comments/mypy fixes

* Quick comment addition

* Fix some errors with linting and long comments

* Formatting and linting commit

---------

Co-authored-by: github-actions <[email protected]>
  • Loading branch information
Pasarus and github-actions authored Dec 12, 2024
1 parent eb19b19 commit 385c615
Show file tree
Hide file tree
Showing 65 changed files with 321 additions and 150 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,26 @@ jobs:

e2e:
runs-on: ubuntu-latest

steps:
- name: Checkout project
uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7

- name: Set up python
uses: actions/setup-python@39cd14951b08e74b54015e9e001cdefcf80e669f # v5.1.1
with:
python-version: '3.12'

- name: Install dependencies
run: |
python -m pip install --upgrade pip
python -m pip install .[test]
- name: Start e2e docker compose environment
run: |
cd test
docker compose up -d
- name: Run e2e test
run: pytest -l -v --random-order --random-order-bucket=global test/test_e2e.py

Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,5 @@ dmypy.json

# Pyre type checker
.pyre/

fia-api/
4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ test = [
"pytest==8.2.2",
"pytest-cov==5.0.0",
"requests==2.32.3",
"pytest-random-order==1.1.1"
"pytest-random-order==1.1.1",
"fastapi[all]==0.111.1",
"pydantic==2.8.2",
]

dev = [
Expand Down
5 changes: 4 additions & 1 deletion rundetection/rules/common_rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ class MolSpecStitchRule(Rule[bool]):
Enables Tosca, Osiris, and Iris Run stitching
"""

def __init__(self, value: bool) -> None:
    """
    Build the stitch rule and flag it so the specification orders it after the other rules
    :param value: Whether run stitching is enabled
    """
    # The base initialiser must run first: it stores the value and resets should_be_last to False
    super().__init__(value)
    # Rules that stitch (and therefore add extra jobs) declare themselves as needing to run last
    self.should_be_last = True

@staticmethod
def _is_title_similar(title: str, other_title: str) -> bool:
"""
Expand Down Expand Up @@ -89,7 +93,6 @@ def _get_runs_to_stitch(self, run_path: Path, run_number: int, run_title: str, i
else:
next_run_number = f"{instrument.upper()}{run_number:08d}.nxs"
run_path = Path(run_path.parent, next_run_number)
logger.info("Run path %s does not exist", run_path)
logger.info("Returning run numbers %s", run_numbers)
return run_numbers

Expand Down
4 changes: 4 additions & 0 deletions rundetection/rules/inter_rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ class InterStitchRule(Rule[bool]):
Rule for collecting each related inter run and including them into the additional values
"""

def __init__(self, value: bool) -> None:
    """
    Build the inter stitch rule and flag it so the specification orders it after the other rules
    :param value: Whether the rule is enabled
    """
    # The base initialiser must run first: it stores the value and resets should_be_last to False
    super().__init__(value)
    # This rule collects related runs into the additional values, so it declares that it must run last
    self.should_be_last = True

@staticmethod
def _get_run_group(job_request: JobRequest) -> str:
"""
Expand Down
24 changes: 4 additions & 20 deletions rundetection/rules/mari_rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@
Mari Rules
"""

import json
import logging
from copy import deepcopy
from pathlib import Path
from typing import Any

from rundetection.ingestion.ingest import get_run_title
from rundetection.job_requests import JobRequest
Expand All @@ -17,12 +15,12 @@

class MariStitchRule(Rule[bool]):
"""
The MariStitchRule is the rule that applies
The MariStitchRule is the rule that applies, dependent on the other rules running first. This runs last.
"""

def __init__(self, value: bool) -> None:
super().__init__(value)
self._spec_values = self._load_mari_spec()
self.should_be_last = True

@staticmethod
def _get_runs_to_stitch(run_path: Path, run_number: int, run_title: str) -> list[int]:
Expand All @@ -35,20 +33,6 @@ def _get_runs_to_stitch(run_path: Path, run_number: int, run_title: str) -> list
run_path = Path(run_path.parent, f"MAR{run_number}.nxs")
return run_numbers

@staticmethod
def _load_mari_spec() -> Any:
"""
Load the entire mari specification into a dictionary
:return: Mari spec as dict
"""
try:
path = Path("rundetection/specifications/mari_specification.json")
with path.open(encoding="utf-8") as spec_file:
return json.load(spec_file)
except FileNotFoundError as exc:
logger.warning("Mari Specification could not be reloaded")
raise RuntimeError("Mari specification is no longer available") from exc

def verify(self, job_request: JobRequest) -> None:
if not self._value: # if the stitch rule is set to false, skip
return
Expand All @@ -62,8 +46,8 @@ def verify(self, job_request: JobRequest) -> None:
additional_request.additional_values["sum_runs"] = True
# We must reapply the common mari rules manually here, if we apply the whole spec automatically it will
# produce an infinite loop
additional_request.additional_values["mask_file_link"] = self._spec_values["marimaskfile"]
additional_request.additional_values["wbvan"] = self._spec_values["mariwbvan"]
additional_request.additional_values["mask_file_link"] = job_request.additional_values["mask_file_link"]
additional_request.additional_values["wbvan"] = job_request.additional_values["wbvan"]
job_request.additional_requests.append(additional_request)


Expand Down
6 changes: 5 additions & 1 deletion rundetection/rules/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"""

from abc import ABC, abstractmethod
from typing import Generic, TypeVar
from typing import Any, Generic, TypeVar

from rundetection.job_requests import JobRequest

Expand All @@ -15,8 +15,12 @@ class Rule(Generic[T], ABC):
Abstract Rule, implement to define a rule that must be followed to allow a reduction to be run on a nexus file
"""

def __eq__(self, other: Any) -> bool:
    """
    Two rules are equal when other is an instance of this rule's class (or a subclass of it) and both hold
    the same value
    :param other: The object to compare against
    :return: True when the type check passes and the stored values compare equal, otherwise False
    """
    # NOTE(review): a class that defines __eq__ without __hash__ is unhashable in Python — confirm that no
    # caller relies on putting rules into sets or using them as dict keys
    if not isinstance(other, type(self)):
        return False
    return self._value == other._value

def __init__(self, value: T):
self._value: T = value
self.should_be_last = False

@abstractmethod
def verify(self, job_request: JobRequest) -> None:
Expand Down
60 changes: 46 additions & 14 deletions rundetection/specifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
Contains the InstrumentSpecification class, the abstract Rule Class and Rule Implementations
"""

import json
import datetime
import logging
import os
import typing
from pathlib import Path

import requests

from rundetection.exceptions import RuleViolationError
from rundetection.job_requests import JobRequest
Expand All @@ -18,6 +20,9 @@

logger = logging.getLogger(__name__)

# Base URL of the FIA API that serves instrument specifications; overridable through the FIA_API_URL
# environment variable
FIA_API_URL = os.getenv("FIA_API_URL", "http://localhost:8000")
# Maximum age, in minutes, of a loaded specification before it is requested from the API again.
# NOTE(review): despite the name this is not an HTTP request timeout — the request itself passes its own
# timeout value to requests.get
SPEC_REQUEST_TIMEOUT_MINS = 10


class InstrumentSpecification:
"""
Expand All @@ -26,20 +31,40 @@ class InstrumentSpecification:
"""

def __init__(self, instrument: str) -> None:
logger.info("Loading instrument specification for: %s", instrument)
self._instrument = instrument
self._rules: list[Rule[Any]] = []
self._load_rules()

def _load_rules(self) -> None:
try:
path = Path(f"rundetection/specifications/{self._instrument.lower()}_specification.json")
with path.open(encoding="utf-8") as spec_file:
spec: dict[str, Any] = json.load(spec_file)
self._rules = [rule_factory(key, value) for key, value in spec.items()]
except FileNotFoundError:
logger.error("No specification for file: %s", self._instrument)
raise
self.loaded_time: datetime.datetime | None = None
self._load_rules_from_api()

def _load_rules_from_api(self) -> None:
    """
    Request this instrument's specification from the FIA API, rebuild the rule list from it and record when
    the specification was loaded
    :raises KeyError: When the FIA_API_API_KEY environment variable is not set
    :raises requests.HTTPError: When the API answers with an error status
    """
    logger.info("Requesting specification from API for %s", self._instrument)
    api_key = os.environ["FIA_API_API_KEY"]
    request_headers: dict[str, Any] = {
        "Authorization": f"Bearer {api_key}",
        "accept": "application/json",
    }
    spec_url = f"{FIA_API_URL}/instrument/{self._instrument.upper()}/specification"
    response = requests.get(url=spec_url, headers=request_headers, timeout=1)
    # Fail loudly on a non-2xx answer rather than trying to build rules from an error body
    response.raise_for_status()
    spec: dict[str, Any] = response.json()
    logger.info("Response from API for spec is: \n%s", spec)
    # Each top level key/value pair of the specification describes one rule
    self._rules = [rule_factory(rule_name, rule_value) for rule_name, rule_value in spec.items()]
    self._order_rules()
    # Stamp the load time (timezone aware, UTC) so the age of the specification can be checked later
    self.loaded_time = datetime.datetime.now(tz=datetime.UTC)
    logger.info("Loaded instrument specification for: %s at: %s", self._instrument, self.loaded_time)

def _order_rules(self) -> None:
    """
    Move every rule that declares should_be_last to the end of the rule list, in place.

    Rules that do a stitch, or any that add extra jobs, rely on the other rules having run first. The
    relative order of the rules inside each group (normal rules, last rules) is preserved.

    The list is reordered with a stable sort rather than by removing and re-appending entries while looping
    over it: mutating a list during iteration skips the element that slides into the freed slot, which left
    a should_be_last rule in front of normal rules whenever two such rules were adjacent.
    """
    # False sorts before True, and list.sort is stable, so this is a stable partition of the rules
    self._rules.sort(key=lambda rule: bool(rule.should_be_last))

def _rule_old(self) -> bool:
return self.loaded_time is None or datetime.timedelta(minutes=SPEC_REQUEST_TIMEOUT_MINS) < (
datetime.datetime.now(tz=datetime.UTC) - self.loaded_time
)

def verify(self, job_request: JobRequest) -> None:
"""
Expand All @@ -48,6 +73,13 @@ def verify(self, job_request: JobRequest) -> None:
:param job_request: A JobRequest
:return: whether the specification is met
"""
if self._rule_old():
logger.info(
"Rule for instrument %s is older than %s minutes, reloading rule from API",
self._instrument,
SPEC_REQUEST_TIMEOUT_MINS,
)
self._load_rules_from_api()
if len(self._rules) == 0:
job_request.will_reduce = False
for rule in self._rules:
Expand Down
19 changes: 19 additions & 0 deletions test/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,35 @@ services:
RABBITMQ_USER: guest
RABBITMQ_PASSWORD: guest

fake-fia-api:
build:
context: e2e_components/
dockerfile: Dockerfile
healthcheck:
test: curl --fail http://localhost:80/healthz || exit 1
interval: 1s
timeout: 1s
retries: 3
start_period: 1s
volumes:
- ../test/test_data/specifications:/data

run-detection:
build:
context: ../
dockerfile: container/Dockerfile
depends_on:
rabbit-mq:
condition: service_healthy
fake-fia-api:
condition: service_healthy
environment:
QUEUE_HOST: "rabbit-mq"
QUEUE_USER: "guest"
QUEUE_PASSWORD: "guest"
INGRESS_QUEUE_NAME: "watched-files"
EGRESS_QUEUE_NAME: "scheduled-jobs"
FIA_API_API_KEY: "shh"
FIA_API_URL: http://fake-fia-api:80
volumes:
- ../test/test_data/e2e_data:/archive
12 changes: 12 additions & 0 deletions test/e2e_components/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Image for the fake FIA API used by the e2e docker compose environment
FROM python:3.12

WORKDIR /fia_api

# Copy the build context into the image
COPY . /fia_api

# Install the system packages, upgrade pip, then pip-install the project copied into the working directory
RUN apt-get update \
&& apt-get -y install libpq-dev gcc \
&& python -m pip install --upgrade pip \
&& python -m pip install --no-cache-dir .

# Serve the FastAPI app object from fake_fia_api/fia_api.py on every interface, port 80
CMD ["uvicorn", "fake_fia_api.fia_api:app", "--host", "0.0.0.0", "--port", "80"]
Empty file.
14 changes: 14 additions & 0 deletions test/e2e_components/fake_fia_api/fia_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
"""
Fake FIA API: the FastAPI application object, with the fake API's routes registered on it
"""

from fastapi import FastAPI

from fake_fia_api.router import ROUTER

app = FastAPI()

# This must be updated before exposing outside the vpn
# NOTE(review): ALLOWED_ORIGINS is not referenced anywhere else in this module — confirm whether it should be
# wired into CORS middleware or removed
ALLOWED_ORIGINS = ["*"]

# Register the routes defined in fake_fia_api.router on the application
app.include_router(ROUTER)
32 changes: 32 additions & 0 deletions test/e2e_components/fake_fia_api/router.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import json
from pathlib import Path
from typing import Any, Literal

from fastapi import APIRouter

ROUTER = APIRouter()


def get_specification_from_file(instrument: str) -> Any:
    """
    Read the specification of an instrument from the /data directory
    :param instrument: Name of the instrument, it is lower-cased to build the file name
    :return: The parsed JSON content of /data/<instrument>_specification.json
    """
    spec_path = Path(f"/data/{instrument.lower()}_specification.json")
    raw_spec = spec_path.read_text(encoding="utf-8")
    return json.loads(raw_spec)


@ROUTER.get("/instrument/{instrument_name}/specification", response_model=None)
async def get_instrument_specification(instrument_name: str) -> Any:
    """
    Fake specification endpoint, deployed for the rundetection e2e tests
    :param instrument_name: The instrument name taken from the request path
    :return: The specification loaded from file for that instrument
    """
    specification = get_specification_from_file(instrument_name)
    return specification


@ROUTER.get("/healthz")
async def get() -> Literal["ok"]:
    """
    Health check endpoint
    :return: The literal string "ok"
    """
    status: Literal["ok"] = "ok"
    return status
Loading

0 comments on commit 385c615

Please sign in to comment.