Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions services-preview/markets-service/docs/ORDER_BOOK_AUDIT.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,19 @@ if last_update_id < prev_id:
| `order_book_full_written` | full 写入行数 |
| `order_book_write_errors` | 写入错误次数 |
| `order_book_out_of_order` | 乱序跳过次数 |
| `delay_avg/max` | 数据延迟 (ms) |

### 3.4 健康检查

- **延迟监控**: `receipt_ts - event_ts`,超 5s 告警
- **心跳检测**: 30s 无数据告警
- **统计日志**: 每 60s 输出

```
INFO 统计: received=12000, tick=4000, full=800, errors=0, oos=0, delay_avg=50ms, delay_max=200ms
WARN 心跳超时: 35s 无数据
WARN 高延迟: BTCUSDT delay=6000ms
```

---

Expand Down Expand Up @@ -175,3 +188,4 @@ ORDER_BOOK_SYMBOLS= # 可选,逗号分隔
| 2026-01-09 | 修复: 原始格式 `[["price","qty"],...]` |
| 2026-01-09 | 修复: lastUpdateId 采集、乱序检测 |
| 2026-01-09 | 增强: 错误日志、统计指标 |
| 2026-01-09 | 增强: 延迟监控、心跳检测 |
Original file line number Diff line number Diff line change
Expand Up @@ -271,13 +271,24 @@ async def _on_book(self, book, receipt_ts: float) -> None:
return

now = time.time()
self._last_msg_time = now # 心跳更新

ts = datetime.fromtimestamp(book.timestamp, tz=timezone.utc)
bids_dict = book.book.bids.to_dict()
asks_dict = book.book.asks.to_dict()

# 提取原始元数据 (cryptofeed: sequence_number = lastUpdateId)
last_update_id = getattr(book, 'sequence_number', None)

# 延迟监控: receipt_ts - event_ts
delay_ms = int((receipt_ts - book.timestamp) * 1000)
if delay_ms > 0:
self._stats["total_delay_ms"] += delay_ms
if delay_ms > self._stats["max_delay_ms"]:
self._stats["max_delay_ms"] = delay_ms
if delay_ms > 5000: # 延迟超过 5 秒告警
logger.warning("高延迟: %s delay=%dms", sym, delay_ms)

self._stats["received"] += 1

# 乱序检测: 如果 lastUpdateId 倒退则跳过
Expand Down Expand Up @@ -473,10 +484,15 @@ async def _stats_reporter():
while True:
await asyncio.sleep(60)
s = self._stats
avg_delay = s["total_delay_ms"] // max(s["received"], 1)
# 心跳检测
idle_sec = int(time.time() - self._last_msg_time) if self._last_msg_time > 0 else 0
if idle_sec > 30:
logger.warning("心跳超时: %ds 无数据", idle_sec)
logger.info(
"统计: received=%d, tick=%d, full=%d, errors=%d, out_of_order=%d",
"统计: received=%d, tick=%d, full=%d, errors=%d, oos=%d, delay_avg=%dms, delay_max=%dms",
s["received"], s["written_tick"], s["written_full"],
s["errors"], s["out_of_order"]
s["errors"], s["out_of_order"], avg_delay, s["max_delay_ms"]
)

try:
Expand All @@ -492,10 +508,11 @@ async def _stats_reporter():
def _log_final_stats(self) -> None:
"""输出最终统计"""
s = self._stats
avg_delay = s["total_delay_ms"] // max(s["received"], 1)
logger.info(
"采集结束统计: received=%d, tick=%d, full=%d, errors=%d, out_of_order=%d",
"采集结束: received=%d, tick=%d, full=%d, errors=%d, oos=%d, delay_avg=%dms, delay_max=%dms",
s["received"], s["written_tick"], s["written_full"],
s["errors"], s["out_of_order"]
s["errors"], s["out_of_order"], avg_delay, s["max_delay_ms"]
)

async def _final_flush(self) -> None:
Expand Down
Loading