Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve DPAPI plugin #711

Merged
merged 22 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion dissect/target/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -970,7 +970,7 @@ def __init_subclass_subplugin__(cls, **kwargs):
continue

# The method needs to output records
if getattr(subplugin_func, "__output__", None) != "record":
if getattr(subplugin_func, "__output__", None) not in ["record", "yield"]:
continue

# The method may not be part of a parent class.
Expand Down Expand Up @@ -1056,6 +1056,10 @@ def __init_subclass__(cls, **kwargs):
cls.__init_subclass_namespace__(cls, **kwargs)


class InternalNamespacePlugin(NamespacePlugin):
pass


class InternalPlugin(Plugin):
"""Parent class for internal plugins.

Expand Down
Empty file.
171 changes: 171 additions & 0 deletions dissect/target/plugins/os/windows/credential/lsa.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
import hashlib
from functools import cached_property
from struct import unpack
JSCU-CNI marked this conversation as resolved.
Show resolved Hide resolved
from typing import Iterator, Optional
JSCU-CNI marked this conversation as resolved.
Show resolved Hide resolved

from dissect.target.exceptions import UnsupportedPluginError
from dissect.target.helpers.record import TargetRecordDescriptor
from dissect.target.plugin import Plugin, export

try:
from Crypto.Cipher import AES, ARC4, DES

HAS_CRYPTO = True
except ImportError:
HAS_CRYPTO = False


LSASecretRecord = TargetRecordDescriptor(
"windows/credential/lsa",
[
("datetime", "ts"),
("string", "name"),
("string", "value"),
],
)


class LSAPlugin(Plugin):
"""Windows Local Security Authority (LSA) plugin.

