diff --git a/README.md b/README.md index 6ae6b3233..ba9a37088 100644 --- a/README.md +++ b/README.md @@ -169,29 +169,43 @@ agent = A1() # Everything uses gpt-4, 1200s timeout For detailed configuration options, see the **[Configuration Guide](docs/configuration.md)**. +### PDF Generation -## Running Tests +Generate PDF reports of execution traces: -Biomni uses Python's built-in unittest framework. +```python +from biomni.agent import A1 + +# Initialize agent +agent = A1(path='./data', llm='claude-sonnet-4-20250514') -- Run all tests in the tests/ folder (recommended): +# Run your task +agent.go("Your biomedical task here") -```bash -python3 -m unittest discover -s tests -p 'test_*.py' -q +# Save conversation history as PDF +agent.save_conversation_history("my_analysis_results.pdf") ``` -- Run only the glycoengineering tests: +**PDF Generation Dependencies:** + +For optimal PDF generation, install one of these packages: ```bash -python3 -m unittest -q tests/test_glycoengineering.py -``` +# Option 1: WeasyPrint (recommended for best layout control) +pip install weasyprint -Notes: -- The top-level package import may require optional dependencies (e.g., pandas). The commands above restrict discovery to the tests/ folder to avoid importing the entire biomni package during discovery. 
-- Ensure you are in your Biomni environment before running tests: +# Option 2: markdown2pdf (Rust-based, fast and reliable) +# macOS: +brew install theiskaa/tap/markdown2pdf -```bash -conda activate biomni_e1 +# Windows/Linux (using Cargo): +cargo install markdown2pdf + +# Or download prebuilt binaries from: +# https://github.com/theiskaa/markdown2pdf/releases/latest + +# Option 3: Pandoc (pip installation) +pip install pandoc ``` ## MCP (Model Context Protocol) Support diff --git a/biomni/agent/a1.py b/biomni/agent/a1.py index c69d5230d..df151e4e4 100644 --- a/biomni/agent/a1.py +++ b/biomni/agent/a1.py @@ -3,6 +3,7 @@ import os import re from collections.abc import Generator +from datetime import datetime from pathlib import Path from typing import Any, Literal, TypedDict @@ -20,12 +21,24 @@ from biomni.tool.tool_registry import ToolRegistry from biomni.utils import ( check_and_download_s3_files, + clean_message_content, + convert_markdown_to_pdf, + create_parsing_error_html, + find_matching_execution, + format_execute_tags_in_content, + format_lists_in_text, + format_observation_as_terminal, function_to_api_schema, + has_execution_results, + inject_custom_functions_to_repl, + parse_tool_calls_from_code, + parse_tool_calls_with_modules, pretty_print, read_module2api, run_bash_script, run_r_code, run_with_timeout, + should_skip_message, textify_api_dict, ) @@ -1104,14 +1117,14 @@ def format_item_with_description(name, description): if custom_tools_formatted: prompt_modifier += """ -CUSTOM TOOLS (USE THESE FIRST): +🔧 CUSTOM TOOLS (USE THESE FIRST): {custom_tools} """ if custom_data_formatted: prompt_modifier += """ -CUSTOM DATA (PRIORITIZE THESE DATASETS): +📊 CUSTOM DATA (PRIORITIZE THESE DATASETS): {custom_data} """ @@ -1305,7 +1318,7 @@ def generate(state: AgentState) -> AgentState: state["next_step"] = "generate" else: print("parsing error...") - # Check if we already added an error message to avoid infinite loops + error_count = sum( 1 for m in 
state["messages"] if isinstance(m, AIMessage) and "There are no tags" in m.content ) @@ -1350,7 +1363,7 @@ def execute(state: AgentState) -> AgentState: or code.strip().startswith("# R script") ): # Remove the R marker and run as R code - r_code = re.sub(r"^#!R|^# R code|^# R script", "", code, 1).strip() # noqa: B034 + r_code = re.sub(r"^#!R|^# R code|^# R script", "", code, count=1).strip() result = run_with_timeout(run_r_code, [r_code], timeout=timeout) # Check if the code is a Bash script or CLI command elif ( @@ -1361,25 +1374,54 @@ def execute(state: AgentState) -> AgentState: # Handle both Bash scripts and CLI commands with the same function if code.strip().startswith("#!CLI"): # For CLI commands, extract the command and run it as a simple bash script - cli_command = re.sub(r"^#!CLI", "", code, 1).strip() # noqa: B034 + cli_command = re.sub(r"^#!CLI", "", code, count=1).strip() # Remove any newlines to ensure it's a single command cli_command = cli_command.replace("\n", " ") result = run_with_timeout(run_bash_script, [cli_command], timeout=timeout) else: # For Bash scripts, remove the marker and run as a bash script - bash_script = re.sub(r"^#!BASH|^# Bash script", "", code, 1).strip() # noqa: B034 + bash_script = re.sub(r"^#!BASH|^# Bash script", "", code, count=1).strip() result = run_with_timeout(run_bash_script, [bash_script], timeout=timeout) # Otherwise, run as Python code else: + # Clear any previous plots before execution + self._clear_execution_plots() + # Inject custom functions into the Python execution environment self._inject_custom_functions_to_repl() result = run_with_timeout(run_python_repl, [code], timeout=timeout) + # Plots are now captured directly in the execution entry above + if len(result) > 10000: result = ( "The output is too long to be added to context. 
Here are the first 10K characters...\n" + result[:10000] ) + + # Store the execution result with the triggering message + if not hasattr(self, "_execution_results"): + self._execution_results = [] + + # Get any plots that were generated during this execution + execution_plots = [] + try: + from biomni.tool.support_tools import get_captured_plots + + current_plots = get_captured_plots() + execution_plots = current_plots.copy() + except Exception as e: + print(f"Warning: Could not capture plots from execution: {e}") + execution_plots = [] + + # Store the execution result with metadata + execution_entry = { + "triggering_message": last_message, # The AI message that contained the executed code + "images": execution_plots, # Base64 encoded images from this execution + "timestamp": datetime.now().isoformat(), + } + self._execution_results.append(execution_entry) + observation = f"\n<observation>{result}</observation>" state["messages"].append(AIMessage(content=observation.strip())) @@ -1569,10 +1611,17 @@ def go(self, prompt): config = {"recursion_limit": 500, "configurable": {"thread_id": 42}} self.log = [] + # Store the final conversation state for markdown generation + final_state = None + for s in self.app.stream(inputs, stream_mode="values", config=config): message = s["messages"][-1] out = pretty_print(message) self.log.append(out) + final_state = s # Store the latest state + + # Store the conversation state for markdown generation + self._conversation_state = final_state return self.log, message.content @@ -1599,14 +1648,21 @@ def go_stream(self, prompt) -> Generator[dict, None, None]: config = {"recursion_limit": 500, "configurable": {"thread_id": 42}} self.log = [] + # Store the final conversation state for markdown generation + final_state = None + for s in self.app.stream(inputs, stream_mode="values", config=config): message = s["messages"][-1] out = pretty_print(message) self.log.append(out) + final_state = s # Store the latest state # Yield the current step yield {"output": out} + # Store the
conversation state for markdown generation + self._conversation_state = final_state + def update_system_prompt_with_selected_resources(self, selected_resources): """Update the system prompt with the selected resources.""" # Extract tool descriptions for the selected tools @@ -1736,24 +1792,38 @@ def result_formatting(self, output_class, task_intention): result = checker_llm.invoke({"messages": [("user", str(self.log))]}).dict() return result - def _inject_custom_functions_to_repl(self): - """Inject custom functions into the Python REPL execution environment. - This makes custom tools available during code execution. + def _parse_tool_calls_from_code(self, code: str) -> list[str]: + """Parse code to detect imported tools by looking for import statements. + + Args: + code: The Python code to parse + + Returns: + List of detected tool names """ - if hasattr(self, "_custom_functions") and self._custom_functions: - # Access the persistent namespace used by run_python_repl - from biomni.tool.support_tools import _persistent_namespace + module2api = getattr(self, "module2api", {}) + custom_functions = getattr(self, "_custom_functions", {}) + return parse_tool_calls_from_code(code, module2api, custom_functions) - # Inject all custom functions into the execution namespace - for name, func in self._custom_functions.items(): - _persistent_namespace[name] = func + def _parse_tool_calls_with_modules(self, code: str) -> list[tuple[str, str]]: + """Parse code to detect imported tools and their modules. 
- # Also make them available in builtins for broader access - import builtins + Args: + code: The Python code to parse - if not hasattr(builtins, "_biomni_custom_functions"): - builtins._biomni_custom_functions = {} - builtins._biomni_custom_functions.update(self._custom_functions) + Returns: + List of tuples (tool_name, module_name) + """ + module2api = getattr(self, "module2api", {}) + custom_functions = getattr(self, "_custom_functions", {}) + return parse_tool_calls_with_modules(code, module2api, custom_functions) + + def _inject_custom_functions_to_repl(self): + """Inject custom functions into the Python REPL execution environment. + This makes custom tools available during code execution. + """ + custom_functions = getattr(self, "_custom_functions", {}) + inject_custom_functions_to_repl(custom_functions) def create_mcp_server(self, tool_modules=None): """ @@ -1821,6 +1891,486 @@ def create_mcp_server(self, tool_modules=None): print(f"Created MCP server with {registered_tools} tools") return mcp + def save_conversation_history(self, filepath: str, include_images: bool = True, save_pdf: bool = True) -> None: + """Save the complete conversation history as PDF only. + + This function generates and saves the complete conversation history from the agent's + log and conversation state. It creates a temporary markdown file with formatted content + including steps, code execution, observations, and optionally images, then converts it + to PDF format. The markdown file is automatically cleaned up after PDF conversion. + + Args: + filepath: Path where to save the PDF file (without extension). If the path doesn't + end with .pdf, it will be automatically appended. + include_images: Whether to include captured plots and images in the output. + Defaults to True. + save_pdf: Whether to save as PDF. Defaults to True. If False, no file is saved. + + Note: + The function includes a 60-second timeout for PDF generation to prevent + hanging. 
A temporary markdown file is created and automatically deleted. + """ + import os + import tempfile + + if not save_pdf: + print("PDF saving is disabled. No file will be saved.") + return + + # Ensure directory exists + directory = os.path.dirname(filepath) + if directory: # Only create directory if it's not empty + os.makedirs(directory, exist_ok=True) + + # Create PDF file path - use the user's filename and add .pdf extension + if filepath.endswith(".pdf"): + pdf_path = filepath + else: + # Remove any existing .md extension if present, then add .pdf + base_name = filepath + if base_name.endswith(".md"): + base_name = base_name[:-3] # Remove .md extension + pdf_path = f"{base_name}.pdf" + + # Create markdown content + markdown_content = self._generate_markdown_content(include_images) + + # Create a temporary markdown file + with tempfile.NamedTemporaryFile(mode="w", suffix=".md", delete=False, encoding="utf-8") as temp_file: + temp_file.write(markdown_content) + temp_markdown_path = temp_file.name + + try: + # Add timeout for PDF generation to prevent hanging + import signal + + def timeout_handler(signum, frame): + raise TimeoutError("PDF generation timed out") + + # Set timeout to 60 seconds + signal.signal(signal.SIGALRM, timeout_handler) + signal.alarm(60) + + try: + self._convert_markdown_to_pdf(temp_markdown_path, pdf_path) + print(f"Conversation history saved as PDF: {pdf_path}") + print(f"Total steps recorded: {len(self.log)}") + finally: + signal.alarm(0) # Cancel the alarm + + except TimeoutError: + print("Warning: PDF generation timed out after 60 seconds") + except Exception as e: + print(f"Warning: Could not convert to PDF: {e}") + finally: + # Clean up the temporary markdown file + try: + os.unlink(temp_markdown_path) + except OSError: + pass # File might already be deleted + + def _generate_markdown_content(self, include_images: bool = True) -> str: + """Generate markdown content from conversation history using both log and conversation state. 
+ + This function processes the agent's conversation history from either the conversation + state (if available) or the internal log to create a formatted markdown document. + It handles step numbering, message processing, and content formatting. + + Args: + include_images: Whether to include captured plots and images in the output. + Defaults to True. + + Returns: + Formatted markdown string containing the complete conversation history + with proper step numbering and content structure. + """ + + # Initialize content and tracking variables + content = """# Biomni Agent Conversation History + +""" + added_plots = set() + step_number = 0 + first_human_shown = False + + # Get data source (conversation state or log) + messages = self._get_messages_for_processing() + + # Process all messages using unified logic + for message_data in messages: + content, step_number, first_human_shown = self._process_message( + message_data, content, step_number, first_human_shown, added_plots, include_images + ) + + return content + + def _get_messages_for_processing(self): + """Get messages from conversation state or fallback to log. + + This function determines the best source for conversation messages, prioritizing + the conversation state if available, otherwise falling back to the internal log. + It normalizes the messages into a unified format for processing. 
+ + Returns: + List of normalized message dictionaries with 'content', 'type', and 'original' keys + """ + conversation_state = getattr(self, "_conversation_state", None) + + if conversation_state and hasattr(conversation_state, "get") and "messages" in conversation_state: + print(f"DEBUG: Using conversation state with {len(conversation_state['messages'])} messages") + return self._normalize_conversation_state_messages(conversation_state["messages"]) + else: + print(f"DEBUG: Using self.log with {len(self.log)} entries") + return self._normalize_log_messages(self.log) + + def _normalize_conversation_state_messages(self, messages): + """Convert conversation state messages to unified format. + + This function takes LangChain message objects from the conversation state and + converts them into a standardized dictionary format that the markdown generation + system can work with. It extracts content and determines message types. + + Args: + messages: List of LangChain message objects (HumanMessage, AIMessage, etc.) + + Returns: + List of normalized message dictionaries with 'content', 'type', and 'original' keys + """ + normalized = [] + for message in messages: + if hasattr(message, "content"): + content = str(message.content) + else: + content = str(message) + + # Determine message type + if isinstance(message, HumanMessage): + msg_type = "human" + elif isinstance(message, AIMessage): + msg_type = "ai" + else: + msg_type = "other" + + normalized.append({"content": content, "type": msg_type, "original": message}) + + return normalized + + def _normalize_log_messages(self, log_entries): + """Convert log entries to unified format. + + This function takes internal log entries and converts them into the same + standardized format as conversation state messages. It parses the log format + to determine message types and extract content. 
+ + Args: + log_entries: List of log entry strings from the agent's internal log + + Returns: + List of normalized message dictionaries with 'content', 'type', and 'original' keys + """ + normalized = [] + for log_entry in log_entries: + content = str(log_entry) + + # Determine message type from log format + if "Human Message" in content: + msg_type = "human" + elif "Ai Message" in content: + msg_type = "ai" + else: + msg_type = "other" + + normalized.append({"content": content, "type": msg_type, "original": log_entry}) + + return normalized + + def _process_message(self, message_data, content, step_number, first_human_shown, added_plots, include_images): + """Process a single message and return updated state. + + This function is the main dispatcher for processing individual messages in the + conversation history. It determines the message type and delegates to the + appropriate processing function. + + Args: + message_data: Dictionary containing 'content', 'type', and 'original' keys + content: Current markdown content string + step_number: Current step number counter + first_human_shown: Boolean flag indicating if first human message was shown + added_plots: Set of already added plot data to avoid duplicates + include_images: Whether to include images in the output + + Returns: + Tuple of (updated_content, updated_step_number, updated_first_human_shown) + """ + clean_output = clean_message_content(message_data["content"]) + msg_type = message_data["type"] + + if msg_type == "human": + return self._process_human_message(clean_output, content, step_number, first_human_shown) + elif msg_type == "ai": + return self._process_ai_message(clean_output, content, step_number, added_plots, include_images) + else: + return self._process_other_message( + clean_output, content, step_number, first_human_shown, added_plots, include_images + ) + + def _process_human_message(self, clean_output, content, step_number, first_human_shown): + """Process human messages. 
+ + This function handles human messages in the conversation history. It identifies + parsing error messages and displays them appropriately, or formats the first + human prompt as a special section. + + Args: + clean_output: Cleaned message content with ANSI codes removed + content: Current markdown content string + step_number: Current step number counter (unchanged for human messages) + first_human_shown: Boolean flag indicating if first human message was shown + + Returns: + Tuple of (updated_content, step_number, updated_first_human_shown) + + Note: + Human messages don't increment the step counter as they are not considered + steps in the agent's process. + """ + if "each response must include thinking process" in clean_output.lower(): + parsing_error_content = create_parsing_error_html() + content += f"{parsing_error_content}\n\n" + elif not first_human_shown: + content += "#### Human Prompt\n\n" + content += f"*{clean_output}*\n\n" + first_human_shown = True + + return content, step_number, first_human_shown # step_number unchanged + + def _process_ai_message(self, clean_output, content, step_number, added_plots, include_images): + """Process AI messages. + + This function handles AI messages in the conversation history. It can process + both regular AI responses and messages containing observation tags. It handles + step numbering, execution results, and content formatting. + + Args: + clean_output: Cleaned message content with ANSI codes removed + content: Current markdown content string + step_number: Current step number counter + added_plots: Set of already added plot data to avoid duplicates + include_images: Whether to include images in the output + + Returns: + Tuple of (updated_content, updated_step_number, True) + + Note: + This function can split messages containing observation tags and process + each part separately, with observations formatted as terminal blocks. 
+ """ + # Check if this message contains observation tags and process accordingly + import re + + observation_pattern = r"(.*?)" + observation_matches = re.findall(observation_pattern, clean_output, re.DOTALL | re.IGNORECASE) + + if observation_matches: + # Extract content before, between, and after observation tags + parts = re.split(observation_pattern, clean_output, flags=re.DOTALL | re.IGNORECASE) + + # Process each part + for i, part in enumerate(parts): + if i % 2 == 0: # Even indices are non-observation content + if part.strip(): + # This is regular content - process it normally + if not should_skip_message(part): + if part.strip(): + step_number += 1 + content += f"#### Step {step_number}\n\n" + + # Handle execution results if present + execution_results = getattr(self, "_execution_results", None) + if has_execution_results(part, execution_results): + content, added_plots = self._process_execution_with_results( + part, content, added_plots, include_images, execution_results + ) + else: + content = self._process_regular_ai_message(part, content) + else: # Odd indices are observation content + if part.strip(): + # This is observation content - format as terminal + formatted_observation = format_observation_as_terminal(f"{part}") + if formatted_observation is not None: + content += f"{formatted_observation}\n\n" + + return content, step_number, True + + # Skip empty or error messages + if should_skip_message(clean_output): + return content, step_number, True + + if clean_output.strip(): + step_number += 1 + content += f"#### Step {step_number}\n\n" + + # Handle execution results if present + execution_results = getattr(self, "_execution_results", None) + if has_execution_results(clean_output, execution_results): + content, added_plots = self._process_execution_with_results( + clean_output, content, added_plots, include_images, execution_results + ) + else: + content = self._process_regular_ai_message(clean_output, content) + + return content, step_number, True 
+ + def _process_other_message( + self, clean_output, content, step_number, first_human_shown, added_plots, include_images + ): + """Process other message types. + + This function handles message types that are neither human nor AI messages. + It checks for observation tags and processes them accordingly, or adds the + content as regular text. + + Args: + clean_output: Cleaned message content with ANSI codes removed + content: Current markdown content string + step_number: Current step number counter + first_human_shown: Boolean flag indicating if first human message was shown + added_plots: Set of already added plot data to avoid duplicates + include_images: Whether to include images in the output + + Returns: + Tuple of (updated_content, step_number, first_human_shown) + """ + # Check if this is actually an observation (has tags) + import re + + if not re.search(r"<observation>", clean_output, re.IGNORECASE): + content += f"{clean_output}\n\n" + return content, step_number, first_human_shown + + def _process_execution_with_results(self, clean_output, content, added_plots, include_images, execution_results): + """Process AI message with execution results. + + This function handles AI messages that have associated execution results. + It finds the matching execution result and adds any captured plots or images + to the content.
+ + Args: + clean_output: Cleaned message content with ANSI codes removed + content: Current markdown content string + added_plots: Set of already added plot data to avoid duplicates + include_images: Whether to include images in the output + execution_results: List of execution result dictionaries + + Returns: + Tuple of (updated_content, updated_added_plots) + """ + matching_execution = find_matching_execution(clean_output, execution_results) + + if matching_execution: + content = self._format_and_add_content(clean_output, content) + content, added_plots = self._add_execution_plots(matching_execution, content, added_plots, include_images) + else: + content = self._format_and_add_content(clean_output, content) + + return content, added_plots + + def _format_and_add_content(self, clean_output, content): + """Format and add content to markdown. + + This function applies formatting to AI message content before adding it to the + markdown. It processes lists, execute tags, and tool calls to create properly + formatted markdown content. + + Args: + clean_output: Cleaned message content with ANSI codes removed + content: Current markdown content string + + Returns: + Updated markdown content string with formatted content added + """ + # Process lists first, then execute tags + formatted_content = format_lists_in_text(clean_output) + + # Create a wrapper function for the tool parsing + def parse_tool_calls_wrapper(code): + return self._parse_tool_calls_with_modules(code) + + formatted_content = format_execute_tags_in_content(formatted_content, parse_tool_calls_wrapper) + return content + f"{formatted_content}\n\n" + + def _add_execution_plots(self, matching_execution, content, added_plots, include_images): + """Add plots from execution results. + + This function adds captured plots and images from execution results to the + markdown content. It prevents duplicate plots from being added multiple times. 
+ + Args: + matching_execution: Execution result dictionary containing image data + content: Current markdown content string + added_plots: Set of already added plot data to avoid duplicates + include_images: Whether to include images in the output + + Returns: + Tuple of (updated_content, updated_added_plots) + """ + if include_images and matching_execution.get("images"): + for plot_data in matching_execution["images"]: + if plot_data not in added_plots: + content += f"![Plot]({plot_data})\n\n" + added_plots.add(plot_data) + return content, added_plots + + def _process_regular_ai_message(self, clean_output, content): + """Process regular AI message without execution results. + + This function handles AI messages that don't have associated execution results. + It applies standard formatting and adds the content to the markdown. + + Args: + clean_output: Cleaned message content with ANSI codes removed + content: Current markdown content string + + Returns: + Updated markdown content string with formatted content added + """ + return self._format_and_add_content(clean_output, content) + + def _convert_markdown_to_pdf(self, markdown_path: str, pdf_path: str) -> None: + """Convert markdown file to PDF using weasyprint or markdown2pdf. + + This function is a wrapper around the utility function for converting markdown + to PDF. It provides a clean interface for the agent to convert conversation + history to PDF format. + + Args: + markdown_path: Path to the input markdown file + pdf_path: Path where the output PDF file should be saved + + Note: + This function delegates to the convert_markdown_to_pdf utility function + which handles multiple PDF conversion libraries and fallbacks. + """ + convert_markdown_to_pdf(markdown_path, pdf_path) + + def _clear_execution_plots(self): + """Clear execution plots before new execution. + + This function clears any previously captured plots from the execution environment + before starting a new execution. 
This prevents old plots from appearing in + new execution results. + + Note: + This function calls the clear_captured_plots utility function and handles + any exceptions gracefully to prevent execution failures. + """ + try: + from biomni.tool.support_tools import clear_captured_plots + + clear_captured_plots() + except Exception as e: + print(f"Warning: Could not clear execution plots: {e}") + def _generate_mcp_wrapper_from_biomni_schema(self, original_func, func_name, required_params, optional_params): """Generate wrapper function based on Biomni schema format.""" import inspect diff --git a/biomni/tool/support_tools.py b/biomni/tool/support_tools.py index 6dcd10177..6bde4b39a 100644 --- a/biomni/tool/support_tools.py +++ b/biomni/tool/support_tools.py @@ -1,9 +1,14 @@ +import base64 +import io import sys from io import StringIO # Create a persistent namespace that will be shared across all executions _persistent_namespace = {} +# Global list to store captured plots +_captured_plots = [] + def run_python_repl(command: str) -> str: """Executes the provided Python command in a persistent environment and returns the output. 
@@ -19,9 +24,16 @@ def execute_in_repl(command: str) -> str: global _persistent_namespace try: + # Apply matplotlib monkey patches before execution + _apply_matplotlib_patches() + # Execute the command in the persistent namespace exec(command, _persistent_namespace) output = mystdout.getvalue() + + # Capture any matplotlib plots that were generated + # _capture_matplotlib_plots() + except Exception as e: output = f"Error: {str(e)}" finally: @@ -32,6 +44,100 @@ def execute_in_repl(command: str) -> str: return execute_in_repl(command) +def _capture_matplotlib_plots(): + """Capture any matplotlib plots that might have been generated during execution.""" + global _captured_plots + try: + import matplotlib.pyplot as plt + + # Check if there are any active figures + if plt.get_fignums(): + for fig_num in plt.get_fignums(): + fig = plt.figure(fig_num) + + # Save figure to base64 + buffer = io.BytesIO() + fig.savefig(buffer, format="png", dpi=150, bbox_inches="tight") + buffer.seek(0) + + # Convert to base64 + image_data = base64.b64encode(buffer.getvalue()).decode("utf-8") + plot_data = f"data:image/png;base64,{image_data}" + + # Add to captured plots if not already there + if plot_data not in _captured_plots: + _captured_plots.append(plot_data) + + # Close the figure to free memory + plt.close(fig) + + except ImportError: + # matplotlib not available + pass + except Exception as e: + print(f"Warning: Could not capture matplotlib plots: {e}") + + +def _apply_matplotlib_patches(): + """Apply simple monkey patches to matplotlib functions to automatically capture plots.""" + try: + import matplotlib.pyplot as plt + + # Only patch if matplotlib is available and not already patched + if hasattr(plt, "_biomni_patched"): + return + + # Store original functions + original_show = plt.show + original_savefig = plt.savefig + + def show_with_capture(*args, **kwargs): + """Enhanced show function that captures plots before displaying them.""" + # Capture any plots before showing + 
_capture_matplotlib_plots()
+            # Print a message to indicate plot was generated
+            print("Plot generated and displayed")
+            # Call the original show function
+            return original_show(*args, **kwargs)
+
+        def savefig_with_capture(*args, **kwargs):
+            """Enhanced savefig function that captures plots after saving them."""
+            # Get the filename from args if provided
+            filename = args[0] if args else kwargs.get("fname", "unknown")
+            # Call the original savefig function
+            result = original_savefig(*args, **kwargs)
+            # Capture the plot after saving
+            _capture_matplotlib_plots()
+            # Print a message to indicate plot was saved
+            print(f"Plot saved to: {filename}")
+            return result
+
+        # Replace functions with enhanced versions
+        plt.show = show_with_capture
+        plt.savefig = savefig_with_capture
+
+        # Mark as patched to avoid double-patching
+        plt._biomni_patched = True
+
+    except ImportError:
+        # matplotlib not available
+        pass
+    except Exception as e:
+        print(f"Warning: Could not apply matplotlib patches: {e}")
+
+
+def get_captured_plots():
+    """Get all captured matplotlib plots."""
+    global _captured_plots
+    return _captured_plots.copy()
+
+
+def clear_captured_plots():
+    """Clear all captured matplotlib plots."""
+    global _captured_plots
+    _captured_plots = []
+
+
 def read_function_source_code(function_name: str) -> str:
     """Read the source code of a function from any module path.
diff --git a/biomni/tool/synthetic_biology.py b/biomni/tool/synthetic_biology.py index 7d2ec48fd..768c1359a 100644 --- a/biomni/tool/synthetic_biology.py +++ b/biomni/tool/synthetic_biology.py @@ -245,6 +245,9 @@ def analyze_bacterial_growth_rate(time_points, od_measurements, strain_name="Unk import os from datetime import datetime + import matplotlib + + matplotlib.use("Agg") # Use non-interactive backend import matplotlib.pyplot as plt import numpy as np from scipy.optimize import curve_fit @@ -533,6 +536,9 @@ def analyze_bifurcation_diagram(time_series_data, parameter_values, system_name= """ import os + import matplotlib + + matplotlib.use("Agg") # Use non-interactive backend import matplotlib.pyplot as plt import numpy as np from scipy.signal import find_peaks diff --git a/biomni/utils.py b/biomni/utils.py index 498a38576..f071089a6 100644 --- a/biomni/utils.py +++ b/biomni/utils.py @@ -1014,3 +1014,1348 @@ def cleanup_file(file_path: str): download_results[filename] = False return download_results + + +def clean_message_content(content: str) -> str: + """Clean message content by removing ANSI escape codes. + + This function removes ANSI escape sequences (like color codes) from text content + that might be present in terminal output or console messages. This ensures clean + text for markdown generation and PDF conversion. + + Args: + content: The raw message content that may contain ANSI escape codes + + Returns: + Cleaned content with ANSI escape codes removed + + Example: + >>> clean_message_content("Hello \x1b[31mworld\x1b[0m!") + "Hello world!" + """ + import re + + return re.sub(r"\x1b\[[0-9;]*m", "", content) + + +def should_skip_message(clean_output: str) -> bool: + """Check if message should be skipped during markdown generation. + + This function determines whether a message should be excluded from the final + markdown output. It skips empty or meaningless messages but preserves important + error messages that should be displayed to users. 
+
+    Args:
+        clean_output: The cleaned message content to evaluate
+
+    Returns:
+        True if the message should be skipped, False otherwise
+
+    Note:
+        Parsing error messages are intentionally not skipped as they provide
+        important feedback to users about conversation flow issues.
+    """
+    return (
+        clean_output.strip() in ["", "None", "null", "undefined"]
+        # Don't skip parsing error messages - they should be displayed and increment step counter
+        # or "There are no <execute> or <solution> tags" in clean_output
+        # or "Execution terminated due to repeated parsing errors" in clean_output
+    )
+
+
+def has_execution_results(clean_output: str, execution_results) -> bool:
+    """Check if message contains code execution and has associated results.
+
+    This function determines whether a message contains executable code and has
+    corresponding execution results available for display in the markdown output.
+
+    Args:
+        clean_output: The cleaned message content to check for execute tags
+        execution_results: List of execution results from the agent's execution history
+
+    Returns:
+        True if the message contains <execute> tags and has execution results available
+    """
+    return "<execute>" in clean_output and execution_results is not None and execution_results
+
+
+def find_matching_execution(clean_output: str, execution_results) -> dict | None:
+    """Find the execution result that matches the given message content.
+
+    This function searches through the execution results to find the one that
+    corresponds to the current message. It matches based on the triggering message
+    content to associate execution results with their originating AI messages.
+ + Args: + clean_output: The cleaned message content to match against + execution_results: List of execution result dictionaries containing + triggering messages and execution data + + Returns: + The matching execution result dictionary if found, None otherwise + + Note: + The matching is bidirectional - it checks if either the triggering message + is contained in the current output or vice versa to handle partial matches. + """ + for exec_result in execution_results: + if exec_result["triggering_message"] in clean_output or clean_output in exec_result["triggering_message"]: + return exec_result + return None + + +def create_parsing_error_html() -> str: + """Create HTML markup for displaying parsing errors in markdown output. + + This function generates a styled HTML block that displays parsing errors + when the agent's response doesn't contain the required tags. The HTML + uses CSS classes for consistent styling in the final PDF output. + + Returns: + HTML string containing a styled parsing error message box + + Note: + The returned HTML uses CSS classes defined in get_pdf_css_content() + for consistent styling across the document. + """ + return """ +
+
Parsing Error
+
Each response must include thinking process followed by either execute or solution tag. But there are no tags in the current response.
+
+""" + + +def parse_tool_calls_from_code(code: str, module2api: dict, custom_functions: dict = None) -> list[str]: + """Parse code to detect imported tools by analyzing import statements. + + This function analyzes Python code to identify which tools/functions are being + imported and used. It extracts tool names from import statements and function + calls, then returns a deduplicated list of detected tool names. + + Args: + code: The Python code string to analyze for tool imports + module2api: Dictionary mapping module names to their available API tools + custom_functions: Optional dictionary of custom functions that have been + added to the agent + + Returns: + Sorted list of unique tool names detected in the code + + Example: + >>> code = "from biomni.tool import analyze_data\nimport pandas as pd" + >>> parse_tool_calls_from_code(code, module2api) + ['analyze_data', 'pandas'] + """ + tool_module_pairs = parse_tool_calls_with_modules(code, module2api, custom_functions) + return sorted({pair[0] for pair in tool_module_pairs}) + + +def parse_tool_calls_with_modules(code: str, module2api: dict, custom_functions: dict = None) -> list[tuple[str, str]]: + """Parse code to detect imported tools and their associated modules. + + This function performs detailed analysis of Python code to identify which + tools/functions are being imported and which modules they belong to. It + handles various import patterns including direct imports, from-imports, + and module.function patterns. 
+ + Args: + code: The Python code string to analyze for tool imports + module2api: Dictionary mapping module names to their available API tools + custom_functions: Optional dictionary of custom functions that have been + added to the agent + + Returns: + List of tuples containing (tool_name, module_name) pairs for each + detected tool and its associated module + + Note: + The function uses regex patterns to match various import statement + formats and also detects direct function calls without explicit imports. + """ + import re + + detected_tools = set() + + # Get all available tools from module2api + all_tools = {} + for module_name, module_tools in module2api.items(): + for tool in module_tools: + if isinstance(tool, dict) and "name" in tool: + tool_name = tool["name"] + if tool_name not in all_tools: + all_tools[tool_name] = [] + all_tools[tool_name].append(module_name) + + # Add custom tools + if custom_functions: + for tool_name in custom_functions.keys(): + if tool_name not in all_tools: + all_tools[tool_name] = [] + all_tools[tool_name].append("custom_tools") + + # Look for import statements in the code + import_patterns = [ + r"from\s+([\w.]+)\s+import\s+([\w,\s]+)", # from module import tool1, tool2 + r"import\s+([\w.]+)", # import module + ] + + for pattern in import_patterns: + matches = re.findall(pattern, code) + for match in matches: + if len(match) == 2: # from module import tools + module_name, tools_str = match + # Split tools by comma and clean up + tools = [tool.strip() for tool in tools_str.split(",")] + + for tool in tools: + # Check if this tool exists in any module + if tool in all_tools: + # Find the best matching module + best_module = find_best_module_match(module_name, all_tools[tool]) + detected_tools.add((tool, best_module)) + # Also check if it's a module.function pattern + elif "." 
in tool: + parts = tool.split(".") + if len(parts) == 2: + module_part, func_part = parts + if func_part in all_tools: + best_module = find_best_module_match(module_part, all_tools[func_part]) + detected_tools.add((func_part, best_module)) + + elif len(match) == 1: # import module + module_name = match[0] + # Check if any tools from this module are used + for tool_name, modules in all_tools.items(): + if any(module_name in mod for mod in modules): + # Look for usage of this tool in the code + if re.search(rf"\b{tool_name}\s*\(", code): + best_module = find_best_module_match(module_name, modules) + detected_tools.add((tool_name, best_module)) + + # Also look for direct function calls without imports + function_call_pattern = r"(\w+)\s*\(" + function_calls = re.findall(function_call_pattern, code) + + for func_call in function_calls: + if func_call in all_tools: + # For direct calls, use the first available module + best_module = all_tools[func_call][0] + detected_tools.add((func_call, best_module)) + + return sorted(detected_tools) + + +def find_best_module_match(target_module: str, available_modules: list[str]) -> str: + """Find the best matching module from a list of available modules. + + This function attempts to match a target module name against a list of + available modules using various matching strategies: exact match, partial + substring matches, and fallback to the first available module. + + Args: + target_module: The module name we're trying to match + available_modules: List of available module names to search through + + Returns: + The best matching module name from the available modules list. + Returns "unknown" if no modules are available. + + Note: + The matching strategy prioritizes exact matches, then partial matches + (where either the target is contained in the module name or vice versa), + and finally falls back to the first available module. 
+ """ + # First try exact match + if target_module in available_modules: + return target_module + + # Try partial matches + for module in available_modules: + if target_module in module or module in target_module: + return module + + # Return the first available module as fallback + return available_modules[0] if available_modules else "unknown" + + +def inject_custom_functions_to_repl(custom_functions: dict): + """Inject custom functions into the Python REPL execution environment. + + This function makes custom tools available during code execution by injecting + them into both the persistent execution namespace and the builtins module. + This allows the agent to call custom functions that users have added via + agent.add_tool() when executing Python code in blocks. + + Args: + custom_functions: Dictionary mapping function names to their callable objects + + Note: + The function modifies both the persistent namespace used by run_python_repl + and the builtins module to ensure maximum compatibility and accessibility + of custom functions during code execution. + """ + if custom_functions: + # Access the persistent namespace used by run_python_repl + from biomni.tool.support_tools import _persistent_namespace + + # Inject all custom functions into the execution namespace + for name, func in custom_functions.items(): + _persistent_namespace[name] = func + + # Also make them available in builtins for broader access + import builtins + + if not hasattr(builtins, "_biomni_custom_functions"): + builtins._biomni_custom_functions = {} + builtins._biomni_custom_functions.update(custom_functions) + + +def format_execute_tags_in_content(content: str, parse_tool_calls_with_modules_func) -> str: + """Format execute tags in content by extracting code and creating highlighted tool call blocks. + + This function processes content that contains ... 
 tags and
