diff --git a/backend/src/endpoints/jobs.py b/backend/src/endpoints/jobs.py index 558ba25..2c458ee 100644 --- a/backend/src/endpoints/jobs.py +++ b/backend/src/endpoints/jobs.py @@ -7,6 +7,7 @@ from fastapi import APIRouter, HTTPException, Query from schemas.job import ( Job, + JobCreate, JobsInfo, JobSortBy, JobSortOrder, @@ -231,3 +232,31 @@ async def get_hourly_statistics() -> list[JobsTimeStatistics]: jobs = await job_service.get_all_jobs(settings.max_jobs) return job_service.generate_statistics(jobs) + + +@router.post( + "", + summary="Create job", + response_model=Job, + responses={ + 201: { + "model": Job, + "description": "Job successfully created.", + }, + 422: {"description": "Data validation error.", "model": ProblemDetail}, + 500: {"description": "Internal server error.", "model": ProblemDetail}, + }, +) +async def create_job(new_job: JobCreate) -> Job: + """Create job.""" + job_service = JobService( + get_redis_settings(), + get_lru_cache(), + ) + job = await job_service.create_job(new_job) + if not job: + raise HTTPException( + status_code=500, + detail="Failed to create a job.", + ) + return job diff --git a/backend/src/schemas/job.py b/backend/src/schemas/job.py index ca3ce18..f461de7 100644 --- a/backend/src/schemas/job.py +++ b/backend/src/schemas/job.py @@ -1,5 +1,6 @@ -from datetime import datetime +from datetime import datetime, timedelta from enum import Enum +from typing import Any from zoneinfo import ZoneInfo from core.config import Settings, get_app_settings @@ -127,7 +128,7 @@ class Job(BaseModel): examples=["download_content"], ) - args: list[str] | None = Field( + args: list[Any] | None = Field( default=None, description="Arguments passed to the function that was executed by the job", examples=[["https://florm.io"]], @@ -153,6 +154,37 @@ class Config: } +class JobCreate(BaseModel): + """Represents a job creation request.""" + + function: str = Field(..., description="Name of the function to execute") + args: list[Any] = Field( + default_factory=list, + description="Positional arguments of the function", + ) + job_id: str | None = Field(None, description="Job identifier") + queue_name: str | None = Field(None, description="Name of the queue") + defer_until: datetime | None = Field( + None, + description="Postpone execution until the specified date/time", + ) + expires: timedelta | None = Field( + None, + description="Set the expiration time for the job", + ) + kwargs: dict[str, Any] = Field( + default_factory=dict, + description="Named arguments of the function", + ) + + class Config: + """Pydantic model configuration.""" + + json_encoders = { + datetime: lambda v: v.astimezone(ZoneInfo(settings.timezone)).isoformat(), + } + + class Statistics(BaseModel): """Represents statistics for jobs.""" diff --git a/backend/src/services/job_service.py b/backend/src/services/job_service.py index 3be9c84..b076074 100644 --- a/backend/src/services/job_service.py +++ b/backend/src/services/job_service.py @@ -11,7 +11,7 @@ from arq.jobs import Job as ArqJob from core.cache import LRUCache from core.config import Settings, get_app_settings -from schemas.job import ColorStatistics, Job, JobStatus, JobsTimeStatistics +from schemas.job import ColorStatistics, Job, JobCreate, JobStatus, JobsTimeStatistics settings: Settings = get_app_settings() @@ -142,14 +142,14 @@ async def abort_job(self, job_id: str) -> bool: def adjust_color_intensity(self, color_intensity: float) -> float: """Adjust color intensity.""" - if color_intensity < 0.4: + if color_intensity < 0.4: # noqa: PLR2004 return 0.3 - elif color_intensity < 0.6: + if color_intensity < 0.6: # noqa: PLR2004 return 0.5 - elif color_intensity < 0.8: + if color_intensity < 0.8: # noqa: PLR2004 return 0.7 - else: - return 1.0 + + return 1.0 def generate_statistics(self, jobs_list: list[Job]) -> list[JobsTimeStatistics]: """Generate statistics for jobs.""" @@ -203,3 +203,18 @@ def generate_statistics(self, jobs_list: list[Job]) -> list[JobsTimeStatistics]: stat.color = ColorStatistics.orange return statistics + + async def create_job(self, new_job: JobCreate) -> Job | None: + """Create a new job.""" + redis = await create_pool(self.redis_settings) + await redis.enqueue_job( + new_job.function, + *new_job.args, + _job_id=new_job.job_id, + _queue_name=new_job.queue_name, + _defer_until=new_job.defer_until, + _expires=new_job.expires, + **new_job.kwargs, + ) + + return None diff --git a/frontend/src/components/tablejobs.tsx b/frontend/src/components/tablejobs.tsx index 59148fd..2746a7b 100644 --- a/frontend/src/components/tablejobs.tsx +++ b/frontend/src/components/tablejobs.tsx @@ -212,27 +212,51 @@ export const TableJobs = observer(() => { )} args: - {job.args} + {(() => { + try { + return JSON.stringify( + typeof job.args === "string" + ? JSON.parse(job.args) + : job.args, + null, + 2, + ); + } catch (e) { + return job.args.toString(); + } + })()}
kwargs: {(() => { try { - return JSON.stringify(JSON.parse(job.kwargs), null, 2); + return JSON.stringify( + typeof job.kwargs === "string" + ? JSON.parse(job.kwargs) + : job.kwargs, + null, + 2, + ); } catch (e) { - return job.kwargs; + return job.kwargs.toString(); } })()}
result: - + {(() => { try { - return JSON.stringify(JSON.parse(job.result), null, 2); + return JSON.stringify( + typeof job.result === "string" + ? JSON.parse(job.result) + : job.result, + null, + 2, + ); } catch (e) { - return job.result; + return job.result.toString(); } })()}