Resources:
- https://learn.microsoft.com/en-us/windows/win32/secauthn/lsa-authentication
- https://moyix.blogspot.com/2008/02/decrypting-lsa-secrets.html (Windows XP)
- https://github.com/fortra/impacket/blob/master/impacket/examples/secretsdump.py
"""

__namespace__ = "lsa"

SECURITY_POLICY_KEY = "HKEY_LOCAL_MACHINE\\SECURITY\\Policy"
SYSTEM_KEY = "HKEY_LOCAL_MACHINE\\SYSTEM\\CurrentControlSet\\Control\\LSA"

def check_compatible(self) -> None:
if not HAS_CRYPTO:
raise UnsupportedPluginError("Missing pycryptodome dependency")

if not self.target.has_function("registry") or not list(self.target.registry.keys(self.SYSTEM_KEY)):
raise UnsupportedPluginError("Registry key not found: %s", self.SYSTEM_KEY)

@cached_property
def syskey(self) -> bytes:
"""Return byte value of Windows system SYSKEY, also called BootKey."""
lsa = self.target.registry.key(self.SYSTEM_KEY)
syskey_keys = ["JD", "Skew1", "GBG", "Data"]
# This magic value rotates the order of the data
alterator = [0x8, 0x5, 0x4, 0x2, 0xB, 0x9, 0xD, 0x3, 0x0, 0x6, 0x1, 0xC, 0xE, 0xA, 0xF, 0x7]

r = bytes.fromhex("".join([lsa.subkey(key).class_name for key in syskey_keys]))
return bytes(r[i] for i in alterator)

@cached_property
def lsakey(self) -> bytes:
"""Decrypt and return the LSA key of the Windows system."""
security_pol = self.target.registry.key(self.SECURITY_POLICY_KEY)

# Windows Vista or newer
if key := security_pol.subkeys().mapping.get("PolEKList"):
enc_key = key.value("(Default)").value
lsa_key = _decrypt_aes(enc_key, self.syskey)
return lsa_key[68:100]

# Windows XP
if key := security_pol.subkeys().mapping.get("PolSecretEncryptionKey"):
enc_key = key.value("(Default)").value
lsa_key = _decrypt_rc4(enc_key, self.syskey)
return lsa_key[16:32]

raise ValueError("Unable to determine LSA policy key location in registry")
JSCU-CNI marked this conversation as resolved.
Show resolved Hide resolved

@cached_property
def _secrets(self) -> Optional[dict[str, bytes]]:
JSCU-CNI marked this conversation as resolved.
Show resolved Hide resolved
"""Return dict of Windows system decrypted LSA secrets."""
if not self.target.ntversion:
raise ValueError("Unable to determine Windows NT version")

result = {}

reg_secrets = self.target.registry.key(self.SECURITY_POLICY_KEY).subkey("Secrets")
for subkey in reg_secrets.subkeys():
JSCU-CNI marked this conversation as resolved.
Show resolved Hide resolved
enc_data = subkey.subkey("CurrVal").value("(Default)").value

# Windows Vista or newer
if float(self.target.ntversion) >= 6.0:
secret = _decrypt_aes(enc_data, self.lsakey)

# Windows XP
else:
secret = _decrypt_des(enc_data, self.lsakey)

result[subkey.name] = secret

return result

@export(record=LSASecretRecord)
def secrets(self) -> Iterator[LSASecretRecord]:
"""Yield decrypted LSA secrets from a Windows target."""
for key, value in self._secrets.items():
yield LSASecretRecord(
ts=self.target.registry.key(f"{self.SECURITY_POLICY_KEY}\\Secrets").subkeys().mapping.get(key).ts,
JSCU-CNI marked this conversation as resolved.
Show resolved Hide resolved
name=key,
value=value.hex(),
_target=self.target,
)


def _decrypt_aes(data: bytes, key: bytes) -> bytes:
ctx = hashlib.sha256()
ctx.update(key)
for _ in range(1, 1000 + 1):
ctx.update(data[28:60])

ciphertext = data[60:]
plaintext = b""
JSCU-CNI marked this conversation as resolved.
Show resolved Hide resolved

for i in range(0, len(ciphertext), 16):
cipher = AES.new(key=ctx.digest(), mode=AES.MODE_CBC, iv=b"\x00" * 16)
JSCU-CNI marked this conversation as resolved.
Show resolved Hide resolved
plaintext += cipher.decrypt(ciphertext[i : i + 16].ljust(16, b"\x00"))
JSCU-CNI marked this conversation as resolved.
Show resolved Hide resolved

return plaintext
JSCU-CNI marked this conversation as resolved.
Show resolved Hide resolved


def _decrypt_rc4(data: bytes, key: bytes) -> bytes:
md5 = hashlib.md5()
md5.update(key)
for _ in range(1000):
md5.update(data[60:76])
rc4_key = md5.digest()

cipher = ARC4.new(rc4_key)
return cipher.decrypt(data[12:60])


def _decrypt_des(data: bytes, key: bytes) -> bytes:
plaintext = b""
JSCU-CNI marked this conversation as resolved.
Show resolved Hide resolved

enc_size = unpack("<I", data[:4])[0]
JSCU-CNI marked this conversation as resolved.
Show resolved Hide resolved
data = data[len(data) - enc_size :]

key0 = key
for _ in range(0, len(data), 8):
ciphertext = data[:8]
block_key = transform_key(key0[:7])
JSCU-CNI marked this conversation as resolved.
Show resolved Hide resolved

cipher = DES.new(block_key, DES.MODE_ECB)
plaintext += cipher.decrypt(ciphertext)
JSCU-CNI marked this conversation as resolved.
Show resolved Hide resolved

key0 = key0[7:]
data = data[8:]

if len(key0) < 7:
key0 = key[len(key0) :]

return plaintext
JSCU-CNI marked this conversation as resolved.
Show resolved Hide resolved


def transform_key(key: bytes) -> bytes:
JSCU-CNI marked this conversation as resolved.
Show resolved Hide resolved
new_key = []
new_key.append(((key[0] >> 0x01) << 1) & 0xFE)
for i in range(0, 6):
new_key.append((((key[i] & ((1 << (i + 1)) - 1)) << (6 - i) | (key[i + 1] >> (i + 2))) << 1) & 0xFE)
new_key.append(((key[6] & 0x7F) << 1) & 0xFE)
return bytes(new_key)
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@
c_sam = cstruct().load(sam_def)

SamRecord = TargetRecordDescriptor(
"windows/registry/sam",
"windows/credential/sam",
[
("uint32", "rid"),
("string", "fullname"),
Expand Down Expand Up @@ -303,6 +303,9 @@ def check_compatible(self) -> None:
if not HAS_CRYPTO:
raise UnsupportedPluginError("Missing pycryptodome dependency")

if not self.target.has_function("lsa"):
raise UnsupportedPluginError("LSA plugin is required for SAM plugin")

if not len(list(self.target.registry.keys(self.SAM_KEY))) > 0:
raise UnsupportedPluginError(f"Registry key not found: {self.SAM_KEY}")

Expand Down Expand Up @@ -374,7 +377,7 @@ def sam(self) -> Iterator[SamRecord]:
nt (string): Parsed NT-hash.
"""

syskey = self.target.dpapi.syskey # aka. bootkey
syskey = self.target.lsa.syskey # aka. bootkey
samkey = self.calculate_samkey(syskey) # aka. hashed bootkey or hbootkey

