diff --git a/README.md b/README.md index 617ca3b..f599575 100644 --- a/README.md +++ b/README.md @@ -320,6 +320,12 @@ make it heavy/ └── task_done_tool.py # Task completion ``` +## ⏱ Benchmarking + +Run `benchmark.py` to compare how long multi-agent workflows take when using +Tygent's asynchronous scheduler versus the standard thread pool fallback. +The script prints a simple table of execution times for several example prompts. + ## 🤝 Contributing 1. Fork the repository diff --git a/TYGENT_INTEGRATION.md b/TYGENT_INTEGRATION.md new file mode 100644 index 0000000..85d09f5 --- /dev/null +++ b/TYGENT_INTEGRATION.md @@ -0,0 +1,15 @@ +# Tygent Integration + +This project uses [Tygent](https://github.com/OpenPipe/tygent) to schedule agents asynchronously and run their tasks in a DAG. The integration occurs in two places: + +1. **Orchestrator (`orchestrator.py`)** + - If `use_tygent` is enabled when creating `TaskOrchestrator`, the orchestrator uses `MultiAgentManager` to run multiple agents concurrently. Each agent is wrapped in a tiny class with an `execute` coroutine that calls `run_agent_async`. + - When Tygent execution fails or is disabled, the orchestrator falls back to a thread pool so functionality does not break. + - Progress updates and aggregation work the same regardless of the execution strategy. + +2. **Agent (`agent.py`)** + - Each `OpenRouterAgent` exposes a `run_async` method that builds a Tygent `DAG` representing the agent plan. + - The DAG issues web search nodes in parallel and then summarizes the results with the LLM using Tygent's `Scheduler`. + - This allows individual agents to benefit from asynchronous execution while still supporting the synchronous `run` method used by the thread‑pool fallback. + +The `benchmark.py` script demonstrates the performance difference by running example prompts with Tygent enabled and disabled. It shows how asynchronous execution can speed up workflows that rely on parallel web search. diff --git a/agent.py b/agent.py index cc5a58a..49e7cd0 100644 --- a/agent.py +++ b/agent.py @@ -1,7 +1,12 @@ import json import yaml -from openai import OpenAI +import asyncio +from typing import Dict, Any, List +from openai import OpenAI, AsyncOpenAI from tools import discover_tools +from tygent.dag import DAG +from tygent.nodes import Node +from tygent.scheduler import Scheduler class OpenRouterAgent: def __init__(self, config_path="config.yaml", silent=False): @@ -12,11 +17,15 @@ def __init__(self, config_path="config.yaml", silent=False): # Silent mode for orchestrator (suppresses debug output) self.silent = silent - # Initialize OpenAI client with OpenRouter + # Initialize OpenAI clients (sync and async) with OpenRouter self.client = OpenAI( base_url=self.config['openrouter']['base_url'], api_key=self.config['openrouter']['api_key'] ) + self.async_client = AsyncOpenAI( + base_url=self.config['openrouter']['base_url'], + api_key=self.config['openrouter']['api_key'] + ) # Discover tools dynamically self.discovered_tools = discover_tools(self.config, silent=self.silent) @@ -39,6 +48,27 @@ def call_llm(self, messages): return response except Exception as e: raise Exception(f"LLM call failed: {str(e)}") + + async def generate_search_queries(self, user_input: str) -> List[str]: + """Use the async client to generate web search queries for a task.""" + prompt = ( + "You are a planning assistant. Generate 3 concise web search queries " + "that would help answer the following question. Return ONLY a JSON " + "array of strings.\nQuestion: " + user_input + ) + messages = [ + {"role": "system", "content": "You generate search queries."}, + {"role": "user", "content": prompt}, + ] + try: + resp = await self.async_client.chat.completions.create( + model=self.config["openrouter"]["model"], + messages=messages, + ) + content = resp.choices[0].message.content + return json.loads(content) + except Exception: + return [user_input] def handle_tool_call(self, tool_call): """Handle a tool call and return the result message""" @@ -140,4 +170,63 @@ def run(self, user_input: str): # Continue the loop regardless of whether there were tool calls or not # If max iterations reached, return whatever content we gathered - return "\n\n".join(full_response_content) if full_response_content else "Maximum iterations reached. The agent may be stuck in a loop." \ No newline at end of file + return "\n\n".join(full_response_content) if full_response_content else "Maximum iterations reached. The agent may be stuck in a loop." + + async def run_async(self, user_input: str) -> str: + """Run the agent using a Tygent DAG with real async execution.""" + + queries = await self.generate_search_queries(user_input) + + dag = DAG("agent_plan") + search_nodes = [] + + class SearchNode(Node): + def __init__(self, agent: "OpenRouterAgent", name: str, query: str) -> None: + super().__init__(name) + self.agent = agent + self.query = query + + async def execute(self, _inputs: Dict[str, Any]) -> list: + tool = self.agent.discovered_tools.get("search_web") + return await asyncio.to_thread( + tool.execute, + query=self.query, + max_results=self.agent.config["search"].get("max_results", 5), + ) + + for idx, q in enumerate(queries): + node_name = f"search_{idx+1}" + dag.add_node(SearchNode(self, node_name, q)) + search_nodes.append(node_name) + + class SummarizeNode(Node): + def __init__(self, agent: "OpenRouterAgent") -> None: + super().__init__("summarize") + self.agent = agent + + async def execute(self, inputs: Dict[str, Any]) -> str: + combined = json.dumps(inputs) + messages = [ + {"role": "system", "content": self.agent.config["system_prompt"]}, + { + "role": "user", + "content": ( + "Use these search results to answer the question: " + + user_input + + "\n" + combined + ), + }, + ] + resp = await self.agent.async_client.chat.completions.create( + model=self.agent.config["openrouter"]["model"], + messages=messages, + ) + return resp.choices[0].message.content + + dag.add_node(SummarizeNode(self)) + for name in search_nodes: + dag.add_edge(name, "summarize") + + scheduler = Scheduler(dag) + result = await scheduler.execute({}) + return result["results"]["summarize"] diff --git a/benchmark.py b/benchmark.py new file mode 100644 index 0000000..84f3d4c --- /dev/null +++ b/benchmark.py @@ -0,0 +1,42 @@ +import time +from orchestrator import TaskOrchestrator + +EXAMPLES = [ + # Prompts chosen to require up-to-date information so that + # the asynchronous search DAG provides a real advantage. + "Research the impact of AI on renewable energy.", + "Find recent developments in clean energy policy worldwide.", + "Gather the latest statistics on global electric vehicle adoption." +] + + +def benchmark(): + rows = [] + for prompt in EXAMPLES: + orch_tygent = TaskOrchestrator(silent=True, use_tygent=True) + start = time.perf_counter() + try: + orch_tygent.orchestrate(prompt) + except Exception: + pass + tygent_time = time.perf_counter() - start + + orch_thread = TaskOrchestrator(silent=True, use_tygent=False) + start = time.perf_counter() + try: + orch_thread.orchestrate(prompt) + except Exception: + pass + thread_time = time.perf_counter() - start + + rows.append((prompt, tygent_time, thread_time)) + + print("Benchmark Results (seconds)") + print(f"{'Prompt':<40} {'Tygent':>10} {'Threads':>10}") + for prompt, t_time, thr_time in rows: + label = (prompt[:37] + '...') if len(prompt) > 40 else prompt + print(f"{label:<40} {t_time:>10.2f} {thr_time:>10.2f}") + + +if __name__ == '__main__': + benchmark() diff --git a/config.yaml b/config.yaml index a463bac..0ffd744 100644 --- a/config.yaml +++ b/config.yaml @@ -58,4 +58,4 @@ orchestrator: # Search tool settings search: max_results: 5 - user_agent: "Mozilla/5.0 (compatible; OpenRouter Agent)" \ No newline at end of file + user_agent: "Mozilla/5.0 (compatible; OpenRouter Agent)" diff --git a/main.py b/main.py index 1d2d9fc..dddfdb8 100644 --- a/main.py +++ b/main.py @@ -1,3 +1,4 @@ +import asyncio from agent import OpenRouterAgent def main(): @@ -32,7 +33,7 @@ def main(): continue print("Agent: Thinking...") - response = agent.run(user_input) + response = asyncio.run(agent.run_async(user_input)) print(f"Agent: {response}") except KeyboardInterrupt: @@ -43,4 +44,4 @@ def main(): print("Please try again or type 'quit' to exit.") if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/make_it_heavy.py b/make_it_heavy.py index 30ee712..388e6bf 100644 --- a/make_it_heavy.py +++ b/make_it_heavy.py @@ -191,4 +191,4 @@ def main(): cli.interactive_mode() if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/orchestrator.py b/orchestrator.py index 7dba0c7..b4d2b97 100644 --- a/orchestrator.py +++ b/orchestrator.py @@ -2,12 +2,14 @@ import yaml import time import threading +import asyncio from concurrent.futures import ThreadPoolExecutor, as_completed from typing import List, Dict, Any from agent import OpenRouterAgent +from tygent.multi_agent import MultiAgentManager class TaskOrchestrator: - def __init__(self, config_path="config.yaml", silent=False): + def __init__(self, config_path="config.yaml", silent=False, use_tygent=True): # Load configuration with open(config_path, 'r') as f: self.config = yaml.safe_load(f) @@ -16,6 +18,7 @@ def __init__(self, config_path="config.yaml", silent=False): self.task_timeout = self.config['orchestrator']['task_timeout'] self.aggregation_strategy = self.config['orchestrator']['aggregation_strategy'] self.silent = silent + self.use_tygent = use_tygent # Track agent progress self.agent_progress = {} @@ -70,28 +73,28 @@ def update_agent_progress(self, agent_id: int, status: str, result: str = None): def run_agent_parallel(self, agent_id: int, subtask: str) -> Dict[str, Any]: """ - Run a single agent with the given subtask. + Run a single agent with the given subtask (synchronous fallback). Returns result dictionary with agent_id, status, and response. """ try: self.update_agent_progress(agent_id, "PROCESSING...") - + # Use simple agent like in main.py agent = OpenRouterAgent(silent=True) - + start_time = time.time() response = agent.run(subtask) execution_time = time.time() - start_time - + self.update_agent_progress(agent_id, "COMPLETED", response) - + return { "agent_id": agent_id, - "status": "success", + "status": "success", "response": response, "execution_time": execution_time } - + except Exception as e: # Simple error handling return { @@ -100,6 +103,29 @@ def run_agent_parallel(self, agent_id: int, subtask: str) -> Dict[str, Any]: "response": f"Error: {str(e)}", "execution_time": 0 } + + async def run_agent_async(self, agent_id: int, subtask: str) -> Dict[str, Any]: + """Asynchronous agent execution used with Tygent.""" + try: + self.update_agent_progress(agent_id, "PROCESSING...") + agent = OpenRouterAgent(silent=True) + start_time = time.time() + response = await agent.run_async(subtask) + execution_time = time.time() - start_time + self.update_agent_progress(agent_id, "COMPLETED", response) + return { + "agent_id": agent_id, + "status": "success", + "response": response, + "execution_time": execution_time, + } + except Exception as e: + return { + "agent_id": agent_id, + "status": "error", + "response": f"Error: {str(e)}", + "execution_time": 0, + } def aggregate_results(self, agent_results: List[Dict[str, Any]]) -> str: """ @@ -166,6 +192,38 @@ def get_progress_status(self) -> Dict[int, str]: """Get current progress status for all agents""" with self.progress_lock: return self.agent_progress.copy() + + async def _run_agents_with_tygent(self, subtasks: List[str]) -> List[Dict[str, Any]]: + """Run agents concurrently using Tygent's MultiAgentManager.""" + + class WrapperAgent: + def __init__(self, orchestrator: "TaskOrchestrator", agent_id: int, task: str) -> None: + self.orchestrator = orchestrator + self.agent_id = agent_id + self.task = task + + async def execute(self, _inputs: Dict[str, Any]) -> Dict[str, Any]: + return await self.orchestrator.run_agent_async(self.agent_id, self.task) + + manager = MultiAgentManager("orchestrator") + for idx, task in enumerate(subtasks): + manager.add_agent(f"agent_{idx+1}", WrapperAgent(self, idx, task)) + + results_dict = await manager.execute({}) + + # Convert results to list sorted by agent id + results = [] + for idx in range(self.num_agents): + res = results_dict.get(f"agent_{idx+1}") + if res is None: + res = { + "agent_id": idx, + "status": "error", + "response": "No response", + "execution_time": 0, + } + results.append(res) + return results def orchestrate(self, user_input: str): """ @@ -184,29 +242,36 @@ def orchestrate(self, user_input: str): for i in range(self.num_agents): self.agent_progress[i] = "QUEUED" - # Execute agents in parallel agent_results = [] - - with ThreadPoolExecutor(max_workers=self.num_agents) as executor: - # Submit all agent tasks - future_to_agent = { - executor.submit(self.run_agent_parallel, i, subtasks[i]): i - for i in range(self.num_agents) - } - - # Collect results as they complete - for future in as_completed(future_to_agent, timeout=self.task_timeout): - try: - result = future.result() - agent_results.append(result) - except Exception as e: - agent_id = future_to_agent[future] - agent_results.append({ - "agent_id": agent_id, - "status": "timeout", - "response": f"Agent {agent_id + 1} timed out or failed: {str(e)}", - "execution_time": self.task_timeout - }) + + # Execute agents concurrently using Tygent when enabled + if self.use_tygent: + try: + agent_results = asyncio.run(self._run_agents_with_tygent(subtasks)) + except Exception: + # Fall back to threads if Tygent execution fails + agent_results = [] + + # Use thread pool when Tygent is disabled or failed + if not agent_results: + with ThreadPoolExecutor(max_workers=self.num_agents) as executor: + future_to_agent = { + executor.submit(self.run_agent_parallel, i, subtasks[i]): i + for i in range(self.num_agents) + } + + for future in as_completed(future_to_agent, timeout=self.task_timeout): + try: + result = future.result() + agent_results.append(result) + except Exception as e: + agent_id = future_to_agent[future] + agent_results.append({ + "agent_id": agent_id, + "status": "timeout", + "response": f"Agent {agent_id + 1} timed out or failed: {str(e)}", + "execution_time": self.task_timeout, + }) # Sort results by agent_id for consistent output agent_results.sort(key=lambda x: x["agent_id"]) @@ -214,4 +279,4 @@ def orchestrate(self, user_input: str): # Aggregate results final_result = self.aggregate_results(agent_results) - return final_result \ No newline at end of file + return final_result