Skip to content

Commit

Permalink
enable add_field_to to always take a batch of fields
Browse files Browse the repository at this point in the history
- Replace `add_batch_to` with `add_field_to` throughout code.
- Update helper functions to streamline `add_field_to` usage.
  • Loading branch information
dtrai2 committed Nov 13, 2024
1 parent 905e53e commit 434216e
Show file tree
Hide file tree
Showing 18 changed files with 60 additions and 57 deletions.
8 changes: 4 additions & 4 deletions logprep/abc/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from logprep.abc.exceptions import LogprepException
from logprep.metrics.metrics import Metric
from logprep.processor.base.exceptions import FieldExistsWarning
from logprep.util.helper import add_batch_to, add_field_to, get_dotted_field_value
from logprep.util.helper import add_field_to, get_dotted_field_value
from logprep.util.time import UTC, TimeParser
from logprep.util.validators import dict_structure_validator

Expand Down Expand Up @@ -308,7 +308,7 @@ def _add_env_enrichment_to_event(self, event: dict):
target: os.environ.get(variable_name, "")
for target, variable_name in enrichments.items()
}
add_batch_to(event, fields)
add_field_to(event, fields)

def _add_arrival_time_information_to_event(self, event: dict):
new_field = {
Expand All @@ -332,13 +332,13 @@ def _add_arrival_timedelta_information_to_event(self, event: dict):
TimeParser.from_string(log_arrival_time).astimezone(UTC)
- TimeParser.from_string(time_reference).astimezone(UTC)
).total_seconds()
add_field_to(event, field={target_field: delta_time_sec})
add_field_to(event, fields={target_field: delta_time_sec})

def _add_version_information_to_event(self, event: dict):
"""Add the version information to the event"""
target_field = self._config.preprocessing.get("version_info_target_field")
# pylint: disable=protected-access
add_field_to(event, field={target_field: self._config._version_information})
add_field_to(event, fields={target_field: self._config._version_information})
# pylint: enable=protected-access

def _add_hmac_to(self, event_dict, raw_event) -> dict:
Expand Down
2 changes: 1 addition & 1 deletion logprep/abc/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ def _has_missing_values(self, event, rule, source_field_dict):
def _write_target_field(self, event: dict, rule: "Rule", result: any) -> None:
add_field_to(
event,
field={rule.target_field: result},
fields={rule.target_field: result},
extends_lists=rule.extend_target_list,
overwrite_target_field=rule.overwrite_target,
)
Expand Down
6 changes: 3 additions & 3 deletions logprep/metrics/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,12 @@ def inner(self, *args, **kwargs): # nosemgrep
if hasattr(self, "rule_type"):
event = args[0]
if event:
add_field_to(event, field={f"processing_times.{self.rule_type}": duration})
add_field_to(event, fields={f"processing_times.{self.rule_type}": duration})
if hasattr(self, "_logprep_config"): # attribute of the Pipeline class
event = args[0]
if event:
add_field_to(event, field={"processing_times.pipeline": duration})
add_field_to(event, field={"processing_times.hostname": gethostname()})
add_field_to(event, fields={"processing_times.pipeline": duration})
add_field_to(event, fields={"processing_times.hostname": gethostname()})
return result

return inner
Expand Down
2 changes: 1 addition & 1 deletion logprep/processor/clusterer/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ def _cluster(self, event: dict, rule: ClustererRule):
cluster_signature = cluster_signature_based_on_message
add_field_to(
event,
field={self._config.output_field_name: cluster_signature},
fields={self._config.output_field_name: cluster_signature},
extends_lists=rule.extend_target_list,
overwrite_target_field=rule.overwrite_target,
)
Expand Down
8 changes: 4 additions & 4 deletions logprep/processor/domain_label_extractor/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
from logprep.processor.domain_label_extractor.rule import DomainLabelExtractorRule
from logprep.processor.field_manager.processor import FieldManager
from logprep.util.getter import GetterFactory
from logprep.util.helper import add_and_overwrite, add_batch_to, get_dotted_field_value
from logprep.util.helper import add_and_overwrite, add_field_to, get_dotted_field_value
from logprep.util.validators import list_of_urls_validator

