Skip to content

Commit

Permalink
ENH: Add possibility to delay generating MISP Feed
Browse files Browse the repository at this point in the history
Generating a MISP feed on every incoming message slows down processing.
The new config option lets us decide to save events in batches. Cached
events are stored in a cache list in Redis. In addition, code
related to Python 3.6 was removed, as we no longer support that
version.
  • Loading branch information
kamil-certat committed Jun 27, 2024
1 parent f9474b2 commit b40a200
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 47 deletions.
106 changes: 64 additions & 42 deletions intelmq/bots/outputs/misp/output_feed.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@
# -*- coding: utf-8 -*-
import datetime
import json
import re
from pathlib import Path
from uuid import uuid4
import re

from intelmq.lib.bot import OutputBot
from intelmq.lib.exceptions import MissingDependencyError
from intelmq.lib.mixins import CacheMixin
from intelmq.lib.utils import parse_relative

try:
Expand All @@ -19,19 +20,14 @@
except ImportError:
# catching SyntaxError because of https://github.com/MISP/PyMISP/issues/501
MISPEvent = None
import_fail_reason = 'import'
except SyntaxError:
# catching SyntaxError because of https://github.com/MISP/PyMISP/issues/501
MISPEvent = None
import_fail_reason = 'syntax'

import_fail_reason = "import"

# NOTE: This module is compatible with Python 3.6+


class MISPFeedOutputBot(OutputBot):
class MISPFeedOutputBot(OutputBot, CacheMixin):
"""Generate an output in the MISP Feed format"""

interval_event: str = "1 hour"
delay_save_event_count: int = None
misp_org_name = None
misp_org_uuid = None
output_dir: str = "/opt/intelmq/var/lib/bots/mispfeed-output" # TODO: should be path
Expand All @@ -45,13 +41,8 @@ def check_output_dir(dirname):
return True

def init(self):
if MISPEvent is None and import_fail_reason == 'syntax':
raise MissingDependencyError("pymisp",
version='>=2.4.117.3',
additional_text="Python versions below 3.6 are "
"only supported by pymisp <= 2.4.119.1.")
elif MISPEvent is None:
raise MissingDependencyError('pymisp', version='>=2.4.117.3')
if MISPEvent is None:
raise MissingDependencyError("pymisp", version=">=2.4.117.3")

self.current_event = None

Expand All @@ -71,59 +62,90 @@ def init(self):
try:
with (self.output_dir / '.current').open() as f:
self.current_file = Path(f.read())
self.current_event = MISPEvent()
self.current_event.load_file(self.current_file)

last_min_time, last_max_time = re.findall('IntelMQ event (.*) - (.*)', self.current_event.info)[0]
last_min_time = datetime.datetime.strptime(last_min_time, '%Y-%m-%dT%H:%M:%S.%f')
last_max_time = datetime.datetime.strptime(last_max_time, '%Y-%m-%dT%H:%M:%S.%f')
if last_max_time < datetime.datetime.now():
self.min_time_current = datetime.datetime.now()
self.max_time_current = self.min_time_current + self.timedelta
self.current_event = None
else:
self.min_time_current = last_min_time
self.max_time_current = last_max_time

if self.current_file.exists():
self.current_event = MISPEvent()
self.current_event.load_file(self.current_file)

last_min_time, last_max_time = re.findall(
"IntelMQ event (.*) - (.*)", self.current_event.info
)[0]
last_min_time = datetime.datetime.strptime(
last_min_time, "%Y-%m-%dT%H:%M:%S.%f"
)
last_max_time = datetime.datetime.strptime(
last_max_time, "%Y-%m-%dT%H:%M:%S.%f"
)
if last_max_time < datetime.datetime.now():
self.min_time_current = datetime.datetime.now()
self.max_time_current = self.min_time_current + self.timedelta
self.current_event = None
else:
self.min_time_current = last_min_time
self.max_time_current = last_max_time
except:
self.logger.exception("Loading current event %s failed. Skipping it.", self.current_event)
self.logger.exception(
"Loading current event %s failed. Skipping it.", self.current_event
)
self.current_event = None
else:
self.min_time_current = datetime.datetime.now()
self.max_time_current = self.min_time_current + self.timedelta

