diff --git a/harbor_cookbook/gepa/optimize.py b/harbor_cookbook/gepa/optimize.py
index 30ba3ce..b6cd588 100644
--- a/harbor_cookbook/gepa/optimize.py
+++ b/harbor_cookbook/gepa/optimize.py
@@ -26,6 +26,7 @@
     DEFAULT_ENVIRONMENT,
     DEFAULT_MODEL,
     download_tasks,
+    init_queue,
     run_trial,
     split_tasks,
 )
@@ -173,6 +174,8 @@ def main():
     _model_name = args.model
     _environment = args.environment
 
+    init_queue(n_concurrent=args.max_workers)
+
     logging.basicConfig(
         level=logging.INFO,
         format="%(asctime)s %(levelname)-8s %(message)s",
diff --git a/harbor_cookbook/gepa/utils.py b/harbor_cookbook/gepa/utils.py
index fb17491..ae705bb 100644
--- a/harbor_cookbook/gepa/utils.py
+++ b/harbor_cookbook/gepa/utils.py
@@ -9,6 +9,7 @@
 from collections import defaultdict
 from pathlib import Path
 
+from harbor import TrialQueue
 from harbor.models.environment_type import EnvironmentType
 from harbor.models.trial.config import (
     AgentConfig,
@@ -18,7 +19,6 @@
     VerifierConfig,
 )
 from harbor.registry.client import RegistryClientFactory
-from harbor.trial.trial import Trial
 
 log = logging.getLogger(__name__)
 
@@ -29,6 +29,19 @@
 # Single event loop shared across GEPA worker threads.
 _loop = asyncio.new_event_loop()
 threading.Thread(target=_loop.run_forever, daemon=True).start()
+_queue: TrialQueue | None = None
+
+
+def init_queue(n_concurrent: int) -> None:
+    """Create the module-level ``TrialQueue`` that ``run_trial`` submits to.
+
+    Must be called once before the first ``run_trial`` call.
+
+    Args:
+        n_concurrent: Forwarded unchanged to ``TrialQueue(n_concurrent=...)``.
+    """
+    global _queue
+    _queue = TrialQueue(n_concurrent=n_concurrent)
 
 
 def download_tasks():
@@ -106,7 +119,9 @@ def run_trial(
     )
 
     log.debug("Starting trial for %s", task_dir.name)
-    result = asyncio.run_coroutine_threadsafe(Trial(config).run(), _loop).result()
+    if _queue is None:
+        raise RuntimeError("init_queue() must be called before run_trial()")
+    result = asyncio.run_coroutine_threadsafe(_queue.submit(config), _loop).result()
 
     rewards = result.verifier_result.rewards if result.verifier_result else {}
     exc = result.exception_info