diff --git a/test/test_buffer.py b/test/test_buffer.py
index 0eca11b..e200208 100644
--- a/test/test_buffer.py
+++ b/test/test_buffer.py
@@ -46,7 +46,8 @@ def setUp(self):
         self.config = Config(CONFIG)
         self.cluster = Cluster(env=self.env, config=self.config)
         self.planner = Planner(
-            self.env, self.cluster, SHADOWPlanning('heft')
+            self.env, self.cluster, SHADOWPlanning('heft'),
+            use_task_data=False, use_edge_data=True
         )
 
     def testHotBufferConfig(self):
@@ -90,7 +91,8 @@ def setUp(self):
         self.config = Config(CONFIG)
         self.cluster = Cluster(env=self.env, config=self.config)
         self.planner = Planner(
-            self.env, self.cluster, SHADOWPlanning('heft')
+            self.env, self.cluster, SHADOWPlanning('heft'),
+            use_task_data=False, use_edge_data=True
         )
         self.buffer = Buffer(self.env, self.cluster, self.planner, self.config)
         self.observation = Observation(
@@ -212,7 +214,8 @@ def setUp(self):
         self.config = Config(CONFIG)
         self.cluster = Cluster(env=self.env, config=self.config)
         self.planner = Planner(
-            self.env, self.cluster, SHADOWPlanning('heft')
+            self.env, self.cluster, SHADOWPlanning('heft'),
+            use_task_data=False, use_edge_data=True
         )
         self.buffer = Buffer(
             env=self.env, cluster=self.cluster, planner=self.planner, config=self.config
diff --git a/test/test_planner.py b/test/test_planner.py
index c5b50bc..9640ab9 100644
--- a/test/test_planner.py
+++ b/test/test_planner.py
@@ -65,7 +65,8 @@ def testPlannerBasicConfig(self):
         planner = Planner(
             env=self.env,
             cluster=self.cluster,
-            model=self.model
+            model=self.model,
+            use_task_data=False, use_edge_data=True
         )
         self.cluster.provision_batch_resources(10, self.observation.name)
         available_resources = planner.model._cluster_to_shadow_format(
@@ -87,6 +88,7 @@ def setUp(self):
         self.cluster = Cluster(self.env, config=config)
         self.planner = Planner(
             self.env, self.cluster, self.model,
+            use_task_data=False, use_edge_data=True
         )
         self.buffer = Buffer(env=self.env, cluster=self.cluster, planner=self.planner, config=config)
         self.observation = Observation(
@@ -151,7 +153,9 @@ def setUp(self):
         self.model = SHADOWPlanning('heft', dm)
         self.cluster = Cluster(self.env, config=config)
         self.planner = Planner(self.env, self.cluster,
-                               self.model, delay_model=dm)
+                               self.model, delay_model=dm,
+                               use_task_data=False, use_edge_data=True
+                               )
         self.buffer = Buffer(self.env, self.cluster, self.planner, config)
         self.observation = Observation(
             'planner_observation',
@@ -206,6 +210,7 @@ def setUp(self) -> None:
         self.cluster = Cluster(self.env, config=config)
         self.planner = Planner(
             self.env, self.cluster, self.model,
+            use_task_data=False, use_edge_data=True
         )
         self.buffer = Buffer(env=self.env, cluster=self.cluster, planner=self.planner, config=config)
         self.telescope = Telescope(
diff --git a/test/test_scheduler.py b/test/test_scheduler.py
index b4b390e..ab6df3b 100644
--- a/test/test_scheduler.py
+++ b/test/test_scheduler.py
@@ -72,7 +72,8 @@ def setUp(self) -> None:
         config = Config(CONFIG)
         self.cluster = Cluster(self.env, config)
         self.planner = Planner(self.env,
-                               self.cluster, SHADOWPlanning('heft'))
+                               self.cluster, SHADOWPlanning('heft'),
+                               use_task_data=False, use_edge_data=True)
         self.buffer = Buffer(self.env, self.cluster, self.planner, config)
         self.scheduler = Scheduler(
             self.env, self.buffer, self.cluster, self.planner, DynamicSchedulingFromPlan
@@ -160,7 +161,7 @@ def setUp(self):
         config = Config(HEFT_CONFIG)
         self.cluster = Cluster(self.env, config)
         self.planner = Planner(self.env,
-                               self.cluster, SHADOWPlanning('heft'))
+                               self.cluster, SHADOWPlanning('heft'), use_task_data=False, use_edge_data=True)
         self.buffer = Buffer(self.env, self.cluster, self.planner, config)
         self.scheduler = Scheduler(self.env, self.buffer,
                                    self.cluster, self.planner, sched_algorithm)
@@ -217,10 +218,11 @@ def testAllocationTasksNoObservation(self):
         self.env.run(1)
         self.assertListEqual(
             exec_ord,
-            curr_obs.plan.exec_order
+            [t.id for t in curr_obs.plan.tasks]
         )
         self.buffer.hot[0].observations['scheduled'].append(curr_obs)
         self.env.run(until=99)
+        self.assertEqual(97, curr_obs.plan.finished_tasks[-1].aft)
         self.assertEqual(10, len(self.cluster._tasks['finished']))
         self.assertEqual(0, len(self.cluster._tasks['running']))
         self.assertEqual(0, len(self.scheduler.observation_queue))
@@ -234,7 +236,7 @@ def setUp(self):
         config = Config(HEFT_CONFIG_IO)
         self.cluster = Cluster(self.env, config)
         self.planner = Planner(self.env,
-                               self.cluster, SHADOWPlanning('heft'))
+                               self.cluster, SHADOWPlanning('heft'), use_task_data=True, use_edge_data=True)
         self.buffer = Buffer(self.env, self.cluster, self.planner, config)
         self.scheduler = Scheduler(self.env, self.buffer,
                                    self.cluster, self.planner, sched_algorithm)
@@ -268,14 +270,66 @@ def testAllocationWithIODuration(self):
         self.env.run(1)
         self.assertListEqual(
             exec_ord,
-            curr_obs.plan.exec_order
+            [t.id for t in curr_obs.plan.tasks]
        )
         self.buffer.hot[0].observations['scheduled'].append(curr_obs)
         self.env.run(until=99)
+        self.assertEqual(97, curr_obs.plan.finished_tasks[-1].aft)
+        # TODO need to check the aft of the final task is 98
+        # Otherwise, we aren't actually checking the tasks are finishing when we expect
         self.assertEqual(10, len(self.cluster._tasks['finished']))
         self.assertEqual(0, len(self.cluster._tasks['running']))
         self.assertEqual(0, len(self.scheduler.observation_queue))
 
+
+class TestSchedulerDynamicPlanWithIOWithoutIO(unittest.TestCase):
+    """
+    This is a bit of overkill + a hack: here we are testing the `use_task_data`
+    flag by ensuring that we don't use the task data, which will give us a much
+    lower runtime value.
+
+    This is taking advantage of the HEFT_CONFIG_IO file not having any task data,
+    so we can confirm our 'switch' works.
+
+    """
+
+    def setUp(self):
+        self.env = simpy.Environment()
+        sched_algorithm = DynamicSchedulingFromPlan()
+        config = Config(HEFT_CONFIG_IO)
+        self.cluster = Cluster(self.env, config)
+        self.planner = Planner(self.env,
+                               self.cluster, SHADOWPlanning('heft'), use_task_data=False, use_edge_data=True)
+        self.buffer = Buffer(self.env, self.cluster, self.planner, config)
+        self.scheduler = Scheduler(self.env, self.buffer,
+                                   self.cluster, self.planner, sched_algorithm)
+        self.telescope = Telescope(
+            self.env, config, self.planner, self.scheduler
+        )
+
+    def tearDown(self) -> None:
+        pass
+
+    def testAllocationWithIODuration(self):
+        """
+        Run the same workflow with `use_task_data=False`; ignoring the task
+        data volumes should give a much earlier finish time for the final task.
+        """
+        curr_obs = self.telescope.observations[0]
+        gen = self.scheduler.allocate_tasks(curr_obs)
+        self.scheduler.observation_queue.append(curr_obs)
+        curr_obs.ast = self.env.now
+        # curr_obs.plan = self.planner.run(
+        #     curr_obs, self.buffer, self.telescope.max_ingest)
+        self.env.process(self.scheduler.allocate_tasks(curr_obs))
+        self.env.run(1)
+        self.buffer.hot[0].observations['scheduled'].append(curr_obs)
+        self.env.run(until=99)
+        self.assertEqual(51, curr_obs.plan.finished_tasks[-1].aft)
+        self.assertEqual(10, len(self.cluster._tasks['finished']))
+        self.assertEqual(0, len(self.cluster._tasks['running']))
+        self.assertEqual(0, len(self.scheduler.observation_queue))
 
 
 class TestSchedulerEdgeCases(unittest.TestCase):
@@ -300,7 +354,8 @@ def setUp(self) -> None:
             self.env, config, planner=None, scheduler=None
         )
         self.planner = Planner(self.env,
-                               self.cluster, SHADOWPlanning('heft'))
+                               self.cluster, SHADOWPlanning('heft'),
+                               use_task_data=False, use_edge_data=True)
         self.buffer = Buffer(self.env, self.cluster, self.planner, config)
         self.scheduler = Scheduler(self.env,
                                    self.buffer,
@@ -338,7 +393,8 @@ def setUp(self):
         self.cluster = Cluster(self.env, config)
         planning_model = SHADOWPlanning(algorithm='heft')
         self.planner = Planner(self.env,
-                               self.cluster, planning_model)
+                               self.cluster, planning_model,
+                               use_task_data=False, use_edge_data=True)
         self.buffer = Buffer(self.env, self.cluster, self.planner, config)
         self.scheduler = Scheduler(self.env, self.buffer,
                                    self.cluster, self.planner, sched_algorithm)
@@ -366,7 +422,8 @@ def setUp(self) -> None:
         config = Config(LONG_CONFIG)
         self.cluster = Cluster(self.env, config)
         self.planner = Planner(self.env,
-                               self.cluster, SHADOWPlanning('heft'))
+                               self.cluster, SHADOWPlanning('heft'),
+                               use_task_data=False, use_edge_data=True)
         self.buffer = Buffer(self.env, self.cluster, self.planner, config)
         self.scheduler = Scheduler(self.env, self.buffer,
                                    self.cluster, self.planner, sched_algorithm)
