Skip to content

Commit

Permalink
Add promised requirement test
Browse files Browse the repository at this point in the history
  • Loading branch information
jpfeil committed May 2, 2016
1 parent 9ea6e29 commit e9de509
Show file tree
Hide file tree
Showing 3 changed files with 388 additions and 13 deletions.
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
author_email='[email protected]',
url="https://github.com/BD2KGenomics/toil",
install_requires=[
'bd2k-python-lib==1.13.dev14'],
'bd2k-python-lib==1.13.dev14',
'dill==0.2.5'],
tests_require=[
'mock==1.0.1',
'pytest==2.8.3'],
Expand Down
165 changes: 153 additions & 12 deletions src/toil/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import stat
import inspect
from collections import defaultdict
import dill
from threading import Thread, Semaphore, Event
from Queue import Queue, Empty
from bd2k.util.expando import Expando
Expand Down Expand Up @@ -191,23 +192,33 @@ def addChildFn(self, fn, *args, **kwargs):
:param fn: Function to be run as a child job with ``*args`` and ``**kwargs`` as \
arguments to this function. See toil.job.FunctionWrappingJob for reserved \
keyword arguments used to specify resource requirements.
keyword arguments used to specify resource requirements. Can also accept a
PromisedRequirement instance for the reserved resource requirements.
:return: The new child job that wraps fn.
:rtype: toil.job.FunctionWrappingJob
"""
return self.addChild(FunctionWrappingJob(fn, *args, **kwargs))
convertPromiseToPromisedRequirement(kwargs)
if hasPromisedRequirement(kwargs):
return self.addChild(PromisedRequirementFunctionWrappingJob(fn, *args, **kwargs))
else:
return self.addChild(FunctionWrappingJob(fn, *args, **kwargs))

def addFollowOnFn(self, fn, *args, **kwargs):
"""
Adds a function as a follow-on job.
:param fn: Function to be run as a follow-on job with ``*args`` and ``**kwargs`` as \
arguments to this function. See toil.job.FunctionWrappingJob for reserved \
keyword arguments used to specify resource requirements.
keyword arguments used to specify resource requirements. Can also accept a
PromisedRequirement instance for the reserved resource requirements.
:return: The new follow-on job that wraps fn.
:rtype: toil.job.FunctionWrappingJob
"""
return self.addFollowOn(FunctionWrappingJob(fn, *args, **kwargs))
convertPromiseToPromisedRequirement(kwargs)
if hasPromisedRequirement(kwargs):
return self.addFollowOn(PromisedRequirementFunctionWrappingJob(fn, *args, **kwargs))
else:
return self.addFollowOn(FunctionWrappingJob(fn, *args, **kwargs))

def addChildJobFn(self, fn, *args, **kwargs):
"""
Expand All @@ -216,11 +227,16 @@ def addChildJobFn(self, fn, *args, **kwargs):
:param fn: Job function to be run as a child job with ``*args`` and ``**kwargs`` as \
arguments to this function. See toil.job.JobFunctionWrappingJob for reserved \
keyword arguments used to specify resource requirements.
keyword arguments used to specify resource requirements. Can also accept a
PromisedRequirement instance for the reserved resource requirements.
:return: The new child job that wraps fn.
:rtype: toil.job.JobFunctionWrappingJob
"""
return self.addChild(JobFunctionWrappingJob(fn, *args, **kwargs))
convertPromiseToPromisedRequirement(kwargs)
if hasPromisedRequirement(kwargs):
return self.addChild(PromisedRequirementJobFunctionWrappingJob(fn, *args, **kwargs))
else:
return self.addChild(JobFunctionWrappingJob(fn, *args, **kwargs))

def addFollowOnJobFn(self, fn, *args, **kwargs):
"""
Expand All @@ -229,11 +245,16 @@ def addFollowOnJobFn(self, fn, *args, **kwargs):
:param fn: Job function to be run as a follow-on job with ``*args`` and ``**kwargs`` as \
arguments to this function. See toil.job.JobFunctionWrappingJob for reserved \
keyword arguments used to specify resource requirements.
keyword arguments used to specify resource requirements. Can also accept a
PromisedRequirement instance for the reserved resource requirements.
:return: The new follow-on job that wraps fn.
:rtype: toil.job.JobFunctionWrappingJob
"""
return self.addFollowOn(JobFunctionWrappingJob(fn, *args, **kwargs))
convertPromiseToPromisedRequirement(kwargs)
if hasPromisedRequirement(kwargs):
return self.addFollowOn(PromisedRequirementJobFunctionWrappingJob(fn, *args, **kwargs))
else:
return self.addFollowOn(JobFunctionWrappingJob(fn, *args, **kwargs))

@staticmethod
def wrapFn(fn, *args, **kwargs):
Expand All @@ -243,11 +264,16 @@ def wrapFn(fn, *args, **kwargs):
:param fn: Function to be run with ``*args`` and ``**kwargs`` as arguments. \
See toil.job.JobFunctionWrappingJob for reserved keyword arguments used \
to specify resource requirements.
to specify resource requirements. Can also accept a PromisedRequirement
instance for the reserved resource requirements.
:return: The new function that wraps fn.
:rtype: toil.job.FunctionWrappingJob
"""
return FunctionWrappingJob(fn, *args, **kwargs)
convertPromiseToPromisedRequirement(kwargs)
if hasPromisedRequirement(kwargs):
return PromisedRequirementFunctionWrappingJob(fn, *args, **kwargs)
else:
return FunctionWrappingJob(fn, *args, **kwargs)

