Skip to content

Commit

Permalink
The initial functionality of creating a job
Browse files Browse the repository at this point in the history
  • Loading branch information
antonko committed Mar 29, 2024
1 parent 4ec40e0 commit 630a1b2
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 14 deletions.
29 changes: 29 additions & 0 deletions backend/src/endpoints/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from fastapi import APIRouter, HTTPException, Query
from schemas.job import (
Job,
JobCreate,
JobsInfo,
JobSortBy,
JobSortOrder,
Expand Down Expand Up @@ -231,3 +232,31 @@ async def get_hourly_statistics() -> list[JobsTimeStatistics]:
jobs = await job_service.get_all_jobs(settings.max_jobs)

return job_service.generate_statistics(jobs)


@router.post(
"",
summary="Create job",
response_model=Job,
responses={
201: {
"model": Job,
"description": "Job successfully created.",
},
422: {"description": "Data validation error.", "model": ProblemDetail},
500: {"description": "Internal server error.", "model": ProblemDetail},
},
)
async def create_job(new_job: JobCreate) -> Job:
"""Create job."""
job_service = JobService(
get_redis_settings(),
get_lru_cache(),
)
job = await job_service.create_job(new_job)
if not job:
raise HTTPException(
status_code=500,
detail="Failed to create a job.",
)
return job
36 changes: 34 additions & 2 deletions backend/src/schemas/job.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from datetime import datetime
from datetime import datetime, timedelta
from enum import Enum
from typing import Any
from zoneinfo import ZoneInfo

from core.config import Settings, get_app_settings
Expand Down Expand Up @@ -127,7 +128,7 @@ class Job(BaseModel):
examples=["download_content"],
)

args: list[str] | None = Field(
args: list[Any] | None = Field(
default=None,
description="Arguments passed to the function that was executed by the job",
examples=[["https://florm.io"]],
Expand All @@ -153,6 +154,37 @@ class Config:
}


class JobCreate(BaseModel):
"""Represents a job creation request."""

function: str = Field(..., description="Name of the function to execute")
args: list[Any] = Field(
default_factory=list,
description="Positional arguments of the function",
)
job_id: str | None = Field(None, description="Job identifier")
queue_name: str | None = Field(None, description="Name of the queue")
defer_until: datetime | None = Field(
None,
description="Postpone execution until the specified date/time",
)
expires: timedelta | None = Field(
None,
description="Set the expiration time for the job",
)
kwargs: dict[str, Any] = Field(
default_factory=dict,
description="Named arguments of the function",
)

class Config:
"""Pydantic model configuration."""

json_encoders = {
datetime: lambda v: v.astimezone(ZoneInfo(settings.timezone)).isoformat(),
}


class Statistics(BaseModel):
"""Represents statistics for jobs."""

Expand Down
27 changes: 21 additions & 6 deletions backend/src/services/job_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from arq.jobs import Job as ArqJob
from core.cache import LRUCache
from core.config import Settings, get_app_settings
from schemas.job import ColorStatistics, Job, JobStatus, JobsTimeStatistics
from schemas.job import ColorStatistics, Job, JobCreate, JobStatus, JobsTimeStatistics

settings: Settings = get_app_settings()

Expand Down Expand Up @@ -142,14 +142,14 @@ async def abort_job(self, job_id: str) -> bool:

def adjust_color_intensity(self, color_intensity: float) -> float:
"""Adjust color intensity."""
if color_intensity < 0.4:
if color_intensity < 0.4: # noqa: PLR2004
return 0.3
elif color_intensity < 0.6:
if color_intensity < 0.6: # noqa: PLR2004
return 0.5
elif color_intensity < 0.8:
if color_intensity < 0.8: # noqa: PLR2004
return 0.7
else:
return 1.0

return 1.0

def generate_statistics(self, jobs_list: list[Job]) -> list[JobsTimeStatistics]:
"""Generate statistics for jobs."""
Expand Down Expand Up @@ -203,3 +203,18 @@ def generate_statistics(self, jobs_list: list[Job]) -> list[JobsTimeStatistics]:
stat.color = ColorStatistics.orange

return statistics

async def create_job(self, new_job: JobCreate) -> Job | None:
"""Create a new job."""
redis = await create_pool(self.redis_settings)
await redis.enqueue_job(
new_job.function,
*new_job.args,
_job_id=new_job.job_id,
_queue_name=new_job.queue_name,
_defer_until=new_job.defer_until,
_expires=new_job.expires,
**new_job.kwargs,
)

return None
36 changes: 30 additions & 6 deletions frontend/src/components/tablejobs.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -212,27 +212,51 @@ export const TableJobs = observer(() => {
)}
args:
<Code block mt={10} mah={200}>
{job.args}
{(() => {
try {
return JSON.stringify(
typeof job.args === "string"
? JSON.parse(job.args)
: job.args,
null,
2,
);
} catch (e) {
return job.args.toString();
}
})()}
</Code>
<br />
kwargs:
<Code block mt={10} mah={200}>
{(() => {
try {
return JSON.stringify(JSON.parse(job.kwargs), null, 2);
return JSON.stringify(
typeof job.kwargs === "string"
? JSON.parse(job.kwargs)
: job.kwargs,
null,
2,
);
} catch (e) {
return job.kwargs;
return job.kwargs.toString();
}
})()}
</Code>
<br />
result:
<Code block mt={10} mah={500}>
<Code block mt={10} mah={200}>
{(() => {
try {
return JSON.stringify(JSON.parse(job.result), null, 2);
return JSON.stringify(
typeof job.result === "string"
? JSON.parse(job.result)
: job.result,
null,
2,
);
} catch (e) {
return job.result;
return job.result.toString();
}
})()}
</Code>
Expand Down

0 comments on commit 630a1b2

Please sign in to comment.