diff --git a/src/dnfile/stream.py b/src/dnfile/stream.py index c1ab692..8181840 100644 --- a/src/dnfile/stream.py +++ b/src/dnfile/stream.py @@ -6,14 +6,16 @@ https://www.ntcore.com/files/dotnetformat.htm https://referencesource.microsoft.com/System.AddIn/System/Addin/MiniReflection/MetadataReader/Metadata.cs.html#123 + ECMA-335 6th Edition, June 2012, Section II.24.2.4 #US and #Blob heaps -Copyright (c) 2020-2022 MalwareFrank +Copyright (c) 2020-2024 MalwareFrank """ import struct as _struct import logging from typing import Dict, List, Tuple, Union, Optional from binascii import hexlify as _hexlify +import collections as _collections from pefile import MAX_STRING_LENGTH, Structure @@ -30,10 +32,61 @@ class GenericStream(base.ClrStream): pass +class HeapItemString(base.HeapItem, _collections.abc.Sequence): + encoding: Optional[str] + + def __init__(self, data, encoding="utf-8"): + super().__init__(data) + self.encoding = encoding + self.value = self.__data__.decode(encoding) + + def __str__(self) -> str: + return self.value + + def __eq__(self, other): + if isinstance(other, str): + return str(self) == other + return super().__eq__(other) + + def __getitem__(self, i): + return self.value[i] + + def __len__(self): + return len(self.value) + + +class HeapItemBinary(base.HeapItem, _collections.abc.Sequence): + item_size: Optional[base.CompressedInt] + + def __init__(self, data: bytes, rva: Optional[int] = None): + # read compressed int, which has a max size of four bytes + self.item_size = base.CompressedInt.read(data[:4], rva) + if self.item_size is None: + raise ValueError("invalid compressed int") + base.HeapItem.__init__(self, data[:self.item_size.raw_size + self.item_size], rva) + + # read data + offset = self.item_size.raw_size + self.value = self.__data__[offset:offset + self.item_size] + + def __eq__(self, other): + if isinstance(other, base.HeapItem): + return base.HeapItem.__eq__(self, other) + if isinstance(other, bytes): + return self.value == other + return False + + def __getitem__(self, i): + return self.value[i] + + def __len__(self): + return len(self.value) + + class StringsHeap(base.ClrHeap): offset_size = 0 - def get(self, index, max_length=MAX_STRING_LENGTH, encoding="utf-8", as_bytes=False): + def get_str(self, index, max_length=MAX_STRING_LENGTH, encoding="utf-8", as_bytes=False): """ Given an index (offset), read a null-terminated UTF-8 (or given encoding) string. Returns None on error, or string, or bytes if as_bytes is True. @@ -55,6 +108,27 @@ def get(self, index, max_length=MAX_STRING_LENGTH, encoding="utf-8", as_bytes=Fa s = data.decode(encoding) return s + def get(self, index, max_length=MAX_STRING_LENGTH, encoding="utf-8") -> Optional[HeapItemString]: + """ + Given an index (offset), read a null-terminated UTF-8 (or given encoding) string. + Returns a HeapItemString, or None on error. + """ + if not self.__data__ or index is None or not max_length: + return None + + if index >= len(self.__data__): + raise IndexError("index out of range") + + offset = index + end = self.__data__.find(b"\x00", offset) + if end - offset > max_length: + return None + + item = HeapItemString(self.__data__[offset:end], encoding) + item.rva = self.rva + offset + + return item + class BinaryHeap(base.ClrHeap): def get_with_size(self, index) -> Optional[Tuple[bytes, int]]: @@ -84,7 +158,7 @@ def get_with_size(self, index) -> Optional[Tuple[bytes, int]]: return data, length_size + data_length - def get(self, index) -> Optional[bytes]: + def get_bytes(self, index) -> Optional[bytes]: try: ret = self.get_with_size(index) except IndexError: @@ -97,12 +171,34 @@ def get(self, index) -> Optional[bytes]: return data + def get(self, index) -> Optional[HeapItemBinary]: + if self.__data__ is None: + logger.warning("stream has no data") + return None + + if index >= len(self.__data__): + logger.warning("stream is too small: wanted: 0x%x found: 0x%x", index, len(self.__data__)) + return None + + offset = index + + try: + item = HeapItemBinary(self.__data__[index:]) + except ValueError as e: + # possible invalid compressed int length, such as invalid leading flags. + logger.warning(f"stream entry error - {e} @ RVA=0x{hex(self.rva)}") + return None + + item.rva = self.rva + offset + + return item + class BlobHeap(BinaryHeap): pass -class UserString(object): +class UserString(HeapItemBinary, HeapItemString, _collections.abc.Sequence): """ The #US or UserStrings stream should contain UTF-16 strings. Each entry in the stream includes a byte indicating whether @@ -111,9 +207,60 @@ class UserString(object): Reference ECMA-335, Partition II Section 24.2.4 """ - def __init__(self, data: bytes, encoding="utf-16"): - self.__data__: bytes = data - self.value: str = data.decode(encoding) + + flag: Optional[int] = None + + def __init__(self, data: Union[bytes, HeapItemBinary], encoding="utf-16"): + self.encoding = encoding + if isinstance(data, bytes): + HeapItemBinary.__init__(self, data) + elif isinstance(data, HeapItemBinary): + HeapItemBinary.__init__(self, data.to_bytes()) + + if self.item_size % 2 == 1: + # > This final byte holds the value 1 if and only if any UTF16 + # > character within the string has any bit set in its top byte, + # > or its low byte is any of the following: + # > 0x01ā€“0x08, 0x0Eā€“0x1F, 0x27, 0x2D, 0x7F. + # + # via ECMA-335 6th edition, II.24.2.4 + # + # Trim this trailing flag, which is not part of the string. + buf = self.value + self.flag = buf[-1] + str_buf = buf[:-1] + if self.flag == 0x00: + # > Otherwise, it holds 0. + # + # via ECMA-335 6th edition, II.24.2.4 + # + # *should* be a normal UTF-16 string, but still not + # make sense. + pass + elif self.flag == 0x01: + # > The 1 signifies Unicode characters that require handling + # > beyond that normally provided for 8-bit encoding sets. + # + # via ECMA-335 6th edition, II.24.2.4 + # + # these strings are probably best interpreted as bytes. + pass + else: + logger.warning("unexpected string flag value: 0x%02x", flag) + else: + logger.warning("string missing trailing flag") + str_buf = self.value + + self.value = str_buf.decode(encoding) + + def __eq__(self, other): + return HeapItemString.__eq__(self, other) + + def __getitem__(self, i): + return self.value[i] + + def __len__(self): + return len(self.value) class UserStringHeap(BinaryHeap): @@ -158,24 +305,43 @@ def get(self, index) -> Optional[bytes]: return data def get_us(self, index, encoding="utf-16") -> Optional[UserString]: - """ - Fetch the user string at the given index and attempt to decode it as UTF-16. - - Note: the underlying data is not guaranteed to be well formed UTF-16, - so this routine may raise a UnicodeDecodeError when encountering such data. - You can always use `UserStringHeap.get()` to fetch the raw binary data. - """ - data = self.get(index) - if data is None: + bin_item = super().get(index) + if bin_item is None: return None - else: - return UserString(data, encoding=encoding) + + us_item = UserString(bin_item, encoding=encoding) + us_item.rva = bin_item.rva + us_item.item_size = bin_item.item_size + + return us_item + + +class HeapItemGuid(base.HeapItem): + + def __init__(self, data: bytes, rva: Optional[int] = None): + super().__init__(data, rva) + + @property + def value(self): + return self.__data__ + + def __str__(self): + data = self.__data__ + parts = _struct.unpack_from(" HeapItemGuid: + if index is None or index < 1: + return None + + size = 128 // 8 # number of bytes in a guid + # offset into the GUID stream + offset = (index - 1) * size + + if offset + size > len(self.__data__): + raise IndexError("index out of range") + + item = HeapItemGuid(self.__data__[offset:offset + size], self.rva + offset) + + return item + class MDTablesStruct(Structure): Reserved_1: int