Skip to content

Commit f310a1e

Browse files
Merge pull request #172 from rosswhitfield/fix_trace_duplicate_step_call
Fix trace IDs when multiple calls to component method with same timestamp
2 parents 5d5cb8a + 646cdb7 commit f310a1e

File tree

7 files changed

+193
-15
lines changed

7 files changed

+193
-15
lines changed

ipsframework/component.py

+8-3
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ def __init__(self, services, config):
3232
self.__start_time = 0.0
3333
self.__sys_exit = None
3434
self.__method_name = None
35+
self.__call_id = 0
3536
self.__args = None
3637
for i in config.keys():
3738
try:
@@ -122,7 +123,7 @@ def __run__(self):
122123
msg = self.__invocation_q.get()
123124
self.services.log('Received Message ')
124125
sender_id = msg.sender_id
125-
call_id = msg.call_id
126+
self.__call_id = msg.call_id
126127
self.__method_name = msg.target_method
127128
self.__args = msg.args
128129
keywords = msg.keywords
@@ -140,12 +141,12 @@ def __run__(self):
140141
self.services.exception('Uncaught Exception in component method.')
141142
response_msg = MethodResultMessage(self.component_id,
142143
sender_id,
143-
call_id,
144+
self.call_id,
144145
Message.FAILURE, e)
145146
else:
146147
response_msg = MethodResultMessage(self.component_id,
147148
sender_id,
148-
call_id,
149+
self.call_id,
149150
Message.SUCCESS, retval)
150151
self.services.fwk_in_q.put(response_msg)
151152

@@ -169,6 +170,10 @@ def start_time(self):
169170
def method_name(self):
170171
return self.__method_name
171172

173+
@property
174+
def call_id(self):
175+
return self.__call_id
176+
172177
@property
173178
def args(self):
174179
return self.__args

ipsframework/ips.py

+4-3
Original file line numberDiff line numberDiff line change
@@ -517,7 +517,8 @@ def run(self):
517517
start_time=start_time,
518518
end_time=time.time(),
519519
target=comp,
520-
operation=f'{method}({arg})')
520+
operation=f'{method}({arg})',
521+
call_id=msg.call_id)
521522
sim_msg_list = self.call_queue_map[msg.call_id]
522523
del self.call_queue_map[msg.call_id]
523524
if msg.status == Message.FAILURE:
@@ -581,7 +582,7 @@ def initiate_new_simulation(self, sim_name):
581582
self.call_queue_map[call_id] = msg_list
582583
self.outstanding_calls_list[call_id] = sim_name, comp, method, arg, time.time()
583584

584-
def _send_monitor_event(self, sim_name='', eventType='', comment='', ok=True, target=None, operation=None, start_time=None, end_time=None):
585+
def _send_monitor_event(self, sim_name='', eventType='', comment='', ok=True, target=None, operation=None, start_time=None, end_time=None, call_id=0):
585586
"""
586587
Publish a portal monitor event to the *_IPS_MONITOR* event topic.
587588
Event topics that start with an underscore are reserved for use by the
@@ -674,7 +675,7 @@ def _send_monitor_event(self, sim_name='', eventType='', comment='', ok=True, ta
674675
if target is not None:
675676
trace['localEndpoint'] = {"serviceName": target}
676677
trace['name'] = operation
677-
trace['id'] = hashlib.md5(f"{target}:{operation}".encode()).hexdigest()[:16]
678+
trace['id'] = hashlib.md5(f"{target}:{operation}:{call_id}".encode()).hexdigest()[:16]
678679
trace["parentId"] = hashlib.md5(f'{sim_name}@{self.component_id}'.encode()).hexdigest()[:16]
679680

680681
if trace:

ipsframework/services.py

+20-7
Original file line numberDiff line numberDiff line change
@@ -399,7 +399,8 @@ def _send_monitor_event(self,
399399
target=None,
400400
operation=None,
401401
procs_requested=None,
402-
cores_allocated=None):
402+
cores_allocated=None,
403+
call_id=0):
403404
"""
404405
Construct and send an event populated with the component's
405406
information, *eventType*, *comment*, *ok*, *state*, and a wall time
@@ -427,9 +428,10 @@ def _send_monitor_event(self,
427428
trace['name'] = operation
428429
formatted_args = ['%.3f' % (x) if isinstance(x, float)
429430
else str(x) for x in self.component_ref.args]
430-
trace['id'] = hashlib.md5(f"{target}:{operation}".encode()).hexdigest()[:16]
431-
trace['parentId'] = hashlib.md5(f"{self.component_ref.component_id}:{self.component_ref.method_name}({' ,'.join(formatted_args)})"
432-
.encode()).hexdigest()[:16]
431+
trace['id'] = hashlib.md5(f"{target}:{operation}:{call_id}".encode()).hexdigest()[:16]
432+
trace['parentId'] = hashlib.md5(
433+
f"{self.component_ref.component_id}:{self.component_ref.method_name}({' ,'.join(formatted_args)}):{self.component_ref.call_id}"
434+
.encode()).hexdigest()[:16]
433435
trace['tags'] = {}
434436
if procs_requested is not None:
435437
trace['tags']['procs_requested'] = str(procs_requested)
@@ -549,7 +551,8 @@ def wait_call(self, call_id, block=True):
549551
end_time=time.time(),
550552
elapsed_time=time.time()-start_time,
551553
target=target,
552-
operation=f'{method_name}({formatted_args})')
554+
operation=f'{method_name}({formatted_args})',
555+
call_id=call_id)
553556
except Exception as e:
554557
self._send_monitor_event('IPS_CALL_END',
555558
f'Error: "{e}" Target = {target_full}',
@@ -558,6 +561,7 @@ def wait_call(self, call_id, block=True):
558561
elapsed_time=time.time()-start_time,
559562
target=target,
560563
operation=f'{method_name}({formatted_args})',
564+
call_id=call_id,
561565
ok=False)
562566
raise
563567

