From d48d3b5859fe3c75de67c327b2d1c8132eaab088 Mon Sep 17 00:00:00 2001 From: qin-ctx Date: Tue, 24 Mar 2026 17:39:37 +0800 Subject: [PATCH 1/3] refactor(model): unify config-driven retry across VLM and embedding Move retry behavior into shared model-call utilities and config defaults so VLM and embedding providers handle transient failures consistently. Co-Authored-By: Claude Opus 4.6 --- openviking/models/embedder/base.py | 13 +- .../models/embedder/gemini_embedders.py | 39 +++- openviking/models/embedder/jina_embedders.py | 22 +- .../models/embedder/litellm_embedders.py | 19 +- .../models/embedder/minimax_embedders.py | 4 +- .../models/embedder/openai_embedders.py | 24 +- .../models/embedder/vikingdb_embedders.py | 159 ++++++++----- .../models/embedder/volcengine_embedders.py | 50 ++--- .../models/embedder/voyage_embedders.py | 22 +- openviking/models/vlm/backends/litellm_vlm.py | 146 ++++++------ openviking/models/vlm/backends/openai_vlm.py | 208 +++++++++--------- .../models/vlm/backends/volcengine_vlm.py | 145 ++++++------ openviking/models/vlm/base.py | 13 +- openviking/models/vlm/llm.py | 8 +- openviking/utils/circuit_breaker.py | 46 +--- openviking/utils/model_retry.py | 150 +++++++++++++ .../utils/config/embedding_config.py | 22 +- openviking_cli/utils/config/vlm_config.py | 12 +- tests/models/test_vlm_strip_think_tags.py | 2 +- tests/server/conftest.py | 2 +- tests/unit/test_extra_headers_embedding.py | 17 ++ tests/unit/test_extra_headers_vlm.py | 4 +- tests/unit/test_model_retry.py | 38 ++++ tests/unit/test_stream_config_vlm.py | 33 ++- tests/unit/test_vlm_response_formats.py | 14 +- 25 files changed, 770 insertions(+), 442 deletions(-) create mode 100644 openviking/utils/model_retry.py create mode 100644 tests/unit/test_model_retry.py diff --git a/openviking/models/embedder/base.py b/openviking/models/embedder/base.py index a8c23a5f7..8582267a1 100644 --- a/openviking/models/embedder/base.py +++ b/openviking/models/embedder/base.py @@ -6,6 +6,8 @@ from dataclasses import dataclass from typing import Any, Callable, Dict, List, Optional, TypeVar +from openviking.utils.model_retry import retry_sync + T = TypeVar("T") @@ -74,6 +76,7 @@ def __init__(self, model_name: str, config: Optional[Dict[str, Any]] = None): """ self.model_name = model_name self.config = config or {} + self.max_retries = int(self.config.get("max_retries", 3)) @abstractmethod def embed(self, text: str, is_query: bool = False) -> EmbedResult: @@ -104,6 +107,14 @@ def close(self): """Release resources, subclasses can override as needed""" pass + def _run_with_retry(self, func: Callable[[], T], *, logger=None, operation_name: str) -> T: + return retry_sync( + func, + max_retries=self.max_retries, + logger=logger, + operation_name=operation_name, + ) + @property def is_dense(self) -> bool: """Check if result contains dense vector""" @@ -255,7 +266,7 @@ def embed_batch(self, texts: List[str], is_query: bool = False) -> List[EmbedRes return [ EmbedResult(dense_vector=d.dense_vector, sparse_vector=s.sparse_vector) - for d, s in zip(dense_results, sparse_results) + for d, s in zip(dense_results, sparse_results, strict=True) ] def get_dimension(self) -> int: diff --git a/openviking/models/embedder/gemini_embedders.py b/openviking/models/embedder/gemini_embedders.py index 878fa80f5..0210c5c59 100644 --- a/openviking/models/embedder/gemini_embedders.py +++ b/openviking/models/embedder/gemini_embedders.py @@ -151,9 +151,9 @@ def __init__( api_key=api_key, http_options=HttpOptions( retry_options=HttpRetryOptions( - 
attempts=3, - initial_delay=1.0, - max_delay=30.0, + attempts=max(self.max_retries + 1, 1), + initial_delay=0.5, + max_delay=8.0, exp_base=2.0, ) ), @@ -207,8 +207,9 @@ def embed( task_type = self.query_param elif not is_query and self.document_param: task_type = self.document_param + # SDK accepts plain str; converts to REST Parts format internally. - try: + def _call() -> EmbedResult: result = self.client.models.embed_content( model=self.model_name, contents=text, @@ -216,6 +217,15 @@ def embed( ) vector = truncate_and_normalize(list(result.embeddings[0].values), self._dimension) return EmbedResult(dense_vector=vector) + + try: + if _HTTP_RETRY_AVAILABLE: + return _call() + return self._run_with_retry( + _call, + logger=logger, + operation_name="Gemini embedding", + ) except (APIError, ClientError) as e: _raise_api_error(e, self.model_name) @@ -233,7 +243,7 @@ def embed_batch( if titles is not None: return [ self.embed(text, is_query=is_query, task_type=task_type, title=title) - for text, title in zip(texts, titles) + for text, title in zip(texts, titles, strict=True) ] # Resolve effective task_type from is_query when no explicit override if task_type is None: @@ -253,14 +263,29 @@ def embed_batch( continue non_empty_texts = [batch[j] for j in non_empty_indices] - try: + + def _call_batch( + non_empty_texts: List[str] = non_empty_texts, + config: types.EmbedContentConfig = config, + ) -> Any: response = self.client.models.embed_content( model=self.model_name, contents=non_empty_texts, config=config, ) + return response + + try: + if _HTTP_RETRY_AVAILABLE: + response = _call_batch() + else: + response = self._run_with_retry( + _call_batch, + logger=logger, + operation_name="Gemini batch embedding", + ) batch_results = [None] * len(batch) - for j, emb in zip(non_empty_indices, response.embeddings): + for j, emb in zip(non_empty_indices, response.embeddings, strict=True): batch_results[j] = EmbedResult( dense_vector=truncate_and_normalize(list(emb.values), self._dimension) ) diff --git a/openviking/models/embedder/jina_embedders.py b/openviking/models/embedder/jina_embedders.py index 49e00b2b3..62d1d120a 100644 --- a/openviking/models/embedder/jina_embedders.py +++ b/openviking/models/embedder/jina_embedders.py @@ -2,6 +2,7 @@ # SPDX-License-Identifier: Apache-2.0 """Jina AI Embedder Implementation""" +import logging from typing import Any, Dict, List, Optional import openai @@ -11,6 +12,8 @@ EmbedResult, ) +logger = logging.getLogger(__name__) + # Default dimensions for Jina embedding models JINA_MODEL_DIMENSIONS = { "jina-embeddings-v5-text-small": 1024, # 677M params, max seq 32768 @@ -153,7 +156,8 @@ def embed(self, text: str, is_query: bool = False) -> EmbedResult: Raises: RuntimeError: When API call fails """ - try: + + def _call() -> EmbedResult: kwargs: Dict[str, Any] = {"input": text, "model": self.model_name} if self.dimension: kwargs["dimensions"] = self.dimension @@ -166,6 +170,13 @@ def embed(self, text: str, is_query: bool = False) -> EmbedResult: vector = response.data[0].embedding return EmbedResult(dense_vector=vector) + + try: + return self._run_with_retry( + _call, + logger=logger, + operation_name="Jina embedding", + ) except openai.APIError as e: raise RuntimeError(f"Jina API error: {e.message}") from e except Exception as e: @@ -187,7 +198,7 @@ def embed_batch(self, texts: List[str], is_query: bool = False) -> List[EmbedRes if not texts: return [] - try: + def _call() -> List[EmbedResult]: kwargs: Dict[str, Any] = {"input": texts, "model": self.model_name} if 
self.dimension: kwargs["dimensions"] = self.dimension @@ -199,6 +210,13 @@ def embed_batch(self, texts: List[str], is_query: bool = False) -> List[EmbedRes response = self.client.embeddings.create(**kwargs) return [EmbedResult(dense_vector=item.embedding) for item in response.data] + + try: + return self._run_with_retry( + _call, + logger=logger, + operation_name="Jina batch embedding", + ) except openai.APIError as e: raise RuntimeError(f"Jina API error: {e.message}") from e except Exception as e: diff --git a/openviking/models/embedder/litellm_embedders.py b/openviking/models/embedder/litellm_embedders.py index 4f10f99c0..19291be79 100644 --- a/openviking/models/embedder/litellm_embedders.py +++ b/openviking/models/embedder/litellm_embedders.py @@ -154,13 +154,21 @@ def embed(self, text: str, is_query: bool = False) -> EmbedResult: Raises: RuntimeError: When embedding call fails """ - try: + + def _call() -> EmbedResult: kwargs = self._build_kwargs(is_query=is_query) kwargs["input"] = [text] response = litellm.embedding(**kwargs) self._update_telemetry_token_usage(response) vector = response.data[0]["embedding"] return EmbedResult(dense_vector=vector) + + try: + return self._run_with_retry( + _call, + logger=logger, + operation_name="LiteLLM embedding", + ) except Exception as e: raise RuntimeError(f"LiteLLM embedding failed: {e}") from e @@ -180,12 +188,19 @@ def embed_batch(self, texts: List[str], is_query: bool = False) -> List[EmbedRes if not texts: return [] - try: + def _call() -> List[EmbedResult]: kwargs = self._build_kwargs(is_query=is_query) kwargs["input"] = texts response = litellm.embedding(**kwargs) self._update_telemetry_token_usage(response) return [EmbedResult(dense_vector=item["embedding"]) for item in response.data] + + try: + return self._run_with_retry( + _call, + logger=logger, + operation_name="LiteLLM batch embedding", + ) except Exception as e: raise RuntimeError(f"LiteLLM batch embedding failed: {e}") from e diff --git a/openviking/models/embedder/minimax_embedders.py b/openviking/models/embedder/minimax_embedders.py index faec9b913..b2a734370 100644 --- a/openviking/models/embedder/minimax_embedders.py +++ b/openviking/models/embedder/minimax_embedders.py @@ -90,8 +90,8 @@ def _create_session(self) -> requests.Session: """Create a requests session with retry logic""" session = requests.Session() retry_strategy = Retry( - total=6, - backoff_factor=1, # 1s, 2s, 4s, 8s, 16s, 32s + total=self.max_retries, + backoff_factor=0.5, status_forcelist=[429, 500, 502, 503, 504], allowed_methods=["POST"], ) diff --git a/openviking/models/embedder/openai_embedders.py b/openviking/models/embedder/openai_embedders.py index c57ec9ff3..471f84584 100644 --- a/openviking/models/embedder/openai_embedders.py +++ b/openviking/models/embedder/openai_embedders.py @@ -2,19 +2,22 @@ # SPDX-License-Identifier: Apache-2.0 """OpenAI Embedder Implementation""" +import logging from typing import Any, Dict, List, Optional import openai -from openviking.models.vlm.registry import DEFAULT_AZURE_API_VERSION from openviking.models.embedder.base import ( DenseEmbedderBase, EmbedResult, HybridEmbedderBase, SparseEmbedderBase, ) +from openviking.models.vlm.registry import DEFAULT_AZURE_API_VERSION from openviking.telemetry import get_current_telemetry +logger = logging.getLogger(__name__) + class OpenAIDenseEmbedder(DenseEmbedderBase): """OpenAI-Compatible Dense Embedder Implementation @@ -235,7 +238,8 @@ def embed(self, text: str, is_query: bool = False) -> EmbedResult: Raises: RuntimeError: When API 
call fails """ - try: + + def _call() -> EmbedResult: kwargs: Dict[str, Any] = {"input": text, "model": self.model_name} extra_body = self._build_extra_body(is_query=is_query) @@ -247,6 +251,13 @@ def embed(self, text: str, is_query: bool = False) -> EmbedResult: vector = response.data[0].embedding return EmbedResult(dense_vector=vector) + + try: + return self._run_with_retry( + _call, + logger=logger, + operation_name="OpenAI embedding", + ) except openai.APIError as e: raise RuntimeError(f"OpenAI API error: {e.message}") from e except Exception as e: @@ -268,7 +279,7 @@ def embed_batch(self, texts: List[str], is_query: bool = False) -> List[EmbedRes if not texts: return [] - try: + def _call() -> List[EmbedResult]: kwargs: Dict[str, Any] = {"input": texts, "model": self.model_name} if self.dimension: kwargs["dimensions"] = self.dimension @@ -281,6 +292,13 @@ def embed_batch(self, texts: List[str], is_query: bool = False) -> List[EmbedRes self._update_telemetry_token_usage(response) return [EmbedResult(dense_vector=item.embedding) for item in response.data] + + try: + return self._run_with_retry( + _call, + logger=logger, + operation_name="OpenAI batch embedding", + ) except openai.APIError as e: raise RuntimeError(f"OpenAI API error: {e.message}") from e except Exception as e: diff --git a/openviking/models/embedder/vikingdb_embedders.py b/openviking/models/embedder/vikingdb_embedders.py index d28aac492..cabbd69b8 100644 --- a/openviking/models/embedder/vikingdb_embedders.py +++ b/openviking/models/embedder/vikingdb_embedders.py @@ -124,29 +124,44 @@ def __init__( self.dense_model = {"name": model_name, "version": model_version, "dim": dimension} def embed(self, text: str, is_query: bool = False) -> EmbedResult: - results = self._call_api([text], dense_model=self.dense_model) - if not results: - return EmbedResult(dense_vector=[]) - - item = results[0] - dense_vector = [] - if "dense_embedding" in item: - dense_vector = self._truncate_and_normalize(item["dense_embedding"], self.dimension) - - return EmbedResult(dense_vector=dense_vector) + def _call() -> EmbedResult: + results = self._call_api([text], dense_model=self.dense_model) + if not results: + return EmbedResult(dense_vector=[]) + + item = results[0] + dense_vector = [] + if "dense_embedding" in item: + dense_vector = self._truncate_and_normalize(item["dense_embedding"], self.dimension) + + return EmbedResult(dense_vector=dense_vector) + + return self._run_with_retry( + _call, + logger=logger, + operation_name="VikingDB embedding", + ) def embed_batch(self, texts: List[str], is_query: bool = False) -> List[EmbedResult]: if not texts: return [] - raw_results = self._call_api(texts, dense_model=self.dense_model) - return [ - EmbedResult( - dense_vector=self._truncate_and_normalize( - item.get("dense_embedding", []), self.dimension + + def _call() -> List[EmbedResult]: + raw_results = self._call_api(texts, dense_model=self.dense_model) + return [ + EmbedResult( + dense_vector=self._truncate_and_normalize( + item.get("dense_embedding", []), self.dimension + ) ) - ) - for item in raw_results - ] + for item in raw_results + ] + + return self._run_with_retry( + _call, + logger=logger, + operation_name="VikingDB batch embedding", + ) def get_dimension(self) -> int: return self.dimension if self.dimension else 2048 @@ -174,27 +189,42 @@ def __init__( } def embed(self, text: str, is_query: bool = False) -> EmbedResult: - results = self._call_api([text], sparse_model=self.sparse_model) - if not results: - return EmbedResult(sparse_vector={}) 
+ def _call() -> EmbedResult: + results = self._call_api([text], sparse_model=self.sparse_model) + if not results: + return EmbedResult(sparse_vector={}) - item = results[0] - sparse_vector = {} - if "sparse" in item: - sparse_vector = item["sparse"] + item = results[0] + sparse_vector = {} + if "sparse" in item: + sparse_vector = item["sparse"] + + return EmbedResult(sparse_vector=sparse_vector) - return EmbedResult(sparse_vector=sparse_vector) + return self._run_with_retry( + _call, + logger=logger, + operation_name="VikingDB sparse embedding", + ) def embed_batch(self, texts: List[str], is_query: bool = False) -> List[EmbedResult]: if not texts: return [] - raw_results = self._call_api(texts, sparse_model=self.sparse_model) - return [ - EmbedResult( - sparse_vector=self._process_sparse_embedding(item.get("sparse_embedding", {})) - ) - for item in raw_results - ] + + def _call() -> List[EmbedResult]: + raw_results = self._call_api(texts, sparse_model=self.sparse_model) + return [ + EmbedResult( + sparse_vector=self._process_sparse_embedding(item.get("sparse_embedding", {})) + ) + for item in raw_results + ] + + return self._run_with_retry( + _call, + logger=logger, + operation_name="VikingDB sparse batch embedding", + ) class VikingDBHybridEmbedder(HybridEmbedderBase, VikingDBClientMixin): @@ -224,37 +254,54 @@ def __init__( } def embed(self, text: str, is_query: bool = False) -> EmbedResult: - results = self._call_api( - [text], dense_model=self.dense_model, sparse_model=self.sparse_model - ) - if not results: - return EmbedResult(dense_vector=[], sparse_vector={}) + def _call() -> EmbedResult: + results = self._call_api( + [text], dense_model=self.dense_model, sparse_model=self.sparse_model + ) + if not results: + return EmbedResult(dense_vector=[], sparse_vector={}) - item = results[0] - dense_vector = [] - sparse_vector = {} + item = results[0] + dense_vector = [] + sparse_vector = {} - if "dense" in item: - dense_vector = self._truncate_and_normalize(item["dense"], self.dimension) - if "sparse" in item: - sparse_vector = item["sparse"] + if "dense" in item: + dense_vector = self._truncate_and_normalize(item["dense"], self.dimension) + if "sparse" in item: + sparse_vector = item["sparse"] - return EmbedResult(dense_vector=dense_vector, sparse_vector=sparse_vector) + return EmbedResult(dense_vector=dense_vector, sparse_vector=sparse_vector) + + return self._run_with_retry( + _call, + logger=logger, + operation_name="VikingDB hybrid embedding", + ) def embed_batch(self, texts: List[str], is_query: bool = False) -> List[EmbedResult]: if not texts: return [] - raw_results = self._call_api( - texts, dense_model=self.dense_model, sparse_model=self.sparse_model + + def _call() -> List[EmbedResult]: + raw_results = self._call_api( + texts, dense_model=self.dense_model, sparse_model=self.sparse_model + ) + results = [] + for item in raw_results: + dense_vector = [] + sparse_vector = {} + if "dense" in item: + dense_vector = self._truncate_and_normalize(item["dense"], self.dimension) + if "sparse" in item: + sparse_vector = item["sparse"] + results.append(EmbedResult(dense_vector=dense_vector, sparse_vector=sparse_vector)) + return results + + return self._run_with_retry( + _call, + logger=logger, + operation_name="VikingDB hybrid batch embedding", ) - results = [] - for item in raw_results: - if "dense" in item: - dense_vector = self._truncate_and_normalize(item["dense"], self.dimension) - if "sparse" in item: - sparse_vector = item["sparse"] - 
results.append(EmbedResult(dense_vector=dense_vector, sparse_vector=sparse_vector)) - return results def get_dimension(self) -> int: return self.dimension if self.dimension else 2048 diff --git a/openviking/models/embedder/volcengine_embedders.py b/openviking/models/embedder/volcengine_embedders.py index c2384ec88..0d2593d1e 100644 --- a/openviking/models/embedder/volcengine_embedders.py +++ b/openviking/models/embedder/volcengine_embedders.py @@ -11,29 +11,12 @@ EmbedResult, HybridEmbedderBase, SparseEmbedderBase, - exponential_backoff_retry, truncate_and_normalize, ) from openviking.telemetry import get_current_telemetry from openviking_cli.utils.logger import default_logger as logger -def is_429_error(exception: Exception) -> bool: - """ - 判断异常是否为 429 限流错误 - - Args: - exception: 要检查的异常 - - Returns: - 如果是 429 错误则返回 True,否则返回 False - """ - exception_str = str(exception) - return ( - "429" in exception_str or "TooManyRequests" in exception_str or "RateLimit" in exception_str - ) - - def process_sparse_embedding(sparse_data: Any) -> Dict[str, float]: """Process sparse embedding data from SDK response""" if not sparse_data: @@ -177,14 +160,10 @@ def _embed_call(): return EmbedResult(dense_vector=vector) try: - return exponential_backoff_retry( + return self._run_with_retry( _embed_call, - max_wait=10.0, - base_delay=0.5, - max_delay=2.0, - jitter=True, - is_retryable=is_429_error, logger=logger, + operation_name="Volcengine embedding", ) except Exception as e: raise RuntimeError(f"Volcengine embedding failed: {str(e)}") from e @@ -205,7 +184,7 @@ def embed_batch(self, texts: List[str], is_query: bool = False) -> List[EmbedRes if not texts: return [] - try: + def _call() -> List[EmbedResult]: if self.input_type == "multimodal": multimodal_inputs = [{"type": "text", "text": text} for text in texts] response = self.client.multimodal_embeddings.create( @@ -222,6 +201,13 @@ def embed_batch(self, texts: List[str], is_query: bool = False) -> List[EmbedRes EmbedResult(dense_vector=truncate_and_normalize(item.embedding, self.dimension)) for item in data ] + + try: + return self._run_with_retry( + _call, + logger=logger, + operation_name="Volcengine batch embedding", + ) except Exception as e: logger.error( f"Volcengine batch embedding failed, texts length: {len(texts)}, input_type: {self.input_type}, model_name: {self.model_name}" @@ -295,14 +281,10 @@ def _embed_call(): return EmbedResult(sparse_vector=process_sparse_embedding(sparse_vector)) try: - return exponential_backoff_retry( + return self._run_with_retry( _embed_call, - max_wait=10.0, - base_delay=0.5, - max_delay=2.0, - jitter=True, - is_retryable=is_429_error, logger=logger, + operation_name="Volcengine sparse embedding", ) except Exception as e: raise RuntimeError(f"Volcengine sparse embedding failed: {str(e)}") from e @@ -400,14 +382,10 @@ def _embed_call(): ) try: - return exponential_backoff_retry( + return self._run_with_retry( _embed_call, - max_wait=10.0, - base_delay=0.5, - max_delay=2.0, - jitter=True, - is_retryable=is_429_error, logger=logger, + operation_name="Volcengine hybrid embedding", ) except Exception as e: raise RuntimeError(f"Volcengine hybrid embedding failed: {str(e)}") from e diff --git a/openviking/models/embedder/voyage_embedders.py b/openviking/models/embedder/voyage_embedders.py index e866f0a18..a8b7874f5 100644 --- a/openviking/models/embedder/voyage_embedders.py +++ b/openviking/models/embedder/voyage_embedders.py @@ -2,12 +2,15 @@ # SPDX-License-Identifier: Apache-2.0 """Voyage AI dense embedder 
implementation.""" +import logging from typing import Any, Dict, List, Optional import openai from openviking.models.embedder.base import DenseEmbedderBase, EmbedResult +logger = logging.getLogger(__name__) + VOYAGE_MODEL_DIMENSIONS = { "voyage-3": 1024, "voyage-3-large": 1024, @@ -83,7 +86,8 @@ def __init__( def embed(self, text: str, is_query: bool = False) -> EmbedResult: """Perform dense embedding on text.""" - try: + + def _call() -> EmbedResult: kwargs: Dict[str, Any] = {"input": text, "model": self.model_name} if self.dimension is not None: kwargs["extra_body"] = {"output_dimension": self.dimension} @@ -91,6 +95,13 @@ def embed(self, text: str, is_query: bool = False) -> EmbedResult: response = self.client.embeddings.create(**kwargs) vector = response.data[0].embedding return EmbedResult(dense_vector=vector) + + try: + return self._run_with_retry( + _call, + logger=logger, + operation_name="Voyage embedding", + ) except openai.APIError as e: raise RuntimeError(f"Voyage API error: {e.message}") from e except Exception as e: @@ -101,13 +112,20 @@ def embed_batch(self, texts: List[str], is_query: bool = False) -> List[EmbedRes if not texts: return [] - try: + def _call() -> List[EmbedResult]: kwargs: Dict[str, Any] = {"input": texts, "model": self.model_name} if self.dimension is not None: kwargs["extra_body"] = {"output_dimension": self.dimension} response = self.client.embeddings.create(**kwargs) return [EmbedResult(dense_vector=item.embedding) for item in response.data] + + try: + return self._run_with_retry( + _call, + logger=logger, + operation_name="Voyage batch embedding", + ) except openai.APIError as e: raise RuntimeError(f"Voyage API error: {e.message}") from e except Exception as e: diff --git a/openviking/models/vlm/backends/litellm_vlm.py b/openviking/models/vlm/backends/litellm_vlm.py index 90230987b..6a67a3a1a 100644 --- a/openviking/models/vlm/backends/litellm_vlm.py +++ b/openviking/models/vlm/backends/litellm_vlm.py @@ -7,7 +7,6 @@ os.environ["LITELLM_LOCAL_MODEL_COST_MAP"] = "True" -import asyncio import base64 import time from pathlib import Path @@ -16,6 +15,8 @@ import litellm from litellm import acompletion, completion +from openviking.utils.model_retry import retry_async, retry_sync + from ..base import VLMBase logger = logging.getLogger(__name__) @@ -221,44 +222,57 @@ def _build_kwargs(self, model: str, messages: list) -> dict[str, Any]: return kwargs - def get_completion(self, prompt: str, thinking: bool = False) -> str: - """Get text completion synchronously.""" + def _build_text_kwargs(self, prompt: str) -> dict[str, Any]: model = self._resolve_model(self.model or "gpt-4o-mini") messages = [{"role": "user", "content": prompt}] - kwargs = self._build_kwargs(model, messages) + return self._build_kwargs(model, messages) - t0 = time.perf_counter() - response = completion(**kwargs) - elapsed = time.perf_counter() - t0 - self._update_token_usage_from_response(response, duration_seconds=elapsed) - return self._clean_response(self._extract_content_from_response(response)) + def _build_vision_kwargs( + self, + prompt: str, + images: List[Union[str, Path, bytes]], + ) -> dict[str, Any]: + model = self._resolve_model(self.model or "gpt-4o-mini") + content = [self._prepare_image(img) for img in images] + content.append({"type": "text", "text": prompt}) + messages = [{"role": "user", "content": content}] + return self._build_kwargs(model, messages) - async def get_completion_async( - self, prompt: str, thinking: bool = False, max_retries: int = 0 - ) -> str: + def 
get_completion(self, prompt: str, thinking: bool = False) -> str: + """Get text completion synchronously.""" + kwargs = self._build_text_kwargs(prompt) + + def _call() -> str: + t0 = time.perf_counter() + response = completion(**kwargs) + elapsed = time.perf_counter() - t0 + self._update_token_usage_from_response(response, duration_seconds=elapsed) + return self._clean_response(self._extract_content_from_response(response)) + + return retry_sync( + _call, + max_retries=self.max_retries, + logger=logger, + operation_name="LiteLLM VLM completion", + ) + + async def get_completion_async(self, prompt: str, thinking: bool = False) -> str: """Get text completion asynchronously.""" - model = self._resolve_model(self.model or "gpt-4o-mini") - messages = [{"role": "user", "content": prompt}] - kwargs = self._build_kwargs(model, messages) - - last_error = None - for attempt in range(max_retries + 1): - try: - t0 = time.perf_counter() - response = await acompletion(**kwargs) - elapsed = time.perf_counter() - t0 - self._update_token_usage_from_response( - response, duration_seconds=elapsed, - ) - return self._clean_response(self._extract_content_from_response(response)) - except Exception as e: - last_error = e - if attempt < max_retries: - await asyncio.sleep(2**attempt) - - if last_error: - raise last_error - raise RuntimeError("Unknown error in async completion") + kwargs = self._build_text_kwargs(prompt) + + async def _call() -> str: + t0 = time.perf_counter() + response = await acompletion(**kwargs) + elapsed = time.perf_counter() - t0 + self._update_token_usage_from_response(response, duration_seconds=elapsed) + return self._clean_response(self._extract_content_from_response(response)) + + return await retry_async( + _call, + max_retries=self.max_retries, + logger=logger, + operation_name="LiteLLM VLM async completion", + ) def get_vision_completion( self, @@ -267,21 +281,21 @@ def get_vision_completion( thinking: bool = False, ) -> str: """Get vision completion synchronously.""" - model = self._resolve_model(self.model or "gpt-4o-mini") - - content = [] - for img in images: - content.append(self._prepare_image(img)) - content.append({"type": "text", "text": prompt}) - - messages = [{"role": "user", "content": content}] - kwargs = self._build_kwargs(model, messages) - - t0 = time.perf_counter() - response = completion(**kwargs) - elapsed = time.perf_counter() - t0 - self._update_token_usage_from_response(response, duration_seconds=elapsed) - return self._clean_response(self._extract_content_from_response(response)) + kwargs = self._build_vision_kwargs(prompt, images) + + def _call() -> str: + t0 = time.perf_counter() + response = completion(**kwargs) + elapsed = time.perf_counter() - t0 + self._update_token_usage_from_response(response, duration_seconds=elapsed) + return self._clean_response(self._extract_content_from_response(response)) + + return retry_sync( + _call, + max_retries=self.max_retries, + logger=logger, + operation_name="LiteLLM VLM vision completion", + ) async def get_vision_completion_async( self, @@ -290,24 +304,26 @@ async def get_vision_completion_async( thinking: bool = False, ) -> str: """Get vision completion asynchronously.""" - model = self._resolve_model(self.model or "gpt-4o-mini") - - content = [] - for img in images: - content.append(self._prepare_image(img)) - content.append({"type": "text", "text": prompt}) - - messages = [{"role": "user", "content": content}] - kwargs = self._build_kwargs(model, messages) - - t0 = time.perf_counter() - response = await 
acompletion(**kwargs) - elapsed = time.perf_counter() - t0 - self._update_token_usage_from_response(response, duration_seconds=elapsed) - return self._clean_response(self._extract_content_from_response(response)) + kwargs = self._build_vision_kwargs(prompt, images) + + async def _call() -> str: + t0 = time.perf_counter() + response = await acompletion(**kwargs) + elapsed = time.perf_counter() - t0 + self._update_token_usage_from_response(response, duration_seconds=elapsed) + return self._clean_response(self._extract_content_from_response(response)) + + return await retry_async( + _call, + max_retries=self.max_retries, + logger=logger, + operation_name="LiteLLM VLM async vision completion", + ) def _update_token_usage_from_response( - self, response, duration_seconds: float = 0.0, + self, + response, + duration_seconds: float = 0.0, ) -> None: """Update token usage from response.""" if hasattr(response, "usage") and response.usage: diff --git a/openviking/models/vlm/backends/openai_vlm.py b/openviking/models/vlm/backends/openai_vlm.py index 16d6948fa..f09c15fa1 100644 --- a/openviking/models/vlm/backends/openai_vlm.py +++ b/openviking/models/vlm/backends/openai_vlm.py @@ -2,13 +2,16 @@ # SPDX-License-Identifier: Apache-2.0 """OpenAI VLM backend implementation""" -import asyncio import base64 import logging import time from pathlib import Path from typing import Any, Dict, List, Union +import openai + +from openviking.utils.model_retry import retry_async, retry_sync + from ..base import VLMBase from ..registry import DEFAULT_AZURE_API_VERSION @@ -50,13 +53,12 @@ def __init__(self, config: Dict[str, Any]): def get_client(self): """Get sync client""" if self._sync_client is None: - try: - import openai - except ImportError: - raise ImportError("Please install openai: pip install openai") kwargs = _build_openai_client_kwargs( - self.provider, self.api_key, self.api_base, - self.api_version, self.extra_headers, + self.provider, + self.api_key, + self.api_base, + self.api_version, + self.extra_headers, ) if self.provider == "azure": self._sync_client = openai.AzureOpenAI(**kwargs) @@ -67,13 +69,12 @@ def get_client(self): def get_async_client(self): """Get async client""" if self._async_client is None: - try: - import openai - except ImportError: - raise ImportError("Please install openai: pip install openai") kwargs = _build_openai_client_kwargs( - self.provider, self.api_key, self.api_base, - self.api_version, self.extra_headers, + self.provider, + self.api_key, + self.api_base, + self.api_version, + self.extra_headers, ) if self.provider == "azure": self._async_client = openai.AsyncAzureOpenAI(**kwargs) @@ -82,7 +83,9 @@ def get_async_client(self): return self._async_client def _update_token_usage_from_response( - self, response, duration_seconds: float = 0.0, + self, + response, + duration_seconds: float = 0.0, ): if hasattr(response, "usage") and response.usage: prompt_tokens = response.usage.prompt_tokens @@ -183,9 +186,7 @@ async def _process_streaming_response_async(self, response): return "".join(content_parts) - def get_completion(self, prompt: str, thinking: bool = False) -> str: - """Get text completion""" - client = self.get_client() + def _build_text_kwargs(self, prompt: str) -> Dict[str, Any]: kwargs = { "model": self.model or "gpt-4o-mini", "messages": [{"role": "user", "content": prompt}], @@ -194,58 +195,77 @@ def get_completion(self, prompt: str, thinking: bool = False) -> str: } if self.max_tokens is not None: kwargs["max_tokens"] = self.max_tokens + return kwargs + + def 
_build_vision_kwargs( + self, + prompt: str, + images: List[Union[str, Path, bytes]], + ) -> Dict[str, Any]: + content = [self._prepare_image(img) for img in images] + content.append({"type": "text", "text": prompt}) - t0 = time.perf_counter() - response = client.chat.completions.create(**kwargs) - elapsed = time.perf_counter() - t0 + kwargs = { + "model": self.model or "gpt-4o-mini", + "messages": [{"role": "user", "content": content}], + "temperature": self.temperature, + "stream": self.stream, + } + if self.max_tokens is not None: + kwargs["max_tokens"] = self.max_tokens + return kwargs + def _extract_completion_content(self, response, elapsed: float) -> str: if self.stream: content = self._process_streaming_response(response) else: self._update_token_usage_from_response(response, duration_seconds=elapsed) content = self._extract_content_from_response(response) + return self._clean_response(content) + async def _extract_completion_content_async(self, response, elapsed: float) -> str: + if self.stream: + content = await self._process_streaming_response_async(response) + else: + self._update_token_usage_from_response(response, duration_seconds=elapsed) + content = self._extract_content_from_response(response) return self._clean_response(content) - async def get_completion_async( - self, prompt: str, thinking: bool = False, max_retries: int = 0 - ) -> str: + def get_completion(self, prompt: str, thinking: bool = False) -> str: + """Get text completion""" + client = self.get_client() + kwargs = self._build_text_kwargs(prompt) + + def _call() -> str: + t0 = time.perf_counter() + response = client.chat.completions.create(**kwargs) + elapsed = time.perf_counter() - t0 + return self._extract_completion_content(response, elapsed) + + return retry_sync( + _call, + max_retries=self.max_retries, + logger=logger, + operation_name="OpenAI VLM completion", + ) + + async def get_completion_async(self, prompt: str, thinking: bool = False) -> str: """Get text completion asynchronously""" client = self.get_async_client() - kwargs = { - "model": self.model or "gpt-4o-mini", - "messages": [{"role": "user", "content": prompt}], - "temperature": self.temperature, - "stream": self.stream, - } - if self.max_tokens is not None: - kwargs["max_tokens"] = self.max_tokens + kwargs = self._build_text_kwargs(prompt) - last_error = None - for attempt in range(max_retries + 1): - try: - t0 = time.perf_counter() - response = await client.chat.completions.create(**kwargs) - elapsed = time.perf_counter() - t0 - - if self.stream: - content = await self._process_streaming_response_async(response) - else: - self._update_token_usage_from_response( - response, duration_seconds=elapsed, - ) - content = self._extract_content_from_response(response) - - return self._clean_response(content) - except Exception as e: - last_error = e - if attempt < max_retries: - await asyncio.sleep(2**attempt) - - if last_error: - raise last_error - else: - raise RuntimeError("Unknown error in async completion") + async def _call() -> str: + t0 = time.perf_counter() + response = await client.chat.completions.create(**kwargs) + elapsed = time.perf_counter() - t0 + return await self._extract_completion_content_async(response, elapsed) + + return await retry_async( + _call, + max_retries=self.max_retries, + logger=logger, + operation_name="OpenAI VLM async completion", + ) def _detect_image_format(self, data: bytes) -> str: """Detect image format from magic bytes. 
@@ -307,32 +327,20 @@ def get_vision_completion( ) -> str: """Get vision completion""" client = self.get_client() + kwargs = self._build_vision_kwargs(prompt, images) - content = [] - for img in images: - content.append(self._prepare_image(img)) - content.append({"type": "text", "text": prompt}) - - kwargs = { - "model": self.model or "gpt-4o-mini", - "messages": [{"role": "user", "content": content}], - "temperature": self.temperature, - "stream": self.stream, - } - if self.max_tokens is not None: - kwargs["max_tokens"] = self.max_tokens + def _call() -> str: + t0 = time.perf_counter() + response = client.chat.completions.create(**kwargs) + elapsed = time.perf_counter() - t0 + return self._extract_completion_content(response, elapsed) - t0 = time.perf_counter() - response = client.chat.completions.create(**kwargs) - elapsed = time.perf_counter() - t0 - - if self.stream: - content = self._process_streaming_response(response) - else: - self._update_token_usage_from_response(response, duration_seconds=elapsed) - content = self._extract_content_from_response(response) - - return self._clean_response(content) + return retry_sync( + _call, + max_retries=self.max_retries, + logger=logger, + operation_name="OpenAI VLM vision completion", + ) async def get_vision_completion_async( self, @@ -342,29 +350,17 @@ async def get_vision_completion_async( ) -> str: """Get vision completion asynchronously""" client = self.get_async_client() - - content = [] - for img in images: - content.append(self._prepare_image(img)) - content.append({"type": "text", "text": prompt}) - - kwargs = { - "model": self.model or "gpt-4o-mini", - "messages": [{"role": "user", "content": content}], - "temperature": self.temperature, - "stream": self.stream, - } - if self.max_tokens is not None: - kwargs["max_tokens"] = self.max_tokens - - t0 = time.perf_counter() - response = await client.chat.completions.create(**kwargs) - elapsed = time.perf_counter() - t0 - - if self.stream: - content = await self._process_streaming_response_async(response) - else: - self._update_token_usage_from_response(response, duration_seconds=elapsed) - content = self._extract_content_from_response(response) - - return self._clean_response(content) + kwargs = self._build_vision_kwargs(prompt, images) + + async def _call() -> str: + t0 = time.perf_counter() + response = await client.chat.completions.create(**kwargs) + elapsed = time.perf_counter() - t0 + return await self._extract_completion_content_async(response, elapsed) + + return await retry_async( + _call, + max_retries=self.max_retries, + logger=logger, + operation_name="OpenAI VLM async vision completion", + ) diff --git a/openviking/models/vlm/backends/volcengine_vlm.py b/openviking/models/vlm/backends/volcengine_vlm.py index 0c9c558ae..4993e9b87 100644 --- a/openviking/models/vlm/backends/volcengine_vlm.py +++ b/openviking/models/vlm/backends/volcengine_vlm.py @@ -2,13 +2,14 @@ # SPDX-License-Identifier: Apache-2.0 """VolcEngine VLM backend implementation""" -import asyncio import base64 import logging import time from pathlib import Path from typing import Any, Dict, List, Union +from openviking.utils.model_retry import retry_async, retry_sync + from .openai_vlm import OpenAIVLM logger = logging.getLogger(__name__) @@ -60,9 +61,7 @@ def get_async_client(self): ) return self._async_client - def get_completion(self, prompt: str, thinking: bool = False) -> str: - """Get text completion""" - client = self.get_client() + def _build_text_kwargs(self, prompt: str, thinking: bool = False) -> Dict[str, 
Any]: kwargs = { "model": self.model or "doubao-seed-2-0-pro-260215", "messages": [{"role": "user", "content": prompt}], @@ -71,46 +70,64 @@ def get_completion(self, prompt: str, thinking: bool = False) -> str: } if self.max_tokens is not None: kwargs["max_tokens"] = self.max_tokens + return kwargs - t0 = time.perf_counter() - response = client.chat.completions.create(**kwargs) - elapsed = time.perf_counter() - t0 - self._update_token_usage_from_response(response, duration_seconds=elapsed) - return self._clean_response(self._extract_content_from_response(response)) + def _build_vision_kwargs( + self, + prompt: str, + images: List[Union[str, Path, bytes]], + thinking: bool = False, + ) -> Dict[str, Any]: + content = [self._prepare_image(img) for img in images] + content.append({"type": "text", "text": prompt}) - async def get_completion_async( - self, prompt: str, thinking: bool = False, max_retries: int = 0 - ) -> str: - """Get text completion asynchronously""" - client = self.get_async_client() kwargs = { "model": self.model or "doubao-seed-2-0-pro-260215", - "messages": [{"role": "user", "content": prompt}], + "messages": [{"role": "user", "content": content}], "temperature": self.temperature, "thinking": {"type": "disabled" if not thinking else "enabled"}, } if self.max_tokens is not None: kwargs["max_tokens"] = self.max_tokens + return kwargs - last_error = None - for attempt in range(max_retries + 1): - try: - t0 = time.perf_counter() - response = await client.chat.completions.create(**kwargs) - elapsed = time.perf_counter() - t0 - self._update_token_usage_from_response( - response, duration_seconds=elapsed, - ) - return self._clean_response(self._extract_content_from_response(response)) - except Exception as e: - last_error = e - if attempt < max_retries: - await asyncio.sleep(2**attempt) + def get_completion(self, prompt: str, thinking: bool = False) -> str: + """Get text completion""" + client = self.get_client() + kwargs = self._build_text_kwargs(prompt, thinking) - if last_error: - raise last_error - else: - raise RuntimeError("Unknown error in async completion") + def _call() -> str: + t0 = time.perf_counter() + response = client.chat.completions.create(**kwargs) + elapsed = time.perf_counter() - t0 + self._update_token_usage_from_response(response, duration_seconds=elapsed) + return self._clean_response(self._extract_content_from_response(response)) + + return retry_sync( + _call, + max_retries=self.max_retries, + logger=logger, + operation_name="VolcEngine VLM completion", + ) + + async def get_completion_async(self, prompt: str, thinking: bool = False) -> str: + """Get text completion asynchronously""" + client = self.get_async_client() + kwargs = self._build_text_kwargs(prompt, thinking) + + async def _call() -> str: + t0 = time.perf_counter() + response = await client.chat.completions.create(**kwargs) + elapsed = time.perf_counter() - t0 + self._update_token_usage_from_response(response, duration_seconds=elapsed) + return self._clean_response(self._extract_content_from_response(response)) + + return await retry_async( + _call, + max_retries=self.max_retries, + logger=logger, + operation_name="VolcEngine VLM async completion", + ) def _detect_image_format(self, data: bytes) -> str: """Detect image format from magic bytes. 
@@ -234,26 +251,21 @@ def get_vision_completion( ) -> str: """Get vision completion""" client = self.get_client() + kwargs = self._build_vision_kwargs(prompt, images, thinking) - content = [] - for img in images: - content.append(self._prepare_image(img)) - content.append({"type": "text", "text": prompt}) + def _call() -> str: + t0 = time.perf_counter() + response = client.chat.completions.create(**kwargs) + elapsed = time.perf_counter() - t0 + self._update_token_usage_from_response(response, duration_seconds=elapsed) + return self._clean_response(self._extract_content_from_response(response)) - kwargs = { - "model": self.model or "doubao-seed-2-0-pro-260215", - "messages": [{"role": "user", "content": content}], - "temperature": self.temperature, - "thinking": {"type": "disabled" if not thinking else "enabled"}, - } - if self.max_tokens is not None: - kwargs["max_tokens"] = self.max_tokens - - t0 = time.perf_counter() - response = client.chat.completions.create(**kwargs) - elapsed = time.perf_counter() - t0 - self._update_token_usage_from_response(response, duration_seconds=elapsed) - return self._clean_response(self._extract_content_from_response(response)) + return retry_sync( + _call, + max_retries=self.max_retries, + logger=logger, + operation_name="VolcEngine VLM vision completion", + ) async def get_vision_completion_async( self, @@ -263,23 +275,18 @@ async def get_vision_completion_async( ) -> str: """Get vision completion asynchronously""" client = self.get_async_client() + kwargs = self._build_vision_kwargs(prompt, images, thinking) - content = [] - for img in images: - content.append(self._prepare_image(img)) - content.append({"type": "text", "text": prompt}) - - kwargs = { - "model": self.model or "doubao-seed-2-0-pro-260215", - "messages": [{"role": "user", "content": content}], - "temperature": self.temperature, - "thinking": {"type": "disabled" if not thinking else "enabled"}, - } - if self.max_tokens is not None: - kwargs["max_tokens"] = self.max_tokens + async def _call() -> str: + t0 = time.perf_counter() + response = await client.chat.completions.create(**kwargs) + elapsed = time.perf_counter() - t0 + self._update_token_usage_from_response(response, duration_seconds=elapsed) + return self._clean_response(self._extract_content_from_response(response)) - t0 = time.perf_counter() - response = await client.chat.completions.create(**kwargs) - elapsed = time.perf_counter() - t0 - self._update_token_usage_from_response(response, duration_seconds=elapsed) - return self._clean_response(self._extract_content_from_response(response)) + return await retry_async( + _call, + max_retries=self.max_retries, + logger=logger, + operation_name="VolcEngine VLM async vision completion", + ) diff --git a/openviking/models/vlm/base.py b/openviking/models/vlm/base.py index fd8f939c1..1b8833b70 100644 --- a/openviking/models/vlm/base.py +++ b/openviking/models/vlm/base.py @@ -24,7 +24,7 @@ def __init__(self, config: Dict[str, Any]): self.api_key = config.get("api_key") self.api_base = config.get("api_base") self.temperature = config.get("temperature", 0.0) - self.max_retries = config.get("max_retries", 2) + self.max_retries = config.get("max_retries", 3) self.max_tokens = config.get("max_tokens") self.extra_headers = config.get("extra_headers") self.stream = config.get("stream", False) @@ -38,9 +38,7 @@ def get_completion(self, prompt: str, thinking: bool = False) -> str: pass @abstractmethod - async def get_completion_async( - self, prompt: str, thinking: bool = False, max_retries: int = 0 - ) 
-> str: + async def get_completion_async(self, prompt: str, thinking: bool = False) -> str: """Get text completion asynchronously""" pass @@ -74,7 +72,11 @@ def is_available(self) -> bool: # Token usage tracking methods def update_token_usage( - self, model_name: str, provider: str, prompt_tokens: int, completion_tokens: int, + self, + model_name: str, + provider: str, + prompt_tokens: int, + completion_tokens: int, duration_seconds: float = 0.0, ) -> None: """Update token usage @@ -142,6 +144,7 @@ def _extract_content_from_response(self, response) -> str: return response return response.choices[0].message.content or "" + class VLMFactory: """VLM factory class, creates corresponding VLM instance based on config""" diff --git a/openviking/models/vlm/llm.py b/openviking/models/vlm/llm.py index 6c8b9c569..fe29b5d61 100644 --- a/openviking/models/vlm/llm.py +++ b/openviking/models/vlm/llm.py @@ -182,13 +182,12 @@ async def complete_json_async( prompt: str, schema: Optional[Dict[str, Any]] = None, thinking: bool = False, - max_retries: int = 0, ) -> Optional[Dict[str, Any]]: """Async version of complete_json.""" if schema: prompt = f"{prompt}\n\n{get_json_schema_prompt(schema)}" - response = await self._get_vlm().get_completion_async(prompt, thinking, max_retries) + response = await self._get_vlm().get_completion_async(prompt, thinking) return parse_json_from_response(response) def complete_model( @@ -214,13 +213,10 @@ async def complete_model_async( prompt: str, model_class: Type[T], thinking: bool = False, - max_retries: int = 0, ) -> Optional[T]: """Async version of complete_model.""" schema = model_class.model_json_schema() - response = await self.complete_json_async( - prompt, schema=schema, thinking=thinking, max_retries=max_retries - ) + response = await self.complete_json_async(prompt, schema=schema, thinking=thinking) if response is None: return None diff --git a/openviking/utils/circuit_breaker.py b/openviking/utils/circuit_breaker.py index 6f9fbdc26..9d75fd66a 100644 --- a/openviking/utils/circuit_breaker.py +++ b/openviking/utils/circuit_breaker.py @@ -7,55 +7,11 @@ import threading import time +from openviking.utils.model_retry import classify_api_error from openviking_cli.utils.logger import get_logger logger = get_logger(__name__) -# --- Error classification --- - -_PERMANENT_PATTERNS = ("403", "401", "Forbidden", "Unauthorized", "AccountOverdue") -_TRANSIENT_PATTERNS = ( - "429", - "500", - "502", - "503", - "504", - "TooManyRequests", - "RateLimit", - "timeout", - "Timeout", - "ConnectionError", - "Connection refused", - "Connection reset", -) - - -def classify_api_error(error: Exception) -> str: - """Classify an API error as permanent, transient, or unknown. - - Checks both str(error) and str(error.__cause__) for known patterns. - - Returns: - "permanent" — 403/401, never retry. - "transient" — 429/5xx/timeout, safe to retry. - "unknown" — unrecognized, treated as transient by callers. 
- """ - texts = [str(error)] - if error.__cause__ is not None: - texts.append(str(error.__cause__)) - - for text in texts: - for pattern in _PERMANENT_PATTERNS: - if pattern in text: - return "permanent" - - for text in texts: - for pattern in _TRANSIENT_PATTERNS: - if pattern in text: - return "transient" - - return "unknown" - # --- Circuit breaker --- diff --git a/openviking/utils/model_retry.py b/openviking/utils/model_retry.py new file mode 100644 index 000000000..2e7cfa957 --- /dev/null +++ b/openviking/utils/model_retry.py @@ -0,0 +1,150 @@ +from __future__ import annotations + +import asyncio +import random +import time +from typing import Awaitable, Callable, TypeVar + +T = TypeVar("T") + +PERMANENT_API_ERROR_PATTERNS = ( + "400", + "401", + "403", + "Forbidden", + "Unauthorized", + "AccountOverdue", +) + +TRANSIENT_API_ERROR_PATTERNS = ( + "429", + "500", + "502", + "503", + "504", + "TooManyRequests", + "RateLimit", + "RequestBurstTooFast", + "timeout", + "Timeout", + "ConnectionError", + "Connection refused", + "Connection reset", +) + + +def classify_api_error(error: Exception) -> str: + """Classify an API error as permanent, transient, or unknown.""" + texts = [str(error)] + if error.__cause__ is not None: + texts.append(str(error.__cause__)) + + for text in texts: + for pattern in PERMANENT_API_ERROR_PATTERNS: + if pattern in text: + return "permanent" + + for text in texts: + for pattern in TRANSIENT_API_ERROR_PATTERNS: + if pattern in text: + return "transient" + + return "unknown" + + +def is_retryable_api_error(error: Exception) -> bool: + """Return True if the error should be retried.""" + return classify_api_error(error) == "transient" + + +def _compute_delay( + attempt: int, + *, + base_delay: float, + max_delay: float, + jitter: bool, +) -> float: + delay = min(base_delay * (2**attempt), max_delay) + if jitter: + delay += random.uniform(0.0, min(base_delay, delay)) + return delay + + +def retry_sync( + func: Callable[[], T], + *, + max_retries: int, + base_delay: float = 0.5, + max_delay: float = 8.0, + jitter: bool = True, + is_retryable: Callable[[Exception], bool] = is_retryable_api_error, + logger=None, + operation_name: str = "operation", +) -> T: + """Retry a sync function on known transient errors.""" + attempt = 0 + + while True: + try: + return func() + except Exception as e: + if max_retries <= 0 or attempt >= max_retries or not is_retryable(e): + raise + + delay = _compute_delay( + attempt, + base_delay=base_delay, + max_delay=max_delay, + jitter=jitter, + ) + if logger: + logger.warning( + "%s failed with retryable error (retry %d/%d): %s; retrying in %.2fs", + operation_name, + attempt + 1, + max_retries, + e, + delay, + ) + time.sleep(delay) + attempt += 1 + + +async def retry_async( + func: Callable[[], Awaitable[T]], + *, + max_retries: int, + base_delay: float = 0.5, + max_delay: float = 8.0, + jitter: bool = True, + is_retryable: Callable[[Exception], bool] = is_retryable_api_error, + logger=None, + operation_name: str = "operation", +) -> T: + """Retry an async function on known transient errors.""" + attempt = 0 + + while True: + try: + return await func() + except Exception as e: + if max_retries <= 0 or attempt >= max_retries or not is_retryable(e): + raise + + delay = _compute_delay( + attempt, + base_delay=base_delay, + max_delay=max_delay, + jitter=jitter, + ) + if logger: + logger.warning( + "%s failed with retryable error (retry %d/%d): %s; retrying in %.2fs", + operation_name, + attempt + 1, + max_retries, + e, + delay, + ) + await 
asyncio.sleep(delay) + attempt += 1 diff --git a/openviking_cli/utils/config/embedding_config.py b/openviking_cli/utils/config/embedding_config.py index a122ed356..d5b7a836e 100644 --- a/openviking_cli/utils/config/embedding_config.py +++ b/openviking_cli/utils/config/embedding_config.py @@ -253,6 +253,10 @@ class EmbeddingConfig(BaseModel): max_concurrent: int = Field( default=10, description="Maximum number of concurrent embedding requests" ) + max_retries: int = Field( + default=3, + description="Maximum retry attempts for embedding provider calls (0 disables retry)", + ) text_source: str = Field( default="summary_first", description="Text source for file vectorization: summary_first|summary_only|content_only", @@ -313,9 +317,7 @@ def _create_embedder( ) if provider == "litellm" and LiteLLMDenseEmbedder is None: - raise ValueError( - "LiteLLM is not installed. Install it with: pip install litellm" - ) + raise ValueError("LiteLLM is not installed. Install it with: pip install litellm") # Factory registry: (provider, type) -> (embedder_class, param_builder) factory_registry = { @@ -329,6 +331,7 @@ def _create_embedder( "api_version": cfg.api_version, "dimension": cfg.dimension, "provider": "openai", + "config": {"max_retries": self.max_retries}, **({"query_param": cfg.query_param} if cfg.query_param else {}), **({"document_param": cfg.document_param} if cfg.document_param else {}), **({"extra_headers": cfg.extra_headers} if cfg.extra_headers else {}), @@ -343,6 +346,7 @@ def _create_embedder( "api_version": cfg.api_version, "dimension": cfg.dimension, "provider": "azure", + "config": {"max_retries": self.max_retries}, **({"query_param": cfg.query_param} if cfg.query_param else {}), **({"document_param": cfg.document_param} if cfg.document_param else {}), **({"extra_headers": cfg.extra_headers} if cfg.extra_headers else {}), @@ -356,6 +360,7 @@ def _create_embedder( "api_base": cfg.api_base, "dimension": cfg.dimension, "input_type": cfg.input, + "config": {"max_retries": self.max_retries}, }, ), ("volcengine", "sparse"): ( @@ -364,6 +369,7 @@ def _create_embedder( "model_name": cfg.model, "api_key": cfg.api_key, "api_base": cfg.api_base, + "config": {"max_retries": self.max_retries}, }, ), ("volcengine", "hybrid"): ( @@ -374,6 +380,7 @@ def _create_embedder( "api_base": cfg.api_base, "dimension": cfg.dimension, "input_type": cfg.input, + "config": {"max_retries": self.max_retries}, }, ), ("vikingdb", "dense"): ( @@ -387,6 +394,7 @@ def _create_embedder( "host": cfg.host, "dimension": cfg.dimension, "input_type": cfg.input, + "config": {"max_retries": self.max_retries}, }, ), ("vikingdb", "sparse"): ( @@ -398,6 +406,7 @@ def _create_embedder( "sk": cfg.sk, "region": cfg.region, "host": cfg.host, + "config": {"max_retries": self.max_retries}, }, ), ("vikingdb", "hybrid"): ( @@ -411,6 +420,7 @@ def _create_embedder( "host": cfg.host, "dimension": cfg.dimension, "input_type": cfg.input, + "config": {"max_retries": self.max_retries}, }, ), ("jina", "dense"): ( @@ -420,6 +430,7 @@ def _create_embedder( "api_key": cfg.api_key, "api_base": cfg.api_base, "dimension": cfg.dimension, + "config": {"max_retries": self.max_retries}, **({"query_param": cfg.query_param} if cfg.query_param else {}), **({"document_param": cfg.document_param} if cfg.document_param else {}), }, @@ -430,6 +441,7 @@ def _create_embedder( "model_name": cfg.model, "api_key": cfg.api_key, "dimension": cfg.dimension, + "config": {"max_retries": self.max_retries}, **({"query_param": cfg.query_param} if cfg.query_param else {}), 
**({"document_param": cfg.document_param} if cfg.document_param else {}), }, @@ -443,6 +455,7 @@ def _create_embedder( or "no-key", # Ollama ignores the key, but client requires non-empty "api_base": cfg.api_base or "http://localhost:11434/v1", "dimension": cfg.dimension, + "config": {"max_retries": self.max_retries}, }, ), ("voyage", "dense"): ( @@ -452,6 +465,7 @@ def _create_embedder( "api_key": cfg.api_key, "api_base": cfg.api_base, "dimension": cfg.dimension, + "config": {"max_retries": self.max_retries}, }, ), ("minimax", "dense"): ( @@ -461,6 +475,7 @@ def _create_embedder( "api_key": cfg.api_key, "api_base": cfg.api_base, "dimension": cfg.dimension, + "config": {"max_retries": self.max_retries}, **({"query_param": cfg.query_param} if cfg.query_param else {}), **({"document_param": cfg.document_param} if cfg.document_param else {}), **({"extra_headers": cfg.extra_headers} if cfg.extra_headers else {}), @@ -473,6 +488,7 @@ def _create_embedder( "api_key": cfg.api_key, "api_base": cfg.api_base, "dimension": cfg.dimension, + "config": {"max_retries": self.max_retries}, **({"query_param": cfg.query_param} if cfg.query_param else {}), **({"document_param": cfg.document_param} if cfg.document_param else {}), **({"extra_headers": cfg.extra_headers} if cfg.extra_headers else {}), diff --git a/openviking_cli/utils/config/vlm_config.py b/openviking_cli/utils/config/vlm_config.py index a8c7da078..20b9f9002 100644 --- a/openviking_cli/utils/config/vlm_config.py +++ b/openviking_cli/utils/config/vlm_config.py @@ -12,7 +12,7 @@ class VLMConfig(BaseModel): api_key: Optional[str] = Field(default=None, description="API key") api_base: Optional[str] = Field(default=None, description="API base URL") temperature: float = Field(default=0.0, description="Generation temperature") - max_retries: int = Field(default=2, description="Maximum retry attempts") + max_retries: int = Field(default=3, description="Maximum retry attempts") provider: Optional[str] = Field(default=None, description="Provider type") backend: Optional[str] = Field( @@ -177,13 +177,9 @@ def get_completion(self, prompt: str, thinking: bool = False) -> str: """Get LLM completion.""" return self.get_vlm_instance().get_completion(prompt, thinking) - async def get_completion_async( - self, prompt: str, thinking: bool = False, max_retries: int | None = None - ) -> str: - """Get LLM completion asynchronously. 
Uses self.max_retries if not specified.""" - if max_retries is None: - max_retries = self.max_retries - return await self.get_vlm_instance().get_completion_async(prompt, thinking, max_retries) + async def get_completion_async(self, prompt: str, thinking: bool = False) -> str: + """Get LLM completion asynchronously.""" + return await self.get_vlm_instance().get_completion_async(prompt, thinking) def is_available(self) -> bool: """Check if LLM is configured.""" diff --git a/tests/models/test_vlm_strip_think_tags.py b/tests/models/test_vlm_strip_think_tags.py index fd47fa1c4..da6114851 100644 --- a/tests/models/test_vlm_strip_think_tags.py +++ b/tests/models/test_vlm_strip_think_tags.py @@ -18,7 +18,7 @@ class _Stub(VLMBase): def get_completion(self, prompt, thinking=False): return "" - async def get_completion_async(self, prompt, thinking=False, max_retries=0): + async def get_completion_async(self, prompt, thinking=False): return "" def get_vision_completion(self, prompt, images, thinking=False): diff --git a/tests/server/conftest.py b/tests/server/conftest.py index 3bc0e40fd..3e9ca9d8a 100644 --- a/tests/server/conftest.py +++ b/tests/server/conftest.py @@ -72,7 +72,7 @@ def get_dimension(self) -> int: def _install_fake_vlm(monkeypatch): """Use a fake VLM so server tests never hit external LLM APIs.""" - async def _fake_get_completion(self, prompt, thinking=False, max_retries=0): + async def _fake_get_completion(self, prompt, thinking=False): return "# Test Summary\n\nFake summary for testing.\n\n## Details\nTest content." async def _fake_get_vision_completion(self, prompt, images, thinking=False): diff --git a/tests/unit/test_extra_headers_embedding.py b/tests/unit/test_extra_headers_embedding.py index c766debb0..7c5174a65 100644 --- a/tests/unit/test_extra_headers_embedding.py +++ b/tests/unit/test_extra_headers_embedding.py @@ -98,6 +98,23 @@ def test_factory_omits_extra_headers_when_none(self, mock_openai_class): call_kwargs = mock_openai_class.call_args[1] assert "default_headers" not in call_kwargs + @patch("openai.OpenAI") + def test_factory_injects_embedding_max_retries(self, mock_openai_class): + """Factory should inject top-level embedding.max_retries into embedder config.""" + mock_openai_class.return_value = _make_mock_client() + + cfg = EmbeddingModelConfig( + provider="openai", + model="text-embedding-3-small", + api_key="sk-test", + dimension=8, + ) + embedder = EmbeddingConfig(dense=cfg, max_retries=0)._create_embedder( + "openai", "dense", cfg + ) + + assert embedder.max_retries == 0 + class TestEmbeddingModelConfigExtraHeaders: """Test that EmbeddingModelConfig accepts and stores the extra_headers field.""" diff --git a/tests/unit/test_extra_headers_vlm.py b/tests/unit/test_extra_headers_vlm.py index 2a5c8b283..da7e1777e 100644 --- a/tests/unit/test_extra_headers_vlm.py +++ b/tests/unit/test_extra_headers_vlm.py @@ -108,7 +108,7 @@ class StubVLM(OpenAIVLM): def get_completion(self, prompt, thinking=False): return "" - async def get_completion_async(self, prompt, thinking=False, max_retries=0): + async def get_completion_async(self, prompt, thinking=False): return "" def get_vision_completion(self, prompt, images, thinking=False): @@ -134,7 +134,7 @@ class StubVLM(OpenAIVLM): def get_completion(self, prompt, thinking=False): return "" - async def get_completion_async(self, prompt, thinking=False, max_retries=0): + async def get_completion_async(self, prompt, thinking=False): return "" def get_vision_completion(self, prompt, images, thinking=False): diff --git 
a/tests/unit/test_model_retry.py b/tests/unit/test_model_retry.py new file mode 100644 index 000000000..7f0360805 --- /dev/null +++ b/tests/unit/test_model_retry.py @@ -0,0 +1,38 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: Apache-2.0 +"""Tests for shared model retry helpers.""" + +import pytest + +from openviking.utils.model_retry import classify_api_error, retry_async, retry_sync + + +def test_classify_api_error_recognizes_request_burst_too_fast(): + assert classify_api_error(RuntimeError("RequestBurstTooFast")) == "transient" + + +def test_retry_sync_retries_transient_error_until_success(): + attempts = {"count": 0} + + def _call(): + attempts["count"] += 1 + if attempts["count"] < 3: + raise RuntimeError("429 TooManyRequests") + return "ok" + + assert retry_sync(_call, max_retries=3) == "ok" + assert attempts["count"] == 3 + + +@pytest.mark.asyncio +async def test_retry_async_does_not_retry_unknown_error(): + attempts = {"count": 0} + + async def _call(): + attempts["count"] += 1 + raise RuntimeError("some unexpected validation failure") + + with pytest.raises(RuntimeError): + await retry_async(_call, max_retries=3) + + assert attempts["count"] == 1 diff --git a/tests/unit/test_stream_config_vlm.py b/tests/unit/test_stream_config_vlm.py index faf5b6e25..5b4f078d2 100644 --- a/tests/unit/test_stream_config_vlm.py +++ b/tests/unit/test_stream_config_vlm.py @@ -2,7 +2,7 @@ # SPDX-License-Identifier: Apache-2.0 """Tests for VLM stream configuration support.""" -from unittest.mock import MagicMock, patch +from unittest.mock import AsyncMock, MagicMock, patch import pytest @@ -131,10 +131,7 @@ async def async_generator(): for chunk in chunks: yield chunk - async def mock_create(*args, **kwargs): - return async_generator() - - mock_client.chat.completions.create = mock_create + mock_client.chat.completions.create = AsyncMock(return_value=async_generator()) mock_async_openai_class.return_value = mock_client vlm = OpenAIVLM( @@ -220,10 +217,7 @@ async def async_generator(): for chunk in chunks: yield chunk - async def mock_create(*args, **kwargs): - return async_generator() - - mock_client.chat.completions.create = mock_create + mock_client.chat.completions.create = AsyncMock(return_value=async_generator()) mock_async_openai_class.return_value = mock_client vlm = OpenAIVLM( @@ -253,7 +247,7 @@ class StubVLM(OpenAIVLM): def get_completion(self, prompt, thinking=False): return "" - async def get_completion_async(self, prompt, thinking=False, max_retries=0): + async def get_completion_async(self, prompt, thinking=False): return "" def get_vision_completion(self, prompt, images, thinking=False): @@ -277,7 +271,7 @@ class StubVLM(OpenAIVLM): def get_completion(self, prompt, thinking=False): return "" - async def get_completion_async(self, prompt, thinking=False, max_retries=0): + async def get_completion_async(self, prompt, thinking=False): return "" def get_vision_completion(self, prompt, images, thinking=False): @@ -389,6 +383,23 @@ def test_vlm_config_stream_in_providers_takes_precedence(self): result = config._build_vlm_config_dict() assert result["stream"] is True + def test_vlm_config_max_retries_defaults_to_three(self): + """VLMConfig should default max_retries to 3.""" + from openviking_cli.utils.config.vlm_config import VLMConfig + + config = VLMConfig( + model="gpt-4o", + provider="openai", + providers={ + "openai": { + "api_key": "sk-test", + } + }, + ) + + assert config.max_retries == 3 + assert 
config._build_vlm_config_dict()["max_retries"] == 3 + class TestStreamingResponseProcessing: """Test streaming response processing logic.""" diff --git a/tests/unit/test_vlm_response_formats.py b/tests/unit/test_vlm_response_formats.py index 14be204c5..0c0d0c0af 100644 --- a/tests/unit/test_vlm_response_formats.py +++ b/tests/unit/test_vlm_response_formats.py @@ -18,9 +18,7 @@ class ConcreteVLM(VLMBase): def get_completion(self, prompt: str, thinking: bool = False) -> str: pass - async def get_completion_async( - self, prompt: str, thinking: bool = False, max_retries: int = 0 - ) -> str: + async def get_completion_async(self, prompt: str, thinking: bool = False) -> str: pass def get_vision_completion( @@ -50,14 +48,12 @@ def vlm(self): ) def test_extract_content_from_str_response(self, vlm): - assert vlm._extract_content_from_response("plain string response") == "plain string response" + assert ( + vlm._extract_content_from_response("plain string response") == "plain string response" + ) def test_extract_content_from_standard_openai_response(self, vlm): response = SimpleNamespace( - choices=[ - SimpleNamespace( - message=SimpleNamespace(content="standard response content") - ) - ] + choices=[SimpleNamespace(message=SimpleNamespace(content="standard response content"))] ) assert vlm._extract_content_from_response(response) == "standard response content" From ed966e1b881c024df151aeccc6c19169183e88c9 Mon Sep 17 00:00:00 2001 From: qin-ctx Date: Tue, 24 Mar 2026 22:46:30 +0800 Subject: [PATCH 2/3] fix --- tests/integration/conftest.py | 97 +++++++++++++++++++++++++++++++++++ 1 file changed, 97 insertions(+) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 6ca667509..7c1446040 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -7,6 +7,7 @@ AsyncHTTPClient integration tests can run without a manually started server process. 
""" +import copy import math import os import shutil @@ -17,12 +18,15 @@ import httpx import pytest +import pytest_asyncio import uvicorn +from openviking import AsyncOpenViking from openviking.server.app import create_app from openviking.server.config import ServerConfig from openviking.service.core import OpenVikingService from openviking_cli.session.user_id import UserIdentifier +from openviking_cli.utils.config.open_viking_config import OpenVikingConfigSingleton PROJECT_ROOT = Path(__file__).parent.parent.parent TEST_TMP_DIR = PROJECT_ROOT / "test_data" / "tmp_integration" @@ -52,6 +56,20 @@ ] +def _local_engine_available() -> bool: + try: + from openviking.storage.vectordb.engine import ENGINE_VARIANT + except Exception: + return False + return ENGINE_VARIANT != "unavailable" + + +requires_engine = pytest.mark.skipif( + not _local_engine_available(), + reason="local vectordb engine unavailable", +) + + def l2_norm(vec: list[float]) -> float: """Compute L2 norm of a vector.""" return math.sqrt(sum(v * v for v in vec)) @@ -69,6 +87,85 @@ def gemini_embedder(): return GeminiDenseEmbedder("gemini-embedding-2-preview", api_key=GOOGLE_API_KEY, dimension=768) +def gemini_config_dict( + model: str, + dim: int, + query_param: str | None = None, + doc_param: str | None = None, +) -> dict: + """Build a minimal embedded-mode config for Gemini-backed integration tests.""" + return { + "storage": { + "workspace": str(TEST_TMP_DIR / "gemini"), + "agfs": {"backend": "local"}, + "vectordb": {"name": "test", "backend": "local", "project": "default"}, + }, + "embedding": { + "dense": { + "provider": "gemini", + "api_key": GOOGLE_API_KEY, + "model": model, + "dimension": dim, + **({"query_param": query_param} if query_param else {}), + **({"document_param": doc_param} if doc_param else {}), + } + }, + } + + +async def teardown_ov_client() -> None: + """Reset singleton client/config state used by embedded integration tests.""" + await AsyncOpenViking.reset() + OpenVikingConfigSingleton.reset_instance() + + +async def make_ov_client(config_dict: dict, data_path: str) -> AsyncOpenViking: + """Create an AsyncOpenViking client from an explicit config dict.""" + if not GOOGLE_API_KEY: + pytest.skip("GOOGLE_API_KEY not set") + try: + from openviking.models.embedder.gemini_embedders import GeminiDenseEmbedder # noqa: F401 + except (ImportError, ModuleNotFoundError, AttributeError): + pytest.skip("google-genai not installed") + + await teardown_ov_client() + + workspace = Path(data_path) + shutil.rmtree(workspace, ignore_errors=True) + workspace.mkdir(parents=True, exist_ok=True) + + effective_config = copy.deepcopy(config_dict) + storage = effective_config.setdefault("storage", {}) + storage["workspace"] = str(workspace) + storage.setdefault("agfs", {"backend": "local"}) + storage.setdefault("vectordb", {"name": "test", "backend": "local", "project": "default"}) + + OpenVikingConfigSingleton.initialize(config_dict=effective_config) + + client = AsyncOpenViking(path=str(workspace)) + await client.initialize() + return client + + +def sample_markdown(base_dir: Path, slug: str, content: str) -> Path: + """Write a markdown file for an integration test case.""" + path = base_dir / f"{slug}.md" + path.write_text(content, encoding="utf-8") + return path + + +@pytest_asyncio.fixture(scope="function") +async def gemini_ov_client(tmp_path): + """Provide a Gemini-backed OpenViking client and its model metadata.""" + model = "gemini-embedding-2-preview" + dim = 768 + client = await make_ov_client(gemini_config_dict(model, 
dim), str(tmp_path / "ov_gemini")) + try: + yield client, model, dim + finally: + await teardown_ov_client() + + @pytest.fixture(scope="session") def temp_dir(): """Create temp directory for the whole test session.""" From 478bd8840d1e2c6a532279d506c9c4a145727338 Mon Sep 17 00:00:00 2001 From: qin-ctx Date: Tue, 31 Mar 2026 15:44:01 +0800 Subject: [PATCH 3/3] docs(config): document model retry settings --- docs/en/guides/01-configuration.md | 16 +++++++++++++++- docs/zh/guides/01-configuration.md | 16 +++++++++++++++- 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/docs/en/guides/01-configuration.md b/docs/en/guides/01-configuration.md index 136ec1277..4a71fc4d7 100644 --- a/docs/en/guides/01-configuration.md +++ b/docs/en/guides/01-configuration.md @@ -99,6 +99,7 @@ Embedding model configuration for vector search, supporting dense, sparse, and h { "embedding": { "max_concurrent": 10, + "max_retries": 3, "dense": { "provider": "volcengine", "api_key": "your-api-key", @@ -115,6 +116,7 @@ Embedding model configuration for vector search, supporting dense, sparse, and h | Parameter | Type | Description | |-----------|------|-------------| | `max_concurrent` | int | Maximum concurrent embedding requests (`embedding.max_concurrent`, default: `10`) | +| `max_retries` | int | Maximum retry attempts for transient embedding provider errors (`embedding.max_retries`, default: `3`; `0` disables retry) | | `provider` | str | `"volcengine"`, `"openai"`, `"vikingdb"`, `"jina"`, `"voyage"`, or `"gemini"` | | `api_key` | str | API key | | `model` | str | Model name | @@ -122,6 +124,8 @@ Embedding model configuration for vector search, supporting dense, sparse, and h | `input` | str | Input type: `"text"` or `"multimodal"` | | `batch_size` | int | Batch size for embedding requests | +`embedding.max_retries` only applies to transient errors such as `429`, `5xx`, timeouts, and connection failures. Permanent errors such as `400`, `401`, `403`, and `AccountOverdue` are not retried automatically. The backoff strategy is exponential backoff with jitter, starting at `0.5s` and capped at `8s`. + **Available Models** | Model | Dimension | Input Type | Notes | @@ -355,7 +359,8 @@ Vision Language Model for semantic extraction (L0/L1 generation). "vlm": { "api_key": "your-api-key", "model": "doubao-seed-2-0-pro-260215", - "api_base": "https://ark.cn-beijing.volces.com/api/v3" + "api_base": "https://ark.cn-beijing.volces.com/api/v3", + "max_retries": 3 } } ``` @@ -369,9 +374,12 @@ Vision Language Model for semantic extraction (L0/L1 generation). | `api_base` | str | API endpoint (optional) | | `thinking` | bool | Enable thinking mode for VolcEngine models (default: `false`) | | `max_concurrent` | int | Maximum concurrent semantic LLM calls (default: `100`) | +| `max_retries` | int | Maximum retry attempts for transient VLM provider errors (default: `3`; `0` disables retry) | | `extra_headers` | object | Custom HTTP headers (for OpenAI-compatible providers, optional) | | `stream` | bool | Enable streaming mode (for OpenAI-compatible providers, default: `false`) | +`vlm.max_retries` only applies to transient errors such as `429`, `5xx`, timeouts, and connection failures. Permanent authentication, authorization, and billing errors are not retried automatically. The backoff strategy is exponential backoff with jitter, starting at `0.5s` and capped at `8s`. 
+ **Available Models** | Model | Notes | @@ -956,6 +964,7 @@ For detailed encryption explanations, see [Data Encryption](../concepts/10-encry { "embedding": { "max_concurrent": 10, + "max_retries": 3, "dense": { "provider": "volcengine", "api_key": "string", @@ -971,6 +980,7 @@ For detailed encryption explanations, see [Data Encryption](../concepts/10-encry "api_base": "string", "thinking": false, "max_concurrent": 100, + "max_retries": 3, "extra_headers": {}, "stream": false }, @@ -1058,7 +1068,9 @@ Error: VLM request timeout - Check network connectivity - Increase timeout in config +- For intermittent timeouts, increase `vlm.max_retries` moderately - Try a smaller model +- For bulk ingestion, consider lowering `vlm.max_concurrent` ### Rate Limiting @@ -1067,6 +1079,8 @@ Error: Rate limit exceeded ``` Volcengine has rate limits. Consider batch processing with delays or upgrading your plan. +- Lower `embedding.max_concurrent` / `vlm.max_concurrent` first +- Keep a small `max_retries` value for occasional `429`s; set it to `0` if you prefer fail-fast behavior ## Related Documentation diff --git a/docs/zh/guides/01-configuration.md b/docs/zh/guides/01-configuration.md index 5c2396883..65b6040ba 100644 --- a/docs/zh/guides/01-configuration.md +++ b/docs/zh/guides/01-configuration.md @@ -104,6 +104,7 @@ OpenViking 使用 JSON 配置文件(`ov.conf`)进行设置。配置文件支 { "embedding": { "max_concurrent": 10, + "max_retries": 3, "dense": { "provider": "volcengine", "api_key": "your-api-key", @@ -121,6 +122,7 @@ OpenViking 使用 JSON 配置文件(`ov.conf`)进行设置。配置文件支 | 参数 | 类型 | 说明 | |------|------|------| | `max_concurrent` | int | 最大并发 Embedding 请求数(`embedding.max_concurrent`,默认:`10`) | +| `max_retries` | int | Embedding provider 瞬时错误的最大重试次数(`embedding.max_retries`,默认:`3`;`0` 表示禁用重试) | | `provider` | str | `"volcengine"`、`"openai"`、`"vikingdb"`、`"jina"`、`"voyage"`、`"minimax"` 或 `"gemini"` | | `api_key` | str | API Key | | `model` | str | 模型名称 | @@ -128,6 +130,8 @@ OpenViking 使用 JSON 配置文件(`ov.conf`)进行设置。配置文件支 | `input` | str | 输入类型:`"text"` 或 `"multimodal"` | | `batch_size` | int | 批量请求大小 | +`embedding.max_retries` 仅对瞬时错误生效,例如 `429`、`5xx`、超时和连接错误;`400`、`401`、`403`、`AccountOverdue` 这类永久错误不会自动重试。退避策略为指数退避,初始延迟 `0.5s`,上限 `8s`,并带随机抖动。 + **可用模型** | 模型 | 维度 | 输入类型 | 说明 | @@ -330,7 +334,8 @@ OpenViking 使用 JSON 配置文件(`ov.conf`)进行设置。配置文件支 "provider": "volcengine", "api_key": "your-api-key", "model": "doubao-seed-2-0-pro-260215", - "api_base": "https://ark.cn-beijing.volces.com/api/v3" + "api_base": "https://ark.cn-beijing.volces.com/api/v3", + "max_retries": 3 } } ``` @@ -344,9 +349,12 @@ OpenViking 使用 JSON 配置文件(`ov.conf`)进行设置。配置文件支 | `api_base` | str | API 端点(可选) | | `thinking` | bool | 启用思考模式(仅对部分火山模型生效,默认:`false`) | | `max_concurrent` | int | 语义处理阶段 LLM 最大并发调用数(默认:`100`) | +| `max_retries` | int | VLM provider 瞬时错误的最大重试次数(默认:`3`;`0` 表示禁用重试) | | `extra_headers` | object | 自定义 HTTP 请求头(OpenAI 兼容 provider 可用,可选) | | `stream` | bool | 启用流式模式(OpenAI 兼容 provider 可用,默认:`false`) | +`vlm.max_retries` 仅对瞬时错误生效,例如 `429`、`5xx`、超时和连接错误;认证、鉴权、欠费等永久错误不会自动重试。退避策略为指数退避,初始延迟 `0.5s`,上限 `8s`,并带随机抖动。 + **可用模型** | 模型 | 说明 | @@ -933,6 +941,7 @@ openviking --account acme --user alice --agent-id assistant-2 ls viking:// { "embedding": { "max_concurrent": 10, + "max_retries": 3, "dense": { "provider": "volcengine", "api_key": "string", @@ -948,6 +957,7 @@ openviking --account acme --user alice --agent-id assistant-2 ls viking:// "api_base": "string", "thinking": false, "max_concurrent": 100, + "max_retries": 3, "extra_headers": {}, "stream": false }, @@ 
-1035,7 +1045,9 @@ Error: VLM request timeout - 检查网络连接 - 增加配置中的超时时间 +- 对偶发超时,适当增大 `vlm.max_retries` - 尝试更小的模型 +- 如为批量导入场景,结合降低 `vlm.max_concurrent` ### 速率限制 @@ -1044,6 +1056,8 @@ Error: Rate limit exceeded ``` 火山引擎有速率限制。考虑批量处理时添加延迟或升级套餐。 +- 优先降低 `embedding.max_concurrent` / `vlm.max_concurrent` +- 对偶发 `429` 可保留少量 `max_retries`;若希望快速失败,可将其设为 `0` ## 相关文档