diff --git a/MANIFEST.in b/MANIFEST.in
index 04e9be62..6ad3b6aa 100644
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -1,5 +1,5 @@
 graft src
-graft CMake
+graft cmake
 graft dependency
 graft include
diff --git a/test/paper/load_darshan.py b/test/paper/load_darshan.py
new file mode 100644
index 00000000..fc43a71f
--- /dev/null
+++ b/test/paper/load_darshan.py
@@ -0,0 +1,90 @@
+from glob import glob
+import pandas as pd
+print(f"pd {pd.__version__}")
+import dask.bag  # also imports the top-level dask package
+import dask.dataframe as dd
+from dask.distributed import Client, LocalCluster, progress, wait
+print(f"dask {dask.__version__}")
+import pyarrow as pa
+print(f"pa {pa.__version__}")
+import logging
+import argparse
+import time
+
+
+import darshan  # pydarshan, provides DarshanReport
+import portion as I  # interval helpers (closed/empty/to_string); assumed 'portion' package
+
+logging.basicConfig(filename='darshan_main.log', encoding='utf-8', level=logging.DEBUG)
+
+
+def generate_darshan_records(log_file):
+    def get_dict(row):
+        d = {}
+        d["size"] = row["length"]
+        d["ts"] = int(row["start_time"] * 10e6)
+        d["dur"] = int(row["end_time"] * 10e6) - d["ts"]
+        d["tinterval"] = I.to_string(I.closed(d["ts"], d["ts"] + d["dur"]))
+        d["trange"] = int(((d["ts"] + d["dur"]) / 2.0) / time_granularity)
+        d["phase"] = 2
+        d["compute_time"] = I.to_string(I.empty())
+        d["io_time"] = d["tinterval"]
+        return d
+    report = darshan.DarshanReport(log_file, read_all=True)
+    if "DXT_POSIX" in report.modules:
+        time_granularity = 10e3
+        for val in report.records['DXT_POSIX'].to_df():
+            d = {}
+            fileid = val["id"]
+            write_df = val["write_segments"]
+            read_df = val["read_segments"]
+            d["hostname"] = val["hostname"]
+            d["pid"] = val["rank"]
+            d["tid"] = 0
+            d["cat"] = "POSIX"
+            d["filename"] = report.data['name_records'][fileid]
+            for index, row in write_df.iterrows():
+                d["name"] = "write"
+                d.update(get_dict(row))
+                yield d
+            for index, row in read_df.iterrows():
+                d["name"] = "read"
+                d.update(get_dict(row))
+                yield d
+
+parser = argparse.ArgumentParser(
+    description="Time functions and print time spent in each function",
+    formatter_class=argparse.RawDescriptionHelpFormatter,
+)
+parser.add_argument("trace_file", help="Trace file to load", type=str)
+
+parser.add_argument("--workers", help="Number of workers", type=int, default=1)
+args = parser.parse_args()
+filename = args.trace_file
+
+cluster = LocalCluster(n_workers=args.workers)  # Launches a scheduler and workers locally
+client = Client(cluster)  # Connect to the distributed cluster and override the default scheduler
+
+file_pattern = glob(filename)
+
+all_records = []
+start = time.time()
+
+create_bag = dask.bag.from_delayed([dask.delayed(generate_darshan_records)(file)
+                                    for file in file_pattern])
+columns = {'name': "string[pyarrow]", 'cat': "string[pyarrow]",
+           'pid': "uint64[pyarrow]", 'tid': "uint64[pyarrow]",
+           'ts': "uint64[pyarrow]", 'dur': "uint64[pyarrow]",
+           'tinterval': "string[pyarrow]", 'trange': "uint64[pyarrow]",
+           'hostname': "string[pyarrow]", 'compute_time': "string[pyarrow]",
+           'io_time': "string[pyarrow]", 'filename': "string[pyarrow]",
+           'phase': "uint16[pyarrow]", 'size': "uint64[pyarrow]"}
+events = create_bag.to_dataframe(meta=columns)
+
+n_partition = 1
+events = events.repartition(npartitions=n_partition).persist()
+progress(events)
+_ = wait(events)
+
+end = time.time()
+print(f"Loading Darshan trace took {end-start} seconds.")
\ No newline at end of file
diff --git a/test/paper/load_recorder.py b/test/paper/load_recorder.py
new file mode 100644
index 00000000..73b956c3
--- /dev/null
+++ b/test/paper/load_recorder.py
@@ -0,0 +1,75 @@
+from glob import glob
+import pandas as pd
+print(f"pd {pd.__version__}")
+
+import dask.bag  # also imports the top-level dask package
+import dask.dataframe as dd
+from dask.distributed import Client, LocalCluster, progress, wait
+print(f"dask {dask.__version__}")
+import pyarrow as pa
+print(f"pa {pa.__version__}")
+
+import logging
+import argparse
+import time
+
+import recorder_viz
+from recorder_viz import RecorderReader
+
+logging.basicConfig(filename='recorder_main.log', encoding='utf-8', level=logging.DEBUG)
+
+
+def get_json(func, ts, dur, rank):
+    d = {}
+    d["name"] = func
+    d["cat"] = "Recorder"
+    d["ts"] = int(ts)
+    d["dur"] = int(dur)
+    d["pid"] = rank
+    d["tid"] = 0
+    #print(d)
+    return d
+
+
+def read_trace(trace_name):
+    reader = RecorderReader(trace_name)
+    func_list = reader.funcs
+    for rank, records in enumerate(reader.records):
+        for record in records:
+            if len(func_list) > record.func_id:
+                func_name = func_list[record.func_id]
+                if record.func_id > 0 and "MPI" not in func_name:
+                    yield get_json(func_name, 0, 10, rank)  # ts/dur are fixed placeholders here
+
+parser = argparse.ArgumentParser(
+    description="Time functions and print time spent in each function",
+    formatter_class=argparse.RawDescriptionHelpFormatter,
+)
+parser.add_argument("trace_file", help="Trace file to load", type=str)
+
+parser.add_argument("--workers", help="Number of workers", type=int, default=1)
+args = parser.parse_args()
+filename = args.trace_file
+
+cluster = LocalCluster(n_workers=args.workers)  # Launches a scheduler and workers locally
+client = Client(cluster)  # Connect to the distributed cluster and override the default scheduler
+
+file_pattern = glob(filename)
+
+all_records = []
+start = time.time()
+
+create_bag = dask.bag.from_delayed([dask.delayed(read_trace)(file)
+                                    for file in file_pattern])
+columns = {'name': "string", 'cat': "string",
+           'pid': "string", 'tid': "string",
+           'dur': "uint64", 'ts': "uint64"}
+events = create_bag.to_dataframe(meta=columns)
+#events.head()
+n_partition = 1
+events = events.repartition(npartitions=n_partition).persist()
+progress(events)
+_ = wait(events)
+
+end = time.time()
+print(f"Loading Recorder trace took {end-start} seconds.")
\ No newline at end of file
diff --git a/test/paper/load_scorep.py b/test/paper/load_scorep.py
new file mode 100644
index 00000000..11274827
--- /dev/null
+++ b/test/paper/load_scorep.py
@@ -0,0 +1,107 @@
+from glob import glob
+import pandas as pd
+print(f"pd {pd.__version__}")
+
+import dask.bag  # also imports the top-level dask package
+import dask.dataframe as dd
+from dask.distributed import Client, LocalCluster, progress, wait
+print(f"dask {dask.__version__}")
+import pyarrow as pa
+print(f"pa {pa.__version__}")
+
+import logging
+import argparse
+import time
+
+import otf2
+from otf2.events import *
+
+
+logging.basicConfig(filename='score-p_main.log', encoding='utf-8', level=logging.DEBUG)
+
+def get_json(location, start, end):
+    d = {}
+    #print(location, start)
+    d["name"] = start.region.name
+    d["cat"] = start.region.region_role
+    d["ts"] = start.time
+    d["dur"] = end.time - start.time
+    d["tid"] = location.name  # mirror get_json_one so every record carries tid/pid
+    d["pid"] = location.group.name
+    return d
+def get_json_one(location, start):
+    d = {}
+    #print(location.group, start)
+    if hasattr(start, 'region'):
+        d["name"] = start.region.name
+        d["cat"] = start.region.region_role
+    else:
+        d["name"] = start.__class__
+        d["cat"] = "Program"
+    d["ts"] = start.time
+    d["dur"] = 0
+    d["tid"] = location.name
+    d["pid"] = location.group.name
+    return d
+
+
+def read_trace(trace_name):
+    map_events = {}
+    count = 0
+    with otf2.reader.open(trace_name) as trace:
+        #print("Read {} string definitions".format(len(trace.definitions.strings)))
+        for location, event in trace.events:
+            if isinstance(event, Enter):
+                unique_id = (location, event.region)
+                map_events[unique_id] = [event]
+                #print(f"Encountered enter event into {event.region} on location {location.group} at {event.attributes}")
+            elif isinstance(event, Leave):
+                unique_id = (location, event.region)
+                if unique_id in map_events:
+                    map_events[unique_id].append(event)
+                else:
+                    map_events[unique_id] = [event]
+                    #print(f"Encountered enter event int")
+                if len(map_events[unique_id]) == 2:
+                    yield dict(**get_json(location=location, start=map_events[unique_id][0], end=map_events[unique_id][1]))
+                elif len(map_events[unique_id]) == 1:
+                    yield dict(**get_json_one(location=location, start=map_events[unique_id][0]))
+                del map_events[unique_id]
+                #print(f"Encountered leave event for {event.region} on location {location} at {event}")
+            else:
+                yield dict(**get_json_one(location=location, start=event))
+                #print(f"Encountered event on location {location} at {event}")
+            count = count + 1
+            if count % 1000 == 0:
+                print(f"Done {count} in {time.time() - start}", end="\r")
+
+parser = argparse.ArgumentParser(
+    description="Time functions and print time spent in each function",
+    formatter_class=argparse.RawDescriptionHelpFormatter,
+)
+parser.add_argument("trace_file", help="Trace file to load", type=str)
+parser.add_argument("--workers", help="Number of workers", type=int, default=1)
+args = parser.parse_args()
+filename = args.trace_file
+
+cluster = LocalCluster(n_workers=args.workers)  # Launches a scheduler and workers locally
+client = Client(cluster)  # Connect to the distributed cluster and override the default scheduler
+
+file_pattern = glob(filename)
+
+all_records = []
+start = time.time()
+create_bag = dask.bag.from_delayed([dask.delayed(read_trace)(file)
+                                    for file in file_pattern])
+columns = {'name': "string", 'cat': "string",
+           'pid': "string", 'tid': "string",
+           'dur': "uint64", 'ts': "uint64"}
+events = create_bag.to_dataframe(meta=columns)
+#events.head()
+n_partition = 1
+events = events.repartition(npartitions=n_partition).persist()
+progress(events)
+_ = wait(events)
+
+end = time.time()
+print(f"Loading Score-P trace took {end-start} seconds.")