diff --git a/services-preview/markets-service/docs/ORDER_BOOK_AUDIT.md b/services-preview/markets-service/docs/ORDER_BOOK_AUDIT.md
index c46445da..7e501db7 100644
--- a/services-preview/markets-service/docs/ORDER_BOOK_AUDIT.md
+++ b/services-preview/markets-service/docs/ORDER_BOOK_AUDIT.md
@@ -93,6 +93,19 @@ if last_update_id < prev_id:
 | `order_book_full_written` | full 写入行数 |
 | `order_book_write_errors` | 写入错误次数 |
 | `order_book_out_of_order` | 乱序跳过次数 |
+| `delay_avg/max` | 数据延迟 (ms) |
+
+### 3.4 健康检查
+
+- **延迟监控**: `receipt_ts - event_ts`,超 5s 告警
+- **心跳检测**: 30s 无数据告警
+- **统计日志**: 每 60s 输出
+
+```
+INFO 统计: received=12000, tick=4000, full=800, errors=0, oos=0, delay_avg=50ms, delay_max=200ms
+WARN 心跳超时: 35s 无数据
+WARN 高延迟: BTCUSDT delay=6000ms
+```
 
 ---
 
@@ -175,3 +188,4 @@ ORDER_BOOK_SYMBOLS= # 可选,逗号分隔
 | 2026-01-09 | 修复: 原始格式 `[["price","qty"],...]` |
 | 2026-01-09 | 修复: lastUpdateId 采集、乱序检测 |
 | 2026-01-09 | 增强: 错误日志、统计指标 |
+| 2026-01-09 | 增强: 延迟监控、心跳检测 |
diff --git a/services-preview/markets-service/src/crypto/collectors/order_book.py b/services-preview/markets-service/src/crypto/collectors/order_book.py
index 42c48dc4..a375a5cc 100644
--- a/services-preview/markets-service/src/crypto/collectors/order_book.py
+++ b/services-preview/markets-service/src/crypto/collectors/order_book.py
@@ -271,6 +271,8 @@ async def _on_book(self, book, receipt_ts: float) -> None:
             return
 
         now = time.time()
+        self._last_msg_time = now # 心跳更新
+
         ts = datetime.fromtimestamp(book.timestamp, tz=timezone.utc)
         bids_dict = book.book.bids.to_dict()
         asks_dict = book.book.asks.to_dict()
@@ -278,6 +280,15 @@ async def _on_book(self, book, receipt_ts: float) -> None:
         # 提取原始元数据 (cryptofeed: sequence_number = lastUpdateId)
         last_update_id = getattr(book, 'sequence_number', None)
 
+        # 延迟监控: receipt_ts - event_ts
+        delay_ms = int((receipt_ts - book.timestamp) * 1000)
+        if delay_ms > 0:
+            self._stats["total_delay_ms"] += delay_ms
+            if delay_ms > self._stats["max_delay_ms"]:
+                self._stats["max_delay_ms"] = delay_ms
+            if delay_ms > 5000: # 延迟超过 5 秒告警
+                logger.warning("高延迟: %s delay=%dms", sym, delay_ms)
+
         self._stats["received"] += 1
 
         # 乱序检测: 如果 lastUpdateId 倒退则跳过
@@ -473,10 +484,15 @@ async def _stats_reporter():
             while True:
                 await asyncio.sleep(60)
                 s = self._stats
+                avg_delay = s["total_delay_ms"] // max(s["received"], 1)
+                # 心跳检测
+                idle_sec = int(time.time() - self._last_msg_time) if self._last_msg_time > 0 else 0
+                if idle_sec > 30:
+                    logger.warning("心跳超时: %ds 无数据", idle_sec)
                 logger.info(
-                    "统计: received=%d, tick=%d, full=%d, errors=%d, out_of_order=%d",
+                    "统计: received=%d, tick=%d, full=%d, errors=%d, oos=%d, delay_avg=%dms, delay_max=%dms",
                     s["received"], s["written_tick"], s["written_full"],
-                    s["errors"], s["out_of_order"]
+                    s["errors"], s["out_of_order"], avg_delay, s["max_delay_ms"]
                 )
 
         try:
@@ -492,10 +508,11 @@ async def _stats_reporter():
     def _log_final_stats(self) -> None:
         """输出最终统计"""
         s = self._stats
+        avg_delay = s["total_delay_ms"] // max(s["received"], 1)
         logger.info(
-            "采集结束统计: received=%d, tick=%d, full=%d, errors=%d, out_of_order=%d",
+            "采集结束: received=%d, tick=%d, full=%d, errors=%d, oos=%d, delay_avg=%dms, delay_max=%dms",
             s["received"], s["written_tick"], s["written_full"],
-            s["errors"], s["out_of_order"]
+            s["errors"], s["out_of_order"], avg_delay, s["max_delay_ms"]
         )
 
     async def _final_flush(self) -> None: