Skip to content

Commit 87f30ab

Browse files
committed
Replace thread add_task with start_soon
1 parent 1fe492a commit 87f30ab

File tree

5 files changed

+36
-37
lines changed

5 files changed

+36
-37
lines changed

ipykernel/kernelbase.py

+9-8
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import uuid
1717
import warnings
1818
from datetime import datetime
19+
from functools import partial
1920
from signal import SIGINT, SIGTERM, Signals
2021

2122
from .thread import CONTROL_THREAD_NAME
@@ -536,7 +537,7 @@ async def start(self, *, task_status: TaskStatus = TASK_STATUS_IGNORED) -> None:
536537
self.control_stop = threading.Event()
537538
if not self._is_test and self.control_socket is not None:
538539
if self.control_thread:
539-
self.control_thread.add_task(self.control_main)
540+
self.control_thread.start_soon(self.control_main)
540541
self.control_thread.start()
541542
else:
542543
tg.start_soon(self.control_main)
@@ -551,11 +552,11 @@ async def start(self, *, task_status: TaskStatus = TASK_STATUS_IGNORED) -> None:
551552

552553
# Assign tasks to and start shell channel thread.
553554
manager = self.shell_channel_thread.manager
554-
self.shell_channel_thread.add_task(self.shell_channel_thread_main)
555-
self.shell_channel_thread.add_task(
556-
manager.listen_from_control, self.shell_main, self.shell_channel_thread
555+
self.shell_channel_thread.start_soon(self.shell_channel_thread_main)
556+
self.shell_channel_thread.start_soon(
557+
partial(manager.listen_from_control, self.shell_main, self.shell_channel_thread)
557558
)
558-
self.shell_channel_thread.add_task(manager.listen_from_subshells)
559+
self.shell_channel_thread.start_soon(manager.listen_from_subshells)
559560
self.shell_channel_thread.start()
560561
else:
561562
if not self._is_test and self.shell_socket is not None:
@@ -1085,7 +1086,7 @@ async def create_subshell_request(self, socket, ident, parent) -> None:
10851086
# This should only be called in the control thread if it exists.
10861087
# Request is passed to shell channel thread to process.
10871088
other_socket = await self.shell_channel_thread.manager.get_control_other_socket(
1088-
self.control_thread.get_task_group()
1089+
self.control_thread
10891090
)
10901091
await other_socket.asend_json({"type": "create"})
10911092
reply = await other_socket.arecv_json()
@@ -1109,7 +1110,7 @@ async def delete_subshell_request(self, socket, ident, parent) -> None:
11091110
# This should only be called in the control thread if it exists.
11101111
# Request is passed to shell channel thread to process.
11111112
other_socket = await self.shell_channel_thread.manager.get_control_other_socket(
1112-
self.control_thread.get_task_group()
1113+
self.control_thread
11131114
)
11141115
await other_socket.asend_json({"type": "delete", "subshell_id": subshell_id})
11151116
reply = await other_socket.arecv_json()
@@ -1126,7 +1127,7 @@ async def list_subshell_request(self, socket, ident, parent) -> None:
11261127
# This should only be called in the control thread if it exists.
11271128
# Request is passed to shell channel thread to process.
11281129
other_socket = await self.shell_channel_thread.manager.get_control_other_socket(
1129-
self.control_thread.get_task_group()
1130+
self.control_thread
11301131
)
11311132
await other_socket.asend_json({"type": "list"})
11321133
reply = await other_socket.arecv_json()

ipykernel/shellchannel.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ def __init__(
2828
def manager(self) -> SubshellManager:
2929
# Lazy initialisation.
3030
if self._manager is None:
31-
self._manager = SubshellManager(self._context, self._shell_socket, self.get_task_group)
31+
self._manager = SubshellManager(self._context, self._shell_socket)
3232
return self._manager
3333

3434
def run(self) -> None:

ipykernel/subshell.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,13 @@ async def create_pair_socket(
2525
) -> None:
2626
"""Create inproc PAIR socket, for communication with shell channel thread.
2727
28-
Should be called from this thread, so usually via add_task before the
28+
Should be called from this thread, so usually via start_soon before the
2929
thread is started.
3030
"""
3131
assert current_thread() == self
3232
self._pair_socket = zmq_anyio.Socket(context, zmq.PAIR)
3333
self._pair_socket.connect(address)
34-
self.add_task(self._pair_socket.start)
34+
self.start_soon(self._pair_socket.start)
3535

3636
def run(self) -> None:
3737
try:

ipykernel/subshell_manager.py

+6-8
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77
import typing as t
88
import uuid
99
from dataclasses import dataclass
10+
from functools import partial
1011
from threading import Lock, current_thread, main_thread
11-
from typing import Callable
1212

1313
import zmq
1414
import zmq_anyio
@@ -44,13 +44,11 @@ def __init__(
4444
self,
4545
context: zmq.Context, # type: ignore[type-arg]
4646
shell_socket: zmq_anyio.Socket,
47-
get_task_group: Callable[[], TaskGroup],
4847
):
4948
assert current_thread() == main_thread()
5049

5150
self._context: zmq.Context = context # type: ignore[type-arg]
5251
self._shell_socket = shell_socket
53-
self._get_task_group = get_task_group
5452
self._cache: dict[str, Subshell] = {}
5553
self._lock_cache = Lock()
5654
self._lock_shell_socket = Lock()
@@ -91,9 +89,9 @@ def close(self) -> None:
9189
break
9290
self._stop_subshell(subshell)
9391

94-
async def get_control_other_socket(self, task_group: TaskGroup) -> zmq_anyio.Socket:
92+
async def get_control_other_socket(self, thread: BaseThread) -> zmq_anyio.Socket:
9593
if not self._control_other_socket.started.is_set():
96-
task_group.start_soon(self._control_other_socket.start)
94+
thread.start_soon(self._control_other_socket.start)
9795
await self._control_other_socket.started.wait()
9896
return self._control_other_socket
9997

@@ -134,7 +132,7 @@ async def listen_from_control(self, subshell_task: t.Any, thread: BaseThread) ->
134132
assert current_thread().name == SHELL_CHANNEL_THREAD_NAME
135133

136134
if not self._control_shell_channel_socket.started.is_set():
137-
thread.get_task_group().start_soon(self._control_shell_channel_socket.start)
135+
thread.start_soon(self._control_shell_channel_socket.start)
138136
await self._control_shell_channel_socket.started.wait()
139137
socket = self._control_shell_channel_socket
140138
while True:
@@ -200,8 +198,8 @@ async def _create_subshell(self, subshell_task: t.Any) -> str:
200198
await self._send_stream.send(subshell_id)
201199

202200
address = self._get_inproc_socket_address(subshell_id)
203-
thread.add_task(thread.create_pair_socket, self._context, address)
204-
thread.add_task(subshell_task, subshell_id)
201+
thread.start_soon(partial(thread.create_pair_socket, self._context, address))
202+
thread.start_soon(partial(subshell_task, subshell_id))
205203
thread.start()
206204

207205
return subshell_id

ipykernel/thread.py

+18-18
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
"""Base class for threads."""
2-
import typing as t
3-
from threading import Event, Thread
2+
from __future__ import annotations
3+
4+
import queue
5+
from collections.abc import Awaitable
6+
from threading import Thread
7+
from typing import Callable
48

59
from anyio import create_task_group, run, to_thread
6-
from anyio.abc import TaskGroup
710

811
CONTROL_THREAD_NAME = "Control"
912
SHELL_CHANNEL_THREAD_NAME = "Shell channel"
@@ -17,31 +20,28 @@ def __init__(self, **kwargs):
1720
super().__init__(**kwargs)
1821
self.pydev_do_not_trace = True
1922
self.is_pydev_daemon_thread = True
20-
self.__stop = Event()
21-
self._tasks_and_args: list[tuple[t.Any, t.Any]] = []
22-
23-
def get_task_group(self) -> TaskGroup:
24-
return self._task_group
23+
self._tasks = queue.Queue()
2524

26-
def add_task(self, task: t.Any, *args: t.Any) -> None:
27-
# May only add tasks before the thread is started.
28-
self._tasks_and_args.append((task, args))
25+
def start_soon(self, task: Callable[[], Awaitable[None]] | None) -> None:
26+
self._tasks.put(task)
2927

30-
def run(self) -> t.Any:
28+
def run(self) -> None:
3129
"""Run the thread."""
32-
return run(self._main)
30+
run(self._main)
3331

3432
async def _main(self) -> None:
3533
async with create_task_group() as tg:
36-
self._task_group = tg
37-
for task, args in self._tasks_and_args:
38-
tg.start_soon(task, *args)
39-
await to_thread.run_sync(self.__stop.wait)
34+
while True:
35+
task = await to_thread.run_sync(self._tasks.get)
36+
if task is None:
37+
break
38+
else:
39+
tg.start_soon(task)
4040
tg.cancel_scope.cancel()
4141

4242
def stop(self) -> None:
4343
"""Stop the thread.
4444
4545
This method is threadsafe.
4646
"""
47-
self.__stop.set()
47+
self._tasks.put(None)

0 commit comments

Comments
 (0)