diff --git a/.env.example b/.env.example index 375035ce33..6f1895a366 100644 --- a/.env.example +++ b/.env.example @@ -100,6 +100,10 @@ DB_PREPARE_THRESHOLD=5 # Recommended: true for PostgreSQL production deployments, auto-detected for SQLite USE_POSTGRESDB_PERCENTILES=true +# The number of rows fetched from the database at a time when streaming results, +# to limit memory usage and avoid loading all rows into RAM at once. +YIELD_BATCH_SIZE=1000 + # Cache Backend Configuration # Options: database (default), memory (in-process), redis (distributed) # - database: Uses SQLite/PostgreSQL for persistence (good for single-node) diff --git a/README.md b/README.md index 8eb44fb03d..40f255c65c 100644 --- a/README.md +++ b/README.md @@ -2042,6 +2042,8 @@ Automatic management of metrics data to prevent unbounded table growth and maint | `METRICS_ROLLUP_LATE_DATA_HOURS` | Hours to re-process for late-arriving data | `1` | 1-48 | | `METRICS_DELETE_RAW_AFTER_ROLLUP` | Delete raw metrics after rollup exists | `true` | bool | | `METRICS_DELETE_RAW_AFTER_ROLLUP_HOURS` | Hours to retain raw when rollup exists | `1` | 1-8760 | +| `USE_POSTGRESDB_PERCENTILES` | Use PostgreSQL-native percentile_cont for p50/p95/p99 | `true` | bool | +| `YIELD_BATCH_SIZE` | Rows per batch when streaming rollup queries | `1000` | 100-10000 | **Key Features:** - 📊 **Hourly rollup**: Pre-aggregated summaries with p50/p95/p99 percentiles diff --git a/charts/mcp-stack/values.yaml b/charts/mcp-stack/values.yaml index 8ab121c599..a71b74a0a4 100644 --- a/charts/mcp-stack/values.yaml +++ b/charts/mcp-stack/values.yaml @@ -361,6 +361,8 @@ mcpContextForge: METRICS_ROLLUP_LATE_DATA_HOURS: "1" # hours to re-process for late-arriving data (1-48) METRICS_DELETE_RAW_AFTER_ROLLUP: "true" # delete raw metrics after hourly rollup exists METRICS_DELETE_RAW_AFTER_ROLLUP_HOURS: "1" # hours to retain raw when rollup exists (1-8760) + USE_POSTGRESDB_PERCENTILES: "true" # use PostgreSQL-native percentile_cont for p50/p95/p99 + YIELD_BATCH_SIZE: "1000" # rows per batch when streaming rollup queries (100-10000) # ─ Transports ─ TRANSPORT_TYPE: all # comma-separated list: http, ws, sse, stdio, all diff --git a/docs/docs/manage/configuration.md b/docs/docs/manage/configuration.md index 3ec4069877..4900ce24d3 100644 --- a/docs/docs/manage/configuration.md +++ b/docs/docs/manage/configuration.md @@ -1123,6 +1123,17 @@ METRICS_DELETE_RAW_AFTER_ROLLUP=true # Delete raw after rollup (default: true) METRICS_DELETE_RAW_AFTER_ROLLUP_HOURS=1 # Hours before deletion (default: 1) ``` +**Performance Optimization (PostgreSQL):** + +```bash +USE_POSTGRESDB_PERCENTILES=true # Use PostgreSQL-native percentile_cont (default: true) +YIELD_BATCH_SIZE=1000 # Rows per batch for streaming queries (default: 1000) +``` + +When `USE_POSTGRESDB_PERCENTILES=true` (default), PostgreSQL uses native `percentile_cont()` for p50/p95/p99 calculations, which is 5-10x faster than Python-based percentile computation. For SQLite or when disabled, falls back to Python linear interpolation. + +`YIELD_BATCH_SIZE` controls memory usage by streaming query results in batches instead of loading all rows into RAM at once. + #### Configuration Examples **Default (recommended for most deployments):** diff --git a/mcpgateway/config.py b/mcpgateway/config.py index fd395f2a69..dac162844f 100644 --- a/mcpgateway/config.py +++ b/mcpgateway/config.py @@ -998,6 +998,15 @@ def _parse_allowed_origins(cls, v: Any) -> Set[str]: metrics_aggregation_backfill_hours: int = Field(default=6, ge=0, le=168, description="Hours of structured logs to backfill into performance metrics on startup") metrics_aggregation_window_minutes: int = Field(default=5, description="Time window for metrics aggregation (minutes)") metrics_aggregation_auto_start: bool = Field(default=False, description="Automatically run the log aggregation loop on application startup") + yield_batch_size: int = Field( + default=1000, + ge=100, + le=100000, + description="Number of rows fetched per batch when streaming hourly metric data from the database. " + "Used to limit memory usage during aggregation and percentile calculations. " + "Smaller values reduce memory footprint but increase DB round-trips; larger values improve throughput " + "at the cost of higher memory usage.", + ) # Execution Metrics Recording # Controls whether tool/resource/prompt/server/A2A execution metrics are written to the database. diff --git a/mcpgateway/services/metrics_rollup_service.py b/mcpgateway/services/metrics_rollup_service.py index 5f93a8f6e6..8afe3eb8af 100644 --- a/mcpgateway/services/metrics_rollup_service.py +++ b/mcpgateway/services/metrics_rollup_service.py @@ -27,6 +27,9 @@ # Third-Party from sqlalchemy import and_, case, delete, func, select +from sqlalchemy.dialects.postgresql import insert as pg_insert +from sqlalchemy.dialects.sqlite import insert as sqlite_insert +from sqlalchemy.exc import IntegrityError, SQLAlchemyError from sqlalchemy.orm import Session # First-Party @@ -247,7 +250,6 @@ async def _rollup_loop(self) -> None: # Calculate interval in seconds interval_seconds = self.rollup_interval_hours * 3600 - # On first run, do a backfill check first_run = True @@ -370,7 +372,6 @@ async def rollup_all( current_hour_start = now.replace(minute=0, second=0, microsecond=0) # Go back hours_back from the start of current hour start_hour = current_hour_start - timedelta(hours=hours_back) - results: Dict[str, RollupResult] = {} total_hours = 0 total_records = 0 @@ -492,7 +493,6 @@ def _rollup_table( hour_end, is_a2a, ) - # Upsert rollups for agg in aggregations: created, updated = self._upsert_rollup( @@ -568,126 +568,201 @@ def _aggregate_hour( Returns: List[HourlyAggregation]: Aggregated metrics for each entity + + Raises: + Exception: If aggregation fails due to a database query or processing error. """ - entity_id_attr = getattr(raw_model, entity_id_col) - - # Build group by columns - if is_a2a: - group_cols = [entity_id_attr, raw_model.interaction_type] - else: - group_cols = [entity_id_attr] - - # Time filter for this hour - time_filter = and_( - raw_model.timestamp >= hour_start, - raw_model.timestamp < hour_end, - ) + try: + entity_id_attr = getattr(raw_model, entity_id_col) + entity_name_attr = getattr(entity_model, entity_name_col) - # OPTIMIZED: Single bulk query for basic aggregations per entity - # pylint: disable=not-callable - agg_query = ( - select( - *group_cols, - func.count(raw_model.id).label("total_count"), - func.sum(case((raw_model.is_success.is_(True), 1), else_=0)).label("success_count"), - func.min(raw_model.response_time).label("min_rt"), - func.max(raw_model.response_time).label("max_rt"), - func.avg(raw_model.response_time).label("avg_rt"), + time_filter = and_( + raw_model.timestamp >= hour_start, + raw_model.timestamp < hour_end, ) - .where(time_filter) - .group_by(*group_cols) - ) - # Store aggregation results by entity key - agg_results = {} - for row in db.execute(agg_query).fetchall(): - entity_id = row[0] - interaction_type = row[1] if is_a2a else None - key = (entity_id, interaction_type) if is_a2a else entity_id - - agg_results[key] = { - "entity_id": entity_id, - "interaction_type": interaction_type, - "total_count": row.total_count or 0, - "success_count": row.success_count or 0, - "min_rt": row.min_rt, - "max_rt": row.max_rt, - "avg_rt": row.avg_rt, - } + aggregations: list = [] + if self._is_postgresql and settings.use_postgresdb_percentiles: + # ---- build SELECT and GROUP BY dynamically (CRITICAL FIX) ---- + select_cols = [ + entity_id_attr.label("entity_id"), + func.coalesce(entity_name_attr, "unknown").label("entity_name"), + ] + group_by_cols = [ + entity_id_attr, + entity_name_attr, + ] + + if is_a2a: + select_cols.append(raw_model.interaction_type.label("interaction_type")) + group_by_cols.append(raw_model.interaction_type) + + # pylint: disable=not-callable + agg_query = ( + select( + *select_cols, + func.count(raw_model.id).label("total_count"), + func.sum(case((raw_model.is_success.is_(True), 1), else_=0)).label("success_count"), + func.min(raw_model.response_time).label("min_rt"), + func.max(raw_model.response_time).label("max_rt"), + func.avg(raw_model.response_time).label("avg_rt"), + func.percentile_cont(0.50).within_group(raw_model.response_time).label("p50_rt"), + func.percentile_cont(0.95).within_group(raw_model.response_time).label("p95_rt"), + func.percentile_cont(0.99).within_group(raw_model.response_time).label("p99_rt"), + ) + .select_from(raw_model) + .join(entity_model, entity_model.id == entity_id_attr, isouter=True) + .where(time_filter) + .group_by(*group_by_cols) + ) + # pylint: enable=not-callable + for row in db.execute(agg_query).yield_per(settings.yield_batch_size): + aggregations.append( + HourlyAggregation( + entity_id=row.entity_id, + entity_name=row.entity_name, + hour_start=hour_start, + total_count=row.total_count, + success_count=row.success_count, + failure_count=row.total_count - row.success_count, + min_response_time=row.min_rt, + max_response_time=row.max_rt, + avg_response_time=row.avg_rt, + p50_response_time=row.p50_rt, + p95_response_time=row.p95_rt, + p99_response_time=row.p99_rt, + interaction_type=row.interaction_type if is_a2a else None, + ) + ) + else: + # Build group by columns + if is_a2a: + group_cols = [entity_id_attr, raw_model.interaction_type] + else: + group_cols = [entity_id_attr] - if not agg_results: - return [] - - # OPTIMIZED: Bulk load entity names in one query - entity_ids = list(set(r["entity_id"] for r in agg_results.values())) - entity_names = {} - if entity_ids: - entities = db.execute(select(entity_model.id, getattr(entity_model, entity_name_col)).where(entity_model.id.in_(entity_ids))).fetchall() - entity_names = {e[0]: e[1] for e in entities} - - # OPTIMIZED: Bulk load all response times for percentile calculation - # Load all response times for the hour in one query, grouped by entity - rt_query = ( - select( - *group_cols, - raw_model.response_time, - ) - .where(time_filter) - .order_by(*group_cols, raw_model.response_time) - ) + # Time filter for this hour + time_filter = and_( + raw_model.timestamp >= hour_start, + raw_model.timestamp < hour_end, + ) - # Group response times by entity - response_times_by_entity: Dict[Any, List[float]] = {} - for row in db.execute(rt_query).fetchall(): - entity_id = row[0] - interaction_type = row[1] if is_a2a else None - key = (entity_id, interaction_type) if is_a2a else entity_id - rt = row.response_time if not is_a2a else row[2] - - if key not in response_times_by_entity: - response_times_by_entity[key] = [] - if rt is not None: - response_times_by_entity[key].append(rt) - - # Build aggregation results with percentiles - aggregations = [] - for key, agg in agg_results.items(): - entity_id = agg["entity_id"] - interaction_type = agg["interaction_type"] - - # Get entity name - entity_name = entity_names.get(entity_id, "unknown") - - # Get response times for percentile calculation - response_times = response_times_by_entity.get(key, []) - - # Calculate percentiles (response_times are already sorted from ORDER BY) - if response_times: - p50_rt = self._percentile(response_times, 50) - p95_rt = self._percentile(response_times, 95) - p99_rt = self._percentile(response_times, 99) - else: - p50_rt = p95_rt = p99_rt = None - - aggregations.append( - HourlyAggregation( - entity_id=entity_id, - entity_name=entity_name, - hour_start=hour_start, - total_count=agg["total_count"], - success_count=agg["success_count"], - failure_count=agg["total_count"] - agg["success_count"], - min_response_time=agg["min_rt"], - max_response_time=agg["max_rt"], - avg_response_time=agg["avg_rt"], - p50_response_time=p50_rt, - p95_response_time=p95_rt, - p99_response_time=p99_rt, - interaction_type=interaction_type, + # OPTIMIZED: Single bulk query for basic aggregations per entity + # pylint: disable=not-callable + agg_query = ( + select( + *group_cols, + func.count(raw_model.id).label("total_count"), + func.sum(case((raw_model.is_success.is_(True), 1), else_=0)).label("success_count"), + func.min(raw_model.response_time).label("min_rt"), + func.max(raw_model.response_time).label("max_rt"), + func.avg(raw_model.response_time).label("avg_rt"), + ) + .where(time_filter) + .group_by(*group_cols) ) - ) - return aggregations + # Store aggregation results by entity key + agg_results = {} + for row in db.execute(agg_query).yield_per(settings.yield_batch_size): + entity_id = row[0] + interaction_type = row[1] if is_a2a else None + key = (entity_id, interaction_type) if is_a2a else entity_id + + agg_results[key] = { + "entity_id": entity_id, + "interaction_type": interaction_type, + "total_count": row.total_count or 0, + "success_count": row.success_count or 0, + "min_rt": row.min_rt, + "max_rt": row.max_rt, + "avg_rt": row.avg_rt, + } + + if not agg_results: + return [] + + # OPTIMIZED: Bulk load entity names in one query + entity_ids = list(set(r["entity_id"] for r in agg_results.values())) + entity_names = {} + if entity_ids: + entities = db.execute(select(entity_model.id, getattr(entity_model, entity_name_col)).where(entity_model.id.in_(entity_ids))) # .fetchall() + entity_names = {e[0]: e[1] for e in entities} + + # OPTIMIZED: Bulk load all response times for percentile calculation + # Load all response times for the hour in one query, grouped by entity + rt_query = ( + select( + *group_cols, + raw_model.response_time, + ) + .where(time_filter) + .order_by(*group_cols, raw_model.response_time) + ) + + # Group response times by entity + response_times_by_entity: Dict[Any, List[float]] = {} + for row in db.execute(rt_query).yield_per(settings.yield_batch_size): + entity_id = row[0] + interaction_type = row[1] if is_a2a else None + key = (entity_id, interaction_type) if is_a2a else entity_id + rt = row.response_time if not is_a2a else row[2] + + if key not in response_times_by_entity: + response_times_by_entity[key] = [] + if rt is not None: + response_times_by_entity[key].append(rt) + + # Build aggregation results with percentiles + aggregations = [] + for key, agg in agg_results.items(): + entity_id = agg["entity_id"] + interaction_type = agg["interaction_type"] + + # Get entity name + entity_name = entity_names.get(entity_id, "unknown") + + # Get response times for percentile calculation + response_times = response_times_by_entity.get(key, []) + + # Calculate percentiles (response_times are already sorted from ORDER BY) + if response_times: + p50_rt = self._percentile(response_times, 50) + p95_rt = self._percentile(response_times, 95) + p99_rt = self._percentile(response_times, 99) + else: + p50_rt = p95_rt = p99_rt = None + + aggregations.append( + HourlyAggregation( + entity_id=entity_id, + entity_name=entity_name, + hour_start=hour_start, + total_count=agg["total_count"], + success_count=agg["success_count"], + failure_count=agg["total_count"] - agg["success_count"], + min_response_time=agg["min_rt"], + max_response_time=agg["max_rt"], + avg_response_time=agg["avg_rt"], + p50_response_time=p50_rt, + p95_response_time=p95_rt, + p99_response_time=p99_rt, + interaction_type=interaction_type, + ) + ) + return aggregations + except Exception: + logger.exception( + "Failed to aggregate hourly metrics", + extra={ + "hour_start": hour_start, + "hour_end": hour_end, + "raw_model": raw_model.__name__, + "entity_model": entity_model.__name__, + "is_a2a": is_a2a, + }, + ) + raise def _percentile(self, sorted_data: List[float], percentile: int) -> float: """Calculate percentile from sorted data. @@ -708,7 +783,6 @@ def _percentile(self, sorted_data: List[float], percentile: int) -> float: if f == c: return sorted_data[f] - return sorted_data[f] + (k - f) * (sorted_data[c] - sorted_data[f]) def _upsert_rollup( @@ -719,70 +793,159 @@ def _upsert_rollup( agg: HourlyAggregation, is_a2a: bool, ) -> Tuple[int, int]: - """Insert or update a rollup record. + """ + Insert or update a single hourly rollup record using a DB-aware UPSERT. + This function is concurrency-safe for PostgreSQL and SQLite. + Falls back to Python SELECT+UPDATE/INSERT for unsupported DBs + + This function is concurrency-safe and enforces uniqueness at the database level. Args: - db: Database session - hourly_model: SQLAlchemy model for hourly rollups - entity_id_col: Name of the entity ID column - agg: Aggregation to upsert - is_a2a: Whether this is A2A agent metrics + db (Session): Active SQLAlchemy database session. + hourly_model (Type): ORM model representing the hourly rollup table. + entity_id_col (str): Name of the entity ID column (e.g. "tool_id", "agent_id"). + agg (HourlyAggregation): Aggregated hourly metrics for a single entity. + is_a2a (bool): Whether interaction_type should be included in the uniqueness key. Returns: - Tuple[int, int]: (created count, updated count) + Tuple[int, int]: Best-effort (inserted_count, updated_count) values for logging only. + + Raises: + SQLAlchemyError: If the database UPSERT operation fails. """ - # Build the name column based on entity type - if entity_id_col == "tool_id": - name_col = "tool_name" - elif entity_id_col == "resource_id": - name_col = "resource_name" - elif entity_id_col == "prompt_id": - name_col = "prompt_name" - elif entity_id_col == "server_id": - name_col = "server_name" - else: - name_col = "agent_name" - - values = { - entity_id_col: agg.entity_id, - name_col: agg.entity_name, - "hour_start": agg.hour_start, - "total_count": agg.total_count, - "success_count": agg.success_count, - "failure_count": agg.failure_count, - "min_response_time": agg.min_response_time, - "max_response_time": agg.max_response_time, - "avg_response_time": agg.avg_response_time, - "p50_response_time": agg.p50_response_time, - "p95_response_time": agg.p95_response_time, - "p99_response_time": agg.p99_response_time, - } + try: + # Resolve name column + name_col_map = { + "tool_id": "tool_name", + "resource_id": "resource_name", + "prompt_id": "prompt_name", + "server_id": "server_name", + } + name_col = name_col_map.get(entity_id_col, "agent_name") + + # Normalizing + hour_start = agg.hour_start.replace(minute=0, second=0, microsecond=0) + + values = { + entity_id_col: agg.entity_id, + name_col: agg.entity_name, + "hour_start": hour_start, + "total_count": agg.total_count, + "success_count": agg.success_count, + "failure_count": agg.failure_count, + "min_response_time": agg.min_response_time, + "max_response_time": agg.max_response_time, + "avg_response_time": agg.avg_response_time, + "p50_response_time": agg.p50_response_time, + "p95_response_time": agg.p95_response_time, + "p99_response_time": agg.p99_response_time, + } + + if is_a2a: + values["interaction_type"] = agg.interaction_type + + dialect = db.bind.dialect.name if db.bind else "unknown" + conflict_cols = [ + getattr(hourly_model, entity_id_col), + hourly_model.hour_start, + ] + + if is_a2a: + conflict_cols.append(hourly_model.interaction_type) + + logger.debug( + "Upserting hourly rollup", + extra={ + "dialect": dialect, + "entity_id_col": entity_id_col, + "entity_id": agg.entity_id, + "hour_start": hour_start.isoformat(), + "is_a2a": is_a2a, + }, + ) + + if dialect == "postgresql": + # ======================= + # PostgreSQL + # ======================= + stmt = pg_insert(hourly_model).values(**values) + update_cols = {k: stmt.excluded[k] for k in values if k not in (entity_id_col, "hour_start", "interaction_type")} + stmt = stmt.on_conflict_do_update( + index_elements=conflict_cols, + set_=update_cols, + ) + + db.execute(stmt) + return (0, 1) + + if "sqlite" in dialect: + # ======================= + # SQLite + # ======================= + stmt = sqlite_insert(hourly_model).values(**values) - if is_a2a: - values["interaction_type"] = agg.interaction_type - - # Check if rollup exists - entity_id_attr = getattr(hourly_model, entity_id_col) - filters = [ - entity_id_attr == agg.entity_id, - hourly_model.hour_start == agg.hour_start, - ] - if is_a2a: - filters.append(hourly_model.interaction_type == agg.interaction_type) - - existing = db.execute(select(hourly_model).where(and_(*filters))).scalar_one_or_none() - - if existing: - # Update existing - for key, value in values.items(): - if key not in (entity_id_col, "hour_start", "interaction_type"): - setattr(existing, key, value) - return (0, 1) - - # Insert new - rollup = hourly_model(**values) - db.add(rollup) - return (1, 0) + update_cols = {k: stmt.excluded[k] for k in values if k not in (entity_id_col, "hour_start", "interaction_type")} + + stmt = stmt.on_conflict_do_update( + index_elements=conflict_cols, + set_=update_cols, + ) + + db.execute(stmt) + return (0, 1) + + logger.warning( + "Dialect does not support native UPSERT. Using Python fallback with conflict handling.", + extra={"dialect": dialect}, + ) + # Use savepoint to avoid rolling back the entire transaction on conflict + savepoint = db.begin_nested() + try: + db.add(hourly_model(**values)) + db.flush() # Force INSERT now + savepoint.commit() + return (1, 0) + except IntegrityError: + savepoint.rollback() # Only roll back the savepoint, not the whole transaction + logger.info( + "Insert conflict detected in fallback path. Retrying as update.", + extra={ + "entity_id_col": entity_id_col, + "entity_id": agg.entity_id, + "hour_start": hour_start.isoformat(), + "is_a2a": is_a2a, + }, + ) + + entity_id_attr = getattr(hourly_model, entity_id_col) + + filters = [ + entity_id_attr == agg.entity_id, + hourly_model.hour_start == hour_start, + ] + + if is_a2a: + filters.append(hourly_model.interaction_type == agg.interaction_type) + + existing = db.execute(select(hourly_model).where(and_(*filters))).scalar_one() + + for key, value in values.items(): + if key not in (entity_id_col, "hour_start", "interaction_type"): + setattr(existing, key, value) + + return (0, 1) + + except SQLAlchemyError: + logger.exception( + "Failed to upsert hourly rollup", + extra={ + "entity_id_col": entity_id_col, + "entity_id": agg.entity_id, + "hour_start": hour_start.isoformat(), + "is_a2a": is_a2a, + }, + ) + raise def _delete_raw_metrics( self, diff --git a/tests/unit/mcpgateway/services/test_metrics_rollup_service.py b/tests/unit/mcpgateway/services/test_metrics_rollup_service.py index 3c883f7040..1471399444 100644 --- a/tests/unit/mcpgateway/services/test_metrics_rollup_service.py +++ b/tests/unit/mcpgateway/services/test_metrics_rollup_service.py @@ -7,8 +7,6 @@ # Standard from datetime import datetime, timedelta, timezone -from unittest.mock import MagicMock, patch -import uuid # Third-Party import pytest