diff --git a/main.py b/main.py
index c3ee8ff..a1f9e1a 100755
--- a/main.py
+++ b/main.py
@@ -30,6 +30,7 @@
import sys
try:
import readline
+ readline # ignore unused import warning
except ImportError:
pass
@@ -37,6 +38,10 @@
# scanners
from recuperabit.fs.ntfs import NTFSScanner
+from typing import TYPE_CHECKING
+if TYPE_CHECKING:
+ from recuperabit.fs.core_types import Partition
+
__author__ = "Andrea Lazzarotto"
__copyright__ = "(c) 2014-2021, Andrea Lazzarotto"
__license__ = "GPLv3"
@@ -97,7 +102,7 @@ def check_valid_part(num, parts, shorthands, rebuild=True):
return None
-def interpret(cmd, arguments, parts, shorthands, outdir):
+def interpret(cmd, arguments, parts: 'dict[int, Partition]', shorthands, outdir):
"""Perform command required by user."""
if cmd == 'help':
print('Available commands:')
@@ -362,7 +367,7 @@ def main():
pickle.dump(interesting, savefile)
# Ask for partitions
- parts = {}
+ parts: dict[int, Partition] = {}
for scanner in scanners:
parts.update(scanner.get_partitions())
diff --git a/recuperabit/fs/constants.py b/recuperabit/fs/constants.py
index 9370a77..69c928d 100644
--- a/recuperabit/fs/constants.py
+++ b/recuperabit/fs/constants.py
@@ -19,5 +19,5 @@
# along with RecuperaBit. If not, see .
-sector_size = 512
-max_sectors = 256 # Maximum block size for recovery
+sector_size: int = 512
+max_sectors: int = 256 # Maximum block size for recovery
diff --git a/recuperabit/fs/core_types.py b/recuperabit/fs/core_types.py
index 87dda78..eb94d2a 100644
--- a/recuperabit/fs/core_types.py
+++ b/recuperabit/fs/core_types.py
@@ -25,6 +25,8 @@
import logging
import os.path
+from typing import Optional, Dict, Set, List, Tuple, Union, Any, Iterator
+from datetime import datetime
from .constants import sector_size
@@ -32,49 +34,49 @@
class File(object):
- """Filesystem-independent representation of a file."""
- def __init__(self, index, name, size, is_directory=False,
- is_deleted=False, is_ghost=False):
- self.index = index
- self.name = name
- self.size = size
- self.is_directory = is_directory
- self.is_deleted = is_deleted
- self.is_ghost = is_ghost
- self.parent = None
- self.mac = {
+ """Filesystem-independent representation of a file. Aka Node."""
+ def __init__(self, index: Union[int, str], name: str, size: Optional[int], is_directory: bool = False,
+ is_deleted: bool = False, is_ghost: bool = False) -> None:
+ self.index: Union[int, str] = index
+ self.name: str = name
+ self.size: Optional[int] = size
+ self.is_directory: bool = is_directory
+ self.is_deleted: bool = is_deleted
+ self.is_ghost: bool = is_ghost
+ self.parent: Optional[Union[int, str]] = None
+ self.mac: Dict[str, Optional[datetime]] = {
'modification': None,
'access': None,
'creation': None
}
- self.children = set()
- self.children_names = set() # Avoid name clashes breaking restore
- self.offset = None # Offset from beginning of disk
+ self.children: Set['File'] = set()
+ self.children_names: Set[str] = set() # Avoid name clashes breaking restore
+ self.offset: Optional[int] = None # Offset from beginning of disk
- def set_parent(self, parent):
+ def set_parent(self, parent: Optional[Union[int, str]]) -> None:
"""Set a pointer to the parent directory."""
self.parent = parent
- def set_mac(self, modification, access, creation):
+ def set_mac(self, modification: Optional[datetime], access: Optional[datetime], creation: Optional[datetime]) -> None:
"""Set the modification, access and creation times."""
self.mac['modification'] = modification
self.mac['access'] = access
self.mac['creation'] = creation
- def get_mac(self):
+ def get_mac(self) -> List[Optional[datetime]]:
"""Get the modification, access and creation times."""
keys = ('modification', 'access', 'creation')
return [self.mac[k] for k in keys]
- def set_offset(self, offset):
+ def set_offset(self, offset: Optional[int]) -> None:
"""Set the offset of the file record with respect to the disk image."""
self.offset = offset
- def get_offset(self):
+ def get_offset(self) -> Optional[int]:
"""Get the offset of the file record with respect to the disk image."""
return self.offset
- def add_child(self, node):
+ def add_child(self, node: 'File') -> None:
"""Add a new child to this directory."""
original_name = node.name
i = 0
@@ -90,7 +92,7 @@ def add_child(self, node):
self.children.add(node)
self.children_names.add(node.name)
- def full_path(self, part):
+ def full_path(self, part: 'Partition') -> str:
"""Return the full path of this file."""
if self.parent is not None:
parent = part[self.parent]
@@ -98,7 +100,7 @@ def full_path(self, part):
else:
return self.name
- def get_content(self, partition):
+ def get_content(self, partition: 'Partition') -> Optional[Union[bytes, Iterator[bytes]]]:
# pylint: disable=W0613
"""Extract the content of the file.
@@ -109,14 +111,14 @@ def get_content(self, partition):
raise NotImplementedError
# pylint: disable=R0201
- def ignore(self):
+ def ignore(self) -> bool:
"""The following method is used by the restore procedure to check
files that should not be recovered. For example, in NTFS file
$BadClus:$Bad shall not be recovered because it creates an output
with the same size as the partition (usually many GBs)."""
return False
- def __repr__(self):
+ def __repr__(self) -> str:
return (
u'File(#%s, ^^%s^^, %s, offset = %s sectors)' %
(self.index, self.parent, self.name, self.offset)
@@ -128,42 +130,42 @@ class Partition(object):
Parameter root_id represents the identifier assigned to the root directory
of a partition. This can be file system dependent."""
- def __init__(self, fs_type, root_id, scanner):
- self.fs_type = fs_type
- self.root_id = root_id
- self.size = None
- self.offset = None
- self.root = None
- self.lost = File(-1, 'LostFiles', 0, is_directory=True, is_ghost=True)
- self.files = {}
- self.recoverable = False
- self.scanner = scanner
-
- def add_file(self, node):
+ def __init__(self, fs_type: str, root_id: Union[int, str], scanner: 'DiskScanner') -> None:
+ self.fs_type: str = fs_type
+ self.root_id: Union[int, str] = root_id
+ self.size: Optional[int] = None
+ self.offset: Optional[int] = None
+ self.root: Optional[File] = None
+ self.lost: File = File(-1, 'LostFiles', 0, is_directory=True, is_ghost=True)
+ self.files: Dict[Union[int, str], File] = {}
+ self.recoverable: bool = False
+ self.scanner: 'DiskScanner' = scanner
+
+ def add_file(self, node: File) -> None:
"""Insert a new file in the partition."""
index = node.index
self.files[index] = node
- def set_root(self, node):
+ def set_root(self, node: File) -> None:
"""Set the root directory."""
if not node.is_directory:
raise TypeError('Not a directory')
self.root = node
self.root.set_parent(None)
- def set_size(self, size):
+ def set_size(self, size: int) -> None:
"""Set the (estimated) size of the partition."""
self.size = size
- def set_offset(self, offset):
+ def set_offset(self, offset: int) -> None:
"""Set the offset from the beginning of the disk."""
self.offset = offset
- def set_recoverable(self, recoverable):
+ def set_recoverable(self, recoverable: bool) -> None:
"""State if the partition contents are also recoverable."""
self.recoverable = recoverable
- def rebuild(self):
+ def rebuild(self) -> None:
"""Rebuild the partition structure.
This method processes the contents of files and it rebuilds the
@@ -201,11 +203,11 @@ def rebuild(self):
return
# pylint: disable=R0201
- def additional_repr(self):
+ def additional_repr(self) -> List[Tuple[str, Any]]:
"""Return additional values to show in the string representation."""
return []
- def __repr__(self):
+ def __repr__(self) -> str:
size = (
readable_bytes(self.size * sector_size)
if self.size is not None else '??? b'
@@ -227,14 +229,14 @@ def __repr__(self):
', '.join(a+': '+str(b) for a, b in data)
)
- def __getitem__(self, index):
+ def __getitem__(self, index: Union[int, str]) -> File:
if index in self.files:
return self.files[index]
if index == self.lost.index:
return self.lost
raise KeyError
- def get(self, index, default=None):
+ def get(self, index: Union[int, str], default: Optional[File] = None) -> Optional[File]:
"""Get a file or the special LostFiles directory."""
try:
return self.__getitem__(index)
@@ -244,17 +246,17 @@ def get(self, index, default=None):
class DiskScanner(object):
    """Abstract stub for the implementation of disk scanners."""
-    def __init__(self, pointer):
-        self.image = pointer
+    def __init__(self, pointer: Any) -> None:
+        self.image: Any = pointer
-    def get_image(self):
+    def get_image(self) -> Any:
        """Return the image reference."""
        return self.image
-    def feed(self, index, sector):
+ def feed(self, index: int, sector: bytes) -> Optional[str]:
"""Feed a new sector."""
raise NotImplementedError
- def get_partitions(self):
+ def get_partitions(self) -> Dict[int, Partition]:
"""Get a list of the found partitions."""
raise NotImplementedError
diff --git a/recuperabit/fs/ntfs.py b/recuperabit/fs/ntfs.py
index f0e820c..20086fe 100644
--- a/recuperabit/fs/ntfs.py
+++ b/recuperabit/fs/ntfs.py
@@ -24,6 +24,7 @@
import logging
from collections import Counter
+from typing import Any, Dict, List, Optional, Tuple, Union, Iterator, Set
from .constants import max_sectors, sector_size
from .core_types import DiskScanner, File, Partition
@@ -36,7 +37,7 @@
from ..utils import merge, sectors, unpack
# Some attributes may appear multiple times
-multiple_attributes = set([
+multiple_attributes: Set[str] = set([
'$FILE_NAME',
'$DATA',
'$INDEX_ROOT',
@@ -45,11 +46,11 @@
])
# Size of records in sectors
-FILE_size = 2
-INDX_size = 8
+FILE_size: int = 2
+INDX_size: int = 8
-def best_name(entries):
+def best_name(entries: List[Tuple[int, str]]) -> Optional[str]:
"""Return the best file name available.
This function accepts a list of tuples formed by a namespace and a string.
@@ -66,7 +67,7 @@ def best_name(entries):
return name if len(name) else None
-def parse_mft_attr(attr):
+def parse_mft_attr(attr: bytes) -> Tuple[Dict[str, Any], Optional[str]]:
"""Parse the contents of a MFT attribute."""
header = unpack(attr, attr_header_fmt)
attr_type = header['type']
@@ -94,7 +95,7 @@ def parse_mft_attr(attr):
return header, name
-def _apply_fixup_values(header, entry):
+def _apply_fixup_values(header: Dict[str, Any], entry: bytearray) -> None:
"""Apply the fixup values to FILE and INDX records."""
offset = header['off_fixup']
for i in range(1, header['n_entries']):
@@ -102,7 +103,7 @@ def _apply_fixup_values(header, entry):
entry[pos-2:pos] = entry[offset + 2*i:offset + 2*(i+1)]
-def _attributes_reader(entry, offset):
+def _attributes_reader(entry: bytes, offset: int) -> Dict[str, Any]:
"""Read every attribute."""
attributes = {}
while offset < len(entry) - 16:
@@ -133,7 +134,7 @@ def _attributes_reader(entry, offset):
return attributes
-def parse_file_record(entry):
+def parse_file_record(entry: bytes) -> Dict[str, Any]:
"""Parse the contents of a FILE record (MFT entry)."""
header = unpack(entry, entry_fmt)
if (header['size_alloc'] is None or
@@ -154,7 +155,7 @@ def parse_file_record(entry):
return header
-def parse_indx_record(entry):
+def parse_indx_record(entry: bytes) -> Dict[str, Any]:
"""Parse the contents of a INDX record (directory index)."""
header = unpack(entry, indx_fmt)
@@ -200,7 +201,7 @@ def parse_indx_record(entry):
return header
-def _integrate_attribute_list(parsed, part, image):
+def _integrate_attribute_list(parsed: Dict[str, Any], part: 'NTFSPartition', image: Any) -> None:
"""Integrate missing attributes in the parsed MTF entry."""
base_record = parsed['record_n']
attrs = parsed['attributes']
@@ -264,7 +265,7 @@ def _integrate_attribute_list(parsed, part, image):
class NTFSFile(File):
"""NTFS File."""
- def __init__(self, parsed, offset, is_ghost=False, ads=''):
+ def __init__(self, parsed: Dict[str, Any], offset: Optional[int], is_ghost: bool = False, ads: str = '') -> None:
index = parsed['record_n']
ads_suffix = ':' + ads if ads != '' else ads
if ads != '':
@@ -322,7 +323,7 @@ def __init__(self, parsed, offset, is_ghost=False, ads=''):
self.ads = ads
@staticmethod
- def _padded_bytes(image, offset, size):
+ def _padded_bytes(image: Any, offset: int, size: int) -> bytes:
dump = sectors(image, offset, size, 1)
if len(dump) < size:
logging.warning(
@@ -331,7 +332,7 @@ def _padded_bytes(image, offset, size):
dump += bytearray(b'\x00' * (size - len(dump)))
return dump
- def content_iterator(self, partition, image, datas):
+ def content_iterator(self, partition: 'NTFSPartition', image: Any, datas: List[Dict[str, Any]]) -> Iterator[bytes]:
"""Return an iterator for the contents of this file."""
vcn = 0
spc = partition.sec_per_clus
@@ -378,7 +379,7 @@ def content_iterator(self, partition, image, datas):
yield bytes(partial)
vcn = attr['end_VCN'] + 1
- def get_content(self, partition):
+ def get_content(self, partition: 'NTFSPartition') -> Optional[Union[bytes, Iterator[bytes]]]:
"""Extract the content of the file.
This method works by extracting the $DATA attribute."""
@@ -439,7 +440,7 @@ def get_content(self, partition):
)
return self.content_iterator(partition, image, non_resident)
- def ignore(self):
+ def ignore(self) -> bool:
"""Determine which files should be ignored."""
return (
(self.index == '8:$Bad') or
@@ -449,13 +450,13 @@ def ignore(self):
class NTFSPartition(Partition):
"""Partition with additional fields for NTFS recovery."""
- def __init__(self, scanner, position=None):
+ def __init__(self, scanner: 'NTFSScanner', position: Optional[int] = None) -> None:
Partition.__init__(self, 'NTFS', 5, scanner)
- self.sec_per_clus = None
- self.mft_pos = position
- self.mftmirr_pos = None
+ self.sec_per_clus: Optional[int] = None
+ self.mft_pos: Optional[int] = position
+ self.mftmirr_pos: Optional[int] = None
- def additional_repr(self):
+ def additional_repr(self) -> List[Tuple[str, Any]]:
"""Return additional values to show in the string representation."""
return [
('Sec/Clus', self.sec_per_clus),
@@ -466,17 +467,17 @@ def additional_repr(self):
class NTFSScanner(DiskScanner):
"""NTFS Disk Scanner."""
- def __init__(self, pointer):
+ def __init__(self, pointer: Any) -> None:
DiskScanner.__init__(self, pointer)
- self.found_file = set()
- self.parsed_file_review = {}
- self.found_indx = set()
- self.parsed_indx = {}
- self.indx_list = None
- self.found_boot = []
- self.found_spc = []
-
- def feed(self, index, sector):
+ self.found_file: Set[int] = set()
+ self.parsed_file_review: Dict[int, Dict[str, Any]] = {}
+ self.found_indx: Set[int] = set()
+ self.parsed_indx: Dict[int, Dict[str, Any]] = {}
+        self.indx_list: Optional['SparseList[int]'] = None
+ self.found_boot: List[int] = []
+ self.found_spc: List[int] = []
+
+ def feed(self, index: int, sector: bytes) -> Optional[str]:
"""Feed a new sector."""
# check boot sector
if sector.endswith(b'\x55\xAA') and b'NTFS' in sector[:8]:
@@ -494,7 +495,7 @@ def feed(self, index, sector):
return 'NTFS index record'
@staticmethod
- def add_indx_entries(entries, part):
+ def add_indx_entries(entries: List[Dict[str, Any]], part: NTFSPartition) -> None:
"""Insert new ghost files which were not already found."""
for rec in entries:
if (rec['record_n'] not in part.files and
@@ -512,7 +513,7 @@ def add_indx_entries(entries, part):
rec['flags'] = 0x1
part.add_file(NTFSFile(rec, None, is_ghost=True))
- def add_from_indx_root(self, parsed, part):
+ def add_from_indx_root(self, parsed: Dict[str, Any], part: NTFSPartition) -> None:
"""Add ghost entries to part from INDEX_ROOT attributes in parsed."""
for attribute in parsed['attributes']['$INDEX_ROOT']:
if (attribute.get('content') is None or
@@ -520,7 +521,7 @@ def add_from_indx_root(self, parsed, part):
continue
self.add_indx_entries(attribute['content']['records'], part)
- def most_likely_sec_per_clus(self):
+ def most_likely_sec_per_clus(self) -> List[int]:
"""Determine the most likely value of sec_per_clus of each partition,
to speed up the search."""
counter = Counter()
@@ -528,7 +529,7 @@ def most_likely_sec_per_clus(self):
counter.update(2**i for i in range(8))
return [i for i, _ in counter.most_common()]
- def find_boundary(self, part, mft_address, multipliers):
+ def find_boundary(self, part: NTFSPartition, mft_address: int, multipliers: List[int]) -> Tuple[Optional[int], Optional[int]]:
"""Determine the starting sector of a partition with INDX records."""
nodes = (
self.parsed_file_review[node.offset]
@@ -593,7 +594,7 @@ def find_boundary(self, part, mft_address, multipliers):
else:
return (None, None)
- def add_from_indx_allocation(self, parsed, part):
+ def add_from_indx_allocation(self, parsed: Dict[str, Any], part: NTFSPartition) -> None:
"""Add ghost entries to part from INDEX_ALLOCATION attributes in parsed.
This procedure requires that the beginning of the partition has already
@@ -625,7 +626,7 @@ def add_from_indx_allocation(self, parsed, part):
entries = parse_indx_record(dump)['entries']
self.add_indx_entries(entries, part)
- def add_from_attribute_list(self, parsed, part, offset):
+ def add_from_attribute_list(self, parsed: Dict[str, Any], part: NTFSPartition, offset: int) -> None:
"""Add additional entries to part from attributes in ATTRIBUTE_LIST.
Files with many attributes may have additional attributes not in the
@@ -643,7 +644,7 @@ def add_from_attribute_list(self, parsed, part, offset):
if ads_name and len(ads_name):
part.add_file(NTFSFile(parsed, offset, ads=ads_name))
- def add_from_mft_mirror(self, part):
+ def add_from_mft_mirror(self, part: NTFSPartition) -> None:
"""Fix the first file records using the MFT mirror."""
img = DiskScanner.get_image(self)
mirrpos = part.mftmirr_pos
@@ -664,7 +665,7 @@ def add_from_mft_mirror(self, part):
'%s from backup', node.index, node.name, part.offset
)
- def finalize_reconstruction(self, part):
+ def finalize_reconstruction(self, part: NTFSPartition) -> None:
"""Finish information gathering from a file.
This procedure requires that the beginning of the
@@ -693,9 +694,9 @@ def finalize_reconstruction(self, part):
parsed = self.parsed_file_review[node.offset]
self.add_from_indx_allocation(parsed, part)
- def get_partitions(self):
+ def get_partitions(self) -> Dict[int, NTFSPartition]:
"""Get a list of the found partitions."""
- partitioned_files = {}
+ partitioned_files: Dict[int, NTFSPartition] = {}
img = DiskScanner.get_image(self)
logging.info('Parsing MFT entries')
diff --git a/recuperabit/logic.py b/recuperabit/logic.py
index e97052b..f6a5e34 100644
--- a/recuperabit/logic.py
+++ b/recuperabit/logic.py
@@ -20,37 +20,43 @@
import bisect
-import codecs
import logging
import os
-import os.path
+from pathlib import Path
import sys
import time
import types
+from typing import TYPE_CHECKING, Dict, List, Optional, Union, Iterator, Set, TypeVar, Generic
+from concurrent.futures import ThreadPoolExecutor
-from .utils import tiny_repr
+from recuperabit.utils import readable_bytes
+T = TypeVar('T')
-class SparseList(object):
+if TYPE_CHECKING:
+ from .fs.core_types import File, Partition
+
+
+class SparseList(Generic[T]):
"""List which only stores values at some places."""
- def __init__(self, data=None, default=None):
- self.keys = [] # This is always kept in order
- self.elements = {}
- self.default = default
+ def __init__(self, data: Optional[Dict[int, T]] = None, default: Optional[T] = None) -> None:
+ self.keys: List[int] = [] # This is always kept in order
+ self.elements: Dict[int, T] = {}
+ self.default: Optional[T] = default
if data is not None:
self.keys = sorted(data)
self.elements.update(data)
- def __len__(self):
+ def __len__(self) -> int:
try:
return self.keys[-1] + 1
except IndexError:
return 0
- def __getitem__(self, index):
+ def __getitem__(self, index: int) -> Optional[T]:
return self.elements.get(index, self.default)
- def __setitem__(self, index, item):
+ def __setitem__(self, index: int, item: T) -> None:
if item == self.default:
if index in self.elements:
del self.elements[index]
@@ -60,18 +66,18 @@ def __setitem__(self, index, item):
bisect.insort(self.keys, index)
self.elements[index] = item
- def __contains__(self, element):
+ def __contains__(self, element: T) -> bool:
return element in self.elements.values()
- def __iter__(self):
+ def __iter__(self) -> Iterator[int]:
return self.keys.__iter__()
- def __repr__(self):
+ def __repr__(self) -> str:
elems = []
prevk = 0
if len(self.elements) > 0:
k = self.keys[0]
- elems.append(str(k) + ' -> ' + tiny_repr(self.elements[k]))
+ elems.append(str(k) + ' -> ' + repr(self.elements[k]))
prevk = self.keys[0]
for i in range(1, len(self.elements)):
nextk = self.keys[i]
@@ -79,31 +85,31 @@ def __repr__(self):
while prevk < nextk - 1:
elems.append('__')
prevk += 1
- elems.append(tiny_repr(self.elements[nextk]))
+ elems.append(repr(self.elements[nextk]))
else:
elems.append('\n... ' + str(nextk) + ' -> ' +
- tiny_repr(self.elements[nextk]))
+ repr(self.elements[nextk]))
prevk = nextk
return '[' + ', '.join(elems) + ']'
- def iterkeys(self):
+ def iterkeys(self) -> Iterator[int]:
"""An iterator over the keys of actual elements."""
return self.__iter__()
- def iterkeys_rev(self):
+ def iterkeys_rev(self) -> Iterator[int]:
"""An iterator over the keys of actual elements (reversed)."""
i = len(self.keys)
while i > 0:
i -= 1
yield self.keys[i]
- def itervalues(self):
+ def itervalues(self) -> Iterator[T]:
"""An iterator over the elements."""
for k in self.keys:
yield self.elements[k]
- def wipe_interval(self, bottom, top):
+ def wipe_interval(self, bottom: int, top: int) -> None:
"""Remove elements between bottom and top."""
new_keys = set()
if bottom > top:
@@ -121,12 +127,12 @@ def wipe_interval(self, bottom, top):
self.keys = sorted(new_keys)
-def preprocess_pattern(pattern):
+def preprocess_pattern(pattern: SparseList[T]) -> Dict[T, List[int]]:
"""Preprocess a SparseList for approximate string matching.
This function performs preprocessing for the Baeza-Yates--Perleberg
fast and practical approximate string matching algorithm."""
- result = {}
+ result: Dict[T, List[int]] = {}
length = pattern.__len__()
for k in pattern:
name = pattern[k]
@@ -137,7 +143,7 @@ def preprocess_pattern(pattern):
return result
-def approximate_matching(records, pattern, stop, k=1):
+def approximate_matching(records: SparseList[T], pattern: SparseList[T], stop: int, k: int = 1) -> Optional[List[Union[Set[int], int, float]]]:
"""Find the best match for a given pattern.
The Baeza-Yates--Perleberg algorithm requires a preprocessed pattern. This
@@ -152,8 +158,8 @@ def approximate_matching(records, pattern, stop, k=1):
return None
lookup = preprocess_pattern(pattern)
- count = SparseList(default=0)
- match_offsets = set()
+ count: SparseList[int] = SparseList(default=0)
+ match_offsets: Set[int] = set()
i = 0
j = 0 # previous value of i
@@ -192,65 +198,44 @@
    return None
-def makedirs(path):
+def makedirs(path: Union[str, Path]) -> bool:
    """Make directories if they do not exist."""
+    path = Path(path)
    try:
-        os.makedirs(path)
+        path.mkdir(parents=True, exist_ok=True)
+    except FileExistsError:
+        logging.error('makedirs: %s already exists and is not a directory', path)
+        return False
except OSError:
_, value, _ = sys.exc_info()
- # The directory already exists = no problem
- if value.errno != 17:
- logging.error(value)
- return False
+ logging.error(value)
+ return False
return True
-
-def recursive_restore(node, part, outputdir, make_dirs=True):
- """Restore a directory structure starting from a file node."""
- parent_path = str(
- part[node.parent].full_path(part) if node.parent is not None
- else ''
- )
-
- file_path = os.path.join(parent_path, node.name)
- restore_parent_path = os.path.join(outputdir, parent_path)
- restore_path = os.path.join(outputdir, file_path)
-
+def file_restore(node: 'File', part: 'Partition', restore_path: Path) -> int:
+ """ Restore a single file to the given path. """
+
+ restored_bytes = 0
try:
content = node.get_content(part)
except NotImplementedError:
- logging.error(u'Restore of #%s %s is not supported', node.index,
- file_path)
+ logging.error(u'Restore of #%s %s is not supported', node.index, restore_path)
content = None
-
- if make_dirs:
- if not makedirs(restore_parent_path):
- return
-
+
is_directory = node.is_directory or len(node.children) > 0
- if is_directory:
- logging.info(u'Restoring #%s %s', node.index, file_path)
- if not makedirs(restore_path):
- return
-
- if is_directory and content is not None:
- logging.warning(u'Directory %s has data content!', file_path)
- restore_path += '_recuperabit_content'
-
try:
if content is not None:
- logging.info(u'Restoring #%s %s', node.index, file_path)
- with codecs.open(restore_path, 'wb') as outfile:
+ with restore_path.open('wb') as outfile:
if isinstance(content, types.GeneratorType):
for piece in content:
- outfile.write(piece)
+ restored_bytes += outfile.write(piece)
else:
- outfile.write(content)
+ restored_bytes += outfile.write(content)
else:
if not is_directory:
# Empty file
- open(restore_path, 'wb').close()
+ restore_path.touch()
except IOError:
logging.error(u'IOError when trying to create %s', restore_path)
@@ -263,10 +247,63 @@ def recursive_restore(node, part, outputdir, make_dirs=True):
os.utime(restore_path, (atime, mtime))
except IOError:
logging.error(u'IOError while setting atime and mtime of %s', restore_path)
+
+    logging.info(u'Copied %s to %s', readable_bytes(restored_bytes), restore_path)
+
+ return restored_bytes
- if is_directory:
- for child in node.children:
- if not child.ignore():
- recursive_restore(child, part, outputdir, make_dirs=False)
- else:
- logging.info(u'Skipping ignored file {}'.format(child))
+def recursive_restore(node: 'File', part: 'Partition', outputdir: str, make_dirs: bool = True) -> None:
+ """Restore a directory structure starting from a file node."""
+ # Use a stack for iterative depth-first traversal
+ stack = [node]
+ file_copy_queue: list[tuple['File', Path]] = []
+
+ while stack:
+ current_node = stack.pop()
+
+ logging.info(u'Restoring #%s %s', current_node.index, current_node.name)
+
+ try:
+ parent_path = str(
+ part[current_node.parent].full_path(part) if current_node.parent is not None
+ else ''
+ )
+
+ file_path = Path(parent_path) / current_node.name
+ restore_path = Path(outputdir) / file_path
+
+ if make_dirs:
+ restore_path.parent.mkdir(parents=True, exist_ok=True)
+
+ is_directory = current_node.is_directory or len(current_node.children) > 0
+
+ if is_directory:
+ if not makedirs(restore_path):
+ continue
+
+ if is_directory and current_node.size is not None and current_node.size > 0:
+ logging.warning(u'Directory %s has data content!', file_path)
+ restore_path = Path(str(restore_path) + '_recuperabit_content')
+
+ file_copy_queue.append((current_node, restore_path))
+
+ # Add children to stack for processing (in reverse order to maintain depth-first traversal)
+ if is_directory:
+ for child in current_node.children:
+ if not child.ignore():
+ logging.info(u'Adding child file %s to stack', child.name)
+ stack.append(child)
+ else:
+ logging.info(u'Skipping ignored file %s', child.name)
+
+ except Exception as e:
+ logging.error(u'Error restoring #%s %s: %s', current_node.index, current_node.name, e)
+
+ def _file_restore(tuple_item: tuple['File', Path]) -> int:
+ node, path = tuple_item
+ return file_restore(node, part, path)
+
+ # Process file copy queue with a ThreadPool, using 4 threads (more threads hurt performance on most storage devices)
+ with ThreadPoolExecutor(max_workers=4) as executor:
+ restored_bytes = sum(executor.map(_file_restore, file_copy_queue))
+ logging.info(u'Total restored bytes: %s', readable_bytes(restored_bytes))
diff --git a/recuperabit/utils.py b/recuperabit/utils.py
index 3ee1424..0303390 100644
--- a/recuperabit/utils.py
+++ b/recuperabit/utils.py
@@ -19,25 +19,31 @@
# along with RecuperaBit. If not, see .
+from datetime import datetime
import logging
import pprint
import string
import sys
import time
+from typing import TYPE_CHECKING, Any, Optional, List, Dict, Tuple, Union, Callable
import unicodedata
+import io
from .fs.constants import sector_size
-printer = pprint.PrettyPrinter(indent=4)
+printer: pprint.PrettyPrinter = pprint.PrettyPrinter(indent=4)
all_chars = (chr(i) for i in range(sys.maxunicode))
-unicode_printable = set(
+unicode_printable: set[str] = set(
c for c in all_chars
if not unicodedata.category(c)[0].startswith('C')
)
-ascii_printable = set(string.printable[:-5])
+ascii_printable: set[str] = set(string.printable[:-5])
+if TYPE_CHECKING:
+ from .fs.core_types import File, Partition
-def sectors(image, offset, size, bsize=sector_size, fill=True):
+
+def sectors(image: io.BufferedReader, offset: int, size: int, bsize: int = sector_size, fill: bool = True) -> Optional[bytearray]:
"""Read from a file descriptor."""
read = True
try:
@@ -60,7 +66,7 @@ def sectors(image, offset, size, bsize=sector_size, fill=True):
return None
return bytearray(dump)
-def unixtime(dtime):
+def unixtime(dtime: Optional[datetime]) -> int:
"""Convert datetime to UNIX epoch."""
if dtime is None:
return 0
@@ -72,9 +78,9 @@ def unixtime(dtime):
# format:
# [(label, (formatter, lower, higher)), ...]
-def unpack(data, fmt):
+def unpack(data: bytes, fmt: List[Tuple[str, Tuple[Union[str, Callable[[bytes], Any]], Union[int, Callable[[Dict[str, Any]], Optional[int]]], Union[int, Callable[[Dict[str, Any]], Optional[int]]]]]]) -> Dict[str, Any]:
"""Extract formatted information from a string of bytes."""
- result = {}
+ result: Dict[str, Any] = {}
for label, description in fmt:
formatter, lower, higher = description
# If lower is a function, then apply it
@@ -112,9 +118,9 @@ def unpack(data, fmt):
return result
-def feed_all(image, scanners, indexes):
+def feed_all(image: io.BufferedReader, scanners: List[Any], indexes: List[int]) -> List[int]:
# Scan the disk image and feed the scanners
- interesting = []
+ interesting: List[int] = []
for index in indexes:
sector = sectors(image, index, 1, fill=False)
if not sector:
@@ -128,29 +134,19 @@ def feed_all(image, scanners, indexes):
return interesting
-def printable(text, default='.', alphabet=None):
+def printable(text: str, default: str = '.', alphabet: Optional[set[str]] = None) -> str:
"""Replace unprintable characters in a text with a default one."""
if alphabet is None:
alphabet = unicode_printable
return ''.join((i if i in alphabet else default) for i in text)
-def pretty(dictionary):
- """Format dictionary with the pretty printer."""
- return printer.pformat(dictionary)
-def show(dictionary):
- """Print dictionary with the pretty printer."""
- printer.pprint(dictionary)
-def tiny_repr(element):
- """deprecated: Return a representation of unicode strings without the u."""
- rep = repr(element)
- return rep[1:] if type(element) == unicode else rep
-def readable_bytes(amount):
+def readable_bytes(amount: Optional[int]) -> str:
"""Return a human readable string representing a size in bytes."""
if amount is None:
return '??? B'
@@ -164,7 +160,7 @@ def readable_bytes(amount):
return '%.2f %sB' % (scaled, powers[biggest])
-def _file_tree_repr(node):
+def _file_tree_repr(node: 'File') -> str:
"""Give a nice representation for the tree."""
desc = (
' [GHOST]' if node.is_ghost else
@@ -188,9 +184,9 @@ def _file_tree_repr(node):
)
-def tree_folder(directory, padding=0):
+def tree_folder(directory: 'File', padding: int = 0) -> str:
"""Return a tree-like textual representation of a directory."""
- lines = []
+ lines: List[str] = []
pad = ' ' * padding
lines.append(
pad + _file_tree_repr(directory)
@@ -207,7 +203,7 @@ def tree_folder(directory, padding=0):
return '\n'.join(lines)
-def _bodyfile_repr(node, path):
+def _bodyfile_repr(node: 'File', path: str) -> str:
"""Return a body file line for node."""
end = '/' if node.is_directory or len(node.children) else ''
return '|'.join(str(el) for el in [
@@ -223,13 +219,13 @@ def _bodyfile_repr(node, path):
])
-def bodyfile_folder(directory, path=''):
+def bodyfile_folder(directory: 'File', path: str = '') -> List[str]:
"""Create a body file compatible with TSK 3.x.
Format:
'#MD5|name|inode|mode_as_string|UID|GID|size|atime|mtime|ctime|crtime'
See also: http://wiki.sleuthkit.org/index.php?title=Body_file"""
- lines = [_bodyfile_repr(directory, path)]
+ lines: List[str] = [_bodyfile_repr(directory, path)]
path += directory.name + '/'
for entry in directory.children:
if len(entry.children) or entry.is_directory:
@@ -239,7 +235,7 @@ def bodyfile_folder(directory, path=''):
return lines
-def _ltx_clean(label):
+def _ltx_clean(label: Any) -> str:
"""Small filter to prepare strings to be included in LaTeX code."""
clean = str(label).replace('$', r'\$').replace('_', r'\_')
if clean[0] == '-':
@@ -247,7 +243,7 @@ def _ltx_clean(label):
return clean
-def _tikz_repr(node):
+def _tikz_repr(node: 'File') -> str:
"""Represent the node for a Tikz diagram."""
return r'node %s{%s\enskip{}%s}' % (
'[ghost]' if node.is_ghost else '[deleted]' if node.is_deleted else '',
@@ -255,11 +251,11 @@ def _tikz_repr(node):
)
-def tikz_child(directory, padding=0):
+def tikz_child(directory: 'File', padding: int = 0) -> Tuple[str, int]:
"""Write a child row for Tikz representation."""
pad = ' ' * padding
- lines = [r'%schild {%s' % (pad, _tikz_repr(directory))]
- count = len(directory.children)
+ lines: List[str] = [r'%schild {%s' % (pad, _tikz_repr(directory))]
+ count: int = len(directory.children)
for entry in directory.children:
content, number = tikz_child(entry, padding+4)
lines.append(content)
@@ -270,7 +266,7 @@ def tikz_child(directory, padding=0):
return '\n'.join(lines).replace('\n}', '}'), count
-def tikz_part(part):
+def tikz_part(part: 'Partition') -> str:
"""Create LaTeX code to represent the directory structure as a nice Tikz
diagram.
@@ -296,7 +292,7 @@ def tikz_part(part):
)
-def csv_part(part):
+def csv_part(part: 'Partition') -> List[str]:
"""Provide a CSV representation for a partition."""
contents = [
','.join(('Id', 'Parent', 'Name', 'Full Path', 'Modification Time',
@@ -324,9 +320,9 @@ def csv_part(part):
return contents
-def _sub_locate(text, directory, part):
+def _sub_locate(text: str, directory: 'File', part: 'Partition') -> List[Tuple['File', str]]:
"""Helper for locate."""
- lines = []
+ lines: List[Tuple['File', str]] = []
for entry in sorted(directory.children, key=lambda node: node.name):
path = entry.full_path(part)
if text in path.lower():
@@ -336,16 +332,16 @@ def _sub_locate(text, directory, part):
return lines
-def locate(part, text):
+def locate(part: 'Partition', text: str) -> List[Tuple['File', str]]:
"""Return paths of files matching the text."""
- lines = []
+ lines: List[Tuple['File', str]] = []
text = text.lower()
lines += _sub_locate(text, part.lost, part)
lines += _sub_locate(text, part.root, part)
return lines
-def merge(part, piece):
+def merge(part: 'Partition', piece: 'Partition') -> None:
"""Merge piece into part (both are partitions)."""
for index in piece.files:
if (