@@ -392,7 +449,8 @@ def setUp(self):
         config = Config(INTEGRATION)
         self.cluster = Cluster(self.env, config)
         self.planner = Planner(self.env,
-                               self.cluster, SHADOWPlanning('heft'))
+                               self.cluster, SHADOWPlanning('heft'),
+                               use_task_data=False, use_edge_data=True)
         self.buffer = Buffer(self.env, self.cluster, self.planner, config)
 
         self.scheduler = Scheduler(
@@ -452,7 +510,8 @@ def setUp(self):
         self.planner = Planner(
             self.env, self.cluster, SHADOWPlanning('heft'),
-            delay_model=DelayModel(0.3, "normal")
+            delay_model=DelayModel(0.3, "normal"),
+            use_task_data=False, use_edge_data=True
         )
 
         self.buffer = Buffer(self.env, self.cluster, self.planner, config)
diff --git a/test/test_scheduling.py b/test/test_scheduling.py
index c155efc..8267fa1 100644
--- a/test/test_scheduling.py
+++ b/test/test_scheduling.py
@@ -61,6 +61,7 @@ def setUp(self):
         self.model = BatchPlanning('batch')
         self.planner = Planner(
             self.env,
             self.cluster, self.model,
+            use_task_data=False, use_edge_data=True
         )
         self.buffer = Buffer(self.env, self.cluster, self.planner, config)
         self.scheduler = Scheduler(
diff --git a/test/test_task.py b/test/test_task.py
index 33137c4..5048d84 100644
--- a/test/test_task.py
+++ b/test/test_task.py
@@ -39,10 +39,27 @@ def test_task_init(self):
             eft=self.eft,
             machine_id=self.allocated_machine_id,
             predecessors=None,
-            flops=0, task_data=0, io=0,
+            flops=0, task_data=0, edge_data=0,
             delay=self.dm)
 
 
+class TestTaskRuntime(unittest.TestCase):
+    def setUp(self):
+        self.env = simpy.Environment()
+
+    # TODO Implement this when refactoring and improving
+    # Task predecessor and edge data information
+    def test_task_runtime(self):
+        t1 = Task(
+            tid="t4",
+            est=20,
+            eft=26,
+            machine_id="m1",
+            predecessors=["t2", "t3"],
+            flops=10, task_data=4, edge_data={"t2": 4, "t3": 12},
+            use_task_data=False, use_edge_data=True)
+
+
 class TestTaskDelay(unittest.TestCase):
 
     def setUp(self):
@@ -67,7 +84,7 @@ def testTaskWithOutDelay(self):
             eft=11,
             machine_id=None,
             predecessors=None,
-            flops=0, task_data=0, io=0,
+            flops=0, task_data=0, edge_data=0,
             delay=dm)
         # t.est = 0
         # t.eft = 11
