docs: document automatic celery tasks #246

Open · wants to merge 2 commits into base: main
19 changes: 18 additions & 1 deletion estela-api/config/celery.py
@@ -3,30 +3,47 @@
from celery import Celery
from django.conf import settings

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings.base")

# Initialize a new Celery application instance named "estela".
app = Celery("estela")

# Configure the Celery app to use Django settings with a specific namespace "CELERY".
# All Celery-related configurations are expected to be prefixed with "CELERY_" in Django settings.
app.config_from_object("django.conf:settings", namespace="CELERY")

# Autodiscover and load tasks from all installed Django apps.
# Celery will search for a `tasks.py` module in each Django app and register tasks found in them.
app.autodiscover_tasks()

# These tasks will be executed at regular intervals as specified by the "schedule" argument.
app.conf.beat_schedule = {
# Task to run spider jobs that are IN_QUEUE every 2 minutes.
"run-spider-jobs": {
"task": "core.tasks.run_spider_jobs",
"schedule": 120,
},
# Task to check and update job status errors every minute.
"check-and-update-job-status-errors": {
"task": "core.tasks.check_and_update_job_status_errors",
"schedule": 60,
},
# Task to delete expired job data every hour.
"delete-expired-jobs-data": {
"task": "core.tasks.delete_expired_jobs_data",
"schedule": 3600,
},
}


# Dynamically import and update the beat schedule with periodic tasks from external applications.
# External apps are specified in the Django settings under CELERY_EXTERNAL_IMPORTS.
# Each external app must define its own Celery configuration and beat schedule.
for import_name in settings.CELERY_EXTERNAL_IMPORTS:
# Import the Celery app configuration from the external application.
module = __import__(f"{import_name}.celery", fromlist=["celery"])
external_app = module.app

# Merge the external app's beat schedule with the current Celery app's beat schedule.
# This allows tasks from external apps to be included in the periodic task schedule.
app.conf.beat_schedule.update(external_app.conf.beat_schedule)
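The merge loop above is easiest to picture with a concrete example. The following minimal sketch is not part of this PR's diff: it shows what a hypothetical external app listed in CELERY_EXTERNAL_IMPORTS (called estela_billing here purely for illustration) could expose in its own celery.py so that its periodic tasks get merged into the main schedule.

# estela_billing/celery.py (hypothetical external app, shown only to illustrate the merge)
from celery import Celery

app = Celery("estela_billing")
app.config_from_object("django.conf:settings", namespace="CELERY")

# Entries defined here are discovered by the loop in estela-api/config/celery.py and merged
# into the main schedule via app.conf.beat_schedule.update(external_app.conf.beat_schedule).
app.conf.beat_schedule = {
    "record-daily-billing": {
        "task": "estela_billing.tasks.record_daily_billing",  # hypothetical task
        "schedule": 86400,  # once a day, in seconds
    },
}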
23 changes: 23 additions & 0 deletions estela-api/core/signals.py
@@ -8,15 +8,38 @@

@receiver(post_save, sender=SpiderJob, dispatch_uid="update_usage")
def update_usage(sender, instance: SpiderJob, created, **kwargs):
"""
Signal receiver that triggers after a SpiderJob instance is saved/updated.
When the status of the SpiderJob is one of COMPLETED, STOPPED, or ERROR,
this function initiates asynchronous tasks to process usage data and record job coverage.

Args:
sender (Model): The model class that sent the signal (SpiderJob).
instance (SpiderJob): The specific SpiderJob instance that was saved.
created (bool): Whether this instance was newly created.
**kwargs: Additional keyword arguments provided by the signal.

Functionality:
1. If the SpiderJob instance has a status of COMPLETED, STOPPED, or ERROR, it triggers two asynchronous tasks:
- A task chain to process usage data using `get_chain_to_process_usage_data`.
- A task to record the job's field coverage using `record_job_coverage_event`.
2. Both tasks are scheduled to run after a delay specified in the Django settings.
- The delay for processing usage data is controlled by `settings.COUNTDOWN_RECORD_PROJECT_USAGE_AFTER_JOB_EVENT`.
- The delay for recording job coverage is controlled by `settings.COUNTDOWN_RECORD_COVERAGE_AFTER_JOB_EVENT`.
"""
# Check if the SpiderJob status is either COMPLETED, STOPPED, or ERROR
if instance.status in [
SpiderJob.COMPLETED_STATUS,
SpiderJob.STOPPED_STATUS,
SpiderJob.ERROR_STATUS,
]:
# Initiate the task chain to process usage data after the job event
chain_of_usage_process = get_chain_to_process_usage_data(job_id=instance.jid)
chain_of_usage_process.apply_async(
countdown=settings.COUNTDOWN_RECORD_PROJECT_USAGE_AFTER_JOB_EVENT
)

# Schedule the task to record job coverage statistics
record_job_coverage_event.apply_async(
args=[instance.jid],
countdown=settings.COUNTDOWN_RECORD_COVERAGE_AFTER_JOB_EVENT,
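For readers less familiar with Django signals, the receiver above runs on every save of a SpiderJob, so a plain status update is enough to kick off both delayed tasks. A minimal sketch follows; the import path and jid value are assumptions used only for illustration.

# Hypothetical trigger for the update_usage receiver above.
from core.models import SpiderJob  # assumed import path

job = SpiderJob.objects.get(jid=1234)  # illustrative job id
job.status = SpiderJob.COMPLETED_STATUS
job.save()  # post_save fires update_usage, which schedules the usage chain and the
            # coverage task using the countdown values configured in Django settings.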
107 changes: 107 additions & 0 deletions estela-api/core/tasks.py
@@ -28,6 +28,16 @@


def get_default_token(job):
"""
Retrieves or creates an authentication token for the first user
associated with the given job's spider project.

Args:
job (SpiderJob): The job for which to get the user's token.

Returns:
str: The token key for the first user, or None if no user is found.
"""
user = job.spider.project.users.first()
if not user:
return None
@@ -37,6 +47,11 @@ def get_default_token(job):

@celery_app.task
def run_spider_jobs():
"""
Task to run spider jobs that are in the IN_QUEUE_STATUS.
Selects jobs from the queue and updates their status to WAITING_STATUS.
Initializes job execution using the job manager.
"""
Comment on lines +50 to +54
Contributor:

Are we utilizing this task? It seems like we’re launching jobs immediately when they are created, as seen here: estela-api/api/views/job.py#L136-L146. For cron jobs, we’re using the launch_job task.

If I recall correctly, this task was intended to throttle the number of jobs and prevent overloading the Kubernetes cluster. Could you confirm if this is the case? If so, can you provide more context or explain the reasons behind this task?

Contributor Author:

@joaquingx You are right. The task is not actively used since the request to launch jobs is usually sent without the async parameter. However, the logic is kept in case jobs are launched with the async parameter, as seen here:

else:
serializer.save(
spider=spider,
status=SpiderJob.IN_QUEUE_STATUS,
data_status=data_status,
data_expiry_days=data_expiry_days,
)

Contributor:

Could you add to this documentation that it will be used for async job launches, please?

Suggested change:
-"""
-Task to run spider jobs that are in the IN_QUEUE_STATUS.
-Selects jobs from the queue and updates their status to WAITING_STATUS.
-Initializes job execution using the job manager.
-"""
+"""
+Task to run spider jobs that are in the IN_QUEUE_STATUS.
+Selects jobs from the queue and updates their status to WAITING_STATUS.
+Initializes job execution using the job manager.
+Note: This task will be used for async jobs, although job requests are typically sent without the async parameter.
+"""

jobs = SpiderJob.objects.filter(status=SpiderJob.IN_QUEUE_STATUS)[
: settings.RUN_JOBS_PER_LOT
]
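To connect this to the throttling discussed in the thread above, here is a rough sketch of a batch run. Only the filter and slice appear in the visible diff; the status update and hand-off below follow the docstring's description and are not copied from the collapsed lines.

# Illustrative sketch; the remainder of the real function is collapsed in this diff.
from django.conf import settings
from core.models import SpiderJob  # assumed import path

jobs = SpiderJob.objects.filter(status=SpiderJob.IN_QUEUE_STATUS)[: settings.RUN_JOBS_PER_LOT]
for job in jobs:
    job.status = SpiderJob.WAITING_STATUS  # mark the job as picked up, per the docstring
    job.save()
    # The job is then handed to the job manager so the crawl actually starts;
    # that call is not visible in this diff, so it is only noted here.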
@@ -59,6 +74,18 @@


def delete_data(pid, sid, jid, data_type):
"""
Deletes specific data (items, requests, logs) associated with a job.

Args:
pid (str): Project ID.
sid (str): Spider ID.
jid (str): Job ID.
data_type (str): The type of data to delete ('items', 'requests', 'logs').

Returns:
bool: False if connection to spiderdata_db_client cannot be established.
"""
if not spiderdata_db_client.get_connection():
return False
job = SpiderJob.objects.get(jid=jid)
@@ -76,6 +103,16 @@ def delete_data(pid, sid, jid, data_type):

@celery_app.task(name="core.tasks.launch_job")
def launch_job(sid_, data_, data_expiry_days=None, token=None):
"""
Task to launch a spider job with the provided data and optional token.
Creates a job using SpiderJobCreateSerializer and passes the job to the job manager.

Args:
sid_ (int): Spider ID.
data_ (dict): Job data to be serialized and saved.
data_expiry_days (int, optional): Number of days before data expiry.
token (str, optional): Authentication token. If not provided, a default token is used.
"""
Comment on lines +106 to +115
Contributor:

In what scenarios would we want to provide the auth token as an argument?

Contributor Author:

@joaquingx I'm not sure about it. I don't see anywhere in the code where the token is passed manually to this function. @Adriana618 I remember you created this task; do you remember the purpose of adding the token argument?

spider = Spider.objects.get(sid=sid_)

if data_expiry_days is None:
@@ -122,6 +159,11 @@ def launch_job(sid_, data_, data_expiry_days=None, token=None):

@celery_app.task(name="core.tasks.check_and_update_job_status_errors")
def check_and_update_job_status_errors():
"""
Task to check jobs with WAITING_STATUS and update their status to ERROR_STATUS
if no active or succeeded status is detected. Also updates job stats from Redis
and deletes Redis stats if applicable.
"""
jobs = SpiderJob.objects.filter(status=SpiderJob.WAITING_STATUS)[
: settings.CHECK_JOB_ERRORS_BATCH_SIZE
]
@@ -146,6 +188,20 @@
retry_kwargs={"max_retries": None, "countdown": 600},
)
def record_project_usage_after_data_delete(project_id, job_id):
"""
Task to record the project usage statistics after deleting job data.
Calculates the new usage records based on database size after data deletion.

Args:
project_id (int): The project ID.
job_id (int): The job ID.

Raises:
TaskError: If the connection to spiderdata_db_client cannot be established.

Returns:
str: JSON string with job data size and usage details.
"""
if not spiderdata_db_client.get_connection():
raise TaskError("Could not get a connection to the database.")

@@ -186,6 +242,14 @@ def record_project_usage_after_data_delete(project_id, job_id):

@celery_app.task()
def delete_job_data(job_key):
"""
Task to delete all job-related data including items, requests, and logs.
Updates job data status to DELETED_STATUS after deletion.
Calculates a new usage record after data deletion.

Args:
job_key (str): Unique identifier for the job, typically in the format "jid.sid.pid".
"""
jid, sid, pid = job_key.split(".")
for data_type in ["items", "requests", "logs"]:
delete_data(pid, sid, jid, data_type)
@@ -195,6 +259,10 @@ def delete_job_data(job_key):

@celery_app.task(name="core.tasks.delete_expired_jobs_data")
def delete_expired_jobs_data():
"""
Task to delete job data for jobs that have passed their data expiry date.
This checks jobs with a PENDING_STATUS and triggers deletion if expiry is met.
"""
pending_data_delete_jobs = SpiderJob.objects.filter(
data_status=DataStatus.PENDING_STATUS,
status__in=[SpiderJob.COMPLETED_STATUS, SpiderJob.STOPPED_STATUS],
@@ -211,6 +279,19 @@
retry_kwargs={"max_retries": None, "countdown": 600},
)
def record_project_usage_after_job_event(job_id):
"""
Task to record the project usage statistics after a job event (e.g., job completion).
Calculates new usage records based on the job data and updates the project usage records.

Args:
job_id (int): The job ID.

Raises:
TaskError: If the connection to spiderdata_db_client cannot be established.

Returns:
str: JSON string with job data size and usage details.
"""
if not spiderdata_db_client.get_connection():
raise TaskError("Could not get a connection to the database.")

Expand All @@ -230,14 +311,17 @@ def record_project_usage_after_job_event(job_id):
str(project.pid), items_collection_name
)
unique_collection = False

requests_collection_name = "{}-{}-job_requests".format(job.spider.sid, job.jid)
requests_data_size = spiderdata_db_client.get_dataset_size(
str(project.pid), requests_collection_name
)

logs_collection_name = "{}-{}-job_logs".format(job.spider.sid, job.jid)
logs_data_size = spiderdata_db_client.get_dataset_size(
str(project.pid), logs_collection_name
)

# Tracking Proxy Usage
proxy_details = {}
for proxy_name, proxy_usage_name in settings.PROXY_PROVIDERS_TO_TRACK:
@@ -303,6 +387,16 @@
retry_kwargs={"max_retries": None, "countdown": 60},
)
def record_job_coverage_event(job_id):
"""
Task to record job field coverage statistics after a job event.
Collects data on field coverage and updates the job's statistics document in the database.

Args:
job_id (int): The job ID.

Raises:
TaskError: If the connection to spiderdata_db_client cannot be established.
"""
if not spiderdata_db_client.get_connection():
raise TaskError("Could not get a connection to the database.")
job = SpiderJob.objects.get(jid=job_id)
@@ -339,6 +433,19 @@


def get_chain_to_process_usage_data(after_delete=False, project_id=None, job_id=None):
"""
Generates a Celery chain to process usage data after a job event or data deletion.
This function dynamically imports tasks from external apps specified in settings and chains
them together with either record_project_usage_after_data_delete or record_project_usage_after_job_event.

Args:
after_delete (bool): If True, processes usage data after a data deletion event.
project_id (int, optional): Project ID for the data deletion event.
job_id (int, optional): Job ID for the job event or data deletion.

Returns:
Celery chain: A chain of tasks to be executed sequentially.
"""
list_of_process_functions = []
for external_app in settings.DJANGO_EXTERNAL_APPS:
module = __import__(f"{external_app}.tasks", fromlist=["tasks"])
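Chaining itself is plain Celery. The sketch below shows how a list of signatures can be composed into a single chain similar to what this helper returns; external_usage_task is a hypothetical stand-in for a task contributed by an external app, and the job_id and countdown values are illustrative.

from celery import chain, shared_task
from core.tasks import record_project_usage_after_job_event

@shared_task
def external_usage_task(job_id):
    """Stand-in for a usage task that an external app would contribute (hypothetical)."""
    return job_id

job_id = 1234  # illustrative
usage_chain = chain(
    external_usage_task.si(job_id),                   # .si() builds an immutable signature (ignores prior results)
    record_project_usage_after_job_event.si(job_id),
)
usage_chain.apply_async(countdown=60)  # delay in seconds before the chain's first task runs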