diff --git a/docs/source/developer_guide/guides/1_simple_python_stage.md b/docs/source/developer_guide/guides/1_simple_python_stage.md index 95a71012e9..adb813a8b2 100644 --- a/docs/source/developer_guide/guides/1_simple_python_stage.md +++ b/docs/source/developer_guide/guides/1_simple_python_stage.md @@ -230,6 +230,18 @@ Before constructing the pipeline, we need to do a bit of environment configurati ```python configure_logging(log_level=logging.DEBUG) ``` +We use the default configuration with the `DEBUG` logging level. The logger will output to both the console and a file. The logging handlers are non-blocking since they utilize a queue to send the log messages on a separate thread. + +We can also use `configure_logging` to add one or more logging handlers to the default configuration. The added handlers will also be non-blocking. The following is from the Grafana example (`examples/digital_fingerprinting/production/grafana/README.md`) where we add a [Loki](https://grafana.com/oss/loki/) logging handler to also publish Morpheus logs to a Loki log aggregation server. +```python +loki_handler = logging_loki.LokiHandler( + url=f"{loki_url}/loki/api/v1/push", + tags={"app": "morpheus"}, + version="1", +) + +configure_logging(loki_handler, log_level=log_level) +``` Next, we will build a Morpheus `Config` object. We will cover setting some common configuration parameters in the next guide. For now, it is important to know that we will always need to build a `Config` object: ```python diff --git a/examples/digital_fingerprinting/production/conda_env.yml b/examples/digital_fingerprinting/production/conda_env.yml index 1b6117cf80..80e40b9f88 100644 --- a/examples/digital_fingerprinting/production/conda_env.yml +++ b/examples/digital_fingerprinting/production/conda_env.yml @@ -32,3 +32,7 @@ dependencies: - nvtabular=23.06 - papermill - s3fs>=2023.6 + + ##### Pip Dependencies (keep sorted!) ####### + - pip: + - python-logging-loki diff --git a/examples/digital_fingerprinting/production/docker-compose.yml b/examples/digital_fingerprinting/production/docker-compose.yml index 47881c3e58..7cf1c636b1 100644 --- a/examples/digital_fingerprinting/production/docker-compose.yml +++ b/examples/digital_fingerprinting/production/docker-compose.yml @@ -145,9 +145,26 @@ services: - ./grafana/config/dashboards.yaml:/etc/grafana/provisioning/dashboards/dashboards.yaml - ./grafana/dashboards/:/var/lib/grafana/dashboards/ - ./grafana/datasources/:/etc/grafana/provisioning/datasources/ - - ./morpheus:/workspace + - ./grafana:/workspace ports: - "3000:3000" + networks: + - frontend + - backend + depends_on: + - loki + + loki: + image: grafana/loki:2.9.3 + volumes: + - ./grafana/config/loki-config.yml:/etc/loki/loki-config.yml + ports: + - "3100:3100" + networks: + - frontend + - backend + restart: unless-stopped + command: -config.file=/etc/loki/loki-config.yml networks: frontend: diff --git a/examples/digital_fingerprinting/production/grafana/README.md b/examples/digital_fingerprinting/production/grafana/README.md index 8cdf47d2b3..f79fe6e92b 100644 --- a/examples/digital_fingerprinting/production/grafana/README.md +++ b/examples/digital_fingerprinting/production/grafana/README.md @@ -14,24 +14,31 @@ # limitations under the License. --> -# Grafana DFP Dashboard Example +# Using Grafana with Morpheus DFP Pipeline -This example demonstrates how to use [Grafana](https://grafana.com/grafana/) to visualize the inference results from the [Azure DFP pipeline example](../production/README.md). 
+This example builds on the [Azure DFP pipeline example](../production/README.md) to demonstrate how [Grafana](https://grafana.com/grafana/) can be used for log monitoring, error alerting, and inference results visualization. ## Grafana Configuration -### CSV data source plugin +The data sources and dashboards in this example are managed using config files. [Grafana's provisioning system](https://grafana.com/docs/grafana/latest/administration/provisioning/) then uses these files to add the data sources and dashboards to Grafana upon startup. -The [CSV data source plugin](https://grafana.com/grafana/plugins/marcusolsson-csv-datasource/) is installed to Grafana to read the Azure inference results CSV file. This example assumes we are using the CSV file generated from running the Python script for [Azure DFP pipeline example](../production/README.md). +### Data Sources -If using the [notebook version](../production/morpheus/notebooks/dfp_azure_inference.ipynb) to run inference, you'll need to update the `url` in [datasources.yaml](./datasources/datasources.yaml) as follows: -``` -url: /workspace/notebooks/dfp_detections_azure.csv -``` +Grafana includes built-in support for many data sources. There are also several data sources available that can be installed as plugins. More information about how to manage Grafana data sources can be found [here](https://grafana.com/docs/grafana/latest/datasources/). + +The following data sources for this example are configured in [datasources.yaml](./datasources/datasources.yaml): + +#### Loki data source + +[Loki](https://grafana.com/docs/loki/latest/) is Grafana's log aggregation system. The Loki service is started automatically when the Grafana service starts up. The [Python script for running the DFP pipeline](./run.py) has been updated to configure a logging handler that sends the Morpheus logs to the Loki service. + +#### CSV data source plugin + +The [CSV data source plugin](https://grafana.com/grafana/plugins/marcusolsson-csv-datasource/) is installed to Grafana to read the Azure inference results CSV file. This example assumes we are using the CSV file generated from running the Python script for [Azure DFP pipeline example](../production/README.md). Please note that the use of the CSV plugin is for demonstration purposes only. Grafana includes support for many data sources more suitable for production deployments. See [here](https://grafana.com/docs/grafana/latest/datasources/) for more information. -### Updates to grafana.ini +#### Updates to grafana.ini The following is added to the default `grafana.ini` to enable local mode for CSV data source plugin. This allows the CSV data source plugin to access files on local file system. @@ -40,14 +47,24 @@ The following is added to the default `grafana.ini` to enable local mode for CSV allow_local_mode = true ``` -## Run Azure Production DFP Training and Inference Examples +## Add Loki logging handler to DFP pipeline -### Start Morpheus DFP pipeline container +The [pipeline run script](./run.py) for the Azure DFP example has been updated with the following to add the Loki logging handler which will publish the Morpheus logs to our Loki service: -The following steps are taken from [Azure DFP pipeline example](../production/README.md). 
Run the followng commands to start the Morpheus container: +``` +loki_handler = logging_loki.LokiHandler( + url=f"{loki_url}/loki/api/v1/push", + tags={"app": "morpheus"}, + version="1", +) -Build the Morpheus container: +configure_logging(loki_handler, log_level=log_level) +``` + +More information about Loki Python logging can be found [here](https://pypi.org/project/python-logging-loki/). +## Build the Morpheus container: +From the root of the Morpheus repo: ```bash ./docker/build_container_release.sh ``` @@ -60,45 +77,96 @@ export MORPHEUS_CONTAINER_VERSION="$(git describe --tags --abbrev=0)-runtime" docker compose build ``` -Create `bash` shell in `morpheus_pipeline` container: +## Start Grafana and Loki services: +To start Grafana and Loki, run the following command on host in `examples/digital_fingerprinting/production`: ```bash -docker compose run morpheus_pipeline bash +docker compose up grafana ``` -### Run Azure training pipeline +## Run Azure DFP Training -Run the following in the container to train Azure models. +Create `bash` shell in `morpheus_pipeline` container: ```bash -python dfp_azure_pipeline.py --log_level INFO --train_users generic --start_time "2022-08-01" --input_file="../../../data/dfp/azure-training-data/AZUREAD_2022*.json" +docker compose run --rm morpheus_pipeline bash ``` -### Run Azure inference pipeline: - -Run the inference pipeline with `filter_threshold=0.0`. This will disable the filtering of the inference results. +Set `PYTHONPATH` environment variable to allow import of production DFP Morpheus stages: +``` +export PYTHONPATH=/workspace/examples/digital_fingerprinting/production/morpheus +``` +Run the following in the container to train the Azure models. ```bash -python dfp_azure_pipeline.py --log_level INFO --train_users none --start_time "2022-08-30" --input_file="../../../data/dfp/azure-inference-data/*.json" --filter_threshold=0.0 +cd /workspace/examples/digital_fingerprinting/production/grafana +python run.py --log_level DEBUG --train_users generic --start_time "2022-08-01" --input_file="../../../data/dfp/azure-training-data/AZUREAD_2022*.json" ``` -The inference results will be saved to `dfp_detection_azure.csv` in the directory where script was run. +## View DFP Logs Dashboard in Grafana + +While the training pipeline is running, you can view Morpheus logs live in a Grafana dashboard at http://localhost:3000/dashboards. + +Click on `DFP Logs` in the `General` folder. You may need to expand the `General` folder to see the link. + + + +This dashboard was provisioned using config files but can also be manually created with the following steps: +1. Click `Dashboards` in the left-side menu. +2. Click `New` and select `New Dashboard`. +3. On the empty dashboard, click `+ Add visualization`. +4. In the dialog box that opens, Select the `Loki` data source. +5. In the `Edit Panel` view, change from `Time Series` visualization to `Logs`. +6. Add label filter: `app = morpheus`. +7. Change Order to `Oldest first`. +8. Click `Apply` to see your changes applied to the dashboard. Then click the save icon in the dashboard header. -## Run Grafana Docker Image +## Set up Error Alerting -To start Grafana, run the following command on host in `examples/digital_fingerprinting/production`: +We demonstrate here with a simple example how we can use Grafana Alerting to notify us of a pipeline error moments after it occurs. This is especially useful with long-running pipelines. +1. Click `Alert Rules` under `Alerting` in the left-side menu. +2. Click `New Alert Rule` +3. 
Enter alert rule name: `DFP Error Alert Rule`
+4. In the `Define query and alert condition` section, select the `Loki` data source.
+5. Switch to `Code` view by clicking the `Code` button on the right.
+6. Enter the following Loki query, which counts the number of log lines in the last minute that have an error label (`severity=error`):
 ```
-docker compose up grafana
+count_over_time({severity="error"}[1m])
+```
+7. Under `Expressions`, keep the default configurations for `Reduce` and `Threshold`. The alert condition threshold will be error counts > 0.
+8. In the `Set evaluation behavior` section, click `+ New folder`, enter `morpheus`, then click the `Create` button.
+9. Click `+ New evaluation group`, enter `dfp` for `Evaluation group name` and `1m` for `Evaluation interval`, then click the `Create` button.
+10. Enter `0s` for `Pending period`. This configures alerts to be fired instantly when the alert condition is met.
+11. Test your alert rule by running the following in your `morpheus_pipeline` container. This will cause an error because the `--input_file` glob will no longer match any of our training data files.
+```
+python run.py --log_level DEBUG --train_users generic --start_time "2022-08-01" --input_file="../../../data/dfp/azure-training-data/AZUREAD_2022*.json"
 ```
+12. Click the `Preview` button to test run the alert rule. You should now see how our alert query picks up the error log, processes it through our reduce/threshold expressions, and satisfies our alert condition. This is indicated by the `Firing` label in the `Threshold` section.
+
+
+
+13. Finally, click `Save rule and exit` at the top right of the page.
+
+By default, all alerts will be sent through the `grafana-default-email` contact point. You can add email addresses to this contact point by clicking on `Contact points` under `Alerting` in the left-side menu. You will also need to configure SMTP in the `[smtp]` section of your `grafana.ini`. More information about Grafana Alerting contact points can be found [here](https://grafana.com/docs/grafana/latest/alerting/fundamentals/contact-points/).
+
+## Run Azure DFP Inference:
+
+Run the inference pipeline with `filter_threshold=0.0`. This will disable the filtering of the inference results.
+
+```bash
+python run.py --log_level DEBUG --train_users none --start_time "2022-08-30" --input_file="../../../data/dfp/azure-inference-data/*.json" --filter_threshold=0.0
+```
+
+The inference results will be saved to `dfp_detections_azure.csv` in the directory where the script was run.
 
-## View DFP Dashboard
+## View DFP Detections Dashboard in Grafana
 
-Our Grafana DFP dashboard can now be accessed via web browser at http://localhost:3000/dashboards.
+When the inference pipeline completes, you can view visualizations of the inference results at http://localhost:3000/dashboards.
 
-Click on `DFP_Dashboard` in the `General` folder. You may need to expand the `General` folder to see the link.
+Click on `DFP Detections` in the `General` folder. You may need to expand the `General` folder to see the link. 
- + The dashboard has the following visualization panels: diff --git a/examples/digital_fingerprinting/production/grafana/config/grafana.ini b/examples/digital_fingerprinting/production/grafana/config/grafana.ini index 6b30172ff5..97df7a8bba 100644 --- a/examples/digital_fingerprinting/production/grafana/config/grafana.ini +++ b/examples/digital_fingerprinting/production/grafana/config/grafana.ini @@ -379,7 +379,7 @@ ;token_rotation_interval_minutes = 10 # Set to true to disable (hide) the login form, useful if you use OAuth, defaults to false -disable_login_form = true +;disable_login_form = true # Set to true to disable the sign out link in the side menu. Useful if you use auth.proxy or auth.jwt, defaults to false ;disable_signout_menu = false diff --git a/examples/digital_fingerprinting/production/grafana/config/loki-config.yml b/examples/digital_fingerprinting/production/grafana/config/loki-config.yml new file mode 100644 index 0000000000..77cfa39956 --- /dev/null +++ b/examples/digital_fingerprinting/production/grafana/config/loki-config.yml @@ -0,0 +1,65 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +auth_enabled: false + +server: + http_listen_port: 3100 + grpc_listen_port: 9096 + +common: + instance_addr: 127.0.0.1 + path_prefix: /tmp/loki + storage: + filesystem: + chunks_directory: /tmp/loki/chunks + rules_directory: /tmp/loki/rules + replication_factor: 1 + ring: + kvstore: + store: inmemory + +query_range: + results_cache: + cache: + embedded_cache: + enabled: true + max_size_mb: 100 + +schema_config: + configs: + - from: 2020-10-24 + store: boltdb-shipper + object_store: filesystem + schema: v11 + index: + prefix: index_ + period: 24h + +ruler: + alertmanager_url: http://localhost:9093 + +# By default, Loki will send anonymous, but uniquely-identifiable usage and configuration +# analytics to Grafana Labs. These statistics are sent to https://stats.grafana.org/ +# +# Statistics help us better understand how Loki is used, and they show us performance +# levels for most users. This helps us prioritize features and documentation. +# For more information on what's sent, look at +# https://github.com/grafana/loki/blob/main/pkg/usagestats/stats.go +# Refer to the buildReport method to see what goes into a report. 
+# +# If you would like to disable reporting, uncomment the following lines: +#analytics: +# reporting_enabled: false diff --git a/examples/digital_fingerprinting/production/grafana/dashboards/DFP_Dashboard.json b/examples/digital_fingerprinting/production/grafana/dashboards/dfp_detections.json similarity index 99% rename from examples/digital_fingerprinting/production/grafana/dashboards/DFP_Dashboard.json rename to examples/digital_fingerprinting/production/grafana/dashboards/dfp_detections.json index f80780a381..9167ecaca5 100644 --- a/examples/digital_fingerprinting/production/grafana/dashboards/DFP_Dashboard.json +++ b/examples/digital_fingerprinting/production/grafana/dashboards/dfp_detections.json @@ -557,7 +557,7 @@ }, "timepicker": {}, "timezone": "", - "title": "DFP_Dashboard", + "title": "DFP Detections", "uid": "f810d98f-bf31-42d4-98aa-9eb3fa187184", "version": 1, "weekStart": "" diff --git a/examples/digital_fingerprinting/production/grafana/dashboards/dfp_logs.json b/examples/digital_fingerprinting/production/grafana/dashboards/dfp_logs.json new file mode 100644 index 0000000000..c4ed0448c9 --- /dev/null +++ b/examples/digital_fingerprinting/production/grafana/dashboards/dfp_logs.json @@ -0,0 +1,78 @@ +{ + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": { + "type": "grafana", + "uid": "-- Grafana --" + }, + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "links": [], + "liveNow": false, + "panels": [ + { + "datasource": { + "type": "loki", + "uid": "P8E80F9AEF21F6940" + }, + "gridPos": { + "h": 18, + "w": 23, + "x": 0, + "y": 0 + }, + "id": 1, + "options": { + "dedupStrategy": "none", + "enableLogDetails": true, + "prettifyLogMessage": false, + "showCommonLabels": false, + "showLabels": false, + "showTime": false, + "sortOrder": "Ascending", + "wrapLogMessage": false + }, + "targets": [ + { + "datasource": { + "type": "loki", + "uid": "P8E80F9AEF21F6940" + }, + "editorMode": "builder", + "expr": "{app=\"morpheus\"} |= ``", + "queryType": "range", + "refId": "A" + } + ], + "type": "logs" + } + ], + "refresh": "5s", + "schemaVersion": 38, + "style": "dark", + "tags": [], + "templating": { + "list": [] + }, + "time": { + "from": "now-6h", + "to": "now" + }, + "timepicker": {}, + "timezone": "", + "title": "DFP Logs", + "uid": "dfb4fe34-daae-4894-9ff0-b8f89b7d256e", + "version": 1, + "weekStart": "" +} \ No newline at end of file diff --git a/examples/digital_fingerprinting/production/grafana/datasources/datasources.yaml b/examples/digital_fingerprinting/production/grafana/datasources/datasources.yaml index 8d5182a3ea..edcebabb8a 100644 --- a/examples/digital_fingerprinting/production/grafana/datasources/datasources.yaml +++ b/examples/digital_fingerprinting/production/grafana/datasources/datasources.yaml @@ -16,6 +16,12 @@ apiVersion: 1 datasources: + - name: Loki + type: loki + access: proxy + url: http://loki:3100 + jsonData: + maxLines: 1000 - name: csv-datasource uid: 1257c93b-f998-438c-a784-7e90fb94fb36 type: marcusolsson-csv-datasource diff --git a/examples/digital_fingerprinting/production/grafana/img/screenshot.png b/examples/digital_fingerprinting/production/grafana/img/dfp_detections_dashboard.png similarity index 100% rename from examples/digital_fingerprinting/production/grafana/img/screenshot.png rename to examples/digital_fingerprinting/production/grafana/img/dfp_detections_dashboard.png 
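The provisioned Loki data source and `DFP Logs` dashboard above can be exercised without running the full pipeline. The sketch below is a minimal example, not part of this change: it assumes the Docker Compose stack is up with Loki's port 3100 published on the host (inside the containers the URL is `http://loki:3100`, as used by `run.py`), and the logger name and message are arbitrary. Records tagged `app=morpheus` appear in the `DFP Logs` dashboard, and an error-level record also carries the `severity=error` label queried by the alert rule in the README.

```python
import logging

import logging_loki  # from the python-logging-loki pip dependency added in conda_env.yml

# Hypothetical smoke test run from the host; port 3100 is published by docker-compose.yml.
loki_handler = logging_loki.LokiHandler(
    url="http://localhost:3100/loki/api/v1/push",
    tags={"app": "morpheus"},
    version="1",
)

logger = logging.getLogger("loki_smoke_test")  # arbitrary logger name for this sketch
logger.setLevel(logging.DEBUG)
logger.addHandler(loki_handler)

# Should show up in the DFP Logs dashboard ({app="morpheus"}) and, being error-level,
# should also be counted by the count_over_time({severity="error"}[1m]) alert query.
logger.error("test error line for the DFP error alert rule")
```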
diff --git a/examples/digital_fingerprinting/production/grafana/img/dfp_error_alert_setup.png b/examples/digital_fingerprinting/production/grafana/img/dfp_error_alert_setup.png new file mode 100644 index 0000000000..516249f41a --- /dev/null +++ b/examples/digital_fingerprinting/production/grafana/img/dfp_error_alert_setup.png @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:9ada0a466cfeb34363623cf78d478e17737a1d439b43bec989c7b026749a0fe2 +size 474978 diff --git a/examples/digital_fingerprinting/production/grafana/img/dfp_logs_dashboard.png b/examples/digital_fingerprinting/production/grafana/img/dfp_logs_dashboard.png new file mode 100644 index 0000000000..8cec30b668 --- /dev/null +++ b/examples/digital_fingerprinting/production/grafana/img/dfp_logs_dashboard.png @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:753f199658371c462e6cbdbc324ee08acdafcf5453d0dae9b4042d133dfbabe0 +size 581211 diff --git a/examples/digital_fingerprinting/production/grafana/run.py b/examples/digital_fingerprinting/production/grafana/run.py new file mode 100644 index 0000000000..1f10cd4f67 --- /dev/null +++ b/examples/digital_fingerprinting/production/grafana/run.py @@ -0,0 +1,476 @@ +# Copyright (c) 2022-2024, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+"""DFP training & inference pipelines for Azure Active Directory logs.""" + +import functools +import logging +import logging.handlers +import os +import typing +from datetime import datetime +from datetime import timedelta +from datetime import timezone + +import click +import logging_loki +import mlflow +import pandas as pd +from dfp.stages.dfp_file_batcher_stage import DFPFileBatcherStage +from dfp.stages.dfp_file_to_df import DFPFileToDataFrameStage +from dfp.stages.dfp_inference_stage import DFPInferenceStage +from dfp.stages.dfp_mlflow_model_writer import DFPMLFlowModelWriterStage +from dfp.stages.dfp_postprocessing_stage import DFPPostprocessingStage +from dfp.stages.dfp_preprocessing_stage import DFPPreprocessingStage +from dfp.stages.dfp_rolling_window_stage import DFPRollingWindowStage +from dfp.stages.dfp_split_users_stage import DFPSplitUsersStage +from dfp.stages.dfp_training import DFPTraining +from dfp.stages.multi_file_source import MultiFileSource +from dfp.utils.regex_utils import iso_date_regex + +from morpheus.cli.utils import get_log_levels +from morpheus.cli.utils import get_package_relative_file +from morpheus.cli.utils import load_labels_file +from morpheus.cli.utils import parse_log_level +from morpheus.common import FileTypes +from morpheus.common import FilterSource +from morpheus.config import Config +from morpheus.config import ConfigAutoEncoder +from morpheus.config import CppConfig +from morpheus.pipeline import LinearPipeline +from morpheus.stages.general.monitor_stage import MonitorStage +from morpheus.stages.output.write_to_file_stage import WriteToFileStage +from morpheus.stages.postprocess.filter_detections_stage import FilterDetectionsStage +from morpheus.stages.postprocess.serialize_stage import SerializeStage +from morpheus.utils.column_info import ColumnInfo +from morpheus.utils.column_info import DataFrameInputSchema +from morpheus.utils.column_info import DateTimeColumn +from morpheus.utils.column_info import DistinctIncrementColumn +from morpheus.utils.column_info import IncrementColumn +from morpheus.utils.column_info import RenameColumn +from morpheus.utils.column_info import StringCatColumn +from morpheus.utils.file_utils import date_extractor +from morpheus.utils.logger import configure_logging + + +def _file_type_name_to_enum(file_type: str) -> FileTypes: + """Converts a file type name to a FileTypes enum.""" + if (file_type == "JSON"): + return FileTypes.JSON + if (file_type == "CSV"): + return FileTypes.CSV + if (file_type == "PARQUET"): + return FileTypes.PARQUET + + return FileTypes.Auto + + +@click.command() +@click.option( + "--train_users", + type=click.Choice(["all", "generic", "individual", "none"], case_sensitive=False), + help=("Indicates whether or not to train per user or a generic model for all users. " + "Selecting none runs the inference pipeline."), +) +@click.option( + "--skip_user", + multiple=True, + type=str, + help="User IDs to skip. Mutually exclusive with only_user", +) +@click.option( + "--only_user", + multiple=True, + type=str, + help="Only users specified by this option will be included. 
Mutually exclusive with skip_user", +) +@click.option( + "--start_time", + type=click.DateTime( + formats=['%Y-%m-%d', '%Y-%m-%dT%H:%M:%S', '%Y-%m-%d %H:%M:%S', '%Y-%m-%dT%H:%M:%S%z', '%Y-%m-%d %H:%M:%S%z']), + default=None, + help="The start of the time window, if undefined start_date will be `now()-duration`", +) +@click.option( + "--duration", + type=str, + default="60d", + help="The duration to run starting from start_time", +) +@click.option( + "--cache_dir", + type=str, + default="./.cache/dfp", + show_envvar=True, + help="The location to cache data such as S3 downloads and pre-processed data", +) +@click.option("--log_level", + default=logging.getLevelName(Config().log_level), + type=click.Choice(get_log_levels(), case_sensitive=False), + callback=parse_log_level, + help="Specify the logging level to use.") +@click.option("--sample_rate_s", + type=int, + default=0, + show_envvar=True, + help="Minimum time step, in milliseconds, between object logs.") +@click.option("--filter_threshold", + type=float, + default=2.0, + show_envvar=True, + help="Filter out inference results below this threshold") +@click.option( + "--input_file", + "-f", + type=str, + multiple=True, + help=("List of files to process. Can specify multiple arguments for multiple files. " + "Also accepts glob (*) wildcards and schema prefixes such as `s3://`. " + "For example, to make a local cache of an s3 bucket, use `filecache::s3://mybucket/*`. " + "Refer to fsspec documentation for list of possible options."), +) +@click.option("--file_type_override", + "-t", + type=click.Choice(["AUTO", "JSON", "CSV", "PARQUET"], case_sensitive=False), + default="JSON", + help="Override the detected file type. Values can be 'AUTO', 'JSON', 'CSV', or 'PARQUET'.", + callback=lambda _, + __, + value: None if value is None else _file_type_name_to_enum(value)) +@click.option('--watch_inputs', + type=bool, + is_flag=True, + default=False, + help=("Instructs the pipeline to continuously check the paths specified by `--input_file` for new files. " + "This assumes that the at least one paths contains a wildcard.")) +@click.option("--watch_interval", + type=float, + default=1.0, + help=("Amount of time, in seconds, to wait between checks for new files. " + "Only used if --watch_inputs is set.")) +@click.option('--tracking_uri', + type=str, + default="http://mlflow:5000", + help=("The MLflow tracking URI to connect to the tracking backend.")) +@click.option('--mlflow_experiment_name_template', + type=str, + default="dfp/azure/training/{reg_model_name}", + help="The MLflow experiment name template to use when logging experiments. ") +@click.option('--mlflow_model_name_template', + type=str, + default="DFP-azure-{user_id}", + help="The MLflow model name template to use when logging models. 
") +@click.option('--use_postproc_schema', is_flag=True, help='Assume that input data has already been preprocessed.') +@click.option('--inference_detection_file_name', type=str, default="dfp_detections_azure.csv") +@click.option('--loki_url', + type=str, + default="http://loki:3100", + help=("Loki URL for error logging and alerting in Grafana.")) +def run_pipeline(train_users, + skip_user: typing.Tuple[str], + only_user: typing.Tuple[str], + start_time: datetime, + duration, + cache_dir, + log_level, + sample_rate_s, + filter_threshold, + mlflow_experiment_name_template, + mlflow_model_name_template, + file_type_override, + use_postproc_schema, + inference_detection_file_name, + loki_url, + **kwargs): + """Runs the DFP pipeline.""" + # To include the generic, we must be training all or generic + include_generic = train_users in ("all", "generic") + + # To include individual, we must be either training or inferring + include_individual = train_users != "generic" + + # None indicates we aren't training anything + is_training = train_users != "none" + + skip_users = list(skip_user) + only_users = list(only_user) + + duration = timedelta(seconds=pd.Timedelta(duration).total_seconds()) + if start_time is None: + end_time = datetime.now(tz=timezone.utc) + start_time = end_time - duration + else: + if start_time.tzinfo is None: + start_time = start_time.replace(tzinfo=timezone.utc) + + end_time = start_time + duration + + # Enable the Morpheus logger + loki_handler = logging_loki.LokiHandler( + url=f"{loki_url}/loki/api/v1/push", + tags={"app": "morpheus"}, + version="1", + ) + configure_logging(loki_handler, log_level=log_level) + logging.getLogger("mlflow").setLevel(log_level) + + if (len(skip_users) > 0 and len(only_users) > 0): + logging.error("Option --skip_user and --only_user are mutually exclusive. 
Exiting") + + logger = logging.getLogger(f"morpheus.{__name__}") + + logger.info("Running training pipeline with the following options: ") + logger.info("Train generic_user: %s", include_generic) + logger.info("Skipping users: %s", skip_users) + logger.info("Start Time: %s", start_time) + logger.info("Duration: %s", duration) + logger.info("Cache Dir: %s", cache_dir) + + if ("tracking_uri" in kwargs): + # Initialize ML Flow + mlflow.set_tracking_uri(kwargs["tracking_uri"]) + logger.info("Tracking URI: %s", mlflow.get_tracking_uri()) + + config = Config() + + CppConfig.set_should_use_cpp(False) + + config.num_threads = os.cpu_count() + + config.ae = ConfigAutoEncoder() + + config.ae.feature_columns = load_labels_file(get_package_relative_file("data/columns_ae_azure.txt")) + config.ae.userid_column_name = "username" + config.ae.timestamp_column_name = "timestamp" + + # Specify the column names to ensure all data is uniform + if (use_postproc_schema): + + source_column_info = [ + ColumnInfo(name="autonomousSystemNumber", dtype=str), + ColumnInfo(name="location_geoCoordinates_latitude", dtype=float), + ColumnInfo(name="location_geoCoordinates_longitude", dtype=float), + ColumnInfo(name="resourceDisplayName", dtype=str), + ColumnInfo(name="travel_speed_kmph", dtype=float), + DateTimeColumn(name=config.ae.timestamp_column_name, dtype=datetime, input_name="time"), + ColumnInfo(name="appDisplayName", dtype=str), + ColumnInfo(name="clientAppUsed", dtype=str), + RenameColumn(name=config.ae.userid_column_name, dtype=str, input_name="userPrincipalName"), + RenameColumn(name="deviceDetailbrowser", dtype=str, input_name="deviceDetail_browser"), + RenameColumn(name="deviceDetaildisplayName", dtype=str, input_name="deviceDetail_displayName"), + RenameColumn(name="deviceDetailoperatingSystem", dtype=str, input_name="deviceDetail_operatingSystem"), + + # RenameColumn(name="location_country", dtype=str, input_name="location_countryOrRegion"), + ColumnInfo(name="location_city_state_country", dtype=str), + ColumnInfo(name="location_state_country", dtype=str), + ColumnInfo(name="location_country", dtype=str), + + # Non-features + ColumnInfo(name="is_corp_vpn", dtype=bool), + ColumnInfo(name="distance_km", dtype=float), + ColumnInfo(name="ts_delta_hour", dtype=float), + ] + source_schema = DataFrameInputSchema(column_info=source_column_info) + + preprocess_column_info = [ + ColumnInfo(name=config.ae.timestamp_column_name, dtype=datetime), + ColumnInfo(name=config.ae.userid_column_name, dtype=str), + + # Resource access + ColumnInfo(name="appDisplayName", dtype=str), + ColumnInfo(name="resourceDisplayName", dtype=str), + ColumnInfo(name="clientAppUsed", dtype=str), + + # Device detail + ColumnInfo(name="deviceDetailbrowser", dtype=str), + ColumnInfo(name="deviceDetaildisplayName", dtype=str), + ColumnInfo(name="deviceDetailoperatingSystem", dtype=str), + + # Location information + ColumnInfo(name="autonomousSystemNumber", dtype=str), + ColumnInfo(name="location_geoCoordinates_latitude", dtype=float), + ColumnInfo(name="location_geoCoordinates_longitude", dtype=float), + ColumnInfo(name="location_city_state_country", dtype=str), + ColumnInfo(name="location_state_country", dtype=str), + ColumnInfo(name="location_country", dtype=str), + + # Derived information + ColumnInfo(name="travel_speed_kmph", dtype=float), + + # Non-features + ColumnInfo(name="is_corp_vpn", dtype=bool), + ColumnInfo(name="distance_km", dtype=float), + ColumnInfo(name="ts_delta_hour", dtype=float), + ] + + preprocess_schema = 
DataFrameInputSchema(column_info=preprocess_column_info, preserve_columns=["_batch_id"]) + + exclude_from_training = [ + config.ae.userid_column_name, + config.ae.timestamp_column_name, + "is_corp_vpn", + "distance_km", + "ts_delta_hour", + ] + + config.ae.feature_columns = [ + name for (name, dtype) in preprocess_schema.output_columns if name not in exclude_from_training + ] + else: + source_column_info = [ + DateTimeColumn(name=config.ae.timestamp_column_name, dtype=datetime, input_name="time"), + RenameColumn(name=config.ae.userid_column_name, dtype=str, input_name="properties.userPrincipalName"), + RenameColumn(name="appDisplayName", dtype=str, input_name="properties.appDisplayName"), + ColumnInfo(name="category", dtype=str), + RenameColumn(name="clientAppUsed", dtype=str, input_name="properties.clientAppUsed"), + RenameColumn(name="deviceDetailbrowser", dtype=str, input_name="properties.deviceDetail.browser"), + RenameColumn(name="deviceDetaildisplayName", dtype=str, input_name="properties.deviceDetail.displayName"), + RenameColumn(name="deviceDetailoperatingSystem", + dtype=str, + input_name="properties.deviceDetail.operatingSystem"), + StringCatColumn(name="location", + dtype=str, + input_columns=[ + "properties.location.city", + "properties.location.countryOrRegion", + ], + sep=", "), + RenameColumn(name="statusfailureReason", dtype=str, input_name="properties.status.failureReason"), + ] + + source_schema = DataFrameInputSchema(json_columns=["properties"], column_info=source_column_info) + + # Preprocessing schema + preprocess_column_info = [ + ColumnInfo(name=config.ae.timestamp_column_name, dtype=datetime), + ColumnInfo(name=config.ae.userid_column_name, dtype=str), + ColumnInfo(name="appDisplayName", dtype=str), + ColumnInfo(name="clientAppUsed", dtype=str), + ColumnInfo(name="deviceDetailbrowser", dtype=str), + ColumnInfo(name="deviceDetaildisplayName", dtype=str), + ColumnInfo(name="deviceDetailoperatingSystem", dtype=str), + ColumnInfo(name="statusfailureReason", dtype=str), + + # Derived columns + IncrementColumn(name="logcount", + dtype=int, + input_name=config.ae.timestamp_column_name, + groupby_column=config.ae.userid_column_name), + DistinctIncrementColumn(name="locincrement", + dtype=int, + input_name="location", + groupby_column=config.ae.userid_column_name, + timestamp_column=config.ae.timestamp_column_name), + DistinctIncrementColumn(name="appincrement", + dtype=int, + input_name="appDisplayName", + groupby_column=config.ae.userid_column_name, + timestamp_column=config.ae.timestamp_column_name) + ] + + preprocess_schema = DataFrameInputSchema(column_info=preprocess_column_info, preserve_columns=["_batch_id"]) + + # Create a linear pipeline object + pipeline = LinearPipeline(config) + + pipeline.set_source( + MultiFileSource(config, + filenames=list(kwargs["input_file"]), + watch=kwargs["watch_inputs"], + watch_interval=kwargs["watch_interval"])) + + # Batch files into batches by time. Use the default ISO date extractor from the filename + pipeline.add_stage( + DFPFileBatcherStage(config, + period="D", + sampling_rate_s=sample_rate_s, + date_conversion_func=functools.partial(date_extractor, filename_regex=iso_date_regex), + start_time=start_time, + end_time=end_time)) + + parser_kwargs = None + if (file_type_override == FileTypes.JSON): + parser_kwargs = {"lines": False, "orient": "records"} + # Output is a list of fsspec files. Convert to DataFrames. 
This caches downloaded data + pipeline.add_stage( + DFPFileToDataFrameStage( + config, + schema=source_schema, + file_type=file_type_override, + parser_kwargs=parser_kwargs, # TODO(Devin) probably should be configurable too + cache_dir=cache_dir)) + + pipeline.add_stage(MonitorStage(config, description="Input data rate")) + + # This will split users or just use one single user + pipeline.add_stage( + DFPSplitUsersStage(config, + include_generic=include_generic, + include_individual=include_individual, + skip_users=skip_users, + only_users=only_users)) + + # Next, have a stage that will create rolling windows + pipeline.add_stage( + DFPRollingWindowStage( + config, + min_history=300 if is_training else 1, + min_increment=300 if is_training else 0, + # For inference, we only ever want 1 day max + max_history="60d" if is_training else "1d", + cache_dir=cache_dir)) + + # Output is UserMessageMeta -- Cached frame set + pipeline.add_stage(DFPPreprocessingStage(config, input_schema=preprocess_schema)) + + model_name_formatter = mlflow_model_name_template + experiment_name_formatter = mlflow_experiment_name_template + + if (is_training): + # Finally, perform training which will output a model + pipeline.add_stage(DFPTraining(config, epochs=100, validation_size=0.15)) + + pipeline.add_stage(MonitorStage(config, description="Training rate", smoothing=0.001)) + + # Write that model to MLFlow + pipeline.add_stage( + DFPMLFlowModelWriterStage(config, + model_name_formatter=model_name_formatter, + experiment_name_formatter=experiment_name_formatter)) + else: + # Perform inference on the preprocessed data + pipeline.add_stage(DFPInferenceStage(config, model_name_formatter=model_name_formatter)) + + pipeline.add_stage(MonitorStage(config, description="Inference rate", smoothing=0.001)) + + # Filter for only the anomalous logs + pipeline.add_stage( + FilterDetectionsStage(config, + threshold=filter_threshold, + filter_source=FilterSource.DATAFRAME, + field_name='mean_abs_z')) + pipeline.add_stage(DFPPostprocessingStage(config)) + + # Exclude the columns we don't want in our output + pipeline.add_stage(SerializeStage(config, exclude=['batch_count', 'origin_hash', '_row_hash', '_batch_id'])) + + # Write all anomalies to a CSV file + pipeline.add_stage(WriteToFileStage(config, filename=inference_detection_file_name, overwrite=True)) + + # Run the pipeline + pipeline.run() + + +if __name__ == "__main__": + # pylint: disable=no-value-for-parameter + run_pipeline(obj={}, auto_envvar_prefix='DFP', show_default=True, prog_name="dfp") diff --git a/morpheus/stages/general/buffer_stage.py b/morpheus/stages/general/buffer_stage.py index 6e529adbb5..261b4cec82 100644 --- a/morpheus/stages/general/buffer_stage.py +++ b/morpheus/stages/general/buffer_stage.py @@ -66,7 +66,8 @@ def supports_cpp_node(self): return False def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject: - # This stage is no longer needed and is just a pass thru stage - deprecated_stage_warning(logger, type(self), self.unique_name) + reason = "The stage is no longer required to manage backpressure and has been deprecated. It has no" \ + " effect acts as a pass through stage." 
+ deprecated_stage_warning(logger, type(self), self.unique_name, reason=reason) return input_node diff --git a/morpheus/stages/general/delay_stage.py b/morpheus/stages/general/delay_stage.py index f83e13b450..49768c32a2 100644 --- a/morpheus/stages/general/delay_stage.py +++ b/morpheus/stages/general/delay_stage.py @@ -66,7 +66,8 @@ def supports_cpp_node(self): return False def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject: - # This stage is no longer needed and is just a pass thru stage - deprecated_stage_warning(logger, type(self), self.unique_name) + reason = "The stage is no longer required to manage backpressure and has been deprecated. It has no" \ + " effect acts as a pass through stage." + deprecated_stage_warning(logger, type(self), self.unique_name, reason=reason) return input_node diff --git a/morpheus/utils/logger.py b/morpheus/utils/logger.py index 7ef44a8897..693e6574b7 100644 --- a/morpheus/utils/logger.py +++ b/morpheus/utils/logger.py @@ -95,13 +95,14 @@ def _configure_from_log_file(log_config_file: str): logging.config.fileConfig(log_config_file) -def _configure_from_log_level(log_level: int): +def _configure_from_log_level(*extra_handlers: logging.Handler, log_level: int): """ Default config with only option being the logging level. Outputs to both the console and a file. Sets up a logging producer/consumer that works well in multi-thread/process environments. Parameters ---------- + *extra_handlers: List of additional handlers which will handle entries placed on the queue log_level : int Log level and above to report """ @@ -143,6 +144,7 @@ def _configure_from_log_level(log_level: int): queue_listener = logging.handlers.QueueListener(morpheus_logging_queue, console_handler, file_handler, + *extra_handlers, respect_handler_level=True) queue_listener.start() queue_listener._thread.name = "Logging Thread" @@ -155,7 +157,7 @@ def stop_queue_listener(): atexit.register(stop_queue_listener) -def configure_logging(log_level: int, log_config_file: str = None): +def configure_logging(*extra_handlers: logging.Handler, log_level: int = None, log_config_file: str = None): """ Configures Morpheus logging in one of two ways. Either specifying a logging config file to load or a logging level which will use a default configuration. The default configuration outputs to both the console and a file. Sets up a @@ -163,6 +165,7 @@ def configure_logging(log_level: int, log_config_file: str = None): Parameters ---------- + *extra_handlers: List of handlers to add to existing default console and file handlers. log_level: int Specifies the log level and above to output. Must be one of the available levels in the `logging` module. log_config_file: str, optional (default = None): @@ -180,7 +183,8 @@ def configure_logging(log_level: int, log_config_file: str = None): # Configure using log file _configure_from_log_file(log_config_file=log_config_file) else: - _configure_from_log_level(log_level=log_level) + assert log_level is not None, "log_level must be specified" + _configure_from_log_level(*extra_handlers, log_level=log_level) def set_log_level(log_level: int): @@ -211,12 +215,12 @@ def set_log_level(log_level: int): return old_level -def deprecated_stage_warning(logger, cls, name): +def deprecated_stage_warning(logger, cls, name, reason: str = None): """Log a warning about a deprecated stage.""" - logger.warning(("The '%s' stage ('%s') is no longer required to manage backpressure and has been deprecated. 
" - "It has no effect and acts as a pass through stage."), - cls.__name__, - name) + message = f"The '{cls.__name__}' stage ('{name}') has been deprecated and will be removed in a future version." + if reason is not None: + message = " ".join((message, reason)) + logger.warning(message) def deprecated_message_warning(logger, cls, new_cls): diff --git a/tests/test_logger.py b/tests/test_logger.py new file mode 100644 index 0000000000..dddf05d345 --- /dev/null +++ b/tests/test_logger.py @@ -0,0 +1,159 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import multiprocessing +import os +from unittest.mock import patch + +import pytest + +from _utils import TEST_DIRS +from morpheus.utils.logger import TqdmLoggingHandler +from morpheus.utils.logger import configure_logging +from morpheus.utils.logger import deprecated_message_warning +from morpheus.utils.logger import deprecated_stage_warning +from morpheus.utils.logger import set_log_level + + +@patch('logging.handlers.QueueListener') +@patch('logging.handlers.QueueHandler.emit') +def test_configure_logging_from_level_default_handlers(queue_handler, queue_listener): + configure_logging(log_level=logging.DEBUG) + morpheus_logger = logging.getLogger("morpheus") + assert morpheus_logger.level == logging.DEBUG + assert morpheus_logger.propagate is False + pos_args = queue_listener.call_args[0] + assert len(pos_args) == 3 + assert isinstance(pos_args[0], multiprocessing.queues.Queue) + assert isinstance(pos_args[1], TqdmLoggingHandler) + assert isinstance(pos_args[2], logging.handlers.RotatingFileHandler) + assert pos_args[2].baseFilename.endswith("morpheus.log") + morpheus_logger.debug("test") + queue_handler.assert_called() + + +def test_configure_logging__no_args(): + with pytest.raises(Exception) as excinfo: + configure_logging() + assert "log_level must be specified" in str(excinfo.value) + + +@patch('logging.handlers.RotatingFileHandler.emit') +@patch('morpheus.utils.logger.TqdmLoggingHandler.emit') +def test_configure_logging_from_file(console_handler, file_handler): + log_config_file = os.path.join(TEST_DIRS.tests_data_dir, "logging.json") + configure_logging(log_config_file=log_config_file) + morpheus_logger = logging.getLogger("morpheus") + assert morpheus_logger.level == logging.DEBUG + assert morpheus_logger.propagate is False + morpheus_logger.debug("test") + console_handler.assert_called_once() + file_handler.assert_called_once() + + +def test_configure_logging_from_file_filenotfound(): + with pytest.raises(FileNotFoundError): + configure_logging(log_config_file="does_not_exist.json") + + +@patch('logging.handlers.QueueListener') +@patch('logging.handlers.QueueHandler.emit') +def test_configure_logging_add_one_handler(queue_handler, queue_listener): + new_handler = logging.StreamHandler() + configure_logging(new_handler, log_level=logging.DEBUG) + morpheus_logger = 
logging.getLogger("morpheus") + assert morpheus_logger.level == logging.DEBUG + assert morpheus_logger.propagate is False + pos_args = queue_listener.call_args[0] + assert len(pos_args) == 4 + assert isinstance(pos_args[0], multiprocessing.queues.Queue) + assert isinstance(pos_args[1], TqdmLoggingHandler) + assert isinstance(pos_args[2], logging.handlers.RotatingFileHandler) + assert isinstance(pos_args[3], logging.StreamHandler) + morpheus_logger.debug("test") + queue_handler.assert_called() + + +@patch('logging.handlers.QueueListener') +@patch('logging.handlers.QueueHandler.emit') +def test_configure_logging_add_two_handlers(queue_handler, queue_listener): + new_handler_1 = logging.StreamHandler() + new_handler_2 = logging.StreamHandler() + configure_logging(new_handler_1, new_handler_2, log_level=logging.DEBUG) + morpheus_logger = logging.getLogger("morpheus") + assert morpheus_logger.level == logging.DEBUG + assert morpheus_logger.propagate is False + pos_args = queue_listener.call_args[0] + assert len(pos_args) == 5 + assert isinstance(pos_args[0], multiprocessing.queues.Queue) + assert isinstance(pos_args[1], TqdmLoggingHandler) + assert isinstance(pos_args[2], logging.handlers.RotatingFileHandler) + assert isinstance(pos_args[3], logging.StreamHandler) + assert isinstance(pos_args[4], logging.StreamHandler) + morpheus_logger.debug("test") + queue_handler.assert_called() + + +def test_set_log_level(): + configure_logging(log_level=logging.INFO) + morpheus_logger = logging.getLogger("morpheus") + assert morpheus_logger.level == logging.INFO + set_log_level(logging.DEBUG) + assert morpheus_logger.level == logging.DEBUG + + +def test_deprecated_stage_warning(caplog): + + class DummyStage(): + pass + + logger = logging.getLogger() + caplog.set_level(logging.WARNING) + deprecated_stage_warning(logger, DummyStage, "dummy_stage") + assert len(caplog.records) == 1 + assert caplog.records[0].levelname == "WARNING" + assert "The 'DummyStage' stage ('dummy_stage') has been deprecated" in caplog.text + + +def test_deprecated_stage_warning_with_reason(caplog): + + class DummyStage(): + pass + + logger = logging.getLogger() + caplog.set_level(logging.WARNING) + deprecated_stage_warning(logger, DummyStage, "dummy_stage", reason="This is the reason.") + assert len(caplog.records) == 1 + assert caplog.records[0].levelname == "WARNING" + assert "The 'DummyStage' stage ('dummy_stage') has been deprecated and will be removed in a future version. " \ + "This is the reason." in caplog.text + + +def test_deprecated_message_warning(caplog): + + class OldMessage(): + pass + + class NewMessage(): + pass + + logger = logging.getLogger() + caplog.set_level(logging.WARNING) + deprecated_message_warning(logger, OldMessage, NewMessage) + assert len(caplog.records) == 1 + assert caplog.records[0].levelname == "WARNING" + assert "The 'OldMessage' message has been deprecated and will be removed in a future version. " \ + "Please use 'NewMessage' instead." in caplog.text diff --git a/tests/tests_data/logging.json b/tests/tests_data/logging.json new file mode 100644 index 0000000000..bb681247b7 --- /dev/null +++ b/tests/tests_data/logging.json @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:4c9bffb074de716dc573421a31fa8ab75a3476676db1522fb839c2e75807faba +size 752
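For completeness, here is a minimal sketch (not part of the diff) of how downstream code can use the extended `configure_logging()` signature from `morpheus/utils/logger.py`: handlers passed positionally are serviced by the same non-blocking `QueueListener` as the default console and file handlers, with `respect_handler_level=True` so each handler applies its own level. The `StreamHandler`, format string, and levels below are illustrative choices only.

```python
import logging

from morpheus.utils.logger import configure_logging

# Illustrative extra handler; any logging.Handler subclass (e.g. the LokiHandler used in
# examples/digital_fingerprinting/production/grafana/run.py) could be passed the same way.
extra_handler = logging.StreamHandler()
extra_handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(name)s %(message)s"))
extra_handler.setLevel(logging.WARNING)  # the queue listener respects per-handler levels

# log_level is keyword-only now that the signature starts with *extra_handlers.
configure_logging(extra_handler, log_level=logging.DEBUG)

logging.getLogger("morpheus").warning("routed to the console, the log file and the extra handler")
```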