8 changes: 7 additions & 1 deletion chutes/cfsv_wrapper.py
@@ -7,7 +7,13 @@

def main():
binary_path = Path(__file__).parent / "cfsv"
os.chmod(binary_path, os.stat(binary_path).st_mode | stat.S_IEXEC)
if not binary_path.exists():
print(f"Error: cfsv binary not found at {binary_path}", file=sys.stderr)
sys.exit(1)
try:
os.chmod(binary_path, os.stat(binary_path).st_mode | stat.S_IEXEC)
except OSError as e:
print(f"Warning: Could not set executable permission on {binary_path}: {e}", file=sys.stderr)
result = subprocess.run([str(binary_path)] + sys.argv[1:])
sys.exit(result.returncode)

43 changes: 43 additions & 0 deletions chutes/chute/base.py
@@ -216,6 +216,49 @@ async def initialize(self):
for job in self._jobs:
logger.info(f"Found job definition: {job._func.__name__}")

def _register_warmup_routes(self):
"""
Register warmup cords routes.
These routes are available even during cold start.
"""
from chutes.chute.warmup import get_warmup_manager
from fastapi import Request, Response, status
from fastapi.responses import JSONResponse

warmup_manager = get_warmup_manager()

async def warmup_kick(request: Request) -> Response:
"""POST /warmup/kick - Start or restart warmup (idempotent)."""
try:
result = await warmup_manager.kick()
return JSONResponse(
content=result,
status_code=status.HTTP_202_ACCEPTED
)
except Exception as e:
logger.error(f"Error in warmup/kick: {e}")
return JSONResponse(
content={"error": str(e)},
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR
)

async def warmup_status(request: Request) -> Response:
"""GET /warmup/status - Get current warmup status."""
try:
status_data = await warmup_manager.get_status()
return JSONResponse(content=status_data.model_dump())
except Exception as e:
logger.error(f"Error in warmup/status: {e}")
return JSONResponse(
content={"error": str(e)},
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR
)

# Register routes
self.add_api_route("/warmup/kick", warmup_kick, methods=["POST"])
self.add_api_route("/warmup/status", warmup_status, methods=["GET"])
logger.info("Registered warmup cords: /warmup/kick, /warmup/status")

def cord(self, **kwargs):
"""
Decorator to define a parachute cord (function).
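
To illustrate the warmup cords registered above, here is a minimal client sketch that kicks warmup and then polls status until the chute reports ready or error. The host, port, and poll interval are assumptions for illustration, not part of this change.

import asyncio
import aiohttp

async def warm_up(base_url: str = "http://127.0.0.1:8000") -> dict:
    async with aiohttp.ClientSession() as session:
        # POST /warmup/kick is idempotent and responds 202 with {"ok": ..., "state": ..., "schema": ...}
        async with session.post(f"{base_url}/warmup/kick") as resp:
            print(resp.status, await resp.json())
        # Poll GET /warmup/status until the manager reports "ready" or "error".
        while True:
            async with session.get(f"{base_url}/warmup/status") as resp:
                status = await resp.json()
            if status["state"] in ("ready", "error"):
                return status
            await asyncio.sleep(2)

if __name__ == "__main__":
    print(asyncio.run(warm_up()))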
40 changes: 39 additions & 1 deletion chutes/chute/cord.py
@@ -267,6 +267,39 @@ async def _local_stream_call(self, *args, **kwargs):
else:
yield data["result"]

def _process_prefill_messages(self, request_data: dict) -> dict:
"""
Process prefill messages (partial/prefix) for vLLM compatibility.
Strips unsupported fields and handles prefill content.
"""
if not isinstance(request_data, dict) or "messages" not in request_data:
return request_data

processed_messages = []
for msg in request_data["messages"]:
if not isinstance(msg, dict):
processed_messages.append(msg)
continue

# Create a clean message copy
clean_msg = {
"role": msg.get("role"),
"content": msg.get("content", ""),
}

# Handle prefill: if assistant message has partial/prefix, keep content for prefill
# vLLM will use assistant message content as prefill automatically
if msg.get("role") == "assistant" and (msg.get("partial") or msg.get("prefix")):
# Keep the content - vLLM uses assistant message content as prefill
# Remove unsupported fields
logger.debug(f"Processing prefill message with content: {clean_msg['content'][:50]}...")

# Remove unsupported fields that vLLM doesn't understand
processed_messages.append(clean_msg)

request_data["messages"] = processed_messages
return request_data

@asynccontextmanager
async def _passthrough_call(self, **kwargs):
"""
@@ -278,6 +311,11 @@ async def _passthrough_call(self, **kwargs):
headers = kwargs.pop("headers", {}) or {}
if self._app.passthrough_headers:
headers.update(self._app.passthrough_headers)

# Process prefill messages if this is a chat completion request
if "json" in kwargs and isinstance(kwargs["json"], dict):
kwargs["json"] = self._process_prefill_messages(kwargs["json"])

kwargs["headers"] = headers
async with aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(connect=5.0, total=900.0),
@@ -453,7 +491,7 @@ async def _request_handler(self, request: Request):
args = []
kwargs = {"json": request.state.decrypted} if request.state.decrypted else {}
if not self._passthrough:
if self.input_models and all([isinstance(args[idx], dict) for idx in range(len(args))]):
if self.input_models and len(args) == len(self.input_models) and all([isinstance(args[idx], dict) for idx in range(len(args))]):
try:
args = [
self.input_models[idx](**args[idx]) for idx in range(len(self.input_models))
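
An illustrative before/after payload for _process_prefill_messages above; the model name and message contents are made up. Every dict message is reduced to its role and content, so an assistant message flagged with partial/prefix keeps its text for vLLM to continue from while the flag itself is dropped.

request_before = {
    "model": "example-model",
    "messages": [
        {"role": "user", "content": "Write a haiku about the sea."},
        {"role": "assistant", "content": "Waves fold into foam", "partial": True},
    ],
}
# After processing, only "role" and "content" remain on each message:
request_after = {
    "model": "example-model",
    "messages": [
        {"role": "user", "content": "Write a haiku about the sea."},
        {"role": "assistant", "content": "Waves fold into foam"},
    ],
}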
28 changes: 26 additions & 2 deletions chutes/chute/template/vllm.py
@@ -252,8 +252,8 @@ def build_vllm_chute(
suggested_commit = None
try:
suggested_commit = get_current_hf_commit(model_name)
except Exception:
...
except Exception as exc:
logger.debug(f"Failed to fetch current HF commit for {model_name}: {exc}")
suggestion = (
"Unable to fetch the current refs/heads/main commit from HF, please check the model name."
if not suggested_commit
@@ -314,6 +314,10 @@ async def initialize_vllm(self):
nonlocal model_name
nonlocal image

# Initialize warmup manager for phase tracking
from chutes.chute.warmup import get_warmup_manager
warmup_manager = get_warmup_manager()

# Imports here to avoid needing torch/vllm/etc. to just perform inference/build remotely.
import torch
import multiprocessing
@@ -323,6 +327,9 @@
from vllm.entrypoints.openai.serving_completion import OpenAIServingCompletion
import vllm.version as vv

# Update warmup phase: pulling
await warmup_manager.update_phase("pulling", 10)

# Force download in initializer with some retries.
from huggingface_hub import snapshot_download

@@ -347,6 +354,9 @@
# Set torch inductor, flashinfer, etc., cache directories.
set_default_cache_dirs(download_path)

# Update warmup phase: loading
await warmup_manager.update_phase("loading", 40)

try:
from vllm.entrypoints.openai.serving_engine import BaseModelPath
except Exception:
@@ -392,6 +402,9 @@ async def initialize_vllm(self):
# Initialize engine directly in the main process
self.engine = AsyncLLMEngine.from_engine_args(engine_args)

# Update warmup phase: tokenizer
await warmup_manager.update_phase("tokenizer", 60)

base_model_paths = [
BaseModelPath(name=chute.name, model_path=chute.name),
]
@@ -481,6 +494,17 @@ async def initialize_vllm(self):
if not old_vllm:
self.state.openai_serving_models = extra_args["models"]

# Update warmup phase: tiny_infer (engine is ready, do a test inference)
await warmup_manager.update_phase("tiny_infer", 80)

# Perform a tiny inference to verify the model is working
try:
# This is a minimal check - the actual readiness is verified by /v1/models
# The warmup manager will poll /v1/models and set ready when it returns 200
logger.info("vLLM engine initialized, waiting for /v1/models to become ready")
except Exception as e:
logger.warning(f"Tiny inference check failed: {e}")

def _parse_stream_chunk(encoded_chunk):
chunk = encoded_chunk if isinstance(encoded_chunk, str) else encoded_chunk.decode()
if "data: {" in chunk:
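
As a sketch of the pattern the vLLM template follows above, another template could report its own warmup phases like this. The function name is hypothetical, the real initialization work is elided to comments, and the phase names and progress values mirror the vLLM initializer.

from chutes.chute.warmup import get_warmup_manager

async def initialize_custom_engine(self):
    warmup_manager = get_warmup_manager()

    await warmup_manager.update_phase("pulling", 10)
    # ... download model weights here ...

    await warmup_manager.update_phase("loading", 40)
    # ... construct the engine / load weights onto the GPU here ...

    await warmup_manager.update_phase("tokenizer", 60)
    # ... set up the tokenizer here ...

    await warmup_manager.update_phase("tiny_infer", 80)
    # Final readiness is detected by the warmup manager polling /v1/models in the background.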
174 changes: 174 additions & 0 deletions chutes/chute/warmup.py
@@ -0,0 +1,174 @@
"""
Warmup state manager for canonical warmup cords.
Provides /warmup/kick and /warmup/status endpoints.
"""

import os
import time
import asyncio
import aiohttp
from typing import Optional
from loguru import logger
from chutes.chute.warmup_state import WarmupStatus


class WarmupManager:
"""
Singleton warmup state manager.
Manages warmup state, phases, and readiness checking.
"""

_instance: Optional["WarmupManager"] = None

def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance

def __init__(self):
if self._initialized:
return
self._state = WarmupStatus()
self._warmup_task: Optional[asyncio.Task] = None
self._state_lock = asyncio.Lock()
self._started_at: Optional[float] = None
self._port: int = 8000 # Default port, can be updated
self._initialized = True

async def kick(self) -> dict:
"""
Start or restart warmup (idempotent).
Returns 202 Accepted response data.
"""
async with self._state_lock:
if self._state.state in ("warming", "ready"):
return {
"ok": True,
"state": self._state.state,
"schema": self._state.schema
}

# Reset and start warmup (also resets error state)
self._state.state = "warming"
self._state.phase = "pulling"
self._state.progress = 0
self._state.error = None
self._started_at = time.time()
self._state.elapsed_sec = 0.0

# Cancel existing task if running
if self._warmup_task and not self._warmup_task.done():
self._warmup_task.cancel()

# Start background warmup task
self._warmup_task = asyncio.create_task(self._do_warmup())

return {
"ok": True,
"state": "warming",
"schema": self._state.schema
}

async def get_status(self) -> WarmupStatus:
"""
Get current warmup status (fast, <500ms).
"""
async with self._state_lock:
if self._started_at:
self._state.elapsed_sec = time.time() - self._started_at
return self._state

async def update_phase(self, phase: str, progress: int):
"""
Update warmup phase and progress.
Called by templates during model initialization.

Args:
phase: One of "pulling", "loading", "tokenizer", "tiny_infer", "ready"
progress: Integer between 0-100
"""
# Validate progress bounds
progress = max(0, min(100, progress))

async with self._state_lock:
if self._state.state == "warming":
# Validate phase (allow any string for flexibility, but log warnings for invalid ones)
valid_phases = ("pulling", "loading", "tokenizer", "tiny_infer", "ready")
if phase not in valid_phases:
logger.warning(f"Invalid warmup phase: {phase}, expected one of {valid_phases}")

self._state.phase = phase
self._state.progress = progress
if self._started_at:
self._state.elapsed_sec = time.time() - self._started_at

def set_port(self, port: int):
"""
Set the port for internal /v1/models checks.
Should be called during chute initialization.
"""
self._port = port

async def _check_models_ready(self, base_url: str) -> bool:
"""
Check if /v1/models endpoint returns 200.
Used for readiness bridge.
"""
try:
timeout = aiohttp.ClientTimeout(total=5.0)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get(f"{base_url.rstrip('/')}/v1/models") as response:
return response.status == 200
except Exception as e:
logger.debug(f"Models readiness check failed: {e}")
return False

async def _do_warmup(self):
"""
Background task that monitors warmup progress and checks readiness.
"""
# Use localhost for internal checks since we're checking the same instance
base_url = f"http://127.0.0.1:{self._port}"

# Wait for models to become ready (max 3 minutes)
max_wait = 180 # 3 minutes
check_interval = 1.0 # Check every second

for attempt in range(max_wait):
await asyncio.sleep(check_interval)

# Update elapsed time
async with self._state_lock:
if self._started_at:
self._state.elapsed_sec = time.time() - self._started_at

# Check if models are ready
if await self._check_models_ready(base_url):
async with self._state_lock:
# Only set ready if we're still in warming state
if self._state.state == "warming":
self._state.phase = "ready"
self._state.progress = 100
self._state.state = "ready"
if self._started_at:
self._state.elapsed_sec = time.time() - self._started_at
logger.info("Warmup completed: models are ready")
return

# Timeout
async with self._state_lock:
if self._state.state == "warming":
self._state.state = "error"
self._state.error = "models_not_ready_within_timeout"
if self._started_at:
self._state.elapsed_sec = time.time() - self._started_at
logger.warning("Warmup timed out: models not ready within 3 minutes")


def get_warmup_manager() -> WarmupManager:
"""
Get the singleton warmup manager instance.
"""
return WarmupManager()
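
A minimal sketch of wiring the manager into a chute's startup, assuming the server listens on port 8000 (the manager polls http://127.0.0.1:<port>/v1/models for readiness); the hook name is illustrative.

from chutes.chute.warmup import get_warmup_manager

async def on_startup():
    manager = get_warmup_manager()       # singleton: every caller gets the same instance
    manager.set_port(8000)               # port used for the internal /v1/models readiness check
    result = await manager.kick()        # idempotent; returns immediately while warmup runs in the background
    print(result)                        # e.g. {"ok": True, "state": "warming", "schema": "chutes.warmup.status.v1"}
    status = await manager.get_status()  # fast snapshot of the current WarmupStatus
    print(status.phase, status.progress, status.elapsed_sec)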

20 changes: 20 additions & 0 deletions chutes/chute/warmup_state.py
@@ -0,0 +1,20 @@
"""
Pydantic models for warmup status schema v1.
"""

from pydantic import BaseModel
from typing import Literal, Optional


class WarmupStatus(BaseModel):
"""
Warmup status response schema v1.
Matches chutes.warmup.status.v1 specification.
"""
schema: Literal["chutes.warmup.status.v1"] = "chutes.warmup.status.v1"
state: Literal["idle", "warming", "ready", "error"] = "idle"
phase: Literal["idle", "pulling", "loading", "tokenizer", "tiny_infer", "ready"] = "idle"
progress: int = 0 # 0-100
elapsed_sec: float = 0.0
error: Optional[str] = None
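
Example payloads matching the chutes.warmup.status.v1 schema above; the specific values are illustrative.

warming_example = {
    "schema": "chutes.warmup.status.v1",
    "state": "warming",
    "phase": "loading",
    "progress": 40,
    "elapsed_sec": 12.3,
    "error": None,
}
ready_example = {
    "schema": "chutes.warmup.status.v1",
    "state": "ready",
    "phase": "ready",
    "progress": 100,
    "elapsed_sec": 95.0,
    "error": None,
}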
