diff --git a/doc/requirements.txt b/doc/requirements.txt
index 2628f3ac3..23c08efbf 100644
--- a/doc/requirements.txt
+++ b/doc/requirements.txt
@@ -1,6 +1,5 @@
 sphinx
 sphinx_rtd_theme
-sphinxcontrib-mermaid
 sphinxcontrib.datatemplates
 sphinx-copybutton
 nbsphinx
diff --git a/doc/source/conf.py b/doc/source/conf.py
index 798abf806..004902e94 100644
--- a/doc/source/conf.py
+++ b/doc/source/conf.py
@@ -56,7 +56,6 @@ def setup(app):
 # ones.
 extensions = [
     "sphinx.ext.napoleon",
-    "sphinxcontrib.mermaid",
     "sphinx.ext.autosummary",
     "sphinxcontrib.datatemplates",
     "nbsphinx",
diff --git a/doc/source/development/index.rst b/doc/source/development/index.rst
index b532a3640..f54b5dfa2 100644
--- a/doc/source/development/index.rst
+++ b/doc/source/development/index.rst
@@ -2,122 +2,6 @@
 Development
 ===========
 
-.. mermaid::
-
-   classDiagram
-      Component <-- Processor
-      Component <-- Connector
-      Connector <-- Input : implements
-      Connector <-- Output : implements
-      Processor <-- Normalizer : implements
-      Processor <-- Pseudonymizer : implements
-      Input <-- ConfluentKafkaInput : implements
-      Output <-- ConfluentKafkaOutput : implements
-      ProcessorConfiguration
-      Rule <-- NormalizerRule : inherit
-      Rule <-- PseudonymizerRule : inherit
-      BaseProcessorTestCase <-- NormalizerTestCase : implements
-      BaseProcessorTestCase <-- PseudonymizerTestCase : implements
-      class Component{
-         +Config
-         +str name
-         +Logger _logger
-         +Config _config
-         +String describe()
-         +None setup()
-         +None shut_down()
-
-      }
-      class Processor{
-         <<interface>>
-         +rule_class
-         +Config
-         +load_rules()
-         +process()
-         +apply_rules()*
-      }
-      class Normalizer{
-         +Config
-         +rule_class = NormalizerRule
-         +_config: Normalizer.Config
-         +apply_rules()
-      }
-
-      class Pseudonymizer{
-         +Config
-         +rule_class = PseudonymizerRule
-         +_config: Pseudonymizer.Config
-         +apply_rules()
-      }
-      class Connector{
-         <<interface>>
-         +Config
-      }
-      class Input{
-         <<interface>>
-         +Config
-         +_config: Input.Config
-         -Dict _get_event()*
-         -None _get_raw_event()
-         +tuple[dict, error|None] get_next()
-      }
-      class Output{
-         <<interface>>
-         +Config
-         +_config: Output.Config
-         +None store()*
-         +None store_custom()*
-         +None store_failed()*
-      }
-      class ConfluentKafkaInput{
-         +Config
-         +_config: ConfluentKafkaInput.Config
-         +tuple _get_event()
-         +bytearray _get_raw_event()
-      }
-      class ConfluentKafkaOutput{
-         +Config
-         +_config: ConfluentKafkaInput.Config
-         +None store()
-         +None store_custom()
-         +None store_failed()
-      }
-
-      class Configuration{
-         <<interface>>
-         +create
-      }
-      class Registry{
-         +mapping : dict
-      }
-
-      class Factory{
-         +create()
-      }
-
-
-      class TestFactory{
-         +test_check()
-         +test_create_normalizer()
-         +test_create_pseudonymizer()
-      }
-
-      class BaseProcessorTestCase{
-         +test_describe()
-         +test_load_rules()
-         +test_process()
-         +test_apply_rules()*
-      }
-
-      class NormalizerTestCase{
-         +test_apply_rules()
-      }
-
-      class PseudonymizerTestCase{
-         +test_apply_rules()
-      }
-
-
 .. toctree::
    :maxdepth: 2
diff --git a/doc/source/user_manual/introduction.rst b/doc/source/user_manual/introduction.rst
index 7851201da..3d7a5e446 100644
--- a/doc/source/user_manual/introduction.rst
+++ b/doc/source/user_manual/introduction.rst
@@ -36,27 +36,6 @@ Multiple instances of pipelines are created and run in parallel by different pro
 cesses.
 Only one event at a time is processed by each processor.
 Therefore, results of a processor should not depend on other events.
 
-.. mermaid::
-
-   flowchart LR
-      A[Input\nConnector] --> B
-      A[Input\nConnector] --> C
-      A[Input\nConnector] --> D
-      subgraph Pipeline 1
-      B[Normalizer] --> E[Geo-IP Enricher]
-      E --> F[Dropper]
-      end
-      subgraph Pipeline 2
-      C[Normalizer] --> G[Geo-IP Enricher]
-      G --> H[Dropper]
-      end
-      subgraph Pipeline n
-      D[Normalizer] --> I[Geo-IP Enricher]
-      I --> J[Dropper]
-      end
-      F --> K[Output\nConnector]
-      H --> K[Output\nConnector]
-      J --> K[Output\nConnector]
 Processors
 ==========
diff --git a/logprep/abc/processor.py b/logprep/abc/processor.py
index 2503afde7..be780900c 100644
--- a/logprep/abc/processor.py
+++ b/logprep/abc/processor.py
@@ -1,6 +1,8 @@
 """Abstract module for processors"""
 import copy
+import time
 from abc import abstractmethod
+from functools import reduce
 from logging import DEBUG, Logger
 from multiprocessing import current_process
 from pathlib import Path
@@ -16,7 +18,6 @@
     ProcessingCriticalError,
     ProcessingWarning,
 )
-from logprep.processor.processor_strategy import SpecificGenericProcessStrategy
 from logprep.util import getter
 from logprep.util.helper import (
     add_and_overwrite,
@@ -122,7 +123,6 @@ def update_mean_processing_time_per_event(self, new_sample):
     def __init__(self, name: str, configuration: "Processor.Config", logger: Logger):
         super().__init__(name, configuration, logger)
-        self._strategy = SpecificGenericProcessStrategy(self._config.apply_multiple_times)
         self.metric_labels, specific_tree_labels, generic_tree_labels = self._create_metric_labels()
         self._specific_tree = RuleTree(
             config_path=self._config.tree_config, metric_labels=specific_tree_labels
         )
@@ -192,15 +192,31 @@ def process(self, event: dict):
            A dictionary representing a log event.
 
         """
-        if self._logger.isEnabledFor(DEBUG):  # pragma: no cover
-            self._logger.debug(f"{self.describe()} processing event {event}")
-        self._strategy.process(
-            event,
-            generic_tree=self._generic_tree,
-            specific_tree=self._specific_tree,
-            callback=self._apply_rules_wrapper,
-            processor_metrics=self.metrics,
-        )
+        self._logger.debug(f"{self.describe()} processing event {event}")
+        self.metrics.number_of_processed_events += 1
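+        # specific rules are applied before generic rules, preserving the
+        # processing order of the removed SpecificGenericProcessStrategy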
+        self._process_rule_tree(event, self._specific_tree)
+        self._process_rule_tree(event, self._generic_tree)
+
+    def _process_rule_tree(self, event: dict, tree: "RuleTree"):
+        applied_rules = set()
+
+        def _process_rule(event, rule):
+            begin = time.time()
+            self._apply_rules_wrapper(event, rule)
+            processing_time = time.time() - begin
+            rule.metrics._number_of_matches += 1
+            rule.metrics.update_mean_processing_time(processing_time)
+            self.metrics.update_mean_processing_time_per_event(processing_time)
+            applied_rules.add(rule)
+            return event
+
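+        # reduce() threads the event through _process_rule for every matching
+        # rule in order; with apply_multiple_times the tree is queried again
+        # after each pass so that rules which only match once earlier rules
+        # have modified the event also run, while applied_rules keeps any
+        # single rule from being applied twice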
+        if self._config.apply_multiple_times:
+            matching_rules = tree.get_matching_rules(event)
+            while matching_rules:
+                reduce(_process_rule, (event, *matching_rules))
+                matching_rules = set(tree.get_matching_rules(event)).difference(applied_rules)
+        else:
+            reduce(_process_rule, (event, *tree.get_matching_rules(event)))
 
     def _apply_rules_wrapper(self, event, rule):
         try:
diff --git a/logprep/processor/processor_strategy.py b/logprep/processor/processor_strategy.py
deleted file mode 100644
index 2fbfa0af0..000000000
--- a/logprep/processor/processor_strategy.py
+++ /dev/null
@@ -1,91 +0,0 @@
-"""
-processor strategies module
-
-processor strategies are used to implement in one point how rules are processed in processors
-this could be the order of specific or generic rules
-"""
-from abc import ABC, abstractmethod
-from functools import reduce
-from time import time
-from typing import TYPE_CHECKING, Callable
-
-if TYPE_CHECKING:  # pragma: no cover
-    from logprep.abc.processor import Processor
-    from logprep.framework.rule_tree.rule_tree import RuleTree
-
-
-class ProcessStrategy(ABC):
-    """
-    abstract class for strategies
-    """
-
-    @abstractmethod
-    def process(self, event: dict, **kwargs):
-        """abstract method for processing rules"""
-        ...  # pragma: no cover
-
-
-class SpecificGenericProcessStrategy(ProcessStrategy):
-    """
-    Strategy to process rules in rule trees in the following order:
-    specific_rules >> generic_rules
-    """
-
-    def __init__(self, apply_multiple_times=False):
-        self._apply_multiple_times = apply_multiple_times
-
-    def process(self, event: dict, **kwargs):
-        specific_tree = kwargs.get("specific_tree")
-        generic_tree = kwargs.get("generic_tree")
-        callback = kwargs.get("callback")
-        processor_metrics = kwargs.get("processor_metrics")
-        processor_metrics.number_of_processed_events += 1
-        self._process_specific(event, specific_tree, callback, processor_metrics)
-        self._process_generic(event, generic_tree, callback, processor_metrics)
-
-    def _process_specific(
-        self,
-        event: dict,
-        specific_tree: "RuleTree",
-        callback: Callable,
-        processor_metrics: "Processor.ProcessorMetrics",
-    ):
-        """method for processing specific rules"""
-        self._process_rule_tree(event, specific_tree, callback, processor_metrics)
-
-    def _process_generic(
-        self,
-        event: dict,
-        generic_tree: "RuleTree",
-        callback: Callable,
-        processor_metrics: "Processor.ProcessorMetrics",
-    ):
-        """method for processing generic rules"""
-        self._process_rule_tree(event, generic_tree, callback, processor_metrics)
-
-    def _process_rule_tree(
-        self,
-        event: dict,
-        tree: "RuleTree",
-        callback: Callable,
-        processor_metrics: "Processor.ProcessorMetrics",
-    ):
-        applied_rules = set()
-
-        def _process_rule(event, rule):
-            begin = time()
-            callback(event, rule)
-            processing_time = time() - begin
-            rule.metrics._number_of_matches += 1
-            rule.metrics.update_mean_processing_time(processing_time)
-            processor_metrics.update_mean_processing_time_per_event(processing_time)
-            applied_rules.add(rule)
-            return event
-
-        if self._apply_multiple_times:
-            matching_rules = tree.get_matching_rules(event)
-            while matching_rules:
-                reduce(_process_rule, (event, *matching_rules))
-                matching_rules = set(tree.get_matching_rules(event)).difference(applied_rules)
-        else:
-            reduce(_process_rule, (event, *tree.get_matching_rules(event)))
diff --git a/logprep/run_logprep.py b/logprep/run_logprep.py
index debd22bad..7ffdad94a 100644
--- a/logprep/run_logprep.py
+++ b/logprep/run_logprep.py
@@ -6,7 +6,6 @@
 import sys
 import warnings
 from argparse import ArgumentParser
-from logging import ERROR, Logger, getLogger
 from os.path import basename
 from pathlib import Path
 
@@ -24,21 +23,13 @@
 from logprep.util.schema_and_rule_checker import SchemaAndRuleChecker
 from logprep.util.time_measurement import TimeMeasurement
 
-from logging import (
-    getLogger,
-    basicConfig,
-    Logger,
-)
-from logging.handlers import SysLogHandler
-
-
 warnings.simplefilter("always", DeprecationWarning)
 logging.captureWarnings(True)
 
 DEFAULT_LOCATION_CONFIG = "file:///etc/logprep/pipeline.yml"
-getLogger("filelock").setLevel(ERROR)
-getLogger("urllib3.connectionpool").setLevel(ERROR)
-getLogger("elasticsearch").setLevel(ERROR)
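+# cap noisy third-party loggers at ERROR so they do not flood the output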
+logging.getLogger("filelock").setLevel(logging.ERROR)
+logging.getLogger("urllib3.connectionpool").setLevel(logging.ERROR)
+logging.getLogger("elasticsearch").setLevel(logging.ERROR)
 
 
 def _parse_arguments():
@@ -98,7 +89,7 @@ def _parse_arguments():
     return arguments
 
 
-def _run_logprep(arguments, logger: Logger):
+def _run_logprep(arguments, logger: logging.Logger):
     runner = None
     try:
         runner = Runner.get_runner()
@@ -148,7 +139,7 @@ def _setup_logger(args, config: Configuration):
     try:
         log_config = config.get("logger", {})
         log_level = log_config.get("level", "INFO")
-        basicConfig(
+        logging.basicConfig(
             level=log_level, format="%(asctime)-15s %(name)-5s %(levelname)-8s: %(message)s"
         )
         logger = logging.getLogger("Logprep")
@@ -156,7 +147,7 @@
         for version in get_versions_string(args).split("\n"):
             logger.info(version)
     except BaseException as error:  # pylint: disable=broad-except
-        getLogger("Logprep").exception(error)
+        logging.getLogger("Logprep").exception(error)
         sys.exit(1)
     return logger
 
@@ -187,7 +178,7 @@ def _setup_metrics_and_time_measurement(args, config, logger):
     logger.debug(f"Config path: {args.config}")
 
 
-def _validate_rules(args, config: Configuration, logger: Logger):
+def _validate_rules(args, config: Configuration, logger: logging.Logger):
     try:
         config.verify_pipeline_only(logger)
     except InvalidConfigurationError as error:
diff --git a/tests/unit/processor/base.py b/tests/unit/processor/base.py
index 8a33750a0..0a0f4c6ad 100644
--- a/tests/unit/processor/base.py
+++ b/tests/unit/processor/base.py
@@ -16,7 +16,6 @@
 from logprep.factory import Factory
 from logprep.framework.rule_tree.rule_tree import RuleTree
 from logprep.processor.base.exceptions import ProcessingWarning
-from logprep.processor.processor_strategy import ProcessStrategy
 from logprep.util.helper import camel_to_snake
 from logprep.util.json_handling import list_json_files_in_directory
 from logprep.util.time_measurement import TimeMeasurement
@@ -228,20 +227,6 @@ def test_rules_returns_all_specific_and_generic_rules(self):
         object_rules_count = len(self.object.rules)
         assert all_rules_count == object_rules_count
 
-    def test_process_strategy_returns_strategy_object(self):
-        assert isinstance(self.object._strategy, ProcessStrategy)
-
-    def test_process_calls_strategy(self):
-        """
-        This test method needs to be overwritten in your ProcessorTests
-        if your processor uses another strategy
-        """
-        with mock.patch(
-            "logprep.processor.processor_strategy.SpecificGenericProcessStrategy.process"
-        ) as mock_strategy_process:
-            self.object.process({})
-            mock_strategy_process.assert_called()
-
     def test_process_is_measured(self):
         TimeMeasurement.TIME_MEASUREMENT_ENABLED = True
         TimeMeasurement.APPEND_TO_EVENT = True
@@ -261,11 +246,9 @@ def test_process_measurements_appended_under_processor_config_name(self):
         assert isinstance(processing_times[config_name], float)
 
     @mock.patch("logging.Logger.debug")
-    @mock.patch("logging.Logger.isEnabledFor", return_value=True)
-    def test_process_writes_debug_messages(self, mock_is_enabled, mock_debug):
+    def test_process_writes_debug_messages(self, mock_debug):
         event = {}
         self.object.process(event)
-        mock_is_enabled.assert_called()
         mock_debug.assert_called()
 
     def test_config_attribute_is_config_object(self):
diff --git a/tests/unit/processor/test_processor_strategy.py b/tests/unit/processor/test_process.py
similarity index 65%
rename from tests/unit/processor/test_processor_strategy.py
rename to tests/unit/processor/test_process.py
index c63622080..6739473a3 100644
--- a/tests/unit/processor/test_processor_strategy.py
+++ b/tests/unit/processor/test_process.py
@@ -7,48 +7,48 @@
 
 import pytest
 
-from logprep.abc.processor import Processor
 from logprep.factory import Factory
 from logprep.framework.pipeline import Pipeline
 from logprep.processor.dissector.rule import DissectorRule
 from logprep.processor.generic_adder.rule import GenericAdderRule
-from logprep.processor.processor_strategy import SpecificGenericProcessStrategy
 
 
 class TestSpecificGenericProcessStrategy:
-    @mock.patch(
-        "logprep.processor.processor_strategy.SpecificGenericProcessStrategy._process_generic"
-    )
-    @mock.patch(
-        "logprep.processor.processor_strategy.SpecificGenericProcessStrategy._process_specific"
-    )
-    def test_process(self, mock_process_specific, mock_process_generic):
-        mock_metrics = Processor.ProcessorMetrics(
-            labels={}, specific_rule_tree=[], generic_rule_tree=[]
+    @mock.patch("logprep.abc.processor.Processor._process_rule_tree")
+    def test_process(self, mock_process_rule_tree):
+        processor = Factory.create(
+            {
+                "dummy": {
+                    "type": "calculator",
+                    "generic_rules": [],
+                    "specific_rules": [],
+                }
+            },
+            mock.MagicMock(),
         )
-        strategy = SpecificGenericProcessStrategy()
-        strategy.process({}, processor_stats=mock.Mock(), processor_metrics=mock_metrics)
-        mock_process_generic.assert_called()
-        mock_process_specific.assert_called()
+        processor.process({})
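+        # one call per rule tree: the specific tree and the generic tree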
+        mock_process_rule_tree.assert_called()
+        assert mock_process_rule_tree.call_count == 2
 
-    @mock.patch(
-        "logprep.processor.processor_strategy.SpecificGenericProcessStrategy._process_generic"
-    )
-    @mock.patch(
-        "logprep.processor.processor_strategy.SpecificGenericProcessStrategy._process_specific"
-    )
-    def test_process_specific_before_generic(self, mock_process_specific, mock_process_generic):
-        call_order = []
-        mock_process_specific.side_effect = lambda *a, **kw: call_order.append(
-            mock_process_specific
+    @mock.patch("logprep.abc.processor.Processor._process_rule_tree")
+    def test_process_specific_before_generic(self, mock_process_rule_tree):
+        processor = Factory.create(
+            {
+                "dummy": {
+                    "type": "calculator",
+                    "generic_rules": [],
+                    "specific_rules": [],
+                }
+            },
+            mock.MagicMock(),
         )
-        mock_process_generic.side_effect = lambda *a, **kw: call_order.append(mock_process_generic)
-        mock_metrics = Processor.ProcessorMetrics(
-            labels={}, specific_rule_tree=[], generic_rule_tree=[]
-        )
-        strategy = SpecificGenericProcessStrategy()
-        strategy.process({}, processor_stats=mock.Mock(), processor_metrics=mock_metrics)
-        assert call_order == [mock_process_specific, mock_process_generic]
+        processor.process({})
+        assert mock_process_rule_tree.call_count == 2
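+        # the specific tree must have been processed before the generic tree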
+        mock_calls = [
+            call({}, processor._specific_tree),
+            call({}, processor._generic_tree),
+        ]
+        mock_process_rule_tree.assert_has_calls(mock_calls, any_order=False)
 
     def test_apply_processor_multiple_times_until_no_new_rule_matches(self):
         config = {
@@ -79,14 +79,7 @@ def test_apply_processor_multiple_times_until_no_new_rule_matches(self):
             "time": "time",
             "url": "url",
         }
-        processor._strategy.process(
-            event,
-            generic_tree=processor._generic_tree,
-            specific_tree=processor._specific_tree,
-            callback=processor._apply_rules_wrapper,
-            processor_stats=mock.Mock(),
-            processor_metrics=mock.MagicMock(),
-        )
+        processor.process(event)
         assert expected_event == event
 
     def test_apply_processor_multiple_times_not_enabled(self):
         config = {
@@ -111,14 +104,7 @@
             "time": "time",
             "url": "url",
         }
-        processor._strategy.process(
-            event,
-            generic_tree=processor._generic_tree,
-            specific_tree=processor._specific_tree,
-            callback=processor._apply_rules_wrapper,
-            processor_stats=mock.Mock(),
-            processor_metrics=mock.MagicMock(),
-        )
+        processor.process(event)
         assert expected_event == event
 
     @pytest.mark.parametrize("execution_number", range(5))  # repeat test to ensure determinism
@@ -132,19 +118,10 @@ def test_strategy_applies_rules_in_deterministic_order(self, execution_number):
         processor._specific_tree.add_rule(rule_one)
         processor._specific_tree.add_rule(rule_two)
         event = {"val": "content"}
-        mock_callback = mock.MagicMock()
-        processor._strategy.process(
-            event=event,
-            generic_tree=processor._generic_tree,
-            specific_tree=processor._specific_tree,
-            callback=mock_callback,
-            processor_stats=mock.Mock(),
-            processor_metrics=mock.MagicMock(),
-        )
-        expected_call_order = [call(event, rule_one), call(event, rule_two)]
-        assert (
-            mock_callback.mock_calls == expected_call_order
-        ), f"Wrong call order in test {execution_number}"
+        with mock.patch("logprep.abc.processor.Processor._apply_rules_wrapper") as mock_callback:
+            expected_call_order = [call(event, rule_one), call(event, rule_two)]
+            processor.process(event=event)
+            mock_callback.assert_has_calls(expected_call_order, any_order=False)
 
     def test_strategy_processes_generic_rules_after_processor_error_in_specific_rules(self, capsys):
         config = {
diff --git a/tests/unit/test_run_logprep.py b/tests/unit/test_run_logprep.py
index f53ed6c60..20b59d9a7 100644
--- a/tests/unit/test_run_logprep.py
+++ b/tests/unit/test_run_logprep.py
@@ -5,8 +5,8 @@
 from unittest import mock
 
 import pytest
-import responses
 import requests
+import responses
 from yaml import safe_load
 
 from logprep import run_logprep
@@ -155,7 +155,8 @@ def test_version_arg_prints_with_http_config(self, capsys):
         expected_lines = (
             f"python version: {sys.version.split()[0]}\n"
             f"logprep version: {get_versions()['version']}\n"
-            f"configuration version: {configuration['version']}, http://localhost:32000/{config_path}"
+            f"configuration version: {configuration['version']},"
+            f" http://localhost:32000/{config_path}"
         )
         assert lines == expected_lines
 
@@ -184,7 +185,8 @@ def test_version_arg_prints_with_http_config_without_exposing_secret_data(self,
         expected_lines = (
             f"python version: {sys.version.split()[0]}\n"
             f"logprep version: {get_versions()['version']}\n"
-            f"configuration version: {configuration['version']}, http://localhost:32000/{config_path}"
+            f"configuration version: {configuration['version']},"
+            f" http://localhost:32000/{config_path}"
         )
         assert lines == expected_lines
 
@@ -233,7 +235,7 @@ def test_main_calls_runner_stop_on_any_exception(self, mock_stop, mock_start):
         mock_stop.assert_called()
 
     def test_logprep_exits_if_logger_can_not_be_created(self):
-        with mock.patch("logging.getLogger") as mock_create:
+        with mock.patch("logprep.run_logprep.Configuration.get") as mock_create:
             mock_create.side_effect = BaseException
             config_path = "quickstart/exampledata/config/pipeline.yml"
             with mock.patch("sys.argv", ["logprep", config_path]):