Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions backend/app/api/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
import os
import traceback
import threading
from flask import request, jsonify, send_file
from flask import request, jsonify, send_file, current_app

from . import report_bp
from ..config import Config
from ..services.report_agent import ReportAgent, ReportManager, ReportStatus
from ..services.graph_tools import GraphToolsService
from ..services.simulation_manager import SimulationManager
from ..models.project import ProjectManager
from ..models.task import TaskManager, TaskStatus
Expand Down Expand Up @@ -109,6 +110,14 @@ def generate_report():
import uuid
report_id = f"report_{uuid.uuid4().hex[:12]}"

# Capture GraphStorage before background thread starts
storage = current_app.extensions.get('neo4j_storage')
if not storage:
return jsonify({
"success": False,
"error": "GraphStorage not initialized - check Neo4j connection"
}), 500

# Create async task
task_manager = TaskManager()
task_id = task_manager.create_task(
Expand All @@ -130,11 +139,13 @@ def run_generate():
message="Initializing Report Agent..."
)

# Create Report Agent
# Create Report Agent with injected graph tools
tools = GraphToolsService(storage=storage)
agent = ReportAgent(
graph_id=graph_id,
simulation_id=simulation_id,
simulation_requirement=simulation_requirement
simulation_requirement=simulation_requirement,
graph_tools=tools
)

# Progress callback
Expand Down
1 change: 1 addition & 0 deletions backend/app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class Config:
LLM_API_KEY = os.environ.get('LLM_API_KEY')
LLM_BASE_URL = os.environ.get('LLM_BASE_URL', 'http://localhost:11434/v1')
LLM_MODEL_NAME = os.environ.get('LLM_MODEL_NAME', 'qwen2.5:32b')
NER_MODEL = os.environ.get('NER_MODEL', 'minimax-m2.7:cloud')

