Replies: 3 comments 5 replies
-
Implementation notes
Maybe we could adopt a format similar to notebooklets - a class that has a required @FlorianBracq has a PoC for an implementation Refs: https://msticnb.readthedocs.io/en/latest/creatingnotebooklets.html |
Beta Was this translation helpful? Give feedback.
-
First version of the PoC: https://github.com/FlorianBracq/msticpy/tree/process_analysis Sample usage:
Example with custom mapping:
Happy to receive feedback. Many of the ideas already shared open up a lot more possibilities that I had not thought of, so surely the end result will look nothing like what I'm sharing here! Cheers, |
Beta Was this translation helpful? Give feedback.
-
Thoughts about implementing analyzer functions (thoughts as code):
from functools import wraps
from typing import List, Optional, Union
import pandas as pd
class MPAnalytic:
    """
    Decorator for MSTICPy Analytic function.

    Instantiating the class captures the analytic's metadata; applying the
    instance to a function attaches that metadata to the function and wraps
    it so callers can pass an optional ``schema_map`` keyword that renames
    the input DataFrame columns before the analytic runs.

    Parameters
    ----------
    name : Optional[str]
        Display name of the analytic. Defaults to the decorated function's
        ``__name__`` when omitted.
    description : Optional[str]
        Free-text description of the analytic.
    entities : Union[str, List[str], None]
        Entity name or list of entity names. A single value is wrapped in a
        list; ``None`` becomes an empty list.
    required_fields : Optional[List[str]]
        Column names the analytic needs in its input data.
    **kwargs
        Accepted for forward compatibility; currently ignored.

    """

    # NOTE(review): the leading whitespace inside these two literals may have
    # been collapsed when this sample was extracted - confirm it matches the
    # indentation used in the analytics' docstrings, otherwise the header is
    # never found and the schema_map parameter is not documented.
    _RTN_HEADER = "\n Returns"
    _SCH_DOC_STRING = (
        " schema_map : Optional[Dict[str, str]]\n"
        " Mapping of input schema to analytic required fields.\n\n"
        " Returns"
    )

    def __init__(
        self,
        name: Optional[str] = None,
        description: Optional[str] = None,
        entities: Union[str, List[str], None] = None,
        required_fields: Optional[List[str]] = None,
        **kwargs,
    ):
        """Store the analytic metadata supplied to the decorator."""
        self.name = name
        self.description = description
        # Normalise to a list; previously a missing value produced [None].
        if entities is None:
            self.entities = []
        elif isinstance(entities, list):
            self.entities = entities
        else:
            self.entities = [entities]
        self.required_fields = required_fields

    def __call__(self, func):
        """
        Call the decorator to wrap function execution.

        Parameters
        ----------
        func : Callable
            The analytic function. Its data input must be accepted as a
            ``data`` keyword argument.

        Returns
        -------
        Callable
            Wrapper that resolves the input data, applies ``schema_map``
            (if given) and calls `func` with ``data=<DataFrame>``.

        """
        func.mp_analytic = True
        # Fall back to the function name only when no name was supplied
        # (the original test was inverted and overwrote explicit names).
        if self.name is None:
            self.name = func.__name__
        func.properties = self

        # Functions without a docstring have __doc__ = None.
        doc = func.__doc__
        if doc and self._RTN_HEADER in doc:
            func.__doc__ = doc.replace(self._RTN_HEADER, self._SCH_DOC_STRING)

        @wraps(func)
        def run_analytic(*args, **kwargs):
            """Extract data and schema_map args and run function."""
            schema_map = kwargs.pop("schema_map", None)

            # Get `data` param as args[0] or `data` kwarg.
            if args and isinstance(args[0], pd.DataFrame):
                input_df = args[0]
                args = args[1:]
            elif "data" in kwargs:
                input_df = kwargs["data"]
            else:
                raise TypeError(
                    f"{func.__name__}() requires a DataFrame as the first "
                    "positional argument or as the 'data' keyword argument"
                )

            if schema_map:
                input_df = input_df.rename(columns=schema_map)
            # Always hand the data over by keyword so positional and keyword
            # call styles reach the analytic identically.
            kwargs["data"] = input_df
            return func(*args, **kwargs)

        return run_analytic

    def __repr__(self):
        """Return a multi-line listing of the instance attributes."""
        fields = "\n ".join(f"{name}={value}" for name, value in self.__dict__.items())
        return "Analytic(\n " + fields + "\n)"
