Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Some Bugfixes #2357

Merged
merged 16 commits into from
May 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ansible-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ on:
- '.github/**'

jobs:
build:
ansible:
# temporary fix for https://github.com/actions/virtual-environments/issues/3080
runs-on: ubuntu-20.04
runs-on: ubuntu-22.04
name: Run ansible tests

steps:
Expand Down
2 changes: 1 addition & 1 deletion docs/user/bots.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1928,7 +1928,7 @@ RemoveAffix

**Configuration Parameters**

* `remove_prefix`: True - cut from start, False - cut from end
* `remove_prefix`: True - cut from start, False - cut from end. Default: True
* `affix`: example 'www.'
* `field`: example field 'source.fqdn'

Expand Down
2 changes: 1 addition & 1 deletion intelmq/bots/experts/domain_suffix/expert.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class DomainSuffixExpertBot(ExpertBot):

def init(self):
if self.field not in ALLOWED_FIELDS:
raise InvalidArgument('key', got=self.field, expected=ALLOWED_FIELDS)
raise InvalidArgument('field', got=self.field, expected=ALLOWED_FIELDS)
with codecs.open(self.suffix_file, encoding='UTF-8') as file_handle:
self.psl = PublicSuffixList(source=file_handle, only_icann=True)

Expand Down
4 changes: 2 additions & 2 deletions intelmq/bots/experts/jinja/expert.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#
# SPDX-License-Identifier: AGPL-3.0-or-later

from intelmq.lib.bot import Bot
from intelmq.lib.bot import ExpertBot
from intelmq.lib.exceptions import MissingDependencyError

import pathlib
Expand All @@ -15,7 +15,7 @@
Template = None


class JinjaExpertBot(Bot):
class JinjaExpertBot(ExpertBot):
"""
Modify the message using the Jinja templating engine
Example:
Expand Down
4 changes: 2 additions & 2 deletions intelmq/bots/experts/remove_affix/expert.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
SPDX-FileCopyrightText: 2021 Marius Karotkis <[email protected]>
SPDX-License-Identifier: AGPL-3.0-or-later
"""
from intelmq.lib.bot import Bot
from intelmq.lib.bot import ExpertBot


class RemoveAffixExpertBot(Bot):
class RemoveAffixExpertBot(ExpertBot):
remove_prefix: bool = True # True - from start, False - from end
affix: str = 'www.'
field: str = 'source.fqdn'
Expand Down
6 changes: 5 additions & 1 deletion intelmq/bots/outputs/amqptopic/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,11 @@ def process(self):

def shutdown(self):
if self._connection:
self._connection.close()
try:
self._connection.close()
except pika.exceptions.ConnectionWrongStateError:
# pika.exceptions.ConnectionWrongStateError: BlockingConnection.close(200, 'Normal shutdown') called on closed connection.
pass


BOT = AMQPTopicOutputBot
39 changes: 21 additions & 18 deletions intelmq/lib/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,17 @@
import warnings
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Any, List, Optional, Union
from typing import Any, List, Optional, Union, Tuple

import intelmq.lib.message as libmessage
from intelmq import (DEFAULT_LOGGING_PATH,
DEFAULT_LOGGING_LEVEL,
HARMONIZATION_CONF_FILE,
RUNTIME_CONF_FILE, __version__)
from intelmq.lib import cache, exceptions, utils
from intelmq.lib.pipeline import PipelineFactory, Pipeline
from intelmq.lib.utils import RewindableFileHandle, base64_decode
from intelmq.lib.datatypes import *
from intelmq.lib.datatypes import BotType

__all__ = ['Bot', 'CollectorBot', 'ParserBot', 'OutputBot', 'ExpertBot']
ALLOWED_SYSTEM_PARAMETERS = {'enabled', 'run_mode', 'group', 'description', 'module', 'name'}
Expand All @@ -48,9 +49,14 @@
class Bot:
""" Not to be reset when initialized again on reload. """
__current_message: Optional[libmessage.Message] = None
__message_counter: dict = {"since": None}
__message_counter_delay: timedelta = timedelta(seconds=2)
__stats_cache: cache.Cache = None
__source_pipeline = None
__destination_pipeline = None
__log_buffer: List[tuple] = []

