Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 65 additions & 39 deletions ai_council/arbitration/layer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
"""Implementation of the ArbitrationLayer for conflict resolution between agent responses."""

from http.client import responses
from difflib import SequenceMatcher


Expand Down Expand Up @@ -63,7 +62,6 @@ def _calculate_similarity(self, responses):
return sum(scores) / len(scores) if scores else 1.0

async def arbitrate(self, responses: List[AgentResponse]) -> ArbitrationResult:
print("DEBUG: Responses received:", responses)
"""
Arbitrate between multiple agent responses to resolve conflicts.

Expand Down Expand Up @@ -231,13 +229,65 @@ async def resolve_contradiction(self, conflict: Conflict, responses: Optional[Li
elif conflict.conflict_type == "quality_conflict":
return await self._resolve_quality_conflict(conflict, responses)
else:
# Default resolution: choose first response with warning
# Unknown conflict type: still attempt score-based fallback when possible
logger.warning("Unknown conflict type", extra={"conflict_type": conflict.conflict_type})
best_resp = self._select_best_response_for_conflict(conflict, responses)
chosen_response_id = best_resp.subtask_id + "_" + best_resp.model_used if best_resp else conflict.response_ids[0]
return Resolution(
chosen_response_id=conflict.response_ids[0],
reasoning=f"Unknown conflict type '{conflict.conflict_type}', defaulted to first response",
confidence=0.5
chosen_response_id=chosen_response_id,
reasoning=(
f"Unknown conflict type '{conflict.conflict_type}', "
"selected best available response by composite quality score"
if best_resp
else f"Unknown conflict type '{conflict.conflict_type}', defaulted to first response"
),
confidence=0.55 if best_resp else 0.5
)

def _select_best_response_for_conflict(
self,
conflict: Conflict,
responses: Optional[List[AgentResponse]] = None,
) -> Optional[AgentResponse]:
"""Select best response for a conflict using confidence + quality scoring.

