@@ -8,7 +8,6 @@
 
 from collections import defaultdict
 
-from datetime import datetime
 import json
 from uuid import UUID
 from fastapi import HTTPException
@@ -114,8 +113,8 @@ class CrawlSpec(BaseModel):
     started: str
     stopping: bool = False
     scheduled: bool = False
-    expire_time: Optional[datetime] = None
-    max_crawl_size: Optional[int] = None
+    timeout: int = 0
+    max_crawl_size: int = 0
 
 
 # ============================================================================
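The spec now carries the raw timeout in seconds instead of a precomputed expire_time datetime, with 0 doubling as the "no limit" sentinel for both fields. A minimal sketch of that convention, using a hypothetical stand-in model rather than the real CrawlSpec:

from pydantic import BaseModel

class CrawlLimits(BaseModel):
    """Hypothetical stand-in for the limit fields on CrawlSpec."""

    timeout: int = 0         # max elapsed crawl time, in seconds (0 = unlimited)
    max_crawl_size: int = 0  # max crawl size, in bytes (0 = unlimited)

limits = CrawlLimits(timeout=7200)
assert limits.timeout == 7200 and limits.max_crawl_size == 0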
@@ -232,9 +231,15 @@ class CrawlStatus(BaseModel):
     restartTime: Optional[str]
     canceled: bool = False
 
-    # Execution Time -- updated on pod exits and at regular interval
+    # updated on pod exits and at regular intervals
+    # Crawl Execution Time -- total time all crawler pods have been running,
+    # used to track resource usage and enforce the execution minutes limit
     crawlExecTime: int = 0
 
+    # Elapsed Exec Time -- time the crawl has been running in at least one pod,
+    # used to enforce crawl timeouts
+    elapsedCrawlTime: int = 0
+
     # last exec time update
     lastUpdatedTime: str = ""
 
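The two counters answer different questions: crawlExecTime sums run time across all crawler pods (billable pod-seconds against the execution minutes quota), while elapsedCrawlTime advances only as fast as the longest-running pod, approximating wall-clock crawl time for timeout purposes. Roughly, assuming three pods that run the same 60-second window in parallel:

pods_running = 3
window_seconds = 60

crawl_exec_time_delta = pods_running * window_seconds  # 180 pod-seconds billed
elapsed_crawl_time_delta = window_seconds              # timeout clock advances 60s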
@@ -440,7 +445,7 @@ async def sync_crawls(self, data: MCSyncData):
             scale=spec.get("scale", 1),
             started=data.parent["metadata"]["creationTimestamp"],
             stopping=spec.get("stopping", False),
-            expire_time=from_k8s_date(spec.get("expireTime")),
+            timeout=spec.get("timeout") or 0,
             max_crawl_size=int(spec.get("maxCrawlSize") or 0),
             scheduled=spec.get("manual") != "1",
@@ -1081,6 +1086,7 @@ async def increment_pod_exec_time(
             return
 
         exec_time = 0
+        max_duration = 0
         print(
             f"Exec Time Update: {reason}: {now} - {update_start_time} = {update_duration}"
         )
@@ -1131,11 +1137,13 @@ async def increment_pod_exec_time(
                     f" - {name}: {pod_state}: {end_time} - {start_time} = {duration}"
                 )
                 exec_time += duration
+                max_duration = max(duration, max_duration)
 
         if exec_time:
             await self.crawl_ops.inc_crawl_exec_time(crawl_id, exec_time)
             await self.org_ops.inc_org_time_stats(oid, exec_time, True)
             status.crawlExecTime += exec_time
+            status.elapsedCrawlTime += max_duration
 
         print(
             f" Exec Time Total: {status.crawlExecTime}, Incremented By: {exec_time}",
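Per update window, each pod contributes its own duration to the sum, while only the longest single-pod duration feeds the elapsed clock. A standalone sketch of that accumulation, with hypothetical (start, end) second offsets standing in for the pods' Kubernetes timestamps:

pod_windows = [(0, 120), (30, 120), (0, 90)]  # hypothetical per-pod run windows

exec_time = 0
max_duration = 0
for start, end in pod_windows:
    duration = end - start
    exec_time += duration                       # total pod-seconds this update
    max_duration = max(duration, max_duration)  # longest single-pod run

# exec_time feeds crawlExecTime and the org quota; max_duration feeds elapsedCrawlTime
assert exec_time == 300 and max_duration == 120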
@@ -1254,20 +1262,27 @@ async def add_file_to_crawl(self, cc_data, crawl, redis):
 
         return True
 
-    def is_crawl_stopping(self, crawl, size):
+    def is_crawl_stopping(self, crawl: CrawlSpec, status: CrawlStatus) -> bool:
         """return true if crawl should begin graceful stopping phase"""
 
         # if user requested stop, then enter stopping phase
         if crawl.stopping:
             print("Graceful Stop: User requested stop")
             return True
 
-        # check crawl expiry
-        if crawl.expire_time and dt_now() > crawl.expire_time:
-            print(f"Graceful Stop: Job duration expired at {crawl.expire_time}")
-            return True
+        # gracefully stop if the elapsed crawl time exceeds the timeout
+        if crawl.timeout:
+            elapsed = (
+                status.elapsedCrawlTime
+                + (dt_now() - from_k8s_date(status.lastUpdatedTime)).total_seconds()
+            )
+            if elapsed > crawl.timeout:
+                print(
+                    f"Graceful Stop: Crawl running time exceeded {crawl.timeout} second timeout"
+                )
+                return True
 
-        if crawl.max_crawl_size and size > crawl.max_crawl_size:
+        if crawl.max_crawl_size and status.size > crawl.max_crawl_size:
             print(f"Graceful Stop: Maximum crawl size {crawl.max_crawl_size} hit")
             return True
 
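Because elapsedCrawlTime is only advanced at checkpoints, the check adds the time since lastUpdatedTime to cover the in-flight window. A hedged sketch of the same arithmetic, using naive datetimes in place of the module's dt_now()/from_k8s_date() helpers and illustrative values:

from datetime import datetime, timedelta

timeout = 3600             # crawl.timeout, in seconds
elapsed_crawl_time = 3500  # status.elapsedCrawlTime as of the last update
last_updated = datetime.now() - timedelta(seconds=150)  # parsed lastUpdatedTime

elapsed = elapsed_crawl_time + (datetime.now() - last_updated).total_seconds()
should_stop = timeout > 0 and elapsed > timeout  # ~3650 > 3600 -> graceful stop
assert should_stop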
@@ -1311,7 +1326,7 @@ async def update_crawl_state(self, redis, crawl, status, pods, done) -> CrawlStatus:
             pod_info = status.podStatus[key]
             pod_info.used.storage = value
 
-        status.stopping = self.is_crawl_stopping(crawl, status.size)
+        status.stopping = self.is_crawl_stopping(crawl, status)
 
         # check exec time quotas and stop if reached limit
         if not status.stopping: