Skip to content

Commit c945748

Browse files
committed
fix: timeout processing lmao requests
1 parent 7f41e96 commit c945748

4 files changed

+98
-92
lines changed

lmao_process_loop.py

+6-4
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,8 @@ def _lmao_stop_stream_loop() -> None:
146146
logging.warning(f"Exit from {name} loop requested")
147147
break
148148

149+
request_response = None
150+
149151
try:
150152
# Wait a bit to prevent overloading
151153
# We need to wait at the beginning to enable delay even after exception
@@ -258,11 +260,9 @@ def _lmao_stop_stream_loop() -> None:
258260
break
259261

260262
# Save conversation ID
263+
logging.info(f"Saving user {request_response.user_id} conversation ID as: name_{conversation_id}")
261264
users_handler_.set_key(request_response.user_id, name + "_conversation_id", conversation_id)
262265

263-
# Return container
264-
lmao_response_queue.put(request_response)
265-
266266
# Non-blocking get of user_id to clear conversation for
267267
delete_conversation_user_id = None
268268
try:
@@ -297,10 +297,12 @@ def _lmao_stop_stream_loop() -> None:
297297
logging.error(f"{name} error", exc_info=e)
298298
lmao_exceptions_queue.put(e)
299299

300-
# Read module's status
300+
# Read module's status and return the container
301301
finally:
302302
with lmao_module_status.get_lock():
303303
lmao_module_status.value = module.status
304+
if request_response:
305+
lmao_response_queue.put(request_response)
304306

305307
# Wait for stop handler to finish
306308
if stop_handler_thread and stop_handler_thread.is_alive():

lmao_process_loop_web.py

+80-75
Original file line numberDiff line numberDiff line change
@@ -372,6 +372,9 @@ def _lmao_stop_stream_loop() -> None:
372372
logging.warning(f"Exit from {name} loop requested")
373373
break
374374

375+
release_lock = False
376+
request_response = None
377+
375378
try:
376379
# Wait a bit to prevent overloading
377380
# We need to wait at the beginning to enable delay even after exception
@@ -435,89 +438,83 @@ def _lmao_stop_stream_loop() -> None:
435438
users_handler_.set_key(request_response.user_id, "suggestions", [])
436439

