refactor validation of generic_resolver rules to startup (#694)
* refactor validation of `generic_resolver` rules to startup

- Move validation logic to `__attrs_post_init__` in GenericResolverRule.
- Remove redundant error handling in GenericResolver processor.
- Update corresponding tests to reflect changes.
dtrai2 authored Nov 11, 2024
1 parent 81d6897 commit 382848f
Showing 6 changed files with 87 additions and 102 deletions.
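
With this change, a misconfigured `resolve_from_file` section is rejected when the rule object is built, i.e. at startup, instead of raising a `ProcessingCriticalError` for every matching event. A minimal sketch of the new behavior, mirroring the unit test added in this commit (the path and pattern are the test's fixture values, not required ones):

```python
import pytest

from logprep.factory_error import InvalidConfigurationError
from logprep.processor.generic_resolver.rule import GenericResolverRule

# Rule definition whose pattern lacks the required `?P<mapping>` named group.
rule_definition = {
    "filter": "to_resolve",
    "generic_resolver": {
        "field_mapping": {"to_resolve": "resolved"},
        "resolve_from_file": {
            "path": "tests/testdata/unit/generic_resolver/resolve_mapping.yml",
            "pattern": r"\d*(?P<foobar>[a-z]+)\d*",  # no `?P<mapping>` group
        },
    },
}

# Rule creation now fails immediately; no event has to be processed first.
with pytest.raises(InvalidConfigurationError, match="Mapping group is missing"):
    GenericResolverRule._create_from_dict(rule_definition)
```
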
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -4,7 +4,10 @@
### Breaking
### Features
### Improvements

* replace `BaseException` with `Exception` for custom errors
* refactor `generic_resolver` to validate rules on startup instead of during the application of each rule

### Bugfix

- fix `confluent_kafka.store_offsets` if `last_valid_record` is `None`, which can happen when a rebalancing occurs
58 changes: 3 additions & 55 deletions logprep/processor/generic_resolver/processor.py
@@ -28,40 +28,20 @@
import re
from typing import Union

from logprep.processor.base.exceptions import (
    FieldExistsWarning,
    ProcessingCriticalError,
)
from logprep.processor.base.exceptions import FieldExistsWarning
from logprep.processor.field_manager.processor import FieldManager
from logprep.processor.generic_resolver.rule import GenericResolverRule
from logprep.util.getter import GetterFactory
from logprep.util.helper import add_field_to, get_dotted_field_value


class GenericResolverError(ProcessingCriticalError):
    """Base class for GenericResolver related exceptions."""

    def __init__(self, name: str, message: str, rule: GenericResolverRule):
        super().__init__(f"{name}: {message}", rule=rule)


class GenericResolver(FieldManager):
    """Resolve values in documents by referencing a mapping list."""

    __slots__ = ["_replacements_from_file"]

    _replacements_from_file: dict

    rule_class = GenericResolverRule

    def __init__(self, name: str, configuration: FieldManager.Config):
        super().__init__(name=name, configuration=configuration)
        self._replacements_from_file = {}

    def _apply_rules(self, event, rule):
        """Apply the given rule to the current event"""
        conflicting_fields = []
        self.ensure_rules_from_file(rule)

        source_values = []
        for source_field, target_field in rule.field_mapping.items():
@@ -73,17 +53,10 @@ def _apply_rules(self, event, rule):
            # FILE
            if rule.resolve_from_file:
                pattern = f'^{rule.resolve_from_file["pattern"]}$'
                replacements = self._replacements_from_file[rule.resolve_from_file["path"]]
                replacements = rule.resolve_from_file["additions"]
                matches = re.match(pattern, source_value)
                if matches:
                    mapping = matches.group("mapping") if "mapping" in matches.groupdict() else None
                    if mapping is None:
                        raise GenericResolverError(
                            self.name,
                            "Mapping group is missing in mapping file pattern!",
                            rule=rule,
                        )
                    dest_val = replacements.get(mapping)
                    dest_val = replacements.get(matches.group("mapping"))
                    if dest_val:
                        success = self._add_uniquely_to_list(event, rule, target_field, dest_val)
                        if not success:
@@ -132,28 +105,3 @@ def _add_uniquely_to_list(
            return add_success
        add_success = add_field_to(event, target, content, extends_lists=rule.extend_target_list)
        return add_success

    def ensure_rules_from_file(self, rule):
        """loads rules from file"""
        if rule.resolve_from_file:
            if rule.resolve_from_file["path"] not in self._replacements_from_file:
                try:
                    add_dict = GetterFactory.from_string(rule.resolve_from_file["path"]).get_yaml()
                    if isinstance(add_dict, dict) and all(
                        isinstance(value, str) for value in add_dict.values()
                    ):
                        self._replacements_from_file[rule.resolve_from_file["path"]] = add_dict
                    else:
                        raise GenericResolverError(
                            self.name,
                            f"Additions file "
                            f'\'{rule.resolve_from_file["path"]}\''
                            f" must be a dictionary with string values!",
                            rule=rule,
                        )
                except FileNotFoundError as error:
                    raise GenericResolverError(
                        self.name,
                        f'Additions file \'{rule.resolve_from_file["path"]}' f"' not found!",
                        rule=rule,
                    ) from error
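
With the file loading moved into the rule, the file-based branch of `_apply_rules` reduces to a regex match plus a dictionary lookup against the preloaded `additions` mapping. A standalone sketch of that lookup (pattern, additions, and input value are made-up illustration values):

```python
import re

# Stand-ins for rule.resolve_from_file["pattern"] and the "additions"
# dictionary that the rule now loads from the YAML file at startup.
resolve_from_file = {
    "pattern": r"\d*(?P<mapping>[a-z]+)\d*",
    "additions": {"hello": "greeting", "bye": "farewell"},
}

source_value = "123hello456"
pattern = f'^{resolve_from_file["pattern"]}$'
matches = re.match(pattern, source_value)
if matches:
    # The mandatory ?P<mapping> group selects the key in the additions mapping.
    dest_val = resolve_from_file["additions"].get(matches.group("mapping"))
    print(dest_val)  # -> greeting
```
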
24 changes: 24 additions & 0 deletions logprep/processor/generic_resolver/rule.py
@@ -72,9 +72,13 @@
:noindex:
"""

from pathlib import Path

from attrs import define, field, validators

from logprep.factory_error import InvalidConfigurationError
from logprep.processor.field_manager.rule import FieldManagerRule
from logprep.util.getter import GetterFactory


class GenericResolverRule(FieldManagerRule):
@@ -122,6 +126,26 @@ class Config(FieldManagerRule.Config):
        The resolve list in the file at :code:`path` is then used in conjunction with
        the regex pattern in :code:`pattern`."""

        def __attrs_post_init__(self):
            if self.resolve_from_file:
                file_path = self.resolve_from_file["path"]
                if "?P<mapping>" not in self.resolve_from_file["pattern"]:
                    raise InvalidConfigurationError(
                        f"Mapping group is missing in mapping file pattern! (Rule ID: '{self.id}')"
                    )
                if not Path(file_path).is_file():
                    raise InvalidConfigurationError(
                        f"Additions file '{file_path}' not found! (Rule ID: '{self.id}')",
                    )
                add_dict = GetterFactory.from_string(file_path).get_yaml()
                if not isinstance(add_dict, dict) or not all(
                    isinstance(value, str) for value in add_dict.values()
                ):
                    raise InvalidConfigurationError(
                        f"Additions file '{file_path}' must be a dictionary with string values! (Rule ID: '{self.id}')",
                    )
                self.resolve_from_file["additions"] = add_dict

    @property
    def field_mapping(self) -> dict:
        """Returns the field mapping"""
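
The startup check only accepts a flat YAML mapping with string values. A quick sketch of the shape it expects (hypothetical file content; the real code reads the file via `GetterFactory.from_string(path).get_yaml()`, `yaml.safe_load` is used here only to keep the sketch self-contained):

```python
import yaml

# Hypothetical additions file content for a resolve_from_file rule.
additions_yaml = """
hello: greeting
bye: farewell
"""

add_dict = yaml.safe_load(additions_yaml)

# The same shape check the rule now performs at creation time:
assert isinstance(add_dict, dict)
assert all(isinstance(value, str) for value in add_dict.values())
```
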
19 changes: 10 additions & 9 deletions logprep/processor/hyperscan_resolver/processor.py
@@ -37,9 +37,12 @@

from attr import define, field

from logprep.processor.base.exceptions import FieldExistsWarning, SkipImportError
from logprep.processor.base.exceptions import (
    FieldExistsWarning,
    SkipImportError,
    ProcessingCriticalError,
)
from logprep.processor.field_manager.processor import FieldManager
from logprep.processor.generic_resolver.processor import GenericResolverError
from logprep.util.helper import add_field_to, get_dotted_field_value
from logprep.util.validators import directory_validator

@@ -57,10 +60,6 @@
# pylint: enable=ungrouped-imports


class HyperscanResolverError(GenericResolverError):
    """Base class for HyperscanResolver related exceptions."""


class HyperscanResolver(FieldManager):
    """Resolve values in documents by referencing a mapping list."""

@@ -169,7 +168,7 @@ def _get_hyperscan_database(self, rule: HyperscanResolverRule):
        try:
            database, value_mapping = self._load_database(database_id, resolve_list)
        except FileNotFoundError:
            database, value_mapping = self._create_database(resolve_list)
            database, value_mapping = self._create_database(resolve_list, rule)

        if rule.store_db_persistent:
            self._save_database(database, database_id)
@@ -201,7 +200,7 @@ def _save_database(self, database: Database, database_id: int):
        with open(f"{self._hyperscan_database_path}/{database_id}.db", "wb") as db_file:
            db_file.write(serialized_db)

    def _create_database(self, resolve_list: dict):
    def _create_database(self, resolve_list: dict, rule):
        database = Database()
        value_mapping = {}
        db_patterns = []
@@ -211,7 +210,9 @@ def _create_database(self, resolve_list: dict, rule):
            value_mapping[idx] = resolve_list[pattern]

        if not db_patterns:
            raise HyperscanResolverError(self.name, "No pattern to compile for hyperscan database!")
            raise ProcessingCriticalError(
                f"{self.name} No pattern to compile for hyperscan database!", rule
            )

        expressions, ids, flags = zip(*db_patterns)
        database.compile(expressions=expressions, ids=ids, elements=len(db_patterns), flags=flags)
39 changes: 1 addition & 38 deletions tests/unit/processor/generic_resolver/test_generic_resolver.py
@@ -4,10 +4,7 @@
# pylint: disable=wrong-import-position
from collections import OrderedDict

from logprep.processor.base.exceptions import (
    FieldExistsWarning,
    ProcessingCriticalError,
)
from logprep.processor.base.exceptions import FieldExistsWarning
from logprep.processor.generic_resolver.processor import GenericResolver
from tests.unit.processor.base import BaseProcessorTestCase

@@ -277,40 +274,6 @@ def test_resolve_dotted_field_no_conflict_match_from_file_and_list_has_conflict_

        assert document == expected

    def test_resolve_dotted_field_no_conflict_match_from_file_group_mapping_does_not_exist(
        self,
    ):
        rule = {
            "filter": "to_resolve",
            "generic_resolver": {
                "field_mapping": {"to_resolve": "resolved"},
                "resolve_from_file": {
                    "path": "tests/testdata/unit/generic_resolver/resolve_mapping.yml",
                    "pattern": r"\d*(?P<foobar>[a-z]+)\d*",
                },
                "resolve_list": {"FOO": "BAR"},
            },
        }
        self._load_specific_rule(rule)
        document = {"to_resolve": "ab"}
        result = self.object.process(document)
        assert isinstance(result.errors[0], ProcessingCriticalError)
        assert "Mapping group is missing in mapping" in result.errors[0].args[0]

    def test_resolve_generic_match_from_file_and_file_does_not_exist(self):
        rule = {
            "filter": "to.resolve",
            "generic_resolver": {
                "field_mapping": {"to.resolve": "resolved"},
                "resolve_from_file": {"path": "foo", "pattern": "bar"},
            },
        }
        self._load_specific_rule(rule)
        document = {"to": {"resolve": "something HELLO1"}}
        result = self.object.process(document)
        assert isinstance(result.errors[0], ProcessingCriticalError)
        assert "Additions file 'foo' not found" in result.errors[0].args[0]

    def test_resolve_dotted_field_no_conflict_no_match(self):
        rule = {
            "filter": "to.resolve",
46 changes: 46 additions & 0 deletions tests/unit/processor/generic_resolver/test_generic_resolver_rule.py
@@ -4,6 +4,7 @@
# pylint: disable=wrong-import-order
import pytest

from logprep.factory_error import InvalidConfigurationError
from logprep.processor.generic_resolver.rule import GenericResolverRule


@@ -158,3 +159,48 @@ def test_rules_equality(
        rule1 = GenericResolverRule._create_from_dict(specific_rule_definition)
        rule2 = GenericResolverRule._create_from_dict(other_rule_definition)
        assert (rule1 == rule2) == is_equal, testcase

    @pytest.mark.parametrize(
        ["rule", "error", "message"],
        [
            (
                {
                    "filter": "to_resolve",
                    "generic_resolver": {
                        "field_mapping": {"to_resolve": "resolved"},
                        "resolve_from_file": {
                            "path": "tests/testdata/unit/generic_resolver/resolve_mapping.yml",
                            "pattern": r"\d*(?P<foobar>[a-z]+)\d*",
                        },
                        "resolve_list": {"FOO": "BAR"},
                    },
                },
                InvalidConfigurationError,
                "Mapping group is missing in mapping",
            ),
            (
                {
                    "filter": "to.resolve",
                    "generic_resolver": {
                        "field_mapping": {"to.resolve": "resolved"},
                        "resolve_from_file": {
                            "path": "foo",
                            "pattern": r"\d*(?P<mapping>[a-z]+)\d*",
                        },
                    },
                },
                InvalidConfigurationError,
                "Additions file 'foo' not found",
            ),
        ],
    )
    def test_create_from_dict_validates_config(self, rule, error, message):
        if error:
            with pytest.raises(error, match=message):
                GenericResolverRule._create_from_dict(rule)
        else:
            rule_instance = GenericResolverRule._create_from_dict(rule)
            assert hasattr(rule_instance, "_config")
            for key, value in rule.get("generic_resolver").items():
                assert hasattr(rule_instance._config, key)
                assert value == getattr(rule_instance._config, key)