@@ -84,7 +101,7 @@ def testTaskWithDelay(self):
             eft=11,
             machine_id=None,
             predecessors=None,
-            flops=0, task_data=0, io=0,
+            flops=0, task_data=0, edge_data=0,
             delay=dm)
 
         # t.est = 0
@@ -102,7 +119,7 @@ def testTaskDoWorkWithOutDelay(self):
             eft=11,
             machine_id=None,
             predecessors=None,
-            flops=0, task_data=0, io=0,
+            flops=0, task_data=0, edge_data=0,
             delay=dm)
         # t.ast = 0
         t.duration = t.eft - t.est
@@ -119,7 +136,7 @@ def testTaskDoWorkWithDelay(self):
             eft=11,
             machine_id = None,
             predecessors=None,
-            flops=0, task_data=0, io=0,
+            flops=0, task_data=0, edge_data=0,
             delay=dm)
         self.env.process(t.do_work(self.env, None))
         self.env.run()
diff --git a/test/test_telescope.py b/test/test_telescope.py
index 208fb6c..1294fb8 100644
--- a/test/test_telescope.py
+++ b/test/test_telescope.py
@@ -33,7 +33,9 @@ def setUp(self):
         self.env = simpy.Environment()
         self.config = Config(CONFIG)
         cluster = Cluster(env=self.env, config=self.config)