437440
# Ask and read stream
438-
try:
439-
for line in _request_wrapper(
440-
api_url,
441-
"ask",
442-
{name_lmao: module_request},
443-
cooldown_timer,
444-
request_lock,
445-
proxy,
446-
token,
447-
stream=True,
448-
).iter_lines():
449-
if not line:
450-
continue
451-
try:
452-
response = json.loads(line.decode("utf-8"))
453-
except Exception as e:
454-
logging.warning(f"Unable to parse response line as JSON: {e}")
455-
continue
456-
457-
finished = response.get("finished")
458-
conversation_id = response.get("conversation_id")
459-
request_response.response_text = response.get("response")
460-
461-
images = response.get("images")
462-
if images is not None:
463-
request_response.response_images = images[:]
464-
465-
# Format and add attributions
466-
attributions = response.get("attributions")
467-
if attributions is not None and len(attributions) != 0:
468-
response_link_format = messages_.get_message(
469-
"response_link_format", user_id=request_response.user_id
441+
release_lock = True
442+
for line in _request_wrapper(
443+
api_url,
444+
"ask",
445+
{name_lmao: module_request},
446+
cooldown_timer,
447+
request_lock,
448+
proxy,
449+
token,
450+
stream=True,
451+
).iter_lines():
452+
if not line:
453+
continue
454+
try:
455+
response = json.loads(line.decode("utf-8"))
456+
except Exception as e:
457+
logging.warning(f"Unable to parse response line as JSON: {e}")
458+
continue
459+
460+
finished = response.get("finished")
461+
conversation_id = response.get("conversation_id")
462+
request_response.response_text = response.get("response")
463+
464+
images = response.get("images")
465+
if images is not None:
466+
request_response.response_images = images[:]
467+
468+
# Format and add attributions
469+
attributions = response.get("attributions")
470+
if attributions is not None and len(attributions) != 0:
471+
response_link_format = messages_.get_message(
472+
"response_link_format", user_id=request_response.user_id
473+
)
474+
request_response.response_text += "\n"
475+
for i, attribution in enumerate(attributions):
476+
request_response.response_text += response_link_format.format(
477+
source_name=str(i + 1), link=attribution.get("url", "")
470478
)
471-
request_response.response_text += "\n"
472-
for i, attribution in enumerate(attributions):
473-
request_response.response_text += response_link_format.format(
474-
source_name=str(i + 1), link=attribution.get("url", "")
475-
)
476479

477-
# Suggestions must be stored as tuples with unique ID for reply-markup
478-
if finished:
479-
suggestions = response.get("suggestions")
480-
if suggestions is not None:
481-
request_response.response_suggestions = []
482-
for suggestion in suggestions:
483-
if not suggestion or len(suggestion) < 1:
484-
continue
485-
id_ = "".join(
486-
random.choices(
487-
string.ascii_uppercase + string.ascii_lowercase + string.digits, k=8
488-
)
480+
# Suggestions must be stored as tuples with unique ID for reply-markup
481+
if finished:
482+
suggestions = response.get("suggestions")
483+
if suggestions is not None:
484+
request_response.response_suggestions = []
485+
for suggestion in suggestions:
486+
if not suggestion or len(suggestion) < 1:
487+
continue
488+
id_ = "".join(
489+
random.choices(
490+
string.ascii_uppercase + string.ascii_lowercase + string.digits, k=8
489491
)
490-
request_response.response_suggestions.append((id_, suggestion))
491-
users_handler_.set_key(
492-
request_response.user_id,
493-
"suggestions",
494-
request_response.response_suggestions,
495492
)
493+
request_response.response_suggestions.append((id_, suggestion))
494+
users_handler_.set_key(
495+
request_response.user_id,
496+
"suggestions",
497+
request_response.response_suggestions,
498+
)
496499

497-
# Check if exit was requested
498-
with lmao_process_running.get_lock():
499-
lmao_process_running_value = lmao_process_running.value
500-
if not lmao_process_running_value:
501-
finished = True
502-
503-
# Send response to the user
504-
async_helper(
505-
send_message_async(config.get("telegram"), messages_, request_response, end=finished)
506-
)
507-
508-
# Exit from stream reader
509-
if not lmao_process_running_value:
510-
break
500+
# Check if exit was requested
501+
with lmao_process_running.get_lock():
502+
lmao_process_running_value = lmao_process_running.value
503+
if not lmao_process_running_value:
504+
finished = True
511505

512-
# Save conversation ID
513-
users_handler_.set_key(request_response.user_id, name + "_conversation_id", conversation_id)
506+
# Send response to the user
507+
async_helper(
508+
send_message_async(config.get("telegram"), messages_, request_response, end=finished)
509+
)
514510

515-
# Return container
516-
lmao_response_queue.put(request_response)
511+
# Exit from stream reader
512+
if not lmao_process_running_value:
513+
break
517514

518-
# Release lock after stream stop
519-
finally:
520-
request_lock.release()
515+
# Save conversation ID
516+
logging.info(f"Saving user {request_response.user_id} conversation ID as: name_{conversation_id}")
517+
users_handler_.set_key(request_response.user_id, name + "_conversation_id", conversation_id)
521518

522519
# Non-blocking get of user_id to clear conversation for
523520
delete_conversation_user_id = None
@@ -580,6 +577,14 @@ def _lmao_stop_stream_loop() -> None:
580577
logging.error(f"{name} error", exc_info=e)
581578
lmao_exceptions_queue.put(e)
582579

580+
# Release lock if needed and return the container
581+
finally:
582+
if release_lock:
583+
request_lock.release()
584+
release_lock = False
585+
if request_response:
586+
lmao_response_queue.put(request_response)
587+
583588
# Read module status
584589
try:
585590
_check_status(status_dict, api_url, cooldown_timer, request_lock, proxy, token)

module_wrapper_global.py

+11-12
Original file line numberDiff line numberDiff line change
@@ -289,8 +289,8 @@ def process_request(self, request_response: request_response_container.RequestRe
289289
self._lmao_request_queue.put(request_response)
290290

291291
# Wait until it's processed or failed
292-
logging.info(f"Waiting for {self.name} request to be processed")
293-
time.sleep(1)
292+
logging.info(f"Waiting for {self.name} request to be processed (waiting for container)")
293+
response_ = None
294294
while True:
295295
# Check process
296296
with self._lmao_process_running.get_lock():
@@ -307,21 +307,18 @@ def process_request(self, request_response: request_response_container.RequestRe
307307
if lmao_exception is not None:
308308
raise lmao_exception
309309

310-
# Check status
311-
with self._lmao_module_status.get_lock():
312-
module_status = self._lmao_module_status.value
313-
if module_status == STATUS_IDLE:
310+
# Try to get container back
311+
try:
312+
response_ = self._lmao_response_queue.get(block=False)
313+
except queue.Empty:
314+
pass
315+
if response_:
316+
logging.info(f"Received container back from {self.name} process")
314317
break
315318

316319
time.sleep(LMAO_LOOP_DELAY)
317320

318321
# Update container
319-
# TODO: Optimize this
320-
response_ = None
321-
try:
322-
response_ = self._lmao_response_queue.get(block=True, timeout=1)
323-
except queue.Empty:
324-
logging.warning(f"Cannot get container back from {self.name} process")
325322
if response_:
326323
request_response.response_text = response_.response_text
327324
for response_image in response_.response_images:
@@ -335,6 +332,8 @@ def process_request(self, request_response: request_response_container.RequestRe
335332
request_response.error = response_.error
336333
request_response.response_next_chunk_start_index = response_.response_next_chunk_start_index
337334
request_response.response_sent_len = response_.response_sent_len
335+
else:
336+
logging.warning(f"Unable to get container back from {self.name} process")
338337

339338
##########
340339
# Gemini #

request_response_container.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
PROCESSING_STATE_ABORT = 7
3434

3535
# State to string
36-
PROCESSING_STATE_NAMES = ["Waiting", "Starting", "Active", "Done", "Timed out", "Canceling", "Canceling"]
36+
PROCESSING_STATE_NAMES = ["Waiting", "Starting", "Active", "Done", "Timed out", "Canceling", "Canceling", "Abort"]
3737

3838

3939
class RequestResponseContainer:

0 commit comments

Comments
 (0)