+    converts them into styled HTML blocks that display the code with syntax highlighting
+    and information about which tools are being used.
+
+    Args:
+        content: The content string that may contain <execute> tags
+        parse_tool_calls_with_modules_func: Function to parse tool calls with modules
+            (typically parse_tool_calls_with_modules)
+
+    Returns:
+        Formatted content with execute tags converted to highlighted tool call blocks.
+        Also processes <solution> tags in the same pass.
+
+    Note:
+        The function also calls format_solution_tags_in_content() to handle
+        solution tags in the same processing pass.
+    """
+    import re
+
+    # Pattern to match <execute>...</execute> blocks
+    execute_pattern = r"<execute>(.*?)</execute>"
+
+    def replace_execute_tag(match):
+        code_content = match.group(1).strip()
+        language, tool_name = detect_code_language_and_tool(code_content)
+        code_content = clean_code_content(code_content, language)
+
+        # Parse tools from the code content with module information
+        detected_tool_modules = parse_tool_calls_with_modules_func(code_content)
+
+        # Create the formatted block
+        formatted_block = create_tool_call_block(code_content, language, tool_name, detected_tool_modules)
+        return formatted_block
+
+    # Replace all execute tags with formatted tool call blocks
+    formatted_content = re.sub(execute_pattern, replace_execute_tag, content, flags=re.DOTALL)
+
+    # Also format solution tags
+    formatted_content = format_solution_tags_in_content(formatted_content)
+
+    return formatted_content
+
+
+def detect_code_language_and_tool(code_content: str) -> tuple[str, str]:
+    """Detect the programming language and tool name from code content.
+
+    This function analyzes code content to determine the programming language
+    and appropriate tool name based on language markers at the beginning of
+    the code block.
+ + Args: + code_content: The code content to analyze for language markers + + Returns: + Tuple containing (language, tool_name) where: + - language: The detected programming language ("python", "r", "bash") + - tool_name: The human-readable tool name for display + + Example: + >>> detect_code_language_and_tool("#!R\nlibrary(ggplot2)") + ("r", "R REPL") + >>> detect_code_language_and_tool("#!BASH\necho 'hello'") + ("bash", "Bash Script") + """ + if code_content.startswith("#!R") or code_content.startswith("# R code") or code_content.startswith("# R script"): + return "r", "R REPL" + elif code_content.startswith("#!BASH") or code_content.startswith("# Bash script"): + return "bash", "Bash Script" + elif code_content.startswith("#!CLI"): + return "bash", "CLI Command" + else: + return "python", "Python REPL" + + +def clean_code_content(code_content: str, language: str) -> str: + """Clean code content by removing language markers. + + This function removes language-specific markers from the beginning of code + content to prepare it for display in code blocks. The markers are used + internally for language detection but should not appear in the final output. 
+ + Args: + code_content: The raw code content that may contain language markers + language: The detected programming language ("python", "r", "bash") + + Returns: + Cleaned code content with language markers removed + + Example: + >>> clean_code_content("#!R\nlibrary(ggplot2)", "r") + "library(ggplot2)" + >>> clean_code_content("#!BASH\necho 'hello'", "bash") + "echo 'hello'" + """ + import re + + if language == "r": + return re.sub(r"^#!R|^# R code|^# R script", "", code_content, count=1).strip() + elif language == "bash": + if code_content.startswith("#!BASH") or code_content.startswith("# Bash script"): + return re.sub(r"^#!BASH|^# Bash script", "", code_content, count=1).strip() + elif code_content.startswith("#!CLI"): + return re.sub(r"^#!CLI", "", code_content, count=1).strip() + return code_content + + +def create_tool_call_block(code_content: str, language: str, tool_name: str, detected_tool_modules: list) -> str: + """Create the HTML block for tool call highlighting. + + This function generates a styled HTML block that displays code execution + information including the code itself, syntax highlighting, and a list of + tools that were used during execution. + + Args: + code_content: The cleaned code content to display + language: The programming language for syntax highlighting + tool_name: The default tool name to display if no specific tools detected + detected_tool_modules: List of (tool_name, module_name) tuples for tools used + + Returns: + HTML string containing a styled tool call block with code and tool information + + Note: + The HTML uses CSS classes defined in get_pdf_css_content() for styling. + If no specific tools are detected, it falls back to a default tool name. + """ + # Create the formatted block with code and tools used + formatted_block = f"""
+
+Code Execution +
+
+```{language} +{code_content} +``` +
""" + + # Add tools used section + if detected_tool_modules: + tools_list = format_detected_tools(detected_tool_modules) + formatted_block += f""" +
+Tools Used: {tools_list} +
""" + else: + formatted_block += format_default_tool_name(language, tool_name) + + formatted_block += "
" + return formatted_block + + +def format_detected_tools(detected_tool_modules: list) -> str: + """Format detected tools with their modules for display. + + This function takes a list of (tool_name, module_name) tuples and formats + them into a human-readable string for display in the tool call blocks. + It handles special cases for common tools and formats module names appropriately. + + Args: + detected_tool_modules: List of (tool_name, module_name) tuples + + Returns: + Comma-separated string of formatted tool descriptions + + Example: + >>> format_detected_tools([("analyze_data", "biomni.tool"), ("pandas", "pandas")]) + "biomni → analyze_data, pandas → pandas" + """ + tool_descriptions = [] + for tool_name, module_name in detected_tool_modules: + if tool_name == "python_repl": + tool_descriptions.append("Python REPL") + elif tool_name == "r_repl": + tool_descriptions.append("R REPL") + elif "bash" in tool_name.lower(): + tool_descriptions.append("Bash Script") + else: + # Extract the last part of the module name for display + display_module = module_name.split(".")[-1] if "." in module_name else module_name + tool_descriptions.append(f"{display_module} → {tool_name}") + + return ", ".join(sorted(tool_descriptions)) + + +def format_default_tool_name(language: str, tool_name: str) -> str: + """Format default tool name based on programming language. + + This function generates HTML for displaying the default tool name when + no specific tools are detected in the code. It maps programming languages + to their appropriate default tool names. + + Args: + language: The programming language ("python", "r", "bash") + tool_name: The detected tool name (used for bash CLI vs script distinction) + + Returns: + HTML string containing a styled tools-used section + + Note: + For bash, it distinguishes between CLI commands and bash scripts + based on the tool_name parameter. + """ + if language == "r": + return """ +
+Tools Used: R REPL +
""" + elif language == "bash": + if tool_name == "CLI Command": + return """ +
+Tools Used: CLI Command +
""" + else: + return """ +
+Tools Used: Bash Script +
""" + else: + return """ +
+Tools Used: Python REPL +
""" + + +def format_solution_tags_in_content(content: str) -> str: + """Format solution tags in content by extracting text and formatting as solution blocks. + + This function processes content that contains ... tags and + converts them into styled HTML blocks that display solution content with appropriate + formatting and CSS classes. + + Args: + content: The content string that may contain tags + + Returns: + Formatted content with solution tags converted to styled solution blocks + + Note: + The solution blocks use the "title-text summary" CSS class for consistent + styling with other content blocks in the markdown output. + """ + import re + + # Pattern to match ... blocks + solution_pattern = r"(.*?)" + + def replace_solution_tag(match): + solution_content = match.group(1).strip() + # Format as regular text, not terminal + return f"""
+
+Summary and Solution +
+
+{solution_content} +
+
""" + + # Replace all solution tags with formatted solution blocks + formatted_content = re.sub(solution_pattern, replace_solution_tag, content, flags=re.DOTALL) + + return formatted_content + + +def format_observation_as_terminal(content: str) -> str | None: + """Format observation content with terminal-like styling. + + This function processes observation content from the agent's execution results + and formats it as a styled terminal block. It handles both text and image content, + with length limits to ensure the output fits within PDF page constraints. + + Args: + content: The observation content string, potentially containing tags + + Returns: + Formatted HTML content with terminal styling, or None if observation is + empty, invalid, or contains only meaningless content + + Note: + - Content is limited to 10,000 characters to fit within 2 A4 pages + - Handles both text and base64-encoded images + - Uses CSS classes for consistent styling with other content blocks + """ + import re + + # Character limit for 2 A4 pages (approximately 10,000 characters) + MAX_OBSERVATION_LENGTH = 10000 + + # Remove the tags and extract the content + observation_pattern = r"(.*?)" + observation_match = re.search(observation_pattern, content, re.DOTALL) + + if observation_match: + observation_content = observation_match.group(1).strip() + else: + # Fallback if no observation tags found - check if content is meaningful + if not (content.strip() and content.strip() not in ["", "None", "null", "undefined"]): + return None + observation_content = content.strip() + + # Skip empty observations + if not observation_content or observation_content in ["", "None", "null", "undefined"]: + return None + + # Check if observation is too long for 2 pages + if len(observation_content) > MAX_OBSERVATION_LENGTH: + cropped_content = observation_content[:MAX_OBSERVATION_LENGTH] + truncation_notice = f"\n\n[Output truncated - content was too long to display here ({len(observation_content)} characters 
total)]" + observation_content = cropped_content + truncation_notice + + # Check if it contains plot data (base64 images) + if "data:image/" in observation_content: + content_html = process_observation_with_images(observation_content) + else: + # Regular text output - format as terminal output + content_html = f"```terminal\n{observation_content}\n```" + + return f"""
+
+Observation +
+
+{content_html} +
+
""" + + +def process_observation_with_images(observation_content: str) -> str: + """Process observation content that contains both text and base64-encoded images. + + This function handles observation content that includes both text output and + base64-encoded images (typically plots from data analysis). It separates the + text and image content and formats them appropriately for markdown display. + + Args: + observation_content: The observation content containing both text and images + + Returns: + HTML string containing formatted text (as terminal blocks) and images + (as markdown image tags) + + Note: + The function uses "data:image/" as a delimiter to split content into + text and image parts, then processes each part separately. + """ + # Split content into text and image parts + parts = observation_content.split("data:image/") + text_parts = [] + image_parts = [] + + for i, part in enumerate(parts): + if i == 0: + # First part is text only + if part.strip(): + text_parts.append(part.strip()) + else: + # Find the end of the base64 data + end_markers = ["\n", "\r", " ", "\t", ">", "<", "]", ")", "}"] + image_end = len(part) + for marker in end_markers: + marker_pos = part.find(marker) + if marker_pos != -1 and marker_pos < image_end: + image_end = marker_pos + + # Extract image data + image_data = "data:image/" + part[:image_end] + image_parts.append(image_data) + + # Extract remaining text + remaining_text = part[image_end:].strip() + if remaining_text: + text_parts.append(remaining_text) + + # Build the content + content_html = "" + if text_parts: + # Add text content as terminal output + text_content = "\n".join(text_parts) + content_html += f"```terminal\n{text_content}\n```\n\n" + + if image_parts: + # Add image content + for image_data in image_parts: + content_html += f"![Plot]({image_data})\n\n" + + return content_html + + +def remove_emojis_from_text(text: str) -> str: + """Remove emojis from text for markdown/PDF output. 
+ + This function removes common emojis used in the system prompt and configuration + display from text content before it's converted to markdown or PDF. This ensures + clean, professional output while preserving emojis in the console display. + + Args: + text: The text content that may contain emojis + + Returns: + Text content with emojis removed + + Note: + The function targets specific emojis used in the Biomni system: + - 🔧 for tools + - 📊 for data + - ⚙️ for software + - 📋 for configuration + - 🤖 for agent + """ + import re + + # Remove common emojis used in the system prompt, this makes conversion simpler + emoji_patterns = [ + r"🔧\s*", # Tool emoji + r"📊\s*", # Data emoji + r"⚙️\s*", # Software emoji + r"📋\s*", # Config emoji + r"🤖\s*", # Agent emoji + ] + + for pattern in emoji_patterns: + text = re.sub(pattern, "", text) + + return text + + +def format_lists_in_text(text: str) -> str: + """Format numbered lists and bullet points in text to proper markdown format. + + This function processes text content to identify and format various types of lists, + including numbered lists with checkboxes, regular lists, and plan structures. + It also handles preprocessing tasks like removing bold formatting from plan titles + and removing emojis for clean PDF output. + + Args: + text: The text content to process for list formatting + + Returns: + Formatted text with properly structured lists and cleaned formatting + + Note: + The function performs several preprocessing steps: + - Removes bold formatting from plan titles + - Removes emojis for PDF output + - Identifies and formats checkbox lists + - Processes regular text blocks + """ + import re + + # Preprocess to remove bold formatting from plan titles + # Remove **Plan:**, **Updated Plan:**, **Completed Plan:**, etc. 
+ text = re.sub(r"\*\*([Pp]lan|Updated [Pp]lan|Completed [Pp]lan|Final [Pp]lan):\*\*", r"\1:", text) + # Also handle cases without colons + text = re.sub(r"\*\*([Pp]lan|Updated [Pp]lan|Completed [Pp]lan|Final [Pp]lan)\*\*", r"\1", text) + # Handle any other bold formatting patterns for plan titles + text = re.sub(r"([Pp]lan|Updated [Pp]lan|Completed [Pp]lan|Final [Pp]lan):", r"\1:", text) + text = re.sub(r"([Pp]lan|Updated [Pp]lan|Completed [Pp]lan|Final [Pp]lan)", r"\1", text) + + # Remove emojis from the text for markdown/PDF output + text = remove_emojis_from_text(text) + + lines = text.split("\n") + list_blocks = identify_list_blocks(lines) + + # Process each block + result_blocks = [] + for block_text, is_checkbox_list in list_blocks: + if is_checkbox_list: + result_blocks.append(format_single_list(block_text)) + else: + result_blocks.append(block_text) + + return "\n".join(result_blocks) + + +def identify_list_blocks(lines: list) -> list[tuple[str, bool]]: + """Identify blocks of text that contain lists. + + This function analyzes a list of text lines to identify contiguous blocks + that contain numbered lists with checkboxes. It groups lines into blocks + and marks whether each block contains a checkbox list or regular text. + + Args: + lines: List of text lines to analyze + + Returns: + List of tuples containing (block_text, is_checkbox_list) where: + - block_text: The text content of the block + - is_checkbox_list: True if the block contains numbered items with checkboxes + + Note: + The function looks for patterns like "1. [ ]", "2. [✓]", "3. [✗]" to + identify checkbox sequences and groups them into separate blocks. 
+ """ + import re + + list_blocks = [] + current_block = [] + in_checkbox_sequence = False + + for line in lines: + line_stripped = line.strip() + + # Check if this line starts a numbered item with checkbox + if re.match(r"^\d+\.\s*\[[ ✓✗]\]", line_stripped): + if not in_checkbox_sequence: + # Start of a new checkbox sequence + if current_block: + list_blocks.append(("\n".join(current_block), False)) + current_block = [line] + in_checkbox_sequence = True + else: + # Continue the sequence + current_block.append(line) + else: + if in_checkbox_sequence: + # End of checkbox sequence + if current_block: + list_blocks.append(("\n".join(current_block), True)) + current_block = [] + in_checkbox_sequence = False + current_block.append(line) + + # Handle the last block + if current_block: + if in_checkbox_sequence: + list_blocks.append(("\n".join(current_block), True)) + else: + list_blocks.append(("\n".join(current_block), False)) + + return list_blocks + + +def format_single_list(text: str) -> str: + """Format a single list block with checkboxes and plan titles. + + This function processes a text block that may contain numbered lists with + checkboxes and plan titles. It converts checkbox symbols to HTML list items + and wraps the content in a styled container with appropriate CSS classes. + + Args: + text: The text block to format, potentially containing numbered lists + + Returns: + HTML string containing either a formatted list with plan title or + regular text if no list items are found + + Note: + The function recognizes plan titles like "Plan", "Updated Plan", "Completed Plan" + and converts checkbox symbols (✓, ✗) to HTML format ([x], [ ]). 
+ """ + import re + + lines = text.split("\n") + list_items = [] + has_list_items = False + plan_title = "Plan" # Default title + + for line in lines: + line = line.strip() + if not line: + continue + + # Check for plan title patterns + if re.match(r"^(Plan|Updated Plan|Completed Plan)$", line, re.IGNORECASE): + plan_title = line + continue + + # Check for numbered lists with checkboxes (1. [ ] or 1. [✓] or 1. [✗]) + if re.match(r"^\d+\.\s*\[[ ✓✗]\]", line): + has_list_items = True + # Extract the content after the checkbox + content = re.sub(r"^\d+\.\s*\[[ ✓✗]\]\s*", "", line) + + # Replace checkbox symbols with text format + if "[✓]" in line: + list_items.append(f"
  • [x] {content}
  • ") + elif "[✗]" in line: + list_items.append(f"
  • [ ] {content}
  • ") + else: + list_items.append(f"
  • [ ] {content}
  • ") + else: + # Regular text - add as is (don't convert to list items) + list_items.append(line) + + if has_list_items and list_items: + # This is a list - return with container div and styled title + return f"""
    +
    +{plan_title} +
    +
    +
      +{chr(10).join(list_items)} +
    +
    +
    """ + else: + # Regular text + return "\n".join(list_items) + + +def convert_markdown_to_pdf(markdown_path: str, pdf_path: str) -> None: + """Convert markdown file to PDF using weasyprint or fallback libraries. + + This function converts a markdown file to PDF format using multiple fallback + strategies. It prioritizes weasyprint for better layout control, then falls back + to markdown2pdf and finally pandoc if the preferred libraries are not available. + + Args: + markdown_path: Path to the input markdown file + pdf_path: Path where the output PDF file should be saved + + Raises: + ImportError: If no PDF conversion library is available + Exception: If PDF conversion fails for any other reason + + Note: + The function uses minimal markdown extensions for better performance + and applies custom CSS styling for consistent formatting. + """ + try: + # Try weasyprint first (better for complex layouts) + from weasyprint import HTML + from weasyprint.text.fonts import FontConfiguration + + # Read markdown content + with open(markdown_path, encoding="utf-8") as f: + markdown_content = f.read() + + # Convert markdown to HTML with minimal extensions for better performance + import markdown + + # Use minimal extensions to improve performance + html_content = markdown.markdown( + markdown_content, + extensions=["fenced_code"], # Removed codehilite for better performance + ) + + # Add CSS styling + css_content = get_pdf_css_content() + + # Create HTML document + html_doc = f""" + + + + + Biomni Conversation History + + + + {html_content} + + + """ + + # Convert to PDF with performance optimizations + font_config = FontConfiguration() + html_obj = HTML(string=html_doc) + html_obj.write_pdf(pdf_path, font_config=font_config, optimize_images=True) + + except ImportError: + # Fallback to markdown2pdf if weasyprint is not available + try: + from markdown2pdf import markdown2pdf + + markdown2pdf(markdown_path, pdf_path) + except ImportError: + # Final fallback - try using pandoc 
if available + import subprocess + + try: + subprocess.run(["pandoc", markdown_path, "-o", pdf_path], check=True) + except (subprocess.CalledProcessError, FileNotFoundError) as e: + raise ImportError( + "No PDF conversion library available. Please install weasyprint, markdown2pdf, or pandoc." + ) from e + except Exception as e: + raise Exception(f"PDF conversion failed: {e}") from e + + +def get_pdf_css_content() -> str: + """Get the CSS content for PDF generation. + + This function returns a comprehensive CSS stylesheet designed specifically + for PDF generation from markdown content. It includes styling for all + HTML elements that may appear in the converted markdown, with optimized + typography, spacing, and layout for print media. + + Returns: + CSS string containing all styles needed for PDF generation + + Note: + The CSS includes styles for: + - Typography and font families + - Headings and text formatting + - Code blocks and syntax highlighting + - Tables and lists + - Custom classes for tool calls, observations, and plans + - Print-optimized spacing and layout + """ + return """ + body { + font-family: 'Noto Color Emoji', 'Apple Color Emoji', 'Segoe UI Emoji', 'Twemoji', 'EmojiOne Color', -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif; + font-size: 9pt; + line-height: 1.4; + max-width: 800px; + margin: 0 auto; + padding: 15px; + color: #333; + } + h1, h2, h3, h4, h5, h6 { + font-family: 'Noto Color Emoji', 'Apple Color Emoji', 'Segoe UI Emoji', 'Twemoji', 'EmojiOne Color', -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif; + color: #2c3e50; + margin-top: 1em; + margin-bottom: 0.5em; + } + h1 { + border-bottom: 2px solid #3498db; + padding-bottom: 8px; + font-size: 16pt; + } + h2 { + border-bottom: 1px solid #bdc3c7; + padding-bottom: 3px; + font-size: 14pt; + } + h3 { + font-size: 12pt; + } + h4 { + font-size: 10pt; + margin-top: 0.8em; + margin-bottom: 0.3em; + } + h5, h6 { + font-size: 9pt; + margin-top: 0.6em; + 
margin-bottom: 0.2em; + } + code { + background-color: #f8f9fa; + padding: 1px 3px; + border-radius: 2px; + font-family: 'Monaco', 'Menlo', 'Ubuntu Mono', monospace; + font-size: 8pt; + white-space: pre-wrap; + word-wrap: break-word; + } + pre { + background-color: #f8f9fa; + padding: 10px; + border-radius: 3px; + overflow-x: auto; + border-left: 3px solid #3498db; + white-space: pre-wrap; + word-wrap: break-word; + font-size: 8pt; + margin: 0.5em 0; + } + pre code { + background-color: transparent; + padding: 0; + border-radius: 0; + font-size: 8pt; + } + /* Code header styling */ + strong { + font-size: 9pt; + font-weight: normal; + color: #6c757d; + font-style: italic; + } + blockquote { + border-left: 3px solid #bdc3c7; + margin: 0.5em 0; + padding-left: 15px; + color: #7f8c8d; + font-size: 8pt; + } + table { + border-collapse: collapse; + width: 100%; + margin: 0.5em 0; + font-size: 8pt; + } + th, td { + border: 1px solid #bdc3c7; + padding: 4px 8px; + text-align: left; + } + th { + background-color: #ecf0f1; + font-weight: bold; + } + img { + max-width: 100%; + height: auto; + display: block; + margin: 10px auto; + border: 1px solid #ddd; + border-radius: 3px; + box-shadow: 0 2px 4px rgba(0,0,0,0.1); + } + p { + font-family: 'Noto Color Emoji', 'Apple Color Emoji', 'Segoe UI Emoji', 'Twemoji', 'EmojiOne Color', -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif; + margin: 0.3em 0; + } + /* Tool call highlighting - matching observation and code formatting */ + .tool-call-highlight { + background-color: #f8f9fa; + border: 1px solid #e9ecef; + border-radius: 3px; + padding: 0; + margin: 10px 0; + overflow: hidden; + } + .tool-call-header { + background-color: #e9ecef; + color: #495057; + padding: 8px 12px; + margin: 0; + font-weight: normal; + font-size: 9pt; + font-style: italic; + border-bottom: 1px solid #dee2e6; + } + .tool-call-input { + background-color: #f8f9fa; + border: none; + border-radius: 0; + padding: 10px 12px; + margin: 0; + color: 
#333; + font-size: 8pt; + line-height: 1.4; + } + .tool-call-input strong { + color: #495057; + font-weight: normal; + font-size: 8pt; + font-style: italic; + } + .tool-call-input pre { + background-color: #f8f9fa; + border: 1px solid #e9ecef; + border-radius: 3px; + padding: 10px; + margin: 0; + font-size: 8pt; + line-height: 1.4; + overflow-x: auto; + white-space: pre-wrap; + word-wrap: break-word; + } + .tool-call-input code { + background-color: transparent; + padding: 0; + border-radius: 0; + font-size: 8pt; + color: #2c3e50; + } + .tools-used { + background-color: #f8f9fa; + border-top: 1px solid #dee2e6; + padding: 8px 12px; + margin: 0; + font-size: 8pt; + color: #6c757d; + } + .tools-used strong { + color: #6c757d; + font-weight: normal; + font-size: 8pt; + font-style: italic; + } + /* Title-text styling - unified for observations, plans, and solutions */ + .title-text { + background-color: #f8f9fa; + border: 1px solid #e9ecef; + border-radius: 3px; + padding: 0; + margin: 10px 0; + overflow: hidden; + } + .title-text-header { + background-color: #e9ecef; + color: #495057; + padding: 8px 12px; + margin: 0; + font-weight: normal; + font-size: 9pt; + font-style: italic; + border-bottom: 1px solid #dee2e6; + } + .title-text-header strong { + color: #495057; + font-weight: normal; + font-size: 9pt; + font-style: italic; + } + .title-text-content { + background-color: #f8f9fa; + border: none; + border-radius: 0; + padding: 10px 12px; + margin: 0; + color: #333; + font-size: 8pt; + line-height: 1.4; + } + /* Plan-specific styling - soft blue pastel */ + .title-text.plan { + background-color: #e3f2fd; + border-color: #bbdefb; + } + .title-text.plan .title-text-header { + background-color: #bbdefb; + color: #1976d2; + } + .title-text.plan .title-text-content { + background-color: #e3f2fd; + } + .plan-title { + font-style: italic; + font-weight: normal; + color: #1565c0; + text-shadow: 0 1px 2px rgba(0,0,0,0.1); + } + .plan-title strong { + font-weight: normal; + } 
+ /* Code execution-specific styling - matching title-text styling */ + .tool-call-highlight { + background-color: #f8f9fa; + border-color: #e9ecef; + } + .tool-call-header { + background-color: #e9ecef; + color: #495057; + } + .tool-call-input { + background-color: #f8f9fa; + color: #333; + } + /* Observation-specific styling - soft purple pastel */ + .title-text.observation { + background-color: #f3e5f5; + border-color: #e1bee7; + } + .title-text.observation .title-text-header { + background-color: #e1bee7; + color: #7b1fa2; + } + .title-text.observation .title-text-content { + background-color: #f3e5f5; + } + /* Summary and solution-specific styling - soft orange pastel, no overlay */ + .title-text.summary { + background-color: #fff3e0; + border-color: #ffcc02; + } + .title-text.summary .title-text-header { + background-color: #ffcc02; + color: #f57c00; + } + .title-text.summary .title-text-content { + background-color: #fff3e0; + } + .title-text-content ul { + background-color: transparent; + border: none; + border-radius: 0; + padding: 0; + margin: 0; + color: #333; + font-size: 8pt; + line-height: 1.4; + } + .title-text-content li { + margin: 3px 0; + color: #333; + } + .title-text-content li strong { + color: #495057; + font-weight: normal; + font-size: 8pt; + font-style: italic; + } + .title-text-content li code { + background-color: #e9ecef; + color: #333; + padding: 1px 3px; + border-radius: 2px; + font-family: 'Monaco', 'Menlo', 'Ubuntu Mono', monospace; + font-size: 7pt; + } + .title-text-content pre { + background-color: #f8f9fa; + border: 1px solid #e9ecef; + border-radius: 3px; + padding: 10px; + margin: 0; + font-size: 8pt; + line-height: 1.4; + overflow-x: auto; + white-space: pre-wrap; + word-wrap: break-word; + } + .title-text-content code { + background-color: transparent; + padding: 0; + border-radius: 0; + font-size: 8pt; + color: #2c3e50; + } + /* Parsing error display styling */ + .parsing-error-box { + background-color: #ffebee; + border: 
1px solid #f44336; + border-radius: 4px; + padding: 8px 12px; + margin: 8px 0; + font-size: 9pt; + color: #c62828; + box-shadow: 0 2px 4px rgba(244, 67, 54, 0.1); + } + .parsing-error-header { + font-weight: bold; + margin-bottom: 4px; + color: #d32f2f; + } + .parsing-error-content { + font-family: 'Courier New', monospace; + background-color: #ffcdd2; + padding: 4px 6px; + border-radius: 2px; + margin-top: 4px; + font-size: 8pt; + white-space: pre-wrap; + word-wrap: break-word; + } + """ diff --git a/biomni_env/new_software_v006.sh b/biomni_env/new_software_v006.sh index c326c8ba4..23c2aa3f4 100644 --- a/biomni_env/new_software_v006.sh +++ b/biomni_env/new_software_v006.sh @@ -7,3 +7,4 @@ pip install pybiomart pip install fair-esm pip install nnunet nibabel nilearn pip install mi-googlesearch-python +pip install weasyprint