Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,12 @@ make it heavy/
└── task_done_tool.py # Task completion
```

## ⏱ Benchmarking

Run `benchmark.py` to compare how long multi-agent workflows take when using
Tygent's asynchronous scheduler versus the standard thread pool fallback.
The script prints a simple table of execution times for several example prompts.

## 🤝 Contributing

1. Fork the repository
Expand Down
15 changes: 15 additions & 0 deletions TYGENT_INTEGRATION.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Tygent Integration

This project uses [Tygent](https://github.com/OpenPipe/tygent) to schedule agents asynchronously and run their tasks in a DAG. The integration occurs in two places:

1. **Orchestrator (`orchestrator.py`)**
- If `use_tygent` is enabled when creating `TaskOrchestrator`, the orchestrator uses `MultiAgentManager` to run multiple agents concurrently. Each agent is wrapped in a tiny class with an `execute` coroutine that calls `run_agent_async`.
- When Tygent execution fails or is disabled, the orchestrator falls back to a thread pool so functionality does not break.
- Progress updates and aggregation work the same regardless of the execution strategy.

2. **Agent (`agent.py`)**
- Each `OpenRouterAgent` exposes a `run_async` method that builds a Tygent `DAG` representing the agent plan.
- The DAG issues web search nodes in parallel and then summarizes the results with the LLM using Tygent's `Scheduler`.
- This allows individual agents to benefit from asynchronous execution while still supporting the synchronous `run` method used by the thread‑pool fallback.

The `benchmark.py` script demonstrates the performance difference by running example prompts with Tygent enabled and disabled. It shows how asynchronous execution can speed up workflows that rely on parallel web search.
95 changes: 92 additions & 3 deletions agent.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
import json
import yaml
from openai import OpenAI
import asyncio
from typing import Dict, Any, List
from openai import OpenAI, AsyncOpenAI
from tools import discover_tools
from tygent.dag import DAG
from tygent.nodes import Node
from tygent.scheduler import Scheduler

class OpenRouterAgent:
def __init__(self, config_path="config.yaml", silent=False):
Expand All @@ -12,11 +17,15 @@ def __init__(self, config_path="config.yaml", silent=False):
# Silent mode for orchestrator (suppresses debug output)
self.silent = silent

# Initialize OpenAI client with OpenRouter
# Initialize OpenAI clients (sync and async) with OpenRouter
self.client = OpenAI(
base_url=self.config['openrouter']['base_url'],
api_key=self.config['openrouter']['api_key']
)
self.async_client = AsyncOpenAI(
base_url=self.config['openrouter']['base_url'],
api_key=self.config['openrouter']['api_key']
)

# Discover tools dynamically
self.discovered_tools = discover_tools(self.config, silent=self.silent)
Expand All @@ -39,6 +48,27 @@ def call_llm(self, messages):
return response
except Exception as e:
raise Exception(f"LLM call failed: {str(e)}")

async def generate_search_queries(self, user_input: str) -> List[str]:
"""Use the async client to generate web search queries for a task."""
prompt = (
"You are a planning assistant. Generate 3 concise web search queries "
"that would help answer the following question. Return ONLY a JSON "
"array of strings.\nQuestion: " + user_input
)
messages = [
{"role": "system", "content": "You generate search queries."},
{"role": "user", "content": prompt},
]
try:
resp = await self.async_client.chat.completions.create(
model=self.config["openrouter"]["model"],
messages=messages,
)
content = resp.choices[0].message.content
return json.loads(content)
except Exception:
return [user_input]

def handle_tool_call(self, tool_call):
"""Handle a tool call and return the result message"""
Expand Down Expand Up @@ -140,4 +170,63 @@ def run(self, user_input: str):
# Continue the loop regardless of whether there were tool calls or not

# If max iterations reached, return whatever content we gathered
return "\n\n".join(full_response_content) if full_response_content else "Maximum iterations reached. The agent may be stuck in a loop."
return "\n\n".join(full_response_content) if full_response_content else "Maximum iterations reached. The agent may be stuck in a loop."

async def run_async(self, user_input: str) -> str:
"""Run the agent using a Tygent DAG with real async execution."""

queries = await self.generate_search_queries(user_input)

dag = DAG("agent_plan")
search_nodes = []

class SearchNode(Node):
def __init__(self, agent: "OpenRouterAgent", name: str, query: str) -> None:
super().__init__(name)
self.agent = agent
self.query = query

async def execute(self, _inputs: Dict[str, Any]) -> list:
tool = self.agent.discovered_tools.get("search_web")
return await asyncio.to_thread(
tool.execute,
query=self.query,
max_results=self.agent.config["search"].get("max_results", 5),
)

for idx, q in enumerate(queries):
node_name = f"search_{idx+1}"
dag.add_node(SearchNode(self, node_name, q))
search_nodes.append(node_name)

class SummarizeNode(Node):
def __init__(self, agent: "OpenRouterAgent") -> None:
super().__init__("summarize")
self.agent = agent

async def execute(self, inputs: Dict[str, Any]) -> str:
combined = json.dumps(inputs)
messages = [
{"role": "system", "content": self.agent.config["system_prompt"]},
{
"role": "user",
"content": (
"Use these search results to answer the question: "
+ user_input
+ "\n" + combined
),
},
]
resp = await self.agent.async_client.chat.completions.create(
model=self.agent.config["openrouter"]["model"],
messages=messages,
)
return resp.choices[0].message.content

dag.add_node(SummarizeNode(self))
for name in search_nodes:
dag.add_edge(name, "summarize")

scheduler = Scheduler(dag)
result = await scheduler.execute({})
return result["results"]["summarize"]
42 changes: 42 additions & 0 deletions benchmark.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import time
from orchestrator import TaskOrchestrator

EXAMPLES = [
# Prompts chosen to require up-to-date information so that
# the asynchronous search DAG provides a real advantage.
"Research the impact of AI on renewable energy.",
"Find recent developments in clean energy policy worldwide.",
"Gather the latest statistics on global electric vehicle adoption."
]


def benchmark():
rows = []
for prompt in EXAMPLES:
orch_tygent = TaskOrchestrator(silent=True, use_tygent=True)
start = time.perf_counter()
try:
orch_tygent.orchestrate(prompt)
except Exception:
pass
tygent_time = time.perf_counter() - start

orch_thread = TaskOrchestrator(silent=True, use_tygent=False)
start = time.perf_counter()
try:
orch_thread.orchestrate(prompt)
except Exception:
pass
thread_time = time.perf_counter() - start

rows.append((prompt, tygent_time, thread_time))

print("Benchmark Results (seconds)")
print(f"{'Prompt':<40} {'Tygent':>10} {'Threads':>10}")
for prompt, t_time, thr_time in rows:
label = (prompt[:37] + '...') if len(prompt) > 40 else prompt
print(f"{label:<40} {t_time:>10.2f} {thr_time:>10.2f}")


if __name__ == '__main__':
benchmark()
2 changes: 1 addition & 1 deletion config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,4 @@ orchestrator:
# Search tool settings
search:
max_results: 5
user_agent: "Mozilla/5.0 (compatible; OpenRouter Agent)"
user_agent: "Mozilla/5.0 (compatible; OpenRouter Agent)"
5 changes: 3 additions & 2 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from agent import OpenRouterAgent

def main():
Expand Down Expand Up @@ -32,7 +33,7 @@ def main():
continue

print("Agent: Thinking...")
response = agent.run(user_input)
response = asyncio.run(agent.run_async(user_input))
print(f"Agent: {response}")

except KeyboardInterrupt:
Expand All @@ -43,4 +44,4 @@ def main():
print("Please try again or type 'quit' to exit.")

if __name__ == "__main__":
main()
main()
2 changes: 1 addition & 1 deletion make_it_heavy.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,4 +191,4 @@ def main():
cli.interactive_mode()

if __name__ == "__main__":
main()
main()
Loading