diff --git a/bokeh-plots.ipynb b/bokeh-plots.ipynb new file mode 100644 index 0000000..17baff7 --- /dev/null +++ b/bokeh-plots.ipynb @@ -0,0 +1,124 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "ebc12009", + "metadata": {}, + "source": [ + "# Next cell generated by IPS Framework" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "41186aa1", + "metadata": {}, + "outputs": [], + "source": [ + "### This cell autogenerated by IPS Framework. DO NOT EDIT UNTIL IPS RUN IS FINALIZED. ###\n", + "\n", + "import os\n", + "\n", + "# NOTE: directory should be sim_name plus the run id from the Portal\n", + "IPS_DATA_DIR = '/tmp/ipsframework/runs/localhost_5000/1/data/'\n", + "# Uncomment below line to implicitly use any state files saved in the data directory, note that the IPS framework explicitly lists out each file used\n", + "#IPS_STATE_FILES = os.listdir('data')\n", + "# files created during the run\n", + "IPS_STATE_FILES = [\n", + "f'{IPS_DATA_DIR}1.0',\n", + "f'{IPS_DATA_DIR}1.9666666666666668',\n", + "f'{IPS_DATA_DIR}2.9333333333333336',\n", + "f'{IPS_DATA_DIR}3.9',\n", + "f'{IPS_DATA_DIR}4.866666666666667',\n", + "f'{IPS_DATA_DIR}5.833333333333333',\n", + "f'{IPS_DATA_DIR}6.8',\n", + "f'{IPS_DATA_DIR}7.766666666666667',\n", + "f'{IPS_DATA_DIR}8.733333333333334',\n", + "f'{IPS_DATA_DIR}9.7',\n", + "f'{IPS_DATA_DIR}10.666666666666666',\n", + "f'{IPS_DATA_DIR}11.633333333333333',\n", + "f'{IPS_DATA_DIR}12.6',\n", + "f'{IPS_DATA_DIR}13.566666666666666',\n", + "f'{IPS_DATA_DIR}14.533333333333333',\n", + "f'{IPS_DATA_DIR}15.5',\n", + "f'{IPS_DATA_DIR}16.46666666666667',\n", + "f'{IPS_DATA_DIR}17.433333333333334',\n", + "f'{IPS_DATA_DIR}18.4',\n", + "f'{IPS_DATA_DIR}19.366666666666667',\n", + "f'{IPS_DATA_DIR}20.333333333333332',\n", + "f'{IPS_DATA_DIR}21.3',\n", + "f'{IPS_DATA_DIR}22.266666666666666',\n", + "f'{IPS_DATA_DIR}23.233333333333334',\n", + "f'{IPS_DATA_DIR}24.2',\n", + "f'{IPS_DATA_DIR}25.166666666666668',\n", + "f'{IPS_DATA_DIR}26.133333333333333',\n", + "f'{IPS_DATA_DIR}27.1',\n", + "f'{IPS_DATA_DIR}28.066666666666666',\n", + "f'{IPS_DATA_DIR}29.033333333333335',\n", + "f'{IPS_DATA_DIR}30.0',\n", + "]\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5d75faa3", + "metadata": {}, + "outputs": [], + "source": [ + "# Notebook template, the IPS Framework will add a cell before this one\n", + "# defining IPS_STATE_FILES as a list of state file paths.\n", + "\n", + "# In this example, this notebook is only generated at the end of the run.\n", + "\n", + "# Bokeh is installed by default on the NERSC Jupyter environment, so works well for this example.\n", + "from bokeh.io import output_notebook\n", + "from bokeh.plotting import figure, show\n", + "import json\n", + "\n", + "output_notebook()\n", + "\n", + "DATA = []\n", + "# create DATA list, will depend on user input type (i.e. 'hdf5', 'json')\n", + "for file in IPS_STATE_FILES:\n", + " with open(file, 'rb') as f:\n", + " DATA.append(json.load(f))\n", + "x = [float(f.rpartition('/')[2]) for f in IPS_STATE_FILES]\n", + "\n", + "COLORS = ['red', 'green', 'blue']\n", + "\n", + "GRAPHS = [\n", + " {'title': 'IPS full results', 'graph_format': 'line', 'data_paths': [['y1'], ['y2'], ['y3']]},\n", + " {'title': 'IPS partial results', 'graph_format': 'line', 'data_paths': [['y1']]},\n", + "]\n", + "\n", + "# get data function - will depend on user input (i.e. 'hdf5', 'json', etc.)\n", + "def get_data(d, prop_path):\n", + " data = d[prop_path[0]]\n", + " for inner in prop_path[1:]:\n", + " data = data[inner]\n", + " return data\n", + "\n", + "# generate two simple line graphs from data\n", + "for g in GRAPHS:\n", + " paths = g['data_paths']\n", + " \n", + " graph = figure(title=g['title'])\n", + " graph.xaxis.axis_label = 'IPS Timestep'\n", + " graph.yaxis.axis_label = 'data values'\n", + " \n", + " for idx, prop in enumerate(paths):\n", + " y = [get_data(d, prop) for d in DATA]\n", + " graph.line(x, y, line_color=COLORS[idx % len(COLORS)], line_dash='solid', legend_label='_'.join(prop))\n", + " show(graph)\n" + ] + } + ], + "metadata": { + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/examples-proposed/004-time-loop/mymodule/components.py b/examples-proposed/004-time-loop/mymodule/components.py index fd079c5..583c176 100644 --- a/examples-proposed/004-time-loop/mymodule/components.py +++ b/examples-proposed/004-time-loop/mymodule/components.py @@ -1,14 +1,18 @@ import json import math +import os import random +import time from sys import stderr from ipsframework import Component -NOTEBOOK_1_TEMPLATE = 'base-notebook-iterative.ipynb' -NOTEBOOK_1_NAME = 'full_state_iterative.ipynb' -NOTEBOOK_2_TEMPLATE = 'base-notebook-one-pass.ipynb' -NOTEBOOK_2_NAME = 'full_state_one_pass.ipynb' +DELAY = bool(os.environ.get('EXAMPLE_DELAY')) + +NOTEBOOK_1_TEMPLATE = 'basic.ipynb' +NOTEBOOK_1_NAME = 'basic.ipynb' +NOTEBOOK_2_TEMPLATE = 'bokeh-plots.ipynb' +NOTEBOOK_2_NAME = 'bokeh-plots.ipynb' class Init(Component): @@ -28,12 +32,16 @@ def step(self, timestamp=0.0): # Needed for notebook template self.services.stage_input_files([NOTEBOOK_1_TEMPLATE, NOTEBOOK_2_TEMPLATE]) - # Example of a notebook we want to initialize and then periodically append to during the run + # Example of initializing two separate notebooks + # Both notebooks should be initialized before the time loop and appended to inside the time loop self.services.initialize_jupyter_notebook( dest_notebook_name=NOTEBOOK_1_NAME, # path is relative to JupyterHub directory source_notebook_path=NOTEBOOK_1_TEMPLATE, # path is relative to input directory ) - # Initialize second notebook + self.services.initialize_jupyter_notebook( + dest_notebook_name=NOTEBOOK_2_NAME, # path is relative to JupyterHub directory + source_notebook_path=NOTEBOOK_2_TEMPLATE, # path is relative to input directory + ) # The time loop is configured in its own section of sim.conf # It is shared across all components @@ -86,6 +94,9 @@ class Monitor(Component): def step(self, timestamp=0.0, **keywords): msg = f'Running Monitor step with timestamp={timestamp}' print(msg, file=stderr) + if DELAY: + print('simulating fake delay for 10 seconds', file=stderr) + time.sleep(10.0) self.services.send_portal_event(event_comment=msg) self.services.stage_state() @@ -96,8 +107,7 @@ def step(self, timestamp=0.0, **keywords): # stage the state file in the JupyterHub directory self.services.add_data_file_to_notebook(state_file, timestamp, NOTEBOOK_1_NAME) + self.services.add_data_file_to_notebook(state_file, timestamp, NOTEBOOK_2_NAME) print('SEND PORTAL DATA', timestamp, data, file=stderr) self.services.send_portal_data(timestamp, data) - - # TODO add a basic sleep to this example for demonstration purposes diff --git a/examples-proposed/004-time-loop/run-delayed.sh b/examples-proposed/004-time-loop/run-delayed.sh new file mode 100755 index 0000000..34e0655 --- /dev/null +++ b/examples-proposed/004-time-loop/run-delayed.sh @@ -0,0 +1 @@ +PYTHONPATH=$PWD PSCRATCH=${PSCRATCH:-/tmp} EXAMPLE_DELAY=true ips.py --config=sim.conf --platform=platform.conf --log=ips.log #--debug --verbose diff --git a/examples-proposed/004-time-loop/sim/input_dir/base-notebook-one-pass.ipynb b/examples-proposed/004-time-loop/sim/input_dir/base-notebook-one-pass.ipynb deleted file mode 100644 index 02b9626..0000000 --- a/examples-proposed/004-time-loop/sim/input_dir/base-notebook-one-pass.ipynb +++ /dev/null @@ -1,30 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": null, - "id": "5d75faa3", - "metadata": {}, - "outputs": [], - "source": [ - "# Notebook template, the IPS Framework will add a cell before this one\n", - "# defining IPS_STATE_FILES as a list of state file paths.\n", - "\n", - "# In this example, this notebook is only generated at the end of the run.\n", - "\n", - "mapping = {}\n", - "for file in IPS_STATE_FILES:\n", - " with open(file, 'rb') as f:\n", - " mapping[file] = f.read()\n", - "print(mapping)\n" - ] - } - ], - "metadata": { - "language_info": { - "name": "python" - } - }, - "nbformat": 4, - "nbformat_minor": 5 -} diff --git a/examples-proposed/004-time-loop/sim/input_dir/base-notebook-iterative.ipynb b/examples-proposed/004-time-loop/sim/input_dir/basic.ipynb similarity index 100% rename from examples-proposed/004-time-loop/sim/input_dir/base-notebook-iterative.ipynb rename to examples-proposed/004-time-loop/sim/input_dir/basic.ipynb diff --git a/examples-proposed/004-time-loop/sim/input_dir/bokeh-plots.ipynb b/examples-proposed/004-time-loop/sim/input_dir/bokeh-plots.ipynb new file mode 100644 index 0000000..c01b539 --- /dev/null +++ b/examples-proposed/004-time-loop/sim/input_dir/bokeh-plots.ipynb @@ -0,0 +1,65 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "5d75faa3", + "metadata": {}, + "outputs": [], + "source": [ + "# Notebook template, the IPS Framework will add a cell before this one\n", + "# defining IPS_STATE_FILES as a list of state file paths.\n", + "\n", + "# In this example, this notebook is only generated at the end of the run.\n", + "\n", + "# Bokeh is installed by default on the NERSC Jupyter environment, so works well for this example.\n", + "from bokeh.io import output_notebook\n", + "from bokeh.plotting import figure, show\n", + "import json\n", + "\n", + "output_notebook()\n", + "\n", + "DATA = []\n", + "# create DATA list, will depend on user input type (i.e. 'hdf5', 'json')\n", + "for file in IPS_STATE_FILES:\n", + " with open(file, 'rb') as f:\n", + " DATA.append(json.load(f))\n", + "x = [float(f.rpartition('/')[2]) for f in IPS_STATE_FILES]\n", + "\n", + "COLORS = ['red', 'green', 'blue']\n", + "\n", + "GRAPHS = [\n", + " {'title': 'IPS full results', 'graph_format': 'line', 'data_paths': [['y1'], ['y2'], ['y3']]},\n", + " {'title': 'IPS partial results', 'graph_format': 'line', 'data_paths': [['y1']]},\n", + "]\n", + "\n", + "# get data function - will depend on user input (i.e. 'hdf5', 'json', etc.)\n", + "def get_data(d, prop_path):\n", + " data = d[prop_path[0]]\n", + " for inner in prop_path[1:]:\n", + " data = data[inner]\n", + " return data\n", + "\n", + "# generate two simple line graphs from data\n", + "for g in GRAPHS:\n", + " paths = g['data_paths']\n", + " \n", + " graph = figure(title=g['title'])\n", + " graph.xaxis.axis_label = 'IPS Timestep'\n", + " graph.yaxis.axis_label = 'data values'\n", + " \n", + " for idx, prop in enumerate(paths):\n", + " y = [get_data(d, prop) for d in DATA]\n", + " graph.line(x, y, line_color=COLORS[idx % len(COLORS)], line_dash='solid', legend_label='_'.join(prop))\n", + " show(graph)\n" + ] + } + ], + "metadata": { + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/ipsframework/jupyter.py b/ipsframework/jupyter.py index d75fa0d..3873f24 100644 --- a/ipsframework/jupyter.py +++ b/ipsframework/jupyter.py @@ -24,7 +24,10 @@ def replace_last(source_string: str, old: str, new: str) -> str: - """Attempt to replace the last occurence of 'old' with 'new' in 'source_string', searching from the right.""" + """Attempt to replace the last occurence of 'old' with 'new' in 'source_string', searching from the right. + + This should only be called if 'old' can effectively be guaranteed to exist in the string. + """ head, _sep, tail = source_string.rpartition(old) return f'{head}{new}{tail}' @@ -76,18 +79,97 @@ def add_data_file_to_notebook(dest: str, data_file: str, index: Optional[int] = Params: - dest: path to notebook which will be modified - data_file: data file we add to the notebook - - index: optional index of the IPS notebook cell. If not provided, + - index: optional index of the IPS notebook cell. If not provided, search through the notebook via an expected string hook. """ nb: nbf.NotebookNode = nbf.read(dest, as_version=4) if index is None: index = next((i for i, e in enumerate(nb['cells']) if HOOK in e['source']), -1) if index < 0: raise Exception('Cannot find IPS notebook node') - ips_cell = nb['cells'][index]['source'] + ips_cell: str = nb['cells'][index]['source'] + + if ips_cell.find(f"f'{{IPS_DATA_DIR}}{data_file}',\n]") != -1: + # The data file is already referenced in the notebook, so there's nothing else to do + return + # data file does not exist, so we need to add it # search from right of string for the ']' character, should work assuming user does not modify the cell past the variable definition result = replace_last(ips_cell, ']', f"f'{{IPS_DATA_DIR}}{data_file}',\n]") nb['cells'][index]['source'] = result with open(dest, 'w') as f: nbf.write(nb, f) + + +def remove_data_file_from_notebook(dest: str, data_file: str, index: Optional[int] = None): + """Remove data file from the notebook list. + + Params: + - dest: path to notebook which will be modified + - data_file: data file we remove from the notebook + - index: optional index of the IPS notebook cell. If not provided, search through the notebook via an expected string hook. + """ + nb: nbf.NotebookNode = nbf.read(dest, as_version=4) + if index is None: + index = next((i for i, e in enumerate(nb['cells']) if HOOK in e['source']), -1) + if index < 0: + raise Exception('Cannot find IPS notebook node') + ips_cell: str = nb['cells'][index]['source'] + + head, sep, tail = ips_cell.rpartition(f"f'{{IPS_DATA_DIR}}{data_file}',\n") + if sep == '': + # existing match not found, so there's nothing left to remove + return + result = f'{head}\n{tail}' + nb['cells'][index]['source'] = result + + with open(dest, 'w') as f: + nbf.write(nb, f) + + +def remove_last_data_file_from_notebook(dest: str, index: Optional[int] = None) -> Optional[str]: + """Obtain the last data file entry in a notebook, remove it, and then return the name of the file. + + Note that this function assumes the notebook maintains a specific format. + + Returns: + - None if there were no data entries in the notebook, + """ + nb: nbf.NotebookNode = nbf.read(dest, as_version=4) + if index is None: + index = next((i for i, e in enumerate(nb['cells']) if HOOK in e['source']), -1) + if index < 0: + raise Exception('Cannot find IPS notebook node') + ips_cell: str = nb['cells'][index]['source'] + + search_hook = "f'{IPS_DATA_DIR}" + + start_index = ips_cell.rfind(search_hook) + if start_index == -1: + # no data files have been added, nothing to do + return None + + ret = None + file_name_start_index = start_index + len(search_hook) + end_index = file_name_start_index + while True: + try: + end_char = ips_cell[end_index] + end_index += 1 + if end_char == '\n': + # each entry gets its own "line", so we don't need to search anymore + break + if end_char == "'" and ips_cell[end_index] == ',': + # we have found the name of the file + ret = ips_cell[file_name_start_index:end_index] + except IndexError: + # improperly formatted file (reached EOF), fall back to just removing everything after the break + return None + + result = ips_cell[:start_index] + ips_cell[end_index:] + nb['cells'][index]['source'] = result + + with open(dest, 'w') as f: + nbf.write(nb, f) + + return ret diff --git a/ipsframework/services.py b/ipsframework/services.py index 07d918a..dfcc7fc 100644 --- a/ipsframework/services.py +++ b/ipsframework/services.py @@ -22,14 +22,14 @@ import weakref from collections import namedtuple from operator import iadd, itemgetter -from typing import Any, Iterable, List, Optional, Union +from typing import Any, Callable, Iterable, List, Optional, Union from configobj import ConfigObj from . import ipsutil, messages from .cca_es_spec import initialize_event_service from .ips_es_spec import eventManager -from .jupyter import add_data_file_to_notebook, initialize_jupyter_notebook +from .jupyter import add_data_file_to_notebook, initialize_jupyter_notebook, remove_data_file_from_notebook, remove_last_data_file_from_notebook from .taskManager import TaskInit RunningTask = namedtuple('RunningTask', ['process', 'start_time', 'timeout', 'nproc', 'cores_allocated', 'command', 'binary', 'args']) @@ -1894,12 +1894,7 @@ def _get_jupyterhub_url(self) -> Optional[str]: self.warning('PORTAL_URL was not defined, skipping JupyterHub configuration') return None - try: - runid = self.get_config_param('PORTAL_RUNID') - except Exception: - # TODO Figure out how to associate value across components (may need to use a state file?) - self.warning("Couldn't get PORTAL_RUNID, skipping Jupyter URL association of data") - return None + runid = self._get_jupyter_runid() url += f'ipsframework/runs/{portal_url_host}/{runid}/' return url @@ -1948,7 +1943,7 @@ def initialize_jupyter_notebook( self._send_monitor_event('IPS_PORTAL_REGISTER_NOTEBOOK', f'URL = {url}') def add_data_file_to_notebook(self, state_file_path: str, timestamp: float, notebook_name: str, index: Optional[int] = None): - """Add data file to notebook list. + """Add data file to JupyterHub directory, and reference it in the notebook. This function assumes that a notebook has already been created with intialize_jupyter_notebook. Using this function does not call the IPS Portal. @@ -1979,7 +1974,62 @@ def add_data_file_to_notebook(self, state_file_path: str, timestamp: float, note # TODO - maybe add flag which allows us to replace old state files add_data_file_to_notebook(f'{self._jupyterhub_dir}{notebook_name}', state_file_name, index) - def publish(self, topicName, eventName, eventBody): + def remove_data_file_from_notebook(self, state_file_path: str, timestamp: float, notebook_name: str, index: Optional[int] = None): + """Remove data file from JupyterHub data directory and from being referenced in the notebook. + + This function assumes that a notebook has already been created with intialize_jupyter_notebook. Using this function does not call the IPS Portal. + + Params: + - state_file_path: location of the current state file we want to copy to the Jupyter directory + - timestamp: label to assign to the data (currently must be a floating point value) + - notebook_name: name of notebook which will be modified. Note that this path is relative to the JupyterHub directory. + - index: optional index of the IPS notebook cell. If not provided, the IPS Framework will attempt to automatically find the cell it created, + which should work for every usecase where you don't anticipate modifying the notebook until after the run is complete. + """ + + if not self._jupyterhub_dir: + if not self._init_jupyter(): + # TODO generic exception + raise Exception('Unable to initialize base JupyterHub dir') + + file_parts = state_file_path.split('.') + if len(file_parts) > 2: # name of the file could just be a floating point value with no extension + extension = f'.{file_parts[-1]}' + else: + extension = '' + + state_file_name = f'{timestamp}{extension}' + jupyter_data_dir = os.path.join(self._jupyterhub_dir, 'data', state_file_name) + + # if this errors out, we can safely ignore them + shutil.rmtree(jupyter_data_dir, ignore_errors=True) + + # TODO - maybe add flag which allows us to replace old state files + remove_data_file_from_notebook(f'{self._jupyterhub_dir}{notebook_name}', state_file_name, index) + + def remove_last_data_file_from_notebook(self, notebook_name: str, index: Optional[int] = None): + """Remove the last added data file from a notebook and from the filesystem. + + This function assumes that a notebook has already been created with intialize_jupyter_notebook. Using this function does not call the IPS Portal. + + Params: + - notebook_name: name of notebook which will be modified. Note that this path is relative to the JupyterHub directory. + - index: optional index of the IPS notebook cell. If not provided, the IPS Framework will attempt to automatically find the cell it created, + which should work for every usecase where you don't anticipate modifying the notebook until after the run is complete. + """ + + if not self._jupyterhub_dir: + if not self._init_jupyter(): + # TODO generic exception + raise Exception('Unable to initialize base JupyterHub dir') + + last_state_file = remove_last_data_file_from_notebook(notebook_name, index) + if last_state_file is None: + return + data_file = os.path.join(self._jupyterhub_dir, 'data', last_state_file) + shutil.rmtree(data_file, ignore_errors=True) + + def publish(self, topicName: str, eventName: str, eventBody: Any): """ Publish event consisting of *eventName* and *eventBody* to topic *topicName* to the IPS event service. """ @@ -1987,7 +2037,7 @@ def publish(self, topicName, eventName, eventBody): topicName = self.sim_name + '_' + topicName self.event_service.publish(topicName, eventName, eventBody) - def subscribe(self, topicName, callback): + def subscribe(self, topicName: str, callback: Callable): """ Subscribe to topic *topicName* on the IPS event service and register *callback* as the method to be invoked when an event is published to that topic. """ @@ -1995,7 +2045,7 @@ def subscribe(self, topicName, callback): topicName = self.sim_name + '_' + topicName self.event_service.subscribe(topicName, callback) - def unsubscribe(self, topicName): + def unsubscribe(self, topicName: str): """ Remove subscription to topic *topicName*. """