# Neo4j configuration
NEO4J_URI = os.environ.get('NEO4J_URI', 'bolt://localhost:7687')
Expand Down
7 changes: 7 additions & 0 deletions backend/app/services/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
EventConfig,
PlatformConfig
)
from .platform_simulators import PlatformSimulator, TwitterSimulator, RedditSimulator
from .outcome_scorer import SimulationOutcome, OutcomeScorer
from .simulation_runner import (
SimulationRunner,
SimulationRunState,
Expand Down Expand Up @@ -55,6 +57,11 @@
'TimeSimulationConfig',
'EventConfig',
'PlatformConfig',
'PlatformSimulator',
'TwitterSimulator',
'RedditSimulator',
'SimulationOutcome',
'OutcomeScorer',
'SimulationRunner',
'SimulationRunState',
'RunnerStatus',
Expand Down
113 changes: 110 additions & 3 deletions backend/app/services/entity_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@
Replaces zep_entity_reader.py — all Zep Cloud calls replaced by GraphStorage.
"""

from difflib import SequenceMatcher
import json
from typing import Dict, Any, List, Optional, Set
from dataclasses import dataclass, field

from ..config import Config
from ..utils.logger import get_logger
from ..storage import GraphStorage

Expand Down Expand Up @@ -73,8 +76,15 @@ class EntityReader:
3. Get related edges and linked node information for each entity
"""

def __init__(self, storage: GraphStorage):
def __init__(
self,
storage: GraphStorage,
ner_model: Optional[str] = None,
deduplication_threshold: float = 0.85,
):
self.storage = storage
self.ner_model = ner_model or Config.NER_MODEL
self.deduplication_threshold = deduplication_threshold
Comment on lines +79 to +87
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Reject out-of-range similarity thresholds early.

An invalid deduplication_threshold silently changes dedup behavior: values above 1 prevent merges, and values below 0 merge every same-type pair.

💡 Suggested fix
     def __init__(
         self,
         storage: GraphStorage,
         ner_model: Optional[str] = None,
         deduplication_threshold: float = 0.85,
     ):
         self.storage = storage
         self.ner_model = ner_model or Config.NER_MODEL
+        if not 0.0 <= deduplication_threshold <= 1.0:
+            raise ValueError("deduplication_threshold must be between 0.0 and 1.0")
         self.deduplication_threshold = deduplication_threshold
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/services/entity_reader.py` around lines 79 - 87, Validate
deduplication_threshold in EntityReader.__init__: check that the provided
deduplication_threshold is between 0.0 and 1.0 (inclusive or exclusive per
project convention) and raise a ValueError with a clear message if out of range;
update the __init__ signature in entity_reader.py to perform this early
validation (before assigning to self.deduplication_threshold) so invalid values
cannot silently alter dedup behavior.


def get_all_nodes(self, graph_id: str) -> List[Dict[str, Any]]:
"""
Expand Down Expand Up @@ -230,8 +240,20 @@ def filter_defined_entities(

filtered_entities.append(entity)

logger.info(f"Filter completed: total nodes {total_count}, matched {len(filtered_entities)}, "
f"entity types: {entity_types_found}")
filtered_entities = self._deduplicate_entities(
filtered_entities,
similarity_threshold=self.deduplication_threshold,
)
entity_types_found = {
entity.get_entity_type()
for entity in filtered_entities
if entity.get_entity_type()
}

logger.info(
f"Filter completed: total nodes {total_count}, matched {len(filtered_entities)}, "
f"entity types: {entity_types_found}"
)

return FilteredEntities(
entities=filtered_entities,
Expand Down Expand Up @@ -338,3 +360,88 @@ def get_entities_by_type(
enrich_with_edges=enrich_with_edges
)
return result.entities

def _deduplicate_entities(
self,
entities: List[EntityNode],
similarity_threshold: float = 0.85,
) -> List[EntityNode]:
"""
Collapse near-duplicate entities using fuzzy name matching.

Only entities with the same resolved entity type are considered duplicates.
The first entity encountered remains canonical and absorbs merged context.
"""
if len(entities) < 2:
return entities

deduplicated: List[EntityNode] = []

for entity in entities:
canonical_match = None
entity_type = entity.get_entity_type()
normalized_name = self._normalize_entity_name(entity.name)

for candidate in deduplicated:
candidate_type = candidate.get_entity_type()
if entity_type != candidate_type:
continue

candidate_name = self._normalize_entity_name(candidate.name)
similarity = SequenceMatcher(None, normalized_name, candidate_name).ratio()
if similarity >= similarity_threshold:
canonical_match = candidate
break

if canonical_match is None:
deduplicated.append(entity)
continue

self._merge_entity_into_canonical(canonical_match, entity)

return deduplicated

@staticmethod
def _normalize_entity_name(name: str) -> str:
return " ".join((name or "").lower().split())

def _merge_entity_into_canonical(self, canonical: EntityNode, duplicate: EntityNode) -> None:
"""Merge duplicate entity details into the canonical entity in place."""
canonical.labels = sorted(set(canonical.labels) | set(duplicate.labels))
Comment on lines +408 to +410
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Preserve the canonical label order here.

get_entity_type() is order-dependent, so sorting the merged labels can change an entity from its original filtered type to some other label mid-dedup. That leaks into the recomputed entity_types_found on Lines 247-250 and can also stop later same-type matches from merging into the same canonical record.

💡 Suggested fix
-        canonical.labels = sorted(set(canonical.labels) | set(duplicate.labels))
+        canonical.labels = list(dict.fromkeys(canonical.labels + duplicate.labels))
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def _merge_entity_into_canonical(self, canonical: EntityNode, duplicate: EntityNode) -> None:
"""Merge duplicate entity details into the canonical entity in place."""
canonical.labels = sorted(set(canonical.labels) | set(duplicate.labels))
def _merge_entity_into_canonical(self, canonical: EntityNode, duplicate: EntityNode) -> None:
"""Merge duplicate entity details into the canonical entity in place."""
canonical.labels = list(dict.fromkeys(canonical.labels + duplicate.labels))
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/services/entity_reader.py` around lines 408 - 410, In
_merge_entity_into_canonical, avoid sorting merged labels because
get_entity_type is order-dependent; instead preserve canonical.labels order and
append any labels from duplicate.labels that are not already present (dedupe
while keeping canonical order), so update canonical.labels to canonical.labels +
[l for l in duplicate.labels if l not in canonical.labels]; this ensures
get_entity_type and downstream entity_types_found logic remain stable and later
same-type merges still match the canonical record.


if not canonical.summary and duplicate.summary:
canonical.summary = duplicate.summary
elif duplicate.summary and len(duplicate.summary) > len(canonical.summary):
canonical.summary = duplicate.summary

merged_attributes = dict(canonical.attributes)
for key, value in duplicate.attributes.items():
if key not in merged_attributes or not merged_attributes[key]:
merged_attributes[key] = value
canonical.attributes = merged_attributes
Comment on lines +417 to +421
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Don't treat falsy attribute values as missing.

not merged_attributes[key] overwrites valid canonical values like 0 and False during deduplication, which silently corrupts merged facts.

💡 Suggested fix
         merged_attributes = dict(canonical.attributes)
         for key, value in duplicate.attributes.items():
-            if key not in merged_attributes or not merged_attributes[key]:
+            current_value = merged_attributes.get(key)
+            if key not in merged_attributes or current_value is None or current_value == "":
                 merged_attributes[key] = value
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/services/entity_reader.py` around lines 417 - 421, The merge loop
treats falsy canonical values as missing; update the condition in the block that
assigns merged_attributes from duplicate.attributes (where canonical and
duplicate are used) to only treat truly absent/undefined values as missing
(e.g., check for key not in merged_attributes or merged_attributes[key] is None)
instead of using a truthiness test like not merged_attributes[key], so valid
values like 0 or False are preserved.


canonical.related_edges = self._merge_unique_dicts(
canonical.related_edges,
duplicate.related_edges,
)
canonical.related_nodes = self._merge_unique_dicts(
canonical.related_nodes,
duplicate.related_nodes,
)

@staticmethod
def _merge_unique_dicts(
existing_items: List[Dict[str, Any]],
incoming_items: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
seen = set()
merged_items: List[Dict[str, Any]] = []

for item in existing_items + incoming_items:
marker = json.dumps(item, sort_keys=True, ensure_ascii=False)
if marker in seen:
continue
seen.add(marker)
merged_items.append(item)

return merged_items
142 changes: 142 additions & 0 deletions backend/app/services/outcome_scorer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
"""
Simulation outcome scoring primitives.
"""

from collections import Counter, defaultdict
from dataclasses import dataclass, field
import math
from typing import Any, Dict, List, Tuple


@dataclass
class SimulationOutcome:
    """Core quantitative metrics describing a simulation run."""

    # Number of post records produced during the run.
    total_posts: int = 0
    # Sum of per-post engagement counts (likes/upvotes/comments/shares etc.).
    total_engagement: int = 0
    # Mean reach per post; the scorer falls back to impressions, then to
    # engagement, when no explicit reach is recorded.
    average_post_reach: float = 0.0
    # Count of posts whose engagement met or exceeded the viral threshold.
    viral_posts_count: int = 0
    # Population standard deviation of per-post sentiment scores
    # (0.0 when fewer than two posts carry a sentiment value).
    sentiment_standard_deviation: float = 0.0
    # Lowercased platform name -> fraction of posts on that platform.
    platform_distribution: Dict[str, float] = field(default_factory=dict)
    # (author name, accumulated engagement) pairs, highest engagement first.
    top_influencers: List[Tuple[str, float]] = field(default_factory=list)
    # Simulation round with the most posts (smallest round wins ties).
    peak_activity_round: int = 0


class OutcomeScorer:
"""Compute additive core metrics from simulation post-like records."""

def score_simulation(
    self,
    posts: List[Any],
    viral_threshold: int = 1000,
    influencer_limit: int = 5,
) -> SimulationOutcome:
    """Aggregate *posts* into a SimulationOutcome.

    Args:
        posts: Post-like records — dicts or objects with attributes.
        viral_threshold: Minimum engagement for a post to count as viral.
        influencer_limit: Maximum number of top influencers to report.

    Returns:
        A populated SimulationOutcome for the run.
    """
    viral_count = 0
    for post in posts:
        if self._engagement_value(post) >= viral_threshold:
            viral_count += 1

    return SimulationOutcome(
        total_posts=len(posts),
        total_engagement=self._sum_engagement(posts),
        average_post_reach=self._compute_average_reach(posts),
        viral_posts_count=viral_count,
        sentiment_standard_deviation=self._compute_sentiment_standard_deviation(posts),
        platform_distribution=self._compute_platform_distribution(posts),
        top_influencers=self._find_top_influencers(posts, influencer_limit=influencer_limit),
        peak_activity_round=self._find_peak_activity_round(posts),
    )

@staticmethod
def _value(source: Any, key: str, default: Any = 0) -> Any:
if isinstance(source, dict):
return source.get(key, default)
return getattr(source, key, default)

def _engagement_value(self, post: Any) -> int:
direct_value = self._value(post, "engagement", None)
if direct_value is not None:
return int(direct_value)

components = (
self._value(post, "likes", 0),
self._value(post, "upvotes", 0),
self._value(post, "comments", 0),
self._value(post, "replies", 0),
self._value(post, "shares", 0),
self._value(post, "retweets", 0),
self._value(post, "reposts", 0),
)
return int(sum(int(component or 0) for component in components))
Comment on lines +62 to +71
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Avoid double-counting alias metrics in engagement totals.

backend/app/services/platform_simulators.py lines 47-48 and 77-80 treat retweets/reposts and comments/replies as alternate names for the same interaction. Summing both here will inflate total_engagement, viral_posts_count, average_post_reach when it falls back to engagement, and top_influencers as soon as a post carries both aliases.

🛠️ Proposed fix
     def _engagement_value(self, post: Any) -> int:
         direct_value = self._value(post, "engagement", None)
         if direct_value is not None:
             return int(direct_value)

+        comment_count = max(
+            int(self._value(post, "comments", 0) or 0),
+            int(self._value(post, "replies", 0) or 0),
+        )
+        repost_count = max(
+            int(self._value(post, "retweets", 0) or 0),
+            int(self._value(post, "reposts", 0) or 0),
+        )
+
         components = (
             self._value(post, "likes", 0),
             self._value(post, "upvotes", 0),
-            self._value(post, "comments", 0),
-            self._value(post, "replies", 0),
+            comment_count,
             self._value(post, "shares", 0),
-            self._value(post, "retweets", 0),
-            self._value(post, "reposts", 0),
+            repost_count,
         )
         return int(sum(int(component or 0) for component in components))
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
components = (
self._value(post, "likes", 0),
self._value(post, "upvotes", 0),
self._value(post, "comments", 0),
self._value(post, "replies", 0),
self._value(post, "shares", 0),
self._value(post, "retweets", 0),
self._value(post, "reposts", 0),
)
return int(sum(int(component or 0) for component in components))
comment_count = max(
int(self._value(post, "comments", 0) or 0),
int(self._value(post, "replies", 0) or 0),
)
repost_count = max(
int(self._value(post, "retweets", 0) or 0),
int(self._value(post, "reposts", 0) or 0),
)
components = (
self._value(post, "likes", 0),
self._value(post, "upvotes", 0),
comment_count,
self._value(post, "shares", 0),
repost_count,
)
return int(sum(int(component or 0) for component in components))
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/services/outcome_scorer.py` around lines 62 - 71, The engagement
sum in outcome_scorer.py currently adds both alias fields (e.g., "retweets" and
"reposts", "comments" and "replies") causing double-counting; update the
aggregation logic around components/_value so aliases are deduplicated by
mapping aliases to a canonical metric (or taking the max of alias pairs) before
summing—e.g., treat "retweets"/"reposts" as one metric and "comments"/"replies"
as one metric, compute a single value for each canonical interaction, then
return the int(sum(...)) of those canonical values.


def _sum_engagement(self, posts: List[Any]) -> int:
return sum(self._engagement_value(post) for post in posts)

def _compute_average_reach(self, posts: List[Any]) -> float:
if not posts:
return 0.0

total_reach = 0.0
for post in posts:
reach = self._value(post, "reach", None)
if reach is None:
reach = self._value(post, "impressions", self._engagement_value(post))
total_reach += float(reach or 0.0)

return total_reach / len(posts)

def _compute_sentiment_standard_deviation(self, posts: List[Any]) -> float:
sentiments = [
float(self._value(post, "sentiment"))
for post in posts
if self._value(post, "sentiment", None) is not None
]
if len(sentiments) < 2:
return 0.0

mean = sum(sentiments) / len(sentiments)
variance = sum((value - mean) ** 2 for value in sentiments) / len(sentiments)
return math.sqrt(variance)

def _compute_platform_distribution(self, posts: List[Any]) -> Dict[str, float]:
if not posts:
return {}

platform_counts = Counter(
str(self._value(post, "platform", "unknown")).lower()
for post in posts
)
total = sum(platform_counts.values())
return {
platform: count / total
for platform, count in platform_counts.items()
}

def _find_top_influencers(
self,
posts: List[Any],
influencer_limit: int = 5,
) -> List[Tuple[str, float]]:
influence_scores: Dict[str, float] = defaultdict(float)

for post in posts:
author_name = self._value(post, "author_name", None)
if author_name is None:
author_name = self._value(post, "agent_name", self._value(post, "author_id", "unknown"))
influence_scores[str(author_name)] += float(self._engagement_value(post))

ranked = sorted(
influence_scores.items(),
key=lambda item: item[1],
reverse=True,
)
return ranked[:influencer_limit]

def _find_peak_activity_round(self, posts: List[Any]) -> int:
if not posts:
return 0

rounds = Counter(int(self._value(post, "created_round", 0) or 0) for post in posts)
peak_round, _ = max(rounds.items(), key=lambda item: (item[1], -item[0]))
return peak_round
Loading