diff --git a/.secrets.baseline b/.secrets.baseline index fc1ac4872e..aa5615109c 100644 --- a/.secrets.baseline +++ b/.secrets.baseline @@ -149,14 +149,14 @@ "filename": "config/slips.yaml", "hashed_secret": "4cac50cee3ad8e462728e711eac3e670753d5016", "is_verified": false, - "line_number": 223 + "line_number": 226 }, { "type": "Secret Keyword", "filename": "config/slips.yaml", "hashed_secret": "d033e22ae348aeb5660fc2140aec35850c4da997", "is_verified": false, - "line_number": 393 + "line_number": 396 } ], "dataset/test14-malicious-zeek-dir/http.log": [ @@ -7192,5 +7192,5 @@ } ] }, - "generated_at": "2025-05-08T14:51:28Z" + "generated_at": "2025-05-10T13:18:46Z" } diff --git a/config/slips.yaml b/config/slips.yaml index 02adc7f1b4..635df7f918 100644 --- a/config/slips.yaml +++ b/config/slips.yaml @@ -105,13 +105,12 @@ parameters: deletePrevdb: true # Set the label for all the flows that are being read. - # For now only normal and malware directly. No option for setting labels - # with a filter + # For now only Benign and Malicious (capitalized) are supported. # The purpose is to be used in the training of ML models and to output # flows with labels for other tools. - # label: malicious - # label: unknown - label: normal + # label: Malicious + # label: Benign + label: Benign # If Zeek files are rotated or not to avoid running out of disk. # Zeek rotation is enabled by default when using an interface, # which means Slips will delete all Zeek log files after 1 day @@ -213,7 +212,10 @@ flowmldetection: # training the models, to test in unknown data. # You should have trained at least once with 'Normal' data and once with # 'Malicious' data in order for the test to work. - mode: test + mode: train + # Creates an extra log file (training.log or testing.log) in the + # output dir with performance metrics, depending on the mode. + create_performance_metrics_log_files: True ############################# virustotal:
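For reference, the new knob can be read back like any other slips.yaml value; a minimal standalone sketch, assuming config/slips.yaml parses as plain YAML (the section and key names come from this diff, the False fallback mirrors the default used in config_parser.py further down; everything else is assumed):

```python
# Minimal sketch: read the new flag straight from config/slips.yaml.
# Section and key names come from this diff; the rest is illustrative.
import yaml

with open("config/slips.yaml") as f:
    conf = yaml.safe_load(f)

enable_logs = conf.get("flowmldetection", {}).get(
    "create_performance_metrics_log_files", False
)
print(f"performance metric log files enabled: {enable_logs}")
```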
diff --git a/modules/flowmldetection/flowmlanalysis.ipynb b/modules/flowmldetection/flowmlanalysis.ipynb new file mode 100644 index 0000000000..d726cd2805 --- /dev/null +++ b/modules/flowmldetection/flowmlanalysis.ipynb @@ -0,0 +1,76 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Analysis of Flows with Machine Learning for Slips" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Analysis of a fixed list of flows to try techniques and find parameters" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "import numpy\n", + "from sklearn.linear_model import SGDClassifier\n", + "from sklearn.preprocessing import StandardScaler\n", + "import pickle\n", + "import pandas as pd\n", + "import json\n", + "import traceback\n", + "import warnings" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "slips-new", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.12" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/modules/flowmldetection/flowmldetection.py b/modules/flowmldetection/flowmldetection.py index e44ac83f4d..4ef661146e 100644 --- a/modules/flowmldetection/flowmldetection.py +++ b/modules/flowmldetection/flowmldetection.py @@ -3,6 +3,7 @@ # SPDX-License-Identifier: GPL-2.0-only import numpy +import os from sklearn.linear_model import SGDClassifier from sklearn.preprocessing import StandardScaler import pickle @@ -10,10 +11,18 @@ import json import traceback import warnings - +from sklearn.metrics import confusion_matrix +from sklearn.metrics import ( + f1_score, + precision_score, + accuracy_score, + matthews_corrcoef, + recall_score, +) from slips_files.common.parsers.config_parser import ConfigParser from slips_files.common.slips_utils import utils from slips_files.common.abstracts.module import IModule +from slips_files.core.structures.labels import Label from slips_files.core.structures.evidence import ( Evidence, ProfileID, @@ -27,9 +36,6 @@ Method, ) -# Only for debbuging -# from matplotlib import pyplot as plt - # This horrible hack is only to stop sklearn from printing those warnings def warn(*args, **kwargs): @@ -38,6 +44,10 @@ def warn(*args, **kwargs): warnings.warn = warn +BACKGROUND = Label.BACKGROUND.value +BENIGN = Label.BENIGN.value +MALICIOUS = Label.MALICIOUS.value + class FlowMLDetection(IModule): # Name: short name of the module. Do not use spaces
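The three module-level constants above are the string values taken from the new Label enum (added in slips_files/core/structures/labels.py near the end of this diff); a quick sketch of what they resolve to:

```python
# The Label enum as introduced in slips_files/core/structures/labels.py,
# and the strings the BACKGROUND/BENIGN/MALICIOUS constants resolve to.
from enum import Enum

class Label(Enum):
    MALICIOUS = "Malicious"
    BENIGN = "Benign"
    BACKGROUND = "Background"

assert Label.MALICIOUS.value == "Malicious"
assert Label.BENIGN.value == "Benign"
assert Label.BACKGROUND.value == "Background"
```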
@@ -55,71 +65,145 @@ def init(self): # Set the output queue of our database instance # Read the configuration self.read_configuration() - # Minum amount of new lables needed to trigger the train - self.minimum_lables_to_retrain = 50 - # To plot the scores of training - # self.scores = [] + # Minimum amount of new labels needed to start the train + self.minimum_labels_to_start_train = 50 + # Minimum amount of new labels needed to retrain + self.minimum_labels_to_retrain = 50 + # The number of flows when last trained. Used internally only to know + # when to retrain + self.last_number_of_flows_when_trained = 0 # The scaler trained during training and to use during testing self.scaler = StandardScaler() self.model_path = "./modules/flowmldetection/model.bin" self.scaler_path = "./modules/flowmldetection/scaler.bin" + self.init_log_file() + + def init_log_file(self): + """ + Init the log file for training or testing + """ + if not self.enable_logs: + return + + if self.mode == "train": + # Initialize the training log file + self.log_path = os.path.join(self.output_dir, "training.log") + elif self.mode == "test": + # Initialize the testing log file + self.log_path = os.path.join(self.output_dir, "testing.log") + self.log_file = open(self.log_path, "w") def read_configuration(self): conf = ConfigParser() self.mode = conf.get_ml_mode() + # This is the global label in the configuration, + # in case the flows do not have a label themselves + self.ground_truth_config_label = conf.label() + self.enable_logs: bool = conf.create_performance_metrics_log_files() - def train(self): + def write_to_log(self, message: str): + """ + Write a message to the local log file if + create_performance_metrics_log_files is enabled in slips.yaml + """ + if not self.enable_logs: + return + + try: + self.log_file.write(message + "\n") + except Exception as e: + self.print(f"Error writing to log: {e}", 0, 1) + + def train(self, sum_labeled_flows, last_number_of_flows_when_trained): """ Train a model based on the flows we receive and the labels """ try: - # Process the labels to have only Normal and Malware - self.flows.label = self.flows.label.str.replace( - r"(^.*ormal.*$)", "Normal", regex=True - ) - self.flows.label = self.flows.label.str.replace( - r"(^.*alware.*$)", "Malware", regex=True + # Create y_flow with the label + y_flow = numpy.full( + self.flows.shape[0], self.flows.ground_truth_label ) - self.flows.label = self.flows.label.str.replace( - r"(^.*alicious.*$)", "Malware", regex=True - ) - - # Separate - y_flow = self.flows["label"] - X_flow = self.flows.drop("label", axis=1) + # Create X_flow with the current flows minus the label + X_flow = self.flows.drop("ground_truth_label", axis=1) + # Drop the detailed labels + X_flow = X_flow.drop("detailed_ground_truth_label", axis=1) + # Drop the module_labels X_flow = X_flow.drop("module_labels", axis=1) - # Normalize this batch of data so far.
This can get progressively slow X_flow = self.scaler.fit_transform(X_flow) + # Count the number of labels of each type in this epoch + epoch_label_counts = { + BACKGROUND: (y_flow == BACKGROUND).sum(), + MALICIOUS: (y_flow == MALICIOUS).sum(), + BENIGN: (y_flow == BENIGN).sum(), + } + # Train try: + # Online incremental learning self.clf.partial_fit( - X_flow, y_flow, classes=["Malware", "Normal"] + X_flow, + y_flow, + classes=[BACKGROUND, MALICIOUS, BENIGN], ) except Exception: self.print("Error while calling clf.train()") self.print(traceback.format_exc(), 0, 1) - # See score so far in training - score = self.clf.score(X_flow, y_flow) + # Predict on the training data + y_pred = self.clf.predict(X_flow) - # To debug the training score - # self.scores.append(score) + # For metrics, let's focus on Malicious vs Benign (ignore Background) + mask = (y_flow == MALICIOUS) | (y_flow == BENIGN) + y_true_bin = y_flow[mask] + y_pred_bin = y_pred[mask] - self.print(f" Training Score: {score}", 0, 1) - # self.print(f' Model Parameters: {self.clf.coef_}') + # Map to binary: Malicious=1, Benign=0 + y_true_bin = numpy.where(y_true_bin == MALICIOUS, 1, 0) + y_pred_bin = numpy.where(y_pred_bin == MALICIOUS, 1, 0) - # Debug code to store a plot in a png of the scores - # plt.plot(self.scores) - # plt.savefig('train-scores.png') + # Compute confusion matrix: tn, fp, fn, tp + tn, fp, fn, tp = ( + confusion_matrix(y_true_bin, y_pred_bin, labels=[0, 1]).ravel() + if len(set(y_true_bin)) > 1 + else (0, 0, 0, 0) + ) + + # Compute metrics + FPR = fp / (fp + tn) if (fp + tn) > 0 else 0 + TNR = tn / (tn + fp) if (tn + fp) > 0 else 0 + TPR = tp / (tp + fn) if (tp + fn) > 0 else 0 + FNR = fn / (fn + tp) if (fn + tp) > 0 else 0 + F1 = f1_score(y_true_bin, y_pred_bin, zero_division=0) + PREC = precision_score(y_true_bin, y_pred_bin, zero_division=0) + ACCU = accuracy_score(y_true_bin, y_pred_bin) + MCC = ( + matthews_corrcoef(y_true_bin, y_pred_bin) + if len(set(y_true_bin)) > 1 + else 0 + ) + RECALL = recall_score(y_true_bin, y_pred_bin, zero_division=0) # Store the models on disk self.store_model() + # Log training information + self.write_to_log( + f"Total labels: {sum_labeled_flows}, " + f"Background: {epoch_label_counts[BACKGROUND]}. " + f"Benign: {epoch_label_counts[BENIGN]}. " + f"Malicious: {epoch_label_counts[MALICIOUS]}. " + f"Metrics: FPR={FPR:.4f}, TNR={TNR:.4f}, " + f"TPR={TPR:.4f}, FNR={FNR:.4f}, " + f"F1={F1:.4f}, Precision={PREC:.4f}, " + f"Accuracy={ACCU:.4f}, MCC={MCC:.4f}, Recall={RECALL:.4f}." + ) except Exception: - self.print("Error in train()", 0, 1) + self.print("Error in train().", 0, 1) self.print(traceback.format_exc(), 0, 1) + self.write_to_log("Error occurred during training.")
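train() masks Background flows out before scoring, then binarizes Malicious=1/Benign=0; a self-contained sketch of that metric block on toy labels (toy data only, not real Slips flows):

```python
# Standalone sketch of the Malicious-vs-Benign metric block in train(),
# run on toy labels. Background samples are masked out before scoring.
import numpy
from sklearn.metrics import confusion_matrix, f1_score, matthews_corrcoef

y_true = numpy.array(["Malicious", "Benign", "Background", "Benign"])
y_pred = numpy.array(["Malicious", "Malicious", "Benign", "Benign"])

# Keep only Malicious/Benign flows, then map: Malicious=1, Benign=0
mask = (y_true == "Malicious") | (y_true == "Benign")
t = numpy.where(y_true[mask] == "Malicious", 1, 0)
p = numpy.where(y_pred[mask] == "Malicious", 1, 0)

tn, fp, fn, tp = confusion_matrix(t, p, labels=[0, 1]).ravel()
fpr = fp / (fp + tn) if (fp + tn) > 0 else 0
tpr = tp / (tp + fn) if (tp + fn) > 0 else 0
print(fpr, tpr, f1_score(t, p, zero_division=0), matthews_corrcoef(t, p))
```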
Store the pandas df in self.flows """ try: + # Ensure the index is an integer + if last_number_of_flows_when_trained is None: + last_number_of_flows_when_trained = 0 + else: + last_number_of_flows_when_trained = int( + last_number_of_flows_when_trained + ) + # We get all the flows so far - # because this retraining happens in batches flows = self.db.get_all_flows() - # Check how many different labels are in the DB - # We need both normal and malware + # Only process new flows since last training + new_flows = flows[last_number_of_flows_when_trained:] + + # Check how many **different** labels are in the DB labels = self.db.get_labels() if len(labels) == 1: - # Only 1 label has flows - # There are not enough different labels, so insert two flows - # that are fake but representative of a normal and malware flow - # they are only for the training process - # At least 1 flow of each label is required - # self.print(f'Amount of labeled flows: {labels}', 0, 1) - flows.append( + # Insert fake flows for both classes if needed + new_flows.append( { - "ts": 1594417039.029793, + "starttime": 1594417039.029793, "dur": "1.9424750804901123", "saddr": "10.7.10.101", "sport": "49733", "daddr": "40.70.224.145", "dport": "443", "proto": "tcp", - "state": "Established", - "allbytes": 42764, - "spkts": 37, + "state": "SF", + "spkts": 17, + "dpkts": 27, "sbytes": 25517, + "dbytes": 17247, "appproto": "ssl", - "label": "Malware", + "ground_truth_label": MALICIOUS, "module_labels": { - "flowalerts-long-connection": "Malware" + "flowalerts-long-connection": MALICIOUS }, } ) - flows.append( + new_flows.append( { - "ts": 1382355032.706468, + "starttime": 1382355032.706468, "dur": "10.896695", "saddr": "147.32.83.52", "sport": "47956", "daddr": "80.242.138.72", "dport": "80", "proto": "tcp", - "state": "Established", - "allbytes": 67696, + "state": "SF", "spkts": 1, + "dpkts": 0, "sbytes": 100, + "dbytes": 67596, "appproto": "http", - "label": "Normal", + "ground_truth_label": BENIGN, "module_labels": { - "flowalerts-long-connection": "Normal" + "flowalerts-long-connection": BENIGN }, } ) - # If there are enough flows, we dont insert them anymore # Convert to pandas df - df_flows = pd.DataFrame(flows) + df_flows = pd.DataFrame(new_flows) # Process features df_flows = self.process_features(df_flows) @@ -277,7 +388,6 @@ def process_flows(self): # Update the flow to the processed version self.flows = df_flows except Exception: - # Stop the timer self.print("Error in process_flows()") self.print(traceback.format_exc(), 0, 1) @@ -290,6 +400,8 @@ def process_flow(self, flow_to_process: dict): # Convert the flow to a pandas dataframe raw_flow = pd.DataFrame(flow_to_process, index=[0]) dflow = self.process_features(raw_flow) + if dflow.empty: + return None # Update the flow to the processed version return dflow except Exception: @@ -303,7 +415,6 @@ def detect(self, x_flow) -> Optional[numpy.ndarray]: and returns the predection array """ try: - given_x_flow = x_flow # clean the flow fields_to_drop = [ "label", @@ -311,12 +422,9 @@ def detect(self, x_flow) -> Optional[numpy.ndarray]: "uid", "history", "dir_", - "dbytes", - "dpkts", "endtime", - "bytes", "flow_source", - "ground_truth_label", # todo now we can use them + "ground_truth_label", "detailed_ground_truth_label", ] for field in fields_to_drop: @@ -330,7 +438,7 @@ def detect(self, x_flow) -> Optional[numpy.ndarray]: return pred except Exception as e: self.print( - f"Error in detect() while processing " f"\n{given_x_flow}\n{e}" + f"Error in detect() while processing " 
f"\n{x_flow}\n{e}" ) self.print(traceback.format_exc(), 0, 1) @@ -422,18 +530,16 @@ def pre_main(self): def main(self): if msg := self.get_msg("new_flow"): + # When a new flow arrives msg = json.loads(msg["data"]) - twid = msg["twid"] + self.twid = msg["twid"] + self.profileid = msg["profileid"] self.flow = msg["flow"] - # these fields are expected in testing. update the original - # flow dict to have them + # These following extra fields are expected in testing. + # update the original flow dict to have them self.flow.update( { - "allbytes": (self.flow["sbytes"] + self.flow["dbytes"]), - # the flow["state"] is the origstate, we dont need that here - # we need the interpreted state "state": msg["interpreted_state"], - "pkts": self.flow["spkts"] + self.flow["dpkts"], "label": msg["label"], "module_labels": msg["module_labels"], } @@ -446,56 +552,66 @@ def main(self): # Use labeled flows labels = self.db.get_labels() sum_labeled_flows = sum(i[1] for i in labels) - if ( - sum_labeled_flows >= self.minimum_lables_to_retrain - and sum_labeled_flows % self.minimum_lables_to_retrain == 1 - ): - # We get here every 'self.minimum_lables_to_retrain' - # amount of labels - # So for example we retrain every 100 labels and only when - # we have at least 100 labels - self.print( - f"Training the model with the last group of " - f"flows and labels. Total flows: {sum_labeled_flows}." - ) - # Process all flows in the DB and make them ready - # for pandas - self.process_flows() - # Train an algorithm - self.train() + + # The min labels to retrain is the min number of flows + # we should have seen so far in this capture to start training + # This is so we dont _start_ training with only 1 flow + + # Once we are over the start minimum, the second condition is + # to force to retrain every a minimum_labels_to_retrain number + # of flows. So we dont retrain every 1 flow. + if sum_labeled_flows >= self.minimum_labels_to_start_train: + if ( + sum_labeled_flows + - self.last_number_of_flows_when_trained + >= self.minimum_labels_to_retrain + ): + # So for example we retrain every 50 labels and only when + # we have at least 50 labels + self.print( + f"Training the model with the last group of " + f"flows and labels. Total flows: {sum_labeled_flows}." + ) + # Process all flows in the DB and make them ready + # for pandas + self.process_training_flows( + self.last_number_of_flows_when_trained + ) + # Train an algorithm + self.train( + sum_labeled_flows, + self.last_number_of_flows_when_trained, + ) + self.last_number_of_flows_when_trained = ( + sum_labeled_flows + ) + elif self.mode == "test": # We are testing, which means using the model to detect processed_flow = self.process_flow(self.flow) - # After processing the flow, it may happen that we # delete icmp/arp/etc so the dataframe can be empty if processed_flow is not None and not processed_flow.empty: + try: + original_label = processed_flow[ + "ground_truth_label" + ].iloc[0] + except KeyError: + # If there are no labels in the flows, the default + # label should be the one in the config file. 
+ # Predict pred: numpy.ndarray = self.detect(processed_flow) if not pred: # an error occurred return - label = self.flow["label"] - if label and label != "unknown" and label != pred[0]: - # If the user specified a label in test mode, - # and the label is diff from the prediction, - # print in debug mode - self.print( - f"Report Prediction {pred[0]} for label" - f' {label} flow {self.flow["saddr"]}:' - f'{self.flow["sport"]} ->' - f' {self.flow["daddr"]}:' - f'{self.flow["dport"]}/' - f'{self.flow["proto"]}', - 0, - 3, - ) - if pred[0] == "Malware": + if pred[0] == MALICIOUS: # Generate an alert - self.set_evidence_malicious_flow(self.flow, twid) + self.set_evidence_malicious_flow(self.flow, self.twid) self.print( - f"Prediction {pred[0]} for label {label}" + f"Prediction {pred[0]} for label {original_label}" f' flow {self.flow["saddr"]}:' f'{self.flow["sport"]} -> ' f'{self.flow["daddr"]}:' f'{self.flow["dport"]}/' f'{self.flow["proto"]}', 0, 2, ) @@ -504,3 +620,42 @@ + + # So you can disable this code easily. It is used + # only for evaluating testing performance + log_testing_data = True + if log_testing_data: + # Initialize counters if not already done + if not hasattr(self, "tp"): + self.tp = 0 + if not hasattr(self, "tn"): + self.tn = 0 + if not hasattr(self, "fp"): + self.fp = 0 + if not hasattr(self, "fn"): + self.fn = 0 + + # Update counters based on predictions and labels + if ( + pred[0] == MALICIOUS + and original_label == MALICIOUS + ): + self.tp += 1 + elif pred[0] == BENIGN and original_label == BENIGN: + self.tn += 1 + elif pred[0] == MALICIOUS and original_label == BENIGN: + self.fp += 1 + self.write_to_log( + f"False Positive Flow: {self.flow}" + ) + elif pred[0] == BENIGN and original_label == MALICIOUS: + self.fn += 1 + self.write_to_log( + f"False Negative Flow: {self.flow}" + ) + + # Log the testing performance metrics + self.write_to_log( + f"TP: {self.tp}, TN: {self.tn}," + f" FP: {self.fp}, FN: {self.fn}" + ) diff --git a/modules/flowmldetection/model.bin b/modules/flowmldetection/model.bin index aef4cba35b..5c305d3834 100644 Binary files a/modules/flowmldetection/model.bin and b/modules/flowmldetection/model.bin differ diff --git a/modules/flowmldetection/plot_testing_performance.py b/modules/flowmldetection/plot_testing_performance.py new file mode 100644 index 0000000000..f0f9b8f2d0 --- /dev/null +++ b/modules/flowmldetection/plot_testing_performance.py @@ -0,0 +1,182 @@ +import matplotlib.pyplot as plt +import sys +import numpy as np +import argparse + +def process_file(file_path): + # Initialize the counters for the values + FPR_values = [] + FNR_values = [] + TNR_values = [] + TPR_values = [] + F1_values = [] + accuracy_values = [] + precision_values = [] + MCC_values = [] + recall_values = [] + + # Counters for error tracking + total_lines = 0 + error_lines = 0 + unusual_lines = 0 + + # Read the file and extract the data + with open(file_path, 'r') as file: + for line in file: + total_lines += 1 + if "TP:" in line: + try: + # Extract the values from the line + parts = line.split(',') + TP = int(parts[0].split(':')[1].strip()) + TN = int(parts[1].split(':')[1].strip()) + FP = int(parts[2].split(':')[1].strip()) + FN = int(parts[3].split(':')[1].strip()) + + # Calculate metrics + FPR = FP / (FP + TN) if (FP + TN) != 0 else 0 + FNR = FN / (FN + TP) if (FN + TP) != 0 else 0 + TNR = TN / (TN + FP) if (TN + FP) != 0 else 0 + TPR = TP / (TP + FN) if (TP + FN) != 0 else 0 + Precision = TP / (TP + FP) if (TP + FP) != 0 else 0 + Recall = TPR # Recall is
the same as TPR + F1 = 2 * (Precision * Recall) / (Precision + Recall) if (Precision + Recall) != 0 else 0 + Accuracy = (TP + TN) / (TP + TN + FP + FN) + MCC = ((TP * TN) - (FP * FN)) / np.sqrt((TP + FP) * (TP + FN) * (TN + FP) * (TN + FN)) if ((TP + FP) * (TP + FN) * (TN + FP) * (TN + FN)) != 0 else 0 + + # Append the values to the respective lists + FPR_values.append(FPR) + FNR_values.append(FNR) + TNR_values.append(TNR) + TPR_values.append(TPR) + F1_values.append(F1) + accuracy_values.append(Accuracy) + precision_values.append(Precision) + MCC_values.append(MCC) + recall_values.append(Recall) + + except Exception as e: + error_lines += 1 + print(f"Error in line {total_lines}: {e}") + continue + + # Check for any unusual cases + if any(np.isnan([FPR, FNR, TNR, TPR, F1, Accuracy, Precision, MCC, Recall])): + unusual_lines += 1 + print(f"Unusual values in line {total_lines}: NaN values found") + + return FPR_values, FNR_values, TNR_values, TPR_values, F1_values, accuracy_values, precision_values, MCC_values, recall_values, total_lines, error_lines, unusual_lines + +def plot_metrics(FPR_values, FNR_values, TNR_values, TPR_values, F1_values, accuracy_values, precision_values, MCC_values, recall_values, experiment_number, total_lines, error_lines, unusual_lines): + # Separate the values into two groups based on their proximity to 0 or 1 + close_to_0 = { + 'FPR': [], 'FNR': [] + } + close_to_1 = { + 'TNR': [], 'TPR': [], 'F1': [], 'accuracy': [], 'precision': [], 'MCC': [], 'recall': [] + } + + # Categorize the metrics into two groups + for i in range(len(FPR_values)): + close_to_0['FPR'].append(FPR_values[i]) + close_to_0['FNR'].append(FNR_values[i]) + + close_to_1['TNR'].append(TNR_values[i]) + close_to_1['TPR'].append(TPR_values[i]) + close_to_1['F1'].append(F1_values[i]) + close_to_1['accuracy'].append(accuracy_values[i]) + close_to_1['precision'].append(precision_values[i]) + close_to_1['MCC'].append(MCC_values[i]) + close_to_1['recall'].append(recall_values[i]) + + # Plot metrics for values close to 0 (linear scale) + plot_single_group(close_to_0, f'performance_metrics_testing_close_to_0_experiment_{experiment_number}.png', experiment_number, is_close_to_0=True) + + # Plot metrics for values close to 1 (log scale) + plot_single_group(close_to_1, f'performance_metrics_testing_close_to_1_experiment_{experiment_number}.png', experiment_number, is_close_to_0=False) + + # Print the final values + print("\nFinal Metric Values for Experiment", experiment_number) + print(f"Final FPR: {FPR_values[-1]:.4f}") + print(f"Final FNR: {FNR_values[-1]:.4f}") + print(f"Final TNR: {TNR_values[-1]:.4f}") + print(f"Final TPR: {TPR_values[-1]:.4f}") + print(f"Final F1 Score: {F1_values[-1]:.4f}") + print(f"Final Accuracy: {accuracy_values[-1]:.4f}") + print(f"Final Precision: {precision_values[-1]:.4f}") + print(f"Final MCC: {MCC_values[-1]:.4f}") + print(f"Final Recall: {recall_values[-1]:.4f}") + + # Print summary statistics + print(f"\nSummary for Experiment {experiment_number}:") + print(f"Total lines read: {total_lines}") + print(f"Lines with errors: {error_lines}") + print(f"Unusual lines (NaN values): {unusual_lines}") + +def plot_single_group(metrics_dict, output_filename, experiment_number, is_close_to_0=False): + plt.figure(figsize=(12, 8)) + + # Only plot the metrics that exist in the dictionary + if 'FPR' in metrics_dict: + plt.plot(metrics_dict['FPR'], label='False Positive Rate (FPR)', marker='o') + if 'FNR' in metrics_dict: + plt.plot(metrics_dict['FNR'], label='False Negative Rate (FNR)', 
marker='o') + if 'TNR' in metrics_dict: + plt.plot(metrics_dict['TNR'], label='True Negative Rate (TNR)', marker='o') + if 'TPR' in metrics_dict: + plt.plot(metrics_dict['TPR'], label='True Positive Rate (TPR)', marker='o') + if 'F1' in metrics_dict: + plt.plot(metrics_dict['F1'], label='F1 Score', marker='o') + if 'accuracy' in metrics_dict: + plt.plot(metrics_dict['accuracy'], label='Accuracy', marker='o') + if 'precision' in metrics_dict: + plt.plot(metrics_dict['precision'], label='Precision', marker='o') + if 'MCC' in metrics_dict: + plt.plot(metrics_dict['MCC'], label='Matthews Correlation Coefficient (MCC)', marker='o') + if 'recall' in metrics_dict: + plt.plot(metrics_dict['recall'], label='Recall (TPR)', marker='o') + + # The close-to-1 group is plotted on a log scale + if not is_close_to_0: + plt.yscale('log') + + # If the plot is close to 0, set dynamic Y-ticks based on the min/max values of the series + if is_close_to_0: + min_val = min(min(metrics_dict['FPR']), min(metrics_dict['FNR'])) + max_val = max(max(metrics_dict['FPR']), max(metrics_dict['FNR'])) + + # Avoid log(0), so set the minimum limit a little higher than zero + if min_val == 0: + min_val = 1e-4 # Avoid zero values when spacing the ticks logarithmically + + plt.ylim(min_val, max_val) # Set Y-axis limits based on the data range + # Ensure ticks are within the valid range + if min_val > 0 and max_val > 0: + plt.yticks(np.logspace(np.log10(min_val), np.log10(max_val), num=6)) # Set ticks logarithmically + + # Add the experiment number to the plot title + plt.xlabel('Index') + plt.ylabel('Metric Value') + plt.title(f'Experiment {experiment_number} - Evaluation Metrics Over Time') + plt.legend() + + # Save the plot + plt.savefig(output_filename) + plt.close() + +def main(): + # Set up argument parsing + parser = argparse.ArgumentParser(description='Plot testing performance metrics.') + parser.add_argument('-f', '--file', type=str, required=True, help='Path to the testing performance log file') + parser.add_argument('-e', '--experiment', type=str, required=True, help='Experiment number') + + args = parser.parse_args() + + file_path = args.file + experiment_number = args.experiment + + FPR_values, FNR_values, TNR_values, TPR_values, F1_values, accuracy_values, precision_values, MCC_values, recall_values, total_lines, error_lines, unusual_lines = process_file(file_path) + plot_metrics(FPR_values, FNR_values, TNR_values, TPR_values, F1_values, accuracy_values, precision_values, MCC_values, recall_values, experiment_number, total_lines, error_lines, unusual_lines) + +if __name__ == "__main__": + main() diff --git a/modules/flowmldetection/plot_train_performance.py b/modules/flowmldetection/plot_train_performance.py new file mode 100644 index 0000000000..304f0f4ead --- /dev/null +++ b/modules/flowmldetection/plot_train_performance.py @@ -0,0 +1,111 @@ +import pandas as pd +import matplotlib.pyplot as plt +import re +import sys +import argparse +import os +import matplotlib.ticker as ticker + +def plot_log_data(file_path, experiment_number): + # Read the log data from the file + with open(file_path, 'r') as file: + log_data = file.read() + + # Regex pattern for the new log format + pattern = ( + r"Total labels: ([\d\.]+), Background: (\d+)\. Benign: (\d+)\. Malicious: (\d+)\. Metrics: " + r"FPR=([\d\.]+), TNR=([\d\.]+), TPR=([\d\.]+), FNR=([\d\.]+), " + r"F1=([\d\.]+), Precision=([\d\.]+), Accuracy=([\d\.]+), MCC=([\d\.]+), Recall=([\d\.]+)\."
+ ) + + # Parse the log file + data = re.findall(pattern, log_data) + + # Convert data to a DataFrame + columns = [ + "Total labels", "Background", "Benign", "Malicious", + "FPR", "TNR", "TPR", "FNR", "F1", "Precision", "Accuracy", "MCC", "Recall" + ] + df = pd.DataFrame(data, columns=columns) + df = df.astype({ + "Total labels": float, + "Background": int, + "Benign": int, + "Malicious": int, + "FPR": float, + "TNR": float, + "TPR": float, + "FNR": float, + "F1": float, + "Precision": float, + "Accuracy": float, + "MCC": float, + "Recall": float, + }) + + dir_name = os.path.dirname(file_path) + + # --- Plot 1: Number of labels (linear scale, no total labels) --- + fig1, ax1 = plt.subplots(figsize=(10, 6)) + ax1.plot(df.index, df["Background"], label="Background", color='black') + ax1.plot(df.index, df["Benign"], label="Benign", color='cyan') + ax1.plot(df.index, df["Malicious"], label="Malicious", color='magenta') + ax1.set_xlabel('Index') + ax1.set_ylabel('Label Counts') + ax1.set_title(f'Label Counts - Experiment {experiment_number}') + ax1.legend() + ax1.yaxis.set_major_locator(ticker.MaxNLocator(70)) + ax1.xaxis.set_major_locator(ticker.MaxNLocator(50)) + plt.tight_layout() + plt.savefig(os.path.join(dir_name, f'performance_metrics_training_{experiment_number}_labels.png')) + + # --- Plot 2: FNR and FPR (log scale) --- + fig2, ax2 = plt.subplots(figsize=(10, 6)) + ax2.plot(df.index, df["FNR"], label="FNR", color='red') + ax2.plot(df.index, df["FPR"], label="FPR", color='blue') + ax2.set_xlabel('Index') + ax2.set_ylabel('Rate') + ax2.set_yscale('log') + ax2.set_title(f'FNR and FPR - Experiment {experiment_number}') + ax2.legend() + ax2.yaxis.set_major_locator(ticker.MaxNLocator(100)) + ax2.xaxis.set_major_locator(ticker.MaxNLocator(50)) + plt.tight_layout() + plt.savefig(os.path.join(dir_name, f'performance_metrics_training_{experiment_number}_fnr_fpr.png')) + + # --- Plot 3: Other metrics (log scale) --- + fig3, ax3 = plt.subplots(figsize=(12, 7)) + metrics_rest = ["TNR", "TPR", "F1", "Precision", "Accuracy", "MCC", "Recall"] + colors_rest = [ + 'tab:blue', 'tab:green', 'tab:purple', 'tab:brown', + 'tab:gray', 'tab:pink', 'tab:olive' + ] + for metric, color in zip(metrics_rest, colors_rest): + ax3.plot(df.index, df[metric], label=metric, color=color) + ax3.set_xlabel('Index') + ax3.set_ylabel('Metric Value') + ax3.set_yscale('log') + ax3.set_title(f'Performance Metrics (except FNR/FPR) - Experiment {experiment_number}') + ax3.legend() + ax3.yaxis.set_major_locator(ticker.MaxNLocator(50)) + ax3.xaxis.set_major_locator(ticker.MaxNLocator(50)) + plt.tight_layout() + plt.savefig(os.path.join(dir_name, f'performance_metrics_training_{experiment_number}_other_metrics.png')) + + plt.show() + + # --- Print final values in terminal --- + print("\nFinal values at last training step:") + for col in ["Total labels", "Background", "Benign", "Malicious", + "FPR", "TNR", "TPR", "FNR", "F1", "Precision", "Accuracy", "MCC", "Recall"]: + print(f"{col}: {df[col].iloc[-1]}") + +def main(): + parser = argparse.ArgumentParser(description="Process a training log file and plot the performance metrics.") + parser.add_argument('-f', '--file', metavar='log_file', type=str, required=True, help="Path to the log file") + parser.add_argument('-e', '--experiment', metavar='experiment_number', type=str, required=True, help="Experiment number to add to the filename") + args = parser.parse_args() + plot_log_data(args.file, args.experiment) + +if __name__ == "__main__": + main()
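The regex in plot_train_performance.py has to stay in lockstep with the line train() writes to training.log; a quick round-trip check of the two formats (the numbers are invented for the example):

```python
# Round-trip check: a training.log line in the exact format train()
# writes, parsed with the same pattern plot_train_performance.py uses.
import re

line = (
    "Total labels: 150, Background: 20. Benign: 100. Malicious: 30. "
    "Metrics: FPR=0.0100, TNR=0.9900, TPR=0.9500, FNR=0.0500, "
    "F1=0.9600, Precision=0.9700, Accuracy=0.9800, MCC=0.9400, Recall=0.9500."
)
pattern = (
    r"Total labels: ([\d\.]+), Background: (\d+)\. Benign: (\d+)\. "
    r"Malicious: (\d+)\. Metrics: FPR=([\d\.]+), TNR=([\d\.]+), "
    r"TPR=([\d\.]+), FNR=([\d\.]+), F1=([\d\.]+), Precision=([\d\.]+), "
    r"Accuracy=([\d\.]+), MCC=([\d\.]+), Recall=([\d\.]+)\."
)
print(re.findall(pattern, line)[0])  # 13 captured fields
```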
diff --git a/modules/flowmldetection/scaler.bin b/modules/flowmldetection/scaler.bin index 9292bda6a6..a62890d0b0 100644 Binary files a/modules/flowmldetection/scaler.bin and b/modules/flowmldetection/scaler.bin differ diff --git a/modules/riskiq/riskiq.py b/modules/riskiq/riskiq.py index 5abf2ddb19..7b5653997e 100644 --- a/modules/riskiq/riskiq.py +++ b/modules/riskiq/riskiq.py @@ -25,7 +25,7 @@ def init(self): def read_configuration(self): conf = ConfigParser() - risk_iq_credentials_path = conf.RiskIQ_credentials_path() + risk_iq_credentials_path = conf.risk_iq_credentials_path() try: with open(risk_iq_credentials_path, "r") as f: self.riskiq_email = f.readline().replace("\n", "") diff --git a/modules/update_manager/update_manager.py b/modules/update_manager/update_manager.py index ba8106aa5c..b791bfc137 100644 --- a/modules/update_manager/update_manager.py +++ b/modules/update_manager/update_manager.py @@ -119,7 +119,7 @@ def read_riskiq_creds(risk_iq_credentials_path): self.ssl_feeds_path = conf.ssl_feeds() self.ssl_feeds = self.get_feed_details(self.ssl_feeds_path) - risk_iq_credentials_path = conf.RiskIQ_credentials_path() + risk_iq_credentials_path = conf.risk_iq_credentials_path() read_riskiq_creds(risk_iq_credentials_path) self.riskiq_update_period = conf.riskiq_update_period() diff --git a/slips/main.py b/slips/main.py index b00cc8f3db..3f661c8843 100644 --- a/slips/main.py +++ b/slips/main.py @@ -414,7 +414,7 @@ def get_analyzed_flows_percentage(self) -> str: self.total_flows = self.db.get_total_flows() flows_percentage = int( - (self.db.get_processed_flows_so_far() / self.total_flows) * 100 + (self.db.get_processed_flows_so_far() / self.total_flows) * 100 if self.total_flows != 0 else 0 ) return f"Analyzed Flows: {green(flows_percentage)}{green('%')}. " diff --git a/slips_files/common/parsers/config_parser.py b/slips_files/common/parsers/config_parser.py index 40f1b044bc..e208f78816 100644 --- a/slips_files/common/parsers/config_parser.py +++ b/slips_files/common/parsers/config_parser.py @@ -418,7 +418,12 @@ def data_exfiltration_threshold(self): def get_ml_mode(self): return self.read_configuration("flowmldetection", "mode", "test") - def RiskIQ_credentials_path(self): + def create_performance_metrics_log_files(self) -> bool: + return self.read_configuration( + "flowmldetection", "create_performance_metrics_log_files", False + ) + + def risk_iq_credentials_path(self): return self.read_configuration( "threatintelligence", "RiskIQ_credentials_path", "" ) diff --git a/slips_files/core/database/database_manager.py b/slips_files/core/database/database_manager.py index 0b805976df..1d339685f8 100644 --- a/slips_files/core/database/database_manager.py +++ b/slips_files/core/database/database_manager.py @@ -661,7 +661,8 @@ def add_software_to_profile(self, *args, **kwargs): return self.rdb.add_software_to_profile(*args, **kwargs) def get_total_flows(self, *args, **kwargs): - return int(self.rdb.get_total_flows(*args, **kwargs)) + total_flows = self.rdb.get_total_flows(*args, **kwargs) + return int(total_flows) if total_flows is not None else 0 def increment_processed_flows(self, *args, **kwargs): return self.rdb.increment_processed_flows(*args, **kwargs) @@ -887,7 +888,10 @@ def get_flow(self, *args, **kwargs): """returns the raw flow as read from the log file""" return self.sqlite.get_flow(*args, **kwargs) - def add_flow(self, flow, profileid: str, twid: str, label="benign"): + def add_flow(self, flow, profileid: str, twid: str, label="Benign"): + """ + If the flow comes with no label, we default to
Benign + """ # stores it in the db self.sqlite.add_flow(flow, profileid, twid, label=label) # handles the channels and labels etc. diff --git a/slips_files/core/database/redis_db/profile_handler.py b/slips_files/core/database/redis_db/profile_handler.py index edbbf3a12f..e11fa47372 100644 --- a/slips_files/core/database/redis_db/profile_handler.py +++ b/slips_files/core/database/redis_db/profile_handler.py @@ -395,7 +395,12 @@ def get_final_state_from_flags(self, state, pkts): We receive the pakets to distinguish some Reset connections """ try: - pre = state.split("_")[0] + # In some flows the state is a nan + try: + pre = state.split("_")[0] + except AttributeError: + pre = "" + try: # Try suricata states """ @@ -404,9 +409,10 @@ def get_final_state_from_flags(self, state, pkts): these are: New, Established and Closed,for UDP only new and established. For each of these states Suricata can employ different timeouts. """ - if "new" in state or "established" in state: + # This is controversial, but if we dont have a good state, we consider it not established for now + if "new" in state or state.lower() == "established": return "Established" - elif "closed" in state: + elif "closed" in state or state.lower() == "not established": return "Not Established" # We have varius type of states depending on the type of flow. @@ -417,7 +423,11 @@ def get_final_state_from_flags(self, state, pkts): return "Established" # For Argus - suf = state.split("_")[1] + # In some flows the state is a nan + try: + suf = state.split("_")[1] + except AttributeError: + suf = "" if "S" in pre and "A" in pre and "S" in suf and "A" in suf: """ Examples: @@ -518,7 +528,7 @@ def get_final_state_from_flags(self, state, pkts): except Exception: exception_line = sys.exc_info()[2].tb_lineno self.print( - f"Error in getFinalStateFromFlags() in database.py line {exception_line}", + f"Error in get_final_state_from_flags() in profile_handler.py line {exception_line}", 0, 1, ) diff --git a/slips_files/core/profiler.py b/slips_files/core/profiler.py index 0d9b11bd27..e8fdf5cc56 100644 --- a/slips_files/core/profiler.py +++ b/slips_files/core/profiler.py @@ -119,7 +119,7 @@ def read_configuration(self): self.local_whitelist_path = conf.local_whitelist_path() self.timeformat = conf.ts_format() self.analysis_direction = conf.analysis_direction() - self.label = conf.label() + self.configuration_label = conf.label() self.width = conf.get_tw_width_as_float() self.client_ips: List[ Union[IPv4Network, IPv6Network, IPv4Address, IPv6Address] @@ -377,7 +377,7 @@ def store_features_going_in(self, profileid: str, twid: str, flow): flow, profileid=profileid, twid=twid, - label=self.label, + label=self.configuration_label, ) self.db.mark_profile_tw_as_modified(profileid, twid, "") diff --git a/slips_files/core/structures/labels.py b/slips_files/core/structures/labels.py new file mode 100644 index 0000000000..b1dc64234e --- /dev/null +++ b/slips_files/core/structures/labels.py @@ -0,0 +1,11 @@ +from enum import Enum + + +class Label(Enum): + """ + label of flows should be one of the following + """ + + MALICIOUS = "Malicious" + BENIGN = "Benign" + BACKGROUND = "Background" diff --git a/tests/test_profiler.py b/tests/test_profiler.py index 36733d2b8c..465bc5922b 100644 --- a/tests/test_profiler.py +++ b/tests/test_profiler.py @@ -467,7 +467,6 @@ def test_read_configuration( mock_conf.local_whitelist_path.return_value = "path/to/whitelist" mock_conf.ts_format.return_value = "unixtimestamp" mock_conf.analysis_direction.return_value = "all" - 
mock_conf.label.return_value = "malicious" mock_conf.get_tw_width_as_float.return_value = 1.0 mock_conf.client_ips.return_value = ["192.168.1.1", "10.0.0.1"] @@ -476,7 +475,6 @@ def test_read_configuration( assert profiler.local_whitelist_path == "path/to/whitelist" assert profiler.timeformat == "unixtimestamp" assert profiler.analysis_direction == "all" - assert profiler.label == "malicious" assert profiler.width == 1.0 assert profiler.client_ips == ["192.168.1.1", "10.0.0.1"]
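A closing note on the profile_handler change above: the new try/except guards exist because flows read back from SQLite can carry a NaN state, and NaN (a float) has no .split(), which used to raise AttributeError. A minimal standalone sketch of the guard:

```python
# Standalone sketch of the NaN guard added to get_final_state_from_flags():
# a NaN state is a float with no .split(), so fall back to an empty prefix.
def state_prefix(state) -> str:
    try:
        return state.split("_")[0]
    except AttributeError:  # state is NaN or otherwise not a string
        return ""

print(state_prefix("SA_SA"))       # -> 'SA'
print(state_prefix(float("nan")))  # -> ''
```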