-
Notifications
You must be signed in to change notification settings - Fork 16
fix: add backend startup heartbeat liveness probe #114
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 12 commits
0f70fb0
068564a
011aeaa
c72be6e
d8f8200
8638ba9
cb423aa
b64664e
47f00a0
e43de52
4f4c5c9
15f582b
46e2f13
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,14 +1,22 @@ | ||
| from __future__ import annotations | ||
|
|
||
| import atexit | ||
| import ctypes | ||
| import json | ||
| import os | ||
| import runpy | ||
| import sys | ||
| import threading | ||
| import time | ||
| from pathlib import Path | ||
|
|
||
| BACKEND_DIR = Path(__file__).resolve().parent | ||
| APP_DIR = BACKEND_DIR / "app" | ||
| _WINDOWS_DLL_DIRECTORY_HANDLES: list[object] = [] | ||
| # Keep this in sync with BACKEND_STARTUP_HEARTBEAT_PATH_ENV in src-tauri/src/app_constants.rs. | ||
| STARTUP_HEARTBEAT_ENV = "ASTRBOT_BACKEND_STARTUP_HEARTBEAT_PATH" | ||
| STARTUP_HEARTBEAT_INTERVAL_SECONDS = 2.0 | ||
| STARTUP_HEARTBEAT_STOP_JOIN_TIMEOUT_SECONDS = 1.0 | ||
|
|
||
|
|
||
| def configure_stdio_utf8() -> None: | ||
|
|
@@ -113,15 +121,113 @@ def preload_windows_runtime_dlls() -> None: | |
| continue | ||
|
|
||
|
|
||
| configure_stdio_utf8() | ||
| configure_windows_dll_search_path() | ||
| preload_windows_runtime_dlls() | ||
| def resolve_startup_heartbeat_path() -> Path | None: | ||
| raw = os.environ.get(STARTUP_HEARTBEAT_ENV, "").strip() | ||
| if not raw: | ||
| return None | ||
| return Path(raw) | ||
|
|
||
|
|
||
| def build_heartbeat_payload(state: str) -> dict[str, object]: | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. issue (complexity): Consider simplifying the heartbeat implementation by inlining helpers, reducing state in the loop, and centralizing logging policy to make the code easier to follow. You can simplify the heartbeat implementation without changing behavior in any meaningful way for this feature. 1. Flatten
|
||
| return { | ||
| "pid": os.getpid(), | ||
| "state": state, | ||
| "updated_at_ms": int(time.time() * 1000), | ||
| } | ||
|
|
||
|
|
||
| def atomic_write_json(path: Path, payload: dict[str, object]) -> None: | ||
| temp_path = path.with_name(f"{path.name}.tmp") | ||
| temp_path.write_text( | ||
| json.dumps(payload, separators=(",", ":")), | ||
| encoding="utf-8", | ||
| ) | ||
| temp_path.replace(path) | ||
sourcery-ai[bot] marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
|
|
||
| def write_startup_heartbeat( | ||
| path: Path, state: str, *, warn_on_error: bool = False | ||
| ) -> bool: | ||
| try: | ||
| path.parent.mkdir(parents=True, exist_ok=True) | ||
| atomic_write_json(path, build_heartbeat_payload(state)) | ||
| return True | ||
| except Exception as exc: | ||
| if warn_on_error: | ||
| print( | ||
| f"[startup-heartbeat] failed to write heartbeat to {path}: {exc.__class__.__name__}: {exc}", | ||
| file=sys.stderr, | ||
| ) | ||
| return False | ||
|
|
||
|
|
||
| def heartbeat_loop( | ||
sourcery-ai[bot] marked this conversation as resolved.
Show resolved
Hide resolved
sourcery-ai[bot] marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. issue (complexity): Consider simplifying the heartbeat warning logic and inlining the payload builder to make the heartbeat flow easier to follow and maintain. You can reduce complexity in two small, targeted ways without changing the overall behavior (periodic atomic JSON writes + “stopping” heartbeat). 1. Simplify heartbeat loop warning logicThe current For example, throttle warnings to at most once every 10 seconds: WARNING_THROTTLE_SECONDS = 10.0
def heartbeat_loop(
path: Path, interval_seconds: float, stop_event: threading.Event
) -> None:
last_warning_time: float | None = None
while not stop_event.wait(interval_seconds):
now = time.time()
warn_now = (
last_warning_time is None
or (now - last_warning_time) >= WARNING_THROTTLE_SECONDS
)
ok = write_startup_heartbeat(path, "starting", warn_on_error=warn_now)
if warn_now and not ok:
last_warning_time = nowThis keeps log volume bounded while removing 2. Inline
|
||
| path: Path, interval_seconds: float, stop_event: threading.Event | ||
| ) -> None: | ||
| # At least one successful write has happened. | ||
| had_successful_write = False | ||
| # A warning has already been emitted since the last successful write. | ||
| warning_emitted_since_last_success = False | ||
|
|
||
| def should_warn() -> bool: | ||
| # Before the first successful heartbeat we want every failure to surface so startup | ||
| # path/permission issues stay visible. After a success, only warn on the first failure in | ||
| # each consecutive failure run to avoid log spam. | ||
| return (not had_successful_write) or (not warning_emitted_since_last_success) | ||
|
|
||
| ok = write_startup_heartbeat(path, "starting", warn_on_error=True) | ||
| if ok: | ||
| had_successful_write = True | ||
| else: | ||
| warning_emitted_since_last_success = True | ||
|
|
||
| while not stop_event.wait(interval_seconds): | ||
| warn_now = should_warn() | ||
| ok = write_startup_heartbeat(path, "starting", warn_on_error=warn_now) | ||
| if ok: | ||
| had_successful_write = True | ||
| warning_emitted_since_last_success = False | ||
| elif warn_now: | ||
| warning_emitted_since_last_success = True | ||
|
|
||
|
|
||
| def start_startup_heartbeat() -> None: | ||
sourcery-ai[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| heartbeat_path = resolve_startup_heartbeat_path() | ||
| if heartbeat_path is None: | ||
| return | ||
|
|
||
| stop_event = threading.Event() | ||
| thread = threading.Thread( | ||
| target=heartbeat_loop, | ||
| args=(heartbeat_path, STARTUP_HEARTBEAT_INTERVAL_SECONDS, stop_event), | ||
| name="astrbot-startup-heartbeat", | ||
| daemon=True, | ||
| ) | ||
|
|
||
| def on_exit() -> None: | ||
| stop_event.set() | ||
| thread.join(timeout=STARTUP_HEARTBEAT_STOP_JOIN_TIMEOUT_SECONDS) | ||
| write_startup_heartbeat(heartbeat_path, "stopping", warn_on_error=True) | ||
|
|
||
| atexit.register(on_exit) | ||
| thread.start() | ||
sourcery-ai[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
|
|
||
| def main() -> None: | ||
| configure_stdio_utf8() | ||
| configure_windows_dll_search_path() | ||
| preload_windows_runtime_dlls() | ||
| start_startup_heartbeat() | ||
|
|
||
| sys.path.insert(0, str(APP_DIR)) | ||
|
|
||
| main_file = APP_DIR / "main.py" | ||
| if not main_file.is_file(): | ||
| raise FileNotFoundError(f"Backend entrypoint not found: {main_file}") | ||
|
|
||
| sys.path.insert(0, str(APP_DIR)) | ||
| sys.argv[0] = str(main_file) | ||
| runpy.run_path(str(main_file), run_name="__main__") | ||
|
|
||
| main_file = APP_DIR / "main.py" | ||
| if not main_file.is_file(): | ||
| raise FileNotFoundError(f"Backend entrypoint not found: {main_file}") | ||
|
|
||
| sys.argv[0] = str(main_file) | ||
| runpy.run_path(str(main_file), run_name="__main__") | ||
| if __name__ == "__main__": | ||
| main() | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,91 @@ | ||
| import importlib.util | ||
| import unittest | ||
| from pathlib import Path | ||
| from unittest import mock | ||
|
|
||
|
|
||
| MODULE_PATH = Path(__file__).with_name("launch_backend.py") | ||
| SPEC = importlib.util.spec_from_file_location("launch_backend_under_test", MODULE_PATH) | ||
| if SPEC is None or SPEC.loader is None: | ||
| raise RuntimeError(f"Cannot load launch_backend module from {MODULE_PATH}") | ||
| launch_backend = importlib.util.module_from_spec(SPEC) | ||
| SPEC.loader.exec_module(launch_backend) | ||
|
|
||
|
|
||
| class StartupHeartbeatTests(unittest.TestCase): | ||
| def test_repeated_failures_warn_before_first_success(self) -> None: | ||
| stop_event = mock.Mock() | ||
| stop_event.wait.side_effect = [False, True] | ||
|
|
||
| with mock.patch.object( | ||
| launch_backend, | ||
| "write_startup_heartbeat", | ||
| side_effect=[False, False], | ||
| ) as write_mock: | ||
| launch_backend.heartbeat_loop(Path("/tmp/heartbeat.json"), 2.0, stop_event) | ||
|
|
||
| self.assertEqual( | ||
| [call.kwargs["warn_on_error"] for call in write_mock.call_args_list], | ||
| [True, True], | ||
| ) | ||
|
|
||
| def test_repeated_failures_after_success_are_suppressed(self) -> None: | ||
| stop_event = mock.Mock() | ||
| stop_event.wait.side_effect = [False, False, True] | ||
|
|
||
| with mock.patch.object( | ||
| launch_backend, | ||
| "write_startup_heartbeat", | ||
| side_effect=[True, False, False], | ||
| ) as write_mock: | ||
| launch_backend.heartbeat_loop(Path("/tmp/heartbeat.json"), 2.0, stop_event) | ||
|
|
||
| self.assertEqual( | ||
| [call.kwargs["warn_on_error"] for call in write_mock.call_args_list], | ||
| [True, True, False], | ||
| ) | ||
|
|
||
| def test_stop_failure_still_warns_after_earlier_failure(self) -> None: | ||
| stop_event = mock.Mock() | ||
| thread = mock.Mock() | ||
| register = mock.Mock() | ||
|
|
||
| with mock.patch.object( | ||
| launch_backend, | ||
| "write_startup_heartbeat", | ||
| return_value=False, | ||
| ) as write_mock: | ||
| with mock.patch.object( | ||
| launch_backend, | ||
| "resolve_startup_heartbeat_path", | ||
| return_value=Path("/tmp/heartbeat.json"), | ||
| ): | ||
| with mock.patch.object( | ||
| launch_backend.threading, "Event", return_value=stop_event | ||
| ): | ||
| with mock.patch.object( | ||
| launch_backend.threading, "Thread", return_value=thread | ||
| ): | ||
| with mock.patch.object( | ||
| launch_backend.atexit, "register", register | ||
| ): | ||
| launch_backend.start_startup_heartbeat() | ||
| thread.join.assert_not_called() | ||
| on_exit = register.call_args.args[0] | ||
| on_exit() | ||
|
|
||
| thread.join.assert_called_once_with( | ||
| timeout=launch_backend.STARTUP_HEARTBEAT_STOP_JOIN_TIMEOUT_SECONDS | ||
| ) | ||
| self.assertEqual( | ||
| [call.args[1] for call in write_mock.call_args_list], | ||
| ["stopping"], | ||
| ) | ||
| self.assertEqual( | ||
| [call.kwargs["warn_on_error"] for call in write_mock.call_args_list], | ||
| [True], | ||
| ) | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| unittest.main() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
issue (complexity): Consider inlining the heartbeat JSON construction and atomic write logic directly into write_startup_heartbeat to avoid unnecessary helper indirection.
You can reduce indirection without changing behavior by inlining the tightly-coupled helpers into
write_startup_heartbeat.Right now:
These helpers are only used together in a single call path and don’t provide reuse elsewhere, so they add mental hops without buying much.
You can keep the atomic write semantics and JSON format but collapse this into a single function:
Then you can remove
build_heartbeat_payloadandatomic_write_jsonentirely. This makes the heartbeat write path self-contained and easier to scan, while preserving all existing behavior (atomic replace, payload shape, logging).