Skip to content

Commit

Permalink
ENH: Added new expert plugin - extra_tags
Browse files Browse the repository at this point in the history
* added a new plugin that allows manipulation of extra_tags. Main use case
  is to allow usage of same pipeline for both dev and prod setups but be able
  to differentiate between messages and select a different path for your
  dev stage (ex: route dev messages to local directory for examination rather
  then production database)
  • Loading branch information
Manuel Subredu committed Mar 7, 2024
1 parent b257f04 commit dee6d63
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 0 deletions.
Empty file.
39 changes: 39 additions & 0 deletions intelmq/bots/experts/extra_tags/expert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# SPDX-FileCopyrightText: 2024 Manuel Subredu
#
# SPDX-License-Identifier: AGPL-3.0-or-later

# -*- coding: utf-8 -*-
from intelmq.lib.bot import ExpertBot


class ExtraTagsExpertBot(ExpertBot):
"""Add custom extra tags as needed"""
overwrite: bool = False
tags: dict = None

def init(self):
self.logger.debug("Found {} tags in config section.".format(len(self.tags.keys())))

def process(self):
event = self.receive_message()

existing_tags = event.get('extra.tags')
if existing_tags is None:
event.add('extra.tags', self.tags)
else:
if self.overwrite is False:
for tag in self.tags.keys():
if tag in existing_tags:
self.logger.info("Tag {} already exists. Skipping.".format(tag))
else:
existing_tags.update({tag: self.tags[tag]})
else:
existing_tags.update(self.tags)

event.update({'extra.tags': existing_tags})
self.logger.debug('extra.tags is now: {}.'.format(existing_tags))

self.send_message(event)
self.acknowledge_message()

BOT = ExtraTagsExpertBot
Empty file.
86 changes: 86 additions & 0 deletions intelmq/tests/bots/experts/extra_tags/test_expert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# SPDX-FileCopyrightText: 2024 Manuel Subredu
#
# SPDX-License-Identifier: AGPL-3.0-or-later

# -*- coding: utf-8 -*-
import unittest

import intelmq.lib.test as test
from intelmq.bots.experts.extra_tags.expert import ExtraTagsExpertBot

MOCK_01_INPUT = {
"__type": "Event",
"time.observation": "2015-01-01T00:00:00+00:00",
}
MOCK_01_OUTPUT = {
"__type": "Event",
"time.observation": "2015-01-01T00:00:00+00:00",
"extra.tags": {
'is_dev': True
}
}

MOCK_02_INPUT = {
"__type": "Event",
"time.observation": "2015-01-01T00:00:00+00:00",
"extra.tags": {
'constituent': 'univ_01'
}
}
MOCK_02_OUTPUT = {
"__type": "Event",
"time.observation": "2015-01-01T00:00:00+00:00",
"extra.tags": {
'is_dev': True,
'constituent': 'univ_01'
}
}

MOCK_03_INPUT = {
"__type": "Event",
"time.observation": "2015-01-01T00:00:00+00:00",
"extra.tags": {
'constituent': 'univ_01',
'is_dev': False
}
}
MOCK_03_OUTPUT = {
"__type": "Event",
"time.observation": "2015-01-01T00:00:00+00:00",
"extra.tags": {
'is_dev': False,
'constituent': 'univ_01'
}
}

#@test.skip_exotic()
class TestExtraTagsExpertBot(test.BotTestCase, unittest.TestCase):
"""
A TestCase for GeohashExpertBot.
"""

@classmethod
def set_bot(cls):
cls.bot_reference = ExtraTagsExpertBot
cls.sysconfig = {'overwrite': True, 'tags': {'is_dev': True}}

def test_no_existing_tag(self):
self.input_message = MOCK_01_INPUT
self.run_bot()
self.assertMessageEqual(0, MOCK_01_OUTPUT)

def test_add_to_existing_tags(self):
self.input_message = MOCK_02_INPUT
self.run_bot()
self.assertMessageEqual(0, MOCK_02_OUTPUT)

def test_skip_existing_tag(self):
self.sysconfig['overwrite'] = False

self.input_message = MOCK_03_INPUT
self.run_bot()
self.assertMessageEqual(0, MOCK_03_OUTPUT)


if __name__ == '__main__': # pragma: no cover
unittest.main()

0 comments on commit dee6d63

Please sign in to comment.