Skip to content

Commit

Permalink
start work on post-analysis multirun API, add tarball capabilities
Browse files Browse the repository at this point in the history
Signed-off-by: Lance-Drane <[email protected]>
  • Loading branch information
Lance-Drane committed Sep 25, 2024
1 parent 655766e commit f915e44
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 53 deletions.
6 changes: 1 addition & 5 deletions examples-proposed/004-time-loop/mymodule/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
NOTEBOOK_1_NAME = 'basic.ipynb'
NOTEBOOK_2_TEMPLATE = 'bokeh-plots.ipynb'
NOTEBOOK_2_NAME = 'bokeh-plots.ipynb'
DATA_MODULE_NAME = 'data_files'


class Init(Component):
Expand All @@ -41,12 +40,10 @@ def step(self, timestamp=0.0):
self.services.initialize_jupyter_notebook(
dest_notebook_name=NOTEBOOK_1_NAME, # path is relative to JupyterHub directory
source_notebook_path=NOTEBOOK_1_TEMPLATE, # path is relative to input directory
data_module_name=DATA_MODULE_NAME,
)
self.services.initialize_jupyter_notebook(
dest_notebook_name=NOTEBOOK_2_NAME, # path is relative to JupyterHub directory
source_notebook_path=NOTEBOOK_2_TEMPLATE, # path is relative to input directory
data_module_name=DATA_MODULE_NAME,
)

# The time loop is configured in its own section of sim.conf
Expand Down Expand Up @@ -113,12 +110,11 @@ def step(self, timestamp=0.0, **keywords):

# stage the state file in the JupyterHub directory and update the module file to handle it
if REPLACE:
self.services.add_analysis_data_file(state_file, os.path.basename(state_file), DATA_MODULE_NAME, replace=True)
self.services.add_analysis_data_file(state_file, os.path.basename(state_file), replace=True)
else:
self.services.add_analysis_data_file(
state_file,
f'{timestamp}_{os.path.basename(state_file)}',
DATA_MODULE_NAME,
timestamp=timestamp,
)

Expand Down
6 changes: 6 additions & 0 deletions ipsframework/_jupyter/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
"""Internal logic for interacting with the Jupyter API.
Users should not need to access anything in this module directly; please use the corresponding services functions instead.
The APIs should only be accessed outside of the IPS Framework, when performing bulk operations with multiple runids.
"""
62 changes: 62 additions & 0 deletions ipsframework/_jupyter/api_v1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
"""This file is meant to be directly imported and utilized in the Jupyter analysis stage when comparing multiple runs."""

import datetime
import importlib.util
import os
import tarfile
from pathlib import Path
from typing import Dict, Iterable, Union

THIS_DIR = Path(__file__).resolve().parent


def get_data_from_runid(runid: int) -> Dict[float, str]:
    """Load all data associated with a single runid into a dictionary.

    Params:
      - runid: the run id we're working with

    Returns:
      - a dictionary mapping timesteps to associated data file paths.

    Raises:
      - FileNotFoundError: if no data listing module exists for this runid.
    """
    listing_path = THIS_DIR / str(runid) / 'data_listing.py'
    # Give the loaded module a unique, descriptive name (the original empty-string
    # name produced anonymous modules with unreadable tracebacks).
    spec = importlib.util.spec_from_file_location(f'ips_data_listing_{runid}', listing_path)
    if spec is None or spec.loader is None:
        raise FileNotFoundError(f'No data listing found for runid {runid} at {listing_path}')
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    # DATA_FILES is the dict written by the IPS framework into data_listing.py
    return module.DATA_FILES


def get_data_from_runids(runids: Iterable[int]) -> Dict[int, Dict[float, str]]:
    """Load the data listings for several runids into one nested dictionary.

    Params:
      - runids: iterable of existing runids (note that it is the caller's responsibility to verify uniqueness)

    Returns:
      - a dictionary keyed by runid; each value is that run's mapping of timesteps to associated data file paths.
    """
    collected: Dict[int, Dict[float, str]] = {}
    for current_runid in runids:
        collected[current_runid] = get_data_from_runid(current_runid)
    return collected


def generate_tar_from_runids(runids: Union[Iterable[int], int]) -> str:
    """
    Generate a tarball containing all data from the provided runs.

    Params:
      - runids: a single runid, or an iterable of runids, whose data we want to include

    Returns:
      - the absolute path of the tarball generated
    """
    # Timestamp-based archive name; ':' and '+' are replaced so the name is
    # safe on common filesystems.
    tarball_name = f'{datetime.datetime.now(datetime.timezone.utc).isoformat().replace(":", "-").replace("+", "_")}__ips_runs'
    tarball = THIS_DIR / f'{tarball_name}.tar.gz'

    if isinstance(runids, int):
        runids = [runids]

    # Context manager guarantees the archive is closed even if add() raises
    # (e.g. FileNotFoundError for a runid with no 'data' directory).
    with tarfile.open(tarball, 'w:gz') as archive:
        for runid in runids:
            # store each run's data under '<tarball_name>/<runid>/data' inside the archive
            arcname = os.path.join(tarball_name, str(runid), 'data')
            archive.add(os.path.join(THIS_DIR, str(runid), 'data'), arcname=arcname)

    return str(tarball)
