
Commit b6df57d
Release v1.0.0 (#102)
* added scripts for AD (#99)

* Paper/ad ae (#100)

* added scripts for AD

* Improved competitors

* fixed manifest file for pypi source release. (#101)
hariharan-devarajan authored Jun 26, 2024
1 parent aae7dee commit b6df57d
Showing 4 changed files with 285 additions and 1 deletion.
2 changes: 1 addition & 1 deletion MANIFEST.in
@@ -1,5 +1,5 @@
 graft src
-graft CMake
+graft cmake
 graft dependency
 graft include

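Whether the lowercase cmake directory (and the other grafted paths) actually land in the source release can be checked after building an sdist. A minimal sketch, not part of this commit, assuming `python -m build --sdist` has already produced a tarball under dist/:

import glob
import tarfile

# Pick a source distribution tarball (assumes `python -m build --sdist` was run).
sdist = sorted(glob.glob("dist/*.tar.gz"))[-1]
with tarfile.open(sdist) as tar:
    # Member names look like "<package>-<version>/<path>"; drop the leading folder.
    paths = [name.split("/", 1)[-1] for name in tar.getnames()]
for prefix in ("src/", "cmake/", "dependency/", "include/"):
    status = "included" if any(p.startswith(prefix) for p in paths) else "MISSING"
    print(prefix, status)
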
95 changes: 95 additions & 0 deletions test/paper/load_darshan.py
@@ -0,0 +1,95 @@
from glob import glob
import argparse
import logging
import time

import pandas as pd
print(f"pd {pd.__version__}")
import dask
import dask.bag  # needed for dask.bag.from_delayed below
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster, progress, wait
print(f"dask {dask.__version__}")
import pyarrow as pa
print(f"pa {pa.__version__}")

import darshan  # pydarshan, provides DarshanReport used below
# Interval arithmetic for tinterval/compute_time/io_time; assumed to be the
# python-intervals package, which provides closed(), empty() and to_string().
import intervals as I

logging.basicConfig(filename='darshan_main.log', encoding='utf-8', level=logging.DEBUG)


def generate_darshan_records(log_file):
    """Yield one flat record per DXT_POSIX read/write segment in a Darshan log."""
    def get_dict(row):
        d = {}
        d["size"] = row["length"]
        # Scale the floating-point start/end times to integer ticks.
        d["ts"] = int(row["start_time"] * 10e6)
        d["dur"] = int(row["end_time"] * 10e6) - d["ts"]
        d["tinterval"] = I.to_string(I.closed(d["ts"], d["ts"] + d["dur"]))
        d["trange"] = int(((d["ts"] + d["dur"]) / 2.0) / time_granularity)
        d["phase"] = 2
        d["compute_time"] = I.to_string(I.empty())
        d["io_time"] = d["tinterval"]
        return d

    report = darshan.DarshanReport(log_file, read_all=True)
    if "DXT_POSIX" in report.modules:
        time_granularity = 10e3
        for val in report.records['DXT_POSIX'].to_df():
            d = {}
            fileid = val["id"]
            write_df = val["write_segments"]
            read_df = val["read_segments"]
            d["hostname"] = val["hostname"]
            d["pid"] = val["rank"]
            d["tid"] = 0
            d["cat"] = "POSIX"
            d["filename"] = report.data['name_records'][fileid]
            for index, row in write_df.iterrows():
                d["name"] = "write"
                d.update(get_dict(row))
                yield d
            for index, row in read_df.iterrows():
                d["name"] = "read"
                d.update(get_dict(row))
                yield d

parser = argparse.ArgumentParser(
    description="Time functions and print time spent in each function",
    formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument("trace_file", help="Trace file to load", type=str)
parser.add_argument("--workers", help="Number of workers", type=int, default=1)
args = parser.parse_args()
filename = args.trace_file

cluster = LocalCluster(n_workers=args.workers)  # launches a scheduler and workers locally
client = Client(cluster)  # connect to the local cluster and make it the default


file_pattern = glob(filename)

start = time.time()

# Build one delayed record generator per matched log file and collect them into a bag.
create_bag = dask.bag.from_delayed([dask.delayed(generate_darshan_records)(file)
                                    for file in file_pattern])
# 'ts' is listed so the event start time produced by get_dict() is not dropped.
columns = {'name': "string[pyarrow]", 'cat': "string[pyarrow]",
           'pid': "uint64[pyarrow]", 'tid': "uint64[pyarrow]",
           'ts': "uint64[pyarrow]", 'dur': "uint64[pyarrow]",
           'tinterval': "string[pyarrow]", 'trange': "uint64[pyarrow]",
           'hostname': "string[pyarrow]",
           'compute_time': "string[pyarrow]", 'io_time': "string[pyarrow]",
           'filename': "string[pyarrow]", 'phase': "uint16[pyarrow]",
           'size': "uint64[pyarrow]"}
events = create_bag.to_dataframe(meta=columns)

n_partition = 1
events = events.repartition(npartitions=n_partition).persist()
progress(events)
_ = wait(events)

end = time.time()
print(f"Loading Darshan trace took {end - start} seconds.")
80 changes: 80 additions & 0 deletions test/paper/load_recorder.py
@@ -0,0 +1,80 @@
from glob import glob
import argparse
import logging
import time

import pandas as pd
print(f"pd {pd.__version__}")
import dask
import dask.bag  # needed for dask.bag.from_delayed below
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster, progress, wait
print(f"dask {dask.__version__}")
import pyarrow as pa
print(f"pa {pa.__version__}")

from recorder_viz import RecorderReader

logging.basicConfig(filename='recorder_main.log', encoding='utf-8', level=logging.DEBUG)


def get_json(func, ts, dur, rank):
    """Build one flat event record for a Recorder function call."""
    d = {}
    d["name"] = func
    d["cat"] = "Recorder"
    d["ts"] = int(ts)
    d["dur"] = int(dur)
    d["pid"] = rank
    d["tid"] = 0
    return d


def read_trace(trace_name):
    """Yield one record per non-MPI call in a Recorder trace."""
    reader = RecorderReader(trace_name)
    func_list = reader.funcs
    for rank, records in enumerate(reader.records):
        for record in records:
            if len(func_list) > record.func_id:
                func_name = func_list[record.func_id]
                if record.func_id > 0 and "MPI" not in func_name:
                    # Timestamp and duration are placeholders (0 and 10);
                    # only the function name, category, and rank are kept.
                    yield get_json(func_name, 0, 10, rank)

parser = argparse.ArgumentParser(
    description="Time functions and print time spent in each function",
    formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument("trace_file", help="Trace file to load", type=str)
parser.add_argument("--workers", help="Number of workers", type=int, default=1)
args = parser.parse_args()
filename = args.trace_file

cluster = LocalCluster(n_workers=args.workers)  # launches a scheduler and workers locally
client = Client(cluster)  # connect to the local cluster and make it the default

file_pattern = glob(filename)

start = time.time()

# Build one delayed record generator per matched trace and collect them into a bag.
create_bag = dask.bag.from_delayed([dask.delayed(read_trace)(file)
                                    for file in file_pattern])
# pid and tid hold integer ranks, so integer dtypes are used for them.
columns = {'name': "string", 'cat': "string",
           'pid': "uint64", 'tid': "uint64",
           'dur': "uint64", 'ts': "uint64"}
events = create_bag.to_dataframe(meta=columns)

n_partition = 1
events = events.repartition(npartitions=n_partition).persist()
progress(events)
_ = wait(events)

end = time.time()
print(f"Loading Recorder trace took {end - start} seconds.")
109 changes: 109 additions & 0 deletions test/paper/load_scorep.py
@@ -0,0 +1,109 @@
from glob import glob
import argparse
import logging
import time

import pandas as pd
print(f"pd {pd.__version__}")
import dask
import dask.bag  # needed for dask.bag.from_delayed below
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster, progress, wait
print(f"dask {dask.__version__}")
import pyarrow as pa
print(f"pa {pa.__version__}")

import otf2
from otf2.events import Enter, Leave

logging.basicConfig(filename='score-p_main.log', encoding='utf-8', level=logging.DEBUG)

def get_json(location, start, end):
    """Build one record for a matched Enter/Leave pair."""
    d = {}
    d["name"] = start.region.name
    d["cat"] = start.region.region_role
    d["ts"] = start.time
    d["dur"] = end.time - start.time
    d["tid"] = location.name
    d["pid"] = location.group.name
    return d


def get_json_one(location, start):
    """Build one zero-duration record for an unmatched or non-region event."""
    d = {}
    if hasattr(start, 'region'):
        d["name"] = start.region.name
        d["cat"] = start.region.region_role
    else:
        d["name"] = start.__class__.__name__
        d["cat"] = "Program"
    d["ts"] = start.time
    d["dur"] = 0
    d["tid"] = location.name
    d["pid"] = location.group.name
    return d


def read_trace(trace_name):
    """Yield one record per region enter/leave pair (or lone event) in an OTF2 trace."""
    map_events = {}
    count = 0
    with otf2.reader.open(trace_name) as trace:
        for location, event in trace.events:
            if isinstance(event, Enter):
                unique_id = (location, event.region)
                map_events[unique_id] = [event]
            elif isinstance(event, Leave):
                unique_id = (location, event.region)
                if unique_id in map_events:
                    map_events[unique_id].append(event)
                else:
                    map_events[unique_id] = [event]
                if len(map_events[unique_id]) == 2:
                    # Matched enter/leave pair: emit one event with a duration.
                    yield get_json(location=location, start=map_events[unique_id][0], end=map_events[unique_id][1])
                elif len(map_events[unique_id]) == 1:
                    # Leave without a matching enter: emit a zero-duration event.
                    yield get_json_one(location=location, start=map_events[unique_id][0])
                del map_events[unique_id]
            else:
                # Any other event type: emit a zero-duration event.
                yield get_json_one(location=location, start=event)
            count = count + 1
            if count % 1000 == 0:
                print(f"Done {count} in {time.time() - start}", end="\r")

parser = argparse.ArgumentParser(
    description="Time functions and print time spent in each function",
    formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument("trace_file", help="Trace file to load", type=str)
parser.add_argument("--workers", help="Number of workers", type=int, default=1)
args = parser.parse_args()
filename = args.trace_file

cluster = LocalCluster(n_workers=args.workers)  # launches a scheduler and workers locally
client = Client(cluster)  # connect to the local cluster and make it the default

file_pattern = glob(filename)

start = time.time()

# Build one delayed record generator per matched trace and collect them into a bag.
create_bag = dask.bag.from_delayed([dask.delayed(read_trace)(file)
                                    for file in file_pattern])
columns = {'name': "string", 'cat': "string",
           'pid': "string", 'tid': "string",
           'dur': "uint64", 'ts': "uint64"}
events = create_bag.to_dataframe(meta=columns)

n_partition = 1
events = events.repartition(npartitions=n_partition).persist()
progress(events)
_ = wait(events)

end = time.time()
print(f"Loading Score-P trace took {end - start} seconds.")