logger = logging.getLogger("DomainLabelExtractor")
Expand Down Expand Up @@ -130,7 +130,7 @@ def _apply_rules(self, event, rule: DomainLabelExtractorRule):

if self._is_valid_ip(domain):
tagging_field.append(f"ip_in_{rule.source_fields[0].replace('.', '_')}")
add_and_overwrite(event, field={self._config.tagging_field_name: tagging_field})
add_and_overwrite(event, fields={self._config.tagging_field_name: tagging_field})
return

labels = self._tld_extractor(domain)
Expand All @@ -140,10 +140,10 @@ def _apply_rules(self, event, rule: DomainLabelExtractorRule):
f"{rule.target_field}.top_level_domain": labels.suffix,
f"{rule.target_field}.subdomain": labels.subdomain,
}
add_batch_to(event, fields, overwrite_target_field=rule.overwrite_target)
add_field_to(event, fields, overwrite_target_field=rule.overwrite_target)
else:
tagging_field.append(f"invalid_domain_in_{rule.source_fields[0].replace('.', '_')}")
add_and_overwrite(event, field={self._config.tagging_field_name: tagging_field})
add_and_overwrite(event, fields={self._config.tagging_field_name: tagging_field})

@staticmethod
def _is_valid_ip(domain):
Expand Down
15 changes: 7 additions & 8 deletions logprep/processor/field_manager/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
from logprep.processor.field_manager.rule import FieldManagerRule
from logprep.util.helper import (
add_and_overwrite,
add_batch_to,
add_field_to,
get_dotted_field_value,
pop_dotted_field_value,
Expand Down Expand Up @@ -78,7 +77,7 @@ def _apply_mapping(self, event, rule, rule_args):
if not any(source_field_values):
return
source_field_values, targets = self._filter_missing_fields(source_field_values, targets)
add_batch_to(
add_field_to(
event, dict(zip(targets, source_field_values)), extend_target_list, overwrite_target
)
if rule.delete_source_fields:
Expand Down Expand Up @@ -106,7 +105,7 @@ def _write_to_single_target(self, args, extend_target_list, overwrite_target, ru
case State(
extend=True, overwrite=True, single_source_element=False, target_is_list=False
):
add_and_overwrite(event, field={target_field: source_fields_values})
add_and_overwrite(event, fields={target_field: source_fields_values})
return

case State(
Expand All @@ -118,30 +117,30 @@ def _write_to_single_target(self, args, extend_target_list, overwrite_target, ru
):
flattened_source_fields = self._overwrite_from_source_values(source_fields_values)
source_fields_values = [*flattened_source_fields]
add_and_overwrite(event, field={target_field: source_fields_values})
add_and_overwrite(event, fields={target_field: source_fields_values})
return

case State(extend=True, overwrite=False, target_is_list=False, target_is_none=True):
add_and_overwrite(event, field={target_field: source_fields_values})
add_and_overwrite(event, fields={target_field: source_fields_values})
return

case State(extend=True, overwrite=False, target_is_list=False):
source_fields_values = [target_field_value, *source_fields_values]
add_and_overwrite(event, field={target_field: source_fields_values})
add_and_overwrite(event, fields={target_field: source_fields_values})
return

case State(
extend=True, overwrite=False, single_source_element=False, target_is_list=True
):
flattened_source_fields = self._overwrite_from_source_values(source_fields_values)
source_fields_values = [*target_field_value, *flattened_source_fields]
add_and_overwrite(event, field={target_field: source_fields_values})
add_and_overwrite(event, fields={target_field: source_fields_values})
return

case State(overwrite=True, extend=True):
flattened_source_fields = self._overwrite_from_source_values(source_fields_values)
source_fields_values = [*flattened_source_fields]
add_and_overwrite(event, field={target_field: source_fields_values})
add_and_overwrite(event, fields={target_field: source_fields_values})
return

case _:
Expand Down
4 changes: 2 additions & 2 deletions logprep/processor/generic_adder/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
from logprep.factory_error import InvalidConfigurationError
from logprep.processor.generic_adder.mysql_connector import MySQLConnector
from logprep.processor.generic_adder.rule import GenericAdderRule
from logprep.util.helper import add_batch_to, get_dotted_field_value
from logprep.util.helper import add_field_to, get_dotted_field_value


def sql_config_validator(_, attribute, value):
Expand Down Expand Up @@ -230,7 +230,7 @@ def _apply_rules(self, event: dict, rule: GenericAdderRule):
self._update_db_table()
items_to_add = self._get_items_to_add_from_db(event, rule)
if items_to_add:
add_batch_to(event, items_to_add, rule.extend_target_list, rule.overwrite_target)
add_field_to(event, items_to_add, rule.extend_target_list, rule.overwrite_target)

def _get_items_to_add_from_db(self, event: dict, rule: GenericAdderRule) -> dict | None:
"""Get the sub part of the value from the event using a regex pattern"""
Expand Down
2 changes: 1 addition & 1 deletion logprep/processor/generic_resolver/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def _apply_rules(self, event, rule):
try:
add_field_to(
event,
field={target_field: content},
fields={target_field: content},
extends_lists=rule.extend_target_list,
overwrite_target_field=rule.overwrite_target,
)
Expand Down
4 changes: 2 additions & 2 deletions logprep/processor/geoip_enricher/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
from logprep.processor.field_manager.processor import FieldManager
from logprep.processor.geoip_enricher.rule import GEOIP_DATA_STUBS, GeoipEnricherRule
from logprep.util.getter import GetterFactory
from logprep.util.helper import add_batch_to, get_dotted_field_value
from logprep.util.helper import add_field_to, get_dotted_field_value

logger = logging.getLogger("GeoipEnricher")

Expand Down Expand Up @@ -132,7 +132,7 @@ def _apply_rules(self, event, rule):
rule.customize_target_subfields.get(target, f"{rule.target_field}.{target}"): value
for target, value in geoip_data.items()
}
add_batch_to(
add_field_to(
event,
fields,
extends_lists=False,
Expand Down
4 changes: 2 additions & 2 deletions logprep/processor/grokker/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
from logprep.processor.field_manager.processor import FieldManager
from logprep.processor.grokker.rule import GrokkerRule
from logprep.util.getter import GetterFactory
from logprep.util.helper import add_batch_to, get_dotted_field_value
from logprep.util.helper import add_field_to, get_dotted_field_value

logger = logging.getLogger("Grokker")

Expand Down Expand Up @@ -85,7 +85,7 @@ def _apply_rules(self, event: dict, rule: GrokkerRule):
if result is None or result == {}:
continue
matches.append(True)
add_batch_to(
add_field_to(
event,
result,
extends_lists=rule.extend_target_list,
Expand Down
2 changes: 1 addition & 1 deletion logprep/processor/hyperscan_resolver/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ def _apply_rules(self, event: dict, rule: HyperscanResolverRule):
try:
add_field_to(
event,
field={resolve_target: dest_val},
fields={resolve_target: dest_val},
extends_lists=rule.extend_target_list,
overwrite_target_field=rule.overwrite_target,
)
Expand Down
6 changes: 3 additions & 3 deletions logprep/processor/labeler/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from logprep.abc.processor import Processor
from logprep.processor.labeler.labeling_schema import LabelingSchema
from logprep.processor.labeler.rule import LabelerRule
from logprep.util.helper import add_batch_to, get_dotted_field_value
from logprep.util.helper import add_field_to, get_dotted_field_value


class Labeler(Processor):
Expand Down Expand Up @@ -74,10 +74,10 @@ def setup(self):
def _apply_rules(self, event, rule):
"""Applies the rule to the current event"""
fields = {key: value for key, value in rule.prefixed_label.items()}
add_batch_to(event, fields, extends_lists=True)
add_field_to(event, fields, extends_lists=True)
# convert sets into sorted lists
fields = {
key: sorted(set(get_dotted_field_value(event, key)))
for key, _ in rule.prefixed_label.items()
}
add_batch_to(event, fields, overwrite_target_field=True)
add_field_to(event, fields, overwrite_target_field=True)
4 changes: 2 additions & 2 deletions logprep/processor/list_comparison/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ def _apply_rules(self, event, rule):
"""
comparison_result, comparison_key = self._list_comparison(rule, event)
if comparison_result is not None:
field = {f"{rule.target_field}.{comparison_key}": comparison_result}
add_field_to(event, field, extends_lists=True)
fields = {f"{rule.target_field}.{comparison_key}": comparison_result}
add_field_to(event, fields, extends_lists=True)

def _list_comparison(self, rule: ListComparisonRule, event: dict):
"""
Expand Down
2 changes: 1 addition & 1 deletion logprep/processor/pseudonymizer/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ def _apply_rules(self, event: dict, rule: PseudonymizerRule):
]
else:
field_value = self._pseudonymize_field(rule, dotted_field, regex, field_value)
add_field_to(event, field={dotted_field: field_value}, overwrite_target_field=True)
add_field_to(event, fields={dotted_field: field_value}, overwrite_target_field=True)
if "@timestamp" in event:
for pseudonym, _ in self.result.data:
pseudonym["@timestamp"] = event["@timestamp"]
Expand Down
6 changes: 3 additions & 3 deletions logprep/processor/requester/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
from logprep.processor.base.exceptions import FieldExistsWarning
from logprep.processor.field_manager.processor import FieldManager
from logprep.processor.requester.rule import RequesterRule
from logprep.util.helper import add_batch_to, add_field_to, get_source_fields_dict
from logprep.util.helper import add_field_to, get_source_fields_dict

TEMPLATE_KWARGS = ("url", "json", "data", "params")

Expand Down Expand Up @@ -72,7 +72,7 @@ def _handle_response(self, event, rule, response):
try:
add_field_to(
event,
field={rule.target_field: self._get_result(response)},
fields={rule.target_field: self._get_result(response)},
extends_lists=rule.extend_target_list,
overwrite_target_field=rule.overwrite_target,
)
Expand All @@ -83,7 +83,7 @@ def _handle_response(self, event, rule, response):
contents = self._get_field_values(self._get_result(response), source_fields)
targets = rule.target_field_mapping.values()
try:
add_batch_to(
add_field_to(
event,
dict(zip(targets, contents)),
rule.extend_target_list,
Expand Down
4 changes: 2 additions & 2 deletions logprep/processor/selective_extractor/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

from logprep.processor.field_manager.processor import FieldManager
from logprep.processor.selective_extractor.rule import SelectiveExtractorRule
from logprep.util.helper import add_batch_to, get_source_fields_dict
from logprep.util.helper import add_field_to, get_source_fields_dict


class SelectiveExtractor(FieldManager):
Expand Down Expand Up @@ -64,5 +64,5 @@ def _apply_rules(self, event: dict, rule: SelectiveExtractorRule):
}
if flattened_fields:
filtered_event = {}
add_batch_to(filtered_event, flattened_fields)
add_field_to(filtered_event, flattened_fields)
self.result.data.append((filtered_event, rule.outputs))
2 changes: 1 addition & 1 deletion logprep/processor/template_replacer/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def _perform_replacement(self, event: dict, replacement: str, rule: TemplateRepl
"""
overwrite = get_dotted_field_value(event, self._target_field) is not None
add_field_to(
event, field={self._target_field: replacement}, overwrite_target_field=overwrite
event, fields={self._target_field: replacement}, overwrite_target_field=overwrite
)

def setup(self):
Expand Down
Loading

0 comments on commit 434216e

Please sign in to comment.