@staticmethod
def wrapJobFn(fn, *args, **kwargs):
Expand All @@ -257,11 +283,16 @@ def wrapJobFn(fn, *args, **kwargs):
:param fn: Job function to be run with ``*args`` and ``**kwargs`` as arguments. \
See toil.job.JobFunctionWrappingJob for reserved keyword arguments used \
to specify resource requirements.
to specify resource requirements. Can also accept a
PromisedRequirement instance for the reserved resource requirements.
:return: The new job function that wraps fn.
:rtype: toil.job.JobFunctionWrappingJob
"""
return JobFunctionWrappingJob(fn, *args, **kwargs)
convertPromiseToPromisedRequirement(kwargs)
if hasPromisedRequirement(kwargs):
return PromisedRequirementJobFunctionWrappingJob(fn, *args, **kwargs)
else:
return JobFunctionWrappingJob(fn, *args, **kwargs)

def encapsulate(self):
"""
Expand Down Expand Up @@ -1410,13 +1441,15 @@ def _jobName(self):
"""
return self.__class__.__name__


class JobException( Exception ):
"""
General job exception.
"""
def __init__( self, message ):
super( JobException, self ).__init__( message )


class JobGraphDeadlockException( JobException ):
"""
An exception raised in the event that a workflow contains an unresolvable \
Expand All @@ -1425,6 +1458,7 @@ class JobGraphDeadlockException( JobException ):
def __init__( self, string ):
super( JobGraphDeadlockException, self ).__init__( string )


class FunctionWrappingJob(Job):
"""
Job used to wrap a function. In its run method the wrapped function is called.
Expand Down Expand Up @@ -1473,6 +1507,7 @@ def getUserScript(self):
def _jobName(self):
return ".".join((self.__class__.__name__,self.userFunctionModule.name,self.userFunctionName))


class JobFunctionWrappingJob(FunctionWrappingJob):
"""
A job function is a function whose first argument is a :class:`job.Job` \
Expand All @@ -1490,6 +1525,56 @@ def run(self, fileStore):
rValue = userFunction(*((self,) + tuple(self._args)), **self._kwargs)
return rValue


class PromisedRequirementFunctionWrappingJob(FunctionWrappingJob):
"""
Adds child function using parent function parameters and fulfilled Promise values.
(see :class:`toil.job.FunctionWrappingJob` and :class:`toil.job.Job`)
"""

def __init__(self, userFunction, *args, **kwargs):

self.promised_requirement = {}

# Reset any PromisedRequirement instances to default values until the promise can be fulfilled.
# See toil.job.PromisedRequirement
requirements = {'disk': '32M', 'memory': '32M', 'cores': 0.1}
for requirement, default in requirements.items():
if requirement in kwargs and isinstance(kwargs[requirement], PromisedRequirement):
self.promised_requirement[requirement] = kwargs[requirement]
kwargs[requirement] = default

super(PromisedRequirementFunctionWrappingJob, self).__init__(userFunction, *args, **kwargs)

def run(self, fileStore):
# Assumes promises are fulfilled when parent job is run
self.replacePromisedRequirements()
userFunction = self._getUserFunction()
return self.addChildFn(userFunction, *self._args, **self._kwargs).rv()

def replacePromisedRequirements(self):
requirements = ["disk", "memory", "cores"]
for requirement in requirements:
try:
self._kwargs[requirement] = self.promised_requirement[requirement].get_value()
except KeyError:
self._kwargs[requirement] = getattr(self, requirement)


class PromisedRequirementJobFunctionWrappingJob(PromisedRequirementFunctionWrappingJob):
"""
Add child job function using parent function parameters and fulfilled Promise values.
(see :class:`toil.job.JobFunctionWrappingJob and :class:`toil.job.Job`)
"""

def run(self, fileStore):
self.replacePromisedRequirements()
userFunction = self._getUserFunction()
return self.addChildJobFn(userFunction, *self._args, **self._kwargs).rv()


class EncapsulatedJob(Job):
"""
A convenience Job class used to make a job subgraph appear to be a single job.
Expand Down Expand Up @@ -1694,3 +1779,59 @@ def _resolve(cls, jobStoreString, jobStoreFileID):
# corrupted
value = cPickle.load(fileHandle)
return value


class PromisedRequirement(object):
def __init__(self, *args):
"""
Class for manipulating Promise instances.
(see toil.job.Promise)
:param args[0]: function to act on instance args
default: lambda x: x
:param int|Promise args[1:]: list of args, ordered with respect to function parameters
"""
if hasattr(args[0], '__call__'):
func = args[0]
args = args[1:]
else:
func = lambda x: x

self._func = dill.dumps(func)
self._args = list(args)

def get_value(self):
"""
Returns PromisedRequirement value
"""
func = dill.loads(self._func)
return func(*self._args)


def convertPromiseToPromisedRequirement(kwargs):
"""
Converts Promise to PromisedRequirement instance
for reserved resource arguments.
:param dict kwargs: function keyword arguments
:return: kwargs object with updated values
"""
requirements = ["disk", "memory", "cores"]
# If a PromisedJobReturnValue was passed as a job requirement,
# convert it into a PromisedRequirement object
for r in requirements:
if isinstance(kwargs.get(r), Promise):
kwargs[r] = PromisedRequirement(kwargs[r])


def hasPromisedRequirement(kwargs):
"""
Returns true if a reserved resource keyword is a
PromisedRequirement instance.
:param kwargs: function keyword arguments
:return: bool
"""
requirements = ["disk", "memory", "cores"]
return any(isinstance(kwargs.get(r), PromisedRequirement) for r in requirements)

Loading

0 comments on commit e9de509

Please sign in to comment.