Merge pull request #528 from Kimoby/add-mutate-again
Add `.mutate()` again, and some general-purpose improvements
guillaume-chevalier authored Jul 28, 2022
2 parents c355500 + a23e590 commit b95e25f
Showing 20 changed files with 451 additions and 221 deletions. (Only the diffs loaded on this page are reproduced below.)
2 changes: 1 addition & 1 deletion neuraxle/__init__.py
@@ -1 +1 @@
-__version__ = "0.8.0"
+__version__ = "0.8.1"
247 changes: 214 additions & 33 deletions neuraxle/base.py

Large diffs are not rendered by default.
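Since the base.py diff is not rendered, here is a hedged sketch of what `.mutate()` has historically done in Neuraxle: swap one of a step's methods in for another, e.g. so that calling `transform` runs `inverse_transform`. The exact re-added signature is an assumption based on the pre-0.6 API; verify it against the actual base.py diff.

# Hedged sketch: assumes the re-added .mutate() keeps its historical
# signature mutate(new_method, method_to_assign_to).
from neuraxle.base import Identity
from neuraxle.pipeline import Pipeline

p = Pipeline([Identity(), Identity()])

# After mutating, calling transform() on the result dispatches to inverse_transform():
reversed_p = p.mutate(new_method="inverse_transform", method_to_assign_to="transform")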

12 changes: 6 additions & 6 deletions neuraxle/distributed/streaming.py
@@ -700,12 +700,9 @@ def transform_data_container(self, data_container: DACT, context: CX) -> DACT:
         workers_joiner.set_join_quantities(n_workers, n_minibatches_per_worker)
         data_container = workers_joiner.join_workers(data_container, context)
 
-        for step in self.body:
-            step: ParallelWorkersWrapper = step
-            step.join()
-        if self.logging_thread is not None:
-            self.logging_thread.join(timeout=5.0)
-            self.logging_thread = None
+        # for step in self.body:
+        #     step: ParallelWorkersWrapper = step
+        #     step.join()
 
         return data_container
@@ -723,6 +720,9 @@ def _did_transform(self, data_container: DACT, context: CX) -> DACT:
         for name, step in self[:-1]:
             step: ParallelWorkersWrapper = step
             step.stop()
+        if self.logging_thread is not None:
+            self.logging_thread.join(timeout=5.0)
+            self.logging_thread = None
 
         return self.data_joiner.handle_transform(data_container, context)
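
The net effect of these two hunks is to move the thread joins out of `transform_data_container`: the logging thread is now joined in `_did_transform`, after every worker has been stopped. A minimal sketch of that stop-then-join ordering with plain stdlib threading (names are illustrative, not the Neuraxle wrappers):

import queue
import threading

log_queue: "queue.Queue[str]" = queue.Queue()
stop_logging = threading.Event()

def logging_loop() -> None:
    # Drain log records until asked to stop and the queue is empty.
    while not stop_logging.is_set() or not log_queue.empty():
        try:
            print(log_queue.get(timeout=0.1))
        except queue.Empty:
            pass

logging_thread = threading.Thread(target=logging_loop)
logging_thread.start()

workers = [threading.Thread(target=lambda: log_queue.put("minibatch done")) for _ in range(4)]
for w in workers:
    w.start()
for w in workers:
    w.join()                          # 1. stop/join all workers first,

stop_logging.set()
logging_thread.join(timeout=5.0)      # 2. then join the logging thread with a timeout.
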
5 changes: 4 additions & 1 deletion neuraxle/logging/logging.py
@@ -104,7 +104,10 @@ def without_file_handler(self) -> 'NeuraxleLogger':
         """
         if self.name in LOGGER_FILE_HANDLERS:
             LOGGER_FILE_HANDLERS[self.name].close()
-            self.removeHandler(LOGGER_FILE_HANDLERS[self.name])
+            try:
+                self.removeHandler(LOGGER_FILE_HANDLERS[self.name])
+            except KeyError as ke:
+                raise ke from ke  # Breakpoint here.
             del LOGGER_FILE_HANDLERS[self.name]
         return self
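
A hedged side note on the stdlib behavior this guards: `logging.Logger.removeHandler` silently ignores a handler that is not attached, so the `KeyError` trapped above would come from the `LOGGER_FILE_HANDLERS[self.name]` dict lookup rather than from `removeHandler` itself. A standalone illustration:

import logging

logger = logging.getLogger("demo")
handler = logging.StreamHandler()

logger.removeHandler(handler)  # Handler not attached: silently ignored, no exception.

handlers_by_name = {}
try:
    handlers_by_name["demo"]   # The dict lookup is what can raise KeyError.
except KeyError:
    print("KeyError came from the dict lookup, not from removeHandler().")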

25 changes: 9 additions & 16 deletions neuraxle/metaopt/auto_ml.py
@@ -29,27 +29,20 @@
 from copy import copy
 from typing import ContextManager, Iterator, List, Optional, Tuple
 
-from neuraxle.base import (CX, BaseService, BaseStep, BaseStepT,
-                           ExecutionContext, ForceHandleMixin,
-                           TruncableService, _HasChildrenMixin)
+from neuraxle.base import (CX, BaseService, BaseStep, BaseStepT, ExecutionContext, ForceHandleMixin, TruncableService,
+                           _HasChildrenMixin)
 from neuraxle.data_container import IDT
 from neuraxle.data_container import DataContainer as DACT
 from neuraxle.hyperparams.space import HyperparameterSpace
-from neuraxle.metaopt.callbacks import (ARG_Y_EXPECTED, ARG_Y_PREDICTD,
-                                        BaseCallback, CallbackList,
-                                        MetricCallback, ScoringCallback)
+from neuraxle.metaopt.callbacks import (ARG_Y_EXPECTED, ARG_Y_PREDICTD, BaseCallback, CallbackList, MetricCallback,
+                                        ScoringCallback)
 from neuraxle.metaopt.context import AutoMLContext
-from neuraxle.metaopt.data.aggregates import (Client, Project, Root, Round,
-                                              Trial, TrialSplit)
+from neuraxle.metaopt.data.aggregates import Client, Project, Root, Round, Trial, TrialSplit
 from neuraxle.metaopt.data.reporting import RoundReport
-from neuraxle.metaopt.data.vanilla import (DEFAULT_CLIENT, DEFAULT_PROJECT,
-                                           RoundDataclass, ScopedLocation)
-from neuraxle.metaopt.optimizer import (BaseHyperparameterOptimizer,
-                                        GridExplorationSampler,
-                                        RandomSearchSampler)
+from neuraxle.metaopt.data.vanilla import DEFAULT_CLIENT, DEFAULT_PROJECT, RoundDataclass, ScopedLocation
+from neuraxle.metaopt.optimizer import BaseHyperparameterOptimizer, GridExplorationSampler, RandomSearchSampler
 from neuraxle.metaopt.repositories.repo import HyperparamsRepository
-from neuraxle.metaopt.validation import (BaseValidationSplitter,
-                                         ValidationSplitter)
+from neuraxle.metaopt.validation import BaseValidationSplitter, ValidationSplitter
 
 
 class Trainer(BaseService):
@@ -110,7 +103,7 @@ def train_split(
         If validation DACT is None, the evaluation metrics will not save validation results.
         """
         trial_split_scope: TrialSplit = trial_split_scope.with_n_epochs(self.n_epochs)
-        p: BaseStep = pipeline.copy(trial_split_scope.context, deep=True)
+        p: BaseStep = pipeline._copy(trial_split_scope.context, deep=True)
         p.set_hyperparams(trial_split_scope.get_hyperparams())
 
         for _ in range(self.n_epochs):
10 changes: 5 additions & 5 deletions neuraxle/metaopt/context.py
@@ -42,7 +42,7 @@ class AutoMLContext(CX):
 
     @property
     def logger(self) -> NeuraxleLogger:
-        self.add_scoped_logger_file_handler()  # TODO: this is perhaps why logs are duplicated.
+        # self.add_scoped_logger_file_handler()  # TODO: this is perhaps why logs are duplicated.
         return CX.logger.fget(self)
 
     @property
@@ -70,7 +70,7 @@ def read_scoped_log(self) -> str:
         # TODO: with self.lock:
         return self.repo.get_log_from_logging_handler(self.logger, self.loc)
 
-    def copy(self):
+    def _copy(self):
         copy_kwargs = self._get_copy_kwargs()
         return AutoMLContext(**copy_kwargs)
@@ -89,13 +89,13 @@ def from_context(
         :param context: ExecutionContext
         """
-        new_context: AutoMLContext = AutoMLContext.copy(
+        new_context: AutoMLContext = AutoMLContext._copy(
             context if context is not None else AutoMLContext()
         )
         if not new_context.has_service(HyperparamsRepository):
             new_context.register_service(
                 HyperparamsRepository,
-                repo or VanillaHyperparamsRepository(new_context.get_path())
+                (repo or VanillaHyperparamsRepository(new_context.get_path())).with_lock()
             )
         if not new_context.has_service(ScopedLocation):
             new_context.register_service(
@@ -142,7 +142,7 @@ def with_loc(self, loc: ScopedLocation) -> 'AutoMLContext':
         :param loc: ScopedLocation
         :return: an AutoMLContext copy with the new loc attribute.
         """
-        new_self: AutoMLContext = self.copy()
+        new_self: AutoMLContext = self._copy()
         new_self.register_service(ScopedLocation, loc)
         return new_self
52 changes: 17 additions & 35 deletions neuraxle/metaopt/data/aggregates.py
@@ -42,34 +42,20 @@
 from abc import abstractmethod
 from collections import OrderedDict
 from types import TracebackType
-from typing import (Callable, ContextManager, Dict, Generic, Iterable, List,
-                    Optional, Type, TypeVar)
+from typing import Callable, ContextManager, Dict, Generic, Iterable, List, Optional, Type, TypeVar
 
 import numpy as np
-from neuraxle.base import (BaseService, Flow, TrialStatus,
-                           _CouldHaveContext)
-from neuraxle.hyperparams.space import (FlatDict, HyperparameterSamples,
-                                        HyperparameterSpace)
+from neuraxle.base import BaseService, Flow, TrialStatus, _CouldHaveContext
+from neuraxle.hyperparams.space import FlatDict, HyperparameterSamples, HyperparameterSpace
 from neuraxle.metaopt.context import AutoMLContext
-from neuraxle.metaopt.data.reporting import (BaseReport, ClientReport,
-                                             MetricResultsReport,
-                                             ProjectReport, RootReport,
-                                             RoundReport, SubReportT,
-                                             TrialReport, TrialSplitReport,
-                                             dataclass_2_report)
-from neuraxle.metaopt.data.vanilla import (DEFAULT_CLIENT, DEFAULT_PROJECT,
-                                           RETRAIN_TRIAL_SPLIT_ID,
-                                           BaseDataclass, ClientDataclass,
-                                           MetricResultsDataclass,
-                                           ProjectDataclass, RootDataclass,
-                                           RoundDataclass, ScopedLocation,
-                                           ScopedLocationAttr,
-                                           ScopedLocationAttrInt,
-                                           SubDataclassT, TrialDataclass,
-                                           TrialSplitDataclass,
-                                           dataclass_2_id_attr)
+from neuraxle.metaopt.data.reporting import (BaseReport, ClientReport, MetricResultsReport, ProjectReport, RootReport,
+                                             RoundReport, SubReportT, TrialReport, TrialSplitReport, dataclass_2_report)
+from neuraxle.metaopt.data.vanilla import (DEFAULT_CLIENT, DEFAULT_PROJECT, RETRAIN_TRIAL_SPLIT_ID, BaseDataclass,
+                                           ClientDataclass, MetricResultsDataclass, ProjectDataclass, RootDataclass,
+                                           RoundDataclass, ScopedLocation, ScopedLocationAttr, ScopedLocationAttrInt,
+                                           SubDataclassT, TrialDataclass, TrialSplitDataclass, dataclass_2_id_attr)
 from neuraxle.metaopt.optimizer import BaseHyperparameterOptimizer
-from neuraxle.metaopt.repositories.repo import (HyperparamsRepository,
+from neuraxle.metaopt.repositories.repo import (HyperparamsRepository, SynchronizedHyperparamsRepositoryWrapper,
                                                 VanillaHyperparamsRepository)
 
 SubAggregateT = TypeVar('SubAggregateT', bound=Optional['BaseAggregate'])
@@ -127,7 +113,7 @@ def __init__(self, _dataclass: SubDataclassT, context: AutoMLContext, is_deep=Fa
         self._spare: SubDataclassT = copy.copy(_dataclass).shallow()
         # TODO: pre-push context to allow for dc auto-loading and easier parent auto-loading?
         self.context: AutoMLContext = context.push_attr(_dataclass)
-        self.loc: ScopedLocation = self.context.loc.copy()
+        self.loc: ScopedLocation = self.context.loc._copy()
         self.is_deep = is_deep
         self._parent: ParentAggregateT = parent
@@ -212,7 +198,7 @@ def flow(self) -> Flow:
         return self.context.flow
 
     @property
-    def repo(self) -> HyperparamsRepository:
+    def repo(self) -> SynchronizedHyperparamsRepositoryWrapper:
         return self.context.repo
 
     def subaggregate(self, _dataclass: SubDataclassT, context: AutoMLContext, is_deep=False, parent: ParentAggregateT = None) -> SubAggregateT:
@@ -307,8 +293,8 @@ def __enter__(self) -> SubAggregateT:
         # self.context.free_scoped_logger_handler_file()
         self._invariant()
         self._managed_resource._invariant()
-
-        self._managed_resource.context.add_scoped_logger_file_handler()
+        with self.repo.lock:  # TODO: locking twice, not needed.
+            self._managed_resource.context.add_scoped_logger_file_handler()
 
         return self._managed_resource
@@ -318,10 +304,9 @@ def __exit__(
         exc_val: Optional[BaseException],
         exc_tb: Optional[TracebackType]
     ) -> Optional[bool]:
-        self._managed_resource.context.free_scoped_logger_file_handler()
-        # self.context.add_scoped_logger_file_handler()
-
-        handled_err: bool = self._release_managed_subresource(self._managed_resource, exc_val)
+        with self.repo.lock:  # TODO: locking twice, probably not needed.
+            handled_err: bool = self._release_managed_subresource(self._managed_resource, exc_val)
+            self._managed_resource.context.free_scoped_logger_file_handler()
         return handled_err
 
     @_with_method_as_context_manager
@@ -635,9 +620,6 @@ def append(self, trial: 'Trial'):
         """
         self.save_subaggregate(trial, deep=False)
 
-    def copy(self) -> 'Round':
-        return Round(copy.deepcopy(self._dataclass), self.context.copy().with_loc(self.loc.popped()))
-
     @property
     def main_metric_name(self) -> str:
         return self._dataclass.main_metric_name
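
Both TODO comments above flag that the repository lock ends up taken twice on these paths. If `repo.lock` is reentrant, the nested acquisition is redundant but harmless, since the same thread may re-acquire it; that this holds is an assumption about the lock's actual type. With stdlib primitives:

import threading

lock = threading.RLock()

with lock:      # outer acquisition, e.g. around __enter__/__exit__
    with lock:  # inner acquisition: fine with an RLock, a deadlock with a plain Lock
        print("nested acquisition succeeded")
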
8 changes: 4 additions & 4 deletions neuraxle/metaopt/data/vanilla.py
@@ -145,7 +145,7 @@ def __getitem__(
 
         raise ValueError(f"Invalid key type {key.__class__.__name__} for key {key}.")
 
-    def copy(self) -> 'ScopedLocation':
+    def _copy(self) -> 'ScopedLocation':
         """
         Returns a copy of the :class:`ScopedLocation`.
         """
@@ -158,10 +158,10 @@ def with_dc(self, dc: 'BaseDataclass') -> 'ScopedLocation':
         if isinstance(dc, RootDataclass):
             return ScopedLocation()
         elif dc.is_terminal_leaf():
-            cpy = self.copy()
+            cpy = self._copy()
             cpy.metric_name = dc.get_id()
             return cpy
-        self_copy = self.copy()
+        self_copy = self._copy()
         self_copy[dc.__class__] = dc.get_id()
         return self_copy
@@ -181,7 +181,7 @@ def pad_nans(self) -> 'ScopedLocation':
         """
         Returns a :class:`ScopedLocation` with the missing elements filled with the default null values.
         """
-        self_copy = self.copy()
+        self_copy = self._copy()
         null_vals = [
             NULL_PROJECT,
             NULL_CLIENT,
8 changes: 4 additions & 4 deletions neuraxle/metaopt/hyperopt/tpe.py
@@ -68,7 +68,7 @@ def __init__(
             expected_n_trials=number_of_initial_random_step
         )
 
-    def find_next_best_hyperparams(self, round: RoundReport, hp_space: HyperparameterSpace) -> HyperparameterSamples:
+    def find_next_best_hyperparams(self, _round: RoundReport, hp_space: HyperparameterSpace) -> HyperparameterSamples:
         """
         Find the next best hyperparams using previous trials.
@@ -77,11 +77,11 @@ def find_next_best_hyperparams(self, round: RoundReport, hp_space: Hyperparamete
         """
 
         # Perform a first pseudo-randomized search:
-        if len(round) < self.number_of_initial_random_step:
-            return self.initial_auto_ml_algo.find_next_best_hyperparams(round, hp_space)
+        if len(_round) < self.number_of_initial_random_step:
+            return self.initial_auto_ml_algo.find_next_best_hyperparams(_round, hp_space)
 
         # Create gaussian mixture of good and gaussian mixture of bads. Lists here are on a per-hp basis:
-        hyperparams_keys, divided_good_and_bad_distrs = self.mixture_factory.create_from(round, hp_space)
+        hyperparams_keys, divided_good_and_bad_distrs = self.mixture_factory.create_from(_round, hp_space)
 
         # Sample the next hyperparams finally:
         return self._sample_next_hyperparams_from_gaussians_div(
12 changes: 6 additions & 6 deletions neuraxle/metaopt/optimizer.py
@@ -48,7 +48,7 @@
 class BaseHyperparameterOptimizer(ABC):
 
     @abstractmethod
-    def find_next_best_hyperparams(self, round: RoundReport, hp_space: HyperparameterSpace) -> HyperparameterSamples:
+    def find_next_best_hyperparams(self, _round: RoundReport, hp_space: HyperparameterSpace) -> HyperparameterSamples:
         """
         Find the next best hyperparams using previous trials, that is the
         whole :class:`neuraxle.metaopt.data.aggregate.Round`.
@@ -64,7 +64,7 @@ class HyperparameterSamplerStub(BaseHyperparameterOptimizer):
     def __init__(self, preconfigured_hp_samples: HyperparameterSamples):
         self.preconfigured_hp_samples = preconfigured_hp_samples
 
-    def find_next_best_hyperparams(self, round: RoundReport, hp_space: HyperparameterSpace) -> HyperparameterSamples:
+    def find_next_best_hyperparams(self, _round: RoundReport, hp_space: HyperparameterSpace) -> HyperparameterSamples:
         return self.preconfigured_hp_samples
@@ -81,7 +81,7 @@ class RandomSearchSampler(BaseHyperparameterOptimizer):
     def __init__(self):
         BaseHyperparameterOptimizer.__init__(self)
 
-    def find_next_best_hyperparams(self, round: RoundReport, hp_space: HyperparameterSpace) -> HyperparameterSamples:
+    def find_next_best_hyperparams(self, _round: RoundReport, hp_space: HyperparameterSpace) -> HyperparameterSamples:
         """
         Randomly sample the next hyperparams to try.
@@ -144,19 +144,19 @@ def _reinitialize_grid(self, hp_space: HyperparameterSpace, previous_trials_hp:
             vals: Tuple[int] = tuple(flat_dict_sample.values())
             self._seen_hp_grid_values.add(vals)
 
-    def find_next_best_hyperparams(self, round: RoundReport, hp_space: HyperparameterSpace) -> HyperparameterSamples:
+    def find_next_best_hyperparams(self, _round: RoundReport, hp_space: HyperparameterSpace) -> HyperparameterSamples:
         """
         Sample the next hyperparams to try.
 
         :param round_scope: round scope
         :return: next hyperparams
         """
-        self._reinitialize_grid(hp_space, round.get_all_hyperparams())
+        self._reinitialize_grid(hp_space, _round.get_all_hyperparams())
 
         _space_max = reduce(operator.mul, self.flat_hp_grid_lens, 1)
 
         if self._n_sampled >= max(self.expected_n_trials, _space_max):
-            return RandomSearchSampler().find_next_best_hyperparams(round, hp_space)
+            return RandomSearchSampler().find_next_best_hyperparams(_round, hp_space)
         for _ in range(_space_max):
             i_grid_keys: Tuple[int] = tuple(self._gen_keys_for_grid())
             grid_values: OrderedDict[str, Any] = tuple(self[i_grid_keys].values())
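
The recurring `round` to `_round` rename here and in tpe.py stops the parameter from shadowing Python's builtin `round` inside the method bodies. A two-function illustration:

def shadowed(round):
    # The builtin is hidden here: `round` names the RoundReport argument.
    return round

def unshadowed(_round):
    return round(1.5)  # The builtin is reachable again.

print(unshadowed(None))  # Prints 2.
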
4 changes: 2 additions & 2 deletions neuraxle/metaopt/repositories/repo.py
@@ -110,10 +110,10 @@ def with_lock(self) -> 'SynchronizedHyperparamsRepositoryWrapper':
 
 def func_with_rlock():
     def decorator(func):
-        def f(self, *args, **kwargs):
+        def _LOCKED_REPO(self, *args, **kwargs):
             with self.lock:
                 return func(self, *args, **kwargs)
-        return f
+        return _LOCKED_REPO
     return decorator
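
Renaming the inner wrapper from `f` to `_LOCKED_REPO` makes lock-wrapped repository methods easy to spot in tracebacks and profiler output. A hedged usage sketch of the decorator defined above, with an illustrative repository class (assumes `func_with_rlock` is importable from this module):

import threading

from neuraxle.metaopt.repositories.repo import func_with_rlock

class DemoRepo:
    # Illustrative stand-in for a HyperparamsRepository subclass.
    def __init__(self):
        self.lock = threading.RLock()

    @func_with_rlock()
    def save_trial(self, trial_id):
        # Every call now runs inside `with self.lock:` via the _LOCKED_REPO wrapper.
        return trial_id

repo = DemoRepo()
repo.save_trial("trial-1")
print(repo.save_trial.__name__)  # '_LOCKED_REPO', visible in stack traces.
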
14 changes: 8 additions & 6 deletions neuraxle/steps/output_handlers.py
@@ -39,9 +39,9 @@ class OutputTransformerWrapper(ForceHandleOnlyMixin, MetaStep):
     so that it can transform the expected outputs.
     """
 
-    def __init__(self, wrapped, cache_folder_when_no_handle=None):
+    def __init__(self, wrapped):
         MetaStep.__init__(self, wrapped)
-        ForceHandleOnlyMixin.__init__(self, cache_folder_when_no_handle)
+        ForceHandleOnlyMixin.__init__(self)
 
     def _transform_data_container(self, data_container: DACT, context: CX) -> DACT:
         """
@@ -157,8 +157,10 @@ def _did_process(self, data_container: DACT, context: CX) -> DACT:
         di, eo = data_container.data_inputs
 
         self._assert(
-            len(di) == len(eo),
-            f'{self.name}: Found different len for data inputs, and expected outputs. Please return the same the same amount of data inputs, and expected outputs, or otherwise create your own handler methods to do more funky things.',
+            di is None or eo is None or (len(di) == len(eo)),
+            f'{self.name}: Found different len for non-null data inputs, and expected outputs. '
+            f'Please return the same the same amount of data inputs, and expected outputs, or '
+            f'otherwise create your own handler methods to do more funky things.',
             context
         )
@@ -186,9 +188,9 @@ class InputAndOutputTransformerWrapper(_DidProcessInputOutputHandlerMixin, Force
     :class:`~neuraxle.base.ForceHandleOnlyMixin`
     """
 
-    def __init__(self, wrapped, cache_folder_when_no_handle=None):
+    def __init__(self, wrapped):
         MetaStep.__init__(self, wrapped)
-        ForceHandleOnlyMixin.__init__(self, cache_folder_when_no_handle)
+        ForceHandleOnlyMixin.__init__(self)
         _DidProcessInputOutputHandlerMixin.__init__(self)
 
     def _transform_data_container(self, data_container: DACT, context: CX) -> DACT:
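
The relaxed assertion above only compares lengths when both data inputs and expected outputs are present, which matters at prediction time when expected outputs are typically None. In plain terms:

def lengths_match(di, eo) -> bool:
    # Mirrors the new condition: skip the length check when either side is None.
    return di is None or eo is None or (len(di) == len(eo))

assert lengths_match([1, 2, 3], None)   # OK: no expected outputs (e.g. at predict time).
assert lengths_match([1, 2], [3, 4])    # OK: same length.
assert not lengths_match([1, 2], [3])   # Mismatch is still caught.
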
5 changes: 4 additions & 1 deletion neuraxle/union.py
@@ -37,7 +37,10 @@
 
 class FeatureUnion(ForceHandleOnlyMixin, TruncableSteps):
     """
-    Parallelize the union of many pipeline steps.
+    Transform features in parallel as the union of many pipeline steps.
+    This step is also available with true parallel processing threads or
+    processes in the streaming package of Neuraxle.
+
     .. code-block:: python
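
For reference, a hedged usage sketch of FeatureUnion; the explicit `joiner` argument and `NumpyConcatenateInnerFeatures` follow the pattern of Neuraxle's published examples, so treat the exact constructor signature as an assumption:

import numpy as np
from neuraxle.base import Identity
from neuraxle.steps.numpy import NumpyConcatenateInnerFeatures
from neuraxle.union import FeatureUnion

# Each branch receives the same inputs; branch outputs are joined feature-wise.
union = FeatureUnion([
    Identity(),
    Identity(),
], joiner=NumpyConcatenateInnerFeatures())

data = np.array([[1.0, 2.0], [3.0, 4.0]])
out = union.transform(data)  # Expected shape (2, 4): both branches concatenated.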
