Skip to content

v0.4.0 Task 1.4.2: FileSystemPort Interface #147

@gbrennon

Description

@gbrennon

Task Description

Epic: Epic 1.4: Infrastructure Adapter Patterns (#72)
Acceptance Criteria: FileSystemPort interface, File storage abstraction, Async file operations support, Streaming and chunked uploads

Implementation Details

Files to Create/Modify

  • src/forging_blocks/infrastructure/ports/filesystem.py (NEW)
  • src/forging_blocks/infrastructure/filesystem/__init__.py (NEW)
  • tests/unit/infrastructure/filesystem/test_filesystem.py (NEW)

FileSystemPort Interface

from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Any, AsyncIterator, BinaryIO, Dict, List, Optional, Union
from uuid import UUID

import aiofiles

class StorageType(Enum):
    """Supported storage backends for file persistence."""

    LOCAL = "local"   # local filesystem (development/testing)
    S3 = "s3"         # Amazon S3
    AZURE = "azure"   # Azure Blob Storage
    GCS = "gcs"       # Google Cloud Storage

@dataclass(frozen=True)
class FileMetadata:
    """Immutable descriptor of a stored file.

    Returned by every ``FileSystemPort`` operation that reports on a file;
    frozen so instances can be shared and hashed safely.
    """

    file_id: UUID               # storage-assigned unique identifier
    filename: str               # original, client-supplied name
    content_type: str           # MIME type, e.g. "application/octet-stream"
    size: int                   # payload size in bytes
    created_at: datetime        # when the file was first stored
    updated_at: datetime        # when the file was last modified
    storage_path: str           # backend-specific location of the bytes
    checksum: Optional[str] = None  # hex-digest integrity checksum, if computed

class FileSystemPort(ABC):
    """Port interface for file storage operations.

    Implementations (local disk, S3, etc.) must provide async versions of
    all methods below so callers can swap backends without code changes.
    """

    @abstractmethod
    async def upload_file(
        self,
        file_data: Union[bytes, BinaryIO],
        filename: str,
        content_type: str,
        metadata: Optional[Dict[str, Any]] = None
    ) -> FileMetadata:
        """Store *file_data* under a new identifier and return its metadata.

        *metadata* is an optional bag of implementation-defined key/value
        pairs; implementations may ignore it.
        """
        pass

    @abstractmethod
    async def download_file(self, file_id: UUID) -> AsyncIterator[bytes]:
        """Yield the stored file's content as a stream of byte chunks."""
        pass

    @abstractmethod
    async def get_file_metadata(self, file_id: UUID) -> Optional[FileMetadata]:
        """Return metadata for *file_id*, or ``None`` if it does not exist."""
        pass

    @abstractmethod
    async def delete_file(self, file_id: UUID) -> bool:
        """Delete the file; return ``True`` if it existed, ``False`` otherwise."""
        pass

    @abstractmethod
    async def list_files(
        self,
        prefix: Optional[str] = None,
        limit: Optional[int] = None
    ) -> List[FileMetadata]:
        """List stored files, optionally filtered by name *prefix* and capped at *limit*."""
        pass

    @abstractmethod
    async def file_exists(self, file_id: UUID) -> bool:
        """Return ``True`` if a file with *file_id* is stored."""
        pass

class LocalFileSystem(FileSystemPort):
    """Local file system implementation of ``FileSystemPort``.

    Files are stored directly under *base_path* using their UUID as the
    on-disk name.  Rich metadata (original filename, content type) is NOT
    persisted separately, so read paths reconstruct what they can from
    ``stat()`` — a real deployment would pair this with a metadata store.
    """

    # Chunk size for streaming downloads.
    _CHUNK_SIZE = 8192

    def __init__(self, base_path: Path):
        """Remember the storage root, creating it if it does not exist."""
        self._base_path = base_path
        self._base_path.mkdir(parents=True, exist_ok=True)

    async def upload_file(
        self,
        file_data: Union[bytes, BinaryIO],
        filename: str,
        content_type: str,
        metadata: Optional[Dict[str, Any]] = None
    ) -> FileMetadata:
        """Store *file_data* and return its generated metadata.

        NOTE: a ``BinaryIO`` source is read fully into memory; use a
        streaming implementation for very large files.
        """
        import hashlib
        import uuid

        file_id = uuid.uuid4()
        file_path = self._base_path / str(file_id)

        data = file_data if isinstance(file_data, bytes) else file_data.read()

        # MD5 here is an integrity checksum only, not a security control.
        checksum = hashlib.md5(data).hexdigest()

        async with aiofiles.open(file_path, 'wb') as f:
            await f.write(data)

        # Single timestamp so created_at == updated_at on upload (the
        # original called utcnow() twice and could produce differing,
        # naive values; utcnow() is also deprecated).
        now = datetime.now(timezone.utc)
        return FileMetadata(
            file_id=file_id,
            filename=filename,
            content_type=content_type,
            size=len(data),
            created_at=now,
            updated_at=now,
            storage_path=str(file_path),
            checksum=checksum
        )

    async def download_file(self, file_id: UUID) -> AsyncIterator[bytes]:
        """Yield the file's bytes in 8 KB chunks.

        Raises:
            FileNotFoundError: if no file with *file_id* exists.
        """
        file_path = self._base_path / str(file_id)

        if not file_path.exists():
            raise FileNotFoundError(f"File {file_id} not found")

        async with aiofiles.open(file_path, 'rb') as f:
            while chunk := await f.read(self._CHUNK_SIZE):
                yield chunk

    async def get_file_metadata(self, file_id: UUID) -> Optional[FileMetadata]:
        """Return metadata reconstructed from ``stat()``, or ``None`` if absent."""
        file_path = self._base_path / str(file_id)

        if not file_path.exists():
            return None

        return self._metadata_from_path(file_id, file_path)

    def _metadata_from_path(self, file_id: UUID, file_path: Path) -> FileMetadata:
        """Build ``FileMetadata`` from filesystem stats (shared by read paths).

        The original filename and content type are not persisted, so the
        UUID name and a generic content type stand in for them.
        """
        stat = file_path.stat()
        return FileMetadata(
            file_id=file_id,
            filename=file_path.name,
            content_type="application/octet-stream",  # true type not persisted
            size=stat.st_size,
            # tz-aware to match upload_file (original mixed naive local time
            # here with naive UTC on upload).
            created_at=datetime.fromtimestamp(stat.st_ctime, tz=timezone.utc),
            updated_at=datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc),
            storage_path=str(file_path)
        )

    async def delete_file(self, file_id: UUID) -> bool:
        """Delete the file; return ``True`` if it existed.

        EAFP unlink avoids the exists()/unlink() race in the original.
        """
        file_path = self._base_path / str(file_id)
        try:
            file_path.unlink()
            return True
        except FileNotFoundError:
            return False

    async def list_files(
        self,
        prefix: Optional[str] = None,
        limit: Optional[int] = None
    ) -> List[FileMetadata]:
        """List stored files, newest first.

        Non-file entries and files whose names are not valid UUIDs are
        skipped (the original raised ``ValueError`` on any stray file in
        the storage directory).  ``limit`` is honored even when 0; only
        ``None`` means "no limit".
        """
        files: List[FileMetadata] = []

        for file_path in self._base_path.iterdir():
            if not file_path.is_file():
                continue
            if prefix and not file_path.name.startswith(prefix):
                continue
            try:
                file_id = UUID(file_path.name)
            except ValueError:
                continue  # stray non-UUID file; not managed by this store
            files.append(self._metadata_from_path(file_id, file_path))

        # Newest first.
        files.sort(key=lambda m: m.created_at, reverse=True)

        if limit is not None:
            files = files[:limit]

        return files

    async def file_exists(self, file_id: UUID) -> bool:
        """Return ``True`` if the file's backing path exists on disk."""
        return (self._base_path / str(file_id)).exists()

