Merge pull request #27 from caglorithm/feature/array_results
Feature/array results
caglorithm authored Dec 2, 2020
2 parents 797b163 + 4815b84 commit dd3d0c5
Showing 4 changed files with 104 additions and 82 deletions.
54 changes: 54 additions & 0 deletions .github/workflows/ci.yml
@@ -0,0 +1,54 @@
# This workflow will install Python dependencies, run tests and lint with a variety of Python versions
# For more information see: https://help.github.com/actions/language-and-framework-guides/using-python-with-github-actions

name: ci

on:
  push:
    branches:
      - "*"
  pull_request:
    branches:
      - "*"

jobs:
  build:
    runs-on: ${{ matrix.os }}
    strategy:
      fail-fast: false
      matrix:
        os: [ubuntu-latest, macos-latest]
        python-version: [3.6, 3.7, 3.8]
        exclude:
          - os: macos-latest
            python-version: 3.8

    steps:
      - uses: actions/checkout@v2
      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v2
        with:
          python-version: ${{ matrix.python-version }}
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install pytest flake8 codecov pytest-cov wheel setuptools
          if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
      - name: Lint with flake8
        run: |
          # stop the build if there are Python syntax errors or undefined names
          flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
          # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
          flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
      - name: Test with pytest
        run: |
          PYTHONPATH=. pytest --durations=0 --cov-report=xml --cov=mopet tests/
      - name: Upload coverage to Codecov
        uses: codecov/codecov-action@v1
        with:
          file: ./coverage.xml
          flags: unittests
          env_vars: OS,PYTHON
          name: codecov-umbrella
          verbose: true
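The `strategy.matrix` block above expands to five jobs (2 OSes × 3 Python versions, minus one exclusion). As a standalone sketch of that expansion in plain Python (the dictionaries simply mirror the workflow values and are not part of the workflow itself):

```python
import itertools

# Values copied from the matrix above; this script is illustrative only.
matrix = {
    "os": ["ubuntu-latest", "macos-latest"],
    "python-version": ["3.6", "3.7", "3.8"],
}
exclude = [{"os": "macos-latest", "python-version": "3.8"}]

# Build all combinations, then drop the excluded ones.
jobs = [dict(zip(matrix, combo)) for combo in itertools.product(*matrix.values())]
jobs = [job for job in jobs if job not in exclude]
print(len(jobs))  # 5
```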
3 changes: 1 addition & 2 deletions README.md
@@ -90,7 +90,7 @@ You can load the exploration results using
```python
ex.load_results(all=True)
```

- Note that using `all=True` will load all results into memory. Please make sure that you have enough free memory for this. If not, do not use `all=True` but load individual results using their `run_id` (which is an integer counting up one per run):
+ Note that using `all=True` will load all results into memory (as opposed to just the parameters of each run). Please make sure that you have enough free memory for this, since your simulation results could be huge. If you do not want this, you can load individual results using their `run_id` (an integer counting up by one per run):

```python
ex.get_run(run_id=0)
```
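For instance (a minimal sketch, assuming the exploration `ex` has been run and its results table is loaded), you can process runs one at a time to keep memory usage low:

```python
ex.load_results()  # aggregate=True by default: only scalar results go into ex.df

for run_id in ex.df.index:
    result = ex.get_run(run_id=run_id)  # dict with the results of a single run
    # ... process `result` here, one run at a time ...

ex.close_hdf()  # get_run() opens the HDF file for reading but does not close it
```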
@@ -112,7 +112,6 @@ Let's plot the results!
```python

import matplotlib.pyplot as plt
- # a nice color map
plt.imshow(pivoted, \
extent = [min(ex.df.x), max(ex.df.x),
min(ex.df.y), max(ex.df.y)], origin='lower')
115 changes: 43 additions & 72 deletions mopet/mopet.py
Expand Up @@ -36,11 +36,11 @@ def __init__(
num_gpus: int = None,
):
"""Defines a parameter exploration of a given `function`.
:param function: Function to evaluate at each run
:type function: function
:param explore_params: Exploration parameters (individual) for each run
:type explore_params: dict
:param default_params: Default (shared) parameters to load for each run, optional, defaults to None
:type default_params: dict
:param exploration_name: Name of the run, will create a name if left empty, defaults to None
@@ -68,9 +68,7 @@ def __init__(
self.explore_params = copy.deepcopy(explore_params)

if exploration_name is None:
exploration_name = "exploration" + datetime.datetime.now().strftime(
"_%Y_%m_%d_%HH_%MM_%SS"
)
exploration_name = "exploration" + datetime.datetime.now().strftime("_%Y_%m_%d_%HH_%MM_%SS")
self.exploration_name = exploration_name

if hdf_filename is None:
@@ -177,17 +175,15 @@ def run(self):

self._shutdown_ray()

- def load_results(
- self, filename=None, exploration_name=None, aggregate=True, all=False
- ):
+ def load_results(self, filename=None, exploration_name=None, aggregate=True, all=False):
"""Load results from previous explorations. This function
will open an HDF file and look for an exploration. It will
create a pandas `DataFrame` object (accessible through the
attribute `.df`) with a list of all runs and their parameters.
You can load the exploration results using the following parameters:
- If `aggregate==True`, all scalar results (such as `float`
or `int`) from the exploration will be added to the DataFrame.
- If `all==True`, then all results, including arrays and other
types, will be saved in the attribute `.results`. This can take
@@ -221,7 +217,7 @@ def load_results(
def get_run(self, run_id=None, run_name=None, filename=None, exploration_name=None):
"""Get a single result from a previous exploration. This function
will load a single result from the HDF file. Use this function
if you want to avoid loading all results to memory, which you can
do using `.load_results(all=True)`.
Note: This function will open the HDF for reading but will not close
@@ -237,15 +233,13 @@ def get_run(self, run_id=None, run_name=None, filename=None, exploration_name=None):
:type filename: str, optional
:param exploration_name: Name of the exploration to load data from. Previously used exploration_name will be used if not given, defaults to None
:type exploration_name: str, optional
:return: Results of the run
:rtype: dict
:raises: ExplorationNotFoundError if the hdf5 file does not contain the `exploration_name` group.
"""
# get result by id or if not then by run_name (hdf_run)
- assert (
- run_id is not None or run_name is not None
- ), "Either use `run_id` or `run_name`."
+ assert run_id is not None or run_name is not None, "Either use `run_id` or `run_name`."

if exploration_name:
self.exploration_name = exploration_name
@@ -257,31 +251,24 @@ def get_run(self, run_id=None, run_name=None, filename=None, exploration_name=None):
self._open_hdf(filename)

try:
- run_results_group = self.h5file.get_node(
- "/" + self.exploration_name, "runs"
- )[run_name]
+ run_results_group = self.h5file.get_node("/" + self.exploration_name, "runs")[run_name]
except NoSuchNodeError:
raise ExplorationNotFoundError(
"Exploration %s could not be found in HDF file %s".format(
self.exploration_name, self.hdf_filename
)
"Exploration %s could not be found in HDF file %s".format(self.exploration_name, self.hdf_filename)
)

result = self._read_group_as_dict(run_results_group)
return result

def _cartesian_product_dict(self, input_dict):
"""Returns the cartesian product of the exploration parameters.
:param input_dict: Parameter names and their values to explore
:type input_dict: dict
:return: List of dictionaries of all possible combinations
:rtype: list
"""
- return [
- dict(zip(input_dict.keys(), values))
- for values in itertools.product(*input_dict.values())
- ]
+ return [dict(zip(input_dict.keys(), values)) for values in itertools.product(*input_dict.values())]
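For illustration, a standalone sketch of what this cartesian product yields for a small parameter grid (the example values are made up):

```python
import itertools

explore_params = {"x": [0.0, 0.5], "y": [1, 2, 3]}
runs = [
    dict(zip(explore_params, values))
    for values in itertools.product(*explore_params.values())
]
print(len(runs))  # 6 runs in total
print(runs[0])    # {'x': 0.0, 'y': 1}
```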

##############################################
## MULTIPROCESSING
@@ -302,8 +289,7 @@ def _init_ray(self, num_cpus: int = None, num_gpus: int = None):
assert ray.is_initialized() is True, "Could not initialize ray."

def _shutdown_ray(self):
"""Shutdown ray.
"""
"""Shutdown ray."""
ray.shutdown()
assert ray.is_initialized() is False, "Could not shutdown ray."

@@ -313,7 +299,7 @@ def _shutdown_ray(self):

def _store_dict_to_hdf(self, group, dict_data):
"""Stores a dictionary into a group of the hdf file.
:param group: group in hdf file to store data in
:type group: tables.Group
:param dict_data: dictionary with data to store
@@ -323,9 +309,7 @@ def _store_dict_to_hdf(self, group, dict_data):
try:
self.h5file.create_array(group, r_key, obj=r_val)
except:
- logging.warning(
- f"Could not store dict entry {r_key} (type: {type(r_val)})"
- )
+ logging.warning(f"Could not store dict entry {r_key} (type: {type(r_val)})")

def _init_hdf(self):
"""Create hdf storage file and all necessary groups.
@@ -335,9 +319,7 @@ def _init_hdf(self):
try:
self.h5file = tables.open_file(self.hdf_filename, mode="a")
except IOError:
- raise Hdf5FileNotExistsError(
- "Hdf5 file {} does not exist".format(self.hdf_filename)
- )
+ raise Hdf5FileNotExistsError("Hdf5 file {} does not exist".format(self.hdf_filename))

try:
self.run_group = self.h5file.create_group("/", self.exploration_name)
@@ -349,9 +331,7 @@ def _init_hdf(self):
)

# create group in which all data from runs will be saved
- self.runs_group = self.h5file.create_group(
- self.h5file.root[self.exploration_name], "runs"
- )
+ self.runs_group = self.h5file.create_group(self.h5file.root[self.exploration_name], "runs")

if self.default_params is not None:
# create group in which all default parameters will be saved
@@ -362,28 +342,24 @@ def _init_hdf(self):
self._store_dict_to_hdf(self.default_params_group, self.default_params)

# create group in which exploration parameters will be saved
- self.explore_params_group = self.h5file.create_group(
- self.h5file.root[self.exploration_name], "explore_params"
- )
+ self.explore_params_group = self.h5file.create_group(self.h5file.root[self.exploration_name], "explore_params")
self._store_dict_to_hdf(self.explore_params_group, self.explore_params)

# create group in which information about this run is saved
# self.info_group = self.h5file.create_group("/", "info")

def _pre_storage_routine(self):
"""Routines for preparing the hdf storage.
"""
"""Routines for preparing the hdf storage."""
# initialize the hdf file
self._init_hdf()

def _post_storage_routine(self):
"""Routines for closing the hdf storage.
"""
"""Routines for closing the hdf storage."""
self.h5file.close()

def _store_result(self, result_id, ray_object, run_params):
"""Resolves results from the ray object and stores the results.
:param result_id: id of the run
:type result_id: int
:param ray_object: ray object
@@ -397,15 +373,13 @@ def _store_result(self, result_id, ray_object, run_params):
# resolve the ray object and get the returned dictionary from the evaluation function
result_dict = ray.get(ray_object)

- assert isinstance(
- result_dict, dict
- ), f"Returned result must be a dictionary, is `{type(result_dict)}`."
+ assert isinstance(result_dict, dict), f"Returned result must be a dictionary, is `{type(result_dict)}`."

self._store_result_in_hdf(run_result_name, result_dict, run_params)

def _store_result_in_hdf(self, run_result_name, result_dict, run_params):
"""Stores the results of a ray object of a single run and the parameters of the run.
:param run_result_name: Name of the result
:type run_result_name: str
:param run_params: Explored parameters of the run
@@ -436,9 +410,7 @@ def _create_df(self):
"""
logging.info("Creating new results DataFrame")
self.explore_params = self._read_explore_params()
- self.dfResults = pd.DataFrame(
- columns=self.explore_params.keys(), index=self.run_ids, dtype=object
- )
+ self.dfResults = pd.DataFrame(columns=self.explore_params.keys(), index=self.run_ids, dtype=object)
for key, value in self.params.items():
self.dfResults.loc[key] = value
return self.dfResults
@@ -452,28 +424,23 @@ def _open_hdf(self, filename=None):
"""
if filename is not None:
self.hdf_filename = filename
- assert (
- self.hdf_filename is not None
- ), "No hdf filename was given or previously set."
+ assert self.hdf_filename is not None, "No hdf filename was given or previously set."

try:
self.h5file = tables.open_file(self.hdf_filename, mode="r+")
except OSError:
- raise Hdf5FileNotExistsError(
- "Hdf5 file %s does not exist".format(self.hdf_filename)
- )
+ raise Hdf5FileNotExistsError("Hdf5 file {} does not exist".format(self.hdf_filename))

self._hdf_open_for_reading = True
logging.info(f"{self.hdf_filename} opened for reading.")

def close_hdf(self):
"""Close a previously opened HDF file.
"""
"""Close a previously opened HDF file."""
self.h5file.close()
self._hdf_open_for_reading = False
logging.info(f"{self.hdf_filename} closed.")

- def _aggregate_results(self, exploration_name=None):
+ def _aggregate_results(self, exploration_name=None, arrays=True):
"""Go through all results saved in `.results` and store all floats in the results table.
TODO: Direct reading from hdf without having to load it to memory, like in neurolib
Expand All @@ -484,13 +451,19 @@ def _aggregate_results(self, exploration_name=None):
"""
nan_value = np.nan
logging.info("Aggregating scalar results ...")
- for runId, parameters in tqdm.tqdm(
- self.dfResults.iterrows(), total=len(self.dfResults)
- ):
+ for runId, parameters in tqdm.tqdm(self.dfResults.iterrows(), total=len(self.dfResults)):
result = self.get_run(runId)
for key, value in result.items():
- if isinstance(value, float):
+ # we check the type of the value and
+ # save it to the dataframe accordingly
+ if isinstance(value, (float, int)):
self.dfResults.loc[runId, key] = value
+ elif isinstance(value, np.ndarray) and arrays:
+ # to save a numpy array, convert the column to object type
+ if key not in self.dfResults:
+ self.dfResults[key] = None
+ self.dfResults[key] = self.dfResults[key].astype(object)
+ self.dfResults.at[runId, key] = value
else:
self.dfResults.loc[runId, key] = nan_value
# drop nan columns
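The object-dtype conversion above is what makes array results possible: pandas cannot place a numpy array into a cell of a numeric column, so the column is cast to `object` first and `.at[]` assigns the single cell. A standalone sketch of the pattern (column names are made up):

```python
import numpy as np
import pandas as pd

df = pd.DataFrame(index=[0, 1])
df["scalar"] = [0.1, 0.2]

# A column must be object-typed before an ndarray can live in a single cell.
df["array"] = None
df["array"] = df["array"].astype(object)
df.at[0, "array"] = np.arange(3)
df.at[1, "array"] = np.arange(5)
print(df["array"].map(len).tolist())  # [3, 5]
```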
@@ -575,9 +548,7 @@ def _read_explore_params(self):
:return: Dictionary with explored parameters
:rtype: dict
"""
- explore_params_group = self.h5file.get_node(
- "/" + self.exploration_name, "explore_params"
- )
+ explore_params_group = self.h5file.get_node("/" + self.exploration_name, "explore_params")
self.explore_params = self._read_group_as_dict(explore_params_group)
return self.explore_params

@@ -599,7 +570,7 @@ def df(self):

@ray.remote
def _ray_remote(function, params):
""" This is a ray remote function (see ray documentation). It runs the `function` on each ray worker.
"""This is a ray remote function (see ray documentation). It runs the `function` on each ray worker.
:param function: function to be executed remotely.
:type function: callable
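For context, a self-contained sketch of this ray pattern: the decorated function executes on worker processes, `.remote()` immediately returns object references, and `ray.get()` resolves them later (the evaluation function below is made up, not mopet's code):

```python
import ray

ray.init(num_cpus=2)

@ray.remote
def evaluate(params):
    # mopet expects every run to return a dictionary of results
    return {"sum": params["x"] + params["y"]}

object_ids = [evaluate.remote({"x": x, "y": 1.0}) for x in (0.0, 0.5, 1.0)]
print(ray.get(object_ids))  # [{'sum': 1.0}, {'sum': 1.5}, {'sum': 2.0}]
ray.shutdown()
```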