-        planner = Planner(self.env, cluster, SHADOWPlanning('heft'))
+        planner = Planner(self.env, cluster, SHADOWPlanning('heft'),
+                          use_task_data=False, use_edge_data=True
+                          )
         buffer = Buffer(env=self.env, cluster=cluster, planner=planner, config=self.config)
         self.scheduler = Scheduler(
             env=self.env, buffer=buffer, cluster=cluster, planner=planner, algorithm=None
@@ -58,7 +60,9 @@ def setUp(self) -> None:
         self.cluster = Cluster(env=self.env, config=self.config)
         self.planner = Planner(self.env,
                                self.cluster,
-                               SHADOWPlanning('heft'))
+                               SHADOWPlanning('heft'),
+                               use_task_data=False, use_edge_data=True
+                               )
         self.buffer = Buffer(env=self.env, cluster=self.cluster, planner=self.planner, config=self.config)
         self.scheduler = Scheduler(
diff --git a/topsim/algorithms/planning.py b/topsim/algorithms/planning.py
index 1e571da..f8d0004 100644
--- a/topsim/algorithms/planning.py
+++ b/topsim/algorithms/planning.py
@@ -46,17 +46,14 @@ def __init__(self, algorithm: str, delay_model=None):
         self.algorithm = algorithm
         self.delay_model = delay_model
 
-
-    @abstractmethod
     def to_string(self):
         """
         Return the string name of the implemenation of this class
         """
-        pass
 
     @abstractmethod
-    def generate_plan(self, clock, cluser, buffer, observation, max_ingest):
+    def generate_plan(self, clock, cluster, buffer, observation, max_ingest, task_data=False, edge_data=True):
         """
         Build a WorkflowPlan object storing

         Returns
         -------
@@ -64,7 +61,7 @@ def generate_plan(self, clock, cluser, buffer, observation, max_ingest):
         plan : core.topsim.planner.WorkflowPlan
             WorkflowPlan object
         """
-        pass
+
 
     @abstractmethod
     def to_df(self):
@@ -77,7 +74,6 @@ def to_df(self):
 
         df : pandas.DataFrame
         """
-        pass
 
     def _calc_workflow_est(self, observation, buffer):
         """
diff --git a/topsim/algorithms/scheduling.py b/topsim/algorithms/scheduling.py
index e7ce76c..cef0e00 100644
--- a/topsim/algorithms/scheduling.py
+++ b/topsim/algorithms/scheduling.py
@@ -35,6 +35,7 @@ class Scheduling(ABC):
     """
 
     # The below constants have been derived from the SDP parametric model.
+    # TODO these should be set up in the simulation configuration
     LOW_REALTIME_RESOURCES = 164
     MID_REALTIME_RESOURCES = 281
diff --git a/topsim/core/cluster.py b/topsim/core/cluster.py
index 44f1347..651f09e 100644
--- a/topsim/core/cluster.py
+++ b/topsim/core/cluster.py
@@ -67,7 +67,6 @@ def __init__(self, env, config):
         self.system_bandwidth = system_bandwidth  #: System bandwidth across the cluster
 
-        # TODO dmachine is a hack, we should improve this
         self.machine_ids = {machine.id: machine for machine in self.machines}
 
         self.cl = ['default']
@@ -82,11 +81,11 @@ def __init__(self, env, config):
                              'completed': 0, 'demand': 0}
         # Dictionary of current ingest information
+        # create output data frame
         self._usage_data = {'occupied': 0,
                             'ingest': 0,
                             'available': len(self._resources['available']),
                             'running_tasks': 0,
-                            'finished_tasks': 0}  # Data to more easily
-        # create output data frame
+                            'finished_tasks': 0}
 
         self.num_provisioned_obs = 0
         self.events = []
@@ -265,10 +264,10 @@ def allocate_task_to_cluster(self, task, machine,
             # THIS CHECK DOESN"T WORK FIX IT SOMEHOW
             if (machine not in self._clusters[c]['resources'][
                 'available'] and (machine not in
-                    self._clusters[c]['resources'][
-                        'ingest'] and machine not in
-                    self.get_idle_resources(
-                        observation))):
+                self._clusters[c]['resources'][
+                    'ingest'] and machine not in
+                self.get_idle_resources(
+                    observation))):
                 raise RuntimeError
             if ingest:
                 # Ingest resources allocated separately from scheduler
@@ -421,7 +420,6 @@ def get_machine_from_id(self, id, c='default'):
         """
         return self.machine_ids[id]
 
-
     def _update_available_resources(self, observation, c='default'):
         """
         De-allacote resources to a given observation (batch-reservation) and
@@ -632,7 +630,6 @@ def _remove_available_resource(self, machine, c='default'):
         """
         self._clusters[c]['resources']['available'].remove(machine)
 
-
     def _add_event(self, observation, resource, event):
         self.events.append(
             {
diff --git a/topsim/core/config.py b/topsim/core/config.py
index e42929e..06e1dbc 100644
--- a/topsim/core/config.py
+++ b/topsim/core/config.py
@@ -32,7 +32,7 @@ class Config:
 
     Parameters
     ----------
-    config : str
+    config : path
        File path to the JSON config file
 
    Attributes
    ----------
@@ -123,7 +123,8 @@ def parse_cluster_config(self):
                         memory=1,  # * timestep_multiplier,
                         disk=1,  # * timestep_multiplier,
                         bandwidth=(machines_types[name]["compute_bandwidth"]
-                                   * timestep_multiplier)
+                                   * timestep_multiplier),
+                        ethernet=self.cluster['system']["system_bandwidth"] * timestep_multiplier
                     )
                 )
         bandwidth = self.cluster['system']["system_bandwidth"] * timestep_multiplier
diff --git a/topsim/core/machine.py b/topsim/core/machine.py
index 0d5ce4c..9ea5640 100644
--- a/topsim/core/machine.py
+++ b/topsim/core/machine.py
@@ -14,12 +14,13 @@ class Status(Enum):
 
 class Machine(object):
 
