Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 14 additions & 16 deletions skaworkflows/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@
import json
from pathlib import Path
from enum import Enum, IntEnum, auto

try:
import importlib.resources as imp_res
except:
except:
import importlib_resources as imp_res

from skaworkflows import __version__
Expand All @@ -45,6 +46,7 @@ class SI(IntEnum):
tera = 10 ** 12
peta = 10 ** 15


class Telescope:
"""
Generator class for a telescope based on the telescope parameter
Expand All @@ -65,14 +67,13 @@ def _contains_mid(self, param):
def __getattr__(self, item):
return getattr(self._telescope, item)


def __instancecheck__(self, instance):
isinstance(self._telescope, instance)

def __repr__(self):
return self._telescope.name

def is_hpso_for_telescope(self, hpso:str)-> bool:
def is_hpso_for_telescope(self, hpso: str) -> bool:
"""
Used to determine if HPSO can be run on this telescope.

Expand All @@ -89,7 +90,6 @@ def is_hpso_for_telescope(self, hpso:str)-> bool:
True if the telescope can run the HPSO
"""


return hasattr(self._telescope, hpso)


Expand Down Expand Up @@ -149,7 +149,7 @@ class SKAMid:
hpso22 = "hpso22"
hpso33 = "hpso032"

stations = [64, 102, 140, 197] # antenna
stations = [64, 102, 140, 197] # antenna
max_stations = max(stations)
max_compute_nodes = 786

Expand All @@ -168,30 +168,28 @@ def __str__(self):
def __repr__(self):
return str(self)

class Workflows:

class Workflows:
ical = "ICAL"
drepa = "DPrepA"
dprepb = "DPrepB"
dprepc = "DPrepC"
dprepd = "DPrepD"
pulsar = "Pulsar"


# Amount of the telescope an observation can request
MAX_TEL_DEMAND_LOW = 512
MAX_TEL_DEMAND_MID = 197
MAX_CHANNELS = 512

# System sizing data paths
DATA_PANDAS_SIZING = imp_res.files("skaworkflows.data.pandas_sizing")
LOW_TOTAL_SIZING = DATA_PANDAS_SIZING.joinpath("total_compute_SKA1_Low_2024-08-20.csv")
LOW_COMPONENT_SIZING = Path(
DATA_PANDAS_SIZING.joinpath("component_compute_SKA1_Low_2024-08-20.csv")
)
MID_TOTAL_SIZING = Path(DATA_PANDAS_SIZING.joinpath("total_compute_SKA1_Mid_2024-08-20.csv"))
MID_COMPONENT_SIZING = Path(
DATA_PANDAS_SIZING.joinpath("component_compute_SKA1_Mid_2024-08-20.csv")
)
DATA_PANDAS_SIZING = Path(str(imp_res.files("skaworkflows.data.pandas_sizing")))
LOW_TOTAL_SIZING = DATA_PANDAS_SIZING / "total_compute_SKA1_Low_2025-02-25.csv"
LOW_COMPONENT_SIZING = DATA_PANDAS_SIZING / "component_compute_SKA1_Low_2025-02-25.csv"

MID_TOTAL_SIZING = DATA_PANDAS_SIZING / "total_compute_SKA1_Mid_2025-02-25.csv"
MID_COMPONENT_SIZING = DATA_PANDAS_SIZING / "component_compute_SKA1_Mid_2025-02-25.csv"

# Bytes per observed visibility
BYTES_PER_VIS = 12.0
Expand All @@ -203,7 +201,6 @@ class Workflows:
PULSAR_GRAPH = GRAPH_DIR.joinpath("pulsar.graph")



def create_workflow_header(telescope: str):
"""
Default JSON "header" element used when generating workflow files
Expand All @@ -230,6 +227,7 @@ def create_workflow_header(telescope: str):
"time": "False",
}


class NpEncoder(json.JSONEncoder):
# My complete laziness
# https://java2blog.com/object-of-type-int64-is-not-json-serializable/
Expand Down
2 changes: 0 additions & 2 deletions skaworkflows/config_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,6 @@ def create_config(
system_sizing,
cluster_dict,
base_graph_paths,
data,
data_distribution
))

