Skip to content
30 changes: 23 additions & 7 deletions volatility3/framework/automagic/symbol_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import urllib.parse
import urllib.request
from abc import abstractmethod
from typing import Dict, Generator, Iterable, List, Optional, Tuple
from typing import Dict, Generator, Iterable, List, Optional, Tuple, Union

from volatility3 import framework, schemas
from volatility3.framework import constants, interfaces
Expand Down Expand Up @@ -251,9 +251,13 @@ def get_local_locations(self) -> Generator[str, None, None]:
for row in result:
yield row["location"]

def is_url_local(self, url: str) -> bool:
"""Determines whether an url is local or not"""
def is_url_local(
self, url: str, prefix: Optional[Union[str, Tuple[str, ...]]] = None
) -> bool:
"""Determines whether an url is local or not (and whether it begins with a specific prefix if specified)"""
parsed = urllib.parse.urlparse(url)
if prefix and not parsed.path.startswith(prefix):
return False
return parsed.scheme in ["file", "jar"]

def get_identifier(self, location: str) -> Optional[bytes]:
Expand Down Expand Up @@ -321,9 +325,16 @@ def dummy_progress(*args, **kargs) -> None:

# Missing entries
if missing_locations:
non_existant_missing: list[str] = []
for missing_location in missing_locations:
parsed_url = urllib.parse(missing_location)
if self.is_url_local(missing_location) and parsed_url.startswith(
tuple(constants.SYMBOL_BASEPATHS)
): # Only remove entries that are within the specified basepath
non_existant_missing.append(missing_location)
self._database.cursor().execute(
f"DELETE FROM cache WHERE location IN ({','.join(['?'] * len(missing_locations))})",
[x for x in missing_locations],
f"DELETE FROM cache WHERE location IN ({','.join(['?'] * len(non_existant_missing))})",
[x for x in non_existant_missing],
)
self._database.commit()

Expand Down Expand Up @@ -461,9 +472,9 @@ def dummy_progress(*args, **kargs) -> None:
def get_identifier_dictionary(
self, operating_system: Optional[str] = None, local_only: bool = False
) -> Dict[bytes, str]:
output = {}
output: dict[bytes, str] = {}
additions = []
statement = "SELECT location, identifier FROM cache"
statement = "SELECT location, identifier, local FROM cache"
if local_only:
additions.append("local = 1")
if operating_system:
Expand All @@ -472,6 +483,11 @@ def get_identifier_dictionary(
statement += f" WHERE {' AND '.join(additions)}"
results = self._database.cursor().execute(statement)
for row in results:
if row["local"] and not self.is_url_local(
row["location"], tuple(constants.SYMBOL_BASEPATHS)
):
# Skip over local entries that *aren't* in our current specified symbol basepaths
continue
if row["identifier"] in output and row["identifier"] and row["location"]:
vollog.debug(
f"Duplicate entry for identifier {row['identifier']}: {row['location']} and {output[row['identifier']]}"
Expand Down
Loading