36 changes: 30 additions & 6 deletions mcpgateway/routers/log_search.py
@@ -157,14 +157,38 @@ def _aggregate_custom_windows(

     reference_end = datetime.now(timezone.utc)

+    # Collect all window starts for the full range, then perform a single batched aggregation
+    window_starts: List[datetime] = []
     while current_start < reference_end:
         current_end = current_start + window_delta
-        aggregator.aggregate_all_components(
-            window_start=current_start,
-            window_end=current_end,
-            db=db,
-        )
-        current_start = current_end
+        window_starts.append(current_start)
+        current_start = current_start + window_delta
+
+    # Limit to prevent memory issues; keep most recent windows (trim oldest)
+    max_windows = 10000
+    if len(window_starts) > max_windows:
+        logger.warning(
+            "Window list truncated from %d to %d windows; keeping most recent",
+            len(window_starts),
+            max_windows,
+        )
+        window_starts = window_starts[-max_windows:]
+
+    # Delegate to aggregator batch method to avoid per-window recomputation
+    # Note: window_starts must be contiguous and aligned; sparse lists will generate extra windows
+    if window_starts:
+        batch_succeeded = False
+        if hasattr(aggregator, "aggregate_all_components_batch"):
+            try:
+                aggregator.aggregate_all_components_batch(window_starts=window_starts, window_minutes=window_minutes, db=db)
+                batch_succeeded = True
+            except Exception:
+                logger.exception("Batch aggregation failed; falling back to per-window aggregation")
+                # Rollback failed transaction before attempting fallback (required for PostgreSQL)
+                db.rollback()
+        if not batch_succeeded:
+            # Backwards-compatible fallback: iterate windows (less efficient)
+            for ws in window_starts:
+                aggregator.aggregate_all_components(window_start=ws, window_end=ws + window_delta, db=db)


 # Request/Response Models
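As a rough sense of scale for the max_windows = 10000 cap introduced above, here is a standalone sketch (hypothetical local names, not part of the gateway code) that counts windows the same way the router loop does; only ranges wider than roughly 34.7 days of 5-minute windows get trimmed to their most recent portion.

from datetime import datetime, timedelta, timezone


def count_windows(start: datetime, end: datetime, window_minutes: int) -> int:
    """Count the window starts the router loop above would collect."""
    delta = timedelta(minutes=window_minutes)
    count = 0
    current = start
    while current < end:
        count += 1
        current += delta
    return count


start = datetime(2024, 1, 1, tzinfo=timezone.utc)
# 40 days of 5-minute windows -> 11520 starts, so the router keeps the newest 10000
# (10000 windows * 5 minutes = 50000 minutes, about 34.7 days of retained coverage).
print(count_windows(start, start + timedelta(days=40), window_minutes=5))  # 11520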
249 changes: 249 additions & 0 deletions mcpgateway/services/log_aggregator.py
@@ -129,6 +129,255 @@ def aggregate_performance_metrics(
if should_close:
db.close()

def aggregate_all_components_batch(self, window_starts: List[datetime], window_minutes: int, db: Optional[Session] = None) -> List[PerformanceMetric]:
"""Aggregate metrics for all components/operations for multiple windows in a single batch.

This reduces the number of database round-trips by fetching logs for the full
time span once per component/operation and partitioning them into windows in
Python, then upserting per-window metrics.

Args:
window_starts: List of window start datetimes (UTC)
window_minutes: Window size in minutes
db: Optional database session

Returns:
List of created/updated PerformanceMetric records

Raises:
Exception: If a database operation fails during aggregation.
"""
if not self.enabled:
return []

if not window_starts:
return []

should_close = False
if db is None:
db = SessionLocal()
should_close = True

try:
window_delta = timedelta(minutes=window_minutes)
# Determine full range to query once
full_start = min(window_starts)
full_end = max(window_starts) + window_delta

# Validate that window_starts is contiguous; warn if sparse (the SQL rollup below generates every window in the range, so sparse inputs produce extra metrics)
expected_window_count = int((full_end - full_start).total_seconds() / (window_minutes * 60))
if len(window_starts) != expected_window_count:
logger.warning(
"Batch aggregation received %d windows but range spans %d; sparse windows will generate extra metrics",
len(window_starts),
expected_window_count,
)
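# Worked example (illustrative): with window_minutes=5 and contiguous starts
# 00:00, 00:05, ..., 00:25, full_start=00:00 and full_end=00:30 give
# expected_window_count = 1800 / 300 = 6, matching the 6 starts, so no warning;
# dropping the 00:10 start leaves 5 starts against the same span and triggers it.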

# If PostgreSQL is available, use a single SQL rollup with generate_series and ordered-set aggregates
if _is_postgresql():
sql = text(
"""
WITH windows AS (
SELECT generate_series(:full_start::timestamptz, (:full_end - (:window_minutes || ' minutes')::interval)::timestamptz, (:window_minutes || ' minutes')::interval) AS window_start
), pairs AS (
SELECT DISTINCT component, operation_type FROM structured_log_entries
WHERE timestamp >= :full_start AND timestamp < :full_end
AND duration_ms IS NOT NULL
AND component IS NOT NULL AND component <> ''
AND operation_type IS NOT NULL AND operation_type <> ''
)
SELECT
w.window_start,
p.component,
p.operation_type,
COUNT(sle.duration_ms) AS cnt,
AVG(sle.duration_ms) AS avg_duration,
MIN(sle.duration_ms) AS min_duration,
MAX(sle.duration_ms) AS max_duration,
percentile_cont(0.50) WITHIN GROUP (ORDER BY sle.duration_ms) AS p50,
percentile_cont(0.95) WITHIN GROUP (ORDER BY sle.duration_ms) AS p95,
percentile_cont(0.99) WITHIN GROUP (ORDER BY sle.duration_ms) AS p99,
SUM(CASE WHEN upper(sle.level) IN ('ERROR','CRITICAL') OR sle.error_details IS NOT NULL THEN 1 ELSE 0 END) AS error_count
FROM windows w
CROSS JOIN pairs p
JOIN structured_log_entries sle
ON sle.timestamp >= w.window_start AND sle.timestamp < w.window_start + (:window_minutes || ' minutes')::interval
AND sle.component = p.component AND sle.operation_type = p.operation_type
AND sle.duration_ms IS NOT NULL
GROUP BY w.window_start, p.component, p.operation_type
HAVING COUNT(sle.duration_ms) > 0
ORDER BY w.window_start, p.component, p.operation_type
"""
)
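# Illustrative reading of the rollup above: with full_start=00:00, full_end=00:30
# and window_minutes=5, generate_series emits window starts 00:00 through 00:25;
# each (window_start, component, operation_type) group containing at least one
# timed entry yields one row of count/avg/min/max/p50/p95/p99 plus an error count.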

rows = db.execute(
sql,
{
"full_start": full_start,
"full_end": full_end,
"window_minutes": str(window_minutes),
},
).fetchall()
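# Note: window_minutes is passed as a string so the :window_minutes || ' minutes'
# concatenation above yields e.g. '5 minutes' before the ::interval cast.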

created_metrics: List[PerformanceMetric] = []
for row in rows:
ws = row.window_start if row.window_start.tzinfo else row.window_start.replace(tzinfo=timezone.utc)
component = row.component
operation = row.operation_type
count = int(row.cnt)
avg_duration = float(row.avg_duration) if row.avg_duration is not None else 0.0
min_duration = float(row.min_duration) if row.min_duration is not None else 0.0
max_duration = float(row.max_duration) if row.max_duration is not None else 0.0
p50 = float(row.p50) if row.p50 is not None else 0.0
p95 = float(row.p95) if row.p95 is not None else 0.0
p99 = float(row.p99) if row.p99 is not None else 0.0
error_count = int(row.error_count) if row.error_count is not None else 0

metric = self._upsert_metric(
component=component,
operation_type=operation,
window_start=ws,
window_end=ws + window_delta,
request_count=count,
error_count=error_count,
error_rate=(error_count / count) if count else 0.0,
avg_duration_ms=avg_duration,
min_duration_ms=min_duration,
max_duration_ms=max_duration,
p50_duration_ms=p50,
p95_duration_ms=p95,
p99_duration_ms=p99,
metric_metadata={
"sample_size": count,
"generated_at": datetime.now(timezone.utc).isoformat(),
},
db=db,
)
if metric:
created_metrics.append(metric)

if should_close:
db.commit()
return created_metrics

# Fallback: in-Python bucketing (previous implementation)
# Warning: This path loads all entries into memory; for very large ranges this may spike memory usage
range_hours = (full_end - full_start).total_seconds() / 3600
if range_hours > 168: # > 1 week
logger.warning("Large aggregation range (%.1f hours) may cause high memory usage in non-PostgreSQL fallback", range_hours)

# Get unique component/operation pairs for the full range
pair_stmt = (
select(StructuredLogEntry.component, StructuredLogEntry.operation_type)
.where(
and_(
StructuredLogEntry.timestamp >= full_start,
StructuredLogEntry.timestamp < full_end,
StructuredLogEntry.duration_ms.isnot(None),
StructuredLogEntry.component.isnot(None),
StructuredLogEntry.component != "",
StructuredLogEntry.operation_type.isnot(None),
StructuredLogEntry.operation_type != "",
)
)
.distinct()
)

pairs = db.execute(pair_stmt).all()

created_metrics: List[PerformanceMetric] = []

# helper to align timestamp to window start
def _align_to_window_local(dt: datetime, minutes: int) -> datetime:
ts = dt.astimezone(timezone.utc)
total_minutes = int(ts.timestamp() // 60)
aligned_minutes = (total_minutes // minutes) * minutes
return datetime.fromtimestamp(aligned_minutes * 60, tz=timezone.utc)
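# Illustrative: with minutes=5, a timestamp of 12:07:34 UTC floors to a
# 12:05:00 bucket start; alignment is computed from the Unix epoch, so bucket
# starts only match entries in window_starts when the caller passes aligned starts.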

for component, operation in pairs:
if not component or not operation:
continue

# Fetch all relevant log entries for this component/operation in the full range
entries_stmt = select(StructuredLogEntry).where(
and_(
StructuredLogEntry.component == component,
StructuredLogEntry.operation_type == operation,
StructuredLogEntry.timestamp >= full_start,
StructuredLogEntry.timestamp < full_end,
StructuredLogEntry.duration_ms.isnot(None),
)
)

entries = db.execute(entries_stmt).scalars().all()
if not entries:
continue

# Bucket entries into windows
buckets: Dict[datetime, List[StructuredLogEntry]] = {}
for e in entries:
ts = e.timestamp if e.timestamp.tzinfo else e.timestamp.replace(tzinfo=timezone.utc)
bucket_start = _align_to_window_local(ts, window_minutes)
if bucket_start not in buckets:
buckets[bucket_start] = []
buckets[bucket_start].append(e)

# For each requested window, compute stats if we have data
for window_start in window_starts:
bucket_entries = buckets.get(window_start)
if not bucket_entries:
continue

durations = sorted([b.duration_ms for b in bucket_entries if b.duration_ms is not None])
if not durations:
continue

count = len(durations)
avg_duration = float(sum(durations) / count) if count else 0.0
min_duration = float(durations[0])
max_duration = float(durations[-1])
p50 = float(self._percentile(durations, 0.5))
p95 = float(self._percentile(durations, 0.95))
p99 = float(self._percentile(durations, 0.99))
error_count = self._calculate_error_count(bucket_entries)

try:
metric = self._upsert_metric(
component=component,
operation_type=operation,
window_start=window_start,
window_end=window_start + window_delta,
request_count=count,
error_count=error_count,
error_rate=(error_count / count) if count else 0.0,
avg_duration_ms=avg_duration,
min_duration_ms=min_duration,
max_duration_ms=max_duration,
p50_duration_ms=p50,
p95_duration_ms=p95,
p99_duration_ms=p99,
metric_metadata={
"sample_size": count,
"generated_at": datetime.now(timezone.utc).isoformat(),
},
db=db,
)
if metric:
created_metrics.append(metric)
except Exception:
logger.exception("Failed to upsert metric for %s.%s window %s", component, operation, window_start)

if should_close:
db.commit()
return created_metrics
except Exception:
if should_close:
db.rollback()
raise
finally:
if should_close:
db.close()

def aggregate_all_components(self, window_start: Optional[datetime] = None, window_end: Optional[datetime] = None, db: Optional[Session] = None) -> List[PerformanceMetric]:
"""Aggregate metrics for all components and operations.

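The non-PostgreSQL fallback above computes p50/p95/p99 through self._percentile, which is outside this diff. Below is a minimal linear-interpolation sketch of such a helper (matching the interpolation used by percentile_cont in the SQL path); it is purely illustrative, and the gateway's real implementation may differ.

from typing import Sequence


def percentile(sorted_values: Sequence[float], q: float) -> float:
    """Percentile of an already-sorted sequence using linear interpolation.

    Mirrors the shape of the _percentile(durations, 0.95) calls in the fallback
    path; the gateway's actual helper is not shown in this diff and may differ.
    """
    if not sorted_values:
        return 0.0
    if len(sorted_values) == 1:
        return float(sorted_values[0])
    rank = q * (len(sorted_values) - 1)
    lower = int(rank)
    upper = min(lower + 1, len(sorted_values) - 1)
    fraction = rank - lower
    return float(sorted_values[lower] + (sorted_values[upper] - sorted_values[lower]) * fraction)


print(percentile([10, 20, 30, 40], 0.50))  # 25.0
print(percentile([10, 20, 30, 40], 0.95))  # 38.5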