From de2221ce155668c343084fde37b77fb6b1671dc9 Mon Sep 17 00:00:00 2001 From: Jakub Dranczewski Date: Sun, 25 Feb 2024 18:57:10 +0000 Subject: [PATCH] Eventloop scheduling improvements for stop_on_error_timeout and schedule_next (#1212) Co-authored-by: Steven Silvester --- ipykernel/eventloops.py | 69 +++++++++++++++++++++++++++++------------ ipykernel/kernelbase.py | 26 +++++++++++----- 2 files changed, 69 insertions(+), 26 deletions(-) diff --git a/ipykernel/eventloops.py b/ipykernel/eventloops.py index c3ddd301..853738d9 100644 --- a/ipykernel/eventloops.py +++ b/ipykernel/eventloops.py @@ -82,6 +82,11 @@ def _notify_stream_qt(kernel): def enum_helper(name): return operator.attrgetter(name.rpartition(".")[0])(sys.modules[QtCore.__package__]) + def exit_loop(): + """fall back to main loop""" + kernel._qt_notifier.setEnabled(False) + kernel.app.qt_event_loop.quit() + def process_stream_events(): """fall back to main loop when there's a socket event""" # call flush to ensure that the stream doesn't lose events @@ -89,8 +94,7 @@ def process_stream_events(): # flush returns the number of events consumed. # if there were any, wake it up if kernel.shell_stream.flush(limit=1): - kernel._qt_notifier.setEnabled(False) - kernel.app.qt_event_loop.quit() + exit_loop() if not hasattr(kernel, "_qt_notifier"): fd = kernel.shell_stream.getsockopt(zmq.FD) @@ -101,6 +105,23 @@ def process_stream_events(): else: kernel._qt_notifier.setEnabled(True) + # allow for scheduling exits from the loop in case a timeout needs to + # be set from the kernel level + def _schedule_exit(delay): + """schedule fall back to main loop in [delay] seconds""" + # The signatures of QtCore.QTimer.singleShot are inconsistent between PySide and PyQt + # if setting the TimerType, so we create a timer explicitly and store it + # to avoid a memory leak. + # PreciseTimer is needed so we exit after _at least_ the specified delay, not within 5% of it + if not hasattr(kernel, "_qt_timer"): + kernel._qt_timer = QtCore.QTimer(kernel.app) + kernel._qt_timer.setSingleShot(True) + kernel._qt_timer.setTimerType(enum_helper("QtCore.Qt.TimerType").PreciseTimer) + kernel._qt_timer.timeout.connect(exit_loop) + kernel._qt_timer.start(int(1000 * delay)) + + loop_qt._schedule_exit = _schedule_exit + # there may already be unprocessed events waiting. # these events will not wake zmq's edge-triggered FD # since edge-triggered notification only occurs on new i/o activity. @@ -108,11 +129,7 @@ def process_stream_events(): # so we start in a clean state ensuring that any new i/o events will notify. # schedule first call on the eventloop as soon as it's running, # so we don't block here processing events - if not hasattr(kernel, "_qt_timer"): - kernel._qt_timer = QtCore.QTimer(kernel.app) - kernel._qt_timer.setSingleShot(True) - kernel._qt_timer.timeout.connect(process_stream_events) - kernel._qt_timer.start(0) + QtCore.QTimer.singleShot(0, process_stream_events) @register_integration("qt", "qt5", "qt6") @@ -229,23 +246,33 @@ def __init__(self, app): self.app = app self.app.withdraw() - def process_stream_events(stream, *a, **kw): + def exit_loop(): + """fall back to main loop""" + app.tk.deletefilehandler(kernel.shell_stream.getsockopt(zmq.FD)) + app.quit() + app.destroy() + del kernel.app_wrapper + + def process_stream_events(*a, **kw): """fall back to main loop when there's a socket event""" - if stream.flush(limit=1): - app.tk.deletefilehandler(stream.getsockopt(zmq.FD)) - app.quit() - app.destroy() - del kernel.app_wrapper + if kernel.shell_stream.flush(limit=1): + exit_loop() + + # allow for scheduling exits from the loop in case a timeout needs to + # be set from the kernel level + def _schedule_exit(delay): + """schedule fall back to main loop in [delay] seconds""" + app.after(int(1000 * delay), exit_loop) + + loop_tk._schedule_exit = _schedule_exit # For Tkinter, we create a Tk object and call its withdraw method. kernel.app_wrapper = BasicAppWrapper(app) - - notifier = partial(process_stream_events, kernel.shell_stream) - # seems to be needed for tk - notifier.__name__ = "notifier" # type:ignore[attr-defined] - app.tk.createfilehandler(kernel.shell_stream.getsockopt(zmq.FD), READABLE, notifier) + app.tk.createfilehandler( + kernel.shell_stream.getsockopt(zmq.FD), READABLE, process_stream_events + ) # schedule initial call after start - app.after(0, notifier) + app.after(0, process_stream_events) app.mainloop() @@ -560,6 +587,10 @@ def enable_gui(gui, kernel=None): # User wants to turn off integration; clear any evidence if Qt was the last one. if hasattr(kernel, "app"): delattr(kernel, "app") + if hasattr(kernel, "_qt_notifier"): + delattr(kernel, "_qt_notifier") + if hasattr(kernel, "_qt_timer"): + delattr(kernel, "_qt_timer") else: if gui.startswith("qt"): # Prepare the kernel here so any exceptions are displayed in the client. diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index 4bf2dee3..a24e3238 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -472,7 +472,7 @@ def enter_eventloop(self): self.log.info("Exiting as there is no eventloop") return - def advance_eventloop(): + async def advance_eventloop(): # check if eventloop changed: if self.eventloop is not eventloop: self.log.info("exiting eventloop %s", eventloop) @@ -494,10 +494,13 @@ def advance_eventloop(): def schedule_next(): """Schedule the next advance of the eventloop""" - # flush the eventloop every so often, - # giving us a chance to handle messages in the meantime + # call_later allows the io_loop to process other events if needed. + # Going through schedule_dispatch ensures all other dispatches on msg_queue + # are processed before we enter the eventloop, even if the previous dispatch was + # already consumed from the queue by process_one and the queue is + # technically empty. self.log.debug("Scheduling eventloop advance") - self.io_loop.call_later(0.001, advance_eventloop) + self.io_loop.call_later(0.001, partial(self.schedule_dispatch, advance_eventloop)) # begin polling the eventloop schedule_next() @@ -1202,9 +1205,18 @@ async def stop_aborting(): # before we reset the flag schedule_stop_aborting = partial(self.schedule_dispatch, stop_aborting) - # if we have a delay, give messages this long to arrive on the queue - # before we stop aborting requests - asyncio.get_event_loop().call_later(self.stop_on_error_timeout, schedule_stop_aborting) + if self.stop_on_error_timeout: + # if we have a delay, give messages this long to arrive on the queue + # before we stop aborting requests + self.io_loop.call_later(self.stop_on_error_timeout, schedule_stop_aborting) + # If we have an eventloop, it may interfere with the call_later above. + # If the loop has a _schedule_exit method, we call that so the loop exits + # after stop_on_error_timeout, returning to the main io_loop and letting + # the call_later fire. + if self.eventloop is not None and hasattr(self.eventloop, "_schedule_exit"): + self.eventloop._schedule_exit(self.stop_on_error_timeout + 0.01) + else: + schedule_stop_aborting() def _send_abort_reply(self, stream, msg, idents): """Send a reply to an aborted request"""