v0.4.0 Task 1.2.4: AggregateRepository Pattern #142

@gbrennon

Task Description

Epic: Epic 1.2: Repository Pattern Completion (#70)
Acceptance Criteria: AggregateRepository pattern for event sourcing, Snapshot support for large aggregates, Version conflict detection, Aggregate reconstruction from events

Implementation Details

Files to Create/Modify

  • src/forging_blocks/infrastructure/repositories/aggregate_repository.py (NEW)
  • src/forging_blocks/domain/aggregate_root.py (ENHANCE existing)
  • tests/unit/infrastructure/repositories/test_aggregate_repository.py (NEW)

AggregateRepository Interface

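from abc import ABC, abstractmethod
from typing import Generic, List, Optional, TypeVar
from uuid import UUID

# Module path taken from the file list above.
from forging_blocks.domain.aggregate_root import AggregateRoot
# DomainEvent and AggregateSnapshot must also be imported; this issue does not
# say which modules define them.

TAggregate = TypeVar('TAggregate', bound=AggregateRoot)

class AggregateRepository(ABC, Generic[TAggregate]):
    """Repository for aggregates with event sourcing support."""

    @abstractmethod
    async def save(self, aggregate: TAggregate) -> None:
        """Persist the aggregate's uncommitted events."""

    @abstractmethod
    async def get_by_id(self, id: UUID) -> Optional[TAggregate]:
        """Rebuild the aggregate from its snapshot and events, or return None."""

    @abstractmethod
    async def get_events(self, aggregate_id: UUID, from_version: Optional[int] = None) -> List[DomainEvent]:
        """Return the aggregate's events, starting at from_version (inclusive) when given."""

    @abstractmethod
    async def save_snapshot(self, aggregate: TAggregate) -> None:
        """Store a snapshot of the aggregate at its current version."""

    @abstractmethod
    async def get_snapshot(self, aggregate_id: UUID) -> Optional[AggregateSnapshot]:
        """Return the latest snapshot for the aggregate, if one exists."""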
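
The interface above relies on AggregateRoot, DomainEvent and AggregateSnapshot without showing what it needs from them. As a rough sketch only, the stand-ins below show one possible shape for the first two. Everything here is hypothetical and not the project's actual classes: the `record` and `when` methods, the `name`/`payload` fields, and the `Counter` example are assumptions made for illustration. The method names `apply`, `get_uncommitted_events`, `mark_events_as_committed` and `create_empty` are the ones the repository code in this issue calls.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List
from uuid import UUID, uuid4


@dataclass(frozen=True)
class DomainEvent:
    """Hypothetical event shape: owning aggregate, event name, payload."""
    aggregate_id: UUID
    name: str
    payload: Dict[str, Any] = field(default_factory=dict)


class AggregateRoot:
    """Hypothetical stand-in, not the project's AggregateRoot."""

    def __init__(self, id: UUID) -> None:
        self.id = id
        self.version = 0  # number of events applied so far
        self._uncommitted: List[DomainEvent] = []

    @classmethod
    def create_empty(cls, id: UUID) -> "AggregateRoot":
        return cls(id)

    def when(self, event: DomainEvent) -> None:
        """State transition hook; subclasses override this."""

    def apply(self, event: DomainEvent) -> None:
        """Apply an event (new or replayed) and bump the version."""
        self.when(event)
        self.version += 1

    def record(self, event: DomainEvent) -> None:
        """Apply a new event and keep it until the repository persists it."""
        self.apply(event)
        self._uncommitted.append(event)

    def get_uncommitted_events(self) -> List[DomainEvent]:
        return list(self._uncommitted)

    def mark_events_as_committed(self) -> None:
        self._uncommitted.clear()


class Counter(AggregateRoot):
    """Example aggregate whose state is a running total."""

    def __init__(self, id: UUID) -> None:
        super().__init__(id)
        self.total = 0

    def when(self, event: DomainEvent) -> None:
        if event.name == "incremented":
            self.total += event.payload["by"]


counter = Counter.create_empty(uuid4())
counter.record(DomainEvent(counter.id, "incremented", {"by": 2}))
counter.record(DomainEvent(counter.id, "incremented", {"by": 3}))
pending = counter.get_uncommitted_events()
```

In this sketch, replayed events go through `apply` and so never land in the uncommitted list, which keeps `get_by_id` from producing events that `save` would write a second time.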

Event Sourcing Integration

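class EventSourcingAggregateRepository(AggregateRepository[TAggregate]):
    """Aggregate repository with event sourcing capabilities."""

    # get_events, save_snapshot and get_snapshot must also be implemented
    # before this class can be instantiated.

    def __init__(self, event_store, aggregate_class) -> None:
        # Assumed constructor: the existing EventStore and the aggregate type to rebuild.
        self._event_store = event_store
        self._aggregate_class = aggregate_class

    async def save(self, aggregate: TAggregate) -> None:
        # Get uncommitted events; nothing to persist if there are none
        events = aggregate.get_uncommitted_events()
        if not events:
            return

        # Save events to event store. aggregate.version is passed so the store
        # can detect a conflicting concurrent write.
        await self._event_store.append_events(aggregate.id, events, aggregate.version)

        # Clear uncommitted events only after the append succeeded
        aggregate.mark_events_as_committed()

    async def get_by_id(self, id: UUID) -> Optional[TAggregate]:
        # Try to get snapshot first
        snapshot = await self.get_snapshot(id)

        if snapshot:
            aggregate = snapshot.reconstruct_aggregate()
            events = await self.get_events(id, from_version=snapshot.version + 1)
        else:
            events = await self.get_events(id)
            if not events:
                # No snapshot and no events: the aggregate does not exist
                return None
            aggregate = self._aggregate_class.create_empty(id)

        # Replay events
        for event in events:
            aggregate.apply(event)

        return aggregate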
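
The snapshot-then-replay path in get_by_id can be tried out in isolation. The sketch below is a simplified, self-contained illustration and not the proposed implementation: `Account`, `InMemoryStore` and `load` are hypothetical names, events are reduced to plain integers (deposit amounts), and a snapshot is just a `(version, balance)` tuple. It assumes event versions are 1-based and that `from_version` is inclusive.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
from uuid import UUID, uuid4


@dataclass
class Account:
    """Hypothetical aggregate: a balance built up from deposit amounts."""
    id: UUID
    balance: int = 0
    version: int = 0

    def apply(self, amount: int) -> None:
        self.balance += amount
        self.version += 1


class InMemoryStore:
    """Hypothetical store: one event stream and at most one snapshot per id."""

    def __init__(self) -> None:
        self.events: Dict[UUID, List[int]] = {}
        self.snapshots: Dict[UUID, Tuple[int, int]] = {}  # id -> (version, balance)
        self.replayed = 0  # how many events load() has replayed, for illustration

    async def get_events(self, id: UUID, from_version: int = 1) -> List[int]:
        # Event number N (1-based) lives at list index N - 1.
        return self.events.get(id, [])[from_version - 1:]


async def load(store: InMemoryStore, id: UUID) -> Optional[Account]:
    snapshot = store.snapshots.get(id)
    if snapshot:
        version, balance = snapshot
        account = Account(id, balance=balance, version=version)
        events = await store.get_events(id, from_version=version + 1)
    else:
        events = await store.get_events(id)
        if not events:
            return None  # unknown aggregate
        account = Account(id)
    for amount in events:
        account.apply(amount)
    store.replayed += len(events)
    return account


async def demo():
    store = InMemoryStore()
    account_id = uuid4()
    store.events[account_id] = [10, 20, 30, 40]
    full = await load(store, account_id)      # no snapshot: replays all 4 events
    store.snapshots[account_id] = (3, 60)     # snapshot taken at version 3
    fast = await load(store, account_id)      # replays only event 4
    missing = await load(store, uuid4())      # never written
    return full, fast, missing, store.replayed


full, fast, missing, replayed = asyncio.run(demo())
```

Both loads end at the same state (balance 100, version 4), but the second one replays a single event instead of four, which is the point of snapshots for large aggregates.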

Acceptance Criteria

  • AggregateRepository interface with event sourcing support
  • Snapshot support for performance optimization
  • Version conflict detection with optimistic concurrency
  • Aggregate reconstruction from events and snapshots
  • Event store integration with existing EventStore
  • Comprehensive tests for event sourcing scenarios
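
To make the version conflict criterion concrete, here is a minimal sketch of optimistic concurrency in an in-memory event store. It is an assumption-laden illustration, not the existing EventStore: `InMemoryEventStore` and `VersionConflictError` are hypothetical names, events are plain strings, and `expected_version` is taken to mean the stream length the writer saw when it loaded the aggregate. If AggregateRoot.version already counts uncommitted events, the repository would need to pass `aggregate.version - len(events)` instead.

```python
import asyncio
from typing import Dict, List
from uuid import UUID, uuid4


class VersionConflictError(Exception):
    """Hypothetical error raised when the stream is not at the expected version."""

    def __init__(self, aggregate_id: UUID, expected: int, actual: int) -> None:
        super().__init__(f"{aggregate_id}: expected version {expected}, found {actual}")
        self.expected = expected
        self.actual = actual


class InMemoryEventStore:
    """Hypothetical store; the version of a stream is its number of events."""

    def __init__(self) -> None:
        self._streams: Dict[UUID, List[str]] = {}

    async def append_events(
        self, aggregate_id: UUID, events: List[str], expected_version: int
    ) -> int:
        stream = self._streams.setdefault(aggregate_id, [])
        actual = len(stream)
        if actual != expected_version:
            # Someone else appended since the caller loaded the aggregate.
            raise VersionConflictError(aggregate_id, expected_version, actual)
        stream.extend(events)
        return len(stream)  # new stream version


async def demo():
    store = InMemoryEventStore()
    aggregate_id = uuid4()
    v1 = await store.append_events(aggregate_id, ["created"], expected_version=0)
    # Two writers both loaded the aggregate at version v1.
    v2 = await store.append_events(aggregate_id, ["renamed"], expected_version=v1)
    try:
        await store.append_events(aggregate_id, ["archived"], expected_version=v1)
    except VersionConflictError as exc:
        return v2, exc
    return v2, None


new_version, conflict = asyncio.run(demo())
```

The first writer moves the stream to version 2; the second writer still expects version 1 and gets a VersionConflictError, at which point the caller can reload the aggregate and retry.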

Definition of Done

  • Event sourcing repository working with the existing AggregateRoot
  • Snapshot functionality for large aggregates
  • Version conflict handling with proper error types
  • All tests passing with event replay scenarios
  • Documentation with event sourcing examples
