remove process_strategy and mermaid (#451)
* removed ProcessStrategy, as it added complexity without there ever being more than one strategy (see the sketch below)
* removed mermaid from the documentation because it does not work in airgapped environments
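
For context, the ProcessStrategy removal collapses a strategy object that only ever had one implementation into a plain method on the processor. A minimal before/after sketch, with names abridged and bodies hypothetical rather than logprep's actual code:

```python
# Before: rule-tree traversal lived behind a strategy interface, although
# only this one strategy ever existed (abridged, hypothetical bodies).
class SpecificGenericProcessStrategy:
    def process(self, event, specific_tree, generic_tree, callback):
        for tree in (specific_tree, generic_tree):
            for rule in tree.get_matching_rules(event):
                callback(event, rule)

# After: the same traversal inlined into the processor itself.
class Processor:
    def process(self, event):
        for tree in (self._specific_tree, self._generic_tree):
            for rule in tree.get_matching_rules(event):
                self._apply_rules_wrapper(event, rule)
```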
ekneg54 authored Oct 11, 2023
1 parent 02448cd commit d825a1c
Showing 10 changed files with 79 additions and 340 deletions.
1 change: 0 additions & 1 deletion doc/requirements.txt
@@ -1,6 +1,5 @@
sphinx
sphinx_rtd_theme
sphinxcontrib-mermaid
sphinxcontrib.datatemplates
sphinx-copybutton
nbsphinx
1 change: 0 additions & 1 deletion doc/source/conf.py
@@ -56,7 +56,6 @@ def setup(app):
# ones.
extensions = [
"sphinx.ext.napoleon",
"sphinxcontrib.mermaid",
"sphinx.ext.autosummary",
"sphinxcontrib.datatemplates",
"nbsphinx",
116 changes: 0 additions & 116 deletions doc/source/development/index.rst
@@ -2,122 +2,6 @@
Development
===========

.. mermaid::

classDiagram
Component <-- Processor
Component <-- Connector
Connector <-- Input : implements
Connector <-- Output : implements
Processor <-- Normalizer : implements
Processor <-- Pseudonymizer : implements
Input <-- ConfluentKafkaInput : implements
Output <-- ConfluentKafkaOutput : implements
ProcessorConfiguration
Rule <-- NormalizerRule : inherit
Rule <-- PseudonymizerRule : inherit
BaseProcessorTestCase <-- NormalizerTestCase : implements
BaseProcessorTestCase <-- PseudonymizerTestCase : implements
class Component{
+Config
+str name
+Logger _logger
+Config _config
+String describe()
+None setup()
+None shut_down()

}
class Processor{
<<interface>>
+rule_class
+Config
+load_rules()
+process()
+apply_rules()*
}
class Normalizer{
+Config
+rule_class = NormalizerRule
+_config: Normalizer.Config
+apply_rules()
}

class Pseudonymizer{
+Config
+rule_class = PseudonymizerRule
+_config: Pseudonymizer.Config
+apply_rules()
}
class Connector{
<<interface>>
+Config
}
class Input{
<<interface>>
+Config
+_config: Input.Config
-Dict _get_event()*
-None _get_raw_event()
+tuple[dict, error|None] get_next()
}
class Output{
<<interface>>
+Config
+_config: Output.Config
+None store()*
+None store_custom()*
+None store_failed()*
}
class ConfluentKafkaInput{
+Config
+_config: ConfluentKafkaInput.Config
+tuple _get_event()
+bytearray _get_raw_event()
}
class ConfluentKafkaOutput{
+Config
+_config: ConfluentKafkaInput.Config
+None store()
+None store_custom()
+None store_failed()
}

class Configuration{
<<adapter>>
+create
}
class Registry{
+mapping : dict
}

class Factory{
+create()
}


class TestFactory{
+test_check()
+test_create_normalizer()
+test_create_pseudonymizer()
}

class BaseProcessorTestCase{
+test_describe()
+test_load_rules()
+test_process()
+test_apply_rules()*
}

class NormalizerTestCase{
+test_apply_rules()
}

class PseudonymizerTestCase{
+test_apply_rules()
}


.. toctree::
:maxdepth: 2

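For readers without the rendered diagram, the relationships it encoded can be restated as a minimal Python sketch; signatures are simplified and do not reproduce logprep's exact API:

```python
# Abridged sketch of the hierarchy the removed diagram encoded.
from abc import ABC, abstractmethod

class Component(ABC):
    """Shared base: name, Config, and lifecycle hooks."""
    def describe(self) -> str: ...
    def setup(self) -> None: ...
    def shut_down(self) -> None: ...

class Processor(Component):
    rule_class = None  # e.g. NormalizerRule on the Normalizer subclass
    def load_rules(self) -> None: ...
    def process(self, event: dict) -> None: ...
    @abstractmethod
    def apply_rules(self, event: dict, rule) -> None: ...

class Connector(Component):
    """Common base of the Input and Output interfaces."""

class Input(Connector):
    def get_next(self) -> tuple: ...  # (event, error or None)

class Output(Connector):
    @abstractmethod
    def store(self, event: dict) -> None: ...
```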
21 changes: 0 additions & 21 deletions doc/source/user_manual/introduction.rst
@@ -36,27 +36,6 @@ Multiple instances of pipelines are created and run in parallel by different processes.
Only one event at a time is processed by each processor.
Therefore, results of a processor should not depend on other events.

.. mermaid::

flowchart LR
A[Input\nConnector] --> B
A[Input\nConnector] --> C
A[Input\nConnector] --> D
subgraph Pipeline 1
B[Normalizer] --> E[Geo-IP Enricher]
E --> F[Dropper]
end
subgraph Pipeline 2
C[Normalizer] --> G[Geo-IP Enricher]
G --> H[Dropper]
end
subgraph Pipeline n
D[Normalizer] --> I[Geo-IP Enricher]
I --> J[Dropper]
end
F --> K[Output\nConnector]
H --> K[Output\nConnector]
J --> K[Output\nConnector]
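
The removed flowchart showed n identical pipelines fanning out from a single input connector and converging on a single output connector. That topology can be sketched in a few lines; the names here are illustrative, not logprep's actual runner API:

```python
# Illustrative sketch of the pipeline topology: several identical pipelines,
# each in its own process, share one input queue and one output queue.
from multiprocessing import Process, Queue

def normalize(event: dict) -> dict:  # stand-in for Normalizer, Geo-IP Enricher, Dropper
    event["normalized"] = True
    return event

def run_pipeline(inputs: Queue, outputs: Queue) -> None:
    processors = [normalize]  # each pipeline owns its processor instances
    while True:
        event = inputs.get()
        for processor in processors:
            event = processor(event)  # one event at a time per processor
        outputs.put(event)

if __name__ == "__main__":
    inputs, outputs = Queue(), Queue()
    for _ in range(3):  # "Pipeline 1" .. "Pipeline n" in the removed chart
        Process(target=run_pipeline, args=(inputs, outputs), daemon=True).start()
    inputs.put({"message": "example"})
    print(outputs.get())  # processed by exactly one of the pipelines
```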

Processors
==========
38 changes: 27 additions & 11 deletions logprep/abc/processor.py
@@ -1,6 +1,8 @@
"""Abstract module for processors"""
import copy
import time
from abc import abstractmethod
from functools import reduce
from logging import DEBUG, Logger
from multiprocessing import current_process
from pathlib import Path
@@ -16,7 +18,6 @@
ProcessingCriticalError,
ProcessingWarning,
)
from logprep.processor.processor_strategy import SpecificGenericProcessStrategy
from logprep.util import getter
from logprep.util.helper import (
add_and_overwrite,
@@ -122,7 +123,6 @@ def update_mean_processing_time_per_event(self, new_sample):

def __init__(self, name: str, configuration: "Processor.Config", logger: Logger):
super().__init__(name, configuration, logger)
self._strategy = SpecificGenericProcessStrategy(self._config.apply_multiple_times)
self.metric_labels, specific_tree_labels, generic_tree_labels = self._create_metric_labels()
self._specific_tree = RuleTree(
config_path=self._config.tree_config, metric_labels=specific_tree_labels
@@ -192,15 +192,31 @@ def process(self, event: dict):
A dictionary representing a log event.
"""
if self._logger.isEnabledFor(DEBUG): # pragma: no cover
self._logger.debug(f"{self.describe()} processing event {event}")
self._strategy.process(
event,
generic_tree=self._generic_tree,
specific_tree=self._specific_tree,
callback=self._apply_rules_wrapper,
processor_metrics=self.metrics,
)
self._logger.debug(f"{self.describe()} processing event {event}")
self.metrics.number_of_processed_events += 1
self._process_rule_tree(event, self._specific_tree)
self._process_rule_tree(event, self._generic_tree)

def _process_rule_tree(self, event: dict, tree: "RuleTree"):
applied_rules = set()

def _process_rule(event, rule):
begin = time.time()
self._apply_rules_wrapper(event, rule)
processing_time = time.time() - begin
rule.metrics._number_of_matches += 1
rule.metrics.update_mean_processing_time(processing_time)
self.metrics.update_mean_processing_time_per_event(processing_time)
applied_rules.add(rule)
return event

if self._config.apply_multiple_times:
matching_rules = tree.get_matching_rules(event)
while matching_rules:
reduce(_process_rule, (event, *matching_rules))
matching_rules = set(tree.get_matching_rules(event)).difference(applied_rules)
else:
reduce(_process_rule, (event, *tree.get_matching_rules(event)))

def _apply_rules_wrapper(self, event, rule):
try:
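The reduce call in the new _process_rule_tree is worth unpacking: the event is the initial accumulator, each matching rule is folded in, and _process_rule returns the event so the same dict threads through every rule. With apply_multiple_times enabled, the tree is re-queried after each pass so that rules which only match fields created by earlier rules still fire, while the applied_rules set guarantees the loop terminates. A standalone sketch of the fold, with plain functions standing in for rule objects:

```python
# Standalone sketch of the reduce idiom used above: the event serves as the
# initial accumulator and each "rule" is folded in; the callable returns the
# event so it is handed on to the next rule.
from functools import reduce

def _process_rule(event: dict, rule) -> dict:
    rule(event)    # apply the rule, mutating the event in place
    return event   # thread the same event through to the next rule

rules = [
    lambda e: e.update(normalized=True),
    lambda e: e.update(enriched=True),
]
event = {"message": "test"}
reduce(_process_rule, (event, *rules))
print(event)  # {'message': 'test', 'normalized': True, 'enriched': True}
```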
91 changes: 0 additions & 91 deletions logprep/processor/processor_strategy.py

This file was deleted.

23 changes: 7 additions & 16 deletions logprep/run_logprep.py
@@ -6,7 +6,6 @@
import sys
import warnings
from argparse import ArgumentParser
from logging import ERROR, Logger, getLogger
from os.path import basename
from pathlib import Path

@@ -24,21 +23,13 @@
from logprep.util.schema_and_rule_checker import SchemaAndRuleChecker
from logprep.util.time_measurement import TimeMeasurement

from logging import (
getLogger,
basicConfig,
Logger,
)
from logging.handlers import SysLogHandler


warnings.simplefilter("always", DeprecationWarning)
logging.captureWarnings(True)

DEFAULT_LOCATION_CONFIG = "file:///etc/logprep/pipeline.yml"
getLogger("filelock").setLevel(ERROR)
getLogger("urllib3.connectionpool").setLevel(ERROR)
getLogger("elasticsearch").setLevel(ERROR)
logging.getLogger("filelock").setLevel(logging.ERROR)
logging.getLogger("urllib3.connectionpool").setLevel(logging.ERROR)
logging.getLogger("elasticsearch").setLevel(logging.ERROR)


def _parse_arguments():
@@ -98,7 +89,7 @@ def _parse_arguments():
return arguments


def _run_logprep(arguments, logger: Logger):
def _run_logprep(arguments, logger: logging.Logger):
runner = None
try:
runner = Runner.get_runner()
@@ -148,15 +139,15 @@ def _setup_logger(args, config: Configuration):
try:
log_config = config.get("logger", {})
log_level = log_config.get("level", "INFO")
basicConfig(
logging.basicConfig(
level=log_level, format="%(asctime)-15s %(name)-5s %(levelname)-8s: %(message)s"
)
logger = logging.getLogger("Logprep")
logger.info(f"Log level set to '{log_level}'")
for version in get_versions_string(args).split("\n"):
logger.info(version)
except BaseException as error: # pylint: disable=broad-except
getLogger("Logprep").exception(error)
logging.getLogger("Logprep").exception(error)
sys.exit(1)
return logger

@@ -187,7 +178,7 @@ def _setup_metrics_and_time_measurement(args, config, logger):
logger.debug(f"Config path: {args.config}")


def _validate_rules(args, config: Configuration, logger: Logger):
def _validate_rules(args, config: Configuration, logger: logging.Logger):
try:
config.verify_pipeline_only(logger)
except InvalidConfigurationError as error:
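The run_logprep.py changes standardize on a single module-level import of logging with module-qualified calls, instead of importing getLogger, basicConfig, and Logger individually from two places. A condensed sketch of the setup the refactor converges on, with values hardcoded here purely for illustration:

```python
# Condensed sketch: one logging import, module-qualified calls, and noisy
# third-party loggers capped at ERROR so Logprep's own output stays readable.
import logging

logging.basicConfig(
    level="INFO", format="%(asctime)-15s %(name)-5s %(levelname)-8s: %(message)s"
)
for noisy in ("filelock", "urllib3.connectionpool", "elasticsearch"):
    logging.getLogger(noisy).setLevel(logging.ERROR)

logger = logging.getLogger("Logprep")
logger.info("Log level set to 'INFO'")
```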