def process(self):
    """Attach the incoming message to the current MISP event and regenerate
    the feed — immediately, or in batches of ``delay_save_event_count``
    messages when batching is enabled."""

    if not self.current_event or datetime.datetime.now() > self.max_time_current:
        # The current interval expired (or no event exists yet): open a new
        # MISP event covering the next interval and record its file path.
        self.min_time_current = datetime.datetime.now()
        self.max_time_current = self.min_time_current + self.timedelta
        self.current_event = MISPEvent()
        self.current_event.info = "IntelMQ event {begin} - {end}".format(
            begin=self.min_time_current.isoformat(),
            end=self.max_time_current.isoformat(),
        )
        self.current_event.set_date(datetime.date.today())
        self.current_event.Orgc = self.misp_org
        self.current_event.uuid = str(uuid4())
        self.current_file = self.output_dir / f"{self.current_event.uuid}.json"
        with (self.output_dir / ".current").open("w") as f:
            f.write(str(self.current_file))

        # On startup or when timeout occurs, clean the queue to ensure we do not
        # keep events forever because there was not enough generated
        self._generate_feed()

    event = self.receive_message().to_dict(jsondict_as_string=True)

    # With batching enabled, push the event into the Redis cache list first;
    # the feed is only regenerated once enough messages have accumulated.
    cache_size = None
    if self.delay_save_event_count:
        cache_size = self.cache_put(event)

    if cache_size is None:
        self._generate_feed(event)
    elif cache_size >= self.delay_save_event_count:
        self._generate_feed()

    self.acknowledge_message()

def _add_message_to_feed(self, message: dict):
    """Append *message* to the current MISP event as an 'intelmq_event' object."""
    misp_object = self.current_event.add_object(name="intelmq_event")
    for relation, attribute_value in message.items():
        try:
            misp_object.add_attribute(relation, value=attribute_value)
        except NewAttributeError:
            # Attribute is not listed in the harmonization file - skip it.
            pass

def _generate_feed(self, message: dict = None):
    """Write the current MISP event, plus any cached messages, to the feed file.

    :param message: optional IntelMQ event dict to add before draining the cache.
    """
    if message:
        self._add_message_to_feed(message)

    # Drain every message accumulated in the Redis cache list. Compare the
    # popped value against None explicitly: cache_pop returns None when the
    # list is empty, and a falsy (e.g. empty-dict) message must not stop the
    # drain early and leave cached events behind.
    while (message := self.cache_pop()) is not None:
        self._add_message_to_feed(message)

    feed_output = self.current_event.to_feed(with_meta=False)
    with self.current_file.open("w") as f:
        json.dump(feed_output, f)

    feed_meta_generator(self.output_dir)

@staticmethod
def check(parameters):
Expand Down
4 changes: 4 additions & 0 deletions intelmq/lib/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,10 @@ def catch_shutdown():
def harmonization(self):
return self._harmonization

@property
def bot_id(self):
    """Full id of this bot instance (read-only)."""
    return self.__bot_id_full

def __handle_sigterm_signal(self, signum: int, stack: Optional[object]):
"""
Calls when a SIGTERM is received. Stops the bot.
Expand Down
18 changes: 16 additions & 2 deletions intelmq/lib/mixins/cache.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
""" CacheMixin for IntelMQ
"""CacheMixin for IntelMQ
SPDX-FileCopyrightText: 2021 Sebastian Waldbauer
SPDX-License-Identifier: AGPL-3.0-or-later
CacheMixin is used for caching/storing data in redis.
"""

import json
from typing import Any, Optional
import redis
import intelmq.lib.utils as utils
Expand All @@ -31,7 +32,9 @@ def __init__(self, **kwargs):
"socket_timeout": 5,
}

self.__redis = redis.Redis(db=self.redis_cache_db, password=self.redis_cache_password, **kwargs)
self.__redis = redis.Redis(
db=self.redis_cache_db, password=self.redis_cache_password, **kwargs
)
super().__init__()