Selection order:
1. Responses matching conflict IDs exactly (`subtask_model` format)
2. Best-effort matching by model ID token from conflict IDs
3. Fallback to all successful responses
"""
if not responses:
return None

successful = [r for r in responses if r.success]
if not successful:
return None

response_map = {
f"{r.subtask_id}_{r.model_used}": r
for r in successful
}

# 1) Exact conflict-id matches
candidates = [response_map[rid] for rid in conflict.response_ids if rid in response_map]

# 2) Best-effort model-id matching when IDs are malformed/partial
if not candidates:
model_tokens = {rid.split("_", 1)[-1] for rid in conflict.response_ids if "_" in rid}
if model_tokens:
candidates = [r for r in successful if r.model_used in model_tokens]

# 3) Final fallback to all successful responses
if not candidates:
candidates = successful

return max(
candidates,
key=lambda r: (
self._calculate_quality_score(r),
r.self_assessment.confidence_score if r.self_assessment else 0.0,
),
)
Comment on lines +247 to +290
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify the current selector is quality-first, not confidence-first.

sed -n '247,290p' ai_council/arbitration/layer.py

python - <<'PY'
def quality(conf, risk_score, content_len, assumptions):
    return conf * 0.4 + risk_score * 0.3 + min(content_len / 1000.0, 1.0) * 0.2 + max(0, 1.0 - assumptions * 0.1) * 0.1

# Higher confidence but worse non-confidence quality signals.
a = ("high_conf", quality(0.95, 0.4, 5, 5), 0.95)
# Lower confidence but much better non-confidence quality signals.
b = ("low_conf", quality(0.50, 1.0, 1000, 0), 0.50)

print("current_key_winner =", max([a, b], key=lambda x: (x[1], x[2]))[0])
print("confidence_only_winner =", max([a, b], key=lambda x: x[2])[0])
PY

Repository: shrixtacy/Ai-Council

Length of output: 1694


🏁 Script executed:

# Find all usages of _select_best_response_for_conflict
rg "_select_best_response_for_conflict" ai_council/arbitration/layer.py -A 2 -B 2

Repository: shrixtacy/Ai-Council

Length of output: 1668


🏁 Script executed:

# Look for the _resolve_confidence_conflict method and its implementation
rg "_resolve_confidence_conflict" ai_council/arbitration/layer.py -A 10 -B 2

Repository: shrixtacy/Ai-Council

Length of output: 1548


🏁 Script executed:

# Get context around line 421-434 to see the claimed call site
sed -n '410,445p' ai_council/arbitration/layer.py

Repository: shrixtacy/Ai-Council

Length of output: 1879


🏁 Script executed:

# Find _calculate_quality_score implementation
rg "_calculate_quality_score" ai_council/arbitration/layer.py -A 15 | head -40

Repository: shrixtacy/Ai-Council

Length of output: 1734


🏁 Script executed:

# Get full _calculate_quality_score implementation
rg "_calculate_quality_score" ai_council/arbitration/layer.py -A 20 | grep -A 20 "def _calculate_quality_score"

Repository: shrixtacy/Ai-Council

Length of output: 1145


Fix the ranking in _resolve_confidence_conflict to prioritize confidence over quality.

The helper _select_best_response_for_conflict ranks responses with (quality_score, confidence_score), making quality the primary decision axis. This causes _resolve_confidence_conflict to select a lower-confidence response if it has better risk/length/assumption metrics, contradicting its documented behavior of "choosing the most confident response". The returned reasoning ("selecting response with highest confidence score") is also inaccurate.

Example: A response with 50% confidence but better risk/length scores beats one with 95% confidence but worse non-confidence metrics.

Update the helper to accept a ranking parameter (or create a separate confidence-first variant) so _resolve_confidence_conflict can use confidence_score as the primary key instead of quality_score.

Example fix
 def _select_best_response_for_conflict(
     self,
     conflict: Conflict,
     responses: Optional[List[AgentResponse]] = None,
+    ranking_key: str = "quality",
 ) -> Optional[AgentResponse]:
     # ... candidate selection logic ...
-    return max(
-        candidates,
-        key=lambda r: (
-            self._calculate_quality_score(r),
-            r.self_assessment.confidence_score if r.self_assessment else 0.0,
-        ),
-    )
+    if ranking_key == "confidence":
+        return max(
+            candidates,
+            key=lambda r: r.self_assessment.confidence_score if r.self_assessment else 0.0,
+        )
+    
+    return max(
+        candidates,
+        key=lambda r: (
+            self._calculate_quality_score(r),
+            r.self_assessment.confidence_score if r.self_assessment else 0.0,
+        ),
+    )

Then call from _resolve_confidence_conflict(...) with ranking_key="confidence".

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def _select_best_response_for_conflict(
self,
conflict: Conflict,
responses: Optional[List[AgentResponse]] = None,
) -> Optional[AgentResponse]:
"""Select best response for a conflict using confidence + quality scoring.
Selection order:
1. Responses matching conflict IDs exactly (`subtask_model` format)
2. Best-effort matching by model ID token from conflict IDs
3. Fallback to all successful responses
"""
if not responses:
return None
successful = [r for r in responses if r.success]
if not successful:
return None
response_map = {
f"{r.subtask_id}_{r.model_used}": r
for r in successful
}
# 1) Exact conflict-id matches
candidates = [response_map[rid] for rid in conflict.response_ids if rid in response_map]
# 2) Best-effort model-id matching when IDs are malformed/partial
if not candidates:
model_tokens = {rid.split("_", 1)[-1] for rid in conflict.response_ids if "_" in rid}
if model_tokens:
candidates = [r for r in successful if r.model_used in model_tokens]
# 3) Final fallback to all successful responses
if not candidates:
candidates = successful
return max(
candidates,
key=lambda r: (
self._calculate_quality_score(r),
r.self_assessment.confidence_score if r.self_assessment else 0.0,
),
)
def _select_best_response_for_conflict(
self,
conflict: Conflict,
responses: Optional[List[AgentResponse]] = None,
ranking_key: str = "quality",
) -> Optional[AgentResponse]:
"""Select best response for a conflict using confidence + quality scoring.
Selection order:
1. Responses matching conflict IDs exactly (`subtask_model` format)
2. Best-effort matching by model ID token from conflict IDs
3. Fallback to all successful responses
"""
if not responses:
return None
successful = [r for r in responses if r.success]
if not successful:
return None
response_map = {
f"{r.subtask_id}_{r.model_used}": r
for r in successful
}
# 1) Exact conflict-id matches
candidates = [response_map[rid] for rid in conflict.response_ids if rid in response_map]
# 2) Best-effort model-id matching when IDs are malformed/partial
if not candidates:
model_tokens = {rid.split("_", 1)[-1] for rid in conflict.response_ids if "_" in rid}
if model_tokens:
candidates = [r for r in successful if r.model_used in model_tokens]
# 3) Final fallback to all successful responses
if not candidates:
candidates = successful
if ranking_key == "confidence":
return max(
candidates,
key=lambda r: r.self_assessment.confidence_score if r.self_assessment else 0.0,
)
return max(
candidates,
key=lambda r: (
self._calculate_quality_score(r),
r.self_assessment.confidence_score if r.self_assessment else 0.0,
),
)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ai_council/arbitration/layer.py` around lines 247 - 290, The helper
_select_best_response_for_conflict currently ranks by (quality_score,
confidence_score) which makes quality primary; change it to accept a new
optional parameter (e.g., ranking="quality"|"confidence") and when
ranking=="confidence" use max(..., key=lambda r:
(r.self_assessment.confidence_score if r.self_assessment else 0.0,
self._calculate_quality_score(r))) so confidence becomes the primary key; update
_resolve_confidence_conflict to call
_select_best_response_for_conflict(conflict, responses, ranking="confidence")
(or use a separate _select_best_response_by_confidence wrapper) and keep
existing behavior for other callers, preserving use of Conflict, AgentResponse,
_calculate_quality_score and self_assessment.confidence_score.


def _group_responses_by_subtask(self, responses: List[AgentResponse]) -> Dict[str, List[AgentResponse]]:
"""Group responses by their subtask ID."""
Expand Down Expand Up @@ -354,22 +404,14 @@ def _detect_quality_conflicts(self, responses: List[AgentResponse]) -> List[Conf

async def _resolve_content_contradiction(self, conflict: Conflict, responses: Optional[List[AgentResponse]] = None) -> Resolution:
"""Resolve content contradictions by choosing the most reliable response."""
if not responses:
best_resp = self._select_best_response_for_conflict(conflict, responses)
if not best_resp:
return Resolution(
chosen_response_id=conflict.response_ids[0],
reasoning="Resolved content contradiction by default (no responses context)",
confidence=0.7
)

conflict_responses = [r for r in responses if r.subtask_id + "_" + r.model_used in conflict.response_ids]
if not conflict_responses:
return Resolution(
chosen_response_id=conflict.response_ids[0],
reasoning="Resolved content contradiction by default (responses not found)",
confidence=0.7
)

best_resp = max(conflict_responses, key=lambda r: self._calculate_quality_score(r))

return Resolution(
chosen_response_id=best_resp.subtask_id + "_" + best_resp.model_used,
reasoning="Resolved content contradiction by selecting response with highest composite score",
Expand All @@ -378,22 +420,14 @@ async def _resolve_content_contradiction(self, conflict: Conflict, responses: Op

async def _resolve_confidence_conflict(self, conflict: Conflict, responses: Optional[List[AgentResponse]] = None) -> Resolution:
"""Resolve confidence conflicts by choosing the most confident response."""
if not responses:
best_resp = self._select_best_response_for_conflict(conflict, responses)
if not best_resp:
return Resolution(
chosen_response_id=conflict.response_ids[0],
reasoning="Resolved confidence conflict by default (no responses context)",
confidence=0.8
)

conflict_responses = [r for r in responses if r.subtask_id + "_" + r.model_used in conflict.response_ids]
if not conflict_responses:
return Resolution(
chosen_response_id=conflict.response_ids[0],
reasoning="Resolved confidence conflict by default (responses not found)",
confidence=0.8
)

best_resp = max(conflict_responses, key=lambda r: r.self_assessment.confidence_score if getattr(r, 'self_assessment', None) and getattr(r.self_assessment, 'confidence_score', None) is not None else 0.0)

return Resolution(
chosen_response_id=best_resp.subtask_id + "_" + best_resp.model_used,
reasoning="Resolved confidence conflict by selecting response with highest confidence score",
Expand All @@ -402,22 +436,14 @@ async def _resolve_confidence_conflict(self, conflict: Conflict, responses: Opti

async def _resolve_quality_conflict(self, conflict: Conflict, responses: Optional[List[AgentResponse]] = None) -> Resolution:
"""Resolve quality conflicts by choosing the highest quality response."""
if not responses:
best_resp = self._select_best_response_for_conflict(conflict, responses)
if not best_resp:
return Resolution(
chosen_response_id=conflict.response_ids[0],
reasoning="Resolved quality conflict by default (no responses context)",
confidence=0.75
)

conflict_responses = [r for r in responses if r.subtask_id + "_" + r.model_used in conflict.response_ids]
if not conflict_responses:
return Resolution(
chosen_response_id=conflict.response_ids[0],
reasoning="Resolved quality conflict by default (responses not found)",
confidence=0.75
)

best_resp = max(conflict_responses, key=lambda r: self._calculate_quality_score(r))

return Resolution(
chosen_response_id=best_resp.subtask_id + "_" + best_resp.model_used,
reasoning="Resolved quality conflict by selecting response with highest quality score",
Expand Down
95 changes: 95 additions & 0 deletions tests/test_arbitration_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,101 @@ async def test_resolve_contradiction_types(arbitration_layer):
assert res4.chosen_response_id == "a"
assert "Unknown" in res4.reasoning


@pytest.mark.asyncio
async def test_resolve_confidence_conflict_prefers_highest_confidence_not_first(arbitration_layer, sample_subtask):
    """Regression test for issue #174: should not always choose first response."""

    def _response(model, content, confidence, assumptions, risk):
        # Build a successful response whose self-assessment carries the
        # given confidence/assumption/risk profile.
        return AgentResponse(
            subtask_id=sample_subtask.id,
            model_used=model,
            content=content,
            success=True,
            self_assessment=SelfAssessment(
                confidence_score=confidence,
                assumptions=assumptions,
                risk_level=risk,
                estimated_cost=0.01,
                token_usage=100,
                execution_time=0.2,
                model_used=model,
            ),
        )

    low_confidence = _response("model-a", "Low confidence answer", 0.51, ["weak assumption"], RiskLevel.MEDIUM)
    high_confidence = _response("model-b", "High confidence answer", 0.92, [], RiskLevel.LOW)

    conflict = Conflict(
        response_ids=[
            f"{sample_subtask.id}_model-a",
            f"{sample_subtask.id}_model-b",
        ],
        conflict_type="confidence_conflict",
        description="confidence gap",
    )

    resolution = await arbitration_layer.resolve_contradiction(
        conflict, [low_confidence, high_confidence]
    )
    # The higher-confidence response (model-b) must win, not the first one.
    assert resolution.chosen_response_id == f"{sample_subtask.id}_model-b"


