forked from ITISFoundation/osparc-simcore
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path_long_running_tasks.py
74 lines (61 loc) · 1.88 KB
/
_long_running_tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# pylint: disable=redefined-outer-name
# pylint: disable=unused-argument
# pylint: disable=unused-variable
# pylint: disable=too-many-arguments
from typing import Annotated, Any
from fastapi import APIRouter, Depends, status
from models_library.generics import Envelope
from models_library.rest_error import EnvelopedError
from servicelib.aiohttp.long_running_tasks._routes import _PathParam
from servicelib.long_running_tasks._models import TaskGet, TaskStatus
from simcore_service_webserver._meta import API_VTAG
from simcore_service_webserver.tasks._exception_handlers import (
_TO_HTTP_ERROR_MAP as data_export_http_error_map,
)
# Router collecting the task endpoints declared in this module; every path
# is served under the "/{API_VTAG}" prefix and tagged "long-running-tasks".
router = APIRouter(
    tags=["long-running-tasks"],
    prefix=f"/{API_VTAG}",
)
# Additional responses shared by the routes below (passed as `responses=`):
# one entry per status code found among the values of the tasks
# exception-handler map, each one declared with the EnvelopedError model.
_data_export_responses: dict[int | str, dict[str, Any]] = {
    http_error_info.status_code: {"model": EnvelopedError}
    for http_error_info in data_export_http_error_map.values()
}
# GET /tasks -- declared as returning an enveloped list of TaskGet items.
# The body is intentionally a bare `...`: only the route declaration
# (path, models, responses) carries information here.
@router.get(
    "/tasks",
    name="list_tasks",
    description="Lists all long running tasks",
    response_model=Envelope[list[TaskGet]],
    responses=_data_export_responses,
)
def get_async_jobs():
    ...
# GET /tasks/{task_id} -- declared as returning an enveloped TaskStatus.
# Path parameters are taken from the `_PathParam` model via `Depends()`.
# The body is intentionally a bare `...`: only the route declaration
# carries information here.
@router.get(
    "/tasks/{task_id}",
    name="get_task_status",
    description="Retrieves the status of a task",
    response_model=Envelope[TaskStatus],
    responses=_data_export_responses,
)
def get_async_job_status(_path_params: Annotated[_PathParam, Depends()]):
    ...
# DELETE /tasks/{task_id} -- declared with a 204 (no content) success status
# and no response model.
# Path parameters are taken from the `_PathParam` model via `Depends()`.
# The body is intentionally a bare `...`: only the route declaration
# carries information here.
@router.delete(
    "/tasks/{task_id}",
    name="cancel_and_delete_task",
    description="Cancels and deletes a task",
    status_code=status.HTTP_204_NO_CONTENT,
    responses=_data_export_responses,
)
def abort_async_job(_path_params: Annotated[_PathParam, Depends()]):
    ...
# GET /tasks/{task_id}/result -- no `response_model` is declared for this
# route, so the shape of the result is not specified here.
# Path parameters are taken from the `_PathParam` model via `Depends()`.
# The body is intentionally a bare `...`: only the route declaration
# carries information here.
@router.get(
    "/tasks/{task_id}/result",
    name="get_task_result",
    description="Retrieves the result of a task",
    responses=_data_export_responses,
)
def get_async_job_result(_path_params: Annotated[_PathParam, Depends()]):
    ...