Skip to content

Commit

Permalink
Eventloop scheduling improvements for stop_on_error_timeout and sched…
Browse files Browse the repository at this point in the history
…ule_next (#1212)

Co-authored-by: Steven Silvester <[email protected]>
  • Loading branch information
jdranczewski and blink1073 authored Feb 25, 2024
1 parent 2071a88 commit de2221c
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 26 deletions.
69 changes: 50 additions & 19 deletions ipykernel/eventloops.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,19 @@ def _notify_stream_qt(kernel):
def enum_helper(name):
return operator.attrgetter(name.rpartition(".")[0])(sys.modules[QtCore.__package__])

def exit_loop():
"""fall back to main loop"""
kernel._qt_notifier.setEnabled(False)
kernel.app.qt_event_loop.quit()

def process_stream_events():
"""fall back to main loop when there's a socket event"""
# call flush to ensure that the stream doesn't lose events
# due to our consuming of the edge-triggered FD
# flush returns the number of events consumed.
# if there were any, wake it up
if kernel.shell_stream.flush(limit=1):
kernel._qt_notifier.setEnabled(False)
kernel.app.qt_event_loop.quit()
exit_loop()

if not hasattr(kernel, "_qt_notifier"):
fd = kernel.shell_stream.getsockopt(zmq.FD)
Expand All @@ -101,18 +105,31 @@ def process_stream_events():
else:
kernel._qt_notifier.setEnabled(True)

# allow for scheduling exits from the loop in case a timeout needs to
# be set from the kernel level
def _schedule_exit(delay):
"""schedule fall back to main loop in [delay] seconds"""
# The signatures of QtCore.QTimer.singleShot are inconsistent between PySide and PyQt
# if setting the TimerType, so we create a timer explicitly and store it
# to avoid a memory leak.
# PreciseTimer is needed so we exit after _at least_ the specified delay, not within 5% of it
if not hasattr(kernel, "_qt_timer"):
kernel._qt_timer = QtCore.QTimer(kernel.app)
kernel._qt_timer.setSingleShot(True)
kernel._qt_timer.setTimerType(enum_helper("QtCore.Qt.TimerType").PreciseTimer)
kernel._qt_timer.timeout.connect(exit_loop)
kernel._qt_timer.start(int(1000 * delay))

loop_qt._schedule_exit = _schedule_exit

# there may already be unprocessed events waiting.
# these events will not wake zmq's edge-triggered FD
# since edge-triggered notification only occurs on new i/o activity.
# process all the waiting events immediately
# so we start in a clean state ensuring that any new i/o events will notify.
# schedule first call on the eventloop as soon as it's running,
# so we don't block here processing events
if not hasattr(kernel, "_qt_timer"):
kernel._qt_timer = QtCore.QTimer(kernel.app)
kernel._qt_timer.setSingleShot(True)
kernel._qt_timer.timeout.connect(process_stream_events)
kernel._qt_timer.start(0)
QtCore.QTimer.singleShot(0, process_stream_events)


@register_integration("qt", "qt5", "qt6")
Expand Down Expand Up @@ -229,23 +246,33 @@ def __init__(self, app):
self.app = app
self.app.withdraw()

def process_stream_events(stream, *a, **kw):
def exit_loop():
"""fall back to main loop"""
app.tk.deletefilehandler(kernel.shell_stream.getsockopt(zmq.FD))
app.quit()
app.destroy()
del kernel.app_wrapper

def process_stream_events(*a, **kw):
"""fall back to main loop when there's a socket event"""
if stream.flush(limit=1):
app.tk.deletefilehandler(stream.getsockopt(zmq.FD))
app.quit()
app.destroy()
del kernel.app_wrapper
if kernel.shell_stream.flush(limit=1):
exit_loop()

# allow for scheduling exits from the loop in case a timeout needs to
# be set from the kernel level
def _schedule_exit(delay):
"""schedule fall back to main loop in [delay] seconds"""
app.after(int(1000 * delay), exit_loop)

loop_tk._schedule_exit = _schedule_exit

# For Tkinter, we create a Tk object and call its withdraw method.
kernel.app_wrapper = BasicAppWrapper(app)

notifier = partial(process_stream_events, kernel.shell_stream)
# seems to be needed for tk
notifier.__name__ = "notifier" # type:ignore[attr-defined]
app.tk.createfilehandler(kernel.shell_stream.getsockopt(zmq.FD), READABLE, notifier)
app.tk.createfilehandler(
kernel.shell_stream.getsockopt(zmq.FD), READABLE, process_stream_events
)
# schedule initial call after start
app.after(0, notifier)
app.after(0, process_stream_events)

app.mainloop()

Expand Down Expand Up @@ -560,6 +587,10 @@ def enable_gui(gui, kernel=None):
# User wants to turn off integration; clear any evidence if Qt was the last one.
if hasattr(kernel, "app"):
delattr(kernel, "app")
if hasattr(kernel, "_qt_notifier"):
delattr(kernel, "_qt_notifier")
if hasattr(kernel, "_qt_timer"):
delattr(kernel, "_qt_timer")
else:
if gui.startswith("qt"):
# Prepare the kernel here so any exceptions are displayed in the client.
Expand Down
26 changes: 19 additions & 7 deletions ipykernel/kernelbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ def enter_eventloop(self):
self.log.info("Exiting as there is no eventloop")
return

def advance_eventloop():
async def advance_eventloop():
# check if eventloop changed:
if self.eventloop is not eventloop:
self.log.info("exiting eventloop %s", eventloop)
Expand All @@ -494,10 +494,13 @@ def advance_eventloop():

def schedule_next():
"""Schedule the next advance of the eventloop"""
# flush the eventloop every so often,
# giving us a chance to handle messages in the meantime
# call_later allows the io_loop to process other events if needed.
# Going through schedule_dispatch ensures all other dispatches on msg_queue
# are processed before we enter the eventloop, even if the previous dispatch was
# already consumed from the queue by process_one and the queue is
# technically empty.
self.log.debug("Scheduling eventloop advance")
self.io_loop.call_later(0.001, advance_eventloop)
self.io_loop.call_later(0.001, partial(self.schedule_dispatch, advance_eventloop))

# begin polling the eventloop
schedule_next()
Expand Down Expand Up @@ -1202,9 +1205,18 @@ async def stop_aborting():
# before we reset the flag
schedule_stop_aborting = partial(self.schedule_dispatch, stop_aborting)

# if we have a delay, give messages this long to arrive on the queue
# before we stop aborting requests
asyncio.get_event_loop().call_later(self.stop_on_error_timeout, schedule_stop_aborting)
if self.stop_on_error_timeout:
# if we have a delay, give messages this long to arrive on the queue
# before we stop aborting requests
self.io_loop.call_later(self.stop_on_error_timeout, schedule_stop_aborting)
# If we have an eventloop, it may interfere with the call_later above.
# If the loop has a _schedule_exit method, we call that so the loop exits
# after stop_on_error_timeout, returning to the main io_loop and letting
# the call_later fire.
if self.eventloop is not None and hasattr(self.eventloop, "_schedule_exit"):
self.eventloop._schedule_exit(self.stop_on_error_timeout + 0.01)
else:
schedule_stop_aborting()

def _send_abort_reply(self, stream, msg, idents):
"""Send a reply to an aborted request"""
Expand Down

0 comments on commit de2221c

Please sign in to comment.