Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 44 additions & 14 deletions volatility3/framework/automagic/symbol_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,16 @@ def find_location(
results = self._database.cursor().execute(statement, parameters).fetchall()
result = None
for row in results:
result = row["location"]
local_filepath = self._get_local_filepath(row["location"])
if not (
local_filepath is None
or local_filepath.startswith(tuple(constants.SYMBOL_BASEPATHS))
):
vollog.debug(
f"Location {row['location']} found but outside of the registered symbol paths"
)
else:
result = row["location"]
return result

def get_local_locations(self) -> Generator[str, None, None]:
Expand All @@ -249,7 +258,11 @@ def get_local_locations(self) -> Generator[str, None, None]:
.fetchall()
)
for row in result:
yield row["location"]
local_filepath = self._get_local_filepath(row["location"])
if local_filepath and local_filepath.startswith(
tuple(constants.SYMBOL_BASEPATHS)
):
yield row["location"]
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs an else with a debug message saying it was found but outside the appropriate path.


def is_url_local(self, url: str) -> bool:
"""Determines whether an url is local or not"""
Expand Down Expand Up @@ -296,6 +309,20 @@ def get_hash(self, location: str) -> Optional[str]:
return row["hash"]
return None

def _get_local_filepath(
self, location: str, local_only: bool = True
) -> Optional[str]:
# See if the file is a local URL type we can handle:
parsed = urllib.parse.urlparse(location)
pathname = location if not local_only else None
if parsed.scheme == "file":
pathname = parsed.path
if parsed.scheme == "jar":
inner_url = urllib.parse.urlparse(parsed.path)
if inner_url.scheme == "file":
pathname = inner_url.path.split("!")[0]
return pathname

def update(self, progress_callback=None):
"""Locates all files under the symbol directories. Updates the cache with additions, modifications and removals.
This also updates remote locations based on a cache timeout.
Expand Down Expand Up @@ -340,15 +367,7 @@ def dummy_progress(*args, **kargs) -> None:
timestamp = stored_timestamp # Default to requiring update

# See if the file is a local URL type we can handle:
parsed = urllib.parse.urlparse(location)
pathname = None
if parsed.scheme == "file":
pathname = urllib.request.url2pathname(parsed.path)
if parsed.scheme == "jar":
inner_url = urllib.parse.urlparse(parsed.path)
if inner_url.scheme == "file":
pathname = inner_url.path.split("!")[0]

pathname = self._get_local_filepath(location)
if pathname and os.path.exists(pathname):
timestamp = datetime.datetime.fromtimestamp(
os.stat(pathname).st_mtime
Expand Down Expand Up @@ -461,7 +480,7 @@ def dummy_progress(*args, **kargs) -> None:
def get_identifier_dictionary(
self, operating_system: Optional[str] = None, local_only: bool = False
) -> Dict[bytes, str]:
output = {}
output: Dict[bytes, str] = {}
additions = []
statement = "SELECT location, identifier FROM cache"
if local_only:
Expand All @@ -476,7 +495,15 @@ def get_identifier_dictionary(
vollog.debug(
f"Duplicate entry for identifier {row['identifier']}: {row['location']} and {output[row['identifier']]}"
)
output[row["identifier"]] = row["location"]
local_filepath = self._get_local_filepath(row["location"])
if local_filepath and not local_filepath.startswith(
tuple(constants.SYMBOL_BASEPATHS)
):
vollog.debug(
f"Location {row['location']} was not in the registered symbol paths and therefore not in the identifier dictionary"
)
else:
output[row["identifier"]] = row["location"]
return output

def get_identifiers(self, operating_system: Optional[str]) -> List[bytes]:
Expand Down Expand Up @@ -533,7 +560,10 @@ def __init__(self, *args, **kwargs):

def __call__(self, context, config_path, configurable, progress_callback=None):
"""Runs the automagic over the configurable."""
self._cache.update(progress_callback)
try:
self._cache.update(progress_callback)
except Exception as excp:
vollog.debug(f"Excption during cache update: {excp}")

@classmethod
def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]:
Expand Down
Loading