Skip to content
This repository has been archived by the owner on Apr 26, 2021. It is now read-only.

Get specific available machines by label, tag and platform. #3186

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 55 additions & 41 deletions cuckoo/core/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
)
from cuckoo.common.objects import File
from cuckoo.common.files import Folders
from cuckoo.core.database import Database, TASK_COMPLETED, TASK_REPORTED
from cuckoo.core.database import Database, TASK_COMPLETED, TASK_REPORTED, TASK_RUNNING, TASK_PENDING
from cuckoo.core.guest import GuestManager
from cuckoo.core.plugins import RunAuxiliary, RunProcessing
from cuckoo.core.plugins import RunSignatures, RunReporting
Expand Down Expand Up @@ -162,7 +162,7 @@ def acquire_machine(self):
# In some cases it's possible that we enter this loop without
# having any available machines. We should make sure this is not
# such case, or the analysis task will fail completely.
if not machinery.availables():
if not machinery.availables(label=self.task.machine, platform=self.task.platform, tags=self.task.tags):
machine_lock.release()
time.sleep(1)
continue
Expand Down Expand Up @@ -965,11 +965,18 @@ def _cleanup_managers(self):
cleaned.add(am)
return cleaned

def _thr_periodic_log(self):
    """Log a one-line status summary (pending tasks, available/locked/total
    machines) every 10 seconds.

    Reschedules itself through a daemonized threading.Timer so that a
    pending timer can never keep the interpreter alive after the
    scheduler has stopped.
    """
    log.debug(
        "# Tasks: %d; # Available Machines: %d; # Locked Machines: %d; # Total Machines: %d;",
        self.db.count_tasks(status=TASK_PENDING),
        self.db.count_machines_available(),
        len(self.db.list_machines(locked=True)),
        len(self.db.list_machines())
    )
    # Daemonize the timer thread: the original non-daemon timer would block
    # a clean process shutdown until the next 10-second tick fired.
    timer = threading.Timer(10, self._thr_periodic_log)
    timer.daemon = True
    timer.start()

def start(self):
"""Start scheduler."""
self.initialize()

log.info("Waiting for analysis tasks.")
self._thr_periodic_log()

# Message queue with threads to transmit exceptions (used as IPC).
errors = Queue.Queue()
Expand All @@ -978,27 +985,12 @@ def start(self):
if self.maxcount is None:
self.maxcount = self.cfg.cuckoo.max_analysis_count

launched_analysis = True
# This loop runs forever.
while self.running:
time.sleep(1)

# Run cleanup on finished analysis managers and untrack them
for am in self._cleanup_managers():
self.analysis_managers.discard(am)

# Wait until the machine lock is not locked. This is only the case
# when all machines are fully running, rather than about to start
# or still busy starting. This way we won't have race conditions
# with finding out there are no available machines in the analysis
# manager or having two analyses pick the same machine.
if not machine_lock.acquire(False):
logger(
"Could not acquire machine lock",
action="scheduler.machine_lock", status="busy"
)
continue

machine_lock.release()
if not launched_analysis:
time.sleep(1)
launched_analysis = False

# If not enough free disk space is available, then we print an
# error message and wait another round (this check is ignored
Expand Down Expand Up @@ -1064,28 +1056,50 @@ def start(self):
)
continue

# Fetch a pending analysis task.
# TODO This fixes only submissions by --machine, need to add
# other attributes (tags etc).
# TODO We should probably move the entire "acquire machine" logic
# from the Analysis Manager to the Scheduler and then pass the
# selected machine onto the Analysis Manager instance.
task, available = None, False
for machine in self.db.get_available_machines():
task = self.db.fetch(machine=machine.name)
if task:
break

if machine.is_analysis():
# Get all tasks in the queue
tasks = self.db.list_tasks(status=TASK_PENDING, details=True)
if not tasks:
continue

for task in tasks:
# Run cleanup on finished analysis managers and untrack them
for am in self._cleanup_managers():
self.analysis_managers.discard(am)

# Wait until the machine lock is not locked. This is only the case
# when all machines are fully running, rather than about to start
# or still busy starting. This way we won't have race conditions
# with finding out there are no available machines in the analysis
# manager or having two analyses pick the same machine.
if not machine_lock.acquire(False):
logger(
"Could not acquire machine lock",
action="scheduler.machine_lock", status="busy"
)
continue

machine_lock.release()

available = False
# Note that label > platform > tags
if task.machine:
if machinery.availables(label=task.machine):
available = True
elif task.platform:
if machinery.availables(platform=task.platform):
available = True
elif task.tags:
tag_names = [tag.name for tag in task.tags]
if machinery.availables(tags=tag_names):
available = True
else:
available = True

# We only fetch a new task if at least one of the available
# machines is not a "service" machine (again, please refer to the
# services auxiliary module for more information on service VMs).
if not task and available:
task = self.db.fetch(service=False)
if not available:
continue

self.db.set_status(task.id, TASK_RUNNING)

if task:
log.debug("Processing task #%s", task.id)
self.total_analysis_count += 1

Expand All @@ -1094,7 +1108,7 @@ def start(self):
analysis.daemon = True
analysis.start()
self.analysis_managers.add(analysis)

launched_analysis = True
# Deal with errors.
try:
raise errors.get(block=False)
Expand Down
31 changes: 30 additions & 1 deletion cuckoo/machinery/kvm.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@
# Copyright (C) 2014-2016 Cuckoo Foundation.
# This file is part of Cuckoo Sandbox - http://www.cuckoosandbox.org
# See the file 'docs/LICENSE' for copying permission.
import logging
from cuckoo.common.abstracts import LibVirtMachinery
from cuckoo.common.exceptions import CuckooCriticalError
from cuckoo.common.exceptions import CuckooMachineError
from cuckoo.core.database import Machine
from sqlalchemy.exc import SQLAlchemyError
log = logging.getLogger(__name__)

try:
import libvirt
Expand Down Expand Up @@ -33,4 +37,29 @@ def _connect(self):

def _disconnect(self, conn):
    """Disconnect, ignore request to disconnect."""
    # Deliberately a no-op: the libvirt connection lifetime is managed
    # elsewhere, so a disconnect request is silently ignored.
    pass

def availables(self, label=None, platform=None, tags=None):
    """Return the number of machines currently available.

    With no criterion supplied, defer to the generic machinery count;
    otherwise count only machines matching the given label, platform
    or tags (precedence: label > platform > tags).
    """
    if label is None and platform is None and tags is None:
        return super(KVM, self).availables()
    return self._get_specific_availables(
        label=label, platform=platform, tags=tags
    )

def _get_specific_availables(self, label=None, platform=None, tags=None):
    """Count unlocked machines matching the given criteria.

    Only the first truthy criterion is applied, with precedence
    label > platform > tags; a machine must carry every requested tag.
    Returns 0 on database errors.
    """
    session = self.db.Session()
    try:
        # Apply the locked filter once, up front: the original applied it
        # per-criterion (and repeatedly inside the tags loop), so the
        # no-criterion fallback wrongly counted locked machines as well.
        machines = session.query(Machine).filter_by(locked=False)
        # Note that label > platform > tags.
        if label:
            machines = machines.filter_by(label=label)
        elif platform:
            machines = machines.filter_by(platform=platform)
        elif tags:
            for tag in tags:
                machines = machines.filter(Machine.tags.any(name=tag))
        return machines.count()
    except SQLAlchemyError as e:
        # Lazy %-formatting; log.exception already appends the traceback.
        log.exception(
            "Database error getting specific available machines: %s", e
        )
        return 0
    finally:
        session.close()