-    def __init__(self, id, cpu, memory, disk, bandwidth):
+    def __init__(self, id, cpu, memory, disk, bandwidth, ethernet):
         self.id = id
         self.cpu = cpu
         self.memory = memory
         self.disk = disk
         self.bandwidth = bandwidth
+        self.ethernet = ethernet
         self.status = Status.IDLE
         self.transfer_flag = False
         self.current_task = None
@@ -37,7 +38,7 @@ def run(self, task, env, predecessor_allocations):
 
     def run_task(self, task_instance):
         self.cpu -= task_instance.flops
         self.memory -= task_instance.task_data
-        self.disk -= task_instance.io
+        self.disk -= task_instance.edge_data
         self.current_task = task_instance
         # self.task_instances.append(task_instance)
         self.status = Status.IN_USE
@@ -45,7 +46,7 @@ def run_task(self, task_instance):
     def stop_task(self, task_instance):
         self.cpu += task_instance.flops
         self.memory += task_instance.task_data
-        self.disk += task_instance.io
+        self.disk += task_instance.edge_data
         self.status = Status.IDLE
         self.current_task = None
diff --git a/topsim/core/planner.py b/topsim/core/planner.py
index eafb484..bfc500b 100644
--- a/topsim/core/planner.py
+++ b/topsim/core/planner.py
@@ -46,18 +46,21 @@ class Planner:
 
     """
 
-    def __init__(self, env, cluster, model, delay_model=None):
+    def __init__(self, env, cluster, model, use_task_data, use_edge_data, delay_model=None):
         self.env = env  #: :py:object:~`simpy.Environment` object for the
         # simulation
         self.cluster = cluster
         # self.envconfig = envconfig
         self.model = model  # algorithm, cluster, delay_model)
         self.delay_model = delay_model
+        self.use_task_data = use_task_data
+        self.use_edge_data = use_edge_data
 
     def run(self, observation, buffer, max_ingest):
         """
 
         Parameters
         ----------
+        max_ingest
         observation :
             The observation for which we are generating a plan (by forming a
             schedule using the predefined static scheduling algorithm).
@@ -68,7 +71,7 @@ def run(self, observation, buffer, max_ingest):
             core.topsim.planner.WorkflowPlan
         """
         return self.model.generate_plan(self.env.now, self.cluster, buffer,
-                                        observation, max_ingest)
+                                        observation, max_ingest, self.use_task_data, self.use_edge_data)
 
         # yield self.env.timeout(0,plan)
 
@@ -92,9 +95,9 @@ def __init__(self, id, est, eft, tasks, exec_order, status, max_ingest,
         self.est = est
         self.eft = eft
         self.tasks = tasks
+        self.finished_tasks = []
         self.exec_order = exec_order
         self.status = status
-        self.max_ingest = max_ingest
         self.graph = graph
         self.min_resources = None
         self.max_resources = None
diff --git a/topsim/core/scheduler.py b/topsim/core/scheduler.py
index e4d7c53..113c036 100644
--- a/topsim/core/scheduler.py
+++ b/topsim/core/scheduler.py
@@ -277,7 +277,7 @@ def allocate_tasks(self, observation):
         pbar = None
         while True:
             if current_plan:
-                current_plan.tasks = self._update_current_plan(current_plan)
+                current_plan.tasks, current_plan.finished_tasks = self._update_current_plan(current_plan)
             (current_plan,
              schedule,
              task_pool,
@@ -447,6 +447,7 @@ class attributes.
         """
         remaining_tasks = []
+        finished_tasks = current_plan.finished_tasks
         # if not current_plan:
         #     return remaining_tasks
         for t in current_plan.tasks:
@@ -456,7 +457,8 @@ class attributes.
                 if t.delay_flag:
                     self.schedule_status = ScheduleStatus.DELAYED
                     self.delay_offset += t.delay_offset
