Skip to content

Commit f4ef4ef

Browse files
committed
Replace thread add_task with start_soon
1 parent 1fe492a commit f4ef4ef

File tree

5 files changed

+33
-36
lines changed

5 files changed

+33
-36
lines changed

ipykernel/kernelbase.py

+9-8
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import uuid
1717
import warnings
1818
from datetime import datetime
19+
from functools import partial
1920
from signal import SIGINT, SIGTERM, Signals
2021

2122
from .thread import CONTROL_THREAD_NAME
@@ -536,7 +537,7 @@ async def start(self, *, task_status: TaskStatus = TASK_STATUS_IGNORED) -> None:
536537
self.control_stop = threading.Event()
537538
if not self._is_test and self.control_socket is not None:
538539
if self.control_thread:
539-
self.control_thread.add_task(self.control_main)
540+
self.control_thread.start_soon(self.control_main)
540541
self.control_thread.start()
541542
else:
542543
tg.start_soon(self.control_main)
@@ -551,11 +552,11 @@ async def start(self, *, task_status: TaskStatus = TASK_STATUS_IGNORED) -> None:
551552

552553
# Assign tasks to and start shell channel thread.
553554
manager = self.shell_channel_thread.manager
554-
self.shell_channel_thread.add_task(self.shell_channel_thread_main)
555-
self.shell_channel_thread.add_task(
556-
manager.listen_from_control, self.shell_main, self.shell_channel_thread
555+
self.shell_channel_thread.start_soon(self.shell_channel_thread_main)
556+
self.shell_channel_thread.start_soon(
557+
partial(manager.listen_from_control, self.shell_main, self.shell_channel_thread)
557558
)
558-
self.shell_channel_thread.add_task(manager.listen_from_subshells)
559+
self.shell_channel_thread.start_soon(manager.listen_from_subshells)
559560
self.shell_channel_thread.start()
560561
else:
561562
if not self._is_test and self.shell_socket is not None:
@@ -1085,7 +1086,7 @@ async def create_subshell_request(self, socket, ident, parent) -> None:
10851086
# This should only be called in the control thread if it exists.
10861087
# Request is passed to shell channel thread to process.
10871088
other_socket = await self.shell_channel_thread.manager.get_control_other_socket(
1088-
self.control_thread.get_task_group()
1089+
self.control_thread
10891090
)
10901091
await other_socket.asend_json({"type": "create"})
10911092
reply = await other_socket.arecv_json()
@@ -1109,7 +1110,7 @@ async def delete_subshell_request(self, socket, ident, parent) -> None:
11091110
# This should only be called in the control thread if it exists.
11101111
# Request is passed to shell channel thread to process.
11111112
other_socket = await self.shell_channel_thread.manager.get_control_other_socket(
1112-
self.control_thread.get_task_group()
1113+
self.control_thread
11131114
)
11141115
await other_socket.asend_json({"type": "delete", "subshell_id": subshell_id})
11151116
reply = await other_socket.arecv_json()
@@ -1126,7 +1127,7 @@ async def list_subshell_request(self, socket, ident, parent) -> None:
11261127
# This should only be called in the control thread if it exists.
11271128
# Request is passed to shell channel thread to process.
11281129
other_socket = await self.shell_channel_thread.manager.get_control_other_socket(
1129-
self.control_thread.get_task_group()
1130+
self.control_thread
11301131
)
11311132
await other_socket.asend_json({"type": "list"})
11321133
reply = await other_socket.arecv_json()

ipykernel/shellchannel.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ def __init__(
2828
def manager(self) -> SubshellManager:
2929
# Lazy initialisation.
3030
if self._manager is None:
31-
self._manager = SubshellManager(self._context, self._shell_socket, self.get_task_group)
31+
self._manager = SubshellManager(self._context, self._shell_socket)
3232
return self._manager
3333

3434
def run(self) -> None:

ipykernel/subshell.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,13 @@ async def create_pair_socket(
2525
) -> None:
2626
"""Create inproc PAIR socket, for communication with shell channel thread.
2727
28-
Should be called from this thread, so usually via add_task before the
28+
Should be called from this thread, so usually via start_soon before the
2929
thread is started.
3030
"""
3131
assert current_thread() == self
3232
self._pair_socket = zmq_anyio.Socket(context, zmq.PAIR)
3333
self._pair_socket.connect(address)
34-
self.add_task(self._pair_socket.start)
34+
self.start_soon(self._pair_socket.start)
3535

3636
def run(self) -> None:
3737
try:

ipykernel/subshell_manager.py

+6-7
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import typing as t
88
import uuid
99
from dataclasses import dataclass
10+
from functools import partial
1011
from threading import Lock, current_thread, main_thread
1112
from typing import Callable
1213

@@ -44,13 +45,11 @@ def __init__(
4445
self,
4546
context: zmq.Context, # type: ignore[type-arg]
4647
shell_socket: zmq_anyio.Socket,
47-
get_task_group: Callable[[], TaskGroup],
4848
):
4949
assert current_thread() == main_thread()
5050

5151
self._context: zmq.Context = context # type: ignore[type-arg]
5252
self._shell_socket = shell_socket
53-
self._get_task_group = get_task_group
5453
self._cache: dict[str, Subshell] = {}
5554
self._lock_cache = Lock()
5655
self._lock_shell_socket = Lock()
@@ -91,9 +90,9 @@ def close(self) -> None:
9190
break
9291
self._stop_subshell(subshell)
9392

94-
async def get_control_other_socket(self, task_group: TaskGroup) -> zmq_anyio.Socket:
93+
async def get_control_other_socket(self, thread: BaseThread) -> zmq_anyio.Socket:
9594
if not self._control_other_socket.started.is_set():
96-
task_group.start_soon(self._control_other_socket.start)
95+
thread.start_soon(self._control_other_socket.start)
9796
await self._control_other_socket.started.wait()
9897
return self._control_other_socket
9998

@@ -134,7 +133,7 @@ async def listen_from_control(self, subshell_task: t.Any, thread: BaseThread) ->
134133
assert current_thread().name == SHELL_CHANNEL_THREAD_NAME
135134

136135
if not self._control_shell_channel_socket.started.is_set():
137-
thread.get_task_group().start_soon(self._control_shell_channel_socket.start)
136+
thread.start_soon(self._control_shell_channel_socket.start)
138137
await self._control_shell_channel_socket.started.wait()
139138
socket = self._control_shell_channel_socket
140139
while True:
@@ -200,8 +199,8 @@ async def _create_subshell(self, subshell_task: t.Any) -> str:
200199
await self._send_stream.send(subshell_id)
201200

202201
address = self._get_inproc_socket_address(subshell_id)
203-
thread.add_task(thread.create_pair_socket, self._context, address)
204-
thread.add_task(subshell_task, subshell_id)
202+
thread.start_soon(partial(thread.create_pair_socket, self._context, address))
203+
thread.start_soon(partial(subshell_task, subshell_id))
205204
thread.start()
206205

207206
return subshell_id

ipykernel/thread.py

+15-18
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
"""Base class for threads."""
2-
import typing as t
3-
from threading import Event, Thread
2+
import queue
3+
from typing import Awaitable, Callable
4+
from threading import Thread
45

56
from anyio import create_task_group, run, to_thread
67
from anyio.abc import TaskGroup
@@ -17,31 +18,27 @@ def __init__(self, **kwargs):
1718
super().__init__(**kwargs)
1819
self.pydev_do_not_trace = True
1920
self.is_pydev_daemon_thread = True
20-
self.__stop = Event()
21-
self._tasks_and_args: list[tuple[t.Any, t.Any]] = []
21+
self._tasks = queue.Queue()
2222

23-
def get_task_group(self) -> TaskGroup:
24-
return self._task_group
23+
def start_soon(self, task: Callable[[], Awaitable[None]] | None) -> None:
24+
self._tasks.put(task)
2525

26-
def add_task(self, task: t.Any, *args: t.Any) -> None:
27-
# May only add tasks before the thread is started.
28-
self._tasks_and_args.append((task, args))
29-
30-
def run(self) -> t.Any:
26+
def run(self) -> None:
3127
"""Run the thread."""
32-
return run(self._main)
28+
run(self._main)
3329

3430
async def _main(self) -> None:
3531
async with create_task_group() as tg:
36-
self._task_group = tg
37-
for task, args in self._tasks_and_args:
38-
tg.start_soon(task, *args)
39-
await to_thread.run_sync(self.__stop.wait)
40-
tg.cancel_scope.cancel()
32+
while True:
33+
task = await to_thread.run_sync(self._tasks.get)
34+
if task is None:
35+
tg.cancel_scope.cancel()
36+
else:
37+
tg.start_soon(task)
4138

4239
def stop(self) -> None:
4340
"""Stop the thread.
4441
4542
This method is threadsafe.
4643
"""
47-
self.__stop.set()
44+
self._tasks.put(None)

0 commit comments

Comments
 (0)