From b40a200406dde535ea0b6436faf9b69ac640876a Mon Sep 17 00:00:00 2001 From: Kamil Mankowski Date: Thu, 27 Jun 2024 16:34:57 +0200 Subject: [PATCH] ENH: Add possibility to delay generating MISP Feed Generating MISP feed on every incoming message slows down processing. The new config option let us decide to save them in batches. Cached events are stored in a cache list in Redis. In addition, a code related to Python 3.6 was removed as we do not support this version any more. --- intelmq/bots/outputs/misp/output_feed.py | 106 +++++++++++------- intelmq/lib/bot.py | 4 + intelmq/lib/mixins/cache.py | 18 ++- .../bots/outputs/misp/test_output_feed.py | 54 ++++++++- 4 files changed, 135 insertions(+), 47 deletions(-) diff --git a/intelmq/bots/outputs/misp/output_feed.py b/intelmq/bots/outputs/misp/output_feed.py index cbeeec09e..49829d9ed 100644 --- a/intelmq/bots/outputs/misp/output_feed.py +++ b/intelmq/bots/outputs/misp/output_feed.py @@ -5,12 +5,13 @@ # -*- coding: utf-8 -*- import datetime import json +import re from pathlib import Path from uuid import uuid4 -import re from intelmq.lib.bot import OutputBot from intelmq.lib.exceptions import MissingDependencyError +from intelmq.lib.mixins import CacheMixin from intelmq.lib.utils import parse_relative try: @@ -19,19 +20,14 @@ except ImportError: # catching SyntaxError because of https://github.com/MISP/PyMISP/issues/501 MISPEvent = None - import_fail_reason = 'import' -except SyntaxError: - # catching SyntaxError because of https://github.com/MISP/PyMISP/issues/501 - MISPEvent = None - import_fail_reason = 'syntax' - + import_fail_reason = "import" -# NOTE: This module is compatible with Python 3.6+ - -class MISPFeedOutputBot(OutputBot): +class MISPFeedOutputBot(OutputBot, CacheMixin): """Generate an output in the MISP Feed format""" + interval_event: str = "1 hour" + delay_save_event_count: int = None misp_org_name = None misp_org_uuid = None output_dir: str = "/opt/intelmq/var/lib/bots/mispfeed-output" # TODO: should be path @@ -45,13 +41,8 @@ def check_output_dir(dirname): return True def init(self): - if MISPEvent is None and import_fail_reason == 'syntax': - raise MissingDependencyError("pymisp", - version='>=2.4.117.3', - additional_text="Python versions below 3.6 are " - "only supported by pymisp <= 2.4.119.1.") - elif MISPEvent is None: - raise MissingDependencyError('pymisp', version='>=2.4.117.3') + if MISPEvent is None: + raise MissingDependencyError("pymisp", version=">=2.4.117.3") self.current_event = None @@ -71,59 +62,90 @@ def init(self): try: with (self.output_dir / '.current').open() as f: self.current_file = Path(f.read()) - self.current_event = MISPEvent() - self.current_event.load_file(self.current_file) - - last_min_time, last_max_time = re.findall('IntelMQ event (.*) - (.*)', self.current_event.info)[0] - last_min_time = datetime.datetime.strptime(last_min_time, '%Y-%m-%dT%H:%M:%S.%f') - last_max_time = datetime.datetime.strptime(last_max_time, '%Y-%m-%dT%H:%M:%S.%f') - if last_max_time < datetime.datetime.now(): - self.min_time_current = datetime.datetime.now() - self.max_time_current = self.min_time_current + self.timedelta - self.current_event = None - else: - self.min_time_current = last_min_time - self.max_time_current = last_max_time + + if self.current_file.exists(): + self.current_event = MISPEvent() + self.current_event.load_file(self.current_file) + + last_min_time, last_max_time = re.findall( + "IntelMQ event (.*) - (.*)", self.current_event.info + )[0] + last_min_time = datetime.datetime.strptime( + last_min_time, "%Y-%m-%dT%H:%M:%S.%f" + ) + last_max_time = datetime.datetime.strptime( + last_max_time, "%Y-%m-%dT%H:%M:%S.%f" + ) + if last_max_time < datetime.datetime.now(): + self.min_time_current = datetime.datetime.now() + self.max_time_current = self.min_time_current + self.timedelta + self.current_event = None + else: + self.min_time_current = last_min_time + self.max_time_current = last_max_time except: - self.logger.exception("Loading current event %s failed. Skipping it.", self.current_event) + self.logger.exception( + "Loading current event %s failed. Skipping it.", self.current_event + ) self.current_event = None else: self.min_time_current = datetime.datetime.now() self.max_time_current = self.min_time_current + self.timedelta def process(self): - if not self.current_event or datetime.datetime.now() > self.max_time_current: self.min_time_current = datetime.datetime.now() self.max_time_current = self.min_time_current + self.timedelta self.current_event = MISPEvent() - self.current_event.info = ('IntelMQ event {begin} - {end}' - ''.format(begin=self.min_time_current.isoformat(), - end=self.max_time_current.isoformat())) + self.current_event.info = "IntelMQ event {begin} - {end}" "".format( + begin=self.min_time_current.isoformat(), + end=self.max_time_current.isoformat(), + ) self.current_event.set_date(datetime.date.today()) self.current_event.Orgc = self.misp_org self.current_event.uuid = str(uuid4()) - self.current_file = self.output_dir / f'{self.current_event.uuid}.json' - with (self.output_dir / '.current').open('w') as f: + self.current_file = self.output_dir / f"{self.current_event.uuid}.json" + with (self.output_dir / ".current").open("w") as f: f.write(str(self.current_file)) + # On startup or when timeout occurs, clean the queue to ensure we do not + # keep events forever because there was not enough generated + self._generate_feed() + event = self.receive_message().to_dict(jsondict_as_string=True) - obj = self.current_event.add_object(name='intelmq_event') - for object_relation, value in event.items(): + cache_size = None + if self.delay_save_event_count: + cache_size = self.cache_put(event) + + if cache_size is None: + self._generate_feed(event) + elif cache_size >= self.delay_save_event_count: + self._generate_feed() + + self.acknowledge_message() + + def _add_message_to_feed(self, message: dict): + obj = self.current_event.add_object(name="intelmq_event") + for object_relation, value in message.items(): try: obj.add_attribute(object_relation, value=value) except NewAttributeError: # This entry isn't listed in the harmonization file, ignoring. pass - feed_output = self.current_event.to_feed(with_meta=False) + def _generate_feed(self, message: dict = None): + if message: + self._add_message_to_feed(message) + + while message := self.cache_pop(): + self._add_message_to_feed(message) - with self.current_file.open('w') as f: + feed_output = self.current_event.to_feed(with_meta=False) + with self.current_file.open("w") as f: json.dump(feed_output, f) feed_meta_generator(self.output_dir) - self.acknowledge_message() @staticmethod def check(parameters): diff --git a/intelmq/lib/bot.py b/intelmq/lib/bot.py index f1b0ed333..ef09f51a3 100644 --- a/intelmq/lib/bot.py +++ b/intelmq/lib/bot.py @@ -279,6 +279,10 @@ def catch_shutdown(): def harmonization(self): return self._harmonization + @property + def bot_id(self): + return self.__bot_id_full + def __handle_sigterm_signal(self, signum: int, stack: Optional[object]): """ Calls when a SIGTERM is received. Stops the bot. diff --git a/intelmq/lib/mixins/cache.py b/intelmq/lib/mixins/cache.py index 3cf536502..956517540 100644 --- a/intelmq/lib/mixins/cache.py +++ b/intelmq/lib/mixins/cache.py @@ -1,4 +1,4 @@ -""" CacheMixin for IntelMQ +"""CacheMixin for IntelMQ SPDX-FileCopyrightText: 2021 Sebastian Waldbauer SPDX-License-Identifier: AGPL-3.0-or-later @@ -6,6 +6,7 @@ CacheMixin is used for caching/storing data in redis. """ +import json from typing import Any, Optional import redis import intelmq.lib.utils as utils @@ -31,7 +32,9 @@ def __init__(self, **kwargs): "socket_timeout": 5, } - self.__redis = redis.Redis(db=self.redis_cache_db, password=self.redis_cache_password, **kwargs) + self.__redis = redis.Redis( + db=self.redis_cache_db, password=self.redis_cache_password, **kwargs + ) super().__init__() def cache_exists(self, key: str): @@ -51,6 +54,17 @@ def cache_set(self, key: str, value: Any, ttl: Optional[int] = None): if self.redis_cache_ttl: self.__redis.expire(key, self.redis_cache_ttl) + def cache_put(self, value: dict) -> int: + # Returns the length of the list after pushing + size = self.__redis.lpush(self.bot_id, json.dumps(value)) + return size + + def cache_pop(self) -> dict: + data = self.__redis.rpop(self.bot_id) + if data is None: + return None + return json.loads(data) + def cache_flush(self): """ Flushes the currently opened database by calling FLUSHDB. diff --git a/intelmq/tests/bots/outputs/misp/test_output_feed.py b/intelmq/tests/bots/outputs/misp/test_output_feed.py index 783f2bfa9..1627e29c4 100644 --- a/intelmq/tests/bots/outputs/misp/test_output_feed.py +++ b/intelmq/tests/bots/outputs/misp/test_output_feed.py @@ -3,8 +3,9 @@ # SPDX-License-Identifier: AGPL-3.0-or-later # -*- coding: utf-8 -*- +import json import unittest -import sys +from pathlib import Path from tempfile import TemporaryDirectory import intelmq.lib.test as test @@ -37,9 +38,9 @@ @test.skip_exotic() class TestMISPFeedOutputBot(test.BotTestCase, unittest.TestCase): - @classmethod def set_bot(cls): + cls.use_cache = True cls.bot_reference = MISPFeedOutputBot cls.default_input_message = EXAMPLE_EVENT cls.directory = TemporaryDirectory() @@ -51,10 +52,57 @@ def set_bot(cls): def test_event(self): self.run_bot() + current_event = open(f"{self.directory.name}/.current").read() + with open(current_event) as f: + objects = json.load(f).get("Event", {}).get("Object", []) + assert len(objects) == 1 + + def test_accumulating_events(self): + self.input_message = [EXAMPLE_EVENT, EXAMPLE_EVENT] + self.run_bot(iterations=2, parameters={"delay_save_event_count": 3}) + + current_event = open(f"{self.directory.name}/.current").read() + + # First, the feed is empty - not enough events came + with open(current_event) as f: + objects = json.load(f).get("Event", {}).get("Object", []) + assert len(objects) == 0 + + self.input_message = [EXAMPLE_EVENT] + self.run_bot(parameters={"delay_save_event_count": 3}) + + # When enough events were collected, save them + with open(current_event) as f: + objects = json.load(f)["Event"]["Object"] + assert len(objects) == 3 + + self.input_message = [EXAMPLE_EVENT, EXAMPLE_EVENT, EXAMPLE_EVENT] + self.run_bot(iterations=3, parameters={"delay_save_event_count": 3}) + + # We continue saving to the same file until interval timeout + with open(current_event) as f: + objects = json.load(f)["Event"]["Object"] + assert len(objects) == 6 + + # Simulating leftovers in the queue when it's time to generate new event + Path(f"{self.directory.name}/.current").unlink() + self.bot.cache_put(EXAMPLE_EVENT) + self.run_bot(parameters={"delay_save_event_count": 3}) + + new_event = open(f"{self.directory.name}/.current").read() + with open(new_event) as f: + objects = json.load(f)["Event"]["Object"] + assert len(objects) == 1 + + + def tearDown(self): + self.cache.delete(self.bot_id) + super().tearDown() + @classmethod def tearDownClass(cls): cls.directory.cleanup() -if __name__ == '__main__': # pragma: no cover +if __name__ == "__main__": # pragma: no cover unittest.main()