Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 100 additions & 27 deletions backend/council.py
Original file line number Diff line number Diff line change
@@ -1,41 +1,51 @@
"""3-stage LLM Council orchestration."""

from typing import List, Dict, Any, Tuple
from .openrouter import query_models_parallel, query_model
from .openrouter import query_models_parallel, query_model, ModelQueryError, is_error
from .config import COUNCIL_MODELS, CHAIRMAN_MODEL


async def stage1_collect_responses(user_query: str) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """
    Stage 1: Collect individual responses from all council models.

    Args:
        user_query: The user's question

    Returns:
        Tuple of (successful responses list, errors list)
    """
    messages = [{"role": "user", "content": user_query}]

    # Query every council model concurrently.
    responses = await query_models_parallel(COUNCIL_MODELS, messages)

    # Split raw responses into successes and structured error records.
    stage1_results: List[Dict[str, Any]] = []
    stage1_errors: List[Dict[str, Any]] = []
    for model, response in responses.items():
        if not is_error(response):
            stage1_results.append({
                "model": model,
                "response": response.get('content', '')
            })
        elif isinstance(response, ModelQueryError):
            stage1_errors.append(response.to_dict())
        else:
            # is_error() also treats None as a failure; record a generic entry.
            stage1_errors.append({
                'error_type': 'unknown',
                'message': 'Unknown error occurred',
                'model': model
            })

    return stage1_results, stage1_errors