def cache_exists(self, key: str):
Expand All @@ -51,6 +54,17 @@ def cache_set(self, key: str, value: Any, ttl: Optional[int] = None):
if self.redis_cache_ttl:
self.__redis.expire(key, self.redis_cache_ttl)

def cache_put(self, value: dict) -> int:
    """Serialize *value* as JSON and prepend it to this bot's Redis cache list.

    :return: the length of the list after the push
    """
    payload = json.dumps(value)
    return self.__redis.lpush(self.bot_id, payload)

def cache_pop(self) -> Optional[dict]:
    """Pop and deserialize the oldest message from this bot's Redis cache list.

    Returns None when the list is empty.
    """
    data = self.__redis.rpop(self.bot_id)
    if data is None:
        return None
    return json.loads(data)

def cache_flush(self):
"""
Flushes the currently opened database by calling FLUSHDB.
Expand Down
54 changes: 51 additions & 3 deletions intelmq/tests/bots/outputs/misp/test_output_feed.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
# SPDX-License-Identifier: AGPL-3.0-or-later

# -*- coding: utf-8 -*-
import json
import unittest
import sys
from pathlib import Path
from tempfile import TemporaryDirectory

import intelmq.lib.test as test
Expand Down Expand Up @@ -37,9 +38,9 @@

@test.skip_exotic()
class TestMISPFeedOutputBot(test.BotTestCase, unittest.TestCase):

@classmethod
def set_bot(cls):
cls.use_cache = True
cls.bot_reference = MISPFeedOutputBot
cls.default_input_message = EXAMPLE_EVENT
cls.directory = TemporaryDirectory()
Expand All @@ -51,10 +52,57 @@ def set_bot(cls):
def test_event(self):
    """A single bot run writes exactly one object into the current feed file."""
    self.run_bot()

    # Read the path of the current feed file; Path.read_text closes the
    # handle, unlike the previous open(...).read() which leaked it.
    current_event = Path(f"{self.directory.name}/.current").read_text()
    with open(current_event) as f:
        objects = json.load(f).get("Event", {}).get("Object", [])
    assert len(objects) == 1

def test_accumulating_events(self):
    """Events are cached until delay_save_event_count is reached, then flushed."""
    self.input_message = [EXAMPLE_EVENT, EXAMPLE_EVENT]
    self.run_bot(iterations=2, parameters={"delay_save_event_count": 3})

    # Path.read_text closes the file handle, unlike the previous
    # open(...).read() which leaked it.
    current_event = Path(f"{self.directory.name}/.current").read_text()

    # First, the feed is empty - not enough events came
    with open(current_event) as f:
        objects = json.load(f).get("Event", {}).get("Object", [])
    assert len(objects) == 0

    self.input_message = [EXAMPLE_EVENT]
    self.run_bot(parameters={"delay_save_event_count": 3})

    # When enough events were collected, save them
    with open(current_event) as f:
        objects = json.load(f)["Event"]["Object"]
    assert len(objects) == 3

    self.input_message = [EXAMPLE_EVENT, EXAMPLE_EVENT, EXAMPLE_EVENT]
    self.run_bot(iterations=3, parameters={"delay_save_event_count": 3})

    # We continue saving to the same file until interval timeout
    with open(current_event) as f:
        objects = json.load(f)["Event"]["Object"]
    assert len(objects) == 6

    # Simulating leftovers in the queue when it's time to generate new event
    Path(f"{self.directory.name}/.current").unlink()
    self.bot.cache_put(EXAMPLE_EVENT)
    self.run_bot(parameters={"delay_save_event_count": 3})

    new_event = Path(f"{self.directory.name}/.current").read_text()
    with open(new_event) as f:
        objects = json.load(f)["Event"]["Object"]
    assert len(objects) == 1


def tearDown(self):
    # Remove this bot's cached messages from Redis so test runs stay independent.
    self.cache.delete(self.bot_id)
    super().tearDown()

@classmethod
def tearDownClass(cls):
    # Delete the temporary output directory created for the feed files.
    cls.directory.cleanup()


if __name__ == '__main__': # pragma: no cover
if __name__ == "__main__": # pragma: no cover
unittest.main()

0 comments on commit b40a200

Please sign in to comment.