Skip to content

Commit

Permalink
Merge pull request #828 from BD2KGenomics/issues/729-promised-requirement
Browse files Browse the repository at this point in the history

Implement promised requirements (resolves #729)
  • Loading branch information
hannes-ucsc committed Jun 8, 2016
2 parents d172118 + 9ae4eb5 commit 1195eff
Show file tree
Hide file tree
Showing 7 changed files with 333 additions and 43 deletions.
5 changes: 4 additions & 1 deletion docs/toilAPI.rst
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,10 @@ Promise
The class used to reference return values of jobs/services not yet run/started.

.. autoclass:: toil.job::Promise
:members:
:members:

.. autoclass:: toil.job::PromisedRequirement
:members:

Exceptions
----------
Expand Down
7 changes: 6 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@
author_email='[email protected]',
url="https://github.com/BD2KGenomics/toil",
install_requires=[
'bd2k-python-lib==1.13.dev14'],
'bd2k-python-lib==1.13.dev14',
'dill==0.2.5'],
tests_require=[
'mock==1.0.1',
'pytest==2.8.3'],
test_suite='toil',
extras_require={
'mesos': [
'psutil==3.0.1'],
Expand Down
2 changes: 1 addition & 1 deletion src/toil/batchSystems/mesos/batchSystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ def resourceOffers(self, driver, offers):
runnableTasks = []
# TODO: In an offer, can there ever be more than one resource with the same name?
offerCores, offerMemory, offerDisk = self._determineOfferResources(offer)
log.debug('Received offer %s with %i MiB memory, %i core(s) and %i MiB of disk.',
log.debug('Received offer %s with %.2f MiB memory, %.2f core(s) and %.2f MiB of disk.',
offer.id.value, offerMemory, offerCores, offerDisk)
remainingCores = offerCores
remainingMemory = offerMemory
Expand Down
146 changes: 138 additions & 8 deletions src/toil/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import tempfile
import time
import uuid
import dill

from abc import ABCMeta, abstractmethod
from argparse import ArgumentParser
Expand Down Expand Up @@ -209,7 +210,10 @@ def addChildFn(self, fn, *args, **kwargs):
:return: The new child job that wraps fn.
:rtype: toil.job.FunctionWrappingJob
"""
return self.addChild(FunctionWrappingJob(fn, *args, **kwargs))
if PromisedRequirement.convertPromises(kwargs):
return self.addChild(PromisedRequirementFunctionWrappingJob(fn, *args, **kwargs))
else:
return self.addChild(FunctionWrappingJob(fn, *args, **kwargs))

def addFollowOnFn(self, fn, *args, **kwargs):
"""
Expand All @@ -221,7 +225,10 @@ def addFollowOnFn(self, fn, *args, **kwargs):
:return: The new follow-on job that wraps fn.
:rtype: toil.job.FunctionWrappingJob
"""
return self.addFollowOn(FunctionWrappingJob(fn, *args, **kwargs))
if PromisedRequirement.convertPromises(kwargs):
return self.addFollowOn(PromisedRequirementFunctionWrappingJob(fn, *args, **kwargs))
else:
return self.addFollowOn(FunctionWrappingJob(fn, *args, **kwargs))

def addChildJobFn(self, fn, *args, **kwargs):
"""
Expand All @@ -234,7 +241,10 @@ def addChildJobFn(self, fn, *args, **kwargs):
:return: The new child job that wraps fn.
:rtype: toil.job.JobFunctionWrappingJob
"""
return self.addChild(JobFunctionWrappingJob(fn, *args, **kwargs))
if PromisedRequirement.convertPromises(kwargs):
return self.addChild(PromisedRequirementJobFunctionWrappingJob(fn, *args, **kwargs))
else:
return self.addChild(JobFunctionWrappingJob(fn, *args, **kwargs))

def addFollowOnJobFn(self, fn, *args, **kwargs):
"""
Expand All @@ -247,7 +257,10 @@ def addFollowOnJobFn(self, fn, *args, **kwargs):
:return: The new follow-on job that wraps fn.
:rtype: toil.job.JobFunctionWrappingJob
"""
return self.addFollowOn(JobFunctionWrappingJob(fn, *args, **kwargs))
if PromisedRequirement.convertPromises(kwargs):
return self.addFollowOn(PromisedRequirementJobFunctionWrappingJob(fn, *args, **kwargs))
else:
return self.addFollowOn(JobFunctionWrappingJob(fn, *args, **kwargs))

@staticmethod
def wrapFn(fn, *args, **kwargs):
Expand All @@ -261,7 +274,10 @@ def wrapFn(fn, *args, **kwargs):
:return: The new function that wraps fn.
:rtype: toil.job.FunctionWrappingJob
"""
return FunctionWrappingJob(fn, *args, **kwargs)
if PromisedRequirement.convertPromises(kwargs):
return PromisedRequirementFunctionWrappingJob(fn, *args, **kwargs)
else:
return FunctionWrappingJob(fn, *args, **kwargs)

@staticmethod
def wrapJobFn(fn, *args, **kwargs):
Expand All @@ -275,7 +291,10 @@ def wrapJobFn(fn, *args, **kwargs):
:return: The new job function that wraps fn.
:rtype: toil.job.JobFunctionWrappingJob
"""
return JobFunctionWrappingJob(fn, *args, **kwargs)
if PromisedRequirement.convertPromises(kwargs):
return PromisedRequirementJobFunctionWrappingJob(fn, *args, **kwargs)
else:
return JobFunctionWrappingJob(fn, *args, **kwargs)

def encapsulate(self):
"""
Expand Down Expand Up @@ -2225,12 +2244,12 @@ def _serialiseJob(self, jobStore, jobsToJobWrappers, rootJobWrapper):
# method.
with jobStore.writeFileStream(rootJobWrapper.jobStoreID) as (fileHandle, fileStoreID):
cPickle.dump(self, fileHandle, cPickle.HIGHEST_PROTOCOL)
# Note that getUserScript() may have beeen overridden. This is intended. If we used
# Note that getUserScript() may have been overridden. This is intended. If we used
# self.userModule directly, we'd be getting a reference to job.py if the job was
# specified as a function (as opposed to a class) since that is where FunctionWrappingJob
# is defined. What we really want is the module that was loaded as __main__,
# and FunctionWrappingJob overrides getUserScript() to give us just that. Only then can
# filter_main() in _unpickle( ) do its job of resolveing any user-defined type or function.
# filter_main() in _unpickle( ) do its job of resolving any user-defined type or function.
userScript = self.getUserScript().globalize()
jobsToJobWrappers[self].command = ' '.join( ('_toil', fileStoreID) + userScript)
#Update the status of the jobWrapper on disk
Expand Down Expand Up @@ -2428,13 +2447,15 @@ def _jobName(self):
"""
return self.__class__.__name__


class JobException( Exception ):
"""
General job exception.
"""
def __init__( self, message ):
super( JobException, self ).__init__( message )


class JobGraphDeadlockException( JobException ):
"""
An exception raised in the event that a workflow contains an unresolvable \
Expand Down Expand Up @@ -2524,6 +2545,7 @@ def getUserScript(self):
def _jobName(self):
return ".".join((self.__class__.__name__,self.userFunctionModule.name,self.userFunctionName))


class JobFunctionWrappingJob(FunctionWrappingJob):
"""
A job function is a function whose first argument is a :class:`job.Job` \
Expand All @@ -2541,6 +2563,54 @@ def run(self, fileStore):
rValue = userFunction(*((self,) + tuple(self._args)), **self._kwargs)
return rValue


class PromisedRequirementFunctionWrappingJob(FunctionWrappingJob):
    """
    Handles dynamic resource allocation using :class:`toil.job.Promise` instances.
    Spawns child function using parent function parameters and fulfilled promised
    resource requirements. See :class:`toil.job.FunctionWrappingJob` class
    """
    def __init__(self, userFunction, *args, **kwargs):
        # Stash any PromisedRequirement resource keywords and substitute small
        # placeholder values: this intermediate job only exists to resolve the
        # promises and spawn the real child, so it needs minimal resources.
        self.promisedRequirements = {}
        placeholders = {'disk': '1M', 'memory': '32M', 'cores': 0.1}
        for name in placeholders:
            if name in kwargs and isinstance(kwargs[name], PromisedRequirement):
                self.promisedRequirements[name] = kwargs[name]
                kwargs[name] = placeholders[name]
        super(PromisedRequirementFunctionWrappingJob, self).__init__(userFunction, *args, **kwargs)

    def run(self, fileStore):
        # By the time this job runs, the parent's promises have been fulfilled,
        # so the real requirements can be computed before spawning the child.
        self.evaluatePromisedRequirements()
        return self.addChildFn(self._getUserFunction(), *self._args, **self._kwargs).rv()

    def evaluatePromisedRequirements(self):
        # Fill each resource keyword from its fulfilled promise; fall back to
        # this job's own (placeholder) attribute when no promise was supplied.
        for name in ('disk', 'memory', 'cores'):
            promise = self.promisedRequirements.get(name)
            self._kwargs[name] = promise.getValue() if promise is not None else getattr(self, name)


class PromisedRequirementJobFunctionWrappingJob(PromisedRequirementFunctionWrappingJob):
    """
    Handles dynamic resource allocation for job functions.
    See :class:`toil.job.JobFunctionWrappingJob`
    """

    def run(self, fileStore):
        # Resolve the promised resource keywords, then hand the real work to a
        # child *job* function carrying the fulfilled requirements.
        self.evaluatePromisedRequirements()
        fn = self._getUserFunction()
        return self.addChildJobFn(fn, *self._args, **self._kwargs).rv()


class EncapsulatedJob(Job):
"""
A convenience Job class used to make a job subgraph appear to be a single job.
Expand Down Expand Up @@ -2744,3 +2814,63 @@ def _resolve(cls, jobStoreString, jobStoreFileID):
# corrupted
value = cPickle.load(fileHandle)
return value


class PromisedRequirement(object):
    """
    Class for dynamically allocating job function resource requirements involving
    :class:`toil.job.Promise` instances.

    Use when resource requirements depend on the return value of a parent function.
    PromisedRequirements can be modified by passing a function that takes the
    :class:`Promise` as input.

    For example, let f, g, and h be functions. Then a Toil workflow can be
    defined as follows::

        A = Job.wrapFn(f)
        B = A.addChildFn(g, cores=PromisedRequirement(A.rv()))
        C = B.addChildFn(h, cores=PromisedRequirement(lambda x: 2 * x, B.rv()))
    """
    def __init__(self, valueOrCallable, *args):
        """
        :param valueOrCallable: A single Promise instance or a function that
               takes \*args as input parameters.
        :param int|Promise *args: variable length argument list
        """
        if callable(valueOrCallable):
            assert len(args) != 0, 'Need parameters for PromisedRequirement function.'
            func = valueOrCallable
        else:
            assert len(args) == 0, 'Define a PromisedRequirement function to handle multiple arguments.'
            func = lambda x: x
            args = [valueOrCallable]

        # Serialize with dill rather than pickle so that lambdas and locally
        # defined functions survive the round trip.
        self._func = dill.dumps(func)
        self._args = list(args)

    def getValue(self):
        """
        Returns the PromisedRequirement value by applying the stored function
        to its (by now fulfilled) arguments.
        """
        func = dill.loads(self._func)
        return func(*self._args)

    @staticmethod
    def convertPromises(kwargs):
        """
        Returns True if a reserved resource keyword is a Promise or
        PromisedRequirement instance. Converts Promise instances
        to PromisedRequirement in place.

        :param dict kwargs: function keyword arguments
        :rtype: bool
        """
        foundPromisedRequirement = False
        for name in ('disk', 'memory', 'cores'):
            value = kwargs.get(name)
            if isinstance(value, Promise):
                kwargs[name] = PromisedRequirement(value)
                foundPromisedRequirement = True
            elif isinstance(value, PromisedRequirement):
                foundPromisedRequirement = True
        return foundPromisedRequirement

2 changes: 1 addition & 1 deletion src/toil/leader.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ def issueJob(self, jobStoreID, memory, cores, disk, preemptable):
jobBatchSystemID = self.batchSystem.issueBatchJob(jobCommand, memory, cores, disk, preemptable)
self.jobBatchSystemIDToIssuedJob[jobBatchSystemID] = IssuedJob(jobStoreID, memory, cores, disk, preemptable)
logger.debug("Issued job with job store ID: %s and job batch system ID: "
"%s and cores: %i, disk: %i, and memory: %i",
"%s and cores: %.2f, disk: %.2f, and memory: %.2f",
jobStoreID, str(jobBatchSystemID), cores, disk, memory)

def issueJobs(self, jobs):
Expand Down
51 changes: 20 additions & 31 deletions src/toil/test/batchSystems/batchSystemTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,9 @@ class AbstractBatchSystemJobTest(ToilTest):

__metaclass__ = ABCMeta

cpu_count = multiprocessing.cpu_count()
allocated_cores = sorted({1, 2, cpu_count})
sleep_time = 5
cpuCount = multiprocessing.cpu_count()
allocatedCores = sorted({1, 2, cpuCount})
sleepTime = 5

@abstractmethod
def getBatchSystemName(self):
Expand All @@ -226,27 +226,26 @@ def testJobConcurrency(self):
"""
Tests that the batch system is allocating core resources properly for concurrent tasks.
"""
for cores_per_job in self.allocated_cores:
temp_dir = self._createTempDir('testFiles')
for coresPerJob in self.allocatedCores:
tempDir = self._createTempDir('testFiles')

options = Job.Runner.getDefaultOptions(self._getTestJobStorePath())
options.workDir = temp_dir
options.maxCores = self.cpu_count
options.workDir = tempDir
options.maxCores = self.cpuCount
options.batchSystem = self.batchSystemName

counter_path = os.path.join(temp_dir, 'counter')
resetCounters(counter_path)
value, max_value = getCounters(counter_path)
assert (value, max_value) == (0, 0)
counterPath = os.path.join(tempDir, 'counter')
resetCounters(counterPath)
value, maxValue = getCounters(counterPath)
assert (value, maxValue) == (0, 0)

root = Job()
for _ in range(self.cpu_count):
root.addFollowOn(Job.wrapFn(measureConcurrency, counter_path, self.sleep_time,
cores=cores_per_job, memory='1M', disk='1Mi'))
for _ in range(self.cpuCount):
root.addFollowOn(Job.wrapFn(measureConcurrency, counterPath, self.sleepTime,
cores=coresPerJob, memory='1M', disk='1Mi'))
Job.Runner.startToil(root, options)

_, max_value = getCounters(counter_path)
self.assertEqual(max_value, self.cpu_count / cores_per_job)
_, maxValue = getCounters(counterPath)
self.assertEqual(maxValue, self.cpuCount / coresPerJob)


@needs_mesos
Expand Down Expand Up @@ -394,26 +393,16 @@ def test(self):
jobIds.remove(jobId)
finally:
bs.shutdown()
concurrentTasks, maxConcurrentTasks = self.getCounters()
concurrentTasks, maxConcurrentTasks = getCounters(self.counterPath)
self.assertEquals(concurrentTasks, 0)
log.info('maxCores: {maxCores}, '
'coresPerJob: {coresPerJob}, '
'load: {load}'.format(**locals()))
# This is the key assertion:
expectedMaxConcurrentTasks = min(maxCores / coresPerJob, jobs)
self.assertEquals(maxConcurrentTasks, expectedMaxConcurrentTasks)
self.resetCounters()

def getCounters(self):
with open(self.counterPath, 'r+') as f:
s = f.read()
log.info('Counter is %s', s)
concurrentTasks, maxConcurrentTasks = map(int, s.split(','))
return concurrentTasks, maxConcurrentTasks
resetCounters(self.counterPath)

def resetCounters(self):
with open(self.counterPath, 'w') as f:
f.write('0,0')

@skipIf(SingleMachineBatchSystem.numCores < 3, 'Need at least three cores to run this test')
def testServices(self):
Expand All @@ -425,7 +414,7 @@ def testServices(self):
with open(self.counterPath, 'r+') as f:
s = f.read()
log.info('Counter is %s', s)
self.assertEqual(self.getCounters(), (0, 3))
self.assertEqual(getCounters(self.counterPath), (0, 3))


# Toil can use only top-level functions so we have to add them here:
Expand Down Expand Up @@ -607,7 +596,7 @@ class MesosBatchSystemJobTest(hidden.AbstractBatchSystemJobTest, MesosTestSuppor
"""

def getBatchSystemName(self):
self._startMesos(self.cpu_count)
self._startMesos(self.cpuCount)
return "mesos"

def tearDown(self):
Expand Down
Loading

0 comments on commit 1195eff

Please sign in to comment.