Skip to content

Commit

Permalink
BREAKING CHANGE: Guid, Strings, and BinaryHeap .get() return HeapItem…
Browse files Browse the repository at this point in the history
… objects
  • Loading branch information
malwarefrank committed Mar 17, 2024
1 parent a4d4ce4 commit 4edd63e
Showing 1 changed file with 200 additions and 19 deletions.
219 changes: 200 additions & 19 deletions src/dnfile/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@
https://www.ntcore.com/files/dotnetformat.htm
https://referencesource.microsoft.com/System.AddIn/System/Addin/MiniReflection/MetadataReader/Metadata.cs.html#123
ECMA-335 6th Edition, June 2012, Section II.24.2.4 #US and #Blob heaps
Copyright (c) 2020-2022 MalwareFrank
Copyright (c) 2020-2024 MalwareFrank
"""

import struct as _struct
import logging
from typing import Dict, List, Tuple, Union, Optional
from binascii import hexlify as _hexlify
import collections as _collections

from pefile import MAX_STRING_LENGTH, Structure

Expand All @@ -30,10 +32,61 @@ class GenericStream(base.ClrStream):
pass


class HeapItemString(base.HeapItem, _collections.abc.Sequence):
encoding: Optional[str]

def __init__(self, data, encoding="utf-8"):
super().__init__(data)
self.encoding = encoding
self.value = self.__data__.decode(encoding)

def __str__(self) -> str:
return self.value

def __eq__(self, other):
if isinstance(other, str):
return str(self) == other
return super().__eq__(other)

def __getitem__(self, i):
return self.value[i]

def __len__(self):
return len(self.value)


class HeapItemBinary(base.HeapItem, _collections.abc.Sequence):
item_size: Optional[base.CompressedInt]

def __init__(self, data: bytes, rva: Optional[int] = None):
# read compressed int, which has a max size of four bytes
self.item_size = base.CompressedInt.read(data[:4], rva)
if self.item_size is None:
raise ValueError("invalid compressed int")
base.HeapItem.__init__(self, data[:self.item_size.raw_size + self.item_size], rva)

# read data
offset = self.item_size.raw_size
self.value = self.__data__[offset:offset + self.item_size]

def __eq__(self, other):
if isinstance(other, base.HeapItem):
return base.HeapItem.__eq__(self, other)
if isinstance(other, bytes):
return self.value == other
return False

def __getitem__(self, i):
return self.value[i]

def __len__(self):
return len(self.value)


class StringsHeap(base.ClrHeap):
offset_size = 0

def get(self, index, max_length=MAX_STRING_LENGTH, encoding="utf-8", as_bytes=False):
def get_str(self, index, max_length=MAX_STRING_LENGTH, encoding="utf-8", as_bytes=False):
"""
Given an index (offset), read a null-terminated UTF-8 (or given encoding) string.
Returns None on error, or string, or bytes if as_bytes is True.
Expand All @@ -55,6 +108,27 @@ def get(self, index, max_length=MAX_STRING_LENGTH, encoding="utf-8", as_bytes=Fa
s = data.decode(encoding)
return s

def get(self, index, max_length=MAX_STRING_LENGTH, encoding="utf-8") -> Optional[HeapItemString]:
"""
Given an index (offset), read a null-terminated UTF-8 (or given encoding) string.
Returns a HeapItemString, or None on error.
"""
if not self.__data__ or index is None or not max_length:
return None

if index >= len(self.__data__):
raise IndexError("index out of range")

offset = index
end = self.__data__.find(b"\x00", offset)
if end - offset > max_length:
return None

item = HeapItemString(self.__data__[offset:end], encoding)
item.rva = self.rva + offset

return item


class BinaryHeap(base.ClrHeap):
def get_with_size(self, index) -> Optional[Tuple[bytes, int]]:
Expand Down Expand Up @@ -84,7 +158,7 @@ def get_with_size(self, index) -> Optional[Tuple[bytes, int]]:

return data, length_size + data_length

def get(self, index) -> Optional[bytes]:
def get_bytes(self, index) -> Optional[bytes]:
try:
ret = self.get_with_size(index)
except IndexError:
Expand All @@ -97,12 +171,34 @@ def get(self, index) -> Optional[bytes]:

return data

def get(self, index) -> Optional[HeapItemBinary]:
if self.__data__ is None:
logger.warning("stream has no data")
return None

if index >= len(self.__data__):
logger.warning("stream is too small: wanted: 0x%x found: 0x%x", index, len(self.__data__))
return None

offset = index

try:
item = HeapItemBinary(self.__data__[index:])
except ValueError as e:
# possible invalid compressed int length, such as invalid leading flags.
logger.warning(f"stream entry error - {e} @ RVA=0x{hex(self.rva)}")
return None

item.rva = self.rva + offset

return item


class BlobHeap(BinaryHeap):
pass


class UserString(object):
class UserString(HeapItemBinary, HeapItemString, _collections.abc.Sequence):
"""
The #US or UserStrings stream should contain UTF-16 strings.
Each entry in the stream includes a byte indicating whether
Expand All @@ -111,9 +207,60 @@ class UserString(object):
Reference ECMA-335, Partition II Section 24.2.4
"""
def __init__(self, data: bytes, encoding="utf-16"):
self.__data__: bytes = data
self.value: str = data.decode(encoding)

flag: Optional[int] = None

def __init__(self, data: Union[bytes, HeapItemBinary], encoding="utf-16"):
self.encoding = encoding
if isinstance(data, bytes):
HeapItemBinary.__init__(self, data)
elif isinstance(data, HeapItemBinary):
HeapItemBinary.__init__(self, data.to_bytes())

if self.item_size % 2 == 1:
# > This final byte holds the value 1 if and only if any UTF16
# > character within the string has any bit set in its top byte,
# > or its low byte is any of the following:
# > 0x01–0x08, 0x0E–0x1F, 0x27, 0x2D, 0x7F.
#
# via ECMA-335 6th edition, II.24.2.4
#
# Trim this trailing flag, which is not part of the string.
buf = self.value
self.flag = buf[-1]
str_buf = buf[:-1]
if self.flag == 0x00:
# > Otherwise, it holds 0.
#
# via ECMA-335 6th edition, II.24.2.4
#
# *should* be a normal UTF-16 string, but still not
# make sense.
pass
elif self.flag == 0x01:
# > The 1 signifies Unicode characters that require handling
# > beyond that normally provided for 8-bit encoding sets.
#
# via ECMA-335 6th edition, II.24.2.4
#
# these strings are probably best interpreted as bytes.
pass
else:
logger.warning("unexpected string flag value: 0x%02x", flag)
else:
logger.warning("string missing trailing flag")
str_buf = self.value

self.value = str_buf.decode(encoding)

def __eq__(self, other):
return HeapItemString.__eq__(self, other)

def __getitem__(self, i):
return self.value[i]

def __len__(self):
return len(self.value)


class UserStringHeap(BinaryHeap):
Expand Down Expand Up @@ -158,24 +305,43 @@ def get(self, index) -> Optional[bytes]:
return data

def get_us(self, index, encoding="utf-16") -> Optional[UserString]:
"""
Fetch the user string at the given index and attempt to decode it as UTF-16.
Note: the underlying data is not guaranteed to be well formed UTF-16,
so this routine may raise a UnicodeDecodeError when encountering such data.
You can always use `UserStringHeap.get()` to fetch the raw binary data.
"""
data = self.get(index)
if data is None:
bin_item = super().get(index)
if bin_item is None:
return None
else:
return UserString(data, encoding=encoding)

us_item = UserString(bin_item, encoding=encoding)
us_item.rva = bin_item.rva
us_item.item_size = bin_item.item_size

return us_item


class HeapItemGuid(base.HeapItem):

def __init__(self, data: bytes, rva: Optional[int] = None):
super().__init__(data, rva)

@property
def value(self):
return self.__data__

def __str__(self):
data = self.__data__
parts = _struct.unpack_from("<IHH", data)
part3 = _hexlify(data[8:10])
part4 = _hexlify(data[10:16])
part3 = part3.decode("ascii")
part4 = part4.decode("ascii")
return f"{parts[0]:08x}-{parts[1]:04x}-{parts[2]:04x}-{part3}-{part4}"

def __repr__(self):
return f"HeapItemGuid(data={self.__data__},rva={self.rva})"


class GuidHeap(base.ClrHeap):
offset_size = 0

def get(self, index, as_bytes=False):
def get_str(self, index, as_bytes=False):
if index is None or index < 1:
return None

Expand All @@ -199,6 +365,21 @@ def get(self, index, as_bytes=False):
parts[0], parts[1], parts[2], part3, part4
)

def get(self, index) -> HeapItemGuid:
if index is None or index < 1:
return None

size = 128 // 8 # number of bytes in a guid
# offset into the GUID stream
offset = (index - 1) * size

if offset + size > len(self.__data__):
raise IndexError("index out of range")

item = HeapItemGuid(self.__data__[offset:offset + size], self.rva + offset)

return item


class MDTablesStruct(Structure):
Reserved_1: int
Expand Down

0 comments on commit 4edd63e

Please sign in to comment.