Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions test/test_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ def setUp(self):
self.config = Config(CONFIG)
self.cluster = Cluster(env=self.env, config=self.config)
self.planner = Planner(
self.env, self.cluster, SHADOWPlanning('heft')
self.env, self.cluster, SHADOWPlanning('heft'),
use_task_data = False, use_edge_data = True
)

def testHotBufferConfig(self):
Expand Down Expand Up @@ -90,7 +91,8 @@ def setUp(self):
self.config = Config(CONFIG)
self.cluster = Cluster(env=self.env, config=self.config)
self.planner = Planner(
self.env, self.cluster, SHADOWPlanning('heft')
self.env, self.cluster, SHADOWPlanning('heft'),
use_task_data = False, use_edge_data = True
)
self.buffer = Buffer(self.env, self.cluster, self.planner, self.config)
self.observation = Observation(
Expand Down Expand Up @@ -212,7 +214,8 @@ def setUp(self):
self.config = Config(CONFIG)
self.cluster = Cluster(env=self.env, config=self.config)
self.planner = Planner(
self.env, self.cluster, SHADOWPlanning('heft')
self.env, self.cluster, SHADOWPlanning('heft'),
use_task_data = False, use_edge_data = True
)
self.buffer = Buffer(
env=self.env, cluster=self.cluster, planner=self.planner, config=self.config
Expand Down
9 changes: 7 additions & 2 deletions test/test_planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ def testPlannerBasicConfig(self):
planner = Planner(
env=self.env,
cluster=self.cluster,
model=self.model
model=self.model,
use_task_data=False, use_edge_data=True
)
self.cluster.provision_batch_resources(10, self.observation.name)
available_resources = planner.model._cluster_to_shadow_format(
Expand All @@ -87,6 +88,7 @@ def setUp(self):
self.cluster = Cluster(self.env, config=config)
self.planner = Planner(
self.env, self.cluster, self.model,
use_task_data=False, use_edge_data=True
)
self.buffer = Buffer(env=self.env, cluster=self.cluster, planner=self.planner, config=config)
self.observation = Observation(
Expand Down Expand Up @@ -151,7 +153,9 @@ def setUp(self):
self.model = SHADOWPlanning('heft', dm)
self.cluster = Cluster(self.env, config=config)
self.planner = Planner(self.env, self.cluster,
self.model, delay_model=dm)
self.model, delay_model=dm,
use_task_data=False, use_edge_data=True
)
self.buffer = Buffer(self.env, self.cluster, self.planner, config)
self.observation = Observation(
'planner_observation',
Expand Down Expand Up @@ -206,6 +210,7 @@ def setUp(self) -> None:
self.cluster = Cluster(self.env, config=config)
self.planner = Planner(
self.env, self.cluster, self.model,
use_task_data=False, use_edge_data=True
)
self.buffer = Buffer(env=self.env, cluster=self.cluster, planner=self.planner, config=config)
self.telescope = Telescope(
Expand Down
79 changes: 69 additions & 10 deletions test/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ def setUp(self) -> None:
config = Config(CONFIG)
self.cluster = Cluster(self.env, config)
self.planner = Planner(self.env,
self.cluster, SHADOWPlanning('heft'))
self.cluster, SHADOWPlanning('heft'),
use_task_data=False,use_edge_data=True)
self.buffer = Buffer(self.env, self.cluster, self.planner, config)
self.scheduler = Scheduler(
self.env, self.buffer, self.cluster, self.planner, DynamicSchedulingFromPlan
Expand Down Expand Up @@ -160,7 +161,7 @@ def setUp(self):
config = Config(HEFT_CONFIG)
self.cluster = Cluster(self.env, config)
self.planner = Planner(self.env,
self.cluster, SHADOWPlanning('heft'))
self.cluster, SHADOWPlanning('heft'), use_task_data=False, use_edge_data=True)
self.buffer = Buffer(self.env, self.cluster, self.planner, config)
self.scheduler = Scheduler(self.env, self.buffer,
self.cluster, self.planner, sched_algorithm)
Expand Down Expand Up @@ -217,10 +218,11 @@ def testAllocationTasksNoObservation(self):
self.env.run(1)
self.assertListEqual(
exec_ord,
curr_obs.plan.exec_order
[t.id for t in curr_obs.plan.tasks]
)
self.buffer.hot[0].observations['scheduled'].append(curr_obs)
self.env.run(until=99)
self.assertEqual(97, curr_obs.plan.finished_tasks[-1].aft)
self.assertEqual(10, len(self.cluster._tasks['finished']))
self.assertEqual(0, len(self.cluster._tasks['running']))
self.assertEqual(0, len(self.scheduler.observation_queue))
Expand All @@ -234,7 +236,7 @@ def setUp(self):
config = Config(HEFT_CONFIG_IO)
self.cluster = Cluster(self.env, config)
self.planner = Planner(self.env,
self.cluster, SHADOWPlanning('heft'))
self.cluster, SHADOWPlanning('heft'), use_task_data=True, use_edge_data=True)
self.buffer = Buffer(self.env, self.cluster, self.planner, config)
self.scheduler = Scheduler(self.env, self.buffer,
self.cluster, self.planner, sched_algorithm)
Expand Down Expand Up @@ -268,14 +270,66 @@ def testAllocationWithIODuration(self):
self.env.run(1)
self.assertListEqual(
exec_ord,
curr_obs.plan.exec_order
[t.id for t in curr_obs.plan.tasks]
)
self.buffer.hot[0].observations['scheduled'].append(curr_obs)
self.env.run(until=99)
self.assertEqual(97, curr_obs.plan.finished_tasks[-1].aft)
# TODO need to check the aft of the final task is 98
# Otherwise, we aren't actually checking the tasks are finishing when we expect
self.assertEqual(10, len(self.cluster._tasks['finished']))
self.assertEqual(0, len(self.cluster._tasks['running']))
self.assertEqual(0, len(self.scheduler.observation_queue))

class TestSchedulerDynamicPlanWithIOWithoutIO(unittest.TestCase):
"""
This is a bit of overkill + a hack: here we are testing the `use_task_data`
flag by ensuring that we don't use the task data, which will give us a much lower
runtime value.

This is taking advantage of the HEFT_CONFIG_IO file not having any task data, so
we can confirm our 'switch' works.

"""

def setUp(self):
self.env = simpy.Environment()
sched_algorithm = DynamicSchedulingFromPlan()
config = Config(HEFT_CONFIG_IO)
self.cluster = Cluster(self.env, config)
self.planner = Planner(self.env,
self.cluster, SHADOWPlanning('heft'), use_task_data=False, use_edge_data=True)
self.buffer = Buffer(self.env, self.cluster, self.planner, config)
self.scheduler = Scheduler(self.env, self.buffer,
self.cluster, self.planner, sched_algorithm)
self.telescope = Telescope(
self.env, config, self.planner, self.scheduler
)

def tearDown(self) -> None:
pass

def testAllocationWithIODuration(self):
"""
Use
Returns
-------

"""
curr_obs = self.telescope.observations[0]
gen = self.scheduler.allocate_tasks(curr_obs)
self.scheduler.observation_queue.append(curr_obs)
curr_obs.ast = self.env.now
# curr_obs.plan = self.planner.run(
# curr_obs, self.buffer, self.telescope.max_ingest)
self.env.process(self.scheduler.allocate_tasks(curr_obs))
self.env.run(1)
self.buffer.hot[0].observations['scheduled'].append(curr_obs)
self.env.run(until=99)
self.assertEqual(51, curr_obs.plan.finished_tasks[-1].aft)
self.assertEqual(10, len(self.cluster._tasks['finished']))
self.assertEqual(0, len(self.cluster._tasks['running']))
self.assertEqual(0, len(self.scheduler.observation_queue))


class TestSchedulerEdgeCases(unittest.TestCase):
Expand All @@ -300,7 +354,8 @@ def setUp(self) -> None:
self.env, config, planner=None, scheduler=None
)
self.planner = Planner(self.env,
self.cluster, SHADOWPlanning('heft'))
self.cluster, SHADOWPlanning('heft'),
use_task_data=False, use_edge_data=True)
self.buffer = Buffer(self.env, self.cluster, self.planner, config)

self.scheduler = Scheduler(self.env, self.buffer,
Expand Down Expand Up @@ -338,7 +393,8 @@ def setUp(self):
self.cluster = Cluster(self.env, config)
planning_model = SHADOWPlanning(algorithm='heft')
self.planner = Planner(self.env,
self.cluster, planning_model)
self.cluster, planning_model,
use_task_data=False, use_edge_data=True)
self.buffer = Buffer(self.env, self.cluster, self.planner, config)
self.scheduler = Scheduler(self.env, self.buffer,
self.cluster, self.planner, sched_algorithm)
Expand Down Expand Up @@ -366,7 +422,8 @@ def setUp(self) -> None:
config = Config(LONG_CONFIG)
self.cluster = Cluster(self.env, config)
self.planner = Planner(self.env,
self.cluster, SHADOWPlanning('heft'))
self.cluster, SHADOWPlanning('heft'),
use_task_data=False, use_edge_data=True)
self.buffer = Buffer(self.env, self.cluster, self.planner, config)
self.scheduler = Scheduler(self.env, self.buffer,
self.cluster, self.planner, sched_algorithm)
Expand All @@ -392,7 +449,8 @@ def setUp(self):
config = Config(INTEGRATION)
self.cluster = Cluster(self.env, config)
self.planner = Planner(self.env,
self.cluster, SHADOWPlanning('heft'))
self.cluster, SHADOWPlanning('heft'),
use_task_data=False, use_edge_data=True)
self.buffer = Buffer(self.env, self.cluster, self.planner, config)