50 changes: 27 additions & 23 deletions ipsframework/jupyter.py → ipsframework/_jupyter/initializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,17 @@
"""

import re
import shutil
from os.path import sep
from pathlib import Path
from typing import Optional

import nbformat as nbf

DIRECTORY_VARIABLE_NAME = 'DATA_DIR'
DATA_VARIABLE_NAME = 'DATA_FILES'
DATA_MODULE_NAME = 'data_listing'
CURRENT_API_VERSION = 'v1'


def replace_last(source_string: str, old: str, new: str) -> str:
Expand All @@ -33,76 +37,76 @@ def replace_last(source_string: str, old: str, new: str) -> str:
return f'{head}{new}{tail}'


def _initial_data_file_code(dest: str, files_variable_name: str) -> str:
return f"""# This file should be imported by a jupyter notebook. DO NOT EDIT UNTIL IPS RUN IS FINALIZED>
def _initial_data_file_code(dest: str) -> str:
return f"""# This file should be imported by a jupyter notebook or the generated API. DO NOT EDIT UNTIL IPS RUN IS FINALIZED.
import os
# NOTE: directory should be sim_name plus the run id from the Portal
{DIRECTORY_VARIABLE_NAME} = '{str(Path(dest).parent / 'data') + sep}'
{files_variable_name} = {{
{DATA_VARIABLE_NAME} = {{
}}
"""


def initialize_jupyter_notebook(notebook_dest: str, notebook_src: str, module_name: str, variable_name: str, index: int):
def initialize_jupyter_python_api(jupyterhub_dir: str):
    """Copy the current versioned multirun API module into the JupyterHub directory.

    Params:
      - jupyterhub_dir: destination directory on the filesystem
    """
    # e.g. 'api_v1.py' — source file lives alongside this module
    api_file_name = f'api_{CURRENT_API_VERSION}.py'
    source_path = Path(__file__).parent / api_file_name
    destination_path = Path(jupyterhub_dir) / api_file_name
    shutil.copyfile(source_path, destination_path)


def initialize_jupyter_notebook(notebook_dest: str, notebook_src: str):
"""Create a new notebook from an old notebook, copying the result from 'src' to 'dest'.
This adds an additional cell which will import the data files. The notebook should not be written again after this function.
Params:
- notebook_dest - location of notebook to create on filesystem (absolute file path)
- notebook_src - location of source notebook on filesystem (is not overwritten unless src == dest)
- module_name - name of the python module which will contain the data file list
- variable_name: what to call the variable
- index: insert new cells at position before this value (will not remove preexisting cells)
"""
# to avoid conversion, use as_version=nbf.NO_CONVERT
nb: nbf.NotebookNode = nbf.read(notebook_src, as_version=4)

