diff --git a/.env.template b/.env.template index aadf513..7f83002 100644 --- a/.env.template +++ b/.env.template @@ -13,13 +13,5 @@ SERPER_API_KEY= WANDB_API_KEY= HF_TOKEN= -# Azure VM Configuration (optional, for --vm flag) -AZURE_SUBSCRIPTION_ID= -AZURE_RESOURCE_GROUP_NAME= -AZURE_LOCATION= -NETWORK_SECURITY_GROUP_NAME= -SSH_PUBLIC_KEY_PATH= -SSH_PRIVATE_KEY_PATH= - # Required: Set to your name for tracking who ran the benchmark EXECUTED_BY= diff --git a/README.md b/README.md index ab4a4ef..dd82231 100644 --- a/README.md +++ b/README.md @@ -92,35 +92,7 @@ This repository provides a standardized evaluation harness for reproducible agen pip install anthropic ``` -6. **Optional: Azure VM Setup** - If you plan to use Azure VMs for evaluation, add the following to your `.env`: - - ``` - AZURE_SUBSCRIPTION_ID=your_subscription_id - AZURE_RESOURCE_GROUP_NAME=your_resource_group - AZURE_LOCATION=your_location - NETWORK_SECURITY_GROUP_NAME=your_nsg_name - SSH_PUBLIC_KEY_PATH=/path/to/your/ssh/key.pub - SSH_PRIVATE_KEY_PATH=/path/to/your/ssh/key - ``` - - - AZURE_SUBSCRIPTION_ID: This is the ID of your Azure subscription. Use the UUID. - - AZURE_RESOURCE_GROUP_NAME: This is the name of the resource group in which your VMs should be created. - - AZURE_LOCATION: e.g., "eastus" or "westus", etc. - - NETWORK_SECURITY_GROUP_NAME - - You will need to create a NSG in Azure for your access. - - Ensure that the NSG has an Inbound security rule that permits your machine to access SSH (port 22). - - Enter the NSG's name here. - - SSH_PUBLIC_KEY_PATH: This is your SSH key (on your local machine) - - SSH_PRIVATE_KEY_PATH: This is your SSH key (on your local machine) - - Then run the following command to install the optional azure dependencies: - - ```bash - pip install -e ".[azure]" - ``` - -7. **Optional: Docker Setup** +6. **Optional: Docker Setup** If you plan to use Docker containers for isolated evaluation, make sure Docker is installed on your system. The harness will automatically build the required Docker image. ## Tests @@ -404,7 +376,6 @@ hal-eval --benchmark --agent_dir --agent_func - **`--upload`**: Upload results to HuggingFace Hub - **`--max_concurrent `**: Number of parallel tasks (default: 1) - **`--conda_env_name `**: Conda environment for agent execution -- **`--vm`**: Run evaluation on Azure VMs - **`--docker`**: Run evaluation in Docker containers for isolation - **`--run_id `**: Specify a run ID (useful for continuing runs) - **`--continue_run`**: Continue from a previous run (requires run_id) @@ -434,19 +405,7 @@ hal-eval --benchmark usaco \ -A model_name=gpt-4o-mini-2024-07-18 ``` -3. **Running USACO on Azure VM:** - -```bash -hal-eval --benchmark usaco \ - --agent_dir agents/usaco_example_agent/ \ - --agent_function main.run \ - --agent_name "USACO Solver (gpt-4o-2024-11-20)" \ - --vm \ - --max_concurrent 5 \ - -A model_name=gpt-4o-2024-11-20 -``` - -4. **Running USACO with Amazon Bedrock models:** +3. **Running USACO with Amazon Bedrock models:** ```bash hal-eval --benchmark usaco \ diff --git a/agents/README.md b/agents/README.md index 4d44bed..050b5a8 100644 --- a/agents/README.md +++ b/agents/README.md @@ -32,9 +32,7 @@ def run(input: dict[str, dict], **kwargs) -> dict[str, str]: ## General Requirements -1. **Dependencies**: List all dependencies in `requirements.txt`. These will be installed: - - On VMs if `--vm` flag is used - - If you run evaluations locally, you must install the dependencies yourself. Then specify the conda environment name with `--conda_env_name` or run evaluations from the conda environment. +1. **Dependencies**: List all dependencies in `requirements.txt`. These will be installed when running in Docker. If you run evaluations locally, install the dependencies yourself and specify the conda environment name with `--conda_env_name` or run from the conda environment. 2. **Arguments**: Your agent can receive additional arguments via `-A` flags: @@ -172,7 +170,6 @@ def run(input: dict, **kwargs): **Requirements**: -- Must be run with `--vm` flag - **Important:** set `remote_environment_url` to `http://0.0.0.0:8000` and `experiment_name` to `output`. An example is below and in `agents/appworld_example_agent/main.py`. **Example Agent**: diff --git a/hal/agent_runner.py b/hal/agent_runner.py index de7eab3..4a16242 100644 --- a/hal/agent_runner.py +++ b/hal/agent_runner.py @@ -26,7 +26,6 @@ def __init__( config: Dict[str, Any], task_timeout: int, run_id: Optional[str] = None, - use_vm: bool = False, use_docker: bool = False, max_concurrent: int = 1, conda_env: Optional[str] = None, @@ -59,16 +58,15 @@ def __init__( not os.path.exists(requirements_path) and not conda_env and not use_docker - and not use_vm ): raise ValueError( f"No requirements.txt found in agent directory: {agent_dir}" ) # Validate runner options - if sum([bool(conda_env), use_vm, use_docker]) > 1: + if sum([bool(conda_env), use_docker]) > 1: raise ValueError( - "Only one of conda_env, use_vm, or use_docker can be set at a time." + "Only one of conda_env or use_docker can be set at a time." ) # Initialize benchmark first @@ -83,45 +81,19 @@ def __init__( results_dir, self.benchmark.benchmark_name ) - # Check if any task requires GPU - has_gpu_task = False - if hasattr(self.benchmark, "benchmark") and isinstance( - self.benchmark.benchmark, dict - ): - for task_id, task_data in self.benchmark.benchmark.items(): - if isinstance(task_data, dict) and task_data.get("gpu", False): - has_gpu_task = True - break - - # Print warning if GPU tasks are present but not running on VM - if has_gpu_task and not use_vm: - logger.warning( - "Warning: This benchmark contains tasks that require GPU, but is not being run on a VM. " - "GPU tasks may not work correctly without VM execution. Use the --vm flag to run on a VM." - ) - self.run_command = run_command # Check if benchmark requires sandbox - if self.benchmark.requires_sandbox and not use_vm and not use_docker: + if self.benchmark.requires_sandbox and not use_docker: raise ValueError( - f"Benchmark {benchmark_name} requires sandbox execution. Please use --vm or --docker flag." + f"Benchmark {benchmark_name} requires sandbox execution. Please use --docker flag." ) # Set run ID self.run_id = run_id or f"{benchmark_name}_{int(time.time())}" # Initialize appropriate runner with benchmark - if use_vm: - from .utils.virtual_machine_runner import VirtualMachineRunner - - self.runner = VirtualMachineRunner( - max_concurrent=max_concurrent, - log_dir=self.benchmark.get_run_dir(self.run_id), - benchmark=self.benchmark, - task_timeout=task_timeout, - ) - elif use_docker: + if use_docker: self.runner = DockerRunner( max_concurrent=max_concurrent, log_dir=self.benchmark.get_run_dir(self.run_id), @@ -143,7 +115,6 @@ def __init__( self.config = config self.max_concurrent = max_concurrent self.conda_env = conda_env - self.use_vm = use_vm self.use_docker = use_docker self.continue_run = continue_run self.ignore_errors = ignore_errors diff --git a/hal/benchmarks/README.md b/hal/benchmarks/README.md index 961545b..0a12944 100644 --- a/hal/benchmarks/README.md +++ b/hal/benchmarks/README.md @@ -120,25 +120,11 @@ class SimpleMathBenchmark(BaseBenchmark): 3. **Sandbox Support**: Set `requires_sandbox = True` if benchmark requires sandbox execution. -4. **Setup Script**: Provide `setup_script` for installing benchmark-specific dependencies on VMs. - -5. **GPU Support**: Tasks can specify GPU requirements by including a `"gpu": true` flag in their benchmark entries. When the benchmark is run with the `--vm` flag, tasks with this flag will be executed on GPU-enabled VMs. - ```python - # Example of a task that requires GPU - self.benchmark = { - "task_id": { - "prompt": "Train a neural network model...", - "files": {...}, - "gpu": true # This task will use a GPU VM when run with --vm flag - } - } - ``` - - GPU VMs are only created if the benchmark is run with the `--vm` flag (which activates VM-based execution) - - For tasks that don't specify a GPU requirement (no `"gpu"` key or `"gpu": false`), regular VMs will be used +4. **Setup Script**: Provide `setup_script` for installing benchmark-specific dependencies. ## Providing task-specific files to agents -Benchmarks can provide files to agents by including a `files` dictionary in each task. These files will be automatically copied into the agent's working environment by both the VM and local runs. +Benchmarks can provide files to agents by including a `files` dictionary in each task. These files will be automatically copied into the agent's working environment. Example task with files: ```python diff --git a/hal/benchmarks/base_benchmark.py b/hal/benchmarks/base_benchmark.py index 44aa956..41d0fbc 100644 --- a/hal/benchmarks/base_benchmark.py +++ b/hal/benchmarks/base_benchmark.py @@ -34,7 +34,7 @@ def __init__( ) self.agent_args: Dict[str, Any] = {} # Store agent args self.requires_sandbox = ( - requires_sandbox # Whether benchmark requires VM execution + requires_sandbox # Whether benchmark requires sandbox execution ) def _normalize_agent_output(self, agent_output: Dict[str, Any]) -> Dict[str, Any]: diff --git a/hal/cli.py b/hal/cli.py index 53708dd..68a8af2 100644 --- a/hal/cli.py +++ b/hal/cli.py @@ -82,7 +82,6 @@ default=os.path.join(os.path.dirname(__file__), "config.yaml"), help="Path to configuration file. (currently not used)", ) -@click.option("--vm", is_flag=True, help="Run the agent on azure VMs") @click.option( "--docker", is_flag=True, @@ -163,7 +162,6 @@ def main( a, b, i, - vm, docker, max_tasks, prompt_sensitivity, @@ -204,7 +202,7 @@ def main( log_dir = os.path.join(results_dir, benchmark, run_id) os.makedirs(log_dir, exist_ok=True) verbose_log_path = os.path.join(log_dir, f"{run_id}_verbose.log") - setup_logging(log_dir, run_id, use_vm=vm) + setup_logging(log_dir, run_id) logger.info("HAL Harness") @@ -216,9 +214,9 @@ def main( validate_model_pricing(agent_args["model_name"]) # Validate runner options - if sum([bool(conda_env_name), vm, docker]) > 1: + if sum([bool(conda_env_name), docker]) > 1: logger.error( - "Only one of --conda_env_name, --vm, or --docker can be specified. Exiting..." + "Only one of --conda_env_name or --docker can be specified. Exiting..." ) sys.exit(1) @@ -239,7 +237,6 @@ def main( max_concurrent=max_concurrent, conda_env_name=conda_env_name, log_dir=log_dir, - vm=vm, docker=docker, continue_run=continue_run, ignore_errors=ignore_errors, @@ -260,7 +257,6 @@ def main( benchmark_name=benchmark, config=config, run_id=run_id, # Now guaranteed to have a value - use_vm=vm, use_docker=docker, max_concurrent=max_concurrent, conda_env=conda_env_name, diff --git a/hal/utils/logging_utils.py b/hal/utils/logging_utils.py index e7a8f77..c049ed7 100644 --- a/hal/utils/logging_utils.py +++ b/hal/utils/logging_utils.py @@ -16,13 +16,12 @@ logger = logging.getLogger(__name__) -def setup_logging(log_dir: str, run_id: str, use_vm: bool = False) -> None: +def setup_logging(log_dir: str, run_id: str) -> None: """Setup logging configuration. Args: log_dir: Directory for log files run_id: Unique run identifier - use_vm: Unused; kept for API compatibility. """ # Create absolute path for log directory to avoid path duplication log_dir = os.path.abspath(log_dir) @@ -39,16 +38,8 @@ def setup_logging(log_dir: str, run_id: str, use_vm: bool = False) -> None: for handler in root_logger.handlers[:]: root_logger.removeHandler(handler) - # Suppress verbose Azure SDK logging - logging.getLogger("azure.core.pipeline.policies.http_logging_policy").setLevel( - logging.WARNING - ) - # Suppress Azure identity logging - logging.getLogger("azure.identity").setLevel(logging.WARNING) # Suppress httpx logging logging.getLogger("httpx").setLevel(logging.WARNING) - # Suppress SSH logging - logging.getLogger("paramiko.transport").setLevel(logging.WARNING) # Create formatters detailed_formatter = logging.Formatter( @@ -142,7 +133,6 @@ def print_run_config( max_concurrent: int, log_dir: str, conda_env_name: Optional[str], - vm: bool, continue_run: bool, docker: bool = False, ignore_errors: bool = False, @@ -160,7 +150,6 @@ def print_run_config( logger.info(f" Log Directory: {log_dir}") logger.info(f" Max Concurrent: {max_concurrent}") logger.info(f" Upload Results: {'Yes' if upload else 'No'}") - logger.info(f" VM Execution: {'Yes' if vm else 'No'}") logger.info(f" Docker Execution: {'Yes' if docker else 'No'}") logger.info(f" Continue Previous Run: {'Yes' if continue_run else 'No'}") logger.info(f" Ignore Errors: {'Yes' if ignore_errors else 'No'}") diff --git a/hal/utils/setup_vm.sh b/hal/utils/setup_vm.sh deleted file mode 100644 index 7ab1df4..0000000 --- a/hal/utils/setup_vm.sh +++ /dev/null @@ -1,34 +0,0 @@ -#!/bin/bash -set -e # Exit on error - -# Redirect all output to log file -exec > /home/agent/setup_vm.log 2>&1 - -# Configuration -HOME_DIR="/home/agent" - -echo "Starting setup for user: agent" - -# Create and activate environment as the agent user with explicit output -echo "Creating conda environment..." -# FIXME: stop installing pinned dependencies like this -su - agent -c "bash -c '\ - source /home/agent/miniconda3/etc/profile.d/conda.sh && \ - echo \"Installing Python standard library modules...\" && \ - conda activate agent_env - pip install --upgrade pip && \ - echo \"Checking for requirements.txt...\" && \ - if [ -f requirements.txt ]; then \ - echo \"Installing requirements...\" && \ - pip install -r requirements.txt && \ - echo \"Installing weave and gql pin...\" && \ - pip install weave==0.51.41 \"gql<4\" && \ - echo \"Installing Azure VM dependencies...\" && \ - pip install \"azure-identity>=1.12.0\" \"requests>=2.31.0\" && \ - echo \"Requirements installed\"; \ - else \ - echo \"No requirements.txt found\" && \ - exit 1; \ - fi'" - -echo "Setup completed successfully" diff --git a/hal/utils/virtual_machine_manager.py b/hal/utils/virtual_machine_manager.py deleted file mode 100644 index 54c2b4c..0000000 --- a/hal/utils/virtual_machine_manager.py +++ /dev/null @@ -1,473 +0,0 @@ -from azure.mgmt.compute import ComputeManagementClient -from azure.mgmt.network import NetworkManagementClient -from azure.identity import DefaultAzureCredential -import paramiko -import os -import tarfile -import json -import logging -from contextlib import contextmanager -from pathlib import Path -from .vm.azure_virtual_machine import AzureVirtualMachine - -# Mount names for core_agent: used only under VM_AGENT_HOME/environment/ (e.g. environment/data, environment/code, environment/results from task payload). -VM_AGENT_HOME = "/home/agent" -VM_ENVIRONMENT_MOUNT_NAMES = ("data", "code", "results") - -RUN_AGENT_SCRIPT_PATH = Path(__file__).resolve().parent / "vm" / "run_agent.py" - -# Set up base logger -_base_logger = logging.getLogger(__name__) - - -class VMLoggerAdapter(logging.LoggerAdapter): - """Prefix all messages in the file with the vm_name.""" - - def process(self, msg, kwargs): - vm_name = self.extra.get("vm_name", "unknown") - return f"VM Manager {vm_name}: {msg}", kwargs - - -def _get_logger(vm_name: str) -> logging.LoggerAdapter: - """Get a logger adapter that prefixes messages with the VM name.""" - return VMLoggerAdapter(_base_logger, {"vm_name": vm_name}) - - -class VirtualMachineManager: - """ - Manages virtual machine operations for agent execution. - - This class provides stateless methods for creating, managing, and deleting Azure VMs. - Each method operates on a specific VM identified by vm_name parameter. - - Thread-safe for concurrent operations on different VMs when using the same instance. - All methods require vm_name as the first parameter to avoid state conflicts. - - Environment Variables Required: - - AZURE_SUBSCRIPTION_ID: Azure subscription ID - - AZURE_RESOURCE_GROUP_NAME: Resource group for VMs - - AZURE_LOCATION: Azure region (e.g., 'eastus') - - SSH_PRIVATE_KEY_PATH: Path to SSH private key file - - SSH_PUBLIC_KEY_PATH: Path to SSH public key file - - NETWORK_SECURITY_GROUP_NAME: Network security group name - """ - - def __init__(self): - # Load required environment variables - self.subscription_id = os.getenv("AZURE_SUBSCRIPTION_ID") - self.resource_group_name = os.getenv("AZURE_RESOURCE_GROUP_NAME") - self.location = os.getenv("AZURE_LOCATION") - self.ssh_private_key_path = os.getenv("SSH_PRIVATE_KEY_PATH") - self.ssh_public_key_path = os.getenv("SSH_PUBLIC_KEY_PATH") - self.network_security_group_name = os.getenv("NETWORK_SECURITY_GROUP_NAME") - - # Validate all required environment variables - missing_vars = [] - if not self.subscription_id: - missing_vars.append("AZURE_SUBSCRIPTION_ID") - if not self.resource_group_name: - missing_vars.append("AZURE_RESOURCE_GROUP_NAME") - if not self.location: - missing_vars.append("AZURE_LOCATION") - if not self.ssh_private_key_path: - missing_vars.append("SSH_PRIVATE_KEY_PATH") - if not self.ssh_public_key_path: - missing_vars.append("SSH_PUBLIC_KEY_PATH") - if not self.network_security_group_name: - missing_vars.append("NETWORK_SECURITY_GROUP_NAME") - - if missing_vars: - raise ValueError( - f"Missing required environment variables: {', '.join(missing_vars)}. " - "Please set them in your .env file or environment." - ) - - # Validate SSH key files exist - if not os.path.exists(self.ssh_private_key_path): - raise FileNotFoundError( - f"SSH private key not found at: {self.ssh_private_key_path}. " - f"Please ensure SSH_PRIVATE_KEY_PATH points to a valid private key file." - ) - if not os.path.exists(self.ssh_public_key_path): - raise FileNotFoundError( - f"SSH public key not found at: {self.ssh_public_key_path}. " - f"Please ensure SSH_PUBLIC_KEY_PATH points to a valid public key file." - ) - - # Read SSH public key - with open(self.ssh_public_key_path, "r") as f: - self.ssh_public_key = f.read().strip() - - # Calculate NSG ID - self.nsg_id = f"/subscriptions/{self.subscription_id}/resourceGroups/{self.resource_group_name}/providers/Microsoft.Network/networkSecurityGroups/{self.network_security_group_name}" - - # Initialize Azure clients (for backwards compatibility with existing code) - self.credential = DefaultAzureCredential() - self.compute_client = ComputeManagementClient( - self.credential, self.subscription_id - ) - self.network_client = NetworkManagementClient( - self.credential, self.subscription_id - ) - - # Store created VMs for tracking - self._vms = {} - - @contextmanager - def _get_sftp_client( - self, - vm_name, - network_client, - resource_group_name, - ): - """ - Context manager for SFTP client that automatically handles connection and cleanup. - - Args: - vm_name: Name of the VM to connect to - network_client: Azure network client - resource_group_name: Azure resource group name - - Usage: - with self._get_sftp_client(vm_name, network_client, rg_name) as (sftp, ssh): - sftp.put(local_file, remote_file) - """ - logger = _get_logger(vm_name) - ssh_client = None - sftp_client = None - - try: - # Get the public IP address of the VM - public_ip_address = network_client.public_ip_addresses.get( - resource_group_name, f"{vm_name}-public-ip" - ).ip_address - - # Create SSH client - ssh_client = paramiko.SSHClient() - ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - - # Connect to the VM using SSH (key_filename lets Paramiko auto-detect RSA/Ed25519/ECDSA) - ssh_client.connect( - hostname=public_ip_address, - username="agent", - key_filename=self.ssh_private_key_path, - ) - - # Create SFTP client - sftp_client = ssh_client.open_sftp() - - yield sftp_client, ssh_client - - finally: - # Close connections - if sftp_client: - try: - sftp_client.close() - except Exception as e: - logger.error(f"Error closing SFTP client: {e}") - - if ssh_client: - try: - ssh_client.close() - except Exception as e: - logger.error(f"Error closing SSH client: {e}") - - def create_virtual_machine_by_name( - self, vm_name, has_gpu: bool = False, setup_timeout: int = 0 - ): - """Create a standard Azure VM without GPU. - - Args: - vm_name: Name of the VM. - has_gpu: Whether the VM should have a GPU. - setup_timeout: Seconds to wait for startup script (passed from - VirtualMachineRunner.task_timeout). - """ - logger = _get_logger(vm_name) - if has_gpu: - logger.info(f"Creating virtual machine {vm_name} with a GPU") - else: - logger.info(f"Creating virtual machine {vm_name} with *no* GPU") - - # Create VM using new AzureVirtualMachine class - vm = AzureVirtualMachine( - name=vm_name, - resource_group=self.resource_group_name, - location=self.location, - subscription_id=self.subscription_id, - nsg_id=self.nsg_id, - ssh_public_key=self.ssh_public_key, - gpu=has_gpu, - timeout=setup_timeout, - ) - - # Store for tracking - self._vms[vm_name] = vm - - if has_gpu: - logger.info(f"Created virtual machine {vm_name} with GPU") - else: - logger.info(f"Created virtual machine {vm_name} with *no* GPU") - return vm - - def delete_virtual_machine_by_name(self, vm_name): - """Delete an Azure VM and all associated resources.""" - # Use the AzureVirtualMachine delete method if we have it - if vm_name in self._vms: - self._vms[vm_name].delete() - del self._vms[vm_name] - - def compress_and_copy_files_to_vm(self, vm_name, source_directory): - """Copy files from a local directory to the VM.""" - logger = _get_logger(vm_name) - try: - # Compress the source directory - source_directory = os.path.abspath(source_directory) - tar_file_path = f"{source_directory}.tar.gz" - - logger.info( - f"Creating tar archive from {source_directory} in {tar_file_path}" - ) - with tarfile.open(tar_file_path, "w:gz") as tar: - tar.add(source_directory, arcname=os.path.basename(source_directory)) - - tar_size = os.path.getsize(tar_file_path) - - # Copy the compressed file to the VM - remote_tar_file_path = f"/home/agent/{os.path.basename(tar_file_path)}" - with self._get_sftp_client( - vm_name, - self.network_client, - self.resource_group_name, - ) as (sftp_client, ssh_client): - logger.info(f"Uploading {tar_size} bytes") - sftp_client.put(tar_file_path, remote_tar_file_path) - - # Extract the compressed file on the VM - logger.info("Extracting files on the VM") - _, stdout, stderr = ssh_client.exec_command( - f"tar -xzf {remote_tar_file_path} --strip-components=1 -C /home/agent" - ) - - # Block until the tar command completes and check for errors - exit_status = stdout.channel.recv_exit_status() - stderr_text = stderr.read().decode() - - if exit_status != 0: - raise Exception( - f"Tar extraction failed with exit status {exit_status}. stderr: {stderr_text}" - ) - - if stderr_text: - logger.warning(f"Warning during tar extraction: {stderr_text}") - - logger.info(f"Successfully copied files from {source_directory}") - - # Remove the compressed file from the VM and the local machine - sftp_client.remove(remote_tar_file_path) - os.remove(tar_file_path) - - except Exception as e: - logger.error(f"Error copying files: {e}") - raise - - def copy_files_from_vm(self, vm_name, destination_directory): - """Copy files from the VM to local directory.""" - with self._get_sftp_client( - vm_name, - self.network_client, - self.resource_group_name, - ) as (sftp_client, ssh_client): - # Remove ./miniconda3 directory from the VM - _, stdout, _ = ssh_client.exec_command("rm -rf /home/agent/miniconda3") - for _ in stdout: - pass # Block until the rm command completes - - # Compress all files in the home directory on the VM - remote_tar_file_path = ( - f"/home/agent/{os.path.basename(destination_directory)}_back.tar.gz" - ) - remote_home_directory = "/home/agent" - _, stdout, _ = ssh_client.exec_command( - f"tar -czf {remote_tar_file_path} -C {remote_home_directory} ." - ) - for _ in stdout: - pass # Block until the tar command completes - - # Copy the compressed file from the VM - sftp_client.get(remote_tar_file_path, f"{destination_directory}.tar.gz") - - # Extract the compressed file on the local machine - with tarfile.open(f"{destination_directory}.tar.gz", "r:gz") as tar: - tar.extractall(destination_directory) - - # Remove the compressed file from the VM and the local machine - # sftp_client.remove(remote_tar_file_path) - os.remove(f"{destination_directory}.tar.gz") - - def check_task_completion(self, vm_name: str) -> bool: - """ - Check if task is complete by checking for output.json file. - - :param self: the virtual machine manager - :param vm_name: the virtual machine whose task we want to check - :type vm_name: str - :return: whether or not the task is complete - :rtype: bool - """ - task_completed_filepath = "/home/agent/output.json" - vm = self._vms[vm_name] - return vm.check_for_file_presence_by_path(task_completed_filepath) - - def run_agent_on_virtual_machine( - self, - vm_name, - agent_function, - task_id, - input_data, - agent_args, - run_id, - log_dir, - benchmark, - ): - """ - Run agent on VM with improved monitoring and error handling. - """ - logger = _get_logger(vm_name) - - def copy_env_and_run_setup_script( - vm_name: str, - log_dir: str, - benchmark, - task_id: str, - ) -> None: - """ - Set up the VM environment using uv and a setup script. - """ - try: - with self._get_sftp_client( - vm_name, - self.network_client, - self.resource_group_name, - ) as (sftp_client, ssh_client): - # Copy .env file to VM first - if os.path.exists(".env"): - logger.info("Copying .env file") - sftp_client.put(".env", "/home/agent/.env") - - # Copy setup script to VM - setup_script_path = os.path.join( - os.path.dirname(__file__), "setup_vm.sh" - ) - remote_setup_path = "/home/agent/setup_vm.sh" - sftp_client.put(setup_script_path, remote_setup_path) - - # Make setup script executable - ssh_client.exec_command(f"chmod +x {remote_setup_path}") - - # Run setup script with sudo - logger.info("Running setup_vm.sh on remote") - _, stdout, stderr = ssh_client.exec_command( - f"sudo bash {remote_setup_path}" - ) - - # Create log directory if it doesn't exist and write log file - os.makedirs(log_dir, exist_ok=True) - with open(f"{log_dir}/setup_vm_log_{task_id}.log", "w") as f: - f.write(stdout.read().decode()) - f.write(stderr.read().decode()) - - # Run setup script if it exists - if benchmark and benchmark.setup_script: - setup_script = os.path.join(benchmark.setup_script) - if os.path.exists(setup_script): - logger.info("Running setup script") - try: - cmd = """ - source /home/agent/miniconda3/etc/profile.d/conda.sh && \ - cd /home/agent && \ - bash setup_script.sh - """ - _, stdout, stderr = ssh_client.exec_command(cmd) - with open( - f"{log_dir}/setup_script_log_{task_id}.log", "w" - ) as f: - f.write(stdout.read().decode()) - f.write(stderr.read().decode()) - except Exception as e: - logger.error(f"Error running setup script: {e}") - - except Exception as e: - logger.error(f"Error setting up VM environment: {e}") - raise - - try: - # Setup conda environment if it exists - copy_env_and_run_setup_script( - vm_name, - log_dir, - benchmark, - task_id, - ) - - with self._get_sftp_client( - vm_name, - self.network_client, - self.resource_group_name, - ) as (sftp_client, ssh_client): - # Write input data and agent args to files - with sftp_client.open("/home/agent/input.json", "w") as f: - f.write(json.dumps({task_id: input_data})) - with sftp_client.open("/home/agent/agent_args.json", "w") as f: - f.write(json.dumps(agent_args)) - - # Write run-specific env vars for static run_agent.py - run_agent_env = f"RUN_ID={run_id}\nAGENT_FUNCTION={agent_function}\nTASK_ID={task_id}\n" - with sftp_client.open("/home/agent/run_agent.env", "w") as f: - f.write(run_agent_env) - - script_path = "/home/agent/run_agent.py" - ssh_client.exec_command(f"chmod +x {script_path}") - - # Construct command to run script - cmd = f"source /home/agent/miniconda3/etc/profile.d/conda.sh && conda activate agent_env && python {script_path} > agent_trace.log 2>&1" - - # Execute script - logger.info("Running agent") - _, stdout, stderr = ssh_client.exec_command(cmd) - - # Close the channel to prevent hanging - stdout.channel.close() - stderr.channel.close() - - except Exception as e: - logger.error(f"Error running agent: {e}") - raise - - def get_agent_trace(self, vm_name): - """ - Fetch the current agent trace log from a VM. - - Args: - vm_name: Name of the VM to fetch logs from - - Returns: - str: Contents of the agent trace log, or None if not available - """ - logger = _get_logger(vm_name) - try: - with self._get_sftp_client( - vm_name, - self.network_client, - self.resource_group_name, - ) as (sftp_client, _): - # Try to read the agent trace file - try: - with sftp_client.open("/home/agent/agent_trace.log") as f: - return f.read().decode("utf-8") - except FileNotFoundError: - return None - - except Exception as e: - logger.error(f"Error fetching agent trace: {e}") - return None diff --git a/hal/utils/virtual_machine_runner.py b/hal/utils/virtual_machine_runner.py deleted file mode 100644 index 53d0491..0000000 --- a/hal/utils/virtual_machine_runner.py +++ /dev/null @@ -1,326 +0,0 @@ -import os -import json -import asyncio -import time -import tempfile -import shutil -import uuid -import logging -from typing import Dict, Any, Optional -from rich.progress import Progress, TaskID -from .virtual_machine_manager import VirtualMachineManager, RUN_AGENT_SCRIPT_PATH -from ..benchmarks.base_benchmark import BaseBenchmark -import traceback - -# Set up loggers -logger = logging.getLogger(__name__) - - -class VirtualMachineRunner: - """Handles running agents on Azure VMs""" - - def __init__( - self, - log_dir: str, - task_timeout: int, - max_concurrent: int = 1, - benchmark: Optional[BaseBenchmark] = None, - ): - self.max_concurrent = max_concurrent - self.log_dir = log_dir - self.task_timeout = task_timeout - self.vm_manager = VirtualMachineManager() - self._semaphore = asyncio.Semaphore(max_concurrent) - self._file_lock = asyncio.Lock() - self.benchmark = benchmark - - async def fetch_agent_logs(self, vm_name, ssh_private_key_path, task_id): - """Fetch the latest agent trace log from a VM and store it locally.""" - try: - result = await asyncio.to_thread(self.vm_manager.get_agent_trace, vm_name) - - if result: - # Log the agent output through logger - logger.info( - f"Agent output for task {task_id} on VM {vm_name}:\n{result}" - ) - - if self.log_dir: - trace_dir = os.path.join(self.log_dir, "agent_logs") - os.makedirs(trace_dir, exist_ok=True) - - # Write/update the trace file - trace_path = os.path.join(trace_dir, f"{task_id}_log.log") - with open(trace_path, "w") as f: - f.write(result) - - # Also write to a combined trace file - combined_path = os.path.join(trace_dir, "combined_logs.log") - with open(combined_path, "a") as f: - f.write( - f"\n=== {task_id} @ {time.strftime('%Y-%m-%d %H:%M:%S')} ===\n" - ) - f.write(result) - f.write("\n") - - except Exception as e: - logger.error(f"Error fetching logs for {task_id}: {e}") - - async def run_agent( - self, - dataset: Dict[str, Any], - agent_function: str, - agent_dir: str, - agent_args: Dict[str, Any], - run_id: str, - benchmark: Optional[BaseBenchmark] = None, - progress: Optional[Progress] = None, - task: Optional[TaskID] = None, - ) -> Dict[str, Any]: - """Run agent on all tasks using Azure VMs""" - self.benchmark = benchmark - results = {} - vm_names = [] - - async def process_task(task_id: str, input_data: Any) -> Optional[Dict]: - # Create unique VM name - vm_name = ( - f"agent-{benchmark.benchmark_name}-{uuid.uuid4()}"[:32] - .lower() - .replace("_", "-") - ) - vm_names.append(vm_name) - - try: - # Check if the task requires GPU - gpu_required = False - if self.benchmark and hasattr(self.benchmark, "benchmark"): - task_benchmark = self.benchmark.benchmark.get(task_id, {}) - gpu_required = task_benchmark.get("gpu", False) - - # Create VM based on GPU requirement - logger.info( - f"Task {task_id}: Creating Azure virtual machine {vm_name} for task {task_id} with GPU={gpu_required}" - ) - await asyncio.to_thread( - self.vm_manager.create_virtual_machine_by_name, - vm_name=vm_name, - has_gpu=gpu_required, - setup_timeout=self.task_timeout, - ) - - # Create temp directory with all necessary files - temp_dir = tempfile.mkdtemp() - try: - # Create input and args files - input_file = os.path.join(temp_dir, "input.json") - args_file = os.path.join(temp_dir, "agent_args.json") - - with open(input_file, "w") as f: - json.dump({task_id: input_data}, f) - with open(args_file, "w") as f: - json.dump(agent_args, f) - - # Copy task-specific files if they exist in input_data - if isinstance(input_data, dict) and "files" in input_data: - for dest_path, src_path in input_data["files"].items(): - # Remove 'root' prefix and leading slash if present - dest_path = dest_path.replace("/root/", "").lstrip("/") - - # Create destination directory structure in temp_dir - dest_full_path = os.path.join(temp_dir, dest_path) - os.makedirs(os.path.dirname(dest_full_path), exist_ok=True) - - # Copy the file - try: - if os.path.isdir(src_path): - shutil.copytree( - src_path, dest_full_path, dirs_exist_ok=True - ) - else: - shutil.copy2(src_path, dest_full_path) - except Exception as e: - logger.warning( - f"Task {task_id}: Warning: Failed to copy task file {src_path} to {dest_full_path}: {e}" - ) - - # Copy setup script if it exists - if self.benchmark and self.benchmark.setup_script: - setup_script_src = os.path.join(self.benchmark.setup_script) - if os.path.exists(setup_script_src): - setup_script_dest = os.path.join( - temp_dir, "setup_script.sh" - ) - shutil.copy2(setup_script_src, setup_script_dest) - os.chmod(setup_script_dest, 0o755) - - # Drop harness run_agent.py into payload - run_agent_dest = os.path.join(temp_dir, "run_agent.py") - shutil.copy2(RUN_AGENT_SCRIPT_PATH, run_agent_dest) - os.chmod(run_agent_dest, 0o755) - - # Copy all files to VM - logger.info( - f"Task {task_id}: Copying temporary directory files to VM {vm_name}" - ) - await asyncio.to_thread( - self.vm_manager.compress_and_copy_files_to_vm, - vm_name, - temp_dir, - ) - logger.info( - f"Task {task_id}: Copying agent directory files to VM {vm_name}" - ) - await asyncio.to_thread( - self.vm_manager.compress_and_copy_files_to_vm, - vm_name, - agent_dir, - ) - logger.info( - f"Task {task_id}: Finished copying all files to VM {vm_name}" - ) - - finally: - shutil.rmtree(temp_dir) - - # Run agent on VM - await asyncio.to_thread( - self.vm_manager.run_agent_on_virtual_machine, - vm_name, - agent_function, - task_id, - input_data, - agent_args, - run_id, - self.log_dir, - benchmark, - ) - - # Wait for completion or timeout - start_time = time.time() - task_is_complete = None - timeout_secs = self.task_timeout - - while time.time() - start_time < timeout_secs: - try: - logger.info( - f"Task {task_id}: Checking task completion on VM {vm_name}" - ) - # Fetch and store trace logs - await self.fetch_agent_logs( - vm_name=vm_name, - ssh_private_key_path=os.getenv("SSH_PRIVATE_KEY_PATH"), - task_id=task_id, - ) - - task_is_complete = await asyncio.to_thread( - self.vm_manager.check_task_completion, - vm_name, - ) - if task_is_complete is True: - logger.info( - f"Task {task_id}: Task {task_id} completed on VM {vm_name}" - ) - break - except Exception as e: - logger.error( - f"Task {task_id}: Error checking task completion on {vm_name}: {e}" - ) - await asyncio.sleep(30) # Check every 30 seconds - - if task_is_complete is None: - logger.warning( - f"Task {task_id}: timed out after {timeout_secs} seconds" - ) - return {task_id: f"TIMEOUT after {timeout_secs} seconds"} - - # Copy results back - if self.log_dir: - logger.info( - f"Task {task_id}: Copying results from VM {vm_name} to local directory" - ) - dest_dir = os.path.join(self.log_dir, f"{task_id}") - os.makedirs(dest_dir, exist_ok=True) - await asyncio.to_thread( - self.vm_manager.copy_files_from_vm, - vm_name, - dest_dir, - ) - - # Read the output.json file from the copied directory - output_file = os.path.join(dest_dir, "output.json") - if os.path.exists(output_file): - with open(output_file, "r") as f: - result = json.load(f) - else: - # FIXME: this seems to show up, need to debug - logger.warning( - f"Task {task_id}: output.json not found in {dest_dir}" - ) - result = { - task_id: "ERROR: output.json not found after copying from VM" - } - else: - # If no log_dir, we can't copy results, so return an error - logger.error( - f"Task {task_id}: Cannot retrieve results - no log_dir specified" - ) - result = { - task_id: "ERROR: Cannot retrieve results - no log_dir specified" - } - - return result - - except Exception as e: - logger.error(f"Error processing task {task_id} on VM {vm_name}: {e}") - traceback.print_exc() - return {task_id: f"ERROR: {str(e)}"} - - finally: - # Cleanup VM - try: - await asyncio.to_thread( - self.vm_manager.delete_virtual_machine_by_name, vm_name - ) - except Exception as e: - logger.error(f"Task {task_id}: Error deleting VM {vm_name}: {e}") - - # Run tasks in parallel with semaphore to limit concurrency - semaphore = asyncio.Semaphore(self.max_concurrent) - - async def run_with_semaphore(task_id, input_data): - async with semaphore: - result = await process_task(task_id, input_data) - if progress and task is not None: - progress.update(task, advance=1) - return result - - # Create tasks for all inputs - tasks = [ - run_with_semaphore(task_id, input_data) - for task_id, input_data in dataset.items() - ] - - # Run all tasks and gather results - results = await asyncio.gather(*tasks) - - # Merge results - merged_results = {} - for result in results: - if result: - merged_results.update(result) - - # Save raw submissions if log_dir provided - if self.log_dir: - raw_submissions_path = os.path.join( - self.log_dir, f"{run_id}_RAW_SUBMISSIONS.jsonl" - ) - os.makedirs(self.log_dir, exist_ok=True) - - # append to submissions file - with open(raw_submissions_path, "a") as f: - for task_id, result in merged_results.items(): - json.dump({task_id: result}, f) - f.write("\n") - - return merged_results diff --git a/hal/utils/vm/azure_virtual_machine.py b/hal/utils/vm/azure_virtual_machine.py deleted file mode 100644 index a0f4e50..0000000 --- a/hal/utils/vm/azure_virtual_machine.py +++ /dev/null @@ -1,281 +0,0 @@ -"""Azure Virtual Machine representation.""" - -import base64 -import logging -import os -import subprocess -import time - -from azure.mgmt.compute import ComputeManagementClient -from azure.mgmt.network import NetworkManagementClient -from azure.identity import DefaultAzureCredential - -from .utils import run_command - - -logger = logging.getLogger(__name__) - - -class AzureVirtualMachine: - """Represents a single Azure VM.""" - - def __init__( - self, - name: str, - resource_group: str, - location: str, - subscription_id: str, - nsg_id: str, - ssh_public_key: str, - gpu: bool, - timeout: int, - ): - self.name = name - self.resource_group = resource_group - self.location = location - self.vm_size = "Standard_NC4as_T4_v3" if gpu else "Standard_E2as_v5" - self.gpu = gpu - self.timeout = timeout - - # Azure clients - credential = DefaultAzureCredential() - self.compute_client = ComputeManagementClient(credential, subscription_id) - self.network_client = NetworkManagementClient(credential, subscription_id) - - # Store for later use - self.nsg_id = nsg_id - self.ssh_public_key = ssh_public_key - self.public_ip = None - - # Create the VM - self._create() - - def check_for_file_presence_by_path(self, file_path: str) -> bool: - """ - Checks if a file is present on the virtual machine. - - :param self: the virtual machine - :param file_path: the file path to check - :type file_path: str - :return: whether or not the file is present - :rtype: bool - """ - ssh_key_path = os.getenv("SSH_PRIVATE_KEY_PATH") - - cmd = [ - "ssh", - "-i", - ssh_key_path, - "-o", - "StrictHostKeyChecking=no", - "-o", - "ConnectTimeout=5", - f"agent@{self.public_ip}", - f"test -f {file_path}", - ] - result = subprocess.run(cmd, capture_output=True) - - return result.returncode == 0 - - def _create(self) -> None: - """Create VM using Azure SDK.""" - logger.info( - f"Creating VM {self.name} {'with' if self.gpu else 'WITHOUT'} a GPU" - ) - - # Create VNet - vnet_name = f"{self.name}-vnet" - subnet_name = f"{self.name}-subnet" - vnet = self.network_client.virtual_networks.begin_create_or_update( - self.resource_group, - vnet_name, - { - "location": self.location, - "address_space": {"address_prefixes": ["10.0.0.0/16"]}, - "subnets": [{"name": subnet_name, "address_prefix": "10.0.0.0/24"}], - }, - ).result() - subnet = vnet.subnets[0] - - # Create public IP - public_ip_name = f"{self.name}-public-ip" - public_ip = self.network_client.public_ip_addresses.begin_create_or_update( - self.resource_group, - public_ip_name, - { - "location": self.location, - "sku": {"name": "Standard"}, - "public_ip_allocation_method": "Static", - }, - ).result() - self.public_ip = public_ip.ip_address - - # Create NIC - nic_name = f"{self.name}-nic" - nic = self.network_client.network_interfaces.begin_create_or_update( - self.resource_group, - nic_name, - { - "location": self.location, - "ip_configurations": [ - { - "name": "default", - "subnet": {"id": subnet.id}, - "public_ip_address": {"id": public_ip.id}, - } - ], - "network_security_group": {"id": self.nsg_id}, - }, - ).result() - - # Load cloud-init config - cloud_init_path = os.path.join(os.path.dirname(__file__), "cloud_init.yaml") - with open(cloud_init_path, "r") as f: - cloud_init_config = f.read() - - # Azure requires custom_data to be base64 encoded - custom_data = base64.b64encode(cloud_init_config.encode()).decode() - - # Create VM - # Use Ubuntu-HPC image for GPU VMs (has NVIDIA drivers pre-installed) - if self.gpu: - image_reference = { - "publisher": "microsoft-dsvm", - "offer": "ubuntu-hpc", - "sku": "2204", - "version": "latest", - } - logger.info( - f"Using Ubuntu-HPC 22.04 image with pre-installed GPU drivers for {self.name}" - ) - else: - image_reference = { - "publisher": "Canonical", - "offer": "0001-com-ubuntu-server-jammy", - "sku": "22_04-lts-gen2", - "version": "latest", - } - logger.info(f"Using standard Ubuntu 22.04 image for {self.name}") - - vm_params = { - "location": self.location, - "storage_profile": { - "image_reference": image_reference, - "os_disk": {"createOption": "FromImage", "diskSizeGB": 80}, - }, - "hardware_profile": {"vm_size": self.vm_size}, - "os_profile": { - "computer_name": self.name, - "admin_username": "agent", - "custom_data": custom_data, - "linux_configuration": { - "disable_password_authentication": True, - "ssh": { - "public_keys": [ - { - "path": "/home/agent/.ssh/authorized_keys", - "key_data": self.ssh_public_key, - } - ] - }, - }, - }, - "network_profile": {"network_interfaces": [{"id": nic.id}]}, - } - - self.compute_client.virtual_machines.begin_create_or_update( - self.resource_group, self.name, vm_params - ).result() - - logger.info(f"VM {self.name} created at {self.public_ip}") - - # Wait for startup script to complete - startup_start = time.time() - self._wait_for_setup_to_complete() - startup_duration = int(time.time() - startup_start) - logger.info( - f"Startup script completed for VM {self.name} in {startup_duration} seconds" - ) - - def _wait_for_setup_to_complete(self) -> None: - """Wait for startup script to complete by checking for sentinel file. - - Uses self.timeout (passed from VirtualMachineRunner.task_timeout). - """ - start_time = time.time() - sentinel_file = "/home/agent/startup_complete" - - logger.info( - f"Waiting for startup script to complete on {self.name} (timeout: {self.timeout}s)" - ) - while time.time() - start_time < self.timeout: - if self.check_for_file_presence_by_path(sentinel_file): - logger.debug( - f"Startup script completed on {self.name} at {self.public_ip}" - ) - return - - time.sleep(10) - - raise TimeoutError( - f"Startup script did not complete on {self.name} ({self.public_ip}) within {self.timeout} seconds" - ) - - def delete(self) -> None: - """Delete this VM and all resources.""" - logger.info(f"Deleting VM {self.name}") - - resources = [ - ("vm", self.name), - ("nic", f"{self.name}-nic"), - ("public-ip", f"{self.name}-public-ip"), - ("vnet", f"{self.name}-vnet"), - ] - - for resource_type, resource_name in resources: - if resource_type == "vm": - cmd = [ - "az", - "vm", - "delete", - "--resource-group", - self.resource_group, - "--name", - resource_name, - "--yes", - ] - elif resource_type == "nic": - cmd = [ - "az", - "network", - "nic", - "delete", - "--resource-group", - self.resource_group, - "--name", - resource_name, - ] - elif resource_type == "public-ip": - cmd = [ - "az", - "network", - "public-ip", - "delete", - "--resource-group", - self.resource_group, - "--name", - resource_name, - ] - elif resource_type == "vnet": - cmd = [ - "az", - "network", - "vnet", - "delete", - "--resource-group", - self.resource_group, - "--name", - resource_name, - ] - - run_command(cmd, check=False) diff --git a/hal/utils/vm/cloud_init.yaml b/hal/utils/vm/cloud_init.yaml deleted file mode 100644 index dd8d368..0000000 --- a/hal/utils/vm/cloud_init.yaml +++ /dev/null @@ -1,49 +0,0 @@ -#cloud-config -runcmd: - - | - ## FIXME: combine with setup_vm.sh - #!/bin/bash - set -e - - # Redirect all output to log file - exec > /home/agent/cloud_init.log 2>&1 - - # Azure VM startup script for HAL agent execution - # This runs on first boot via cloud-init - - echo "Starting VM initialization..." - touch /home/agent/startup_started - chown agent:agent /home/agent/startup_started - - # 1. Update package lists - sudo DEBIAN_FRONTEND=noninteractive apt-get update - - # 2. Upgrade existing packages - sudo DEBIAN_FRONTEND=noninteractive apt-get upgrade -y - - # 3. Install Python 3 and pip - sudo DEBIAN_FRONTEND=noninteractive apt-get install -y python3 python3-pip python3-venv - - # 4. Install system dependencies - sudo DEBIAN_FRONTEND=noninteractive apt-get install -y build-essential git curl wget - - # 5. Install miniconda - wget --quiet "https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh" -O /tmp/miniconda.sh - bash /tmp/miniconda.sh -b -p /home/agent/miniconda3 - rm /tmp/miniconda.sh - chown -R agent:agent /home/agent/miniconda3 - - # 6. Create conda environment as agent user - sudo -u agent bash -c " - export CONDA_PLUGINS_AUTO_ACCEPT_TOS=true && \ - source /home/agent/miniconda3/etc/profile.d/conda.sh && \ - conda create -y -n agent_env python=3.12 && \ - conda activate agent_env - " - - # 8. Create sentinel file to signal completion - rm /home/agent/startup_started - touch /home/agent/startup_complete - chown agent:agent /home/agent/startup_complete - - echo "VM initialization complete!" diff --git a/hal/utils/vm/run_agent.py b/hal/utils/vm/run_agent.py deleted file mode 100644 index cfcfba3..0000000 --- a/hal/utils/vm/run_agent.py +++ /dev/null @@ -1,72 +0,0 @@ -#!/usr/bin/env python3 -""" -Static entrypoint for running an agent on a VM. Gets copied by hal/utils/virtual_machine_manager.py into the VM - -Reads configuration from environment variables (loaded from .env and run_agent.env). -Required env vars: RUN_ID, AGENT_FUNCTION, TASK_ID. -Input data and agent kwargs are read from input.json and agent_args.json in the -current working directory (/home/agent). -""" - -import os -import sys -import json -import importlib.util -import traceback - -from dotenv import load_dotenv - -# Load harness .env and run-specific env (written by VM manager) -load_dotenv("/home/agent/.env") -load_dotenv("/home/agent/run_agent.env") - -RUN_ID = os.environ.get("RUN_ID") -AGENT_FUNCTION = os.environ.get("AGENT_FUNCTION") -TASK_ID = os.environ.get("TASK_ID") - -missing = [k for k in ("RUN_ID", "AGENT_FUNCTION", "TASK_ID") if not os.environ.get(k)] -if missing: - print(f"ERROR: Missing required env vars: {', '.join(missing)}", file=sys.stderr) - sys.exit(1) - - -def main(): - import weave - - weave.init(RUN_ID) - - with open("/home/agent/input.json", "r") as f: - input_data = json.load(f) - - with open("/home/agent/agent_args.json", "r") as f: - agent_args = json.load(f) - - module_name, _, function_name = AGENT_FUNCTION.rpartition(".") - if not module_name or not function_name: - print( - f"ERROR: AGENT_FUNCTION must be 'module.function', got: {AGENT_FUNCTION!r}", - file=sys.stderr, - ) - sys.exit(1) - - agent_path = os.path.join("/home/agent", f"{module_name}.py") - spec = importlib.util.spec_from_file_location(module_name, agent_path) - module = importlib.util.module_from_spec(spec) - spec.loader.exec_module(module) - agent = getattr(module, function_name) - - with weave.attributes({"weave_task_id": TASK_ID}): - result = agent(input_data, **agent_args) - - with open("/home/agent/output.json", "w") as f: - json.dump(result, f) - - -if __name__ == "__main__": - try: - main() - except Exception as e: - with open("/home/agent/error.log", "w") as f: - f.write(f"ERROR: {e}\n") - f.write(traceback.format_exc()) - raise diff --git a/hal/utils/vm/utils.py b/hal/utils/vm/utils.py deleted file mode 100644 index 4707ef3..0000000 --- a/hal/utils/vm/utils.py +++ /dev/null @@ -1,42 +0,0 @@ -"""Utility functions for v2.""" - -import logging -import subprocess - - -logger = logging.getLogger(__name__) - - -def run_command(cmd: list[str], check: bool = True) -> subprocess.CompletedProcess: - """Run a subprocess command with logging. - - Args: - cmd: Command and arguments as list - check: If True, raise on non-zero exit code - - Returns: - CompletedProcess with stdout/stderr captured - - Raises: - subprocess.CalledProcessError: If check=True and command fails - """ - logger.debug(f"Running command: {' '.join(cmd)}") - - result = subprocess.run(cmd, capture_output=True, text=True, check=False) - - if result.stdout: - logger.debug(f"Command stdout: {result.stdout}") - if result.stderr: - logger.debug(f"Command stderr: {result.stderr}") - - if check and result.returncode != 0: - logger.error( - f"Command failed with exit code {result.returncode}: {' '.join(cmd)}" - ) - raise subprocess.CalledProcessError( - result.returncode, cmd, result.stdout, result.stderr - ) - elif result.returncode != 0: - logger.warning(f"Command failed (ignoring): {' '.join(cmd)}") - - return result diff --git a/pyproject.toml b/pyproject.toml index 7c02a40..428c4d0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,14 +32,6 @@ dev = [ swebench = [ "swebench @ git+https://github.com/SWE-bench/SWE-bench@006a760a95c9cc11e987884d7e311d74a16db88a", ] -azure = [ - "azure-mgmt-compute>=29.1.0", - "azure-mgmt-network>=25.1.0", - "azure-mgmt-resource>=23.0.1", - "azure-identity>=1.12.0", - "paramiko==3.5.0", - "requests>=2.31.0", # For Azure VM detection via IMDS -] appworld = [ "appworld @ git+https://github.com/stonybrooknlp/appworld.git", "appworld-experiments[simplified,smolagents] @ git+https://github.com/stonybrooknlp/appworld.git@main#egg=appworld-experiments&subdirectory=experiments", diff --git a/reliability_eval/config.py b/reliability_eval/config.py index 53fd86d..2b8d728 100644 --- a/reliability_eval/config.py +++ b/reliability_eval/config.py @@ -436,7 +436,7 @@ "taubench_airline": { "benchmark_name": "taubench_airline", "requires_docker": False, - "requires_vm": False, + "max_concurrent": 5, # Can run in parallel "task_ids": TAUBENCH_AIRLINE_CLEAN_TASKS, "compliance_constraints": [ @@ -452,7 +452,7 @@ # "taubench_retail": { # "benchmark_name": "taubench_retail", # "requires_docker": False, - # "requires_vm": False, + # "max_concurrent": 5, # Can run in parallel # "compliance_constraints": [ # "pii_handling_customer_service", # Customer service context @@ -467,7 +467,7 @@ # "gaia": { # "benchmark_name": "gaia", # "requires_docker": False, - # "requires_vm": False, + # "max_concurrent": 5, # GAIA tasks are independent, can run in parallel # "compliance_constraints": [ # "pii_handling_gaia", # Q&A context - providing requested info is expected diff --git a/reliability_eval/phases/runner.py b/reliability_eval/phases/runner.py index 0e93a85..b30f78e 100644 --- a/reliability_eval/phases/runner.py +++ b/reliability_eval/phases/runner.py @@ -167,9 +167,6 @@ def build_base_command( if benchmark_config.get("requires_docker", False): cmd.append("--docker") - if benchmark_config.get("requires_vm", False): - cmd.append("--vm") - if results_dir and results_dir != "results": cmd.extend(["--results_dir", results_dir]) diff --git a/tests/reliability_eval/test_config.py b/tests/reliability_eval/test_config.py index 7885628..9aa1b14 100644 --- a/tests/reliability_eval/test_config.py +++ b/tests/reliability_eval/test_config.py @@ -30,7 +30,6 @@ def test_each_entry_has_required_keys(self): required = { "benchmark_name", "requires_docker", - "requires_vm", "max_concurrent", } for name, cfg in BENCHMARK_CONFIGS.items(): diff --git a/tests/reliability_eval/test_phases.py b/tests/reliability_eval/test_phases.py index 32e854c..59fdd7d 100644 --- a/tests/reliability_eval/test_phases.py +++ b/tests/reliability_eval/test_phases.py @@ -21,7 +21,6 @@ BENCHMARK_CONFIG = { "benchmark_name": "taubench_airline", "requires_docker": False, - "requires_vm": False, "max_concurrent": 5, "compliance_constraints": ["pii_handling_customer_service"], } diff --git a/tests/utils/vm/__init__.py b/tests/utils/vm/__init__.py deleted file mode 100644 index 9a56d50..0000000 --- a/tests/utils/vm/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""Tests for hal.utils.vm package.""" diff --git a/tests/utils/vm/test_azure_virtual_machine.py b/tests/utils/vm/test_azure_virtual_machine.py deleted file mode 100644 index 353b10f..0000000 --- a/tests/utils/vm/test_azure_virtual_machine.py +++ /dev/null @@ -1,221 +0,0 @@ -"""Tests for AzureVirtualMachine class.""" - -import os -from unittest.mock import Mock, patch - -from hal.utils.vm.azure_virtual_machine import AzureVirtualMachine - - -class TestAzureVirtualMachineCheckForFilePresence: - """Tests for check_for_file_presence_by_path method.""" - - @patch.dict(os.environ, {"SSH_PRIVATE_KEY_PATH": "/path/to/key"}) - @patch("subprocess.run") - def test_calls_the_right_subprocess_command_when_the_test_file_exists( - self, mock_run - ): - """Test when file exists on VM.""" - # Setup - mock_run.return_value = Mock(returncode=0) - vm = self._create_mock_vm() - vm.public_ip = "1.2.3.4" - - # Execute - result = vm.check_for_file_presence_by_path("/path/to/file.txt") - - # Assert - assert result is True - mock_run.assert_called_once_with( - [ - "ssh", - "-i", - "/path/to/key", - "-o", - "StrictHostKeyChecking=no", - "-o", - "ConnectTimeout=5", - "agent@1.2.3.4", - "test -f /path/to/file.txt", - ], - capture_output=True, - ) - - @patch.dict(os.environ, {"SSH_PRIVATE_KEY_PATH": "/path/to/key"}) - @patch("subprocess.run") - def test_calls_the_right_subprocess_command_when_the_test_file_does_not_exist( - self, mock_run - ): - """Test when file does not exist on VM.""" - # Setup - mock_run.return_value = Mock(returncode=1) - vm = self._create_mock_vm() - vm.public_ip = "1.2.3.4" - - # Execute - result = vm.check_for_file_presence_by_path("/path/to/missing.txt") - - # Assert - assert result is False - mock_run.assert_called_once_with( - [ - "ssh", - "-i", - "/path/to/key", - "-o", - "StrictHostKeyChecking=no", - "-o", - "ConnectTimeout=5", - "agent@1.2.3.4", - "test -f /path/to/missing.txt", - ], - capture_output=True, - ) - - @patch.dict( - os.environ, {"SSH_PRIVATE_KEY_PATH": "/home/user/.ssh/id_rsa_CUSTOMPATH"} - ) - @patch("subprocess.run") - def test_pulls_ssh_key_path_from_env(self, mock_run): - """Test that correct SSH key path from environment is used.""" - # Setup - mock_run.return_value = Mock(returncode=0) - vm = self._create_mock_vm() - vm.public_ip = "10.0.0.1" - - # Execute - vm.check_for_file_presence_by_path("/test") - - # Assert - call_args = mock_run.call_args[0][0] - assert call_args[0] == "ssh" - assert call_args[1] == "-i" - assert call_args[2] == "/home/user/.ssh/id_rsa_CUSTOMPATH" - - def _create_mock_vm(self) -> AzureVirtualMachine: - """Create a mock AzureVirtualMachine instance without calling __init__.""" - vm = object.__new__(AzureVirtualMachine) - return vm - - -class TestAzureVirtualMachineInit: - """Tests for __init__ method GPU configuration.""" - - @patch("hal.utils.vm.azure_virtual_machine.DefaultAzureCredential") - @patch("hal.utils.vm.azure_virtual_machine.ComputeManagementClient") - @patch("hal.utils.vm.azure_virtual_machine.NetworkManagementClient") - @patch.object(AzureVirtualMachine, "_create") - def test_gpu_enabled_uses_gpu_vm_size(self, mock_create, _, __, ___): - """Test that gpu=True results in GPU VM size.""" - # Execute - vm = AzureVirtualMachine( - name="test-vm", - resource_group="test-rg", - location="eastus", - subscription_id="sub-123", - nsg_id="nsg-123", - ssh_public_key="ssh-rsa AAAAB3...", - gpu=True, - timeout=555, - ) - - # Assert - assert vm.vm_size == "Standard_NC4as_T4_v3" - assert vm.gpu is True - mock_create.assert_called_once() - - @patch("hal.utils.vm.azure_virtual_machine.DefaultAzureCredential") - @patch("hal.utils.vm.azure_virtual_machine.ComputeManagementClient") - @patch("hal.utils.vm.azure_virtual_machine.NetworkManagementClient") - @patch.object(AzureVirtualMachine, "_create") - def test_gpu_disabled_uses_provided_vm_size(self, mock_create, _, __, ___): - """Test that gpu=False uses the provided VM size.""" - # Execute - vm = AzureVirtualMachine( - name="test-vm", - resource_group="test-rg", - location="westus", - subscription_id="sub-456", - nsg_id="nsg-456", - ssh_public_key="ssh-rsa AAAAB3...", - gpu=False, - timeout=555, - ) - - # Assert - assert vm.vm_size == "Standard_E2as_v5" - assert vm.gpu is False - mock_create.assert_called_once() - - @patch("hal.utils.vm.azure_virtual_machine.DefaultAzureCredential") - @patch("hal.utils.vm.azure_virtual_machine.ComputeManagementClient") - @patch("hal.utils.vm.azure_virtual_machine.NetworkManagementClient") - @patch.object(AzureVirtualMachine, "_create") - def test_default_gpu_false_uses_default_vm_size(self, mock_create, _, __, ___): - """Test that default gpu=False uses the default VM size.""" - # Execute - vm = AzureVirtualMachine( - name="test-vm", - resource_group="test-rg", - location="centralus", - subscription_id="sub-789", - nsg_id="nsg-789", - ssh_public_key="ssh-rsa AAAAB3...", - gpu=False, - timeout=555, - ) - - # Assert - assert vm.vm_size == "Standard_E2as_v5" - mock_create.assert_called_once() - - @patch("hal.utils.vm.azure_virtual_machine.DefaultAzureCredential") - @patch("hal.utils.vm.azure_virtual_machine.ComputeManagementClient") - @patch("hal.utils.vm.azure_virtual_machine.NetworkManagementClient") - @patch.object(AzureVirtualMachine, "_create") - def test_gpu_true_overrides_vm_size(self, mock_create, _, __, ___): - """Test that gpu=True overrides any provided vm_size.""" - # Execute - vm = AzureVirtualMachine( - name="test-vm", - resource_group="test-rg", - location="eastus", - subscription_id="sub-abc", - nsg_id="nsg-abc", - ssh_public_key="ssh-rsa AAAAB3...", - gpu=True, - timeout=555, - ) - - # Assert - GPU size should override the provided size - assert vm.vm_size == "Standard_NC4as_T4_v3" - assert vm.gpu is True - mock_create.assert_called_once() - - @patch("hal.utils.vm.azure_virtual_machine.DefaultAzureCredential") - @patch("hal.utils.vm.azure_virtual_machine.ComputeManagementClient") - @patch("hal.utils.vm.azure_virtual_machine.NetworkManagementClient") - @patch.object(AzureVirtualMachine, "_create") - def test_stores_all_initialization_parameters(self, mock_create, _, __, ___): - """Test that all initialization parameters are stored correctly.""" - # Execute - vm = AzureVirtualMachine( - name="my-vm", - resource_group="my-rg", - location="westeurope", - subscription_id="sub-xyz", - nsg_id="nsg-xyz", - ssh_public_key="ssh-rsa AAAAB3NzaC1...", - gpu=False, - timeout=555, - ) - - # Assert - assert vm.name == "my-vm" - assert vm.resource_group == "my-rg" - assert vm.location == "westeurope" - assert vm.gpu is False - assert vm.nsg_id == "nsg-xyz" - assert vm.ssh_public_key == "ssh-rsa AAAAB3NzaC1..." - assert vm.timeout == 555 - assert vm.public_ip is None # Not set until _create runs - mock_create.assert_called_once()