15692: Introduce background jobs #16699

Open
wants to merge 9 commits into
base: feature
2 changes: 1 addition & 1 deletion docs/development/models.md
@@ -18,7 +18,7 @@ Depending on its classification, each NetBox model may support various features
| [Custom links](../customization/custom-links.md) | `CustomLinksMixin` | `custom_links` | These models support the assignment of custom links |
| [Custom validation](../customization/custom-validation.md) | `CustomValidationMixin` | - | Supports the enforcement of custom validation rules |
| [Export templates](../customization/export-templates.md) | `ExportTemplatesMixin` | `export_templates` | Users can create custom export templates for these models |
-| [Job results](../features/background-jobs.md) | `JobsMixin` | `jobs` | Users can create custom export templates for these models |
+| [Job results](../features/background-jobs.md) | `JobsMixin` | `jobs` | Background jobs can be scheduled for these models |
| [Journaling](../features/journaling.md) | `JournalingMixin` | `journaling` | These models support persistent historical commentary |
| [Synchronized data](../integrations/synchronized-data.md) | `SyncedDataMixin` | `synced_data` | Certain model data can be automatically synchronized from a remote data source |
| [Tagging](../models/extras/tag.md) | `TagsMixin` | `tags` | The models can be tagged with user-defined tags |
72 changes: 71 additions & 1 deletion docs/plugins/development/background-tasks.md
@@ -1,6 +1,76 @@
# Background Tasks

NetBox supports the queuing of tasks that need to be performed in the background, decoupled from the request-response cycle, using the [Python RQ](https://python-rq.org/) library. Three task queues of differing priority are defined by default:
NetBox supports the queuing of tasks that need to be performed in the background, decoupled from the request-response cycle.


## High level API

NetBox provides a high-level interface for defining and managing background jobs. Several job types are available. They share the same core job logic through inheritance, but each one serves a specific purpose and has its own management logic around it.

### Background Job

A background job implements a basic [Job](../../models/core/job.md) executor for all kinds of tasks. It manages the associated job object, reschedules periodic jobs at the given interval, and handles errors. Custom jobs are added by subclassing NetBox's `BackgroundJob` class.

**Example:**

```python title="jobs.py"
from utilities.jobs import BackgroundJob

class MyTestJob(BackgroundJob):
    @classmethod
    def run(cls, job, *args, **kwargs):
        obj = job.object
        # your logic goes here
```

You can schedule the background job from within your code (e.g. from a model's `save()` method or a view) by calling `MyTestJob.enqueue()`. This method passes through all arguments to `Job.enqueue()`.

::: core.models.Job.enqueue
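
The split described in this section (the framework manages the job object, subclasses only supply `run()`) can be pictured with a stand-alone sketch. It is an illustration under assumptions, not NetBox's implementation: `FakeJob`, `BackgroundJobSketch`, the `handle()` method and the status strings are all hypothetical, and the rescheduling of periodic jobs is left out.

```python
class FakeJob:
    """Hypothetical stand-in for core.models.Job; it only records status."""

    def __init__(self, obj=None):
        self.object = obj
        self.status = "pending"
        self.error = None

    def start(self):
        self.status = "running"

    def terminate(self, status="completed", error=None):
        self.status = status
        self.error = error


class BackgroundJobSketch:
    """Hypothetical base class that wraps run() with job bookkeeping."""

    @classmethod
    def handle(cls, job, *args, **kwargs):
        job.start()
        try:
            cls.run(job, *args, **kwargs)
            job.terminate()
        except Exception as e:
            # Record the failure on the job, then re-raise for the caller.
            job.terminate(status="errored", error=repr(e))
            raise

    @classmethod
    def run(cls, job, *args, **kwargs):
        raise NotImplementedError


class GreetJob(BackgroundJobSketch):
    @classmethod
    def run(cls, job, *args, **kwargs):
        job.result = f"hello {job.object}"


class FailingJob(BackgroundJobSketch):
    @classmethod
    def run(cls, job, *args, **kwargs):
        raise ValueError("boom")


ok = FakeJob(obj="router1")
GreetJob.handle(ok)

bad = FakeJob()
try:
    FailingJob.handle(bad)
except ValueError:
    pass  # the error is also recorded on the job itself
```

In this sketch neither subclass calls `start()` or `terminate()` itself, which mirrors how the `run()` methods in this change no longer do their own job bookkeeping.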

### Scheduled Job

During execution, a scheduled job behaves like a background job and is therefore implemented in the same way, but must be subclassed from NetBox's `ScheduledJob` class.

However, for management purposes, a `schedule()` method allows a schedule to be set exactly once to avoid duplicates. If a job is already scheduled for a particular instance, a second one won't be scheduled, and this check is thread-safe. An example use case is a periodic task that is bound to an instance in general, but not to any event of that instance (such as updates). The parameters of the `schedule()` method are identical to those of `enqueue()`. Note that this class doesn't accept the `name` parameter in either method and uses a generic name instead.

!!! tip
    It is not forbidden to `enqueue()` additional jobs while an interval schedule is active. An example use of this would be to schedule a periodic daily synchronization, but also trigger additional synchronizations on demand when the user presses a button.
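
The difference between `schedule()` and `enqueue()` can be sketched with a small in-memory model. Everything here is hypothetical (`ScheduleRegistry`, the tuple layout, the string identifiers) and simplified: the real classes work against the database, and this sketch treats any pending job for the same class and instance as an existing schedule.

```python
import threading


class ScheduleRegistry:
    """Hypothetical in-memory stand-in for the table of pending jobs."""

    def __init__(self):
        self._lock = threading.Lock()
        self.jobs = []  # entries are (job_class_name, instance, interval)

    def enqueue(self, job_cls, instance, interval=None):
        # Always adds a new job, even if one is already pending.
        with self._lock:
            entry = (job_cls, instance, interval)
            self.jobs.append(entry)
            return entry

    def schedule(self, job_cls, instance, interval=None):
        # Adds a job only if none is pending for this class/instance pair.
        # The lock makes the check-then-add step atomic across threads.
        with self._lock:
            for entry in self.jobs:
                if entry[0] == job_cls and entry[1] == instance:
                    return entry
            entry = (job_cls, instance, interval)
            self.jobs.append(entry)
            return entry


registry = ScheduleRegistry()

# Scheduling twice yields a single periodic job (interval in minutes).
first = registry.schedule("SyncJob", "datasource-1", interval=1440)
second = registry.schedule("SyncJob", "datasource-1", interval=1440)

# An on-demand run can still be enqueued alongside the schedule.
extra = registry.enqueue("SyncJob", "datasource-1")
```

After these calls the registry holds two entries: the daily schedule and the one-off job.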

### System Job

The last type of job is a system job, which is not bound to any particular instance. Typical use cases are housekeeping or a general synchronization of NetBox objects from another system. System jobs are implemented in the same way as background and scheduled jobs, but they must be subclassed from NetBox's `SystemJob` class. As with scheduled jobs, the `name` parameter is not accepted. In addition, no `instance` parameter may be passed to `enqueue()`, as a placeholder is used instead.

Typically, a system job is set up during NetBox startup when the plugin is loaded. This ensures that the job runs in the background even when no requests are being processed. For this purpose, the `setup()` method can be used to set up a new schedule outside of the request-response cycle. It can safely be called from the plugin's `ready()` method and will register the new schedule right after all plugins are loaded and the database is connected.

**Example:**

```python title="jobs.py"
from utilities.jobs import SystemJob

class MyHousekeepingJob(SystemJob):
    @classmethod
    def run(cls, *args, **kwargs):
        # your logic goes here
        pass
```
```python title="__init__.py"
from netbox.plugins import PluginConfig

class MyPluginConfig(PluginConfig):
    def ready(self):
        from .jobs import MyHousekeepingJob
        MyHousekeepingJob.setup(interval=60)
```


## Low Level API

Instead of using the high-level APIs provided by NetBox, plugins may access the task scheduler directly using the [Python RQ](https://python-rq.org/) library. This allows background tasks to be scheduled without adding a [Job](../../models/core/job.md) record to the database or implementing custom job handling.


## Task queues

Three task queues of differing priority are defined by default:

* High
* Default
1 change: 1 addition & 0 deletions docs/plugins/development/index.md
@@ -47,6 +47,7 @@ project-name/
- __init__.py
- filtersets.py
- graphql.py
- jobs.py
- models.py
- middleware.py
- navigation.py
2 changes: 2 additions & 0 deletions docs/plugins/development/models.md
@@ -130,6 +130,8 @@ For more information about database migrations, see the [Django documentation](h

::: netbox.models.features.ExportTemplatesMixin

::: netbox.models.features.JobsMixin

::: netbox.models.features.JournalingMixin

::: netbox.models.features.TagsMixin
30 changes: 14 additions & 16 deletions netbox/core/jobs.py
@@ -1,33 +1,31 @@
import logging

from netbox.search.backends import search_backend
from .choices import *
from utilities.jobs import BackgroundJob
from .choices import DataSourceStatusChoices
from .exceptions import SyncError
from .models import DataSource
from rq.timeouts import JobTimeoutException

logger = logging.getLogger(__name__)


def sync_datasource(job, *args, **kwargs):
class SyncDataSourceJob(BackgroundJob):
"""
Call sync() on a DataSource.
"""
datasource = DataSource.objects.get(pk=job.object_id)

try:
job.start()
datasource.sync()
@classmethod
def run(cls, job, *args, **kwargs):
datasource = DataSource.objects.get(pk=job.object_id)

# Update the search cache for DataFiles belonging to this source
search_backend.cache(datasource.datafiles.iterator())
try:
datasource.sync()

job.terminate()
# Update the search cache for DataFiles belonging to this source
search_backend.cache(datasource.datafiles.iterator())

except Exception as e:
job.terminate(status=JobStatusChoices.STATUS_ERRORED, error=repr(e))
DataSource.objects.filter(pk=datasource.pk).update(status=DataSourceStatusChoices.FAILED)
if type(e) in (SyncError, JobTimeoutException):
logging.error(e)
else:
except Exception as e:
DataSource.objects.filter(pk=datasource.pk).update(status=DataSourceStatusChoices.FAILED)
if type(e) is SyncError:
logging.error(e)
raise e
13 changes: 6 additions & 7 deletions netbox/core/management/commands/syncdatasource.py
@@ -34,13 +34,12 @@ def handle(self, *args, **options):
for i, datasource in enumerate(datasources, start=1):
self.stdout.write(f"[{i}] Syncing {datasource}... ", ending='')
self.stdout.flush()
try:
datasource.sync()
self.stdout.write(datasource.get_status_display())
self.stdout.flush()
except Exception as e:
DataSource.objects.filter(pk=datasource.pk).update(status=DataSourceStatusChoices.FAILED)
raise e

datasource.enqueue_sync_job()
datasource.refresh_from_db()

self.stdout.write(datasource.get_status_display())
self.stdout.flush()

if len(options['name']) > 1:
self.stdout.write(f"Finished.")
9 changes: 5 additions & 4 deletions netbox/core/models/data.py
@@ -153,7 +153,7 @@ def to_objectchange(self, action):

return objectchange

def enqueue_sync_job(self, request):
def enqueue_sync_job(self, request=None):
"""
Enqueue a background job to synchronize the DataSource by calling sync().
"""
@@ -162,10 +162,11 @@ def enqueue_sync_job(self, request):
DataSource.objects.filter(pk=self.pk).update(status=self.status)

# Enqueue a sync job
return Job.enqueue(
import_string('core.jobs.sync_datasource'),
SyncDataSourceJob = import_string('core.jobs.SyncDataSourceJob')
return SyncDataSourceJob.enqueue(
instance=self,
user=request.user
user=(request.user if request else None),
run_now=(request is None),
)

def get_backend(self):
10 changes: 9 additions & 1 deletion netbox/core/models/jobs.py
@@ -198,7 +198,7 @@ def terminate(self, status=JobStatusChoices.STATUS_COMPLETED, error=None):
job_end.send(self)

@classmethod
def enqueue(cls, func, instance, name='', user=None, schedule_at=None, interval=None, **kwargs):
def enqueue(cls, func, instance, name='', user=None, schedule_at=None, interval=None, run_now=False, **kwargs):
"""
Create a Job instance and enqueue a job using the given callable

@@ -209,6 +209,8 @@ def enqueue(cls, func, instance, name='', user=None, schedule_at=None, interval=
user: The user responsible for running the job
schedule_at: Schedule the job to be executed at the passed date and time
interval: Recurrence interval (in minutes)
run_now: Run the job immediately without scheduling it in the background. Should be used for interactive
management commands only.
"""
object_type = ObjectType.objects.get_for_model(instance, for_concrete_model=False)
rq_queue_name = get_queue_for_model(object_type.model)
@@ -225,6 +227,12 @@ def enqueue(cls, func, instance, name='', user=None, schedule_at=None, interval=
job_id=uuid.uuid4()
)

# Optionally, the job can be run immediately without being scheduled to run in the background.
if run_now:
func(job_id=str(job.job_id), job=job, **kwargs)
return job

# Schedule the job to run asynchronously in the background.
if schedule_at:
queue.enqueue_at(schedule_at, func, job_id=str(job.job_id), job=job, **kwargs)
else:
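
The control flow added in this hunk can be sketched in isolation. This is a simplified model, not the method itself: `RecordingQueue`, `dispatch()` and the dict-based job are hypothetical stand-ins, and the final `else` branch is assumed to make a plain `queue.enqueue()` call because the hunk is cut off at that point.

```python
import uuid


class RecordingQueue:
    """Hypothetical stand-in for an RQ queue; records calls, needs no Redis."""

    def __init__(self):
        self.calls = []

    def enqueue(self, func, **kwargs):
        self.calls.append(("enqueue", func, kwargs))

    def enqueue_at(self, when, func, **kwargs):
        self.calls.append(("enqueue_at", when, func, kwargs))


def dispatch(queue, func, job, schedule_at=None, run_now=False, **kwargs):
    # run_now: call the function in-process and skip the queue entirely.
    if run_now:
        func(job_id=job["job_id"], job=job, **kwargs)
        return job

    # Otherwise hand the call to the queue, optionally at a later time.
    if schedule_at:
        queue.enqueue_at(schedule_at, func, job_id=job["job_id"], job=job, **kwargs)
    else:
        # Assumption: the truncated branch enqueues for immediate pickup.
        queue.enqueue(func, job_id=job["job_id"], job=job, **kwargs)
    return job


ran = []


def sync(job_id, job, **kwargs):
    ran.append(job_id)


queue = RecordingQueue()

job_a = {"job_id": str(uuid.uuid4())}
dispatch(queue, sync, job_a, run_now=True)  # executes synchronously

job_b = {"job_id": str(uuid.uuid4())}
dispatch(queue, sync, job_b)  # only queued; a worker would run it later
```

With `run_now=True` the caller blocks until the function returns, which is why the docstring restricts it to interactive management commands.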
6 changes: 3 additions & 3 deletions netbox/extras/api/views.py
@@ -1,5 +1,6 @@
from django.http import Http404
from django.shortcuts import get_object_or_404
from django.utils.module_loading import import_string
from django_rq.queues import get_connection
from rest_framework import status
from rest_framework.decorators import action
@@ -14,7 +15,6 @@
from core.models import Job, ObjectType
from extras import filtersets
from extras.models import *
from extras.scripts import run_script
from netbox.api.authentication import IsAuthenticatedOrLoginNotRequired
from netbox.api.features import SyncedDataMixin
from netbox.api.metadata import ContentTypeMetadata
@@ -252,8 +252,8 @@ def post(self, request, pk):
raise RQWorkerNotRunningException()

if input_serializer.is_valid():
Job.enqueue(
run_script,
ScriptJob = import_string("extras.jobs.ScriptJob")
ScriptJob.enqueue(
instance=script,
name=script.python_class.class_name,
user=request.user,
5 changes: 2 additions & 3 deletions netbox/extras/events.py
@@ -9,7 +9,6 @@
from django_rq import get_queue

from core.choices import ObjectChangeActionChoices
from core.models import Job
from netbox.config import get_config
from netbox.constants import RQ_QUEUE_DEFAULT
from netbox.registry import registry
@@ -125,8 +124,8 @@ def process_event_rules(event_rules, model_name, event, data, username=None, sna
script = event_rule.action_object.python_class()

# Enqueue a Job to record the script's execution
Job.enqueue(
"extras.scripts.run_script",
ScriptJob = import_string("extras.jobs.ScriptJob")
ScriptJob.enqueue(
instance=event_rule.action_object,
name=script.name,
user=user,
105 changes: 105 additions & 0 deletions netbox/extras/jobs.py
@@ -0,0 +1,105 @@
import logging
import traceback
from contextlib import nullcontext

from django.db import transaction
from django.utils.translation import gettext as _

from extras.models import Script as ScriptModel
from extras.signals import clear_events
from netbox.context_managers import event_tracking
from utilities.exceptions import AbortScript, AbortTransaction
from utilities.jobs import BackgroundJob
from .utils import is_report


class ScriptJob(BackgroundJob):
"""
Script execution job.

A wrapper for calling Script.run(). This performs error handling and provides a hook for committing changes. It
exists outside the Script class to ensure it cannot be overridden by a script author.
"""

@staticmethod
def run_script(script, job, request, data, commit):
"""
Core script execution task. We capture this within a method to allow for conditionally wrapping it with the
event_tracking context manager (which is bypassed if commit == False).

Args:
job: The Job associated with this execution
request: The WSGI request associated with this execution (if any)
data: A dictionary of data to be passed to the script upon execution
commit: Passed through to Script.run()
"""
try:
try:
with transaction.atomic():
script.output = script.run(data, commit)
if not commit:
raise AbortTransaction()
except AbortTransaction:
script.log_info(message=_("Database changes have been reverted automatically."))
if script.failed:
logger.warning(f"Script failed")
raise

except Exception as e:
if type(e) is AbortScript:
msg = _("Script aborted with error: ") + str(e)
if is_report(type(script)):
script.log_failure(message=msg)
else:
script.log_failure(msg)
logger.error(f"Script aborted with error: {e}")

else:
stacktrace = traceback.format_exc()
script.log_failure(
message=_("An exception occurred: ") + f"`{type(e).__name__}: {e}`\n```\n{stacktrace}\n```"
)
logger.error(f"Exception raised during script execution: {e}")

if type(e) is not AbortTransaction:
script.log_info(message=_("Database changes have been reverted due to error."))

# Clear all pending events. Job termination (including setting the status) is handled by the job framework.
if request:
clear_events.send(request)
raise

# Update the job data regardless of the execution status of the job. Successes should be reported as well as
# failures.
finally:
job.data = script.get_job_data()

@classmethod
def run(cls, job, data, request=None, commit=True, **kwargs):
"""
Run the script.

Args:
job: The Job associated with this execution
data: A dictionary of data to be passed to the script upon execution
request: The WSGI request associated with this execution (if any)
commit: Passed through to Script.run()
"""
script = ScriptModel.objects.get(pk=job.object_id).python_class()

logger = logging.getLogger(f"netbox.scripts.{script.full_name}")
logger.info(f"Running script (commit={commit})")

# Add files to form data
if request:
files = request.FILES
for field_name, fileobj in files.items():
data[field_name] = fileobj

# Add the current request as a property of the script
script.request = request

# Execute the script. If commit is True, wrap it with the event_tracking context manager to ensure we process
# change logging, event rules, etc.
with event_tracking(request) if commit else nullcontext():
cls.run_script(script, job, request, data, commit)