Skip to content

Commit

Permalink
Add MPLog parser to Defender plugin (#724)
Browse files Browse the repository at this point in the history
(DIS-2399)
  • Loading branch information
cecinestpasunepipe authored Jun 20, 2024
1 parent e091d41 commit b89b12d
Show file tree
Hide file tree
Showing 10 changed files with 928 additions and 1 deletion.
219 changes: 218 additions & 1 deletion dissect/target/plugins/os/windows/defender.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
from __future__ import annotations

import re
from datetime import datetime, timezone
from io import BytesIO
from pathlib import Path
from typing import Any, BinaryIO, Generator, Iterable, Iterator, Union
from typing import Any, BinaryIO, Generator, Iterable, Iterator, TextIO, Union

import dissect.util.ts as ts
from dissect.cstruct import Structure, cstruct
Expand All @@ -10,6 +13,27 @@
from dissect.target import plugin
from dissect.target.exceptions import UnsupportedPluginError
from dissect.target.helpers.record import TargetRecordDescriptor
from dissect.target.plugins.os.windows.defender_helpers.defender_patterns import (
DEFENDER_MPLOG_BLOCK_PATTERNS,
DEFENDER_MPLOG_LINE,
DEFENDER_MPLOG_PATTERNS,
)
from dissect.target.plugins.os.windows.defender_helpers.defender_records import (
DefenderMPLogBMTelemetryRecord,
DefenderMPLogDetectionAddRecord,
DefenderMPLogDetectionEventRecord,
DefenderMPLogEMSRecord,
DefenderMPLogExclusionRecord,
DefenderMPLogLowfiRecord,
DefenderMPLogMinFilBlockedFileRecord,
DefenderMPLogMinFilUSSRecord,
DefenderMPLogOriginalFileNameRecord,
DefenderMPLogProcessImageRecord,
DefenderMPLogResourceScanRecord,
DefenderMPLogRTPRecord,
DefenderMPLogThreatActionRecord,
DefenderMPLogThreatRecord,
)

DEFENDER_EVTX_FIELDS = [
("datetime", "ts"),
Expand Down Expand Up @@ -73,6 +97,7 @@
EVTX_PROVIDER_NAME = "Microsoft-Windows-Windows Defender"

DEFENDER_QUARANTINE_DIR = "sysvol/programdata/microsoft/windows defender/quarantine"
DEFENDER_MPLOG_DIR = "sysvol/programdata/microsoft/windows defender/support"
DEFENDER_KNOWN_DETECTION_TYPES = [b"internalbehavior", b"regkey", b"runkey"]

DEFENDER_EXCLUSION_KEY = "HKLM\\SOFTWARE\\Microsoft\\Windows Defender\\Exclusions"
Expand Down Expand Up @@ -494,6 +519,198 @@ def exclusions(self) -> Iterator[DefenderExclusionRecord]:
value=exclusion_value,
)

def _mplog_processimage(self, data: dict) -> Iterator[DefenderMPLogProcessImageRecord]:
yield DefenderMPLogProcessImageRecord(**data)

def _mplog_minfiluss(self, data: dict) -> Iterator[DefenderMPLogMinFilUSSRecord]:
yield DefenderMPLogMinFilUSSRecord(**data)

def _mplog_blockedfile(self, data: dict) -> Iterator[DefenderMPLogMinFilBlockedFileRecord]:
yield DefenderMPLogMinFilBlockedFileRecord(**data)

def _mplog_bmtelemetry(self, data: dict) -> Iterator[DefenderMPLogBMTelemetryRecord]:
data["ts"] = datetime.strptime(data["ts"], "%m-%d-%Y %H:%M:%S")
yield DefenderMPLogBMTelemetryRecord(**data)

def _mplog_ems(self, data: dict) -> Iterator[DefenderMPLogEMSRecord]:
yield DefenderMPLogEMSRecord(**data)

def _mplog_originalfilename(self, data: dict) -> Iterator[DefenderMPLogOriginalFileNameRecord]:
yield DefenderMPLogOriginalFileNameRecord(**data)

def _mplog_exclusion(self, data: dict) -> Iterator[DefenderMPLogExclusionRecord]:
yield DefenderMPLogExclusionRecord(**data)

def _mplog_lowfi(self, data: dict) -> Iterator[DefenderMPLogLowfiRecord]:
yield DefenderMPLogLowfiRecord(**data)

def _mplog_detectionadd(self, data: dict) -> Iterator[DefenderMPLogDetectionAddRecord]:
yield DefenderMPLogDetectionAddRecord(**data)

def _mplog_threat(self, data: dict) -> Iterator[DefenderMPLogThreatRecord]:
yield DefenderMPLogThreatRecord(**data)

def _mplog_resourcescan(self, data: dict) -> Iterator[DefenderMPLogResourceScanRecord]:
data["start_time"] = datetime.strptime(data["start_time"], "%m-%d-%Y %H:%M:%S")
data["end_time"] = datetime.strptime(data["end_time"], "%m-%d-%Y %H:%M:%S")
data["ts"] = data["start_time"]
rest = data.pop("rest")
yield DefenderMPLogResourceScanRecord(
threats=re.findall("Threat Name:([^\n]+)", rest),
resources=re.findall("Resource Path:([^\n]+)", rest),
**data,
)

def _mplog_threataction(self, data: dict) -> Iterator[DefenderMPLogThreatActionRecord]:
data["ts"] = datetime.strptime(data["ts"], "%m-%d-%Y %H:%M:%S")
rest = data.pop("rest")
yield DefenderMPLogThreatActionRecord(
threats=re.findall("Threat Name:([^\n]+)", rest),
resources=re.findall("(?:Path|File Name):([^\n]+)", rest),
actions=re.findall("Action:([^\n]+)", rest),
**data,
)

def _mplog_rtp_log(self, data: dict) -> Iterator[DefenderMPLogRTPRecord]:
times = {}
for dtkey in ["ts", "last_perf", "first_rtp_scan"]:
try:
times[dtkey] = datetime.strptime(data[dtkey], "%m-%d-%Y %H:%M:%S")
except ValueError:
pass

yield DefenderMPLogRTPRecord(
_target=self.target,
source_log=data["source_log"],
**times,
plugin_states=re.findall(r"^\s+(.*)$", data["plugin_states"])[0],
process_exclusions=re.findall(DEFENDER_MPLOG_LINE, data["process_exclusions"]),
path_exclusions=re.findall(DEFENDER_MPLOG_LINE, data["path_exclusions"]),
ext_exclusions=re.findall(DEFENDER_MPLOG_LINE, data["ext_exclusions"]),
)

def _mplog_detectionevent(self, data: dict) -> Iterator[DefenderMPLogDetectionEventRecord]:
yield DefenderMPLogDetectionEventRecord(**data)

def _mplog_line(
self, mplog_line: str, source: Path
) -> Iterator[
DefenderMPLogProcessImageRecord
| DefenderMPLogMinFilUSSRecord
| DefenderMPLogMinFilBlockedFileRecord
| DefenderMPLogEMSRecord
| DefenderMPLogOriginalFileNameRecord
| DefenderMPLogExclusionRecord
| DefenderMPLogLowfiRecord
| DefenderMPLogDetectionAddRecord
| DefenderMPLogThreatRecord
| DefenderMPLogDetectionEventRecord
]:
for pattern, record in DEFENDER_MPLOG_PATTERNS:
if match := pattern.match(mplog_line):
data = match.groupdict()
data["_target"] = self.target
data["source_log"] = source
yield from getattr(self, f"_mplog_{record.name.split('/')[-1:][0]}")(data)

def _mplog_block(
self, mplog_line: str, mplog: TextIO, source: Path
) -> Iterator[DefenderMPLogResourceScanRecord | DefenderMPLogThreatActionRecord | DefenderMPLogRTPRecord]:
block = ""
for prefix, suffix, pattern, record in DEFENDER_MPLOG_BLOCK_PATTERNS:
if prefix.search(mplog_line):
block += mplog_line
break
if block:
while mplog_line := mplog.readline():
block += mplog_line
if suffix.search(mplog_line):
break
match = pattern.match(block)
data = match.groupdict()
data["_target"] = self.target
data["source_log"] = source
yield from getattr(self, f"_mplog_{record.name.split('/')[-1:][0]}")(data)

def _mplog(
self, mplog: TextIO, source: Path
) -> Iterator[
DefenderMPLogProcessImageRecord
| DefenderMPLogMinFilUSSRecord
| DefenderMPLogMinFilBlockedFileRecord
| DefenderMPLogBMTelemetryRecord
| DefenderMPLogEMSRecord
| DefenderMPLogOriginalFileNameRecord
| DefenderMPLogExclusionRecord
| DefenderMPLogLowfiRecord
| DefenderMPLogDetectionAddRecord
| DefenderMPLogThreatRecord
| DefenderMPLogDetectionEventRecord
| DefenderMPLogResourceScanRecord
| DefenderMPLogThreatActionRecord
| DefenderMPLogRTPRecord
]:
while mplog_line := mplog.readline():
yield from self._mplog_line(mplog_line, source)
yield from self._mplog_block(mplog_line, mplog, source)

@plugin.export(
record=[
DefenderMPLogProcessImageRecord,
DefenderMPLogMinFilUSSRecord,
DefenderMPLogMinFilBlockedFileRecord,
DefenderMPLogBMTelemetryRecord,
DefenderMPLogEMSRecord,
DefenderMPLogOriginalFileNameRecord,
DefenderMPLogExclusionRecord,
DefenderMPLogLowfiRecord,
DefenderMPLogDetectionAddRecord,
DefenderMPLogThreatRecord,
DefenderMPLogDetectionEventRecord,
DefenderMPLogResourceScanRecord,
DefenderMPLogThreatActionRecord,
DefenderMPLogRTPRecord,
]
)
def mplog(
self,
) -> Iterator[
DefenderMPLogProcessImageRecord
| DefenderMPLogMinFilUSSRecord
| DefenderMPLogMinFilBlockedFileRecord
| DefenderMPLogBMTelemetryRecord
| DefenderMPLogEMSRecord
| DefenderMPLogOriginalFileNameRecord
| DefenderMPLogExclusionRecord
| DefenderMPLogLowfiRecord
| DefenderMPLogDetectionAddRecord
| DefenderMPLogThreatRecord
| DefenderMPLogDetectionEventRecord
| DefenderMPLogResourceScanRecord
| DefenderMPLogThreatActionRecord
| DefenderMPLogRTPRecord
]:
"""Return the contents of the Defender MPLog file.
References:
- https://www.crowdstrike.com/blog/how-to-use-microsoft-protection-logging-for-forensic-investigations/
- https://www.intrinsec.com/hunt-mplogs/
- https://github.com/Intrinsec/mplog_parser
"""
mplog_directory = self.target.fs.path(DEFENDER_MPLOG_DIR)

if not (mplog_directory.exists() and mplog_directory.is_dir()):
return

for mplog_file in mplog_directory.glob("MPLog-*"):
for encoding in ["UTF-16", "UTF-8"]:
try:
with mplog_file.open("rt", encoding=encoding) as mplog:
yield from self._mplog(mplog, self.target.fs.path(mplog_file))
break
except UnicodeError:
continue

@plugin.arg(
"--output",
"-o",
Expand Down
Empty file.
Loading

0 comments on commit b89b12d

Please sign in to comment.