From 6cd39c04c14735057c747c933eeccdf364bb7466 Mon Sep 17 00:00:00 2001 From: Gustavo Moreira Date: Wed, 18 Sep 2024 17:04:17 +1000 Subject: [PATCH 01/17] Refactor of module object. Adding function helpers to simplify the control of exceptions and errors and consolidate everything on them. --- .../symbols/linux/extensions/__init__.py | 89 +++++++++---------- 1 file changed, 40 insertions(+), 49 deletions(-) diff --git a/volatility3/framework/symbols/linux/extensions/__init__.py b/volatility3/framework/symbols/linux/extensions/__init__.py index fdd34403ac..0a879249b4 100644 --- a/volatility3/framework/symbols/linux/extensions/__init__.py +++ b/volatility3/framework/symbols/linux/extensions/__init__.py @@ -13,7 +13,6 @@ from volatility3.framework import constants, exceptions, objects, interfaces, symbols from volatility3.framework.renderers import conversion -from volatility3.framework.configuration import requirements from volatility3.framework.constants.linux import SOCK_TYPES, SOCK_FAMILY from volatility3.framework.constants.linux import IP_PROTOCOLS, IPV6_PROTOCOLS from volatility3.framework.constants.linux import TCP_STATES, NETLINK_PROTOCOLS @@ -56,88 +55,80 @@ def mod_mem_type(self): self._mod_mem_type = {} return self._mod_mem_type + def _get_mem_type(self, mod_mem_type_name): + module_mem_index = self.mod_mem_type.get(mod_mem_type_name) + if module_mem_index is None: + raise AttributeError(f"Unknown module memory type '{mod_mem_type_name}'") + + if not (0 <= module_mem_index < self.mem.count): + raise AttributeError( + f"Invalid module memory type index '{module_mem_index}'" + ) + + return self.mem[module_mem_index] + + def _get_mem_size(self, mod_mem_type_name): + return self._get_mem_type(mod_mem_type_name).size + + def _get_mem_base(self, mod_mem_type_name): + return self._get_mem_type(mod_mem_type_name).base + def get_module_base(self): if self.has_member("mem"): # kernels 6.4+ - try: - return self.mem[self.mod_mem_type["MOD_TEXT"]].base - except KeyError: - raise AttributeError( - "module -> get_module_base: Unable to get module base. Cannot read base from MOD_TEXT." - ) + return self._get_mem_base("MOD_TEXT") elif self.has_member("core_layout"): return self.core_layout.base elif self.has_member("module_core"): return self.module_core - raise AttributeError("module -> get_module_base: Unable to get module base") + + raise AttributeError("Unable to get module base") def get_init_size(self): if self.has_member("mem"): # kernels 6.4+ - try: - return ( - self.mem[self.mod_mem_type["MOD_INIT_TEXT"]].size - + self.mem[self.mod_mem_type["MOD_INIT_DATA"]].size - + self.mem[self.mod_mem_type["MOD_INIT_RODATA"]].size - ) - except KeyError: - raise AttributeError( - "module -> get_init_size: Unable to determine .init section size of module. Cannot read size of MOD_INIT_TEXT, MOD_INIT_DATA, and MOD_INIT_RODATA" - ) + return ( + self._get_mem_size("MOD_INIT_TEXT") + + self._get_mem_size("MOD_INIT_DATA") + + self._get_mem_size("MOD_INIT_RODATA") + ) elif self.has_member("init_layout"): return self.init_layout.size elif self.has_member("init_size"): return self.init_size - raise AttributeError( - "module -> get_init_size: Unable to determine .init section size of module" - ) + + raise AttributeError("Unable to determine .init section size of module") def get_core_size(self): if self.has_member("mem"): # kernels 6.4+ - try: - return ( - self.mem[self.mod_mem_type["MOD_TEXT"]].size - + self.mem[self.mod_mem_type["MOD_DATA"]].size - + self.mem[self.mod_mem_type["MOD_RODATA"]].size - + self.mem[self.mod_mem_type["MOD_RO_AFTER_INIT"]].size - ) - except KeyError: - raise AttributeError( - "module -> get_core_size: Unable to determine core size of module. Cannot read size of MOD_TEXT, MOD_DATA, MOD_RODATA, and MOD_RO_AFTER_INIT." - ) + return ( + self._get_mem_size("MOD_TEXT") + + self._get_mem_size("MOD_DATA") + + self._get_mem_size("MOD_RODATA") + + self._get_mem_size("MOD_RO_AFTER_INIT") + ) elif self.has_member("core_layout"): return self.core_layout.size elif self.has_member("core_size"): return self.core_size - raise AttributeError( - "module -> get_core_size: Unable to determine core size of module" - ) + + raise AttributeError("Unable to determine core size of module") def get_module_core(self): if self.has_member("mem"): # kernels 6.4+ - try: - return self.mem[self.mod_mem_type["MOD_TEXT"]].base - except KeyError: - raise AttributeError( - "module -> get_module_core: Unable to get module core. Cannot read base from MOD_TEXT." - ) + return self._get_mem_base("MOD_TEXT") elif self.has_member("core_layout"): return self.core_layout.base elif self.has_member("module_core"): return self.module_core - raise AttributeError("module -> get_module_core: Unable to get module core") + raise AttributeError("Unable to get module core") def get_module_init(self): if self.has_member("mem"): # kernels 6.4+ - try: - return self.mem[self.mod_mem_type["MOD_INIT_TEXT"]].base - except KeyError: - raise AttributeError( - "module -> get_module_core: Unable to get module init. Cannot read base from MOD_INIT_TEXT." - ) + return self._get_mem_base("MOD_INIT_TEXT") elif self.has_member("init_layout"): return self.init_layout.base elif self.has_member("module_init"): return self.module_init - raise AttributeError("module -> get_module_init: Unable to get module init") + raise AttributeError("Unable to get module init") def get_name(self): """Get the name of the module as a string""" From 5dee3aeeee5e6b08481c1e39d2f7cb37a17c086c Mon Sep 17 00:00:00 2001 From: Gustavo Moreira Date: Tue, 1 Oct 2024 19:30:50 +1000 Subject: [PATCH 02/17] Add linux.hidden_modules plugin --- .../framework/plugins/linux/hidden_modules.py | 331 ++++++++++++++++++ .../symbols/linux/extensions/__init__.py | 38 ++ 2 files changed, 369 insertions(+) create mode 100644 volatility3/framework/plugins/linux/hidden_modules.py diff --git a/volatility3/framework/plugins/linux/hidden_modules.py b/volatility3/framework/plugins/linux/hidden_modules.py new file mode 100644 index 0000000000..1c2429449e --- /dev/null +++ b/volatility3/framework/plugins/linux/hidden_modules.py @@ -0,0 +1,331 @@ +# This file is Copyright 2024 Volatility Foundation and licensed under the Volatility Software License 1.0 +# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0 +# +import re +import functools +import logging +import contextlib +from typing import List, Iterable +from volatility3.framework import renderers, interfaces, exceptions, objects +from volatility3.framework.constants.architectures import LINUX_ARCHS +from volatility3.framework.renderers import format_hints +from volatility3.framework.configuration import requirements +from volatility3.plugins.linux import lsmod + +vollog = logging.getLogger(__name__) + + +class Hidden_modules(interfaces.plugins.PluginInterface): + """Carves memory to find hidden kernel modules""" + + _required_framework_version = (2, 10, 0) + + _version = (1, 0, 0) + + @classmethod + def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]: + return [ + requirements.ModuleRequirement( + name="kernel", + description="Linux kernel", + architectures=LINUX_ARCHS, + ), + requirements.PluginRequirement( + name="lsmod", plugin=lsmod.Lsmod, version=(2, 0, 0) + ), + requirements.BooleanRequirement( + name="fast", + description="Fast scan method. Recommended only for kernels 4.2 and above", + optional=True, + default=False, + ), + ] + + def _get_modules_memory_boundaries(self, vmlinux): + if vmlinux.has_symbol("mod_tree"): + mod_tree = vmlinux.object_from_symbol("mod_tree") + modules_addr_min = mod_tree.addr_min + modules_addr_max = mod_tree.addr_max + elif vmlinux.has_symbol("module_addr_min"): + modules_addr_min = vmlinux.object_from_symbol("module_addr_min") + modules_addr_max = vmlinux.object_from_symbol("module_addr_max") + + if isinstance(modules_addr_min, objects.Void): + # Crap ISF! Here's my best-effort workaround + vollog.warning( + "Your ISF symbols are missing type information. You may need to update " + "the ISF using the latest version of dwarf2json" + ) + # See issue #1041. In the Linux kernel these are "unsigned long" + for type_name in ("long unsigned int", "unsigned long"): + if vmlinux.has_type(type_name): + modules_addr_min = modules_addr_min.cast(type_name) + modules_addr_max = modules_addr_max.cast(type_name) + break + else: + raise exceptions.VolatilityException( + "Bad ISF! Please update the ISF using the latest version of dwarf2json" + ) + else: + raise exceptions.VolatilityException( + "Cannot find the module memory allocation area. Unsupported kernel" + ) + + return modules_addr_min, modules_addr_max + + def _get_module_state_live_bytes( + self, + context: interfaces.context.ContextInterface, + vmlinux_module_name: str, + ) -> bytes: + """Retrieve the MODULE_STATE_LIVE value bytes by introspecting its enum type + + Args: + context: The context to retrieve required elements (layers, symbol tables) from + vmlinux_module_name: The name of the kernel module on which to operate + + Returns: + The MODULE_STATE_LIVE value bytes + """ + vmlinux = context.modules[vmlinux_module_name] + module_state_type_template = vmlinux.get_type("module").vol.members["state"][1] + module_state_live_val = module_state_type_template.choices["MODULE_STATE_LIVE"] + data_format = module_state_type_template.base_type.vol.data_format + module_state_live_bytes = objects.convert_value_to_data( + module_state_live_val, int, data_format + ) + return module_state_live_bytes + + def get_hidden_modules_vol2( + self, + context: interfaces.context.ContextInterface, + vmlinux_module_name: str, + known_module_addresses, + modules_memory_boundaries: tuple, + ) -> Iterable[interfaces.objects.ObjectInterface]: + """Enumerate hidden modules using the traditional implementation. + + This is a port of the Volatility2 plugin, with minor code improvements. + + Args: + context: The context to retrieve required elements (layers, symbol tables) from + vmlinux_module_name: The name of the kernel module on which to operate + Yields: + module objects + """ + vmlinux = context.modules[vmlinux_module_name] + vmlinux_layer = context.layers[vmlinux.layer_name] + + check_nums = ( + 3000, + 2800, + 2700, + 2500, + 2300, + 2100, + 2000, + 1500, + 1300, + 1200, + 1024, + 512, + 256, + 128, + 96, + 64, + 48, + 32, + 24, + ) + modules_addr_min, modules_addr_max = modules_memory_boundaries + modules_addr_min = modules_addr_min & ~0xFFF + modules_addr_max = (modules_addr_max & ~0xFFF) + vmlinux_layer.page_size + + check_bufs = [] + replace_bufs = [] + minus_size = vmlinux.get_type("pointer").size + null_pointer_bytes = b"\x00" * minus_size + for num in check_nums: + check_bufs.append(b"\x00" * num) + replace_bufs.append((b"\xff" * (num - minus_size)) + null_pointer_bytes) + + all_ffs = b"\xff" * 4096 + scan_list = [] + for page_addr in range( + modules_addr_min, modules_addr_max, vmlinux_layer.page_size + ): + content_fixed = all_ffs + with contextlib.suppress( + exceptions.InvalidAddressException, + exceptions.PagedInvalidAddressException, + ): + content = vmlinux_layer.read(page_addr, vmlinux_layer.page_size) + + all_nulls = all(x == 0 for x in content) + if content and not all_nulls: + content_fixed = content + for check_bytes, replace_bytes in zip(check_bufs, replace_bufs): + content_fixed = content_fixed.replace( + check_bytes, replace_bytes + ) + + scan_list.append(content_fixed) + + scan_buf = b"".join(scan_list) + del scan_list + + module_state_live_bytes = self._get_module_state_live_bytes( + context, vmlinux_module_name + ) + # f'strings cannot be combined with bytes literals + for cur_addr in re.finditer(b"(?=(%s))" % (module_state_live_bytes), scan_buf): + module_addr = modules_addr_min + cur_addr.start() + + if module_addr in known_module_addresses: + continue + + module = vmlinux.object("module", offset=module_addr, absolute=True) + if module and module.is_valid(): + yield module + + @functools.cached_property + def module_address_alignment(self) -> int: + """Obtain the module memory address alignment. This is only used with the fast scan method. + + struct module is aligned to the L1 cache line, which is typically 64 bytes for most + common i386/AMD64/ARM64 configurations. In some cases, it can be 128 bytes, but this + will still work. + + Returns: + The struct module alignment + """ + # FIXME: When dwarf2json/ISF supports type alignments. Read it directly from the type metadata + # The cached_property won't provide any benefits until then + return 64 + + def get_hidden_modules_fast( + self, + context: interfaces.context.ContextInterface, + vmlinux_module_name: str, + known_module_addresses, + modules_memory_boundaries: tuple, + ) -> Iterable[interfaces.objects.ObjectInterface]: + """Enumerate hidden modules by taking advantage of memory address alignment patterns + + This technique is much faster and uses less memory than the traditional scan method + in Volatility2, but it doesn't work with older kernels. + + From kernels 4.2 struct module allocation are aligned to the L1 cache line size. + In i386/amd64/arm64 this is typically 64 bytes. However, this can be changed in + the Linux kernel configuration via CONFIG_X86_L1_CACHE_SHIFT. The alignment can + also be obtained from the DWARF info i.e. DW_AT_alignment<64>, but dwarf2json + doesn't support this feature yet. + In kernels < 4.2, alignment attributes are absent in the struct module, meaning + alignment cannot be guaranteed. Therefore, for older kernels, it's better to use + the traditional scan technique. + + Args: + context: The context to retrieve required elements (layers, symbol tables) from + vmlinux_module_name: The name of the kernel module on which to operate + Yields: + module objects + """ + vmlinux = context.modules[vmlinux_module_name] + vmlinux_layer = context.layers[vmlinux.layer_name] + + module_addr_min, module_addr_max = modules_memory_boundaries + + module_state_live_bytes = self._get_module_state_live_bytes( + context, vmlinux_module_name + ) + + for module_addr in range( + module_addr_min, module_addr_max, self.module_address_alignment + ): + if module_addr in known_module_addresses: + continue + + try: + # This is just a pre-filter. Module readability and consistency are verified in module.is_valid() + module_state_bytes = vmlinux_layer.read( + module_addr, len(module_state_live_bytes) + ) + if module_state_bytes != module_state_live_bytes: + continue + except ( + exceptions.PagedInvalidAddressException, + exceptions.InvalidAddressException, + ): + continue + + module = vmlinux.object("module", offset=module_addr, absolute=True) + if module and module.is_valid(): + yield module + + def _validate_alignment_patterns(self, addresses: Iterable[int]) -> bool: + """Check if the memory addresses meet our alignments patterns + + Args: + addresses: Iterable with the address values + + Returns: + True if all the addresses meet the alignment + """ + return all(addr % self.module_address_alignment == 0 for addr in addresses) + + def get_hidden_modules( + self, + context: interfaces.context.ContextInterface, + vmlinux_module_name: str, + ) -> Iterable[interfaces.objects.ObjectInterface]: + """Enumerate hidden modules + + Args: + context: The context to retrieve required elements (layers, symbol tables) from + vmlinux_module_name: The name of the kernel module on which to operate + Yields: + module objects + """ + vmlinux = context.modules[vmlinux_module_name] + vmlinux_layer = context.layers[vmlinux.layer_name] + + known_module_addresses = { + vmlinux_layer.canonicalize(module.vol.offset) + for module in lsmod.Lsmod.list_modules(context, vmlinux_module_name) + } + + modules_memory_boundaries = self._get_modules_memory_boundaries(vmlinux) + + if self.config.get("fast"): + if self._validate_alignment_patterns(known_module_addresses): + scan_method = self.get_hidden_modules_fast + else: + vollog.warning( + f"Module addresses aren't aligned to {self.module_address_alignment} bytes. " + "Switching to the traditional scan method." + ) + scan_method = self.get_hidden_modules_vol2 + else: + scan_method = self.get_hidden_modules_vol2 + + yield from scan_method( + context, + vmlinux_module_name, + known_module_addresses, + modules_memory_boundaries, + ) + + def _generator(self): + vmlinux_module_name = self.config["kernel"] + for module in self.get_hidden_modules(self.context, vmlinux_module_name): + module_addr = module.vol.offset + module_name = module.get_name() or renderers.NotAvailableValue() + fields = (format_hints.Hex(module_addr), module_name) + yield (0, fields) + + def run(self): + headers = [ + ("Address", format_hints.Hex), + ("Name", str), + ] + return renderers.TreeGrid(headers, self._generator()) diff --git a/volatility3/framework/symbols/linux/extensions/__init__.py b/volatility3/framework/symbols/linux/extensions/__init__.py index 0a879249b4..b30634ca3e 100644 --- a/volatility3/framework/symbols/linux/extensions/__init__.py +++ b/volatility3/framework/symbols/linux/extensions/__init__.py @@ -35,6 +35,34 @@ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._mod_mem_type = None # Initialize _mod_mem_type to None for memoization + def is_valid(self): + layer = self._context.layers[self.vol.layer_name] + # Make sure the entire module content is readable + if not layer.is_valid(self.vol.offset, self.vol.size): + return False + + if not self.state.is_valid_choice: + return False + + core_size = self.get_core_size() + if not ( + 1 <= core_size <= 20000000 + and core_size + self.get_init_size() >= 4096 + and 1 <= self.get_core_text_size() <= 20000000 + ): + return False + + if self.has_member("mkobj") and self.mkobj.has_member("mod"): + if not ( + self.mkobj + and self.mkobj.mod + and self.mkobj.mod.is_readable() + and self.mkobj.mod == self.vol.offset + ): + return False + + return True + @property def mod_mem_type(self): """Return the mod_mem_type enum choices if available or an empty dict if not""" @@ -112,6 +140,16 @@ def get_core_size(self): raise AttributeError("Unable to determine core size of module") + def get_core_text_size(self): + if self.has_member("mem"): # kernels 6.4+ + return self._get_mem_size("MOD_TEXT") + elif self.has_member("core_layout"): + return self.core_layout.text_size + elif self.has_member("core_text_size"): + return self.core_text_size + + raise AttributeError("Unable to determine core text size of module") + def get_module_core(self): if self.has_member("mem"): # kernels 6.4+ return self._get_mem_base("MOD_TEXT") From d5e6e7c203f4e17390f0891fa4b70138d1fb97a5 Mon Sep 17 00:00:00 2001 From: Gustavo Moreira Date: Wed, 2 Oct 2024 09:43:27 +1000 Subject: [PATCH 03/17] Allow any module state value in both traditional and fast scan methods --- .../framework/plugins/linux/hidden_modules.py | 30 ++++++++++--------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/volatility3/framework/plugins/linux/hidden_modules.py b/volatility3/framework/plugins/linux/hidden_modules.py index 1c2429449e..6d04c08afc 100644 --- a/volatility3/framework/plugins/linux/hidden_modules.py +++ b/volatility3/framework/plugins/linux/hidden_modules.py @@ -73,28 +73,29 @@ def _get_modules_memory_boundaries(self, vmlinux): return modules_addr_min, modules_addr_max - def _get_module_state_live_bytes( + def _get_module_state_values_bytes( self, context: interfaces.context.ContextInterface, vmlinux_module_name: str, - ) -> bytes: - """Retrieve the MODULE_STATE_LIVE value bytes by introspecting its enum type + ) -> List[bytes]: + """Retrieve the module state values bytes by introspecting its enum type Args: context: The context to retrieve required elements (layers, symbol tables) from vmlinux_module_name: The name of the kernel module on which to operate Returns: - The MODULE_STATE_LIVE value bytes + A list with the module state values bytes """ vmlinux = context.modules[vmlinux_module_name] module_state_type_template = vmlinux.get_type("module").vol.members["state"][1] - module_state_live_val = module_state_type_template.choices["MODULE_STATE_LIVE"] data_format = module_state_type_template.base_type.vol.data_format - module_state_live_bytes = objects.convert_value_to_data( - module_state_live_val, int, data_format - ) - return module_state_live_bytes + values = module_state_type_template.choices.values() + values_bytes = [ + objects.convert_value_to_data(value, int, data_format) + for value in sorted(values) + ] + return values_bytes def get_hidden_modules_vol2( self, @@ -174,11 +175,12 @@ def get_hidden_modules_vol2( scan_buf = b"".join(scan_list) del scan_list - module_state_live_bytes = self._get_module_state_live_bytes( + module_state_values_bytes = self._get_module_state_values_bytes( context, vmlinux_module_name ) + values_bytes_pattern = b"|".join(module_state_values_bytes) # f'strings cannot be combined with bytes literals - for cur_addr in re.finditer(b"(?=(%s))" % (module_state_live_bytes), scan_buf): + for cur_addr in re.finditer(b"(?=(%s))" % values_bytes_pattern, scan_buf): module_addr = modules_addr_min + cur_addr.start() if module_addr in known_module_addresses: @@ -235,7 +237,7 @@ def get_hidden_modules_fast( module_addr_min, module_addr_max = modules_memory_boundaries - module_state_live_bytes = self._get_module_state_live_bytes( + module_state_values_bytes = self._get_module_state_values_bytes( context, vmlinux_module_name ) @@ -248,9 +250,9 @@ def get_hidden_modules_fast( try: # This is just a pre-filter. Module readability and consistency are verified in module.is_valid() module_state_bytes = vmlinux_layer.read( - module_addr, len(module_state_live_bytes) + module_addr, len(module_state_values_bytes[0]) ) - if module_state_bytes != module_state_live_bytes: + if module_state_bytes not in module_state_values_bytes: continue except ( exceptions.PagedInvalidAddressException, From 590aa9c19ca56250e87c07ed49dbddc55a22b4a8 Mon Sep 17 00:00:00 2001 From: Gustavo Moreira Date: Thu, 3 Oct 2024 14:50:39 +1000 Subject: [PATCH 04/17] Make it callable from other plugins. Additionally, classmethod helpers were added, and docstrings were enhanced for improved usability and clarity. --- .../framework/plugins/linux/hidden_modules.py | 154 +++++++++++++----- 1 file changed, 116 insertions(+), 38 deletions(-) diff --git a/volatility3/framework/plugins/linux/hidden_modules.py b/volatility3/framework/plugins/linux/hidden_modules.py index 6d04c08afc..4843f30373 100644 --- a/volatility3/framework/plugins/linux/hidden_modules.py +++ b/volatility3/framework/plugins/linux/hidden_modules.py @@ -2,10 +2,9 @@ # which is available at https://www.volatilityfoundation.org/license/vsl-v1.0 # import re -import functools import logging import contextlib -from typing import List, Iterable +from typing import List, Set, Tuple, Iterable from volatility3.framework import renderers, interfaces, exceptions, objects from volatility3.framework.constants.architectures import LINUX_ARCHS from volatility3.framework.renderers import format_hints @@ -41,7 +40,21 @@ def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface] ), ] - def _get_modules_memory_boundaries(self, vmlinux): + @staticmethod + def get_modules_memory_boundaries( + context: interfaces.context.ContextInterface, + vmlinux_module_name: str, + ) -> Tuple[int]: + """Determine the boundaries of the module allocation area + + Args: + context: The context to retrieve required elements (layers, symbol tables) from + vmlinux_module_name: The name of the kernel module on which to operate + + Returns: + A tuple containing the minimum and maximum addresses for the module allocation area. + """ + vmlinux = context.modules[vmlinux_module_name] if vmlinux.has_symbol("mod_tree"): mod_tree = vmlinux.object_from_symbol("mod_tree") modules_addr_min = mod_tree.addr_min @@ -73,8 +86,8 @@ def _get_modules_memory_boundaries(self, vmlinux): return modules_addr_min, modules_addr_max + @staticmethod def _get_module_state_values_bytes( - self, context: interfaces.context.ContextInterface, vmlinux_module_name: str, ) -> List[bytes]: @@ -97,12 +110,13 @@ def _get_module_state_values_bytes( ] return values_bytes - def get_hidden_modules_vol2( - self, + @classmethod + def _get_hidden_modules_vol2( + cls, context: interfaces.context.ContextInterface, vmlinux_module_name: str, - known_module_addresses, - modules_memory_boundaries: tuple, + known_module_addresses: Set[int], + modules_memory_boundaries: Tuple, ) -> Iterable[interfaces.objects.ObjectInterface]: """Enumerate hidden modules using the traditional implementation. @@ -111,6 +125,9 @@ def get_hidden_modules_vol2( Args: context: The context to retrieve required elements (layers, symbol tables) from vmlinux_module_name: The name of the kernel module on which to operate + known_module_addresses: Set with known module addresses + modules_memory_boundaries: Minimum and maximum address boundaries for module allocation. + Yields: module objects """ @@ -175,7 +192,7 @@ def get_hidden_modules_vol2( scan_buf = b"".join(scan_list) del scan_list - module_state_values_bytes = self._get_module_state_values_bytes( + module_state_values_bytes = cls._get_module_state_values_bytes( context, vmlinux_module_name ) values_bytes_pattern = b"|".join(module_state_values_bytes) @@ -190,27 +207,37 @@ def get_hidden_modules_vol2( if module and module.is_valid(): yield module - @functools.cached_property - def module_address_alignment(self) -> int: + @classmethod + def _get_module_address_alignment( + cls, + context: interfaces.context.ContextInterface, + vmlinux_module_name: str, + ) -> int: """Obtain the module memory address alignment. This is only used with the fast scan method. struct module is aligned to the L1 cache line, which is typically 64 bytes for most common i386/AMD64/ARM64 configurations. In some cases, it can be 128 bytes, but this will still work. + Args: + context: The context to retrieve required elements (layers, symbol tables) from + vmlinux_module_name: The name of the kernel module on which to operate + Returns: The struct module alignment """ # FIXME: When dwarf2json/ISF supports type alignments. Read it directly from the type metadata - # The cached_property won't provide any benefits until then + # Also, 'context' and 'vmlinux_module_name' are not used yet, but they will be needed to obtain + # the type metadata return 64 - def get_hidden_modules_fast( - self, + @classmethod + def _get_hidden_modules_fast( + cls, context: interfaces.context.ContextInterface, vmlinux_module_name: str, - known_module_addresses, - modules_memory_boundaries: tuple, + known_module_addresses: Set[int], + modules_memory_boundaries: Tuple, ) -> Iterable[interfaces.objects.ObjectInterface]: """Enumerate hidden modules by taking advantage of memory address alignment patterns @@ -229,6 +256,9 @@ def get_hidden_modules_fast( Args: context: The context to retrieve required elements (layers, symbol tables) from vmlinux_module_name: The name of the kernel module on which to operate + known_module_addresses: Set with known module addresses + modules_memory_boundaries: Minimum and maximum address boundaries for module allocation. + Yields: module objects """ @@ -237,12 +267,16 @@ def get_hidden_modules_fast( module_addr_min, module_addr_max = modules_memory_boundaries - module_state_values_bytes = self._get_module_state_values_bytes( + module_state_values_bytes = cls._get_module_state_values_bytes( + context, vmlinux_module_name + ) + + module_address_alignment = cls._get_module_address_alignment( context, vmlinux_module_name ) for module_addr in range( - module_addr_min, module_addr_max, self.module_address_alignment + module_addr_min, module_addr_max, module_address_alignment ): if module_addr in known_module_addresses: continue @@ -264,51 +298,59 @@ def get_hidden_modules_fast( if module and module.is_valid(): yield module - def _validate_alignment_patterns(self, addresses: Iterable[int]) -> bool: + @staticmethod + def _validate_alignment_patterns( + addresses: Iterable[int], + address_alignment: int, + ) -> bool: """Check if the memory addresses meet our alignments patterns Args: addresses: Iterable with the address values + address_alignment: Number of bytes for alignment validation Returns: True if all the addresses meet the alignment """ - return all(addr % self.module_address_alignment == 0 for addr in addresses) + return all(addr % address_alignment == 0 for addr in addresses) + @classmethod def get_hidden_modules( - self, + cls, context: interfaces.context.ContextInterface, vmlinux_module_name: str, + known_module_addresses: Set[int], + modules_memory_boundaries: Tuple, + fast_method: bool = False, ) -> Iterable[interfaces.objects.ObjectInterface]: """Enumerate hidden modules Args: context: The context to retrieve required elements (layers, symbol tables) from vmlinux_module_name: The name of the kernel module on which to operate + known_module_addresses: Set with known module addresses + modules_memory_boundaries: Minimum and maximum address boundaries for module allocation. + fast_method: If True, it uses the fast method. Otherwise, it uses the traditional one. + Yields: module objects """ - vmlinux = context.modules[vmlinux_module_name] - vmlinux_layer = context.layers[vmlinux.layer_name] - - known_module_addresses = { - vmlinux_layer.canonicalize(module.vol.offset) - for module in lsmod.Lsmod.list_modules(context, vmlinux_module_name) - } - - modules_memory_boundaries = self._get_modules_memory_boundaries(vmlinux) - - if self.config.get("fast"): - if self._validate_alignment_patterns(known_module_addresses): - scan_method = self.get_hidden_modules_fast + if fast_method: + module_address_alignment = cls._get_module_address_alignment( + context, vmlinux_module_name + ) + if cls._validate_alignment_patterns( + known_module_addresses, module_address_alignment + ): + scan_method = cls._get_hidden_modules_fast else: vollog.warning( - f"Module addresses aren't aligned to {self.module_address_alignment} bytes. " + f"Module addresses aren't aligned to {module_address_alignment} bytes. " "Switching to the traditional scan method." ) - scan_method = self.get_hidden_modules_vol2 + scan_method = cls._get_hidden_modules_vol2 else: - scan_method = self.get_hidden_modules_vol2 + scan_method = cls._get_hidden_modules_vol2 yield from scan_method( context, @@ -317,9 +359,45 @@ def get_hidden_modules( modules_memory_boundaries, ) + @classmethod + def get_lsmod_module_addresses( + cls, + context: interfaces.context.ContextInterface, + vmlinux_module_name: str, + ) -> Set[int]: + """Obtain a set the known module addresses from linux.lsmod plugin + + Args: + context: The context to retrieve required elements (layers, symbol tables) from + vmlinux_module_name: The name of the kernel module on which to operate + + Returns: + A set containing known kernel module addresses + """ + vmlinux = context.modules[vmlinux_module_name] + vmlinux_layer = context.layers[vmlinux.layer_name] + + known_module_addresses = { + vmlinux_layer.canonicalize(module.vol.offset) + for module in lsmod.Lsmod.list_modules(context, vmlinux_module_name) + } + return known_module_addresses + def _generator(self): vmlinux_module_name = self.config["kernel"] - for module in self.get_hidden_modules(self.context, vmlinux_module_name): + known_module_addresses = self.get_lsmod_module_addresses( + self.context, vmlinux_module_name + ) + modules_memory_boundaries = self.get_modules_memory_boundaries( + self.context, vmlinux_module_name + ) + for module in self.get_hidden_modules( + self.context, + vmlinux_module_name, + known_module_addresses, + modules_memory_boundaries, + fast_method=self.config.get("fast"), + ): module_addr = module.vol.offset module_name = module.get_name() or renderers.NotAvailableValue() fields = (format_hints.Hex(module_addr), module_name) From 8d925bd8ad6346370d6256ddacced9d0db36b23d Mon Sep 17 00:00:00 2001 From: Gustavo Moreira Date: Thu, 3 Oct 2024 15:17:02 +1000 Subject: [PATCH 05/17] Added the --heuristic-mode option, which relaxes constraints to improve detection of more advanced threats --- .../framework/plugins/linux/hidden_modules.py | 46 ++++++++++++------- .../symbols/linux/extensions/__init__.py | 4 +- 2 files changed, 32 insertions(+), 18 deletions(-) diff --git a/volatility3/framework/plugins/linux/hidden_modules.py b/volatility3/framework/plugins/linux/hidden_modules.py index 4843f30373..1ec43aefdb 100644 --- a/volatility3/framework/plugins/linux/hidden_modules.py +++ b/volatility3/framework/plugins/linux/hidden_modules.py @@ -38,6 +38,13 @@ def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface] optional=True, default=False, ), + requirements.BooleanRequirement( + name="heuristic-mode", + description="Relaxed constraints. This may generate some false positives and" + "take a bit longer. This feature is available only when using the --fast option", + optional=True, + default=False, + ), ] @staticmethod @@ -117,6 +124,7 @@ def _get_hidden_modules_vol2( vmlinux_module_name: str, known_module_addresses: Set[int], modules_memory_boundaries: Tuple, + heuristic_mode: bool = False, ) -> Iterable[interfaces.objects.ObjectInterface]: """Enumerate hidden modules using the traditional implementation. @@ -127,6 +135,7 @@ def _get_hidden_modules_vol2( vmlinux_module_name: The name of the kernel module on which to operate known_module_addresses: Set with known module addresses modules_memory_boundaries: Minimum and maximum address boundaries for module allocation. + heuristic_mode: ignored for this scan method. Yields: module objects @@ -227,8 +236,8 @@ def _get_module_address_alignment( The struct module alignment """ # FIXME: When dwarf2json/ISF supports type alignments. Read it directly from the type metadata - # Also, 'context' and 'vmlinux_module_name' are not used yet, but they will be needed to obtain - # the type metadata + # Additionally, while 'context' and 'vmlinux_module_name' are currently unused, they will be + # essential for retrieving type metadata in the future. return 64 @classmethod @@ -238,6 +247,7 @@ def _get_hidden_modules_fast( vmlinux_module_name: str, known_module_addresses: Set[int], modules_memory_boundaries: Tuple, + heuristic_mode: bool = False, ) -> Iterable[interfaces.objects.ObjectInterface]: """Enumerate hidden modules by taking advantage of memory address alignment patterns @@ -258,7 +268,7 @@ def _get_hidden_modules_fast( vmlinux_module_name: The name of the kernel module on which to operate known_module_addresses: Set with known module addresses modules_memory_boundaries: Minimum and maximum address boundaries for module allocation. - + heuristic_mode: If True, it loosens constraints to enhance the detection of advanced threats. Yields: module objects """ @@ -281,21 +291,22 @@ def _get_hidden_modules_fast( if module_addr in known_module_addresses: continue - try: - # This is just a pre-filter. Module readability and consistency are verified in module.is_valid() - module_state_bytes = vmlinux_layer.read( - module_addr, len(module_state_values_bytes[0]) - ) - if module_state_bytes not in module_state_values_bytes: + if not heuristic_mode: + try: + # This is just a pre-filter. Module readability and consistency are verified in module.is_valid() + module_state_bytes = vmlinux_layer.read( + module_addr, len(module_state_values_bytes[0]) + ) + if module_state_bytes not in module_state_values_bytes: + continue + except ( + exceptions.PagedInvalidAddressException, + exceptions.InvalidAddressException, + ): continue - except ( - exceptions.PagedInvalidAddressException, - exceptions.InvalidAddressException, - ): - continue module = vmlinux.object("module", offset=module_addr, absolute=True) - if module and module.is_valid(): + if module and module.is_valid(strict_states=not heuristic_mode): yield module @staticmethod @@ -322,6 +333,7 @@ def get_hidden_modules( known_module_addresses: Set[int], modules_memory_boundaries: Tuple, fast_method: bool = False, + heuristic_mode: bool = False, ) -> Iterable[interfaces.objects.ObjectInterface]: """Enumerate hidden modules @@ -331,7 +343,7 @@ def get_hidden_modules( known_module_addresses: Set with known module addresses modules_memory_boundaries: Minimum and maximum address boundaries for module allocation. fast_method: If True, it uses the fast method. Otherwise, it uses the traditional one. - + heuristic_mode: If True, it loosens constraints to enhance the detection of advanced threats. Yields: module objects """ @@ -357,6 +369,7 @@ def get_hidden_modules( vmlinux_module_name, known_module_addresses, modules_memory_boundaries, + heuristic_mode, ) @classmethod @@ -397,6 +410,7 @@ def _generator(self): known_module_addresses, modules_memory_boundaries, fast_method=self.config.get("fast"), + heuristic_mode=self.config.get("heuristic-mode"), ): module_addr = module.vol.offset module_name = module.get_name() or renderers.NotAvailableValue() diff --git a/volatility3/framework/symbols/linux/extensions/__init__.py b/volatility3/framework/symbols/linux/extensions/__init__.py index b30634ca3e..810df41f80 100644 --- a/volatility3/framework/symbols/linux/extensions/__init__.py +++ b/volatility3/framework/symbols/linux/extensions/__init__.py @@ -35,13 +35,13 @@ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._mod_mem_type = None # Initialize _mod_mem_type to None for memoization - def is_valid(self): + def is_valid(self, strict_states=True): layer = self._context.layers[self.vol.layer_name] # Make sure the entire module content is readable if not layer.is_valid(self.vol.offset, self.vol.size): return False - if not self.state.is_valid_choice: + if strict_states and not self.state.is_valid_choice: return False core_size = self.get_core_size() From e8754fadbd93864ea2623e7df10bb33265db5073 Mon Sep 17 00:00:00 2001 From: Gustavo Moreira Date: Thu, 3 Oct 2024 15:50:30 +1000 Subject: [PATCH 06/17] Fix typo in usage help --- volatility3/framework/plugins/linux/hidden_modules.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/volatility3/framework/plugins/linux/hidden_modules.py b/volatility3/framework/plugins/linux/hidden_modules.py index 1ec43aefdb..86a4c36261 100644 --- a/volatility3/framework/plugins/linux/hidden_modules.py +++ b/volatility3/framework/plugins/linux/hidden_modules.py @@ -40,7 +40,7 @@ def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface] ), requirements.BooleanRequirement( name="heuristic-mode", - description="Relaxed constraints. This may generate some false positives and" + description="Relaxed constraints. This may generate false positives and " "take a bit longer. This feature is available only when using the --fast option", optional=True, default=False, From b5948d7ec0e9bb6dcc193f8c74b40bdb498ac3a8 Mon Sep 17 00:00:00 2001 From: Gustavo Moreira Date: Fri, 4 Oct 2024 07:48:25 +1000 Subject: [PATCH 07/17] Linux: hidden_modules: Add @Abyss-W4tcher suggestion to optimize the fast scan method for even better performance, using the mkobj.mod self referential validation used in module.is_valid() as pre-filter Removed the --heuristic-mode and the module.states validation, since the self referential check is enough by itself --- .../framework/plugins/linux/hidden_modules.py | 53 ++++++++----------- .../symbols/linux/extensions/__init__.py | 20 +++---- 2 files changed, 30 insertions(+), 43 deletions(-) diff --git a/volatility3/framework/plugins/linux/hidden_modules.py b/volatility3/framework/plugins/linux/hidden_modules.py index 86a4c36261..a76d1c69d7 100644 --- a/volatility3/framework/plugins/linux/hidden_modules.py +++ b/volatility3/framework/plugins/linux/hidden_modules.py @@ -38,13 +38,6 @@ def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface] optional=True, default=False, ), - requirements.BooleanRequirement( - name="heuristic-mode", - description="Relaxed constraints. This may generate false positives and " - "take a bit longer. This feature is available only when using the --fast option", - optional=True, - default=False, - ), ] @staticmethod @@ -124,7 +117,6 @@ def _get_hidden_modules_vol2( vmlinux_module_name: str, known_module_addresses: Set[int], modules_memory_boundaries: Tuple, - heuristic_mode: bool = False, ) -> Iterable[interfaces.objects.ObjectInterface]: """Enumerate hidden modules using the traditional implementation. @@ -135,7 +127,6 @@ def _get_hidden_modules_vol2( vmlinux_module_name: The name of the kernel module on which to operate known_module_addresses: Set with known module addresses modules_memory_boundaries: Minimum and maximum address boundaries for module allocation. - heuristic_mode: ignored for this scan method. Yields: module objects @@ -247,7 +238,6 @@ def _get_hidden_modules_fast( vmlinux_module_name: str, known_module_addresses: Set[int], modules_memory_boundaries: Tuple, - heuristic_mode: bool = False, ) -> Iterable[interfaces.objects.ObjectInterface]: """Enumerate hidden modules by taking advantage of memory address alignment patterns @@ -268,7 +258,6 @@ def _get_hidden_modules_fast( vmlinux_module_name: The name of the kernel module on which to operate known_module_addresses: Set with known module addresses modules_memory_boundaries: Minimum and maximum address boundaries for module allocation. - heuristic_mode: If True, it loosens constraints to enhance the detection of advanced threats. Yields: module objects """ @@ -276,37 +265,41 @@ def _get_hidden_modules_fast( vmlinux_layer = context.layers[vmlinux.layer_name] module_addr_min, module_addr_max = modules_memory_boundaries - - module_state_values_bytes = cls._get_module_state_values_bytes( - context, vmlinux_module_name - ) - module_address_alignment = cls._get_module_address_alignment( context, vmlinux_module_name ) + mkobj_offset = vmlinux.get_type("module").relative_child_offset("mkobj") + mod_offset = vmlinux.get_type("module_kobject").relative_child_offset("mod") + offset_to_mkobj_mod = mkobj_offset + mod_offset + mod_member_template = vmlinux.get_type("module_kobject").vol.members["mod"][1] + mod_size = mod_member_template.size + mod_member_data_format = mod_member_template.data_format + for module_addr in range( module_addr_min, module_addr_max, module_address_alignment ): if module_addr in known_module_addresses: continue - if not heuristic_mode: - try: - # This is just a pre-filter. Module readability and consistency are verified in module.is_valid() - module_state_bytes = vmlinux_layer.read( - module_addr, len(module_state_values_bytes[0]) - ) - if module_state_bytes not in module_state_values_bytes: - continue - except ( - exceptions.PagedInvalidAddressException, - exceptions.InvalidAddressException, - ): + try: + # This is just a pre-filter. Module readability and consistency are verified in module.is_valid() + self_referential_bytes = vmlinux_layer.read( + module_addr + offset_to_mkobj_mod, mod_size + ) + self_referential = objects.convert_data_to_value( + self_referential_bytes, int, mod_member_data_format + ) + if self_referential != module_addr: continue + except ( + exceptions.PagedInvalidAddressException, + exceptions.InvalidAddressException, + ): + continue module = vmlinux.object("module", offset=module_addr, absolute=True) - if module and module.is_valid(strict_states=not heuristic_mode): + if module and module.is_valid(): yield module @staticmethod @@ -369,7 +362,6 @@ def get_hidden_modules( vmlinux_module_name, known_module_addresses, modules_memory_boundaries, - heuristic_mode, ) @classmethod @@ -410,7 +402,6 @@ def _generator(self): known_module_addresses, modules_memory_boundaries, fast_method=self.config.get("fast"), - heuristic_mode=self.config.get("heuristic-mode"), ): module_addr = module.vol.offset module_name = module.get_name() or renderers.NotAvailableValue() diff --git a/volatility3/framework/symbols/linux/extensions/__init__.py b/volatility3/framework/symbols/linux/extensions/__init__.py index 810df41f80..9f66220019 100644 --- a/volatility3/framework/symbols/linux/extensions/__init__.py +++ b/volatility3/framework/symbols/linux/extensions/__init__.py @@ -35,15 +35,12 @@ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._mod_mem_type = None # Initialize _mod_mem_type to None for memoization - def is_valid(self, strict_states=True): + def is_valid(self): layer = self._context.layers[self.vol.layer_name] # Make sure the entire module content is readable if not layer.is_valid(self.vol.offset, self.vol.size): return False - if strict_states and not self.state.is_valid_choice: - return False - core_size = self.get_core_size() if not ( 1 <= core_size <= 20000000 @@ -52,14 +49,13 @@ def is_valid(self, strict_states=True): ): return False - if self.has_member("mkobj") and self.mkobj.has_member("mod"): - if not ( - self.mkobj - and self.mkobj.mod - and self.mkobj.mod.is_readable() - and self.mkobj.mod == self.vol.offset - ): - return False + if not ( + self.mkobj + and self.mkobj.mod + and self.mkobj.mod.is_readable() + and self.mkobj.mod == self.vol.offset + ): + return False return True From f455c30e4962789e5cac595f57e9ea5090fac6b6 Mon Sep 17 00:00:00 2001 From: Gustavo Moreira Date: Fri, 4 Oct 2024 09:51:08 +1000 Subject: [PATCH 08/17] Linux: hidden_modules: remove missed optional heuristic_mode argument --- volatility3/framework/plugins/linux/hidden_modules.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/volatility3/framework/plugins/linux/hidden_modules.py b/volatility3/framework/plugins/linux/hidden_modules.py index a76d1c69d7..6542f58f3a 100644 --- a/volatility3/framework/plugins/linux/hidden_modules.py +++ b/volatility3/framework/plugins/linux/hidden_modules.py @@ -326,7 +326,6 @@ def get_hidden_modules( known_module_addresses: Set[int], modules_memory_boundaries: Tuple, fast_method: bool = False, - heuristic_mode: bool = False, ) -> Iterable[interfaces.objects.ObjectInterface]: """Enumerate hidden modules @@ -336,7 +335,6 @@ def get_hidden_modules( known_module_addresses: Set with known module addresses modules_memory_boundaries: Minimum and maximum address boundaries for module allocation. fast_method: If True, it uses the fast method. Otherwise, it uses the traditional one. - heuristic_mode: If True, it loosens constraints to enhance the detection of advanced threats. Yields: module objects """ From d98c7ebb1d21f3074d413e7097bde7448932db85 Mon Sep 17 00:00:00 2001 From: Gustavo Moreira Date: Wed, 16 Oct 2024 15:05:02 +1100 Subject: [PATCH 09/17] linux: hidden_modules: Make the fast method the default. Remove vol2 and fall back to a 1-byte alignment scan if addresses aren't aligned to the L1 cache size --- .../framework/plugins/linux/hidden_modules.py | 216 +++--------------- 1 file changed, 26 insertions(+), 190 deletions(-) diff --git a/volatility3/framework/plugins/linux/hidden_modules.py b/volatility3/framework/plugins/linux/hidden_modules.py index 6542f58f3a..69b4492610 100644 --- a/volatility3/framework/plugins/linux/hidden_modules.py +++ b/volatility3/framework/plugins/linux/hidden_modules.py @@ -32,12 +32,6 @@ def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface] requirements.PluginRequirement( name="lsmod", plugin=lsmod.Lsmod, version=(2, 0, 0) ), - requirements.BooleanRequirement( - name="fast", - description="Fast scan method. Recommended only for kernels 4.2 and above", - optional=True, - default=False, - ), ] @staticmethod @@ -86,134 +80,13 @@ def get_modules_memory_boundaries( return modules_addr_min, modules_addr_max - @staticmethod - def _get_module_state_values_bytes( - context: interfaces.context.ContextInterface, - vmlinux_module_name: str, - ) -> List[bytes]: - """Retrieve the module state values bytes by introspecting its enum type - - Args: - context: The context to retrieve required elements (layers, symbol tables) from - vmlinux_module_name: The name of the kernel module on which to operate - - Returns: - A list with the module state values bytes - """ - vmlinux = context.modules[vmlinux_module_name] - module_state_type_template = vmlinux.get_type("module").vol.members["state"][1] - data_format = module_state_type_template.base_type.vol.data_format - values = module_state_type_template.choices.values() - values_bytes = [ - objects.convert_value_to_data(value, int, data_format) - for value in sorted(values) - ] - return values_bytes - - @classmethod - def _get_hidden_modules_vol2( - cls, - context: interfaces.context.ContextInterface, - vmlinux_module_name: str, - known_module_addresses: Set[int], - modules_memory_boundaries: Tuple, - ) -> Iterable[interfaces.objects.ObjectInterface]: - """Enumerate hidden modules using the traditional implementation. - - This is a port of the Volatility2 plugin, with minor code improvements. - - Args: - context: The context to retrieve required elements (layers, symbol tables) from - vmlinux_module_name: The name of the kernel module on which to operate - known_module_addresses: Set with known module addresses - modules_memory_boundaries: Minimum and maximum address boundaries for module allocation. - - Yields: - module objects - """ - vmlinux = context.modules[vmlinux_module_name] - vmlinux_layer = context.layers[vmlinux.layer_name] - - check_nums = ( - 3000, - 2800, - 2700, - 2500, - 2300, - 2100, - 2000, - 1500, - 1300, - 1200, - 1024, - 512, - 256, - 128, - 96, - 64, - 48, - 32, - 24, - ) - modules_addr_min, modules_addr_max = modules_memory_boundaries - modules_addr_min = modules_addr_min & ~0xFFF - modules_addr_max = (modules_addr_max & ~0xFFF) + vmlinux_layer.page_size - - check_bufs = [] - replace_bufs = [] - minus_size = vmlinux.get_type("pointer").size - null_pointer_bytes = b"\x00" * minus_size - for num in check_nums: - check_bufs.append(b"\x00" * num) - replace_bufs.append((b"\xff" * (num - minus_size)) + null_pointer_bytes) - - all_ffs = b"\xff" * 4096 - scan_list = [] - for page_addr in range( - modules_addr_min, modules_addr_max, vmlinux_layer.page_size - ): - content_fixed = all_ffs - with contextlib.suppress( - exceptions.InvalidAddressException, - exceptions.PagedInvalidAddressException, - ): - content = vmlinux_layer.read(page_addr, vmlinux_layer.page_size) - - all_nulls = all(x == 0 for x in content) - if content and not all_nulls: - content_fixed = content - for check_bytes, replace_bytes in zip(check_bufs, replace_bufs): - content_fixed = content_fixed.replace( - check_bytes, replace_bytes - ) - - scan_list.append(content_fixed) - - scan_buf = b"".join(scan_list) - del scan_list - - module_state_values_bytes = cls._get_module_state_values_bytes( - context, vmlinux_module_name - ) - values_bytes_pattern = b"|".join(module_state_values_bytes) - # f'strings cannot be combined with bytes literals - for cur_addr in re.finditer(b"(?=(%s))" % values_bytes_pattern, scan_buf): - module_addr = modules_addr_min + cur_addr.start() - - if module_addr in known_module_addresses: - continue - - module = vmlinux.object("module", offset=module_addr, absolute=True) - if module and module.is_valid(): - yield module - @classmethod def _get_module_address_alignment( cls, context: interfaces.context.ContextInterface, vmlinux_module_name: str, ) -> int: - """Obtain the module memory address alignment. This is only used with the fast scan method. + """Obtain the module memory address alignment. struct module is aligned to the L1 cache line, which is typically 64 bytes for most common i386/AMD64/ARM64 configurations. In some cases, it can be 128 bytes, but this @@ -231,8 +104,24 @@ def _get_module_address_alignment( # essential for retrieving type metadata in the future. return 64 + @staticmethod + def _validate_alignment_patterns( + addresses: Iterable[int], + address_alignment: int, + ) -> bool: + """Check if the memory addresses meet our alignments patterns + + Args: + addresses: Iterable with the address values + address_alignment: Number of bytes for alignment validation + + Returns: + True if all the addresses meet the alignment + """ + return all(addr % address_alignment == 0 for addr in addresses) + @classmethod - def _get_hidden_modules_fast( + def get_hidden_modules( cls, context: interfaces.context.ContextInterface, vmlinux_module_name: str, @@ -268,6 +157,14 @@ def _get_hidden_modules_fast( module_address_alignment = cls._get_module_address_alignment( context, vmlinux_module_name ) + if not cls._validate_alignment_patterns( + known_module_addresses, module_address_alignment + ): + vollog.warning( + f"Module addresses aren't aligned to {module_address_alignment} bytes. " + "Switching to 1 byte aligment scan method." + ) + module_address_alignment = 1 mkobj_offset = vmlinux.get_type("module").relative_child_offset("mkobj") mod_offset = vmlinux.get_type("module_kobject").relative_child_offset("mod") @@ -302,66 +199,6 @@ def _get_hidden_modules_fast( if module and module.is_valid(): yield module - @staticmethod - def _validate_alignment_patterns( - addresses: Iterable[int], - address_alignment: int, - ) -> bool: - """Check if the memory addresses meet our alignments patterns - - Args: - addresses: Iterable with the address values - address_alignment: Number of bytes for alignment validation - - Returns: - True if all the addresses meet the alignment - """ - return all(addr % address_alignment == 0 for addr in addresses) - - @classmethod - def get_hidden_modules( - cls, - context: interfaces.context.ContextInterface, - vmlinux_module_name: str, - known_module_addresses: Set[int], - modules_memory_boundaries: Tuple, - fast_method: bool = False, - ) -> Iterable[interfaces.objects.ObjectInterface]: - """Enumerate hidden modules - - Args: - context: The context to retrieve required elements (layers, symbol tables) from - vmlinux_module_name: The name of the kernel module on which to operate - known_module_addresses: Set with known module addresses - modules_memory_boundaries: Minimum and maximum address boundaries for module allocation. - fast_method: If True, it uses the fast method. Otherwise, it uses the traditional one. - Yields: - module objects - """ - if fast_method: - module_address_alignment = cls._get_module_address_alignment( - context, vmlinux_module_name - ) - if cls._validate_alignment_patterns( - known_module_addresses, module_address_alignment - ): - scan_method = cls._get_hidden_modules_fast - else: - vollog.warning( - f"Module addresses aren't aligned to {module_address_alignment} bytes. " - "Switching to the traditional scan method." - ) - scan_method = cls._get_hidden_modules_vol2 - else: - scan_method = cls._get_hidden_modules_vol2 - - yield from scan_method( - context, - vmlinux_module_name, - known_module_addresses, - modules_memory_boundaries, - ) - @classmethod def get_lsmod_module_addresses( cls, @@ -399,7 +236,6 @@ def _generator(self): vmlinux_module_name, known_module_addresses, modules_memory_boundaries, - fast_method=self.config.get("fast"), ): module_addr = module.vol.offset module_name = module.get_name() or renderers.NotAvailableValue() From 0ddd9210a8e95a9172a36f4939be68fff6edd8ed Mon Sep 17 00:00:00 2001 From: Gustavo Moreira Date: Wed, 16 Oct 2024 15:08:59 +1100 Subject: [PATCH 10/17] linux: hidden_modules: Remove unused module imports --- volatility3/framework/plugins/linux/hidden_modules.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/volatility3/framework/plugins/linux/hidden_modules.py b/volatility3/framework/plugins/linux/hidden_modules.py index 69b4492610..679f2b00a3 100644 --- a/volatility3/framework/plugins/linux/hidden_modules.py +++ b/volatility3/framework/plugins/linux/hidden_modules.py @@ -1,9 +1,7 @@ # This file is Copyright 2024 Volatility Foundation and licensed under the Volatility Software License 1.0 # which is available at https://www.volatilityfoundation.org/license/vsl-v1.0 # -import re import logging -import contextlib from typing import List, Set, Tuple, Iterable from volatility3.framework import renderers, interfaces, exceptions, objects from volatility3.framework.constants.architectures import LINUX_ARCHS From 526007f948b4b1fcefacb84e344d0b6ff49e945b Mon Sep 17 00:00:00 2001 From: Gustavo Moreira Date: Tue, 29 Oct 2024 16:41:47 +1100 Subject: [PATCH 11/17] Linux: hidden_modules: Use child_template --- volatility3/framework/plugins/linux/hidden_modules.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/volatility3/framework/plugins/linux/hidden_modules.py b/volatility3/framework/plugins/linux/hidden_modules.py index 679f2b00a3..09b8851ec3 100644 --- a/volatility3/framework/plugins/linux/hidden_modules.py +++ b/volatility3/framework/plugins/linux/hidden_modules.py @@ -167,7 +167,7 @@ def get_hidden_modules( mkobj_offset = vmlinux.get_type("module").relative_child_offset("mkobj") mod_offset = vmlinux.get_type("module_kobject").relative_child_offset("mod") offset_to_mkobj_mod = mkobj_offset + mod_offset - mod_member_template = vmlinux.get_type("module_kobject").vol.members["mod"][1] + mod_member_template = vmlinux.get_type("module_kobject").child_template("mod") mod_size = mod_member_template.size mod_member_data_format = mod_member_template.data_format From cbe071ff3ef90c4c2ae78bc953390bd69deb7c02 Mon Sep 17 00:00:00 2001 From: Gustavo Moreira Date: Tue, 29 Oct 2024 17:19:30 +1100 Subject: [PATCH 12/17] Linux: hidden_modules: Import the whole architectures module --- volatility3/framework/plugins/linux/hidden_modules.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/volatility3/framework/plugins/linux/hidden_modules.py b/volatility3/framework/plugins/linux/hidden_modules.py index 09b8851ec3..b7513a952c 100644 --- a/volatility3/framework/plugins/linux/hidden_modules.py +++ b/volatility3/framework/plugins/linux/hidden_modules.py @@ -4,7 +4,7 @@ import logging from typing import List, Set, Tuple, Iterable from volatility3.framework import renderers, interfaces, exceptions, objects -from volatility3.framework.constants.architectures import LINUX_ARCHS +from volatility3.framework.constants import architectures from volatility3.framework.renderers import format_hints from volatility3.framework.configuration import requirements from volatility3.plugins.linux import lsmod @@ -25,7 +25,7 @@ def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface] requirements.ModuleRequirement( name="kernel", description="Linux kernel", - architectures=LINUX_ARCHS, + architectures=architectures.LINUX_ARCHS, ), requirements.PluginRequirement( name="lsmod", plugin=lsmod.Lsmod, version=(2, 0, 0) From dfd8a1f19565ddfdd9521206bf4fb9773ba52578 Mon Sep 17 00:00:00 2001 From: Gustavo Moreira Date: Tue, 29 Oct 2024 17:22:48 +1100 Subject: [PATCH 13/17] Linux: hidden_modules: Include kernel version and commit details --- volatility3/framework/plugins/linux/hidden_modules.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/volatility3/framework/plugins/linux/hidden_modules.py b/volatility3/framework/plugins/linux/hidden_modules.py index b7513a952c..7fd5f9fad7 100644 --- a/volatility3/framework/plugins/linux/hidden_modules.py +++ b/volatility3/framework/plugins/linux/hidden_modules.py @@ -48,10 +48,12 @@ def get_modules_memory_boundaries( """ vmlinux = context.modules[vmlinux_module_name] if vmlinux.has_symbol("mod_tree"): + # Kernel >= 5.19 58d208de3e8d87dbe196caf0b57cc58c7a3836ca mod_tree = vmlinux.object_from_symbol("mod_tree") modules_addr_min = mod_tree.addr_min modules_addr_max = mod_tree.addr_max elif vmlinux.has_symbol("module_addr_min"): + # 2.6.27 <= kernel < 5.19 3a642e99babe0617febb6f402e1e063479f489db modules_addr_min = vmlinux.object_from_symbol("module_addr_min") modules_addr_max = vmlinux.object_from_symbol("module_addr_max") From 1c6a5480170d38760f53e3d1caa2a6dafcba2f46 Mon Sep 17 00:00:00 2001 From: Gustavo Moreira Date: Tue, 29 Oct 2024 17:30:47 +1100 Subject: [PATCH 14/17] Linux: hidden_modules: Simplify symbols type checks --- .../framework/plugins/linux/hidden_modules.py | 17 +++-------------- 1 file changed, 3 insertions(+), 14 deletions(-) diff --git a/volatility3/framework/plugins/linux/hidden_modules.py b/volatility3/framework/plugins/linux/hidden_modules.py index 7fd5f9fad7..7a01341c15 100644 --- a/volatility3/framework/plugins/linux/hidden_modules.py +++ b/volatility3/framework/plugins/linux/hidden_modules.py @@ -58,21 +58,10 @@ def get_modules_memory_boundaries( modules_addr_max = vmlinux.object_from_symbol("module_addr_max") if isinstance(modules_addr_min, objects.Void): - # Crap ISF! Here's my best-effort workaround - vollog.warning( - "Your ISF symbols are missing type information. You may need to update " - "the ISF using the latest version of dwarf2json" + raise exceptions.VolatilityException( + "Your ISF symbols lack type information. You may need to update the" + "ISF using the latest version of dwarf2json" ) - # See issue #1041. In the Linux kernel these are "unsigned long" - for type_name in ("long unsigned int", "unsigned long"): - if vmlinux.has_type(type_name): - modules_addr_min = modules_addr_min.cast(type_name) - modules_addr_max = modules_addr_max.cast(type_name) - break - else: - raise exceptions.VolatilityException( - "Bad ISF! Please update the ISF using the latest version of dwarf2json" - ) else: raise exceptions.VolatilityException( "Cannot find the module memory allocation area. Unsupported kernel" From 8960bdab086bef47121c16ba92a630dd70a18a26 Mon Sep 17 00:00:00 2001 From: Gustavo Moreira Date: Tue, 29 Oct 2024 17:33:06 +1100 Subject: [PATCH 15/17] Linux: hidden_modules: Add a symbol table check for a recent dwarf2json version --- volatility3/framework/plugins/linux/hidden_modules.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/volatility3/framework/plugins/linux/hidden_modules.py b/volatility3/framework/plugins/linux/hidden_modules.py index 7a01341c15..fd4b289430 100644 --- a/volatility3/framework/plugins/linux/hidden_modules.py +++ b/volatility3/framework/plugins/linux/hidden_modules.py @@ -232,6 +232,13 @@ def _generator(self): yield (0, fields) def run(self): + if self.context.symbol_space.verify_table_versions( + "dwarf2json", lambda version, _: (not version) or version < (0, 8, 0) + ): + raise exceptions.SymbolSpaceError( + "Invalid symbol table, please ensure the ISF table produced by dwarf2json was created with version 0.8.0 or later" + ) + headers = [ ("Address", format_hints.Hex), ("Name", str), From 4b76b699ad97f93fd9cf2949e6ab49c526784f3c Mon Sep 17 00:00:00 2001 From: Gustavo Moreira Date: Wed, 30 Oct 2024 09:56:05 +1100 Subject: [PATCH 16/17] Linux: hidden_modules: Add docstrings and comments to enhance the documentation of the module verification process. Move the hardcoded values to the linux constants file. --- .../framework/constants/linux/__init__.py | 8 ++++ .../symbols/linux/extensions/__init__.py | 37 ++++++++++++++----- 2 files changed, 36 insertions(+), 9 deletions(-) diff --git a/volatility3/framework/constants/linux/__init__.py b/volatility3/framework/constants/linux/__init__.py index 0567b8574c..7c485d3c39 100644 --- a/volatility3/framework/constants/linux/__init__.py +++ b/volatility3/framework/constants/linux/__init__.py @@ -339,3 +339,11 @@ class PT_FLAGS(Flag): def flags(self) -> str: """Returns the ptrace flags string""" return str(self).replace(self.__class__.__name__ + ".", "") + + +# Valid sizes for modules. Note that the Linux kernel does not define these values; they +# are based on empirical observations of typical memory allocations for kernel modules. +# We use this to verify that the found module falls within reasonable limits. +MODULE_MAXIMUM_CORE_SIZE = 20000000 +MODULE_MAXIMUM_CORE_TEXT_SIZE = 20000000 +MODULE_MINIMUM_SIZE = 4096 diff --git a/volatility3/framework/symbols/linux/extensions/__init__.py b/volatility3/framework/symbols/linux/extensions/__init__.py index cc43df8f51..fedb3d96fc 100644 --- a/volatility3/framework/symbols/linux/extensions/__init__.py +++ b/volatility3/framework/symbols/linux/extensions/__init__.py @@ -13,12 +13,24 @@ from volatility3.framework import constants, exceptions, objects, interfaces, symbols from volatility3.framework.renderers import conversion -from volatility3.framework.constants.linux import SOCK_TYPES, SOCK_FAMILY -from volatility3.framework.constants.linux import IP_PROTOCOLS, IPV6_PROTOCOLS -from volatility3.framework.constants.linux import TCP_STATES, NETLINK_PROTOCOLS -from volatility3.framework.constants.linux import ETH_PROTOCOLS, BLUETOOTH_STATES -from volatility3.framework.constants.linux import BLUETOOTH_PROTOCOLS, SOCKET_STATES -from volatility3.framework.constants.linux import CAPABILITIES, PT_FLAGS +from volatility3.framework.constants.linux import ( + SOCK_TYPES, + SOCK_FAMILY, + IP_PROTOCOLS, + IPV6_PROTOCOLS, + TCP_STATES, + NETLINK_PROTOCOLS, + ETH_PROTOCOLS, + BLUETOOTH_STATES, + BLUETOOTH_PROTOCOLS, + SOCKET_STATES, + CAPABILITIES, + PT_FLAGS, + MODULE_MAXIMUM_CORE_SIZE, + MODULE_MAXIMUM_CORE_TEXT_SIZE, + MODULE_MINIMUM_SIZE, +) + from volatility3.framework.layers import linear from volatility3.framework.objects import utility from volatility3.framework.symbols import generic, linux, intermed @@ -36,16 +48,23 @@ def __init__(self, *args, **kwargs): self._mod_mem_type = None # Initialize _mod_mem_type to None for memoization def is_valid(self): + """Determine whether it is a valid module object by verifying the self-referential + in module_kobject. This also confirms that the module is actively allocated and + not a remnant of freed memory or a failed module load attempt by verifying the + module memory section sizes. + """ layer = self._context.layers[self.vol.layer_name] # Make sure the entire module content is readable if not layer.is_valid(self.vol.offset, self.vol.size): return False core_size = self.get_core_size() + core_text_size = self.get_core_text_size() + init_size = self.get_init_size() if not ( - 1 <= core_size <= 20000000 - and core_size + self.get_init_size() >= 4096 - and 1 <= self.get_core_text_size() <= 20000000 + 0 < core_text_size <= MODULE_MAXIMUM_CORE_TEXT_SIZE + and 0 < core_size <= MODULE_MAXIMUM_CORE_SIZE + and core_size + init_size >= MODULE_MINIMUM_SIZE ): return False From 722ccd527e351f90d2a34dd31d29c39bf65be513 Mon Sep 17 00:00:00 2001 From: Gustavo Moreira Date: Wed, 30 Oct 2024 12:56:09 +1100 Subject: [PATCH 17/17] Linux: Extensions: Clean up the Linux constants imports in the object extension file --- .../symbols/linux/extensions/__init__.py | 83 ++++++++----------- 1 file changed, 35 insertions(+), 48 deletions(-) diff --git a/volatility3/framework/symbols/linux/extensions/__init__.py b/volatility3/framework/symbols/linux/extensions/__init__.py index fedb3d96fc..1f3e0504c2 100644 --- a/volatility3/framework/symbols/linux/extensions/__init__.py +++ b/volatility3/framework/symbols/linux/extensions/__init__.py @@ -13,24 +13,7 @@ from volatility3.framework import constants, exceptions, objects, interfaces, symbols from volatility3.framework.renderers import conversion -from volatility3.framework.constants.linux import ( - SOCK_TYPES, - SOCK_FAMILY, - IP_PROTOCOLS, - IPV6_PROTOCOLS, - TCP_STATES, - NETLINK_PROTOCOLS, - ETH_PROTOCOLS, - BLUETOOTH_STATES, - BLUETOOTH_PROTOCOLS, - SOCKET_STATES, - CAPABILITIES, - PT_FLAGS, - MODULE_MAXIMUM_CORE_SIZE, - MODULE_MAXIMUM_CORE_TEXT_SIZE, - MODULE_MINIMUM_SIZE, -) - +from volatility3.framework.constants import linux as linux_constants from volatility3.framework.layers import linear from volatility3.framework.objects import utility from volatility3.framework.symbols import generic, linux, intermed @@ -62,9 +45,9 @@ def is_valid(self): core_text_size = self.get_core_text_size() init_size = self.get_init_size() if not ( - 0 < core_text_size <= MODULE_MAXIMUM_CORE_TEXT_SIZE - and 0 < core_size <= MODULE_MAXIMUM_CORE_SIZE - and core_size + init_size >= MODULE_MINIMUM_SIZE + 0 < core_text_size <= linux_constants.MODULE_MAXIMUM_CORE_TEXT_SIZE + and 0 < core_size <= linux_constants.MODULE_MAXIMUM_CORE_SIZE + and core_size + init_size >= linux_constants.MODULE_MINIMUM_SIZE ): return False @@ -383,7 +366,7 @@ def is_kernel_thread(self) -> bool: Returns: bool: True, if this task is a kernel thread. Otherwise, False. """ - return (self.flags & constants.linux.PF_KTHREAD) != 0 + return (self.flags & linux_constants.PF_KTHREAD) != 0 @property def is_thread_group_leader(self) -> bool: @@ -460,7 +443,11 @@ def get_ptrace_tracee_tids(self) -> List[int]: def get_ptrace_tracee_flags(self) -> Optional[str]: """Returns a string with the ptrace flags""" - return PT_FLAGS(self.ptrace).flags if self.is_being_ptraced else None + return ( + linux_constants.PT_FLAGS(self.ptrace).flags + if self.is_being_ptraced + else None + ) class fs_struct(objects.StructType): @@ -1567,18 +1554,18 @@ def get_inode(self): def get_state(self): socket_state_idx = self.state - if 0 <= socket_state_idx < len(SOCKET_STATES): - return SOCKET_STATES[socket_state_idx] + if 0 <= socket_state_idx < len(linux_constants.SOCKET_STATES): + return linux_constants.SOCKET_STATES[socket_state_idx] class sock(objects.StructType): def get_family(self): family_idx = self.__sk_common.skc_family - if 0 <= family_idx < len(SOCK_FAMILY): - return SOCK_FAMILY[family_idx] + if 0 <= family_idx < len(linux_constants.SOCK_FAMILY): + return linux_constants.SOCK_FAMILY[family_idx] def get_type(self): - return SOCK_TYPES.get(self.sk_type, "") + return linux_constants.SOCK_TYPES.get(self.sk_type, "") def get_inode(self): if not self.sk_socket: @@ -1612,8 +1599,8 @@ def get_state(self): # Unix socket states reuse (a subset) of the inet_sock states contants if self.sk.get_type() == "STREAM": state_idx = self.sk.__sk_common.skc_state - if 0 <= state_idx < len(TCP_STATES): - return TCP_STATES[state_idx] + if 0 <= state_idx < len(linux_constants.TCP_STATES): + return linux_constants.TCP_STATES[state_idx] else: # Return the generic socket state return self.sk.sk_socket.get_state() @@ -1625,15 +1612,15 @@ def get_inode(self): class inet_sock(objects.StructType): def get_family(self): family_idx = self.sk.__sk_common.skc_family - if 0 <= family_idx < len(SOCK_FAMILY): - return SOCK_FAMILY[family_idx] + if 0 <= family_idx < len(linux_constants.SOCK_FAMILY): + return linux_constants.SOCK_FAMILY[family_idx] def get_protocol(self): # If INET6 family and a proto is defined, we use that specific IPv6 protocol. # Otherwise, we use the standard IP protocol. - protocol = IP_PROTOCOLS.get(self.sk.sk_protocol) + protocol = linux_constants.IP_PROTOCOLS.get(self.sk.sk_protocol) if self.get_family() == "AF_INET6": - protocol = IPV6_PROTOCOLS.get(self.sk.sk_protocol, protocol) + protocol = linux_constants.IPV6_PROTOCOLS.get(self.sk.sk_protocol, protocol) return protocol def get_state(self): @@ -1641,8 +1628,8 @@ def get_state(self): if self.sk.get_type() == "STREAM": state_idx = self.sk.__sk_common.skc_state - if 0 <= state_idx < len(TCP_STATES): - return TCP_STATES[state_idx] + if 0 <= state_idx < len(linux_constants.TCP_STATES): + return linux_constants.TCP_STATES[state_idx] else: # Return the generic socket state return self.sk.sk_socket.get_state() @@ -1725,8 +1712,8 @@ def get_dst_addr(self): class netlink_sock(objects.StructType): def get_protocol(self): protocol_idx = self.sk.sk_protocol - if 0 <= protocol_idx < len(NETLINK_PROTOCOLS): - return NETLINK_PROTOCOLS[protocol_idx] + if 0 <= protocol_idx < len(linux_constants.NETLINK_PROTOCOLS): + return linux_constants.NETLINK_PROTOCOLS[protocol_idx] def get_state(self): # Return the generic socket state @@ -1768,8 +1755,8 @@ def get_protocol(self): eth_proto = socket_module.htons(self.num) if eth_proto == 0: return None - elif eth_proto in ETH_PROTOCOLS: - return ETH_PROTOCOLS[eth_proto] + elif eth_proto in linux_constants.ETH_PROTOCOLS: + return linux_constants.ETH_PROTOCOLS[eth_proto] else: return f"0x{eth_proto:x}" @@ -1781,13 +1768,13 @@ def get_state(self): class bt_sock(objects.StructType): def get_protocol(self): type_idx = self.sk.sk_protocol - if 0 <= type_idx < len(BLUETOOTH_PROTOCOLS): - return BLUETOOTH_PROTOCOLS[type_idx] + if 0 <= type_idx < len(linux_constants.BLUETOOTH_PROTOCOLS): + return linux_constants.BLUETOOTH_PROTOCOLS[type_idx] def get_state(self): state_idx = self.sk.__sk_common.skc_state - if 0 <= state_idx < len(BLUETOOTH_STATES): - return BLUETOOTH_STATES[state_idx] + if 0 <= state_idx < len(linux_constants.BLUETOOTH_STATES): + return linux_constants.BLUETOOTH_STATES[state_idx] class xdp_sock(objects.StructType): @@ -1905,7 +1892,7 @@ def get_last_cap_value(cls) -> int: Returns: int: The latest capability ID supported by the framework. """ - return len(CAPABILITIES) - 1 + return len(linux_constants.CAPABILITIES) - 1 def get_kernel_cap_full(self) -> int: """Return the maximum value allowed for this kernel for a capability @@ -1934,7 +1921,7 @@ def capabilities_to_string(cls, capabilities_bitfield: int) -> List[str]: """ capabilities = [] - for bit, name in enumerate(CAPABILITIES): + for bit, name in enumerate(linux_constants.CAPABILITIES): if capabilities_bitfield & (1 << bit) != 0: capabilities.append(name) @@ -1995,10 +1982,10 @@ def has_capability(self, capability: str) -> bool: Returns: bool: "True" if the given capability is enabled. """ - if capability not in CAPABILITIES: + if capability not in linux_constants.CAPABILITIES: raise AttributeError(f"Unknown capability with name '{capability}'") - cap_value = 1 << CAPABILITIES.index(capability) + cap_value = 1 << linux_constants.CAPABILITIES.index(capability) return cap_value & self.get_capabilities() != 0