Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
2.2.905 (2023-11-08)
====================

- Fixed loss of a QUIC connection due to an inappropriate check in ``conn.is_connected``.
- Separate saturated (multiplexed) connections from the main pool to a distinct one.

2.2.904 (2023-11-06)
====================

Expand Down
2 changes: 1 addition & 1 deletion src/urllib3/_version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# This file is protected via CODEOWNERS
from __future__ import annotations

__version__ = "2.2.904"
__version__ = "2.2.905"
3 changes: 3 additions & 0 deletions src/urllib3/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,9 @@ def is_connected(self) -> bool:
# wait_for_read: not functional with multiplexed connection!
if self._promises or self._pending_responses:
return True
# wait_for_read: not functional w/ UDP!
if self._svn == HttpVersion.h3:
return self._protocol is not None and self._protocol.has_expired() is False
return not wait_for_read(self.sock, timeout=0.0)

@property
Expand Down
67 changes: 40 additions & 27 deletions src/urllib3/connectionpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ def __init__(

self._maxsize = maxsize
self.pool: queue.LifoQueue[typing.Any] | None = self.QueueCls(maxsize)
self.saturated_pool: queue.LifoQueue[typing.Any] = self.QueueCls(maxsize)
self.block = block

self.proxy = _proxy
Expand Down Expand Up @@ -291,11 +292,6 @@ def _get_conn(
if conn and is_connection_dropped(conn):
log.debug("Resetting dropped connection: %s", self.host)
conn.close()
elif conn and conn.is_saturated is True and no_new is False:
try:
return self._get_conn(timeout=timeout, no_new=no_new)
finally:
self._put_conn(conn)

return conn or self._new_conn()

Expand All @@ -313,7 +309,26 @@ def _put_conn(self, conn: HTTPConnection | None) -> None:

If the pool is closed, then the connection will be closed and discarded.
"""

if self.pool is not None:
if conn:
if conn.is_multiplexed and conn.is_saturated:
try:
self.saturated_pool.put(conn, block=False)
except queue.Full:
self.num_connections -= 1
conn.close()
return
elif (
conn.is_multiplexed
and conn.is_idle
and self.pool.maxsize > self._maxsize
):
self.num_connections -= 1
self.pool.maxsize -= 1
conn.close()
return

try:
self.pool.put(conn, block=False)
return # Everything is dandy, done.
Expand Down Expand Up @@ -344,18 +359,20 @@ def _put_conn(self, conn: HTTPConnection | None) -> None:
)

self.pool.maxsize += 1

return self._put_conn(conn)

log.warning(
"Connection pool is full, discarding connection: %s. Connection pool size: %s",
self.host,
self.pool.qsize(),
)
self.num_connections -= 1
return

# Connection never got put back into the pool, close it.
if conn:
conn.close()
self.num_connections -= 1

def _validate_conn(self, conn: HTTPConnection) -> None:
"""
Expand Down Expand Up @@ -408,16 +425,28 @@ def get_response(
If none available, return None.
"""
connections = []
response = None
response: HTTPResponse | None = None

while True:
try:
conn = self._get_conn(no_new=True)
except ValueError:
conn = self.saturated_pool.get(self.block)
connections.append(conn)
except queue.Empty:
break

connections.append(conn)
if not connections:
while True:
try:
conn = self._get_conn(no_new=True)
except ValueError:
break

connections.append(conn)

if not connections:
return None

for conn in connections:
if promise:
if promise in conn:
response = conn.getresponse(promise=promise)
Expand All @@ -430,24 +459,8 @@ def get_response(
continue
break

forget_about_connections = []

# we exceptionally increased the size (block=False + multiplexed enabled)
if len(connections) > self._maxsize:
expect_drop_count = len(connections) - self._maxsize

for conn in connections:
if conn.is_idle:
forget_about_connections.append(conn)
if len(forget_about_connections) >= expect_drop_count:
break

for conn in connections:
if conn not in forget_about_connections:
self._put_conn(conn)

for conn in forget_about_connections:
conn.close()
self._put_conn(conn)

if promise is not None and response is None:
raise ValueError(
Expand Down
5 changes: 4 additions & 1 deletion src/urllib3/contrib/hface/protocols/http2/_h2.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ def exceptions() -> tuple[type[BaseException], ...]:

def is_available(self) -> bool:
max_streams = self._connection.remote_settings.max_concurrent_streams
return self._terminated is False and max_streams > len(self._connection.streams)
return (
self._terminated is False
and max_streams > self._connection.open_outbound_streams
)

def has_expired(self) -> bool:
return self._terminated
Expand Down
2 changes: 1 addition & 1 deletion test/test_connectionpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ def test_put_conn_when_pool_is_full_nonblocking(
assert conn1 == pool._get_conn()
assert conn2 != pool._get_conn()

assert pool.num_connections == 3
assert pool.num_connections == 2
assert "Connection pool is full, discarding connection" in caplog.text
assert "Connection pool size: 1" in caplog.text

Expand Down
3 changes: 2 additions & 1 deletion test/with_traefik/test_connectionpool_multiplexed.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ def test_multiplexing_stream_saturation(self) -> None:
promises.append(promise)

assert len(promises) == 300
assert pool.num_connections == 2

for i in range(300):
response = pool.get_response()
Expand All @@ -103,4 +104,4 @@ def test_multiplexing_stream_saturation(self) -> None:
assert "/delay/1" in response.json()["url"]

assert pool.get_response() is None
assert pool.pool is not None and pool.pool.qsize() == 2
assert pool.pool is not None and pool.num_connections == 2