nb['cells'] = (
nb['cells'][:index]
+ [
# explicitly mark the IPS cell for users inspecting the file, unused programatically
nbf.v4.new_markdown_cell('## Next cell generated by IPS Framework'),
nbf.v4.new_code_cell(f"""
from {module_name} import {variable_name}
nb['cells'] = [
# explicitly mark the IPS cell for users inspecting the file, unused programatically
nbf.v4.new_markdown_cell('## Next cell generated by IPS Framework'),
nbf.v4.new_code_cell(f"""
from {DATA_MODULE_NAME} import {DATA_VARIABLE_NAME}
import importlib
importlib.reload('{variable_name}')
importlib.reload('{DATA_VARIABLE_NAME}')
"""),
]
+ nb['cells'][index:]
)
] + nb['cells'][:]

nbf.validate(nb)
with open(notebook_dest, 'w') as f:
nbf.write(nb, f)


def initialize_jupyter_import_module_file(dest: str, variable_name: str):
def initialize_jupyter_import_module_file(dest: str):
"""Create a new notebook from an old notebook, copying the result from 'src' to 'dest'.
Params:
- dest - location of notebook to create on filesystem (absolute file path)
- variable_name: what to call the variable
- dest - directory where we will create the module file on filesystem (absolute file path)
"""

dest = f'{dest}{DATA_MODULE_NAME}.py'
with open(dest, 'w') as f:
f.write(_initial_data_file_code(dest, variable_name))
f.write(_initial_data_file_code(dest))


def update_module_file_with_data_file(dest: str, data_file: str, replace: bool, timestamp: float = 0.0) -> Optional[str]:
"""
Params:
- dest: path to module file which will be modified
- dest: directory of the module file which will be modified
- data_file: file which will be added to the module
- replace: if True, we can update
- timestamp: key we associate the data file with
Returns:
- if we replaced a file, the name of the file which was replaced; otherwise, None
"""
dest = f'{dest}{DATA_MODULE_NAME}.py'
with open(dest, 'r') as f:
old_module_code = f.read()

Expand Down
38 changes: 13 additions & 25 deletions ipsframework/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@
from configobj import ConfigObj

from . import ipsutil, messages
from .cca_es_spec import initialize_event_service
from .ips_es_spec import eventManager
from .jupyter import (
from ._jupyter.initializer import (
initialize_jupyter_import_module_file,
initialize_jupyter_notebook,
initialize_jupyter_python_api,
update_module_file_with_data_file,
)
from .cca_es_spec import initialize_event_service
from .ips_es_spec import eventManager
from .taskManager import TaskInit

RunningTask = namedtuple('RunningTask', ['process', 'start_time', 'timeout', 'nproc', 'cores_allocated', 'command', 'binary', 'args'])
Expand Down Expand Up @@ -1903,13 +1904,11 @@ def _get_jupyterhub_url(self) -> Optional[str]:
url += f'ipsframework/runs/{portal_url_host}/{runid}/'
return url

# TODO consider how we use variable_name in the API and get rid of it if it's not necessary
def initialize_jupyter_notebook(
self,
dest_notebook_name: str,
source_notebook_path: str,
data_module_name: str,
variable_name: str = 'DATA_FILES',
cell_to_modify: int = 0,
) -> None:
"""Loads a notebook from source_notebook_path, adds a cell to load the data, and then saves it to source_notebook_path. Will also try to register the notebook with the IPS Portal, if available.
Expand All @@ -1919,23 +1918,19 @@ def initialize_jupyter_notebook(
- dest_notebook_name: name of the JupyterNotebook you want to write (do not include file paths).
- source_notebook_path: location you want to load the source notebook from
- data_module_name: name of the python file you want to generate a data file for (do not include file paths or file extensions)
- variable_name: name of the variable in the module file you want to load files from (default: "DATA_FILES")
- cell_to_modify: which cell in the JupyterNotebook you want to add the data call to (0-indexed).
(This will not overwrite any cells, just appends.)
By default, the data listing will happen in the FIRST cell.
"""
if not self._jupyterhub_dir:
if not self._init_jupyter():
raise Exception('Unable to initialize base JupyterHub dir')

if data_module_name.endswith('.py'):
data_module_name = data_module_name[:-3]

# adds module file to Jupyterhub
initialize_jupyter_import_module_file(f'{self._jupyterhub_dir}{data_module_name}.py', variable_name)
initialize_jupyter_import_module_file(self._jupyterhub_dir)

# add the shared python API if it doesn't exist
initialize_jupyter_python_api(str(pathlib.Path(self._jupyterhub_dir).parent))

# adds notebook to JupyterHub
initialize_jupyter_notebook(f'{self._jupyterhub_dir}{dest_notebook_name}', source_notebook_path, data_module_name, variable_name, cell_to_modify)
initialize_jupyter_notebook(f'{self._jupyterhub_dir}{dest_notebook_name}', source_notebook_path)

# register notebook with IPS Portal
url = self._get_jupyterhub_url()
Expand All @@ -1954,26 +1949,20 @@ def initialize_jupyter_notebook(
self.publish('_IPS_MONITOR', 'PORTAL_REGISTER_NOTEBOOK', event_data)
self._send_monitor_event('IPS_PORTAL_REGISTER_NOTEBOOK', f'URL = {url}')

def add_analysis_data_file(
self, current_data_file_path: str, new_data_file_name: str, data_module_name: str, timestamp: float = 0.0, replace: bool = False
):
def add_analysis_data_file(self, current_data_file_path: str, new_data_file_name: str, timestamp: float = 0.0, replace: bool = False):
"""Add data file to the module file referenced by the Jupyter Notebook.
Params:
- data_file_path: location of the current data file we want to copy to the Jupyter directory. This will usually be a state file.
- current_data_file_path: location of the current data file we want to copy to the Jupyter directory. This will usually be a state file.
- new_data_file_name: name of the new data file (relative to Jupyterhub data directory, should be unique per run)
- timestamp: label to assign to the data (currently must be a floating point value)
- data_module_name: name of notebook which will be modified. Note that this path is relative to the JupyterHub directory.
- replace: If True, replace the last data file added with the new data file. If False, simply append the new data file. (default: False)
"""
if not self._jupyterhub_dir:
if not self._init_jupyter():
# TODO generic exception
raise Exception('Unable to initialize base JupyterHub dir')

if not data_module_name.endswith('.py'):
data_module_name += '.py'

# make sure we're working with a file, and not a directory, regarding the data file name
new_data_file_name = os.path.basename(new_data_file_name)

Expand All @@ -1982,9 +1971,8 @@ def add_analysis_data_file(
shutil.copyfile(current_data_file_path, jupyter_data_file)

# update the module file
replaced_file_name = update_module_file_with_data_file(f'{self._jupyterhub_dir}{data_module_name}', new_data_file_name, replace, timestamp)
replaced_file_name = update_module_file_with_data_file(self._jupyterhub_dir, new_data_file_name, replace, timestamp)
if replaced_file_name:
print('REPLACING FILE', replaced_file_name)
# now remove the state file from the filesystem
file_to_remove = os.path.join(self._jupyterhub_dir, 'data', replaced_file_name)
try:
Expand Down

0 comments on commit f915e44

Please sign in to comment.