@@ -929,7 +933,15 @@ def wait_task(self, task_id, timeout=-1, delay=1):
929933
process.kill()
930934
task_retval = process.wait()
931935
self._send_monitor_event('IPS_TASK_END', 'task_id = %s TIMEOUT elapsed time = %.2f S' %
932-
(str(task_id), finish_time - start_time))
936+
(str(task_id), finish_time - start_time),
937+
start_time=start_time,
938+
end_time=finish_time,
939+
elapsed_time=finish_time - start_time,
940+
procs_requested=nproc,
941+
cores_allocated=cores,
942+
target=binary,
943+
operation=" ".join(args),
944+
call_id=task_id)
933945
else:
934946
self._send_monitor_event('IPS_TASK_END', 'task_id = %s elapsed time = %.2f S' %
935947
(str(task_id), finish_time - start_time),
@@ -939,7 +951,8 @@ def wait_task(self, task_id, timeout=-1, delay=1):
939951
procs_requested=nproc,
940952
cores_allocated=cores,
941953
target=binary,
942-
operation=" ".join(args))
954+
operation=" ".join(args),
955+
call_id=task_id)
943956

944957
del self.task_map[task_id]
945958
try:
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
from ipsframework import Component
2+
3+
4+
class driver(Component):
5+
def step(self, timestamp=0.0, **keywords):
6+
w = self.services.get_port('WORKER')
7+
# call the same worker step twice with identical arguments to check that each call gets its own trace ID
8+
self.services.call(w, 'step', 0)
9+
self.services.call(w, 'step', 0)
+14
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
# -------------------------------------------------------------------------------
2+
# Copyright 2006-2022 UT-Battelle, LLC. See LICENSE for more information.
3+
# -------------------------------------------------------------------------------
4+
from ipsframework import Component
5+
6+
7+
class simple_sleep(Component):
8+
def step(self, timestamp=0.0, **keywords):
9+
self.services.wait_task(
10+
self.services.launch_task(1,
11+
"/tmp",
12+
"/bin/sleep",
13+
1)
14+
)

tests/helloworld/test_helloworld.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -371,10 +371,10 @@ def handle(self):
371371
assert 'duration' in trace
372372
assert 'timestamp' in trace
373373
assert 'id' in trace
374-
assert trace['id'] == hashlib.md5('Hello_world_1@HelloWorker@2:init(0.000)'.encode()).hexdigest()[:16]
374+
assert trace['id'] == hashlib.md5('Hello_world_1@HelloWorker@2:init(0.000):7'.encode()).hexdigest()[:16]
375375
assert 'traceId' in trace
376376
assert trace['traceId'] == hashlib.md5(event['portal_runid'].encode()).hexdigest()
377377
assert 'parentId' in trace
378-
assert trace['parentId'] == hashlib.md5('Hello_world_1@HelloDriver@1:init(0)'.encode()).hexdigest()[:16]
378+
assert trace['parentId'] == hashlib.md5('Hello_world_1@HelloDriver@1:init(0):5'.encode()).hexdigest()[:16]
379379
assert 'localEndpoint' in trace
380380
assert trace['localEndpoint']['serviceName'] == 'Hello_world_1@HelloWorker@2'

tests/new/test_trace.py

