diff --git a/main.py b/main.py index 42629d0..3ec2ec5 100755 --- a/main.py +++ b/main.py @@ -115,8 +115,8 @@ def interpret(cmd, arguments, parts: dict[int, 'Partition'], shorthands, outdir) part = check_valid_part(arguments[0], parts, shorthands) if part is not None: print('-'*10) - print(utils.tree_folder(part.root)) - print(utils.tree_folder(part.lost)) + print(utils.tree_folder(part.root, sector_size=part.sector_size)) + print(utils.tree_folder(part.lost, sector_size=part.sector_size)) print('-'*10) elif cmd == 'bodyfile': if len(arguments) != 2: diff --git a/recuperabit/fs/constants.py b/recuperabit/fs/constants.py index 69c928d..eb6ef12 100644 --- a/recuperabit/fs/constants.py +++ b/recuperabit/fs/constants.py @@ -19,5 +19,4 @@ # along with RecuperaBit. If not, see . -sector_size: int = 512 max_sectors: int = 256 # Maximum block size for recovery diff --git a/recuperabit/fs/core_types.py b/recuperabit/fs/core_types.py index eb94d2a..0cfba06 100644 --- a/recuperabit/fs/core_types.py +++ b/recuperabit/fs/core_types.py @@ -28,8 +28,6 @@ from typing import Optional, Dict, Set, List, Tuple, Union, Any, Iterator from datetime import datetime -from .constants import sector_size - from ..utils import readable_bytes @@ -140,6 +138,7 @@ def __init__(self, fs_type: str, root_id: Union[int, str], scanner: 'DiskScanner self.files: Dict[Union[int, str], File] = {} self.recoverable: bool = False self.scanner: 'DiskScanner' = scanner + self.sector_size: int = 512 # Default sector size, can be overridden def add_file(self, node: File) -> None: """Insert a new file in the partition.""" @@ -209,14 +208,14 @@ def additional_repr(self) -> List[Tuple[str, Any]]: def __repr__(self) -> str: size = ( - readable_bytes(self.size * sector_size) + readable_bytes(self.size * self.sector_size) if self.size is not None else '??? b' ) data = [ ('Offset', self.offset), ( 'Offset (b)', - self.offset * sector_size + self.offset * self.sector_size if self.offset is not None else None ), ] @@ -249,10 +248,6 @@ class DiskScanner(object): def __init__(self, pointer: Any) -> None: self.image: Any = pointer - def get_image(self) -> Any: - """Return the image reference.""" - return self.image - @staticmethod def get_image(scanner: 'DiskScanner') -> Any: """Static method to get image from scanner instance.""" diff --git a/recuperabit/fs/ntfs.py b/recuperabit/fs/ntfs.py index 20086fe..7a65366 100644 --- a/recuperabit/fs/ntfs.py +++ b/recuperabit/fs/ntfs.py @@ -26,7 +26,7 @@ from collections import Counter from typing import Any, Dict, List, Optional, Tuple, Union, Iterator, Set -from .constants import max_sectors, sector_size +from .constants import max_sectors from .core_types import DiskScanner, File, Partition from .ntfs_fmt import (attr_header_fmt, attr_names, attr_nonresident_fmt, attr_resident_fmt, attr_types_fmt, attribute_list_parser, @@ -95,7 +95,7 @@ def parse_mft_attr(attr: bytes) -> Tuple[Dict[str, Any], Optional[str]]: return header, name -def _apply_fixup_values(header: Dict[str, Any], entry: bytearray) -> None: +def _apply_fixup_values(header: Dict[str, Any], entry: bytearray, sector_size: int) -> None: """Apply the fixup values to FILE and INDX records.""" offset = header['off_fixup'] for i in range(1, header['n_entries']): @@ -134,7 +134,7 @@ def _attributes_reader(entry: bytes, offset: int) -> Dict[str, Any]: return attributes -def parse_file_record(entry: bytes) -> Dict[str, Any]: +def parse_file_record(entry: bytes, sector_size: int) -> Dict[str, Any]: """Parse the contents of a FILE record (MFT entry).""" header = unpack(entry, entry_fmt) if (header['size_alloc'] is None or @@ -147,7 +147,7 @@ def parse_file_record(entry: bytes) -> Dict[str, Any]: if header['off_fixup'] < 48: header['record_n'] = None - _apply_fixup_values(header, entry) + _apply_fixup_values(header, entry, sector_size) attributes = _attributes_reader(entry, header['off_first']) header['valid'] = True @@ -155,11 +155,11 @@ def parse_file_record(entry: bytes) -> Dict[str, Any]: return header -def parse_indx_record(entry: bytes) -> Dict[str, Any]: +def parse_indx_record(entry: bytes, sector_size: int) -> Dict[str, Any]: """Parse the contents of a INDX record (directory index).""" header = unpack(entry, indx_fmt) - _apply_fixup_values(header, entry) + _apply_fixup_values(header, entry, sector_size) node_data = unpack(entry[24:], indx_header_fmt) node_data['off_start_list'] += 24 @@ -214,7 +214,7 @@ def _integrate_attribute_list(parsed: Dict[str, Any], part: 'NTFSPartition', ima size = attr['real_size'] for entry in attr['runlist']: clusters_pos += entry['offset'] - length = min(entry['length'] * spc * sector_size, size) + length = min(entry['length'] * spc * part.sector_size, size) size -= length real_pos = clusters_pos * spc + part.offset dump = sectors(image, real_pos, length, 1) @@ -247,7 +247,7 @@ def _integrate_attribute_list(parsed: Dict[str, Any], part: 'NTFSPartition', ima for index in entries_by_type[num]: real_pos = mft_pos + index * FILE_size dump = sectors(image, real_pos, FILE_size) - child_parsed = parse_file_record(dump) + child_parsed = parse_file_record(dump, part.sector_size) if 'attributes' not in child_parsed: continue # Update the main entry (parsed) @@ -356,12 +356,12 @@ def content_iterator(self, partition: 'NTFSPartition', image: Any, datas: List[D break for entry in attr['runlist']: - length = min(entry['length'] * spc * sector_size, size) + length = min(entry['length'] * spc * partition.sector_size, size) size -= length # Sparse runlist if entry['offset'] is None: while length > 0: - amount = min(max_sectors*sector_size, length) + amount = min(max_sectors*partition.sector_size, length) length -= amount yield b'\x00' * amount continue @@ -371,8 +371,8 @@ def content_iterator(self, partition: 'NTFSPartition', image: Any, datas: List[D # Avoid to fill memory with huge blocks offset = 0 while length > 0: - amount = min(max_sectors*sector_size, length) - position = real_pos*sector_size + offset + amount = min(max_sectors*partition.sector_size, length) + position = real_pos*partition.sector_size + offset partial = self._padded_bytes(image, position, amount) length -= amount offset += amount @@ -389,7 +389,7 @@ def get_content(self, partition: 'NTFSPartition') -> Optional[Union[bytes, Itera image = DiskScanner.get_image(partition.scanner) dump = sectors(image, File.get_offset(self), FILE_size) - parsed = parse_file_record(dump) + parsed = parse_file_record(dump, partition.sector_size) if not parsed['valid'] or 'attributes' not in parsed: logging.error(u'Invalid MFT entry for {}'.format(self)) @@ -623,7 +623,7 @@ def add_from_indx_allocation(self, parsed: Dict[str, Any], part: NTFSPartition) img = DiskScanner.get_image(self) for position in read_again: dump = sectors(img, position, INDX_size) - entries = parse_indx_record(dump)['entries'] + entries = parse_indx_record(dump, part.sector_size)['entries'] self.add_indx_entries(entries, part) def add_from_attribute_list(self, parsed: Dict[str, Any], part: NTFSPartition, offset: int) -> None: @@ -656,7 +656,7 @@ def add_from_mft_mirror(self, part: NTFSPartition) -> None: if node is None or node.is_ghost: position = mirrpos + i * FILE_size dump = sectors(img, position, FILE_size) - parsed = parse_file_record(dump) + parsed = parse_file_record(dump, part.sector_size) if parsed['valid'] and '$FILE_NAME' in parsed['attributes']: node = NTFSFile(parsed, position) part.add_file(node) @@ -702,7 +702,7 @@ def get_partitions(self) -> Dict[int, NTFSPartition]: logging.info('Parsing MFT entries') for position in self.found_file: dump = sectors(img, position, FILE_size) - parsed = parse_file_record(dump) + parsed = parse_file_record(dump, 512) # Default sector size during discovery attrs = parsed.get('attributes', {}) if not parsed['valid'] or '$FILE_NAME' not in attrs: continue @@ -737,7 +737,7 @@ def get_partitions(self) -> Dict[int, NTFSPartition]: logging.info('Parsing INDX records') for position in self.found_indx: dump = sectors(img, position, INDX_size) - parsed = parse_indx_record(dump) + parsed = parse_indx_record(dump, 512) # Default sector size during discovery if not parsed['valid']: continue @@ -793,7 +793,7 @@ def get_partitions(self) -> Dict[int, NTFSPartition]: else: # Infer MFT mirror position dump = sectors(img, entry.offset, FILE_size) - mirror = parse_file_record(dump) + mirror = parse_file_record(dump, part.sector_size) if (mirror['valid'] and 'attributes' in mirror and '$DATA' in mirror['attributes']): datas = mirror['attributes']['$DATA'] @@ -856,7 +856,7 @@ def get_partitions(self) -> Dict[int, NTFSPartition]: if entry is None or part.sec_per_clus is None: continue dump = sectors(img, entry.offset, FILE_size) - parsed = parse_file_record(dump) + parsed = parse_file_record(dump, part.sector_size) if not parsed['valid'] or 'attributes' not in parsed: continue diff --git a/recuperabit/utils.py b/recuperabit/utils.py index 6325f2c..bee506c 100644 --- a/recuperabit/utils.py +++ b/recuperabit/utils.py @@ -25,11 +25,11 @@ import string import sys import time -from typing import TYPE_CHECKING, Any, Iterable, Optional, List, Dict, Tuple, Union, Callable +from typing import TYPE_CHECKING, Any, Iterable, Optional, List, Dict, Sequence, Tuple, Union, Callable import unicodedata import io -from .fs.constants import sector_size +from recuperabit.fs.core_types import DiskScanner printer: pprint.PrettyPrinter = pprint.PrettyPrinter(indent=4) all_chars = (chr(i) for i in range(sys.maxunicode)) @@ -43,7 +43,7 @@ from .fs.core_types import File, Partition -def sectors(image: io.BufferedReader, offset: int, size: int, bsize: int = sector_size, fill: bool = True) -> Optional[bytearray]: +def sectors(image: io.BufferedReader, offset: int, size: int, bsize: int = 512, fill: bool = True) -> Optional[bytearray]: """Read from a file descriptor.""" read = True try: @@ -118,7 +118,7 @@ def unpack(data: bytes, fmt: List[Tuple[str, Tuple[Union[str, Callable[[bytes], return result -def feed_all(image: io.BufferedReader, scanners: List[Any], indexes: Iterable[int]) -> List[int]: +def feed_all(image: io.BufferedReader, scanners: Sequence['DiskScanner'], indexes: Iterable[int]) -> List[int]: # Scan the disk image and feed the scanners interesting: List[int] = [] for index in indexes: @@ -160,7 +160,7 @@ def readable_bytes(amount: Optional[int]) -> str: return '%.2f %sB' % (scaled, powers[biggest]) -def _file_tree_repr(node: 'File') -> str: +def _file_tree_repr(node: 'File', sector_size: int) -> str: """Give a nice representation for the tree.""" desc = ( ' [GHOST]' if node.is_ghost else @@ -184,21 +184,23 @@ def _file_tree_repr(node: 'File') -> str: ) -def tree_folder(directory: 'File', padding: int = 0) -> str: +def tree_folder(directory: 'File', padding: int = 0, sector_size: int | None = None) -> str: """Return a tree-like textual representation of a directory.""" + assert sector_size is not None, "sector_size must be provided" + lines: List[str] = [] pad = ' ' * padding lines.append( - pad + _file_tree_repr(directory) + pad + _file_tree_repr(directory, sector_size) ) padding = padding + 2 pad = ' ' * padding for entry in directory.children: if len(entry.children) or entry.is_directory: - lines.append(tree_folder(entry, padding)) + lines.append(tree_folder(entry, padding, sector_size)) else: lines.append( - pad + _file_tree_repr(entry) + pad + _file_tree_repr(entry, sector_size) ) return '\n'.join(lines) @@ -309,7 +311,7 @@ def csv_part(part: 'Partition') -> list[str]: obj.mac['modification'], obj.mac['access'], obj.mac['creation'], obj.size, readable_bytes(obj.size), - (obj.offset * sector_size + (obj.offset * part.sector_size if obj.offset is not None else None), obj.offset, '1' if obj.is_directory else '',