Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Implement promised requirements (resolves #729) #828

Merged
merged 3 commits into from
Jun 8, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion docs/toilAPI.rst
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,10 @@ Promise
The class used to reference return values of jobs/services not yet run/started.

.. autoclass:: toil.job::Promise
:members:
:members:

.. autoclass:: toil.job::PromisedRequirement
:members:

Exceptions
----------
Expand Down
7 changes: 6 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@
author_email='[email protected]',
url="https://github.com/BD2KGenomics/toil",
install_requires=[
'bd2k-python-lib==1.13.dev14'],
'bd2k-python-lib==1.13.dev14',
'dill==0.2.5'],
tests_require=[
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, I didn't catch this earlier but this is a merge error on your part, @jpfeil. You reintroduced lines that I had taken out. Try to be extra careful with merges!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

… sorry, with resolving merge conflicts.

'mock==1.0.1',
'pytest==2.8.3'],
test_suite='toil',
extras_require={
'mesos': [
'psutil==3.0.1'],
Expand Down
2 changes: 1 addition & 1 deletion src/toil/batchSystems/mesos/batchSystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ def resourceOffers(self, driver, offers):
runnableTasks = []
# TODO: In an offer, can there ever be more than one resource with the same name?
offerCores, offerMemory, offerDisk = self._determineOfferResources(offer)
log.debug('Received offer %s with %i MiB memory, %i core(s) and %i MiB of disk.',
log.debug('Received offer %s with %.2f MiB memory, %.2f core(s) and %.2f MiB of disk.',
offer.id.value, offerMemory, offerCores, offerDisk)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be committed separately.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resolved.

remainingCores = offerCores
remainingMemory = offerMemory
Expand Down
146 changes: 138 additions & 8 deletions src/toil/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import tempfile
import time
import uuid
import dill

from abc import ABCMeta, abstractmethod
from argparse import ArgumentParser
Expand Down Expand Up @@ -209,7 +210,10 @@ def addChildFn(self, fn, *args, **kwargs):
:return: The new child job that wraps fn.
:rtype: toil.job.FunctionWrappingJob
"""
return self.addChild(FunctionWrappingJob(fn, *args, **kwargs))
if PromisedRequirement.convertPromises(kwargs):
return self.addChild(PromisedRequirementFunctionWrappingJob(fn, *args, **kwargs))
else:
return self.addChild(FunctionWrappingJob(fn, *args, **kwargs))

def addFollowOnFn(self, fn, *args, **kwargs):
"""
Expand All @@ -221,7 +225,10 @@ def addFollowOnFn(self, fn, *args, **kwargs):
:return: The new follow-on job that wraps fn.
:rtype: toil.job.FunctionWrappingJob
"""
return self.addFollowOn(FunctionWrappingJob(fn, *args, **kwargs))
if PromisedRequirement.convertPromises(kwargs):
return self.addFollowOn(PromisedRequirementFunctionWrappingJob(fn, *args, **kwargs))
else:
return self.addFollowOn(FunctionWrappingJob(fn, *args, **kwargs))

def addChildJobFn(self, fn, *args, **kwargs):
"""
Expand All @@ -234,7 +241,10 @@ def addChildJobFn(self, fn, *args, **kwargs):
:return: The new child job that wraps fn.
:rtype: toil.job.JobFunctionWrappingJob
"""
return self.addChild(JobFunctionWrappingJob(fn, *args, **kwargs))
if PromisedRequirement.convertPromises(kwargs):
return self.addChild(PromisedRequirementJobFunctionWrappingJob(fn, *args, **kwargs))
else:
return self.addChild(JobFunctionWrappingJob(fn, *args, **kwargs))

def addFollowOnJobFn(self, fn, *args, **kwargs):
"""
Expand All @@ -247,7 +257,10 @@ def addFollowOnJobFn(self, fn, *args, **kwargs):
:return: The new follow-on job that wraps fn.
:rtype: toil.job.JobFunctionWrappingJob
"""
return self.addFollowOn(JobFunctionWrappingJob(fn, *args, **kwargs))
if PromisedRequirement.convertPromises(kwargs):
return self.addFollowOn(PromisedRequirementJobFunctionWrappingJob(fn, *args, **kwargs))
else:
return self.addFollowOn(JobFunctionWrappingJob(fn, *args, **kwargs))

@staticmethod
def wrapFn(fn, *args, **kwargs):
Expand All @@ -261,7 +274,10 @@ def wrapFn(fn, *args, **kwargs):
:return: The new function that wraps fn.
:rtype: toil.job.FunctionWrappingJob
"""
return FunctionWrappingJob(fn, *args, **kwargs)
if PromisedRequirement.convertPromises(kwargs):
return PromisedRequirementFunctionWrappingJob(fn, *args, **kwargs)
else:
return FunctionWrappingJob(fn, *args, **kwargs)

@staticmethod
def wrapJobFn(fn, *args, **kwargs):
Expand All @@ -275,7 +291,10 @@ def wrapJobFn(fn, *args, **kwargs):
:return: The new job function that wraps fn.
:rtype: toil.job.JobFunctionWrappingJob
"""
return JobFunctionWrappingJob(fn, *args, **kwargs)
if PromisedRequirement.convertPromises(kwargs):
return PromisedRequirementJobFunctionWrappingJob(fn, *args, **kwargs)
else:
return JobFunctionWrappingJob(fn, *args, **kwargs)

def encapsulate(self):
"""
Expand Down Expand Up @@ -2225,12 +2244,12 @@ def _serialiseJob(self, jobStore, jobsToJobWrappers, rootJobWrapper):
# method.
with jobStore.writeFileStream(rootJobWrapper.jobStoreID) as (fileHandle, fileStoreID):
cPickle.dump(self, fileHandle, cPickle.HIGHEST_PROTOCOL)
# Note that getUserScript() may have beeen overridden. This is intended. If we used
# Note that getUserScript() may have been overridden. This is intended. If we used
# self.userModule directly, we'd be getting a reference to job.py if the job was
# specified as a function (as opposed to a class) since that is where FunctionWrappingJob
# is defined. What we really want is the module that was loaded as __main__,
# and FunctionWrappingJob overrides getUserScript() to give us just that. Only then can
# filter_main() in _unpickle( ) do its job of resolveing any user-defined type or function.
# filter_main() in _unpickle( ) do its job of resolving any user-defined type or function.
userScript = self.getUserScript().globalize()
jobsToJobWrappers[self].command = ' '.join( ('_toil', fileStoreID) + userScript)
#Update the status of the jobWrapper on disk
Expand Down Expand Up @@ -2428,13 +2447,15 @@ def _jobName(self):
"""
return self.__class__.__name__


class JobException(Exception):
    """
    General job exception; base for errors raised by workflow code.
    """
    def __init__(self, message):
        super(JobException, self).__init__(message)


class JobGraphDeadlockException( JobException ):
"""
An exception raised in the event that a workflow contains an unresolvable \
Expand Down Expand Up @@ -2524,6 +2545,7 @@ def getUserScript(self):
def _jobName(self):
return ".".join((self.__class__.__name__,self.userFunctionModule.name,self.userFunctionName))


class JobFunctionWrappingJob(FunctionWrappingJob):
"""
A job function is a function whose first argument is a :class:`job.Job` \
Expand All @@ -2541,6 +2563,54 @@ def run(self, fileStore):
rValue = userFunction(*((self,) + tuple(self._args)), **self._kwargs)
return rValue


class PromisedRequirementFunctionWrappingJob(FunctionWrappingJob):
    """
    Handles dynamic resource allocation using :class:`toil.job.Promise` instances.
    Spawns child function using parent function parameters and fulfilled promised
    resource requirements. See :class:`toil.job.FunctionWrappingJob` class
    """
    def __init__(self, userFunction, *args, **kwargs):
        # Stash any promised requirements aside and schedule this
        # intermediate job itself with tiny placeholder resources; the real
        # (promised) values are applied to the child spawned in run().
        self.promisedRequirements = {}
        placeholders = {'disk': '1M', 'memory': '32M', 'cores': 0.1}
        for resource, placeholder in placeholders.items():
            supplied = kwargs.get(resource)
            if isinstance(supplied, PromisedRequirement):
                self.promisedRequirements[resource] = supplied
                kwargs[resource] = placeholder
        super(PromisedRequirementFunctionWrappingJob, self).__init__(userFunction, *args, **kwargs)

    def run(self, fileStore):
        # By the time this job runs, its parent has finished, so the
        # promises backing the requirements are assumed fulfilled.
        self.evaluatePromisedRequirements()
        userFunction = self._getUserFunction()
        return self.addChildFn(userFunction, *self._args, **self._kwargs).rv()

    def evaluatePromisedRequirements(self):
        # Fill each resource kwarg from its promise; fall back to the
        # placeholder value this job itself was scheduled with.
        for resource in ("disk", "memory", "cores"):
            promised = self.promisedRequirements.get(resource)
            if promised is not None:
                self._kwargs[resource] = promised.getValue()
            else:
                self._kwargs[resource] = getattr(self, resource)


class PromisedRequirementJobFunctionWrappingJob(PromisedRequirementFunctionWrappingJob):
    """
    Handles dynamic resource allocation for job functions.
    See :class:`toil.job.JobFunctionWrappingJob`
    """

    def run(self, fileStore):
        # Resolve the promised resource values, then delegate the real work
        # to a child job function carrying those requirements.
        self.evaluatePromisedRequirements()
        return self.addChildJobFn(self._getUserFunction(), *self._args, **self._kwargs).rv()


class EncapsulatedJob(Job):
"""
A convenience Job class used to make a job subgraph appear to be a single job.
Expand Down Expand Up @@ -2744,3 +2814,63 @@ def _resolve(cls, jobStoreString, jobStoreFileID):
# corrupted
value = cPickle.load(fileHandle)
return value


class PromisedRequirement(object):
    """
    Class for dynamically allocating job function resource requirements
    involving :class:`toil.job.Promise` instances.

    Use when resource requirements depend on the return value of a parent
    function. PromisedRequirements can be modified by passing a function
    that takes the :class:`Promise` as input.

    For example, let f, g, and h be functions. Then a Toil workflow can be
    defined as follows::

        A = Job.wrapFn(f)
        B = A.addChildFn(g, cores=PromisedRequirement(A.rv()))
        C = B.addChildFn(h, cores=PromisedRequirement(lambda x: 2 * x, B.rv()))
    """
    def __init__(self, valueOrCallable, *args):
        """
        :param valueOrCallable: A single Promise instance or a function that
               takes \*args as input parameters.
        :param int|Promise *args: variable length argument list

        Calling with a non-callable first argument and additional positional
        arguments (e.g. ``PromisedRequirement(1, 2)``) is rejected.
        """
        if hasattr(valueOrCallable, '__call__'):
            assert len(args) != 0, 'Need parameters for PromisedRequirement function.'
            func = valueOrCallable
        else:
            assert len(args) == 0, 'Define a PromisedRequirement function to handle multiple arguments.'
            func = lambda x: x
            args = [valueOrCallable]

        # Serialize the function with dill so that lambdas and closures
        # survive the trip through the job store (cPickle cannot handle them).
        self._func = dill.dumps(func)
        self._args = list(args)

    def getValue(self):
        """
        Returns the PromisedRequirement value by applying the stored function
        to its arguments. Assumes any Promise arguments have been fulfilled.
        """
        func = dill.loads(self._func)
        return func(*self._args)

    @staticmethod
    def convertPromises(kwargs):
        """
        Returns True if a reserved resource keyword ('disk', 'memory' or
        'cores') is a Promise or PromisedRequirement instance. Mutates kwargs
        in place, wrapping each bare Promise in a PromisedRequirement.

        :param kwargs: function keyword arguments
        :return: bool
        """
        foundPromisedRequirement = False
        for resource in ("disk", "memory", "cores"):
            value = kwargs.get(resource)
            if isinstance(value, Promise):
                kwargs[resource] = PromisedRequirement(value)
                foundPromisedRequirement = True
            elif isinstance(value, PromisedRequirement):
                foundPromisedRequirement = True
        return foundPromisedRequirement

2 changes: 1 addition & 1 deletion src/toil/leader.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ def issueJob(self, jobStoreID, memory, cores, disk, preemptable):
jobBatchSystemID = self.batchSystem.issueBatchJob(jobCommand, memory, cores, disk, preemptable)
self.jobBatchSystemIDToIssuedJob[jobBatchSystemID] = IssuedJob(jobStoreID, memory, cores, disk, preemptable)
logger.debug("Issued job with job store ID: %s and job batch system ID: "
"%s and cores: %i, disk: %i, and memory: %i",
"%s and cores: %.2f, disk: %.2f, and memory: %.2f",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commit separately together with similar change above.

jobStoreID, str(jobBatchSystemID), cores, disk, memory)

def issueJobs(self, jobs):
Expand Down
51 changes: 20 additions & 31 deletions src/toil/test/batchSystems/batchSystemTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,9 @@ class AbstractBatchSystemJobTest(ToilTest):

__metaclass__ = ABCMeta

cpu_count = multiprocessing.cpu_count()
allocated_cores = sorted({1, 2, cpu_count})
sleep_time = 5
cpuCount = multiprocessing.cpu_count()
allocatedCores = sorted({1, 2, cpuCount})
sleepTime = 5

@abstractmethod
def getBatchSystemName(self):
Expand All @@ -226,27 +226,26 @@ def testJobConcurrency(self):
"""
Tests that the batch system is allocating core resources properly for concurrent tasks.
"""
for cores_per_job in self.allocated_cores:
temp_dir = self._createTempDir('testFiles')
for coresPerJob in self.allocatedCores:
tempDir = self._createTempDir('testFiles')

options = Job.Runner.getDefaultOptions(self._getTestJobStorePath())
options.workDir = temp_dir
options.maxCores = self.cpu_count
options.workDir = tempDir
options.maxCores = self.cpuCount
options.batchSystem = self.batchSystemName

counter_path = os.path.join(temp_dir, 'counter')
resetCounters(counter_path)
value, max_value = getCounters(counter_path)
assert (value, max_value) == (0, 0)
counterPath = os.path.join(tempDir, 'counter')
resetCounters(counterPath)
value, maxValue = getCounters(counterPath)
assert (value, maxValue) == (0, 0)

root = Job()
for _ in range(self.cpu_count):
root.addFollowOn(Job.wrapFn(measureConcurrency, counter_path, self.sleep_time,
cores=cores_per_job, memory='1M', disk='1Mi'))
for _ in range(self.cpuCount):
root.addFollowOn(Job.wrapFn(measureConcurrency, counterPath, self.sleepTime,
cores=coresPerJob, memory='1M', disk='1Mi'))
Job.Runner.startToil(root, options)

_, max_value = getCounters(counter_path)
self.assertEqual(max_value, self.cpu_count / cores_per_job)
_, maxValue = getCounters(counterPath)
self.assertEqual(maxValue, self.cpuCount / coresPerJob)


@needs_mesos
Expand Down Expand Up @@ -394,26 +393,16 @@ def test(self):
jobIds.remove(jobId)
finally:
bs.shutdown()
concurrentTasks, maxConcurrentTasks = self.getCounters()
concurrentTasks, maxConcurrentTasks = getCounters(self.counterPath)
self.assertEquals(concurrentTasks, 0)
log.info('maxCores: {maxCores}, '
'coresPerJob: {coresPerJob}, '
'load: {load}'.format(**locals()))
# This is the key assertion:
expectedMaxConcurrentTasks = min(maxCores / coresPerJob, jobs)
self.assertEquals(maxConcurrentTasks, expectedMaxConcurrentTasks)
self.resetCounters()

def getCounters(self):
    # Read the "current,maximum" concurrency counts back out of the
    # counter file and return them as a pair of ints.
    with open(self.counterPath, 'r+') as f:
        contents = f.read()
        log.info('Counter is %s', contents)
        concurrentTasks, maxConcurrentTasks = map(int, contents.split(','))
        return concurrentTasks, maxConcurrentTasks
resetCounters(self.counterPath)

def resetCounters(self):
    # Zero out both the current and the maximum concurrency counters.
    with open(self.counterPath, 'w') as handle:
        handle.write('0,0')

@skipIf(SingleMachineBatchSystem.numCores < 3, 'Need at least three cores to run this test')
def testServices(self):
Expand All @@ -425,7 +414,7 @@ def testServices(self):
with open(self.counterPath, 'r+') as f:
s = f.read()
log.info('Counter is %s', s)
self.assertEqual(self.getCounters(), (0, 3))
self.assertEqual(getCounters(self.counterPath), (0, 3))


# Toil can use only top-level functions so we have to add them here:
Expand Down Expand Up @@ -607,7 +596,7 @@ class MesosBatchSystemJobTest(hidden.AbstractBatchSystemJobTest, MesosTestSuppor
"""

def getBatchSystemName(self):
self._startMesos(self.cpu_count)
self._startMesos(self.cpuCount)
return "mesos"

def tearDown(self):
Expand Down
Loading