Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions harbor_cookbook/gepa/optimize.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
DEFAULT_ENVIRONMENT,
DEFAULT_MODEL,
download_tasks,
init_queue,
run_trial,
split_tasks,
)
Expand Down Expand Up @@ -173,6 +174,8 @@ def main():
_model_name = args.model
_environment = args.environment

init_queue(n_concurrent=args.max_workers)

logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)-8s %(message)s",
Expand Down
11 changes: 8 additions & 3 deletions harbor_cookbook/gepa/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from collections import defaultdict
from pathlib import Path

from harbor import TrialQueue
from harbor.models.environment_type import EnvironmentType
from harbor.models.trial.config import (
AgentConfig,
Expand All @@ -18,17 +19,21 @@
VerifierConfig,
)
from harbor.registry.client import RegistryClientFactory
from harbor.trial.trial import Trial

log = logging.getLogger(__name__)

DEFAULT_AGENT = "codex"
DEFAULT_MODEL = "openai/gpt-5-nano"
DEFAULT_ENVIRONMENT = EnvironmentType.DOCKER

# Single event loop shared across GEPA worker threads.
_loop = asyncio.new_event_loop()
threading.Thread(target=_loop.run_forever, daemon=True).start()
_queue: TrialQueue | None = None


def init_queue(n_concurrent: int) -> None:
global _queue
_queue = TrialQueue(n_concurrent=n_concurrent)


def download_tasks():
Expand Down Expand Up @@ -106,7 +111,7 @@ def run_trial(
)

log.debug("Starting trial for %s", task_dir.name)
result = asyncio.run_coroutine_threadsafe(Trial(config).run(), _loop).result()
result = asyncio.run_coroutine_threadsafe(_queue.submit(config), _loop).result()

rewards = result.verifier_result.rewards if result.verifier_result else {}
exc = result.exception_info
Expand Down
Loading