diff --git a/mcpgateway/routers/log_search.py b/mcpgateway/routers/log_search.py index 98cd75bf4..40524868d 100644 --- a/mcpgateway/routers/log_search.py +++ b/mcpgateway/routers/log_search.py @@ -157,14 +157,38 @@ def _aggregate_custom_windows( reference_end = datetime.now(timezone.utc) + # Collect all window starts for the full range, then perform a single batched aggregation + window_starts: List[datetime] = [] while current_start < reference_end: - current_end = current_start + window_delta - aggregator.aggregate_all_components( - window_start=current_start, - window_end=current_end, - db=db, + window_starts.append(current_start) + current_start = current_start + window_delta + + # Limit to prevent memory issues; keep most recent windows (trim oldest) + max_windows = 10000 + if len(window_starts) > max_windows: + logger.warning( + "Window list truncated from %d to %d windows; keeping most recent", + len(window_starts), + max_windows, ) - current_start = current_end + window_starts = window_starts[-max_windows:] + + # Delegate to aggregator batch method to avoid per-window recomputation + # Note: window_starts must be contiguous and aligned; sparse lists will generate extra windows + if window_starts: + batch_succeeded = False + if hasattr(aggregator, "aggregate_all_components_batch"): + try: + aggregator.aggregate_all_components_batch(window_starts=window_starts, window_minutes=window_minutes, db=db) + batch_succeeded = True + except Exception: + logger.exception("Batch aggregation failed; falling back to per-window aggregation") + # Rollback failed transaction before attempting fallback (required for PostgreSQL) + db.rollback() + if not batch_succeeded: + # Backwards-compatible fallback: iterate windows (less efficient) + for ws in window_starts: + aggregator.aggregate_all_components(window_start=ws, window_end=ws + window_delta, db=db) # Request/Response Models diff --git a/mcpgateway/services/log_aggregator.py b/mcpgateway/services/log_aggregator.py index e852fcee4..56c16aa46 100644 --- a/mcpgateway/services/log_aggregator.py +++ b/mcpgateway/services/log_aggregator.py @@ -129,6 +129,255 @@ def aggregate_performance_metrics( if should_close: db.close() + def aggregate_all_components_batch(self, window_starts: List[datetime], window_minutes: int, db: Optional[Session] = None) -> List[PerformanceMetric]: + """Aggregate metrics for all components/operations for multiple windows in a single batch. + + This reduces the number of database round-trips by fetching logs for the full + time span once per component/operation and partitioning them into windows in + Python, then upserting per-window metrics. + + Args: + window_starts: List of window start datetimes (UTC) + window_minutes: Window size in minutes + db: Optional database session + + Returns: + List of created/updated PerformanceMetric records + + Raises: + Exception: If a database operation fails during aggregation. + """ + if not self.enabled: + return [] + + if not window_starts: + return [] + + should_close = False + if db is None: + db = SessionLocal() + should_close = True + + try: + window_delta = timedelta(minutes=window_minutes) + # Determine full range to query once + full_start = min(window_starts) + full_end = max(window_starts) + window_delta + + # Validate window_starts is contiguous - warn if sparse (batch generates all windows in range) + expected_window_count = int((full_end - full_start).total_seconds() / (window_minutes * 60)) + if len(window_starts) != expected_window_count: + logger.warning( + "Batch aggregation received %d windows but range spans %d; sparse windows will generate extra metrics", + len(window_starts), + expected_window_count, + ) + + # If PostgreSQL is available, use a single SQL rollup with generate_series and ordered-set aggregates + if _is_postgresql(): + sql = text( + """ + WITH windows AS ( + SELECT generate_series(:full_start::timestamptz, (:full_end - (:window_minutes || ' minutes')::interval)::timestamptz, (:window_minutes || ' minutes')::interval) AS window_start + ), pairs AS ( + SELECT DISTINCT component, operation_type FROM structured_log_entries + WHERE timestamp >= :full_start AND timestamp < :full_end + AND duration_ms IS NOT NULL + AND component IS NOT NULL AND component <> '' + AND operation_type IS NOT NULL AND operation_type <> '' + ) + SELECT + w.window_start, + p.component, + p.operation_type, + COUNT(sle.duration_ms) AS cnt, + AVG(sle.duration_ms) AS avg_duration, + MIN(sle.duration_ms) AS min_duration, + MAX(sle.duration_ms) AS max_duration, + percentile_cont(0.50) WITHIN GROUP (ORDER BY sle.duration_ms) AS p50, + percentile_cont(0.95) WITHIN GROUP (ORDER BY sle.duration_ms) AS p95, + percentile_cont(0.99) WITHIN GROUP (ORDER BY sle.duration_ms) AS p99, + SUM(CASE WHEN upper(sle.level) IN ('ERROR','CRITICAL') OR sle.error_details IS NOT NULL THEN 1 ELSE 0 END) AS error_count + FROM windows w + CROSS JOIN pairs p + JOIN structured_log_entries sle + ON sle.timestamp >= w.window_start AND sle.timestamp < w.window_start + (:window_minutes || ' minutes')::interval + AND sle.component = p.component AND sle.operation_type = p.operation_type + AND sle.duration_ms IS NOT NULL + GROUP BY w.window_start, p.component, p.operation_type + HAVING COUNT(sle.duration_ms) > 0 + ORDER BY w.window_start, p.component, p.operation_type + """ + ) + + rows = db.execute( + sql, + { + "full_start": full_start, + "full_end": full_end, + "window_minutes": str(window_minutes), + }, + ).fetchall() + + created_metrics: List[PerformanceMetric] = [] + for row in rows: + ws = row.window_start if row.window_start.tzinfo else row.window_start.replace(tzinfo=timezone.utc) + component = row.component + operation = row.operation_type + count = int(row.cnt) + avg_duration = float(row.avg_duration) if row.avg_duration is not None else 0.0 + min_duration = float(row.min_duration) if row.min_duration is not None else 0.0 + max_duration = float(row.max_duration) if row.max_duration is not None else 0.0 + p50 = float(row.p50) if row.p50 is not None else 0.0 + p95 = float(row.p95) if row.p95 is not None else 0.0 + p99 = float(row.p99) if row.p99 is not None else 0.0 + error_count = int(row.error_count) if row.error_count is not None else 0 + + metric = self._upsert_metric( + component=component, + operation_type=operation, + window_start=ws, + window_end=ws + window_delta, + request_count=count, + error_count=error_count, + error_rate=(error_count / count) if count else 0.0, + avg_duration_ms=avg_duration, + min_duration_ms=min_duration, + max_duration_ms=max_duration, + p50_duration_ms=p50, + p95_duration_ms=p95, + p99_duration_ms=p99, + metric_metadata={ + "sample_size": count, + "generated_at": datetime.now(timezone.utc).isoformat(), + }, + db=db, + ) + if metric: + created_metrics.append(metric) + + if should_close: + db.commit() + return created_metrics + + # Fallback: in-Python bucketing (previous implementation) + # Warning: This path loads all entries into memory; for very large ranges this may spike memory usage + range_hours = (full_end - full_start).total_seconds() / 3600 + if range_hours > 168: # > 1 week + logger.warning("Large aggregation range (%.1f hours) may cause high memory usage in non-PostgreSQL fallback", range_hours) + + # Get unique component/operation pairs for the full range + pair_stmt = ( + select(StructuredLogEntry.component, StructuredLogEntry.operation_type) + .where( + and_( + StructuredLogEntry.timestamp >= full_start, + StructuredLogEntry.timestamp < full_end, + StructuredLogEntry.duration_ms.isnot(None), + StructuredLogEntry.component.isnot(None), + StructuredLogEntry.component != "", + StructuredLogEntry.operation_type.isnot(None), + StructuredLogEntry.operation_type != "", + ) + ) + .distinct() + ) + + pairs = db.execute(pair_stmt).all() + + created_metrics: List[PerformanceMetric] = [] + + # helper to align timestamp to window start + def _align_to_window_local(dt: datetime, minutes: int) -> datetime: + ts = dt.astimezone(timezone.utc) + total_minutes = int(ts.timestamp() // 60) + aligned_minutes = (total_minutes // minutes) * minutes + return datetime.fromtimestamp(aligned_minutes * 60, tz=timezone.utc) + + for component, operation in pairs: + if not component or not operation: + continue + + # Fetch all relevant log entries for this component/operation in the full range + entries_stmt = select(StructuredLogEntry).where( + and_( + StructuredLogEntry.component == component, + StructuredLogEntry.operation_type == operation, + StructuredLogEntry.timestamp >= full_start, + StructuredLogEntry.timestamp < full_end, + StructuredLogEntry.duration_ms.isnot(None), + ) + ) + + entries = db.execute(entries_stmt).scalars().all() + if not entries: + continue + + # Bucket entries into windows + buckets: Dict[datetime, List[StructuredLogEntry]] = {} + for e in entries: + ts = e.timestamp if e.timestamp.tzinfo else e.timestamp.replace(tzinfo=timezone.utc) + bucket_start = _align_to_window_local(ts, window_minutes) + if bucket_start not in buckets: + buckets[bucket_start] = [] + buckets[bucket_start].append(e) + + # For each requested window, compute stats if we have data + for window_start in window_starts: + bucket_entries = buckets.get(window_start) + if not bucket_entries: + continue + + durations = sorted([b.duration_ms for b in bucket_entries if b.duration_ms is not None]) + if not durations: + continue + + count = len(durations) + avg_duration = float(sum(durations) / count) if count else 0.0 + min_duration = float(durations[0]) + max_duration = float(durations[-1]) + p50 = float(self._percentile(durations, 0.5)) + p95 = float(self._percentile(durations, 0.95)) + p99 = float(self._percentile(durations, 0.99)) + error_count = self._calculate_error_count(bucket_entries) + + try: + metric = self._upsert_metric( + component=component, + operation_type=operation, + window_start=window_start, + window_end=window_start + window_delta, + request_count=count, + error_count=error_count, + error_rate=(error_count / count) if count else 0.0, + avg_duration_ms=avg_duration, + min_duration_ms=min_duration, + max_duration_ms=max_duration, + p50_duration_ms=p50, + p95_duration_ms=p95, + p99_duration_ms=p99, + metric_metadata={ + "sample_size": count, + "generated_at": datetime.now(timezone.utc).isoformat(), + }, + db=db, + ) + if metric: + created_metrics.append(metric) + except Exception: + logger.exception("Failed to upsert metric for %s.%s window %s", component, operation, window_start) + + if should_close: + db.commit() + return created_metrics + except Exception: + if should_close: + db.rollback() + raise + finally: + if should_close: + db.close() + def aggregate_all_components(self, window_start: Optional[datetime] = None, window_end: Optional[datetime] = None, db: Optional[Session] = None) -> List[PerformanceMetric]: """Aggregate metrics for all components and operations. diff --git a/tests/unit/mcpgateway/services/test_log_aggregator.py b/tests/unit/mcpgateway/services/test_log_aggregator.py index 068adf898..6d850a707 100644 --- a/tests/unit/mcpgateway/services/test_log_aggregator.py +++ b/tests/unit/mcpgateway/services/test_log_aggregator.py @@ -311,3 +311,306 @@ def test_error_count_with_error_details(self): result = LogAggregator._calculate_error_count(entries) assert result == 1 + + +class TestAggregateAllComponentsBatch: + """Tests for aggregate_all_components_batch method.""" + + def test_batch_returns_empty_when_disabled(self): + """Test batch aggregation returns empty list when disabled.""" + with patch("mcpgateway.services.log_aggregator._is_postgresql", return_value=False): + aggregator = LogAggregator() + aggregator.enabled = False + + window_starts = [datetime.now(timezone.utc) - timedelta(hours=1)] + result = aggregator.aggregate_all_components_batch(window_starts=window_starts, window_minutes=5) + assert result == [] + + def test_batch_returns_empty_when_no_windows(self): + """Test batch aggregation returns empty list when no windows provided.""" + with patch("mcpgateway.services.log_aggregator._is_postgresql", return_value=False): + aggregator = LogAggregator() + + result = aggregator.aggregate_all_components_batch(window_starts=[], window_minutes=5) + assert result == [] + + def test_batch_postgresql_path_called(self): + """Test batch aggregation uses PostgreSQL path when available.""" + with patch("mcpgateway.services.log_aggregator._is_postgresql", return_value=True): + aggregator = LogAggregator() + + with patch("mcpgateway.services.log_aggregator.SessionLocal") as mock_session: + mock_db = MagicMock() + mock_session.return_value = mock_db + # Mock empty result set + mock_db.execute.return_value.fetchall.return_value = [] + + window_starts = [datetime.now(timezone.utc) - timedelta(hours=1)] + result = aggregator.aggregate_all_components_batch(window_starts=window_starts, window_minutes=5) + + assert result == [] + # Verify SQL was executed (PostgreSQL path) + mock_db.execute.assert_called() + + def test_batch_postgresql_with_data(self): + """Test PostgreSQL batch aggregation with mocked SQL results.""" + with patch("mcpgateway.services.log_aggregator._is_postgresql", return_value=True): + aggregator = LogAggregator() + + with patch("mcpgateway.services.log_aggregator.SessionLocal") as mock_session: + mock_db = MagicMock() + mock_session.return_value = mock_db + + # Create mock row result + window_start = datetime.now(timezone.utc) - timedelta(hours=1) + mock_row = MagicMock() + mock_row.window_start = window_start + mock_row.component = "test_component" + mock_row.operation_type = "test_op" + mock_row.cnt = 50 + mock_row.avg_duration = 25.5 + mock_row.min_duration = 1.0 + mock_row.max_duration = 100.0 + mock_row.p50 = 25.0 + mock_row.p95 = 90.0 + mock_row.p99 = 98.0 + mock_row.error_count = 2 + + mock_db.execute.return_value.fetchall.return_value = [mock_row] + + # Mock _upsert_metric to return a mock metric + mock_metric = MagicMock() + with patch.object(aggregator, "_upsert_metric", return_value=mock_metric): + result = aggregator.aggregate_all_components_batch( + window_starts=[window_start], + window_minutes=5, + db=mock_db, + ) + + assert len(result) == 1 + assert result[0] == mock_metric + + def test_batch_python_fallback_path(self): + """Test batch aggregation uses Python fallback for non-PostgreSQL.""" + with patch("mcpgateway.services.log_aggregator._is_postgresql", return_value=False): + aggregator = LogAggregator() + + with patch("mcpgateway.services.log_aggregator.SessionLocal") as mock_session: + mock_db = MagicMock() + mock_session.return_value = mock_db + + # Mock empty pairs result (no component/operation combinations) + mock_db.execute.return_value.all.return_value = [] + + window_starts = [datetime.now(timezone.utc) - timedelta(hours=1)] + result = aggregator.aggregate_all_components_batch(window_starts=window_starts, window_minutes=5) + + assert result == [] + + def test_batch_python_fallback_with_data(self): + """Test Python fallback batch aggregation with mocked entries.""" + with patch("mcpgateway.services.log_aggregator._is_postgresql", return_value=False): + aggregator = LogAggregator() + + mock_db = MagicMock() + + # Mock pairs query result + mock_db.execute.return_value.all.return_value = [("test_component", "test_op")] + + # Create mock log entries with timestamps in the window + window_start = datetime(2026, 1, 1, 12, 0, 0, tzinfo=timezone.utc) + mock_entries = [] + for i in range(10): + entry = MagicMock() + entry.duration_ms = float(i + 1) # 1 to 10 + entry.level = "INFO" + entry.error_details = None + entry.timestamp = window_start + timedelta(minutes=i % 5) # Within window + mock_entries.append(entry) + + # Add one error entry + mock_entries[5].level = "ERROR" + + # Mock scalars().all() for entries query + mock_scalars = MagicMock() + mock_scalars.all.return_value = mock_entries + mock_db.execute.return_value.scalars.return_value = mock_scalars + + # Mock _upsert_metric + mock_metric = MagicMock() + upsert_called = False + + def track_upsert(**kwargs): + nonlocal upsert_called + upsert_called = True + return mock_metric + + with patch.object(aggregator, "_upsert_metric", side_effect=track_upsert): + result = aggregator.aggregate_all_components_batch( + window_starts=[window_start], + window_minutes=5, + db=mock_db, + ) + + # Verify upsert was called and metric was returned + assert upsert_called, "Expected _upsert_metric to be called" + assert len(result) == 1 + assert result[0] == mock_metric + + def test_batch_error_count_consistency_with_duration_filter(self): + """Test that error_count only includes entries with duration_ms (consistency with per-window path).""" + with patch("mcpgateway.services.log_aggregator._is_postgresql", return_value=False): + aggregator = LogAggregator() + + mock_db = MagicMock() + + # Mock pairs + mock_db.execute.return_value.all.return_value = [("comp", "op")] + + window_start = datetime(2026, 1, 1, 12, 0, 0, tzinfo=timezone.utc) + + # Create entries - some with duration_ms, some without + mock_entries = [] + + # Entry with duration_ms and ERROR level - should be counted + entry1 = MagicMock() + entry1.duration_ms = 10.0 + entry1.level = "ERROR" + entry1.error_details = None + entry1.timestamp = window_start + timedelta(minutes=1) + mock_entries.append(entry1) + + # Entry with duration_ms and INFO level - should NOT be counted as error + entry2 = MagicMock() + entry2.duration_ms = 20.0 + entry2.level = "INFO" + entry2.error_details = None + entry2.timestamp = window_start + timedelta(minutes=2) + mock_entries.append(entry2) + + # Note: The query already filters for duration_ms IS NOT NULL, + # so entries without duration_ms won't be in the result set + + mock_scalars = MagicMock() + mock_scalars.all.return_value = mock_entries + mock_db.execute.return_value.scalars.return_value = mock_scalars + + # Capture the upsert call to verify error_count + upsert_calls = [] + + def capture_upsert(**kwargs): + upsert_calls.append(kwargs) + return MagicMock() + + with patch.object(aggregator, "_upsert_metric", side_effect=capture_upsert): + aggregator.aggregate_all_components_batch( + window_starts=[window_start], + window_minutes=5, + db=mock_db, + ) + + # Verify upsert was called with correct error_count + assert len(upsert_calls) == 1, f"Expected 1 upsert call, got {len(upsert_calls)}" + assert upsert_calls[0]["error_count"] == 1, "error_count should only count ERROR entry" + assert upsert_calls[0]["request_count"] == 2, "request_count should include both entries" + + def test_batch_large_range_warning_logged(self): + """Test that large aggregation ranges log a warning.""" + with patch("mcpgateway.services.log_aggregator._is_postgresql", return_value=False): + aggregator = LogAggregator() + + with patch("mcpgateway.services.log_aggregator.SessionLocal") as mock_session: + mock_db = MagicMock() + mock_session.return_value = mock_db + mock_db.execute.return_value.all.return_value = [] + + # Create window starts spanning more than 168 hours (1 week) + now = datetime.now(timezone.utc) + window_starts = [ + now - timedelta(hours=200), # Start 200 hours ago + now, # End now + ] + + with patch("mcpgateway.services.log_aggregator.logger") as mock_logger: + aggregator.aggregate_all_components_batch(window_starts=window_starts, window_minutes=5) + + # Verify warning was logged for large range + mock_logger.warning.assert_called() + warning_call = mock_logger.warning.call_args + assert "Large aggregation range" in warning_call[0][0] + + +class TestAggregateCustomWindowsFallback: + """Tests for fallback behavior in _aggregate_custom_windows.""" + + def test_fallback_on_batch_exception(self): + """Test that per-window fallback is used when batch aggregation fails.""" + from mcpgateway.routers.log_search import _aggregate_custom_windows + + mock_aggregator = MagicMock() + mock_aggregator.aggregate_all_components_batch.side_effect = Exception("Batch failed") + mock_db = MagicMock() + + # Mock the prerequisite queries + mock_db.execute.return_value.first.return_value = None + mock_db.execute.return_value.scalar.return_value = datetime.now(timezone.utc) - timedelta(hours=1) + + with patch("mcpgateway.routers.log_search.logger"): + _aggregate_custom_windows( + aggregator=mock_aggregator, + window_minutes=5, + db=mock_db, + ) + + # Verify batch was attempted + mock_aggregator.aggregate_all_components_batch.assert_called_once() + + # Verify rollback was called + mock_db.rollback.assert_called_once() + + # Verify fallback to per-window aggregation was used + assert mock_aggregator.aggregate_all_components.call_count > 0 + + def test_no_fallback_when_batch_succeeds(self): + """Test that per-window fallback is not used when batch succeeds.""" + from mcpgateway.routers.log_search import _aggregate_custom_windows + + mock_aggregator = MagicMock() + mock_aggregator.aggregate_all_components_batch.return_value = [MagicMock()] + mock_db = MagicMock() + + # Mock the prerequisite queries + mock_db.execute.return_value.first.return_value = None + mock_db.execute.return_value.scalar.return_value = datetime.now(timezone.utc) - timedelta(hours=1) + + _aggregate_custom_windows( + aggregator=mock_aggregator, + window_minutes=5, + db=mock_db, + ) + + # Verify batch was called + mock_aggregator.aggregate_all_components_batch.assert_called_once() + + # Verify per-window fallback was NOT called + mock_aggregator.aggregate_all_components.assert_not_called() + + def test_fallback_when_batch_method_missing(self): + """Test fallback to per-window when aggregator lacks batch method.""" + from mcpgateway.routers.log_search import _aggregate_custom_windows + + mock_aggregator = MagicMock(spec=["aggregate_all_components"]) # No batch method + mock_db = MagicMock() + + # Mock the prerequisite queries + mock_db.execute.return_value.first.return_value = None + mock_db.execute.return_value.scalar.return_value = datetime.now(timezone.utc) - timedelta(hours=1) + + _aggregate_custom_windows( + aggregator=mock_aggregator, + window_minutes=5, + db=mock_db, + ) + + # Verify per-window aggregation was used as fallback + assert mock_aggregator.aggregate_all_components.call_count > 0