Provider-agnostic CLI execution layer for Claude Code, Codex, and Gemini.
- `types.py`: `AgentRequest`, `AgentResponse`, `CLIResponse`
- `base.py`: `BaseCLI`, `CLIConfig`, `docker_wrap()`, Windows helpers
- `factory.py`: provider factory (claude/codex/gemini)
- `service.py`: `CLIService` gateway for orchestrator
- `init_wizard.py`: interactive onboarding and smart reset flow
- `executor.py`: shared subprocess lifecycle helpers for provider wrappers
- `timeout_controller.py`: configurable timeout warnings + activity-based extension controller
- `model_cache.py`: shared base classes for provider model-cache persistence and refresh observers
- `claude_provider.py`: Claude subprocess wrapper
- `codex_provider.py`: Codex subprocess wrapper
- `gemini_provider.py`: Gemini subprocess wrapper
- `stream_events.py`: normalized stream events + Claude stream parser
- `codex_events.py`: Codex JSONL parser
- `gemini_events.py`: Gemini NDJSON + JSON parser
- `coalescer.py`: streaming text coalescing buffer used by bot streaming dispatch
- `gemini_utils.py`: Gemini CLI discovery, trusted folder, model discovery helpers
- `codex_discovery.py`: Codex model discovery via `codex app-server` JSON-RPC
- `process_registry.py`: subprocess tracking/abort/kill
- `auth.py`: provider auth detection
- `param_resolver.py`: task override resolution for cron/webhook one-shot runs
- `codex_cache.py`, `codex_cache_observer.py`: Codex model cache + observer
- `gemini_cache.py`, `gemini_cache_observer.py`: Gemini model cache + observer

- Orchestrator builds `AgentRequest`.
- `CLIService._make_cli()` resolves model/provider.
- `CLIServiceConfig` injects provider-specific global CLI args.
- `create_cli()` selects the provider wrapper.
- provider executes the subprocess and returns `CLIResponse`.
- service converts it to `AgentResponse`.
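
The request flow above can be sketched as a few plain functions. This is a minimal illustration, not the real `types.py` API: the field names (`prompt`, `provider`, `text`, `returncode`, `is_error`), `FakeProvider`, and `pick_provider` are hypothetical stand-ins, and no subprocess is started.

```python
from dataclasses import dataclass


# Hypothetical, trimmed-down shapes; the real dataclasses carry more fields.
@dataclass(frozen=True)
class AgentRequest:
    prompt: str
    provider: str


@dataclass(frozen=True)
class CLIResponse:
    text: str
    returncode: int


@dataclass(frozen=True)
class AgentResponse:
    text: str
    is_error: bool


class FakeProvider:
    """Stands in for a provider wrapper that would run a CLI subprocess."""

    def __init__(self, name: str) -> None:
        self.name = name

    def execute(self, prompt: str) -> CLIResponse:
        return CLIResponse(text=f"[{self.name}] {prompt}", returncode=0)


def pick_provider(provider: str) -> FakeProvider:
    # Factory step (hypothetical stand-in for create_cli()).
    if provider not in {"claude", "codex", "gemini"}:
        raise ValueError(f"unknown provider: {provider}")
    return FakeProvider(provider)


def handle(request: AgentRequest) -> AgentResponse:
    cli = pick_provider(request.provider)  # select the provider wrapper
    raw = cli.execute(request.prompt)      # provider-level CLIResponse
    # Service-level conversion into the orchestrator-facing response.
    return AgentResponse(text=raw.text, is_error=raw.returncode != 0)
```
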

Environment variables injected into every CLI subprocess (`executor.py` and `docker_wrap`):
- `DUCTOR_CHAT_ID`
- `DUCTOR_TOPIC_ID` (when set)
- `DUCTOR_TRANSPORT` (active transport identifier, e.g. `"tg"`, `"mx"`)
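
A minimal sketch of assembling that environment, assuming the parent environment is copied first and the `DUCTOR_*` values are added on top. `build_subprocess_env` is a hypothetical helper, not the executor's actual function.

```python
import os
from typing import Mapping, Optional, Union


def build_subprocess_env(
    chat_id: Union[int, str],
    transport: str,
    topic_id: Optional[Union[int, str]] = None,
    base: Optional[Mapping[str, str]] = None,
) -> dict[str, str]:
    """Copy the parent environment and add the DUCTOR_* context variables."""
    env = dict(os.environ if base is None else base)
    env["DUCTOR_CHAT_ID"] = str(chat_id)
    env["DUCTOR_TRANSPORT"] = transport
    if topic_id is not None:
        env["DUCTOR_TOPIC_ID"] = str(topic_id)
    else:
        # Only set when a topic exists; drop any value inherited from the parent.
        env.pop("DUCTOR_TOPIC_ID", None)
    return env
```

The result can be passed as `env=` to `asyncio.create_subprocess_exec(...)`.
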

Provider-specific global CLI args are configured in `config.json`:
- `cli_parameters.claude`
- `cli_parameters.codex`
- `cli_parameters.gemini`

`CLIService` forwards them per provider.
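
Per-provider lookup could look like the sketch below. It assumes each `cli_parameters.<provider>` entry is a list of argument strings; the sample config, its placeholder flag names, and `global_args_for` are hypothetical.

```python
import json
from typing import Any

# Illustrative config; the flag names are placeholders, not real CLI options.
SAMPLE_CONFIG = json.loads("""
{
  "cli_parameters": {
    "claude": ["--example-claude-flag"],
    "gemini": ["--example-gemini-flag"]
  }
}
""")


def global_args_for(config: dict[str, Any], provider: str) -> list[str]:
    """Return the global extra CLI args configured for one provider."""
    args = config.get("cli_parameters", {}).get(provider, [])
    if not isinstance(args, list) or not all(isinstance(a, str) for a in args):
        raise TypeError(f"cli_parameters.{provider} must be a list of strings")
    return list(args)  # copy so callers cannot mutate the loaded config
```

A provider with no entry simply gets an empty list.
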

Task override resolution (`param_resolver.py`) is used by cron runs and webhook `cron_task` runs.
- input: `TaskOverrides` (`provider`, `model`, `reasoning_effort`, `cli_parameters`)
- output: immutable `TaskExecutionConfig`
- validation:
  - Claude model in `haiku|sonnet|opus`
  - Codex model validated against `CodexModelCache`
  - Gemini model validated against aliases/discovered IDs or `gemini-*` patterns
- Codex reasoning effort applied only when supported by the model
- task `cli_parameters` are task-level only (no merge with global provider args)
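
The resolution rules above can be sketched as a pure function. This is a sketch under assumptions: plain frozensets stand in for `CodexModelCache` and the Gemini alias/discovery registry, `codex_effort_models` is a hypothetical set of models that support reasoning effort, and per-provider defaults are passed in explicitly.

```python
from dataclasses import dataclass
from fnmatch import fnmatchcase
from typing import Mapping, Optional

CLAUDE_MODELS = frozenset({"haiku", "sonnet", "opus"})


@dataclass(frozen=True)
class TaskOverrides:
    provider: Optional[str] = None
    model: Optional[str] = None
    reasoning_effort: Optional[str] = None
    cli_parameters: tuple[str, ...] = ()


@dataclass(frozen=True)  # frozen: the resolved config is immutable
class TaskExecutionConfig:
    provider: str
    model: str
    reasoning_effort: Optional[str]
    cli_parameters: tuple[str, ...]


def resolve_task(
    overrides: TaskOverrides,
    default_provider: str,
    default_models: Mapping[str, str],
    codex_models: frozenset[str],
    codex_effort_models: frozenset[str],
    gemini_known: frozenset[str],
) -> TaskExecutionConfig:
    provider = overrides.provider or default_provider
    model = overrides.model or default_models[provider]
    if provider == "claude":
        valid = model in CLAUDE_MODELS
    elif provider == "codex":
        valid = model in codex_models  # stand-in for a CodexModelCache lookup
    elif provider == "gemini":
        valid = model in gemini_known or fnmatchcase(model, "gemini-*")
    else:
        raise ValueError(f"unknown provider: {provider}")
    if not valid:
        raise ValueError(f"model {model!r} is not valid for {provider}")
    # Reasoning effort survives only for Codex models that support it.
    effort = None
    if provider == "codex" and model in codex_effort_models:
        effort = overrides.reasoning_effort
    # Task cli_parameters are used as-is; global provider args are NOT merged in.
    return TaskExecutionConfig(provider, model, effort, tuple(overrides.cli_parameters))
```
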

Normalized events in `stream_events.py` include:
- `AssistantTextDelta`
- `ToolUseEvent`
- `ToolResultEvent`
- `ThinkingEvent`
- `SystemStatusEvent`
- `CompactBoundaryEvent`
- `SystemInitEvent`
- `ResultEvent`

`CLIService.execute_streaming()` behavior:
- routes deltas/events to callbacks,
- checks `ProcessRegistry.was_aborted(chat_id)` on each event,
- if the stream fails or lacks a final result event:
  - aborted -> empty result,
  - non-error with accumulated text -> use accumulated text,
  - else retry non-streaming and mark `stream_fallback=True`.
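
The fallback branch reduces to a small decision function. A minimal sketch: `Outcome` and `resolve_incomplete_stream` are hypothetical names, and `is_error` is assumed to be a flag the service tracks while consuming the stream.

```python
from enum import Enum, auto


class Outcome(Enum):
    EMPTY = auto()                # user aborted: return an empty result
    ACCUMULATED = auto()          # reuse the text streamed so far
    RETRY_NON_STREAMING = auto()  # re-run without streaming, set stream_fallback=True


def resolve_incomplete_stream(aborted: bool, is_error: bool, accumulated_text: str) -> Outcome:
    """Decide what to return when a stream failed or ended without a final result event."""
    if aborted:
        return Outcome.EMPTY
    if not is_error and accumulated_text:
        return Outcome.ACCUMULATED
    return Outcome.RETRY_NON_STREAMING
```

The abort check comes first so an aborted run never triggers a second, non-streaming subprocess.
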

Timeout behavior in current production paths:
- provider wrappers accept both `timeout_seconds` and `timeout_controller`, and pass both into executor helpers.
- `SubprocessSpec.timeout_controller` is used in foreground and named-session flows where the orchestrator builds controllers (`flows._make_timeout_controller`).
- when no controller is supplied, the executor falls back to plain `asyncio.timeout(...)`.
- remaining timeout-only paths still using `timeout_seconds` include cron/webhook one-shot runs, inter-agent turns, and task-result/task-question injection turns.

Status-callback nuance:
`TimeoutController` warning/extension callbacks are not currently wired to emit `SystemStatusEvent`s, so UI labels like `timeout_warning`/`timeout_extended` depend on future callback wiring.

`messenger/telegram/message_dispatch.py` wraps delta delivery with `StreamCoalescer` (`coalescer.py`) so Telegram edits flush at readable boundaries (paragraph/sentence/idle/full flush).
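
A coalescing buffer of this kind can be sketched as follows. This is a hypothetical `TextCoalescer`, not the real `StreamCoalescer`: the thresholds and the sentence regex are assumptions, and the idle flush is modeled as an explicit `flush()` that a timer would call.

```python
import re

_SENTENCE_END = re.compile(r"[.!?]\s")


class TextCoalescer:
    """Hypothetical coalescer: buffers deltas and releases text at readable boundaries."""

    def __init__(self, min_chars: int = 40, max_chars: int = 400) -> None:
        if max_chars < 1:
            raise ValueError("max_chars must be positive")
        self.min_chars = min_chars
        self.max_chars = max_chars
        self._buf = ""

    def feed(self, delta: str) -> list[str]:
        """Add a delta and return any chunks that are ready to send."""
        self._buf += delta
        ready: list[str] = []
        while (cut := self._find_cut()) is not None:
            ready.append(self._buf[:cut])
            self._buf = self._buf[cut:]
        return ready

    def _find_cut(self):
        para = self._buf.find("\n\n")
        if para != -1:
            return para + 2  # paragraph boundary: flush through the blank line
        if len(self._buf) >= self.min_chars:
            ends = [m.end() for m in _SENTENCE_END.finditer(self._buf)]
            if ends:
                return ends[-1]  # last complete sentence
        if len(self._buf) >= self.max_chars:
            return self.max_chars  # full flush: buffer too large
        return None

    def flush(self) -> str:
        """Idle/final flush: release whatever is buffered."""
        text, self._buf = self._buf, ""
        return text
```

Each cut strictly shrinks the buffer, so the loop in `feed()` always terminates.
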

Session recovery is orchestrator-managed (`flows._recover_session`), not `CLIService`-managed.

Recovery triggers handled in orchestrator flows:
- SIGKILL termination (`returncode == -SIGKILL`)
- invalid resumed session (`"invalid session"`/`"session not found"` from provider CLI)
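
Both triggers can be checked with one predicate. A minimal sketch: `needs_session_recovery` is hypothetical, the case-insensitive match is an assumption, and the `-SIGKILL` comparison relies on the POSIX convention that a signal-killed child reports a negative return code.

```python
import signal
from typing import Optional

# SIGKILL is not defined on Windows; 9 is its POSIX value.
_SIGKILL = getattr(signal, "SIGKILL", 9)
_INVALID_SESSION_MARKERS = ("invalid session", "session not found")


def needs_session_recovery(returncode: Optional[int], output: str) -> bool:
    """Return True when a run should go through session recovery."""
    if returncode == -_SIGKILL:
        return True
    lowered = output.lower()
    return any(marker in lowered for marker in _INVALID_SESSION_MARKERS)
```
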

Claude provider (`claude_provider.py`):
- non-streaming uses `--output-format json`
- streaming uses `--output-format stream-json`
- respects `--max-turns`, `--max-budget-usd`, session resume/continue
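
The flag selection can be sketched as a pure function. It is an assumption that resume/continue map to Claude Code's `--resume <id>` and `--continue` flags and that resume takes precedence; `claude_flags` is hypothetical, and the executable, prompt, and any other flags the wrapper adds are left out.

```python
from typing import Optional


def claude_flags(
    streaming: bool,
    max_turns: Optional[int] = None,
    max_budget_usd: Optional[float] = None,
    resume_session_id: Optional[str] = None,
    continue_session: bool = False,
) -> list[str]:
    """Build the output/limit/session flags described above (sketch only)."""
    args = ["--output-format", "stream-json" if streaming else "json"]
    if max_turns is not None:
        args += ["--max-turns", str(max_turns)]
    if max_budget_usd is not None:
        args += ["--max-budget-usd", str(max_budget_usd)]
    if resume_session_id:
        args += ["--resume", resume_session_id]  # explicit session wins
    elif continue_session:
        args.append("--continue")
    return args
```
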

Codex provider (`codex_provider.py`):
- fresh runs use `codex exec --json --color never --skip-git-repo-check`
- resumed runs use `codex exec resume [--json] -- <session_id>` and do not go through the same `--color never --skip-git-repo-check` path
- sandbox/approval flag selection from `permission_mode`
- reasoning effort via `-c model_reasoning_effort=...`
- `continue_session=True` is ignored for Codex
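
The two command shapes can be sketched like this. `codex_command` is hypothetical; prompt passing and the `permission_mode` sandbox/approval flags are omitted, and applying `-c model_reasoning_effort=...` only to fresh runs is an assumption of this sketch.

```python
from typing import Optional

FRESH_PREFIX = ["codex", "exec", "--json", "--color", "never", "--skip-git-repo-check"]


def codex_command(
    session_id: Optional[str] = None,
    json_output: bool = True,
    reasoning_effort: Optional[str] = None,
) -> list[str]:
    """Return the fresh-run or resume command prefix (sketch only)."""
    if session_id:
        # Resume path: its own shape, without --color/--skip-git-repo-check.
        cmd = ["codex", "exec", "resume"]
        if json_output:
            cmd.append("--json")
        return cmd + ["--", session_id]
    cmd = list(FRESH_PREFIX)
    if reasoning_effort:
        cmd += ["-c", f"model_reasoning_effort={reasoning_effort}"]
    return cmd
```

There is deliberately no `continue_session` parameter, mirroring the note that it is ignored for Codex.
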

Gemini provider (`gemini_provider.py`):
- command via `gemini` (or `node <index.js>` when resolved)
- non-streaming `--output-format json`, streaming `--output-format stream-json`
- permission bypass maps to `--approval-mode yolo`
- always includes `--include-directories .`
- trusts workspace path in `~/.gemini/trustedFolders.json`
- may inject `GEMINI_API_KEY` from ductor config when Gemini settings indicate API-key mode and no env key is set
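
A sketch of the command assembly, assuming the prompt is fed over stdin (consistent with the `-i` note under `docker_wrap`) and that `node_entry` is the resolved `index.js` path. `gemini_command` is hypothetical; trusted-folder and API-key handling are not shown.

```python
from typing import Optional


def gemini_command(streaming: bool, bypass_permissions: bool, node_entry: Optional[str] = None) -> list[str]:
    """Build the Gemini CLI argv described above (sketch only)."""
    # Resolved installs run through node; otherwise call the `gemini` binary.
    cmd = ["node", node_entry] if node_entry else ["gemini"]
    cmd += ["--output-format", "stream-json" if streaming else "json"]
    if bypass_permissions:
        cmd += ["--approval-mode", "yolo"]
    cmd += ["--include-directories", "."]  # always included
    return cmd
```
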

Provider auth detection (`auth.py`) reports these statuses: `AUTHENTICATED`, `INSTALLED`, `NOT_FOUND`.
- Claude: `~/.claude/.credentials.json`
- Claude fallback paths: `ANTHROPIC_API_KEY`, then `claude auth status`
- Codex: `$CODEX_HOME/auth.json`
- Codex fallback paths: `OPENAI_API_KEY`; install markers: `version.json` or `config.toml`
- Gemini:
  - CLI presence (`find_gemini_cli`)
  - OAuth creds (`~/.gemini/oauth_creds.json`)
  - env/`.env`/API-key/Vertex markers
  - `settings.json` selected auth mode
  - optional fallback to `gemini_api_key` in `~/.ductor/config/config.json`
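
The Codex branch of that detection can be sketched as an ordered series of checks. Assumptions of this sketch: `CODEX_HOME` defaults to `~/.codex`, the install markers live directly under `CODEX_HOME`, and a set `OPENAI_API_KEY` counts as authenticated. `AuthStatus` and `codex_auth_status` are hypothetical names.

```python
import os
from enum import Enum
from pathlib import Path
from typing import Mapping, Optional


class AuthStatus(Enum):
    AUTHENTICATED = "authenticated"
    INSTALLED = "installed"
    NOT_FOUND = "not_found"


def codex_auth_status(env: Optional[Mapping[str, str]] = None) -> AuthStatus:
    """Check credentials first, then the API key, then install markers."""
    env = os.environ if env is None else env
    home = Path(env.get("CODEX_HOME") or Path.home() / ".codex")
    if (home / "auth.json").is_file():
        return AuthStatus.AUTHENTICATED
    if env.get("OPENAI_API_KEY"):
        return AuthStatus.AUTHENTICATED
    if (home / "version.json").exists() or (home / "config.toml").exists():
        return AuthStatus.INSTALLED  # CLI present but not logged in
    return AuthStatus.NOT_FOUND
```
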

Codex model cache (`codex_cache.py`, `codex_cache_observer.py`):
- file: `~/.ductor/config/codex_models.json`
- discovery source: `discover_codex_models()` (`codex_discovery.py`) via `codex app-server` (`initialize` + `model/list`)
- loaded on startup with force refresh
- hourly refresh loop
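
The periodic part of an observer can be sketched as a stoppable asyncio loop; the startup force refresh would run separately before it. `refresh_loop` is hypothetical, and swallowing refresh errors so the previous cache stays in use is a design assumption.

```python
import asyncio
import logging
from typing import Awaitable, Callable

log = logging.getLogger(__name__)


async def refresh_loop(
    refresh: Callable[[], Awaitable[None]],
    stop: asyncio.Event,
    interval_s: float = 3600.0,  # hourly
) -> None:
    """Call `refresh` every `interval_s` seconds until `stop` is set."""
    while not stop.is_set():
        try:
            # Wake early if stop is set; otherwise time out and refresh.
            await asyncio.wait_for(stop.wait(), timeout=interval_s)
        except asyncio.TimeoutError:
            try:
                await refresh()
            except Exception:
                # Keep the loop alive; the previous cache stays in use.
                log.exception("model cache refresh failed")
```
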

Gemini model cache (`gemini_cache.py`, `gemini_cache_observer.py`):
- file: `~/.ductor/config/gemini_models.json`
- loaded on startup (uses cache when fresh, refreshes when stale/missing)
- hourly refresh loop
- refresh callback updates runtime Gemini model registry (`set_gemini_models`)
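
The "fresh or refresh" startup load can be sketched as below. Assumptions: freshness is judged by file mtime against a one-hour window, the cache is a plain JSON list of model IDs, and `on_update` plays the role of a registry setter such as `set_gemini_models`. `load_models` is hypothetical.

```python
import json
import time
from pathlib import Path
from typing import Callable, Optional


def load_models(
    path: Path,
    discover: Callable[[], list[str]],
    on_update: Callable[[list[str]], None],
    max_age_s: float = 3600.0,
    now: Optional[float] = None,
) -> list[str]:
    """Use the cache file when fresh; otherwise rediscover and rewrite it."""
    now = time.time() if now is None else now
    if path.is_file() and now - path.stat().st_mtime < max_age_s:
        models = json.loads(path.read_text(encoding="utf-8"))  # fresh cache
    else:
        models = discover()  # stale or missing
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(json.dumps(models), encoding="utf-8")
    on_update(models)  # push into the runtime registry
    return models
```
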

`ProcessRegistry` provides:
- registration/unregistration by chat with optional `topic_id` tracking
- `has_active(chat_id, topic_id=None)`: when `topic_id` is given, only processes for that specific topic are considered active; otherwise any process for the chat qualifies
- abort markers (`was_aborted`, `clear_abort`)
- `kill_all(chat_id)`
- stale wall-clock cleanup (`kill_stale`)
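
The `has_active` topic semantics and abort markers can be sketched with an in-memory registry. `MiniProcessRegistry` is hypothetical: it tracks PIDs only, its `kill_all` marks the chat as aborted (an assumption) without signaling anything, and `kill_stale` is omitted.

```python
from typing import Optional


class MiniProcessRegistry:
    """Hypothetical in-memory registry; it tracks PIDs but never signals them."""

    def __init__(self) -> None:
        self._procs: dict[int, dict[int, Optional[int]]] = {}  # chat_id -> {pid: topic_id}
        self._aborted: set[int] = set()

    def register(self, chat_id: int, pid: int, topic_id: Optional[int] = None) -> None:
        self._procs.setdefault(chat_id, {})[pid] = topic_id

    def unregister(self, chat_id: int, pid: int) -> None:
        procs = self._procs.get(chat_id)
        if procs is not None:
            procs.pop(pid, None)
            if not procs:
                del self._procs[chat_id]

    def has_active(self, chat_id: int, topic_id: Optional[int] = None) -> bool:
        procs = self._procs.get(chat_id, {})
        if topic_id is None:
            return bool(procs)  # any process for the chat counts
        return any(t == topic_id for t in procs.values())  # only this topic counts

    def kill_all(self, chat_id: int) -> list[int]:
        # Mark the chat as aborted and forget its processes; real termination omitted.
        self._aborted.add(chat_id)
        return sorted(self._procs.pop(chat_id, {}))

    def was_aborted(self, chat_id: int) -> bool:
        return chat_id in self._aborted

    def clear_abort(self, chat_id: int) -> None:
        self._aborted.discard(chat_id)
```
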

Windows uses process-tree termination (`taskkill /F /T`) to avoid orphaned child processes.
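
A minimal sketch of that call using the standard `taskkill` switches (`/F` force, `/T` include child processes, `/PID` target). `taskkill_command` and `kill_tree_windows` are hypothetical helpers, and the non-Windows guard is only there to keep the sketch honest about where it applies.

```python
import subprocess
import sys


def taskkill_command(pid: int) -> list[str]:
    # /T terminates the whole tree rooted at pid; /F forces termination.
    return ["taskkill", "/F", "/T", "/PID", str(pid)]


def kill_tree_windows(pid: int) -> bool:
    """Run taskkill for pid; return True when it reports success."""
    if sys.platform != "win32":
        raise RuntimeError("taskkill is only available on Windows")
    result = subprocess.run(taskkill_command(pid), capture_output=True, check=False)
    return result.returncode == 0
```
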

`docker_wrap(cmd, config, extra_env=None, interactive=False)`:
- host mode (`config.docker_container == ""`): returns the original command plus the resolved local cwd
- container mode:
  - wraps the command as `docker exec ... <container> ...`,
  - injects `DUCTOR_CHAT_ID`, optional `DUCTOR_TOPIC_ID`, `DUCTOR_TRANSPORT`, `DUCTOR_AGENT_NAME`, `DUCTOR_INTERAGENT_PORT`, `DUCTOR_HOME`, `DUCTOR_SHARED_MEMORY_PATH`, and `DUCTOR_INTERAGENT_HOST`,
  - merges user secrets from `~/.ductor/.env` (never overrides existing vars),
  - forwards optional env vars via `-e` flags (`extra_env` values override `.env`),
  - uses `-i` when `interactive=True` (required for stdin-fed providers like Gemini),
  - returns `cwd=None` (execution happens inside the container context).
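
The wrapping rules can be sketched with `docker exec`'s real `-i` and `-e KEY=VALUE` options. This is a sketch under assumptions, not `base.py`: `WrapConfig` and `docker_wrap_sketch` are hypothetical, the `DUCTOR_*` values and parsed `.env` contents are passed in ready-made, and letting injected `DUCTOR_*` vars win over both `.env` and `extra_env` is an assumed precedence.

```python
from dataclasses import dataclass
from pathlib import Path
from typing import Mapping, Optional, Sequence


@dataclass(frozen=True)
class WrapConfig:
    """Hypothetical subset of CLIConfig used by this sketch."""
    docker_container: str
    cwd: str


def docker_wrap_sketch(
    cmd: Sequence[str],
    config: WrapConfig,
    ductor_env: Mapping[str, str],
    dotenv: Mapping[str, str],
    extra_env: Optional[Mapping[str, str]] = None,
    interactive: bool = False,
) -> tuple[list[str], Optional[str]]:
    if config.docker_container == "":
        # Host mode: run the command unchanged in the resolved local cwd.
        return list(cmd), str(Path(config.cwd).expanduser().resolve())
    env = dict(ductor_env)  # injected DUCTOR_* variables are set first
    optional = {**dotenv, **(extra_env or {})}  # extra_env overrides .env
    for key, value in optional.items():
        env.setdefault(key, value)  # never override an already-set variable
    wrapped = ["docker", "exec"]
    if interactive:
        wrapped.append("-i")  # keep stdin open for stdin-fed CLIs such as Gemini
    for key, value in env.items():
        wrapped += ["-e", f"{key}={value}"]
    wrapped.append(config.docker_container)
    wrapped.extend(cmd)
    # Container mode: cwd is None because the process runs inside the container.
    return wrapped, None
```
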