Enhancement: Support analyzer to load traces with hashed fields #211

Merged
merged 16 commits into from Oct 3, 2024
6 changes: 4 additions & 2 deletions dfanalyzer/dask/conf/corona.yaml
@@ -9,14 +9,16 @@ dask:
job:
num_nodes: 1
wall_time_min: 60
cores: 48
env_id: FLUX_JOB_ID
queue: pdebug
scheduler:
cmd: srun -N ${DFTRACER_JOB_NUM_NODES} -t ${DFTRACER_JOB_WALL_TIME_MIN}
cmd: flux alloc -N ${DFTRACER_JOB_NUM_NODES} -t ${DFTRACER_JOB_WALL_TIME_MIN} -q ${DFTRACER_JOB_QUEUE}
port: 10005
kill: flux cancel --all
worker:
ppn: 48
cmd: srun -N ${DFTRACER_JOB_NUM_NODES} --ntasks-per-node=${DFTRACER_WORKER_PPN}
cmd: flux run -N ${DFTRACER_JOB_NUM_NODES} --tasks-per-node=${DFTRACER_WORKER_PPN} --cores=$((DFTRACER_JOB_CORES*DFTRACER_JOB_NUM_NODES))
per_core: 1
threads: 1
local_dir: /l/ssd/$USER/dask-workspace
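The updated Corona profile replaces the Slurm `srun` launch commands with Flux equivalents and adds `cores` and `queue`, so the scheduler and worker commands are assembled from `${DFTRACER_*}` placeholders. As a rough illustration only (this is not dfanalyzer's actual launcher, and the YAML nesting and environment-variable names are assumptions read off the hunk above), expanding those placeholders before handing the command to a shell could look like this:

```python
# Minimal sketch, assuming the dask.job / dask.scheduler / dask.worker layout shown above.
import os
import subprocess
import yaml  # PyYAML, assumed available

with open("dfanalyzer/dask/conf/corona.yaml") as f:
    conf = yaml.safe_load(f)

job = conf["dask"]["job"]
os.environ.setdefault("DFTRACER_JOB_NUM_NODES", str(job["num_nodes"]))
os.environ.setdefault("DFTRACER_JOB_WALL_TIME_MIN", str(job["wall_time_min"]))
os.environ.setdefault("DFTRACER_JOB_CORES", str(job["cores"]))
os.environ.setdefault("DFTRACER_JOB_QUEUE", job["queue"])
os.environ.setdefault("DFTRACER_WORKER_PPN", str(conf["dask"]["worker"]["ppn"]))

# shell=True so ${VAR} substitution and $(( ... )) arithmetic in the worker command
# are resolved the same way they would be inside a job script.
subprocess.run(conf["dask"]["scheduler"]["cmd"], shell=True, check=True)
```

Leaving expansion to the shell matters here because the worker command's `$((DFTRACER_JOB_CORES*DFTRACER_JOB_NUM_NODES))` is shell arithmetic that plain `os.path.expandvars` would not evaluate.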
154 changes: 90 additions & 64 deletions dfanalyzer/main.py
@@ -166,33 +166,61 @@ def load_objects(line, fn, time_granularity, time_approximate, condition_fn, load_data):
unicode_line = ''.join([i if ord(i) < 128 else '#' for i in line])
val = json.loads(unicode_line, strict=False)
logging.debug(f"Loading dict {val}")
if "name" in val and "cat" in val:
if "name" in val:
d["name"] = val["name"]
if "cat" in val:
d["cat"] = val["cat"]
d["pid"] = val["pid"]
d["tid"] = val["tid"]
val["dur"] = int(val["dur"])
val["ts"] = int(val["ts"])
d["ts"] = val["ts"]
d["dur"] = val["dur"]
d["te"] = d["ts"] + d["dur"]
if not time_approximate:
d["tinterval"] = I.to_string(I.closed(val["ts"] , val["ts"] + val["dur"]))
d["trange"] = int(((val["ts"] + val["dur"])/2.0) / time_granularity)
if "M" == val["ph"]:
if d["name"] == "FH":
d["type"] = 1 # 1-> file hash
if "args" in val and "name" in val["args"] and "value" in val["args"]:
d["name"] = val["args"]["name"]
d["hash"] = val["args"]["value"]
elif d["name"] == "HH":
d["type"] = 2 # 2-> hostname hash
if "args" in val and "name" in val["args"] and "value" in val["args"]:
d["name"] = val["args"]["name"]
d["hash"] = val["args"]["value"]
elif d["name"] == "SH":
d["type"] = 3 # 3-> string hash
if "args" in val and "name" in val["args"] and "value" in val["args"]:
d["name"] = val["args"]["name"]
d["hash"] = val["args"]["value"]
else:
d["type"] = 4 # 4-> others
if "args" in val and "name" in val["args"] and "value" in val["args"]:
d["name"] = val["args"]["name"]
d["value"] = str(val["args"]["value"])
else:
d["type"] = 0 # 0->regular event
if "pid" in val:
d["pid"] = val["pid"]
if "tid" in val:
d["tid"] = val["tid"]
if "dur" in val:
val["dur"] = int(val["dur"])
val["ts"] = int(val["ts"])
d["ts"] = val["ts"]
d["dur"] = val["dur"]
d["te"] = d["ts"] + d["dur"]
if not time_approximate:
d["tinterval"] = I.to_string(I.closed(val["ts"] , val["ts"] + val["dur"]))
d["trange"] = int(((val["ts"] + val["dur"])/2.0) / time_granularity)
d.update(io_function(val, d, time_approximate,condition_fn))
if fn:
user_d = fn(val, d, time_approximate,condition_fn, load_data)
if type(user_d) is list:
for user_dict in user_d[1:]:
yield user_dict
d.update(user_d[0])
else:
d.update(user_d)
logging.debug(f"built an dictionary for line {d}")
yield d
if fn:
user_d = fn(val, d, time_approximate,condition_fn, load_data)
if type(user_d) is list:
for user_dict in user_d[1:]:
yield user_dict
d.update(user_d[0])
else:
d.update(user_d)
logging.debug(f"built an dictionary for line {d}")
yield d
except ValueError as error:
logging.error(f"Processing {line} failed with {error}")
return {}
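The new `ph == "M"` branch classifies metadata records by their `name` field and records a numeric `type`: 1 for file-hash (`FH`), 2 for hostname-hash (`HH`), 3 for string-hash (`SH`), 4 for any other metadata, while regular events keep `type` 0. Below is a minimal sketch of the kind of trace lines this targets and how the type coding applies; the field values are invented for illustration, not taken from a real trace.

```python
# Illustrative only: hand-written lines in the hashed-fields format handled above.
import json

sample_lines = [
    # ph == "M" metadata events: "name" selects the hash table, args carries name/value.
    '{"name": "FH", "cat": "dftracer", "ph": "M", "pid": 1, "tid": 1, '
    '"args": {"name": "/data/train/img_0001.jpg", "value": 4019881123}}',
    '{"name": "HH", "cat": "dftracer", "ph": "M", "pid": 1, "tid": 1, '
    '"args": {"name": "corona171", "value": 77121}}',
    # Regular event (type 0) that references the hashes instead of raw strings.
    '{"name": "read", "cat": "POSIX", "ph": "X", "pid": 1, "tid": 1, '
    '"ts": 100, "dur": 25, "args": {"fhash": 4019881123, "hhash": 77121, "ret": 4096}}',
]

for line in sample_lines:
    val = json.loads(line)
    if val["ph"] == "M":
        kind = {"FH": 1, "HH": 2, "SH": 3}.get(val["name"], 4)  # same coding as load_objects
        print(kind, val["args"]["name"], val["args"]["value"])
    else:
        print(0, val["name"], val["args"])
```

For metadata records, `load_objects` also copies `args.name` into `d["name"]` and `args.value` into `d["hash"]` (or `d["value"]` for type 4), which is what the hash tables built later in `DFAnalyzer.__init__` consume.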

def io_function(json_object, current_dict, time_approximate,condition_fn):
d = {}
d["phase"] = 0
@@ -231,42 +259,34 @@ def io_function(json_object, current_dict, time_approximate,condition_fn):
d["total_time"] = I.to_string(I.empty())
d["io_time"] = I.to_string(I.empty())
if "args" in json_object:
if "fname" in json_object["args"]:
d["filename"] = json_object["args"]["fname"]
if "hostname" in json_object["args"]:
d["hostname"] = json_object["args"]["hostname"]
if "fhash" in json_object["args"]:
d["fhash"] = json_object["args"]["fhash"]
if "hhash" in json_object["args"]:
d["hhash"] = json_object["args"]["hhash"]

if "POSIX" == json_object["cat"] and "ret" in json_object["args"]:
if json_object["name"] == "write":
d["size"] = int(json_object["args"]["ret"])
elif json_object["name"] in ["read", "pread"]:
d["size"] = int(json_object["args"]["ret"])
elif json_object["name"] == "fwrite":
d["size"] = 1
if "ret" in json_object["args"]:
d["size"] *= int(json_object["args"]["ret"])
if "size" in json_object["args"]:
d["size"] *= int(json_object["args"]["size"])
elif json_object["name"] == "fread":
d["size"] = 1
if "ret" in json_object["args"]:
d["size"] *= int(json_object["args"]["ret"])
if "size" in json_object["args"]:
d["size"] *= int(json_object["args"]["size"])
size = int(json_object["args"]["ret"])
if size > 0:
if "write" in json_object["name"]:
d["size"] = size
elif "read" in json_object["name"] and "readdir" not in json_object["name"]:
d["size"] = size
else:
if "image_size" in json_object["args"]:
d["size"] = int(json_object["args"]["image_size"])
size = int(json_object["args"]["image_size"])
if size > 0:
d["size"] = size
return d
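`io_function` now derives the transfer size uniformly: for POSIX calls it takes `args["ret"]` whenever it is positive and the call name contains `write` or `read` (excluding `readdir`), and otherwise falls back to `args["image_size"]`; the earlier per-call cases that multiplied `ret` by `size` for `fread`/`fwrite` are gone. The snippet below is a condensed, stand-alone restatement of that rule applied to a made-up event, not the function itself.

```python
# Made-up POSIX event; only the keys the new size rule inspects are included.
json_object = {"name": "pread", "cat": "POSIX",
               "args": {"fhash": 4019881123, "hhash": 77121, "ret": 65536}}

d = {}
args = json_object.get("args", {})
name = json_object["name"]
if json_object.get("cat") == "POSIX" and "ret" in args:
    size = int(args["ret"])
    if size > 0 and ("write" in name or ("read" in name and "readdir" not in name)):
        d["size"] = size
elif "image_size" in args:
    size = int(args["image_size"])
    if size > 0:
        d["size"] = size

print(d)  # {'size': 65536}
```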

def io_columns():
conf = get_dft_configuration()
return {
'hostname': "string[pyarrow]",
'hhash': "uint32[pyarrow]",
'compute_time': "string[pyarrow]" if not conf.time_approximate else "uint64[pyarrow]",
'io_time': "string[pyarrow]" if not conf.time_approximate else "uint64[pyarrow]",
'app_io_time': "string[pyarrow]" if not conf.time_approximate else "uint64[pyarrow]",
'total_time': "string[pyarrow]" if not conf.time_approximate else "uint64[pyarrow]",
'filename': "string[pyarrow]",
'fhash': "uint32[pyarrow]",

(Review suggestion on the line above: use 'fhash': "uint64[pyarrow]" instead of 'fhash': "uint32[pyarrow]".)

'phase': "uint16[pyarrow]",
'size': "uint64[pyarrow]"
}
@@ -360,7 +380,7 @@ def human_format_time(num):

class DFAnalyzer:

def __init__(self, file_pattern, load_fn=None, load_cols={}, load_data = {}):
def __init__(self, file_pattern, load_fn=None, load_cols={}, load_data = {}, metadata_cols = {}):

self.conf = get_dft_configuration()
if self.conf.dask_scheduler:
@@ -427,13 +447,35 @@ def __init__(self, file_pattern, load_fn=None, load_cols={}, load_data = {}):
elif len(pfw_pattern) > 0:
main_bag = pfw_bag
if main_bag:
columns = {'name': "string[pyarrow]", 'cat': "string[pyarrow]",
columns = {'name': "string[pyarrow]", 'cat': "string[pyarrow]",'type': "uint8[pyarrow]",
'pid': "uint64[pyarrow]", 'tid': "uint64[pyarrow]",
'ts': "uint64[pyarrow]", 'te': "uint64[pyarrow]", 'dur': "uint64[pyarrow]",
'tinterval': "string[pyarrow]" if not self.conf.time_approximate else "uint64[pyarrow]", 'trange': "uint64[pyarrow]"}
columns.update(io_columns())
columns.update(load_cols)
events = main_bag.to_dataframe(meta=columns)
file_hash_columns = {'name': "string[pyarrow]", 'hash':"string[pyarrow]"}
hostname_hash_columns = {'name': "string[pyarrow]", 'hash':"string[pyarrow]"}
string_hash_columns = {'name': "string[pyarrow]", 'hash':"string[pyarrow]"}
other_metadata_columns = { 'name':"string[pyarrow]" ,'value':"string[pyarrow]" }
if "FH" in metadata_cols:
file_hash_columns.update(metadata_cols["FH"])
if "HH" in metadata_cols:
hostname_hash_columns.update(metadata_cols["HH"])
if "SH" in metadata_cols:
string_hash_columns.update(metadata_cols["SH"])
if "M" in metadata_cols:
other_metadata_columns.update(metadata_cols["M"])
columns.update(file_hash_columns)
columns.update(hostname_hash_columns)
columns.update(string_hash_columns)
columns.update(other_metadata_columns)

all_events = main_bag.to_dataframe(meta=columns)
events = all_events.query("type == 0")
self.file_hash = all_events.query("type == 1")[list(file_hash_columns.keys())].set_index('hash').persist()
self.host_hash = all_events.query("type == 2")[list(hostname_hash_columns.keys())].set_index('hash').persist()
self.string_hash = all_events.query("type == 3")[list(string_hash_columns.keys())].set_index('hash').persist()
self.metadata = all_events.query("type == 4")[list(other_metadata_columns.keys())].persist()
self.n_partition = math.ceil(total_size.compute() / (128 * 1024 ** 2))
logging.debug(f"Number of partitions used are {self.n_partition}")
self.events = events.repartition(npartitions=self.n_partition).persist()
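With the metadata split out, regular events (`type == 0`) carry only the `fhash`/`hhash` columns, while the hash-to-name mappings live in `self.file_hash`, `self.host_hash`, and `self.string_hash` (each indexed by `hash`), and the remaining metadata in `self.metadata`. A minimal sketch of resolving hashed filenames after construction follows; `analyzer` is assumed to be a `DFAnalyzer` built from this branch, and the cast is included because the FH index is stored as a string while `fhash` on events is an unsigned integer.

```python
# Hypothetical post-construction usage; column names follow the schemas above.
events = analyzer.events.assign(
    fhash_key=analyzer.events["fhash"].astype("string[pyarrow]")
)
resolved = events.merge(
    analyzer.file_hash.rename(columns={"name": "filename"}),  # index is the hash value
    left_on="fhash_key",
    right_index=True,
    how="left",
)
print(resolved[["name", "cat", "fhash", "filename"]].head())
```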
@@ -443,7 +485,7 @@ def __init__(self, file_pattern, load_fn=None, load_cols={}, load_data = {}):
self.events['trange'] = (self.events['ts'] // self.conf.time_granularity).astype('uint16[pyarrow]')
self.events = self.events.persist()

_ = wait(self.events)
_ = wait([self.file_hash, self.host_hash, self.string_hash, self.metadata, self.events])
else:
logging.error(f"Unable to load Traces")
exit(1)
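The new `metadata_cols` argument lets callers extend the default per-category schemas, keyed by the same `FH`/`HH`/`SH`/`M` codes used in `load_objects`. A hedged usage sketch follows; the trace pattern, the import path, and the extra `level` column are placeholders, not part of the PR.

```python
# Hypothetical invocation of the extended constructor.
from dfanalyzer.main import DFAnalyzer  # import path assumed from the repo layout

analyzer = DFAnalyzer(
    "traces/app-*.pfw.gz",                      # placeholder trace pattern
    metadata_cols={
        "FH": {},                               # keep the default {name, hash} schema
        "M":  {"level": "string[pyarrow]"},     # extra column for the catch-all metadata
    },
)

print(analyzer.file_hash.head())   # hash -> original file name
print(analyzer.host_hash.head())   # hash -> original hostname
print(analyzer.metadata.head())    # other metadata events (type 4)
```

Any columns added this way flow into the dask `meta` passed to `to_dataframe`, so they need to match what the trace's metadata events actually carry.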
@@ -463,12 +505,6 @@ def _calculate_time(self):
grouped_df = self.events.groupby(["trange", "pid", "tid"]) \
.agg({"compute_time": sum, "io_time": sum, "app_io_time": sum}) \
.groupby(["trange"]).max()
# check if the max io_time > time_granularity
max_io_time = grouped_df.max().compute()['io_time']
if max_io_time > self.conf.time_granularity:
# throw a warning, running with large granuality
logging.warn(f"The max io_time {max_io_time} exceeds the time_granularity {self.conf.time_granularity}. " \
f"Please adjust the time_granularity to {int(2 * max_io_time /1e6)}e6 and rerun the analyzer.")
grouped_df["io_time"] = grouped_df["io_time"].fillna(0)
grouped_df["compute_time"] = grouped_df["compute_time"].fillna(0)
grouped_df["app_io_time"] = grouped_df["app_io_time"].fillna(0)
@@ -549,14 +585,6 @@ def _remove_numbers(self, string_items):
logging.info(f"List after removing numbers {list(item_sets)}")
return list(item_sets)

def _check_hosts_time_skew(self):
# check if there is time skew across nodes
hosts_ts_df = self.events.groupby('hostname').agg({'ts': 'min'}).compute()
# filter the hosts if time skew exceeds 30 seconds
max_time_skew = 30e6
if np.std(hosts_ts_df['ts']) > max_time_skew:
logging.warn(f"The time skew exceeds {max_time_skew // 1e6} sec across hosts {hosts_ts_df.index.tolist()}")

def summary(self):
num_events = len(self.events)
logging.info(f"Total number of events in the workload are {num_events}")
@@ -575,8 +603,6 @@ def summary(self):

hosts_used = hosts_used.to_list()
#hosts_used_regex_str = self._create_host_intervals(hosts_used)
if len(hosts_used) > 1:
self._check_hosts_time_skew()

filenames_accessed = filenames_accessed.to_list()
#filename_basename_regex_str = self._remove_numbers(filenames_accessed)