Skip to content

Commit 8271ef3

Browse files
authored
fix(security): configurable embedding circuit breaker & log suppression (volcengine#1277)
1 parent 49977f8 commit 8271ef3

File tree

8 files changed

+234
-8
lines changed

8 files changed

+234
-8
lines changed

docs/en/guides/01-configuration.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,28 @@ Embedding model configuration for vector search, supporting dense, sparse, and h
130130

131131
`embedding.max_retries` only applies to transient errors such as `429`, `5xx`, timeouts, and connection failures. Permanent errors such as `400`, `401`, `403`, and `AccountOverdue` are not retried automatically. The backoff strategy is exponential backoff with jitter, starting at `0.5s` and capped at `8s`.
132132

133+
#### Embedding Circuit Breaker
134+
135+
When the embedding provider returns consecutive transient failures (e.g. `429`, `5xx`), OpenViking opens a circuit breaker: it temporarily stops calling the provider and re-enqueues embedding tasks instead. A permanent error opens the breaker immediately. Once `reset_timeout` seconds have elapsed, the breaker enters HALF_OPEN and allows a probe request. If the probe fails, the next `reset_timeout` is doubled (capped by `max_reset_timeout`); if it succeeds, the breaker closes and the wait returns to the base `reset_timeout`.
136+
137+
```json
138+
{
139+
"embedding": {
140+
"circuit_breaker": {
141+
"failure_threshold": 5,
142+
"reset_timeout": 60,
143+
"max_reset_timeout": 600
144+
}
145+
}
146+
}
147+
```
148+
149+
| Parameter | Type | Description |
150+
|-----------|------|-------------|
151+
| `circuit_breaker.failure_threshold` | int | Consecutive failures required to open the breaker (default: `5`) |
152+
| `circuit_breaker.reset_timeout` | float | Base reset timeout in seconds (default: `60`) |
153+
| `circuit_breaker.max_reset_timeout` | float | Maximum reset timeout in seconds when backing off (default: `600`) |
154+
133155
**Available Models**
134156

135157
| Model | Dimension | Input Type | Notes |

docs/zh/guides/01-configuration.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,28 @@ OpenViking 使用 JSON 配置文件(`ov.conf`)进行设置。配置文件支
132132

133133
`embedding.max_retries` 仅对瞬时错误生效,例如 `429`、`5xx`、超时和连接错误;`400`、`401`、`403`、`AccountOverdue` 这类永久错误不会自动重试。退避策略为指数退避,初始延迟 `0.5s`,上限 `8s`,并带随机抖动。
134134

135+
#### Embedding 熔断(Circuit Breaker)
136+
137+
当 embedding provider 出现连续瞬时错误(如 `429`、`5xx`)时,OpenViking 会触发熔断,在一段时间内暂停调用 provider,并将 embedding 任务重新入队。超过基础 `reset_timeout` 后进入 HALF_OPEN,允许一次探测请求;如果探测失败,则下一次 `reset_timeout` 翻倍(上限为 `max_reset_timeout`)。
138+
139+
```json
140+
{
141+
"embedding": {
142+
"circuit_breaker": {
143+
"failure_threshold": 5,
144+
"reset_timeout": 60,
145+
"max_reset_timeout": 600
146+
}
147+
}
148+
}
149+
```
150+
151+
| 参数 | 类型 | 说明 |
152+
|------|------|------|
153+
| `circuit_breaker.failure_threshold` | int | 连续失败多少次后熔断(默认:`5`) |
154+
| `circuit_breaker.reset_timeout` | float | 基础恢复等待时间(秒,默认:`60`) |
155+
| `circuit_breaker.max_reset_timeout` | float | 指数退避后的最大恢复等待时间(秒,默认:`600`) |
156+
135157
**可用模型**
136158

137159
| 模型 | 维度 | 输入类型 | 说明 |

examples/ov.conf.example

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@
4343
"dimension": 1024,
4444
"provider": "volcengine",
4545
"input": "multimodal"
46+
},
47+
"circuit_breaker": {
48+
"failure_threshold": 5,
49+
"reset_timeout": 60,
50+
"max_reset_timeout": 600
4651
}
4752
},
4853
"embedding_ollama_example": {

openviking/storage/collection_schemas.py

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import hashlib
1212
import json
1313
import threading
14+
import time
1415
from contextlib import nullcontext
1516
from dataclasses import dataclass
1617
from typing import Any, Dict, List, Optional
@@ -175,12 +176,35 @@ def __init__(self, vikingdb: VikingVectorIndexBackend):
175176
self._collection_name = config.storage.vectordb.name
176177
self._vector_dim = config.embedding.dimension
177178
self._initialize_embedder(config)
178-
self._circuit_breaker = CircuitBreaker()
179+
breaker_cfg = config.embedding.circuit_breaker
180+
self._circuit_breaker = CircuitBreaker(
181+
failure_threshold=breaker_cfg.failure_threshold,
182+
reset_timeout=breaker_cfg.reset_timeout,
183+
max_reset_timeout=breaker_cfg.max_reset_timeout,
184+
)
185+
self._breaker_open_last_log_at = 0.0
186+
self._breaker_open_suppressed_count = 0
187+
self._breaker_open_log_interval = 30.0
179188

180189
def _initialize_embedder(self, config: "OpenVikingConfig"):
181190
"""Initialize the embedder instance from config."""
182191
self._embedder = config.embedding.get_embedder()
183192

193+
def _log_breaker_open_reenqueue_summary(self) -> None:
    """Log a throttled warning when embeddings are re-enqueued due to an open circuit breaker.

    The first call of an open-breaker episode logs immediately. Every later
    call is counted, and at most one warning is emitted per
    ``self._breaker_open_log_interval`` seconds; that warning reports how many
    re-enqueues happened since the previous one so the throttled volume is
    still visible in the logs.

    ``self._breaker_open_last_log_at == 0.0`` is the "nothing logged yet in
    this episode" sentinel.
    """
    now = time.monotonic()
    if self._breaker_open_last_log_at == 0.0:
        # First re-enqueue of this episode: log right away, unthrottled.
        logger.warning("Embedding circuit breaker is open; re-enqueueing messages")
        self._breaker_open_last_log_at = now
        self._breaker_open_suppressed_count = 0
        return

    # Count this call; it is either swallowed or folded into the summary below.
    self._breaker_open_suppressed_count += 1
    if now - self._breaker_open_last_log_at < self._breaker_open_log_interval:
        return

    # The counter includes the current call, so it is the number of
    # re-enqueues since the previous warning.
    logger.warning(
        "Embedding circuit breaker is open; re-enqueueing messages "
        "(%d re-enqueued since last log)",
        self._breaker_open_suppressed_count,
    )
    self._breaker_open_last_log_at = now
    self._breaker_open_suppressed_count = 0
207+
184208
@classmethod
185209
def _merge_request_stats(
186210
cls, telemetry_id: str, processed: int = 0, error_count: int = 0
@@ -258,10 +282,10 @@ async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str,
258282
# Circuit breaker: if API is known-broken, re-enqueue and wait
259283
try:
260284
self._circuit_breaker.check()
285+
self._breaker_open_last_log_at = 0.0
286+
self._breaker_open_suppressed_count = 0
261287
except CircuitBreakerOpen:
262-
logger.warning(
263-
f"Circuit breaker is open, re-enqueueing embedding: {embedding_msg.id}"
264-
)
288+
self._log_breaker_open_reenqueue_summary()
265289
if self._vikingdb.has_queue_manager:
266290
wait = self._circuit_breaker.retry_after
267291
if wait > 0:

openviking/utils/circuit_breaker.py

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,17 @@ class CircuitBreaker:
3333
fails, the breaker reopens.
3434
"""
3535

36-
def __init__(self, failure_threshold: int = 5, reset_timeout: float = 300):
36+
def __init__(
37+
self,
38+
failure_threshold: int = 5,
39+
reset_timeout: float = 300,
40+
max_reset_timeout: float | None = None,
41+
):
3742
self._failure_threshold = failure_threshold
3843
self._reset_timeout = reset_timeout
44+
self._base_reset_timeout = reset_timeout
45+
self._max_reset_timeout = reset_timeout if max_reset_timeout is None else max_reset_timeout
46+
self._current_reset_timeout = reset_timeout
3947
self._lock = threading.Lock()
4048
self._state = _STATE_CLOSED
4149
self._failure_count = 0
@@ -50,12 +58,12 @@ def check(self) -> None:
5058
return # allow probe request
5159
# OPEN — check if timeout elapsed
5260
elapsed = time.monotonic() - self._last_failure_time
53-
if elapsed >= self._reset_timeout:
61+
if elapsed >= self._current_reset_timeout:
5462
self._state = _STATE_HALF_OPEN
5563
logger.info("Circuit breaker transitioning OPEN -> HALF_OPEN (timeout elapsed)")
5664
return
5765
raise CircuitBreakerOpen(
58-
f"Circuit breaker is OPEN, retry after {self._reset_timeout - elapsed:.0f}s"
66+
f"Circuit breaker is OPEN, retry after {self._current_reset_timeout - elapsed:.0f}s"
5967
)
6068

6169
@property
@@ -67,7 +75,7 @@ def retry_after(self) -> float:
6775
with self._lock:
6876
if self._state != _STATE_OPEN:
6977
return 0
70-
remaining = self._reset_timeout - (time.monotonic() - self._last_failure_time)
78+
remaining = self._current_reset_timeout - (time.monotonic() - self._last_failure_time)
7179
return min(max(remaining, 0), 30)
7280

7381
def record_success(self) -> None:
@@ -77,6 +85,7 @@ def record_success(self) -> None:
7785
logger.info("Circuit breaker transitioning HALF_OPEN -> CLOSED (probe succeeded)")
7886
self._failure_count = 0
7987
self._state = _STATE_CLOSED
88+
self._current_reset_timeout = self._base_reset_timeout
8089

8190
def record_failure(self, error: Exception) -> None:
8291
"""Record a failed API call. May trip the breaker."""
@@ -87,18 +96,24 @@ def record_failure(self, error: Exception) -> None:
8796

8897
if self._state == _STATE_HALF_OPEN:
8998
self._state = _STATE_OPEN
99+
self._current_reset_timeout = min(
100+
self._current_reset_timeout * 2,
101+
self._max_reset_timeout,
102+
)
90103
logger.info(
91104
f"Circuit breaker transitioning HALF_OPEN -> OPEN (probe failed: {error})"
92105
)
93106
return
94107

95108
if error_class == "permanent":
96109
self._state = _STATE_OPEN
110+
self._current_reset_timeout = self._base_reset_timeout
97111
logger.info(f"Circuit breaker tripped immediately on permanent error: {error}")
98112
return
99113

100114
if self._failure_count >= self._failure_threshold:
101115
self._state = _STATE_OPEN
116+
self._current_reset_timeout = self._base_reset_timeout
102117
logger.info(
103118
f"Circuit breaker tripped after {self._failure_count} consecutive "
104119
f"failures: {error}"

openviking_cli/utils/config/embedding_config.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,30 @@ def get_effective_dimension(self) -> int:
246246
return 2048
247247

248248

249+
class EmbeddingCircuitBreakerConfig(BaseModel):
    # Tunables for the embedding circuit breaker: how many consecutive failures
    # open it, the base wait before a probe, and the cap on that wait once it
    # has been backed off.
    failure_threshold: int = Field(
        description="Consecutive failures required to open the embedding circuit breaker",
        default=5,
        ge=1,
    )
    reset_timeout: float = Field(
        description="Base circuit breaker reset timeout in seconds",
        default=60.0,
        gt=0,
    )
    max_reset_timeout: float = Field(
        description="Maximum circuit breaker reset timeout in seconds",
        default=600.0,
        gt=0,
    )

    @model_validator(mode="after")
    def validate_bounds(self):
        """Reject a backoff cap that is smaller than the base reset timeout."""
        base_timeout = self.reset_timeout
        if base_timeout > self.max_reset_timeout:
            raise ValueError("embedding.circuit_breaker.max_reset_timeout must be >= reset_timeout")
        return self
271+
272+
249273
class EmbeddingConfig(BaseModel):
250274
"""
251275
Embedding configuration, supports OpenAI, VolcEngine, VikingDB, Jina, Gemini, Voyage, or LiteLLM APIs.
@@ -261,6 +285,9 @@ class EmbeddingConfig(BaseModel):
261285
dense: Optional[EmbeddingModelConfig] = Field(default=None)
262286
sparse: Optional[EmbeddingModelConfig] = Field(default=None)
263287
hybrid: Optional[EmbeddingModelConfig] = Field(default=None)
288+
circuit_breaker: EmbeddingCircuitBreakerConfig = Field(
289+
default_factory=EmbeddingCircuitBreakerConfig
290+
)
264291

265292
max_concurrent: int = Field(
266293
default=10, description="Maximum number of concurrent embedding requests"

tests/storage/test_collection_schemas.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import asyncio
55
import inspect
66
import json
7+
import logging
78
from types import SimpleNamespace
89

910
import pytest
@@ -34,6 +35,11 @@ def __init__(self, embedder: _DummyEmbedder, backend: str = "volcengine"):
3435
self.embedding = SimpleNamespace(
3536
dimension=2,
3637
get_embedder=lambda: embedder,
38+
circuit_breaker=SimpleNamespace(
39+
failure_threshold=5,
40+
reset_timeout=60.0,
41+
max_reset_timeout=600.0,
42+
),
3743
)
3844

3945

@@ -50,6 +56,29 @@ def _build_queue_payload() -> dict:
5056
return {"data": json.dumps(msg.to_dict())}
5157

5258

59+
def test_embedding_handler_builds_circuit_breaker_from_config(monkeypatch):
    """The handler must pass the configured breaker settings through to CircuitBreaker."""

    class _DummyVikingDB:
        is_closing = False

    config = _DummyConfig(_DummyEmbedder())
    # Non-default threshold so a hard-coded default in the handler would be caught.
    config.embedding.circuit_breaker = SimpleNamespace(
        failure_threshold=7,
        reset_timeout=60.0,
        max_reset_timeout=600.0,
    )
    monkeypatch.setattr(
        "openviking_cli.utils.config.get_openviking_config",
        lambda: config,
    )

    breaker = TextEmbeddingHandler(_DummyVikingDB())._circuit_breaker

    observed = (
        breaker._failure_threshold,
        breaker._base_reset_timeout,
        breaker._max_reset_timeout,
    )
    assert observed == (7, 60.0, 600.0)
80+
81+
5382
@pytest.mark.asyncio
5483
async def test_embedding_handler_skip_all_work_when_manager_is_closing(monkeypatch):
5584
class _ClosingVikingDB:
@@ -79,6 +108,51 @@ async def upsert(self, _data, *, ctx): # pragma: no cover - should never run
79108
assert status["error"] == 0
80109

81110

111+
@pytest.mark.asyncio
async def test_embedding_handler_open_breaker_logs_summary_instead_of_per_item_warning(
    monkeypatch, caplog
):
    """Two dequeues against an open breaker emit one throttled warning, not one per item."""
    from openviking.utils.circuit_breaker import CircuitBreakerOpen

    class _QueueingVikingDB:
        is_closing = False
        has_queue_manager = True

        def __init__(self):
            self.enqueued = []

        async def enqueue_embedding_msg(self, msg):
            self.enqueued.append(msg.id)
            return None

    embedder = _DummyEmbedder()
    monkeypatch.setattr(
        "openviking_cli.utils.config.get_openviking_config",
        lambda: _DummyConfig(embedder),
    )

    handler = TextEmbeddingHandler(_QueueingVikingDB())
    # Force every breaker check to report OPEN; the generator-throw trick lets
    # a lambda raise.
    monkeypatch.setattr(
        handler._circuit_breaker,
        "check",
        lambda: (_ for _ in ()).throw(CircuitBreakerOpen("open")),
    )

    import openviking.storage.collection_schemas as collection_schemas

    # Attach caplog's handler directly to the module logger and remember the
    # logger's level so this test does not leak logging state into later tests.
    module_logger = collection_schemas.logger
    previous_level = module_logger.level
    module_logger.addHandler(caplog.handler)
    module_logger.setLevel(logging.WARNING)
    try:
        with caplog.at_level(logging.WARNING):
            await handler.on_dequeue(_build_queue_payload())
            await handler.on_dequeue(_build_queue_payload())
    finally:
        module_logger.removeHandler(caplog.handler)
        module_logger.setLevel(previous_level)

    warnings = [record.message for record in caplog.records if record.levelno == logging.WARNING]
    assert warnings.count("Embedding circuit breaker is open; re-enqueueing messages") == 1
154+
155+
82156
@pytest.mark.asyncio
83157
async def test_embedding_handler_treats_shutdown_write_lock_as_success(monkeypatch):
84158
class _ClosingDuringUpsertVikingDB:

tests/utils/test_circuit_breaker.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,43 @@ def test_circuit_breaker_half_open_failure_reopens(monkeypatch):
7474
cb.check()
7575

7676

77+
def test_half_open_failure_doubles_reset_timeout(monkeypatch):
    """A failed HALF_OPEN probe doubles the wait before the next probe is allowed."""
    from openviking.utils.circuit_breaker import CircuitBreaker, CircuitBreakerOpen

    start = time.monotonic()

    def _set_clock(offset):
        # Pin time.monotonic() to a fixed instant relative to the test start.
        monkeypatch.setattr(time, "monotonic", lambda: start + offset)

    breaker = CircuitBreaker(failure_threshold=1, reset_timeout=60, max_reset_timeout=240)
    breaker.record_failure(RuntimeError("429 TooManyRequests"))

    # Base timeout (60s) has elapsed: the probe is allowed, then fails.
    _set_clock(61)
    breaker.check()
    breaker.record_failure(RuntimeError("429 TooManyRequests"))

    assert breaker._current_reset_timeout == 120

    # 119s after the failed probe is still inside the doubled 120s window.
    _set_clock(61 + 119)
    with pytest.raises(CircuitBreakerOpen):
        breaker.check()
93+
94+
95+
def test_half_open_success_resets_backoff(monkeypatch):
    """A successful probe after a backed-off wait restores the base reset timeout."""
    from openviking.utils.circuit_breaker import CircuitBreaker

    start = time.monotonic()

    def _set_clock(offset):
        # Pin time.monotonic() to a fixed instant relative to the test start.
        monkeypatch.setattr(time, "monotonic", lambda: start + offset)

    breaker = CircuitBreaker(failure_threshold=1, reset_timeout=60, max_reset_timeout=240)
    breaker.record_failure(RuntimeError("500"))

    # First probe fails, so the wait backs off from 60s to 120s.
    _set_clock(61)
    breaker.check()
    breaker.record_failure(RuntimeError("500 again"))
    assert breaker._current_reset_timeout == 120

    # Past the doubled window: the probe is allowed and succeeds.
    _set_clock(61 + 121)
    breaker.check()
    breaker.record_success()

    assert breaker._current_reset_timeout == 60
112+
113+
77114
def test_permanent_error_trips_immediately():
78115
from openviking.utils.circuit_breaker import CircuitBreaker, CircuitBreakerOpen
79116

0 commit comments

Comments
 (0)