-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfog_manager.py
154 lines (120 loc) · 5.01 KB
/
fog_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
import os
import json
import time
import threading
import logging
import traceback # 导入 traceback 模块
from typing import Optional
from .fog_model import FogModel
from .fog_client import FogClient
from .fog_comfy import ComfyUIClient
from .fog_scheduler import FogScheduler
# Module-level logger; every record emitted from this file goes through the
# logger registered under the name 'ComfyFog'.
logger = logging.getLogger('ComfyFog')
class FogManager:
    """Core manager class of the ComfyFog plugin.

    Loads the JSON configuration stored next to this module, builds the
    task-center client, the scheduler, the ComfyUI client and the model
    helper, and runs a daemon thread that reloads the configuration and asks
    the scheduler to process a task about once per second while the
    configuration has ``enabled`` set.
    """

    # Task-center endpoint used when the configuration does not provide one.
    DEFAULT_TASK_CENTER_URL = "https://control.comfyfog.org/schedule/task"

    def __init__(self):
        """Initialise configuration, components, lock and monitor thread.

        Raises:
            Exception: any error raised while building the components is
                logged and re-raised after ``running`` is set to ``False``.
        """
        try:
            # History state is created first so it already exists when the
            # monitor thread is started further down.
            self.history = []
            # Intended cap for ``history``; nothing in this class enforces it.
            self.max_history = 100

            # 1. Configuration
            self.config = self._load_config()

            # 2. Components
            self._rebuild_client()
            self.comfy_client = ComfyUIClient()
            self.model = FogModel()

            # 3. Lock guarding config/scheduler access from the routes API
            self.lock = threading.Lock()

            # 4. Monitor thread
            self.running = True
            self._start_monitor_thread()
            logger.info("FogManager initialized successfully")
        except Exception as e:
            logger.error(f"FogManager initialization failed: {e}")
            self.running = False
            raise

    def _rebuild_client(self):
        """Create the task-center client and its scheduler from ``self.config``.

        Falls back to ``DEFAULT_TASK_CENTER_URL`` when the configuration has
        no (or an empty) ``task_center_url`` instead of raising ``KeyError``.
        """
        url = self.config.get('task_center_url') or self.DEFAULT_TASK_CENTER_URL
        self.client = FogClient(url)
        self.scheduler = FogScheduler(self.client)

    def _start_monitor_thread(self):
        """Start the daemon thread that drives the task loop."""

        def monitor_loop():
            time.sleep(5)  # give ComfyUI time to finish loading and start
            while self.running:
                try:
                    logger.debug("-------------------- ComfyFog Task Process Working Start -----------------------\n")
                    self.model.get_folder_paths_info()
                    # Re-read the file every iteration so edits made on disk
                    # are picked up without a restart.
                    self.config = self._load_config()
                    if self.scheduler and self.config.get("enabled"):
                        self.scheduler.process_task()
                except Exception as e:
                    # Keep the loop alive: log the error with its full stack.
                    logger.error(f"ComfyFog error in task loop: {e}")
                    logger.error(traceback.format_exc())
                time.sleep(1)

        self.monitor_thread = threading.Thread(
            target=monitor_loop,
            name="FogMonitor",
            daemon=True,
        )
        self.monitor_thread.start()

    # ROUTES API LIST
    def get_status(self) -> dict:
        """Return the current plugin status.

        Returns:
            A dict with the keys ``enabled``, ``scheduler_active``,
            ``current_task`` and ``schedule``.
        """
        with self.lock:
            scheduler = self.scheduler
            return {
                "enabled": self.config.get("enabled", False),
                "scheduler_active": bool(scheduler),
                "current_task": scheduler.current_task if scheduler else None,
                "schedule": self.config.get("schedule", []),
            }

    def update_config(self, new_config) -> dict:
        """Merge ``new_config`` into the configuration and persist it.

        The client and scheduler are rebuilt when ``new_config`` contains
        ``task_center_url``.

        Returns:
            ``{"status": "success"}`` when the merged configuration was
            written to disk, otherwise ``{"status": "error", "message": ...}``.
        """
        with self.lock:
            try:
                self.config.update(new_config)
                # The monitor loop reloads the file every second, so an
                # update that could not be written would silently vanish;
                # report that as an error instead of a success.
                if not self._save_config():
                    return {"status": "error", "message": "Failed to save config"}
                # Re-create the client when the URL was part of the update.
                if 'task_center_url' in new_config:
                    self._rebuild_client()
                return {"status": "success"}
            except Exception as e:
                logger.error(f"Failed to update config: {e}")
                return {"status": "error", "message": str(e)}

    def __del__(self):
        """Stop the monitor loop and release the client session (best effort)."""
        try:
            self.running = False
            thread = getattr(self, 'monitor_thread', None)
            # Joining the current thread raises RuntimeError, which would
            # skip closing the session below.
            if thread is not None and thread is not threading.current_thread():
                thread.join(timeout=1)
            if hasattr(self, 'client'):
                self.client.session.close()
        except Exception as e:
            logger.error(f"Error during cleanup: {e}")

    def _load_config(self) -> dict:
        """Load ``config.json`` from the directory of this module.

        Also records the file path in ``self.config_file``.

        Returns:
            The parsed configuration, or an empty dict when the file is
            missing, unreadable, invalid JSON, or not a JSON object.
        """
        self.config_file = os.path.join(os.path.dirname(__file__), 'config.json')
        try:
            if not os.path.exists(self.config_file):
                return {}  # default: empty configuration
            with open(self.config_file, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except Exception as e:
            logger.error(f"Error loading config: {e}")
            return {}
        if not isinstance(loaded, dict):
            # Every consumer calls dict methods on the configuration.
            logger.error(
                f"Error loading config: expected a JSON object, got {type(loaded).__name__}"
            )
            return {}
        return loaded

    def _save_config(self) -> bool:
        """Write ``self.config`` to ``self.config_file`` as indented JSON.

        Returns:
            ``True`` when the file was written, ``False`` when writing failed
            (the error is logged, not raised).
        """
        try:
            with open(self.config_file, 'w', encoding='utf-8') as f:
                json.dump(self.config, f, indent=2)
            return True
        except Exception as e:
            logger.error(f"Error saving config: {e}")
            return False

    def get_history(self, limit: int = 10, status: Optional[str] = None) -> list:
        """Return the most recent history entries.

        Args:
            limit: maximum number of entries to return; zero or a negative
                value yields an empty list.
            status: when given, only entries whose ``'status'`` item equals
                this value are considered.

        Returns:
            A new list holding at most ``limit`` entries, oldest first.
        """
        # ``seq[-0:]`` is the whole sequence, so a non-positive limit must be
        # handled before slicing.
        if limit <= 0:
            return []
        entries = self.history
        if status:
            entries = [entry for entry in entries if entry['status'] == status]
        return entries[-limit:]

    def clear_history(self):
        """Discard all recorded history entries."""
        self.history = []