Skip to content

Commit c1985b0

Browse files
committed
Add support for async callbacks in MessageRouter
This change allows all callback registration methods to accept both synchronous and asynchronous callbacks. The router will automatically detect if a callback is async using asyncio.iscoroutinefunction() and schedule it as a task if needed. This addresses the comment in PR #13 requesting async callback support. Changes: - Add asyncio import - Add _invoke_callback() helper method to handle both sync and async - Update all callback invocation sites to use the helper: - _notify_notebook_activity_observers - _notify_slash_cmd_observers - _notify_chat_init_observers - _notify_msg_observers - _on_chat_reset - _on_notebook_reset
1 parent a6e5f2d commit c1985b0

File tree

1 file changed

+28
-26
lines changed

1 file changed

+28
-26
lines changed

jupyter_ai_router/router.py

Lines changed: 28 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
"""
99

1010
import time
11+
import asyncio
1112
from typing import Any, Callable, Dict, List, TYPE_CHECKING
1213
from functools import partial
1314
import re
@@ -117,6 +118,25 @@ def __init__(self, *args, **kwargs):
117118

118119
self.event_loop.create_task(self._start_observing_global_awareness())
119120

121+
def _invoke_callback(self, callback: Callable, *args, **kwargs) -> None:
122+
"""
123+
Invoke a callback, handling both sync and async callbacks.
124+
125+
Args:
126+
callback: The callback function to invoke
127+
*args: Positional arguments to pass to the callback
128+
**kwargs: Keyword arguments to pass to the callback
129+
"""
130+
try:
131+
if asyncio.iscoroutinefunction(callback):
132+
# For async callbacks, schedule them as a task
133+
self.event_loop.create_task(callback(*args, **kwargs))
134+
else:
135+
# For sync callbacks, call directly
136+
callback(*args, **kwargs)
137+
except Exception as e:
138+
self.log.error(f"Error invoking callback: {e}")
139+
120140
async def _room_id_from_path(self, path: str) -> str | None:
121141
room_id = await self.parent._room_id_from_path(path)
122142
return room_id
@@ -531,10 +551,7 @@ def _notify_notebook_activity_observers(
531551
for observer_id in observer_ids:
532552
if observer_id in self._observer_callbacks:
533553
callback = self._observer_callbacks[observer_id]["callback"]
534-
try:
535-
callback(username, prev_active_cell, notebook_path)
536-
except Exception as e:
537-
self.log.error(f"Notebook activity observer error for {username}: {e}")
554+
self._invoke_callback(callback, username, prev_active_cell, notebook_path)
538555

539556

540557
def connect_chat(self, room_id: str, ychat: "YChat") -> None:
@@ -635,58 +652,43 @@ def _notify_slash_cmd_observers(self, room_id: str, message: Message, clean_comm
635652
for registered_pattern, callbacks in room_observers.items():
636653
if matches_pattern(clean_command, registered_pattern):
637654
for callback in callbacks:
638-
try:
639-
callback(room_id, clean_command, message)
640-
except Exception as e:
641-
self.log.error(f"Slash command observer error for pattern '{registered_pattern}': {e}")
655+
self._invoke_callback(callback, room_id, clean_command, message)
642656

643657
def _notify_chat_init_observers(self, room_id: str, ychat: "YChat") -> None:
644658
"""Notify all new chat observers."""
645659
for callback in self.chat_init_observers:
646-
try:
647-
callback(room_id, ychat)
648-
except Exception as e:
649-
self.log.error(f"New chat observer error for {room_id}: {e}")
660+
self._invoke_callback(callback, room_id, ychat)
650661

651662
def _notify_msg_observers(self, room_id: str, message: Message) -> None:
652663
"""Notify all message observers."""
653664
callbacks = self.chat_msg_observers.get(room_id, [])
654665
for callback in callbacks:
655-
try:
656-
callback(room_id, message)
657-
except Exception as e:
658-
self.log.error(f"Message observer error for {room_id}: {e}")
666+
self._invoke_callback(callback, room_id, message)
659667

660668
def _on_chat_reset(self, room_id, ychat: "YChat") -> None:
661669
"""
662670
Method to call when the YChat undergoes a document reset, e.g. when the
663671
`.chat` file is modified directly on disk.
664-
672+
665673
NOTE: Document resets will only occur when `jupyter_server_documents` is
666674
installed.
667675
"""
668676
self.log.warning(f"Detected `YChat` document reset in room '{room_id}'.")
669677
self.active_chats[room_id] = ychat
670678
for callback in self.chat_reset_observers:
671-
try:
672-
callback(room_id, ychat)
673-
except Exception as e:
674-
self.log.error(f"Reset chat observer error for {room_id}: {e}")
679+
self._invoke_callback(callback, room_id, ychat)
675680

676681
def _on_notebook_reset(self, room_id, ydoc: YBaseDoc) -> None:
677682
"""
678683
Method to call when the YDoc undergoes a document reset, e.g. when the
679684
`.ipynb` file is modified directly on disk.
680-
685+
681686
NOTE: Document resets will only occur when `jupyter_server_documents` is
682687
installed.
683688
"""
684689
self.log.warning(f"Detected `YDoc` document reset in room '{room_id}'.")
685690
for callback in self.notebook_reset_observers:
686-
try:
687-
callback(room_id, ydoc)
688-
except Exception as e:
689-
self.log.error(f"Reset notebook observer error for {room_id}: {e}")
691+
self._invoke_callback(callback, room_id, ydoc)
690692

691693
def _cleanup_rooms(self) -> None:
692694
"""Clean up all room trackers and their subscriptions."""

0 commit comments

Comments
 (0)