Changed default resource requirements
jpfeil committed May 9, 2016
1 parent b513fb6 commit f0ac356
Showing 3 changed files with 53 additions and 45 deletions.
2 changes: 1 addition & 1 deletion src/toil/job.py
@@ -1545,7 +1545,7 @@ def __init__(self, userFunction, *args, **kwargs):

# Reset any PromisedRequirement instances to default values until the promise can be fulfilled.
# See toil.job.PromisedRequirement
- requirements = {'disk': '1Mi', 'memory': '32M', 'cores': 0.1}
+ requirements = {'disk': '1M', 'memory': '32M', 'cores': 0.1}
for requirement, default in requirements.items():
if requirement in kwargs and isinstance(kwargs[requirement], PromisedRequirement):
self.promised_requirement[requirement] = kwargs[requirement]
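For orientation, the hunk above is the placeholder logic this commit retunes: a job built with a PromisedRequirement for disk, memory, or cores cannot know the real value yet, so it temporarily takes the defaults shown ('1M' disk, '32M' memory, 0.1 cores) until the promise is fulfilled. A minimal sketch of the user-facing pattern; the job functions and job-store path here are hypothetical, only the Job/PromisedRequirement API comes from the diff:

```python
from toil.job import Job, PromisedRequirement

def sampleCount(job):
    # Runs first; its return value exists only at run time.
    return 4

def process(job, n):
    return n * n

def root(job):
    counter = job.addChildJobFn(sampleCount)
    # cores is promised from sampleCount's result, so this follow-on is
    # created with the placeholder defaults above and re-evaluated once
    # the promise resolves.
    job.addFollowOnJobFn(process, counter.rv(),
                         cores=PromisedRequirement(lambda n: n, counter.rv()),
                         memory='32M', disk='1M')

if __name__ == '__main__':
    options = Job.Runner.getDefaultOptions('./jobStore')  # hypothetical store
    Job.Runner.startToil(Job.wrapJobFn(root), options)
```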
90 changes: 46 additions & 44 deletions src/toil/test/src/promisedRequirementTest.py
@@ -12,15 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.

- from __future__ import absolute_import
+ from __future__ import absolute_import, print_function
from abc import ABCMeta, abstractmethod
+ import sys
import os
import fcntl
import time
import logging
import multiprocessing
- from toil.common import Config
from toil.batchSystems.singleMachine import SingleMachineBatchSystem
from toil.batchSystems.mesos.test import MesosTestSupport
from toil.job import Job
from toil.job import PromisedRequirement
@@ -46,6 +46,7 @@ class AbstractPromisedRequirementsTest(ToilTest):
__metaclass__ = ABCMeta

cpu_count = multiprocessing.cpu_count()
+ cpu_count = 2

allocated_cores = sorted({1, 2, cpu_count})
+ allocated_cores = [1]
@@ -85,45 +86,44 @@ def testPromisedRequirementDynamic(self):
min_value, max_value = getCounters(counter_path)
assert (min_value, max_value) == (0, 0)

root = Job.wrapJobFn(max_concurrency, self.cpu_count, counter_path, cores_per_job,
cores=0.1, memory='32M', disk='1M')
value = Job.Runner.startToil(root, options)
self.assertEqual(value, self.cpu_count / cores_per_job)

def testPromisedRequirementStatic(self):
# for cores_per_job in self.allocated_cores:
cores_per_job = 1
temp_dir = self._createTempDir('testFiles')

options = Job.Runner.getDefaultOptions(self._getTestJobStorePath())
options.logLevel = "DEBUG"
options.batchSystem = self.batchSystemName
options.workDir = temp_dir
options.maxCores = self.cpu_count

counter_path = os.path.join(temp_dir, 'counter')
resetCounters(counter_path)
min_value, max_value = getCounters(counter_path)
assert (min_value, max_value) == (0, 0)

root = Job()
one1 = Job.wrapFn(one, cores=0.1, memory='32M', disk='1M')
one2 = Job.wrapFn(one, cores=0.1, memory='32M', disk='1M')
mb = Job.wrapFn(thirtyTwoMB, cores=0.1, memory='32M', disk='1M')
root.addChild(one1)
root.addChild(one2)
root.addChild(mb)
for _ in range(self.cpu_count):
root.addFollowOn(Job.wrapFn(measure_concurrency, counter_path,
cores=PromisedRequirement(lambda x: x * cores_per_job, one1.rv()),
memory=PromisedRequirement(mb.rv()),
disk=PromisedRequirement(lambda x, y: x + y + 1022, one1.rv(), one2.rv())))
root = Job.wrapJobFn(maxConcurrency, self.cpu_count, counter_path, cores_per_job).encapsulate()
root.addFollowOnFn(getMax, root.rv(), self.cpu_count, cores_per_job, cores=cores_per_job, memory='32M', disk='1Mi')
Job.Runner.startToil(root, options)
min_value, max_value = getCounters(counter_path)
self.assertEqual(max_value, self.cpu_count / cores_per_job)

# def testPromisedRequirementStatic(self):
# for cores_per_job in self.allocated_cores:
# cores_per_job = 1
# temp_dir = self._createTempDir('testFiles')
#
# options = Job.Runner.getDefaultOptions(self._getTestJobStorePath())
# options.logLevel = "DEBUG"
# options.batchSystem = self.batchSystemName
# options.workDir = temp_dir
# options.maxCores = self.cpu_count
#
# counter_path = os.path.join(temp_dir, 'counter')
# resetCounters(counter_path)
# min_value, max_value = getCounters(counter_path)
# assert (min_value, max_value) == (0, 0)
#
# root = Job()
# one1 = Job.wrapFn(one, cores=0.1, memory='32M', disk='1M')
# one2 = Job.wrapFn(one, cores=0.1, memory='32M', disk='1M')
# mb = Job.wrapFn(thirtyTwoMB, cores=0.1, memory='32M', disk='1M')
# root.addChild(one1)
# root.addChild(one2)
# root.addChild(mb)
# for _ in range(self.cpu_count):
# root.addFollowOn(Job.wrapFn(measure_concurrency, counter_path,
# cores=PromisedRequirement(lambda x: x * cores_per_job, one1.rv()),
# memory=PromisedRequirement(mb.rv()),
# disk=PromisedRequirement(lambda x, y: x + y + 1022, one1.rv(), one2.rv())))
# Job.Runner.startToil(root, options)
# min_value, max_value = getCounters(counter_path)
# self.assertEqual(max_value, self.cpu_count / cores_per_job)
#

- def max_concurrency(job, cpu_count, filename, cores_per_job):
+ def maxConcurrency(job, cpu_count, filename, cores_per_job):
"""
Returns the max number of concurrent tasks when using a PromisedRequirement instance
to allocate the number of cores per job.
@@ -134,27 +134,29 @@ def max_concurrency(job, cpu_count, filename, cores_per_job):
:return: int, the max concurrency value
"""
one1 = job.addChildFn(one, cores=0.1, memory='32M', disk='1M')
one2 = job.addChildFn(one, cores=0.1, memory='32M', disk='1M')
mb = job.addChildFn(thirtyTwoMB, cores=0.1, memory='32M', disk='1M')

values = []
for _ in range(cpu_count):
- value = job.addFollowOnFn(measure_concurrency, filename,
+ value = job.addFollowOnFn(measureConcurrency, filename,
cores=PromisedRequirement(lambda x: x * cores_per_job, one1.rv()),
memory=PromisedRequirement(mb.rv()),
- disk=PromisedRequirement(lambda x, y: x + y + 1022, one1.rv(), one2.rv())).rv()
+ disk=1).rv()
values.append(value)
- return max(values)

+ return values

def one():
return 1

def thirtyTwoMB():
return '32M'

+ def getMax(values, cpu_count, cores_per_job):
+ print('\nABCD\n {} {} {} \n\n'.format(values, cpu_count, cores_per_job), file=sys.stderr)
+ assert max(values) == cpu_count / cores_per_job

- # TODO camel case
- def measure_concurrency(filepath):
+ def measureConcurrency(filepath):
"""
Run in parallel to test the number of concurrent tasks.
This code was copied from toil.batchSystemTestMaxCoresSingleMachineBatchSystemTest
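measureConcurrency above is noted as copied from toil's MaxCoresSingleMachineBatchSystemTest, and its body is truncated in this view; it relies on a counter file guarded by fcntl locks (hence the fcntl import). A self-contained sketch of that pattern, assuming the counter file holds 'current,max'; the bodies are a reconstruction for illustration, not the commit's code:

```python
import fcntl
import time

def count(delta, path):
    # Atomically apply delta to the live-task count and record the peak.
    with open(path, 'r+') as f:
        fcntl.flock(f, fcntl.LOCK_EX)  # one writer at a time
        try:
            current, high = map(int, f.read().split(','))
            current += delta
            high = max(high, current)
            f.seek(0)
            f.truncate()
            f.write('%d,%d' % (current, high))
        finally:
            fcntl.flock(f, fcntl.LOCK_UN)
    return high

def measureConcurrency(path, sleep_time=1):
    count(1, path)           # enter: one more task running
    time.sleep(sleep_time)   # hold, so overlapping tasks can be observed
    return count(-1, path)   # leave: report the peak seen so far

def resetCounters(path):
    with open(path, 'w') as f:
        f.write('0,0')

def getCounters(path):
    with open(path) as f:
        current, high = map(int, f.read().split(','))
    return current, high
```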
6 changes: 6 additions & 0 deletions src/toil/worker.py
@@ -13,6 +13,12 @@
# limitations under the License.

from __future__ import absolute_import

+ # import sys as _sys
+ # _sys.path.append('/home/jacob/pycharm-2016.1/debug-eggs/pycharm-debug.egg')
+ # import pydevd
+ # pydevd.settrace('127.0.0.1', port=21212, suspend=True, stdoutToServer=True, stderrToServer=True, trace_only_current_thread=False)

import os
import sys
import copy
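The block added to worker.py is a disabled PyCharm remote-debugging hook (pydevd). If such a hook is kept around, gating it behind an environment variable keeps it inert by default; in this sketch the TOIL_PYDEVD variable is an assumption, and only the settrace call mirrors the commented lines:

```python
import os
import sys

if os.environ.get('TOIL_PYDEVD'):
    # Value of TOIL_PYDEVD is the path to PyCharm's pycharm-debug.egg.
    sys.path.append(os.environ['TOIL_PYDEVD'])
    import pydevd
    # Connect back to the IDE listening on 127.0.0.1:21212 and pause
    # this worker until the debugger attaches.
    pydevd.settrace('127.0.0.1', port=21212, suspend=True,
                    stdoutToServer=True, stderrToServer=True,
                    trace_only_current_thread=False)
```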