@MPAnalytic(
    name="test analytic",
    description="Testing wrapper",
    entities="Host",
    required_fields=["ProcessName", "ParentProcessName"],
)
def test_analytic(data: pd.DataFrame) -> pd.DataFrame:
    """
    Sample analytic: count rows per value of the second input column.

    Parameters
    ----------
    data : pd.DataFrame
        Input data for analytic

    Returns
    -------
    pd.DataFrame
        Analytic results. NOTE(review): the value actually returned is an
        AnalyticResult built from the grouped counts - confirm whether the
        annotation or the return value is the intended contract.

    """
    # The grouping key is whichever column sits at position 1 of the input.
    group_key = data.columns[1]
    per_group_counts = data.groupby(group_key).count().reset_index()

    outcome = AnalyticResult(per_group_counts)
    outcome.result_properties["severity"] = "high"
    outcome.result_properties["description"] = "Some badness was found"
    return outcome
print(test_analytic.properties, "\n\n")
help(test_analytic)
Output from last two statements -
# Sample - more realistic analytic
@MPAnalytic(
    name="Suspicious rundll32 parent",
    entities="Process",
    description="Detects suspicious rundll32.exe parent processes",
    required_fields=["ProcessName", "NewProcessId", "ProcessId", "ParentProcessName", "TimeGenerated"],
)
def suspicious_rundll32_parent(data: pd.DataFrame) -> pd.DataFrame:
    """
    Detects suspicious rundll32.exe parent processes

    Selects rows whose ``ProcessName`` is ``rundll32.exe`` and whose
    ``ParentProcessName`` is one of a fixed list of parents (both compared
    case-insensitively).

    Parameters
    ----------
    data : pd.DataFrame
        Input data for analytic

    Returns
    -------
    pd.DataFrame
        Analytic results. NOTE(review): the value actually returned is an
        AnalyticResult wrapping the matching rows - confirm whether the
        annotation or the return value is the intended contract.

    """
    output_columns = [
        "ProcessName",
        "NewProcessId",
        "ProcessId",
        "ParentProcessName",
        "TimeGenerated",
    ]
    suspect_parents = [
        "winword.exe",
        "excel.exe",
        "msaccess.exe",
        "lsass.exe",
        "taskeng.exe",
        "winlogon.exe",
        # Original entry kept for backward compatibility; the Windows Task
        # Scheduler command-line binary is schtasks.exe, added below.
        "schtask.exe",
        "schtasks.exe",
        "regsvr32.exe",
        "wmiprvse.exe",
        "wsmprovhost.exe",
    ]

    is_rundll = data["ProcessName"].str.lower() == "rundll32.exe"
    has_suspect_parent = data["ParentProcessName"].str.lower().isin(suspect_parents)
    susp_rundll_parent = data[is_rundll & has_suspect_parent][output_columns]

    results = AnalyticResult(susp_rundll_parent)
    if susp_rundll_parent.empty:
        results.result_properties["severity"] = "information"
        results.result_properties["description"] = "No suspicious processes found."
    else:
        results.result_properties["severity"] = "high"
        results.result_properties["description"] = (
            f"{len(susp_rundll_parent)} rundll processes found with suspect parents."
        )
    return results
Beta Was this translation helpful? Give feedback.
-
From a recent discussion (2023-01-26) we arrived at a consensus that it makes sense to have some more scenario-specific analysis functions in MSTICPy (or at least in the MSTICPy family).
There is a bit of discussion in a parallel thread here #611 (comment)
Some discussion points
Analytics and Feb 2023 Hack Month
Beta Was this translation helpful? Give feedback.
All reactions