diff --git a/doc/user_guides/child_runs.png b/doc/user_guides/child_runs.png new file mode 100644 index 00000000..dc5db8e3 Binary files /dev/null and b/doc/user_guides/child_runs.png differ diff --git a/doc/user_guides/child_runs_trace.png b/doc/user_guides/child_runs_trace.png new file mode 100644 index 00000000..bac7154e Binary files /dev/null and b/doc/user_guides/child_runs_trace.png differ diff --git a/doc/user_guides/portal_guides.rst b/doc/user_guides/portal_guides.rst index 383c55b9..02c4435d 100644 --- a/doc/user_guides/portal_guides.rst +++ b/doc/user_guides/portal_guides.rst @@ -35,7 +35,11 @@ in either your :doc:`Platform Configuration File` or your Tracing ------- -IPS (version >= 0.6.0) has the ability to capture a trace of the +.. note:: + + New in IPS-Framework 0.6.0 + +IPS has the ability to capture a trace of the workflow to allow analysis and visualizations. The traces are captured in the `Zipkin Span format `_ and viewed within IPS portal using `Jaeger @@ -60,3 +64,39 @@ The statistics can be further broken down by operation. .. note:: Self time (ST) is the total time spent in a span when it was not waiting on children. For example, a 10ms span with two 4ms non-overlapping children would have self-time = 10ms - 2 * 4ms = 2ms. + + +Child Runs +---------- + +.. note:: + + New in IPS-Framework 0.7.0 + +If you have a workflow where you are running ``ips`` as a task of +another IPS simulation, you can create a relation between them that +will allow them to be viewed together in the IPS-portal and produce a single +trace for the entire collection. + +To set up the hierarchical structure between different IPS runs, for example when +one run starts other runs as separate simulations, you can set the +``PARENT_PORTAL_RUNID`` parameter in the child simulation +configuration. This can be done dynamically from the parent simulation +like: + +.. 
code-block:: python + + child_conf['PARENT_PORTAL_RUNID'] = self.services.get_config_param("PORTAL_RUNID") + +This is automatically configured when running +``ips_dakota_dynamic.py``. + +The child runs will not appear on the main runs list but will appear +on a tab next to the events. + +.. image:: child_runs.png + +The trace of the primary simulation will contain the traces from all +the simulations: + +.. image:: child_runs_trace.png diff --git a/ipsframework/configurationManager.py b/ipsframework/configurationManager.py index 6ed8fcbd..4499f49b 100644 --- a/ipsframework/configurationManager.py +++ b/ipsframework/configurationManager.py @@ -9,6 +9,7 @@ import uuid import logging import socket +import time from multiprocessing import Queue, Process, set_start_method from .configobj import ConfigObj from . import ipsLogging @@ -40,7 +41,8 @@ class SimulationData: entry in the configurationManager class """ - def __init__(self, sim_name): + def __init__(self, sim_name, start_time=None): + self.start_time = time.time() if start_time is None else start_time self.sim_name = sim_name self.portal_sim_name = None self.sim_root = None @@ -281,7 +283,7 @@ def initialize(self, data_mgr, resource_mgr, task_mgr): sim_name_list.append(sim_name) sim_root_list.append(sim_root) log_file_list.append(log_file) - new_sim = self.SimulationData(sim_name) + new_sim = self.SimulationData(sim_name, self.fwk.start_time) conf['__PORTAL_SIM_NAME'] = sim_name new_sim.sim_conf = conf new_sim.config_file = conf_file @@ -301,7 +303,7 @@ def initialize(self, data_mgr, resource_mgr, task_mgr): if not self.fwk_sim_name: fwk_sim_conf = conf.dict() fwk_sim_conf['SIM_NAME'] = '_'.join([conf['SIM_NAME'], 'FWK']) - fwk_sim = self.SimulationData(fwk_sim_conf['SIM_NAME']) + fwk_sim = self.SimulationData(fwk_sim_conf['SIM_NAME'], self.fwk.start_time) fwk_sim.sim_conf = fwk_sim_conf fwk_sim.sim_root = new_sim.sim_root fwk_sim.log_file = self.fwk.log_file # sys.stdout @@ -380,6 +382,7 @@ def _initialize_fwk_components(self): 
portal_conf['USER'] = self.sim_map[self.fwk_sim_name].sim_conf['USER'] except KeyError: portal_conf['USER'] = self.platform_conf['USER'] + portal_conf['HOST'] = self.platform_conf['HOST'] if self.fwk.log_level == logging.DEBUG: portal_conf['LOG_LEVEL'] = 'DEBUG' @@ -502,7 +505,6 @@ def _create_component(self, comp_conf, sim_data): # SIMYAN: removed else conditional, copying files in runspaceInit # component now - svc_response_q = Queue(0) invocation_q = Queue(0) component_id = ComponentID(class_name, sim_name) @@ -512,7 +514,7 @@ def _create_component(self, comp_conf, sim_data): services_proxy = ServicesProxy(self.fwk, fwk_inq, svc_response_q, sim_data.sim_conf, log_pipe_name) new_component = component_class(services_proxy, comp_conf) - new_component.__initialize__(component_id, invocation_q, self.fwk.start_time) + new_component.__initialize__(component_id, invocation_q, sim_data.start_time) services_proxy.__initialize__(new_component) self.comp_registry.addEntry(component_id, svc_response_q, invocation_q, new_component, @@ -643,7 +645,7 @@ def create_simulation(self, sim_name, config_file, override, sub_workflow=False) self.sim_name_list.append(sim_name) self.sim_root_list.append(sim_root) self.log_file_list.append(log_file) - new_sim = self.SimulationData(sim_name) + new_sim = self.SimulationData(sim_name, start_time=self.fwk.start_time if sub_workflow else time.time()) new_sim.sim_conf = conf new_sim.config_file = config_file new_sim.sim_root = sim_root diff --git a/ipsframework/dakota_bridge.py b/ipsframework/dakota_bridge.py index 214e2dd5..e78ad256 100644 --- a/ipsframework/dakota_bridge.py +++ b/ipsframework/dakota_bridge.py @@ -136,6 +136,7 @@ def step(self, timestamp=0, **keywords): # pragma: no cover self.old_master_conf['SIM_NAME'] = self.sim_name + '_%s' % (instance_id) self.old_master_conf['LOG_FILE'] = self.sim_logfile + '_%s' % (instance_id) self.old_master_conf['OUT_REDIRECT'] = 'TRUE' + self.old_master_conf['PARENT_PORTAL_RUNID'] = 
services.get_config_param("PORTAL_RUNID") fname = "%s.out" % (self.old_master_conf['SIM_NAME']) fname = os.path.join(self.sim_root, fname) self.old_master_conf['OUT_REDIRECT_FNAME'] = fname diff --git a/ipsframework/ips.py b/ipsframework/ips.py index 08bb8a34..75ca5205 100755 --- a/ipsframework/ips.py +++ b/ipsframework/ips.py @@ -603,7 +603,7 @@ def _send_monitor_event(self, sim_name='', eventType='', comment='', ok=True, ta portal_data['eventtype'] = eventType portal_data['ok'] = ok portal_data['comment'] = comment - portal_data['walltime'] = '%.2f' % (event_time - self.start_time) + portal_data['walltime'] = '%.2f' % (event_time - self.config_manager.sim_map[sim_name].start_time) portal_data['time'] = getTimeString(time.localtime(event_time)) topic_name = '_IPS_MONITOR' @@ -611,7 +611,7 @@ def _send_monitor_event(self, sim_name='', eventType='', comment='', ok=True, ta get_config = self.config_manager.get_config_parameter if eventType == 'IPS_START': portal_data['state'] = 'Running' - portal_data['host'] = get_config(sim_name, 'HOST') + portal_data['host'] = self.config_manager.get_platform_parameter('HOST') try: portal_data['outputprefix'] = get_config(sim_name, 'OUTPUT_PREFIX') except KeyError: @@ -647,18 +647,24 @@ def _send_monitor_event(self, sim_name='', eventType='', comment='', ok=True, ta portal_data['sim_runid'] = get_config(sim_name, 'RUN_ID') except KeyError: pass - portal_data['startat'] = getTimeString(time.localtime(self.start_time)) + portal_data['startat'] = getTimeString(time.localtime(self.config_manager.sim_map[sim_name].start_time)) portal_data['ips_version'] = get_versions()['version'] + + try: + portal_data['parent_portal_runid'] = get_config(sim_name, 'PARENT_PORTAL_RUNID') + except KeyError: + pass + elif eventType == 'IPS_END': portal_data['state'] = 'Completed' portal_data['stopat'] = getTimeString(time.localtime(event_time)) # Zipkin json format - portal_data['trace'] = {"timestamp": int(self.start_time*1e6), - "duration": 
int((event_time - self.start_time)*1e6), + portal_data['trace'] = {"timestamp": int(self.config_manager.sim_map[sim_name].start_time*1e6), + "duration": int((event_time - self.config_manager.sim_map[sim_name].start_time)*1e6), "localEndpoint": { - "serviceName": str(self.component_id) + "serviceName": f'{sim_name}@{self.component_id}' }, - "id": hashlib.md5(str(self.component_id).encode()).hexdigest()[:16], + "id": hashlib.md5(f'{sim_name}@{self.component_id}'.encode()).hexdigest()[:16], 'tags': {'total_cores': str(self.resource_manager.total_cores)}} elif eventType == "IPS_CALL_END": trace = {} # Zipkin json format @@ -669,7 +675,7 @@ def _send_monitor_event(self, sim_name='', eventType='', comment='', ok=True, ta trace['localEndpoint'] = {"serviceName": target} trace['name'] = operation trace['id'] = hashlib.md5(f"{target}:{operation}".encode()).hexdigest()[:16] - trace["parentId"] = hashlib.md5(str(self.component_id).encode()).hexdigest()[:16] + trace["parentId"] = hashlib.md5(f'{sim_name}@{self.component_id}'.encode()).hexdigest()[:16] if trace: portal_data['trace'] = trace diff --git a/ipsframework/portalBridge.py b/ipsframework/portalBridge.py index 6a1e3d21..128b88b9 100644 --- a/ipsframework/portalBridge.py +++ b/ipsframework/portalBridge.py @@ -81,7 +81,6 @@ def __init__(self, services, config): :py:class:`component.Component` object. 
""" super().__init__(services, config) - self.host = '' self.curTime = time.localtime() self.startTime = self.curTime self.sim_map = {} @@ -109,7 +108,6 @@ def init(self, timestamp=0.0, **keywords): self.portal_url = self.PORTAL_URL except AttributeError: pass - self.host = self.services.get_config_param('HOST') self.services.subscribe('_IPS_MONITOR', "process_event") try: freq = int(self.services.get_config_param("HTML_DUMP_FREQ", silent=True)) @@ -427,7 +425,7 @@ def init_simulation(self, sim_name, sim_root): d = datetime.datetime.now() date_str = "%s.%03d" % (d.strftime("%Y-%m-%dT%H:%M:%S"), int(d.microsecond / 1000)) - sim_data.portal_runid = "_".join([self.host, "USER", date_str]) + sim_data.portal_runid = "_".join([sim_name, getattr(self, "HOST"), getattr(self, "USER"), date_str]) try: self.services.set_config_param('PORTAL_RUNID', sim_data.portal_runid, target_sim_name=sim_name) @@ -445,8 +443,7 @@ def init_simulation(self, sim_name, sim_root): (sim_log_dir, oserr.errno, oserr.strerror)) raise - sim_data.monitor_file_name = os.path.join(sim_log_dir, - sim_data.sim_name + '_' + sim_data.portal_runid + '.eventlog') + sim_data.monitor_file_name = os.path.join(sim_log_dir, sim_data.portal_runid + '.eventlog') try: sim_data.monitor_file = open(sim_data.monitor_file_name, 'wb', 0) except IOError as oserr: diff --git a/tests/dakota/dakota_test_Gaussian.ips b/tests/dakota/dakota_test_Gaussian.ips index 1d193935..c3e32771 100644 --- a/tests/dakota/dakota_test_Gaussian.ips +++ b/tests/dakota/dakota_test_Gaussian.ips @@ -1,4 +1,4 @@ -RUN_ID = DAKOTA_Rosenbrock # Identifier for this simulation run +RUN_ID = DAKOTA_Gaussian # Identifier for this simulation run TOKAMAK_ID = TEST SHOT_NUMBER = 1 # Numerical identifier for specific case diff --git a/tests/dakota/test_dakota.py b/tests/dakota/test_dakota.py index 6d54d318..3b90a364 100644 --- a/tests/dakota/test_dakota.py +++ b/tests/dakota/test_dakota.py @@ -3,6 +3,7 @@ import shutil import glob import sys +import json 
import pytest from ipsframework import ips_dakota_dynamic @@ -48,6 +49,44 @@ def test_dakota(tmpdir): assert float(X) == pytest.approx(0.5, rel=1e-4) + # Check PARENT CHILD relationship + # Get parent PORTAL_RUNID + json_files = glob.glob(str(tmpdir.join("DAKOTA_Gaussian_TEST_1").join("simulation_log").join("*.json"))) + assert len(json_files) == 1 + + with open(json_files[0], 'r') as json_file: + lines = json_file.readlines() + + lines = [json.loads(line.strip()) for line in lines] + assert len(lines) == 9 + + # get portal_runid event + portal_runid = lines[0]['portal_runid'] + sim_name, host, user, _ = portal_runid.rsplit('_', maxsplit=3) + assert sim_name == "DAKOTA_Gaussian_TEST_1" + assert host == "workstation" + assert user == "user" + + # Check child run + json_files = glob.glob(str(tmpdir.join("DAKOTA_Gaussian_TEST_1").join("simulation_*_0000").join("simulation_log").join("*.json"))) + assert len(json_files) == 1 + with open(json_files[0], 'r') as json_file: + lines = json_file.readlines() + + lines = [json.loads(line.strip()) for line in lines] + assert len(lines) == 8 + child_portal_runid = lines[0]['portal_runid'] + assert child_portal_runid != portal_runid + + sim_name, host, user, _ = child_portal_runid.rsplit('_', maxsplit=3) + assert sim_name.startswith("DAKOTA_Gaussian_TEST_1") + assert len(sim_name) > len("DAKOTA_Gaussian_TEST_1") + assert host == "workstation" + assert user == "user" + + parent_portal_runid = lines[0]['parent_portal_runid'] + assert parent_portal_runid == portal_runid + @mock.patch('ipsframework.ips_dakota_dynamic.DakotaDynamic') def test_dakota_main(MockDakotaDynamic): diff --git a/tests/dakota/workstation.conf b/tests/dakota/workstation.conf index 8fb05faf..16c044a1 100644 --- a/tests/dakota/workstation.conf +++ b/tests/dakota/workstation.conf @@ -1,4 +1,5 @@ -HOST = my_laptop +HOST = workstation +USER = user MPIRUN = eval #######################################