logger = None
# Bot is capable of SIGHUP delaying
_sighup_delay: bool = True
# From the runtime configuration
Expand Down Expand Up @@ -85,8 +91,8 @@ class Bot:
log_processed_messages_count: int = 500
log_processed_messages_seconds: int = 900
logging_handler: str = "file"
logging_level: str = "INFO"
logging_path: str = "/opt/intelmq/var/log/"
logging_level: str = DEFAULT_LOGGING_LEVEL
logging_path: str = DEFAULT_LOGGING_PATH
logging_syslog: str = "/dev/log"
process_manager: str = "intelmq"
rate_limit: int = 0
Expand Down Expand Up @@ -226,7 +232,7 @@ def handle_sighup_signal_threading(signum: int,

@atexit.register
def catch_shutdown():
self.stop()
self.stop(exitcode=0)
except Exception as exc:
if self.error_log_exception:
self.logger.exception('Bot initialization failed.')
Expand Down Expand Up @@ -562,7 +568,7 @@ def __print_log_buffer(self):
print(level.upper(), '-', message)
self.__log_buffer = []

def __check_bot_id(self, name: str):
def __check_bot_id(self, name: str) -> Tuple[str, str, str]:
res = re.fullmatch(r'([0-9a-zA-Z\-]+)(\.[0-9]+)?', name)
if res:
if not (res.group(2) and threading.current_thread() == threading.main_thread()):
Expand All @@ -571,6 +577,7 @@ def __check_bot_id(self, name: str):
"Invalid bot id, must match '"
r"[^0-9a-zA-Z\-]+'."))
self.stop()
return False, False, False

def __connect_pipelines(self):
pipeline_args = {key: getattr(self, key) for key in dir(self) if not inspect.ismethod(getattr(self, key)) and (key.startswith('source_pipeline_') or key.startswith('destination_pipeline'))}
Expand Down Expand Up @@ -960,9 +967,8 @@ class ParserBot(Bot):

default_fields: Optional[dict] = {}

def __init__(self, bot_id: str, start: bool = False, sighup_event=None,
disable_multithreading: bool = None):
super().__init__(bot_id, start, sighup_event, disable_multithreading)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if self.__class__.__name__ == 'ParserBot':
self.logger.error('ParserBot can\'t be started itself. '
'Possible Misconfiguration.')
Expand Down Expand Up @@ -1256,9 +1262,8 @@ class CollectorBot(Bot):
provider: Optional[str] = None
documentation: Optional[str] = None

def __init__(self, bot_id: str, start: bool = False, sighup_event=None,
disable_multithreading: bool = None):
super().__init__(bot_id, start, sighup_event, disable_multithreading)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if self.__class__.__name__ == 'CollectorBot':
self.logger.error('CollectorBot can\'t be started itself. '
'Possible Misconfiguration.')
Expand Down Expand Up @@ -1316,9 +1321,8 @@ class ExpertBot(Bot):
"""
bottype = BotType.EXPERT

def __init__(self, bot_id: str, start: bool = False, sighup_event=None,
disable_multithreading: bool = None):
super().__init__(bot_id, start, sighup_event, disable_multithreading)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)


class OutputBot(Bot):
Expand All @@ -1327,9 +1331,8 @@ class OutputBot(Bot):
"""
bottype = BotType.OUTPUT

def __init__(self, bot_id: str, start: bool = False, sighup_event=None,
disable_multithreading: bool = None):
super().__init__(bot_id, start, sighup_event, disable_multithreading)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if self.__class__.__name__ == 'OutputBot':
self.logger.error('OutputBot can\'t be started itself. '
'Possible Misconfiguration.')
Expand Down
2 changes: 0 additions & 2 deletions intelmq/lib/mixins/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ def cache_get(self, key: str):
return retval