Streaming Support

class StreamingFileSystem(FileSystemPort):
    """File system with streaming support for large files.

    NOTE(review): this is a skeleton.  It overrides only ``upload_file``
    and ``download_file``; the remaining abstract methods of
    ``FileSystemPort`` (``get_file_metadata``, ``delete_file``,
    ``list_files``, ``file_exists``) are not implemented here, so the
    class cannot be instantiated until they are provided.
    """

    async def upload_file(
        self,
        file_data: Union[bytes, BinaryIO],
        filename: str,
        content_type: str,
        metadata: Optional[Dict[str, Any]] = None
    ) -> FileMetadata:
        # Implementation for streaming uploads
        # TODO: write to storage in chunks instead of buffering in memory.
        pass

    async def download_file(self, file_id: UUID) -> AsyncIterator[bytes]:
        # Implementation for streaming downloads with progress
        # NOTE(review): without a ``yield`` this is a coroutine returning
        # None, not an async generator — add ``yield`` when implementing.
        pass

    async def chunked_upload(
        self,
        file_stream: AsyncIterator[bytes],
        filename: str,
        content_type: str,
        chunk_size: int = 1024 * 1024  # 1MB chunks
    ) -> FileMetadata:
        """Upload file in chunks for large files.

        Consumes *file_stream* incrementally so very large uploads never
        need to be held in memory at once.  ``chunk_size`` is advisory.
        """
        pass

Acceptance Criteria

  • FileSystemPort interface for file operations
  • Local file system implementation for development/testing
  • Async file operations with proper streaming
  • File metadata management with checksums
  • Chunked upload support for large files
  • File existence checking and listing

Definition of Done

  • File system port integrated with the project's Result pattern
  • Local implementation ready for testing
  • Streaming operations for large files
  • All tests passing with file operations
  • Documentation with file system examples

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions