From 498cff4637dd5376d373e0b8ff9b299589b6f737 Mon Sep 17 00:00:00 2001 From: Devindi97 Date: Fri, 27 Sep 2024 15:30:44 +0200 Subject: [PATCH] Template Classes for Anomaly Prediction --- ...OA_Anomaly_Predictors_class_diagram.drawio | 76 ++++++++++++++----- .../tasks/anomalypredictors/tsf/ad_based.py | 72 +++++++++++++++++- .../anomalypredictors/tsf/mini_batch_based.py | 53 +++++++++++-- 3 files changed, 173 insertions(+), 28 deletions(-) diff --git a/doc/rtd/content/99_appendices/appendix2/sub/pool/mlpro_oa/streams/tasks/anomaly_predictors/images/MLPro-OA_Anomaly_Predictors_class_diagram.drawio b/doc/rtd/content/99_appendices/appendix2/sub/pool/mlpro_oa/streams/tasks/anomaly_predictors/images/MLPro-OA_Anomaly_Predictors_class_diagram.drawio index 3be8df8d5..d50940be7 100644 --- a/doc/rtd/content/99_appendices/appendix2/sub/pool/mlpro_oa/streams/tasks/anomaly_predictors/images/MLPro-OA_Anomaly_Predictors_class_diagram.drawio +++ b/doc/rtd/content/99_appendices/appendix2/sub/pool/mlpro_oa/streams/tasks/anomaly_predictors/images/MLPro-OA_Anomaly_Predictors_class_diagram.drawio @@ -1,6 +1,6 @@ - + - + @@ -54,25 +54,25 @@ - - + + - + - + - + - - + + - + @@ -140,33 +140,45 @@ - - + + - + + + + + + + + + + - + - - + + + + + - + - + - + @@ -229,6 +241,32 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/mlpro/oa/streams/tasks/anomalypredictors/tsf/ad_based.py b/src/mlpro/oa/streams/tasks/anomalypredictors/tsf/ad_based.py index 51d898498..5147956c4 100644 --- a/src/mlpro/oa/streams/tasks/anomalypredictors/tsf/ad_based.py +++ b/src/mlpro/oa/streams/tasks/anomalypredictors/tsf/ad_based.py @@ -7,25 +7,89 @@ ## -- yyyy-mm-dd Ver. Auth. Description ## -- 2024-06-04 0.0.0 DA/DS Creation ## -- 2024-08-23 0.1.0 DA/DS Creation +## -- 2024-09-27 0.2.0 DS Creation ## ------------------------------------------------------------------------------------------------- """ -Ver. 0.1.0 (2024-08-23) +Ver. 0.2.0 (2024-09-27) This module provides basic templates for online anomaly prediction in MLPro. """ +from mlpro.bf.ml import Log +from mlpro.bf.streams import Log, StreamTask +from mlpro.bf.various import Log from mlpro.oa.streams.tasks.anomalypredictors.tsf.basics import AnomalyPredictorTSF +from mlpro.oa.streams.tasks.anomalydetectors.basics import Anomaly ## ------------------------------------------------------------------------------------------------- ## ------------------------------------------------------------------------------------------------- -class AnomalyPredictorAD (AnomalyPredictorTSF): +class AnomalyPredictorAD (AnomalyPredictorTSF, Anomaly): """ - ... + Parameters + ----------- + p_name : str + Optional name of the task. Default is None. + p_range_max : int + Maximum range of asynchonicity. See class Range. Default is Range.C_RANGE_PROCESS. + p_ada : bool + Boolean switch for adaptivitiy. Default = True. + p_buffer_size : int, optional + + p_duplicate_data : bool, optional + If True, instances will be duplicated before processing. Default = False. + p_visualize : bool, optional + Boolean switch for visualisation. Default = False. + p_logging : int + Log level (see constants of class Log). Default: Log.C_LOG_ALL + p_kwargs : dict + Further optional named parameters. + """ + C_TYPE = 'Anomaly Predictor AD' - pass + +## ------------------------------------------------------------------------------------------------- + + def __init__(self, + p_name: str = None, + p_range_max=StreamTask.C_RANGE_THREAD, + p_ada: bool = True, + p_buffer_size: int = 0, + p_duplicate_data: bool = False, + p_visualize: bool = False, + p_logging=Log.C_LOG_ALL, + **p_kwargs): + + super().__init__(p_name, + p_range_max, + p_ada, + p_buffer_size, + p_duplicate_data, + p_visualize, + p_logging, + **p_kwargs) + + self.capture_anomalies = {} + + +## ------------------------------------------------------------------------------------------------- + + def get_anomaly(self, ad_anomaly): + """ + Process incoming anomaly data from the anomaly detector. + + parameters + ---------- + ad_anomaly + Anomaly data coming from the anomaly detector. + + """ + + self.ad_anomaly = ad_anomaly + self.captured_anomalies.append(ad_anomaly) + diff --git a/src/mlpro/oa/streams/tasks/anomalypredictors/tsf/mini_batch_based.py b/src/mlpro/oa/streams/tasks/anomalypredictors/tsf/mini_batch_based.py index 5bc8c7141..464a2934f 100644 --- a/src/mlpro/oa/streams/tasks/anomalypredictors/tsf/mini_batch_based.py +++ b/src/mlpro/oa/streams/tasks/anomalypredictors/tsf/mini_batch_based.py @@ -7,30 +7,73 @@ ## -- yyyy-mm-dd Ver. Auth. Description ## -- 2024-06-04 0.0.0 DA/DS Creation ## -- 2024-08-23 0.1.0 DA/DS Creation +## -- 2024-09-27 0.2.0 DS Creation ## ------------------------------------------------------------------------------------------------- """ -Ver. 0.1.0 (2024-08-23) +Ver. 0.2.0 (2024-09-27) -This module provides basic templates for online anomaly prediction in MLPro. +This module provides template for managing mini-batches for time series forcasting tasks in MLPro. """ from mlpro.oa.streams.tasks.anomalypredictors.tsf.basics import OATimeSeriesForcaster - +from mlpro.oa.streams.tasks.anomalydetectors.basics import AnomalyDetector ## ------------------------------------------------------------------------------------------------- ## ------------------------------------------------------------------------------------------------- -class MiniBatchManager: +class MiniBatchManager(AnomalyDetector): """ + This module implements a template for managing mini-batches for time series forcasting tasks in MLPro. + + Parameters + ---------- + data + Time series data to be split into batches. + batch_size + Size of a mini-batch. + """ +## -------------------------------------------------------------------------------------------------- + def __init__(self, mb_data, mb_batch_size ): + + self.mb_data = mb_data # dictionary contains anomaly data/ detector output + self.mb_batch_size = mb_batch_size #good default for batch size is 32. + self.mb_batches = self.create_mini_batches() + self.mb_current_batch = 0 - pass + +## -------------------------------------------------------------------------------------------------- + def create_mini_batches(self): + """ + Method to be used to create mini_batches from the data. + Parameters + ---------- + """ + n_data_points = len(self.mb_data) + mini_batches = [self.mb_data[i:i + self.mb_batch_size] for i in range(0, n_data_points, self.mb_batch_size)] + return mini_batches + + +## ------------------------------------------------------------------------------------------------- + def get_batch(self): + """ + Method to be used to get the next mini batch for processing. + + Parameters + ---------- + """ + if self.mb_current_batch < len(self.mb_batches): + batch = self.mb_batches[self.mb_current_batch] + self.mb_current_batch += 1 + return batch + else: + raise StopIteration ## -------------------------------------------------------------------------------------------------