From 106f6dd6e6bf9830e33870b3a9de7506faead7c7 Mon Sep 17 00:00:00 2001 From: Lance-Drane Date: Mon, 5 Aug 2024 21:00:14 -0400 Subject: [PATCH] simplify jupyter notebook API Signed-off-by: Lance-Drane --- .../004-time-loop/mymodule/components.py | 33 +++++---- ...ok.ipynb => base-notebook-iterative.ipynb} | 2 + .../input_dir/base-notebook-one-pass.ipynb | 30 ++++++++ ipsframework/jupyter.py | 70 ++++++++++++++++--- ipsframework/portalBridge.py | 6 +- ipsframework/services.py | 47 ++++++++----- 6 files changed, 145 insertions(+), 43 deletions(-) rename examples-proposed/004-time-loop/sim/input_dir/{base-notebook.ipynb => base-notebook-iterative.ipynb} (86%) create mode 100644 examples-proposed/004-time-loop/sim/input_dir/base-notebook-one-pass.ipynb diff --git a/examples-proposed/004-time-loop/mymodule/components.py b/examples-proposed/004-time-loop/mymodule/components.py index 418c12e..8e8ec33 100644 --- a/examples-proposed/004-time-loop/mymodule/components.py +++ b/examples-proposed/004-time-loop/mymodule/components.py @@ -5,6 +5,11 @@ from ipsframework import Component +NOTEBOOK_1_TEMPLATE = 'base-notebook-iterative.ipynb' +NOTEBOOK_1_NAME = 'full_state_iterative.ipynb' +NOTEBOOK_2_TEMPLATE = 'base-notebook-one-pass.ipynb' +NOTEBOOK_2_NAME = 'full_state_one_pass.ipynb' + class Init(Component): """Empty init component.""" @@ -16,14 +21,19 @@ class Driver(Component): """In this example, the driver iterates through the time loop and calls both the worker and the monitor component on each timestep.""" def step(self, timestamp=0.0): - NOTEBOOK_TEMPLATE = 'base-notebook.ipynb' - worker = self.services.get_port('WORKER') monitor = self.services.get_port('MONITOR') self.services.call(worker, 'init', 0) # Needed for notebook template - self.services.stage_input_files(NOTEBOOK_TEMPLATE) + self.services.stage_input_files([NOTEBOOK_1_TEMPLATE, NOTEBOOK_2_TEMPLATE]) + + # Example of a notebook we want to initialize and then periodically append to during the run + 
self.services.initialize_jupyter_notebook( + dest_notebook_name=NOTEBOOK_1_NAME, # path is relative to JupyterHub directory + source_notebook_path=NOTEBOOK_1_TEMPLATE, # path is relative to input directory + ) + # Initialize second notebook # The time loop is configured in its own section of sim.conf # It is shared across all components @@ -33,15 +43,12 @@ def step(self, timestamp=0.0): # TODO - perhaps monitor timestep does not need to be called every step, but only every 20 steps? self.services.call(monitor, 'step', t) - # create notebook here - NOTEBOOK_NAME = 'full_state.ipynb' - jupyter_state_files = self.services.get_staged_jupyterhub_files() - self.services.stage_jupyter_notebook( - dest_notebook_name=NOTEBOOK_NAME, # path is relative to JupyterHub directory - source_notebook_path='base-notebook.ipynb', # path is relative to input directory - tags=jupyter_state_files, + # With this second "example" notebook, we only create it once and only write to it once. + self.services.initialize_jupyter_notebook( + dest_notebook_name=NOTEBOOK_2_NAME, # path is relative to JupyterHub directory + source_notebook_path=NOTEBOOK_2_TEMPLATE, # path is relative to input directory + initial_data_files=self.services.get_staged_jupyterhub_files(), ) - self.services.portal_register_jupyter_notebook(NOTEBOOK_NAME) self.services.call(worker, 'finalize', 0) @@ -95,7 +102,9 @@ def step(self, timestamp=0.0, **keywords): data = f.read() # stage the state file in the JupyterHub directory - self.services.jupyterhub_make_state(state_file, timestamp) + data_file = self.services.jupyterhub_make_state(state_file, timestamp) + print('ADD DATA FILE', data_file) + self.services.add_data_file_to_notebook(NOTEBOOK_1_NAME, data_file) print('SEND PORTAL DATA', timestamp, data, file=stderr) self.services.send_portal_data(timestamp, data) diff --git a/examples-proposed/004-time-loop/sim/input_dir/base-notebook.ipynb b/examples-proposed/004-time-loop/sim/input_dir/base-notebook-iterative.ipynb 
similarity index 86% rename from examples-proposed/004-time-loop/sim/input_dir/base-notebook.ipynb rename to examples-proposed/004-time-loop/sim/input_dir/base-notebook-iterative.ipynb index 78ee66a..988a582 100644 --- a/examples-proposed/004-time-loop/sim/input_dir/base-notebook.ipynb +++ b/examples-proposed/004-time-loop/sim/input_dir/base-notebook-iterative.ipynb @@ -10,6 +10,8 @@ "# Notebook template, the IPS Framework will add a cell before this one\n", "# defining FILES as a list of state file paths.\n", "\n", + "# In this example, this notebook is generated during the time loop.\n", + "\n", "mapping = {}\n", "for file in FILES:\n", " with open(file, 'rb') as f:\n", diff --git a/examples-proposed/004-time-loop/sim/input_dir/base-notebook-one-pass.ipynb b/examples-proposed/004-time-loop/sim/input_dir/base-notebook-one-pass.ipynb new file mode 100644 index 0000000..925b512 --- /dev/null +++ b/examples-proposed/004-time-loop/sim/input_dir/base-notebook-one-pass.ipynb @@ -0,0 +1,30 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "5d75faa3", + "metadata": {}, + "outputs": [], + "source": [ + "# Notebook template, the IPS Framework will add a cell before this one\n", + "# defining FILES as a list of state file paths.\n", + "\n", + "# In this example, this notebook is only generated at the end of the run.\n", + "\n", + "mapping = {}\n", + "for file in FILES:\n", + " with open(file, 'rb') as f:\n", + " mapping[file] = f.read()\n", + "print(mapping)\n" + ] + } + ], + "metadata": { + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/ipsframework/jupyter.py b/ipsframework/jupyter.py index bfea3ed..02c77d0 100644 --- a/ipsframework/jupyter.py +++ b/ipsframework/jupyter.py @@ -10,25 +10,54 @@ ...in a shell on Jupyter NERSC. """ -from os.path import sep -from typing import List +from typing import List, Optional import nbformat as nbf +HOOK = '### This cell autogenerated by IPS Framework. 
DO NOT EDIT UNTIL IPS RUN IS FINALIZED. ###' +"""This hook is used to determine which "cell" the IPS framework should work with. + +It is written to a notebook cell on initializing it, and is searched for when adding a data file to it. +""" + + +def replace_last(source_string: str, old: str, new: str) -> str: + """Attempt to replace the last occurrence of 'old' with 'new' in 'source_string', searching from the right.""" + head, _sep, tail = source_string.rpartition(old) + return f'{head}{new}{tail}' -def _get_state_file_notebook_code_cell(variable: str, tags: List[str]): - itemsep = ',\n' - return f"""import os +def _initial_jupyter_file_notebook_cell(variable: str, initial_data_files: Optional[List[str]] = None) -> str: + if not initial_data_files: + initial = '' + else: + itemsep = '\n' + initial = '\n' + itemsep.join([f"'{file}'," for file in initial_data_files]) + return f"""{HOOK} + +import os + +# NOTE: directory should be sim_name plus the run id from the Portal +# NOTE: add absolute path as a comment to the notebook cell # Uncomment below line to use any state files saved #{variable} = os.listdir('data') # files created during the run -{variable} = [{itemsep.join([f"'data{sep}{file}'" for file in tags])}] +{variable} = [{initial} +] """ -def stage_jupyter_notebook(dest: str, src: str, tags: List[str], variable_name: str, index: int): - """""" +def initialize_jupyter_notebook(dest: str, src: str, variable_name: str, index: int, initial_data_files: Optional[List[str]] = None): + """Create a new notebook from an old notebook, copying the result from 'src' to 'dest'. 
+ + Params: + - dest - location of notebook to create on filesystem + - src - location of source notebook on filesystem (is not overwritten unless src == dest) + - variable_name: what to call the variable + - index: insert new cells at position before this value (will not remove preexisting cells) + - initial_data_files: optional list of files to initialize the notebook with + + """ # to avoid conversion, use as_version=nbf.NO_CONVERT # nb: nbf.NotebookNode = nbf.read(src, as_version=4) @@ -36,10 +65,33 @@ def stage_jupyter_notebook(dest: str, src: str, tags: List[str], variable_name: header = '# Next cell generated by IPS Framework' nb['cells'] = ( nb['cells'][:index] - + [nbf.v4.new_markdown_cell(header), nbf.v4.new_code_cell(_get_state_file_notebook_code_cell(variable_name, tags))] + + [nbf.v4.new_markdown_cell(header), nbf.v4.new_code_cell(_initial_jupyter_file_notebook_cell(variable_name, initial_data_files))] + nb['cells'][index:] ) nbf.validate(nb) with open(dest, 'w') as f: nbf.write(nb, f) + + +def add_data_file_to_notebook(dest: str, data_file: str, index: Optional[int] = None): + """Add data file to notebook list. + + Params: + - dest: path to notebook which will be modified + - data_file: data file we add to the notebook + - index: optional index of the IPS notebook cell. 
If not provided, the first cell whose source contains HOOK is used. + """ + nb: nbf.NotebookNode = nbf.read(dest, as_version=4) + if index is None: + index = next((i for i, e in enumerate(nb['cells']) if HOOK in e['source']), -1) + if index < 0: + raise Exception('Cannot find IPS notebook node') + ips_cell = nb['cells'][index]['source'] + + # search from right of string for the ']' character, should work assuming user does not modify the cell past the variable definition + result = replace_last(ips_cell, ']', f"'{data_file}',\n]") + nb['cells'][index]['source'] = result + + with open(dest, 'w') as f: + nbf.write(nb, f) diff --git a/ipsframework/portalBridge.py b/ipsframework/portalBridge.py index 43fe63a..d293094 100644 --- a/ipsframework/portalBridge.py +++ b/ipsframework/portalBridge.py @@ -99,7 +99,7 @@ def send_post_data(conn: Connection, stop: EventType, url: str): break -def send_put_jupyter_url(conn: Connection, stop: EventType, url: str): +def send_post_jupyter_url(conn: Connection, stop: EventType, url: str): fail_count = 0 http = urllib3.PoolManager(retries=urllib3.util.Retry(3, backoff_factor=0.25)) @@ -110,7 +110,7 @@ def send_put_jupyter_url(conn: Connection, stop: EventType, url: str): # TODO - consider using multipart/form-data instead try: resp = http.request( - 'PUT', + 'POST', url, body=json.dumps({'url': next_val['url'], 'portal_runid': next_val['portal_runid']}).encode(), headers={ @@ -430,7 +430,7 @@ def send_notebook_url(self, sim_data, event_data): self.dataurl_parent_conn, child_conn = Pipe() self.dataurl_childProcessStop = Event() self.dataurl_childProcess = Process( - target=send_put_jupyter_url, args=(child_conn, self.dataurl_childProcessStop, self.portal_url + '/api/data/add_url') + target=send_post_jupyter_url, args=(child_conn, self.dataurl_childProcessStop, self.portal_url + '/api/data/add_url') ) self.dataurl_childProcess.start() self.dataurl_first_event = False diff --git a/ipsframework/services.py b/ipsframework/services.py index a1f0147..2e0436f 100644 --- 
a/ipsframework/services.py +++ b/ipsframework/services.py @@ -10,6 +10,7 @@ import logging import logging.handlers import os +import pathlib import queue import shutil import signal @@ -28,7 +29,7 @@ from . import ipsutil, messages from .cca_es_spec import initialize_event_service from .ips_es_spec import eventManager -from .jupyter import stage_jupyter_notebook +from .jupyter import add_data_file_to_notebook, initialize_jupyter_notebook from .taskManager import TaskInit RunningTask = namedtuple('RunningTask', ['process', 'start_time', 'timeout', 'nproc', 'cores_allocated', 'command', 'binary', 'args']) @@ -1842,14 +1843,15 @@ def get_staged_jupyterhub_files(self) -> List[str]: # TODO generic exception raise Exception('Unable to initialize base JupyterHub dir') - return os.listdir(os.path.join(self._jupyterhub_dir, 'data')) + data_dir = pathlib.Path(pathlib.Path(self._jupyterhub_dir) / 'data') + return [str(p.resolve()) for p in data_dir.glob('*')] def jupyterhub_make_state(self, state_file_path: str, timestamp: float) -> str: """ Move a state file into the JupyterHub directory. Returns: - - the path to the state file in the JupyterHub directory + - the path to the state file in the JupyterHub directory. This will be an absolute path. Raises: - Exception, if unable to move file to the provided JUPYTERHUB_DIR @@ -1885,22 +1887,21 @@ def _get_jupyterhub_url(self) -> Optional[str]: url += f'ipsframework/runs/{runid}/' return url - def stage_jupyter_notebook( + def initialize_jupyter_notebook( self, dest_notebook_name: str, source_notebook_path: str, - tags: List[str], + initial_data_files: Optional[List[str]] = None, variable_name: str = 'FILES', cell_to_modify: int = 0, ) -> None: - """Loads a notebook from source_notebook_path, adds a cell to load the data, and then saves it to source_notebook_path. + """Loads a notebook from source_notebook_path, adds a cell to load the data, and then saves it as dest_notebook_name in the JupyterHub directory. 
Will also try to register the notebook with the IPS Portal, if available. Does not modify the source notebook. Params: - dest_notebook_name: name of the JupyterNotebook you want to write (do not include file paths). - source_notebook_path: location you want to load the source notebook from - - tags: list of state files you want to load in the notebook. - variable_name: name of the variable you want to load files from (default: "FILES") - cell_to_modify: which cell in the JupyterNotebook you want to add the data call to (0-indexed). (This will not overwrite any cells, just appends.) @@ -1910,22 +1911,14 @@ def stage_jupyter_notebook( if not self._init_jupyter(): raise Exception('Unable to initialize base JupyterHub dir') - stage_jupyter_notebook(f'{self._jupyterhub_dir}{dest_notebook_name}', source_notebook_path, tags, variable_name, cell_to_modify) + # adds notebook to JupyterHub + initialize_jupyter_notebook(f'{self._jupyterhub_dir}{dest_notebook_name}', source_notebook_path, variable_name, cell_to_modify, initial_data_files) - def portal_register_jupyter_notebook(self, notebook_name: str) -> None: - """Associate a JupyterNotebook with tags on the IPS Portal - - NOTE: It's best to ONLY run this if you're wanting to associate multiple data files with a single notebook. - If you just want to save a single file, set the appropriate parameter on send_portal_data instead. 
- - Params - - notebook_name: name of the notebook (do not provide any directories, use the config file for this) - - tags: list of tags to associate the notebook with - """ + # register notebook with IPS Portal url = self._get_jupyterhub_url() if not url: return - url += notebook_name + url += dest_notebook_name event_data = {} event_data['sim_name'] = self.sim_conf['__PORTAL_SIM_NAME'] @@ -1938,6 +1931,22 @@ def portal_register_jupyter_notebook(self, notebook_name: str) -> None: self.publish('_IPS_MONITOR', 'PORTAL_REGISTER_NOTEBOOK', event_data) self._send_monitor_event('IPS_PORTAL_REGISTER_NOTEBOOK', f'URL = {url}') + def add_data_file_to_notebook(self, notebook_name: str, state_file: str, index: Optional[int] = None): + """Add data file to notebook list. + + This function assumes that a notebook has already been created with initialize_jupyter_notebook. Using this function does not call the IPS Portal. + + Params: + - notebook_name: name of notebook which will be modified. Note that this path is relative to the JupyterHub directory. + - state_file: data file we add to the notebook (simple string). This value should almost always be the return value from "self.services.jupyterhub_make_state". + - index: optional index of the IPS notebook cell. If not provided, the IPS Framework will attempt to automatically find the cell it created, + which should work for every use case where you don't anticipate modifying the notebook until after the run is complete. + """ + if not self._jupyterhub_dir: + if not self._init_jupyter(): + raise Exception('Unable to initialize base JupyterHub dir') + add_data_file_to_notebook(f'{self._jupyterhub_dir}{notebook_name}', state_file, index) + def publish(self, topicName, eventName, eventBody): """ Publish event consisting of *eventName* and *eventBody* to topic *topicName* to the IPS event service.