almpassword = b"LMPASSWORD\0"
Expand Down
3 changes: 3 additions & 0 deletions dissect/target/plugins/os/windows/dpapi/blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ def decrypt(
if self.decrypted:
return True

if not master_key:
raise ValueError("No master key provided to decrypt blob with")

for algo in [crypt_session_key_type1, crypt_session_key_type2]:
session_key = algo(
master_key,
Expand Down
56 changes: 49 additions & 7 deletions dissect/target/plugins/os/windows/dpapi/crypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from typing import Optional, Union
JSCU-CNI marked this conversation as resolved.
Show resolved Hide resolved

try:
from Crypto.Cipher import AES, ARC4
from Crypto.Cipher import AES, ARC4, DES3

HAS_CRYPTO = True
except ImportError:
Expand Down Expand Up @@ -35,17 +35,27 @@ def from_name(cls, name: str) -> CipherAlgorithm:
return CIPHER_ALGORITHMS[name]()

def derive_key(self, key: bytes, hash_algorithm: HashAlgorithm) -> bytes:
"""Mimics the corresponding native Microsoft function."""
"""Mimics the corresponding native Microsoft function.

Resources:
- https://github.com/tijldeneut/DPAPIck3/blob/main/dpapick3/crypto.py#L185
"""

if len(key) > hash_algorithm.block_length:
key = hashlib.new(hash_algorithm.name, key).digest()

if len(key) >= hash_algorithm.digest_length:
if len(key) >= self.key_length:
return key

key = key.ljust(hash_algorithm.block_length, b"\x00")
pad1 = bytes(c ^ 0x36 for c in key)
pad2 = bytes(c ^ 0x5C for c in key)
return hashlib.new(hash_algorithm.name, pad1).digest() + hashlib.new(hash_algorithm.name, pad2).digest()
pad1 = bytes(c ^ 0x36 for c in key)[: hash_algorithm.block_length]
pad2 = bytes(c ^ 0x5C for c in key)[: hash_algorithm.block_length]
key = hashlib.new(hash_algorithm.name, pad1).digest() + hashlib.new(hash_algorithm.name, pad2).digest()
key = self.fixup_key(key)
return key

def fixup_key(self, key: bytes) -> bytes:
return key

def decrypt_with_hmac(
self, data: bytes, key: bytes, iv: bytes, hash_algorithm: HashAlgorithm, rounds: int
Expand Down Expand Up @@ -108,6 +118,37 @@ def decrypt(self, data: bytes, key: bytes, iv: Optional[bytes] = None) -> bytes:
return cipher.decrypt(data)


class _DES3(CipherAlgorithm):
id = 0x6603
name = "DES3"
key_length = 192 // 8
iv_length = 64 // 8
block_length = 64 // 8

def fixup_key(self, key: bytes) -> bytes:
nkey = bytearray()
JSCU-CNI marked this conversation as resolved.
Show resolved Hide resolved
for byte in key:
parity_bit = 0
for i in range(8):
parity_bit ^= (byte >> i) & 1

if parity_bit == 0:
nkey.append(byte)
else:
nkey.append(byte | 1)
JSCU-CNI marked this conversation as resolved.
Show resolved Hide resolved
return bytes(nkey[0 : self.key_length])
JSCU-CNI marked this conversation as resolved.
Show resolved Hide resolved

def decrypt(self, data: bytes, key: bytes, iv: Optional[bytes] = None) -> bytes:
JSCU-CNI marked this conversation as resolved.
Show resolved Hide resolved
if not HAS_CRYPTO:
raise RuntimeError("Missing pycryptodome dependency")

if len(key) != 24:
raise ValueError(f"Invalid DES3 CBC key length {len(key)}")

cipher = DES3.new(key, mode=DES3.MODE_CBC, iv=iv if iv else b"\x00" * 8)
JSCU-CNI marked this conversation as resolved.
Show resolved Hide resolved
return cipher.decrypt(data)


class HashAlgorithm:
id: int
name: str
Expand Down Expand Up @@ -148,7 +189,7 @@ class _HMAC(_SHA1):


class _SHA256(HashAlgorithm):
id = 0x8004
id = 0x800C
name = "sha256"
digest_length = 256 // 8
block_length = 512 // 8
Expand Down Expand Up @@ -218,6 +259,7 @@ def crypt_session_key_type1(
strong_password: Optional password used for decryption or the blob itself.
smart_card_secret: Optional MS Next Gen Crypto secret (e.g. from PIN code).
verify_blob: Optional encrypted blob used for integrity check.

Returns:
decryption key
"""
Expand Down
Loading