-        return remaining_tasks
+            finished_tasks.append(t)
+        return (remaining_tasks, finished_tasks)
 
     def _find_pred_allocations(self, task, machine, allocations):
         """
@@ -481,7 +483,7 @@ def _find_pred_allocations(self, task, machine, allocations):
             pred_task, pred_machine = allocations[pred]
             if pred_machine != machine:
                 alt = True
-            pred_allocations.append(pred_task)
+            pred_allocations.append(pred_task)  # Consider instead modifying the task.predecessor dictionary
         return pred_allocations
 
     def to_df(self):
diff --git a/topsim/core/simulation.py b/topsim/core/simulation.py
index edd94e9..aad0790 100644
--- a/topsim/core/simulation.py
+++ b/topsim/core/simulation.py
@@ -130,31 +130,26 @@ def __init__(
         timestamp=None,
         to_file=False,
         hdf5_path=None,
+        use_task_data=False,
+        use_edge_data=True,
         **kwargs
     ):
         #: :py:obj:`simpy.Environment` object
         self.env = env
 
-        # if timestamp:
-        #     #: :py:obj:`~topsim.core.monitor.Monitor` instance
-        #     self.monitor = Monitor(self, timestamp)
-        #     self._timestamp = timestamp
-        # else:
-        #     sim_start_time = f'{time.time()}'.split('.')[0]
-        #     self._timestamp = sim_start_time
-        #     self.monitor = Monitor(self, sim_start_time)
+        #: :py:obj:`~topsim.core.monitor.Monitor` instance
         if timestamp is not None:
             self.monitor = Monitor(self, timestamp)
             self._timestamp = datetime.datetime.fromtimestamp(timestamp)
         else:
             self._timestamp = datetime.datetime.now()
             self.monitor = Monitor(self, self._timestamp)
 
+        # Process necessary config files
         self._cfg_path = Path(config)  #: Configuration path
-
-        # Initiaise Actor and Resource objects
+        # Initialise Actor and Resource objects
         self._cfg = Config(config)
         #: :py:obj:`~topsim.core.cluster.Cluster` instance
         self.cluster = Cluster(env, self._cfg)
@@ -167,7 +162,7 @@ def __init__(
         # model outside the simulation.
         delay = DelayModel(0.0, "normal", DelayModel.DelayDegree.NONE)
         self.planner = Planner(
-            env, self.cluster, planning_model, delay
+            env, self.cluster, planning_model, use_task_data, use_edge_data, delay
         )
         self.buffer = Buffer(env, self.cluster, self.planner, self._cfg)
         scheduling_algorithm = scheduling
@@ -212,6 +207,8 @@ def __init__(
         else:
             self._delimiters = ''
 
+        self.params = {"use_task_data": [use_task_data], "use_edge_data": [use_edge_data]}
+
         self.running = False
 
     def start(self, runtime=-1):
@@ -396,6 +393,7 @@ def _compose_hdf5_output(self, global_df, summary_df):
 
         self._hdf5_store.put(key=f"{final_key}/sim", value=global_df)
         self._hdf5_store.put(key=f'{final_key}/summary', value=summary_df)
+        self._hdf5_store.put(key=f'{final_key}/params', value=pd.DataFrame(self.params))
 
         return self._hdf5_store
diff --git a/topsim/core/task.py b/topsim/core/task.py
index 4e7b808..42e1373 100644
--- a/topsim/core/task.py
+++ b/topsim/core/task.py
@@ -34,20 +34,17 @@ class TaskStatus(Enum):
 
 class Task(object):
     """
     Tasks have priorities inheritted from the workflows from which they are
-    arrived; once
-    they arrive on the cluster queue, they are workflow agnositc, and are
-    processed according to
-    their priority.
+    arrived; once they arrive at the cluster queue, they are workflow agnostic,
+    and are processed according to their priority.
     """
 
     # NB I don't want tasks to have null defaults; should we improve on this
     #  by initialising everything in a task at once?
     def __init__(self, tid, est, eft, machine_id, predecessors, flops=0, task_data=0,
-                 io=None, delay=None, gid=None):
+                 edge_data: dict = None, delay=None, gid=None, use_task_data=False, use_edge_data=True):
         """
         :param tid: ID of the Task object
-        :param env: Simulation environment to which the task will be added,
-            and where it will run as a process
+
         """
 
         self.id = tid
@@ -56,7 +53,7 @@ def __init__(self, tid, est, eft, machine_id, predecessors, flops=0, task_data=0
         self.ast = -1
         self.aft = -1
         self.allocated_machine_id = machine_id
-        self.duration = eft - est # TODO investigate making this a class property
+        self.duration = eft - est  # TODO investigate making this a class property
         self.est_duration = eft - est
         self.delay_flag = False
         self.task_status = TaskStatus.UNSCHEDULED
@@ -66,11 +63,15 @@ def __init__(self, tid, est, eft, machine_id, predecessors, flops=0, task_data=0
         self.workflow_offset = 0
         self.graph_id = gid
 
-        # Machine information that is less important
-        # currently (will update this in future versions)
+
+        # Used to calculate actual runtime on the system
         self.flops = flops
         self.task_data = task_data
-        self.io = io
+        self.edge_data = edge_data  # Input/edge data
+
+        self.use_edge_data = use_edge_data
+        self.use_task_data = use_task_data
+
 
     def __repr__(self):
         return str(self.id)
@@ -87,6 +88,8 @@ def do_work(self, env, machine, predecessor_allocations=None):
         returns after a given duration.
 
         Parameters
         ----------
+        predecessor_allocations
+        machine
         env
         alt = True if the machine is different to the predecessors machine.
             This affects data transfer between tasks.
@@ -127,19 +130,22 @@
     def calculate_runtime(self, machine):
         """
         Calculate the runtime of the task
-        Task duration is a function of either compute-time (FLOPs) or data-time (bandwidth)
+
+        The duration is a function of either compute-time (FLOPs) or data-time (bandwidth);
+        we use whichever is greater.
 
         Parameters
         ----------
-        machine: The machine we are allocated to (passed in during do_work())
+        machine: The machine to which we are allocated (passed in during do_work())
 
         Returns
         -------
-        Maximum integer of either compute- or data-time
+        Maximum integer time of either compute- or data-time
         """
+
         compute_time = int(self.flops / machine.cpu)
-        data_time = int(self.task_data / machine.bandwidth)
-        return max(compute_time, data_time)
+        data_time = int(self.task_data / machine.bandwidth) if self.use_task_data else 0
+        return max(compute_time, data_time)
 
     def update_allocation(self, machine):
         """
