-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcato_svc_runner.py
More file actions
58 lines (47 loc) · 1.93 KB
/
cato_svc_runner.py
File metadata and controls
58 lines (47 loc) · 1.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
"""Minimal runner script for the Cato daemon — used by Task Scheduler / NSSM."""
import io
import logging
import os
import sys
from pathlib import Path
# Run from the Cato checkout and put it first on the import path.
_CATO_ROOT = r"C:\Users\Administrator\Desktop\Cato"
os.chdir(_CATO_ROOT)
sys.path.insert(0, _CATO_ROOT)

# Per-user data directory: %APPDATA%\cato when APPDATA is defined, otherwise
# a "cato" folder under the home directory. Created on first run.
_appdata = os.environ.get("APPDATA")
_data_root = Path(_appdata) if _appdata is not None else Path.home()
_DATA_DIR = _data_root / "cato"
_DATA_DIR.mkdir(parents=True, exist_ok=True)

# Log file written by this runner (logging output and redirected stderr).
_DAEMON_LOG = _DATA_DIR.joinpath("daemon_runner.log")
# Hidden/background launches on Windows can have no real stdout/stderr:
# the stream may be None or already closed. Swap in a usable file object in
# either case so code that writes to these streams has something to write to.
if sys.stdout is None or getattr(sys.stdout, "closed", False):
    # stdout is simply discarded.
    sys.stdout = open(os.devnull, "w", encoding="utf-8")  # type: ignore[assignment]
if sys.stderr is None or getattr(sys.stderr, "closed", False):
    # stderr is appended to the runner log.
    sys.stderr = open(_DAEMON_LOG, "a", encoding="utf-8")  # type: ignore[assignment]

# Root logger: INFO and above go to the runner log file and to stdout.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
    handlers=[
        logging.FileHandler(_DAEMON_LOG, encoding="utf-8"),
        logging.StreamHandler(sys.stdout),
    ],
)
_runner_log = logging.getLogger("cato_svc_runner")

# Optional .env support. Only the import is guarded, so an error raised while
# actually loading the file is not mistaken for "python-dotenv is missing".
try:
    from dotenv import load_dotenv
except ImportError:
    _runner_log.info("python-dotenv is not installed; skipping .env loading")
else:
    # load_dotenv leaves variables that are already set untouched by default,
    # so the real process environment still wins over the .env file.
    load_dotenv(r"C:\Users\Administrator\Desktop\Cato\.env")

# Vault password fallback, baked in so no env var is needed when run as SYSTEM.
# This is applied only AFTER the .env file has been read: setting it earlier
# would make a CATO_VAULT_PASSWORD entry in .env impossible to honour, because
# the variable would already exist by the time .env is loaded.
# Resulting precedence: process environment > .env > built-in fallback.
# SECURITY NOTE(review): a password stored in source is readable by anyone
# who can read this file. Prefer supplying it through the environment or .env
# and removing the literal below.
if "CATO_VAULT_PASSWORD" not in os.environ:
    _runner_log.warning(
        "CATO_VAULT_PASSWORD not found in the environment or .env; "
        "using the built-in fallback password"
    )
    os.environ["CATO_VAULT_PASSWORD"] = "mypassword123"
# Imported here rather than at the top of the file so that it runs after the
# setup statements above (sys.path entry, logging, environment variables).
from cato.cli import (
    BudgetManager,
    CatoConfig,
    Vault,
    _CATO_DIR,
    _PID_FILE,
    _run_daemon,
)

# Build a Vault only when the vault file is present; otherwise leave it None.
vault_path = _CATO_DIR / "vault.enc"
if vault_path.exists():
    vault = Vault(vault_path=vault_path)
else:
    vault = None

config = CatoConfig.load()
budget = BudgetManager(
    session_cap=config.session_cap,
    monthly_cap=config.monthly_cap,
)
# NOTE(review): `vault` and `budget` are not referenced again in this script;
# presumably they are constructed for their side effects — confirm.

# Replace any PID file left over from an earlier run with this process's PID.
if _PID_FILE.exists():
    _PID_FILE.unlink(missing_ok=True)
_PID_FILE.write_text(str(os.getpid()))

try:
    _run_daemon(config, "claude", "all")
finally:
    # Remove the PID file whether _run_daemon returns or raises.
    _PID_FILE.unlink(missing_ok=True)