Skip to content

Commit cb18269

Browse files
gh-138253: Fix compatibility of sub-interpreters queues with queue.Queue (GH-138256)
Add the block parameter in the put() and get() methods of the concurrent.interpreters queues for compatibility with the queue.Queue interface.
1 parent a2ba0a7 commit cb18269

File tree

3 files changed

+24
-5
lines changed

3 files changed

+24
-5
lines changed

Lib/concurrent/interpreters/_queues.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -170,13 +170,13 @@ def full(self):
170170
def qsize(self):
171171
return _queues.get_count(self._id)
172172

173-
def put(self, obj, timeout=None, *,
173+
def put(self, obj, block=True, timeout=None, *,
174174
unbounditems=None,
175175
_delay=10 / 1000, # 10 milliseconds
176176
):
177177
"""Add the object to the queue.
178178
179-
This blocks while the queue is full.
179+
If "block" is true, this blocks while the queue is full.
180180
181181
For most objects, the object received through Queue.get() will
182182
be a new one, equivalent to the original and not sharing any
@@ -209,6 +209,8 @@ def put(self, obj, timeout=None, *,
209209
If "unbounditems" is UNBOUND then it is returned by get() in place
210210
of the unbound item.
211211
"""
212+
if not block:
213+
return self.put_nowait(obj, unbounditems=unbounditems)
212214
if unbounditems is None:
213215
unboundop = -1
214216
else:
@@ -235,17 +237,19 @@ def put_nowait(self, obj, *, unbounditems=None):
235237
unboundop, = _serialize_unbound(unbounditems)
236238
_queues.put(self._id, obj, unboundop)
237239

238-
def get(self, timeout=None, *,
240+
def get(self, block=True, timeout=None, *,
239241
_delay=10 / 1000, # 10 milliseconds
240242
):
241243
"""Return the next object from the queue.
242244
243-
This blocks while the queue is empty.
245+
If "block" is true, this blocks while the queue is empty.
244246
245247
If the next item's original interpreter has been destroyed
246248
then the "next object" is determined by the value of the
247249
"unbounditems" argument to put().
248250
"""
251+
if not block:
252+
return self.get_nowait()
249253
if timeout is not None:
250254
timeout = int(timeout)
251255
if timeout < 0:

Lib/test/test_interpreters/test_queues.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from concurrent.interpreters import _queues as queues, _crossinterp
1212
from .utils import _run_output, TestBase as _TestBase
1313

14-
14+
HUGE_TIMEOUT = 3600
1515
REPLACE = _crossinterp._UNBOUND_CONSTANT_TO_FLAG[_crossinterp.UNBOUND]
1616

1717

@@ -306,6 +306,8 @@ def test_put_timeout(self):
306306
queue.put(None)
307307
with self.assertRaises(queues.QueueFull):
308308
queue.put(None, timeout=0.1)
309+
with self.assertRaises(queues.QueueFull):
310+
queue.put(None, HUGE_TIMEOUT, 0.1)
309311
queue.get()
310312
queue.put(None)
311313

@@ -315,6 +317,10 @@ def test_put_nowait(self):
315317
queue.put_nowait(None)
316318
with self.assertRaises(queues.QueueFull):
317319
queue.put_nowait(None)
320+
with self.assertRaises(queues.QueueFull):
321+
queue.put(None, False)
322+
with self.assertRaises(queues.QueueFull):
323+
queue.put(None, False, timeout=HUGE_TIMEOUT)
318324
queue.get()
319325
queue.put_nowait(None)
320326

@@ -345,11 +351,17 @@ def test_get_timeout(self):
345351
queue = queues.create()
346352
with self.assertRaises(queues.QueueEmpty):
347353
queue.get(timeout=0.1)
354+
with self.assertRaises(queues.QueueEmpty):
355+
queue.get(HUGE_TIMEOUT, 0.1)
348356

349357
def test_get_nowait(self):
350358
queue = queues.create()
351359
with self.assertRaises(queues.QueueEmpty):
352360
queue.get_nowait()
361+
with self.assertRaises(queues.QueueEmpty):
362+
queue.get(False)
363+
with self.assertRaises(queues.QueueEmpty):
364+
queue.get(False, timeout=HUGE_TIMEOUT)
353365

354366
def test_put_get_full_fallback(self):
355367
expected = list(range(20))
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Add the *block* parameter in the :meth:`!put` and :meth:`!get` methods
2+
of the :mod:`concurrent.interpreters` queues for compatibility with the
3+
:class:`queue.Queue` interface.

0 commit comments

Comments
 (0)