diff --git a/examples/auto_ml/plot_automl_loop_clean_kata.py b/examples/auto_ml/plot_automl_loop_clean_kata.py index ae178476..04456b0f 100644 --- a/examples/auto_ml/plot_automl_loop_clean_kata.py +++ b/examples/auto_ml/plot_automl_loop_clean_kata.py @@ -116,7 +116,7 @@ def main(tmpdir: str): auto_ml = AutoML( pipeline=pipeline, hyperparams_optimizer=RandomSearchSampler(), - validation_splitter=ValidationSplitter(validation_size=0.20), + validation_splitter=ValidationSplitter(validation_size=0.20).set_to_force_expected_outputs_for_scoring(), scoring_callback=ScoringCallback(accuracy_score, higher_score_is_better=True), n_trials=7, epochs=1, diff --git a/neuraxle/base.py b/neuraxle/base.py index 31ad7a79..8da66764 100644 --- a/neuraxle/base.py +++ b/neuraxle/base.py @@ -330,9 +330,7 @@ class _HasRecursiveMethods: An internal class to represent a step that has recursive methods. The apply :func:`apply` function is used to apply a method to a step and its children. - - Example usage : - + Example usage: .. code-block:: python @@ -502,6 +500,38 @@ def _return_empty(*args, **kwargs): f'Method {method} of {self} must return None or a RecursiveDict, as it is applied recursively.') return results + def __str__(self) -> str: + """ + Return a pretty representation of the step or service. + Use :func:`~neuraxle.base.BaseTransformer.__repr__`, for a more + detailed string representation if needed. + + :return: return pretty representation such as ``StepName(name='StepName')``. + """ + return self._repr(verbose=False) + + def __repr__(self) -> str: + """ + Return a detailed and pretty representation of the pipeline step. + Use :func:`~neuraxle.base.BaseTransformer.__str__`, for a less + detailed string representation if needed. + + :return: return pretty representation, such as ``StepName(name='StepName', hyperparameters=HyperparameterSamples({...}))``. 
+ """ + return self._repr(verbose=True) + + def _repr(self, level=0, verbose=False): + output = self.__class__.__name__ + '(' + output += self._repr_params(level=level, verbose=verbose).replace(', ', '', 1) + return output + ')' + + def _repr_params(self, level=0, verbose=False) -> str: + output = '' + has_name = self.__class__.__name__ != self.name + if has_name: + output += ", name='" + self.name + "'" + return output + class _HasConfig(ABC): """ @@ -577,6 +607,13 @@ def get_config(self) -> RecursiveDict: def _get_config(self) -> RecursiveDict: return self.config + def _repr_params(self, level=0, verbose=False): + if verbose: + conf: RecursiveDict = self._get_config() + if len(conf) > 0: + return ", config=" + pprint.pformat(conf) + return '' + class _CanMutate: """ @@ -694,6 +731,16 @@ def __init__(self, config: Union[Dict, RecursiveDict] = None, name: str = None): _HasConfig.__init__(self, config=config) _CanMutate.__init__(self) + def _repr_params(self, level=0, verbose=False): + output = "" + _name = _HasRecursiveMethods._repr_params(self, level=level, verbose=verbose) + output += _name + _config = _HasConfig._repr_params(self, level=level, verbose=verbose) + if not (len(_name) > 0): + _config = _config.replace(', ', '', 1) + output += _config + return output + BaseServiceT = TypeVar('BaseServiceT', bound=BaseService) @@ -835,13 +882,7 @@ def get_children(self) -> List[BaseServiceT]: def _repr(self, level=0, verbose=False) -> str: output = self.__class__.__name__ + "(" output += self.wrapped._repr(level=level + 1, verbose=verbose) - has_name = self.__class__.__name__ != self.name - if has_name: - output += ", name='" + self.name + "'" - if verbose: - conf: RecursiveDict = self._get_config() - if len(conf) > 0: - output += ", config=" + pprint.pformat(conf) + output += self._repr_params(level, verbose) output += ")" return output @@ -895,8 +936,10 @@ def __init__( NamedServicesList = List[Union[Tuple[str, BaseServiceT], BaseServiceT]] -class _TruncableMixin: +class _TruncableMixin(MixinForBaseService): # TODO: Merge common code of TruncableServiceMixin and TruncableStepsMixin into this. + def __init__(self): + MixinForBaseService.__init__(self) def mutate(self, new_method="inverse_transform", method_to_assign_to="transform", warn=False) -> 'BaseTransformer': """ @@ -925,11 +968,96 @@ def mutate(self, new_method="inverse_transform", method_to_assign_to="transform" # they won't exist afterward. return BaseStep.mutate(self, new_method, method_to_assign_to, warn) + def _repr(self, level=0, verbose=False) -> str: + + output = self.__class__.__name__ + "(" + output += self._repr_children(level, verbose) + output += self._repr_params(level, verbose) + output += ")" + return output + + def _repr_children(self, level, verbose) -> str: + """ + Returns a string representation of the children of the step like this: + + .. 
code-block:: python + output = '''[ + ChildrenA, + ChildrenB, + ChildrenC + ]''' + + """ + output = "" + + children: List[BaseService] = self.get_children() + is_compact: bool = len(children) < 2 + + tab0 = " " * level + tab1 = " " * (level + 1) + _nl = "\n" + _nl2 = _nl + if is_compact: + tab1 = "" + _nl = "" + _nl2 = " " + + output += "[" + childs_reprs = [] + for child in children: + try: + if hasattr(child, '_repr'): + c_repr = child._repr(level=level + 1, verbose=verbose) + else: + c_repr = repr(child) + except: + # raise Exception(f"Could not repr child `{child}` of self `{self}`:\n{e}") from e + # c_repr = child._repr(level=level + 1, verbose=verbose) + c_repr = repr(child) # breakpoint here if needed. + childs_reprs.append(c_repr) + + output += _nl + tab1 + ("," + _nl2 + tab1).join(childs_reprs) + output += _nl + (tab1 if is_compact else tab0) + "]" + + return output + + +class _TruncableServiceWithBodyMixin(MixinForBaseService): + """ + This is a mixin to enable the .joiner and .body methods to be + used on a truncable step that has a joiner at its end. + """ + + def __init__(self): + MixinForBaseService.__init__(self) + + @property + def joiner(self) -> BaseService: + """ + returns `self[-1]` + """ + return self[-1] + + @property + def body(self) -> List[BaseService]: + """ + returns `list(self.values())[:-1]`, that is all the steps except the last joiner. + """ + return list(self.values())[:-1] + + @property + def named_body(self) -> List[BaseService]: + """ + returns `list(self.values())[:-1]`, that is all the steps except the last joiner. + """ + return self[:-1] + class TruncableServiceMixin(_TruncableMixin, _HasChildrenMixin): def __init__(self, services: Dict[ServiceName, 'BaseServiceT']): _HasChildrenMixin.__init__(self) + _TruncableMixin.__init__(self) self.set_services(services) def set_services(self, services: Dict[ServiceName, 'BaseServiceT']): @@ -1360,13 +1488,24 @@ def push(self, step: 'BaseTransformer') -> 'CX': services=self.services, ) - def _copy(self): - copy_kwargs = self._get_copy_kwargs() + def _copy(self, copy_func: str = '_copy'): + """ + Copy the execution context, and call a copy function on its services as well. + + :param copy_func: services' copy function. By default is `_copy`. Could also be: `copy`, `_copy_trial`, `_copy_trial_split`, `_copy_train`, `_copy_validation`, and more as needed. + :return: a copy of the execution context (self) using the given copy function on the services. + """ + copy_kwargs = self._get_copy_kwargs(copy_func) return self.__class__(**copy_kwargs) - def _get_copy_kwargs(self): + def _get_copy_kwargs(self, copy_func: str): possibly_copied_services = { - k: (v.copy() if hasattr(v, "copy") else v) + k: ( + getattr(v, copy_func)() if hasattr(v, copy_func) else + v._copy() if hasattr(v, '_copy') else + v.copy() if hasattr(v, 'copy') else + v + ) for k, v in self.services.items() } copy_kwargs = { @@ -1380,22 +1519,6 @@ def _get_copy_kwargs(self): return copy_kwargs - def train(self) -> 'CX': - """ - Set the context's execution phase to train. - """ - new_self = self._copy() - new_self.set_execution_phase(ExecutionPhase.TRAIN) - return new_self - - def validation(self) -> 'CX': - """ - Set the context's execution phase to validation. 
- """ - new_self = self._copy() - new_self.set_execution_phase(ExecutionPhase.VALIDATION) - return new_self - def synchroneous(self) -> 'CX': if self.has_service('HyperparamsRepository'): repo = self.get_service('HyperparamsRepository') @@ -1599,8 +1722,19 @@ def flush_cache_local(self): shutil.rmtree(self.get_path()) def __len__(self): + # TODO: on services instead maybe? return len(self.parents) + def _repr(self, level=0, verbose=False) -> str: + output = self.__class__.__name__ + output += f'<{self.get_identifier()}>(' + output += self._repr_children(level, verbose) + level += 1 + if len(self.parents) > 0 and verbose is True: + output += f",\n{' ' * level}parents[0]={self.parents[0]._repr(level, verbose)}" + output += ')' + return output + CX = ExecutionContext @@ -1999,7 +2133,7 @@ def _fit_data_container(self, data_container: TrainDACT, context: CX) -> '_Fitta :param context: execution context :return: (fitted self, data container) """ - return self.fit(data_container.di, data_container.eo) + return self.fit(data_container.data_inputs, data_container.expected_outputs) def _did_fit(self, data_container: TrainDACT, context: CX) -> TrainDACT: """ @@ -2072,7 +2206,7 @@ def _fit_transform_data_container( :param context: execution context :return: (fitted self, data container) """ - new_self, out = self.fit_transform(data_container.di, data_container.eo) + new_self, out = self.fit_transform(data_container.data_inputs, data_container.expected_outputs) data_container.set_data_inputs(out) return new_self, data_container @@ -2122,6 +2256,9 @@ class _CustomHandlerMethods(MixinForBaseService): :class:`~neuraxle.distributed.streaming.BaseQueuedPipeline` """ + def __init__(self): + MixinForBaseService.__init__(self) + def handle_fit(self, data_container: TrainDACT, context: CX) -> 'BaseStep': """ Handle fit with a custom handler method for fitting the data container. @@ -2548,6 +2685,15 @@ def get_params(self, deep=False) -> dict: results: HyperparameterSamples = self.apply(method='_get_hyperparams') return results.to_flat_dict() + def _repr_params(self, level: int = 0, verbose: bool = False) -> str: + output = '' + if verbose: + hps: HyperparameterSamples = self._get_hyperparams() + if not hps.is_empty(): + # hps = hps.to_flat_dict(use_wildcards=not verbose) + output += ", hyperparams=" + pprint.pformat(hps) + return output + class _HasSavers(MixinForBaseService): """ @@ -2985,37 +3131,16 @@ def _set_train(self, is_train) -> Optional[RecursiveDict]: self.is_train = is_train return RecursiveDict() - def __str__(self) -> str: - """ - Return a pretty representation of the pipeline step. - Use :func:`~neuraxle.base.BaseTransformer.__repr__`, for a more - detailed string representation if needed. - - :return: return pretty representation such as ``StepName(name='StepName')``. - """ - return self._repr(verbose=False) - - def __repr__(self) -> str: - """ - Return a detailed and pretty representation of the pipeline step. - Use :func:`~neuraxle.base.BaseTransformer.__str__`, for a less - detailed string representation if needed. - - :return: return pretty representation, such as ``StepName(name='StepName', hyperparameters=HyperparameterSamples({...}))``. 
- """ - return self._repr(verbose=True) - def _repr(self, level=0, verbose=False) -> str: output = self.__class__.__name__ + "(" - has_name: bool = self.__class__.__name__ != self.name - if has_name: - output += "name='" + self.name + "'" - if verbose: - hps: HyperparameterSamples = self._get_hyperparams() - if len(hps) > 0: - if has_name: - output += ", " - output += "hyperparams=" + pprint.pformat(hps) + _params = BaseService._repr_params(self, level=level, verbose=verbose) + if _params.startswith(", "): + _params = _params.replace(', ', '', 1) + output += _params + _hparams = _HasHyperparams._repr_params(self, level=level, verbose=verbose) + if _hparams.startswith(", ") and len(_params) == 0: + _hparams = _hparams.replace(', ', '', 1) + output += _hparams output += ")" return output @@ -3469,6 +3594,7 @@ def __init__( mute_step_renaming_warning: bool = True, ): _HasChildrenMixin.__init__(self) + _TruncableMixin.__init__(self) self.warn_step_renaming = not mute_step_renaming_warning self.set_steps(steps_as_tuple, invalidate=False) @@ -3864,34 +3990,6 @@ def __add__(self, other: 'TruncableSteps') -> 'TruncableSteps': new_self = new_self.set_steps(self.steps_as_tuple + other.steps_as_tuple) return new_self - def _repr(self, level=0, verbose=False) -> str: - children = self.get_children() - is_compact: bool = len(children) < 2 - - tab0 = " " * level - tab1 = " " * (level + 1) - _nl = "\n" - _nl2 = _nl - if is_compact: - tab1 = "" - _nl = "" - _nl2 = " " - - output = self.__class__.__name__ + "([" - output += _nl + tab1 + ("," + _nl2 + tab1).join( - [s._repr(level=level + 1, verbose=verbose) for s in children] - ) - output += _nl + (tab1 if is_compact else tab0) + "]" - has_name = self.__class__.__name__ != self.name - if has_name: - output += ", name='" + self.name + "'" - if verbose: - hps: HyperparameterSamples = self._get_hyperparams() - if len(hps) > 0: - output += ", hyperparams=" + pprint.pformat(hps) - output += ")" - return output - class Identity(NonTransformableMixin, NonFittableMixin, BaseStep): """ @@ -4273,7 +4371,7 @@ def _did_process( class AssertExpectedOutputIsNoneMixin(WillProcessAssertionMixin): def _assert_at_lifecycle(self, data_container: DACT, context: CX): - eo_empty = (data_container.expected_outputs is None) or all(v is None for v in data_container.eo) + eo_empty = (data_container.expected_outputs is None) or all(v is None for v in data_container.expected_outputs) self._assert( eo_empty, f"Expected datacontainer.expected_outputs to be a `None` or a list of `None`. Received {data_container.expected_outputs}.", @@ -4283,7 +4381,7 @@ def _assert_at_lifecycle(self, data_container: DACT, context: CX): class AssertExpectedOutputIsNotNoneMixin(WillProcessAssertionMixin): def _assert_at_lifecycle(self, data_container: DACT, context: CX): - eo_empty = (data_container.expected_outputs is None) or all(v is None for v in data_container.eo) + eo_empty = (data_container.expected_outputs is None) or all(v is None for v in data_container.expected_outputs) self._assert( not eo_empty, f"Expected datacontainer.expected_outputs to not be a `None` nor a list of `None`. 
Received {data_container.expected_outputs}.", diff --git a/neuraxle/data_container.py b/neuraxle/data_container.py index 5156da6e..d742f69c 100644 --- a/neuraxle/data_container.py +++ b/neuraxle/data_container.py @@ -26,10 +26,10 @@ import copy import math from operator import attrgetter -from typing import (Any, Callable, Generic, Iterable, Iterator, List, Optional, - Tuple, TypeVar, Union) +from typing import Any, Callable, Generic, Iterable, Iterator, List, Optional, Tuple, TypeVar, Union import numpy as np +import pandas as pd NamedDACTTuple = Tuple[str, 'DataContainer'] IDT = TypeVar('IDT', bound=Iterable) # Ids Type that is often a list of things @@ -236,6 +236,9 @@ def with_di(self, di: DIT) -> 'DACT[IDT, DIT, EOT]': def with_eo(self, eo: EOT) -> 'DACT[IDT, DIT, EOT]': return self.copy().set_expected_outputs(eo) + def with_ids(self, ids: DIT) -> 'DACT[IDT, DIT, EOT]': + return self.copy().set_ids(ids) + def set_ids(self, ids: IDT) -> 'DACT': """ Set ids. @@ -268,7 +271,9 @@ def set_expected_outputs(self, expected_outputs: EOT) -> 'DACT': self.expected_outputs: EOT = expected_outputs return self - def get_ids_summary(self) -> str: + def get_ids_summary(self) -> Optional[str]: + if self._ids is None: + return None return ','.join([str(i) for i in self.ids if i is not None]) def add_sub_data_container(self, name: str, data_container: 'DACT') -> 'DACT': @@ -361,9 +366,9 @@ def minibatches( """ for i in range(0, len(self.data_inputs), batch_size): data_container: DACT[IDT, DIT, EOT] = DACT( - ids=self.ids[i:i + batch_size], - data_inputs=self.di[i:i + batch_size], - expected_outputs=self.eo[i:i + batch_size] + ids=self.ids[i:i + batch_size] if self._ids is not None else None, + data_inputs=self.di[i:i + batch_size] if self.data_inputs is not None else None, + expected_outputs=self.eo[i:i + batch_size] if self.expected_outputs is not None else None ) incomplete_batch = len(data_container.data_inputs) < batch_size @@ -498,29 +503,54 @@ def __iter__(self) -> Iterator[Tuple[IDT, DIT, EOT]]: :return: iterator of tuples containing ids, data_inputs, and expected outputs :rtype: Iterator[Tuple] """ - return zip(self.ids, self.di, self.eo) + if self.data_inputs is None: + return iter(()) + + _ids: Optional[List[DACTData]] = self.ids + _di: Optional[List[DACTData]] = self.di + _eo: Optional[List[DACTData]] = self.eo + if _ids is None or _di is None or _eo is None: + return iter(()) + + return zip(_ids, _di, _eo) def __repr__(self): return str(self) def __str__(self): - di_rep = self._str_data(self.di) - eo_rep = self._str_data(self.eo) + ids = self._ids + di = self.data_inputs + eo = self.expected_outputs + ids_rep = self._str_data(ids) + di_rep = self._str_data(di) + eo_rep = self._str_data(eo) return ( - f"{self.__class__.__name__}(" - f"ids={repr(list(self.ids))}, " - f"di={di_rep}, " - f"eo={eo_rep})" + f"{self.__class__.__name__}[{type(ids).__name__}, {type(di).__name__}, {type(eo).__name__}](\n" + f"\tids={ids_rep},\n" + f"\tdi={di_rep},\n" + f"\teo={eo_rep}\n)" ) def _str_data(self, _idata: DACTData) -> str: - if hasattr(_idata, '__len__'): - if len(_idata) > 10: - _len_rep = ("") if hasattr(self.di, "__len__") else "" - _rep = f"{type(self.di)}{_len_rep}" - else: - _rep = repr(_idata) - return _rep + if _idata is None: + return str(None) + + if len(_idata) > 10 and hasattr(_idata, '__getitem__'): + _shortrepr = repr(_idata[:15]) + else: + _shortrepr = repr(_idata) + _shortrepr = _shortrepr[:70] + ("" if len(_shortrepr) < 70 else "...") + + _len = "len=?" 
+ if isinstance(_idata, pd.DataFrame): + _len = f"shape={_idata.values.shape}" + elif isinstance(_idata, np.ndarray): + _len = f"shape={_idata.shape}" + elif hasattr(_idata, "__len__"): + _len = f"len={len(_idata)}" + + _len_rep = f"<`{_shortrepr}` of {_len}>" + return _len_rep.replace("\n", " ").replace("\t", " ").replace(" ", " ") def __len__(self): return len(self.data_inputs) @@ -537,12 +567,14 @@ def __len__(self): class ExpandedDataContainer(DACT): """ Sub class of DataContainer to expand data container dimension. + This is akin from passing from `shape` to `[1, *shape]` + when using :func:`ExpandedDataContainer.create_from`. .. seealso:: :class:`DataContainer` """ - def __init__(self, data_inputs, ids, expected_outputs, old_ids): + def __init__(self, data_inputs, ids, expected_outputs, _old_ids): DACT.__init__( self, ids=ids, @@ -550,7 +582,24 @@ def __init__(self, data_inputs, ids, expected_outputs, old_ids): eo=expected_outputs, ) - self.old_ids = old_ids + self._old_ids = _old_ids + + @staticmethod + def create_from(data_container: DACT) -> 'ExpandedDataContainer': + """ + Create ExpandedDataContainer with a summary id for the new id. + This is akin from passing from `shape` to `[1, *shape]`. + + :param data_container: data container to transform + :type data_container: DataContainer + :return: expanded data container + """ + return ExpandedDataContainer( + ids=[data_container.get_ids_summary()], + data_inputs=[data_container.data_inputs] if data_container.data_inputs is not None else None, + expected_outputs=[data_container.expected_outputs] if data_container.expected_outputs is not None else None, + _old_ids=data_container._ids + ) def reduce_dim(self) -> 'DACT': """ @@ -564,29 +613,12 @@ def reduce_dim(self) -> 'DACT': 'Invalid Expanded Data Container. Please create ExpandedDataContainer with ExpandedDataContainer.create_from(data_container) method.') return DACT( - data_inputs=self.di[0], - ids=self.old_ids, - expected_outputs=self.eo[0], + ids=self._old_ids, + data_inputs=self.data_inputs[0] if self.data_inputs is not None else None, + expected_outputs=self.expected_outputs[0] if self.expected_outputs is not None else None, sub_data_containers=self.sub_data_containers ) - @staticmethod - def create_from(data_container: DACT) -> 'ExpandedDataContainer': - """ - Create ExpandedDataContainer with a summary id for the new single id. 
- - :param data_container: data container to transform - :type data_container: DataContainer - :return: expanded data container - :rtype: ExpandedDataContainer - """ - return ExpandedDataContainer( - ids=[data_container.get_ids_summary()], - data_inputs=[data_container.di], - expected_outputs=[data_container.eo], - old_ids=data_container.ids - ) - class ZipDataContainer(DACT): """ @@ -616,12 +648,12 @@ def create_from(data_container: DACT, *other_data_containers: List[DACT], zip_ex expected_outputs = tuple( zip(*map(attrgetter("eo"), [data_container] + list(other_data_containers)))) else: - expected_outputs = data_container.eo + expected_outputs = data_container.expected_outputs return ZipDataContainer( data_inputs=new_data_inputs, expected_outputs=expected_outputs, - ids=data_container.ids, + ids=data_container._ids, sub_data_containers=data_container.sub_data_containers ) @@ -770,21 +802,23 @@ def _pad_incomplete_batch( batch_size=batch_size ) if pad_ids else data_container.ids, data_inputs=_pad_data( - data_container.di, + data_container.data_inputs, default_value=default_value_data_inputs, batch_size=batch_size - ) if pad_di else data_container.di, + ) if pad_di else data_container.data_inputs, expected_outputs=_pad_data( - data_container.eo, + data_container.expected_outputs, default_value=default_value_expected_outputs, batch_size=batch_size - ) if pad_eo else data_container.eo + ) if pad_eo else data_container.expected_outputs, ) return data_container -def _pad_data(data: Iterable, default_value: Any, batch_size: int): +def _pad_data(data: DACTData, default_value: Any, batch_size: int): + if data is None: + return None data_ = [] data_.extend(data) padding = copy.copy([default_value] * (batch_size - len(data))) diff --git a/neuraxle/distributed/streaming.py b/neuraxle/distributed/streaming.py index 62e0571f..c1fc0ec9 100644 --- a/neuraxle/distributed/streaming.py +++ b/neuraxle/distributed/streaming.py @@ -37,14 +37,11 @@ from neuraxle.base import BaseSaver, BaseTransformer from neuraxle.base import ExecutionContext as CX -from neuraxle.base import (MetaStep, MixinForBaseTransformer, NamedStepsList, - NonFittableMixin, _FittableStep) -from neuraxle.data_container import (DACT, DIT, EOT, IDT, ListDataContainer, - PredsDACT, StripAbsentValues) +from neuraxle.base import MetaStep, MixinForBaseTransformer, NamedStepsList, NonFittableMixin, _FittableStep +from neuraxle.data_container import DACT, DIT, EOT, IDT, ListDataContainer, PredsDACT, StripAbsentValues from neuraxle.hyperparams.space import RecursiveDict -from neuraxle.logging.logging import ( - ParallelLoggingConsumerThread, - register_log_producer_for_main_logger_thread_to_consume) +from neuraxle.logging.logging import (ParallelLoggingConsumerThread, + register_log_producer_for_main_logger_thread_to_consume) from neuraxle.pipeline import Joiner, MiniBatchSequentialPipeline, Pipeline from neuraxle.steps.numpy import NumpyConcatenateOuterBatch @@ -614,7 +611,7 @@ def _setup(self, context: CX = None) -> 'BaseTransformer': :return: step :rtype: BaseStep """ - workers_joiner: WorkersJoiner = self[-1] + workers_joiner: WorkersJoiner = self.joiner workers_joiner._setup(context=context) for step in self.body: step: ParallelWorkersWrapper = step @@ -637,7 +634,7 @@ def fit_transform_data_container( :return: """ has_fittable_step: bool = False - for _, step in self[:-1]: + for step in self.body: if isinstance(step.get_step(), _FittableStep) and not isinstance(step.get_step(), NonFittableMixin): has_fittable_step = True @@ -677,6 +674,7 @@ 
def transform_data_container(self, data_container: DACT, context: CX) -> DACT: step.start(context, logging_queue) # prepare minibatch iterator: + data_container.set_ids(data_container.ids) minibatch_iterator: Iterable[DACT[IDT, DIT, EOT]] = data_container.minibatches( batch_size=self.batch_size, keep_incomplete_batch=self.keep_incomplete_batch, @@ -696,7 +694,7 @@ def transform_data_container(self, data_container: DACT, context: CX) -> DACT: # join output queues. n_workers = self.get_n_workers_to_join() - workers_joiner: WorkersJoiner = self[-1] + workers_joiner: WorkersJoiner = self.joiner workers_joiner.set_join_quantities(n_workers, n_minibatches_per_worker) data_container = workers_joiner.join_workers(data_container, context) @@ -717,7 +715,7 @@ def _did_transform(self, data_container: DACT, context: CX) -> DACT: :return: data container :rtype: DataContainer """ - for name, step in self[:-1]: + for step in self.body: step: ParallelWorkersWrapper = step step.stop() if self.logging_thread is not None: @@ -751,7 +749,7 @@ def _connect_queued_pipeline(self): def _disconnect_queued_pipeline(self): if self.is_pipeline_connected: - workers_joiner: WorkersJoiner = self[-1] + workers_joiner: WorkersJoiner = self.joiner workers_joiner.join() for step in self.body: step: ParallelWorkersWrapper = step @@ -814,7 +812,7 @@ def _dispatch_minibatch_to_consumer_workers(self, minibatch_index: int, task: Qu :param data_container: data container batch :return: """ - workers_joiner: WorkersJoiner = self[-1] + workers_joiner: WorkersJoiner = self.joiner # TODO: extract method. last_consumer_name: str = self.steps_as_tuple[-2][0] @@ -845,8 +843,8 @@ def _connect_queued_pipeline(self): :return: """ if not self.is_pipeline_connected: - joiner_consumer: _ProducerConsumerMixin = self[-1] - for name, producer in self[:-1]: + joiner_consumer: _ProducerConsumerMixin = self.joiner + for producer in self.body: producer: _ProducerConsumerMixin = producer producer.register_consumer(joiner_consumer) self.is_pipeline_connected = True @@ -857,10 +855,10 @@ def _dispatch_minibatch_to_consumer_workers(self, minibatch_index: int, task: Qu to each of the workers that will work in parallel to consume the same copy sent to all. """ # TODO: task index in task. 
- workers_joiner: WorkersJoiner = self[-1] - for name, consumer in self[:-1]: - workers_joiner.append_terminal_summary(name, task) - consumer: _ProducerConsumerMixin = consumer + workers_joiner: WorkersJoiner = self.joiner + for consumer in self.body: + consumer: ParallelWorkersWrapper = consumer + workers_joiner.append_terminal_summary(consumer.get_name(), task) consumer.put_minibatch_produced(task) diff --git a/neuraxle/logging/logging.py b/neuraxle/logging/logging.py index edfdeeb5..af0a7f65 100644 --- a/neuraxle/logging/logging.py +++ b/neuraxle/logging/logging.py @@ -57,6 +57,10 @@ def filter(self, record): class NeuraxleLogger(logging.Logger): + @staticmethod + def root() -> 'NeuraxleLogger': + return NeuraxleLogger(NEURAXLE_LOGGER_NAME) + @staticmethod def from_identifier(identifier: str) -> 'NeuraxleLogger': """ diff --git a/neuraxle/metaopt/auto_ml.py b/neuraxle/metaopt/auto_ml.py index ea67f511..bdea6e38 100644 --- a/neuraxle/metaopt/auto_ml.py +++ b/neuraxle/metaopt/auto_ml.py @@ -29,10 +29,11 @@ from copy import copy from typing import ContextManager, Iterator, List, Optional, Tuple -from neuraxle.base import (CX, BaseService, BaseStep, BaseStepT, ExecutionContext, ForceHandleMixin, TruncableService, - _HasChildrenMixin) -from neuraxle.data_container import IDT +from neuraxle.base import (CX, BaseService, BaseStep, BaseStepT, ExecutionContext, ExecutionMode, ForceHandleMixin, + TruncableService, _HasChildrenMixin) +from neuraxle.data_container import ARG_X_INPUTTED, IDT from neuraxle.data_container import DataContainer as DACT +from neuraxle.data_container import TrainDACT from neuraxle.hyperparams.space import HyperparameterSpace from neuraxle.metaopt.callbacks import (ARG_Y_EXPECTED, ARG_Y_PREDICTD, BaseCallback, CallbackList, MetricCallback, ScoringCallback) @@ -48,9 +49,15 @@ class Trainer(BaseService): """ Class used to train a pipeline using various data splits and callbacks for evaluation purposes. + It loops on splits that the splitter yields, and on epochs as well, to train and validate the pipeline + with the given metrics and other callbacks. + + If the predicted expected output of a pipeline's prediction dact is not empty, then it will be used + in the metrics instead of using the fed dacts' expected output. This is to allow for the use of + autoregressive models, where the expected output is not known at the time of sending it to the model at train time, but is known at least at validation time, as per the validation splitter. """ # TODO: add this `with_val_set` method that would change splitter to - PresetValidationSetSplitter(self, val) and override. + PresetValidationSetSplitter(self, val) and override? def __init__( self, @@ -101,32 +108,51 @@ def train_split( """ Train a pipeline split. You probably want to use `self.train` instead, to use the validation splitter. If validation DACT is None, the evaluation metrics will not save validation results. + + Note that if, after a prediction at train or validation time, the resulting data container + has an empty expected output (that is, .expected_outputs is None or has length 0), then the + trainer will pick the expected output of the pre-prediction data container that was fed as an input.
""" trial_split_scope: TrialSplit = trial_split_scope.with_n_epochs(self.n_epochs) p: BaseStep = pipeline._copy(trial_split_scope.context, deep=True) p.set_hyperparams(trial_split_scope.get_hyperparams()) + context: AutoMLContext = trial_split_scope.context for _ in range(self.n_epochs): e = trial_split_scope.next_epoch() # Fit train p = p.set_train(True) + context = context.train() p = p.handle_fit( train_dact.copy(), - trial_split_scope.context.train()) + context + ) # Predict train & val p = p.set_train(False) + context = context.validation() eval_dact_train = p.handle_predict( train_dact.without_eo(), - trial_split_scope.context.validation()) - eval_dact_train: DACT[IDT, ARG_Y_PREDICTD, ARG_Y_EXPECTED] = eval_dact_train.with_eo(train_dact.eo) + context + ) + eval_dact_train: DACT[IDT, ARG_Y_PREDICTD, ARG_Y_EXPECTED] = eval_dact_train + _has_empty_eo = eval_dact_train.expected_outputs is None or (hasattr( + eval_dact_train.expected_outputs, "__len__") and len(eval_dact_train.expected_outputs) == 0) + if _has_empty_eo or self.validation_splitter.force_fixed_metric_expected_outputs is True: + eval_dact_train = eval_dact_train.with_eo(train_dact.expected_outputs) if val_dact is not None: + context = context.validation() eval_dact_valid = p.handle_predict( val_dact.without_eo(), - trial_split_scope.context.validation()) - eval_dact_valid: DACT[IDT, ARG_Y_PREDICTD, ARG_Y_EXPECTED] = eval_dact_valid.with_eo(val_dact.eo) + context + ) + eval_dact_valid: DACT[IDT, ARG_Y_PREDICTD, ARG_Y_EXPECTED] = eval_dact_valid + _has_empty_eo = eval_dact_valid.expected_outputs is None or (hasattr( + eval_dact_valid.expected_outputs, "__len__") and len(eval_dact_valid.expected_outputs) == 0) + if _has_empty_eo or self.validation_splitter.force_fixed_metric_expected_outputs is True: + eval_dact_valid = eval_dact_valid.with_eo(val_dact.expected_outputs) else: eval_dact_valid = None @@ -276,14 +302,18 @@ class ControlledAutoML(ForceHandleMixin, _HasChildrenMixin[BaseStepT], BaseStep) automatically split the data into train and validation splits, and execute an hyperparameter optimization on the splits to find the best hyperparameters. - The Controller Loop is useful to possibly split the execution into multiple - threads, or even multiple machines. + The :class:`BaseControllerLoop` is useful to possibly split the execution into multiple + threads, or even multiple machines to decide how to execute the loop. - The Trainer is responsible for training the pipeline on the train and validation - splits as splitted. + The :class:`Trainer` is responsible for training the pipeline on the train and validation + splits, as per the data split provided by the splitter, and the predicted data containers. - The step with the chosen good hyperparameters will be refitted to the full - unsplitted data if desired. + It is to be noted that if the data container, after a prediction at train and validation time, + has an empty expected output (of .expected_outputs of length 0 or that is None), then the + trainer will pick the expected output of the pre-predicted data container that was fed as an input. + + The step with the chosen best hyperparameters will be optionnally refitted to the full + unsplitted data (pre-split data) if desired, and will be useable using :func:`refit_best_trial`. 
""" def __init__( @@ -348,6 +378,7 @@ def to_force_refit_best_trial(self) -> 'ControlledAutoML': self_copy = copy(self) self_copy.refit_best_trial = True self_copy.has_model_been_retrained = False + self_copy.start_new_round = False self_copy.loop = self_copy.loop.for_refit_only() return self_copy @@ -390,6 +421,29 @@ def get_automl_context(self, context: ExecutionContext, with_loc=True) -> AutoML cx = cx.with_loc(loc) return cx + def _encapsulate_data( + self, data_inputs: ARG_X_INPUTTED, expected_outputs: ARG_Y_EXPECTED, execution_mode: ExecutionMode + ) -> Tuple[CX, TrainDACT]: + """ + This method is overriden from :class:`ForceHandleMixin` to encapsulate + the repository in a AutoMLContext instead of in a regular CX in case the + handler methods were not called but the repository was passed at construction. + """ + data_container = TrainDACT(data_inputs=data_inputs, expected_outputs=expected_outputs) + context = CX(execution_mode=execution_mode) + context = self.get_automl_context(context) + return context, data_container + + def get_best_model(self) -> BaseStep: + """ + Get the best model if it has been refit, otherwise raises an assertion error. + """ + self._assert( + self.has_model_been_retrained, + "The model has not been retrained, so it cannot be used to get the best model." + ) + return self.pipeline + @property def round_number(self) -> Optional[int]: if self._round_number is None: @@ -401,7 +455,8 @@ def round_number(self) -> Optional[int]: @property def report(self) -> RoundReport: - dc: RoundDataclass = self.repo.load(ScopedLocation(self.project_name, self.client_name, self.round_number), deep=True) + dc: RoundDataclass = self.repo.load(ScopedLocation( + self.project_name, self.client_name, self.round_number), deep=True) return RoundReport(dc) def _transform_data_container(self, data_container: DACT, context: CX) -> DACT: @@ -418,6 +473,10 @@ class AutoML(ControlledAutoML): This class provides a nice interface to easily use the ControlledAutoML class and the metaopt module in general. + It is a wrapper around the :class:`ControlledAutoML` class, which is a wrapper around + the AutoML loop contained in the :class:`BaseControllerLoop` class + and the :class:`Trainer` class all contained in this module. + :param pipeline: pipeline to copy and use for training :param validation_splitter: validation splitter to use :param refit_best_trial: whether to refit the best model on the whole dataset after the optimization diff --git a/neuraxle/metaopt/context.py b/neuraxle/metaopt/context.py index 72ba2e34..51f9c854 100644 --- a/neuraxle/metaopt/context.py +++ b/neuraxle/metaopt/context.py @@ -30,7 +30,7 @@ import logging -from neuraxle.base import ExecutionContext as CX +from neuraxle.base import ExecutionContext as CX, ExecutionPhase from neuraxle.logging.logging import NeuraxleLogger from neuraxle.metaopt.data.vanilla import (BaseDataclass, ScopedLocation, SubDataclassT) @@ -49,19 +49,20 @@ def logger(self) -> NeuraxleLogger: def logger_at_scoped_loc(self) -> NeuraxleLogger: return logging.getLogger(self.get_identifier(include_step_names=False)) - def add_scoped_logger_file_handler(self) -> NeuraxleLogger: + def add_scoped_logger_file_handler(self) -> 'AutoMLContext': """ Add a file handler to the logger at the current scoped location to capture logs at this scope and below this scope. 
""" - self.repo.add_logging_handler(self.logger_at_scoped_loc, self.loc) + return self - def free_scoped_logger_file_handler(self): + def free_scoped_logger_file_handler(self) -> 'AutoMLContext': """ Remove file handlers from logger to free file lock (especially on Windows). """ self.logger_at_scoped_loc.without_file_handler() + return self def read_scoped_log(self) -> str: """ @@ -70,12 +71,44 @@ def read_scoped_log(self) -> str: # TODO: with self.lock: return self.repo.get_log_from_logging_handler(self.logger, self.loc) - def _copy(self): - copy_kwargs = self._get_copy_kwargs() + def _copy(self, copy_func: str = '_copy'): + copy_kwargs = self._get_copy_kwargs(copy_func) return AutoMLContext(**copy_kwargs) - def _get_copy_kwargs(self): - return super()._get_copy_kwargs() + def _get_copy_kwargs(self, copy_func: str): + return CX._get_copy_kwargs(self, copy_func) + + def new_trial(self) -> 'CX': + """ + Set the context's execution phase to train. + """ + new_self = self._copy(copy_func='_copy_trial') + new_self.set_execution_phase(ExecutionPhase.PRETRAIN) + return new_self + + def new_trial_split(self) -> 'CX': + """ + Set the context's execution phase to train. + """ + new_self = self._copy(copy_func='_copy_trial_split') + new_self.set_execution_phase(ExecutionPhase.PRETRAIN) + return new_self + + def train(self) -> 'CX': + """ + Set the context's execution phase to train. + """ + new_self = self._copy(copy_func='_copy_train') + new_self.set_execution_phase(ExecutionPhase.TRAIN) + return new_self + + def validation(self) -> 'CX': + """ + Set the context's execution phase to validation. + """ + new_self = self._copy(copy_func='_copy_validation') + new_self.set_execution_phase(ExecutionPhase.VALIDATION) + return new_self @staticmethod def from_context( diff --git a/neuraxle/metaopt/data/aggregates.py b/neuraxle/metaopt/data/aggregates.py index a29cd38a..97d1987d 100644 --- a/neuraxle/metaopt/data/aggregates.py +++ b/neuraxle/metaopt/data/aggregates.py @@ -112,7 +112,7 @@ def __init__(self, _dataclass: SubDataclassT, context: AutoMLContext, is_deep=Fa self._dataclass: SubDataclassT = _dataclass self._spare: SubDataclassT = copy.copy(_dataclass).shallow() # TODO: pre-push context to allow for dc auto-loading and easier parent auto-loading? - self.context: AutoMLContext = context.push_attr(_dataclass) + self.context: AutoMLContext = context.push_attr(_dataclass).add_scoped_logger_file_handler() self.loc: ScopedLocation = self.context.loc._copy() self.is_deep = is_deep self._parent: ParentAggregateT = parent @@ -293,9 +293,6 @@ def __enter__(self) -> SubAggregateT: # self.context.free_scoped_logger_handler_file() self._invariant() self._managed_resource._invariant() - with self.repo.lock: # TODO: locking twice, not needed. - self._managed_resource.context.add_scoped_logger_file_handler() - return self._managed_resource def __exit__( @@ -348,6 +345,7 @@ def _release_managed_subresource(self, resource: SubAggregateT, e: Exception = N with self.repo.lock: self.refresh(self.is_deep) self.save(False) # TODO: is this bad? 
+ handled_error = e is None return handled_error @@ -559,7 +557,7 @@ def _acquire_managed_subresource(self, new_trial: Optional[bool] = True, continu else: self.flow.log_retraining(trial_id, _trial_dataclass.hyperparams) - subagg: Trial = Trial(_trial_dataclass, self.context, is_deep=True) + subagg: Trial = Trial(_trial_dataclass, self.context.new_trial(), is_deep=True) if continue_on_error: subagg.continue_loop_on_error() return subagg @@ -687,7 +685,7 @@ def _acquire_managed_subresource(self, continue_loop_on_error: bool, retrain_spl _split_dataclass: TrialSplitDataclass = self.repo.load(split_loc) _split_dataclass.hyperparams = self.get_hyperparams() - subagg: TrialSplit = TrialSplit(_split_dataclass, self.context, is_deep=True) + subagg: TrialSplit = TrialSplit(_split_dataclass, self.context.new_trial_split(), is_deep=True) return subagg def _release_managed_subresource(self, resource: 'TrialSplit', e: Exception = None) -> bool: diff --git a/neuraxle/metaopt/repositories/db.py b/neuraxle/metaopt/repositories/db.py index 6598e692..57ba5d28 100644 --- a/neuraxle/metaopt/repositories/db.py +++ b/neuraxle/metaopt/repositories/db.py @@ -509,6 +509,8 @@ def get_log_from_logging_handler(self, logger: NeuraxleLogger, scope: ScopedLoca class DatabaseHyperparamRepository(_DatabaseLoggerHandlerMixin, HyperparamsRepository): def __init__(self, engine, session): + HyperparamsRepository.__init__(self) + _DatabaseLoggerHandlerMixin.__init__(self) self.engine = engine self.session = session @@ -553,7 +555,7 @@ def __init__(self, sqllite_db_path, echo=False): _Session.configure(bind=engine) session = _Session() - super().__init__(engine, session) + DatabaseHyperparamRepository.__init__(self, engine, session) self.create_db() diff --git a/neuraxle/metaopt/repositories/json.py b/neuraxle/metaopt/repositories/json.py index 38deb45a..896a8606 100644 --- a/neuraxle/metaopt/repositories/json.py +++ b/neuraxle/metaopt/repositories/json.py @@ -58,12 +58,13 @@ def add_logging_handler(self, logger: NeuraxleLogger, scope: ScopedLocation) -> Adds an on-disk logging handler to the repository. The file at this scope can be retrieved with the method :func:`get_scoped_logger_path`. 
""" - logging_file = self.get_scoped_logger_path(scope) - os.makedirs(os.path.dirname(logging_file), exist_ok=True) + logging_file = self._create_scoped_logger_path(scope) logger.with_file_handler(logging_file) return self def get_log_from_logging_handler(self, logger: NeuraxleLogger, scope: ScopedLocation) -> str: + logging_file = self._create_scoped_logger_path(scope) + logger.with_file_handler(logging_file) return ''.join(logger.read_log_file()) def get_folder_at_scope(self, scope: ScopedLocation) -> str: @@ -71,9 +72,11 @@ def get_folder_at_scope(self, scope: ScopedLocation) -> str: _scope_attrs = [ON_DISK_DELIM + s for s in _scope_attrs] return os.path.join(self.cache_folder, *_scope_attrs) - def get_scoped_logger_path(self, scope: ScopedLocation) -> str: + def _create_scoped_logger_path(self, scope: ScopedLocation) -> str: scoped_path: str = self.get_folder_at_scope(scope) - return os.path.join(scoped_path, 'log.txt') + logging_file = os.path.join(scoped_path, 'log.txt') + os.makedirs(os.path.dirname(logging_file), exist_ok=True) + return logging_file class HyperparamsOnDiskRepository(_OnDiskRepositoryLoggerHandlerMixin, HyperparamsRepository): diff --git a/neuraxle/metaopt/validation.py b/neuraxle/metaopt/validation.py index 9c53ff11..5bf644c9 100644 --- a/neuraxle/metaopt/validation.py +++ b/neuraxle/metaopt/validation.py @@ -38,6 +38,29 @@ class BaseValidationSplitter(ABC): + + def __init__(self, force_fixed_metric_expected_outputs: bool = False): + """ + :param force_fixed_metric_expected_outputs: If True, the expected outputs provided at split time are used to compute the metric instead of their possibly modified version after passing through the pipeline. More info in the documentation of :func:`set_to_force_expected_outputs_for_scoring`. + """ + self.force_fixed_metric_expected_outputs: bool = False + + def set_to_force_expected_outputs_for_scoring(self) -> 'BaseValidationSplitter': + """ + Set self.force_fixed_metric_expected_outputs to True. + + Use this in case you do not want the pipeline to be able to + affect the Y (expected_output) value throughout the fit or transform process. This is to have a way to + force using the provided expected output for the calculation of metrics in the Trainer's epochs loop. + + Do not use this when the pipeline can change the expected_outputs, for instance within an autoencoder + that would split a time series and set its own expected output inside the pipeline, such as where the + initial expected_output would be none at split time, and then would be computed on the fly through the + pipeline and would be expected to be used for the metrics after this computation. + """ + self.force_fixed_metric_expected_outputs = True + return self + def split_dact(self, data_container: DACT, context: CX) -> FoldsList[Tuple[TrainDACT, ValidDACT]]: """ Wrap a validation split function with a split data container function. 
@@ -107,6 +130,7 @@ class ValidationSplitter(BaseValidationSplitter): """ def __init__(self, validation_size: float): + BaseValidationSplitter.__init__(self) self.validation_size = validation_size def split( diff --git a/neuraxle/pipeline.py b/neuraxle/pipeline.py index 60c3ac9f..6de24d52 100644 --- a/neuraxle/pipeline.py +++ b/neuraxle/pipeline.py @@ -27,14 +27,12 @@ from abc import ABC, abstractmethod from typing import Any, List, Tuple, Union -from neuraxle.base import BaseStep, BaseTransformer +from neuraxle.base import BaseStep from neuraxle.base import ExecutionContext as CX -from neuraxle.base import (ExecutionMode, ForceHandleMixin, Identity, - NamedStepsList, TruncableSteps, - _CustomHandlerMethods) -from neuraxle.data_container import StripAbsentValues +from neuraxle.base import (ExecutionMode, ForceHandleMixin, Identity, NamedStepsList, TruncableSteps, + _CustomHandlerMethods, _TruncableServiceWithBodyMixin) from neuraxle.data_container import DataContainer as DACT -from neuraxle.data_container import ListDataContainer, ZipDataContainer +from neuraxle.data_container import ListDataContainer, StripAbsentValues, ZipDataContainer from neuraxle.logging.warnings import warn_deprecated_arg @@ -212,7 +210,7 @@ def _inverse_transform_data_container( return data_container -class MiniBatchSequentialPipeline(_CustomHandlerMethods, ForceHandleMixin, Pipeline): +class MiniBatchSequentialPipeline(_TruncableServiceWithBodyMixin, _CustomHandlerMethods, ForceHandleMixin, Pipeline): """ Mini Batch Sequential Pipeline class to create a pipeline processing data inputs in batch. @@ -335,6 +333,9 @@ def __init__( ): Pipeline.__init__(self, steps=steps) ForceHandleMixin.__init__(self) + _CustomHandlerMethods.__init__(self) + _TruncableServiceWithBodyMixin.__init__(self) + self.default_value_data_inputs = default_value_data_inputs self.default_value_expected_outputs = default_value_expected_outputs self._validate_barriers_batch_size(batch_size=batch_size) @@ -398,14 +399,6 @@ def _patch_missing_barrier( self._refresh_steps() - @property - def joiner(self) -> 'Barrier': - return self[-1] - - @property - def body(self) -> List[BaseTransformer]: - return list(self.values())[:-1] - def transform_data_container(self, data_container: DACT, context: CX) -> DACT: """ Transform all sub pipelines splitted by the Barrier steps. @@ -416,7 +409,7 @@ def transform_data_container(self, data_container: DACT, context: CX) -> DACT: sub_pipelines: List['MiniBatchSequentialPipeline'] = self._split_on_barriers() for sub_pipeline in sub_pipelines: - barrier: Barrier = sub_pipeline[-1] + barrier: Barrier = sub_pipeline.joiner data_container = barrier.join_transform( step=sub_pipeline, data_container=data_container, diff --git a/neuraxle/steps/data.py b/neuraxle/steps/data.py index 8aba8e02..94d50431 100644 --- a/neuraxle/steps/data.py +++ b/neuraxle/steps/data.py @@ -29,10 +29,10 @@ from neuraxle.data_container import DACT, _inner_concatenate_np_array from neuraxle.pipeline import Pipeline from neuraxle.steps.flow import TrainOnlyWrapper -from neuraxle.steps.output_handlers import InputAndOutputTransformerMixin +from neuraxle.steps.output_handlers import IdsAndInputAndOutputTransformerMixin -class DataShuffler(InputAndOutputTransformerMixin, BaseTransformer): +class DataShuffler(IdsAndInputAndOutputTransformerMixin, BaseTransformer): """ Data Shuffling step that shuffles data inputs, and expected_outputs at the same time. 
@@ -55,7 +55,7 @@ class DataShuffler(InputAndOutputTransformerMixin, BaseTransformer): def __init__(self, seed=None, increment_seed_after_each_fit=True): BaseTransformer.__init__(self) - InputAndOutputTransformerMixin.__init__(self) + IdsAndInputAndOutputTransformerMixin.__init__(self) if seed is None: seed = 42 self.seed = seed @@ -71,13 +71,17 @@ def transform(self, data_inputs): if self.increment_seed_after_each_fit: self.seed += 1 - di, eo = data_inputs - data = list(zip(di, eo)) - random.Random(self.seed).shuffle(data) + ids, di, eo = data_inputs + + ids_is_none = (ids is None) + if ids_is_none: + ids = range(len(di)) - data_inputs_shuffled, expected_outputs_shuffled = list(zip(*data)) + data = list(zip(ids, di, eo)) + random.Random(self.seed).shuffle(data) + ids_shuffled, data_inputs_shuffled, expected_outputs_shuffled = list(zip(*data)) - return list(data_inputs_shuffled), list(expected_outputs_shuffled) + return ids_shuffled, list(data_inputs_shuffled), list(expected_outputs_shuffled) class EpochRepeater(ForceHandleOnlyMixin, MetaStep): @@ -398,4 +402,3 @@ def _batch_zip_sub_data_container(self, data_container, data_container_to_zip) - data_container.set_expected_outputs(new_expected_outputs) return data_container - diff --git a/neuraxle/steps/flow.py b/neuraxle/steps/flow.py index 7bc28ba0..e0791d87 100644 --- a/neuraxle/steps/flow.py +++ b/neuraxle/steps/flow.py @@ -39,14 +39,14 @@ from operator import attrgetter from typing import Callable, Dict, Optional, Tuple, Union -from neuraxle.base import (CX, DACT, BaseStep, BaseTransformer, ExecutionPhase, - ForceHandleOnlyMixin, HandleOnlyMixin, MetaStep, - NonFittableMixin, TransformHandlerOnlyMixin, - TruncableSteps) +from neuraxle.base import BaseStep, BaseTransformer +from neuraxle.base import ExecutionContext as CX +from neuraxle.base import (ExecutionPhase, ForceHandleOnlyMixin, HandleOnlyMixin, MetaStep, NonFittableMixin, + TransformHandlerOnlyMixin, TruncableSteps) +from neuraxle.data_container import DataContainer as DACT from neuraxle.data_container import ExpandedDataContainer from neuraxle.hyperparams.distributions import Boolean, Choice -from neuraxle.hyperparams.space import (HyperparameterSamples, - HyperparameterSpace) +from neuraxle.hyperparams.space import HyperparameterSamples, HyperparameterSpace from neuraxle.steps.numpy import NumpyConcatenateOnAxisIfNotEmpty from neuraxle.union import FeatureUnion @@ -393,20 +393,20 @@ class ChooseOneStepOf(FeatureUnion): :class:`Optional` """ - def __init__(self, steps, hyperparams=None): + def __init__(self, steps, default_choice=None): FeatureUnion.__init__(self, steps, joiner=SelectNonEmptyDataContainer()) self._make_all_steps_optional() choices = list(self.keys())[:-1] - if hyperparams is None: + if default_choice is None: self.update_hyperparams({ ChooseOneStepOf.CHOICE_HYPERPARAM: choices[0] }) else: self.update_hyperparams({ - ChooseOneStepOf.CHOICE_HYPERPARAM: hyperparams + ChooseOneStepOf.CHOICE_HYPERPARAM: default_choice }) self.update_hyperparams_space({ ChooseOneStepOf.CHOICE_HYPERPARAM: Choice(choices) @@ -545,7 +545,7 @@ def _transform_data_container(self, data_container: DACT, context: CX): data_inputs = data_inputs[0] data_container = DACT(data_inputs=data_inputs, ids=data_container.ids, - expected_outputs=data_container.expected_outputs) + expected_outputs=data_container.expected_outputs) return data_container @@ -566,17 +566,17 @@ def __init__(self): def _transform_data_container(self, data_container: DACT, context: CX): - data_containers = list(filter( - 
lambda dc: (len(dc.di) > 0 and len(dc.eo) > 0), + filtered_data_containers = list(filter( + lambda dc: (len(dc.data_inputs) > 0 or len(dc.expected_outputs) > 0), data_container.data_inputs )) - if len(data_containers) == 1: - return data_containers[0] + if len(filtered_data_containers) == 1: + return filtered_data_containers[0] else: return DACT( - ids=data_container.ids, - di=list(map(attrgetter("di"))), - eo=list(map(attrgetter("eo"))), + ids=data_container._ids, + di=list(map(attrgetter("data_inputs"))), + eo=list(map(attrgetter("expected_outputs"))), ) @@ -584,6 +584,10 @@ class ExpandDim(MetaStep): """ Similar to numpys expand_dim function, ExpandDim step expands the dimension of all the data inside the data container. ExpandDim sends the expanded data container to the wrapped step. + + This is akin from passing the dact data from `shape` to `[1, *shape]` within the wrapped step, + to then by default go back to the original shape (optional). The ids will now contain a summary id temporarily. + ExpandDim returns the transformed expanded dim reduced to its original shape (see :func:`~neuraxle.steps.loop.ExpandedDataContainer.reduce_dim`). The wrapped step will receive a single current_id, data_input, and expected output: @@ -592,23 +596,23 @@ class ExpandDim(MetaStep): - The expected_outputs is a list of one element that contains the original expected outputs list. .. seealso:: - :class:`~neuraxle.base.ForceAlwaysHandleMixin`, - :class:`~neuraxle.base.MetaStepMixin`, - :class:`~neuraxle.base.BaseStep` - :class:`~neuraxle.base.BaseHasher` :class:`~neuraxle.data_container.ExpandedDataContainer` """ - def __init__(self, wrapped: BaseTransformer): + def __init__(self, wrapped: BaseTransformer, then_unexpand: bool = True): MetaStep.__init__(self, wrapped) - def _will_process(self, data_container, context): + self.then_unexpand: bool = then_unexpand + + def _will_process(self, data_container: DACT, context: CX) -> Tuple[ExpandedDataContainer, CX]: data_container, context = BaseStep._will_process(self, data_container, context) return ExpandedDataContainer.create_from(data_container), context - def _did_process(self, data_container: DACT, context: CX): - data_container = super()._did_process(data_container, context) - return data_container.reduce_dim() + def _did_process(self, data_container: ExpandedDataContainer, context: CX) -> DACT: + data_container: ExpandedDataContainer = super()._did_process(data_container, context) + if self.then_unexpand: + data_container = data_container.reduce_dim() + return data_container class ReversiblePreprocessingWrapper(HandleOnlyMixin, TruncableSteps): diff --git a/neuraxle/steps/loop.py b/neuraxle/steps/loop.py index ab4a70d2..bd3d1f9b 100644 --- a/neuraxle/steps/loop.py +++ b/neuraxle/steps/loop.py @@ -24,12 +24,11 @@ """ import copy from operator import itemgetter -from typing import Callable, List, Tuple +from typing import Callable, List, Optional, Tuple, Union -from neuraxle.base import (CX, DACT, BaseStep, BaseTransformer, - ForceHandleMixin, ForceHandleOnlyMixin, Identity, +from neuraxle.base import (CX, DACT, BaseStep, BaseTransformer, ForceHandleMixin, ForceHandleOnlyMixin, Identity, MetaStep, NamedStepsList, TruncableJoblibStepSaver) -from neuraxle.data_container import ListDataContainer +from neuraxle.data_container import DIT, EOT, IDT, DACTData, ListDataContainer from neuraxle.steps.flow import ExecuteIf import numpy as np @@ -315,20 +314,16 @@ def keys(self): return list(map(itemgetter(0), self.steps_as_tuple)) +lens = int # Lengths of the dact 
data items that was pre-flattening + + class FlattenForEach(ForceHandleMixin, MetaStep): """ Step that reduces a dimension instead of manually looping on it. - .. seealso:: - :class:`~neuraxle.base.BaseStep`, - :class:`~neuraxle.base.BaseSaver`, - :class:`~neuraxle.base.BaseHasher`, - :class:`~neuraxle.base.MetaStepMixin`, - :class:`~neuraxle.base.NonTransformableMixin`, - :class:`~neuraxle.pipeline.Pipeline`, - :class:`~neuraxle.hyperparams.space.HyperparameterSamples`, - :class:`~neuraxle.hyperparams.space.HyperparameterSpace`, - :class:`~neuraxle.data_container.DataContainer` + Using this step is equivalent to doing `sum(dact_data, [])` for each dact_data + that might be a IDT, DIT, or EOT of a DACT (DAtaConTainer) to flatten the data, + then the data is by default unflattened at the end of the loop in _did_process. """ def __init__( @@ -341,94 +336,129 @@ def __init__( self.then_unflatten = then_unflatten - self.len_di = [] - self.len_eo = [] - self.len_ids = [] + self.spare_ids: Optional[IDT] = None + # Lengths temporarily stored in _will_process to be able to unflatten the data container in _did_process: + self.len_ids: List[lens] = [] + self.len_di: List[lens] = [] + self.len_eo: List[lens] = [] def _will_process( - self, data_container: DACT, context: CX - ) -> Tuple['BaseStep', DACT]: + self, data_container: DACT[Optional[Union[IDT, List[IDT]]], Optional[List[DIT]], Optional[List[EOT]]], context: CX + ) -> Tuple['BaseTransformer', DACT]: """ - Flatten data container before any processing is done on the wrapped step. - - :param data_container: data container to flatten - :param context: execution context - :return: (data container, execution context) + Flatten data container before any processing is done on the wrapped step, using lists. """ data_container, context = super()._will_process(data_container, context) - if data_container.expected_outputs is None: - expected_outputs = np.empty_like(np.array(data_container.data_inputs)) - expected_outputs.fill(np.nan) - data_container.set_expected_outputs(expected_outputs) - di, self.len_di = self._flatten_list(data_container.data_inputs) - _id, self.len_ids = self._flatten_list(data_container.ids) eo, self.len_eo = self._flatten_list(data_container.expected_outputs) + # If is ID and not iterable nested thing, treat them as a special case that replicates the DIT: + if data_container._ids is not None and len(self.len_di) > 0 and all(isinstance(i, (str, int)) for i in data_container._ids): + # TODO: this code could be put inside the _flatten_list function to avoid duplicating it, + # but it would be more complicated to implement as we'd need to consider the len_di + # for the ids and eo and add a spare_eo as well. + self.spare_ids = data_container._ids + _ids: List[Union[int, str]] = sum([ + copy.deepcopy([_id] * _count) + for _id, _count + in zip(self.spare_ids, self.len_di) + ], []) + self.len_ids = copy.copy(self.len_di) + else: + self.spare_ids = None + _ids, self.len_ids = self._flatten_list(data_container._ids) + flattened_data_container = DACT( + ids=_ids, data_inputs=di, - ids=_id, expected_outputs=eo, sub_data_containers=data_container.sub_data_containers ) + self._invariant(data_container, flattened_data_container) + return flattened_data_container, context - def _flatten_list(self, list_to_flatten): + def _flatten_list(self, _data: Union[Optional[DACTData], List[DACTData]]) -> Tuple[Optional[DACTData], List[lens]]: """ Flatten the first dimension of a list. 
:param list_to_flatten: list to flatten :return: flattened list, len flattened lists """ - if not isinstance(list_to_flatten, np.ndarray): - list_to_flatten = np.array(list_to_flatten) + if _data is None or all(v is None for v in _data): + return None, [] - if len(list_to_flatten.shape) == 1: - return list_to_flatten, [1 for x in list_to_flatten] + if len(_data) != 0: + try: + iter(_data[0])  # probe whether the items are themselves iterable (nested) or the data is already flat + except TypeError: + return _data, [1 for x in _data] - list_to_flatten = list(list_to_flatten) - list_to_flatten = [list(x) for x in list_to_flatten] - len_list_to_flatten = [len(x) for x in list_to_flatten] - flattened_list = sum(list_to_flatten, []) + _data = [list(x) for x in _data] + len_list_to_flatten = [len(x) for x in _data] + flattened_list = sum(_data, []) return flattened_list, len_list_to_flatten def _did_process(self, data_container: DACT, context: CX) -> DACT: """ - Reaugment the flattened data container. - - :param data_container: data container to then_unflatten - :param context: execution context - :return: data container + Reaugment the flattened data container back to its full original shape using lists. """ - data_container = super()._did_process(data_container, context) + reaug_dact = super()._did_process(data_container, context).copy() if self.then_unflatten: - data_container.set_data_inputs(self._reaugment_list(data_container.data_inputs, self.len_di)) - data_container.set_expected_outputs(self._reaugment_list(data_container.expected_outputs, self.len_eo)) - data_container.set_ids(self._reaugment_list(data_container.ids, self.len_ids)) - self.len_di = [] - self.len_eo = [] + if reaug_dact._ids is not None and self.spare_ids is None: + reaug_dact.set_ids(self._reaugment_list(reaug_dact._ids, self.len_ids)) + else: + reaug_dact.set_ids(self.spare_ids) + if reaug_dact.data_inputs is not None: + reaug_dact.set_data_inputs(self._reaugment_list(reaug_dact.data_inputs, self.len_di)) + if reaug_dact.expected_outputs is not None: + reaug_dact.set_expected_outputs(self._reaugment_list(reaug_dact.expected_outputs, self.len_eo)) + + self._invariant(reaug_dact, data_container) + + self.spare_ids = None + self.len_ids = [] + self.len_di = [] + self.len_eo = [] - return data_container + return reaug_dact - def _reaugment_list(self, list_to_reaugment, flattened_dimension_lengths): + def _reaugment_list(self, _data: DACTData, flattened_dims_lengths: List[lens]) -> List[DACTData]: """ Reaugment list with the flattened dimension lengths. - - :param list_to_reaugment: list to then_unflatten - :return: reaugmented numpy array """ - if not self.then_unflatten or list_to_reaugment is None: - return list_to_reaugment + if not self.then_unflatten or _data is None: + return _data - reaugmented_list = [] + reaugmented_list: List[DACTData] = [] i = 0 - for list_length in flattened_dimension_lengths: - sub_list = list_to_reaugment[i:i + list_length] + for list_length in flattened_dims_lengths: + sub_list: DACTData = _data[i:i + list_length] reaugmented_list.append(sub_list) i += list_length return reaugmented_list + + def _invariant(self, augmented_dact, flattened_dact): + """ + Data consistency checks.
+ """ + _raise = False + if flattened_dact.di is not None: + if flattened_dact.eo is not None and len(flattened_dact.di) != len(flattened_dact.eo): + if all(v is None for v in flattened_dact.eo): + flattened_dact.eo = [None] * len(flattened_dact.di) + else: + _raise = True + if flattened_dact._ids is not None and len(flattened_dact.di) != len(flattened_dact._ids): + _raise = True + if _raise: + raise ValueError( + f"FlattenForEach: Cannot flatten or unflatten data properly. Expected outputs has a " + f"different len than data inputs for flattened DACT: {flattened_dact}, and for " + f"augmented DACT: {augmented_dact}." + ) diff --git a/neuraxle/steps/misc.py b/neuraxle/steps/misc.py index cbfd7279..e30275d4 100644 --- a/neuraxle/steps/misc.py +++ b/neuraxle/steps/misc.py @@ -26,15 +26,16 @@ import random import time -from abc import ABC -from typing import Any, Callable, List, Optional, Tuple import uuid +from abc import ABC +from typing import Any, Callable, List, Optional, Tuple, Union -from neuraxle.base import BaseStep, BaseTransformer, NonFittableMixin +from neuraxle.base import BaseStep, BaseTransformer from neuraxle.base import ExecutionContext as CX -from neuraxle.base import ForceHandleOnlyMixin, HandleOnlyMixin, MetaStep +from neuraxle.base import ForceHandleOnlyMixin, HandleOnlyMixin, MetaStep, NonFittableMixin +from neuraxle.data_container import DIT, EOT, DACTData from neuraxle.data_container import DataContainer as DACT -from neuraxle.hyperparams.space import RecursiveDict +from neuraxle.hyperparams.space import HyperparameterSamples, RecursiveDict VALUE_CACHING = 'value_caching' @@ -55,16 +56,25 @@ def _fit_data_container(self, data_container, context): self._assert(False, self.message, context) +NoneType = type(None) + + class BaseCallbackStep(BaseStep, ABC): """Base class for callback steps.""" - def __init__(self, callback_function, more_arguments: List = tuple(), - hyperparams=None, fit_callback_function=None, transform_function=None): + def __init__( + self, + callback_function: Callable[[DACTData], NoneType], + more_arguments: List[Any] = tuple(), + hyperparams: HyperparameterSamples = None, + fit_callback_function: Callable[[DACTData], NoneType] = None, + transform_function: Callable[[DACTData], NoneType] = None + ): """ Create the callback step with a function and extra arguments to send to the function :param callback_function: The function that will be called on events. - :param more_arguments: Extra arguments that will be sent to the callback after the processed data (optional). + :param more_arguments: Extra arguments that will be star-sent (*) to the callback after the processed data. """ BaseStep.__init__(self, hyperparams=hyperparams) self.transform_function = transform_function @@ -213,8 +223,8 @@ def clear_callbacks(self): class CallbackWrapper(HandleOnlyMixin, MetaStep): """ - A step that calls a callback function for each of his methods : transform, fit, fit_transform, and even inverse_transform. - To be used with :class:`TapeCallbackFunction`. + A step that calls a callback function for each of his methods: transform, fit, fit_transform, and even inverse_transform. + To be used with :class:`TapeCallbackFunction` most of the time, passed in the constructor. .. 
code-block:: python @@ -233,9 +243,9 @@ class CallbackWrapper(HandleOnlyMixin, MetaStep): def __init__( self, wrapped, - transform_callback_function, - fit_callback_function, - inverse_transform_callback_function=None, + transform_callback_function: Union['TapeCallbackFunction', Callable], + fit_callback_function: Union['TapeCallbackFunction', Callable], + inverse_transform_callback_function: Union['TapeCallbackFunction', Callable] = None, more_arguments: List = tuple(), hyperparams=None ): @@ -332,13 +342,13 @@ class TapeCallbackFunction: def __init__(self): """Initialize the tape (cache lists).""" - self.data: List = [] + self.data: List[DACTData] = []  # each time the callback is called, the data is appended. self.name_tape: List[str] = [] def __call__(self, *args, **kwargs): return self.callback(*args, **kwargs) - def callback(self, data, name: str = ""): + def callback(self, data: Tuple[DACTData, ...], name: str = ""): """ Will stick the data and name to the tape. diff --git a/neuraxle/steps/output_handlers.py b/neuraxle/steps/output_handlers.py index 993f6715..a6e93520 100644 --- a/neuraxle/steps/output_handlers.py +++ b/neuraxle/steps/output_handlers.py @@ -152,18 +152,18 @@ def _set_expected_outputs( return data_container -class _DidProcessInputOutputHandlerMixin(MixinForBaseTransformer): +class _DidProcessIdsInputOutputHandlerMixin(MixinForBaseTransformer): def _did_process(self, data_container: DACT, context: CX) -> DACT: - di, eo = data_container.data_inputs - + ids, di, eo = data_container.data_inputs self._assert( - di is None or eo is None or (len(di) == len(eo)), + di is None or eo is None or ids is None or (len(di) == len(eo) and len(ids) == len(di)), f'{self.name}: Found different len for non-null data inputs, and expected outputs. ' f'Please return the same the same amount of data inputs, and expected outputs, or ' f'otherwise create your own handler methods to do more funky things.', context ) + data_container.set_ids(ids=ids) data_container.set_data_inputs(data_inputs=di) data_container.set_expected_outputs(expected_outputs=eo) @@ -178,9 +178,9 @@ def _did_process(self, data_container: DACT, context: CX) -> DACT: return data_container -class InputAndOutputTransformerWrapper(_DidProcessInputOutputHandlerMixin, ForceHandleOnlyMixin, MetaStep): +class IdsInputAndOutputTransformerWrapper(_DidProcessIdsInputOutputHandlerMixin, ForceHandleOnlyMixin, MetaStep): """ - Wrapper step to transform both data inputs, and expected output at the same. + Wrapper step to transform both the ids, data inputs, and expected outputs at the same time using classical fit and transform methods. It sends the data_inputs, and the expected_outputs to the wrapped step so that it can transform them. ..
seealso:: @@ -191,7 +191,7 @@ class InputAndOutputTransformerWrapper(_DidProcessInputOutputHandlerMixin, Force def __init__(self, wrapped): MetaStep.__init__(self, wrapped) ForceHandleOnlyMixin.__init__(self) - _DidProcessInputOutputHandlerMixin.__init__(self) + _DidProcessIdsInputOutputHandlerMixin.__init__(self) def _transform_data_container(self, data_container: DACT, context: CX) -> DACT: """ @@ -205,8 +205,8 @@ def _transform_data_container(self, data_container: DACT, context: CX) -> DACT: """ output_data_container = self.wrapped.handle_transform( DACT( - data_inputs=(data_container.data_inputs, data_container.expected_outputs), - ids=data_container.ids, + ids=data_container._ids, + data_inputs=(data_container._ids, data_container.data_inputs, data_container.expected_outputs), expected_outputs=None ), context @@ -227,15 +227,17 @@ def _fit_data_container( """ self.wrapped = self.wrapped.handle_fit( DACT( - data_inputs=(copy.copy(data_container.data_inputs), copy.copy(data_container.expected_outputs)), - ids=data_container.ids, + ids=data_container._ids, + data_inputs=(copy.copy(data_container._ids), copy.copy( data_container.data_inputs), copy.copy(data_container.expected_outputs)), expected_outputs=None ), context ) - data_container.set_data_inputs((data_container.data_inputs, data_container.expected_outputs)) - data_container.set_expected_outputs(expected_outputs=None) + data_container.set_data_inputs( + (data_container._ids, data_container.data_inputs, data_container.expected_outputs)) + data_container.set_expected_outputs(expected_outputs=None)  # TODO: why this? return self @@ -253,8 +255,8 @@ def _fit_transform_data_container( """ self.wrapped, output_data_container = self.wrapped.handle_fit_transform( DACT( - data_inputs=(data_container.data_inputs, data_container.expected_outputs), - ids=data_container.ids, + ids=data_container._ids, + data_inputs=(data_container._ids, data_container.data_inputs, data_container.expected_outputs), expected_outputs=None ), context @@ -273,8 +275,8 @@ def handle_inverse_transform(self, data_container: DACT, context: CX) -> DACT: """ output_data_container = self.wrapped.handle_inverse_transform( DACT( - data_inputs=(data_container.data_inputs, data_container.expected_outputs), - ids=data_container.ids, + ids=data_container._ids, + data_inputs=(data_container._ids, data_container.data_inputs, data_container.expected_outputs), expected_outputs=None ), context.push(self.wrapped) @@ -283,9 +285,9 @@ def handle_inverse_transform(self, data_container: DACT, context: CX) -> DACT: return output_data_container -class InputAndOutputTransformerMixin(_DidProcessInputOutputHandlerMixin): +class IdsAndInputAndOutputTransformerMixin(_DidProcessIdsInputOutputHandlerMixin): """ - Base output transformer step that can modify data inputs, and expected_outputs at the same time. + Base output transformer step that can modify ids, data inputs, and expected_outputs at the same time.
""" def _transform_data_container(self, data_container: DACT, context: CX) -> DACT: @@ -296,9 +298,9 @@ def _transform_data_container(self, data_container: DACT, context: CX) -> DACT: :param data_container: :return: """ - di_eo = (data_container.data_inputs, data_container.expected_outputs) - new_data_inputs, new_expected_outputs = self.transform(di_eo) - data_container.set_data_inputs((new_data_inputs, new_expected_outputs)) + di_eo = (data_container._ids, data_container.data_inputs, data_container.expected_outputs) + new_ids, new_data_inputs, new_expected_outputs = self.transform(di_eo) + data_container.set_data_inputs((new_ids, new_data_inputs, new_expected_outputs)) return data_container def _fit_data_container(self, data_container: DACT, context: CX) -> 'BaseStep': @@ -310,9 +312,10 @@ def _fit_data_container(self, data_container: DACT, context: CX) -> 'BaseStep': :param data_container: :return: """ - new_self = self.fit((data_container.data_inputs, data_container.expected_outputs), None) - data_container.set_data_inputs((data_container.data_inputs, data_container.expected_outputs)) - data_container.set_expected_outputs(expected_outputs=None) + new_self = self.fit((data_container._ids, data_container.data_inputs, data_container.expected_outputs), None) + data_container.set_data_inputs( + (data_container._ids, data_container.data_inputs, data_container.expected_outputs)) + data_container.set_expected_outputs(expected_outputs=None) # TODO: check if this None eo is correct return new_self def _fit_transform_data_container( @@ -326,7 +329,7 @@ def _fit_transform_data_container( :param data_container: :return: """ - new_self, (new_data_inputs, new_expected_outputs) = self.fit_transform( - (data_container.data_inputs, data_container.expected_outputs), None) - data_container.set_data_inputs((new_data_inputs, new_expected_outputs)) + new_self, (new_ids, new_data_inputs, new_expected_outputs) = self.fit_transform( + (data_container._ids, data_container.data_inputs, data_container.expected_outputs), None) + data_container.set_data_inputs((new_ids, new_data_inputs, new_expected_outputs)) return new_self, data_container diff --git a/neuraxle/union.py b/neuraxle/union.py index acc3ac11..4a93d9ff 100644 --- a/neuraxle/union.py +++ b/neuraxle/union.py @@ -28,17 +28,16 @@ from joblib import Parallel, delayed -from neuraxle.base import (BaseStep, BaseTransformer, DACT, - CX, ForceHandleOnlyMixin, Identity, - NamedStepsList, NonFittableMixin, TruncableSteps) +from neuraxle.base import (CX, DACT, BaseStep, BaseTransformer, ForceHandleOnlyMixin, Identity, NamedStepsList, + NonFittableMixin, TruncableSteps, _TruncableServiceWithBodyMixin) from neuraxle.data_container import ZipDataContainer from neuraxle.steps.numpy import NumpyConcatenateInnerFeatures -class FeatureUnion(ForceHandleOnlyMixin, TruncableSteps): +class FeatureUnion(_TruncableServiceWithBodyMixin, ForceHandleOnlyMixin, TruncableSteps): """ Transform features in parallel as the union of many pipeline steps. - + This step is also available with true parallel processing threads or processes in the streaming package of Neuraxle. 
@@ -82,6 +81,7 @@ def __init__( steps_as_tuple.append(('joiner', joiner)) TruncableSteps.__init__(self, steps_as_tuple) ForceHandleOnlyMixin.__init__(self, cache_folder=cache_folder_when_no_handle) + _TruncableServiceWithBodyMixin.__init__(self) self.n_jobs = n_jobs self.backend = backend @@ -94,17 +94,17 @@ def _fit_data_container(self, data_container, context): """ # Actually fit: if self.n_jobs != 1: - fitted_steps = Parallel(backend=self.backend, n_jobs=self.n_jobs)( + fitted_body = Parallel(backend=self.backend, n_jobs=self.n_jobs)( delayed(step.handle_fit)(data_container.copy(), context) - for _, step in self.steps_as_tuple[:-1] + for step in self.body ) else: - fitted_steps = [ + fitted_body = [ step.handle_fit(data_container.copy(), context) - for _, step in self.steps_as_tuple[:-1] + for step in self.body ] - self._save_fitted_steps(fitted_steps) + self._save_fitted_body(fitted_body) return self @@ -118,12 +118,12 @@ def _transform_data_container(self, data_container, context): if self.n_jobs != 1: data_containers = Parallel(backend=self.backend, n_jobs=self.n_jobs)( delayed(step.handle_transform)(data_container.copy(), context) - for _, step in self.steps_as_tuple[:-1] + for step in self.body ) else: data_containers = [ step.handle_transform(data_container.copy(), context) - for _, step in self.steps_as_tuple[:-1] + for step in self.body ] return DACT( @@ -149,7 +149,7 @@ def _fit_transform_data_container(self, data_container, context): return new_self, data_container - def _save_fitted_steps(self, fitted_steps): + def _save_fitted_body(self, fitted_steps): # Save fitted steps for i, fitted_step in enumerate(fitted_steps[:-1]): self.steps_as_tuple[i] = (self.steps_as_tuple[i][0], fitted_step) diff --git a/testing_neuraxle/hyperparams/test_scipy_distributions.py b/testing_neuraxle/hyperparams/test_scipy_distributions.py index 356c783b..9feeee7a 100644 --- a/testing_neuraxle/hyperparams/test_scipy_distributions.py +++ b/testing_neuraxle/hyperparams/test_scipy_distributions.py @@ -6,13 +6,11 @@ import numpy as np import pytest from neuraxle.base import Identity -from neuraxle.hyperparams.distributions import (Choice, LogNormal, LogUniform, - Normal, PriorityChoice, - RandInt, Uniform, +from neuraxle.hyperparams.distributions import (Choice, LogNormal, LogUniform, Normal, PriorityChoice, RandInt, Uniform, get_index_in_list_with_bool) -from neuraxle.hyperparams.scipy_distributions import ( - Gaussian, Histogram, Poisson, ScipyContinuousDistributionWrapper, - ScipyDiscreteDistributionWrapper, ScipyLogUniform, StdMeanLogNormal) +from neuraxle.hyperparams.scipy_distributions import (Gaussian, Histogram, Poisson, ScipyContinuousDistributionWrapper, + ScipyDiscreteDistributionWrapper, ScipyLogUniform, + StdMeanLogNormal) from neuraxle.hyperparams.space import HyperparameterSpace from scipy.stats import gamma, norm, randint, uniform @@ -26,10 +24,10 @@ def get_many_samples_for(hd): def test_wrapped_sk_learn_distributions_should_be_able_to_use_sklearn_methods(): wrapped_sklearn_distribution = Gaussian(min_included=0, max_included=10, null_default_value=0) - assert wrapped_sklearn_distribution.logpdf(5) == -13.418938533204672 - assert wrapped_sklearn_distribution.logcdf(5) == -0.6931477538632531 - assert wrapped_sklearn_distribution.sf(5) == 0.5000002866515718 - assert wrapped_sklearn_distribution.logsf(5) == -0.693146607256966 + assert np.isclose(wrapped_sklearn_distribution.logpdf(5), -13.418938533204672) + assert np.isclose(wrapped_sklearn_distribution.logcdf(5), -0.6931477538632531) + 
assert np.isclose(wrapped_sklearn_distribution.sf(5), 0.5000002866515718) + assert np.isclose(wrapped_sklearn_distribution.logsf(5), -0.693146607256966) assert np.all(wrapped_sklearn_distribution.ppf([0.0, 0.01, 0.05, 0.1, 1 - 0.10, 1 - 0.05, 1 - 0.01, 1.0], 10)) assert 8 < wrapped_sklearn_distribution.isf(q=0.5) > 8 assert wrapped_sklearn_distribution.moment(2) > 50 @@ -38,10 +36,10 @@ def test_wrapped_sk_learn_distributions_should_be_able_to_use_sklearn_methods(): assert stats[1] assert np.array_equal(wrapped_sklearn_distribution.entropy(), np.array(0.7094692666023363)) assert wrapped_sklearn_distribution.median() - assert wrapped_sklearn_distribution.mean() == 5.398942280397029 + assert np.isclose(wrapped_sklearn_distribution.mean(), 5.398942280397029) assert np.isclose(wrapped_sklearn_distribution.std(), 4.620759921685375) assert np.isclose(wrapped_sklearn_distribution.var(), 21.351422253853833) - assert wrapped_sklearn_distribution.expect() == 0.39894228040143276 + assert np.isclose(wrapped_sklearn_distribution.expect(), 0.39894228040143276) interval = wrapped_sklearn_distribution.interval(alpha=[0.25, 0.50]) assert np.all(interval[0]) assert np.all(interval[1]) @@ -103,10 +101,10 @@ def _test_discrete_poisson(poisson_distribution: Poisson): rvs = [poisson_distribution.rvs() for i in range(10)] assert not all(x == rvs[0] for x in rvs) assert 0.0 <= poisson_distribution.rvs() <= 10.0 - assert poisson_distribution.pdf(10) == 0.01813278870782187 + assert np.isclose(poisson_distribution.pdf(10), 0.01813278870782187) assert np.isclose(poisson_distribution.pdf(0), 0.006737946999085467) - assert poisson_distribution.cdf(5.0) == 0.6159606548330632 - assert poisson_distribution.cdf(0) == 0.006737946999085467 + assert np.isclose(poisson_distribution.cdf(5.0), 0.6159606548330632) + assert np.isclose(poisson_distribution.cdf(0), 0.006737946999085467) def test_randint(): @@ -172,7 +170,7 @@ def _test_loguniform(hd: ScipyLogUniform): assert min(samples) >= 0.001 assert max(samples) <= 10.0 assert hd.pdf(0.0001) == 0. - assert abs(hd.pdf(2) - 0.054286810237906484) < 2e-6 + assert np.isclose(hd.pdf(2), 0.054286810237906484) assert hd.pdf(10.1) == 0. assert hd.cdf(0.0001) == 0. assert abs(hd.cdf(2) - (math.log2(2) - math.log2(0.001)) / (math.log2(10) - math.log2(0.001))) < 1e-6 @@ -197,12 +195,12 @@ def _test_normal(hd): assert 0.6 > samples_mean > 0.4 samples_std = np.std(samples) assert 0.1 < samples_std < 0.6 - assert (abs(hd.pdf(-1.) - 0.24) - 0.24) < 1e-10 - assert (abs(hd.pdf(0.) - 0.40) - 0.31125636093539194) < 1e-10 - assert (abs(hd.pdf(1.)) - 0.08874363906460808) < 1e-10 - assert (abs(hd.cdf(-1.) - 0.15) - 0.15) < 1e-10 - assert (abs(hd.cdf(+0.) - 0.5) - 0.5) < 1e-10 - assert (abs(hd.cdf(+1.) - 0.85) - 0.15000000000000002) < 1e-10 + assert np.isclose(abs(hd.pdf(-1.) - 0.24), 0.24) + assert np.isclose(abs(hd.pdf(0.) - 0.40), 0.31125636093539194) + assert np.isclose(abs(hd.pdf(1.)), 0.08874363906460808) + assert np.isclose(abs(hd.cdf(-1.) - 0.15), 0.15) + assert np.isclose(abs(hd.cdf(+0.) - 0.5), 0.5) + assert np.isclose(abs(hd.cdf(+1.) - 0.85), 0.15000000000000002) def test_lognormal(): @@ -223,12 +221,12 @@ def _test_lognormal(hd: StdMeanLogNormal): assert -5 < samples_median < 5 samples_std = np.std(samples) assert 0 < samples_std < 4 - assert hd.pdf(0.) == 0. - assert abs(hd.pdf(1.) - 0.28777602476804065) < 1e-6 - assert abs(hd.pdf(5.) - 0.029336304593386688) < 1e-6 - assert hd.cdf(0.) == 0. - assert hd.cdf(1.) == 0.49999999998280026 - assert abs(hd.cdf(5.) 
- 0.8771717397015799) == 0.12282826029842009 + assert np.isclose(hd.pdf(0.), 0.) + assert np.isclose(hd.pdf(1.), 0.28777602476804065) + assert np.isclose(hd.pdf(5.), 0.029336304593386688) + assert np.isclose(hd.cdf(0.), 0.) + assert np.isclose(hd.cdf(1.), 0.49999999998280026) + assert np.isclose(abs(hd.cdf(5.) - 0.8771717397015799), 0.12282826029842009) @pytest.mark.parametrize("hd, test_method", [ diff --git a/testing_neuraxle/metaopt/test_automl_redesign.py b/testing_neuraxle/metaopt/test_automl_redesign.py index b34e6958..5ac4c93e 100644 --- a/testing_neuraxle/metaopt/test_automl_redesign.py +++ b/testing_neuraxle/metaopt/test_automl_redesign.py @@ -3,15 +3,17 @@ import numpy as np import pytest +from neuraxle.base import BaseStep +from neuraxle.base import ExecutionContext as CX +from neuraxle.base import Identity, NonFittableMixin from neuraxle.data_container import DataContainer as DACT -from neuraxle.hyperparams.distributions import (Boolean, Choice, LogUniform, - RandInt) +from neuraxle.hyperparams.distributions import Boolean, Choice, LogUniform, RandInt from neuraxle.hyperparams.space import HyperparameterSpace from neuraxle.metaopt.auto_ml import AutoML, RandomSearchSampler -from neuraxle.metaopt.callbacks import (EarlyStoppingCallback, MetricCallback, - ScoringCallback) +from neuraxle.metaopt.callbacks import EarlyStoppingCallback, MetricCallback, ScoringCallback from neuraxle.metaopt.context import AutoMLContext -from neuraxle.metaopt.repositories.repo import VanillaHyperparamsRepository +from neuraxle.metaopt.data.vanilla import ScopedLocation +from neuraxle.metaopt.repositories.repo import HyperparamsRepository, VanillaHyperparamsRepository from neuraxle.metaopt.validation import ValidationSplitter from neuraxle.pipeline import Pipeline from neuraxle.steps.numpy import NumpyRavel @@ -20,8 +22,7 @@ from sklearn.linear_model import LogisticRegression from sklearn.metrics import accuracy_score, mean_squared_error from sklearn.preprocessing import StandardScaler -from testing_neuraxle.metaopt.test_automl_repositories import ( - CX_WITH_REPO_CTORS, TmpDir) +from testing_neuraxle.metaopt.test_automl_repositories import CX_WITH_REPO_CTORS, TmpDir def _create_data_source(): @@ -30,7 +31,29 @@ def _create_data_source(): return data_inputs, expected_outputs -def _create_pipeline(): +class SetNoneEO(Identity): + + def __init__(self): + Identity.__init__(self) + + def _will_process(self, dact: DACT, cx: CX): + dact, cx = Identity._will_process(self, dact, cx) + dact = dact.with_eo(None) + return dact, cx + + +class FailingStep(NonFittableMixin, BaseStep): + + def __init__(self): + BaseStep.__init__(self) + NonFittableMixin.__init__(self) + + def _will_process(self, dact: DACT, cx: CX): + raise ValueError("This error should be found in the logs of the test.") + return dact, cx + + +def _create_pipeline(has_failing_step=False): return Pipeline([ StandardScaler(), OutputTransformerWrapper(NumpyRavel()), @@ -42,15 +65,18 @@ def _create_pipeline(): 'penalty': Choice(['none', 'l2']), 'max_iter': RandInt(20, 200) }) - ) + ), + FailingStep() if has_failing_step else Identity(), + SetNoneEO(), ]) @pytest.mark.parametrize('cx_repo_ctor', CX_WITH_REPO_CTORS) -def test_automl_api_entry_point(tmpdir, cx_repo_ctor: Callable[[Optional[TmpDir]], AutoMLContext]): +@pytest.mark.parametrize('has_failing_step', [False, True]) +def test_automl_api_entry_point(tmpdir, cx_repo_ctor: Callable[[Optional[TmpDir]], AutoMLContext], has_failing_step: bool): data_inputs, expected_outputs = _create_data_source() dact 
= DACT(data_inputs=data_inputs, expected_outputs=expected_outputs) - pipeline = _create_pipeline() + pipeline = _create_pipeline(has_failing_step=has_failing_step) # TODO: # HyperbandControllerLoop(), ClusteringParallelFor() ? a: AutoML = AutoML( @@ -66,12 +92,16 @@ def test_automl_api_entry_point(tmpdir, cx_repo_ctor: Callable[[Optional[TmpDir] continue_loop_on_error=True, n_trials=4, epochs=5, - refit_best_trial=True, + refit_best_trial=False, ) + cx: CX = cx_repo_ctor() + repo: HyperparamsRepository = cx.repo - a, _out = a.handle_fit_transform( - dact, - cx_repo_ctor() - ) + a = a.handle_fit(dact, cx) - assert _out is not None + if has_failing_step: + assert 'ValueError("This error should be found in the logs of the test.")' in repo.get_log_from_logging_handler( + cx.logger, ScopedLocation()) + else: + a, _out = a.to_force_refit_best_trial().handle_fit_transform(dact, cx) + assert _out is not None diff --git a/testing_neuraxle/steps/test_expand_dim.py b/testing_neuraxle/steps/test_expand_dim.py index e097ff75..289a0826 100644 --- a/testing_neuraxle/steps/test_expand_dim.py +++ b/testing_neuraxle/steps/test_expand_dim.py @@ -10,33 +10,28 @@ def test_expand_dim_transform(): di = np.array(range(10)) - eo = [None] * 10 - handle_fit_callback = TapeCallbackFunction() - handle_transform_callback = TapeCallbackFunction() - handle_fit_transform_callback = TapeCallbackFunction() + eo = None + fit_callback, transform_callback, fit_transform_callback = ( + TapeCallbackFunction(), TapeCallbackFunction(), TapeCallbackFunction()) p = Pipeline([ ExpandDim( - HandleCallbackStep( - handle_fit_callback, - handle_transform_callback, - handle_fit_transform_callback - ) + HandleCallbackStep(fit_callback, transform_callback, fit_transform_callback) ) ]) outputs = p.transform(di) assert np.array_equal(outputs, di) - assert handle_fit_callback.data == [] + assert fit_callback.data == [] assert np.array_equal( - np.array(handle_transform_callback.data[0][0].di), + np.array(transform_callback.data[0][0].di), np.array([di]) ) assert np.array_equal( - np.array(handle_transform_callback.data[0][0].eo), + np.array(transform_callback.data[0][0].eo), np.array([eo]) ) - assert handle_fit_transform_callback.data == [] + assert fit_transform_callback.data == [] def test_expand_dim_fit(): diff --git a/testing_neuraxle/steps/test_output_transformer_wrapper.py b/testing_neuraxle/steps/test_output_transformer_wrapper.py index 7294b257..27468a28 100644 --- a/testing_neuraxle/steps/test_output_transformer_wrapper.py +++ b/testing_neuraxle/steps/test_output_transformer_wrapper.py @@ -3,14 +3,12 @@ from neuraxle.base import BaseTransformer from neuraxle.base import ExecutionContext as CX from neuraxle.data_container import DataContainer as DACT -from neuraxle.hyperparams.space import (HyperparameterSamples, - HyperparameterSpace) +from neuraxle.hyperparams.space import HyperparameterSamples, HyperparameterSpace from neuraxle.pipeline import Pipeline -from neuraxle.steps.output_handlers import InputAndOutputTransformerMixin -from py._path.local import LocalPath +from neuraxle.steps.output_handlers import IdsAndInputAndOutputTransformerMixin -class MultiplyBy2OutputTransformer(InputAndOutputTransformerMixin, BaseTransformer): +class MultiplyBy2OutputTransformer(IdsAndInputAndOutputTransformerMixin, BaseTransformer): def __init__( self, hyperparams: HyperparameterSamples = None, @@ -18,10 +16,10 @@ def __init__( name: str = None ): BaseTransformer.__init__(self, hyperparams, hyperparams_space, name) - 
InputAndOutputTransformerMixin.__init__(self) + IdsAndInputAndOutputTransformerMixin.__init__(self) def transform(self, data_inputs) -> Tuple[Any, Any]: - dis, eos = data_inputs + ids, dis, eos = data_inputs new_dis = [] new_eos = [] @@ -29,17 +27,17 @@ def transform(self, data_inputs) -> Tuple[Any, Any]: new_dis.append(di * 2) new_eos.append(eo * 2) - return new_dis, new_eos + return ids, new_dis, new_eos -def test_output_transformer_should_zip_data_input_and_expected_output_in_the_transformed_output(tmpdir: LocalPath): +def test_output_transformer_should_zip_data_input_and_expected_output_in_the_transformed_output(): pipeline = Pipeline([ MultiplyBy2OutputTransformer() ]) pipeline, new_data_container = pipeline.handle_fit_transform( DACT(data_inputs=[1, 2, 3], ids=[0, 1, 2], expected_outputs=[2, 3, 4]), - CX(tmpdir) + CX() ) assert new_data_container.data_inputs == [2, 4, 6] diff --git a/testing_neuraxle/steps/test_sklearn_wrapper.py b/testing_neuraxle/steps/test_sklearn_wrapper.py index 85f725e8..516270dd 100644 --- a/testing_neuraxle/steps/test_sklearn_wrapper.py +++ b/testing_neuraxle/steps/test_sklearn_wrapper.py @@ -120,8 +120,6 @@ def _create_data_source(shape): return data_inputs, expected_outputs -# With AutoML loop - def _test_within_auto_ml_loop(tmpdir, pipeline): X_train = np.random.random((25, 50)).astype(np.float32) Y_train = np.random.random((25,)).astype(np.float32) @@ -144,13 +142,11 @@ def _test_within_auto_ml_loop(tmpdir, pipeline): auto_ml.fit(X_train, Y_train) -@pytest.mark.skip(reason="AutoML loop refactor") def test_automl_sklearn(tmpdir): grad_boost = SKLearnWrapper(GradientBoostingRegressor()) _test_within_auto_ml_loop(tmpdir, grad_boost) -@pytest.mark.skip(reason="AutoML loop refactor") def test_automl_sklearn_model_with_base_estimator(tmpdir): grad_boost = GradientBoostingRegressor() bagged_regressor = BaggingRegressor( @@ -159,7 +155,7 @@ def test_automl_sklearn_model_with_base_estimator(tmpdir): wrapped_bagged_regressor = SKLearnWrapper( bagged_regressor, HyperparameterSpace({ - "n_estimators": RandInt(10, 100), + "n_estimators": RandInt(2, 15), "max_features": Uniform(0.6, 1.0)}), # return_all_sklearn_default_params_on_get=True ) diff --git a/testing_neuraxle/test_context_logger.py b/testing_neuraxle/test_context_logger.py index 83dc6e70..086acc67 100644 --- a/testing_neuraxle/test_context_logger.py +++ b/testing_neuraxle/test_context_logger.py @@ -164,7 +164,8 @@ def test_automl_neuraxle_logger_logs_to_repo_file(tmpdir): tsc.flow.log_status(TrialStatus.RUNNING) tsc.flow.log_end(TrialStatus.ABORTED) - log_file_path_at_loc = cx.repo.wrapped.get_scoped_logger_path(tsc.loc) + _repo: HyperparamsOnDiskRepository = cx.repo.wrapped + log_file_path_at_loc = _repo._create_scoped_logger_path(tsc.loc) assert os.path.exists(log_file_path_at_loc) log1 = tsc.context.read_scoped_log() with open(log_file_path_at_loc, 'r') as _file: @@ -276,32 +277,33 @@ def __init__(self, logging_queue: Queue, n_process: int): def start(self): for i in range(self.n_process): proc = Process( - target=self.logger_producer_thread, + target=logger_producer_thread, name=f"worker_{i}", args=(self.logging_queue,) ) self.workers.append(proc) proc.start() - @staticmethod - def logger_producer_thread(logging_queue: Queue): - queue_handler = logging.handlers.QueueHandler(logging_queue) - root = logging.getLogger() - root.setLevel(logging.DEBUG) - root.addHandler(queue_handler) - - logger = CX().logger - logger.log(logging.ERROR, SomeParallelLogginWorkers.FIRST_LOG_MESSAGE) - - dact = DACT(di=range(10)) 
- _, out = FitTransformCounterLoggingStep().set_name("Producer").handle_fit_transform(dact, CX()) - return - def join(self): for worker in self.workers: worker.join() +def logger_producer_thread(logging_queue: Queue): + # TODO: isn't this duplicated from the logging module or streaming.py code? + queue_handler = logging.handlers.QueueHandler(logging_queue) + root = logging.getLogger() + root.setLevel(logging.DEBUG) + root.addHandler(queue_handler) + + logger = CX().logger + logger.log(logging.ERROR, SomeParallelLogginWorkers.FIRST_LOG_MESSAGE) + + dact = DACT(di=range(10)) + _, out = FitTransformCounterLoggingStep().set_name("Producer").handle_fit_transform(dact, CX()) + return + + def test_neuraxle_logger_can_operate_in_parallel(): # TODO: test with disk files as well? logging_queue = Queue() diff --git a/testing_neuraxle/test_full_pipeline_dump.py b/testing_neuraxle/test_full_pipeline_dump.py index 5049cc97..189b42e8 100644 --- a/testing_neuraxle/test_full_pipeline_dump.py +++ b/testing_neuraxle/test_full_pipeline_dump.py @@ -30,7 +30,7 @@ def test_load_full_dump_from_pipeline_name(tmpdir): step_b_wrapped_step = pipeline.wrapped['step_b'].wrapped assert np.array_equal(step_b_wrapped_step.transform_callback_function.data[0], EXPECTED_OUTPUTS) assert np.array_equal(step_b_wrapped_step.fit_callback_function.data[0][0], EXPECTED_OUTPUTS) - assert np.array_equal(step_b_wrapped_step.fit_callback_function.data[0][1], [None] * len(EXPECTED_OUTPUTS)) + assert np.array_equal(step_b_wrapped_step.fit_callback_function.data[0][1], None) pipeline.save(CX(tmpdir), full_dump=True) @@ -44,7 +44,7 @@ def test_load_full_dump_from_pipeline_name(tmpdir): loaded_step_b_wrapped_step = loaded_pipeline['step_b'].wrapped assert np.array_equal(loaded_step_b_wrapped_step.transform_callback_function.data[0], EXPECTED_OUTPUTS) assert np.array_equal(loaded_step_b_wrapped_step.fit_callback_function.data[0][0], EXPECTED_OUTPUTS) - assert np.array_equal(loaded_step_b_wrapped_step.fit_callback_function.data[0][1], [None] * len(EXPECTED_OUTPUTS)) + assert np.array_equal(loaded_step_b_wrapped_step.fit_callback_function.data[0][1], None) def test_load_full_dump_from_path(tmpdir): @@ -69,4 +69,4 @@ def test_load_full_dump_from_path(tmpdir): loaded_step_b_wrapped_step = loaded_pipeline.wrapped assert np.array_equal(loaded_step_b_wrapped_step.transform_callback_function.data[0], EXPECTED_OUTPUTS) assert np.array_equal(loaded_step_b_wrapped_step.fit_callback_function.data[0][0], EXPECTED_OUTPUTS) - assert np.array_equal(loaded_step_b_wrapped_step.fit_callback_function.data[0][1], [None] * len(EXPECTED_OUTPUTS)) + assert np.array_equal(loaded_step_b_wrapped_step.fit_callback_function.data[0][1], None) diff --git a/testing_neuraxle/test_output_transformer_wrapper.py b/testing_neuraxle/test_output_transformer_wrapper.py index e01316cd..a5afbee1 100644 --- a/testing_neuraxle/test_output_transformer_wrapper.py +++ b/testing_neuraxle/test_output_transformer_wrapper.py @@ -1,14 +1,15 @@ import numpy as np import pytest - -from neuraxle.base import CX, BaseTransformer, ForceHandleMixin +from neuraxle.base import BaseTransformer +from neuraxle.base import ExecutionContext as CX +from neuraxle.base import ForceHandleMixin +from neuraxle.data_container import EOT from neuraxle.data_container import DataContainer as DACT from neuraxle.hyperparams.space import HyperparameterSamples from neuraxle.pipeline import Pipeline from neuraxle.steps.misc import FitCallbackStep, TapeCallbackFunction from neuraxle.steps.numpy import MultiplyByN -from 
neuraxle.steps.output_handlers import OutputTransformerWrapper, InputAndOutputTransformerWrapper, \ - InputAndOutputTransformerMixin +from neuraxle.steps.output_handlers import IdsInputAndOutputTransformerWrapper, OutputTransformerWrapper class MultiplyByNInputAndOutput(BaseTransformer): @@ -16,7 +17,7 @@ def __init__(self, multiply_by=1): super().__init__(hyperparams=HyperparameterSamples({'multiply_by': multiply_by})) def transform(self, data_inputs): - data_inputs, expected_outputs = data_inputs + ids, data_inputs, expected_outputs = data_inputs if not isinstance(data_inputs, np.ndarray): data_inputs = np.array(data_inputs) @@ -24,10 +25,10 @@ def transform(self, data_inputs): if not isinstance(expected_outputs, np.ndarray): expected_outputs = np.array(expected_outputs) - return data_inputs * self.hyperparams['multiply_by'], expected_outputs * self.hyperparams['multiply_by'] + return ids, data_inputs * self.hyperparams['multiply_by'], expected_outputs * self.hyperparams['multiply_by'] def inverse_transform(self, processed_outputs): - processed_outputs, expected_outputs = processed_outputs + ids, processed_outputs, expected_outputs = processed_outputs if not isinstance(processed_outputs, np.ndarray): processed_outputs = np.array(processed_outputs) @@ -35,7 +36,7 @@ def inverse_transform(self, processed_outputs): if not isinstance(expected_outputs, np.ndarray): expected_outputs = np.array(expected_outputs) - return processed_outputs / self.hyperparams['multiply_by'], expected_outputs / self.hyperparams['multiply_by'] + return ids, processed_outputs / self.hyperparams['multiply_by'], expected_outputs / self.hyperparams['multiply_by'] def test_output_transformer_wrapper_should_fit_with_data_inputs_and_expected_outputs_as_data_inputs(): @@ -47,7 +48,8 @@ def test_output_transformer_wrapper_should_fit_with_data_inputs_and_expected_out assert np.array_equal(tape.data[0][0], expected_outputs) for i in range(10): - assert tape.data[0][1][i] is None + _first_eot_seen: EOT = tape.data[0][1] + assert _first_eot_seen is None def test_output_transformer_wrapper_should_fit_transform_with_data_inputs_and_expected_outputs(): @@ -64,7 +66,8 @@ def test_output_transformer_wrapper_should_fit_transform_with_data_inputs_and_ex assert np.array_equal(data_container.expected_outputs, expected_outputs * 2) assert np.array_equal(tape.data[0][0], expected_outputs * 2) for i in range(10): - assert tape.data[0][1][i] is None + _first_eot_seen: EOT = tape.data[0][1] + assert _first_eot_seen is None def test_output_transformer_wrapper_should_transform_with_data_inputs_and_expected_outputs(): @@ -82,33 +85,36 @@ def test_output_transformer_wrapper_should_transform_with_data_inputs_and_expect def test_input_and_output_transformer_wrapper_should_fit_with_data_inputs_and_expected_outputs_as_data_inputs(): tape = TapeCallbackFunction() - p = InputAndOutputTransformerWrapper(FitCallbackStep(tape)) - data_inputs, expected_outputs = _create_data_source((10, 10)) + p = IdsInputAndOutputTransformerWrapper(FitCallbackStep(tape)) + ids, data_inputs, expected_outputs = _create_data_source((10, 10), with_ids=True) - p.fit(data_inputs, expected_outputs) + p.handle_fit(DACT(ids=ids, di=data_inputs, eo=expected_outputs), CX()) - assert np.array_equal(tape.data[0][0][0], data_inputs) - assert np.array_equal(tape.data[0][0][1], expected_outputs) + assert np.array_equal(tape.data[0][0][0], ids) + assert np.array_equal(tape.data[0][0][1], data_inputs) + assert np.array_equal(tape.data[0][0][2], expected_outputs) def 
test_input_and_output_transformer_wrapper_should_fit_transform_with_data_inputs_and_expected_outputs(): tape = TapeCallbackFunction() - p = InputAndOutputTransformerWrapper(Pipeline([MultiplyByNInputAndOutput(2), FitCallbackStep(tape)])) - data_inputs, expected_outputs = _create_data_source((10, 10)) + p = IdsInputAndOutputTransformerWrapper(Pipeline([MultiplyByNInputAndOutput(2), FitCallbackStep(tape)])) + ids, data_inputs, expected_outputs = _create_data_source((10, 10), with_ids=True) p, data_container = p.handle_fit_transform(DACT( + ids=ids, data_inputs=data_inputs, expected_outputs=expected_outputs ), CX()) assert np.array_equal(data_container.data_inputs, data_inputs * 2) assert np.array_equal(data_container.expected_outputs, expected_outputs * 2) - assert np.array_equal(tape.data[0][0][0], data_inputs * 2) - assert np.array_equal(tape.data[0][0][1], expected_outputs * 2) + assert np.array_equal(tape.data[0][0][0], ids) + assert np.array_equal(tape.data[0][0][1], data_inputs * 2) + assert np.array_equal(tape.data[0][0][2], expected_outputs * 2) def test_input_and_output_transformer_wrapper_should_transform_with_data_inputs_and_expected_outputs(): - p = InputAndOutputTransformerWrapper(MultiplyByNInputAndOutput(2)) + p = IdsInputAndOutputTransformerWrapper(MultiplyByNInputAndOutput(2)) data_inputs, expected_outputs = _create_data_source((10, 10)) data_container = p.handle_transform(DACT( @@ -129,11 +135,11 @@ def __init__(self): super().__init__() def transform(self, data_inputs): - data_inputs, expected_outputs = data_inputs - return data_inputs[0:int(len(data_inputs) / 2)], expected_outputs + ids, data_inputs, expected_outputs = data_inputs + return ids, data_inputs[0:int(len(data_inputs) / 2)], expected_outputs -class ChangeLenDataInputsAndExpectedOutputs(BaseTransformer): +class ChangeLenDataInputsAndExpectedOutputsWithoutIds(BaseTransformer): """ This should raise an error because ids are not changed to fit the new length of data_inputs and expected_outputs """ @@ -142,8 +148,9 @@ def __init__(self): BaseTransformer.__init__(self) def transform(self, data_inputs): - data_inputs, expected_outputs = data_inputs - return data_inputs[0:int(len(data_inputs) / 2)], expected_outputs[0:int(len(data_inputs) / 2)] + ids, data_inputs, expected_outputs = data_inputs + _clip = int(len(data_inputs) / 2) + return ids, data_inputs[:_clip], expected_outputs[:_clip] class DoubleData(ForceHandleMixin, BaseTransformer): @@ -156,49 +163,61 @@ def __init__(self): ForceHandleMixin.__init__(self) def _transform_data_container(self, data_container: DACT, context: CX) -> DACT: - di, eo = data_container.data_inputs - return DACT(data_inputs=(di[0].tolist()*2, eo[0].tolist()*2), - ids=data_container.ids*2) + ids, di, eo = data_container.data_inputs + return DACT( + data_inputs=( + ids * 2 if ids is not None else None, + di[0].tolist() * 2, + eo[0].tolist() * 2), + ids=list(data_container.ids) + ) def test_input_and_output_transformer_wrapper_should_not_return_a_different_amount_of_data_inputs_and_expected_outputs(): with pytest.raises(AssertionError): - p = InputAndOutputTransformerWrapper(ChangeLenDataInputs()) - data_inputs, expected_outputs = _create_data_source((10, 10)) + p = IdsInputAndOutputTransformerWrapper(ChangeLenDataInputs()) + ids, data_inputs, expected_outputs = _create_data_source((10, 10), with_ids=True) p.handle_transform(DACT( + ids=ids, data_inputs=data_inputs, expected_outputs=expected_outputs ), CX()) def 
test_input_and_output_transformer_wrapper_should_raise_an_assertion_error_if_ids_have_not_been_resampled_correctly(): - with pytest.raises(AssertionError) as e: - p = InputAndOutputTransformerWrapper(ChangeLenDataInputsAndExpectedOutputs()) - data_inputs, expected_outputs = _create_data_source((10, 10)) + with pytest.raises(AssertionError): + p = IdsInputAndOutputTransformerWrapper(ChangeLenDataInputsAndExpectedOutputsWithoutIds()) + ids, data_inputs, expected_outputs = _create_data_source((10, 10), with_ids=True) p.handle_transform(DACT( + ids=ids, data_inputs=data_inputs, expected_outputs=expected_outputs ), CX()) def test_data_doubler(): - p = InputAndOutputTransformerWrapper(DoubleData()) - data_inputs, expected_outputs = _create_data_source((10, 10)) + p = IdsInputAndOutputTransformerWrapper(DoubleData()) + ids, data_inputs, expected_outputs = _create_data_source((10, 10), with_ids=True) out = p.handle_transform(DACT( + ids=ids, data_inputs=data_inputs, expected_outputs=expected_outputs ), CX()) doubled_length = len(out.data_inputs) - assert doubled_length == 2*len(data_inputs) + assert doubled_length == 2 * len(data_inputs) assert doubled_length == len(out.expected_outputs) assert doubled_length == len(out.ids) -def _create_data_source(shape): +def _create_data_source(shape, with_ids=False): data_inputs = np.random.random(shape).astype(np.float32) expected_outputs = np.random.random(shape).astype(np.float32) - return data_inputs, expected_outputs + if with_ids: + ids = list(range(len(data_inputs))) + return ids, data_inputs, expected_outputs + else: + return data_inputs, expected_outputs
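To close on the API change exercised by these tests, here is a minimal hedged sketch of a step built on IdsAndInputAndOutputTransformerMixin (the SwapInputsAndOutputs class is a hypothetical example, not part of the patch): its transform now receives and must return an (ids, data_inputs, expected_outputs) 3-tuple whose three items keep the same length.

.. code-block:: python

    from neuraxle.base import BaseTransformer
    from neuraxle.steps.output_handlers import IdsAndInputAndOutputTransformerMixin


    class SwapInputsAndOutputs(IdsAndInputAndOutputTransformerMixin, BaseTransformer):
        """Toy step: swaps data inputs with expected outputs while keeping the ids aligned."""

        def __init__(self):
            BaseTransformer.__init__(self)
            IdsAndInputAndOutputTransformerMixin.__init__(self)

        def transform(self, data_inputs):
            ids, di, eo = data_inputs  # the data now arrives as a 3-tuple that includes the ids
            return ids, eo, di  # all three returned items must keep the same length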