diff --git a/intelmq/bots/outputs/misp/output_feed.py b/intelmq/bots/outputs/misp/output_feed.py index cbeeec09e..49829d9ed 100644 --- a/intelmq/bots/outputs/misp/output_feed.py +++ b/intelmq/bots/outputs/misp/output_feed.py @@ -5,12 +5,13 @@ # -*- coding: utf-8 -*- import datetime import json +import re from pathlib import Path from uuid import uuid4 -import re from intelmq.lib.bot import OutputBot from intelmq.lib.exceptions import MissingDependencyError +from intelmq.lib.mixins import CacheMixin from intelmq.lib.utils import parse_relative try: @@ -19,19 +20,14 @@ except ImportError: # catching SyntaxError because of https://github.com/MISP/PyMISP/issues/501 MISPEvent = None - import_fail_reason = 'import' -except SyntaxError: - # catching SyntaxError because of https://github.com/MISP/PyMISP/issues/501 - MISPEvent = None - import_fail_reason = 'syntax' - + import_fail_reason = "import" -# NOTE: This module is compatible with Python 3.6+ - -class MISPFeedOutputBot(OutputBot): +class MISPFeedOutputBot(OutputBot, CacheMixin): """Generate an output in the MISP Feed format""" + interval_event: str = "1 hour" + delay_save_event_count: int = None misp_org_name = None misp_org_uuid = None output_dir: str = "/opt/intelmq/var/lib/bots/mispfeed-output" # TODO: should be path @@ -45,13 +41,8 @@ def check_output_dir(dirname): return True def init(self): - if MISPEvent is None and import_fail_reason == 'syntax': - raise MissingDependencyError("pymisp", - version='>=2.4.117.3', - additional_text="Python versions below 3.6 are " - "only supported by pymisp <= 2.4.119.1.") - elif MISPEvent is None: - raise MissingDependencyError('pymisp', version='>=2.4.117.3') + if MISPEvent is None: + raise MissingDependencyError("pymisp", version=">=2.4.117.3") self.current_event = None @@ -71,59 +62,90 @@ def init(self): try: with (self.output_dir / '.current').open() as f: self.current_file = Path(f.read()) - self.current_event = MISPEvent() - self.current_event.load_file(self.current_file) - - last_min_time, last_max_time = re.findall('IntelMQ event (.*) - (.*)', self.current_event.info)[0] - last_min_time = datetime.datetime.strptime(last_min_time, '%Y-%m-%dT%H:%M:%S.%f') - last_max_time = datetime.datetime.strptime(last_max_time, '%Y-%m-%dT%H:%M:%S.%f') - if last_max_time < datetime.datetime.now(): - self.min_time_current = datetime.datetime.now() - self.max_time_current = self.min_time_current + self.timedelta - self.current_event = None - else: - self.min_time_current = last_min_time - self.max_time_current = last_max_time + + if self.current_file.exists(): + self.current_event = MISPEvent() + self.current_event.load_file(self.current_file) + + last_min_time, last_max_time = re.findall( + "IntelMQ event (.*) - (.*)", self.current_event.info + )[0] + last_min_time = datetime.datetime.strptime( + last_min_time, "%Y-%m-%dT%H:%M:%S.%f" + ) + last_max_time = datetime.datetime.strptime( + last_max_time, "%Y-%m-%dT%H:%M:%S.%f" + ) + if last_max_time < datetime.datetime.now(): + self.min_time_current = datetime.datetime.now() + self.max_time_current = self.min_time_current + self.timedelta + self.current_event = None + else: + self.min_time_current = last_min_time + self.max_time_current = last_max_time except: - self.logger.exception("Loading current event %s failed. Skipping it.", self.current_event) + self.logger.exception( + "Loading current event %s failed. Skipping it.", self.current_event + ) self.current_event = None else: self.min_time_current = datetime.datetime.now() self.max_time_current = self.min_time_current + self.timedelta def process(self): - if not self.current_event or datetime.datetime.now() > self.max_time_current: self.min_time_current = datetime.datetime.now() self.max_time_current = self.min_time_current + self.timedelta self.current_event = MISPEvent() - self.current_event.info = ('IntelMQ event {begin} - {end}' - ''.format(begin=self.min_time_current.isoformat(), - end=self.max_time_current.isoformat())) + self.current_event.info = "IntelMQ event {begin} - {end}" "".format( + begin=self.min_time_current.isoformat(), + end=self.max_time_current.isoformat(), + ) self.current_event.set_date(datetime.date.today()) self.current_event.Orgc = self.misp_org self.current_event.uuid = str(uuid4()) - self.current_file = self.output_dir / f'{self.current_event.uuid}.json' - with (self.output_dir / '.current').open('w') as f: + self.current_file = self.output_dir / f"{self.current_event.uuid}.json" + with (self.output_dir / ".current").open("w") as f: f.write(str(self.current_file)) + # On startup or when timeout occurs, clean the queue to ensure we do not + # keep events forever because there was not enough generated + self._generate_feed() + event = self.receive_message().to_dict(jsondict_as_string=True) - obj = self.current_event.add_object(name='intelmq_event') - for object_relation, value in event.items(): + cache_size = None + if self.delay_save_event_count: + cache_size = self.cache_put(event) + + if cache_size is None: + self._generate_feed(event) + elif cache_size >= self.delay_save_event_count: + self._generate_feed() + + self.acknowledge_message() + + def _add_message_to_feed(self, message: dict): + obj = self.current_event.add_object(name="intelmq_event") + for object_relation, value in message.items(): try: obj.add_attribute(object_relation, value=value) except NewAttributeError: # This entry isn't listed in the harmonization file, ignoring. pass - feed_output = self.current_event.to_feed(with_meta=False) + def _generate_feed(self, message: dict = None): + if message: + self._add_message_to_feed(message) + + while message := self.cache_pop(): + self._add_message_to_feed(message) - with self.current_file.open('w') as f: + feed_output = self.current_event.to_feed(with_meta=False) + with self.current_file.open("w") as f: json.dump(feed_output, f) feed_meta_generator(self.output_dir) - self.acknowledge_message() @staticmethod def check(parameters): diff --git a/intelmq/lib/bot.py b/intelmq/lib/bot.py index f1b0ed333..ef09f51a3 100644 --- a/intelmq/lib/bot.py +++ b/intelmq/lib/bot.py @@ -279,6 +279,10 @@ def catch_shutdown(): def harmonization(self): return self._harmonization + @property + def bot_id(self): + return self.__bot_id_full + def __handle_sigterm_signal(self, signum: int, stack: Optional[object]): """ Calls when a SIGTERM is received. Stops the bot. diff --git a/intelmq/lib/mixins/cache.py b/intelmq/lib/mixins/cache.py index 3cf536502..956517540 100644 --- a/intelmq/lib/mixins/cache.py +++ b/intelmq/lib/mixins/cache.py @@ -1,4 +1,4 @@ -""" CacheMixin for IntelMQ +"""CacheMixin for IntelMQ SPDX-FileCopyrightText: 2021 Sebastian Waldbauer SPDX-License-Identifier: AGPL-3.0-or-later @@ -6,6 +6,7 @@ CacheMixin is used for caching/storing data in redis. """ +import json from typing import Any, Optional import redis import intelmq.lib.utils as utils @@ -31,7 +32,9 @@ def __init__(self, **kwargs): "socket_timeout": 5, } - self.__redis = redis.Redis(db=self.redis_cache_db, password=self.redis_cache_password, **kwargs) + self.__redis = redis.Redis( + db=self.redis_cache_db, password=self.redis_cache_password, **kwargs + ) super().__init__() def cache_exists(self, key: str): @@ -51,6 +54,17 @@ def cache_set(self, key: str, value: Any, ttl: Optional[int] = None): if self.redis_cache_ttl: self.__redis.expire(key, self.redis_cache_ttl) + def cache_put(self, value: dict) -> int: + # Returns the length of the list after pushing + size = self.__redis.lpush(self.bot_id, json.dumps(value)) + return size + + def cache_pop(self) -> dict: + data = self.__redis.rpop(self.bot_id) + if data is None: + return None + return json.loads(data) + def cache_flush(self): """ Flushes the currently opened database by calling FLUSHDB. diff --git a/intelmq/tests/bots/outputs/misp/test_output_feed.py b/intelmq/tests/bots/outputs/misp/test_output_feed.py index 783f2bfa9..1627e29c4 100644 --- a/intelmq/tests/bots/outputs/misp/test_output_feed.py +++ b/intelmq/tests/bots/outputs/misp/test_output_feed.py @@ -3,8 +3,9 @@ # SPDX-License-Identifier: AGPL-3.0-or-later # -*- coding: utf-8 -*- +import json import unittest -import sys +from pathlib import Path from tempfile import TemporaryDirectory import intelmq.lib.test as test @@ -37,9 +38,9 @@ @test.skip_exotic() class TestMISPFeedOutputBot(test.BotTestCase, unittest.TestCase): - @classmethod def set_bot(cls): + cls.use_cache = True cls.bot_reference = MISPFeedOutputBot cls.default_input_message = EXAMPLE_EVENT cls.directory = TemporaryDirectory() @@ -51,10 +52,57 @@ def set_bot(cls): def test_event(self): self.run_bot() + current_event = open(f"{self.directory.name}/.current").read() + with open(current_event) as f: + objects = json.load(f).get("Event", {}).get("Object", []) + assert len(objects) == 1 + + def test_accumulating_events(self): + self.input_message = [EXAMPLE_EVENT, EXAMPLE_EVENT] + self.run_bot(iterations=2, parameters={"delay_save_event_count": 3}) + + current_event = open(f"{self.directory.name}/.current").read() + + # First, the feed is empty - not enough events came + with open(current_event) as f: + objects = json.load(f).get("Event", {}).get("Object", []) + assert len(objects) == 0 + + self.input_message = [EXAMPLE_EVENT] + self.run_bot(parameters={"delay_save_event_count": 3}) + + # When enough events were collected, save them + with open(current_event) as f: + objects = json.load(f)["Event"]["Object"] + assert len(objects) == 3 + + self.input_message = [EXAMPLE_EVENT, EXAMPLE_EVENT, EXAMPLE_EVENT] + self.run_bot(iterations=3, parameters={"delay_save_event_count": 3}) + + # We continue saving to the same file until interval timeout + with open(current_event) as f: + objects = json.load(f)["Event"]["Object"] + assert len(objects) == 6 + + # Simulating leftovers in the queue when it's time to generate new event + Path(f"{self.directory.name}/.current").unlink() + self.bot.cache_put(EXAMPLE_EVENT) + self.run_bot(parameters={"delay_save_event_count": 3}) + + new_event = open(f"{self.directory.name}/.current").read() + with open(new_event) as f: + objects = json.load(f)["Event"]["Object"] + assert len(objects) == 1 + + + def tearDown(self): + self.cache.delete(self.bot_id) + super().tearDown() + @classmethod def tearDownClass(cls): cls.directory.cleanup() -if __name__ == '__main__': # pragma: no cover +if __name__ == "__main__": # pragma: no cover unittest.main()