@pytest.mark.asyncio
async def test_resolve_unknown_conflict_with_context_uses_best_score(arbitration_layer, sample_subtask):
    """Unknown conflict types should still use score-based fallback when responses are provided."""

    def _response(model, content, confidence, assumptions, risk, tokens):
        # Build a successful response with the given quality profile.
        return AgentResponse(
            subtask_id=sample_subtask.id,
            model_used=model,
            content=content,
            success=True,
            self_assessment=SelfAssessment(
                confidence_score=confidence,
                assumptions=assumptions,
                risk_level=risk,
                estimated_cost=0.01,
                token_usage=tokens,
                execution_time=0.2,
                model_used=model,
            ),
        )

    weaker = _response("model-a", "Weak answer", 0.55, ["a", "b"], RiskLevel.HIGH, 50)
    stronger = _response(
        "model-b",
        "Much stronger answer with more complete details" * 3,
        0.9,
        [],
        RiskLevel.LOW,
        80,
    )

    conflict = Conflict(
        response_ids=[
            f"{sample_subtask.id}_model-a",
            f"{sample_subtask.id}_model-b",
        ],
        conflict_type="unknown",
        description="unknown conflict type",
    )

    resolution = await arbitration_layer.resolve_contradiction(conflict, [weaker, stronger])
    # The higher-scoring response must be chosen and the reasoning must
    # reflect the score-based fallback path, not the first-response default.
    assert resolution.chosen_response_id == f"{sample_subtask.id}_model-b"
    assert "selected best available response" in resolution.reasoning

def test_risk_level_to_score(arbitration_layer):
assert arbitration_layer._risk_level_to_score(RiskLevel.LOW) == 1.0
assert arbitration_layer._risk_level_to_score(RiskLevel.CRITICAL) == 0.1
Expand Down
Loading