async def stage2_collect_rankings(
user_query: str,
stage1_results: List[Dict[str, Any]]
) -> Tuple[List[Dict[str, Any]], Dict[str, str]]:
) -> Tuple[List[Dict[str, Any]], Dict[str, str], List[Dict[str, Any]]]:
"""
Stage 2: Each model ranks the anonymized responses.

Expand All @@ -44,7 +54,7 @@ async def stage2_collect_rankings(
stage1_results: Results from Stage 1

Returns:
Tuple of (rankings list, label_to_model mapping)
Tuple of (rankings list, label_to_model mapping, errors list)
"""
# Create anonymized labels for responses (Response A, Response B, etc.)
labels = [chr(65 + i) for i in range(len(stage1_results))] # A, B, C, ...
Expand Down Expand Up @@ -97,10 +107,20 @@ async def stage2_collect_rankings(
# Get rankings from all council models in parallel
responses = await query_models_parallel(COUNCIL_MODELS, messages)

# Format results
# Format results, separating successes from errors
stage2_results = []
stage2_errors = []
for model, response in responses.items():
if response is not None:
if is_error(response):
if isinstance(response, ModelQueryError):
stage2_errors.append(response.to_dict())
else:
stage2_errors.append({
'error_type': 'unknown',
'message': 'Unknown error occurred',
'model': model
})
else:
full_text = response.get('content', '')
parsed = parse_ranking_from_text(full_text)
stage2_results.append({
Expand All @@ -109,14 +129,14 @@ async def stage2_collect_rankings(
"parsed_ranking": parsed
})

return stage2_results, label_to_model
return stage2_results, label_to_model, stage2_errors


async def stage3_synthesize_final(
user_query: str,
stage1_results: List[Dict[str, Any]],
stage2_results: List[Dict[str, Any]]
) -> Dict[str, Any]:
) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
"""
Stage 3: Chairman synthesizes final response.

Expand All @@ -126,7 +146,7 @@ async def stage3_synthesize_final(
stage2_results: Rankings from Stage 2

Returns:
Dict with 'model' and 'response' keys
Tuple of (result dict with 'model' and 'response' keys, errors list)
"""
# Build comprehensive context for chairman
stage1_text = "\n\n".join([
Expand Down Expand Up @@ -161,17 +181,31 @@ async def stage3_synthesize_final(
# Query the chairman model
response = await query_model(CHAIRMAN_MODEL, messages)

if response is None:
# Fallback if chairman fails
return {
"model": CHAIRMAN_MODEL,
"response": "Error: Unable to generate final synthesis."
}
stage3_errors = []
if is_error(response):
if isinstance(response, ModelQueryError):
error_info = response.to_dict()
stage3_errors.append(error_info)
return {
"model": CHAIRMAN_MODEL,
"response": f"Error: {error_info['message']}",
"error": error_info
}, stage3_errors
else:
stage3_errors.append({
'error_type': 'unknown',
'message': 'Unknown error occurred',
'model': CHAIRMAN_MODEL
})
return {
"model": CHAIRMAN_MODEL,
"response": "Error: Unable to generate final synthesis."
}, stage3_errors

return {
"model": CHAIRMAN_MODEL,
"response": response.get('content', '')
}
}, stage3_errors


def parse_ranking_from_text(ranking_text: str) -> List[str]:
async def run_full_council(user_query: str) -> Tuple[List, List, Dict, Dict]:
    """
    Run the full 3-stage council pipeline for a user query.

    Args:
        user_query: The user's question

    Returns:
        Tuple of (stage1_results, stage2_results, stage3_result, metadata)
        metadata includes 'errors' list with any failures from all stages
    """
    all_errors: List[Dict] = []

    # Stage 1: Collect individual responses
    stage1_results, stage1_errors = await stage1_collect_responses(user_query)
    all_errors.extend(stage1_errors)

    # If no models responded successfully, return error with details
    if not stage1_results:
        error_summary = _summarize_errors(stage1_errors)
        failure = {
            "model": "error",
            "response": f"All models failed to respond. {error_summary}"
        }
        return [], [], failure, {"errors": all_errors}

    # Stage 2: Collect rankings
    stage2_results, label_to_model, stage2_errors = await stage2_collect_rankings(user_query, stage1_results)
    all_errors.extend(stage2_errors)

    # Calculate aggregate rankings
    aggregate_rankings = calculate_aggregate_rankings(stage2_results, label_to_model)

    # Stage 3: Synthesize final answer
    stage3_result, stage3_errors = await stage3_synthesize_final(
        user_query,
        stage1_results,
        stage2_results
    )
    all_errors.extend(stage3_errors)

    # NOTE(review): on the success path 'errors' is None when empty, but the
    # early-return path above always uses a list — confirm consumers accept both.
    metadata = {
        "label_to_model": label_to_model,
        "aggregate_rankings": aggregate_rankings,
        "errors": all_errors if all_errors else None
    }

    return stage1_results, stage2_results, stage3_result, metadata


def _summarize_errors(errors: List[Dict[str, Any]]) -> str:
"""Create a human-readable summary of errors."""
if not errors:
return "Please try again."

# Group by error type
by_type = {}
for error in errors:
error_type = error.get('error_type', 'unknown')
if error_type not in by_type:
by_type[error_type] = []
by_type[error_type].append(error)

summaries = []
if 'auth' in by_type:
summaries.append("API key issue - please check your OPENROUTER_API_KEY")
if 'payment' in by_type:
summaries.append("Payment required - please add credits to OpenRouter")
if 'rate_limit' in by_type:
summaries.append(f"{len(by_type['rate_limit'])} model(s) rate limited")
if 'not_found' in by_type:
models = [e.get('model', 'unknown') for e in by_type['not_found']]
summaries.append(f"Model(s) not found: {', '.join(models)}")
if 'timeout' in by_type:
summaries.append(f"{len(by_type['timeout'])} model(s) timed out")
if 'server' in by_type:
summaries.append("OpenRouter server error")

return "; ".join(summaries) if summaries else "Please try again."
91 changes: 85 additions & 6 deletions backend/openrouter.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,33 @@
"""OpenRouter API client for making LLM requests."""

import httpx
from typing import List, Dict, Any, Optional
from typing import List, Dict, Any, Optional, Union
from dataclasses import dataclass
from .config import OPENROUTER_API_KEY, OPENROUTER_API_URL


@dataclass
class ModelQueryError:
    """Structured error information from a failed model query."""
    # One of: 'auth', 'rate_limit', 'not_found', 'payment', 'server', 'timeout', 'unknown'
    error_type: str
    # Human-readable description of the failure.
    message: str
    # HTTP status code, when the failure came from an HTTP response.
    status_code: Optional[int] = None
    # Identifier of the model whose query failed, when known.
    model: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Serialize this error to a plain dict (e.g. for JSON payloads)."""
        return dict(
            error_type=self.error_type,
            message=self.message,
            status_code=self.status_code,
            model=self.model,
        )


async def query_model(
model: str,
messages: List[Dict[str, str]],
timeout: float = 120.0
) -> Optional[Dict[str, Any]]:
) -> Union[Dict[str, Any], ModelQueryError]:
"""
Query a single model via OpenRouter API.

Expand All @@ -19,7 +37,8 @@ async def query_model(
timeout: Request timeout in seconds

Returns:
Response dict with 'content' and optional 'reasoning_details', or None if failed
Response dict with 'content' and optional 'reasoning_details',
or ModelQueryError if the request failed
"""
headers = {
"Authorization": f"Bearer {OPENROUTER_API_KEY}",
Expand All @@ -38,6 +57,44 @@ async def query_model(
headers=headers,
json=payload
)

# Handle specific HTTP error codes
if response.status_code == 401:
return ModelQueryError(
error_type='auth',
message='Invalid API key. Please check your OPENROUTER_API_KEY.',
status_code=401,
model=model
)
elif response.status_code == 402:
return ModelQueryError(
error_type='payment',
message='Payment required. Please add credits to your OpenRouter account.',
status_code=402,
model=model
)
elif response.status_code == 404:
return ModelQueryError(
error_type='not_found',
message=f'Model "{model}" not found on OpenRouter.',
status_code=404,
model=model
)
elif response.status_code == 429:
return ModelQueryError(
error_type='rate_limit',
message='Rate limit exceeded. Please wait before retrying.',
status_code=429,
model=model
)
elif response.status_code >= 500:
return ModelQueryError(
error_type='server',
message=f'OpenRouter server error (HTTP {response.status_code}). Please try again.',
status_code=response.status_code,
model=model
)

response.raise_for_status()

data = response.json()
Expand All @@ -48,15 +105,32 @@ async def query_model(
'reasoning_details': message.get('reasoning_details')
}

except httpx.TimeoutException:
return ModelQueryError(
error_type='timeout',
message=f'Request timed out after {timeout}s.',
model=model
)
except httpx.HTTPStatusError as e:
return ModelQueryError(
error_type='unknown',
message=f'HTTP error: {e}',
status_code=e.response.status_code if e.response else None,
model=model
)
except Exception as e:
print(f"Error querying model {model}: {e}")
return None
return ModelQueryError(
error_type='unknown',
message=str(e),
model=model
)


async def query_models_parallel(
models: List[str],
messages: List[Dict[str, str]]
) -> Dict[str, Optional[Dict[str, Any]]]:
) -> Dict[str, Union[Dict[str, Any], ModelQueryError]]:
"""
Query multiple models in parallel.

Expand All @@ -65,7 +139,7 @@ async def query_models_parallel(
messages: List of message dicts to send to each model

Returns:
Dict mapping model identifier to response dict (or None if failed)
Dict mapping model identifier to response dict or ModelQueryError
"""
import asyncio

Expand All @@ -77,3 +151,8 @@ async def query_models_parallel(

# Map models to their responses
return {model: response for model, response in zip(models, responses)}


def is_error(response: Union[Dict[str, Any], ModelQueryError, None]) -> bool:
    """Return True when *response* represents a failed query (error object or None)."""
    if response is None:
        return True
    return isinstance(response, ModelQueryError)