From 9e7a417abdf8621d39438f07bbe3bcfea558516f Mon Sep 17 00:00:00 2001
From: Eli Fajardo
Date: Tue, 9 Jan 2024 19:19:54 -0500
Subject: [PATCH 01/15] add log monitoring to grafana dfp example

---
 .../production/conda_env.yml                  |   4 +
 .../production/docker-compose.yml             |  19 +-
 .../production/grafana/README.md              |  77 ++-
 .../production/grafana/config/grafana.ini     |   2 +-
 .../production/grafana/config/loki-config.yml |  50 ++
 ...DFP_Dashboard.json => dfp_detections.json} |   2 +-
 .../grafana/dashboards/dfp_logs.json          |  78 +++
 .../grafana/datasources/datasources.yaml      |   6 +
 .../production/grafana/run.py                 | 535 ++++++++++++++++++
 9 files changed, 740 insertions(+), 33 deletions(-)
 create mode 100644 examples/digital_fingerprinting/production/grafana/config/loki-config.yml
 rename examples/digital_fingerprinting/production/grafana/dashboards/{DFP_Dashboard.json => dfp_detections.json} (99%)
 create mode 100644 examples/digital_fingerprinting/production/grafana/dashboards/dfp_logs.json
 create mode 100644 examples/digital_fingerprinting/production/grafana/run.py

diff --git a/examples/digital_fingerprinting/production/conda_env.yml b/examples/digital_fingerprinting/production/conda_env.yml
index 777fb6dbaa..7032734d0f 100644
--- a/examples/digital_fingerprinting/production/conda_env.yml
+++ b/examples/digital_fingerprinting/production/conda_env.yml
@@ -32,3 +32,7 @@ dependencies:
   - nvtabular=23.06
   - papermill
   - s3fs>=2023.6
+
+  ##### Pip Dependencies (keep sorted!) #######
+  - pip:
+      - python-logging-loki
diff --git a/examples/digital_fingerprinting/production/docker-compose.yml b/examples/digital_fingerprinting/production/docker-compose.yml
index a5105bb58a..159869417f 100644
--- a/examples/digital_fingerprinting/production/docker-compose.yml
+++ b/examples/digital_fingerprinting/production/docker-compose.yml
@@ -145,9 +145,26 @@ services:
       - ./grafana/config/dashboards.yaml:/etc/grafana/provisioning/dashboards/dashboards.yaml
       - ./grafana/dashboards/:/var/lib/grafana/dashboards/
       - ./grafana/datasources/:/etc/grafana/provisioning/datasources/
-      - ./morpheus:/workspace
+      - ./grafana:/workspace
     ports:
       - "3000:3000"
+    networks:
+      - frontend
+      - backend
+    depends_on:
+      - loki
+
+  loki:
+    image: grafana/loki:2.9.1
+    volumes:
+      - ./grafana/config/loki-config.yml:/etc/loki/loki-config.yml
+    ports:
+      - "3100:3100"
+    networks:
+      - frontend
+      - backend
+    restart: unless-stopped
+    command: -config.file=/etc/loki/loki-config.yml
 
 networks:
   frontend:
diff --git a/examples/digital_fingerprinting/production/grafana/README.md b/examples/digital_fingerprinting/production/grafana/README.md
index 797270e7b6..46bed2bcd5 100644
--- a/examples/digital_fingerprinting/production/grafana/README.md
+++ b/examples/digital_fingerprinting/production/grafana/README.md
@@ -1,5 +1,5 @@
-# Grafana DFP Dashboard Example
+# Using Grafana with Morpheus DFP Pipeline
 
-This example demonstrates how to use [Grafana](https://grafana.com/grafana/) to visualize the inference results from the [Azure DFP pipeline example](../production/README.md).
+This example builds on the [Azure DFP pipeline example](../production/README.md) to demonstrate how [Grafana](https://grafana.com/grafana/) can be used for log monitoring, error alerting, and inference results visualization.
 
 ## Grafana Configuration
 
-### CSV data source plugin
+The data sources and dashboards in this example are managed using config files.
[Grafana's provisioning system](https://grafana.com/docs/grafana/latest/administration/provisioning/) then uses these files to add the data sources and dashboards to Grafana upon startup. + +### Data Sources + +Grafana includes built-in support for many data sources. There are also several data sources available that can be installed as plugins. More information about how to manage Grafana data sources can be found [here](https://grafana.com/docs/grafana/latest/datasources/). + +The following data sources for this example are configured in [datasources.yaml](./datasources/datasources.yaml): + +#### Loki data source + +[Loki](https://grafana.com/docs/loki/latest/) is Grafana's log aggregation system. The Loki service is started automatically when the Grafana service starts up. The [Python script for running the DFP pipeline](./run.py) has been updated to configure a logging handler that sends the Morpheus logs to the Loki service. + +#### CSV data source plugin The [CSV data source plugin](https://grafana.com/grafana/plugins/marcusolsson-csv-datasource/) is installed to Grafana to read the Azure inference results CSV file. This example assumes we are using the CSV file generated from running the Python script for [Azure DFP pipeline example](../production/README.md). @@ -31,7 +43,7 @@ url: /workspace/notebooks/dfp_detections_azure.csv Please note that the use of the CSV plugin is for demonstration purposes only. Grafana includes support for many data sources more suitable for production deployments. See [here](https://grafana.com/docs/grafana/latest/datasources/) for more information. -### Updates to grafana.ini +#### Updates to grafana.ini The following is added to the default `grafana.ini` to enable local mode for CSV data source plugin. This allows the CSV data source plugin to access files on local file system. @@ -40,14 +52,8 @@ The following is added to the default `grafana.ini` to enable local mode for CSV allow_local_mode = true ``` -## Run Azure Production DFP Training and Inference Examples - -### Start Morpheus DFP pipeline container - -The following steps are taken from [Azure DFP pipeline example](../production/README.md). Run the followng commands to start the Morpheus container: - -Build the Morpheus container: - +## Build the Morpheus container: +From the root of the Morpheus repo: ```bash ./docker/build_container_release.sh ``` @@ -60,43 +66,54 @@ export MORPHEUS_CONTAINER_VERSION="$(git describe --tags --abbrev=0)-runtime" docker compose build ``` -Create `bash` shell in `morpheus_pipeline` container: +## Start Grafana and Loki services: +To start Grafana and Loki, run the following command on host in `examples/digital_fingerprinting/production`: ```bash -docker compose run morpheus_pipeline bash +docker compose up grafana ``` -### Run Azure training pipeline +## Run Azure DFP Training -Run the following in the container to train Azure models. +Create `bash` shell in `morpheus_pipeline` container: ```bash -python dfp_azure_pipeline.py --log_level INFO --train_users generic --start_time "2022-08-01" --input_file="../../../data/dfp/azure-training-data/AZUREAD_2022*.json" +docker compose run --rm morpheus_pipeline bash ``` -### Run Azure inference pipeline: - -Run the inference pipeline with `filter_threshold=0.0`. This will disable the filtering of the inference results. 
+Set `PYTHONPATH` environment variable to allow import of production DFP Morpheus stages: +``` +export PYTHONPATH=/workspace/examples/digital_fingerprinting/production/morpheus +``` +Run the following in the container to train the Azure models. ```bash -python dfp_azure_pipeline.py --log_level INFO --train_users none --start_time "2022-08-30" --input_file="../../../data/dfp/azure-inference-data/*.json" --filter_threshold=0.0 +cd /workspace/examples/digital_fingerprinting/production/grafana +python run.py --log_level DEBUG --train_users generic --start_time "2022-08-01" --input_file="../../../data/dfp/azure-training-data/AZUREAD_2022*.json" ``` -The inference results will be saved to `dfp_detection_azure.csv` in the directory where script was run. +## View DFP Logs Dashboard in Grafana -## Run Grafana Docker Image +While the training pipeline is running, you can view Morpheus logs live in a Grafana dashboard at http://localhost:3000/dashboards. -To start Grafana, run the following command on host in `examples/digital_fingerprinting/production`: +Click on `DFP Logs` in the `General` folder. You may need to expand the `General` folder to see the link. + +## Run Azure DFP Inference: + +Run the inference pipeline with `filter_threshold=0.0`. This will disable the filtering of the inference results. + +```bash +python run.py --log_level DEBUG --train_users none --start_time "2022-08-30" --input_file="../../../data/dfp/azure-inference-data/*.json" --filter_threshold=0.0 ``` -docker compose up grafana -``` -## View DFP Dashboard +The inference results will be saved to `dfp_detection_azure.csv` in the directory where script was run. + +## View DFP Detections Dashboard in Grafana -Our Grafana DFP dashboard can now be accessed via web browser at http://localhost:3000/dashboards. +When the inference pipeline completes, you can view visualizations of the inference results at http://localhost:3000/dashboards. -Click on `DFP_Dashboard` in the `General` folder. You may need to expand the `General` folder to see the link. +Click on `DFP Detections` in the `General` folder. You may need to expand the `General` folder to see the link. diff --git a/examples/digital_fingerprinting/production/grafana/config/grafana.ini b/examples/digital_fingerprinting/production/grafana/config/grafana.ini index 6b30172ff5..97df7a8bba 100644 --- a/examples/digital_fingerprinting/production/grafana/config/grafana.ini +++ b/examples/digital_fingerprinting/production/grafana/config/grafana.ini @@ -379,7 +379,7 @@ ;token_rotation_interval_minutes = 10 # Set to true to disable (hide) the login form, useful if you use OAuth, defaults to false -disable_login_form = true +;disable_login_form = true # Set to true to disable the sign out link in the side menu. 
Useful if you use auth.proxy or auth.jwt, defaults to false ;disable_signout_menu = false diff --git a/examples/digital_fingerprinting/production/grafana/config/loki-config.yml b/examples/digital_fingerprinting/production/grafana/config/loki-config.yml new file mode 100644 index 0000000000..68c3c9fb08 --- /dev/null +++ b/examples/digital_fingerprinting/production/grafana/config/loki-config.yml @@ -0,0 +1,50 @@ +auth_enabled: false + +server: + http_listen_port: 3100 + grpc_listen_port: 9096 + +common: + instance_addr: 127.0.0.1 + path_prefix: /tmp/loki + storage: + filesystem: + chunks_directory: /tmp/loki/chunks + rules_directory: /tmp/loki/rules + replication_factor: 1 + ring: + kvstore: + store: inmemory + +query_range: + results_cache: + cache: + embedded_cache: + enabled: true + max_size_mb: 100 + +schema_config: + configs: + - from: 2020-10-24 + store: boltdb-shipper + object_store: filesystem + schema: v11 + index: + prefix: index_ + period: 24h + +ruler: + alertmanager_url: http://localhost:9093 + +# By default, Loki will send anonymous, but uniquely-identifiable usage and configuration +# analytics to Grafana Labs. These statistics are sent to https://stats.grafana.org/ +# +# Statistics help us better understand how Loki is used, and they show us performance +# levels for most users. This helps us prioritize features and documentation. +# For more information on what's sent, look at +# https://github.com/grafana/loki/blob/main/pkg/usagestats/stats.go +# Refer to the buildReport method to see what goes into a report. +# +# If you would like to disable reporting, uncomment the following lines: +#analytics: +# reporting_enabled: false diff --git a/examples/digital_fingerprinting/production/grafana/dashboards/DFP_Dashboard.json b/examples/digital_fingerprinting/production/grafana/dashboards/dfp_detections.json similarity index 99% rename from examples/digital_fingerprinting/production/grafana/dashboards/DFP_Dashboard.json rename to examples/digital_fingerprinting/production/grafana/dashboards/dfp_detections.json index f80780a381..9167ecaca5 100644 --- a/examples/digital_fingerprinting/production/grafana/dashboards/DFP_Dashboard.json +++ b/examples/digital_fingerprinting/production/grafana/dashboards/dfp_detections.json @@ -557,7 +557,7 @@ }, "timepicker": {}, "timezone": "", - "title": "DFP_Dashboard", + "title": "DFP Detections", "uid": "f810d98f-bf31-42d4-98aa-9eb3fa187184", "version": 1, "weekStart": "" diff --git a/examples/digital_fingerprinting/production/grafana/dashboards/dfp_logs.json b/examples/digital_fingerprinting/production/grafana/dashboards/dfp_logs.json new file mode 100644 index 0000000000..c4ed0448c9 --- /dev/null +++ b/examples/digital_fingerprinting/production/grafana/dashboards/dfp_logs.json @@ -0,0 +1,78 @@ +{ + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": { + "type": "grafana", + "uid": "-- Grafana --" + }, + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "links": [], + "liveNow": false, + "panels": [ + { + "datasource": { + "type": "loki", + "uid": "P8E80F9AEF21F6940" + }, + "gridPos": { + "h": 18, + "w": 23, + "x": 0, + "y": 0 + }, + "id": 1, + "options": { + "dedupStrategy": "none", + "enableLogDetails": true, + "prettifyLogMessage": false, + "showCommonLabels": false, + "showLabels": false, + "showTime": false, + "sortOrder": "Ascending", + "wrapLogMessage": false + }, + 
"targets": [ + { + "datasource": { + "type": "loki", + "uid": "P8E80F9AEF21F6940" + }, + "editorMode": "builder", + "expr": "{app=\"morpheus\"} |= ``", + "queryType": "range", + "refId": "A" + } + ], + "type": "logs" + } + ], + "refresh": "5s", + "schemaVersion": 38, + "style": "dark", + "tags": [], + "templating": { + "list": [] + }, + "time": { + "from": "now-6h", + "to": "now" + }, + "timepicker": {}, + "timezone": "", + "title": "DFP Logs", + "uid": "dfb4fe34-daae-4894-9ff0-b8f89b7d256e", + "version": 1, + "weekStart": "" +} \ No newline at end of file diff --git a/examples/digital_fingerprinting/production/grafana/datasources/datasources.yaml b/examples/digital_fingerprinting/production/grafana/datasources/datasources.yaml index 6489932465..21f2a2fa0f 100644 --- a/examples/digital_fingerprinting/production/grafana/datasources/datasources.yaml +++ b/examples/digital_fingerprinting/production/grafana/datasources/datasources.yaml @@ -16,6 +16,12 @@ apiVersion: 1 datasources: + - name: Loki + type: loki + access: proxy + url: http://loki:3100 + jsonData: + maxLines: 1000 - name: csv-datasource uid: 1257c93b-f998-438c-a784-7e90fb94fb36 type: marcusolsson-csv-datasource diff --git a/examples/digital_fingerprinting/production/grafana/run.py b/examples/digital_fingerprinting/production/grafana/run.py new file mode 100644 index 0000000000..4f2f66ffd9 --- /dev/null +++ b/examples/digital_fingerprinting/production/grafana/run.py @@ -0,0 +1,535 @@ +# Copyright (c) 2022-2023, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+"""DFP training & inference pipelines for Azure Active Directory logs.""" + +import functools +import logging +import os +import typing +from datetime import datetime +from datetime import timedelta +from datetime import timezone + +import click +import mlflow +import mrc +import pandas as pd +from dfp.stages.dfp_file_batcher_stage import DFPFileBatcherStage +from dfp.stages.dfp_file_to_df import DFPFileToDataFrameStage +from dfp.stages.dfp_inference_stage import DFPInferenceStage +from dfp.stages.dfp_mlflow_model_writer import DFPMLFlowModelWriterStage +from dfp.stages.dfp_postprocessing_stage import DFPPostprocessingStage +from dfp.stages.dfp_preprocessing_stage import DFPPreprocessingStage +from dfp.stages.dfp_rolling_window_stage import DFPRollingWindowStage +from dfp.stages.dfp_split_users_stage import DFPSplitUsersStage +from dfp.stages.dfp_training import DFPTraining +from dfp.stages.multi_file_source import MultiFileSource +from dfp.utils.regex_utils import iso_date_regex + +from morpheus.cli.utils import get_log_levels +from morpheus.cli.utils import get_package_relative_file +from morpheus.cli.utils import load_labels_file +from morpheus.cli.utils import parse_log_level +from morpheus.common import FileTypes +from morpheus.common import FilterSource +from morpheus.config import Config +from morpheus.config import ConfigAutoEncoder +from morpheus.config import CppConfig +from morpheus.pipeline import LinearPipeline +from morpheus.stages.general.monitor_stage import MonitorStage +from morpheus.stages.output.write_to_file_stage import WriteToFileStage +from morpheus.stages.postprocess.filter_detections_stage import FilterDetectionsStage +from morpheus.stages.postprocess.serialize_stage import SerializeStage +from morpheus.utils.column_info import ColumnInfo +from morpheus.utils.column_info import DataFrameInputSchema +from morpheus.utils.column_info import DateTimeColumn +from morpheus.utils.column_info import DistinctIncrementColumn +from morpheus.utils.column_info import IncrementColumn +from morpheus.utils.column_info import RenameColumn +from morpheus.utils.column_info import StringCatColumn +from morpheus.utils.file_utils import date_extractor +from morpheus.utils.logger import set_log_level +from morpheus.utils.logger import TqdmLoggingHandler + +import appdirs +import logging.handlers +import logging_loki +import multiprocessing + + +def configure_logging(log_level: int, loki_url: str): + mrc.logging.init_logging("morpheus") + logging.captureWarnings(True) + + # Get the root Morpheus logger + morpheus_logger = logging.getLogger("morpheus") + + # Set the level here + set_log_level(log_level=log_level) + + # Dont propagate upstream + morpheus_logger.propagate = False + morpheus_logging_queue = multiprocessing.Queue() + + # This needs the be the only handler for morpheus logger + morpheus_queue_handler = logging.handlers.QueueHandler(morpheus_logging_queue) + + # At this point, any morpheus logger will propagate upstream to the morpheus root and then be handled by the queue + # handler + morpheus_logger.addHandler(morpheus_queue_handler) + + log_file = os.path.join(appdirs.user_log_dir(appauthor="NVIDIA", appname="morpheus"), "morpheus.log") + + # Ensure the log directory exists + os.makedirs(os.path.dirname(log_file), exist_ok=True) + + # Now we build all of the handlers for the queue listener + file_handler = logging.handlers.RotatingFileHandler(filename=log_file, backupCount=5, maxBytes=1000000) + file_handler.setLevel(logging.DEBUG) + file_handler.setFormatter( + 
logging.Formatter('%(asctime)s - [%(levelname)s]: %(message)s {%(name)s, %(threadName)s}')) + + # Tqdm stream handler (avoids messing with progress bars) + console_handler = TqdmLoggingHandler() + + loki_handler = logging_loki.LokiHandler( + url=f"{loki_url}/loki/api/v1/push", + # url="http://loki:3100/loki/api/v1/push", + tags={"app": "morpheus"}, + version="1", + ) + + # Build and run the queue listener to actually process queued messages + queue_listener = logging.handlers.QueueListener(morpheus_logging_queue, + console_handler, + file_handler, + loki_handler, + respect_handler_level=True) + queue_listener.start() + queue_listener._thread.name = "Logging Thread" + + # Register a function to kill the listener thread before shutting down. prevents error on intpreter close + def stop_queue_listener(): + queue_listener.stop() + + import atexit + atexit.register(stop_queue_listener) + + +def _file_type_name_to_enum(file_type: str) -> FileTypes: + """Converts a file type name to a FileTypes enum.""" + if (file_type == "JSON"): + return FileTypes.JSON + if (file_type == "CSV"): + return FileTypes.CSV + if (file_type == "PARQUET"): + return FileTypes.PARQUET + + return FileTypes.Auto + + +@click.command() +@click.option( + "--train_users", + type=click.Choice(["all", "generic", "individual", "none"], case_sensitive=False), + help=("Indicates whether or not to train per user or a generic model for all users. " + "Selecting none runs the inference pipeline."), +) +@click.option( + "--skip_user", + multiple=True, + type=str, + help="User IDs to skip. Mutually exclusive with only_user", +) +@click.option( + "--only_user", + multiple=True, + type=str, + help="Only users specified by this option will be included. Mutually exclusive with skip_user", +) +@click.option( + "--start_time", + type=click.DateTime( + formats=['%Y-%m-%d', '%Y-%m-%dT%H:%M:%S', '%Y-%m-%d %H:%M:%S', '%Y-%m-%dT%H:%M:%S%z', '%Y-%m-%d %H:%M:%S%z']), + default=None, + help="The start of the time window, if undefined start_date will be `now()-duration`", +) +@click.option( + "--duration", + type=str, + default="60d", + help="The duration to run starting from start_time", +) +@click.option( + "--cache_dir", + type=str, + default="./.cache/dfp", + show_envvar=True, + help="The location to cache data such as S3 downloads and pre-processed data", +) +@click.option("--log_level", + default=logging.getLevelName(Config().log_level), + type=click.Choice(get_log_levels(), case_sensitive=False), + callback=parse_log_level, + help="Specify the logging level to use.") +@click.option("--sample_rate_s", + type=int, + default=0, + show_envvar=True, + help="Minimum time step, in milliseconds, between object logs.") +@click.option("--filter_threshold", + type=float, + default=2.0, + show_envvar=True, + help="Filter out inference results below this threshold") +@click.option( + "--input_file", + "-f", + type=str, + multiple=True, + help=("List of files to process. Can specify multiple arguments for multiple files. " + "Also accepts glob (*) wildcards and schema prefixes such as `s3://`. " + "For example, to make a local cache of an s3 bucket, use `filecache::s3://mybucket/*`. " + "Refer to fsspec documentation for list of possible options."), +) +@click.option("--file_type_override", + "-t", + type=click.Choice(["AUTO", "JSON", "CSV", "PARQUET"], case_sensitive=False), + default="JSON", + help="Override the detected file type. 
Values can be 'AUTO', 'JSON', 'CSV', or 'PARQUET'.", + callback=lambda _, + __, + value: None if value is None else _file_type_name_to_enum(value)) +@click.option('--watch_inputs', + type=bool, + is_flag=True, + default=False, + help=("Instructs the pipeline to continuously check the paths specified by `--input_file` for new files. " + "This assumes that the at least one paths contains a wildcard.")) +@click.option("--watch_interval", + type=float, + default=1.0, + help=("Amount of time, in seconds, to wait between checks for new files. " + "Only used if --watch_inputs is set.")) +@click.option('--tracking_uri', + type=str, + default="http://mlflow:5000", + help=("The MLflow tracking URI to connect to the tracking backend.")) +@click.option('--mlflow_experiment_name_template', + type=str, + default="dfp/azure/training/{reg_model_name}", + help="The MLflow experiment name template to use when logging experiments. ") +@click.option('--mlflow_model_name_template', + type=str, + default="DFP-azure-{user_id}", + help="The MLflow model name template to use when logging models. ") +@click.option('--use_postproc_schema', is_flag=True, help='Assume that input data has already been preprocessed.') +@click.option('--inference_detection_file_name', type=str, default="dfp_detections_azure.csv") +@click.option('--loki_url', + type=str, + default="http://loki:3100", + help=("Loki URL for error logging and alerting in Grafana.")) +def run_pipeline(train_users, + skip_user: typing.Tuple[str], + only_user: typing.Tuple[str], + start_time: datetime, + duration, + cache_dir, + log_level, + sample_rate_s, + filter_threshold, + mlflow_experiment_name_template, + mlflow_model_name_template, + file_type_override, + use_postproc_schema, + inference_detection_file_name, + loki_url, + **kwargs): + """Runs the DFP pipeline.""" + # To include the generic, we must be training all or generic + include_generic = train_users in ("all", "generic") + + # To include individual, we must be either training or inferring + include_individual = train_users != "generic" + + # None indicates we aren't training anything + is_training = train_users != "none" + + skip_users = list(skip_user) + only_users = list(only_user) + + duration = timedelta(seconds=pd.Timedelta(duration).total_seconds()) + if start_time is None: + end_time = datetime.now(tz=timezone.utc) + start_time = end_time - duration + else: + if start_time.tzinfo is None: + start_time = start_time.replace(tzinfo=timezone.utc) + + end_time = start_time + duration + + # Enable the Morpheus logger + configure_logging(log_level=log_level, loki_url=loki_url) + logging.getLogger("mlflow").setLevel(log_level) + + if (len(skip_users) > 0 and len(only_users) > 0): + logging.error("Option --skip_user and --only_user are mutually exclusive. 
Exiting") + + logger = logging.getLogger(f"morpheus.{__name__}") + + logger.info("Running training pipeline with the following options: ") + logger.info("Train generic_user: %s", include_generic) + logger.info("Skipping users: %s", skip_users) + logger.info("Start Time: %s", start_time) + logger.info("Duration: %s", duration) + logger.info("Cache Dir: %s", cache_dir) + + if ("tracking_uri" in kwargs): + # Initialize ML Flow + mlflow.set_tracking_uri(kwargs["tracking_uri"]) + logger.info("Tracking URI: %s", mlflow.get_tracking_uri()) + + config = Config() + + CppConfig.set_should_use_cpp(False) + + config.num_threads = os.cpu_count() + + config.ae = ConfigAutoEncoder() + + config.ae.feature_columns = load_labels_file(get_package_relative_file("data/columns_ae_azure.txt")) + config.ae.userid_column_name = "username" + config.ae.timestamp_column_name = "timestamp" + + # Specify the column names to ensure all data is uniform + if (use_postproc_schema): + + source_column_info = [ + ColumnInfo(name="autonomousSystemNumber", dtype=str), + ColumnInfo(name="location_geoCoordinates_latitude", dtype=float), + ColumnInfo(name="location_geoCoordinates_longitude", dtype=float), + ColumnInfo(name="resourceDisplayName", dtype=str), + ColumnInfo(name="travel_speed_kmph", dtype=float), + DateTimeColumn(name=config.ae.timestamp_column_name, dtype=datetime, input_name="time"), + ColumnInfo(name="appDisplayName", dtype=str), + ColumnInfo(name="clientAppUsed", dtype=str), + RenameColumn(name=config.ae.userid_column_name, dtype=str, input_name="userPrincipalName"), + RenameColumn(name="deviceDetailbrowser", dtype=str, input_name="deviceDetail_browser"), + RenameColumn(name="deviceDetaildisplayName", dtype=str, input_name="deviceDetail_displayName"), + RenameColumn(name="deviceDetailoperatingSystem", dtype=str, input_name="deviceDetail_operatingSystem"), + + # RenameColumn(name="location_country", dtype=str, input_name="location_countryOrRegion"), + ColumnInfo(name="location_city_state_country", dtype=str), + ColumnInfo(name="location_state_country", dtype=str), + ColumnInfo(name="location_country", dtype=str), + + # Non-features + ColumnInfo(name="is_corp_vpn", dtype=bool), + ColumnInfo(name="distance_km", dtype=float), + ColumnInfo(name="ts_delta_hour", dtype=float), + ] + source_schema = DataFrameInputSchema(column_info=source_column_info) + + preprocess_column_info = [ + ColumnInfo(name=config.ae.timestamp_column_name, dtype=datetime), + ColumnInfo(name=config.ae.userid_column_name, dtype=str), + + # Resource access + ColumnInfo(name="appDisplayName", dtype=str), + ColumnInfo(name="resourceDisplayName", dtype=str), + ColumnInfo(name="clientAppUsed", dtype=str), + + # Device detail + ColumnInfo(name="deviceDetailbrowser", dtype=str), + ColumnInfo(name="deviceDetaildisplayName", dtype=str), + ColumnInfo(name="deviceDetailoperatingSystem", dtype=str), + + # Location information + ColumnInfo(name="autonomousSystemNumber", dtype=str), + ColumnInfo(name="location_geoCoordinates_latitude", dtype=float), + ColumnInfo(name="location_geoCoordinates_longitude", dtype=float), + ColumnInfo(name="location_city_state_country", dtype=str), + ColumnInfo(name="location_state_country", dtype=str), + ColumnInfo(name="location_country", dtype=str), + + # Derived information + ColumnInfo(name="travel_speed_kmph", dtype=float), + + # Non-features + ColumnInfo(name="is_corp_vpn", dtype=bool), + ColumnInfo(name="distance_km", dtype=float), + ColumnInfo(name="ts_delta_hour", dtype=float), + ] + + preprocess_schema = 
DataFrameInputSchema(column_info=preprocess_column_info, preserve_columns=["_batch_id"]) + + exclude_from_training = [ + config.ae.userid_column_name, + config.ae.timestamp_column_name, + "is_corp_vpn", + "distance_km", + "ts_delta_hour", + ] + + config.ae.feature_columns = [ + name for (name, dtype) in preprocess_schema.output_columns if name not in exclude_from_training + ] + else: + source_column_info = [ + DateTimeColumn(name=config.ae.timestamp_column_name, dtype=datetime, input_name="time"), + RenameColumn(name=config.ae.userid_column_name, dtype=str, input_name="properties.userPrincipalName"), + RenameColumn(name="appDisplayName", dtype=str, input_name="properties.appDisplayName"), + ColumnInfo(name="category", dtype=str), + RenameColumn(name="clientAppUsed", dtype=str, input_name="properties.clientAppUsed"), + RenameColumn(name="deviceDetailbrowser", dtype=str, input_name="properties.deviceDetail.browser"), + RenameColumn(name="deviceDetaildisplayName", dtype=str, input_name="properties.deviceDetail.displayName"), + RenameColumn(name="deviceDetailoperatingSystem", + dtype=str, + input_name="properties.deviceDetail.operatingSystem"), + StringCatColumn(name="location", + dtype=str, + input_columns=[ + "properties.location.city", + "properties.location.countryOrRegion", + ], + sep=", "), + RenameColumn(name="statusfailureReason", dtype=str, input_name="properties.status.failureReason"), + ] + + source_schema = DataFrameInputSchema(json_columns=["properties"], column_info=source_column_info) + + # Preprocessing schema + preprocess_column_info = [ + ColumnInfo(name=config.ae.timestamp_column_name, dtype=datetime), + ColumnInfo(name=config.ae.userid_column_name, dtype=str), + ColumnInfo(name="appDisplayName", dtype=str), + ColumnInfo(name="clientAppUsed", dtype=str), + ColumnInfo(name="deviceDetailbrowser", dtype=str), + ColumnInfo(name="deviceDetaildisplayName", dtype=str), + ColumnInfo(name="deviceDetailoperatingSystem", dtype=str), + ColumnInfo(name="statusfailureReason", dtype=str), + + # Derived columns + IncrementColumn(name="logcount", + dtype=int, + input_name=config.ae.timestamp_column_name, + groupby_column=config.ae.userid_column_name), + DistinctIncrementColumn(name="locincrement", + dtype=int, + input_name="location", + groupby_column=config.ae.userid_column_name, + timestamp_column=config.ae.timestamp_column_name), + DistinctIncrementColumn(name="appincrement", + dtype=int, + input_name="appDisplayName", + groupby_column=config.ae.userid_column_name, + timestamp_column=config.ae.timestamp_column_name) + ] + + preprocess_schema = DataFrameInputSchema(column_info=preprocess_column_info, preserve_columns=["_batch_id"]) + + # Create a linear pipeline object + pipeline = LinearPipeline(config) + + pipeline.set_source( + MultiFileSource(config, + filenames=list(kwargs["input_file"]), + watch=kwargs["watch_inputs"], + watch_interval=kwargs["watch_interval"])) + + # Batch files into batches by time. Use the default ISO date extractor from the filename + pipeline.add_stage( + DFPFileBatcherStage(config, + period="D", + sampling_rate_s=sample_rate_s, + date_conversion_func=functools.partial(date_extractor, filename_regex=iso_date_regex), + start_time=start_time, + end_time=end_time)) + + parser_kwargs = None + if (file_type_override == FileTypes.JSON): + parser_kwargs = {"lines": False, "orient": "records"} + # Output is a list of fsspec files. Convert to DataFrames. 
This caches downloaded data + pipeline.add_stage( + DFPFileToDataFrameStage( + config, + schema=source_schema, + file_type=file_type_override, + parser_kwargs=parser_kwargs, # TODO(Devin) probably should be configurable too + cache_dir=cache_dir)) + + pipeline.add_stage(MonitorStage(config, description="Input data rate")) + + # This will split users or just use one single user + pipeline.add_stage( + DFPSplitUsersStage(config, + include_generic=include_generic, + include_individual=include_individual, + skip_users=skip_users, + only_users=only_users)) + + # Next, have a stage that will create rolling windows + pipeline.add_stage( + DFPRollingWindowStage( + config, + min_history=300 if is_training else 1, + min_increment=300 if is_training else 0, + # For inference, we only ever want 1 day max + max_history="60d" if is_training else "1d", + cache_dir=cache_dir)) + + # Output is UserMessageMeta -- Cached frame set + pipeline.add_stage(DFPPreprocessingStage(config, input_schema=preprocess_schema)) + + model_name_formatter = mlflow_model_name_template + experiment_name_formatter = mlflow_experiment_name_template + + if (is_training): + # Finally, perform training which will output a model + pipeline.add_stage(DFPTraining(config, epochs=100, validation_size=0.15)) + + pipeline.add_stage(MonitorStage(config, description="Training rate", smoothing=0.001)) + + # Write that model to MLFlow + pipeline.add_stage( + DFPMLFlowModelWriterStage(config, + model_name_formatter=model_name_formatter, + experiment_name_formatter=experiment_name_formatter)) + else: + # Perform inference on the preprocessed data + pipeline.add_stage(DFPInferenceStage(config, model_name_formatter=model_name_formatter)) + + pipeline.add_stage(MonitorStage(config, description="Inference rate", smoothing=0.001)) + + # Filter for only the anomalous logs + pipeline.add_stage( + FilterDetectionsStage(config, + threshold=filter_threshold, + filter_source=FilterSource.DATAFRAME, + field_name='mean_abs_z')) + pipeline.add_stage(DFPPostprocessingStage(config)) + + # Exclude the columns we don't want in our output + pipeline.add_stage(SerializeStage(config, exclude=['batch_count', 'origin_hash', '_row_hash', '_batch_id'])) + + # Write all anomalies to a CSV file + pipeline.add_stage(WriteToFileStage(config, filename=inference_detection_file_name, overwrite=True)) + + # Run the pipeline + pipeline.run() + + +if __name__ == "__main__": + # pylint: disable=no-value-for-parameter + run_pipeline(obj={}, auto_envvar_prefix='DFP', show_default=True, prog_name="dfp") From ef4bc1264c71595500aebb11b1fa9babec2aec05 Mon Sep 17 00:00:00 2001 From: Eli Fajardo Date: Wed, 10 Jan 2024 10:40:10 -0500 Subject: [PATCH 02/15] update copyright years --- .../production/conda_env.yml | 2 +- .../production/docker-compose.yml | 2 +- .../production/grafana/config/loki-config.yml | 15 +++++++++++++++ .../grafana/datasources/datasources.yaml | 2 +- .../production/grafana/run.py | 13 ++++++------- 5 files changed, 24 insertions(+), 10 deletions(-) diff --git a/examples/digital_fingerprinting/production/conda_env.yml b/examples/digital_fingerprinting/production/conda_env.yml index 7032734d0f..80e40b9f88 100644 --- a/examples/digital_fingerprinting/production/conda_env.yml +++ b/examples/digital_fingerprinting/production/conda_env.yml @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-FileCopyrightText: Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
# SPDX-License-Identifier: Apache-2.0 # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/examples/digital_fingerprinting/production/docker-compose.yml b/examples/digital_fingerprinting/production/docker-compose.yml index 159869417f..61f02c9201 100644 --- a/examples/digital_fingerprinting/production/docker-compose.yml +++ b/examples/digital_fingerprinting/production/docker-compose.yml @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/examples/digital_fingerprinting/production/grafana/config/loki-config.yml b/examples/digital_fingerprinting/production/grafana/config/loki-config.yml index 68c3c9fb08..77cfa39956 100644 --- a/examples/digital_fingerprinting/production/grafana/config/loki-config.yml +++ b/examples/digital_fingerprinting/production/grafana/config/loki-config.yml @@ -1,3 +1,18 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + auth_enabled: false server: diff --git a/examples/digital_fingerprinting/production/grafana/datasources/datasources.yaml b/examples/digital_fingerprinting/production/grafana/datasources/datasources.yaml index 21f2a2fa0f..edcebabb8a 100644 --- a/examples/digital_fingerprinting/production/grafana/datasources/datasources.yaml +++ b/examples/digital_fingerprinting/production/grafana/datasources/datasources.yaml @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-FileCopyrightText: Copyright (c) 2023-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/examples/digital_fingerprinting/production/grafana/run.py b/examples/digital_fingerprinting/production/grafana/run.py index 4f2f66ffd9..f70044abd0 100644 --- a/examples/digital_fingerprinting/production/grafana/run.py +++ b/examples/digital_fingerprinting/production/grafana/run.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022-2023, NVIDIA CORPORATION. +# Copyright (c) 2022-2024, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
@@ -15,13 +15,17 @@ import functools import logging +import logging.handlers +import multiprocessing import os import typing from datetime import datetime from datetime import timedelta from datetime import timezone +import appdirs import click +import logging_loki import mlflow import mrc import pandas as pd @@ -59,13 +63,8 @@ from morpheus.utils.column_info import RenameColumn from morpheus.utils.column_info import StringCatColumn from morpheus.utils.file_utils import date_extractor -from morpheus.utils.logger import set_log_level from morpheus.utils.logger import TqdmLoggingHandler - -import appdirs -import logging.handlers -import logging_loki -import multiprocessing +from morpheus.utils.logger import set_log_level def configure_logging(log_level: int, loki_url: str): From 51e630f838219182dfe34ceb2d306106cd6ce09e Mon Sep 17 00:00:00 2001 From: Eli Fajardo Date: Thu, 11 Jan 2024 14:10:03 -0500 Subject: [PATCH 03/15] add dfp logs dashboard screenshot --- examples/digital_fingerprinting/production/grafana/README.md | 3 ++- .../img/{screenshot.png => dfp_detections_dashboard.png} | 0 .../production/grafana/img/dfp_logs_dashboard.png | 3 +++ 3 files changed, 5 insertions(+), 1 deletion(-) rename examples/digital_fingerprinting/production/grafana/img/{screenshot.png => dfp_detections_dashboard.png} (100%) create mode 100644 examples/digital_fingerprinting/production/grafana/img/dfp_logs_dashboard.png diff --git a/examples/digital_fingerprinting/production/grafana/README.md b/examples/digital_fingerprinting/production/grafana/README.md index 46bed2bcd5..2d83ee0b1e 100644 --- a/examples/digital_fingerprinting/production/grafana/README.md +++ b/examples/digital_fingerprinting/production/grafana/README.md @@ -98,6 +98,7 @@ While the training pipeline is running, you can view Morpheus logs live in a Gra Click on `DFP Logs` in the `General` folder. You may need to expand the `General` folder to see the link. + ## Run Azure DFP Inference: @@ -115,7 +116,7 @@ When the inference pipeline completes, you can view visualizations of the infere Click on `DFP Detections` in the `General` folder. You may need to expand the `General` folder to see the link. 
- + The dashboard has the following visualization panels: diff --git a/examples/digital_fingerprinting/production/grafana/img/screenshot.png b/examples/digital_fingerprinting/production/grafana/img/dfp_detections_dashboard.png similarity index 100% rename from examples/digital_fingerprinting/production/grafana/img/screenshot.png rename to examples/digital_fingerprinting/production/grafana/img/dfp_detections_dashboard.png diff --git a/examples/digital_fingerprinting/production/grafana/img/dfp_logs_dashboard.png b/examples/digital_fingerprinting/production/grafana/img/dfp_logs_dashboard.png new file mode 100644 index 0000000000..8cec30b668 --- /dev/null +++ b/examples/digital_fingerprinting/production/grafana/img/dfp_logs_dashboard.png @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:753f199658371c462e6cbdbc324ee08acdafcf5453d0dae9b4042d133dfbabe0 +size 581211 From 433d1f2b24819e8ab324cf688d01bdc2f663f50c Mon Sep 17 00:00:00 2001 From: Eli Fajardo Date: Thu, 11 Jan 2024 18:43:45 -0500 Subject: [PATCH 04/15] update grafana and loki versions --- examples/digital_fingerprinting/production/docker-compose.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/digital_fingerprinting/production/docker-compose.yml b/examples/digital_fingerprinting/production/docker-compose.yml index 61f02c9201..fbb232ca29 100644 --- a/examples/digital_fingerprinting/production/docker-compose.yml +++ b/examples/digital_fingerprinting/production/docker-compose.yml @@ -137,7 +137,7 @@ services: - sys_nice grafana: - image: grafana/grafana:10.0.0 + image: grafana/grafana:10.2.3 environment: GF_INSTALL_PLUGINS: "marcusolsson-csv-datasource" volumes: @@ -155,7 +155,7 @@ services: - loki loki: - image: grafana/loki:2.9.1 + image: grafana/loki:2.9.3 volumes: - ./grafana/config/loki-config.yml:/etc/loki/loki-config.yml ports: From 9ac1f800057e7cdea577830bf035cb24711b6125 Mon Sep 17 00:00:00 2001 From: Eli Fajardo Date: Thu, 11 Jan 2024 18:44:38 -0500 Subject: [PATCH 05/15] add error alerting section to readme --- .../production/grafana/README.md | 44 ++++++++++++++++--- .../grafana/img/dfp_error_alert_setup.png | 3 ++ 2 files changed, 42 insertions(+), 5 deletions(-) create mode 100644 examples/digital_fingerprinting/production/grafana/img/dfp_error_alert_setup.png diff --git a/examples/digital_fingerprinting/production/grafana/README.md b/examples/digital_fingerprinting/production/grafana/README.md index 2d83ee0b1e..88c9572ff3 100644 --- a/examples/digital_fingerprinting/production/grafana/README.md +++ b/examples/digital_fingerprinting/production/grafana/README.md @@ -36,11 +36,6 @@ The following data sources for this example are configured in [datasources.yaml] The [CSV data source plugin](https://grafana.com/grafana/plugins/marcusolsson-csv-datasource/) is installed to Grafana to read the Azure inference results CSV file. This example assumes we are using the CSV file generated from running the Python script for [Azure DFP pipeline example](../production/README.md). -If using the [notebook version](../production/morpheus/notebooks/dfp_azure_inference.ipynb) to run inference, you'll need to update the `url` in [datasources.yaml](./datasources/datasources.yaml) as follows: -``` -url: /workspace/notebooks/dfp_detections_azure.csv -``` - Please note that the use of the CSV plugin is for demonstration purposes only. Grafana includes support for many data sources more suitable for production deployments. 
See [here](https://grafana.com/docs/grafana/latest/datasources/) for more information.
 
 #### Updates to grafana.ini
@@ -100,6 +95,45 @@ Click on `DFP Logs` in the `General` folder. You may need to expand the `General
 
 <img src="./img/dfp_logs_dashboard.png">
 
+This dashboard was provisioned using config files but can also be manually created with the following steps:
+1. Click `Dashboards` in the left-side menu.
+2. Click `New` and select `New Dashboard`.
+3. On the empty dashboard, click `+ Add visualization`.
+4. In the dialog box that opens, select the `Loki` data source.
+5. In the `Edit Panel` view, change from `Time Series` visualization to `Logs`.
+6. Add label filter: `app = morpheus`.
+7. Change Order to `Oldest first`.
+8. Click `Apply` to see your changes applied to the dashboard. Then click the save icon in the dashboard header.
+
+## Set up Error Alerting
+
+We demonstrate here with a simple example how we can use Grafana Alerting to notify us of a pipeline error moments after it occurs. This is especially useful with long-running pipelines.
+
+1. Click `Alert Rules` under `Alerting` in the left-side menu.
+2. Click `New Alert Rule`
+3. Enter alert rule name: `DFP Error Alert Rule`
+4. In `Define query and alert condition` section, select `Loki` data source.
+5. Switch to `Code` view by clicking the `Code` button on the right.
+6. Enter the folling Loki Query which counts the number of log lines in last minute that have an error label (`severity=error`):
+```
+count_over_time({severity="error"}[1m])
+```
+7. Under `Expressions`, keep default configurations for `Reduce` and `Threshold`. The alert condition threshold will be error counts > 0.
+7. In `Set evaluation behavior` section, click `+ New folder`, enter `morpheus` then click `Create` button.
+8. Click `+ New evaluation group`, enter `dfp` for `Evaluation group name` and `1m` for `Evaluation interval`, then click `Create` button.
+9. Enter `0s` for `Pending period`. This configures alert to be fired instantly when alert condition is met.
+10. Test your alert rule, by running the following in your `morpheus_pipeline` container. This will cause an error because `--input-file` glob will no longer match any of our training data files.
+```
+python run.py --log_level DEBUG --train_users generic --start_time "2022-08-01" --input_file="../../../data/dfp/azure-training-data/AZUREAD_2022*.json"
+```
+11. Click the `Preview` button to test run the alert rule. You should now see the how our alert query picks up the error log, processes through our reduce/threshold expressions and satisfies our alert condition. This is indicated by the `Firing` label in the `Threshold` section.
+
+<img src="./img/dfp_error_alert_setup.png">
+
+12. Finally, click `Save rule and exit` at top right of the page.
+
+By default, all alerts will be sent through the `grafana-default-email` contact point. You can add email addresses to this contact point by clicking on `Contact points` under `Alerting` in the left-side menu. You would also have to configure SMTP in the `[smtp]` section of your `grafana.ini`; an illustrative snippet is sketched below, after this section. More information about Grafana Alerting contact points can be found [here](https://grafana.com/docs/grafana/latest/alerting/fundamentals/contact-points/).
+
 ## Run Azure DFP Inference:
 
 Run the inference pipeline with `filter_threshold=0.0`. This will disable the filtering of the inference results.
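+As a rough illustration of the `[smtp]` configuration mentioned in the contact-point note above (every value here is a placeholder and not part of this example), the section in `grafana.ini` might look like:
+
+```ini
+[smtp]
+enabled = true
+host = smtp.example.com:587
+user = alerts@example.com
+password = changeme
+from_address = alerts@example.com
+from_name = Grafana
+```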
diff --git a/examples/digital_fingerprinting/production/grafana/img/dfp_error_alert_setup.png b/examples/digital_fingerprinting/production/grafana/img/dfp_error_alert_setup.png new file mode 100644 index 0000000000..516249f41a --- /dev/null +++ b/examples/digital_fingerprinting/production/grafana/img/dfp_error_alert_setup.png @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:9ada0a466cfeb34363623cf78d478e17737a1d439b43bec989c7b026749a0fe2 +size 474978 From bd8b0187e91f342fe43591c4b66277978bff903a Mon Sep 17 00:00:00 2001 From: Eli Fajardo Date: Fri, 12 Jan 2024 16:50:49 -0500 Subject: [PATCH 06/15] add loki logging handler snippet to readme --- .../production/grafana/README.md | 26 ++++++++++++++++--- .../production/grafana/run.py | 1 - 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/examples/digital_fingerprinting/production/grafana/README.md b/examples/digital_fingerprinting/production/grafana/README.md index 88c9572ff3..e2d0e4caf8 100644 --- a/examples/digital_fingerprinting/production/grafana/README.md +++ b/examples/digital_fingerprinting/production/grafana/README.md @@ -47,6 +47,26 @@ The following is added to the default `grafana.ini` to enable local mode for CSV allow_local_mode = true ``` +## Add Loki logging handler to DFP pipeline + +The [pipeline run script](./run.py) for the Azure DFP example has been updated with the following to add the Loki logging handler which will publish the Morpheus logs to our Loki service: + +``` +loki_handler = logging_loki.LokiHandler( + url=f"{loki_url}/loki/api/v1/push", + tags={"app": "morpheus"}, + version="1", +) + +queue_listener = logging.handlers.QueueListener(morpheus_logging_queue, + console_handler, + file_handler, + loki_handler, + respect_handler_level=True) +``` + +More information about Loki Python logging can be found [here](https://pypi.org/project/python-logging-loki/). + ## Build the Morpheus container: From the root of the Morpheus repo: ```bash @@ -114,19 +134,19 @@ We demonstrate here with a simple example how we can use Grafana Alerting to not 3. Enter alert rule name: `DFP Error Alert Rule` 4. In `Define query and alert condition` section, select `Loki` data source. 5. Switch to `Code` view by clicking the `Code` button on the right. -6. Enter the folling Loki Query which counts the number of log lines in last minute that have an error label (`severity=error`): +6. Enter the following Loki Query which counts the number of log lines in last minute that have an error label (`severity=error`): ``` count_over_time({severity="error"}[1m]) ``` 7. Under `Expressions`, keep default configurations for `Reduce` and `Threshold`. The alert condition threshold will be error counts > 0. 7. In `Set evaluation behavior` section, click `+ New folder`, enter `morpheus` then click `Create` button. 8. Click `+ New evaluation group`, enter `dfp` for `Evaluation group name` and `1m` for `Evaluation interval`, then click `Create` button. -9. Enter `0s` for `Pending period`. This configures alert to be fired instantly when alert condition is met. +9. Enter `0s` for `Pending period`. This configures alerts to be fired instantly when alert condition is met. 10. Test your alert rule, by running the following in your `morpheus_pipeline` container. This will cause an error because `--input-file` glob will no longer match any of our training data files. ``` python run.py --log_level DEBUG --train_users generic --start_time "2022-08-01" --input_file="../../../data/dfp/azure-training-data/AZUREAD_2022*.json" ``` -11. 
Click the `Preview` button to test run the alert rule. You should now see the how our alert query picks up the error log, processes through our reduce/threshold expressions and satisfies our alert condition. This is indicated by the `Firing` label in the `Threshold` section. +11. Click the `Preview` button to test run the alert rule. You should now see how our alert query picks up the error log, processes it through our reduce/threshold expressions and satisfies our alert condition. This is indicated by the `Firing` label in the `Threshold` section. diff --git a/examples/digital_fingerprinting/production/grafana/run.py b/examples/digital_fingerprinting/production/grafana/run.py index f70044abd0..19830b94c3 100644 --- a/examples/digital_fingerprinting/production/grafana/run.py +++ b/examples/digital_fingerprinting/production/grafana/run.py @@ -104,7 +104,6 @@ def configure_logging(log_level: int, loki_url: str): loki_handler = logging_loki.LokiHandler( url=f"{loki_url}/loki/api/v1/push", - # url="http://loki:3100/loki/api/v1/push", tags={"app": "morpheus"}, version="1", ) From fe2e2b5d88e0a0616dbcd71e22b99b5a98d6fc76 Mon Sep 17 00:00:00 2001 From: Eli Fajardo Date: Tue, 16 Jan 2024 12:18:10 -0500 Subject: [PATCH 07/15] revert grafana version --- examples/digital_fingerprinting/production/docker-compose.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/digital_fingerprinting/production/docker-compose.yml b/examples/digital_fingerprinting/production/docker-compose.yml index fbb232ca29..7cf1c636b1 100644 --- a/examples/digital_fingerprinting/production/docker-compose.yml +++ b/examples/digital_fingerprinting/production/docker-compose.yml @@ -137,7 +137,7 @@ services: - sys_nice grafana: - image: grafana/grafana:10.2.3 + image: grafana/grafana:10.0.0 environment: GF_INSTALL_PLUGINS: "marcusolsson-csv-datasource" volumes: From 7625ac9a168c184eb7a4dce67773cb1d97242315 Mon Sep 17 00:00:00 2001 From: Eli Fajardo Date: Fri, 9 Feb 2024 16:54:07 -0500 Subject: [PATCH 08/15] update logger to accept additional handlers --- .../production/grafana/run.py | 71 ++----------------- morpheus/utils/logger.py | 9 ++- 2 files changed, 13 insertions(+), 67 deletions(-) diff --git a/examples/digital_fingerprinting/production/grafana/run.py b/examples/digital_fingerprinting/production/grafana/run.py index 19830b94c3..1f10cd4f67 100644 --- a/examples/digital_fingerprinting/production/grafana/run.py +++ b/examples/digital_fingerprinting/production/grafana/run.py @@ -16,18 +16,15 @@ import functools import logging import logging.handlers -import multiprocessing import os import typing from datetime import datetime from datetime import timedelta from datetime import timezone -import appdirs import click import logging_loki import mlflow -import mrc import pandas as pd from dfp.stages.dfp_file_batcher_stage import DFPFileBatcherStage from dfp.stages.dfp_file_to_df import DFPFileToDataFrameStage @@ -63,66 +60,7 @@ from morpheus.utils.column_info import RenameColumn from morpheus.utils.column_info import StringCatColumn from morpheus.utils.file_utils import date_extractor -from morpheus.utils.logger import TqdmLoggingHandler -from morpheus.utils.logger import set_log_level - - -def configure_logging(log_level: int, loki_url: str): - mrc.logging.init_logging("morpheus") - logging.captureWarnings(True) - - # Get the root Morpheus logger - morpheus_logger = logging.getLogger("morpheus") - - # Set the level here - set_log_level(log_level=log_level) - - # Dont propagate upstream - 
morpheus_logger.propagate = False - morpheus_logging_queue = multiprocessing.Queue() - - # This needs the be the only handler for morpheus logger - morpheus_queue_handler = logging.handlers.QueueHandler(morpheus_logging_queue) - - # At this point, any morpheus logger will propagate upstream to the morpheus root and then be handled by the queue - # handler - morpheus_logger.addHandler(morpheus_queue_handler) - - log_file = os.path.join(appdirs.user_log_dir(appauthor="NVIDIA", appname="morpheus"), "morpheus.log") - - # Ensure the log directory exists - os.makedirs(os.path.dirname(log_file), exist_ok=True) - - # Now we build all of the handlers for the queue listener - file_handler = logging.handlers.RotatingFileHandler(filename=log_file, backupCount=5, maxBytes=1000000) - file_handler.setLevel(logging.DEBUG) - file_handler.setFormatter( - logging.Formatter('%(asctime)s - [%(levelname)s]: %(message)s {%(name)s, %(threadName)s}')) - - # Tqdm stream handler (avoids messing with progress bars) - console_handler = TqdmLoggingHandler() - - loki_handler = logging_loki.LokiHandler( - url=f"{loki_url}/loki/api/v1/push", - tags={"app": "morpheus"}, - version="1", - ) - - # Build and run the queue listener to actually process queued messages - queue_listener = logging.handlers.QueueListener(morpheus_logging_queue, - console_handler, - file_handler, - loki_handler, - respect_handler_level=True) - queue_listener.start() - queue_listener._thread.name = "Logging Thread" - - # Register a function to kill the listener thread before shutting down. prevents error on intpreter close - def stop_queue_listener(): - queue_listener.stop() - - import atexit - atexit.register(stop_queue_listener) +from morpheus.utils.logger import configure_logging def _file_type_name_to_enum(file_type: str) -> FileTypes: @@ -278,7 +216,12 @@ def run_pipeline(train_users, end_time = start_time + duration # Enable the Morpheus logger - configure_logging(log_level=log_level, loki_url=loki_url) + loki_handler = logging_loki.LokiHandler( + url=f"{loki_url}/loki/api/v1/push", + tags={"app": "morpheus"}, + version="1", + ) + configure_logging(loki_handler, log_level=log_level) logging.getLogger("mlflow").setLevel(log_level) if (len(skip_users) > 0 and len(only_users) > 0): diff --git a/morpheus/utils/logger.py b/morpheus/utils/logger.py index 7ef44a8897..315bb03c97 100644 --- a/morpheus/utils/logger.py +++ b/morpheus/utils/logger.py @@ -95,13 +95,14 @@ def _configure_from_log_file(log_config_file: str): logging.config.fileConfig(log_config_file) -def _configure_from_log_level(log_level: int): +def _configure_from_log_level(*extra_handlers: logging.Handler, log_level: int): """ Default config with only option being the logging level. Outputs to both the console and a file. Sets up a logging producer/consumer that works well in multi-thread/process environments. 
Parameters ---------- + *extra_handlers: List of additional handlers which will handle entries placed on the queue log_level : int Log level and above to report """ @@ -143,6 +144,7 @@ def _configure_from_log_level(log_level: int): queue_listener = logging.handlers.QueueListener(morpheus_logging_queue, console_handler, file_handler, + *extra_handlers, respect_handler_level=True) queue_listener.start() queue_listener._thread.name = "Logging Thread" @@ -155,7 +157,7 @@ def stop_queue_listener(): atexit.register(stop_queue_listener) -def configure_logging(log_level: int, log_config_file: str = None): +def configure_logging(*extra_handlers: logging.Handler, log_level: int, log_config_file: str = None): """ Configures Morpheus logging in one of two ways. Either specifying a logging config file to load or a logging level which will use a default configuration. The default configuration outputs to both the console and a file. Sets up a @@ -163,6 +165,7 @@ def configure_logging(log_level: int, log_config_file: str = None): Parameters ---------- + *extra_handlers: List of additional handlers which will handle entries placed on the queue log_level: int Specifies the log level and above to output. Must be one of the available levels in the `logging` module. log_config_file: str, optional (default = None): @@ -180,7 +183,7 @@ def configure_logging(log_level: int, log_config_file: str = None): # Configure using log file _configure_from_log_file(log_config_file=log_config_file) else: - _configure_from_log_level(log_level=log_level) + _configure_from_log_level(*extra_handlers, log_level=log_level) def set_log_level(log_level: int): From d17b3a4c3ee024b0ae7fce97103bba6ac325eac6 Mon Sep 17 00:00:00 2001 From: Eli Fajardo Date: Fri, 9 Feb 2024 17:00:25 -0500 Subject: [PATCH 09/15] readme update --- .../digital_fingerprinting/production/grafana/README.md | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/examples/digital_fingerprinting/production/grafana/README.md b/examples/digital_fingerprinting/production/grafana/README.md index e2d0e4caf8..f79fe6e92b 100644 --- a/examples/digital_fingerprinting/production/grafana/README.md +++ b/examples/digital_fingerprinting/production/grafana/README.md @@ -58,11 +58,7 @@ loki_handler = logging_loki.LokiHandler( version="1", ) -queue_listener = logging.handlers.QueueListener(morpheus_logging_queue, - console_handler, - file_handler, - loki_handler, - respect_handler_level=True) +configure_logging(loki_handler, log_level=log_level) ``` More information about Loki Python logging can be found [here](https://pypi.org/project/python-logging-loki/). 
From 305c05dd2cf024ff862e7131e3b91ae366be5e78 Mon Sep 17 00:00:00 2001
From: Eli Fajardo
Date: Wed, 14 Feb 2024 12:35:39 -0500
Subject: [PATCH 10/15] update deprecated_stage_warning to accept reason

---
 morpheus/stages/general/buffer_stage.py |  5 +++--
 morpheus/stages/general/delay_stage.py  |  5 +++--
 morpheus/utils/logger.py                | 13 +++++++------
 3 files changed, 13 insertions(+), 10 deletions(-)

diff --git a/morpheus/stages/general/buffer_stage.py b/morpheus/stages/general/buffer_stage.py
index 6e529adbb5..261b4cec82 100644
--- a/morpheus/stages/general/buffer_stage.py
+++ b/morpheus/stages/general/buffer_stage.py
@@ -66,7 +66,8 @@ def supports_cpp_node(self):
         return False

     def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject:
-        # This stage is no longer needed and is just a pass thru stage
-        deprecated_stage_warning(logger, type(self), self.unique_name)
+        reason = "The stage is no longer required to manage backpressure and has been deprecated. It has no" \
+                 " effect and acts as a pass through stage."
+        deprecated_stage_warning(logger, type(self), self.unique_name, reason=reason)

         return input_node
diff --git a/morpheus/stages/general/delay_stage.py b/morpheus/stages/general/delay_stage.py
index f83e13b450..49768c32a2 100644
--- a/morpheus/stages/general/delay_stage.py
+++ b/morpheus/stages/general/delay_stage.py
@@ -66,7 +66,8 @@ def supports_cpp_node(self):
         return False

     def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject:
-        # This stage is no longer needed and is just a pass thru stage
-        deprecated_stage_warning(logger, type(self), self.unique_name)
+        reason = "The stage is no longer required to manage backpressure and has been deprecated. It has no" \
+                 " effect and acts as a pass through stage."
+        deprecated_stage_warning(logger, type(self), self.unique_name, reason=reason)

         return input_node
diff --git a/morpheus/utils/logger.py b/morpheus/utils/logger.py
index 315bb03c97..bb1caf97ca 100644
--- a/morpheus/utils/logger.py
+++ b/morpheus/utils/logger.py
@@ -157,7 +157,7 @@ def stop_queue_listener():
     atexit.register(stop_queue_listener)


-def configure_logging(*extra_handlers: logging.Handler, log_level: int, log_config_file: str = None):
+def configure_logging(*extra_handlers: logging.Handler, log_level: int = None, log_config_file: str = None):
     """
     Configures Morpheus logging in one of two ways. Either specifying a logging config file to load or a logging level
     which will use a default configuration. The default configuration outputs to both the console and a file. Sets up a
@@ -183,6 +183,7 @@ def configure_logging(*extra_handlers: logging.Handler, log_level: int, log_conf
         # Configure using log file
         _configure_from_log_file(log_config_file=log_config_file)
     else:
+        assert log_level is not None, "log_level must be specified"
         _configure_from_log_level(*extra_handlers, log_level=log_level)


@@ -214,12 +215,12 @@ def set_log_level(log_level: int):
     return old_level


-def deprecated_stage_warning(logger, cls, name):
+def deprecated_stage_warning(logger, cls, name, reason: str = None):
     """Log a warning about a deprecated stage."""
-    logger.warning(("The '%s' stage ('%s') is no longer required to manage backpressure and has been deprecated. "
-                    "It has no effect and acts as a pass through stage."),
-                   cls.__name__,
-                   name)
+    message = f"The '{cls.__name__}' stage ('{name}') has been deprecated and will be removed in a future version."
+ if reason is not None: + message = " ".join((message, reason)) + logger.warning(message) def deprecated_message_warning(logger, cls, new_cls): From b2569e9a251d1397052755e3c1f3b4ec2e0b843f Mon Sep 17 00:00:00 2001 From: Eli Fajardo Date: Wed, 14 Feb 2024 12:37:03 -0500 Subject: [PATCH 11/15] add logger tests --- tests/test_logger.py | 155 ++++++++++++++++++++++++++++++++++ tests/tests_data/logging.json | 3 + 2 files changed, 158 insertions(+) create mode 100644 tests/test_logger.py create mode 100644 tests/tests_data/logging.json diff --git a/tests/test_logger.py b/tests/test_logger.py new file mode 100644 index 0000000000..bbd22d75d6 --- /dev/null +++ b/tests/test_logger.py @@ -0,0 +1,155 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import multiprocessing +import os +from unittest.mock import patch + +import pytest + +from morpheus.utils.logger import configure_logging, deprecated_message_warning, deprecated_stage_warning, TqdmLoggingHandler, set_log_level +from _utils import TEST_DIRS + + +@patch('logging.handlers.QueueListener') +@patch('logging.handlers.QueueHandler.emit') +def test_configure_logging_from_level_default_handlers(queue_handler, queue_listener): + configure_logging(log_level=logging.DEBUG) + morpheus_logger = logging.getLogger("morpheus") + assert morpheus_logger.level == logging.DEBUG + assert morpheus_logger.propagate is False + assert isinstance(morpheus_logger.handlers[1].queue, multiprocessing.queues.Queue) + pos_args = queue_listener.call_args[0] + assert len(pos_args) == 3 + assert isinstance(pos_args[0], multiprocessing.queues.Queue) + assert isinstance(pos_args[1], TqdmLoggingHandler) + assert isinstance(pos_args[2], logging.handlers.RotatingFileHandler) + assert pos_args[2].baseFilename.endswith("morpheus.log") + morpheus_logger.debug("test") + queue_handler.assert_called() + + +def test_configure_logging__no_args(): + with pytest.raises(Exception) as excinfo: + configure_logging() + assert "log_level must be specified" in str(excinfo.value) + + +@patch('logging.handlers.RotatingFileHandler.emit') +@patch('morpheus.utils.logger.TqdmLoggingHandler.emit') +def test_configure_logging_from_file(console_handler, file_handler): + log_config_file = os.path.join(TEST_DIRS.tests_data_dir, "logging.json") + configure_logging(log_config_file=log_config_file) + morpheus_logger = logging.getLogger("morpheus") + assert morpheus_logger.level == logging.DEBUG + assert morpheus_logger.propagate is False + morpheus_logger.debug("test") + console_handler.assert_called_once() + file_handler.assert_called_once() + + +def test_configure_logging_from_file_filenotfound(): + with pytest.raises(FileNotFoundError): + configure_logging(log_config_file="does_not_exist.json") + + +@patch('logging.handlers.QueueListener') +@patch('logging.handlers.QueueHandler.emit') +def test_configure_logging_add_one_handler(queue_handler, 
queue_listener): + new_handler = logging.StreamHandler() + configure_logging(new_handler, log_level=logging.DEBUG) + morpheus_logger = logging.getLogger("morpheus") + assert morpheus_logger.level == logging.DEBUG + assert morpheus_logger.propagate is False + pos_args = queue_listener.call_args[0] + assert len(pos_args) == 4 + assert isinstance(pos_args[0], multiprocessing.queues.Queue) + assert isinstance(pos_args[1], TqdmLoggingHandler) + assert isinstance(pos_args[2], logging.handlers.RotatingFileHandler) + assert isinstance(pos_args[3], logging.StreamHandler) + morpheus_logger.debug("test") + queue_handler.assert_called() + + +@patch('logging.handlers.QueueListener') +@patch('logging.handlers.QueueHandler.emit') +def test_configure_logging_add_two_handlers(queue_handler, queue_listener): + new_handler_1 = logging.StreamHandler() + new_handler_2 = logging.StreamHandler() + configure_logging(new_handler_1, new_handler_2, log_level=logging.DEBUG) + morpheus_logger = logging.getLogger("morpheus") + assert morpheus_logger.level == logging.DEBUG + assert morpheus_logger.propagate is False + pos_args = queue_listener.call_args[0] + assert len(pos_args) == 5 + assert isinstance(pos_args[0], multiprocessing.queues.Queue) + assert isinstance(pos_args[1], TqdmLoggingHandler) + assert isinstance(pos_args[2], logging.handlers.RotatingFileHandler) + assert isinstance(pos_args[3], logging.StreamHandler) + assert isinstance(pos_args[4], logging.StreamHandler) + morpheus_logger.debug("test") + queue_handler.assert_called() + + +def test_set_log_level(): + configure_logging(log_level=logging.INFO) + morpheus_logger = logging.getLogger("morpheus") + assert morpheus_logger.level == logging.INFO + set_log_level(logging.DEBUG) + assert morpheus_logger.level == logging.DEBUG + + +def test_deprecated_stage_warning(caplog): + + class DummyStage(): + pass + + logger = logging.getLogger() + caplog.set_level(logging.WARNING) + deprecated_stage_warning(logger, DummyStage, "dummy_stage") + assert len(caplog.records) == 1 + assert caplog.records[0].levelname == "WARNING" + assert "The 'DummyStage' stage ('dummy_stage') has been deprecated" in caplog.text + + +def test_deprecated_stage_warning_with_reason(caplog): + + class DummyStage(): + pass + + logger = logging.getLogger() + caplog.set_level(logging.WARNING) + deprecated_stage_warning(logger, DummyStage, "dummy_stage", reason="This is the reason.") + assert len(caplog.records) == 1 + assert caplog.records[0].levelname == "WARNING" + assert "The 'DummyStage' stage ('dummy_stage') has been deprecated" in caplog.text + assert "This is the reason." in caplog.text + + +def test_deprecated_message_warning(caplog): + + class OldMessage(): + pass + + class NewMessage(): + pass + + logger = logging.getLogger() + caplog.set_level(logging.WARNING) + deprecated_message_warning(logger, OldMessage, NewMessage) + assert len(caplog.records) == 1 + assert caplog.records[0].levelname == "WARNING" + assert "The 'OldMessage' message has been deprecated and will be removed in a future version. Please use 'NewMessage' instead." 
in caplog.text diff --git a/tests/tests_data/logging.json b/tests/tests_data/logging.json new file mode 100644 index 0000000000..ef91489fcd --- /dev/null +++ b/tests/tests_data/logging.json @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:e79afe674ded154db230fa205acd572644b6317d24f314b83184f105cd672fe3 +size 832 From 3b509ddf09e04e884a4d4aeaa8f71689c79fb032 Mon Sep 17 00:00:00 2001 From: Eli Fajardo Date: Wed, 14 Feb 2024 12:52:30 -0500 Subject: [PATCH 12/15] assert update and style fixes --- tests/test_logger.py | 13 +++++++++---- tests/tests_data/logging.json | 4 ++-- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/tests/test_logger.py b/tests/test_logger.py index bbd22d75d6..ed469f31ea 100644 --- a/tests/test_logger.py +++ b/tests/test_logger.py @@ -20,8 +20,12 @@ import pytest -from morpheus.utils.logger import configure_logging, deprecated_message_warning, deprecated_stage_warning, TqdmLoggingHandler, set_log_level from _utils import TEST_DIRS +from morpheus.utils.logger import TqdmLoggingHandler +from morpheus.utils.logger import configure_logging +from morpheus.utils.logger import deprecated_message_warning +from morpheus.utils.logger import deprecated_stage_warning +from morpheus.utils.logger import set_log_level @patch('logging.handlers.QueueListener') @@ -135,8 +139,8 @@ class DummyStage(): deprecated_stage_warning(logger, DummyStage, "dummy_stage", reason="This is the reason.") assert len(caplog.records) == 1 assert caplog.records[0].levelname == "WARNING" - assert "The 'DummyStage' stage ('dummy_stage') has been deprecated" in caplog.text - assert "This is the reason." in caplog.text + assert "The 'DummyStage' stage ('dummy_stage') has been deprecated and will be removed in a future version. " \ + "This is the reason." in caplog.text def test_deprecated_message_warning(caplog): @@ -152,4 +156,5 @@ class NewMessage(): deprecated_message_warning(logger, OldMessage, NewMessage) assert len(caplog.records) == 1 assert caplog.records[0].levelname == "WARNING" - assert "The 'OldMessage' message has been deprecated and will be removed in a future version. Please use 'NewMessage' instead." in caplog.text + assert "The 'OldMessage' message has been deprecated and will be removed in a future version. " \ + "Please use 'NewMessage' instead." 
in caplog.text diff --git a/tests/tests_data/logging.json b/tests/tests_data/logging.json index ef91489fcd..bb681247b7 100644 --- a/tests/tests_data/logging.json +++ b/tests/tests_data/logging.json @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:e79afe674ded154db230fa205acd572644b6317d24f314b83184f105cd672fe3 -size 832 +oid sha256:4c9bffb074de716dc573421a31fa8ab75a3476676db1522fb839c2e75807faba +size 752 From edde7a9102c1a7f8e4a75b232f6835d5beccf0d3 Mon Sep 17 00:00:00 2001 From: Eli Fajardo Date: Wed, 14 Feb 2024 17:56:14 -0500 Subject: [PATCH 13/15] add logger info to docs --- .../developer_guide/guides/1_simple_python_stage.md | 13 +++++++++++++ morpheus/utils/logger.py | 2 +- 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/docs/source/developer_guide/guides/1_simple_python_stage.md b/docs/source/developer_guide/guides/1_simple_python_stage.md index 95a71012e9..e21aa6de48 100644 --- a/docs/source/developer_guide/guides/1_simple_python_stage.md +++ b/docs/source/developer_guide/guides/1_simple_python_stage.md @@ -230,6 +230,19 @@ Before constructing the pipeline, we need to do a bit of environment configurati ```python configure_logging(log_level=logging.DEBUG) ``` +We use the default configuration with the `DEBUG` logging level. The logger will output to both the console and a file. The logging handlers are non-blocking since they utilize a queue to send the log messages on a separate thread. + +We can also use `configure_logging` to add one or more logging handlers to the default configuration. The added handlers will also be non-blocking. The following is from the +[Grafana example](../../../../examples/digital_fingerprinting/production/grafana/README.md) where we add a [Loki](https://grafana.com/oss/loki/) logging handler to also publish Morpheus logs to a Loki log aggregation server. +```python +loki_handler = logging_loki.LokiHandler( + url=f"{loki_url}/loki/api/v1/push", + tags={"app": "morpheus"}, + version="1", +) + +configure_logging(loki_handler, log_level=log_level) +``` Next, we will build a Morpheus `Config` object. We will cover setting some common configuration parameters in the next guide. For now, it is important to know that we will always need to build a `Config` object: ```python diff --git a/morpheus/utils/logger.py b/morpheus/utils/logger.py index bb1caf97ca..693e6574b7 100644 --- a/morpheus/utils/logger.py +++ b/morpheus/utils/logger.py @@ -165,7 +165,7 @@ def configure_logging(*extra_handlers: logging.Handler, log_level: int = None, l Parameters ---------- - *extra_handlers: List of additional handlers which will handle entries placed on the queue + *extra_handlers: List of handlers to add to existing default console and file handlers. log_level: int Specifies the log level and above to output. Must be one of the available levels in the `logging` module. 
log_config_file: str, optional (default = None): From f10df9770d9923626a6ba8bee2467b73a1230d12 Mon Sep 17 00:00:00 2001 From: Eli Fajardo Date: Thu, 15 Feb 2024 12:05:12 -0500 Subject: [PATCH 14/15] remove invalid link from updated doc --- docs/source/developer_guide/guides/1_simple_python_stage.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/docs/source/developer_guide/guides/1_simple_python_stage.md b/docs/source/developer_guide/guides/1_simple_python_stage.md index e21aa6de48..adb813a8b2 100644 --- a/docs/source/developer_guide/guides/1_simple_python_stage.md +++ b/docs/source/developer_guide/guides/1_simple_python_stage.md @@ -232,8 +232,7 @@ configure_logging(log_level=logging.DEBUG) ``` We use the default configuration with the `DEBUG` logging level. The logger will output to both the console and a file. The logging handlers are non-blocking since they utilize a queue to send the log messages on a separate thread. -We can also use `configure_logging` to add one or more logging handlers to the default configuration. The added handlers will also be non-blocking. The following is from the -[Grafana example](../../../../examples/digital_fingerprinting/production/grafana/README.md) where we add a [Loki](https://grafana.com/oss/loki/) logging handler to also publish Morpheus logs to a Loki log aggregation server. +We can also use `configure_logging` to add one or more logging handlers to the default configuration. The added handlers will also be non-blocking. The following is from the Grafana example (`examples/digital_fingerprinting/production/grafana/README.md`) where we add a [Loki](https://grafana.com/oss/loki/) logging handler to also publish Morpheus logs to a Loki log aggregation server. ```python loki_handler = logging_loki.LokiHandler( url=f"{loki_url}/loki/api/v1/push", From 903901cfb72fff8494d07eccfcaadcf69c5b0c30 Mon Sep 17 00:00:00 2001 From: Eli Fajardo Date: Thu, 15 Feb 2024 14:18:01 -0500 Subject: [PATCH 15/15] fix test --- tests/test_logger.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_logger.py b/tests/test_logger.py index ed469f31ea..dddf05d345 100644 --- a/tests/test_logger.py +++ b/tests/test_logger.py @@ -35,7 +35,6 @@ def test_configure_logging_from_level_default_handlers(queue_handler, queue_list morpheus_logger = logging.getLogger("morpheus") assert morpheus_logger.level == logging.DEBUG assert morpheus_logger.propagate is False - assert isinstance(morpheus_logger.handlers[1].queue, multiprocessing.queues.Queue) pos_args = queue_listener.call_args[0] assert len(pos_args) == 3 assert isinstance(pos_args[0], multiprocessing.queues.Queue)
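
As a quick reference for how the reworked `configure_logging` introduced in these patches is meant to be called, here is a minimal usage sketch (not part of the patch series itself). The Loki URL and the extra `StreamHandler` are illustrative assumptions; inside the docker-compose network the Loki service would be reachable as `http://loki:3100`, and the handler arguments mirror the `run.py` changes above.

```python
import logging

import logging_loki

from morpheus.utils.logger import configure_logging

# Handler that ships Morpheus logs to Loki. The URL is an assumption for a
# host-side run; within the compose network use http://loki:3100 instead.
loki_handler = logging_loki.LokiHandler(
    url="http://localhost:3100/loki/api/v1/push",
    tags={"app": "morpheus"},
    version="1",
)

# Any number of extra handlers may be passed; they are attached to the same
# queue listener as the default console and file handlers, so they remain
# non-blocking for the pipeline threads.
configure_logging(loki_handler, logging.StreamHandler(), log_level=logging.INFO)
```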