-
Notifications
You must be signed in to change notification settings - Fork 0
Migrate cloud-api to using a single router endpoint. #240
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This pull request refactors the inference provider system from a distributed "model discovery" approach with load balancing and failover to a simplified "inference router" design that delegates all routing logic to a single external endpoint.
Key changes:
- Replaced
ModelDiscoveryConfigwithInferenceRouterConfig, removing periodic refresh and discovery-specific settings - Simplified
InferenceProviderPoolfrom a complex load balancer with multiple providers to a thin wrapper around a single router endpoint - Removed background refresh tasks, retry/fallback logic, and multi-provider coordination from the pool implementation
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
crates/config/src/types.rs |
Renamed configuration struct and environment variables from MODEL_DISCOVERY_* to INFERENCE_ROUTER_*, removed refresh_interval field |
crates/services/src/inference_provider_pool/mod.rs |
Simplified pool to use single router provider, removed discovery/load-balancing/retry logic, updated shutdown to only clear local mappings |
crates/api/src/lib.rs |
Updated initialization to create pool with router URL instead of discovery server, simplified mock provider setup |
crates/api/src/main.rs |
Updated shutdown logging to reflect resource cleanup instead of background task cancellation |
crates/api/tests/common/mod.rs |
Updated test configuration to use inference_router field instead of model_discovery |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
crates/config/src/types.rs
Outdated
| pub api_key: Option<String>, | ||
| pub refresh_interval: i64, // seconds | ||
| pub timeout: i64, // seconds (for discovery requests) | ||
| pub timeout: i64, // seconds (for router requests) |
Copilot
AI
Dec 9, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The timeout field is defined but never used in the new implementation. The pool creation only uses inference_timeout (via config.inference_router.inference_timeout). This appears to be leftover from the previous model discovery implementation where it was used for discovery HTTP requests.
Consider either:
- Removing this field entirely if it's no longer needed
- Using it for the router's HTTP client timeout if that's the intent
- Documenting why it exists and when it might be used in the future
If removing, also update:
- The
from_env()method (lines 132-135) - The
Defaultimplementation (lines 150-153) - The corresponding environment variable
INFERENCE_ROUTER_TIMEOUT
| // Create a MockProvider that accepts all models (using new_accept_all) | ||
| let mock_provider = Arc::new(MockProvider::new_accept_all()); | ||
|
|
||
| // For the new simplified pool, we need to create a wrapper pool that uses the mock provider | ||
| // Since the pool now expects a router URL, we'll create it with a dummy URL | ||
| // but the mock will be used directly in tests via a different mechanism | ||
| let pool = Arc::new( | ||
| services::inference_provider_pool::InferenceProviderPool::new( | ||
| "http://localhost:8080/models".to_string(), | ||
| "http://localhost:8080".to_string(), | ||
| None, | ||
| 5, | ||
| 30 * 60, | ||
| ), | ||
| ); |
Copilot
AI
Dec 9, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The mock provider is created but never used. The pool is initialized with a real VLlmProvider pointing to "http://localhost:8080", which means tests will attempt real HTTP requests instead of using the mock. This defeats the purpose of having a mock provider.
Consider creating a mechanism to inject the mock provider into the pool, or create a separate test-only pool constructor that accepts a provider directly. For example:
// Add to InferenceProviderPool
pub fn new_with_provider(provider: Arc<InferenceProviderTrait>) -> Self {
Self {
router_provider: provider,
chat_id_mapping: Arc::new(RwLock::new(HashMap::new())),
signature_hashes: Arc::new(RwLock::new(HashMap::new())),
}
}
// Then in this function:
let mock_provider_trait: Arc<InferenceProviderTrait> = mock_provider.clone();
let pool = Arc::new(
services::inference_provider_pool::InferenceProviderPool::new_with_provider(
mock_provider_trait
)
);| /// Register a provider for testing (useful for testing with mock providers) | ||
| pub async fn register_provider( | ||
| &self, | ||
| _model_id: String, | ||
| _provider: Arc<InferenceProviderTrait>, | ||
| ) { | ||
| // For testing, we replace the router provider with the mock | ||
| // This is a simplified approach - in tests we'll use the mock directly | ||
| // Note: We can't actually replace router_provider since it's Arc, but tests | ||
| // should call register_providers which creates a pool differently | ||
| tracing::debug!("register_provider called - this is mainly for backwards compatibility"); | ||
| } | ||
|
|
||
| /// Register multiple providers for multiple models (useful for testing) | ||
| /// Register multiple providers for testing | ||
| /// In the new simplified design, this creates a new pool with a mock provider | ||
| pub async fn register_providers(&self, providers: Vec<(String, Arc<InferenceProviderTrait>)>) { | ||
| let mut model_mapping = self.model_mapping.write().await; | ||
| for (model_id, provider) in providers { | ||
| model_mapping | ||
| .entry(model_id) | ||
| .or_insert_with(Vec::new) | ||
| .push(provider); | ||
| } | ||
| } | ||
|
|
||
| /// Initialize model discovery - should be called during application startup | ||
| pub async fn initialize(&self) -> Result<(), ListModelsError> { | ||
| tracing::debug!( | ||
| url = %self.discovery_url, | ||
| "Initializing model discovery from discovery server" | ||
| ); | ||
|
|
||
| match self.discover_models().await { | ||
| Ok(models_response) => { | ||
| tracing::info!( | ||
| "Successfully discovered {} models", | ||
| models_response.data.len() | ||
| ); | ||
| Ok(()) | ||
| } | ||
| Err(e) => { | ||
| tracing::error!("Failed to initialize model discovery"); | ||
| Err(e) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// Fetch and parse models from discovery endpoint | ||
| async fn fetch_from_discovery( | ||
| &self, | ||
| ) -> Result<HashMap<String, DiscoveryEntry>, ListModelsError> { | ||
| tracing::debug!( | ||
| url = %self.discovery_url, | ||
| "Fetching models from discovery server" | ||
| ); | ||
|
|
||
| let client = reqwest::Client::builder() | ||
| .timeout(self.discovery_timeout) | ||
| .build() | ||
| .map_err(|e| { | ||
| ListModelsError::FetchError(format!("Failed to create HTTP client: {e}")) | ||
| })?; | ||
|
|
||
| let response = client | ||
| .get(&self.discovery_url) | ||
| .send() | ||
| .await | ||
| .map_err(|e| ListModelsError::FetchError(format!("HTTP request failed: {e}")))?; | ||
|
|
||
| let discovery_map: HashMap<String, DiscoveryEntry> = response | ||
| .json() | ||
| .await | ||
| .map_err(|e| ListModelsError::FetchError(format!("Failed to parse JSON: {e}")))?; | ||
|
|
||
| tracing::debug!(entries = discovery_map.len(), "Received discovery response"); | ||
|
|
||
| Ok(discovery_map) | ||
| } | ||
|
|
||
| /// Parse IP-based keys like "192.0.2.1:8000" | ||
| /// Returns None for keys that don't match IP:PORT format (e.g., "redpill:...") | ||
| fn parse_ip_port(key: &str) -> Option<(String, u16)> { | ||
| let parts: Vec<&str> = key.split(':').collect(); | ||
| if parts.len() != 2 { | ||
| return None; | ||
| } | ||
|
|
||
| let ip = parts[0]; | ||
| let port = parts[1].parse::<u16>().ok()?; | ||
|
|
||
| // Verify it's a valid IP address | ||
| if ip.parse::<IpAddr>().is_err() { | ||
| return None; | ||
| } | ||
|
|
||
| Some((ip.to_string(), port)) | ||
| } | ||
|
|
||
| /// Ensure models are discovered before using them | ||
| async fn ensure_models_discovered(&self) -> Result<(), CompletionError> { | ||
| let model_mapping = self.model_mapping.read().await; | ||
|
|
||
| // If mapping is empty, we need to discover models | ||
| if model_mapping.is_empty() { | ||
| drop(model_mapping); // Release read lock | ||
| tracing::warn!("Model mapping is empty, triggering model discovery"); | ||
| self.discover_models().await.map_err(|e| { | ||
| CompletionError::CompletionError(format!("Failed to discover models: {e}")) | ||
| })?; | ||
| } | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| async fn discover_models(&self) -> Result<ModelsResponse, ListModelsError> { | ||
| tracing::info!("Starting model discovery from discovery endpoint"); | ||
|
|
||
| // Fetch from discovery server | ||
| let discovery_map = self.fetch_from_discovery().await?; | ||
|
|
||
| let mut model_mapping = self.model_mapping.write().await; | ||
| model_mapping.clear(); | ||
|
|
||
| // Group by model name | ||
| let mut model_to_endpoints: HashMap<String, Vec<(String, u16)>> = HashMap::new(); | ||
|
|
||
| for (key, entry) in discovery_map { | ||
| // Filter out non-IP keys | ||
| if let Some((ip, port)) = Self::parse_ip_port(&key) { | ||
| tracing::debug!( | ||
| key = %key, | ||
| model = %entry.model, | ||
| tags = ?entry.tags, | ||
| ip = %ip, | ||
| port = port, | ||
| "Adding IP-based provider" | ||
| ); | ||
|
|
||
| model_to_endpoints | ||
| .entry(entry.model) | ||
| .or_default() | ||
| .push((ip, port)); | ||
| } else { | ||
| tracing::debug!( | ||
| key = %key, | ||
| model = %entry.model, | ||
| tags = ?entry.tags, | ||
| "Skipping non-IP key" | ||
| ); | ||
| } | ||
| } | ||
|
|
||
| // Create providers for each endpoint | ||
| let mut all_models = Vec::new(); | ||
|
|
||
| for (model_name, endpoints) in model_to_endpoints { | ||
| tracing::info!( | ||
| model = %model_name, | ||
| providers_count = endpoints.len(), | ||
| "Discovered model with {} provider(s)", | ||
| endpoints.len() | ||
| ); | ||
|
|
||
| let mut providers_for_model = Vec::new(); | ||
|
|
||
| for (ip, port) in endpoints { | ||
| let url = format!("http://{ip}:{port}"); | ||
| tracing::debug!( | ||
| model = %model_name, | ||
| url = %url, | ||
| "Creating provider for model" | ||
| ); | ||
|
|
||
| let provider = Arc::new(VLlmProvider::new(VLlmConfig::new( | ||
| url.clone(), | ||
| self.api_key.clone(), | ||
| Some(self.inference_timeout_secs), | ||
| ))) as Arc<InferenceProviderTrait>; | ||
|
|
||
| match provider | ||
| .get_attestation_report(model_name.clone(), None, None, None) | ||
| .await | ||
| { | ||
| Ok(_) => { | ||
| providers_for_model.push(provider); | ||
| } | ||
| Err(e) => { | ||
| tracing::debug!( | ||
| model = %model_name, | ||
| url = %url, | ||
| error = %e, | ||
| "Provider failed to return attestation report, excluding from pool" | ||
| ); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| model_mapping.insert(model_name.clone(), providers_for_model); | ||
|
|
||
| all_models.push(inference_providers::models::ModelInfo { | ||
| id: model_name, | ||
| object: "model".to_string(), | ||
| created: 0, | ||
| owned_by: "discovered".to_string(), | ||
| }); | ||
| } | ||
|
|
||
| tracing::info!( | ||
| total_models = all_models.len(), | ||
| total_providers = model_mapping.values().map(|v| v.len()).sum::<usize>(), | ||
| model_ids = ?all_models.iter().map(|m| &m.id).collect::<Vec<_>>(), | ||
| "Model discovery from endpoint completed" | ||
| "register_providers called with {} providers - mainly for backwards compatibility", | ||
| providers.len() | ||
| ); | ||
|
|
||
| Ok(ModelsResponse { | ||
| object: "list".to_string(), | ||
| data: all_models, | ||
| }) | ||
| } | ||
|
|
||
| async fn get_providers_for_model( | ||
| &self, | ||
| model_id: &str, | ||
| ) -> Option<Vec<Arc<InferenceProviderTrait>>> { | ||
| let model_mapping = self.model_mapping.read().await; | ||
| model_mapping.get(model_id).cloned() | ||
| // In tests, we should create pools directly with mock providers instead of using this | ||
| } |
Copilot
AI
Dec 9, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The register_provider and register_providers methods are now no-ops that don't perform any action, yet they're still pub async functions. This is misleading to API consumers who might expect these methods to actually register providers.
Consider one of these approaches:
- Remove these methods entirely if they're no longer needed (breaking change but clearer)
- Mark them as deprecated with
#[deprecated]and a message explaining the new testing approach - If they're truly needed for backwards compatibility, document why they exist and when they can be removed
Example:
#[deprecated(note = "Use InferenceProviderPool::new_with_provider for testing instead")]
pub async fn register_provider(&self, _model_id: String, _provider: Arc<InferenceProviderTrait>) {
// No-op for backwards compatibility
}| // Use StreamingResultExt to peek at the first event | ||
| use inference_providers::StreamingResultExt; | ||
| let mut peekable = stream.peekable(); |
Copilot
AI
Dec 9, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The use inference_providers::StreamingResultExt; statement is placed inside the function body. While this works, it's unconventional and less discoverable. Consider either:
- Adding it back to the top-level imports (line 3)
- If it's intentionally placed here for scoping reasons, add a comment explaining why
Example for top-level import:
use inference_providers::{
models::{AttestationError, CompletionError, ListModelsError, ModelsResponse},
ChatCompletionParams, InferenceProvider, StreamingResult, StreamingResultExt, VLlmConfig, VLlmProvider,
};There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the final PR Bugbot will review for you during this billing cycle
Your free Bugbot reviews will reset on January 9
Details
Your team is on the Bugbot Free tier. On this plan, Bugbot will review limited PRs each billing cycle for each member of your team.
To receive Bugbot reviews on all of your PRs, visit the Cursor dashboard to activate Pro and start your 14-day free trial.
Code Review: PR #240 - Migrate to Single Router EndpointCritical Issues Found
|
| volumes: | ||
| postgres_data: | ||
| # volumes: | ||
| # postgres_data: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bug: Development docker-compose changes accidentally committed
The docker-compose.yml file has several services and volumes commented out: the api service, datadog-agent service, and postgres volume persistence. These changes appear to be for local development convenience (e.g., starting fresh without persisted data) rather than intentional changes for the inference router migration. This could break other developers' workflows who expect the full docker-compose setup to work, and removes postgres data persistence which may cause data loss between restarts in development environments.
|
Let's hold on merging this PR before vLLM router is fully verifiable with a reproducible build. |
This pull request refactors the configuration and initialization logic for inference providers, replacing the previous "model discovery" system with a new "inference router" approach. The changes simplify how inference provider pools are initialized and configured, update environment variable usage, and clean up related shutdown and test logic.
Configuration Refactor
ModelDiscoveryConfigstruct and all related references with a newInferenceRouterConfigstruct, updating field names, default values, and environment variable usage throughoutcrates/config/src/types.rs. [1] [2] [3] [4] [5] [6]Inference Provider Pool Initialization
init_inference_providersto use the inference router endpoint and removed model discovery refresh logic, simplifying pool creation and startup incrates/api/src/lib.rs.Test and Mock Logic Updates
crates/api/src/lib.rsandcrates/api/tests/common/mod.rs. [1] [2] [3] [4]Shutdown Procedure
crates/api/src/main.rs.Note
Replaces model discovery with a single inference router, simplifying the provider pool and config, updating shutdown/tests, and adding router attestation to reports.
router_providerwith direct forwarding (chat_completion, streaming,models,get_attestation_report). Removed discovery, load-balancing, fallback/retry, periodic refresh, and error sanitization logic. Streamlinedshutdownto clear chat/signature maps only.router_attestationtoAttestationReportand API response. Extracts per-model attestations fromall_attestationsand treats remaining provider payload as router attestation.init_inference_providersnow usesconfig.inference_router(router_url,api_key,inference_timeout). Removed discovery init/refresh calls.ModelDiscoveryConfigwithInferenceRouterConfigand updated env vars (INFERENCE_ROUTER_URL,INFERENCE_ROUTER_API_KEY,MODEL_INFERENCE_TIMEOUT). UpdatedApiConfig/DomainConfigwiring.InferenceRouterConfig. Simplified mock pool setup to work with the new router-based pool.Written by Cursor Bugbot for commit 8168edf. This will update automatically on new commits. Configure here.