self.scheduler = Scheduler(
Expand Down Expand Up @@ -452,7 +510,8 @@ def setUp(self):
self.planner = Planner(
self.env,
self.cluster, SHADOWPlanning('heft'),
delay_model=DelayModel(0.3, "normal")
delay_model=DelayModel(0.3, "normal"),
use_task_data=False, use_edge_data=True
)
self.buffer = Buffer(self.env, self.cluster, self.planner, config)

Expand Down
1 change: 1 addition & 0 deletions test/test_scheduling.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ def setUp(self):
self.model = BatchPlanning('batch')
self.planner = Planner(
self.env, self.cluster, self.model,
use_task_data=False, use_edge_data=True
)
self.buffer = Buffer(self.env, self.cluster, self.planner, config)
self.scheduler = Scheduler(
Expand Down
27 changes: 22 additions & 5 deletions test/test_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,27 @@ def test_task_init(self):
eft=self.eft,
machine_id=self.allocated_machine_id,
predecessors=None,
flops=0, task_data=0, io=0,
flops=0, task_data=0, edge_data=0,
delay=self.dm)


class TestTaskRuntime(unittest.TestCase):
def setUp(self):
self.env = simpy.Environment()

# TODO Implement this when refactoring and improving
# Task predecessor and edge data information
def test_task_runtime(self):
t1 = Task(
tid="t4",
est=20,
eft=26,
machine_id="m1",
predecessors=["t2", "t3"],
flops=10, task_data=4, edge_data={"t2": 4, "t3": 12,},
use_task_data=False, use_edge_data=True)


class TestTaskDelay(unittest.TestCase):

def setUp(self):
Expand All @@ -67,7 +84,7 @@ def testTaskWithOutDelay(self):
eft=11,
machine_id=None,
predecessors=None,
flops=0, task_data=0, io=0,
flops=0, task_data=0, edge_data=0,
delay=dm)
# t.est = 0
# t.eft = 11
Expand All @@ -84,7 +101,7 @@ def testTaskWithDelay(self):
eft=11,
machine_id=None,
predecessors=None,
flops=0, task_data=0, io=0,
flops=0, task_data=0, edge_data=0,
delay=dm)

