diff --git a/volatility3/framework/automagic/symbol_cache.py b/volatility3/framework/automagic/symbol_cache.py
index 327575e969..f3cfb05841 100644
--- a/volatility3/framework/automagic/symbol_cache.py
+++ b/volatility3/framework/automagic/symbol_cache.py
@@ -11,7 +11,7 @@
 import urllib.parse
 import urllib.request
 from abc import abstractmethod
-from typing import Dict, Generator, Iterable, List, Optional, Tuple
+from typing import Dict, Generator, Iterable, List, Optional, Tuple, Union
 
 from volatility3 import framework, schemas
 from volatility3.framework import constants, interfaces
@@ -251,9 +251,18 @@ def get_local_locations(self) -> Generator[str, None, None]:
         for row in result:
             yield row["location"]
 
-    def is_url_local(self, url: str) -> bool:
-        """Determines whether an url is local or not"""
+    def is_url_local(
+        self, url: str, prefix: Optional[Union[str, Tuple[str, ...]]] = None
+    ) -> bool:
+        """Determines whether an url is local or not
+
+        If a prefix (or tuple of prefixes) is given, the path component of the
+        url must also start with it, otherwise the url is not considered local.
+        The comparison is made against the raw parsed url path.
+        """
         parsed = urllib.parse.urlparse(url)
+        if prefix and not parsed.path.startswith(prefix):
+            return False
         return parsed.scheme in ["file", "jar"]
 
     def get_identifier(self, location: str) -> Optional[bytes]:
@@ -321,9 +330,16 @@ def dummy_progress(*args, **kargs) -> None:
 
         # Missing entries
         if missing_locations:
+            non_existent_missing: List[str] = []
+            for missing_location in missing_locations:
+                parsed_url = urllib.parse.urlparse(missing_location)
+                if self.is_url_local(missing_location) and parsed_url.path.startswith(
+                    tuple(constants.SYMBOL_BASEPATHS)
+                ):  # Only remove entries that are within the specified basepath
+                    non_existent_missing.append(missing_location)
             self._database.cursor().execute(
-                f"DELETE FROM cache WHERE location IN ({','.join(['?'] * len(missing_locations))})",
-                [x for x in missing_locations],
+                f"DELETE FROM cache WHERE location IN ({','.join(['?'] * len(non_existent_missing))})",
+                [x for x in non_existent_missing],
             )
             self._database.commit()
 
@@ -461,9 +477,9 @@ def dummy_progress(*args, **kargs) -> None:
     def get_identifier_dictionary(
         self, operating_system: Optional[str] = None, local_only: bool = False
     ) -> Dict[bytes, str]:
-        output = {}
+        output: Dict[bytes, str] = {}
         additions = []
-        statement = "SELECT location, identifier FROM cache"
+        statement = "SELECT location, identifier, local FROM cache"
         if local_only:
             additions.append("local = 1")
         if operating_system:
@@ -472,6 +488,11 @@ def get_identifier_dictionary(
             statement += f" WHERE {' AND '.join(additions)}"
         results = self._database.cursor().execute(statement)
         for row in results:
+            if row["local"] and not self.is_url_local(
+                row["location"], tuple(constants.SYMBOL_BASEPATHS)
+            ):
+                # Skip over local entries that *aren't* in our current specified symbol basepaths
+                continue
             if row["identifier"] in output and row["identifier"] and row["location"]:
                 vollog.debug(
                     f"Duplicate entry for identifier {row['identifier']}: {row['location']} and {output[row['identifier']]}"