def cache_set(self, key: str, value: Any, ttl: Optional[int] = None):
if self.redis_cache_ttl is None:
ttl = self.redis_cache_ttl
if isinstance(value, str):
value = utils.encode(value)
# backward compatibility (Redis v2.2)
Expand Down
17 changes: 11 additions & 6 deletions intelmq/lib/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,16 @@
# SPDX-License-Identifier: AGPL-3.0-or-later

# -*- coding: utf-8 -*-

"""
Algorithm
---------
[Receive] B RPOP LPUSH source_queue -> internal_queue
[Send] LPUSH message -> destination_queue
[Acknowledge] RPOP message <- internal_queue
"""


import time
from itertools import chain
from typing import Dict, Optional
Expand Down Expand Up @@ -326,18 +336,13 @@ def _reject_message(self):
Rejecting is a no-op as the message is in the internal queue anyway.
"""

# Algorithm
# ---------
# [Receive] B RPOP LPUSH source_queue -> internal_queue
# [Send] LPUSH message -> destination_queue
# [Acknowledge] RPOP message <- internal_queue


class Pythonlist(Pipeline):
"""
This pipeline uses simple lists and is only for testing purpose.

It behaves in most ways like a normal pipeline would do,
including all encoding and decoding steps,
but works entirely without external modules and programs.
Data is saved as it comes (no conversion) and it is not blocking.
"""
Expand Down
23 changes: 20 additions & 3 deletions intelmq/lib/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import unittest
import unittest.mock as mock
from itertools import chain
from sys import version_info

import pkg_resources
import redis
Expand Down Expand Up @@ -236,6 +237,13 @@ def prepare_bot(self, parameters={}, destination_queues=None, prepare_source_que
new=self.mocked_config):
with mock.patch('intelmq.lib.utils.log', self.get_mocked_logger(self.logger)):
with mock.patch('intelmq.lib.utils.get_global_settings', mocked_get_global_settings):
"""
Since Bot.__del__ method calls Bot.stop, log messages of the previous bot run like:
"Processed/Forwarded X messages since last logging."
"Bot stopped"
appear in the log_stream at this point. So we clean the log before calling the bot, so that the tests in run_bot succeed.
"""
self.log_stream.truncate(0)
self.bot = self.bot_reference(self.bot_id)
self.bot._Bot__stats_cache = None

Expand Down Expand Up @@ -355,8 +363,16 @@ def run_bot(self, iterations: int = 1, error_on_pipeline: bool = False,
self.assertIn('raw', event)

""" Test if bot log messages are correctly formatted. """
self.assertLoglineMatches(0, BOT_INIT_REGEX.format(self.bot_name,
self.bot_id), "INFO")
try:
self.assertLoglineMatches(0, BOT_INIT_REGEX.format(self.bot_name,
self.bot_id), "INFO")
except AssertionError as exc:
# In some obscure but rate instances the logging of the previous bot run can end up in the logging of the next run, resulting in line 0 being:
# "Processed/Forwarded 1 messages since last logging." (written at shutdown of that previous bot run)
if 'since last logging' in exc.args[0]:
pass
else:
raise
self.assertRegexpMatchesLog("INFO - Bot is starting.")
if stop_bot:
self.assertLoglineEqual(-1, "Bot stopped.", "INFO")
Expand Down Expand Up @@ -488,7 +504,8 @@ def assertLoglineMatches(self, line_no: int, pattern: str, levelname: str = "ERR

self.assertIsNotNone(self.loglines)
logline = self.loglines[line_no]
fields = utils.parse_logline(logline)
# in some cases, the log_stream may end with a bunch of \x00 Nullbytes, maybe a side-effect of truncating the log?
fields = utils.parse_logline(logline.strip('\x00'))

self.assertEqual(self.bot_id, fields["bot_id"],
"bot_id {!r} didn't match {!r}."
Expand Down
7 changes: 6 additions & 1 deletion intelmq/lib/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,12 @@ def emit(self, record):
stream = sys.stderr
stream.write(red(msg))
stream.write(self.terminator)
self.flush()
try:
self.flush()
except ValueError:
# I/O operation on closed file.
# stdout/stderr is already close (during shutdown), there's nothing we can do about it
pass
except Exception:
self.handleError(record)

Expand Down