@@ -184,10 +190,12 @@
         """
         mx = 0
+        if not self.use_edge_data:
+            return mx
         # Calculate the difference between the latest start time of the
         # predecessor and the current time.
         for task in predecessor_allocations:
-            transfer_time = self.io[task.id] / machine.bandwidth
+            transfer_time = self.edge_data[task.id] / machine.ethernet
             if task.aft + transfer_time - env.now > mx:
                 mx = task.aft + transfer_time - env.now
         return mx
diff --git a/topsim/core/workflow.py b/topsim/core/workflow.py
deleted file mode 100644
index 38b28b9..0000000
--- a/topsim/core/workflow.py
+++ /dev/null
@@ -1,42 +0,0 @@
-class WorkflowPlan(object):
-    """
-    WorkflowPlans are used within the Planner, SchedulerA Actors and Cluster Resource. They are higher-level than the
-    shadow library representation, as they are a storage component of scheduled tasks, rather than directly representing
-    the DAG nature of the workflow. This is why the tasks are stored in queues.
-    """
-
-    def __init__(self, plan):
-        self.plan = plan
-        self.task_order = plan.exec_order
-        self.allocation = plan.allocation
-        self.priority = 0
-
-    def __lt__(self, other):
-        return self.priority < other.priority
-
-    def __eq__(self, other):
-        return self.priority == other.priority
-
-    def __gt__(self, other):
-        return self.priority > other.priority
-
-
-class Task(object):
-    """
-    Tasks have priorities inheritted from the workflows from which they are arrived; once
-    they arrive on the cluster queue, they are workflow agnositc, and are processed according to
-    their priority.
-    """
-
-    def __init__(self, id, env=None, pred=None):
-        self.env = env
-        self.id = id
-        self.start = 0
-        self.finish = 0
-        self.flops = 0
-        self.memory = None
-        self.io = 0
-        self.alloc = None
-        self.duration = None
-        self.pred = pred
-        self.workflow_offset = 0
diff --git a/topsim/user/plan/batch_planning.py b/topsim/user/plan/batch_planning.py
index c462e65..7188bfd 100644
--- a/topsim/user/plan/batch_planning.py
+++ b/topsim/user/plan/batch_planning.py
@@ -22,6 +22,23 @@
 from topsim.core.planner import WorkflowStatus, WorkflowPlan
 
 
+def _workflow_to_nx(workflow):
+    """
+    Read workflow file into networkx graph
+    Parameters
+    ----------
+    workflow
+
+    Returns
+    -------
+    graph : networkx.DiGraph object
+    """
+    with open(workflow, 'r') as infile:
+        config = json.load(infile)
+    graph = nx.readwrite.node_link_graph(config['graph'], edges="links")
+    return graph
+
+
 class BatchPlanning(Planning):
     """
     Create a placeholder, topologically sorted plan for the batch-scheduler
@@ -39,19 +56,23 @@ def __str__(self):
     def to_string(self):
         return self.__str__()
 
-    def generate_plan(self, clock, cluster, buffer, observation, max_ingest):
+    def generate_plan(self, clock, cluster, buffer, observation, max_ingest, task_data=False, edge_data=True):
         """
 
         Parameters
         ----------
-        cluster
         clock
+        cluster
         buffer
         observation
+        max_ingest
+        task_data
+        edge_data
 
         """
+        plan = None
         if self.algorithm is 'batch':
-            graph = self._workflow_to_nx(observation.workflow)
+            graph = _workflow_to_nx(observation.workflow)
             est = clock  # self._calc_workflow_est(observation, buffer)
             # new_graph = nx.DiGraph()
             mapping = {}
@@ -68,22 +89,15 @@ def generate_plan(self, clock, cluster, buffer, observation, max_ingest):
                         x, observation, clock
                     ) for x in pred
                 ]
-                succ = list(graph.successors(task))
-                successors = [
-                    self._create_observation_task_id(
-                        x, observation, clock
-                    ) for x in pred
-                ]
 
                 # Get the data transfer costs
                 edge_costs = {}
                 data = dict(graph.pred[task])
                 for element in data:
-                    nm = self._create_observation_task_id(
+                    pred_tid = self._create_observation_task_id(
                         element, observation, clock
                     )
-                    val = data[element]["transfer_data"]
-                    edge_costs[nm] = val
+                    edge_costs[pred_tid] = data[element]["transfer_data"]
 
                 est, eft = 0, 0
                 machine_id = None
@@ -110,22 +124,5 @@ def generate_plan(self, clock, cluster, buffer, observation, max_ingest):
                 f'{self.algorithm} is not supported by {str(self)}'
             )
 
-    def _workflow_to_nx(self, workflow):
-        """
-        Read workflow file into networkx graph
-        Parameters
-        ----------
-        workflow
-
-        Returns
-        -------
-        graph : networkx.DiGraph object
-        """
-        with open(workflow, 'r') as infile:
-            config = json.load(infile)
-            graph = nx.readwrite.node_link_graph(config['graph'], edges="links")
-            return graph
-
     def to_df(self):
         pass