LOGGER.info(f"Producing buffer config")
Expand Down
102 changes: 7 additions & 95 deletions skaworkflows/workflow/hpso_to_observation.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import networkx as nx

from typing import List, Dict
from enum import Enum
from pathlib import Path

import skaworkflows.workflow.eagle_daliuge_translation as edt
Expand All @@ -48,7 +47,7 @@
SI,
create_workflow_header,
CONT_IMG_MVP_GRAPH,
BASIC_PROTOTYPE_GRAPH,
BASIC_PROTOTYPE_GRAPH,
SCATTER_GRAPH,
PULSAR_GRAPH,
BYTES_PER_VIS,
Expand Down Expand Up @@ -548,8 +547,6 @@ def generate_instrument_config(
system_sizing,
cluster,
base_graph_paths,
data=True,
data_distribution: str = "standard",
**kwargs,
) -> dict:
"""
Expand Down Expand Up @@ -617,7 +614,7 @@ def generate_instrument_config(
wf_file_path.parent.mkdir(parents=True, exist_ok=True)

possible_file_name = _find_existing_workflow(config_dir_path / "workflows",
o, data, data_distribution)
o)
if possible_file_name:
use_existing_file = True
wf_file_path = config_dir_path / "workflows" / possible_file_name
Expand All @@ -631,8 +628,6 @@ def generate_instrument_config(
system_sizing,
wf_file_name,
base_graph_paths,
data=data,
data_distribution=data_distribution,
)
else:
wf_file_path = wf_file_path
Expand All @@ -652,8 +647,6 @@ def generate_instrument_config(
"baseline": o.baseline,
"workflow_type": list(set(base_graph_paths.values())), # TODO convert to set of strings?
"graph_type": list(set(base_graph_paths.keys())), # TODO As above
"data": data,
"data_distribution": data_distribution
}
telescope_observations.append(o.to_json())

Expand All @@ -670,7 +663,7 @@ def generate_instrument_config(
return telescope_dict


def _find_existing_workflow(dirname, observation, data, data_distribution):
def _find_existing_workflow(dirname, observation):
"""
"parameters": {
"max_arrays": 512,
Expand Down Expand Up @@ -698,8 +691,6 @@ def _find_existing_workflow(dirname, observation, data, data_distribution):
header["parameters"]["hpso"] = observation.hpso
# TODO Fix this so it is based on telescope
header["parameters"]["max_arrays"] = Telescope(observation.telescope).max_stations
header["parameters"]["data"] = data
header["parameters"]["data_distribution"] = data_distribution

# TODO consider caching this information
for wf in os.listdir(dirname):
Expand Down Expand Up @@ -759,8 +750,6 @@ def generate_workflow_from_observation(
workflow_path_name,
base_graph_paths,
concat=True,
data=True,
data_distribution="standard",
):
"""
Given a pipeline and observation specification, generate a workflow file
Expand Down Expand Up @@ -840,16 +829,14 @@ def generate_workflow_from_observation(
observation,
workflow,
component_sizing,
data=data,
data_distribution=data_distribution,
)
final_graphs[workflow] = intermed_graph
workflow_stats[workflow] = task_dict

write_workflow_stats_to_csv(workflow_stats, final_path)
final_workflow = edt.concatenate_workflows(final_graphs, observation.workflows)
final_json = produce_final_workflow_structure(
final_workflow, observation, data, data_distribution, time=False
final_workflow, observation, time=False
)

with open(final_path, "w") as fp:
Expand Down Expand Up @@ -893,8 +880,6 @@ def generate_cost_per_product(
observation,
workflow,
component_sizing,
data=True,
data_distribution="standard",
final_path=None,
):
"""
Expand Down Expand Up @@ -1005,7 +990,7 @@ def generate_cost_per_product(
nx_graph.nodes[node]["comp"] = compute
else:
nx_graph.nodes[node]["comp"] = observation.duration
if data_cost > 0 and data:
if data_cost > 0:
nx_graph.nodes[node]["task_data"] = data_cost
else:
nx_graph.nodes[node]["task_data"] = 0
Expand All @@ -1023,10 +1008,7 @@ def generate_cost_per_product(
* SI.mega
* BYTES_PER_VIS
)
if data_distribution == "edges":
nx_graph[producer][node]["transfer_data"] = data_cost / num_edges
else:
nx_graph[producer][node]["transfer_data"] = 0
nx_graph[producer][node]["transfer_data"] = data_cost / num_edges

return nx_graph, task_dict

Expand Down Expand Up @@ -1161,73 +1143,6 @@ def identify_component_cost(
return total_cost, total_data


def calculate_major_loop_data(
task_dict, component_sizing, hpso, baseline, workflow, data_distribution
):
"""
If we want to take data into account in the way that the parametric model does,
we use their visibility read rate (Rio) value in their model.

This is supposed to 'be read' only once per major cycle, but there is
ambiguity around whether that is allocated to a single task, once per
'set' of major cycle tasks, or once per major cycle & minor cycle tasks.

The following tasks are in the major cycle (but not in the minor cycle):

* DeGrid
* Subtract
* Flag
* Grid

By default we assign the data to the DeGrid as part of its compute task,
and then to the edges of each subsequent node as a potential 'transfer'
cost. If `distribute` is `True`, then we average the data across each
major-loop task above, to determine if it has any impact on the runtime.

Parameters
----------

Returns
-------
task_dict : `dict`
Modified task dictionary - keeps the functionality pure.
"""
data = 0
data_dict = {"Degrid": {}, "Subtract": {}, "Flag": {}, "Grid": {}}
obs_frame = component_sizing[
(component_sizing["hpso"] == hpso) & (component_sizing["Baseline"] == baseline)
]

if data_distribution == "distribute":
raise NotImplementedError(
"Current functionality does not support distributed data costs"
)
elif data_distribution == "standard":
data = float(
obs_frame[obs_frame["Pipeline"] == f"{workflow}"]["Visibility read rate"]
)
data_dict["Degrid"]["total_io_cost"] = data
data_dict["Degrid"]["fraction_io_cost"] = data / task_dict["Degrid"]["node"]
for component in data_dict:
if component == "Degrid":
data_dict[component]["total_io_cost"] = data
data_dict[component]["fraction_io_cost"] = (
data / task_dict["Degrid"]["node"]
)
else:
data_dict[component]["total_io_cost"] = 0
data_dict[component]["fraction_io_cost"] = 0

data_dict[component]["total_data_cost"] = data
data_dict[component]["fraction_data_cost"] = (
data / task_dict[component]["node"]
)
else:
raise RuntimeError("Passing incorrect data distribution method")

return data_dict


def retrieve_component_cost(observation, workflow, component, component_sizing):
"""

Expand Down Expand Up @@ -1310,8 +1225,7 @@ def retrieve_workflow_cost(observation, workflow, system_sizing):
return flops


def produce_final_workflow_structure(nx_final, observation, data, data_distribution,
time=False):
def produce_final_workflow_structure(nx_final, observation, time=False):
"""
For a given logical graph template, produce a workflow with the specific
number of channels and return it as a JSON serialisable dictionary.
Expand All @@ -1338,8 +1252,6 @@ def produce_final_workflow_structure(nx_final, observation, data, data_distribut
header["parameters"]["duration"] = observation.duration
header["parameters"]["workflows"] = observation.workflows
header["parameters"]["hpso"] = observation.hpso
header["parameters"]["data"] = data
header["parameters"]["data_distribution"] = data_distribution
jgraph = {"header": header, "graph": nx.readwrite.node_link_data(nx_final, edges="links")}
return jgraph

Expand Down
2 changes: 1 addition & 1 deletion tests/test_config_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def test_config_generation_low(self):
parameters=HPSO_PARAMETERS,
output_dir=self.low_path_str,
base_graph_paths=self.prototype_workflow_paths,
timestep='seconds', data=True)
timestep='seconds')
self.assertTrue(Path(config[0]).exists())

def TestConfigGenerationMid(self):
Expand Down
Loading