diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml index 538ab87ede..35a19d8169 100644 --- a/.github/workflows/integration-tests.yml +++ b/.github/workflows/integration-tests.yml @@ -53,11 +53,18 @@ jobs: run: | python3 -m pytest tests/${{ matrix.test_file }} -p no:warnings -vv -s -n 3 + - name: Build Artifact Name + # otherwise we get numeric names for the artifacts and we dont know which is which + id: artifact-name + run: | + sanitized_test_file="${{ matrix.test_file }}" + sanitized_test_file=$(printf '%s\n' "$sanitized_test_file" | tr '/' '_') + echo "name=${sanitized_test_file}-integration-output" >> "$GITHUB_OUTPUT" + - name: Upload Artifacts if: always() uses: actions/upload-artifact@v6 with: - # Replaces slashes with underscores for valid artifact naming - name: ${{ github.run_id }}-${{ strategy.job-index }}-integration-output + name: ${{ steps.artifact-name.outputs.name }} path: | - output/integration + output/integration_tests diff --git a/config/TI_feeds.csv b/config/TI_feeds.csv index 8a5dfc70a1..d8e44b65d3 100644 --- a/config/TI_feeds.csv +++ b/config/TI_feeds.csv @@ -11,7 +11,7 @@ https://mcfp.felk.cvut.cz/publicDatasets/CTU-AIPP-BlackList/Todays-Blacklists/AI https://mcfp.felk.cvut.cz/publicDatasets/CTU-AIPP-BlackList/Todays-Blacklists/AIP_historical_blacklist_prioritized_by_newest_attackers.csv,medium, ['phishing','honeypot'] https://raw.githubusercontent.com/stratosphereips/Civilsphere/main/threatintel/strangereallintel-cyberthreatintel.csv,medium, ['phishing'] https://raw.githubusercontent.com/AssoEchap/stalkerware-indicators/master/generated/network.csv,medium, ['stalkerware'] -https://raw.githubusercontent.com/stratosphereips/Civilsphere/main/threatintel/adserversandtrackers.csv,medium, ['adtrackers'] +https://raw.githubusercontent.com/stratosphereips/Civilsphere/main/threatintel/adserversandtrackers.csv,info, ['adtrackers'] 
https://raw.githubusercontent.com/stratosphereips/Civilsphere/main/threatintel/civilsphereindicators.csv,medium, ['apt'] https://raw.githubusercontent.com/botherder/targetedthreats/master/targetedthreats.csv,medium, ['apt'] https://osint.digitalside.it/Threat-Intel/lists/latestdomains.txt,medium, ['honeypot'] @@ -26,10 +26,10 @@ https://lists.blocklist.de/lists/mail.txt,medium, ['honeypot'] https://lists.blocklist.de/lists/bruteforcelogin.txt,medium, ['honeypot'] https://feodotracker.abuse.ch/downloads/ipblocklist.csv,medium, ['honeypot'] https://reputation.alienvault.com/reputation.generic,medium, ['honeypot'] -https://raw.githubusercontent.com/anudeepND/blacklist/master/adservers.txt,medium, ['adtrackers'] +https://raw.githubusercontent.com/anudeepND/blacklist/master/adservers.txt,info, ['adtrackers'] # bigdargon: Hosts block ads of Vietnamese -https://raw.githubusercontent.com/bigdargon/hostsVN/master/option/domain.txt,medium, ['adtrackers'] -https://raw.githubusercontent.com/SweetSophia/mifitxiaomipiholelist/master/mifitblocklist.txt,medium, ['xiaomi-trackers'] +https://raw.githubusercontent.com/bigdargon/hostsVN/master/option/domain.txt,info, ['adtrackers'] +https://raw.githubusercontent.com/SweetSophia/mifitxiaomipiholelist/master/mifitblocklist.txt,info, ['xiaomi-trackers'] https://raw.githubusercontent.com/CriticalPathSecurity/Zeek-Intelligence-Feeds/master/abuse-ch-ipblocklist.intel,medium, ['honeypot'] https://raw.githubusercontent.com/CriticalPathSecurity/Zeek-Intelligence-Feeds/master/alienvault.intel,medium, ['honeypot'] https://raw.githubusercontent.com/CriticalPathSecurity/Zeek-Intelligence-Feeds/master/cobaltstrike_ips.intel,medium, ['honeypot'] diff --git a/dataset/test1-normal.nfdump b/dataset/test1-malicious.nfdump similarity index 100% rename from dataset/test1-normal.nfdump rename to dataset/test1-malicious.nfdump diff --git a/docs/fides.md b/docs/fides.md index 282972eb94..b752165b31 100644 --- a/docs/fides.md +++ b/docs/fides.md @@ -25,7 
+25,11 @@ To be able to use the fides module, you should use ```--cap-add=NET_ADMIN``` If you plan on using the Fides Module, please be aware that it is used only if Slips is running on an interface OR on a growing Zeek directory. The `--use_fides=True` is ignored when Slips is run on a file. ## Configuration -The evaluation model used, the evaluation thresholds, and other configurations are located in ```fides.conf.yml``` file +The evaluation model used, the evaluation thresholds, and other configurations are located in ```modules/fides/config/fides.conf.yml```. + +If you need a Slips run to use a different Fides configuration file, set +```global_p2p.fides_conf``` in Slips config to the relative path +of that alternate YAML file. **Possible threat intelligence evaluation models** diff --git a/modules/arp_poisoner/arp_poisoner.py b/modules/arp_poisoner/arp_poisoner.py index dc191b6536..44124af08f 100644 --- a/modules/arp_poisoner/arp_poisoner.py +++ b/modules/arp_poisoner/arp_poisoner.py @@ -1,6 +1,7 @@ # SPDX-FileCopyrightText: 2021 Sebastian Garcia # SPDX-License-Identifier: GPL-2.0-only import logging +import shutil import subprocess import time from threading import Lock @@ -30,6 +31,8 @@ class ARPPoisoner(IModule): authors = ["Alya Gomaa"] def init(self): + self.arp_scan_path = shutil.which("arp-scan") + self.arp_scan_bin_available = self.arp_scan_path is not None self._time_since_last_repoison = {} self._time_since_last_internet_cut = {} self.log_file_path = self.get_module_specific_output_path( @@ -56,6 +59,9 @@ def init(self): self.ip_interface_map = {} def subscribe_to_channels(self): + if not self.arp_scan_bin_available: + self.channels = {} + return self.c1 = self.db.subscribe("new_blocking") self.c2 = self.db.subscribe("tw_closed") self.channels = { @@ -63,6 +69,22 @@ def subscribe_to_channels(self): "tw_closed": self.c2, } + def pre_main(self) -> bool: + """ + Stop the module before entering the main loop when arp-scan is + unavailable. 
+ + :return: True when the module should shut down, otherwise False. + """ + if self.arp_scan_bin_available: + return False + + self.print( + "The arp-scan tool is not installed. ARP poisoner module is " + "stopping.", + ) + return True + def log(self, text): """Logs the given text to the blocking log file""" with self.blocking_logfile_lock: diff --git a/modules/fides/fides.py b/modules/fides/fides.py index 80867f7efd..2c582269da 100644 --- a/modules/fides/fides.py +++ b/modules/fides/fides.py @@ -53,8 +53,11 @@ def init(self): # load trust model configuration current_dir = Path(__file__).resolve().parent - config_path = current_dir / "config" / "fides.conf.yml" - self.__trust_model_config = load_configuration(config_path.__str__()) + default_config_path = current_dir / "config" / "fides.conf.yml" + config_path = self.conf.read_configuration( + "global_p2p", "fides_conf", str(default_config_path) + ) + self.__trust_model_config = load_configuration(config_path) # prepare variables for global protocols self.__bridge: NetworkBridge diff --git a/modules/fides/messaging/message_handler.py b/modules/fides/messaging/message_handler.py index 24a2f7fd87..1b6c917c8c 100644 --- a/modules/fides/messaging/message_handler.py +++ b/modules/fides/messaging/message_handler.py @@ -1,6 +1,6 @@ from typing import Dict, List, Callable, Optional, Union - +from slips_files.common.slips_utils import utils from ..messaging.dacite import from_dict from ..messaging.model import ( @@ -28,7 +28,7 @@ class MessageHandler: # def print(self, *args, **kwargs): # return self.printer.print(*args, **kwargs) - version = 1 + version = utils.get_current_version() def __init__( self, diff --git a/modules/fides/messaging/model.py b/modules/fides/messaging/model.py index f8131caab4..0ebf25ef06 100644 --- a/modules/fides/messaging/model.py +++ b/modules/fides/messaging/model.py @@ -13,7 +13,7 @@ @dataclass class NetworkMessage: type: str - version: int + version: str data: Any diff --git 
a/modules/fides/messaging/network_bridge.py b/modules/fides/messaging/network_bridge.py index e6495271da..c5b8c2a001 100644 --- a/modules/fides/messaging/network_bridge.py +++ b/modules/fides/messaging/network_bridge.py @@ -2,6 +2,7 @@ from dataclasses import asdict from typing import Dict, List +from slips_files.common.slips_utils import utils from .dacite import from_dict from .message_handler import MessageHandler @@ -24,7 +25,7 @@ class NetworkBridge: execute "listen" method. """ - version = 1 + version = utils.get_current_version() def __init__(self, queue: Queue): self.__queue = queue @@ -36,14 +37,15 @@ def listen(self, handler: MessageHandler, block: bool = False): """ def message_received(message: str): + """this is the callback that executes every new msg""" try: - # with open("fides_nb.txt", "a") as f: - # f.write(message) + logger.debug("New message received! Trying to parse.") parsed = json.loads(message) network_message = from_dict( data_class=NetworkMessage, data=parsed ) + logger.debug("Message parsed. 
Executing handler.") handler.on_message(network_message) except Exception as e: diff --git a/modules/fides/model/peer_trust_data.py b/modules/fides/model/peer_trust_data.py index b78a27c687..4ffc257953 100644 --- a/modules/fides/model/peer_trust_data.py +++ b/modules/fides/model/peer_trust_data.py @@ -3,8 +3,11 @@ from ..model.aliases import PeerId, OrganisationId from ..model.peer import PeerInfo -from ..model.recommendation_history import RecommendationHistory -from ..model.service_history import ServiceHistory +from ..model.recommendation_history import ( + RecommendationHistory, + RecommendationHistoryRecord, +) +from ..model.service_history import ServiceHistory, ServiceHistoryRecord @dataclass @@ -121,6 +124,7 @@ def to_dict(self, remove_histories: bool = False): # Method to create an object from a dictionary @classmethod def from_dict(cls, data): + """Create a PeerTrustData instance from a dictionary payload.""" return cls( info=PeerInfo.from_dict( data["info"] @@ -135,14 +139,13 @@ def from_dict(cls, data): "initial_reputation_provided_by_count" ], service_history=[ - ServiceHistory.from_dict(sh) for sh in data["service_history"] + ServiceHistoryRecord.from_dict(sh) + for sh in data["service_history"] ], - # Assuming ServiceHistory has from_dict recommendation_history=[ - RecommendationHistory.from_dict(rh) + RecommendationHistoryRecord.from_dict(rh) for rh in data["recommendation_history"] ], - # Assuming RecommendationHistory has from_dict ) diff --git a/modules/fides/persistence/fides_sqlite_db.py b/modules/fides/persistence/fides_sqlite_db.py index d4a222775c..48c86337ef 100644 --- a/modules/fides/persistence/fides_sqlite_db.py +++ b/modules/fides/persistence/fides_sqlite_db.py @@ -6,6 +6,7 @@ import os import sqlite3 +from pathlib import Path from typing import List, Any, Optional from slips_files.core.output import Output @@ -31,13 +32,23 @@ def __init__(self, logger: Output, db_path: str) -> None: """ self.logger = logger self.db_path = db_path - 
with open(self.db_path, "a") as f: - f.close() - sqlite3.connect(self.db_path).close() + if not self.__is_in_memory_database(): + Path(self.db_path).parent.mkdir(parents=True, exist_ok=True) + with open(self.db_path, "a") as f: + f.close() + sqlite3.connect(self.db_path).close() self.connection: Optional[sqlite3.Connection] = None self.__connect() self.__create_tables() + def __is_in_memory_database(self) -> bool: + """ + Determines whether the configured database path targets SQLite memory storage. + + :return: True when the database should live only in memory, otherwise False. + """ + return self.db_path == ":memory:" + def __slips_log(self, txt: str) -> None: self.logger.output_line_to_cli_and_logfiles( {"verbose": 2, "debug": 0, "from": self.name, "txt": txt} diff --git a/modules/fides/persistence/threat_intelligence_db.py b/modules/fides/persistence/threat_intelligence_db.py index 1317c433db..2b3d868a67 100644 --- a/modules/fides/persistence/threat_intelligence_db.py +++ b/modules/fides/persistence/threat_intelligence_db.py @@ -33,7 +33,12 @@ def get_for(self, target: Target) -> Optional[SlipsThreatIntelligence]: out = self.db.get_fides_ti(target) # returns str containing dumped # dict of STI or None if out: - out = SlipsThreatIntelligence(**json.loads(out)) + try: + out = SlipsThreatIntelligence(**json.loads(out)) + except (TypeError, ValueError, json.JSONDecodeError): + out = self.sqldb.get_slips_threat_intelligence_by_target( + target + ) else: out = self.sqldb.get_slips_threat_intelligence_by_target(target) return out diff --git a/modules/fides/persistence/trust_db.py b/modules/fides/persistence/trust_db.py index 189b9257fd..64f8350b51 100644 --- a/modules/fides/persistence/trust_db.py +++ b/modules/fides/persistence/trust_db.py @@ -120,7 +120,7 @@ def get_peer_trust_data( td_json = self.db.get_peer_trust_data(peer_id) if td_json: # Redis has available data - out = PeerTrustData(**json.loads(td_json)) + out = PeerTrustData.from_dict(json.loads(td_json)) 
else: # if redis is empty, try SQLite out = self.sqldb.get_peer_trust_data(peer_id) return out @@ -130,7 +130,6 @@ def get_peers_trust_data( ) -> TrustMatrix: """Return trust data for each peer from peer_ids.""" out = {} - peer_id = None for peer in peer_ids: # get PeerID to properly create TrustMatrix @@ -138,9 +137,13 @@ def get_peers_trust_data( peer_id = peer elif isinstance(peer, PeerInfo): peer_id = peer.id + else: + continue # TrustMatrix = Dict[PeerId, PeerTrustData]; here - peer_id: PeerId - out[peer_id] = self.get_peer_trust_data(peer_id) + trust_data = self.get_peer_trust_data(peer_id) + if trust_data is not None: + out[peer_id] = trust_data return out def cache_network_opinion(self, ti: SlipsThreatIntelligence): diff --git a/modules/fides/protocols/threat_intelligence.py b/modules/fides/protocols/threat_intelligence.py index cd1e2a2bfb..48129b745a 100644 --- a/modules/fides/protocols/threat_intelligence.py +++ b/modules/fides/protocols/threat_intelligence.py @@ -81,9 +81,17 @@ def handle_intelligence_response( self, responses: List[PeerIntelligenceResponse] ): """Handles intelligence responses.""" - trust_matrix = self._trust_db.get_peers_trust_data( - [r.sender.id for r in responses] - ) + trust_matrix = {} + for response in responses: + peer_trust = self._trust_db.get_peer_trust_data(response.sender.id) + if peer_trust is None: + peer_trust = ( + self.__trust_protocol.determine_and_store_initial_trust( + response.sender + ) + ) + trust_matrix[response.sender.id] = peer_trust + assert len(trust_matrix) == len( responses ), "We need to have trust data for all peers that sent the response." 
diff --git a/modules/fides/utils/logger.py b/modules/fides/utils/logger.py index 5997b3b679..1fcce47968 100644 --- a/modules/fides/utils/logger.py +++ b/modules/fides/utils/logger.py @@ -1,5 +1,4 @@ import json -import threading from dataclasses import is_dataclass, asdict from typing import Optional, List, Callable @@ -15,7 +14,7 @@ ] # Set this to custom callback that should be executed when there's new log message. -# First parameter is level ('DEBUG', 'INFO', 'WARN', 'ERROR'), second is message to be logged. +# First parameter is message, second is level ('DEBUG', 'INFO', 'WARN', 'ERROR') class Logger: @@ -54,23 +53,30 @@ def __try_to_guess_name() -> str: return name def debug(self, message: str, params=None): - return self.__print("DEBUG", message) + return self.__print("DEBUG", message, params) def info(self, message: str, params=None): - return self.__print("INFO", message) + return self.__print("INFO", message, params) + def warning(self, message: str, params=None): + return self.__print("WARN", message, params) + + # keep for backward compatibility def warn(self, message: str, params=None): - return self.__print("WARN", message) + return self.warning(message, params) def error(self, message: str, params=None): - return self.__print("ERROR", message) + return self.__print("ERROR", message, params) def __format(self, message: str, params=None): - thread = threading.get_ident() - formatted_message = f"T{thread}: {self.__name} - {message}" + formatted_message = f"{self.__name} - {message}" if params: params = asdict(params) if is_dataclass(params) else params - formatted_message = f"{formatted_message} {json.dumps(params)}" + try: + serialized_params = json.dumps(params) + except TypeError: + serialized_params = json.dumps(str(params)) + formatted_message = f"{formatted_message} {serialized_params}" return formatted_message def __print(self, level: str, message: str, params=None): @@ -81,6 +87,4 @@ def __print(self, level: str, message: str, params=None): 
formatted_message, verbose=0 ) # automatically verbose = 1 - print, debug = 0 - do not print else: - print_callback( - formatted_message, verbose=self.log_levels[level] - ) + print_callback(formatted_message, debug=1) diff --git a/modules/flow_alerts/conn.py b/modules/flow_alerts/conn.py index 3a85f496b8..f4d9590b9e 100644 --- a/modules/flow_alerts/conn.py +++ b/modules/flow_alerts/conn.py @@ -369,6 +369,46 @@ def detect_data_upload_in_twid(self, profileid, twid): ip, mbs_uploaded, profileid, twid, uids, ts ) + def _parse_closed_tw_message(self, msg: dict) -> Tuple[str, str] | None: + """ + Extract the profileid and twid from a tw_closed pub/sub message. + + Parameters: + msg: Redis pub/sub message for the tw_closed channel. + + Return: + Tuple[str, str] | None: Normalized (profileid, twid) or None if the + payload is invalid. + """ + payload = utils.get_msg_payload(msg) + + if isinstance(payload, dict): + payload = payload.get("text") + + if isinstance(payload, str): + payload = payload.strip() + + # Some callers may accidentally pass the serialized pub/sub + # envelope instead of its text payload. 
+ if payload.startswith("{"): + with contextlib.suppress( + json.decoder.JSONDecodeError, TypeError + ): + decoded_payload = json.loads(payload) + if isinstance(decoded_payload, dict): + payload = decoded_payload.get("text", payload) + + if not isinstance(payload, str): + return None + + profileid, _, twid = payload.rpartition("_") + if not profileid.startswith("profile_") or not twid.startswith( + "timewindow" + ): + return None + + return profileid, twid + @staticmethod def _is_it_ok_for_ip_to_change(ip) -> bool: """Devices send flow as/to these ips all the time, the're not @@ -840,8 +880,12 @@ async def analyze(self, msg): self.check_device_changing_ips(twid, flow) elif utils.is_msg_intended_for(msg, "tw_closed"): - profileid_tw: List[str] = utils.get_msg_payload(msg).split("_") - profileid = f"{profileid_tw[0]}_{profileid_tw[1]}" - twid = profileid_tw[-1] + closed_tw = self._parse_closed_tw_message(msg) + if not closed_tw: + return + + profileid, twid = closed_tw self.detect_data_upload_in_twid(profileid, twid) - self.cleanup_conn_to_multiple_ports_tracker(profileid_tw) + self.cleanup_conn_to_multiple_ports_tracker( + [*profileid.split("_"), twid] + ) diff --git a/modules/flow_alerts/set_evidence.py b/modules/flow_alerts/set_evidence.py index 6576ec219f..aceadfc349 100644 --- a/modules/flow_alerts/set_evidence.py +++ b/modules/flow_alerts/set_evidence.py @@ -497,7 +497,8 @@ def conn_without_dns(self, twid, flow) -> None: confidence = 0.1 description: str = ( - f"A connection without DNS resolution to IP: " f"{flow.daddr}" + f"A connection without DNS resolution to Destination IP: " + f"{flow.daddr}" ) twid_number: int = int(twid.replace("timewindow", "")) diff --git a/slips/main.py b/slips/main.py index 90d35b0b60..de0697d597 100644 --- a/slips/main.py +++ b/slips/main.py @@ -689,7 +689,7 @@ def sig_handler(sig, frame): self.print("SIGTERM received, shutting down slips.") self.print( "SIGTERM received, likely due to " - "out of memory errors. 
Slips is stopping " + "OOM kill. Slips is stopping " "without completing the analysis.", 0, 1, diff --git a/slips_files/common/parsers/arg_parser.py b/slips_files/common/parsers/arg_parser.py index 357d2ee1cb..e3043238ad 100644 --- a/slips_files/common/parsers/arg_parser.py +++ b/slips_files/common/parsers/arg_parser.py @@ -254,7 +254,9 @@ def parse_arguments(self): "--multiinstance", action="store_true", required=False, - help="Run multiple instances of slips, don't overwrite the old one", + help="Run multiple instances of slips, don't overwrite the old " + "one. This option run redis using a random unused port " + "instead of the default one (6379).", ) self.add_argument( "-P", diff --git a/slips_files/common/slips_utils.py b/slips_files/common/slips_utils.py index c629dbb819..291cb0b965 100644 --- a/slips_files/common/slips_utils.py +++ b/slips_files/common/slips_utils.py @@ -133,6 +133,20 @@ def threat_level_to_string(self, threat_level: float) -> str: if threat_level <= int_value: return str_lvl + @staticmethod + def evidence_confidence_to_string(score: float) -> str: + """ + Convert an evidence confidence score to a string label. + + :param score: Evidence confidence score between 0 and 1. + :return: "high", "medium", or "low" based on the score. 
+ """ + if score >= 0.80: + return "High" + if score >= 0.55: + return "Medium" + return "Low" + @staticmethod + def log10(n: int) -> int: if n <= 0: @@ -872,6 +886,9 @@ def get_ip_identification_as_str(self, ip_identification: dict) -> str: ip_identification.pop("DNS_resolution") for key, piece_of_info in ip_identification.items(): + if key == "timestamp": + continue + + if not piece_of_info: continue diff --git a/slips_files/core/evidence_handler_worker.py b/slips_files/core/evidence_handler_worker.py index 83b6daf52e..cf5cd51e7a 100644 --- a/slips_files/core/evidence_handler_worker.py +++ b/slips_files/core/evidence_handler_worker.py @@ -125,6 +125,9 @@ def add_evidence_to_json_log_file( "uids": evidence.uid, "accumulated_threat_level": accumulated_threat_level, "threat_level": str(evidence.threat_level), + "confidence": utils.evidence_confidence_to_string( + evidence.confidence + ), + "timewindow": evidence.timewindow.number, } ) diff --git a/slips_files/core/text_formatters/evidence_formatter.py b/slips_files/core/text_formatters/evidence_formatter.py index bdbe7e5d3c..b22d32a273 100644 --- a/slips_files/core/text_formatters/evidence_formatter.py +++ b/slips_files/core/text_formatters/evidence_formatter.py @@ -132,7 +132,7 @@ def add_threat_level_to_evidence_description( self, evidence: Evidence ) -> Evidence: evidence.description += ( - f" threat level: " f"{evidence.threat_level.name.lower()}." + f" Threat level: " f"{evidence.threat_level.name.lower()}." 
) return evidence diff --git a/tests/common_test_utils.py b/tests/common_test_utils.py index 2fc9176734..5fac20966d 100644 --- a/tests/common_test_utils.py +++ b/tests/common_test_utils.py @@ -1,14 +1,19 @@ # SPDX-FileCopyrightText: 2021 Sebastian Garcia # SPDX-License-Identifier: GPL-2.0-only import sqlite3 +from contextlib import contextmanager from importlib.util import find_spec from pathlib import Path +import fcntl import os import shutil import binascii import subprocess import base64 import sys +import time +import socket +import threading from typing import ( Dict, Optional, @@ -16,11 +21,22 @@ from pathlib import PosixPath from unittest.mock import Mock import yaml +import redis IS_IN_A_DOCKER_CONTAINER = os.environ.get("IS_IN_A_DOCKER_CONTAINER", False) integration_tests_dir = "output/integration_tests/" alerts_file = "alerts.log" +INTEGRATION_TEST_PORT_START = 65000 +INTEGRATION_TEST_PORT_END = 65535 +_port_lock = threading.Lock() +_next_integration_test_port = INTEGRATION_TEST_PORT_START +_integration_test_port_counter_file = ( + Path(integration_tests_dir) / ".next-port" +) +_integration_test_port_lock_file = ( + Path(integration_tests_dir) / ".port-allocator.lock" +) # create the integration tests dir if not os.path.exists(integration_tests_dir): @@ -59,10 +75,14 @@ def modify_yaml_config( if changes: for key, value in changes.items(): - key: str - value: dict - if key in config: + if ( + key in config + and isinstance(config[key], dict) + and isinstance(value, dict) + ): config[key].update(value) + else: + config[key] = value with output_file.open("w", encoding="utf-8") as f: yaml.dump(config, f, default_flow_style=False, allow_unicode=True) @@ -96,6 +116,187 @@ def run_slips(cmd): return return_code +@contextmanager +def integration_test_port_file_lock(): + """ + Lock the shared integration-test port allocator across processes. 
+ + :return: Yields while the allocator lock is held + """ + _integration_test_port_lock_file.touch(exist_ok=True) + with _integration_test_port_lock_file.open( + "r", encoding="utf-8" + ) as lock_file: + fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX) + try: + yield + finally: + fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN) + + +def get_next_integration_test_port_candidate() -> int: + """ + Read the next shared integration-test port candidate. + + :return: Next candidate port from the shared counter + """ + if not _integration_test_port_counter_file.exists(): + return max(_next_integration_test_port, INTEGRATION_TEST_PORT_START) + + counter_value = _integration_test_port_counter_file.read_text( + encoding="utf-8" + ).strip() + return int(counter_value or INTEGRATION_TEST_PORT_START) + + +def set_next_integration_test_port_candidate(next_port: int) -> None: + """ + Persist the next shared integration-test port candidate. + + :param next_port: Next port to hand out + :return: None + """ + global _next_integration_test_port + + _next_integration_test_port = next_port + _integration_test_port_counter_file.write_text( + str(next_port), encoding="utf-8" + ) + + +def is_integration_test_port_available(port: int) -> bool: + """ + Check whether a TCP port is currently available on localhost. + + :param port: TCP port to probe + :return: True when the port can be bound, otherwise False + """ + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + try: + sock.bind(("127.0.0.1", port)) + except OSError: + return False + + return True + + +def find_available_integration_test_port(start_port: int) -> int: + """ + Find the next available integration-test port in the configured range. 
+ + :param start_port: Port number to start scanning from + :return: First available TCP port in the integration-test range + :raises RuntimeError: When the configured integration-test range is exhausted + """ + for candidate in range(start_port, INTEGRATION_TEST_PORT_END + 1): + if is_integration_test_port_available(candidate): + return candidate + + raise RuntimeError( + "No free integration test ports remain in the 65000-65535 range." + ) + + +def get_available_integration_test_port() -> int: + """ + Return a free TCP port reserved from the integration test range. + + The allocator walks ports from 65000 upwards and verifies each candidate + by binding to it before returning it. Allocation is synchronized across + pytest-xdist workers through a shared lock file and counter file. + + :return: Available TCP port for an integration test + :raises RuntimeError: When the configured integration test port range is exhausted + """ + with _port_lock: + with integration_test_port_file_lock(): + candidate = get_next_integration_test_port_candidate() + port = find_available_integration_test_port(candidate) + set_next_integration_test_port_candidate(port + 1) + return port + + +def allocate_integration_test_port(test_name: str, port_label: str) -> int: + """ + Allocate and announce a free TCP port for an integration test. + + :param test_name: Pytest node id or other human-readable test identifier + :param port_label: Label describing how the port will be used + :return: Allocated TCP port + """ + port = get_available_integration_test_port() + print(f"[integration-test] {test_name} using {port_label} port {port}") + return port + + +def start_test_redis_server(redis_port: int) -> None: + """ + Ensure a Redis server is running for an integration test. 
+ + :param redis_port: Redis port required by the integration test + :return: None + """ + client = redis.StrictRedis(host="localhost", port=redis_port, db=0) + try: + client.ping() + return + except redis.exceptions.ConnectionError: + pass + finally: + client.connection_pool.disconnect() + + if shutil.which("redis-server") is None: + import pytest + + pytest.skip("Missing integration runtime dependencies: redis-server") + + subprocess.check_call( + ["redis-server", "--port", str(redis_port), "--daemonize", "yes"] + ) + + deadline = time.time() + 5 + while time.time() < deadline: + client = redis.StrictRedis(host="localhost", port=redis_port, db=0) + try: + client.ping() + return + except redis.exceptions.ConnectionError: + time.sleep(0.1) + finally: + client.connection_pool.disconnect() + + raise RuntimeError( + f"Redis server did not become ready on integration test port {redis_port}." + ) + + +def close_test_redis_server(redis_port: int) -> bool: + """ + Flush and stop the Redis server used by an integration test. + + :param redis_port: Redis port used by the test + :return: True when a Redis server was reached and shutdown was attempted + """ + client = redis.StrictRedis(host="localhost", port=redis_port, db=0) + try: + client.ping() + except redis.exceptions.ConnectionError: + return False + + try: + client.flushall() + client.flushdb() + client.script_flush() + client.shutdown(save=False) + except redis.exceptions.ConnectionError: + return True + finally: + client.connection_pool.disconnect() + + return True + + def get_slips_test_command(arguments): """ Build a Slips CLI command that uses the current Python interpreter. 
diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index ac91ed8e2d..eecfd1f624 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -2,6 +2,10 @@ # SPDX-License-Identifier: GPL-2.0-only import importlib.util import pytest +from tests.common_test_utils import ( + allocate_integration_test_port, + start_test_redis_server, +) if importlib.util.find_spec("termcolor") is None: @@ -9,3 +13,27 @@ "termcolor is required to run integration tests that invoke slips", allow_module_level=True, ) + + +@pytest.fixture +def integration_port_factory(request): + """ + Allocate free ports for an integration test from the shared high-port range. + + :param request: Pytest request object for the current test + :return: Callable that allocates and prints a labelled port + """ + + def allocate(port_label: str = "service") -> int: + """ + Allocate a free test port and print it for the current test. + + :param port_label: Label describing the allocated port + :return: Allocated TCP port + """ + port = allocate_integration_test_port(request.node.nodeid, port_label) + if "redis" in port_label.lower(): + start_test_redis_server(port) + return port + + return allocate diff --git a/tests/integration/test_config_files/test_config_files.py b/tests/integration/test_config_files/test_config_files.py index 83fc3fbbc0..1a9ef3d728 100644 --- a/tests/integration/test_config_files/test_config_files.py +++ b/tests/integration/test_config_files/test_config_files.py @@ -10,6 +10,7 @@ create_output_dir, assert_no_errors, check_for_text, + close_test_redis_server, get_label_count_from_output_db, run_slips, get_slips_test_command, @@ -27,17 +28,18 @@ @pytest.mark.parametrize( - "pcap_path, expected_profiles, output_dir, redis_port", + "pcap_path, expected_profiles, output_dir", [ ( "dataset/test7-malicious.pcap", 290, "test_configuration_file/", - 6667, ) ], ) -def test_conf_file(pcap_path, expected_profiles, output_dir, redis_port): +def test_conf_file( + 
pcap_path, expected_profiles, output_dir, integration_port_factory +): """ In this test we're using tests/test.conf """ @@ -46,6 +48,7 @@ def test_conf_file(pcap_path, expected_profiles, output_dir, redis_port): binaries=("redis-server",), require_zeek_or_bro=True, ) + redis_port = integration_port_factory("redis") config_file = TEST_DIR / "test.yaml" modify_yaml_config( output_filename=config_file.name, @@ -74,72 +77,80 @@ def test_conf_file(pcap_path, expected_profiles, output_dir, redis_port): }, ) output_dir = create_output_dir(output_dir) - output_file = os.path.join(output_dir, "slips_output.txt") - command = get_slips_test_command( - f"-t -e 1 -f {pcap_path} -o {output_dir} -c {config_file} " - f"-P {redis_port}" - ) - command = f"{command} > {output_file} 2>&1" - print("running slips ...") - # this function returns when slips is done - run_slips(command) - print("Slip is done, checking for errors in the output dir.") - assert_no_errors(output_dir) - print("Comparing profiles with expected profiles") - database = ModuleFactory().create_db_manager_obj( - redis_port, output_dir=output_dir, start_redis_server=False - ) - profiles = database.get_profiles_len() - # expected_profiles is more than 50 because we're using direction = all - assert profiles > expected_profiles - print("Checking for a random evidence") - log_file = output_dir / "alerts" / alerts_file + success = False + try: + output_file = os.path.join(output_dir, "slips_output.txt") + command = get_slips_test_command( + f"-t -e 1 -f {pcap_path} -o {output_dir} -c {config_file} " + f"-P {redis_port}" + ) + command = f"{command} > {output_file} 2>&1" + print("running slips ...") + # this function returns when slips is done + run_slips(command) + print("Slip is done, checking for errors in the output dir.") + assert_no_errors(output_dir) + print("Comparing profiles with expected profiles") + database = ModuleFactory().create_db_manager_obj( + redis_port, output_dir=output_dir, start_redis_server=False + 
) + profiles = database.get_profiles_len() + # expected_profiles is more than 50 because we're using direction = all + assert profiles > expected_profiles + print("Checking for a random evidence") + log_file = output_dir / "alerts" / alerts_file - # testing disabled_detections param in the configuration file - disabled_evidence = "a connection without DNS resolution" - assert is_evidence_present(log_file, disabled_evidence) is False - print("Testing time_window_width param.") - # testing time_window_width param in the configuration file - assert check_for_text("115740 days 17 hrs 46 mins 39 seconds", output_dir) + # testing disabled_detections param in the configuration file + disabled_evidence = "a connection without DNS resolution" + assert is_evidence_present(log_file, disabled_evidence) is False + print("Testing time_window_width param.") + # testing time_window_width param in the configuration file + assert check_for_text( + "115740 days 17 hrs 46 mins 39 seconds", output_dir + ) - print("Make sure slips didn't delete zeek files.") - # test delete_zeek_files param - assert "zeek_files_test7-malicious" not in os.listdir() - print("Test storing a copy of zeek files.") - # test store_a_copy_of_zeek_files - assert "zeek_files" in os.listdir(output_dir) - print("Checking metadata directory") - # test metadata_dir - assert "metadata" in os.listdir(output_dir) - metadata_path = os.path.join(output_dir, "metadata") - for file in ("test.yaml", "whitelist.conf", "info.txt"): - print(f"checking if {file} in the metadata path {metadata_path}") - assert file in os.listdir(metadata_path) + print("Make sure slips didn't delete zeek files.") + # test delete_zeek_files param + assert "zeek_files_test7-malicious" not in os.listdir() + print("Test storing a copy of zeek files.") + # test store_a_copy_of_zeek_files + assert "zeek_files" in os.listdir(output_dir) + print("Checking metadata directory") + # test metadata_dir + assert "metadata" in os.listdir(output_dir) + 
metadata_path = os.path.join(output_dir, "metadata") + for file in ("test.yaml", "whitelist.conf", "info.txt"): + print(f"checking if {file} in the metadata path {metadata_path}") + assert file in os.listdir(metadata_path) - print("Checking malicious label count") - # test label=malicious - assert get_label_count_from_output_db(output_dir, "malicious") > 370 - # test disable - for module in ["template", "flow_ml_detection"]: - print(f"Checking if {module} is disabled") - assert check_for_text(module, output_dir) - print("Deleting the output directory") - shutil.rmtree(output_dir) - os.remove(config_file) + print("Checking malicious label count") + # test label=malicious + assert get_label_count_from_output_db(output_dir, "malicious") > 370 + # test disable + for module in ["template", "flow_ml_detection"]: + print(f"Checking if {module} is disabled") + assert check_for_text(module, output_dir) + success = True + finally: + if success: + close_test_redis_server(redis_port) + shutil.rmtree(output_dir) + os.remove(config_file) @pytest.mark.parametrize( - "pcap_path, expected_profiles, output_dir, redis_port", + "pcap_path, expected_profiles, output_dir", [ ( "dataset/test8-malicious.pcap ", 1, "pcap_test_conf2/", - 6668, ) ], ) -def test_conf_file2(pcap_path, expected_profiles, output_dir, redis_port): +def test_conf_file2( + pcap_path, expected_profiles, output_dir, integration_port_factory +): """ In this test we're using tests/test2.conf """ @@ -148,6 +159,7 @@ def test_conf_file2(pcap_path, expected_profiles, output_dir, redis_port): binaries=("redis-server",), require_zeek_or_bro=True, ) + redis_port = integration_port_factory("redis") config_file = TEST_DIR / "test2.yaml" modify_yaml_config( output_filename=config_file.name, @@ -170,16 +182,21 @@ def test_conf_file2(pcap_path, expected_profiles, output_dir, redis_port): ) output_dir = create_output_dir(output_dir) - output_file = os.path.join(output_dir, "slips_output.txt") - command = get_slips_test_command( - 
f"-t -e 1 -f {pcap_path} -o {output_dir} -c {config_file} " - f"-P {redis_port}" - ) - command = f"{command} > {output_file} 2>&1" - print("running slips ...") - run_slips(command) - print("Slip is done, checking for errors in the output dir.") - assert_no_errors(output_dir) - print("Deleting the output directory") - shutil.rmtree(output_dir) - os.remove(config_file) + success = False + try: + output_file = os.path.join(output_dir, "slips_output.txt") + command = get_slips_test_command( + f"-t -e 1 -f {pcap_path} -o {output_dir} -c {config_file} " + f"-P {redis_port}" + ) + command = f"{command} > {output_file} 2>&1" + print("running slips ...") + run_slips(command) + print("Slip is done, checking for errors in the output dir.") + assert_no_errors(output_dir) + success = True + finally: + if success: + close_test_redis_server(redis_port) + shutil.rmtree(output_dir) + os.remove(config_file) diff --git a/tests/integration/test_dataset/test_dataset.py b/tests/integration/test_dataset/test_dataset.py index 13f46f835f..010fe6a98e 100644 --- a/tests/integration/test_dataset/test_dataset.py +++ b/tests/integration/test_dataset/test_dataset.py @@ -10,6 +10,7 @@ is_evidence_present, create_output_dir, assert_no_errors, + close_test_redis_server, get_slips_test_command, skip_if_missing_runtime_dependencies, ) @@ -22,35 +23,31 @@ @pytest.mark.parametrize( - "binetflow_path, expected_profiles, expected_evidence, output_dir, redis_port", + "binetflow_path, expected_profiles, expected_evidence, output_dir", [ ( "dataset/test4-malicious.binetflow", 2, "Horizontal port scan to port http-alt 81/tcp. From 192.168.2.12", "test4/", - 6662, ), ( "dataset/test3-mixed.binetflow", 20, "Horizontal port scan to port rdp 3389/tcp. 
From 46.166.151.160", "test3/", - 6663, ), ( "dataset/test2-malicious.binetflow", 1, "Long Connection.", "test2/", - 6664, ), ( "dataset/test5-mixed.binetflow", 4, "Long Connection", "test5/", - 6655, ), # ( # 'dataset/test11-portscan.binetflow', @@ -66,41 +63,46 @@ def test_binetflow( expected_profiles, expected_evidence, output_dir, - redis_port, + integration_port_factory, ): skip_if_missing_runtime_dependencies( python_modules=("termcolor",), binaries=("redis-server",) ) + redis_port = integration_port_factory("redis") output_dir = create_output_dir(output_dir) + success = False + try: + output_file = os.path.join(output_dir, "slips_output.txt") + command = get_slips_test_command( + f"-e 1 -t -o {output_dir} -P {redis_port} -f {binetflow_path}" + ) + command = f"{command} > {output_file} 2>&1" + # this function returns when slips is done + run_slips(command) - output_file = os.path.join(output_dir, "slips_output.txt") - command = get_slips_test_command( - f"-e 1 -t -o {output_dir} -P {redis_port} -f {binetflow_path}" - ) - command = f"{command} > {output_file} 2>&1" - # this function returns when slips is done - run_slips(command) - - assert_no_errors(output_dir) + assert_no_errors(output_dir) - database = ModuleFactory().create_db_manager_obj( - redis_port, output_dir=output_dir, start_redis_server=False - ) - profiles = database.get_profiles_len() - assert profiles > expected_profiles + database = ModuleFactory().create_db_manager_obj( + redis_port, output_dir=output_dir, start_redis_server=False + ) + profiles = database.get_profiles_len() + assert profiles > expected_profiles - log_file = output_dir / "alerts" / alerts_file - assert is_evidence_present(log_file, expected_evidence) is True - shutil.rmtree(output_dir) + log_file = output_dir / "alerts" / alerts_file + assert is_evidence_present(log_file, expected_evidence) is True + success = True + finally: + if success: + close_test_redis_server(redis_port) + shutil.rmtree(output_dir) 
@pytest.mark.parametrize( - "suricata_path, output_dir, redis_port, expected_evidence", + "suricata_path, output_dir, expected_evidence", [ ( "dataset/test6-malicious.suricata.json", "test6/", - 6657, [ "Connection to unknown destination port", "vertical port scan", @@ -109,42 +111,56 @@ def test_binetflow( ) ], ) -def test_suricata(suricata_path, output_dir, redis_port, expected_evidence): +def test_suricata( + suricata_path, + output_dir, + expected_evidence, + integration_port_factory, +): skip_if_missing_runtime_dependencies( python_modules=("termcolor",), binaries=("redis-server",) ) + redis_port = integration_port_factory("redis") output_dir = create_output_dir(output_dir) - output_file = os.path.join(output_dir, "slips_output.txt") - command = get_slips_test_command( - f"-e 1 -t -f {suricata_path} -o {output_dir} -P {redis_port}" - ) - command = f"{command} > {output_file} 2>&1" - # this function returns when slips is done - run_slips(command) - - assert_no_errors(output_dir) + success = False + try: + output_file = os.path.join(output_dir, "slips_output.txt") + command = get_slips_test_command( + f"-e 1 -t -f {suricata_path} -o {output_dir} -P {redis_port}" + ) + command = f"{command} > {output_file} 2>&1" + # this function returns when slips is done + run_slips(command) - database = ModuleFactory().create_db_manager_obj( - redis_port, output_dir=output_dir, start_redis_server=False - ) - profiles = database.get_profiles_len() - # todo the profiles should be way more than 10, maybe 76, but it varies - # each run, we need to sy why - assert profiles > 10 + assert_no_errors(output_dir) - log_file = output_dir / "alerts" / alerts_file - assert any(is_evidence_present(log_file, ev) for ev in expected_evidence) - shutil.rmtree(output_dir) + database = ModuleFactory().create_db_manager_obj( + redis_port, output_dir=output_dir, start_redis_server=False + ) + profiles = database.get_profiles_len() + # todo the profiles should be way more than 10, maybe 76, but it 
varies + # each run, we need to sy why + assert profiles > 10 + + log_file = output_dir / "alerts" / alerts_file + assert any( + is_evidence_present(log_file, ev) for ev in expected_evidence + ) + success = True + finally: + if success: + close_test_redis_server(redis_port) + shutil.rmtree(output_dir) @pytest.mark.skipif( "nfdump" not in shutil.which("nfdump"), reason="nfdump is not installed" ) @pytest.mark.parametrize( - "nfdump_path, output_dir, redis_port", - [("dataset/test1-normal.nfdump", "test1/", 6656)], + "nfdump_path, output_dir", + [("dataset/test1-malicious.nfdump", "test1/")], ) -def test_nfdump(nfdump_path, output_dir, redis_port): +def test_nfdump(nfdump_path, output_dir, integration_port_factory): """ checks that slips is reading nfdump no issue, the file is not malicious so there's no evidence that should be present @@ -152,25 +168,28 @@ def test_nfdump(nfdump_path, output_dir, redis_port): skip_if_missing_runtime_dependencies( python_modules=("termcolor",), binaries=("redis-server",) ) + redis_port = integration_port_factory("redis") output_dir = create_output_dir(output_dir) + success = False + try: + # expected_evidence = 'Connection to unknown destination port 902/TCP' - # expected_evidence = 'Connection to unknown destination port 902/TCP' - - output_file = os.path.join(output_dir, "slips_output.txt") - command = get_slips_test_command( - f"-e 1 -t -f {nfdump_path} -o {output_dir} -P {redis_port}" - ) - command = f"{command} > {output_file} 2>&1" - # this function returns when slips is done - run_slips(command) - - database = ModuleFactory().create_db_manager_obj( - redis_port, output_dir=output_dir, start_redis_server=False - ) - profiles = database.get_profiles_len() - assert_no_errors(output_dir) - assert profiles > 0 + output_file = os.path.join(output_dir, "slips_output.txt") + command = get_slips_test_command( + f"-e 1 -t -f {nfdump_path} -o {output_dir} -P {redis_port}" + ) + command = f"{command} > {output_file} 2>&1" + # this 
function returns when slips is done + run_slips(command) - # log_file = os.path.join(output_dir, alerts_file) - # assert is_evidence_present(log_file, expected_evidence) == True - shutil.rmtree(output_dir) + database = ModuleFactory().create_db_manager_obj( + redis_port, output_dir=output_dir, start_redis_server=False + ) + profiles = database.get_profiles_len() + assert_no_errors(output_dir) + assert profiles > 0 + success = True + finally: + if success: + close_test_redis_server(redis_port) + shutil.rmtree(output_dir) diff --git a/tests/integration/test_fides/test_fides.py b/tests/integration/test_fides/test_fides.py index 4eeeab3f3b..0d08712f03 100644 --- a/tests/integration/test_fides/test_fides.py +++ b/tests/integration/test_fides/test_fides.py @@ -3,16 +3,21 @@ test/test.yaml and tests/test2.yaml """ +import json import shutil from pathlib import PosixPath, Path +import signal import redis +from modules.fides.messaging.network_bridge import NetworkBridge from modules.fides.model.peer import PeerInfo from modules.fides.persistence.fides_sqlite_db import FidesSQLiteDB from tests.common_test_utils import ( create_output_dir, assert_no_errors, + close_test_redis_server, + modify_yaml_config, ) from tests.module_factory import ModuleFactory import pytest @@ -31,17 +36,8 @@ alerts_file = "alerts.log" TEST_DIR = Path(__file__).resolve().parent - - -def ensure_redis_is_running(port): - redis_client = redis.StrictRedis(host="localhost", port=port, db=0) - try: - redis_client.ping() - return - except redis.exceptions.ConnectionError: - subprocess.check_call( - ["redis-server", "--port", str(port), "--daemonize", "yes"] - ) +FIDES_CONFIG_FILENAME = "fides_runtime.conf.yml" +SLIPS_CONFIG_FILENAME = "fides_runtime_slips.yaml" def delete_file_if_exists(file_path): @@ -73,7 +69,6 @@ def message_send(port): message = """ { "type": "nl2tl_intelligence_response", - "version": 1, "data": [ { "sender": { @@ -110,6 +105,11 @@ def message_send(port): ] } """ + # Fides expects the 
network protocol version, not the Slips package version. + message = json.loads(message) + message.update({"version": NetworkBridge.version}) + message = json.dumps(message) + redis_client = redis.StrictRedis(host="localhost", port=port, db=0) # publish the message to the "network2fides" channel redis_client.publish(channel, message) @@ -145,6 +145,80 @@ def message_handler(message): break # exit after processing one message +def stop_process_group(process, process_name, timeout_seconds=15): + """ + Stop a spawned process group and wait for it to exit. + + Parameters: + process: subprocess.Popen instance to stop. + process_name: Human-readable name used in log messages. + timeout_seconds: Maximum number of seconds to wait after SIGTERM. + + Returns: + None + """ + if process.poll() is not None: + return + + process_group_id = os.getpgid(process.pid) + os.killpg(process_group_id, signal.SIGTERM) + print(f"SIGTERM sent to {process_name} process group {process_group_id}.") + + try: + process.wait(timeout=timeout_seconds) + return + except subprocess.TimeoutExpired: + pass + + if process.poll() is not None: + return + + os.killpg(process_group_id, signal.SIGKILL) + process.wait() + print(f"SIGKILL sent to {process_name} process group {process_group_id}.") + + +def wait_for_runtime_message_count( + redis_port: int, + output_dir: Path, + module_name: str, + channel: str, + expected_count: str, + timeout_seconds: int = 30, +) -> dict: + """ + Wait for a module runtime message counter to reach an expected value. + + Parameters: + redis_port: Redis port used by the running Slips instance. + output_dir: Output directory associated with the running test. + module_name: Module whose runtime counters are being checked. + channel: Runtime counter key to wait for. + expected_count: Expected counter value stored in Redis. + timeout_seconds: Maximum time to wait before failing. + + Returns: + dict: Latest runtime counters for the module. 
+ """ + deadline = time.time() + timeout_seconds + latest_counters = {} + + while time.time() < deadline: + db = ModuleFactory().create_db_manager_obj( + redis_port, output_dir=output_dir, start_redis_server=False + ) + latest_counters = db.get_msgs_received_at_runtime(module_name) or {} + if latest_counters.get(channel) == expected_count: + return latest_counters + time.sleep(1) + + raise AssertionError( + f"Timed out waiting for {module_name} runtime counter " + f"{channel} to reach {expected_count}. Latest counters: " + f"{latest_counters}" + ) + + def get_main_interface(): try: out = subprocess.check_output( @@ -155,24 +229,76 @@ def get_main_interface(): return None +def get_runtime_config_dir(output_dir_name: str) -> Path: + """ + Return the runtime config directory for a Fides integration test. + + Parameters: + output_dir_name: Name of the Slips output directory for the test. + + Returns: + Path: Directory where generated runtime configs should be stored. + """ + return TEST_DIR / "runtime_configs" / output_dir_name + + +def create_runtime_fides_configs( + output_dir: Path, db_name: str +) -> tuple[Path, Path]: + """ + Create isolated Slips and Fides config files for an integration test. + + Parameters: + output_dir: Test output directory used to derive the runtime config + location. + db_name: Database filename to be created under the permanent directory. + + Returns: + tuple: Generated Slips config path and permanent DB path. 
+ """ + config_dir = get_runtime_config_dir(output_dir.name) + config_dir.mkdir(parents=True, exist_ok=True) + + runtime_fides_config = modify_yaml_config( + input_path="modules/fides/config/fides.conf.yml", + output_dir=config_dir, + output_filename=FIDES_CONFIG_FILENAME, + changes={"database": db_name}, + ) + runtime_slips_config = modify_yaml_config( + input_path=str(TEST_DIR / "fides_config.yaml"), + output_dir=config_dir, + output_filename=SLIPS_CONFIG_FILENAME, + changes={ + "global_p2p": { + "fides_conf": str(runtime_fides_config), + } + }, + ) + + return runtime_slips_config, Path("permanent") / db_name + + @pytest.mark.parametrize( - "path, output_dir, redis_port", + "path, output_dir", [ ( "dataset/test13-malicious-dhcpscan-zeek-dir", "fides_test_conf_file2/", - 6644, ) ], ) -def test_conf_file2(path, output_dir, redis_port): +def test_conf_file2(path, output_dir, integration_port_factory): """ In this test we're using the local fides integration config file. """ - ensure_redis_is_running(redis_port) + redis_port = integration_port_factory("redis") output_dir: PosixPath = create_output_dir(output_dir) + db_name = f"{output_dir.name}_fides_p2p_db.sqlite" + slips_config, test_db = create_runtime_fides_configs(output_dir, db_name) output_file = os.path.join(output_dir, "slips_output.txt") command = [ + sys.executable, "./slips.py", "-t", "-g", @@ -185,66 +311,83 @@ def test_conf_file2(path, output_dir, redis_port): "-o", str(output_dir), "-c", - str(TEST_DIR / "fides_config.yaml"), + str(slips_config), "-P", str(redis_port), ] + success = False + process = None + try: + print("running slips using output dir...") + print(output_dir) - print("running slips ...") - print(output_dir) - - # Open the log file in write mode - with open(output_file, "w") as log_file: - # Start the subprocess, redirecting stdout and stderr to the same file - process = subprocess.Popen( - command, # Replace with your command - stdout=log_file, - stderr=log_file, - ) - - print(f"Output 
and errors are logged in {output_file}") - countdown(40, "sigterm") - # send a SIGTERM to the process - os.kill(process.pid, 15) - print("SIGTERM sent. killing slips") - os.kill(process.pid, 9) - - message_receive(redis_port) + # Open the log file in write mode + with open(output_file, "w") as log_file: + # Start the subprocess, redirecting stdout and stderr to the same file + process = subprocess.Popen( + command, # Replace with your command + stdout=log_file, + stderr=log_file, + start_new_session=True, + ) - print(f"Slips with PID {process.pid} was killed.") + print(f"Output and errors are logged in {output_file}") + countdown(40, "sigterm") + runtime_counters = wait_for_runtime_message_count( + redis_port, + output_dir, + "fides", + "fides2network", + "1", + ) + stop_process_group(process, "fides slips") - print("Slip is done, checking for errors in the output dir.") - assert_no_errors(output_dir) - print("Checking database") - db = ModuleFactory().create_db_manager_obj( - redis_port, output_dir=output_dir, start_redis_server=False - ) - # iris is supposed to be receiving this msg, that last thing fides does - # is send a msg to this channel for iris to receive it - assert db.get_msgs_received_at_runtime("fides")["fides2network"] == "1" - assert db.get_msgs_received_at_runtime("fides")["new_alert"] == "1" - print(db.get_msgs_received_at_runtime("fides")) + print(f"Slips with PID {process.pid} was killed.") - print("Deleting the output directory") - shutil.rmtree(output_dir, ignore_errors=True) + print("Slips is done, checking for errors in the output dir.") + assert_no_errors(output_dir) + print("Checking database") + # db = ModuleFactory().create_db_manager_obj( + # redis_port, output_dir=output_dir, start_redis_server=False + # ) + # iris is supposed to be receiving this msg, that last thing fides does + # is send a msg to this channel for iris to receive it + assert runtime_counters["fides2network"] == "1" + assert runtime_counters["new_alert"] == "1" + 
print(runtime_counters) + success = True + finally: + if process is not None and process.poll() is None: + stop_process_group(process, "fides slips") + close_test_redis_server(redis_port) + if test_db.exists(): + test_db.unlink() + shutil.rmtree( + get_runtime_config_dir(output_dir.name), ignore_errors=True + ) + if success: + print("Deleting the output directory") + shutil.rmtree(output_dir, ignore_errors=True) @pytest.mark.parametrize( - "path, output_dir, redis_port", + "path, output_dir", [ ( "dataset/test15-malicious-zeek-dir", "fides_test_trust_recommendation_response/", - 6645, ) ], ) -def test_trust_recommendation_response(path, output_dir, redis_port): +def test_trust_recommendation_response( + path, output_dir, integration_port_factory +): """ This test simulates a common situation in the global P2P system, where Fides Module wanted to evaluate trust in an unknown peer and asked for the opinion of other peers. The known peers responded and Fides Module is processing the response. 
+ Scenario: - Fides did not know a peer whose ID is 'stratosphere.org' and have asked for opinion of known peers: peer1 and peer2 @@ -259,12 +402,21 @@ def test_trust_recommendation_response(path, output_dir, redis_port): peers - Run Slips (includes Fides Module) in a thread and wait for all modules to start - """ - ensure_redis_is_running(redis_port) + redis_port = integration_port_factory("redis") output_dir: PosixPath = create_output_dir(output_dir) + db_name = f"{output_dir.name}_fides_test_database.sqlite" + print(f"db_name: {db_name}") + + slips_config, permanent_db = create_runtime_fides_configs( + output_dir, db_name + ) + print(f"slips_config: {slips_config} permanent_db: {permanent_db}") + output_file = os.path.join(output_dir, "slips_output.txt") + print(f"output_file: {output_file}") command = [ + sys.executable, "./slips.py", "-t", "-g", @@ -277,103 +429,105 @@ def test_trust_recommendation_response(path, output_dir, redis_port): "-o", str(output_dir), "-c", - str(TEST_DIR / "fides_config.yaml"), + str(slips_config), "-P", str(redis_port), ] - config_file_path = "modules/fides/config/fides.conf.yml" - config_temp_path = "modules/fides/config/fides.conf.yml.bak" - config_line = "database: 'fides_test_database.sqlite'\n" - shutil.copy(config_file_path, config_temp_path) - test_db = Path("permanent") / "fides_test_database.sqlite" - test_db.parent.mkdir(parents=True, exist_ok=True) + # success = False + process = None - try: - # Append the new line to the config - with open(config_file_path, "a") as file: - file.write(config_line) + print(f"command: {' '.join(command)}") - print("running slips ...") - print(output_dir) + # try: + print("running slips with output dir: ...") + print(output_dir) - mock_logger = Mock() - mock_logger.print_line = Mock() - mock_logger.error = Mock() - print("Manipulating database") - fdb = FidesSQLiteDB(mock_logger, str(test_db)) - fdb.store_peer_trust_data( - ptd.trust_data_prototype( - peer=PeerInfo( - id="peer1", - 
organisations=["org1", "org2"], - ip="192.168.1.1", - ), - has_fixed_trust=False, - ) + mock_logger = Mock() + mock_logger.print_line = Mock() + mock_logger.error = Mock() + print( + "Manipulating database: Inject peer1 and peer2 into the " + "database - Fides Module must know those peers" + ) + fdb = FidesSQLiteDB(mock_logger, str(permanent_db)) + fdb.store_peer_trust_data( + ptd.trust_data_prototype( + peer=PeerInfo( + id="peer1", + organisations=["org1", "org2"], + ip="192.168.1.1", + ), + has_fixed_trust=False, ) - fdb.store_peer_trust_data( - ptd.trust_data_prototype( - peer=PeerInfo( - id="peer2", organisations=["org2"], ip="192.168.1.2" - ), - has_fixed_trust=False, - ) + ) + fdb.store_peer_trust_data( + ptd.trust_data_prototype( + peer=PeerInfo( + id="peer2", organisations=["org2"], ip="192.168.1.2" + ), + has_fixed_trust=False, ) + ) - with open(output_file, "w") as log_file: - process = subprocess.Popen( - command, - stdout=log_file, - stderr=log_file, - ) - - print(f"Output and errors are logged in {output_file}") + with open(output_file, "w") as log_file: + process = subprocess.Popen( + command, + stdout=log_file, + stderr=log_file, + start_new_session=True, + ) - # these seconds are the time we wait for slips to start all the - # modules - countdown(60, "test message") + print(f"Output and errors are logged in {output_file}") - # this msg simulates a msg sent by peers to the started - # slips instance - message_send(redis_port) + # these seconds are the time we wait for slips to start all the + # modules + countdown(60, "test message") - # these 30s are the time we give slips to process the msg - countdown(30, "sigterm") + # this msg simulates a msg sent by peers to the started + # slips instance + message_send(redis_port) - # send a SIGTERM to the process - os.kill(process.pid, 15) - print("SIGTERM sent. 
killing slips") - os.kill(process.pid, 15) + # these 30s are the time we give slips to process the msg + countdown(30, "sigterm") + stop_process_group(process, "fides slips") - print(f"Slips with PID {process.pid} was killed.") + print(f"Slips with PID {process.pid} was killed.") - print("Slip is done, checking for errors in the output dir.") - assert_no_errors(output_dir) + print("Slips is done, checking for errors in the output dir.") + assert_no_errors(output_dir) - print("Checking database") - db = ModuleFactory().create_db_manager_obj( - redis_port, output_dir=output_dir, start_redis_server=False - ) + print("Checking database") + db = ModuleFactory().create_db_manager_obj( + redis_port, output_dir=output_dir, start_redis_server=False + ) - # assert db.get_msgs_received_at_runtime("fides")["fides2network"] == "1" - - print("Checking Fides' data outlets") - assert fdb.get_peer_trust_data("peer1").service_history != [] - assert fdb.get_peer_trust_data("peer2").service_history != [] - assert fdb.get_peer_trust_data("peer1").service_history_size == 1 - assert fdb.get_peer_trust_data("peer2").service_history_size == 1 - assert db.get_cached_network_opinion( - "stratosphere.org", 200000000000, 200000000000 - ) == { - "target": "stratosphere.org", - "score": "0.0", - "confidence": "0.0", - } - - print("Deleting the output directory") - shutil.rmtree(output_dir) - finally: - # Restore the original file - os.remove(test_db) - shutil.move(config_temp_path, config_file_path) - print("Config file restored to original state.") + # assert db.get_msgs_received_at_runtime("fides")["fides2network"] == "1" + print("Checking Fides' data outlets") + + assert fdb.get_peer_trust_data("peer1").service_history != [] + assert fdb.get_peer_trust_data("peer2").service_history != [] + assert fdb.get_peer_trust_data("peer1").service_history_size == 1 + assert fdb.get_peer_trust_data("peer2").service_history_size == 1 + assert db.get_cached_network_opinion( + "stratosphere.org", 
200000000000, 200000000000 + ) == { + "target": "stratosphere.org", + "score": "0.0", + "confidence": "0.0", + } + # success = True + # except: + # pass + + # finally: + # if process is not None and process.poll() is None: + # stop_process_group(process, "fides slips") + # close_test_redis_server(redis_port) + # if permanent_db.exists(): + # permanent_db.unlink() + # shutil.rmtree( + # get_runtime_config_dir(output_dir.name), ignore_errors=True + # ) + # if success: + # print("Deleting the output directory") + # shutil.rmtree(output_dir) diff --git a/tests/integration/test_iris/test_iris.py b/tests/integration/test_iris/test_iris.py index be72922dfd..b8eb4a91e6 100644 --- a/tests/integration/test_iris/test_iris.py +++ b/tests/integration/test_iris/test_iris.py @@ -4,6 +4,7 @@ """ import re +import signal import shutil from pathlib import PosixPath @@ -12,6 +13,7 @@ from tests.common_test_utils import ( create_output_dir, assert_no_errors, + close_test_redis_server, modify_yaml_config, ) import pytest @@ -23,8 +25,6 @@ alerts_file = "alerts.log" TEST_DIR = Path(__file__).resolve().parent -PEER1_CONFIG_DIR = TEST_DIR / "peer1_config" -PEER2_CONFIG_DIR = TEST_DIR / "peer2_config" def countdown(seconds, message): @@ -144,18 +144,19 @@ def extract_connection_string(log_file_first_iris): exit(1) -def assert_peer1_setup(peer1_slips_config): +def assert_peer1_setup(peer1_slips_config, peer1_iris_config_path): """ Assert that the first peer config was generated as expected. Parameters: peer1_slips_config: Path to the first peer Slips config. + peer1_iris_config_path: Relative path to the first peer Iris config. 
Returns: None """ assert check_strings_in_file( - ["iris_conf: tests/integration/test_iris/peer1_config/iris.yaml"], + [f"iris_conf: {peer1_iris_config_path}"], peer1_slips_config, ) @@ -163,8 +164,11 @@ def assert_peer1_setup(peer1_slips_config): def assert_peer2_setup( peer2_slips_config, peer2_iris_config, + peer2_iris_config_path, connection_string, peer_redis_port, + peer_server_port, + peer2_key_path, ): """ Assert that the second peer config was generated as expected. @@ -172,28 +176,78 @@ def assert_peer2_setup( Parameters: peer2_slips_config: Path to the second peer Slips config. peer2_iris_config: Path to the second peer Iris config. + peer2_iris_config_path: Relative path to the second peer Iris config. connection_string: Multiaddress used to connect to the first peer. peer_redis_port: Redis port used by the second peer. + peer_server_port: Iris server port used by the second peer. + peer2_key_path: Relative path to the generated peer2 private key. Returns: None """ assert check_strings_in_file( - ["iris_conf: tests/integration/test_iris/peer2_config/iris.yaml"], + [f"iris_conf: {peer2_iris_config_path}"], peer2_slips_config, ) assert check_strings_in_file( [ f"Port: {peer_redis_port}", - "Port: 9006", + f"Port: {peer_server_port}", "DisableBootstrappingNodes: true", - "KeyFile: second.priv", + f"KeyFile: {peer2_key_path}", connection_string, ], peer2_iris_config, ) +def prepare_peer_config_paths( + config_dir: Path, prefix: str +) -> tuple[Path, Path, Path]: + """ + Build runtime config paths for a test peer. + + Parameters: + config_dir: Directory where generated config files will be stored. + prefix: Prefix used to name the generated files. + + Returns: + tuple: Slips config path, Iris config path, and Iris config path relative to repo root. 
+ """ + config_dir.mkdir(parents=True, exist_ok=True) + iris_config = config_dir / f"{prefix}_iris.yaml" + slips_config = config_dir / f"{prefix}_slips.yaml" + iris_config_path = Path(os.path.relpath(iris_config, Path.cwd())) + return slips_config, iris_config, iris_config_path + + +def get_runtime_config_dir(output_dir_name: str, peer_name: str) -> Path: + """ + Return the runtime config directory for an Iris integration-test peer. + + Parameters: + output_dir_name: Name of the peer output directory. + peer_name: Peer-specific prefix used by the test. + + Returns: + Path: Directory where generated runtime configs should be stored. + """ + return TEST_DIR / "runtime_configs" / output_dir_name / peer_name + + +def get_iris_relative_key_path(key_path: Path) -> str: + """ + Build a key path relative to the Iris module working directory. + + Parameters: + key_path: Path to the private key file to be generated by Iris. + + Returns: + str: Relative path from `modules/iris` to the key file. + """ + return os.path.relpath(key_path, Path("modules/iris")) + + def assert_peer1_results(output_dir): """ Assert the first peer completed without runtime errors. @@ -229,6 +283,7 @@ def prepare_and_start_peer1( zeek_dir_path, output_dir, redis_port, + server_port, default_interface, log_file, ): @@ -239,24 +294,27 @@ def prepare_and_start_peer1( zeek_dir_path: Zeek dataset path used by the test. output_dir: Output directory for peer1. redis_port: Redis port used by peer1. + server_port: Iris server port used by peer1. default_interface: Interface required by Slips when using `-g`. log_file: Open file handle used to capture peer1 output. Returns: tuple: Peer1 process, Iris log path, and Slips config path. 
""" - peer1_slips_config = PEER1_CONFIG_DIR / "slips.yaml" - peer1_iris_config = PEER1_CONFIG_DIR / "iris.yaml" - peer1_iris_config_path = Path( - "tests/integration/test_iris/peer1_config" - ) / (peer1_iris_config.name) + config_dir = get_runtime_config_dir(output_dir.name, "peer1") + ( + peer1_slips_config, + peer1_iris_config, + peer1_iris_config_path, + ) = prepare_peer_config_paths(config_dir, "peer1") modify_yaml_config( input_path="config/iris_config.yaml", - output_dir=PEER1_CONFIG_DIR, + output_dir=config_dir, output_filename=peer1_iris_config.name, changes={ "Redis": {"Port": redis_port}, + "Server": {"Port": server_port}, "PeerDiscovery": { "DisableBootstrappingNodes": True, "ListOfMultiAddresses": [], @@ -265,7 +323,7 @@ def prepare_and_start_peer1( ) modify_yaml_config( input_path="config/slips.yaml", - output_dir=PEER1_CONFIG_DIR, + output_dir=config_dir, output_filename=peer1_slips_config.name, changes={ "global_p2p": { @@ -298,15 +356,22 @@ def prepare_and_start_peer1( command, stdout=log_file, stderr=log_file, + start_new_session=True, ) log_file_first_iris = output_dir / "iris/iris_logs.txt" - return process, log_file_first_iris, peer1_slips_config + return ( + process, + log_file_first_iris, + peer1_slips_config, + peer1_iris_config_path, + ) def prepare_and_start_peer2( zeek_dir_path, output_dir_peer, peer_redis_port, + peer_server_port, default_interface, connection_string, log_file, @@ -318,6 +383,7 @@ def prepare_and_start_peer2( zeek_dir_path: Zeek dataset path used by the test. output_dir_peer: Output directory for peer2. peer_redis_port: Redis port used by peer2. + peer_server_port: Iris server port used by peer2. default_interface: Interface required by Slips when using `-g`. connection_string: Multiaddress of peer1 used by peer2. log_file: Open file handle used to capture peer2 output. @@ -325,15 +391,18 @@ def prepare_and_start_peer2( Returns: tuple: Peer2 process, Iris log path, Slips config path, and Iris config path. 
""" - peer2_iris_config = PEER2_CONFIG_DIR / "iris.yaml" - peer2_iris_config_path = Path( - "tests/integration/test_iris/peer2_config" - ) / (peer2_iris_config.name) - peer2_slips_config = PEER2_CONFIG_DIR / "slips.yaml" + config_dir = get_runtime_config_dir(output_dir_peer.name, "peer2") + ( + peer2_slips_config, + peer2_iris_config, + peer2_iris_config_path, + ) = prepare_peer_config_paths(config_dir, "peer2") + peer2_key_path = output_dir_peer / "peer2.private.key" + peer2_key_path_for_iris = get_iris_relative_key_path(peer2_key_path) modify_yaml_config( input_path="config/slips.yaml", - output_dir=PEER2_CONFIG_DIR, + output_dir=config_dir, output_filename=peer2_slips_config.name, changes={ "global_p2p": { @@ -345,16 +414,16 @@ def prepare_and_start_peer2( ) modify_yaml_config( input_path="config/iris_config.yaml", - output_dir=PEER2_CONFIG_DIR, + output_dir=config_dir, output_filename=peer2_iris_config.name, changes={ "Redis": {"Port": peer_redis_port}, - "Server": {"Port": 9006}, + "Server": {"Port": peer_server_port}, "PeerDiscovery": { "DisableBootstrappingNodes": True, "ListOfMultiAddresses": [connection_string], }, - "Identity": {"KeyFile": "second.priv"}, + "Identity": {"KeyFile": peer2_key_path_for_iris}, }, ) @@ -376,7 +445,10 @@ def prepare_and_start_peer2( str(peer_redis_port), ] peer_process = subprocess.Popen( - peer_command, stdout=log_file, stderr=log_file + peer_command, + stdout=log_file, + stderr=log_file, + start_new_session=True, ) log_file_second_iris = output_dir_peer / "iris/iris_logs.txt" return ( @@ -384,23 +456,60 @@ def prepare_and_start_peer2( log_file_second_iris, peer2_slips_config, peer2_iris_config, + peer2_iris_config_path, + peer2_key_path, + peer2_key_path_for_iris, ) +def stop_process_group(process, process_name, timeout_seconds=15): + """ + Stop a spawned process group and wait for it to exit. + + Parameters: + process: subprocess.Popen instance to stop. + process_name: Human-readable name used in log messages. 
+ timeout_seconds: Maximum number of seconds to wait after SIGTERM. + + Returns: + None + """ + if process.poll() is not None: + return + + process_group_id = os.getpgid(process.pid) + os.killpg(process_group_id, signal.SIGTERM) + print(f"SIGTERM sent to {process_name} process group {process_group_id}.") + + try: + process.wait(timeout=timeout_seconds) + return + except subprocess.TimeoutExpired: + pass + + if process.poll() is not None: + return + + os.killpg(process_group_id, signal.SIGKILL) + process.wait() + print(f"SIGKILL sent to {process_name} process group {process_group_id}.") + + @pytest.mark.parametrize( - "zeek_dir_path, output_dir, peer_output_dir, redis_port, peer_redis_port", + "zeek_dir_path, output_dir, peer_output_dir", [ ( "dataset/test13-malicious-dhcpscan-zeek-dir", "iris_integration_test/", "peer_iris_integration_test/", - 6644, - 6655, ) ], ) def test_messaging( - zeek_dir_path, output_dir, peer_output_dir, redis_port, peer_redis_port + zeek_dir_path, + output_dir, + peer_output_dir, + integration_port_factory, ): """ Tests whether Iris properly distributes an alert message generated by @@ -415,6 +524,10 @@ def test_messaging( which extends the standard use case of connecting to such P2P network. """ default_interface = get_default_interface() + redis_port = integration_port_factory("peer1 redis") + peer_redis_port = integration_port_factory("peer2 redis") + server_port = integration_port_factory("peer1 iris") + peer_server_port = integration_port_factory("peer2 iris") # Two Slips instances are necessary to be run in this test. 
@@ -425,103 +538,106 @@ def test_messaging( # Prepare output dir for the peer2 output_dir_peer: PosixPath = create_output_dir(peer_output_dir) output_file_peer = os.path.join(output_dir_peer, "slips_output.txt") - - print("running slips ...") - with open(output_file, "w") as log_file: - with open(output_file_peer, "w") as iris_log_file: - process, log_file_first_iris, peer1_slips_config = ( - prepare_and_start_peer1( + peer2_key_path = None + success = False + try: + print("running slips ...") + with open(output_file, "w") as log_file: + with open(output_file_peer, "w") as iris_log_file: + ( + process, + log_file_first_iris, + peer1_slips_config, + peer1_iris_config_path, + ) = prepare_and_start_peer1( zeek_dir_path=zeek_dir_path, output_dir=output_dir, redis_port=redis_port, + server_port=server_port, default_interface=default_interface, log_file=log_file, ) - ) - assert_peer1_setup(peer1_slips_config) - - # First peer (its Iris) needs to be ready and available for - # connections when the second peer tries to reach out to it. 
- countdown(20, "second peer") - # get the connection string from the first peer and give it - # to the second one so it is reachable - assert wait_for_file( - log_file_first_iris, 30 - ), f"Expected Iris log file was not created: {log_file_first_iris}" - original_conn_string = extract_connection_string( - log_file_first_iris - ) - - ( - peer_process, - log_file_second_iris, - peer2_slips_config, - peer2_iris_config, - ) = prepare_and_start_peer2( - zeek_dir_path=zeek_dir_path, - output_dir_peer=output_dir_peer, - peer_redis_port=peer_redis_port, - default_interface=default_interface, - connection_string=original_conn_string, - log_file=iris_log_file, - ) - assert_peer2_setup( - peer2_slips_config=peer2_slips_config, - peer2_iris_config=peer2_iris_config, - connection_string=original_conn_string, - peer_redis_port=peer_redis_port, - ) - - print( - f"Output and errors of first peer are logged in" - f" {output_file}" - ) - - # let Slips properly and fully star with all of its parts and modules. - countdown(80, "Sending msg in fides2network") - # Sending a manual message to make sure there is an alert generated, because - # is is highly probable that both slips have covered their network captures - # before the infrastructure of P2P network was fully up and running - message_send( - redis_port, - message=message_alert_TL_NL, - channel="fides2network", - ) - - # these seconds are the time we give slips to process the msg - countdown(30, "Sending SIGTERM to the 2 peers") - # Kill em with kindness. - os.kill(process.pid, 15) - os.kill(peer_process.pid, 15) - print("SIGTERM sent.") - - print("Sending SIGKILL to the 2 instances of Slips + iris") - # Kill em. Without kindness. 
- os.kill(process.pid, 9) - print(f"Slips with PID {process.pid} was killed.") - - os.kill(peer_process.pid, 9) - print(f"Slips peer with PID {peer_process.pid} was killed.") - - print("Slips is done, checking for errors in the 2 output dirs.") - assert_peer1_results(output_dir) - assert_peer2_results(output_dir_peer, log_file_second_iris) - - print("Deleting the output directories") - shutil.rmtree(output_dir) - shutil.rmtree(output_dir_peer) - os.remove("modules/iris/second.priv") - - # reset the generated peer2 Iris config back to its default values - # after the test finishes. - modify_yaml_config( - input_path="config/iris_config.yaml", - output_dir=PEER2_CONFIG_DIR, - output_filename=peer2_iris_config.name, - changes={ - "Redis": {"Port": 6644}, - "Server": {"Port": 9010}, - "PeerDiscovery": {}, - "Identity": {"KeyFile": "private.key"}, - }, - ) + assert_peer1_setup(peer1_slips_config, peer1_iris_config_path) + + # First peer (its Iris) needs to be ready and available for + # connections when the second peer tries to reach out to it. 
+ countdown(20, "second peer") + # get the connection string from the first peer and give it + # to the second one so it is reachable + assert wait_for_file(log_file_first_iris, 30), ( + "Expected Iris log file was not created: " + f"{log_file_first_iris}" + ) + original_conn_string = extract_connection_string( + log_file_first_iris + ) + + ( + peer_process, + log_file_second_iris, + peer2_slips_config, + peer2_iris_config, + peer2_iris_config_path, + peer2_key_path, + peer2_key_path_for_iris, + ) = prepare_and_start_peer2( + zeek_dir_path=zeek_dir_path, + output_dir_peer=output_dir_peer, + peer_redis_port=peer_redis_port, + peer_server_port=peer_server_port, + default_interface=default_interface, + connection_string=original_conn_string, + log_file=iris_log_file, + ) + assert_peer2_setup( + peer2_slips_config=peer2_slips_config, + peer2_iris_config=peer2_iris_config, + peer2_iris_config_path=peer2_iris_config_path, + connection_string=original_conn_string, + peer_redis_port=peer_redis_port, + peer_server_port=peer_server_port, + peer2_key_path=peer2_key_path_for_iris, + ) + + print( + f"Output and errors of first peer are logged in" + f" {output_file}" + ) + + # let Slips properly and fully star with all of its parts and modules. 
+ countdown(80, "Sending msg in fides2network") + # Sending a manual message to make sure there is an alert generated, because + # is is highly probable that both slips have covered their network captures + # before the infrastructure of P2P network was fully up and running + message_send( + redis_port, + message=message_alert_TL_NL, + channel="fides2network", + ) + + # these seconds are the time we give slips to process the msg + countdown(30, "Sending SIGTERM to the 2 peers") + print("Stopping the 2 instances of Slips and waiting for exit") + stop_process_group(process, "peer1 slips") + stop_process_group(peer_process, "peer2 slips") + + print("Slips is done, checking for errors in the 2 output dirs.") + assert_peer1_results(output_dir) + assert_peer2_results(output_dir_peer, log_file_second_iris) + success = True + finally: + close_test_redis_server(redis_port) + close_test_redis_server(peer_redis_port) + if peer2_key_path is not None and peer2_key_path.exists(): + peer2_key_path.unlink() + shutil.rmtree( + TEST_DIR / "runtime_configs" / output_dir.name, ignore_errors=True + ) + shutil.rmtree( + TEST_DIR / "runtime_configs" / output_dir_peer.name, + ignore_errors=True, + ) + if success: + print("Deleting the output directories") + shutil.rmtree(output_dir) + shutil.rmtree(output_dir_peer) diff --git a/tests/integration/test_pcap_dataset/test_pcap_dataset.py b/tests/integration/test_pcap_dataset/test_pcap_dataset.py index 60f380989d..edc19192c8 100644 --- a/tests/integration/test_pcap_dataset/test_pcap_dataset.py +++ b/tests/integration/test_pcap_dataset/test_pcap_dataset.py @@ -5,6 +5,7 @@ is_evidence_present, create_output_dir, assert_no_errors, + close_test_redis_server, ) from tests.module_factory import ModuleFactory import pytest @@ -15,36 +16,46 @@ @pytest.mark.parametrize( - "pcap_path, expected_profiles, output_dir, expected_evidence, redis_port", + "pcap_path, expected_profiles, output_dir, expected_evidence", [ ( "dataset/test8-malicious.pcap", 3, 
"test8/", "performing an arp scan", - 6665, ), ], ) def test_pcap( - pcap_path, expected_profiles, output_dir, expected_evidence, redis_port + pcap_path, + expected_profiles, + output_dir, + expected_evidence, + integration_port_factory, ): + redis_port = integration_port_factory("redis") output_dir = create_output_dir(output_dir) - output_file = os.path.join(output_dir, "slips_output.txt") - command = ( - f"./slips.py -e 1 -t -f {pcap_path} -o {output_dir} " - f" -P {redis_port} > {output_file} 2>&1" - ) - # this function returns when slips is done - run_slips(command) - assert_no_errors(output_dir) + success = False + try: + output_file = os.path.join(output_dir, "slips_output.txt") + command = ( + f"./slips.py -e 1 -t -f {pcap_path} -o {output_dir} " + f" -P {redis_port} > {output_file} 2>&1" + ) + # this function returns when slips is done + run_slips(command) + assert_no_errors(output_dir) - db = ModuleFactory().create_db_manager_obj( - redis_port, output_dir=output_dir - ) - profiles = db.get_profiles_len() - assert profiles > expected_profiles + db = ModuleFactory().create_db_manager_obj( + redis_port, output_dir=output_dir + ) + profiles = db.get_profiles_len() + assert profiles > expected_profiles - log_file = output_dir / "alerts" / alerts_file - assert is_evidence_present(log_file, expected_evidence) is True - shutil.rmtree(output_dir) + log_file = output_dir / "alerts" / alerts_file + assert is_evidence_present(log_file, expected_evidence) is True + success = True + finally: + if success: + close_test_redis_server(redis_port) + shutil.rmtree(output_dir) diff --git a/tests/integration/test_portscans/test_portscans.py b/tests/integration/test_portscans/test_portscans.py index 7359d3ba55..9ec8e95b94 100644 --- a/tests/integration/test_portscans/test_portscans.py +++ b/tests/integration/test_portscans/test_portscans.py @@ -10,6 +10,7 @@ create_output_dir, assert_no_errors, get_slips_test_command, + close_test_redis_server, 
skip_if_missing_runtime_dependencies, ) from tests.module_factory import ModuleFactory @@ -18,83 +19,92 @@ @pytest.mark.parametrize( - "path, output_dir, redis_port", + "path, output_dir", [ ( "dataset/port-scans/horizontal/conn.log", "testing_horizontal_ps/", - 7894, ) ], ) -def test_horizontal(path, output_dir, redis_port): +def test_horizontal(path, output_dir, integration_port_factory): """ checks that slips is detecting horizontal ps no issue, """ skip_if_missing_runtime_dependencies( python_modules=("termcolor",), binaries=("redis-server",) ) + redis_port = integration_port_factory("redis") output_dir = create_output_dir(output_dir) + success = False + try: + expected_evidence = ( + "Horizontal port scan to port http 80/tcp. From 10.0.2.112" + ) - expected_evidence = ( - "Horizontal port scan to port http 80/tcp. From 10.0.2.112" - ) - - output_file = os.path.join(output_dir, "slips_output.txt") - command = get_slips_test_command( - f"-e 1 -t -f {path} -o {output_dir} -P {redis_port}" - ) - command = f"{command} > {output_file} 2>&1" - # this function returns when slips is done - run_slips(command) - - assert_no_errors(output_dir) - database = ModuleFactory().create_db_manager_obj( - redis_port, output_dir=output_dir, start_redis_server=False - ) - profiles = database.get_profiles_len() - assert profiles > 0 + output_file = os.path.join(output_dir, "slips_output.txt") + command = get_slips_test_command( + f"-e 1 -t -f {path} -o {output_dir} -P {redis_port}" + ) + command = f"{command} > {output_file} 2>&1" + # this function returns when slips is done + run_slips(command) - log_file = output_dir / "alerts" / alerts_file - assert is_evidence_present(log_file, expected_evidence) + assert_no_errors(output_dir) + database = ModuleFactory().create_db_manager_obj( + redis_port, output_dir=output_dir, start_redis_server=False + ) + profiles = database.get_profiles_len() + assert profiles > 0 - shutil.rmtree(output_dir) + log_file = output_dir / "alerts" / alerts_file 
+ assert is_evidence_present(log_file, expected_evidence) + success = True + finally: + if success: + close_test_redis_server(redis_port) + shutil.rmtree(output_dir) @pytest.mark.parametrize( - "path, output_dir, redis_port", - [("dataset/port-scans/vertical/conn.log", "testing_vertical_ps/", 7895)], + "path, output_dir", + [("dataset/port-scans/vertical/conn.log", "testing_vertical_ps/")], ) -def test_vertical(path, output_dir, redis_port): +def test_vertical(path, output_dir, integration_port_factory): """ checks that slips is detecting horizontal ps no issue, """ skip_if_missing_runtime_dependencies( python_modules=("termcolor",), binaries=("redis-server",) ) + redis_port = integration_port_factory("redis") output_dir = create_output_dir(output_dir) + success = False + try: + expected_evidence = ( + "vertical port scan to IP 45.33.32.156 from 192.168.1.9." + ) - expected_evidence = ( - "vertical port scan to IP 45.33.32.156 from 192.168.1.9." - ) - - output_file = os.path.join(output_dir, "slips_output.txt") - command = get_slips_test_command( - f"-e 1 -t -f {path} -o {output_dir} -P {redis_port}" - ) - command = f"{command} > {output_file} 2>&1" - # this function returns when slips is done - run_slips(command) - - assert_no_errors(output_dir) - - database = ModuleFactory().create_db_manager_obj( - redis_port, output_dir=output_dir, start_redis_server=False - ) - profiles = database.get_profiles_len() - assert profiles > 0 + output_file = os.path.join(output_dir, "slips_output.txt") + command = get_slips_test_command( + f"-e 1 -t -f {path} -o {output_dir} -P {redis_port}" + ) + command = f"{command} > {output_file} 2>&1" + # this function returns when slips is done + run_slips(command) - log_file = output_dir / "alerts" / alerts_file - assert is_evidence_present(log_file, expected_evidence) + assert_no_errors(output_dir) - shutil.rmtree(output_dir) + database = ModuleFactory().create_db_manager_obj( + redis_port, output_dir=output_dir, start_redis_server=False + 
) + profiles = database.get_profiles_len() + assert profiles > 0 + + log_file = output_dir / "alerts" / alerts_file + assert is_evidence_present(log_file, expected_evidence) + success = True + finally: + if success: + close_test_redis_server(redis_port) + shutil.rmtree(output_dir) diff --git a/tests/integration/test_zeek_dataset/test_zeek_dataset.py b/tests/integration/test_zeek_dataset/test_zeek_dataset.py index ea33a457ef..f207c3ca04 100644 --- a/tests/integration/test_zeek_dataset/test_zeek_dataset.py +++ b/tests/integration/test_zeek_dataset/test_zeek_dataset.py @@ -5,6 +5,7 @@ is_evidence_present, create_output_dir, assert_no_errors, + close_test_redis_server, ) from tests.module_factory import ModuleFactory import pytest @@ -118,7 +119,7 @@ @pytest.mark.parametrize( - "conn_log_path, expected_profiles, expected_evidence, output_dir, redis_port", + "conn_log_path, expected_profiles, expected_evidence, output_dir", [ ( "dataset/test9-mixed-zeek-dir/conn.log", @@ -127,14 +128,12 @@ "destination IP: 194.132.197.198", # the flow with uid # CAwUdr34dVnyOwbUuj should trigger this "test9-conn_log_only/", - 6659, ), ( "dataset/test10-mixed-zeek-dir/conn.log", 5, "non-SSL established connection", "test10-conn_log_only/", - 6658, ), ], ) @@ -143,25 +142,31 @@ def test_zeek_conn_log( expected_profiles, expected_evidence, output_dir, - redis_port, + integration_port_factory, ): + redis_port = integration_port_factory("redis") output_dir = create_output_dir(output_dir) + success = False + try: + output_file = os.path.join(output_dir, "slips_output.txt") + command = ( + f"./slips.py -e 1 -t -f {conn_log_path} -o {output_dir} " + f"-P {redis_port} > {output_file} 2>&1" + ) + # this function returns when slips is done + run_slips(command) + assert_no_errors(output_dir) - output_file = os.path.join(output_dir, "slips_output.txt") - command = ( - f"./slips.py -e 1 -t -f {conn_log_path} -o {output_dir} " - f"-P {redis_port} > {output_file} 2>&1" - ) - # this function returns 
when slips is done - run_slips(command) - assert_no_errors(output_dir) + database = ModuleFactory().create_db_manager_obj( + redis_port, output_dir=output_dir + ) + profiles = database.get_profiles_len() + assert profiles > expected_profiles - database = ModuleFactory().create_db_manager_obj( - redis_port, output_dir=output_dir - ) - profiles = database.get_profiles_len() - assert profiles > expected_profiles - - log_file = output_dir / "alerts" / alerts_file - assert is_evidence_present(log_file, expected_evidence) is True - shutil.rmtree(output_dir) + log_file = output_dir / "alerts" / alerts_file + assert is_evidence_present(log_file, expected_evidence) is True + success = True + finally: + if success: + close_test_redis_server(redis_port) + shutil.rmtree(output_dir) diff --git a/tests/module_factory.py b/tests/module_factory.py index 9f76c5a859..64d12f16a9 100644 --- a/tests/module_factory.py +++ b/tests/module_factory.py @@ -151,6 +151,9 @@ def create_fides_obj(self, mock_db): from modules.fides.fides import FidesModule db_path = os.path.join("permanent", "databases", "fides_p2p_db.sqlite") + config_path = os.path.join( + "modules", "fides", "config", "fides.conf.yml" + ) def get_permanent_database_path(_filename): os.makedirs(os.path.dirname(db_path), exist_ok=True) @@ -159,6 +162,8 @@ def get_permanent_database_path(_filename): mock_db.return_value.get_permanent_database_path.side_effect = ( get_permanent_database_path ) + conf = Mock() + conf.read_configuration = Mock(return_value=config_path) fm = FidesModule( logger=self.logger, @@ -166,7 +171,7 @@ def get_permanent_database_path(_filename): redis_port=6379, termination_event=Mock(), slips_args=Mock(), - conf=Mock(), + conf=conf, ppid=Mock(), bloom_filters_manager=Mock(), ) @@ -974,11 +979,36 @@ def fake_read_configuration(worker): def create_evidence_loggr_obj(self): from slips_files.core.evidence_logger import EvidenceLogger - handler = EvidenceLogger( - logger_stop_signal=Mock(), - 
evidence_logger_q=Mock(), - output_dir="/tmp", - ) + conf = Mock() + conf.get_GID = Mock(return_value=0) + conf.get_UID = Mock(return_value=0) + conf.generate_performance_plots = Mock(return_value=False) + + logfile = Mock() + logfile.name = "alerts.log" + jsonfile = Mock() + jsonfile.name = "alerts.json" + + with ( + patch( + "slips_files.core.evidence_logger.ConfigParser", + return_value=conf, + ), + patch( + "slips_files.core.evidence_logger." + "utils.change_logfiles_ownership" + ), + patch.object( + EvidenceLogger, + "clean_file", + side_effect=[logfile, jsonfile], + ), + ): + handler = EvidenceLogger( + logger_stop_signal=Mock(), + evidence_logger_q=Mock(), + output_dir="/tmp", + ) return handler @patch(MODULE_DB_MANAGER, name="mock_db") diff --git a/tests/run_all_tests.sh b/tests/run_all_tests.sh index 1d64bca694..22a90bace3 100755 --- a/tests/run_all_tests.sh +++ b/tests/run_all_tests.sh @@ -1,31 +1,20 @@ #!/bin/bash # clear the cache database ./slips.py -cc -# close all open redis servers -printf "0" | ./slips.py -k # run all unit tests, -n *5 means distribute tests on 5 different process # -s to see print statements as they are executed python3 -m pytest tests/unit/ --ignore="tests/integration" -n 7 -p no:warnings -vvvv -s -# Close all redis-servers opened by the unit tests -python3 tests/unit/destrctor.py - # clear cache before running the integration tests ./slips.py -cc -# close all open redis servers -printf "0" | ./slips.py -k - -# the command to run dataset tests is separated from the rest because it takes so much time, -# so it's better to know and fix the failing unit tests from the above -# command before running the dataset tests -# distribute on 3 workers only because every worker will be spawning 10+ processes +# auto-discover integration test +mapfile -t integration_tests < <(find tests/integration -type f -name 'test_*.py' | sort) -python3 -m pytest -s tests/integration/test_portscans/test_portscans.py -p no:warnings -vv -python3 -m pytest 
-s tests/integration/test_dataset/test_dataset.py -p no:warnings -vv -python3 -m pytest -s tests/integration/test_config_files/test_config_files.py -p no:warnings -vv +for test_file in "${integration_tests[@]}"; do + python3 -m pytest -s "$test_file" -n 3 -p no:warnings -vv +done -printf "0" | ./slips.py -k ./slips.py -cc diff --git a/tests/unit/destrctor.py b/tests/unit/destrctor.py deleted file mode 100644 index 58b5c956be..0000000000 --- a/tests/unit/destrctor.py +++ /dev/null @@ -1,114 +0,0 @@ -# SPDX-FileCopyrightText: 2021 Sebastian Garcia -# SPDX-License-Identifier: GPL-2.0-only -""" -Close all redis-servers opened by the unit tests -""" - -import os -import redis - - -def get_pid_of_redis_server(port: int) -> str: - """ - Gets the pid of the redis server running on this port - Returns str(port) or false if there's no redis-server running on this port - """ - cmd = "ps aux | grep redis-server" - cmd_output = os.popen(cmd).read() - for line in cmd_output.splitlines(): - if str(port) in line: - pid = line.split()[1] - return pid - return False - - -def flush_redis_server(port: str = ""): - """ - Flush the redis server on this pid, only 1 param should be given, pid or port - :param pid: can be False if port is given - Gets the pid of the port is not given - """ - - # clear the server opened on this port - try: - # if connected := __database__.connect_to_redis_server(port): - # noinspection PyTypeChecker - # todo move this to the db - r = redis.StrictRedis( - host="localhost", - port=port, - db=0, - charset="utf-8", - socket_keepalive=True, - decode_responses=True, - retry_on_timeout=True, - health_check_interval=20, - ) - r.flushall() - r.flushdb() - r.script_flush() - return True - except (redis.exceptions.ConnectionError, RuntimeError): - # server already killed! 
- return False - - -def kill_redis_server(pid): - """ - Kill the redis server on this pid - """ - try: - pid = int(pid) - except ValueError: - # The server was killed before logging its PID - # the pid of it is 'not found' - return False - - # signal 0 is to check if the process is still running or not - # it returns 1 if the process used_redis_servers.txt exited - try: - # check if the process is still running - while os.kill(pid, 0) != 1: - # sigterm is 9 - os.kill(pid, 9) - except ProcessLookupError: - # ProcessLookupError: process already exited, sometimes this exception is raised - # but the process is still running, keep trying to kill it - return True - except PermissionError: - # PermissionError happens when the user tries to close redis-servers - # opened by root while he's not root, - # or when he tries to close redis-servers - # opened without root while he's root - return False - return True - - -if __name__ == "__main__": - redis_server_ports = [65531, 6380, 6381, 1234] - closed_servers = 0 - for redis_port in redis_server_ports: - # On modern systems, the netstat utility comes pre-installed, - # this can be done using psutil but it needs root on macos - redis_pid = get_pid_of_redis_server(redis_port) - if not redis_pid: - # server isn't started yet - continue - - # print(f'Redis port: {redis_port} is found using PID {redis_pid} ') - try: - flush_redis_server(str(redis_port)) - print(f"Flushed redis-server opened on port: {redis_port}") - kill_redis_server(redis_pid) - print(f"Killed redis-server on port {redis_port} PID: {redis_pid}") - closed_servers += 1 - except redis.exceptions.ConnectionError: - continue - - print(f"Closed {closed_servers} unused redis-servers") - - zeek_tmp_dir = os.path.join(os.getcwd(), "zeek_dir_for_testing") - try: - os.rmdir(zeek_tmp_dir) - except (FileNotFoundError, OSError): - pass diff --git a/tests/unit/modules/arp_poisoner/test_arp_poisoner.py b/tests/unit/modules/arp_poisoner/test_arp_poisoner.py index 
ae32255e6e..5490bab8ea 100644 --- a/tests/unit/modules/arp_poisoner/test_arp_poisoner.py +++ b/tests/unit/modules/arp_poisoner/test_arp_poisoner.py @@ -23,6 +23,58 @@ def test__is_time_to_repoison(poisoner): assert not poisoner._is_time_to_repoison(target) +@pytest.mark.parametrize( + "arp_scan_available, expected", + [(False, True), (True, False)], +) +def test_pre_main_stops_when_arp_scan_is_missing( + poisoner, arp_scan_available, expected +): + poisoner.arp_scan_bin_available = arp_scan_available + poisoner.print = MagicMock() + + assert poisoner.pre_main() is expected + + if expected: + poisoner.print.assert_called_once_with( + "The arp-scan tool is not installed. ARP poisoner module is " + "stopping.", + ) + else: + poisoner.print.assert_not_called() + + +@pytest.mark.parametrize( + "arp_scan_available, expected_channels", + [ + (False, {}), + ( + True, + { + "new_blocking": "new_blocking_subscription", + "tw_closed": "tw_closed_subscription", + }, + ), + ], +) +def test_subscribe_to_channels_depends_on_arp_scan( + poisoner, arp_scan_available, expected_channels +): + poisoner.arp_scan_bin_available = arp_scan_available + poisoner.db.subscribe = MagicMock( + side_effect=["new_blocking_subscription", "tw_closed_subscription"] + ) + + poisoner.subscribe_to_channels() + + assert poisoner.channels == expected_channels + + if arp_scan_available: + assert poisoner.db.subscribe.call_count == 2 + else: + poisoner.db.subscribe.assert_not_called() + + def test_is_broadcast_true(poisoner): assert poisoner.is_broadcast("192.168.1.255", "192.168.1.0/24") diff --git a/tests/unit/modules/fides/test_fides_bridge.py b/tests/unit/modules/fides/test_fides_bridge.py index 600ae348f6..3184d08ade 100644 --- a/tests/unit/modules/fides/test_fides_bridge.py +++ b/tests/unit/modules/fides/test_fides_bridge.py @@ -6,6 +6,7 @@ from modules.fides.messaging.network_bridge import NetworkMessage from modules.fides.model.aliases import PeerId, Target from 
modules.fides.model.threat_intelligence import ThreatIntelligence +from slips_files.common.slips_utils import utils @pytest.fixture @@ -25,7 +26,7 @@ def mock_handler(): def test_initialization(network_bridge, mock_queue): assert network_bridge._NetworkBridge__queue == mock_queue - assert network_bridge.version == 1 + assert network_bridge.version == utils.get_current_version() def test_listen_success(network_bridge, mock_handler, mock_queue): @@ -36,7 +37,11 @@ def test_listen_success(network_bridge, mock_handler, mock_queue): mock_queue.listen.assert_called_once() # Simulate a valid message being received - message = '{"type": "test", "version": 1, "data": {}}' + message = ( + '{"type": "test", ' + f'"version": "{utils.get_current_version()}", ' + '"data": {}}' + ) callback = mock_queue.listen.call_args[0][0] callback(message) @@ -83,5 +88,9 @@ def test_send_exception_handling(network_bridge, mock_queue): mock_queue.send = MagicMock(side_effect=Exception("send failed")) with pytest.raises(Exception, match="send failed"): network_bridge._NetworkBridge__send( - NetworkMessage(type="test", version=1, data={}) + NetworkMessage( + type="test", + version=utils.get_current_version(), + data={}, + ) ) diff --git a/tests/unit/modules/flow_alerts/test_conn.py b/tests/unit/modules/flow_alerts/test_conn.py index c2281603b8..15bdf76e74 100644 --- a/tests/unit/modules/flow_alerts/test_conn.py +++ b/tests/unit/modules/flow_alerts/test_conn.py @@ -1235,3 +1235,51 @@ def test_check_connection_to_local_ip( ) conn.check_connection_to_local_ip(twid, flow) assert conn.set_evidence.conn_to_private_ip.call_count == expected_calls + + +@pytest.mark.parametrize( + "msg, expected_result", + [ + ( + { + "channel": "tw_closed", + "data": json.dumps( + { + "text": "profile_192.168.1.1_timewindow3", + "version": "test-version", + } + ), + }, + ("profile_192.168.1.1", "timewindow3"), + ), + ( + { + "channel": "tw_closed", + "data": json.dumps( + { + "text": json.dumps( + { + "text": 
"profile_192.168.1.1_timewindow3", + "version": "test-version", + } + ), + "version": "test-version", + } + ), + }, + ("profile_192.168.1.1", "timewindow3"), + ), + ( + { + "channel": "tw_closed", + "data": json.dumps( + {"text": "invalid_payload", "version": "test-version"} + ), + }, + None, + ), + ], +) +def test_parse_closed_tw_message(msg, expected_result): + conn = ModuleFactory().create_conn_analyzer_obj() + assert conn._parse_closed_tw_message(msg) == expected_result diff --git a/tests/unit/modules/flow_alerts/test_set_evidence.py b/tests/unit/modules/flow_alerts/test_set_evidence.py index 929e14ba82..c2ce2836f4 100644 --- a/tests/unit/modules/flow_alerts/test_set_evidence.py +++ b/tests/unit/modules/flow_alerts/test_set_evidence.py @@ -669,6 +669,10 @@ def test_conn_without_dns(time_difference_hours, expected_confidence): args, _ = set_ev.db.set_evidence.call_args evidence = args[0] assert evidence.confidence == expected_confidence + assert ( + evidence.description + == "A connection without DNS resolution to Destination IP: 10.0.0.1" + ) @pytest.mark.parametrize( diff --git a/tests/unit/slips_files/common/test_slips_utils.py b/tests/unit/slips_files/common/test_slips_utils.py index 9c1be8097f..4ba88c40e9 100644 --- a/tests/unit/slips_files/common/test_slips_utils.py +++ b/tests/unit/slips_files/common/test_slips_utils.py @@ -129,6 +129,27 @@ def test_sanitize(input_string, expected_output): assert utils.sanitize(input_string) == expected_output +def test_get_ip_identification_as_str_skips_timestamp(): + utils = ModuleFactory().create_utils_obj() + ip_identification = { + "DNS_resolution": ["example.com"], + "SNI": "service.example.com", + "13.0.0.0/8": { + "AS": "Example ASN", + "timestamp": 1775911069.6800127, + }, + "timestamp": 1775911069.6800127, + } + + result = utils.get_ip_identification_as_str(ip_identification) + + assert result == ( + "example.com, SNI: service.example.com, 13.0.0.0/8: " + "AS: Example ASN, , " + ) + assert "timestamp" not in 
result + + @pytest.mark.parametrize( "ip, expected_val", [ @@ -212,6 +233,22 @@ def test_calculate_confidence(input_value, expected_output): assert utils.calculate_confidence(input_value) == expected_output +@pytest.mark.parametrize( + "score, expected_output", + [ + (0.80, "High"), + (0.95, "High"), + (0.55, "Medium"), + (0.79, "Medium"), + (0.54, "low"), + (0.0, "low"), + ], +) +def test_evidence_confidence_to_string(score, expected_output): + utils = ModuleFactory().create_utils_obj() + assert utils.evidence_confidence_to_string(score) == expected_output + + @pytest.mark.parametrize( "input_value, input_format, expected_output", [ diff --git a/tests/unit/slips_files/core/database/test_database.py b/tests/unit/slips_files/core/database/test_database.py index c7343aa964..9c195c2a0a 100644 --- a/tests/unit/slips_files/core/database/test_database.py +++ b/tests/unit/slips_files/core/database/test_database.py @@ -150,11 +150,11 @@ def test_get_the_other_ip_version(): profileid_ipv4 = "profile_192.168.250.250" ipv6 = "2001:0db8:85a3:0000:0000:8a2e:0370:7334" - db.rdb.get_ipv6_from_profile = Mock(return_value=json.dumps(ipv6)) + db.rdb.get_the_other_ip_version = Mock(return_value=ipv6) - other_ip = json.loads(db.get_the_other_ip_version(profileid_ipv4)) + other_ip = db.get_the_other_ip_version(profileid_ipv4) - db.rdb.get_ipv6_from_profile.assert_called_once_with(profileid_ipv4) + db.rdb.get_the_other_ip_version.assert_called_once_with(profileid_ipv4) assert other_ip == ipv6 diff --git a/tests/unit/slips_files/core/input/test_input.py b/tests/unit/slips_files/core/input/test_input.py index fbaf9f1d30..42e825aee9 100644 --- a/tests/unit/slips_files/core/input/test_input.py +++ b/tests/unit/slips_files/core/input/test_input.py @@ -219,7 +219,7 @@ def test_reached_timeout( @pytest.mark.skipif( "nfdump" not in shutil.which("nfdump"), reason="nfdump is not installed" ) -@pytest.mark.parametrize("path", ["dataset/test1-normal.nfdump"]) +@pytest.mark.parametrize("path", 
["dataset/test1-malicious.nfdump"]) def test_handle_nfdump(path): input = ModuleFactory().create_input_obj(path, InputType.NFDUMP) handler = input.input_handlers[InputType.NFDUMP] diff --git a/tests/unit/slips_files/core/test_evidence_handler_worker.py b/tests/unit/slips_files/core/test_evidence_handler_worker.py index 4ad9a1fe4d..2669e96bfc 100644 --- a/tests/unit/slips_files/core/test_evidence_handler_worker.py +++ b/tests/unit/slips_files/core/test_evidence_handler_worker.py @@ -202,6 +202,45 @@ def test_add_alert_to_json_log_file( ) +@pytest.mark.parametrize( + "confidence, expected_output", + [ + (0.80, "High"), + (0.55, "Medium"), + (0.54, "low"), + ], +) +def test_add_evidence_to_json_log_file_maps_confidence_to_string( + confidence, expected_output +): + worker = ModuleFactory().create_evidence_handler_worker_obj() + worker.idmefv2.convert_to_idmef_event = Mock(return_value={"ID": "e1"}) + worker.evidence_logger_q.put = Mock() + worker.add_latency_to_csv = Mock() + evidence = Evidence( + evidence_type=EvidenceType.ARP_SCAN, + description="ARP scan detected", + attacker=Attacker( + direction=Direction.SRC, + ioc_type=IoCType.IP, + value="192.168.1.20", + ), + threat_level=ThreatLevel.INFO, + confidence=confidence, + profile=ProfileID("192.168.1.20"), + timewindow=TimeWindow(1), + uid=["uid1"], + timestamp="2024/10/04 15:45:30.123456+0000", + ) + + worker.add_evidence_to_json_log_file(evidence) + + logged_evidence = worker.evidence_logger_q.put.call_args[0][0]["to_log"] + note = logged_evidence["Note"] + assert '"confidence":' in note + assert f'"confidence": "{expected_output}"' in note + + def test_show_popup(): worker = ModuleFactory().create_evidence_handler_worker_obj() worker.notify = Mock() diff --git a/tests/unit/slips_files/core/text_formatters/test_evidence_formatter.py b/tests/unit/slips_files/core/text_formatters/test_evidence_formatter.py index 3ec58af499..0a444294a9 100644 --- 
a/tests/unit/slips_files/core/text_formatters/test_evidence_formatter.py +++ b/tests/unit/slips_files/core/text_formatters/test_evidence_formatter.py @@ -41,7 +41,7 @@ "converted_time IP 192.168.1.1 detected as malicious in timewindow 1" " (start 2023/07/01 12:00:00, stop 2023/07/01 12:05:00) " "given the following evidence:\n" - "\t- Detected Port scan detected threat level: medium. Interface: default.\n", + "\t- Detected Port scan detected Threat level: medium. Interface: default.\n", ), # testcase2: Multiple evidence ( @@ -76,8 +76,8 @@ "converted_time IP 192.168.1.1 detected as malicious in timewindow 1" " (start 2023/07/01 12:00:00, stop 2023/07/01 12:05:00) " "given the following evidence:\n" - "\t- Detected Port scan detected threat level: medium. Interface: default.\n" - "\t- Detected Malicious JA3 fingerprint threat level: high. Interface: default.\n", + "\t- Detected Port scan detected Threat level: medium. Interface: default.\n" + "\t- Detected Malicious JA3 fingerprint Threat level: high. 
Interface: default.\n", ), ], ) @@ -147,15 +147,15 @@ def test_line_wrap(input_text, expected_output): "threat_level, expected_description", [ # Testcase 1: INFO threat level - (ThreatLevel.INFO, "Original description threat level: info."), + (ThreatLevel.INFO, "Original description Threat level: info."), # Testcase 2: LOW threat level - (ThreatLevel.LOW, "Original description threat level: low."), + (ThreatLevel.LOW, "Original description Threat level: low."), # Testcase 3: MEDIUM threat level - (ThreatLevel.MEDIUM, "Original description threat level: medium."), + (ThreatLevel.MEDIUM, "Original description Threat level: medium."), # Testcase 4: HIGH threat level - (ThreatLevel.HIGH, "Original description threat level: high."), + (ThreatLevel.HIGH, "Original description Threat level: high."), # Testcase 5: CRITICAL threat level - (ThreatLevel.CRITICAL, "Original description threat level: critical."), + (ThreatLevel.CRITICAL, "Original description Threat level: critical."), ], ) def test_add_threat_level_to_evidence_description(