From 760beccc5f86d22d59d664b9f0379dc02a88fdde Mon Sep 17 00:00:00 2001 From: myxie Date: Sun, 17 Aug 2025 16:36:30 +0800 Subject: [PATCH] #50: Remove data and data_distribution parameters from SKAWorkflows Signed-off-by: myxie --- skaworkflows/common.py | 30 +++--- skaworkflows/config_generator.py | 2 - skaworkflows/workflow/hpso_to_observation.py | 102 ++----------------- tests/test_config_generator.py | 2 +- tests/test_workflow_generation.py | 26 +++-- 5 files changed, 34 insertions(+), 128 deletions(-) diff --git a/skaworkflows/common.py b/skaworkflows/common.py index d177bbc..8bbb155 100644 --- a/skaworkflows/common.py +++ b/skaworkflows/common.py @@ -26,9 +26,10 @@ import json from pathlib import Path from enum import Enum, IntEnum, auto + try: import importlib.resources as imp_res -except: +except: import importlib_resources as imp_res from skaworkflows import __version__ @@ -45,6 +46,7 @@ class SI(IntEnum): tera = 10 ** 12 peta = 10 ** 15 + class Telescope: """ Generator class for a telescope based on the telescope parameter @@ -65,14 +67,13 @@ def _contains_mid(self, param): def __getattr__(self, item): return getattr(self._telescope, item) - def __instancecheck__(self, instance): isinstance(self._telescope, instance) def __repr__(self): return self._telescope.name - def is_hpso_for_telescope(self, hpso:str)-> bool: + def is_hpso_for_telescope(self, hpso: str) -> bool: """ Used to determine if HPSO can be run on this telescope. @@ -89,7 +90,6 @@ def is_hpso_for_telescope(self, hpso:str)-> bool: True if the telescope can run the HPSO """ - return hasattr(self._telescope, hpso) @@ -149,7 +149,7 @@ class SKAMid: hpso22 = "hpso22" hpso33 = "hpso032" - stations = [64, 102, 140, 197] # antenna + stations = [64, 102, 140, 197] # antenna max_stations = max(stations) max_compute_nodes = 786 @@ -168,8 +168,8 @@ def __str__(self): def __repr__(self): return str(self) -class Workflows: +class Workflows: ical = "ICAL" drepa = "DPrepA" dprepb = "DPrepB" @@ -177,21 +177,19 @@ class Workflows: dprepd = "DPrepD" pulsar = "Pulsar" + # Amount of the telescope an observation can request MAX_TEL_DEMAND_LOW = 512 MAX_TEL_DEMAND_MID = 197 MAX_CHANNELS = 512 # System sizing data paths -DATA_PANDAS_SIZING = imp_res.files("skaworkflows.data.pandas_sizing") -LOW_TOTAL_SIZING = DATA_PANDAS_SIZING.joinpath("total_compute_SKA1_Low_2024-08-20.csv") -LOW_COMPONENT_SIZING = Path( - DATA_PANDAS_SIZING.joinpath("component_compute_SKA1_Low_2024-08-20.csv") -) -MID_TOTAL_SIZING = Path(DATA_PANDAS_SIZING.joinpath("total_compute_SKA1_Mid_2024-08-20.csv")) -MID_COMPONENT_SIZING = Path( - DATA_PANDAS_SIZING.joinpath("component_compute_SKA1_Mid_2024-08-20.csv") -) +DATA_PANDAS_SIZING = Path(str(imp_res.files("skaworkflows.data.pandas_sizing"))) +LOW_TOTAL_SIZING = DATA_PANDAS_SIZING / "total_compute_SKA1_Low_2025-02-25.csv" +LOW_COMPONENT_SIZING = DATA_PANDAS_SIZING / "component_compute_SKA1_Low_2025-02-25.csv" + +MID_TOTAL_SIZING = DATA_PANDAS_SIZING / "total_compute_SKA1_Mid_2025-02-25.csv" +MID_COMPONENT_SIZING = DATA_PANDAS_SIZING / "component_compute_SKA1_Mid_2025-02-25.csv" # Bytes per obseved visibility BYTES_PER_VIS = 12.0 @@ -203,7 +201,6 @@ class Workflows: PULSAR_GRAPH = GRAPH_DIR.joinpath("pulsar.graph") - def create_workflow_header(telescope: str): """ Default JSON "header" elemetn used when generating workflow files @@ -230,6 +227,7 @@ def create_workflow_header(telescope: str): "time": "False", } + class NpEncoder(json.JSONEncoder): # My complete laziness # https://java2blog.com/object-of-type-int64-is-not-json-serializable/ diff --git a/skaworkflows/config_generator.py b/skaworkflows/config_generator.py index 92934d9..885adb9 100644 --- a/skaworkflows/config_generator.py +++ b/skaworkflows/config_generator.py @@ -152,8 +152,6 @@ def create_config( system_sizing, cluster_dict, base_graph_paths, - data, - data_distribution )) LOGGER.info(f"Producing buffer config") diff --git a/skaworkflows/workflow/hpso_to_observation.py b/skaworkflows/workflow/hpso_to_observation.py index 86765d7..41ae93e 100644 --- a/skaworkflows/workflow/hpso_to_observation.py +++ b/skaworkflows/workflow/hpso_to_observation.py @@ -39,7 +39,6 @@ import networkx as nx from typing import List, Dict -from enum import Enum from pathlib import Path import skaworkflows.workflow.eagle_daliuge_translation as edt @@ -48,7 +47,7 @@ SI, create_workflow_header, CONT_IMG_MVP_GRAPH, - BASIC_PROTOTYPE_GRAPH, +BASIC_PROTOTYPE_GRAPH, SCATTER_GRAPH, PULSAR_GRAPH, BYTES_PER_VIS, @@ -548,8 +547,6 @@ def generate_instrument_config( system_sizing, cluster, base_graph_paths, - data=True, - data_distribution: str = "standard", **kwargs, ) -> dict: """ @@ -617,7 +614,7 @@ def generate_instrument_config( wf_file_path.parent.mkdir(parents=True, exist_ok=True) possible_file_name = _find_existing_workflow(config_dir_path / "workflows", - o, data, data_distribution) + o) if possible_file_name: use_existing_file = True wf_file_path = config_dir_path / "workflows" / possible_file_name @@ -631,8 +628,6 @@ def generate_instrument_config( system_sizing, wf_file_name, base_graph_paths, - data=data, - data_distribution=data_distribution, ) else: wf_file_path = wf_file_path @@ -652,8 +647,6 @@ def generate_instrument_config( "baseline": o.baseline, "workflow_type": list(set(base_graph_paths.values())), # TODO convert to set of strings? "graph_type": list(set(base_graph_paths.keys())), # TODO As above - "data": data, - "data_distribution": data_distribution } telescope_observations.append(o.to_json()) @@ -670,7 +663,7 @@ def generate_instrument_config( return telescope_dict -def _find_existing_workflow(dirname, observation, data, data_distribution): +def _find_existing_workflow(dirname, observation): """ "parameters": { "max_arrays": 512, @@ -698,8 +691,6 @@ def _find_existing_workflow(dirname, observation, data, data_distribution): header["parameters"]["hpso"] = observation.hpso # TODO Fix this so it is based on telescope header["parameters"]["max_arrays"] = Telescope(observation.telescope).max_stations - header["parameters"]["data"] = data - header["parameters"]["data_distribution"] = data_distribution # TODO consider caching this information for wf in os.listdir(dirname): @@ -759,8 +750,6 @@ def generate_workflow_from_observation( workflow_path_name, base_graph_paths, concat=True, - data=True, - data_distribution="standard", ): """ Given a pipeline and observation specification, generate a workflow file @@ -840,8 +829,6 @@ def generate_workflow_from_observation( observation, workflow, component_sizing, - data=data, - data_distribution=data_distribution, ) final_graphs[workflow] = intermed_graph workflow_stats[workflow] = task_dict @@ -849,7 +836,7 @@ def generate_workflow_from_observation( write_workflow_stats_to_csv(workflow_stats, final_path) final_workflow = edt.concatenate_workflows(final_graphs, observation.workflows) final_json = produce_final_workflow_structure( - final_workflow, observation, data, data_distribution, time=False + final_workflow, observation, time=False ) with open(final_path, "w") as fp: @@ -893,8 +880,6 @@ def generate_cost_per_product( observation, workflow, component_sizing, - data=True, - data_distribution="standard", final_path=None, ): """ @@ -1005,7 +990,7 @@ def generate_cost_per_product( nx_graph.nodes[node]["comp"] = compute else: nx_graph.nodes[node]["comp"] = observation.duration - if data_cost > 0 and data: + if data_cost > 0: nx_graph.nodes[node]["task_data"] = data_cost else: nx_graph.nodes[node]["task_data"] = 0 @@ -1023,10 +1008,7 @@ def generate_cost_per_product( * SI.mega * BYTES_PER_VIS ) - if data_distribution == "edges": - nx_graph[producer][node]["transfer_data"] = data_cost / num_edges - else: - nx_graph[producer][node]["transfer_data"] = 0 + nx_graph[producer][node]["transfer_data"] = data_cost / num_edges return nx_graph, task_dict @@ -1161,73 +1143,6 @@ def identify_component_cost( return total_cost, total_data -def calculate_major_loop_data( - task_dict, component_sizing, hpso, baseline, workflow, data_distribution -): - """ - If we want to take data into account in the way that the parametric model does, - we use their visibility read rate (Rio) value in their model. - - This is supposed to 'be read' only once per major cycle, but there is - ambiguity around whether that is allocated to a single task, once per - 'set' of major cycle tasks, or once per major cycle & minor cycle tasks. - - The following tasks are in the major cycle (but not in the minor cycle): - - * DeGrid - * Subtract - * Flag - * Grid - - By default we assign the data to the DeGrid as part of its compute task, - and then to the edges of each subsequent node as a potential 'transfer' - cost. If `distribute` is `True`, then we average the data across each - major-loop task above, to determine if it has any impact on the runtime. - - Parameters - ---------- - - Returns - ------- - task_dict : `dict` - Modified task dictionary - keeps the functionality pure. - """ - data = 0 - data_dict = {"Degrid": {}, "Subtract": {}, "Flag": {}, "Grid": {}} - obs_frame = component_sizing[ - (component_sizing["hpso"] == hpso) & (component_sizing["Baseline"] == baseline) - ] - - if data_distribution == "distribute": - raise NotImplementedError( - "Current functionality does not support distributed data costs" - ) - elif data_distribution == "standard": - data = float( - obs_frame[obs_frame["Pipeline"] == f"{workflow}"]["Visibility read rate"] - ) - data_dict["Degrid"]["total_io_cost"] = data - data_dict["Degrid"]["fraction_io_cost"] = data / task_dict["Degrid"]["node"] - for component in data_dict: - if component == "Degrid": - data_dict[component]["total_io_cost"] = data - data_dict[component]["fraction_io_cost"] = ( - data / task_dict["Degrid"]["node"] - ) - else: - data_dict[component]["total_io_cost"] = 0 - data_dict[component]["fraction_io_cost"] = 0 - - data_dict[component]["total_data_cost"] = data - data_dict[component]["fraction_data_cost"] = ( - data / task_dict[component]["node"] - ) - else: - raise RuntimeError("Passing incorrect data distribution method") - - return data_dict - - def retrieve_component_cost(observation, workflow, component, component_sizing): """ @@ -1310,8 +1225,7 @@ def retrieve_workflow_cost(observation, workflow, system_sizing): return flops -def produce_final_workflow_structure(nx_final, observation, data, data_distribution, - time=False): +def produce_final_workflow_structure(nx_final, observation, time=False): """ For a given logical graph template, produce a workflow with the specific number of channels and return it as a JSON serialisable dictionary. @@ -1338,8 +1252,6 @@ def produce_final_workflow_structure(nx_final, observation, data, data_distribut header["parameters"]["duration"] = observation.duration header["parameters"]["workflows"] = observation.workflows header["parameters"]["hpso"] = observation.hpso - header["parameters"]["data"] = data - header["parameters"]["data_distribution"] = data_distribution jgraph = {"header": header, "graph": nx.readwrite.node_link_data(nx_final, edges="links")} return jgraph diff --git a/tests/test_config_generator.py b/tests/test_config_generator.py index ab4d3a5..edbd29c 100644 --- a/tests/test_config_generator.py +++ b/tests/test_config_generator.py @@ -61,7 +61,7 @@ def test_config_generation_low(self): parameters=HPSO_PARAMETERS, output_dir=self.low_path_str, base_graph_paths=self.prototype_workflow_paths, - timestep='seconds', data=True) + timestep='seconds') self.assertTrue(Path(config[0]).exists()) def TestConfigGenerationMid(self): diff --git a/tests/test_workflow_generation.py b/tests/test_workflow_generation.py index aaea5e6..74bf877 100644 --- a/tests/test_workflow_generation.py +++ b/tests/test_workflow_generation.py @@ -181,16 +181,16 @@ def test_use_in_other_functions(self): # with open() - # def tearDown(self) -> None: - # """ - # Remove files generated during various test cases - # Returns - # ------- - # - # """ - # if os.path.exists(PGT_PATH_GENERATED): - # os.remove(PGT_PATH_GENERATED) - # self.assertFalse(os.path.exists(PGT_PATH_GENERATED)) + def tearDown(self) -> None: + """ + Remove files generated during various test cases + Returns + ------- + + """ + if os.path.exists(PGT_PATH_GENERATED): + os.remove(PGT_PATH_GENERATED) + self.assertFalse(os.path.exists(PGT_PATH_GENERATED)) class TestWorkflowFromObservation(unittest.TestCase): @@ -367,7 +367,7 @@ def test_generate_cost_per_product(self): nx_graph, task_dict, cached_workflow_dict = edt.eagle_to_nx(LGT_PATH, wf) self.assertTrue('DPrepA_Degrid_0' in nx_graph.nodes) final_workflow, task_dict = hpo.generate_cost_per_product( - nx_graph, task_dict, self.obs1, wf, self.component_system_sizing, data=True, data_distribution='edges' + nx_graph, task_dict, self.obs1, wf, self.component_system_sizing ) # We want to make sure the comp cost has been updated @@ -401,7 +401,7 @@ def test_generate_cost_per_product(self): self.assertEqual(128, task_dict['Degrid']['node']) self.assertTrue('DPrepA_Degrid_0' in nx_graph.nodes) final_workflow, task_dict = hpo.generate_cost_per_product( - nx_graph, task_dict, self.obs1, wf, self.component_system_sizing, data=False, data_distribution='edges' + nx_graph, task_dict, self.obs1, wf, self.component_system_sizing ) self.assertAlmostEqual( @@ -583,8 +583,6 @@ def testWorkflowFileCorrectness(self): "workflow_parallelism": 1, "workflows": ['DPrepA', 'DPrepB'], "hpso": "hpso01", - "data": True, - "data_distribution": 'standard' }, 'time': False }