diff --git a/topsim/user/plan/static_planning.py b/topsim/user/plan/static_planning.py
index 97c53c6..e98e586 100644
--- a/topsim/user/plan/static_planning.py
+++ b/topsim/user/plan/static_planning.py
@@ -51,13 +51,15 @@ def __str__(self):
     def to_string(self):
         return self.__str__()
 
-    def generate_plan(self, clock, cluster, buffer, observation, max_ingest):
+    def generate_plan(self, clock, cluster, buffer, observation, max_ingest, task_data=False, edge_data=True):
         """
         For this StaticPlanning example, we are using the SHADOW static
         scheduling library to produce static plans.
 
         Parameters
         ----------
+        edge_data
+        task_data
         observation
         buffer
         cluster
@@ -109,7 +111,7 @@ def generate_plan(self, clock, cluster, buffer, observation, max_ingest):
                     allocation.machine.id,
                     predecessors,
                     task.flops_demand, task.io_demand, edge_costs,
-                    dm
+                    dm, use_task_data=task_data, use_edge_data=edge_data
                 )
                 mapping[task] = taskobj
                 tasks.append(taskobj)
@@ -123,7 +125,7 @@ def generate_plan(self, clock, cluster, buffer, observation, max_ingest):
 
         return WorkflowPlan(
             observation.name, est, eft, tasks, exec_order,
-            WorkflowStatus.SCHEDULED, max_ingest, new_graph
+            WorkflowStatus.SCHEDULED, max_ingest, new_graph,
         )
 
     def to_df(self):
diff --git a/topsim/user/schedule/batch_allocation.py b/topsim/user/schedule/batch_allocation.py
index d0704d0..f010426 100644
--- a/topsim/user/schedule/batch_allocation.py
+++ b/topsim/user/schedule/batch_allocation.py
@@ -223,6 +223,8 @@ def _max_resource_provision(self, cluster, observation=None):
             min_resources = int(graph_dop/2)
             if available >= min_resources:
                 return min_resources
+            else:
+                return 0
         else:
             # TODO consider removing the concept of the max_resources_split
             max_allowed = int(
diff --git a/topsim/user/schedule/dynamic_plan.py b/topsim/user/schedule/dynamic_plan.py
index 80b3d41..6e41d79 100644
--- a/topsim/user/schedule/dynamic_plan.py
+++ b/topsim/user/schedule/dynamic_plan.py
@@ -241,6 +241,8 @@ def _max_resource_provision(self, cluster, observation=None):
             min_resources = int(graph_dop/2)
             if available >= min_resources:
                 return min_resources
+            else:
+                return 0
         else:
 
             max_allowed = int(
diff --git a/topsim/utils/experiment.py b/topsim/utils/experiment.py
index 7a70b45..a827c73 100644
--- a/topsim/utils/experiment.py
+++ b/topsim/utils/experiment.py
@@ -20,13 +20,10 @@
 This provides an example implementation and sits alongside the user.* modules
 implemented in this codebase.
""" -import gc -import os -import sys -import time +import time +import itertools import logging -from builtins import enumerate import simpy from datetime import date @@ -37,7 +34,6 @@ # Framework defined models from topsim.core.simulation import Simulation -from topsim.core.delay import DelayModel # User defined models from topsim.user.telescope import Telescope # Instrument @@ -62,11 +58,14 @@ def __init__( self, configuration: list = None, alloc_combinations: list[tuple] = None, + data_combinations: list[tuple] = None, output=None, delay: bool = False, **kwargs): + self._configurations = configuration - self._combinations = alloc_combinations + self._combinations = list(itertools.product(alloc_combinations, + data_combinations)) self._delay = delay self._output = Path(output) self._sims = [] @@ -80,8 +79,10 @@ def _build_simulations(self): except OSError as e: LOGGER.critical("Failed to make output directory: %s", e) for c in self._configurations: - for ac in self._combinations: + for combination in self._combinations: + ac, dc = combination plan, sched = ac + use_task_data, use_edge_data = dc if plan == "batch": plan = BatchPlanning("batch") elif plan == "static": @@ -97,9 +98,10 @@ def _build_simulations(self): instrument = Telescope result_path_hash = _generate_truncated_hash(c, hash_length=6) yield Simulation(env=env, config=c, instrument=instrument, - planning_model=plan, scheduling=sched, delay=self._delay, timestamp=None, - to_file=True, - hdf5_path=f"{self._output}/results_f{date.today().isoformat()}_{result_path_hash}.h5") + planning_model=plan, scheduling=sched, delay=self._delay, timestamp=None, + to_file=True, + hdf5_path=f"{self._output}/results_f{date.today().isoformat()}_{result_path_hash}.h5", + use_task_data=use_task_data, use_edge_data=use_edge_data) def _run_batch(self): """ @@ -124,7 +126,7 @@ def _run_batch(self): yield Simulation(env=env, config=c, instrument=instrument, planning_model=plan, scheduling=sched, delay=self._delay, timestamp=None, to_file=True, - hdf5_path=f"{self._output}/results_f{date.today().isoformat()}_{path_hash}.h5") + hdf5_path=f"{self._output}/results_f{date.today().isoformat()}_{result_path_hash}.h5")