diff --git a/examples/digital_fingerprinting/production/morpheus/dfp/modules/__init__.py b/examples/digital_fingerprinting/production/morpheus/dfp/modules/__init__.py index fd5169d061..549cf4c680 100644 --- a/examples/digital_fingerprinting/production/morpheus/dfp/modules/__init__.py +++ b/examples/digital_fingerprinting/production/morpheus/dfp/modules/__init__.py @@ -17,17 +17,17 @@ # When segment modules are imported, they're added to the module registry. # To avoid flake8 warnings about unused code, the noqa flag is used during import. -from dfp.modules import dfp_monitor -from dfp.modules import dfp_split_users from dfp.modules import dfp_data_prep +from dfp.modules import dfp_deployment from dfp.modules import dfp_inference +from dfp.modules import dfp_inference_pipe +from dfp.modules import dfp_monitor from dfp.modules import dfp_postprocessing from dfp.modules import dfp_preproc from dfp.modules import dfp_rolling_window +from dfp.modules import dfp_split_users from dfp.modules import dfp_training -from dfp.modules import dfp_inference_pipe from dfp.modules import dfp_training_pipe -from dfp.modules import dfp_deployment __all__ = [ "dfp_monitor", diff --git a/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_inference.py b/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_inference.py index 48f8e41382..8fa9ce97de 100644 --- a/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_inference.py +++ b/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_inference.py @@ -64,15 +64,23 @@ def dfp_inference(builder: mrc.Builder): model_name_formatter = config.get("model_name_formatter", None) fallback_user = config.get("fallback_username", "generic_user") - + model_fetch_timeout = config.get("model_fetch_timeout", 1.0) timestamp_column_name = config.get("timestamp_column_name", "timestamp") client = MlflowClient() - model_manager = ModelManager(model_name_formatter=model_name_formatter) + + model_manager = None def get_model(user: str) -> ModelCache: + nonlocal model_manager + + if not model_manager: + model_manager = ModelManager(model_name_formatter=model_name_formatter) - return model_manager.load_user_model(client, user_id=user, fallback_user_ids=[fallback_user]) + return model_manager.load_user_model(client, + user_id=user, + fallback_user_ids=[fallback_user], + timeout=model_fetch_timeout) def process_task(control_message: ControlMessage): start_time = time.time() diff --git a/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_monitor.py b/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_monitor.py index 5f70a92695..7706af78c3 100644 --- a/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_monitor.py +++ b/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_monitor.py @@ -21,9 +21,9 @@ from mrc.core import operators as ops from tqdm import tqdm +from morpheus.controllers.monitor_controller import MonitorController from morpheus.utils.module_ids import MORPHEUS_MODULE_NAMESPACE from morpheus.utils.module_utils import register_module -from morpheus.utils.monitor_utils import MonitorController from morpheus.utils.monitor_utils import MorpheusTqdm from morpheus.utils.monitor_utils import SilentMorpheusTqdm diff --git a/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_training.py b/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_training.py index ec8ff30db5..aec5f9a2dc 100644 --- a/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_training.py +++ b/examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_training.py @@ -16,6 +16,7 @@ import mrc from mrc.core import operators as ops +from sklearn.model_selection import train_test_split import cudf @@ -87,8 +88,16 @@ def on_data(control_message: ControlMessage): # Only train on the feature columns train_df = final_df[final_df.columns.intersection(feature_columns)] + validation_df = None + run_validation = False + + # Split into training and validation sets + if validation_size > 0.0: + train_df, validation_df = train_test_split(train_df, test_size=validation_size, shuffle=False) + run_validation = True + logger.debug("Training AE model for user: '%s'...", user_id) - model.fit(train_df, epochs=epochs) + model.fit(train_df, epochs=epochs, val_data=validation_df, run_validation=run_validation) logger.debug("Training AE model for user: '%s'... Complete.", user_id) dfp_mm = DFPMessageMeta(cudf.from_pandas(final_df), user_id=user_id) diff --git a/examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_file_batcher_stage.py b/examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_file_batcher_stage.py index 271acc4833..7a9eee94af 100644 --- a/examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_file_batcher_stage.py +++ b/examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_file_batcher_stage.py @@ -46,7 +46,7 @@ class DFPFileBatcherStage(SinglePortStage): Parameters ---------- - c : `morpheus.config.Config` + config : `morpheus.config.Config` Pipeline configuration instance. date_conversion_func : callable A function that takes a file object and returns a `datetime` object representing the date of the file. @@ -69,14 +69,14 @@ class DFPFileBatcherStage(SinglePortStage): """ def __init__(self, - c: Config, + config: Config, date_conversion_func: typing.Callable[[fsspec.core.OpenFile], datetime], period: str = "D", sampling_rate_s: typing.Optional[int] = None, start_time: datetime = None, end_time: datetime = None, sampling: typing.Union[str, float, int, None] = None): - super().__init__(c) + super().__init__(config) self._date_conversion_func = date_conversion_func self._period = period diff --git a/examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_file_to_df.py b/examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_file_to_df.py index ec0ac35a09..a8c37ae9b6 100644 --- a/examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_file_to_df.py +++ b/examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_file_to_df.py @@ -13,62 +13,24 @@ # limitations under the License. """Stage for converting fsspec file objects to a DataFrame.""" -import hashlib -import json import logging -import os -import time import typing -from functools import partial -import fsspec import mrc import pandas as pd from mrc.core import operators as ops from morpheus.common import FileTypes from morpheus.config import Config -from morpheus.io.deserializers import read_file_to_df +from morpheus.controllers.file_to_df_controller import FileToDFController from morpheus.pipeline.preallocator_mixin import PreallocatorMixin from morpheus.pipeline.single_port_stage import SinglePortStage from morpheus.pipeline.stream_pair import StreamPair from morpheus.utils.column_info import DataFrameInputSchema -from morpheus.utils.column_info import process_dataframe -from morpheus.utils.downloader import Downloader logger = logging.getLogger(f"morpheus.{__name__}") -def _single_object_to_dataframe(file_object: fsspec.core.OpenFile, - schema: DataFrameInputSchema, - file_type: FileTypes, - filter_null: bool, - parser_kwargs: dict) -> pd.DataFrame: - retries = 0 - df = None - while (retries < 2): - try: - with file_object as f: - df = read_file_to_df(f, - file_type, - filter_nulls=filter_null, - df_type="pandas", - parser_kwargs=parser_kwargs) - - break - except Exception as e: - if (retries < 2): - logger.warning("Error fetching %s: %s\nRetrying...", file_object, e) - retries += 1 - - # Optimistaclly prep the dataframe (Not necessary since this will happen again in process_dataframe, but it - # increases performance significantly) - if (schema.prep_dataframe is not None): - df = schema.prep_dataframe(df) - - return df - - class DFPFileToDataFrameStage(PreallocatorMixin, SinglePortStage): """ Stage for converting fsspec file objects to a DataFrame, pre-processing the DataFrame according to `schema`, and @@ -102,14 +64,12 @@ def __init__(self, cache_dir: str = "./.cache/dfp"): super().__init__(config) - self._schema = schema - - self._file_type = file_type - self._filter_null = filter_null - self._parser_kwargs = {} if parser_kwargs is None else parser_kwargs - self._cache_dir = os.path.join(cache_dir, "file_cache") - - self._downloader = Downloader() + self._controller = FileToDFController(schema=schema, + filter_null=filter_null, + file_type=file_type, + parser_kwargs=parser_kwargs, + cache_dir=cache_dir, + timestamp_column_name=config.ae.timestamp_column_name) @property def name(self) -> str: @@ -124,103 +84,10 @@ def accepted_types(self) -> typing.Tuple: """Accepted input types.""" return (typing.Any, ) - def _get_or_create_dataframe_from_batch( - self, file_object_batch: typing.Tuple[fsspec.core.OpenFiles, int]) -> typing.Tuple[pd.DataFrame, bool]: - - if (not file_object_batch): - raise RuntimeError("No file objects to process") - - file_list = file_object_batch[0] - batch_count = file_object_batch[1] - - file_system: fsspec.AbstractFileSystem = file_list.fs - - # Create a list of dictionaries that only contains the information we are interested in hashing. `ukey` just - # hashes all the output of `info()` which is perfect - hash_data = [{"ukey": file_system.ukey(file_object.path)} for file_object in file_list] - - # Convert to base 64 encoding to remove - values - objects_hash_hex = hashlib.md5(json.dumps(hash_data, sort_keys=True).encode()).hexdigest() - - batch_cache_location = os.path.join(self._cache_dir, "batches", f"{objects_hash_hex}.pkl") - - # Return the cache if it exists - if (os.path.exists(batch_cache_location)): - output_df = pd.read_pickle(batch_cache_location) - output_df["batch_count"] = batch_count - output_df["origin_hash"] = objects_hash_hex - - return (output_df, True) - - # Cache miss - download_method = partial(_single_object_to_dataframe, - schema=self._schema, - file_type=self._file_type, - filter_null=self._filter_null, - parser_kwargs=self._parser_kwargs) - - download_buckets = file_list - - # Loop over dataframes and concat into one - try: - dfs = self._downloader.download(download_buckets, download_method) - except Exception: - logger.exception("Failed to download logs. Error: ", exc_info=True) - raise - - if (dfs is None or len(dfs) == 0): - raise ValueError("No logs were downloaded") - - output_df: pd.DataFrame = pd.concat(dfs) - output_df = process_dataframe(df_in=output_df, input_schema=self._schema) - - # Finally sort by timestamp and then reset the index - output_df.sort_values(by=[self._config.ae.timestamp_column_name], inplace=True) - - output_df.reset_index(drop=True, inplace=True) - - # Save dataframe to cache future runs - os.makedirs(os.path.dirname(batch_cache_location), exist_ok=True) - - try: - output_df.to_pickle(batch_cache_location) - except Exception: - logger.warning("Failed to save batch cache. Skipping cache for this batch.", exc_info=True) - - output_df["batch_count"] = batch_count - output_df["origin_hash"] = objects_hash_hex - - return (output_df, False) - - def convert_to_dataframe(self, fsspec_batch: typing.Tuple[fsspec.core.OpenFiles, int]): - """Converts a batch of fsspec objects to a DataFrame.""" - if (not fsspec_batch): - return None - - start_time = time.time() - - try: - - output_df, cache_hit = self._get_or_create_dataframe_from_batch(fsspec_batch) - - duration = (time.time() - start_time) * 1000.0 - - if (output_df is not None and logger.isEnabledFor(logging.DEBUG)): - logger.debug("fsspec objects to DF complete. Rows: %s, Cache: %s, Duration: %s ms, Rate: %s rows/s", - len(output_df), - "hit" if cache_hit else "miss", - duration, - len(output_df) / (duration / 1000.0)) - - return output_df - except Exception: - logger.exception("Error while converting fsspec batch to DF.") - raise - def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair: stream = builder.make_node(self.unique_name, - ops.map(self.convert_to_dataframe), - ops.on_completed(self._downloader.close)) + ops.map(self._controller.convert_to_dataframe), + ops.on_completed(self._controller.close)) builder.make_edge(input_stream[0], stream) return stream, pd.DataFrame diff --git a/examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_mlflow_model_writer.py b/examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_mlflow_model_writer.py index 240a329065..3daba9b6c2 100644 --- a/examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_mlflow_model_writer.py +++ b/examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_mlflow_model_writer.py @@ -13,35 +13,18 @@ # limitations under the License. """Publishes models into MLflow""" -import hashlib import logging -import os import typing -import urllib.parse -import mlflow import mrc -import requests -from mlflow.exceptions import MlflowException -from mlflow.models.signature import ModelSignature -from mlflow.protos.databricks_pb2 import RESOURCE_ALREADY_EXISTS -from mlflow.protos.databricks_pb2 import ErrorCode -from mlflow.store.artifact.runs_artifact_repo import RunsArtifactRepository -from mlflow.tracking import MlflowClient -from mlflow.types import ColSpec -from mlflow.types import Schema -from mlflow.types.utils import _infer_pandas_column -from mlflow.types.utils import _infer_schema from mrc.core import operators as ops from morpheus.config import Config +from morpheus.controllers.mlflow_model_writer_controller import MLFlowModelWriterController from morpheus.messages.multi_ae_message import MultiAEMessage -from morpheus.models.dfencoder import AutoEncoder from morpheus.pipeline.single_port_stage import SinglePortStage from morpheus.pipeline.stream_pair import StreamPair -from ..utils.model_cache import user_to_model_name - # Setup conda environment conda_env = { 'channels': ['defaults', 'conda-forge'], @@ -70,18 +53,24 @@ class DFPMLFlowModelWriterStage(SinglePortStage): the field names have been applied. databricks_permissions : dict, optional When not `None` sets permissions needed when using a databricks hosted MLflow server. + timeout : float, optional + Timeout for get requests. """ def __init__(self, c: Config, model_name_formatter: str = "dfp-{user_id}", experiment_name_formatter: str = "/dfp-models/{reg_model_name}", - databricks_permissions: dict = None): + databricks_permissions: dict = None, + timeout=1.0): super().__init__(c) - self._model_name_formatter = model_name_formatter - self._experiment_name_formatter = experiment_name_formatter - self._databricks_permissions = databricks_permissions + self._controller = MLFlowModelWriterController(model_name_formatter=model_name_formatter, + experiment_name_formatter=experiment_name_formatter, + databricks_permissions=databricks_permissions, + conda_env=conda_env, + timeout=timeout, + timestamp_column_name=c.ae.timestamp_column_name) @property def name(self) -> str: @@ -96,178 +85,8 @@ def accepted_types(self) -> typing.Tuple: """Types accepted by this stage""" return (MultiAEMessage, ) - def user_id_to_model(self, user_id: str) -> str: - """Converts a user ID to a model name""" - return user_to_model_name(user_id=user_id, model_name_formatter=self._model_name_formatter) - - def user_id_to_experiment(self, user_id: str) -> str: - """Converts a user ID to an experiment name""" - kwargs = { - "user_id": user_id, - "user_md5": hashlib.md5(user_id.encode('utf-8')).hexdigest(), - "reg_model_name": self.user_id_to_model(user_id=user_id) - } - - return self._experiment_name_formatter.format(**kwargs) - - def _apply_model_permissions(self, reg_model_name: str): - - # Check the required variables - databricks_host = os.environ.get("DATABRICKS_HOST", None) - databricks_token = os.environ.get("DATABRICKS_TOKEN", None) - - if (databricks_host is None or databricks_token is None): - raise RuntimeError("Cannot set Databricks model permissions. " - "Environment variables `DATABRICKS_HOST` and `DATABRICKS_TOKEN` must be set") - - headers = {"Authorization": f"Bearer {databricks_token}"} - - url_base = f"{databricks_host}" - - try: - # First get the registered model ID - get_registered_model_url = urllib.parse.urljoin(url_base, - "/api/2.0/mlflow/databricks/registered-models/get") - - get_registered_model_response = requests.get(url=get_registered_model_url, - headers=headers, - params={"name": reg_model_name}, - timeout=10) - - registered_model_response = get_registered_model_response.json() - - reg_model_id = registered_model_response["registered_model_databricks"]["id"] - - # Now apply the permissions. If it exists already, it will be overwritten or it is a no-op - patch_registered_model_permissions_url = urllib.parse.urljoin( - url_base, f"/api/2.0/preview/permissions/registered-models/{reg_model_id}") - - patch_registered_model_permissions_body = { - "access_control_list": [{ - "group_name": group, "permission_level": permission - } for group, - permission in self._databricks_permissions.items()] - } - - requests.patch(url=patch_registered_model_permissions_url, - headers=headers, - json=patch_registered_model_permissions_body, - timeout=10) - - except Exception: - logger.exception("Error occurred trying to apply model permissions to model: %s", - reg_model_name, - exc_info=True) - - def on_data(self, message: MultiAEMessage): - """Stores incoming models into MLflow.""" - user = message.meta.user_id - - model: AutoEncoder = message.model - - model_path = "dfencoder" - reg_model_name = self.user_id_to_model(user_id=user) - - # Write to ML Flow - try: - mlflow.end_run() - - experiment_name = self.user_id_to_experiment(user_id=user) - - # Creates a new experiment if it doesn't exist - experiment = mlflow.set_experiment(experiment_name) - - with mlflow.start_run(run_name="autoencoder model training run", - experiment_id=experiment.experiment_id) as run: - - model_path = f"{model_path}-{run.info.run_uuid}" - - # Log all params in one dict to avoid round trips - mlflow.log_params({ - "Algorithm": "Denosing Autoencoder", - "Epochs": model.lr_decay.state_dict().get("last_epoch", "unknown"), - "Learning rate": model.lr, - "Batch size": model.batch_size, - "Start Epoch": message.get_meta(self._config.ae.timestamp_column_name).min(), - "End Epoch": message.get_meta(self._config.ae.timestamp_column_name).max(), - "Log Count": message.mess_count, - }) - - metrics_dict: typing.Dict[str, float] = {} - - # Add info on the embeddings - for key, value in model.categorical_fts.items(): - embedding = value.get("embedding", None) - - if (embedding is None): - continue - - metrics_dict[f"embedding-{key}-num_embeddings"] = embedding.num_embeddings - metrics_dict[f"embedding-{key}-embedding_dim"] = embedding.embedding_dim - - mlflow.log_metrics(metrics_dict) - - # Use the prepare_df function to setup the direct inputs to the model. Only include features returned by - # prepare_df to show the actual inputs to the model (any extra are discarded) - input_df = message.get_meta().iloc[0:1] - prepared_df = model.prepare_df(input_df) - output_values = model.get_anomaly_score(input_df) - - input_schema = Schema([ - ColSpec(type=_infer_pandas_column(input_df[col_name]), name=col_name) - for col_name in list(prepared_df.columns) - ]) - output_schema = _infer_schema(output_values) - - model_sig = ModelSignature(inputs=input_schema, outputs=output_schema) - - model_info = mlflow.pytorch.log_model( - pytorch_model=model, - artifact_path=model_path, - conda_env=conda_env, - signature=model_sig, - ) - - client = MlflowClient() - - # First ensure a registered model has been created - try: - create_model_response = client.create_registered_model(reg_model_name) - logger.debug("Successfully registered model '%s'.", create_model_response.name) - except MlflowException as e: - if e.error_code == ErrorCode.Name(RESOURCE_ALREADY_EXISTS): - pass - else: - raise e - - # If we are using databricks, make sure we set the correct permissions - if (self._databricks_permissions is not None and mlflow.get_tracking_uri() == "databricks"): - # Need to apply permissions - self._apply_model_permissions(reg_model_name=reg_model_name) - - model_src = RunsArtifactRepository.get_underlying_uri(model_info.model_uri) - - tags = { - "start": message.get_meta(self._config.ae.timestamp_column_name).min(), - "end": message.get_meta(self._config.ae.timestamp_column_name).max(), - "count": message.get_meta(self._config.ae.timestamp_column_name).count() - } - - # Now create the model version - model_version = client.create_model_version(name=reg_model_name, - source=model_src, - run_id=run.info.run_id, - tags=tags) - - logger.debug("ML Flow model upload complete: %s:%s:%s", user, reg_model_name, model_version.version) - - except Exception: - logger.exception("Error uploading model to ML Flow", exc_info=True) - - return message - def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair: - stream = builder.make_node(self.unique_name, ops.map(self.on_data)) + stream = builder.make_node(self.unique_name, ops.map(self._controller.on_data)) builder.make_edge(input_stream[0], stream) return stream, MultiAEMessage diff --git a/examples/digital_fingerprinting/production/morpheus/dfp/utils/dfp_arg_parser.py b/examples/digital_fingerprinting/production/morpheus/dfp/utils/dfp_arg_parser.py index 6bf71a0a3d..4b807443ad 100644 --- a/examples/digital_fingerprinting/production/morpheus/dfp/utils/dfp_arg_parser.py +++ b/examples/digital_fingerprinting/production/morpheus/dfp/utils/dfp_arg_parser.py @@ -24,7 +24,7 @@ from morpheus.utils.logger import configure_logging -logger = logging.getLogger(__name__) +logger = logging.getLogger(f"morpheus.{__name__}") @dataclass @@ -95,6 +95,7 @@ def time_fields(self): def silence_monitors(self): return self._silence_monitors + @property @verify_init def include_generic(self): return self._include_generic diff --git a/examples/digital_fingerprinting/production/morpheus/dfp/utils/model_cache.py b/examples/digital_fingerprinting/production/morpheus/dfp/utils/model_cache.py index 2a0da79752..ffc5304e5b 100644 --- a/examples/digital_fingerprinting/production/morpheus/dfp/utils/model_cache.py +++ b/examples/digital_fingerprinting/production/morpheus/dfp/utils/model_cache.py @@ -131,13 +131,13 @@ def __init__(self, manager: "ModelManager", user_id: str, fallback_user_ids: typ self._lock = threading.RLock() self._child_user_model_cache: UserModelMap = None - def load_model_cache(self, client) -> ModelCache: + def load_model_cache(self, client, timeout: float = 1.0) -> ModelCache: now = datetime.now() # Lock to prevent additional access try: - with timed_acquire(self._lock, timeout=1.0): + with timed_acquire(self._lock, timeout=timeout): # Check if we have checked before or if we need to check again if (self._last_checked is None or (now - self._last_checked).seconds < self._manager.cache_timeout_sec): @@ -146,22 +146,26 @@ def load_model_cache(self, client) -> ModelCache: self._last_checked = now # Try to load from the manager - model_cache = self._manager.load_model_cache(client=client, reg_model_name=self._reg_model_name) + model_cache = self._manager.load_model_cache(client=client, + reg_model_name=self._reg_model_name, + timeout=timeout) # If we have a hit, there is nothing else to do if (model_cache is None and len(self._fallback_user_ids) > 0): # Our model does not exist, use fallback self._child_user_model_cache = self._manager.load_user_model_cache( - self._fallback_user_ids[0], fallback_user_ids=self._fallback_user_ids[1:]) + self._fallback_user_ids[0], timeout, fallback_user_ids=self._fallback_user_ids[1:]) else: return model_cache # See if we have a child cache and use that if (self._child_user_model_cache is not None): - return self._child_user_model_cache.load_model_cache(client=client) + return self._child_user_model_cache.load_model_cache(client=client, timeout=timeout) # Otherwise load the model - model_cache = self._manager.load_model_cache(client=client, reg_model_name=self._reg_model_name) + model_cache = self._manager.load_model_cache(client=client, + reg_model_name=self._reg_model_name, + timeout=timeout) if (model_cache is None): raise RuntimeError(f"Model was found but now no longer exists. Model: {self._reg_model_name}") @@ -197,7 +201,7 @@ def __init__(self, model_name_formatter: str) -> None: def cache_timeout_sec(self): return self._cache_timeout_sec - def _model_exists(self, reg_model_name: str) -> bool: + def _model_exists(self, reg_model_name: str, timeout: float = 1.0) -> bool: now = datetime.now() @@ -205,7 +209,7 @@ def _model_exists(self, reg_model_name: str) -> bool: if ((now - self._existing_models_updated).seconds > self._cache_timeout_sec): try: - with timed_acquire(self._model_cache_lock, timeout=1.0): + with timed_acquire(self._model_cache_lock, timeout=timeout): logger.debug("Updating list of available models...") client = MlflowClient() @@ -241,22 +245,28 @@ def _model_exists(self, reg_model_name: str) -> bool: def user_id_to_model(self, user_id: str): return user_to_model_name(user_id=user_id, model_name_formatter=self._model_name_formatter) - def load_user_model(self, client, user_id: str, fallback_user_ids: typing.List[str] = None) -> ModelCache: + def load_user_model(self, + client, + user_id: str, + fallback_user_ids: typing.List[str], + timeout: float = 1.0) -> ModelCache: - if (fallback_user_ids is None): + if fallback_user_ids is None: fallback_user_ids = [] # First get the UserModel - user_model_cache = self.load_user_model_cache(user_id=user_id, fallback_user_ids=fallback_user_ids) + user_model_cache = self.load_user_model_cache(user_id=user_id, + timeout=timeout, + fallback_user_ids=fallback_user_ids) - return user_model_cache.load_model_cache(client=client) + return user_model_cache.load_model_cache(client=client, timeout=timeout) - def load_model_cache(self, client: MlflowClient, reg_model_name: str) -> ModelCache: + def load_model_cache(self, client: MlflowClient, reg_model_name: str, timeout: float = 1.0) -> ModelCache: now = datetime.now() try: - with timed_acquire(self._model_cache_lock, timeout=1.0): + with timed_acquire(self._model_cache_lock, timeout=timeout): model_cache = self._model_cache.get(reg_model_name, None) @@ -267,7 +277,7 @@ def load_model_cache(self, client: MlflowClient, reg_model_name: str) -> ModelCa # Cache miss. Try to check for a model try: - if (not self._model_exists(reg_model_name)): + if (not self._model_exists(reg_model_name, timeout)): # Break early return None @@ -323,12 +333,10 @@ def load_model_cache(self, client: MlflowClient, reg_model_name: str) -> ModelCa logger.error("Deadlock when trying to acquire model cache lock") raise RuntimeError("Deadlock when trying to acquire model cache lock") from e - def load_user_model_cache(self, user_id: str, fallback_user_ids: typing.List[str] = None) -> UserModelMap: - if (fallback_user_ids is None): - fallback_user_ids = [] + def load_user_model_cache(self, user_id: str, timeout: float, fallback_user_ids: typing.List[str]) -> UserModelMap: try: - with timed_acquire(self._user_model_cache_lock, timeout=1.0): + with timed_acquire(self._user_model_cache_lock, timeout=timeout): if (user_id not in self._user_model_cache): self._user_model_cache[user_id] = UserModelMap(manager=self, diff --git a/examples/ransomware_detection/common/feature_extractor.py b/examples/ransomware_detection/common/feature_extractor.py index d8b579d128..46df5c9181 100644 --- a/examples/ransomware_detection/common/feature_extractor.py +++ b/examples/ransomware_detection/common/feature_extractor.py @@ -15,6 +15,7 @@ import typing import pandas as pd + from common.data_models import FeatureConfig from common.data_models import ProtectionData from common.feature_constants import FeatureConstants as fc @@ -110,59 +111,59 @@ def _extract_threadlist(self, x: pd.DataFrame): wait_reason_df = x[x.WaitReason == wait_reason] self._features['threadlist_df_wait_reason_' + wait_reason] = len(wait_reason_df) - def _extract_vad_cc(self, cc: pd.Series): + def _extract_vad_cc(self, commit_charge: pd.Series): """ This function extracts 'vad' specific commit charge features. """ - cc_size = len(cc) + cc_size = len(commit_charge) # Calculate mean, max, sum of commit charged of vad if cc_size: - self._features['get_commit_charge_mean_vad'] = cc.mean() - self._features['get_commit_charge_max_vad'] = cc.max() - self._features['get_commit_charge_sum_vad'] = cc.sum() + self._features['get_commit_charge_mean_vad'] = commit_charge.mean() + self._features['get_commit_charge_max_vad'] = commit_charge.max() + self._features['get_commit_charge_sum_vad'] = commit_charge.sum() - def _extract_cc(self, cc: pd.Series): + def _extract_cc(self, commit_charge: pd.Series): """ This function extracts commit charge features. """ - cc_size = len(cc) + cc_size = len(commit_charge) # Calculate mean, max, sum, len of the commit charged if cc_size: - self._features['get_commit_charge_mean'] = cc.mean() - self._features['get_commit_charge_max'] = cc.max() - self._features['get_commit_charge_sum'] = cc.sum() + self._features['get_commit_charge_mean'] = commit_charge.mean() + self._features['get_commit_charge_max'] = commit_charge.max() + self._features['get_commit_charge_sum'] = commit_charge.sum() self._features['get_commit_charge_len'] = cc_size - def _extract_vads_cc(self, cc: pd.Series, vads_cc: pd.Series): + def _extract_vads_cc(self, commit_charge: pd.Series, vads_cc: pd.Series): """ This function extracts 'vads' commit charge features. """ - cc_size = len(cc) + cc_size = len(commit_charge) # Calculate min of commit charged of vads if cc_size: - self._features['get_commit_charge_min_vads'] = cc.min() + self._features['get_commit_charge_min_vads'] = commit_charge.min() # Calculate the amount of entire memory commit charged of vads - cc = vads_cc[vads_cc == fc.FULL_MEMORY_ADDRESS] - self._features['count_entire_commit_charge_vads'] = len(cc) + commit_charge = vads_cc[vads_cc == fc.FULL_MEMORY_ADDRESS] + self._features['count_entire_commit_charge_vads'] = len(commit_charge) - def _extract_cc_vad_page_noaccess(self, cc: pd.Series): + def _extract_cc_vad_page_noaccess(self, commit_charge: pd.Series): """ This function extracts 'vad' commit charge features specific to 'page_noaccess' protection. """ - cc = cc[cc < fc.FULL_MEMORY_ADDRESS] + commit_charge = commit_charge[commit_charge < fc.FULL_MEMORY_ADDRESS] # Calculate min and mean of commit charged of vad memory with PAGE_NOACCESS protection - if not cc.empty: - self._features['get_commit_charge_min_vad_page_noaccess'] = cc.min() - self._features['get_commit_charge_mean_vad_page_noaccess'] = cc.mean() + if not commit_charge.empty: + self._features['get_commit_charge_min_vad_page_noaccess'] = commit_charge.min() + self._features['get_commit_charge_mean_vad_page_noaccess'] = commit_charge.mean() def _extract_unique_file_extns(self, x: pd.DataFrame): """ @@ -210,20 +211,20 @@ def _extract_vadinfo(self, x: pd.DataFrame): self._features['ratio_private_memory'] = (vad_private_memory_len / vad_size) self._features['vad_ratio'] = (vadinfo_size / vad_size) - cc = x[x.CommitCharge < fc.FULL_MEMORY_ADDRESS].CommitCharge - self._extract_cc(cc) + commit_charge = x[x.CommitCharge < fc.FULL_MEMORY_ADDRESS].CommitCharge + self._extract_cc(commit_charge) # calculating the amount of commit charged of vad - cc = vad_cc[vad_cc < fc.FULL_MEMORY_ADDRESS] - self._extract_vad_cc(cc) + commit_charge = vad_cc[vad_cc < fc.FULL_MEMORY_ADDRESS] + self._extract_vad_cc(commit_charge) # Calculate the amount of commit charged of vads - cc = vads_cc[vads_cc < fc.FULL_MEMORY_ADDRESS] - self._extract_vads_cc(cc, vads_cc) + commit_charge = vads_cc[vads_cc < fc.FULL_MEMORY_ADDRESS] + self._extract_vads_cc(commit_charge, vads_cc) # calculating commit charged of memory with PAGE_NOACCESS protection - cc = x[(x.Protection == fc.PAGE_NOACCESS) & (x.Tag == fc.VAD)].CommitCharge - self._extract_cc_vad_page_noaccess(cc) + commit_charge = x[(x.Protection == fc.PAGE_NOACCESS) & (x.Tag == fc.VAD)].CommitCharge + self._extract_cc_vad_page_noaccess(commit_charge) self._extract_protections(x, vad_size, vadsinfo_size, vadinfo_size) @@ -240,15 +241,15 @@ def _get_protection_data(self, """ protection_df = x[x.Protection == protection] - cc = protection_df.CommitCharge - cc = cc[cc < fc.FULL_MEMORY_ADDRESS] + commit_charge = protection_df.CommitCharge + commit_charge = commit_charge[commit_charge < fc.FULL_MEMORY_ADDRESS] vads_protection_size = len(protection_df[protection_df.Tag == fc.VADS]) vad_protection_size = len(protection_df[protection_df.Tag == fc.VAD]) - commit_charge_size = len(cc) + commit_charge_size = len(commit_charge) protection_df_size = len(protection_df) protection_id = fc.PROTECTIONS[protection] - p_data = ProtectionData(cc, + p_data = ProtectionData(commit_charge, vads_protection_size, vad_protection_size, commit_charge_size, @@ -265,14 +266,14 @@ def _page_execute_readwrite(self, x: ProtectionData): This function extracts 'page_execute_readwrite' protection reelated features. """ - cc = x.commit_charges + commit_charge = x.commit_charges if x.commit_charge_size: - self._features['get_commit_charge_mean_page_execute_readwrite'] = cc.mean() - self._features['get_commit_charge_min_page_execute_readwrite'] = cc.min() - self._features['get_commit_charge_max_page_execute_readwrite'] = cc.max() - self._features['get_commit_charge_sum_page_execute_readwrite'] = cc.sum() - self._features['get_commit_charge_std_page_execute_readwrite'] = cc.std(ddof=0) + self._features['get_commit_charge_mean_page_execute_readwrite'] = commit_charge.mean() + self._features['get_commit_charge_min_page_execute_readwrite'] = commit_charge.min() + self._features['get_commit_charge_max_page_execute_readwrite'] = commit_charge.max() + self._features['get_commit_charge_sum_page_execute_readwrite'] = commit_charge.sum() + self._features['get_commit_charge_std_page_execute_readwrite'] = commit_charge.std(ddof=0) # Calculate amount and ratio of memory pages with 'PAGE_EXECUTE_READWRITE protection if x.protection_df_size: @@ -289,13 +290,13 @@ def _page_noaccess(self, x: ProtectionData): This function extracts 'page_noaccess' protection reelated features. """ - cc = x.commit_charges + commit_charge = x.commit_charges if x.commit_charge_size: - self._features['get_commit_charge_mean_page_no_access'] = cc.mean() - self._features['get_commit_charge_min_page_no_access'] = cc.min() - self._features['get_commit_charge_max_page_no_access'] = cc.max() - self._features['get_commit_charge_sum_page_no_access'] = cc.sum() + self._features['get_commit_charge_mean_page_no_access'] = commit_charge.mean() + self._features['get_commit_charge_min_page_no_access'] = commit_charge.min() + self._features['get_commit_charge_max_page_no_access'] = commit_charge.max() + self._features['get_commit_charge_sum_page_no_access'] = commit_charge.sum() # Calculate amount and ratio of memory pages with 'PAGE_NOACCESS' protection if x.protection_df_size: @@ -317,12 +318,12 @@ def _page_execute_writecopy(self, x: ProtectionData): This function extracts 'page_execute_writecopy' protection reelated features. """ - cc = x.commit_charges + commit_charge = x.commit_charges # Calculate min and sum of commit charged with memory pages with 'PAGE_EXECUTE_WRITECOPY' protection if x.commit_charge_size: - self._features['get_commit_charge_min_page_execute_writecopy'] = cc.min() - self._features['get_commit_charge_sum_page_execute_writecopy'] = cc.sum() + self._features['get_commit_charge_min_page_execute_writecopy'] = commit_charge.min() + self._features['get_commit_charge_sum_page_execute_writecopy'] = commit_charge.sum() # Calculate amount and ratio of vad memory pages with 'PAGE_EXECUTE_WRITECOPY' protection self._features['page_execute_writecopy_vad_count'] = x.vad_protection_size @@ -334,11 +335,11 @@ def _page_readonly(self, x: ProtectionData): This function extracts 'page_readonly' protection reelated features. """ - cc = x.commit_charges + commit_charge = x.commit_charges # Calculate mean of commit charged with memory pages with 'PAGE_READONLY' protection if x.commit_charge_size: - self._features['get_commit_charge_mean_page_readonly'] = cc.mean() + self._features['get_commit_charge_mean_page_readonly'] = commit_charge.mean() # Calculate amount and ratio of memory pages with 'PAGE_READONLY' protection if x.protection_df_size: @@ -380,7 +381,7 @@ def _extract_protections(self, x: pd.DataFrame, vadinfo_df_size: int, vadsinfo_s """ page_execute_writecopy_count = 0 - for protection in fc.PROTECTIONS.keys(): + for protection in fc.PROTECTIONS: p_data = self._get_protection_data(x, protection, vadinfo_df_size, vadsinfo_size, vadinfo_size) @@ -422,16 +423,16 @@ def _extract_handle_types(self, x: pd.DataFrame): """ # Get count and ratio for the handles by their type. - for t in (fc.HANDLES_TYPES + fc.HANDLES_TYPES_2): + for h_type in (fc.HANDLES_TYPES + fc.HANDLES_TYPES_2): - df = x[x.Type == t[0]] + df = x[x.Type == h_type[0]] df_len = len(df) - if t in fc.HANDLES_TYPES: - col = 'handles_df_' + t[1] + '_count' + if h_type in fc.HANDLES_TYPES: + col = 'handles_df_' + h_type[1] + '_count' self._features[col] = df_len - col = 'handles_df_' + t[1] + '_ratio' + col = 'handles_df_' + h_type[1] + '_ratio' self._features[col] = df_len / (self._features['handles_df_count'] + 1) def _extract_file_handle_dirs(self, file_paths: pd.Series): @@ -559,7 +560,7 @@ def extract_features(self, x: pd.DataFrame, feas_all_zeros: typing.Dict[str, int handles_df = fltr_plugin_dict['handles'] except KeyError as e: - raise KeyError('Missing required plugins: %s' % (e)) + raise KeyError(f'Missing required plugins: {e}') from e # Envars plugin features displays a process's environment variables. # Typically this will show the number of CPUs installed and the hardware architecture, diff --git a/morpheus/controllers/file_to_df_controller.py b/morpheus/controllers/file_to_df_controller.py new file mode 100644 index 0000000000..2839f4e3c2 --- /dev/null +++ b/morpheus/controllers/file_to_df_controller.py @@ -0,0 +1,237 @@ +# Copyright (c) 2022-2023, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Morpheus pipeline module for fetching files and emitting them as DataFrames.""" + +import hashlib +import json +import logging +import os +import time +import typing +from functools import partial + +import fsspec +import pandas as pd + +import cudf + +from morpheus.common import FileTypes +from morpheus.io.deserializers import read_file_to_df +from morpheus.utils.column_info import DataFrameInputSchema +from morpheus.utils.column_info import PreparedDFInfo +from morpheus.utils.column_info import process_dataframe +from morpheus.utils.downloader import Downloader + +logger = logging.getLogger(__name__) + + +def single_object_to_dataframe(file_object: fsspec.core.OpenFile, + schema: DataFrameInputSchema, + file_type: FileTypes, + filter_null: bool, + parser_kwargs: dict) -> pd.DataFrame: + """ + Converts a file object into a Pandas DataFrame with optional preprocessing. + + Parameters + ---------- + file_object : `fsspec.core.OpenFile` + A file object, typically from a remote storage system. + schema : `morpheus.utils.column_info.DataFrameInputSchema` + A schema defining how to process the data. + file_type : `morpheus.common.FileTypes` + The type of the file being processed (e.g., CSV, Parquet). + filter_null : bool + Flag to indicate whether to filter out null values. + parser_kwargs : dict + Additional keyword arguments to pass to the file parser. + + Returns + ------- + pd.DataFrame: The resulting Pandas DataFrame after processing and optional preprocessing. + """ + + retries = 0 + df = None + while (retries < 2): + try: + with file_object as f: + df = read_file_to_df(f, + file_type, + filter_nulls=filter_null, + df_type="pandas", + parser_kwargs=parser_kwargs) + + break + except Exception as e: + if (retries < 2): + logger.warning("Error fetching %s: %s\nRetrying...", file_object, e) + retries += 1 + + # Optimistaclly prep the dataframe (Not necessary since this will happen again in process_dataframe, but it + # increases performance significantly) + if (schema.prep_dataframe is not None): + prepared_df_info: PreparedDFInfo = schema.prep_dataframe(df) + + return prepared_df_info.df + + +class FileToDFController: + """ + Controller class for converting file objects to Pandas DataFrames with optional preprocessing. + + Parameters + ---------- + schema : DataFrameInputSchema + A schema defining how to process the data. + filter_null : bool + Flag to indicate whether to filter out null values. + file_type : FileTypes + The type of the file being processed (e.g., CSV, Parquet). + parser_kwargs : dict + Additional keyword arguments to pass to the file parser. + cache_dir : str + Directory where cache will be stored. + timestamp_column_name : str + Name of the timestamp column. + """ + + def __init__(self, + schema: DataFrameInputSchema, + filter_null: bool, + file_type: FileTypes, + parser_kwargs: dict, + cache_dir: str, + timestamp_column_name: str): + + self._schema = schema + self._file_type = file_type + self._filter_null = filter_null + self._parser_kwargs = {} if parser_kwargs is None else parser_kwargs + self._cache_dir = os.path.join(cache_dir, "file_cache") + self._timestamp_column_name = timestamp_column_name + + self._downloader = Downloader() + + def _get_or_create_dataframe_from_batch( + self, file_object_batch: typing.Tuple[fsspec.core.OpenFiles, int]) -> typing.Tuple[cudf.DataFrame, bool]: + + if (not file_object_batch): + raise RuntimeError("No file objects to process") + + file_list = file_object_batch[0] + batch_count = file_object_batch[1] + + file_system: fsspec.AbstractFileSystem = file_list.fs + + # Create a list of dictionaries that only contains the information we are interested in hashing. `ukey` just + # hashes all of the output of `info()` which is perfect + hash_data = [{"ukey": file_system.ukey(file_object.path)} for file_object in file_list] + + # Convert to base 64 encoding to remove - values + objects_hash_hex = hashlib.md5(json.dumps(hash_data, sort_keys=True).encode()).hexdigest() + + batch_cache_location = os.path.join(self._cache_dir, "batches", f"{objects_hash_hex}.pkl") + + # Return the cache if it exists + if (os.path.exists(batch_cache_location)): + output_df = pd.read_pickle(batch_cache_location) + output_df["batch_count"] = batch_count + output_df["origin_hash"] = objects_hash_hex + + return (output_df, True) + + # Cache miss + download_method_func = partial(single_object_to_dataframe, + file_type=self._file_type, + schema=self._schema, + filter_null=self._filter_null, + parser_kwargs=self._parser_kwargs) + + download_buckets = file_list + + # Loop over dataframes and concat into one + try: + dfs = self._downloader.download(download_buckets, download_method_func) + except Exception: + logger.exception("Failed to download logs. Error: ", exc_info=True) + raise + + if (dfs is None or len(dfs) == 0): + raise ValueError("No logs were downloaded") + + output_df: pd.DataFrame = pd.concat(dfs) + + output_df = process_dataframe(df_in=output_df, input_schema=self._schema) + + # Finally sort by timestamp and then reset the index + output_df.sort_values(by=[self._timestamp_column_name], inplace=True) + + output_df.reset_index(drop=True, inplace=True) + + # Save dataframe to cache future runs + os.makedirs(os.path.dirname(batch_cache_location), exist_ok=True) + + try: + output_df.to_pickle(batch_cache_location) + except Exception: + logger.warning("Failed to save batch cache. Skipping cache for this batch.", exc_info=True) + + output_df["batch_count"] = batch_count + output_df["origin_hash"] = objects_hash_hex + + return (output_df, False) + + def convert_to_dataframe(self, file_object_batch: typing.Tuple[fsspec.core.OpenFiles, int]) -> pd.DataFrame: + """ + Convert a batch of file objects to a DataFrame. + + Parameters + ---------- + file_object_batch : typing.Tuple[fsspec.core.OpenFiles, int] + A batch of file objects and batch count. + + Returns + ------- + cudf.DataFrame + The resulting DataFrame. + """ + + if (not file_object_batch): + return None + + start_time = time.time() + + try: + output_df, cache_hit = self._get_or_create_dataframe_from_batch(file_object_batch) + + duration = (time.time() - start_time) * 1000.0 + + if (output_df is not None and logger.isEnabledFor(logging.DEBUG)): + logger.debug("S3 objects to DF complete. Rows: %s, Cache: %s, Duration: %s ms, Rate: %s rows/s", + len(output_df), + "hit" if cache_hit else "miss", + duration, + len(output_df) / (duration / 1000.0)) + + return output_df + except Exception: + logger.exception("Error while converting S3 buckets to DF.") + raise + + def close(self): + """ + Close the resources used by the controller. + """ + self._downloader.close() diff --git a/morpheus/controllers/filter_detections_controller.py b/morpheus/controllers/filter_detections_controller.py new file mode 100644 index 0000000000..c346fab0ae --- /dev/null +++ b/morpheus/controllers/filter_detections_controller.py @@ -0,0 +1,165 @@ +# Copyright (c) 2022-2023, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import typing + +import cupy as cp +import numpy as np +import typing_utils + +from morpheus.common import FilterSource +from morpheus.messages import MultiMessage +from morpheus.messages import MultiResponseMessage + +logger = logging.getLogger(__name__) + + +class FilterDetectionsController: + """ + Controller class for filtering detections based on a specified threshold and source. + + Parameters + ---------- + threshold : float + The threshold value for filtering detections. + filter_source : `morpheus.common.FilterSource` + The source used for filtering. + field_name : str + The name of the field used for filtering. + """ + + def __init__(self, threshold: float, filter_source: FilterSource, field_name: str) -> None: + self._threshold = threshold + self._filter_source = filter_source + self._field_name = field_name + + @property + def threshold(self): + """ + Get the threshold value. + """ + return self._threshold + + @property + def filter_source(self): + """ + Get the filter source. + """ + return self._filter_source + + @property + def field_name(self): + """ + Get the field name. + """ + return self._field_name + + def _find_detections(self, x: MultiMessage) -> typing.Union[cp.ndarray, np.ndarray]: + # Determind the filter source + if self._filter_source == FilterSource.TENSOR: + filter_source = x.get_output(self._field_name) + else: + filter_source = x.get_meta(self._field_name).values + + if (isinstance(filter_source, np.ndarray)): + array_mod = np + else: + array_mod = cp + + # Get per row detections + detections = (filter_source > self._threshold) + + if (len(detections.shape) > 1): + detections = detections.any(axis=1) + + # Surround in False to ensure we get an even number of pairs + detections = array_mod.concatenate([array_mod.array([False]), detections, array_mod.array([False])]) + + return array_mod.where(detections[1:] != detections[:-1])[0].reshape((-1, 2)) + + def filter_copy(self, x: MultiMessage) -> MultiMessage: + """ + This function uses a threshold value to filter the messages. + + Parameters + ---------- + x : `morpheus.pipeline.messages.MultiMessage` + Response message with probabilities calculated from inference results. + + Returns + ------- + `morpheus.pipeline.messages.MultiMessage` + A new message containing a copy of the rows above the threshold. + + """ + if x is None: + return None + + true_pairs = self._find_detections(x) + + # If we didnt have any detections, return None + if (true_pairs.shape[0] == 0): + return None + + return x.copy_ranges(true_pairs) + + def filter_slice(self, x: MultiMessage) -> typing.List[MultiMessage]: + """ + This function uses a threshold value to filter the messages. + + Parameters + ---------- + x : `morpheus.pipeline.messages.MultiMessage` + Response message with probabilities calculated from inference results. + + Returns + ------- + typing.List[`morpheus.pipeline.messages.MultiMessage`] + List of filtered messages. + + """ + # Unfortunately we have to convert this to a list in case there are non-contiguous groups + output_list = [] + if x is not None: + true_pairs = self._find_detections(x) + for pair in true_pairs: + pair = tuple(pair.tolist()) + if ((pair[1] - pair[0]) > 0): + output_list.append(x.get_slice(*pair)) + + return output_list + + def update_filter_source(self, message_type: typing.Any): + """ + This function updates filter source. + + Parameters + ---------- + message_type : `typing.Any` + Response message with probabilities calculated from inference results. + """ + + # Unfortunately we have to convert this to a list in case there are non-contiguous groups + if self._filter_source == FilterSource.Auto: + if (typing_utils.issubtype(message_type, MultiResponseMessage)): + self._filter_source = FilterSource.TENSOR + else: + self._filter_source = FilterSource.DATAFRAME + + logger.debug( + "filter_source was set to Auto, inferring a filter source of %s based on an input " + "message type of %s", + self._filter_source, + message_type) diff --git a/morpheus/controllers/mlflow_model_writer_controller.py b/morpheus/controllers/mlflow_model_writer_controller.py new file mode 100644 index 0000000000..dca198ddcb --- /dev/null +++ b/morpheus/controllers/mlflow_model_writer_controller.py @@ -0,0 +1,305 @@ +# Copyright (c) 2021-2023, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import hashlib +import logging +import os +import typing +import urllib.parse + +import mlflow +import requests +from mlflow.exceptions import MlflowException +from mlflow.models.signature import ModelSignature +from mlflow.protos.databricks_pb2 import RESOURCE_ALREADY_EXISTS +from mlflow.protos.databricks_pb2 import ErrorCode +from mlflow.store.artifact.runs_artifact_repo import RunsArtifactRepository +from mlflow.tracking import MlflowClient +from mlflow.types import ColSpec +from mlflow.types import Schema +from mlflow.types.utils import _infer_pandas_column +from mlflow.types.utils import _infer_schema + +import cudf + +from morpheus.messages.multi_ae_message import MultiAEMessage +from morpheus.models.dfencoder import AutoEncoder + +logger = logging.getLogger(__name__) + + +class MLFlowModelWriterController: + """ + Controller class for writing machine learning models to MLflow with optional permissions and configurations. + + Parameters + ---------- + model_name_formatter : str + Model name formatter. + experiment_name_formatter : str + Experiment name formatter. + databricks_permissions : dict + Users with read/write permissions. + conda_env : dict + Conda environment. + timeout : + Timeout for get requests. + timestamp_column_name : + Timestamp column name to be used from the dataframe. + + """ + + def __init__(self, + model_name_formatter, + experiment_name_formatter, + databricks_permissions, + conda_env, + timeout, + timestamp_column_name): + self._model_name_formatter = model_name_formatter + self._experiment_name_formatter = experiment_name_formatter + self._databricks_permissions = databricks_permissions + self._conda_env = conda_env + self._timeout = timeout + self._timestamp_column_name = timestamp_column_name + + @property + def model_name_formatter(self): + return self._model_name_formatter + + @property + def experiment_name_formatter(self): + return self._experiment_name_formatter + + @property + def databricks_permissions(self): + return self._databricks_permissions + + def user_id_to_model(self, user_id: str): + """ + Converts a user ID to an model name + + Parameters + ---------- + user_id : str + The user ID. + + Returns + ------- + str + The generated model name. + """ + + kwargs = { + "user_id": user_id, + "user_md5": hashlib.md5(user_id.encode('utf-8')).hexdigest(), + } + + return self._model_name_formatter.format(**kwargs) + + def user_id_to_experiment(self, user_id: str) -> str: + """ + Converts a user ID to an experiment name + + Parameters + ---------- + user_id : str + The user ID. + + Returns + ------- + str + The generated experiment name. + """ + + kwargs = { + "user_id": user_id, + "user_md5": hashlib.md5(user_id.encode('utf-8')).hexdigest(), + "reg_model_name": self.user_id_to_model(user_id=user_id) + } + + return self._experiment_name_formatter.format(**kwargs) + + def _apply_model_permissions(self, reg_model_name: str): + + # Check the required variables + databricks_host = os.environ.get("DATABRICKS_HOST", None) + databricks_token = os.environ.get("DATABRICKS_TOKEN", None) + + if (databricks_host is None or databricks_token is None): + raise RuntimeError("Cannot set Databricks model permissions. " + "Environment variables `DATABRICKS_HOST` and `DATABRICKS_TOKEN` must be set") + + headers = {"Authorization": f"Bearer {databricks_token}"} + + url_base = f"{databricks_host}" + + try: + # First get the registered model ID + get_registered_model_url = urllib.parse.urljoin(url_base, + "/api/2.0/mlflow/databricks/registered-models/get") + + get_registered_model_response = requests.get(url=get_registered_model_url, + headers=headers, + params={"name": reg_model_name}, + timeout=self._timeout) + + registered_model_response = get_registered_model_response.json() + + reg_model_id = registered_model_response["registered_model_databricks"]["id"] + + # Now apply the permissions. If it exists already, it will be overwritten or it is a no-op + patch_registered_model_permissions_url = urllib.parse.urljoin( + url_base, f"/api/2.0/preview/permissions/registered-models/{reg_model_id}") + + patch_registered_model_permissions_body = { + "access_control_list": [{ + "group_name": group, "permission_level": permission + } for group, + permission in self._databricks_permissions.items()] + } + + requests.patch(url=patch_registered_model_permissions_url, + headers=headers, + json=patch_registered_model_permissions_body, + timeout=self._timeout) + + except Exception: + logger.exception("Error occurred trying to apply model permissions to model: %s", + reg_model_name, + exc_info=True) + + def on_data(self, message: MultiAEMessage): + """ + Stores incoming models into MLflow. + + Parameters + ---------- + message : MultiAEMessage + The incoming message containing the model and related metadata. + + Returns + ------- + MultiAEMessage + The processed message. + """ + + user = message.meta.user_id + + model: AutoEncoder = message.model + + model_path = "dfencoder" + reg_model_name = self.user_id_to_model(user_id=user) + + # Write to ML Flow + try: + mlflow.end_run() + + experiment_name = self.user_id_to_experiment(user_id=user) + + # Creates a new experiment if it doesn't exist + experiment = mlflow.set_experiment(experiment_name) + + with mlflow.start_run(run_name="autoencoder model training run", + experiment_id=experiment.experiment_id) as run: + + model_path = f"{model_path}-{run.info.run_uuid}" + + # Log all params in one dict to avoid round trips + mlflow.log_params({ + "Algorithm": "Denosing Autoencoder", + "Epochs": model.lr_decay.state_dict().get("last_epoch", "unknown"), + "Learning rate": model.lr, + "Batch size": model.batch_size, + "Start Epoch": message.get_meta(self._timestamp_column_name).min(), + "End Epoch": message.get_meta(self._timestamp_column_name).max(), + "Log Count": message.mess_count, + }) + + metrics_dict: typing.Dict[str, float] = {} + + # Add info on the embeddings + for key, value in model.categorical_fts.items(): + embedding = value.get("embedding", None) + + if (embedding is None): + continue + + metrics_dict[f"embedding-{key}-num_embeddings"] = embedding.num_embeddings + metrics_dict[f"embedding-{key}-embedding_dim"] = embedding.embedding_dim + + mlflow.log_metrics(metrics_dict) + + # Use the prepare_df function to setup the direct inputs to the model. Only include features returned by + # prepare_df to show the actual inputs to the model (any extra are discarded) + input_df = message.get_meta().iloc[0:1] + + if isinstance(input_df, cudf.DataFrame): + input_df = input_df.to_pandas() + + prepared_df = model.prepare_df(input_df) + output_values = model.get_anomaly_score(input_df) + + input_schema = Schema([ + ColSpec(type=_infer_pandas_column(input_df[col_name]), name=col_name) + for col_name in list(prepared_df.columns) + ]) + output_schema = _infer_schema(output_values) + + model_sig = ModelSignature(inputs=input_schema, outputs=output_schema) + + model_info = mlflow.pytorch.log_model( + pytorch_model=model, + artifact_path=model_path, + conda_env=self._conda_env, + signature=model_sig, + ) + + client = MlflowClient() + + # First ensure a registered model has been created + try: + create_model_response = client.create_registered_model(reg_model_name) + logger.debug("Successfully registered model '%s'.", create_model_response.name) + except MlflowException as e: + if e.error_code == ErrorCode.Name(RESOURCE_ALREADY_EXISTS): + pass + else: + raise e + + # If we are using databricks, make sure we set the correct permissions + if (self._databricks_permissions is not None and mlflow.get_tracking_uri() == "databricks"): + # Need to apply permissions + self._apply_model_permissions(reg_model_name=reg_model_name) + + model_src = RunsArtifactRepository.get_underlying_uri(model_info.model_uri) + + tags = { + "start": message.get_meta(self._timestamp_column_name).min(), + "end": message.get_meta(self._timestamp_column_name).max(), + "count": message.get_meta(self._timestamp_column_name).count() + } + + # Now create the model version + mv_obj = client.create_model_version(name=reg_model_name, + source=model_src, + run_id=run.info.run_id, + tags=tags) + + logger.debug("ML Flow model upload complete: %s:%s:%s", user, reg_model_name, mv_obj.version) + + except Exception: + logger.exception("Error uploading model to ML Flow", exc_info=True) + + return message diff --git a/morpheus/controllers/monitor_controller.py b/morpheus/controllers/monitor_controller.py new file mode 100644 index 0000000000..30940caf7b --- /dev/null +++ b/morpheus/controllers/monitor_controller.py @@ -0,0 +1,235 @@ +# Copyright (c) 2022-2023, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import typing +from functools import reduce + +import fsspec +from tqdm import tqdm + +import cudf + +from morpheus.messages import ControlMessage +from morpheus.messages import MessageMeta +from morpheus.messages import MultiMessage +from morpheus.utils.logger import LogLevels +from morpheus.utils.monitor_utils import MorpheusTqdm + +logger = logging.getLogger(__name__) + + +class MonitorController: + """ + Controls and displays throughput numbers at a specific point in the pipeline. + + Parameters + ---------- + position: int + Specifies the monitor's position on the console. + description : str, default = "Progress" + Name to show for this Monitor Stage in the console window. + smoothing : float + Smoothing parameter to determine how much the throughput should be averaged. 0 = Instantaneous, 1 = + Average. + unit : str + Units to show in the rate value. + delayed_start : bool + When delayed_start is enabled, the progress bar will not be shown until the first message is received. + Otherwise, the progress bar is shown on pipeline startup and will begin timing immediately. In large pipelines, + this option may be desired to give a more accurate timing. + determine_count_fn : typing.Callable[[typing.Any], int] + Custom function for determining the count in a message. Gets called for each message. Allows for + correct counting of batched and sliced messages. + log_level : `morpheus.utils.logger.LogLevels`, default = 'INFO' + Enable this stage when the configured log level is at `log_level` or lower. + tqdm_class: `tqdm`, default = None + Custom implementation of tqdm if required. + """ + + controller_count: int = 0 + + def __init__(self, + position: int, + description: str, + smoothing: float, + unit: str, + delayed_start: bool, + determine_count_fn: typing.Callable[[typing.Any], int], + log_level: LogLevels, + tqdm_class: tqdm = None): + + self._progress: tqdm = None + self._position = position + self._description = description + self._smoothing = smoothing + self._unit = unit + self._delayed_start = delayed_start + self._determine_count_fn = determine_count_fn + self._tqdm_class = tqdm_class if tqdm_class else MorpheusTqdm + + if isinstance(log_level, LogLevels): # pylint: disable=isinstance-second-argument-not-valid-type + log_level = log_level.value + + self._log_level = log_level + self._enabled = None # defined on first call to _is_enabled + + @property + def delayed_start(self) -> bool: + return self._delayed_start + + @property + def progress(self) -> tqdm: + return self._progress + + def is_enabled(self) -> bool: + """ + Returns a boolean indicating whether or not the logger is enabled. + """ + + if self._enabled is None: + self._enabled = logger.isEnabledFor(self._log_level) + + return self._enabled + + def ensure_progress_bar(self): + """ + Ensures that the progress bar is initialized and ready for display. + """ + + if (self._progress is None): + self._progress = self._tqdm_class(desc=self._description, + smoothing=self._smoothing, + dynamic_ncols=True, + unit=(self._unit if self._unit.startswith(" ") else f" {self._unit}"), + mininterval=0.25, + maxinterval=1.0, + miniters=1, + position=self._position) + + self._progress.reset() + + def refresh_progress(self, _): + """ + Refreshes the progress bar display. + """ + self._progress.refresh() + + def progress_sink(self, x: typing.Union[cudf.DataFrame, MultiMessage, MessageMeta, ControlMessage, typing.List]): + """ + Receives a message and determines the count of the message. + The progress bar is displayed and the progress is updated. + + Parameters + ---------- + x: typing.Union[cudf.DataFrame, MultiMessage, MessageMeta, ControlMessage, typing.List] + Message that determines the count of the message + + Returns + ------- + x: typing.Union[cudf.DataFrame, MultiMessage, MessageMeta, ControlMessage, typing.List] + + """ + + # Make sure the progress bar is shown + self.ensure_progress_bar() + + if (self._determine_count_fn is None): + self._determine_count_fn = self.auto_count_fn(x) + + # Skip incase we have empty objects + if (self._determine_count_fn is None): + return x + + # Do our best to determine the count + count = self._determine_count_fn(x) + + self._progress.update(n=count) + + return x + + def auto_count_fn(self, x: typing.Union[cudf.DataFrame, MultiMessage, MessageMeta, ControlMessage, typing.List]): + """ + This is a helper function that is used to determine the count of messages received by the + monitor. + + Parameters + ---------- + x: typing.Union[cudf.DataFrame, MultiMessage, MessageMeta, ControlMessage, typing.List] + Message that determines the count of the message + + Returns + ------- + Message count. + + """ + + # pylint: disable=too-many-return-statements + + if (x is None): + return None + + # Wait for a list thats not empty + if (isinstance(x, list) and len(x) == 0): + return None + + if (isinstance(x, cudf.DataFrame)): + return lambda y: len(y.index) + + if (isinstance(x, MultiMessage)): + return lambda y: y.mess_count + + if (isinstance(x, MessageMeta)): + return lambda y: y.count + + if isinstance(x, ControlMessage): + + def check_df(y): + df = y.payload().df + if df is not None: + return len(df) + + return 0 + + return check_df + + if (isinstance(x, list)): + item_count_fn = self.auto_count_fn(x[0]) + return lambda y: reduce(lambda sum, z, item_count_fn=item_count_fn: sum + item_count_fn(z), y, 0) + + if (isinstance(x, (str, fsspec.core.OpenFile))): + return lambda y: 1 + + if (hasattr(x, "__len__")): + return len # Return len directly (same as `lambda y: len(y)`) + + raise NotImplementedError(f"Unsupported type: {type(x)}") + + def sink_on_completed(self): + """ + Stops the progress bar and prevents the monitors from writing over each other when the last + stage completes. + """ + + # Set the name to complete. This refreshes the display + self.progress.set_description_str(self.progress.desc + "[Complete]") + + self.progress.stop() + + # To prevent the monitors from writing over eachother, stop the monitor when the last stage completes + MonitorController.controller_count -= 1 + + if (MonitorController.controller_count <= 0 and self._tqdm_class.monitor is not None): + self._tqdm_class.monitor.exit() + self._tqdm_class.monitor = None diff --git a/morpheus/controllers/serialize_controller.py b/morpheus/controllers/serialize_controller.py new file mode 100644 index 0000000000..9750741a76 --- /dev/null +++ b/morpheus/controllers/serialize_controller.py @@ -0,0 +1,135 @@ +# Copyright (c) 2021-2023, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import copy +import re +import typing + +from morpheus.messages import MessageMeta +from morpheus.messages import MultiMessage + + +class SerializeController: + """ + Controller class for converting data to JSON lines format with customizable column selection and exclusion. + + Parameters + ---------- + include : typing.List[str] + List of columns to include. + exclude : typing.List[str] + List of columns to exclude. + fixed_columns : bool + Flag to indicate whether columns should be fixed. + """ + + def __init__(self, include: typing.List[str], exclude: typing.List[str], fixed_columns: bool): + self._include_columns = copy.copy(include) + self._exclude_columns = copy.copy(exclude) + self._fixed_columns = fixed_columns + self._columns = None + + @property + def include_columns(self): + """ + Get the list of included columns. + """ + return self._include_columns + + @property + def exclude_columns(self): + """ + Get the list of excluded columns. + """ + return self._exclude_columns + + @property + def fixed_columns(self): + """ + Get the flag indicating whether columns are fixed. + """ + return self._fixed_columns + + def convert_to_df(self, + x: MultiMessage, + include_columns: typing.Pattern, + exclude_columns: typing.List[typing.Pattern]): + """ + Converts dataframe to entries to JSON lines. + + Parameters + ---------- + x : `morpheus.pipeline.messages.MultiMessage` + MultiMessage instance that contains data. + include_columns : typing.Pattern + Columns that are required send to downstream stage. + exclude_columns : typing.List[typing.Pattern] + Columns that are not required send to downstream stage. + + """ + + if self._fixed_columns and self._columns is not None: + columns = self._columns + else: + columns: typing.List[str] = [] + + # Minimize access to x.meta.df + df_columns = list(x.meta.df.columns) + + # First build up list of included. If no include regex is specified, select all + if (include_columns is None): + columns = df_columns + else: + columns = [y for y in df_columns if include_columns.match(y)] + + # Now remove by the ignore + for test in exclude_columns: + columns = [y for y in columns if not test.match(y)] + + self._columns = columns + + # Get metadata from columns + df = x.get_meta(columns) + + return MessageMeta(df=df) + + def get_include_col_pattern(self): + """ + Get the compiled pattern for include columns. + + Returns + ------- + typing.Pattern + The compiled pattern for include columns. + """ + + include_columns = None + + if (self._include_columns is not None and len(self._include_columns) > 0): + include_columns = re.compile(f"({'|'.join(self._include_columns)})") + + return include_columns + + def get_exclude_col_pattern(self): + """ + Get the list of compiled patterns for exclude columns. + + Returns + ------- + typing.List[typing.Pattern] + The list of compiled patterns for exclude columns. + """ + exclude_columns = [re.compile(x) for x in self._exclude_columns] + + return exclude_columns diff --git a/morpheus/controllers/write_to_file_controller.py b/morpheus/controllers/write_to_file_controller.py new file mode 100644 index 0000000000..15bc014548 --- /dev/null +++ b/morpheus/controllers/write_to_file_controller.py @@ -0,0 +1,136 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +import mrc +import mrc.core.operators as ops + +from morpheus.common import FileTypes +from morpheus.common import determine_file_type +from morpheus.io import serializers +from morpheus.messages import MessageMeta +from morpheus.utils.type_aliases import DataFrameType + + +class WriteToFileController: + """ + Controller class for writing data to a file with customizable options. + + Parameters + ---------- + filename : str + The output file name. + overwrite : bool + Flag to indicate whether to overwrite an existing file. + file_type : FileTypes + The type of the output file (e.g., CSV, JSON). + include_index_col : bool + Flag to indicate whether to include the index column in the output. + flush : bool + Flag to indicate whether to flush the output file after writing. + """ + + def __init__(self, filename: str, overwrite: bool, file_type: FileTypes, include_index_col: bool, flush: bool): + self._output_file = filename + self._overwrite = overwrite + + if (os.path.exists(self._output_file)): + if (self._overwrite): + os.remove(self._output_file) + else: + raise FileExistsError( + f"Cannot output classifications to '{self._output_file}'. File exists and overwrite = False") + + self._file_type = file_type + + if (self._file_type == FileTypes.Auto): + self._file_type = determine_file_type(self._output_file) + + self._is_first = True + self._include_index_col = include_index_col + self._flush = flush + + @property + def output_file(self): + """ + Get the output file name. + """ + return self._output_file + + @property + def overwrite(self): + """ + Get the flag indicating whether to overwrite an existing file. + """ + return self._overwrite + + @property + def file_type(self): + """ + Get the type of the output file. + """ + return self._file_type + + @property + def include_index_col(self): + """ + Get the flag indicating whether to include the index column in the output. + """ + return self._include_index_col + + @property + def flush(self): + """ + Get the flag indicating whether to flush the output file after writing. + """ + return self._flush + + def _convert_to_strings(self, df: DataFrameType): + if self._file_type in (FileTypes.JSON, 'JSON'): + output_strs = serializers.df_to_json(df, include_index_col=self._include_index_col) + elif self._file_type in (FileTypes.CSV, 'CSV'): + output_strs = serializers.df_to_csv(df, + include_header=self._is_first, + include_index_col=self._include_index_col) + self._is_first = False + else: + raise NotImplementedError(f"Unknown file type: {self._file_type}") + + # Remove any trailing whitespace + if (len(output_strs[-1].strip()) == 0): + output_strs = output_strs[:-1] + + return output_strs + + def node_fn(self, obs: mrc.Observable, sub: mrc.Subscriber): + + # Ensure our directory exists + os.makedirs(os.path.realpath(os.path.dirname(self._output_file)), exist_ok=True) + + # Open up the file handle + with open(self._output_file, "a", encoding='UTF-8') as out_file: + + def write_to_file(x: MessageMeta): + + lines = self._convert_to_strings(x.df) + + out_file.writelines(lines) + + if self._flush: + out_file.flush() + + return x + + obs.pipe(ops.map(write_to_file)).subscribe(sub) diff --git a/morpheus/loaders/file_to_df_loader.py b/morpheus/loaders/file_to_df_loader.py index 2169b3f105..ff69d89366 100644 --- a/morpheus/loaders/file_to_df_loader.py +++ b/morpheus/loaders/file_to_df_loader.py @@ -13,28 +13,17 @@ # limitations under the License. """Loader for fetching files and emitting them as DataFrames.""" -import hashlib -import json import logging -import os import pickle -import time -import typing -from functools import partial import fsspec -import fsspec.utils -import pandas as pd import cudf from morpheus.cli.utils import str_to_file_type -from morpheus.common import FileTypes -from morpheus.io.deserializers import read_file_to_df +from morpheus.controllers.file_to_df_controller import FileToDFController from morpheus.messages import ControlMessage from morpheus.messages.message_meta import MessageMeta -from morpheus.utils.column_info import process_dataframe -from morpheus.utils.downloader import Downloader from morpheus.utils.loader_ids import FILE_TO_DF_LOADER from morpheus.utils.loader_utils import register_loader @@ -72,6 +61,8 @@ def file_to_df_loader(control_message: ControlMessage, task: dict): raise RuntimeError("Only 'aggregate' strategy is supported for file_to_df loader.") files = task.get("files", None) + n_groups = task.get("n_groups", None) + config = task["batcher_config"] timestamp_column_name = config.get("timestamp_column_name", "timestamp") @@ -88,14 +79,10 @@ def file_to_df_loader(control_message: ControlMessage, task: dict): parser_kwargs = config.get("parser_kwargs", None) cache_dir = config.get("cache_dir", None) - downloader = Downloader() - if (cache_dir is None): cache_dir = "./.cache" logger.warning("Cache directory not set. Defaulting to ./.cache") - cache_dir = os.path.join(cache_dir, "file_cache") - # Load input schema schema = pickle.loads(bytes(schema_str, encoding)) @@ -104,135 +91,20 @@ def file_to_df_loader(control_message: ControlMessage, task: dict): except Exception as exec_info: raise ValueError(f"Invalid input file type '{file_type}'. Available file types are: CSV, JSON.") from exec_info - def single_object_to_dataframe(file_object: fsspec.core.OpenFile, - file_type: FileTypes, - filter_null: bool, - parser_kwargs: dict): - retries = 0 - s3_df = None - while (retries < 2): - try: - with file_object as f: - s3_df = read_file_to_df(f, - file_type, - filter_nulls=filter_null, - df_type="pandas", - parser_kwargs=parser_kwargs) - break - except Exception as exec_info: - if (retries < 2): - logger.warning("Refreshing S3 credentials") - retries += 1 - else: - raise exec_info - - # Run the pre-processing before returning - if (s3_df is None): - return s3_df - - # Optimistaclly prep the dataframe (Not necessary since this will happen again in process_dataframe, but it - # increases performance significantly) - if (schema.prep_dataframe is not None): - s3_df = schema.prep_dataframe(s3_df) - - return s3_df - - def get_or_create_dataframe_from_s3_batch(file_name_batch: typing.List[str]) -> typing.Tuple[cudf.DataFrame, bool]: - - if (not file_name_batch): - raise RuntimeError("No file objects to process") - - file_list = fsspec.open_files(file_name_batch) - # batch_count = file_name_batch[1] - - file_system: fsspec.AbstractFileSystem = file_list.fs - - # Create a list of dictionaries that only contains the information we are interested in hashing. `ukey` just - # hashes all the output of `info()` which is perfect - hash_data = [{"ukey": file_system.ukey(file_object.path)} for file_object in file_list] - - # Convert to base 64 encoding to remove - values - objects_hash_hex = hashlib.md5(json.dumps(hash_data, sort_keys=True).encode()).hexdigest() - - batch_cache_location = os.path.join(cache_dir, "batches", f"{objects_hash_hex}.pkl") - - # Return the cache if it exists - if (os.path.exists(batch_cache_location)): - output_df = pd.read_pickle(batch_cache_location) - output_df["origin_hash"] = objects_hash_hex - # output_df["batch_count"] = batch_count - - return (output_df, True) - - # Cache miss - download_method_func = partial(single_object_to_dataframe, - file_type=file_type, - filter_null=filter_null, - parser_kwargs=parser_kwargs) - - download_buckets = file_list - - # Loop over dataframes and concat into one - try: - dfs = downloader.download(download_buckets, download_method_func) - except Exception: - logger.exception("Failed to download logs. Error: ", exc_info=True) - raise - - if (dfs is None or len(dfs) == 0): - raise ValueError("No logs were downloaded") - - output_df: pd.DataFrame = pd.concat(dfs) - output_df = process_dataframe(df_in=output_df, input_schema=schema) - - # Finally sort by timestamp and then reset the index - output_df.sort_values(by=[timestamp_column_name], inplace=True) - - output_df.reset_index(drop=True, inplace=True) - - # Save dataframe to cache future runs - os.makedirs(os.path.dirname(batch_cache_location), exist_ok=True) - - try: - output_df.to_pickle(batch_cache_location) - except Exception: - logger.warning("Failed to save batch cache. Skipping cache for this batch.", exc_info=True) - - # output_df["batch_count"] = batch_count - output_df["origin_hash"] = objects_hash_hex - - return (output_df, False) - - def convert_to_dataframe(filenames: typing.List[str]): - - if (not filenames): - return None - - start_time = time.time() - - try: - - output_df, cache_hit = get_or_create_dataframe_from_s3_batch(filenames) - - duration = (time.time() - start_time) * 1000.0 - - if (output_df is not None and logger.isEnabledFor(logging.DEBUG)): - logger.debug("S3 objects to DF complete. Rows: %s, Cache: %s, Duration: %s ms, Rate: %s rows/s", - len(output_df), - "hit" if cache_hit else "miss", - duration, - len(output_df) / (duration / 1000.0)) - - return output_df - except Exception: - logger.exception("Error while converting S3 buckets to DF.") - raise - - pdf = convert_to_dataframe(files) - - df = cudf.from_pandas(pdf) - - # Overwriting payload with derived data - control_message.payload(MessageMeta(df)) + try: + controller = FileToDFController(schema=schema, + filter_null=filter_null, + file_type=file_type, + parser_kwargs=parser_kwargs, + cache_dir=cache_dir, + timestamp_column_name=timestamp_column_name) + pdf = controller.convert_to_dataframe(file_object_batch=(fsspec.open_files(files), n_groups)) + df = cudf.from_pandas(pdf) + + # Overwriting payload with derived data + control_message.payload(MessageMeta(df)) + + finally: + controller.close() return control_message diff --git a/morpheus/modules/file_to_df.py b/morpheus/modules/file_to_df.py index 32c09f8a66..d7c053aef4 100644 --- a/morpheus/modules/file_to_df.py +++ b/morpheus/modules/file_to_df.py @@ -13,28 +13,14 @@ # limitations under the License. """Morpheus pipeline module for fetching files and emitting them as DataFrames.""" -import hashlib -import json import logging -import os import pickle -import time -import typing -from functools import partial -import fsspec -import fsspec.utils import mrc -import pandas as pd from mrc.core import operators as ops -import cudf - from morpheus.cli.utils import str_to_file_type -from morpheus.common import FileTypes -from morpheus.io.deserializers import read_file_to_df -from morpheus.utils.column_info import process_dataframe -from morpheus.utils.downloader import Downloader +from morpheus.controllers.file_to_df_controller import FileToDFController from morpheus.utils.module_ids import FILE_TO_DF from morpheus.utils.module_ids import MORPHEUS_MODULE_NAMESPACE from morpheus.utils.module_utils import register_module @@ -80,14 +66,10 @@ def file_to_df(builder: mrc.Builder): parser_kwargs = config.get("parser_kwargs", None) cache_dir = config.get("cache_dir", None) - downloader = Downloader() - if (cache_dir is None): cache_dir = "./.cache" logger.warning("Cache directory not set. Defaulting to ./.cache") - cache_dir = os.path.join(cache_dir, "file_cache") - # Load input schema schema = pickle.loads(bytes(schema_str, encoding)) @@ -96,136 +78,14 @@ def file_to_df(builder: mrc.Builder): except Exception as exec_info: raise ValueError(f"Invalid input file type '{file_type}'. Available file types are: CSV, JSON.") from exec_info - def single_object_to_dataframe(file_object: fsspec.core.OpenFile, - file_type: FileTypes, - filter_null: bool, - parser_kwargs: dict): - - retries = 0 - s3_df = None - while (retries < 2): - try: - with file_object as f: - s3_df = read_file_to_df(f, - file_type, - filter_nulls=filter_null, - df_type="pandas", - parser_kwargs=parser_kwargs) - - break - except Exception as e: - if (retries < 2): - logger.warning("Refreshing S3 credentials") - retries += 1 - else: - raise e - - # Run the pre-processing before returning - if (s3_df is None): - return s3_df - - # Optimistaclly prep the dataframe (Not necessary since this will happen again in process_dataframe, but it - # increases performance significantly) - if (schema.prep_dataframe is not None): - s3_df = schema.prep_dataframe(s3_df) - - return s3_df - - def get_or_create_dataframe_from_s3_batch( - file_object_batch: typing.Tuple[fsspec.core.OpenFiles, int]) -> typing.Tuple[cudf.DataFrame, bool]: - - if (not file_object_batch): - raise RuntimeError("No file objects to process") - - file_list = file_object_batch[0] - batch_count = file_object_batch[1] - - file_system: fsspec.AbstractFileSystem = file_list.fs - - # Create a list of dictionaries that only contains the information we are interested in hashing. `ukey` just - # hashes all of the output of `info()` which is perfect - hash_data = [{"ukey": file_system.ukey(file_object.path)} for file_object in file_list] - - # Convert to base 64 encoding to remove - values - objects_hash_hex = hashlib.md5(json.dumps(hash_data, sort_keys=True).encode()).hexdigest() - - batch_cache_location = os.path.join(cache_dir, "batches", f"{objects_hash_hex}.pkl") - - # Return the cache if it exists - if (os.path.exists(batch_cache_location)): - output_df = pd.read_pickle(batch_cache_location) - output_df["origin_hash"] = objects_hash_hex - output_df["batch_count"] = batch_count - - return (output_df, True) - - # Cache miss - download_method_func = partial(single_object_to_dataframe, - file_type=file_type, - filter_null=filter_null, - parser_kwargs=parser_kwargs) - - download_buckets = file_list - - # Loop over dataframes and concat into one - try: - dfs = downloader.download(download_buckets, download_method_func) - except Exception: - logger.exception("Failed to download logs. Error: ", exc_info=True) - raise - - if (dfs is None or len(dfs) == 0): - raise ValueError("No logs were downloaded") - - output_df: pd.DataFrame = pd.concat(dfs) - - output_df = process_dataframe(df_in=output_df, input_schema=schema) - - # Finally sort by timestamp and then reset the index - output_df.sort_values(by=[timestamp_column_name], inplace=True) - - output_df.reset_index(drop=True, inplace=True) - - # Save dataframe to cache future runs - os.makedirs(os.path.dirname(batch_cache_location), exist_ok=True) - - try: - output_df.to_pickle(batch_cache_location) - except Exception: - logger.warning("Failed to save batch cache. Skipping cache for this batch.", exc_info=True) - - output_df["batch_count"] = batch_count - output_df["origin_hash"] = objects_hash_hex - - return (output_df, False) - - def convert_to_dataframe(file_object_batch: typing.Tuple[fsspec.core.OpenFiles, int]): - if (not file_object_batch): - return None - - start_time = time.time() - - try: - output_df, cache_hit = get_or_create_dataframe_from_s3_batch(file_object_batch) - - duration = (time.time() - start_time) * 1000.0 - - if (output_df is not None and logger.isEnabledFor(logging.DEBUG)): - logger.debug("S3 objects to DF complete. Rows: %s, Cache: %s, Duration: %s ms, Rate: %s rows/s", - len(output_df), - "hit" if cache_hit else "miss", - duration, - len(output_df) / (duration / 1000.0)) - - return output_df - except Exception: - logger.exception("Error while converting S3 buckets to DF.") - raise - - def node_fn(obs: mrc.Observable, sub: mrc.Subscriber): - obs.pipe(ops.map(convert_to_dataframe), ops.on_completed(downloader.close)).subscribe(sub) + controller = FileToDFController(schema=schema, + filter_null=filter_null, + file_type=file_type, + parser_kwargs=parser_kwargs, + cache_dir=cache_dir, + timestamp_column_name=timestamp_column_name) - node = builder.make_node(FILE_TO_DF, mrc.core.operators.build(node_fn)) + node = builder.make_node(FILE_TO_DF, ops.map(controller.convert_to_dataframe), ops.on_completed(controller.close)) # Register input and output port for a module. builder.register_module_input("input", node) diff --git a/morpheus/modules/filter_detections.py b/morpheus/modules/filter_detections.py index e19e54e5d6..41a59639ac 100644 --- a/morpheus/modules/filter_detections.py +++ b/morpheus/modules/filter_detections.py @@ -14,17 +14,13 @@ import logging import pickle -import typing -import cupy as cp import mrc -import numpy as np -import typing_utils from mrc.core import operators as ops +import morpheus._lib.stages as _stages from morpheus.common import FilterSource -from morpheus.messages import MultiMessage -from morpheus.messages.multi_response_message import MultiResponseMessage +from morpheus.controllers.filter_detections_controller import FilterDetectionsController from morpheus.utils.module_ids import FILTER_DETECTIONS from morpheus.utils.module_ids import MORPHEUS_MODULE_NAMESPACE from morpheus.utils.module_utils import register_module @@ -85,6 +81,10 @@ def filter_detections(builder: mrc.Builder): field_name = config.get("field_name", "probs") threshold = config.get("threshold", 0.5) filter_source = config.get("filter_source", "AUTO") + use_cpp = config.get("use_cpp", False) + + filter_source_dict = {"AUTO": FilterSource.Auto, "DATAFRAME": FilterSource.DATAFRAME, "TENSOR": FilterSource.TENSOR} + copy = config.get("copy", True) if ("schema" not in config): @@ -96,100 +96,27 @@ def filter_detections(builder: mrc.Builder): message_type = pickle.loads(bytes(input_message_type, encoding)) - def find_detections(multi_message: MultiMessage, _filter_source) -> typing.Union[cp.ndarray, np.ndarray]: - - # Determind the filter source - if _filter_source == FilterSource.TENSOR: - _filter_source = multi_message.get_output(field_name) - else: - _filter_source = multi_message.get_meta(field_name).values - - if (isinstance(_filter_source, np.ndarray)): - array_mod = np - else: - array_mod = cp - - # Get per row detections - detections = (_filter_source > threshold) - - if (len(detections.shape) > 1): - detections = detections.any(axis=1) - - # Surround in False to ensure we get an even number of pairs - detections = array_mod.concatenate([array_mod.array([False]), detections, array_mod.array([False])]) - - return array_mod.where(detections[1:] != detections[:-1])[0].reshape((-1, 2)) - - def filter_copy(multi_message: MultiMessage) -> typing.Union[MultiMessage, None]: - """ - This function uses a threshold value to filter the messages. - - Parameters - ---------- - multi_message : `morpheus.pipeline.messages.MultiMessage` - Response message with probabilities calculated from inference results. - - Returns - ------- - `morpheus.pipeline.messages.MultiMessage` - A new message containing a copy of the rows above the threshold. - - """ - if multi_message is None: - return None + controller = FilterDetectionsController(threshold=threshold, + filter_source=filter_source_dict[filter_source], + field_name=field_name) - true_pairs = find_detections(multi_message, filter_source) + controller.update_filter_source(message_type=message_type) - if (true_pairs.shape[0] == 0): - return None - - return multi_message.copy_ranges(true_pairs) - - def filter_slice(multi_message: MultiMessage) -> typing.List[MultiMessage]: - """ - This function uses a threshold value to filter the messages. - - Parameters - ---------- - multi_message : `morpheus.pipeline.messages.MultiMessage` - Response message with probabilities calculated from inference results. - - Returns - ------- - typing.List[`morpheus.pipeline.messages.MultiMessage`] - List of filtered messages. - - """ - - # Unfortunately we have to convert this to a list in case there are non-contiguous groups - output_list = [] - if multi_message is not None: - true_pairs = find_detections(multi_message, filter_source) - for pair in true_pairs: - pair = tuple(pair.tolist()) - if ((pair[1] - pair[0]) > 0): - output_list.append(multi_message.get_slice(*pair)) - - return output_list - - if filter_source == "AUTO": - if (typing_utils.issubtype(message_type, MultiResponseMessage)): - filter_source = FilterSource.TENSOR - else: - filter_source = FilterSource.DATAFRAME - - # logger.debug(f"filter_source was set to Auto, infering a filter source of {filter_source} based on an input " - # "message type of {message_type}") - elif filter_source == "DATAFRAME": - filter_source = FilterSource.DATAFRAME + if use_cpp: + node = _stages.FilterDetectionsStage(builder, + FILTER_DETECTIONS, + controller.threshold, + copy, + controller.filter_source, + controller.field_name) else: - raise RuntimeError(f"Unknown filter source: {filter_source}") - - if copy: - node = builder.make_node(FILTER_DETECTIONS, ops.map(filter_copy)) - else: - # Convert list returned by `filter_slice` back to individual messages - node = builder.make_node(FILTER_DETECTIONS, ops.map(filter_slice), ops.flatten()) + if copy: + node = builder.make_node(FILTER_DETECTIONS, + ops.map(controller.filter_copy), + ops.filter(lambda x: x is not None)) + else: + # Convert list returned by `filter_slice` back to individual messages + node = builder.make_node(FILTER_DETECTIONS, ops.map(controller.filter_slice), ops.flatten()) # Register input and output port for a module. builder.register_module_input("input", node) diff --git a/morpheus/modules/mlflow_model_writer.py b/morpheus/modules/mlflow_model_writer.py index 4facb7c0ba..6c842d64d3 100644 --- a/morpheus/modules/mlflow_model_writer.py +++ b/morpheus/modules/mlflow_model_writer.py @@ -12,29 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -import hashlib import logging -import os -import typing -import urllib.parse -import mlflow import mrc -import requests -from mlflow.exceptions import MlflowException -from mlflow.models.signature import ModelSignature -from mlflow.protos.databricks_pb2 import RESOURCE_ALREADY_EXISTS -from mlflow.protos.databricks_pb2 import ErrorCode -from mlflow.store.artifact.runs_artifact_repo import RunsArtifactRepository -from mlflow.tracking import MlflowClient -from mlflow.types import ColSpec -from mlflow.types import Schema -from mlflow.types.utils import _infer_pandas_column -from mlflow.types.utils import _infer_schema from mrc.core import operators as ops -from morpheus.messages.multi_ae_message import MultiAEMessage -from morpheus.models.dfencoder import AutoEncoder +from morpheus.controllers.mlflow_model_writer_controller import MLFlowModelWriterController from morpheus.utils.module_ids import MLFLOW_MODEL_WRITER from morpheus.utils.module_ids import MORPHEUS_MODULE_NAMESPACE from morpheus.utils.module_utils import register_module @@ -62,7 +45,7 @@ def mlflow_model_writer(builder: mrc.Builder): - model_name_formatter (str): Formatter for the model name; Example: `model_name_{timestamp}`; Default: `[Required]` - timestamp_column_name (str): Name of the timestamp column; Example: `timestamp`; Default: timestamp - - source (str): from source where the logs are generated; Example: `azure`; Default: `[Required]` + - timeout (float): Timeout for get requests. databricks_permissions: - read (array): List of users with read permissions; Example: `["read_user1", "read_user2"]`; Default: - @@ -71,11 +54,9 @@ def mlflow_model_writer(builder: mrc.Builder): config = builder.get_current_module_config() + timeout = config.get("timeout", 1.0) timestamp_column_name = config.get("timestamp_column_name", "timestamp") - if ("source" not in config): - raise ValueError("Source is required") - if ("model_name_formatter" not in config): raise ValueError("Model name formatter is required") @@ -85,190 +66,21 @@ def mlflow_model_writer(builder: mrc.Builder): if ("conda_env" not in config): raise ValueError("Conda environment is required") - source = config["source"] model_name_formatter = config["model_name_formatter"] experiment_name_formatter = config["experiment_name_formatter"] conda_env = config.get("conda_env", None) databricks_permissions = config.get("databricks_permissions", None) - def user_id_to_model(user_id: str): - - kwargs = { - "user_id": user_id, - "user_md5": hashlib.md5(user_id.encode('utf-8')).hexdigest(), - } - - return model_name_formatter.format(**kwargs) - - def user_id_to_experiment(user_id: str): - - kwargs = { - "user_id": user_id, - "user_md5": hashlib.md5(user_id.encode('utf-8')).hexdigest(), - "reg_model_name": user_id_to_model(user_id=user_id) - } - - return experiment_name_formatter.format(**kwargs) - - def apply_model_permissions(reg_model_name: str): - - # Check the required variables - databricks_host = os.environ.get("DATABRICKS_HOST", None) - databricks_token = os.environ.get("DATABRICKS_TOKEN", None) - - if (databricks_host is None or databricks_token is None): - raise RuntimeError("Cannot set Databricks model permissions. " - "Environment variables `DATABRICKS_HOST` and `DATABRICKS_TOKEN` must be set") - - headers = {"Authorization": f"Bearer {databricks_token}"} - - url_base = f"{databricks_host}" - - try: - # First get the registered model ID - get_registered_model_url = urllib.parse.urljoin(url_base, - "/api/2.0/mlflow/databricks/registered-models/get") - - # Remove once https://github.com/nv-morpheus/Morpheus/issues/1050 is resolved - # pylint: disable=missing-timeout - get_registered_model_response = requests.get(url=get_registered_model_url, - headers=headers, - params={"name": reg_model_name}) - - registered_model_response = get_registered_model_response.json() - - reg_model_id = registered_model_response["registered_model_databricks"]["id"] - - # Now apply the permissions. If it exists already, it will be overwritten or it is a no-op - patch_registered_model_permissions_url = urllib.parse.urljoin( - url_base, f"/api/2.0/preview/permissions/registered-models/{reg_model_id}") - - patch_registered_model_permissions_body = { - "access_control_list": [{ - "group_name": group, "permission_level": permission - } for group, - permission in databricks_permissions.items()] - } - - requests.patch(url=patch_registered_model_permissions_url, - headers=headers, - json=patch_registered_model_permissions_body) - - except Exception: - logger.exception("Error occurred trying to apply model permissions to model: %s", - reg_model_name, - exc_info=True) - - def on_data(message: MultiAEMessage): - - user = message.meta.user_id - - model: AutoEncoder = message.model - - model_path = "dfencoder" - reg_model_name = user_id_to_model(user_id=user) - - # Write to ML Flow - try: - mlflow.end_run() - - experiment_name = user_id_to_experiment(user_id=user) - - # Creates a new experiment if it doesnt exist - experiment = mlflow.set_experiment(experiment_name) - - with mlflow.start_run(run_name=f"{source} autoencoder model training run", - experiment_id=experiment.experiment_id) as run: - - model_path = f"{model_path}-{run.info.run_uuid}" - - # Log all params in one dict to avoid round trips - mlflow.log_params({ - "Algorithm": "Denosing Autoencoder", - "Epochs": model.lr_decay.state_dict().get("last_epoch", "unknown"), - "Learning rate": model.lr, - "Batch size": model.batch_size, - "Start Epoch": message.get_meta("timestamp").min(), - "End Epoch": message.get_meta("timestamp").max(), - "Log Count": message.mess_count, - }) - - metrics_dict: typing.Dict[str, float] = {} - - # Add info on the embeddings - for k, val in model.categorical_fts.items(): - embedding = val.get("embedding", None) - - if (embedding is None): - continue - - metrics_dict[f"embedding-{k}-num_embeddings"] = embedding.num_embeddings - metrics_dict[f"embedding-{k}-embedding_dim"] = embedding.embedding_dim - - mlflow.log_metrics(metrics_dict) - - # Use the prepare_df function to setup the direct inputs to the model. Only include features - # returned by prepare_df to show the actual inputs to the model (any extra are discarded) - input_df = message.get_meta().iloc[0:1].to_pandas() - prepared_df = model.prepare_df(input_df) - output_values = model.get_anomaly_score(input_df) - - input_schema = Schema([ - ColSpec(type=_infer_pandas_column(input_df[col_name]), name=col_name) - for col_name in list(prepared_df.columns) - ]) - output_schema = _infer_schema(output_values) - - model_sig = ModelSignature(inputs=input_schema, outputs=output_schema) - - model_info = mlflow.pytorch.log_model( - pytorch_model=model, - artifact_path=model_path, - conda_env=conda_env, - signature=model_sig, - ) - - client = MlflowClient() - - # First ensure a registered model has been created - try: - create_model_response = client.create_registered_model(reg_model_name) - logger.debug("Successfully registered model '%s'.", create_model_response.name) - except MlflowException as e: - if e.error_code == ErrorCode.Name(RESOURCE_ALREADY_EXISTS): - pass - else: - raise e - - # If we are using databricks, make sure we set the correct permissions - if (databricks_permissions is not None and mlflow.get_tracking_uri() == "databricks"): - # Need to apply permissions - apply_model_permissions(reg_model_name=reg_model_name) - - model_src = RunsArtifactRepository.get_underlying_uri(model_info.model_uri) - - tags = { - "start": message.get_meta(timestamp_column_name).min(), - "end": message.get_meta(timestamp_column_name).max(), - "count": message.get_meta(timestamp_column_name).count() - } - - # Now create the model version - model_ver = client.create_model_version(name=reg_model_name, - source=model_src, - run_id=run.info.run_id, - tags=tags) - - logger.debug("ML Flow model upload complete: %s:%s:%s", user, reg_model_name, model_ver.version) - - except Exception: - logger.exception("Error uploading model to ML Flow", exc_info=True) - - return message + controller = MLFlowModelWriterController(model_name_formatter=model_name_formatter, + experiment_name_formatter=experiment_name_formatter, + databricks_permissions=databricks_permissions, + conda_env=conda_env, + timeout=timeout, + timestamp_column_name=timestamp_column_name) def node_fn(obs: mrc.Observable, sub: mrc.Subscriber): - obs.pipe(ops.map(on_data), ops.filter(lambda x: x is not None)).subscribe(sub) + obs.pipe(ops.map(controller.on_data), ops.filter(lambda x: x is not None)).subscribe(sub) node = builder.make_node(MLFLOW_MODEL_WRITER, mrc.core.operators.build(node_fn)) diff --git a/morpheus/modules/serialize.py b/morpheus/modules/serialize.py index 3263e33759..9fd8b4bd31 100644 --- a/morpheus/modules/serialize.py +++ b/morpheus/modules/serialize.py @@ -13,17 +13,11 @@ # limitations under the License. import logging -import re -import typing from functools import partial import mrc -import pandas as pd -import cudf - -from morpheus.messages import MultiMessage -from morpheus.messages.message_meta import MessageMeta +from morpheus.controllers.serialize_controller import SerializeController from morpheus.utils.module_ids import MORPHEUS_MODULE_NAMESPACE from morpheus.utils.module_ids import SERIALIZE from morpheus.utils.module_utils import register_module @@ -58,64 +52,17 @@ def serialize(builder: mrc.Builder): config = builder.get_current_module_config() - include_columns = config.get("include", None) - exclude_columns = config.get("exclude", [r'^ID$', r'^_ts_']) + include = config.get("include", None) + exclude = config.get("exclude", [r'^ID$', r'^_ts_']) fixed_columns = config.get("fixed_columns", True) - columns = config.get("columns", None) - use_cpp = config.get("use_cpp", False) - - def convert_to_df(x: MultiMessage, - include_columns: typing.Pattern, - exclude_columns: typing.List[typing.Pattern], - columns: typing.List[str]): - """ - Converts dataframe to entries to JSON lines. - - Parameters - ---------- - x : `morpheus.pipeline.messages.MultiMessage` - MultiMessage instance that contains data. - include_columns : typing.Pattern - Columns that are required send to downstream stage. - exclude_columns : typing.List[typing.Pattern] - Columns that are not required send to downstream stage. - columns : typing.List[str] - Explicit list of columns to include, if not `None` and `fixed_columns` is `True`, then `include_columns` - and `exclude_columns` will be ignored. - """ - - if (not fixed_columns or columns is None): - columns: typing.List[str] = [] - - # Minimize access to x.meta.df - df_columns = list(x.meta.df.columns) - - # First build up list of included. If no include regex is specified, select all - if (include_columns is None): - columns = df_columns - else: - columns = [y for y in df_columns if include_columns.match(y)] - - # Now remove by the ignore - for test in exclude_columns: - columns = [y for y in columns if not test.match(y)] - - # Get metadata from columns - df = x.get_meta(columns) - - if (isinstance(df, pd.DataFrame) and use_cpp): - df = cudf.from_pandas(df) - - return MessageMeta(df=df) - if (include_columns is not None and len(include_columns) > 0): - include_columns = re.compile(f"({'|'.join(include_columns)})") + controller = SerializeController(include=include, exclude=exclude, fixed_columns=fixed_columns) - exclude_columns = [re.compile(x) for x in exclude_columns] + include_columns = controller.get_include_col_pattern() + exclude_columns = controller.get_exclude_col_pattern() node = builder.make_node( - SERIALIZE, - partial(convert_to_df, include_columns=include_columns, exclude_columns=exclude_columns, columns=columns)) + SERIALIZE, partial(controller.convert_to_df, include_columns=include_columns, exclude_columns=exclude_columns)) # Register input and output port for a module. builder.register_module_input("input", node) diff --git a/morpheus/modules/write_to_file.py b/morpheus/modules/write_to_file.py index 5067bb45b8..c2a7b0b9b2 100644 --- a/morpheus/modules/write_to_file.py +++ b/morpheus/modules/write_to_file.py @@ -14,19 +14,11 @@ """To File Sink Module.""" import logging -import os -import typing import mrc -import pandas as pd -from mrc.core import operators as ops - -import cudf from morpheus.common import FileTypes -from morpheus.common import determine_file_type -from morpheus.io import serializers -from morpheus.messages.message_meta import MessageMeta +from morpheus.controllers.write_to_file_controller import WriteToFileController from morpheus.utils.module_ids import MORPHEUS_MODULE_NAMESPACE from morpheus.utils.module_ids import WRITE_TO_FILE from morpheus.utils.module_utils import register_module @@ -55,67 +47,19 @@ def write_to_file(builder: mrc.Builder): """ config = builder.get_current_module_config() - output_file = config.get("filename", None) + filename = config.get("filename", None) overwrite = config.get("overwrite", False) flush = config.get("flush", False) file_type = config.get("file_type", FileTypes.Auto) include_index_col = config.get("include_index_col", True) - is_first = True - - if (os.path.exists(output_file)): - if (overwrite): - os.remove(output_file) - else: - raise FileExistsError( - f"Cannot output classifications to '{output_file}'. File exists and overwrite = False") - - if (file_type == FileTypes.Auto): - file_type = determine_file_type(output_file) - - def convert_to_strings(df: typing.Union[pd.DataFrame, cudf.DataFrame]): - nonlocal is_first - - if (file_type == FileTypes.JSON): - output_strs = serializers.df_to_json(df, include_index_col=include_index_col) - elif (file_type == FileTypes.CSV): - output_strs = serializers.df_to_csv(df, include_header=is_first, include_index_col=include_index_col) - else: - raise NotImplementedError(f"Unknown file type: {file_type}") - - is_first = False - - # Remove any trailing whitespace - if (len(output_strs[-1].strip()) == 0): - output_strs = output_strs[:-1] - - return output_strs - - # Sink to file - - def node_fn(obs: mrc.Observable, sub: mrc.Subscriber): - - # Ensure our directory exists - os.makedirs(os.path.realpath(os.path.dirname(output_file)), exist_ok=True) - - # Open up the file handle - with open(output_file, "a", encoding='UTF-8') as out_file: - - def _write_to_file(x: MessageMeta): - lines = convert_to_strings(x.df) - - out_file.writelines(lines) - - if flush: - out_file.flush() - - return x - - obs.pipe(ops.map(_write_to_file)).subscribe(sub) - - # File should be closed by here + controller = WriteToFileController(filename=filename, + overwrite=overwrite, + file_type=file_type, + include_index_col=include_index_col, + flush=flush) - node = builder.make_node(WRITE_TO_FILE, mrc.core.operators.build(node_fn)) + node = builder.make_node(WRITE_TO_FILE, mrc.core.operators.build(controller.node_fn)) # Register input and output port for a module. builder.register_module_input("input", node) diff --git a/morpheus/stages/general/monitor_stage.py b/morpheus/stages/general/monitor_stage.py index c3e318bcd6..66b6118407 100644 --- a/morpheus/stages/general/monitor_stage.py +++ b/morpheus/stages/general/monitor_stage.py @@ -21,10 +21,10 @@ from morpheus.cli.register_stage import register_stage from morpheus.config import Config +from morpheus.controllers.monitor_controller import MonitorController from morpheus.pipeline.single_port_stage import SinglePortStage from morpheus.pipeline.stream_pair import StreamPair from morpheus.utils.logger import LogLevels -from morpheus.utils.monitor_utils import MonitorController logger = logging.getLogger(__name__) diff --git a/morpheus/stages/output/write_to_file_stage.py b/morpheus/stages/output/write_to_file_stage.py index c6405587e8..a23468a418 100644 --- a/morpheus/stages/output/write_to_file_stage.py +++ b/morpheus/stages/output/write_to_file_stage.py @@ -13,7 +13,6 @@ # limitations under the License. """Write to file stage.""" -import os import typing import mrc @@ -22,13 +21,11 @@ import morpheus._lib.stages as _stages from morpheus.cli.register_stage import register_stage from morpheus.common import FileTypes -from morpheus.common import determine_file_type from morpheus.config import Config -from morpheus.io import serializers +from morpheus.controllers.write_to_file_controller import WriteToFileController from morpheus.messages import MessageMeta from morpheus.pipeline.single_port_stage import SinglePortStage from morpheus.pipeline.stream_pair import StreamPair -from morpheus.utils.type_aliases import DataFrameType @register_stage("to-file", rename_options={"include_index_col": "--include-index-col"}) @@ -65,24 +62,11 @@ def __init__(self, super().__init__(c) - self._output_file = filename - self._overwrite = overwrite - - if (os.path.exists(self._output_file)): - if (self._overwrite): - os.remove(self._output_file) - else: - raise FileExistsError( - f"Cannot output classifications to '{self._output_file}'. File exists and overwrite = False") - - self._file_type = file_type - - if (self._file_type == FileTypes.Auto): - self._file_type = determine_file_type(self._output_file) - - self._is_first = True - self._include_index_col = include_index_col - self._flush = flush + self._controller = WriteToFileController(filename=filename, + overwrite=overwrite, + file_type=file_type, + include_index_col=include_index_col, + flush=flush) @property def name(self) -> str: @@ -105,23 +89,6 @@ def supports_cpp_node(self): """Indicates whether this stage supports a C++ node.""" return True - def _convert_to_strings(self, df: DataFrameType): - if (self._file_type == FileTypes.JSON): - output_strs = serializers.df_to_json(df, include_index_col=self._include_index_col) - elif (self._file_type == FileTypes.CSV): - output_strs = serializers.df_to_csv(df, - include_header=self._is_first, - include_index_col=self._include_index_col) - self._is_first = False - else: - raise NotImplementedError(f"Unknown file type: {self._file_type}") - - # Remove any trailing whitespace - if (len(output_strs[-1].strip()) == 0): - output_strs = output_strs[:-1] - - return output_strs - def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair: stream = input_stream[0] @@ -130,37 +97,14 @@ def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> Strea if (self._build_cpp_node()): to_file = _stages.WriteToFileStage(builder, self.unique_name, - self._output_file, + self._controller.output_file, "w", - self._file_type, - self._include_index_col, - self._flush) + self._controller.file_type, + self._controller.include_index_col, + self._controller.flush) else: - def node_fn(obs: mrc.Observable, sub: mrc.Subscriber): - - # Ensure our directory exists - os.makedirs(os.path.realpath(os.path.dirname(self._output_file)), exist_ok=True) - - # Open up the file handle - with open(self._output_file, "a", encoding='UTF-8') as out_file: - - def write_to_file(x: MessageMeta): - - lines = self._convert_to_strings(x.df) - - out_file.writelines(lines) - - if self._flush: - out_file.flush() - - return x - - obs.pipe(ops.map(write_to_file)).subscribe(sub) - - # File should be closed by here - - to_file = builder.make_node(self.unique_name, ops.build(node_fn)) + to_file = builder.make_node(self.unique_name, ops.build(self._controller.node_fn)) builder.make_edge(stream, to_file) stream = to_file diff --git a/morpheus/stages/postprocess/filter_detections_stage.py b/morpheus/stages/postprocess/filter_detections_stage.py index fb24c7f142..2682300e68 100644 --- a/morpheus/stages/postprocess/filter_detections_stage.py +++ b/morpheus/stages/postprocess/filter_detections_stage.py @@ -15,16 +15,14 @@ import logging import typing -import cupy as cp import mrc -import numpy as np -import typing_utils from mrc.core import operators as ops import morpheus._lib.stages as _stages from morpheus.cli.register_stage import register_stage from morpheus.common import FilterSource from morpheus.config import Config +from morpheus.controllers.filter_detections_controller import FilterDetectionsController from morpheus.messages import MultiMessage from morpheus.messages import MultiResponseMessage from morpheus.pipeline.single_port_stage import SinglePortStage @@ -85,12 +83,10 @@ def __init__(self, field_name: str = "probs"): super().__init__(c) - # Probability to consider a detection - self._threshold = threshold self._copy = copy - - self._filter_source = filter_source - self._field_name = field_name + self._controller = FilterDetectionsController(threshold=threshold, + filter_source=filter_source, + field_name=field_name) @property def name(self) -> str: @@ -106,7 +102,7 @@ def accepted_types(self) -> typing.Tuple: Accepted input types. """ - if self._filter_source == FilterSource.TENSOR: + if self._controller.filter_source == FilterSource.TENSOR: return (MultiResponseMessage, ) return (MultiMessage, ) @@ -115,109 +111,27 @@ def supports_cpp_node(self): # Enable support by default return True - def _find_detections(self, x: MultiMessage) -> typing.Union[cp.ndarray, np.ndarray]: - # Determind the filter source - if self._filter_source == FilterSource.TENSOR: - filter_source = x.get_output(self._field_name) - else: - filter_source = x.get_meta(self._field_name).values - - if (isinstance(filter_source, np.ndarray)): - array_mod = np - else: - array_mod = cp - - # Get per row detections - detections = (filter_source > self._threshold) - - if (len(detections.shape) > 1): - detections = detections.any(axis=1) - - # Surround in False to ensure we get an even number of pairs - detections = array_mod.concatenate([array_mod.array([False]), detections, array_mod.array([False])]) - - return array_mod.where(detections[1:] != detections[:-1])[0].reshape((-1, 2)) - - def filter_copy(self, x: MultiMessage) -> MultiMessage: - """ - This function uses a threshold value to filter the messages. - - Parameters - ---------- - x : `morpheus.pipeline.messages.MultiMessage` - Response message with probabilities calculated from inference results. - - Returns - ------- - `morpheus.pipeline.messages.MultiMessage` - A new message containing a copy of the rows above the threshold. - - """ - if x is None: - return None - - true_pairs = self._find_detections(x) - - # If we didnt have any detections, return None - if (true_pairs.shape[0] == 0): - return None - - return x.copy_ranges(true_pairs) - - def filter_slice(self, x: MultiMessage) -> typing.List[MultiMessage]: - """ - This function uses a threshold value to filter the messages. - - Parameters - ---------- - x : `morpheus.pipeline.messages.MultiMessage` - Response message with probabilities calculated from inference results. - - Returns - ------- - typing.List[`morpheus.pipeline.messages.MultiMessage`] - List of filtered messages. - - """ - # Unfortunately we have to convert this to a list in case there are non-contiguous groups - output_list = [] - if x is not None: - true_pairs = self._find_detections(x) - for pair in true_pairs: - pair = tuple(pair.tolist()) - if ((pair[1] - pair[0]) > 0): - output_list.append(x.get_slice(*pair)) - - return output_list - def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair: (parent_node, message_type) = input_stream - if self._filter_source == FilterSource.Auto: - if (typing_utils.issubtype(message_type, MultiResponseMessage)): - self._filter_source = FilterSource.TENSOR - else: - self._filter_source = FilterSource.DATAFRAME - logger.debug(("filter_source was set to Auto, inferring a filter source of %s based on an input " - "message type of %s"), - self._filter_source, - message_type) + self._controller.update_filter_source(message_type=message_type) if self._build_cpp_node(): node = _stages.FilterDetectionsStage(builder, self.unique_name, - self._threshold, + self._controller.threshold, self._copy, - self._filter_source, - self._field_name) + self._controller.filter_source, + self._controller.field_name) else: + if self._copy: node = builder.make_node(self.unique_name, - ops.map(self.filter_copy), + ops.map(self._controller.filter_copy), ops.filter(lambda x: x is not None)) else: # Use `ops.flatten` to convert the list returned by `filter_slice` back to individual messages - node = builder.make_node(self.unique_name, ops.map(self.filter_slice), ops.flatten()) + node = builder.make_node(self.unique_name, ops.map(self._controller.filter_slice), ops.flatten()) builder.make_edge(parent_node, node) diff --git a/morpheus/stages/postprocess/serialize_stage.py b/morpheus/stages/postprocess/serialize_stage.py index 7c421c8bf0..9f72426aa5 100644 --- a/morpheus/stages/postprocess/serialize_stage.py +++ b/morpheus/stages/postprocess/serialize_stage.py @@ -12,8 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import copy -import re import typing from functools import partial @@ -23,6 +21,7 @@ import morpheus._lib.stages as _stages from morpheus.cli.register_stage import register_stage from morpheus.config import Config +from morpheus.controllers.serialize_controller import SerializeController from morpheus.messages import MessageMeta from morpheus.messages import MultiMessage from morpheus.pipeline.single_port_stage import SinglePortStage @@ -62,11 +61,7 @@ def __init__(self, if (exclude is None): exclude = [r'^ID$', r'^_ts_'] - # Make copies of the arrays to prevent changes after the Regex is compiled - self._include_columns = copy.copy(include) - self._exclude_columns = copy.copy(exclude) - self._fixed_columns = fixed_columns - self._columns = None + self._controller = SerializeController(include=include, exclude=exclude, fixed_columns=fixed_columns) @property def name(self) -> str: @@ -88,67 +83,23 @@ def supports_cpp_node(self): # Enable support by default return True - def convert_to_df(self, - x: MultiMessage, - include_columns: typing.Pattern, - exclude_columns: typing.List[typing.Pattern]): - """ - Converts dataframe to entries to JSON lines. - - Parameters - ---------- - x : `morpheus.pipeline.messages.MultiMessage` - MultiMessage instance that contains data. - include_columns : typing.Pattern - Columns that are required send to downstream stage. - exclude_columns : typing.List[typing.Pattern] - Columns that are not required send to downstream stage. - - """ - - if self._fixed_columns and self._columns is not None: - columns = self._columns - else: - columns: typing.List[str] = [] - - # Minimize access to x.meta.df - df_columns = list(x.meta.df.columns) - - # First build up list of included. If no include regex is specified, select all - if (include_columns is None): - columns = df_columns - else: - columns = [y for y in df_columns if include_columns.match(y)] - - # Now remove by the ignore - for test in exclude_columns: - columns = [y for y in columns if not test.match(y)] - - self._columns = columns - - # Get metadata from columns - df = x.get_meta(columns) - - return MessageMeta(df=df) - def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair: if (self._build_cpp_node()): stream = _stages.SerializeStage(builder, self.unique_name, - self._include_columns or [], - self._exclude_columns, - self._fixed_columns) + self._controller.include_columns or [], + self._controller.exclude_columns, + self._controller.fixed_columns) else: - include_columns = None - - if (self._include_columns is not None and len(self._include_columns) > 0): - include_columns = re.compile(f"({'|'.join(self._include_columns)})") - - exclude_columns = [re.compile(x) for x in self._exclude_columns] + include_columns = self._controller.get_include_col_pattern() + exclude_columns = self._controller.get_exclude_col_pattern() stream = builder.make_node( self.unique_name, - ops.map(partial(self.convert_to_df, include_columns=include_columns, exclude_columns=exclude_columns))) + ops.map( + partial(self._controller.convert_to_df, + include_columns=include_columns, + exclude_columns=exclude_columns))) builder.make_edge(input_stream[0], stream) diff --git a/morpheus/utils/column_info.py b/morpheus/utils/column_info.py index 2ca7078a38..80f7e69694 100644 --- a/morpheus/utils/column_info.py +++ b/morpheus/utils/column_info.py @@ -582,6 +582,22 @@ def _process_column(self, df: pd.DataFrame) -> pd.Series: return increment_col.astype(self.get_pandas_dtype()) +@dataclasses.dataclass +class PreparedDFInfo: + """ + Represents the result of preparing a DataFrame along with avilable columns to be preserved. + + Attributes + ---------- + df : typing.Union[pd.DataFrame, cudf.DataFrame] + The prepared DataFrame. + columns_to_preserve : typing.List[str] + A list of column names that are to be preserved. + """ + df: typing.Union[pd.DataFrame, cudf.DataFrame] + columns_to_preserve: typing.List[str] + + def _json_flatten(df_input: typing.Union[pd.DataFrame, cudf.DataFrame], input_columns: dict[str, str], json_cols: list[str], @@ -607,9 +623,14 @@ def _json_flatten(df_input: typing.Union[pd.DataFrame, cudf.DataFrame], The processed DataFrame. """ + columns_to_preserve = set() + + if (preserve_re): + columns_to_preserve.update(col for col in df_input.columns if re.match(preserve_re, col)) + # Early exit if (json_cols is None or len(json_cols) == 0): - return df_input + return PreparedDFInfo(df=df_input, columns_to_preserve=list(columns_to_preserve)) # Check if we even have any JSON columns to flatten if (not df_input.columns.intersection(json_cols).empty): @@ -620,9 +641,9 @@ def _json_flatten(df_input: typing.Union[pd.DataFrame, cudf.DataFrame], df_input = df_input.to_pandas() json_normalized = [] - cols_to_keep = list(df_input.columns) + columns_to_keep = list(df_input.columns) for col in json_cols: - if (col not in cols_to_keep): + if (col not in columns_to_keep): continue pd_series = df_input[col] @@ -639,12 +660,11 @@ def _json_flatten(df_input: typing.Union[pd.DataFrame, cudf.DataFrame], json_normalized.append(pdf_norm) - # Remove from the list of remaining columns if (preserve_re is None or not preserve_re.match(col)): - cols_to_keep.remove(col) + columns_to_keep.remove(col) # Combine the original DataFrame with the normalized JSON columns - df_input = pd.concat([df_input[cols_to_keep]] + json_normalized, axis=1) + df_input = pd.concat([df_input[columns_to_keep]] + json_normalized, axis=1) if (convert_to_cudf): df_input = cudf.from_pandas(df_input).reset_index(drop=True) @@ -654,7 +674,7 @@ def _json_flatten(df_input: typing.Union[pd.DataFrame, cudf.DataFrame], df_input = df_input.astype(input_columns) - return df_input + return PreparedDFInfo(df=df_input, columns_to_preserve=list(columns_to_preserve)) def _resolve_json_output_columns(json_cols: list[str], input_cols: dict[str, str]) -> list[tuple[str, str]]: diff --git a/morpheus/utils/monitor_utils.py b/morpheus/utils/monitor_utils.py index e37567d692..586d0730d2 100644 --- a/morpheus/utils/monitor_utils.py +++ b/morpheus/utils/monitor_utils.py @@ -13,21 +13,11 @@ # limitations under the License. import logging -import typing -from functools import reduce -import fsspec from tqdm import TMonitor from tqdm import TqdmSynchronisationWarning from tqdm import tqdm -import cudf - -from morpheus.messages import ControlMessage -from morpheus.messages import MessageMeta -from morpheus.messages import MultiMessage -from morpheus.utils.logger import LogLevels - logger = logging.getLogger(__name__) @@ -144,208 +134,3 @@ class SilentMorpheusTqdm(MorpheusTqdm): def refresh(self, nolock=False, lock_args=None): return - - -class MonitorController: - """ - Controls and displays throughput numbers at a specific point in the pipeline. - - Parameters - ---------- - position: int - Specifies the monitor's position on the console. - description : str, default = "Progress" - Name to show for this Monitor Stage in the console window. - smoothing : float - Smoothing parameter to determine how much the throughput should be averaged. 0 = Instantaneous, 1 = - Average. - unit : str - Units to show in the rate value. - delayed_start : bool - When delayed_start is enabled, the progress bar will not be shown until the first message is received. - Otherwise, the progress bar is shown on pipeline startup and will begin timing immediately. In large pipelines, - this option may be desired to give a more accurate timing. - determine_count_fn : typing.Callable[[typing.Any], int] - Custom function for determining the count in a message. Gets called for each message. Allows for - correct counting of batched and sliced messages. - log_level : `morpheus.utils.logger.LogLevels`, default = 'INFO' - Enable this stage when the configured log level is at `log_level` or lower. - tqdm_class: `tqdm`, default = None - Custom implementation of tqdm if required. - """ - - controller_count: int = 0 - - def __init__(self, - position: int, - description: str, - smoothing: float, - unit: str, - delayed_start: bool, - determine_count_fn: typing.Callable[[typing.Any], int], - log_level: LogLevels, - tqdm_class: tqdm = None): - - self._progress: tqdm = None - self._position = position - self._description = description - self._smoothing = smoothing - self._unit = unit - self._delayed_start = delayed_start - self._determine_count_fn = determine_count_fn - self._tqdm_class = tqdm_class if tqdm_class else MorpheusTqdm - - if isinstance(log_level, LogLevels): # pylint: disable=isinstance-second-argument-not-valid-type - log_level = log_level.value - - self._log_level = log_level - self._enabled = None # defined on first call to _is_enabled - - @property - def delayed_start(self) -> bool: - return self._delayed_start - - @property - def progress(self) -> tqdm: - return self._progress - - def is_enabled(self) -> bool: - """ - Returns a boolean indicating whether or not the logger is enabled. - """ - - if self._enabled is None: - self._enabled = logger.isEnabledFor(self._log_level) - - return self._enabled - - def ensure_progress_bar(self): - """ - Ensures that the progress bar is initialized and ready for display. - """ - - if (self._progress is None): - self._progress = self._tqdm_class(desc=self._description, - smoothing=self._smoothing, - dynamic_ncols=True, - unit=(self._unit if self._unit.startswith(" ") else f" {self._unit}"), - mininterval=0.25, - maxinterval=1.0, - miniters=1, - position=self._position) - - self._progress.reset() - - def refresh_progress(self, _): - """ - Refreshes the progress bar display. - """ - self._progress.refresh() - - def progress_sink(self, x: typing.Union[cudf.DataFrame, MultiMessage, MessageMeta, ControlMessage, typing.List]): - """ - Receives a message and determines the count of the message. - The progress bar is displayed and the progress is updated. - - Parameters - ---------- - x: typing.Union[cudf.DataFrame, MultiMessage, MessageMeta, ControlMessage, typing.List] - Message that determines the count of the message - - Returns - ------- - x: typing.Union[cudf.DataFrame, MultiMessage, MessageMeta, ControlMessage, typing.List] - - """ - - # Make sure the progress bar is shown - self.ensure_progress_bar() - - if (self._determine_count_fn is None): - self._determine_count_fn = self.auto_count_fn(x) - - # Skip incase we have empty objects - if (self._determine_count_fn is None): - return x - - # Do our best to determine the count - count = self._determine_count_fn(x) - - self._progress.update(n=count) - - return x - - def auto_count_fn(self, x: typing.Union[cudf.DataFrame, MultiMessage, MessageMeta, ControlMessage, typing.List]): - """ - This is a helper function that is used to determine the count of messages received by the - monitor. - - Parameters - ---------- - x: typing.Union[cudf.DataFrame, MultiMessage, MessageMeta, ControlMessage, typing.List] - Message that determines the count of the message - - Returns - ------- - Message count. - - """ - - # pylint: disable=too-many-return-statements - - if (x is None): - return None - - # Wait for a list thats not empty - if (isinstance(x, list) and len(x) == 0): - return None - - if (isinstance(x, cudf.DataFrame)): - return lambda y: len(y.index) - - if (isinstance(x, MultiMessage)): - return lambda y: y.mess_count - - if (isinstance(x, MessageMeta)): - return lambda y: y.count - - if isinstance(x, ControlMessage): - - def check_df(y): - df = y.payload().df - if df is not None: - return len(df) - - return 0 - - return check_df - - if (isinstance(x, list)): - item_count_fn = self.auto_count_fn(x[0]) - return lambda y: reduce(lambda sum, z, item_count_fn=item_count_fn: sum + item_count_fn(z), y, 0) - - if (isinstance(x, (str, fsspec.core.OpenFile))): - return lambda y: 1 - - if (hasattr(x, "__len__")): - return len # Return len directly (same as `lambda y: len(y)`) - - raise NotImplementedError(f"Unsupported type: {type(x)}") - - def sink_on_completed(self): - """ - Stops the progress bar and prevents the monitors from writing over each other when the last - stage completes. - """ - - # Set the name to complete. This refreshes the display - self.progress.set_description_str(self.progress.desc + "[Complete]") - - self.progress.stop() - - # To prevent the monitors from writing over eachother, stop the monitor when the last stage completes - MonitorController.controller_count -= 1 - - if (MonitorController.controller_count <= 0 and self._tqdm_class.monitor is not None): - self._tqdm_class.monitor.exit() - self._tqdm_class.monitor = None diff --git a/morpheus/utils/schema_transforms.py b/morpheus/utils/schema_transforms.py index 37fa539fb1..8abbccf9c3 100644 --- a/morpheus/utils/schema_transforms.py +++ b/morpheus/utils/schema_transforms.py @@ -22,6 +22,7 @@ import cudf from morpheus.utils.column_info import DataFrameInputSchema +from morpheus.utils.column_info import PreparedDFInfo from morpheus.utils.nvt import patches from morpheus.utils.nvt.extensions import morpheus_ext from morpheus.utils.nvt.schema_converters import create_and_attach_nvt_workflow @@ -101,10 +102,18 @@ def process_dataframe( # Note(Devin): pre-flatten to avoid Dask hang when calling json_normalize within an NVT operator if (input_schema.prep_dataframe is not None): - df_in = input_schema.prep_dataframe(df_in) + prepared_df_info: PreparedDFInfo = input_schema.prep_dataframe(df_in) nvt_workflow = input_schema.nvt_workflow + preserve_df = None + + if prepared_df_info is not None: + df_in = prepared_df_info.df + + if prepared_df_info.columns_to_preserve: + preserve_df = df_in[prepared_df_info.columns_to_preserve] + if (convert_to_pd): df_in = cudf.DataFrame(df_in) @@ -127,6 +136,17 @@ def process_dataframe( df_result.set_index(saved_index.take(df_result.index), inplace=True) if (convert_to_pd): - return df_result.to_pandas() + df_result = df_result.to_pandas() + + # Restore preserved columns + if (preserve_df is not None): + # Ensure there is no overlap with columns to preserve + columns_to_merge = set(preserve_df.columns) - set(df_result.columns) + columns_to_merge = list(columns_to_merge) + if (columns_to_merge): + if (convert_to_pd): + df_result = pd.concat([df_result, preserve_df[columns_to_merge]], axis=1) + else: + df_result = cudf.concat([df_result, preserve_df[columns_to_merge]], axis=1) return df_result diff --git a/tests/examples/digital_fingerprinting/test_dfp_file_to_df.py b/tests/examples/digital_fingerprinting/test_dfp_file_to_df.py index 7502aaaa78..be0e7c0848 100644 --- a/tests/examples/digital_fingerprinting/test_dfp_file_to_df.py +++ b/tests/examples/digital_fingerprinting/test_dfp_file_to_df.py @@ -26,6 +26,7 @@ from _utils.dataset_manager import DatasetManager from morpheus.common import FileTypes from morpheus.config import Config +from morpheus.controllers.file_to_df_controller import single_object_to_dataframe from morpheus.pipeline.preallocator_mixin import PreallocatorMixin from morpheus.pipeline.single_port_stage import SinglePortStage from morpheus.utils.column_info import CustomColumn @@ -47,12 +48,11 @@ def single_file_obj(): # pylint: disable=redefined-outer-name def test_single_object_to_dataframe(single_file_obj: fsspec.core.OpenFile): - from dfp.stages.dfp_file_to_df import _single_object_to_dataframe fake_lambda = mock.MagicMock() schema = DataFrameInputSchema(column_info=[CustomColumn(name='data', dtype=str, process_column_fn=fake_lambda)]) - df = _single_object_to_dataframe(single_file_obj, schema, FileTypes.Auto, False, {}) + df = single_object_to_dataframe(single_file_obj, schema, FileTypes.Auto, False, {}) fake_lambda.assert_not_called() assert sorted(df.columns) == sorted(['plugin', 'titles', 'data', 'count']) @@ -67,12 +67,11 @@ def test_single_object_to_dataframe(single_file_obj: fsspec.core.OpenFile): def test_single_object_to_dataframe_timeout(): - from dfp.stages.dfp_file_to_df import _single_object_to_dataframe input_glob = os.path.join(TEST_DIRS.tests_data_dir, 'appshield', 'snapshot-1', 'fake_wont_match*.json') bad_file = fsspec.core.OpenFile(fs=fsspec.open_files(input_glob).fs, path='/tmp/fake/doesnt/exit.csv') - assert _single_object_to_dataframe(bad_file, DataFrameInputSchema(), FileTypes.CSV, False, {}) is None + assert single_object_to_dataframe(bad_file, DataFrameInputSchema(), FileTypes.CSV, False, {}) is None @pytest.mark.usefixtures("restore_environ") @@ -92,11 +91,11 @@ def test_constructor(config: Config): assert isinstance(stage, SinglePortStage) assert isinstance(stage, PreallocatorMixin) - assert stage._schema is schema - assert stage._file_type == FileTypes.PARQUET - assert not stage._filter_null - assert stage._parser_kwargs == {'test': 'this'} - assert stage._cache_dir.startswith('/test/path/cache') + assert stage._controller._schema is schema + assert stage._controller._file_type == FileTypes.PARQUET + assert not stage._controller._filter_null + assert stage._controller._parser_kwargs == {'test': 'this'} + assert stage._controller._cache_dir.startswith('/test/path/cache') # pylint: disable=redefined-outer-name @@ -106,9 +105,9 @@ def test_constructor(config: Config): @mock.patch('multiprocessing.get_context') @mock.patch('dask.distributed.Client') @mock.patch('dask_cuda.LocalCUDACluster') -@mock.patch('dfp.stages.dfp_file_to_df._single_object_to_dataframe') +@mock.patch('morpheus.controllers.file_to_df_controller.single_object_to_dataframe') @mock.patch('morpheus.utils.downloader.Distributed') -@mock.patch('dfp.stages.dfp_file_to_df.process_dataframe') +@mock.patch('morpheus.controllers.file_to_df_controller.process_dataframe') def test_get_or_create_dataframe_from_batch_cache_miss(mock_proc_df: mock.MagicMock, mock_distributed: mock.MagicMock, mock_obf_to_df: mock.MagicMock, @@ -172,9 +171,9 @@ def test_get_or_create_dataframe_from_batch_cache_miss(mock_proc_df: mock.MagicM if use_convert_to_dataframe: # convert_to_dataframe is a thin wrapper around _get_or_create_dataframe_from_batch, no need to create # a new test for it - output_df = stage.convert_to_dataframe((batch, 1)) + output_df = stage._controller.convert_to_dataframe((batch, 1)) else: - (output_df, cache_hit) = stage._get_or_create_dataframe_from_batch((batch, 1)) + (output_df, cache_hit) = stage._controller._get_or_create_dataframe_from_batch((batch, 1)) assert not cache_hit if dl_type in ("multiprocess", "multiprocessing"): @@ -200,7 +199,7 @@ def test_get_or_create_dataframe_from_batch_cache_miss(mock_proc_df: mock.MagicM dataset_pandas.assert_df_equal(output_df, expected_df) - expected_cache_file_path = os.path.join(stage._cache_dir, "batches", f"{expected_hash}.pkl") + expected_cache_file_path = os.path.join(stage._controller._cache_dir, "batches", f"{expected_hash}.pkl") assert os.path.exists(expected_cache_file_path) dataset_pandas.assert_df_equal(pd.read_pickle(expected_cache_file_path), expected_df[dataset_pandas['filter_probs.csv'].columns]) @@ -213,7 +212,7 @@ def test_get_or_create_dataframe_from_batch_cache_miss(mock_proc_df: mock.MagicM @mock.patch('dask.config') @mock.patch('dask.distributed.Client') @mock.patch('dask_cuda.LocalCUDACluster') -@mock.patch('dfp.stages.dfp_file_to_df._single_object_to_dataframe') +@mock.patch('morpheus.controllers.file_to_df_controller.single_object_to_dataframe') def test_get_or_create_dataframe_from_batch_cache_hit(mock_obf_to_df: mock.MagicMock, mock_dask_cluster: mock.MagicMock, mock_dask_client: mock.MagicMock, @@ -260,9 +259,9 @@ def test_get_or_create_dataframe_from_batch_cache_hit(mock_obf_to_df: mock.Magic if use_convert_to_dataframe: # convert_to_dataframe is a thin wrapper around _get_or_create_dataframe_from_batch, no need to create # a new test for it - output_df = stage.convert_to_dataframe((batch, 1)) + output_df = stage._controller.convert_to_dataframe((batch, 1)) else: - (output_df, cache_hit) = stage._get_or_create_dataframe_from_batch((batch, 1)) + (output_df, cache_hit) = stage._controller._get_or_create_dataframe_from_batch((batch, 1)) assert cache_hit # When we get a cache hit, none of the download methods should be executed @@ -283,7 +282,7 @@ def test_get_or_create_dataframe_from_batch_cache_hit(mock_obf_to_df: mock.Magic @mock.patch('dask.config') @mock.patch('dask.distributed.Client') @mock.patch('dask_cuda.LocalCUDACluster') -@mock.patch('dfp.stages.dfp_file_to_df._single_object_to_dataframe') +@mock.patch('morpheus.controllers.file_to_df_controller.single_object_to_dataframe') def test_get_or_create_dataframe_from_batch_none_noop(mock_obf_to_df: mock.MagicMock, mock_dask_cluster: mock.MagicMock, mock_dask_client: mock.MagicMock, @@ -304,10 +303,10 @@ def test_get_or_create_dataframe_from_batch_none_noop(mock_obf_to_df: mock.Magic os.environ['MORPHEUS_FILE_DOWNLOAD_TYPE'] = dl_type stage = DFPFileToDataFrameStage(config, DataFrameInputSchema(), cache_dir=tmp_path) if use_convert_to_dataframe: - assert stage.convert_to_dataframe(None) is None + assert stage._controller.convert_to_dataframe(None) is None else: with pytest.raises(RuntimeError, match="No file objects to process"): - stage._get_or_create_dataframe_from_batch(None) + stage._controller._get_or_create_dataframe_from_batch(None) mock_obf_to_df.assert_not_called() mock_dask_cluster.assert_not_called() diff --git a/tests/examples/digital_fingerprinting/test_dfp_mlflow_model_writer.py b/tests/examples/digital_fingerprinting/test_dfp_mlflow_model_writer.py index caf0dd532a..54b438d4a3 100644 --- a/tests/examples/digital_fingerprinting/test_dfp_mlflow_model_writer.py +++ b/tests/examples/digital_fingerprinting/test_dfp_mlflow_model_writer.py @@ -61,11 +61,12 @@ def mock_requests_fixture(): yield MockedRequests(mock_requests_get, mock_requests_patch, mock_response) -@pytest.fixture(name="mock_mlflow") -def mock_mlflow_fixture(): - with (mock.patch("dfp.stages.dfp_mlflow_model_writer.MlflowClient") as mock_mlflow_client, - mock.patch("dfp.stages.dfp_mlflow_model_writer.ModelSignature") as mock_model_signature, - mock.patch("dfp.stages.dfp_mlflow_model_writer.RunsArtifactRepository") as mock_runs_artifact_repository, +@pytest.fixture +def mock_mlflow(): + with (mock.patch("morpheus.controllers.mlflow_model_writer_controller.MlflowClient") as mock_mlflow_client, + mock.patch("morpheus.controllers.mlflow_model_writer_controller.ModelSignature") as mock_model_signature, + mock.patch("morpheus.controllers.mlflow_model_writer_controller.RunsArtifactRepository") as + mock_runs_artifact_repository, mock.patch("mlflow.end_run") as mock_mlflow_end_run, mock.patch("mlflow.get_tracking_uri") as mock_mlflow_get_tracking_uri, mock.patch("mlflow.log_metrics") as mock_mlflow_log_metrics, @@ -114,9 +115,9 @@ def test_constructor(config: Config): experiment_name_formatter="/test/{user_id}-{user_md5}-{reg_model_name}", databricks_permissions={'test': 'this'}) assert isinstance(stage, SinglePortStage) - assert stage._model_name_formatter == "test_model_name-{user_id}-{user_md5}" - assert stage._experiment_name_formatter == "/test/{user_id}-{user_md5}-{reg_model_name}" - assert stage._databricks_permissions == {'test': 'this'} + assert stage._controller.model_name_formatter == "test_model_name-{user_id}-{user_md5}" + assert stage._controller.experiment_name_formatter == "/test/{user_id}-{user_md5}-{reg_model_name}" + assert stage._controller.databricks_permissions == {'test': 'this'} @pytest.mark.parametrize( @@ -131,7 +132,7 @@ def test_user_id_to_model(config: Config, model_name_formatter: str, user_id: st from dfp.stages.dfp_mlflow_model_writer import DFPMLFlowModelWriterStage stage = DFPMLFlowModelWriterStage(config, model_name_formatter=model_name_formatter) - assert stage.user_id_to_model(user_id) == expected_val + assert stage._controller.user_id_to_model(user_id) == expected_val @pytest.mark.parametrize("experiment_name_formatter,user_id,expected_val", @@ -151,7 +152,7 @@ def test_user_id_to_experiment(config: Config, experiment_name_formatter: str, u stage = DFPMLFlowModelWriterStage(config, model_name_formatter="dfp-{user_id}", experiment_name_formatter=experiment_name_formatter) - assert stage.user_id_to_experiment(user_id) == expected_val + assert stage._controller.user_id_to_experiment(user_id) == expected_val def verify_apply_model_permissions(mock_requests: MockedRequests, @@ -177,8 +178,8 @@ def verify_apply_model_permissions(mock_requests: MockedRequests, def test_apply_model_permissions(config: Config, databricks_env: dict, mock_requests: MockedRequests): from dfp.stages.dfp_mlflow_model_writer import DFPMLFlowModelWriterStage databricks_permissions = OrderedDict([('group1', 'CAN_READ'), ('group2', 'CAN_WRITE')]) - stage = DFPMLFlowModelWriterStage(config, databricks_permissions=databricks_permissions) - stage._apply_model_permissions("test_experiment") + stage = DFPMLFlowModelWriterStage(config, databricks_permissions=databricks_permissions, timeout=10) + stage._controller._apply_model_permissions("test_experiment") verify_apply_model_permissions(mock_requests, databricks_env, databricks_permissions, 'test_experiment') @@ -206,7 +207,7 @@ def test_apply_model_permissions_no_perms_error(config: Config, from dfp.stages.dfp_mlflow_model_writer import DFPMLFlowModelWriterStage stage = DFPMLFlowModelWriterStage(config) with pytest.raises(RuntimeError): - stage._apply_model_permissions("test_experiment") + stage._controller._apply_model_permissions("test_experiment") mock_requests.get.assert_not_called() mock_requests.patch.assert_not_called() @@ -217,8 +218,8 @@ def test_apply_model_permissions_requests_error(config: Config, mock_requests: M from dfp.stages.dfp_mlflow_model_writer import DFPMLFlowModelWriterStage mock_requests.get.side_effect = RuntimeError("test error") - stage = DFPMLFlowModelWriterStage(config) - stage._apply_model_permissions("test_experiment") + stage = DFPMLFlowModelWriterStage(config, timeout=10) + stage._controller._apply_model_permissions("test_experiment") # This method just catches and logs any errors mock_requests.get.assert_called_once() @@ -227,13 +228,14 @@ def test_apply_model_permissions_requests_error(config: Config, mock_requests: M @pytest.mark.parametrize("databricks_permissions", [None, {}]) @pytest.mark.parametrize("tracking_uri", ['file:///home/user/morpheus/mlruns', "databricks"]) -def test_on_data(config: Config, - mock_mlflow: MockedMLFlow, - mock_requests: MockedRequests, - dataset_pandas: DatasetManager, - databricks_env: dict, - databricks_permissions: dict, - tracking_uri: str): +def test_on_data( + config: Config, + mock_mlflow: MockedMLFlow, # pylint: disable=redefined-outer-name + mock_requests: MockedRequests, + dataset_pandas: DatasetManager, + databricks_env: dict, + databricks_permissions: dict, + tracking_uri: str): from dfp.messages.multi_dfp_message import DFPMessageMeta from dfp.stages.dfp_mlflow_model_writer import DFPMLFlowModelWriterStage from dfp.stages.dfp_mlflow_model_writer import conda_env @@ -271,8 +273,8 @@ def test_on_data(config: Config, meta = DFPMessageMeta(df, 'Account-123456789') msg = MultiAEMessage(meta=meta, model=mock_model) - stage = DFPMLFlowModelWriterStage(config, databricks_permissions=databricks_permissions) - assert stage.on_data(msg) is msg # Should be a pass-thru + stage = DFPMLFlowModelWriterStage(config, databricks_permissions=databricks_permissions, timeout=10) + assert stage._controller.on_data(msg) is msg # Should be a pass-thru # Test mocks in order that they're called mock_mlflow.end_run.assert_called_once() diff --git a/tests/test_cli.py b/tests/test_cli.py index a2379fe074..23408165ee 100755 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -244,7 +244,7 @@ def test_pipeline_ae(self, config, callback_values): assert isinstance(serialize, SerializeStage) assert isinstance(to_file, WriteToFileStage) - assert to_file._output_file == 'out.csv' + assert to_file._controller._output_file == 'out.csv' @pytest.mark.replace_callback('pipeline_ae') def test_pipeline_ae_all(self, callback_values): @@ -338,7 +338,7 @@ def test_pipeline_ae_all(self, callback_values): assert isinstance(serialize, SerializeStage) assert isinstance(to_file, WriteToFileStage) - assert to_file._output_file == 'out.csv' + assert to_file._controller._output_file == 'out.csv' assert isinstance(to_kafka, WriteToKafkaStage) assert to_kafka._kafka_conf['bootstrap.servers'] == 'kserv1:123,kserv2:321' @@ -404,7 +404,7 @@ def test_pipeline_fil(self, config, callback_values): assert isinstance(serialize, SerializeStage) assert isinstance(to_file, WriteToFileStage) - assert to_file._output_file == 'out.csv' + assert to_file._controller._output_file == 'out.csv' @pytest.mark.replace_callback('pipeline_fil') def test_pipeline_fil_all(self, config, callback_values, tmp_path, mlflow_uri): @@ -528,7 +528,7 @@ def test_pipeline_fil_all(self, config, callback_values, tmp_path, mlflow_uri): assert isinstance(serialize, SerializeStage) assert isinstance(to_file, WriteToFileStage) - assert to_file._output_file == 'out.csv' + assert to_file._controller._output_file == 'out.csv' assert isinstance(to_kafka, WriteToKafkaStage) assert to_kafka._kafka_conf['bootstrap.servers'] == 'kserv1:123,kserv2:321' @@ -624,7 +624,7 @@ def test_enum_parsing(self, config, callback_values, tmp_path, mlflow_uri): assert isinstance(deserialize, DeserializeStage) assert isinstance(filter_stage, FilterDetectionsStage) - assert filter_stage._filter_source == FilterSource.TENSOR + assert filter_stage._controller._filter_source == FilterSource.TENSOR assert isinstance(dropna, DropNullStage) assert dropna._column == 'xyz' @@ -662,8 +662,8 @@ def test_enum_parsing(self, config, callback_values, tmp_path, mlflow_uri): assert isinstance(serialize, SerializeStage) assert isinstance(to_file, WriteToFileStage) - assert to_file._output_file == 'out.csv' - assert to_file._file_type == FileTypes.CSV + assert to_file._controller._output_file == 'out.csv' + assert to_file._controller._file_type == FileTypes.CSV assert isinstance(to_kafka, WriteToKafkaStage) assert to_kafka._kafka_conf['bootstrap.servers'] == 'kserv1:123,kserv2:321' @@ -745,7 +745,7 @@ def test_pipeline_nlp(self, config, callback_values): assert isinstance(serialize, SerializeStage) assert isinstance(to_file, WriteToFileStage) - assert to_file._output_file == 'out.csv' + assert to_file._controller._output_file == 'out.csv' @pytest.mark.replace_callback('pipeline_nlp') def test_pipeline_nlp_all(self, config, callback_values, tmp_path, mlflow_uri): @@ -877,7 +877,7 @@ def test_pipeline_nlp_all(self, config, callback_values, tmp_path, mlflow_uri): assert isinstance(serialize, SerializeStage) assert isinstance(to_file, WriteToFileStage) - assert to_file._output_file == 'out.csv' + assert to_file._controller._output_file == 'out.csv' assert isinstance(to_kafka, WriteToKafkaStage) assert to_kafka._kafka_conf['bootstrap.servers'] == 'kserv1:123,kserv2:321' diff --git a/tests/test_filter_detections_stage.py b/tests/test_filter_detections_stage.py index 9eeead93e2..e147d17d34 100755 --- a/tests/test_filter_detections_stage.py +++ b/tests/test_filter_detections_stage.py @@ -40,7 +40,7 @@ def test_constructor(config): assert len(accepted_types) > 0 fds = FilterDetectionsStage(config, threshold=0.2) - assert fds._threshold == 0.2 + assert fds._controller._threshold == 0.2 @pytest.mark.use_cudf @@ -52,7 +52,7 @@ def test_filter_copy(config, filter_probs_df): mock_message = _make_message(filter_probs_df, probs) # All values are at or below the threshold so nothing should be returned - output_message = fds.filter_copy(mock_message) + output_message = fds._controller.filter_copy(mock_message) assert output_message is None # Only one row has a value above the threshold @@ -64,7 +64,7 @@ def test_filter_copy(config, filter_probs_df): mock_message = _make_message(filter_probs_df, probs) - output_message = fds.filter_copy(mock_message) + output_message = fds._controller.filter_copy(mock_message) assert output_message.get_meta().to_cupy().tolist() == filter_probs_df.loc[1:1, :].to_cupy().tolist() # Two adjacent rows have a value above the threashold @@ -78,7 +78,7 @@ def test_filter_copy(config, filter_probs_df): mock_message = _make_message(filter_probs_df, probs) - output_message = fds.filter_copy(mock_message) + output_message = fds._controller.filter_copy(mock_message) assert output_message.get_meta().to_cupy().tolist() == filter_probs_df.loc[2:3, :].to_cupy().tolist() # Two non-adjacent rows have a value above the threashold @@ -93,7 +93,7 @@ def test_filter_copy(config, filter_probs_df): mock_message = _make_message(filter_probs_df, probs) - output_message = fds.filter_copy(mock_message) + output_message = fds._controller.filter_copy(mock_message) mask = cp.zeros(len(filter_probs_df), dtype=cp.bool_) mask[2] = True mask[4] = True @@ -118,7 +118,7 @@ def test_filter_column(config, filter_probs_df, do_copy, threshold, field_name): mock_message = _make_message(filter_probs_df, probs) # All values are at or below the threshold - output_message = fds.filter_copy(mock_message) + output_message = fds._controller.filter_copy(mock_message) assert output_message.get_meta().to_cupy().tolist() == expected_df.to_numpy().tolist() @@ -132,7 +132,7 @@ def test_filter_slice(config, filter_probs_df): mock_message = _make_message(filter_probs_df, probs) # All values are at or below the threshold - output_messages = fds.filter_slice(mock_message) + output_messages = fds._controller.filter_slice(mock_message) assert len(output_messages) == 0 # Only one row has a value above the threshold @@ -144,7 +144,7 @@ def test_filter_slice(config, filter_probs_df): mock_message = _make_message(filter_probs_df, probs) - output_messages = fds.filter_slice(mock_message) + output_messages = fds._controller.filter_slice(mock_message) assert len(output_messages) == 1 output_message = output_messages[0] assert output_message.get_meta().to_cupy().tolist() == filter_probs_df.loc[1:1, :].to_cupy().tolist() @@ -160,7 +160,7 @@ def test_filter_slice(config, filter_probs_df): mock_message = _make_message(filter_probs_df, probs) - output_messages = fds.filter_slice(mock_message) + output_messages = fds._controller.filter_slice(mock_message) assert len(output_messages) == 1 output_message = output_messages[0] assert output_message.offset == 2 @@ -179,7 +179,7 @@ def test_filter_slice(config, filter_probs_df): mock_message = _make_message(filter_probs_df, probs) - output_messages = fds.filter_slice(mock_message) + output_messages = fds._controller.filter_slice(mock_message) assert len(output_messages) == 2 (msg1, msg2) = output_messages # pylint: disable=unbalanced-tuple-unpacking assert msg1.offset == 2 diff --git a/tests/test_monitor_stage.py b/tests/test_monitor_stage.py index 586bb04e75..1e6e045459 100755 --- a/tests/test_monitor_stage.py +++ b/tests/test_monitor_stage.py @@ -59,7 +59,7 @@ def two_x(x): assert stage._mc._determine_count_fn is two_x -@mock.patch('morpheus.utils.monitor_utils.MorpheusTqdm') +@mock.patch('morpheus.controllers.monitor_controller.MorpheusTqdm') def test_on_start(mock_morph_tqdm, config): mock_morph_tqdm.return_value = mock_morph_tqdm @@ -72,7 +72,7 @@ def test_on_start(mock_morph_tqdm, config): assert stage._mc._progress is mock_morph_tqdm -@mock.patch('morpheus.utils.monitor_utils.MorpheusTqdm') +@mock.patch('morpheus.controllers.monitor_controller.MorpheusTqdm') def test_stop(mock_morph_tqdm, config): mock_morph_tqdm.return_value = mock_morph_tqdm @@ -88,7 +88,7 @@ def test_stop(mock_morph_tqdm, config): mock_morph_tqdm.close.assert_called_once() -@mock.patch('morpheus.utils.monitor_utils.MorpheusTqdm') +@mock.patch('morpheus.controllers.monitor_controller.MorpheusTqdm') def test_refresh(mock_morph_tqdm, config): mock_morph_tqdm.return_value = mock_morph_tqdm @@ -134,7 +134,7 @@ def test_auto_count_fn_not_impl(config, value: typing.Any): stage._mc.auto_count_fn(value) -@mock.patch('morpheus.utils.monitor_utils.MorpheusTqdm') +@mock.patch('morpheus.controllers.monitor_controller.MorpheusTqdm') def test_progress_sink(mock_morph_tqdm, config): mock_morph_tqdm.return_value = mock_morph_tqdm diff --git a/tests/test_serialize_stage.py b/tests/test_serialize_stage.py index 0f596b5980..9030a19e90 100755 --- a/tests/test_serialize_stage.py +++ b/tests/test_serialize_stage.py @@ -42,16 +42,16 @@ def test_fixed_columns(config): include_re_str = '^app.*' include_re = re.compile(include_re_str) - s = SerializeStage(config, include=[include_re_str], fixed_columns=True) - meta1 = s.convert_to_df(mm1, include_columns=include_re, exclude_columns=[]) - meta2 = s.convert_to_df(mm2, include_columns=include_re, exclude_columns=[]) + stage = SerializeStage(config, include=[include_re_str], fixed_columns=True) + meta1 = stage._controller.convert_to_df(mm1, include_columns=include_re, exclude_columns=[]) + meta2 = stage._controller.convert_to_df(mm2, include_columns=include_re, exclude_columns=[]) assert meta1.df.columns.to_list() == ['apples', 'apple_sauce'] assert meta2.df.columns.to_list() == ['apples', 'apple_sauce'] - s = SerializeStage(config, include=[include_re_str], fixed_columns=False) - meta1 = s.convert_to_df(mm1, include_columns=include_re, exclude_columns=[]) - meta2 = s.convert_to_df(mm2, include_columns=include_re, exclude_columns=[]) + stage = SerializeStage(config, include=[include_re_str], fixed_columns=False) + meta1 = stage._controller.convert_to_df(mm1, include_columns=include_re, exclude_columns=[]) + meta2 = stage._controller.convert_to_df(mm2, include_columns=include_re, exclude_columns=[]) assert meta1.df.columns.to_list() == ['apples', 'apple_sauce'] assert meta2.df.columns.to_list() == ['apples', 'applause', 'apple_sauce'] diff --git a/tests/utils/nvt/test_schema_converters.py b/tests/utils/nvt/test_schema_converters.py index 917f5cf90a..03270a6da5 100644 --- a/tests/utils/nvt/test_schema_converters.py +++ b/tests/utils/nvt/test_schema_converters.py @@ -26,6 +26,7 @@ from morpheus.utils.column_info import DateTimeColumn from morpheus.utils.column_info import DistinctIncrementColumn from morpheus.utils.column_info import IncrementColumn +from morpheus.utils.column_info import PreparedDFInfo from morpheus.utils.column_info import RenameColumn from morpheus.utils.column_info import StringCatColumn from morpheus.utils.column_info import StringJoinColumn @@ -361,8 +362,8 @@ def test_input_schema_conversion_interdependent_columns(): test_df["application"] = ['{"name": "AnotherApp", "version": "1.0"}'] modified_schema = create_and_attach_nvt_workflow(modified_schema) - test_df = modified_schema.prep_dataframe(test_df) - dataset = nvt.Dataset(test_df) + prepared_df_info: PreparedDFInfo = modified_schema.prep_dataframe(test_df) + dataset = nvt.Dataset(prepared_df_info.df) output_df = modified_schema.nvt_workflow.transform(dataset).to_ddf().compute().to_pandas() expected_df = pd.DataFrame({ @@ -399,8 +400,8 @@ def test_input_schema_conversion_nested_operations(): modified_schema.column_info.append(ColumnInfo(name="appsuffix", dtype="str")) modified_schema = create_and_attach_nvt_workflow(modified_schema) - test_df = modified_schema.prep_dataframe(test_df) - dataset = nvt.Dataset(test_df) + prepared_df_info: PreparedDFInfo = modified_schema.prep_dataframe(test_df) + dataset = nvt.Dataset(prepared_df_info.df) output_df = modified_schema.nvt_workflow.transform(dataset).to_ddf().compute().to_pandas() expected_df = pd.DataFrame({ @@ -503,8 +504,8 @@ def test_input_schema_conversion(): modified_schema = create_and_attach_nvt_workflow(example_schema) # Apply the returned nvt.Workflow to the test dataframe - test_df = modified_schema.prep_dataframe(test_df) - dataset = nvt.Dataset(test_df) + prepared_df_info: PreparedDFInfo = modified_schema.prep_dataframe(test_df) + dataset = nvt.Dataset(prepared_df_info.df) output_df = modified_schema.nvt_workflow.transform(dataset).to_ddf().compute().to_pandas() # Check if the output dataframe has the expected schema and values @@ -587,8 +588,8 @@ def test_input_schema_conversion_with_functional_filter(): example_schema = create_and_attach_nvt_workflow(example_schema) # Apply the returned nvt.Workflow to the test dataframe - test_df = example_schema.prep_dataframe(test_df) - dataset = nvt.Dataset(test_df) + prepared_df_info: PreparedDFInfo = example_schema.prep_dataframe(test_df) + dataset = nvt.Dataset(prepared_df_info.df) output_df = example_schema.nvt_workflow.transform(dataset).to_ddf().compute().to_pandas() # Check if the output dataframe has the expected schema and values