From ebbc1fa6d678223e3226678cc5ffb2e8cb85a30b Mon Sep 17 00:00:00 2001 From: doomedraven Date: Wed, 30 Oct 2024 16:31:32 +0100 Subject: [PATCH] RKP as package --- lib/parsers_aux/__init__.py | 0 lib/parsers_aux/ratking/__init__.py | 211 ------------- .../ratking/config_parser_exception.py | 31 -- lib/parsers_aux/ratking/readme.md | 35 --- lib/parsers_aux/ratking/utils/__init__.py | 25 -- lib/parsers_aux/ratking/utils/config_item.py | 149 --------- lib/parsers_aux/ratking/utils/data_utils.py | 62 ---- .../ratking/utils/decryptors/__init__.py | 50 --- .../utils/decryptors/config_decryptor.py | 53 ---- .../decryptors/config_decryptor_aes_cbc.py | 290 ------------------ .../decryptors/config_decryptor_aes_ecb.py | 152 --------- .../config_decryptor_decrypt_xor.py | 121 -------- .../decryptors/config_decryptor_plaintext.py | 124 -------- .../config_decryptor_random_hardcoded.py | 96 ------ .../ratking/utils/dotnet_constants.py | 86 ------ .../ratking/utils/dotnetpe_payload.py | 199 ------------ modules/processing/parsers/CAPE/AsyncRAT.py | 2 +- modules/processing/parsers/CAPE/DCRat.py | 2 +- modules/processing/parsers/CAPE/QuasarRAT.py | 2 +- modules/processing/parsers/CAPE/VenomRAT.py | 2 +- modules/processing/parsers/CAPE/XWorm.py | 2 +- modules/processing/parsers/CAPE/XenoRAT.py | 2 +- poetry.lock | 25 +- pyproject.toml | 2 + 24 files changed, 32 insertions(+), 1691 deletions(-) delete mode 100644 lib/parsers_aux/__init__.py delete mode 100644 lib/parsers_aux/ratking/__init__.py delete mode 100644 lib/parsers_aux/ratking/config_parser_exception.py delete mode 100644 lib/parsers_aux/ratking/readme.md delete mode 100644 lib/parsers_aux/ratking/utils/__init__.py delete mode 100644 lib/parsers_aux/ratking/utils/config_item.py delete mode 100644 lib/parsers_aux/ratking/utils/data_utils.py delete mode 100644 lib/parsers_aux/ratking/utils/decryptors/__init__.py delete mode 100644 lib/parsers_aux/ratking/utils/decryptors/config_decryptor.py delete mode 100644 lib/parsers_aux/ratking/utils/decryptors/config_decryptor_aes_cbc.py delete mode 100644 lib/parsers_aux/ratking/utils/decryptors/config_decryptor_aes_ecb.py delete mode 100644 lib/parsers_aux/ratking/utils/decryptors/config_decryptor_decrypt_xor.py delete mode 100644 lib/parsers_aux/ratking/utils/decryptors/config_decryptor_plaintext.py delete mode 100644 lib/parsers_aux/ratking/utils/decryptors/config_decryptor_random_hardcoded.py delete mode 100644 lib/parsers_aux/ratking/utils/dotnet_constants.py delete mode 100644 lib/parsers_aux/ratking/utils/dotnetpe_payload.py diff --git a/lib/parsers_aux/__init__.py b/lib/parsers_aux/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/lib/parsers_aux/ratking/__init__.py b/lib/parsers_aux/ratking/__init__.py deleted file mode 100644 index a73ff2deaa7..00000000000 --- a/lib/parsers_aux/ratking/__init__.py +++ /dev/null @@ -1,211 +0,0 @@ -#!/usr/bin/env python3 -# -# rat_config_parser.py -# -# Author: jeFF0Falltrades -# -# Provides the primary functionality for parsing configurations from the -# AsyncRAT, DcRAT, QuasarRAT, VenomRAT, XWorm, XenoRAT, etc. RAT families -# -# MIT License -# -# Copyright (c) 2024 Jeff Archer -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in all -# copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -# SOFTWARE. -from logging import getLogger - -# from os.path import isfile -from re import DOTALL, compile, search -from typing import Any, Tuple - -from .config_parser_exception import ConfigParserException -from .utils import config_item -from .utils.decryptors import SUPPORTED_DECRYPTORS, ConfigDecryptor, IncompatibleDecryptorException -from .utils.dotnetpe_payload import DotNetPEPayload - -# from yara import Rules - - -logger = getLogger(__name__) - - -class RATConfigParser: - # Min and max number of items in a potential config section - _MIN_CONFIG_LEN_FLOOR = 5 - _MIN_CONFIG_LEN_CEILING = 9 - - # Pattern to find the VerifyHash() method - _PATTERN_VERIFY_HASH = compile(rb"\x7e.{3}\x04(?:\x6f.{3}\x0a){2}\x74.{3}\x01", DOTALL) - - # def __init__(self, file_path: str, yara_rule: Rules = None) -> None: - def __init__(self, file_data: bytes = None) -> None: - self.report = { - "config": {}, - } - try: - # Filled in _decrypt_and_decode_config() - self._incompatible_decryptors: list[int] = [] - try: - self._dnpp = DotNetPEPayload(file_data) - except Exception as e: - raise e - # self.report["sha256"] = self._dnpp.sha256 - # self.report["yara_possible_family"] = self._dnpp.yara_match - - # Assigned in _decrypt_and_decode_config() - self._decryptor: ConfigDecryptor = None - self.report["config"] = self._get_config() - self.report["key"] = ( - self._decryptor.key.hex() if self._decryptor is not None and self._decryptor.key is not None else "None" - ) - self.report["salt"] = ( - self._decryptor.salt.hex() if self._decryptor is not None and self._decryptor.salt is not None else "None" - ) - except Exception as e: - # self.report["config"] = f"Exception encountered for {file_path}: {e}" - self.report["config"] = f"Exception encountered: {e}" - - # Decrypts/decodes values from an encrypted config and returns the - # decrypted/decoded config - def _decrypt_and_decode_config(self, encrypted_config: bytes, min_config_len: int) -> dict[str, Any]: - decoded_config = {} - - for item_class in config_item.SUPPORTED_CONFIG_ITEMS: - item = item_class() - # Translate config Field RVAs to Field names - item_data = {self._dnpp.field_name_from_rva(k): v for k, v in item.parse_from(encrypted_config).items()} - - if len(item_data) > 0: - if type(item) is config_item.EncryptedStringConfigItem: - # Translate config value RVAs to string values - for k in item_data: - item_data[k] = self._dnpp.user_string_from_rva(item_data[k]) - - # Attempt to decrypt encrypted values - for decryptor in SUPPORTED_DECRYPTORS: - if decryptor in self._incompatible_decryptors: - continue - - if self._decryptor is None: - # Try to instantiate the selected decryptor - # Add to incompatible list and move on upon failure - try: - self._decryptor = decryptor(self._dnpp) - except IncompatibleDecryptorException as ide: - logger.debug(f"Decryptor incompatible {decryptor} : {ide}") - self._incompatible_decryptors.append(decryptor) - continue - try: - # Try to decrypt the encrypted strings - # Continue to next compatible decryptor on failure - item_data = self._decryptor.decrypt_encrypted_strings(item_data) - break - except Exception as e: - logger.debug(f"Decryption failed with decryptor {decryptor} : {e}") - self._decryptor = None - - if self._decryptor is None: - raise ConfigParserException("All decryptors failed") - - elif type(item) is config_item.ByteArrayConfigItem: - for k in item_data: - arr_size, arr_rva = item_data[k] - item_data[k] = self._dnpp.byte_array_from_size_and_rva(arr_size, arr_rva).hex() - - decoded_config.update(item_data) - - if len(decoded_config) < min_config_len: - raise ConfigParserException(f"Minimum threshold of config items not met: {len(decoded_config)}/{min_config_len}") - return decoded_config - - # Searches for the RAT configuration section, using the VerifyHash() marker - # or brute-force, returning the decrypted config on success - def _get_config(self) -> dict[str, Any]: - logger.debug("Extracting config...") - try: - config_start, decrypted_config = self._get_config_verify_hash_method() - except Exception: - logger.debug("VerifyHash() method failed; Attempting .cctor brute force...") - # If the VerifyHash() method does not work, move to brute-forcing - # static constructors - try: - config_start, decrypted_config = self._get_config_cctor_brute_force() - except Exception as e: - raise ConfigParserException(f"Could not identify config: {e}") - logger.debug(f"Config found at RVA {hex(config_start)}...") - return decrypted_config - - # Attempts to retrieve the config via brute-force, looking through every - # static constructor (.cctor) and attempting to decode/decrypt a valid - # config from that constructor, returning the config RVA and decrypted - # config on success - def _get_config_cctor_brute_force(self) -> Tuple[int, dict[str, Any]]: - candidates = self._dnpp.methods_from_name(".cctor") - if len(candidates) == 0: - raise ConfigParserException("No .cctor method could be found") - - # For each .cctor method, map its RVA and body (in raw bytes) - candidate_cctor_data = {method.rva: self._dnpp.method_body_from_method(method) for method in candidates} - - config_start, decrypted_config = None, None - # Start at our ceiling value for number of config items - min_config_len = self._MIN_CONFIG_LEN_CEILING - - while decrypted_config is None and min_config_len >= self._MIN_CONFIG_LEN_FLOOR: - for method_rva, method_body in candidate_cctor_data.items(): - logger.debug(f"Attempting brute force at .cctor method at {hex(method_rva)}") - try: - config_start, decrypted_config = ( - method_rva, - self._decrypt_and_decode_config(method_body, min_config_len), - ) - break - except Exception as e: - logger.debug(f"Brute force failed for method at {hex(method_rva)}: {e}") - continue - # Reduce the minimum config length until we reach our floor - min_config_len -= 1 - - if decrypted_config is None: - raise ConfigParserException("No valid configuration could be parsed from any .cctor methods") - return config_start, decrypted_config - - # Attempts to retrieve the config via looking for a config section preceded - # by the VerifyHash() method typically found in a Settings module, - # returning the config RVA and decrypted config on success - def _get_config_verify_hash_method(self) -> Tuple[int, dict[str, Any]]: - # Identify the VerifyHash() Method code - verify_hash_hit = search(self._PATTERN_VERIFY_HASH, self._dnpp.data) - if verify_hash_hit is None: - raise ConfigParserException("Could not identify VerifyHash() marker") - - # Reverse the hit to find the VerifyHash() method, then grab the - # subsequent function - config_method = self._dnpp.method_from_instruction_offset(verify_hash_hit.start(), 1) - encrypted_config = self._dnpp.method_body_from_method(config_method) - min_config_len = self._MIN_CONFIG_LEN_CEILING - while True: - try: - decrypted_config = self._decrypt_and_decode_config(encrypted_config, min_config_len) - return config_method.rva, decrypted_config - except Exception as e: - # Reduce the minimum config length until we reach our floor - if min_config_len < self._MIN_CONFIG_LEN_FLOOR: - raise e - min_config_len -= 1 diff --git a/lib/parsers_aux/ratking/config_parser_exception.py b/lib/parsers_aux/ratking/config_parser_exception.py deleted file mode 100644 index 2b8c1b06282..00000000000 --- a/lib/parsers_aux/ratking/config_parser_exception.py +++ /dev/null @@ -1,31 +0,0 @@ -#!/usr/bin/env python3 -# -# config_parser_exception.py -# -# Author: jeFF0Falltrades -# -# A simple custom Exception class for use with configuration parsing actions -# -# MIT License -# -# Copyright (c) 2024 Jeff Archer -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in all -# copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -# SOFTWARE. -class ConfigParserException(Exception): - pass diff --git a/lib/parsers_aux/ratking/readme.md b/lib/parsers_aux/ratking/readme.md deleted file mode 100644 index 28fc18ea444..00000000000 --- a/lib/parsers_aux/ratking/readme.md +++ /dev/null @@ -1,35 +0,0 @@ -All works here is done by https://github.com/jeFF0Falltrades/rat_king_parser - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/lib/parsers_aux/ratking/utils/__init__.py b/lib/parsers_aux/ratking/utils/__init__.py deleted file mode 100644 index 716cb99880a..00000000000 --- a/lib/parsers_aux/ratking/utils/__init__.py +++ /dev/null @@ -1,25 +0,0 @@ -#!/usr/bin/env python3 -# -# __init__.py -# -# Author: jeFF0Falltrades -# -# Copyright (c) 2024 Jeff Archer -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in all -# copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -# SOFTWARE. diff --git a/lib/parsers_aux/ratking/utils/config_item.py b/lib/parsers_aux/ratking/utils/config_item.py deleted file mode 100644 index e466c018b5e..00000000000 --- a/lib/parsers_aux/ratking/utils/config_item.py +++ /dev/null @@ -1,149 +0,0 @@ -#!/usr/bin/env python3 -# -# config_item.py -# -# Author: jeFF0Falltrades -# -# Provides a utility class for parsing field names and values of various types -# from raw RAT config data -# -# MIT License -# -# Copyright (c) 2024 Jeff Archer -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in all -# copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -# SOFTWARE. -from abc import ABC, abstractmethod -from logging import getLogger -from re import DOTALL, compile, findall -from typing import Any, Tuple - -from .data_utils import bytes_to_int -from .dotnet_constants import OPCODE_LDC_I4_0, SpecialFolder - -logger = getLogger(__name__) - - -# Provides an abstract class for config items -class ConfigItem(ABC): - def __init__(self, label: str, pattern: bytes) -> None: - self._label = label - self._pattern = compile(pattern, flags=DOTALL) - - # Should be overridden by children to provide a meaningful value - @abstractmethod - def _derive_item_value(self) -> Any: - pass - - # Derives config Field RVAs and values from data using the specified - # ConfigItem's pattern - def parse_from(self, data: bytes) -> dict[int, Any]: - logger.debug(f"Parsing {self._label} values from data...") - fields = {} - raw_data = findall(self._pattern, data) - found_items = 0 - for obj, bytes_rva in raw_data: - try: - field_value = self._derive_item_value(obj) - field_rva = bytes_to_int(bytes_rva) - except Exception: - logger.debug(f"Could not parse value from {obj} at {hex(bytes_rva)}") - continue - if field_rva not in fields: - fields[field_rva] = field_value - found_items += 1 - else: - logger.debug(f"Overlapping Field RVAs detected in config at {hex(field_rva)}") - logger.debug(f"Parsed {found_items} {self._label} values") - return fields - - -class BoolConfigItem(ConfigItem): - def __init__(self) -> None: - super().__init__("boolean", b"(\x16|\x17)\x80(.{3}\x04)") - - # Boolean values are derived by examing if the opcode is "ldc.i4.0" (False) - # or "ldc.i4.1" (True) - def _derive_item_value(self, opcode: bytes) -> bool: - return bool(bytes_to_int(opcode) - bytes_to_int(OPCODE_LDC_I4_0)) - - -class ByteArrayConfigItem(ConfigItem): - def __init__(self) -> None: - super().__init__( - "byte array", - rb"\x1f(.\x8d.{3}\x01\x25\xd0.{3}\x04)\x28.{3}\x0a\x80(.{3}\x04)", - ) - - # Byte array size and RVA are returned, as these are needed to - # extract the value of the bytes from the payload - def _derive_item_value(self, byte_data: bytes) -> Tuple[int, int]: - arr_size = byte_data[0] - arr_rva = bytes_to_int(byte_data[-4:]) - return (arr_size, arr_rva) - - -class IntConfigItem(ConfigItem): - def __init__(self) -> None: - super().__init__("int", b"(\x20.{4}|[\x18-\x1e])\x80(.{3}\x04)") - - def _derive_item_value(self, int_bytes: bytes) -> int: - # If single byte, must be value 2-8, represented by opcodes 0x18-0x1e - # Subtract 0x16 to get the int value, e.g.: - # ldc.i4.8 == 0x1e - 0x16 == 8 - if len(int_bytes) == 1: - return bytes_to_int(int_bytes) - 0x16 - # Else, look for which int was loaded by "ldc.i4" - return bytes_to_int(int_bytes[1:]) - - -class NullConfigItem(ConfigItem): - def __init__(self) -> None: - super().__init__("null", b"(\x14\x80)(.{3}\x04)") - - # If "ldnull" is being used, simply return "null" - def _derive_item_value(self, _: bytes) -> str: - return "null" - - -class SpecialFolderConfigItem(ConfigItem): - def __init__(self) -> None: - super().__init__("special folder", b"\x1f(.)\x80(.{3}\x04)") - - # Translates SpecialFolder ID to name - def _derive_item_value(self, folder_id: bytes) -> str: - return SpecialFolder(bytes_to_int(folder_id)).name - - -class EncryptedStringConfigItem(ConfigItem): - def __init__(self) -> None: - super().__init__("encrypted string", b"\x72(.{3}\x70)\x80(.{3}\x04)") - - # Returns the encrypted string's RVA - def _derive_item_value(self, enc_str_rva: bytes) -> int: - return bytes_to_int(enc_str_rva) - - -SUPPORTED_CONFIG_ITEMS = [ - BoolConfigItem, - ByteArrayConfigItem, - IntConfigItem, - NullConfigItem, - SpecialFolderConfigItem, - EncryptedStringConfigItem, -] diff --git a/lib/parsers_aux/ratking/utils/data_utils.py b/lib/parsers_aux/ratking/utils/data_utils.py deleted file mode 100644 index 1f0ec88cd02..00000000000 --- a/lib/parsers_aux/ratking/utils/data_utils.py +++ /dev/null @@ -1,62 +0,0 @@ -#!/usr/bin/env python3 -# -# data_utils.py -# -# Author: jeFF0Falltrades -# -# Provides various utility functions for working with binary data -# -# MIT License -# -# Copyright (c) 2024 Jeff Archer -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in all -# copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -# SOFTWARE. -from ..config_parser_exception import ConfigParserException - - -# Converts a bytes object to an int object using the specified byte order -def bytes_to_int(bytes: bytes, order: str = "little") -> int: - try: - return int.from_bytes(bytes, byteorder=order) - except Exception: - raise ConfigParserException(f"Error parsing int from value: {bytes}") - - -# Decodes a bytes object to a Unicode string, using UTF-16LE for byte values -# with null bytes still embedded in them, and UTF-8 for all other values -def decode_bytes(byte_str: bytes | str) -> str: - if isinstance(byte_str, str): - return byte_str.strip() - result = None - try: - if b"\x00" in byte_str: - result = byte_str.decode("utf-16le") - else: - result = byte_str.decode("utf-8") - except Exception: - raise ConfigParserException(f"Error decoding bytes object to Unicode: {byte_str}") - return result - - -# Converts an int to a bytes object, with the specified length and order -def int_to_bytes(int: int, length: int = 4, order: str = "little") -> bytes: - try: - return int.to_bytes(length, order) - except Exception: - raise ConfigParserException(f"Error parsing bytes from value: {int}") diff --git a/lib/parsers_aux/ratking/utils/decryptors/__init__.py b/lib/parsers_aux/ratking/utils/decryptors/__init__.py deleted file mode 100644 index a340a598f31..00000000000 --- a/lib/parsers_aux/ratking/utils/decryptors/__init__.py +++ /dev/null @@ -1,50 +0,0 @@ -#!/usr/bin/env python3 -# -# __init__.py -# -# Author: jeFF0Falltrades -# -# Copyright (c) 2024 Jeff Archer -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in all -# copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -# SOFTWARE. -from .config_decryptor import ConfigDecryptor, IncompatibleDecryptorException -from .config_decryptor_aes_cbc import ConfigDecryptorAESCBC -from .config_decryptor_aes_ecb import ConfigDecryptorAESECB -from .config_decryptor_decrypt_xor import ConfigDecryptorDecryptXOR -from .config_decryptor_plaintext import ConfigDecryptorPlaintext -from .config_decryptor_random_hardcoded import ConfigDecryptorRandomHardcoded - -__all__ = [ - ConfigDecryptor, - IncompatibleDecryptorException, - ConfigDecryptorAESCBC, - ConfigDecryptorAESECB, - ConfigDecryptorDecryptXOR, - ConfigDecryptorRandomHardcoded, - ConfigDecryptorPlaintext, -] - -# ConfigDecryptorPlaintext should always be the last fallthrough case -SUPPORTED_DECRYPTORS = [ - ConfigDecryptorAESCBC, - ConfigDecryptorAESECB, - ConfigDecryptorDecryptXOR, - ConfigDecryptorRandomHardcoded, - ConfigDecryptorPlaintext, -] diff --git a/lib/parsers_aux/ratking/utils/decryptors/config_decryptor.py b/lib/parsers_aux/ratking/utils/decryptors/config_decryptor.py deleted file mode 100644 index 8b4827d38b3..00000000000 --- a/lib/parsers_aux/ratking/utils/decryptors/config_decryptor.py +++ /dev/null @@ -1,53 +0,0 @@ -#!/usr/bin/env python3 -# -# config_decryptor.py -# -# Author: jeFF0Falltrades -# -# Provides a simple abstract base class for different types of config decryptors -# -# MIT License -# -# Copyright (c) 2024 Jeff Archer -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in all -# copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -# SOFTWARE. -from abc import ABC, abstractmethod -from logging import getLogger - -from ..dotnetpe_payload import DotNetPEPayload - -logger = getLogger(__name__) - - -# Custom Exception to denote that a decryptor is incompatible with a payload -class IncompatibleDecryptorException(Exception): - pass - - -class ConfigDecryptor(ABC): - def __init__(self, payload: DotNetPEPayload) -> None: - self.key: bytes | str = None - self._payload = payload - self.salt: bytes = None - - # Abstract method to take in a map representing a configuration of config - # Field names and values and return a decoded/decrypted configuration - @abstractmethod - def decrypt_encrypted_strings(self, encrypted_strings: dict[str, str]) -> dict[str, list[str] | str]: - pass diff --git a/lib/parsers_aux/ratking/utils/decryptors/config_decryptor_aes_cbc.py b/lib/parsers_aux/ratking/utils/decryptors/config_decryptor_aes_cbc.py deleted file mode 100644 index 886a804eb68..00000000000 --- a/lib/parsers_aux/ratking/utils/decryptors/config_decryptor_aes_cbc.py +++ /dev/null @@ -1,290 +0,0 @@ -#!/usr/bin/env python3 -# -# config_decryptor_aes_cbc.py -# -# Author: jeFF0Falltrades -# -# Provides a custom AES decryptor for RAT payloads utilizing CBC mode -# -# Example Hash: 6b99acfa5961591c39b3f889cf29970c1dd48ddb0e274f14317940cf279a4412 -# -# MIT License -# -# Copyright (c) 2024 Jeff Archer -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in all -# copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -# SOFTWARE. -from base64 import b64decode -from logging import getLogger -from re import DOTALL, compile, escape, search -from typing import Tuple - -from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives.ciphers import Cipher -from cryptography.hazmat.primitives.ciphers.algorithms import AES -from cryptography.hazmat.primitives.ciphers.modes import CBC -from cryptography.hazmat.primitives.hashes import SHA1 -from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC -from cryptography.hazmat.primitives.padding import PKCS7 - -from ...config_parser_exception import ConfigParserException -from ..data_utils import bytes_to_int, decode_bytes, int_to_bytes -from ..dotnet_constants import OPCODE_LDSTR, OPCODE_LDTOKEN -from ..dotnetpe_payload import DotNetPEPayload -from .config_decryptor import ConfigDecryptor, IncompatibleDecryptorException - -logger = getLogger(__name__) - - -class ConfigDecryptorAESCBC(ConfigDecryptor): - # Minimum length of valid ciphertext - _MIN_CIPHERTEXT_LEN = 48 - - # Patterns for identifying AES metadata - _PATTERN_AES_KEY_AND_BLOCK_SIZE = compile(b"[\x06-\x09]\x20(.{4})\x6f.{4}[\x06-\x09]\x20(.{4})", DOTALL) - # Do not compile in-line replacement patterns - _PATTERN_AES_KEY_BASE = b"(.{3}\x04).%b" - _PATTERN_AES_SALT_INIT = b"\x80%b\x2a" - _PATTERN_AES_SALT_ITER = compile(b"[\x02-\x05]\x7e(.{4})\x20(.{4})\x73", DOTALL) - - def __init__(self, payload: DotNetPEPayload) -> None: - super().__init__(payload) - self._block_size: int = None - self._iterations: int = None - self._key_candidates: list[bytes] = None - self._key_size: int = None - self._key_rva: int = None - try: - self._get_aes_metadata() - except Exception as e: - raise IncompatibleDecryptorException(e) - - # Given an initialization vector and ciphertext, creates a Cipher - # object with the AES key and specified IV and decrypts the ciphertext - def _decrypt(self, iv: bytes, ciphertext: bytes) -> bytes: - logger.debug(f"Decrypting {ciphertext} with key {self.key.hex()} and IV {iv.hex()}...") - aes_cipher = Cipher(AES(self.key), CBC(iv), backend=default_backend()) - decryptor = aes_cipher.decryptor() - # Use a PKCS7 unpadder to remove padding from decrypted value - # https://cryptography.io/en/latest/hazmat/primitives/padding/ - unpadder = PKCS7(self._block_size).unpadder() - - try: - padded_text = decryptor.update(ciphertext) + decryptor.finalize() - unpadded_text = unpadder.update(padded_text) + unpadder.finalize() - except Exception as e: - raise ConfigParserException( - f"Error decrypting ciphertext {ciphertext} with IV {iv.hex()} and key {self.key.hex()} : {e}" - ) - - logger.debug(f"Decryption result: {unpadded_text}") - return unpadded_text - - # Derives AES passphrase candidates from a config - # - # If a passphrase is base64-encoded, both its raw value and decoded value - # will be added as candidates - def _derive_aes_passphrase_candidates(self, key_val: str) -> list[bytes]: - passphrase_candidates = [key_val.encode()] - try: - passphrase_candidates.append(b64decode(key_val)) - except Exception: - pass - logger.debug(f"AES passphrase candidates found: {passphrase_candidates}") - return passphrase_candidates - - # Decrypts encrypted config values with the provided cipher data - def decrypt_encrypted_strings(self, encrypted_strings: dict[str, str]) -> dict[str, str]: - logger.debug("Decrypting encrypted strings...") - if self._key_candidates is None: - self._key_candidates = self._get_aes_key_candidates(encrypted_strings) - - decrypted_config_strings = {} - for k, v in encrypted_strings.items(): - # Leave empty strings as they are - if len(v) == 0: - logger.debug(f"Key: {k}, Value: {v}") - decrypted_config_strings[k] = v - continue - - # Check if base64-encoded string - b64_exception = False - try: - decoded_val = b64decode(v) - except Exception: - b64_exception = True - # If it was not base64-encoded, or if it is less than our min length - # for ciphertext, leave the value as it is - if b64_exception or len(decoded_val) < self._MIN_CIPHERTEXT_LEN: - logger.debug(f"Key: {k}, Value: {v}") - decrypted_config_strings[k] = v - continue - - # Otherwise, extract the IV from the 16 bytes after the HMAC - # (first 32 bytes) and the ciphertext from the rest of the data - # after the IV, and run the decryption - iv, ciphertext = decoded_val[32:48], decoded_val[48:] - result, last_exc = None, None - key_idx = 0 - # Run through key candidates until suitable one found or failure - while result is None and key_idx < len(self._key_candidates): - try: - self.key = self._key_candidates[key_idx] - key_idx += 1 - result = decode_bytes(self._decrypt(iv, ciphertext)) - except ConfigParserException as e: - last_exc = e - - if result is None: - logger.debug(f"Decryption failed for item {v}: {last_exc}; Leaving as original value...") - result = v - - logger.debug(f"Key: {k}, Value: {result}") - decrypted_config_strings[k] = result - - logger.debug("Successfully decrypted strings") - return decrypted_config_strings - - # Extracts AES key candidates from the payload - def _get_aes_key_candidates(self, encrypted_strings: dict[str, str]) -> list[bytes]: - logger.debug("Extracting AES key candidates...") - keys = [] - - # Use the key Field name to index into our existing config - key_raw_value = encrypted_strings[self._payload.field_name_from_rva(self._key_rva)] - passphrase_candidates = self._derive_aes_passphrase_candidates(key_raw_value) - - for candidate in passphrase_candidates: - try: - # The backend parameter is optional in newer versions of the - # cryptography library, but we keep it here for compatibility - kdf = PBKDF2HMAC( - SHA1(), - length=self._key_size, - salt=self.salt, - iterations=self._iterations, - backend=default_backend(), - ) - keys.append(kdf.derive(candidate)) - logger.debug(f"AES key derived: {keys[-1]}") - except Exception: - continue - - if len(keys) == 0: - raise ConfigParserException(f"Could not derive key from passphrase candidates: {passphrase_candidates}") - return keys - - # Extracts the AES key and block size from the payload - def _get_aes_key_and_block_size(self) -> Tuple[int, int]: - logger.debug("Extracting AES key and block size...") - hit = search(self._PATTERN_AES_KEY_AND_BLOCK_SIZE, self._payload.data) - if hit is None: - raise ConfigParserException("Could not extract AES key or block size") - - # Convert key size from bits to bytes by dividing by 8 - # Note use of // instead of / to ensure integer output, not float - key_size = bytes_to_int(hit.groups()[0]) // 8 - block_size = bytes_to_int(hit.groups()[1]) - - logger.debug(f"Found key size {key_size} and block size {block_size}") - return key_size, block_size - - # Given an offset to an instruction within the Method that sets up the - # Cipher, extracts the AES key RVA from the payload - def _get_aes_key_rva(self, metadata_ins_offset: int) -> int: - logger.debug("Extracting AES key RVA...") - - # Get the RVA of the method that sets up AES256 metadata - metadata_method_token = self._payload.method_from_instruction_offset(metadata_ins_offset, by_token=True).token - - # Insert this RVA into the KEY_BASE pattern to find where the AES key - # is initialized - key_hit = search( - self._PATTERN_AES_KEY_BASE % escape(int_to_bytes(metadata_method_token)), - self._payload.data, - DOTALL, - ) - if key_hit is None: - raise ConfigParserException("Could not find AES key pattern") - - key_rva = bytes_to_int(key_hit.groups()[0]) - logger.debug(f"AES key RVA: {hex(key_rva)}") - return key_rva - - # Identifies the initialization of the AES256 object in the payload and - # sets the necessary values needed for decryption - def _get_aes_metadata(self) -> None: - logger.debug("Extracting AES metadata...") - metadata = search(self._PATTERN_AES_SALT_ITER, self._payload.data) - if metadata is None: - raise ConfigParserException("Could not identify AES metadata") - logger.debug(f"AES metadata found at offset {hex(metadata.start())}") - - self._key_size, self._block_size = self._get_aes_key_and_block_size() - - logger.debug("Extracting AES iterations...") - self._iterations = bytes_to_int(metadata.groups()[1]) - logger.debug(f"Found AES iteration number of {self._iterations}") - - self.salt = self._get_aes_salt(metadata.groups()[0]) - self._key_rva = self._get_aes_key_rva(metadata.start()) - - # Extracts the AES salt from the payload, accounting for both hardcoded - # salt byte arrays, and salts derived from hardcoded strings - def _get_aes_salt(self, salt_rva: int) -> bytes: - logger.debug("Extracting AES salt value...") - - # Use % to insert our salt RVA into our match pattern - # This pattern will then find the salt initialization ops, - # specifically: - # - # stsfld uint8[] Client.Algorithm.Aes256::Salt - # ret - aes_salt_initialization = self._payload.data.find(self._PATTERN_AES_SALT_INIT % escape(salt_rva)) - if aes_salt_initialization == -1: - raise ConfigParserException("Could not identify AES salt initialization") - - # Look at the opcode used to initialize the salt to decide how to - # proceed with extracting the salt value (start of pattern - 10 bytes) - salt_op_offset = aes_salt_initialization - 10 - # Need to use bytes([int]) here to properly convert from int to byte - # string for our comparison below - salt_op = bytes([self._payload.data[salt_op_offset]]) - - # Get the salt RVA from the 4 bytes following the initialization op - salt_strings_rva_packed = self._payload.data[salt_op_offset + 1 : salt_op_offset + 5] - salt_strings_rva = bytes_to_int(salt_strings_rva_packed) - - # If the op is a ldstr op, just get the bytes value of the string being - # used to initialize the salt - if salt_op == OPCODE_LDSTR: - salt_encoded = self._payload.user_string_from_rva(salt_strings_rva) - # We use decode_bytes() here to get the salt string without any - # null bytes (because it's stored as UTF-16LE), then convert it - # back to bytes - salt = decode_bytes(salt_encoded).encode() - # If the op is a ldtoken (0xd0) operation, we need to get the salt - # byte array value from the FieldRVA table - elif salt_op == OPCODE_LDTOKEN: - salt_size = self._payload.data[salt_op_offset - 7] - salt = self._payload.byte_array_from_size_and_rva(salt_size, salt_strings_rva) - else: - raise ConfigParserException(f"Unknown salt opcode found: {salt_op.hex()}") - - logger.debug(f"Found salt value: {salt.hex()}") - return salt diff --git a/lib/parsers_aux/ratking/utils/decryptors/config_decryptor_aes_ecb.py b/lib/parsers_aux/ratking/utils/decryptors/config_decryptor_aes_ecb.py deleted file mode 100644 index 668c18de4e6..00000000000 --- a/lib/parsers_aux/ratking/utils/decryptors/config_decryptor_aes_ecb.py +++ /dev/null @@ -1,152 +0,0 @@ -#!/usr/bin/env python3 -# -# config_decryptor_aes_ecb.py -# -# Author: jeFF0Falltrades -# -# Provides a custom AES decryptor for RAT payloads utilizing ECB mode -# -# Example Hash: d5028e10a756f2df677f32ebde105d7de8df37e253c431837c8f810260f4428e -# -# MIT License -# -# Copyright (c) 2024 Jeff Archer -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in all -# copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -# SOFTWARE. -from base64 import b64decode -from hashlib import md5 -from logging import getLogger -from re import DOTALL, compile, search - -from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives.ciphers import Cipher -from cryptography.hazmat.primitives.ciphers.algorithms import AES -from cryptography.hazmat.primitives.ciphers.modes import ECB -from cryptography.hazmat.primitives.padding import PKCS7 - -from ...config_parser_exception import ConfigParserException -from ..data_utils import bytes_to_int, decode_bytes -from ..dotnetpe_payload import DotNetPEPayload -from .config_decryptor import ConfigDecryptor, IncompatibleDecryptorException - -logger = getLogger(__name__) - - -class ConfigDecryptorAESECB(ConfigDecryptor): - # MD5 hash pattern used to detect AES key - _PATTERN_MD5_HASH = compile(rb"\x7e(.{3}\x04)\x28.{3}\x06\x6f", DOTALL) - - def __init__(self, payload: DotNetPEPayload) -> None: - super().__init__(payload) - try: - self._aes_key_rva = self._get_aes_key_rva() - except Exception as e: - raise IncompatibleDecryptorException(e) - - # Given ciphertext, creates a Cipher object with the AES key and decrypts - # the ciphertext - def _decrypt(self, ciphertext: bytes) -> bytes: - logger.debug(f"Decrypting {ciphertext} with key {self.key.hex()}...") - aes_cipher = Cipher(AES(self.key), ECB(), backend=default_backend()) - decryptor = aes_cipher.decryptor() - unpadder = PKCS7(AES.block_size).unpadder() - # Use a PKCS7 unpadder to remove padding from decrypted value - # https://cryptography.io/en/latest/hazmat/primitives/padding/ - unpadder = PKCS7(AES.block_size).unpadder() - - try: - padded_text = decryptor.update(ciphertext) + decryptor.finalize() - unpadded_text = unpadder.update(padded_text) + unpadder.finalize() - except Exception as e: - raise ConfigParserException(f"Error decrypting ciphertext {ciphertext} with key {self.key.hex()}: {e}") - - logger.debug(f"Decryption result: {unpadded_text}") - return unpadded_text - - # Decrypts encrypted config values with the provided cipher data - def decrypt_encrypted_strings(self, encrypted_strings: dict[str, str]) -> dict[str, str]: - logger.debug("Decrypting encrypted strings...") - - if self.key is None: - try: - raw_key_field = self._payload.field_name_from_rva(self._aes_key_rva) - self.key = self._derive_aes_key(encrypted_strings[raw_key_field]) - except Exception as e: - raise ConfigParserException(f"Failed to derive AES key: {e}") - - decrypted_config_strings = {} - for k, v in encrypted_strings.items(): - # Leave empty strings as they are - if len(v) == 0: - logger.debug(f"Key: {k}, Value: {v}") - decrypted_config_strings[k] = v - continue - - # Check if base64-encoded string - b64_exception = False - try: - decoded_val = b64decode(v) - except Exception: - b64_exception = True - # If it was not base64-encoded, leave the value as it is - if b64_exception: - logger.debug(f"Key: {k}, Value: {v}") - decrypted_config_strings[k] = v - continue - - ciphertext = decoded_val - result, last_exc = None, None - try: - result = decode_bytes(self._decrypt(ciphertext)) - except ConfigParserException as e: - last_exc = e - - if result is None: - logger.debug(f"Decryption failed for item {v}: {last_exc}") - result = v - - logger.debug(f"Key: {k}, Value: {result}") - decrypted_config_strings[k] = result - - logger.debug("Successfully decrypted strings") - return decrypted_config_strings - - # Given the raw bytes that will become the key value, derives the AES key - def _derive_aes_key(self, key_unhashed: str) -> bytes: - # Generate the MD5 hash - md5_hash = md5() - md5_hash.update(key_unhashed.encode("utf-8")) - md5_digest = md5_hash.digest() - - # Key is a 32-byte value made up of the MD5 hash overlaying itself, - # tailed with one null byte - key = md5_digest[:15] + md5_digest[:16] + b"\x00" - logger.debug(f"AES key derived: {key}") - return key - - # Extracts the AES key RVA from the payload - def _get_aes_key_rva(self) -> int: - logger.debug("Extracting AES key value...") - key_hit = search(self._PATTERN_MD5_HASH, self._payload.data) - if key_hit is None: - raise ConfigParserException("Could not find AES key pattern") - - key_rva = bytes_to_int(key_hit.groups()[0]) - logger.debug(f"AES key RVA: {hex(key_rva)}") - return key_rva diff --git a/lib/parsers_aux/ratking/utils/decryptors/config_decryptor_decrypt_xor.py b/lib/parsers_aux/ratking/utils/decryptors/config_decryptor_decrypt_xor.py deleted file mode 100644 index 7ea52618ceb..00000000000 --- a/lib/parsers_aux/ratking/utils/decryptors/config_decryptor_decrypt_xor.py +++ /dev/null @@ -1,121 +0,0 @@ -#!/usr/bin/env python3 -# -# config_decryptor_decrypt_xor.py -# -# Author: jeFF0Falltrades -# -# Provides a custom decryptor for RAT payloads utilizing the DecryptXOR -# method of embeddeding config strings -# -# Example Hash: 6e5671dec52db7f64557ba8ef70caf53cf0c782795236b03655623640f9e6a83 -# -# MIT License -# -# Copyright (c) 2024 Jeff Archer -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in all -# copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -# SOFTWARE. -from logging import getLogger -from re import DOTALL, compile, findall, search - -from ...config_parser_exception import ConfigParserException -from ..data_utils import bytes_to_int, decode_bytes -from ..dotnet_constants import PATTERN_LDSTR_OP -from ..dotnetpe_payload import DotNetPEPayload -from .config_decryptor import ConfigDecryptor, IncompatibleDecryptorException -from .config_decryptor_plaintext import ConfigDecryptorPlaintext - -logger = getLogger(__name__) - - -class ConfigDecryptorDecryptXOR(ConfigDecryptor): - _KEY_XOR_DECODED_STRINGS = "xor_decoded_strings" - - # Pattern to detect usage of DecryptXOR Method - _PATTERN_DECRYPT_XOR_BLOCK = compile( - rb"(\x2d.\x72.{3}\x70\x28.{3}\x06\x2a(?:\x02[\x16-\x1f].?\x33.\x72.{3}\x70\x28.{3}\x06\x2a){7,}.+?\x72.{3}\x70)", - flags=DOTALL, - ) - - def __init__(self, payload: DotNetPEPayload) -> None: - super().__init__(payload) - # Filled in _get_xor_metadata() - self._xor_strings: list[str] = [] - try: - self._get_xor_metadata() - except Exception as e: - raise IncompatibleDecryptorException(e) - - # Returns a list of decoded XOR-encoded strings found in the payload - def _decode_encoded_strings(self) -> list[str]: - decoded_strings = [] - - for string in self._xor_strings: - decoded = [] - # Do not modify unencoded strings - if ":" not in string: - decoded_strings.append(string) - continue - - # Split encoded string by ':' and run XOR decoding - arr, arr2 = (bytes.fromhex(arr) for arr in string.split(":")) - for idx, byte in enumerate(arr2): - decoded.append(byte ^ self.key[idx % len(self.key)] ^ arr[idx]) - decoded_strings.append(decode_bytes(bytes(decoded))) - - logger.debug(f"Decoded {len(decoded_strings)} strings") - return decoded_strings - - # Parses the config, adds decoded XOR strings, and returns the decoded - # config - def decrypt_encrypted_strings(self, encrypted_strings: dict[str, str]) -> dict[str, list[str] | str]: - config = {} - # Pass off plaintext config to a ConfigDecryptorPlaintext - ptcd = ConfigDecryptorPlaintext(self._payload) - config.update(ptcd.decrypt_encrypted_strings(encrypted_strings)) - config[self._KEY_XOR_DECODED_STRINGS] = self._decode_encoded_strings() - return config - - # Gathers XOR metadata from the payload - def _get_xor_metadata(self): - dxor_block = search(self._PATTERN_DECRYPT_XOR_BLOCK, self._payload.data) - if dxor_block is None: - raise ConfigParserException("Could not identify DecryptXOR block") - logger.debug(f"DecryptXOR block found at offset {hex(dxor_block.start())}") - - # Derive all XOR-encoded string references in the DecryptXOR block - xor_string_rvas = findall(PATTERN_LDSTR_OP, dxor_block.groups()[0]) - self._xor_strings = list( - filter( - None, - [self._payload.user_string_from_rva(bytes_to_int(rva)) for rva in xor_string_rvas], - ) - ) - logger.debug(f"{len(self._xor_strings)} XOR strings found") - - # Get the static constructor containing the XOR key - xor_key_cctor = self._payload.method_from_instruction_offset(dxor_block.start(), step=1, by_token=True) - xor_key_cctor_body = self._payload.method_body_from_method(xor_key_cctor) - - # Derive the XOR key RVA and value - xor_rva = search(PATTERN_LDSTR_OP, xor_key_cctor_body) - if xor_rva is None: - raise ConfigParserException("Could not identify XOR key RVA") - xor_rva = bytes_to_int(xor_rva.groups()[0]) - self.key = bytes(self._payload.user_string_from_rva(xor_rva), encoding="utf-8") - logger.debug(f"XOR key found at {hex(xor_rva)} : {self.key}") diff --git a/lib/parsers_aux/ratking/utils/decryptors/config_decryptor_plaintext.py b/lib/parsers_aux/ratking/utils/decryptors/config_decryptor_plaintext.py deleted file mode 100644 index b24d2b387fd..00000000000 --- a/lib/parsers_aux/ratking/utils/decryptors/config_decryptor_plaintext.py +++ /dev/null @@ -1,124 +0,0 @@ -#!/usr/bin/env python3 -# -# config_decryptor_plaintext.py -# -# Author: jeFF0Falltrades -# -# Provides a fall-through decryptor that will attempt to return the plaintext -# values of a found config when all other decryptors fail by matching known -# config field names from supported RAT families -# -# MIT License -# -# Copyright (c) 2024 Jeff Archer -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in all -# copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -# SOFTWARE. -from logging import getLogger - -from ...config_parser_exception import ConfigParserException -from ..dotnetpe_payload import DotNetPEPayload -from .config_decryptor import ConfigDecryptor - -logger = getLogger(__name__) - -KNOWN_CONFIG_FIELD_NAMES = set( - [ - "AUTHKEY", - "An_ti", - "Anti", - "Anti_Process", - "BDOS", - "BS_OD", - "Certifi_cate", - "Certificate", - "DIRECTORY", - "De_lay", - "Delay", - "DoStartup", - "ENABLELOGGER", - "EncryptionKey", - "Groub", - "Group", - "HIDEFILE", - "HIDEINSTALLSUBDIRECTORY", - "HIDELOGDIRECTORY", - "HOSTS", - "Hos_ts", - "Hosts", - "Hw_id", - "Hwid", - "INSTALL", - "INSTALLNAME", - "In_stall", - "Install", - "InstallDir", - "InstallFile", - "InstallFolder", - "InstallStr", - "Install_File", - "Install_Folder", - "Install_path", - "KEY", - "Key", - "LOGDIRECTORYNAME", - "MTX", - "MUTEX", - "Mutex", - "Paste_bin", - "Pastebin", - "Por_ts", - "Port", - "Ports", - "RECONNECTDELAY", - "SPL", - "STARTUP", - "STARTUPKEY", - "SUBDIRECTORY", - "ServerIp", - "ServerPort", - "Server_signa_ture", - "Serversignature", - "Sleep", - "TAG", - "USBNM", - "VERSION", - "Ver_sion", - "Version", - "delay", - "mutex_string", - "startup_name", - ] -) - - -class ConfigDecryptorPlaintext(ConfigDecryptor): - # Minimum threshold for matching Field names - MIN_THRESHOLD_MATCH = 3 - - def __init__(self, payload: DotNetPEPayload) -> None: - super().__init__(payload) - - # Calculates whether the config meets the minimum threshold for known Field - # Names and returns it if it does - def decrypt_encrypted_strings(self, encrypted_strings: dict[str, str]) -> dict[str, str]: - field_names = set(encrypted_strings.keys()) - num_overlapping_field_names = len(KNOWN_CONFIG_FIELD_NAMES & field_names) - if num_overlapping_field_names < self.MIN_THRESHOLD_MATCH: - raise ConfigParserException("Plaintext threshold of known config items not met") - return encrypted_strings diff --git a/lib/parsers_aux/ratking/utils/decryptors/config_decryptor_random_hardcoded.py b/lib/parsers_aux/ratking/utils/decryptors/config_decryptor_random_hardcoded.py deleted file mode 100644 index e5d598f47b7..00000000000 --- a/lib/parsers_aux/ratking/utils/decryptors/config_decryptor_random_hardcoded.py +++ /dev/null @@ -1,96 +0,0 @@ -#!/usr/bin/env python3 -# -# config_decryptor_random_hardcoded.py -# -# Author: jeFF0Falltrades -# -# Provides a custom decryptor for RAT payloads utilizing the method of -# randomly selecting from an embedded list of C2 domains/supradomains -# -# Example hash: a2817702fecb280069f0723cd2d0bfdca63763b9cdc833941c4f33bbe383d93e -# -# MIT License -# -# Copyright (c) 2024 Jeff Archer -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in all -# copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -# SOFTWARE. -from logging import getLogger -from re import DOTALL, compile, findall, search - -from ...config_parser_exception import ConfigParserException -from ..data_utils import bytes_to_int -from ..dotnet_constants import PATTERN_LDSTR_OP -from ..dotnetpe_payload import DotNetPEMethod, DotNetPEPayload -from .config_decryptor import ConfigDecryptor, IncompatibleDecryptorException -from .config_decryptor_plaintext import ConfigDecryptorPlaintext - -logger = getLogger(__name__) - - -class ConfigDecryptorRandomHardcoded(ConfigDecryptor): - _KEY_HARDCODED_HOSTS = "hardcoded_hosts" - - # Pattern to find the Method that retrieves a random domain - _PATTERN_RANDOM_DOMAIN = compile(rb"(?:\x73.{3}\x0a){2}\x25.+?\x0a\x06(?:\x6f.{3}\x0a){2}\x0b", flags=DOTALL) - - def __init__(self, payload: DotNetPEPayload) -> None: - super().__init__(payload) - try: - self._random_domain_method = self._get_random_domain_method() - except Exception as e: - raise IncompatibleDecryptorException(e) - - # Returns a combined config containing config fields + hardcoded hosts - def decrypt_encrypted_strings(self, encrypted_strings: dict[str, str]) -> dict[str, list[str] | str]: - config = {} - # Pass off plaintext config to a ConfigDecryptorPlaintext - ptcd = ConfigDecryptorPlaintext(self._payload) - config.update(ptcd.decrypt_encrypted_strings(encrypted_strings)) - config[self._KEY_HARDCODED_HOSTS] = self._get_hardcoded_hosts() - return config - - # Retrieves and returns a list of hardcoded hosts - def _get_hardcoded_hosts(self) -> list[str]: - random_domain_method_body = self._payload.method_body_from_method(self._random_domain_method) - hardcoded_host_rvas = findall(PATTERN_LDSTR_OP, random_domain_method_body) - - hardcoded_hosts = [] - for rva in hardcoded_host_rvas: - try: - harcoded_host = self._payload.user_string_from_rva(bytes_to_int(rva)) - if harcoded_host != ".": - hardcoded_hosts.append(harcoded_host) - except Exception as e: - logger.error(f"Error translating hardcoded host at {hex(rva)}: {e}") - continue - - logger.debug(f"Hardcoded hosts found: {hardcoded_hosts}") - return hardcoded_hosts - - # Retrieves the Method that randomly selects from a list of embedded hosts - def _get_random_domain_method(self) -> DotNetPEMethod: - logger.debug("Searching for random domain method") - random_domain_marker = search(self._PATTERN_RANDOM_DOMAIN, self._payload.data) - if random_domain_marker is None: - raise ConfigParserException("Could not identify random domain generator method") - - random_domain_method = self._payload.method_from_instruction_offset(random_domain_marker.start()) - - logger.debug(f"Random domain generator found at offset {hex(random_domain_method.offset)}") - return random_domain_method diff --git a/lib/parsers_aux/ratking/utils/dotnet_constants.py b/lib/parsers_aux/ratking/utils/dotnet_constants.py deleted file mode 100644 index 84f82e14619..00000000000 --- a/lib/parsers_aux/ratking/utils/dotnet_constants.py +++ /dev/null @@ -1,86 +0,0 @@ -#!/usr/bin/env python3 -# -# dotnet_constants.py -# -# Author: jeFF0Falltrades -# -# Useful .NET constants and enums -# -# MIT License -# -# Copyright (c) 2024 Jeff Archer -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in all -# copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -# SOFTWARE. -from enum import IntEnum -from re import DOTALL, compile - -# Notable CIL Opcodes and Tokens -OPCODE_LDC_I4_0 = b"\x16" -OPCODE_LDSTR = b"\x72" -OPCODE_LDTOKEN = b"\xd0" -MDT_FIELD_DEF = 0x04000000 -MDT_METHOD_DEF = 0x06000000 -MDT_STRING = 0x70000000 -PATTERN_LDSTR_OP = compile( - rb"\x72(.{3}\x70)", - flags=DOTALL, -) - - -# IntEnum derivative used for translating a SpecialFolder ID to its name -class SpecialFolder(IntEnum): - ADMINTOOLS = 48 - APPLICATIONDATA = 26 - CDBURNING = 59 - COMMONADMINTOOLS = 47 - COMMONAPPLICATIONDATA = 35 - COMMONDESKTOPDIRECTORY = 25 - COMMONDOCUMENTS = 46 - COMMONMUSIC = 53 - COMMONOEMLINKS = 58 - COMMONPICTURES = 54 - COMMONPROGRAMFILES = 43 - COMMONPROGRAMFILESX86 = 44 - COMMONPROGRAMS = 23 - COMMONSTARTMENU = 22 - COMMONSTARTUP = 24 - COMMONTEMPLATES = 45 - COMMONVIDEOS = 55 - COOKIES = 33 - DESKTOPDIRECTORY = 16 - FONTS = 20 - HISTORY = 34 - INTERNETCACHE = 32 - LOCALAPPLICATIONDATA = 28 - LOCALIZEDRESOURCES = 57 - MYCOMPUTER = 17 - MYMUSIC = 13 - MYPICTURES = 39 - MYVIDEOS = 14 - NETWORKSHORTCUTS = 19 - PRINTERSHORTCUTS = 27 - PROGRAMFILES = 38 - PROGRAMFILESX86 = 42 - RESOURCES = 56 - STARTMENU = 11 - SYSTEM = 37 - SYSTEMX86 = 41 - TEMPLATES = 21 - USERPROFILE = 40 - WINDOWS = 36 diff --git a/lib/parsers_aux/ratking/utils/dotnetpe_payload.py b/lib/parsers_aux/ratking/utils/dotnetpe_payload.py deleted file mode 100644 index d2d9d3f60f3..00000000000 --- a/lib/parsers_aux/ratking/utils/dotnetpe_payload.py +++ /dev/null @@ -1,199 +0,0 @@ -#!/usr/bin/env python3 -# -# dotnetpe_payload.py -# -# Author: jeFF0Falltrades -# -# Provides a wrapper class for accessing metadata from a DotNetPE object and -# performing data conversions -# -# MIT License -# -# Copyright (c) 2024 Jeff Archer -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in all -# copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -# SOFTWARE. -from dataclasses import dataclass -from hashlib import sha256 -from logging import getLogger - -from dnfile import dnPE -from yara import Rules - -from ..config_parser_exception import ConfigParserException -from .data_utils import bytes_to_int -from .dotnet_constants import MDT_FIELD_DEF, MDT_METHOD_DEF, MDT_STRING - -logger = getLogger(__name__) - - -# Helper class representing a single Method -@dataclass -class DotNetPEMethod: - name: str - offset: int - rva: int - size: int - token: int - - -class DotNetPEPayload: - # def __init__(self, file_path: str, yara_rule: Rules = None) -> None: - def __init__(self, file_data: bytes, yara_rule: Rules = None) -> None: - # self.file_path = file_path - self.data = file_data # self._get_file_data() - - # Calculate SHA256 - sha256_obj = sha256() - sha256_obj.update(self.data) - self.sha256 = sha256_obj.hexdigest() - - self.dotnetpe: dnPE = None - try: - # self.dotnetpe = dnPE(self.file_path, clr_lazy_load=True) - self.dotnetpe = dnPE(data=file_data, clr_lazy_load=True) - except Exception: - raise ConfigParserException("Failed to load project as dotnet executable") - - self.yara_match = "" - if yara_rule is not None: - self.yara_match = self._match_yara(yara_rule) - - # Pre-sort Method table for efficient lookups - self._methods = self._generate_method_list() - self._methods_by_offset = sorted(self._methods, key=lambda m: m.offset) - self._methods_by_token = sorted(self._methods, key=lambda m: m.token) - - # Given a byte array's size and RVA, translates the RVA to the offset of - # the byte array and returns the bytes of the array as a byte string - def byte_array_from_size_and_rva(self, arr_size: int, arr_rva: int) -> bytes: - arr_field_rva = self.fieldrva_from_rva(arr_rva) - arr_offset = self.offset_from_rva(arr_field_rva) - return self.data[arr_offset : arr_offset + arr_size] - - # Given an offset, and either a terminating offset or delimiter, extracts - # the byte string - def byte_string_from_offset(self, offset_start: int, offstart_end: int = -1, delimiter: bytes = b"\0") -> bytes: - if offstart_end != -1: - try: - return self.data[offset_start:offstart_end] - except Exception: - raise ConfigParserException( - f"Could not extract string value from offset range [{hex(offset_start)}:{offstart_end}]" - ) - try: - return self.data[offset_start:].partition(delimiter)[0] - except Exception: - raise ConfigParserException( - f"Could not extract string value from offset {hex(offset_start)} with delimiter {delimiter}" - ) - - # Given an RVA, derives the corresponding Field name - def field_name_from_rva(self, rva: int) -> str: - try: - return self.dotnetpe.net.mdtables.Field.rows[(rva ^ MDT_FIELD_DEF) - 1].Name.value - except Exception: - raise ConfigParserException(f"Could not find Field for RVA {rva}") - - # Given an RVA, derives the corresponding FieldRVA value - def fieldrva_from_rva(self, rva: int) -> int: - field_id = rva ^ MDT_FIELD_DEF - for row in self.dotnetpe.net.mdtables.FieldRva: - if row.struct.Field_Index == field_id: - return row.struct.Rva - raise ConfigParserException(f"Could not find FieldRVA for RVA {rva}") - - # Generates a list of DotNetPEMethod objects for efficient lookups of method - # metadata in other operations - def _generate_method_list( - self, - ) -> list[DotNetPEMethod]: - method_objs = [] - - for idx, method in enumerate(self.dotnetpe.net.mdtables.MethodDef.rows): - method_offset = self.offset_from_rva(method.Rva) - - # Parse size from flags - flags = self.data[method_offset] - method_size = 0 - if flags & 3 == 2: # Tiny format - method_size = flags >> 2 - elif flags & 3 == 3: # Fat format (add 12-byte header) - method_size = 12 + bytes_to_int(self.data[method_offset + 4 : method_offset + 8]) - - method_objs.append( - DotNetPEMethod( - method.Name.value, - method_offset, - method.Rva, - method_size, - (MDT_METHOD_DEF ^ idx) + 1, - ) - ) - return method_objs - - # Returns payload binary content - def _get_file_data(self) -> bytes: - logger.debug(f"Reading contents from: {self.file_path}") - try: - with open(self.file_path, "rb") as fp: - data = fp.read() - except Exception: - raise ConfigParserException(f"Error reading from path: {self.file_path}") - logger.debug(f"Successfully read {len(data)} bytes") - return data - - # Tests a given YARA rule object against the file at self.file_path, - # returning the matching rule's name, or "No match" - def _match_yara(self, rule: Rules) -> str: - try: - match = rule.match(self.file_path) - return str(match[0]) if len(match) > 0 else "No match" - except Exception as e: - logger.exception(e) - return f"Exception encountered: {e}" - - # Given a DotNetPEMethod, returns its body as raw bytes - def method_body_from_method(self, method: DotNetPEMethod) -> bytes: - return self.byte_string_from_offset(method.offset, method.offset + method.size) - - # Given a Method name, returns a list of DotNetPEMethods matching that name - def methods_from_name(self, name: str) -> list[DotNetPEMethod]: - return [method for method in self._methods if method.name == name] - - # Given the offset to an instruction, reverses the instruction to its - # parent Method, optionally returning an adjacent Method using step to - # signify the direction of adjacency, and using by_token to determine - # whether to calculate adjacency by token or offset - def method_from_instruction_offset(self, ins_offset: int, step: int = 0, by_token: bool = False) -> DotNetPEMethod: - for idx, method in enumerate(self._methods_by_offset): - if method.offset <= ins_offset < method.offset + method.size: - return ( - self._methods_by_token[self._methods_by_token.index(method) + step] - if by_token - else self._methods_by_offset[idx + step] - ) - raise ConfigParserException(f"Could not find method from instruction offset {hex(ins_offset)}") - - # Given an RVA, returns a data/file offset - def offset_from_rva(self, rva: int) -> int: - return self.dotnetpe.get_offset_from_rva(rva) - - # Given an RVA, derives the corresponding User String - def user_string_from_rva(self, rva: int) -> str: - return self.dotnetpe.net.user_strings.get(rva ^ MDT_STRING).value diff --git a/modules/processing/parsers/CAPE/AsyncRAT.py b/modules/processing/parsers/CAPE/AsyncRAT.py index 29c59a04fa2..1220071ea7d 100644 --- a/modules/processing/parsers/CAPE/AsyncRAT.py +++ b/modules/processing/parsers/CAPE/AsyncRAT.py @@ -1,4 +1,4 @@ -from lib.parsers_aux.ratking import RATConfigParser +from rat_king_parser.rkp import RATConfigParser def extract_config(data: bytes): diff --git a/modules/processing/parsers/CAPE/DCRat.py b/modules/processing/parsers/CAPE/DCRat.py index 29c59a04fa2..1220071ea7d 100644 --- a/modules/processing/parsers/CAPE/DCRat.py +++ b/modules/processing/parsers/CAPE/DCRat.py @@ -1,4 +1,4 @@ -from lib.parsers_aux.ratking import RATConfigParser +from rat_king_parser.rkp import RATConfigParser def extract_config(data: bytes): diff --git a/modules/processing/parsers/CAPE/QuasarRAT.py b/modules/processing/parsers/CAPE/QuasarRAT.py index 29c59a04fa2..1220071ea7d 100644 --- a/modules/processing/parsers/CAPE/QuasarRAT.py +++ b/modules/processing/parsers/CAPE/QuasarRAT.py @@ -1,4 +1,4 @@ -from lib.parsers_aux.ratking import RATConfigParser +from rat_king_parser.rkp import RATConfigParser def extract_config(data: bytes): diff --git a/modules/processing/parsers/CAPE/VenomRAT.py b/modules/processing/parsers/CAPE/VenomRAT.py index 29c59a04fa2..1220071ea7d 100644 --- a/modules/processing/parsers/CAPE/VenomRAT.py +++ b/modules/processing/parsers/CAPE/VenomRAT.py @@ -1,4 +1,4 @@ -from lib.parsers_aux.ratking import RATConfigParser +from rat_king_parser.rkp import RATConfigParser def extract_config(data: bytes): diff --git a/modules/processing/parsers/CAPE/XWorm.py b/modules/processing/parsers/CAPE/XWorm.py index 29c59a04fa2..1220071ea7d 100644 --- a/modules/processing/parsers/CAPE/XWorm.py +++ b/modules/processing/parsers/CAPE/XWorm.py @@ -1,4 +1,4 @@ -from lib.parsers_aux.ratking import RATConfigParser +from rat_king_parser.rkp import RATConfigParser def extract_config(data: bytes): diff --git a/modules/processing/parsers/CAPE/XenoRAT.py b/modules/processing/parsers/CAPE/XenoRAT.py index 29c59a04fa2..1220071ea7d 100644 --- a/modules/processing/parsers/CAPE/XenoRAT.py +++ b/modules/processing/parsers/CAPE/XenoRAT.py @@ -1,4 +1,4 @@ -from lib.parsers_aux.ratking import RATConfigParser +from rat_king_parser.rkp import RATConfigParser def extract_config(data: bytes): diff --git a/poetry.lock b/poetry.lock index 9aeb10ad277..26063d23dd1 100644 --- a/poetry.lock +++ b/poetry.lock @@ -3303,6 +3303,29 @@ files = [ [package.dependencies] pycryptodomex = "*" +[[package]] +name = "rat-king-parser" +version = "3.0.0" +description = "A robust, multiprocessing-capable, multi-family RAT config parser/config extractor for AsyncRAT, DcRAT, VenomRAT, QuasarRAT, XWorm, Xeno RAT, and cloned/derivative RAT families." +optional = false +python-versions = ">=3.10" +files = [] +develop = false + +[package.dependencies] +cryptography = "*" +dnfile = "*" +yara-python = "*" + +[package.extras] +maco = ["maco", "validators"] + +[package.source] +type = "git" +url = "https://github.com/jeFF0Falltrades/rat_king_parser" +reference = "ab849ec8face38c8dac3f803ae5fe7cf8be26583" +resolved_reference = "ab849ec8face38c8dac3f803ae5fe7cf8be26583" + [[package]] name = "regex" version = "2021.7.6" @@ -4615,4 +4638,4 @@ maco = ["maco"] [metadata] lock-version = "2.0" python-versions = ">=3.10, <4.0" -content-hash = "ab3f807dcdc7fa1fb2098ee2222602ffde65894297ca6b4c3629f2528d584d09" +content-hash = "ab65373ef8c8244e2d8237cb6208783a0276fa62f52545098cb12170c1cd7d76" diff --git a/pyproject.toml b/pyproject.toml index 43be4a45a58..2d50b19820a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -91,6 +91,8 @@ setproctitle = "1.3.2" # tmp dependency to fix vuln certifi = "2024.7.4" +rat_king_parser = {git = "https://github.com/jeFF0Falltrades/rat_king_parser", rev = "ab849ec8face38c8dac3f803ae5fe7cf8be26583"} + [tool.poetry.extras] maco = ["maco"]