-
Notifications
You must be signed in to change notification settings - Fork 13
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
docs: document automatic celery tasks #246
base: main
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,6 +28,16 @@ | |
|
||
|
||
def get_default_token(job): | ||
""" | ||
Retrieves or creates an authentication token for the first user | ||
associated with the given job's spider project. | ||
|
||
Args: | ||
job (SpiderJob): The job for which to get the user's token. | ||
|
||
Returns: | ||
str: The token key for the first user, or None if no user is found. | ||
""" | ||
user = job.spider.project.users.first() | ||
if not user: | ||
return None | ||
|
@@ -37,6 +47,11 @@ def get_default_token(job): | |
|
||
@celery_app.task | ||
def run_spider_jobs(): | ||
""" | ||
Task to run spider jobs that are in the IN_QUEUE_STATUS. | ||
Selects jobs from the queue and updates their status to WAITING_STATUS. | ||
Initializes job execution using the job manager. | ||
""" | ||
jobs = SpiderJob.objects.filter(status=SpiderJob.IN_QUEUE_STATUS)[ | ||
: settings.RUN_JOBS_PER_LOT | ||
] | ||
|
@@ -59,6 +74,18 @@ def run_spider_jobs(): | |
|
||
|
||
def delete_data(pid, sid, jid, data_type): | ||
""" | ||
Deletes specific data (items, requests, logs) associated with a job. | ||
|
||
Args: | ||
pid (str): Project ID. | ||
sid (str): Spider ID. | ||
jid (str): Job ID. | ||
data_type (str): The type of data to delete ('items', 'requests', 'logs'). | ||
|
||
Returns: | ||
bool: False if connection to spiderdata_db_client cannot be established. | ||
""" | ||
if not spiderdata_db_client.get_connection(): | ||
return False | ||
job = SpiderJob.objects.get(jid=jid) | ||
|
@@ -76,6 +103,16 @@ def delete_data(pid, sid, jid, data_type): | |
|
||
@celery_app.task(name="core.tasks.launch_job") | ||
def launch_job(sid_, data_, data_expiry_days=None, token=None): | ||
""" | ||
Task to launch a spider job with the provided data and optional token. | ||
Creates a job using SpiderJobCreateSerializer and passes the job to the job manager. | ||
|
||
Args: | ||
sid_ (int): Spider ID. | ||
data_ (dict): Job data to be serialized and saved. | ||
data_expiry_days (int, optional): Number of days before data expiry. | ||
token (str, optional): Authentication token. If not provided, a default token is used. | ||
""" | ||
Comment on lines
+106
to
+115
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In what scenarios would we want to provide the auth token as an argument? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @joaquingx I'm not sure about it. I don't see anywhere in the code where the code is passed manually to this function. @Adriana618 I remember you created this task, do you remember the purpose for adding the token argument? |
||
spider = Spider.objects.get(sid=sid_) | ||
|
||
if data_expiry_days is None: | ||
|
@@ -122,6 +159,11 @@ def launch_job(sid_, data_, data_expiry_days=None, token=None): | |
|
||
@celery_app.task(name="core.tasks.check_and_update_job_status_errors") | ||
def check_and_update_job_status_errors(): | ||
""" | ||
Task to check jobs with WAITING_STATUS and update their status to ERROR_STATUS | ||
if no active or succeeded status is detected. Also updates job stats from Redis | ||
and deletes Redis stats if applicable. | ||
""" | ||
jobs = SpiderJob.objects.filter(status=SpiderJob.WAITING_STATUS)[ | ||
: settings.CHECK_JOB_ERRORS_BATCH_SIZE | ||
] | ||
|
@@ -146,6 +188,20 @@ def check_and_update_job_status_errors(): | |
retry_kwargs={"max_retries": None, "countdown": 600}, | ||
) | ||
def record_project_usage_after_data_delete(project_id, job_id): | ||
""" | ||
Task to record the project usage statistics after deleting job data. | ||
Calculates the new usage records based on database size after data deletion. | ||
|
||
Args: | ||
project_id (int): The project ID. | ||
job_id (int): The job ID. | ||
|
||
Raises: | ||
TaskError: If the connection to spiderdata_db_client cannot be established. | ||
|
||
Returns: | ||
str: JSON string with job data size and usage details. | ||
""" | ||
joaquingx marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if not spiderdata_db_client.get_connection(): | ||
raise TaskError("Could not get a connection to the database.") | ||
|
||
|
@@ -186,6 +242,14 @@ def record_project_usage_after_data_delete(project_id, job_id): | |
|
||
@celery_app.task() | ||
def delete_job_data(job_key): | ||
""" | ||
Task to delete all job-related data including items, requests, and logs. | ||
Updates job data status to DELETED_STATUS after deletion. | ||
Calculates a new usage record after data deletion. | ||
|
||
Args: | ||
job_key (str): Unique identifier for the job, typically in the format "jid.sid.pid". | ||
""" | ||
jid, sid, pid = job_key.split(".") | ||
for data_type in ["items", "requests", "logs"]: | ||
delete_data(pid, sid, jid, data_type) | ||
|
@@ -195,6 +259,10 @@ def delete_job_data(job_key): | |
|
||
@celery_app.task(name="core.tasks.delete_expired_jobs_data") | ||
def delete_expired_jobs_data(): | ||
""" | ||
Task to delete job data for jobs that have passed their data expiry date. | ||
This checks jobs with a PENDING_STATUS and triggers deletion if expiry is met. | ||
""" | ||
pending_data_delete_jobs = SpiderJob.objects.filter( | ||
data_status=DataStatus.PENDING_STATUS, | ||
status__in=[SpiderJob.COMPLETED_STATUS, SpiderJob.STOPPED_STATUS], | ||
|
@@ -211,6 +279,19 @@ def delete_expired_jobs_data(): | |
retry_kwargs={"max_retries": None, "countdown": 600}, | ||
) | ||
def record_project_usage_after_job_event(job_id): | ||
""" | ||
Task to record the project usage statistics after a job event (e.g., job completion). | ||
Calculates new usage records based on the job data and updates the project usage records. | ||
|
||
Args: | ||
job_id (int): The job ID. | ||
|
||
Raises: | ||
TaskError: If the connection to spiderdata_db_client cannot be established. | ||
|
||
Returns: | ||
str: JSON string with job data size and usage details. | ||
""" | ||
if not spiderdata_db_client.get_connection(): | ||
raise TaskError("Could not get a connection to the database.") | ||
|
||
|
@@ -230,14 +311,17 @@ def record_project_usage_after_job_event(job_id): | |
str(project.pid), items_collection_name | ||
) | ||
unique_collection = False | ||
|
||
requests_collection_name = "{}-{}-job_requests".format(job.spider.sid, job.jid) | ||
requests_data_size = spiderdata_db_client.get_dataset_size( | ||
str(project.pid), requests_collection_name | ||
) | ||
|
||
logs_collection_name = "{}-{}-job_logs".format(job.spider.sid, job.jid) | ||
logs_data_size = spiderdata_db_client.get_dataset_size( | ||
str(project.pid), logs_collection_name | ||
) | ||
|
||
# Tracking Proxy Usage | ||
proxy_details = {} | ||
for proxy_name, proxy_usage_name in settings.PROXY_PROVIDERS_TO_TRACK: | ||
|
@@ -303,6 +387,16 @@ def record_project_usage_after_job_event(job_id): | |
retry_kwargs={"max_retries": None, "countdown": 60}, | ||
) | ||
def record_job_coverage_event(job_id): | ||
""" | ||
Task to record job field coverage statistics after a job event. | ||
Collects data on field coverage and updates the job's statistics document in the database. | ||
|
||
Args: | ||
job_id (int): The job ID. | ||
|
||
Raises: | ||
TaskError: If the connection to spiderdata_db_client cannot be established. | ||
""" | ||
if not spiderdata_db_client.get_connection(): | ||
raise TaskError("Could not get a connection to the database.") | ||
job = SpiderJob.objects.get(jid=job_id) | ||
|
@@ -339,6 +433,19 @@ def record_job_coverage_event(job_id): | |
|
||
|
||
def get_chain_to_process_usage_data(after_delete=False, project_id=None, job_id=None): | ||
joaquingx marked this conversation as resolved.
Show resolved
Hide resolved
|
||
""" | ||
Generates a Celery chain to process usage data after a job event or data deletion. | ||
This function dynamically imports tasks from external apps specified in settings and chains | ||
them together with either record_project_usage_after_data_delete or record_project_usage_after_job_event. | ||
|
||
Args: | ||
after_delete (bool): If True, processes usage data after a data deletion event. | ||
project_id (int, optional): Project ID for the data deletion event. | ||
job_id (int, optional): Job ID for the job event or data deletion. | ||
|
||
Returns: | ||
Celery chain: A chain of tasks to be executed sequentially. | ||
""" | ||
list_of_process_functions = [] | ||
for external_app in settings.DJANGO_EXTERNAL_APPS: | ||
module = __import__(f"{external_app}.tasks", fromlist=["tasks"]) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are we utilizing this task? It seems like we’re launching jobs immediately when they are created, as seen here: estela-api/api/views/job.py#L136-L146. For cron jobs, we’re using the
launch_job
task.If I recall correctly, this task was intended to throttle the number of jobs and prevent overloading the Kubernetes cluster. Could you confirm if this is the case? If so, can you provide more context or explain the reasons behind this task?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@joaquingx You are right. The task is not actively used since the request to launch jobs is usually sent without an async parameter. However, the logic is present here in case jobs are launched with the async parameter here:
estela/estela-api/api/views/job.py
Lines 147 to 153 in 5da817d
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add to this documentation that it will be used for asyncs jobs launched please?