Feature/zip support #29

New file (@@ -0,0 +1,53 @@):

import os
import zipfile
import tempfile
import shutil


class ArchiveHandler:
    def __init__(self):
        self.temp_dir = None

    def process_zip(self, zip_path, root_dir):
        """
        Process a zip file and return paths to its contents.

        Parameters:
        ------------
        zip_path - String. Path to the zip file.
        root_dir - String. Root directory for relative path calculations.

        Returns:
        ---------
        List of tuples (file_path, relative_path) for files in the zip.
        """
        if not zipfile.is_zipfile(zip_path):
            return []

        # Create a temporary directory for extraction
        self.temp_dir = tempfile.mkdtemp()

        try:
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                # Extract all contents to temp directory
                zip_ref.extractall(self.temp_dir)

Comment on lines +27 to +32. Suggested change:

-        self.temp_dir = tempfile.mkdtemp()
-        try:
-            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
-                # Extract all contents to temp directory
-                zip_ref.extractall(self.temp_dir)
+        temp_dir = tempfile.mkdtemp()
+        try:
+            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
+                # Extract all contents to temp directory
+                zip_ref.extractall(temp_dir)
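
The concern behind this suggestion, as a hypothetical sketch (the StatefulHandler class below is illustrative only, not part of the PR): keeping the extraction directory on self means each call to process_zip overwrites the previous reference, so an earlier temporary directory can no longer be found and cleaned up, whereas a local variable keeps each call self-contained.

import tempfile

class StatefulHandler:
    """Illustrative only: mirrors the pattern of storing the temp dir on self."""
    def __init__(self):
        self.temp_dir = None

    def extract(self):
        self.temp_dir = tempfile.mkdtemp()
        return self.temp_dir

handler = StatefulHandler()
first = handler.extract()
second = handler.extract()
# The handler now only remembers `second`; any cleanup driven by
# self.temp_dir would miss the directory created by the first call.
assert handler.temp_dir == second and handler.temp_dir != first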

@@ -5,13 +5,13 @@ class Hasher:
     def __init__(self, algorithm='md5'):
         self.algorithm = algorithm

-    def checksum_file(self, file_path, algorithm=None, length=None):
+    def checksum_file(self, file_path_or_obj, algorithm=None, length=None):
         """
         Calculate the checksum of a file using the specified algorithm.

         Parameters:
         ------------
-        file_path - String. Path to file to apply checksum function.
+        file_path_or_obj - String or file-like object. Path to file or file-like object to apply checksum function.
         algorithm - String. Hash function to use for checksums. Default: 'md5', see options with 'hashlib.algorithms_available'.
         length - Integer [optional]. Length of the digest for SHAKE and BLAKE algorithms in bytes.

@@ -55,9 +55,14 @@ def checksum_file(self, file_path, algorithm=None, length=None):
             raise LengthUsedForFixedLengthHashError(algorithm)
         hash_func = hashlib.new(algorithm)

-        # Read the file and update the hash function
-        with open(file_path, "rb") as f:
-            for chunk in iter(lambda: f.read(4096), b""):
+        # Handle both file paths and file-like objects
+        if isinstance(file_path_or_obj, str):
+            with open(file_path_or_obj, "rb") as f:
+                for chunk in iter(lambda: f.read(4096), b""):
+                    hash_func.update(chunk)
+        else:
+            # Assume it's a file-like object

Suggested change:

-            # Assume it's a file-like object
+            # Validate that the object supports the 'read' method
+            if not hasattr(file_path_or_obj, 'read'):
+                raise TypeError("The provided object is not a valid file-like object. It must support the 'read()' method.")
Reply: I'm having a hard time concocting a scenario where not hasattr(file_path_or_obj, 'read') would be true. gather_file_paths and ArchiveHandler.stream_zip() should always return something that can be read. The only case I can think of is someone manually calling something like checksum_file(321), which already gives a pretty clear error: AttributeError: 'int' object has no attribute 'read'. I think we can leave this one alone for now.
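
As a small, hypothetical usage sketch of the updated checksum_file (not part of the PR; the temporary file and BytesIO input are made up for illustration, and the import path follows the test file below), showing the two supported input types and the error an unsupported object already produces:

import io
import tempfile
from sumbuddy.hasher import Hasher  # import path as used in the test file below

hasher = Hasher()

# String path input: the pre-existing behaviour.
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"example")
print(hasher.checksum_file(tmp.name))

# File-like input: the new branch added in this hunk.
print(hasher.checksum_file(io.BytesIO(b"example")))

# The reply above in practice: a non-file object fails loudly on its own.
try:
    hasher.checksum_file(321)
except AttributeError as err:
    print(err)  # "'int' object has no attribute 'read'"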

@@ -1,10 +1,13 @@
 import os
+import zipfile
 from sumbuddy.filter import Filter
 from sumbuddy.exceptions import EmptyInputDirectoryError, NoFilesAfterFilteringError, NotADirectoryError
+from sumbuddy.archive import ArchiveHandler

 class Mapper:
     def __init__(self):
         self.filter_manager = Filter()
+        self.archive_handler = ArchiveHandler()

     def reset_filter(self, ignore_file=None, include_hidden=False):
         """

@@ -56,7 +59,18 @@ def gather_file_paths(self, input_directory, ignore_file=None, include_hidden=Fa
                 file_path = os.path.join(root, name)
                 if self.filter_manager.should_include(file_path, root_directory):
                     file_paths.append(file_path)
+                # If it's a zip file, process its contents
+                if zipfile.is_zipfile(file_path):
+                    try:
+                        zip_contents = self.archive_handler.process_zip(file_path, root_directory)
+                        for _, zip_path in zip_contents:
+                            if self.filter_manager.should_include(zip_path, root_directory):
+                                file_paths.append(zip_path)
+                    finally:
+                        pass

Suggested change:

-                    try:
-                        zip_contents = self.archive_handler.process_zip(file_path, root_directory)
-                        for _, zip_path in zip_contents:
-                            if self.filter_manager.should_include(zip_path, root_directory):
-                                file_paths.append(zip_path)
-                    finally:
-                        pass
+                    zip_contents = self.archive_handler.process_zip(file_path, root_directory)
+                    for _, zip_path in zip_contents:
+                        if self.filter_manager.should_include(zip_path, root_directory):
+                            file_paths.append(zip_path)
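
For context on what the Mapper change produces, a hypothetical usage sketch (the test_archive.zip file name is borrowed from the tests below; any small zip would do): gather_file_paths now returns the archive itself plus paths for its members, e.g. entries containing "test_archive.zip/".

import shutil
import tempfile
from pathlib import Path
from sumbuddy.mapper import Mapper  # import path as used in the test file below

# Assumes a small zip archive named test_archive.zip sits in the current directory.
with tempfile.TemporaryDirectory() as temp_dir:
    shutil.copy2("test_archive.zip", Path(temp_dir) / "test_archive.zip")
    for path in Mapper().gather_file_paths(temp_dir):
        print(path)  # the zip itself, then entries like .../test_archive.zip/test_file.txt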

New test file (@@ -0,0 +1,181 @@):

import tempfile
import zipfile
from pathlib import Path

from sumbuddy.archive import ArchiveHandler
from sumbuddy.mapper import Mapper
from sumbuddy.hasher import Hasher


class TestArchiveHandler:
    """Test cases for ArchiveHandler class."""

    def test_process_zip_success(self):
        """Test successful zip file processing."""
        handler = ArchiveHandler()
        test_zip_path = Path(__file__).parent / "test_archive.zip"

        # Ensure test zip exists
        assert test_zip_path.exists(), "Test zip file not found"

        with tempfile.TemporaryDirectory() as temp_dir:
            extracted_files = handler.process_zip(str(test_zip_path), temp_dir)

            # Should return list of tuples (file_path, relative_path)
            assert len(extracted_files) == 2
            assert any("test_file.txt" in str(f[1]) for f in extracted_files)
            assert any("nested_file.txt" in str(f[1]) for f in extracted_files)

            # Check that files were actually extracted
            for file_path, _ in extracted_files:
                assert Path(file_path).exists()

    def test_process_zip_invalid_file(self):
        """Test processing non-zip file."""
        handler = ArchiveHandler()

        with tempfile.TemporaryDirectory() as temp_dir:
            # Create a non-zip file
            non_zip_file = Path(temp_dir) / "not_a_zip.txt"
            non_zip_file.write_text("This is not a zip file")

            # Should return empty list for non-zip files
            result = handler.process_zip(str(non_zip_file), temp_dir)
            assert result == []

    def test_process_zip_nonexistent_file(self):
        """Test processing non-existent file."""
        handler = ArchiveHandler()

        with tempfile.TemporaryDirectory() as temp_dir:
            non_existent_file = Path(temp_dir) / "nonexistent.zip"

            # Should return empty list for non-existent files
            result = handler.process_zip(str(non_existent_file), temp_dir)
            assert result == []


class TestMapperWithZip:
    """Test cases for Mapper class with zip file support."""

    def test_gather_file_paths_with_zip(self):
        """Test gathering file paths including zip files."""
        mapper = Mapper()
        test_zip_path = Path(__file__).parent / "test_archive.zip"

        # Create a temporary directory with the test zip
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_zip_path = Path(temp_dir) / "test_archive.zip"
            # Copy test zip to temp directory
            import shutil
            shutil.copy2(test_zip_path, temp_zip_path)

            file_paths = mapper.gather_file_paths(temp_dir)

            # Should include the zip file itself
            assert str(temp_zip_path) in file_paths

            # Should include files from within the zip
            zip_file_paths = [p for p in file_paths if "test_archive.zip/" in p]
            assert len(zip_file_paths) == 2
            assert any("test_file.txt" in p for p in zip_file_paths)
            assert any("nested_file.txt" in p for p in zip_file_paths)

    def test_gather_file_paths_with_zip_and_filter(self):
        """Test gathering file paths with zip files and filters."""
        mapper = Mapper()
        test_zip_path = Path(__file__).parent / "test_archive.zip"

        # Create a temporary directory with the test zip
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_zip_path = Path(temp_dir) / "test_archive.zip"
            import shutil
            shutil.copy2(test_zip_path, temp_zip_path)

            # Create an ignore file to exclude nested files
            ignore_file = Path(temp_dir) / ".ignore"
            ignore_file.write_text("**/nested_dir/**")

            file_paths = mapper.gather_file_paths(temp_dir, ignore_file=str(ignore_file))

            # Should include the zip file itself
            assert str(temp_zip_path) in file_paths

            # Should include only non-nested files from zip
            zip_file_paths = [p for p in file_paths if "test_archive.zip/" in p]
            assert len(zip_file_paths) == 1
            assert any("test_file.txt" in p for p in zip_file_paths)
            assert not any("nested_file.txt" in p for p in zip_file_paths)


class TestHasherWithZip:
    """Test cases for Hasher class with zip file support."""

    def test_checksum_file_with_file_like_object(self):
        """Test checksum calculation with file-like object."""
        hasher = Hasher()
        test_zip_path = Path(__file__).parent / "test_archive.zip"

        # Test with zip file
        with zipfile.ZipFile(test_zip_path, 'r') as zip_file:
            # Get the first file in the zip
            file_name = zip_file.namelist()[0]
            with zip_file.open(file_name) as file_obj:
                checksum = hasher.checksum_file(file_obj)

        # Should return a valid checksum
        assert isinstance(checksum, str)
        assert len(checksum) > 0

    def test_checksum_file_with_zip_file_path(self):
        """Test checksum calculation with zip file path."""
        hasher = Hasher()
        test_zip_path = Path(__file__).parent / "test_archive.zip"

        checksum = hasher.checksum_file(str(test_zip_path))

        # Should return a valid checksum
        assert isinstance(checksum, str)
        assert len(checksum) > 0


def test_integration_zip_support():
    """Integration test for zip support functionality."""
    from sumbuddy import get_checksums
    import tempfile
    import csv

    test_zip_path = Path(__file__).parent / "test_archive.zip"

    with tempfile.TemporaryDirectory() as temp_dir:
        temp_zip_path = Path(temp_dir) / "test_archive.zip"
        import shutil
        shutil.copy2(test_zip_path, temp_zip_path)

        output_file = Path(temp_dir) / "checksums.csv"

        # Run get_checksums on directory containing zip
        get_checksums(temp_dir, output_file)

        # Verify output file was created
        assert output_file.exists()

        # Read and verify CSV contents
        with open(output_file, 'r') as f:
            reader = csv.DictReader(f)
            rows = list(reader)

        # Should have at least the zip file and its contents
        assert len(rows) >= 3

        # Should include zip file itself
        zip_rows = [r for r in rows if r['filename'] == 'test_archive.zip']
        assert len(zip_rows) == 1

        # Should include files from within zip
        zip_content_rows = [r for r in rows if 'test_archive.zip/' in r['filepath']]
        assert len(zip_content_rows) == 2

        # All rows should have valid checksums
        for row in rows:
            assert row['md5'] and len(row['md5']) > 0
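
For completeness, the end-to-end flow exercised by test_integration_zip_support can be driven directly. This is a sketch, assuming sumbuddy exposes get_checksums at package level as the test imports it; "some_directory" and "checksums.csv" are placeholders.

from pathlib import Path
from sumbuddy import get_checksums  # top-level import, as in test_integration_zip_support

# Writes a CSV whose rows include filepath, filename and md5 columns,
# covering both regular files and entries inside zip archives.
get_checksums("some_directory", Path("checksums.csv"))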