+136
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
import glob
2+
import json
3+
import hashlib
4+
from ipsframework import Framework
5+
6+
7+
def write_basic_config_and_platform_files(tmpdir, timeout='', logfile='', errfile='', nproc=1, exe='/bin/sleep', value='', shifter=False):
8+
platform_file = tmpdir.join('platform.conf')
9+
10+
platform = """MPIRUN = eval
11+
NODE_DETECTION = manual
12+
CORES_PER_NODE = 2
13+
SOCKETS_PER_NODE = 1
14+
NODE_ALLOCATION_MODE = shared
15+
HOST =
16+
SCRATCH =
17+
"""
18+
19+
with open(platform_file, 'w') as f:
20+
f.write(platform)
21+
22+
config_file = tmpdir.join('ips.config')
23+
24+
config = f"""RUN_COMMENT = trace testing
25+
SIM_NAME = trace
26+
LOG_FILE = {str(tmpdir)}/sim.log
27+
LOG_LEVEL = INFO
28+
SIM_ROOT = {str(tmpdir)}
29+
SIMULATION_MODE = NORMAL
30+
[PORTS]
31+
NAMES = DRIVER WORKER
32+
[[DRIVER]]
33+
IMPLEMENTATION = DRIVER
34+
[[WORKER]]
35+
IMPLEMENTATION = WORKER
36+
[DRIVER]
37+
CLASS = DRIVER
38+
SUB_CLASS =
39+
NAME = driver
40+
BIN_PATH =
41+
NPROC = 1
42+
INPUT_FILES =
43+
OUTPUT_FILES =
44+
SCRIPT =
45+
MODULE = components.drivers.driver_double_trace
46+
[WORKER]
47+
CLASS = WORKER
48+
SUB_CLASS =
49+
NAME = simple_sleep
50+
NPROC = 1
51+
BIN_PATH =
52+
INPUT_FILES =
53+
OUTPUT_FILES =
54+
SCRIPT =
55+
MODULE = components.workers.simple_sleep
56+
"""
57+
58+
with open(config_file, 'w') as f:
59+
f.write(config)
60+
61+
return platform_file, config_file
62+
63+
64+
def test_trace_info(tmpdir):
65+
platform_file, config_file = write_basic_config_and_platform_files(tmpdir, value=1)
66+
67+
framework = Framework(config_file_list=[str(config_file)],
68+
log_file_name=str(tmpdir.join('ips.log')),
69+
platform_file_name=str(platform_file),
70+
debug=None,
71+
verbose_debug=None,
72+
cmd_nodes=0,
73+
cmd_ppn=0)
74+
75+
framework.run()
76+
77+
# check simulation_log, make sure it includes the expected trace events
78+
json_files = glob.glob(str(tmpdir.join("simulation_log").join("*.json")))
79+
assert len(json_files) == 1
80+
with open(json_files[0], 'r') as json_file:
81+
lines = json_file.readlines()
82+
lines = [json.loads(line.strip()) for line in lines]
83+
assert len(lines) == 17
84+
85+
portal_runid = lines[0]['portal_runid']
86+
87+
traces = [e['trace'] for e in lines if "trace" in e]
88+
89+
assert len(traces) == 8
90+
91+
call_ids = [5, 1, 8, 2, 9, 7, 10, None]
92+
service_names = ['trace@driver@1',
93+
'/bin/sleep',
94+
'trace@simple_sleep@2',
95+
'/bin/sleep',
96+
'trace@simple_sleep@2',
97+
'trace@driver@1',
98+
'trace@driver@1',
99+
'trace@FRAMEWORK@Framework@0']
100+
names = ['init(0)',
101+
'1',
102+
'step(0)',
103+
'1',
104+
'step(0)',
105+
'step(0)',
106+
'finalize(0)',
107+
None]
108+
tags = [None,
109+
{"procs_requested": "1", "cores_allocated": "1"},
110+
{},
111+
{"procs_requested": "1", "cores_allocated": "1"},
112+
{},
113+
None,
114+
None,
115+
{'total_cores': '2'}]
116+
parents = [7, 2, 5, 4, 5, 7, 7, None]
117+
118+
for n, trace in enumerate(traces):
119+
assert isinstance(trace['timestamp'], int)
120+
assert isinstance(trace['duration'], int)
121+
assert trace['traceId'] == hashlib.md5(portal_runid.encode()).hexdigest()
122+
assert trace['localEndpoint']['serviceName'] == service_names[n]
123+
assert "id" in trace
124+
assert trace.get('tags') == tags[n]
125+
126+
if names[n]:
127+
assert trace['name'] == names[n]
128+
assert trace['id'] == hashlib.md5(f"{trace['localEndpoint']['serviceName']}:{trace['name']}:{call_ids[n]}".encode()).hexdigest()[:16]
129+
else:
130+
assert trace['id'] == hashlib.md5(f"{trace['localEndpoint']['serviceName']}".encode()).hexdigest()[:16]
131+
132+
if parents[n]:
133+
if names[parents[n]]:
134+
assert trace['parentId'] == hashlib.md5(f"{service_names[parents[n]]}:{names[parents[n]]}:{call_ids[parents[n]]}".encode()).hexdigest()[:16]
135+
else:
136+
assert trace['parentId'] == hashlib.md5(f"{service_names[parents[n]]}".encode()).hexdigest()[:16]

0 commit comments

Comments
 (0)