# t.est = 0
Expand All @@ -102,7 +119,7 @@ def testTaskDoWorkWithOutDelay(self):
eft=11,
machine_id=None,
predecessors=None,
flops=0, task_data=0, io=0,
flops=0, task_data=0, edge_data=0,
delay=dm)
# t.ast = 0
t.duration = t.eft - t.est
Expand All @@ -119,7 +136,7 @@ def testTaskDoWorkWithDelay(self):
eft=11,
machine_id = None,
predecessors=None,
flops=0, task_data=0, io=0,
flops=0, task_data=0, edge_data=0,
delay=dm)
self.env.process(t.do_work(self.env, None))
self.env.run()
Expand Down
8 changes: 6 additions & 2 deletions test/test_telescope.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ def setUp(self):
self.env = simpy.Environment()
self.config = Config(CONFIG)
cluster = Cluster(env=self.env, config=self.config)
planner = Planner(self.env, cluster, SHADOWPlanning('heft'))
planner = Planner(self.env, cluster, SHADOWPlanning('heft'),
use_task_data=False, use_edge_data=True
)
buffer = Buffer(env=self.env, cluster=cluster, planner=planner, config=self.config)
self.scheduler = Scheduler(
env=self.env, buffer=buffer, cluster=cluster, planner=planner, algorithm=None
Expand All @@ -58,7 +60,9 @@ def setUp(self) -> None:

self.cluster = Cluster(env=self.env, config=self.config)
self.planner = Planner(self.env, self.cluster,
SHADOWPlanning('heft'))
SHADOWPlanning('heft'),
use_task_data=False, use_edge_data=True
)
self.buffer = Buffer(env=self.env, cluster=self.cluster, planner=self.planner,
config=self.config)
self.scheduler = Scheduler(
Expand Down
8 changes: 2 additions & 6 deletions topsim/algorithms/planning.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,25 +46,22 @@ def __init__(self, algorithm: str, delay_model=None):
self.algorithm = algorithm
self.delay_model = delay_model



@abstractmethod
def to_string(self):
"""
Return the string name of the implemenation of this class
"""
pass

@abstractmethod
def generate_plan(self, clock, cluser, buffer, observation, max_ingest):
def generate_plan(self, clock, cluser, buffer, observation, max_ingest, task_data=False, edge_data=True):
"""
Build a WorkflowPlan object storing
Returns
-------
plan : core.topsim.planner.WorkflowPlan
WorkflowPlan object
"""
pass


@abstractmethod
def to_df(self):
Expand All @@ -77,7 +74,6 @@ def to_df(self):
df : pandas.DataFrame
"""

pass

def _calc_workflow_est(self, observation, buffer):
"""
Expand Down
1 change: 1 addition & 0 deletions topsim/algorithms/scheduling.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class Scheduling(ABC):
"""

# The below constants have been derived from the SDP parametric model.
# TODO these should be set up in the simulation configuration
LOW_REALTIME_RESOURCES = 164
MID_REALTIME_RESOURCES = 281

Expand Down
Loading