Task Description
Epic: Epic 1.4: Infrastructure Adapter Patterns (#72)
Acceptance Criteria: LoggingPort abstraction, Structured logging interface, Correlation ID propagation, Context-aware logging
Implementation Details
Files to Create/Modify
src/forging_blocks/infrastructure/ports/logging.py (NEW)
src/forging_blocks/infrastructure/logging/__init__.py (NEW)
tests/unit/infrastructure/logging/test_logging.py (NEW)
LoggingPort Interface
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional, Union
from enum import Enum
from datetime import datetime
from uuid import UUID
class LogLevel(Enum):
DEBUG = "debug"
INFO = "info"
WARNING = "warning"
ERROR = "error"
CRITICAL = "critical"
@dataclass(frozen=True)
class LogContext:
correlation_id: UUID
user_id: Optional[str] = None
operation: Optional[str] = None
metadata: Dict[str, Any] = None
class LoggingPort(ABC):
"""Port interface for structured logging."""
@abstractmethod
async def log(
self,
level: LogLevel,
message: str,
context: Optional[LogContext] = None,
extra: Optional[Dict[str, Any]] = None
) -> None:
pass
@abstractmethod
async def debug(
self,
message: str,
context: Optional[LogContext] = None,
extra: Optional[Dict[str, Any]] = None
) -> None:
pass
@abstractmethod
async def info(
self,
message: str,
context: Optional[LogContext] = None,
extra: Optional[Dict[str, Any]] = None
) -> None:
pass
@abstractmethod
async def warning(
self,
message: str,
context: Optional[LogContext] = None,
extra: Optional[Dict[str, Any]] = None
) -> None:
pass
@abstractmethod
async def error(
self,
message: str,
context: Optional[LogContext] = None,
extra: Optional[Dict[str, Any]] = None
) -> None:
pass
@abstractmethod
async def critical(
self,
message: str,
context: Optional[LogContext] = None,
extra: Optional[Dict[str, Any]] = None
) -> None:
pass
class StructuredLogger(LoggingPort):
"""Structured logging implementation with correlation tracking."""
def __init__(self, logger_name: str):
self._logger_name = logger_name
self._correlation_context: Optional[LogContext] = None
async def set_correlation_context(self, context: LogContext) -> None:
"""Set correlation context for subsequent log messages."""
self._correlation_context = context
async def clear_correlation_context(self) -> None:
"""Clear correlation context."""
self._correlation_context = None
async def log(
self,
level: LogLevel,
message: str,
context: Optional[LogContext] = None,
extra: Optional[Dict[str, Any]] = None
) -> None:
# Merge contexts
effective_context = context or self._correlation_context
log_data = self._build_log_data(level, message, effective_context, extra)
# Here you would integrate with your actual logging framework
# For now, we'll use Python's standard logging
import logging
logger = logging.getLogger(self._logger_name)
if level == LogLevel.DEBUG:
logger.debug(message, extra=log_data)
elif level == LogLevel.INFO:
logger.info(message, extra=log_data)
elif level == LogLevel.WARNING:
logger.warning(message, extra=log_data)
elif level == LogLevel.ERROR:
logger.error(message, extra=log_data)
elif level == LogLevel.CRITICAL:
logger.critical(message, extra=log_data)
def _build_log_data(
self,
level: LogLevel,
message: str,
context: Optional[LogContext],
extra: Optional[Dict[str, Any]]
) -> Dict[str, Any]:
log_data = {
"logger": self._logger_name,
"level": level.value,
"timestamp": datetime.utcnow().isoformat(),
"message": message
}
if context:
log_data.update({
"correlation_id": str(context.correlation_id),
"user_id": context.user_id,
"operation": context.operation,
"context_metadata": context.metadata
})
if extra:
log_data.update(extra)
return log_data
Correlation ID Management
class CorrelationManager:
"""Manages correlation IDs across async contexts."""
def __init__(self):
import contextvars
self._correlation_id = contextvars.ContextVar('correlation_id', default=None)
self._user_id = contextvars.ContextVar('user_id', default=None)
self._operation = contextvars.ContextVar('operation', default=None)
def set_correlation_id(self, correlation_id: UUID) -> None:
self._correlation_id.set(correlation_id)
def set_user_id(self, user_id: str) -> None:
self._user_id.set(user_id)
def set_operation(self, operation: str) -> None:
self._operation.set(operation)
def get_correlation_context(self) -> Optional[LogContext]:
correlation_id = self._correlation_id.get()
if not correlation_id:
return None
return LogContext(
correlation_id=correlation_id,
user_id=self._user_id.get(),
operation=self._operation.get()
)
def clear(self) -> None:
self._correlation_id.set(None)
self._user_id.set(None)
self._operation.set(None)
Acceptance Criteria
Definition of Done
Task Description
Epic: Epic 1.4: Infrastructure Adapter Patterns (#72)
Acceptance Criteria: LoggingPort abstraction, Structured logging interface, Correlation ID propagation, Context-aware logging
Implementation Details
Files to Create/Modify
src/forging_blocks/infrastructure/ports/logging.py(NEW)src/forging_blocks/infrastructure/logging/__init__.py(NEW)tests/unit/infrastructure/logging/test_logging.py(NEW)LoggingPort Interface
Correlation ID Management
Acceptance Criteria
Definition of Done