Skip to content

Commit

Permalink
🔖 Release 2.2.905
Browse files Browse the repository at this point in the history
- Fixed loss of a QUIC connection due to an inappropriate check in ``conn.is_connected``.
- Separate saturated (multiplexed) connections from the main pool to a distinct one.
  • Loading branch information
Ousret committed Nov 8, 2023
1 parent 10e844f commit c410ca0
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 31 deletions.
6 changes: 6 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
2.2.905 (2023-11-08)
====================

- Fixed loss of a QUIC connection due to an inappropriate check in ``conn.is_connected``.
- Separate saturated (multiplexed) connections from the main pool to a distinct one.

2.2.904 (2023-11-06)
====================

Expand Down
2 changes: 1 addition & 1 deletion src/urllib3/_version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# This file is protected via CODEOWNERS
from __future__ import annotations

__version__ = "2.2.904"
__version__ = "2.2.905"
3 changes: 3 additions & 0 deletions src/urllib3/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,9 @@ def is_connected(self) -> bool:
# wait_for_read: not functional with multiplexed connection!
if self._promises or self._pending_responses:
return True
# wait_for_read: not functional w/ UDP!
if self._svn == HttpVersion.h3:
return self._protocol is not None and self._protocol.has_expired() is False
return not wait_for_read(self.sock, timeout=0.0)

@property
Expand Down
68 changes: 41 additions & 27 deletions src/urllib3/connectionpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ def __init__(

self._maxsize = maxsize
self.pool: queue.LifoQueue[typing.Any] | None = self.QueueCls(maxsize)
self.saturated_pool: queue.LifoQueue[typing.Any] = self.QueueCls(maxsize)
self.block = block

self.proxy = _proxy
Expand Down Expand Up @@ -291,11 +292,6 @@ def _get_conn(
if conn and is_connection_dropped(conn):
log.debug("Resetting dropped connection: %s", self.host)
conn.close()
elif conn and conn.is_saturated is True and no_new is False:
try:
return self._get_conn(timeout=timeout, no_new=no_new)
finally:
self._put_conn(conn)

return conn or self._new_conn()

Expand All @@ -313,7 +309,27 @@ def _put_conn(self, conn: HTTPConnection | None) -> None:
If the pool is closed, then the connection will be closed and discarded.
"""

if self.pool is not None:
if conn:
if conn.is_multiplexed and conn.is_saturated:
try:
self.saturated_pool.put(conn, block=False)
except queue.Full:
self.num_connections -= 1
conn.close()
return
elif (
conn.is_multiplexed
and conn.is_idle
and self.pool.maxsize > self._maxsize
):
print("???")
self.num_connections -= 1
self.pool.maxsize -= 1
conn.close()
return

try:
self.pool.put(conn, block=False)
return # Everything is dandy, done.
Expand Down Expand Up @@ -344,18 +360,20 @@ def _put_conn(self, conn: HTTPConnection | None) -> None:
)

self.pool.maxsize += 1

return self._put_conn(conn)

log.warning(
"Connection pool is full, discarding connection: %s. Connection pool size: %s",
self.host,
self.pool.qsize(),
)
self.num_connections -= 1
return

# Connection never got put back into the pool, close it.
if conn:
conn.close()
self.num_connections -= 1

def _validate_conn(self, conn: HTTPConnection) -> None:
"""
Expand Down Expand Up @@ -408,16 +426,28 @@ def get_response(
If none available, return None.
"""
connections = []
response = None
response: HTTPResponse | None = None

while True:
try:
conn = self._get_conn(no_new=True)
except ValueError:
conn = self.saturated_pool.get(self.block)
connections.append(conn)
except queue.Empty:
break

connections.append(conn)
if not connections:
while True:
try:
conn = self._get_conn(no_new=True)
except ValueError:
break

connections.append(conn)

if not connections:
return None

for conn in connections:
if promise:
if promise in conn:
response = conn.getresponse(promise=promise)
Expand All @@ -430,24 +460,8 @@ def get_response(
continue
break

forget_about_connections = []

# we exceptionally increased the size (block=False + multiplexed enabled)
if len(connections) > self._maxsize:
expect_drop_count = len(connections) - self._maxsize

for conn in connections:
if conn.is_idle:
forget_about_connections.append(conn)
if len(forget_about_connections) >= expect_drop_count:
break

for conn in connections:
if conn not in forget_about_connections:
self._put_conn(conn)

for conn in forget_about_connections:
conn.close()
self._put_conn(conn)

if promise is not None and response is None:
raise ValueError(
Expand Down
5 changes: 4 additions & 1 deletion src/urllib3/contrib/hface/protocols/http2/_h2.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ def exceptions() -> tuple[type[BaseException], ...]:

def is_available(self) -> bool:
max_streams = self._connection.remote_settings.max_concurrent_streams
return self._terminated is False and max_streams > len(self._connection.streams)
return (
self._terminated is False
and max_streams > self._connection.open_outbound_streams
)

def has_expired(self) -> bool:
return self._terminated
Expand Down
2 changes: 1 addition & 1 deletion test/test_connectionpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ def test_put_conn_when_pool_is_full_nonblocking(
assert conn1 == pool._get_conn()
assert conn2 != pool._get_conn()

assert pool.num_connections == 3
assert pool.num_connections == 2
assert "Connection pool is full, discarding connection" in caplog.text
assert "Connection pool size: 1" in caplog.text

Expand Down
3 changes: 2 additions & 1 deletion test/with_traefik/test_connectionpool_multiplexed.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ def test_multiplexing_stream_saturation(self) -> None:
promises.append(promise)

assert len(promises) == 300
assert pool.num_connections == 2

for i in range(300):
response = pool.get_response()
Expand All @@ -103,4 +104,4 @@ def test_multiplexing_stream_saturation(self) -> None:
assert "/delay/1" in response.json()["url"]

assert pool.get_response() is None
assert pool.pool is not None and pool.pool.qsize() == 2
assert pool.pool is not None and pool.num_connections == 2

0 comments on commit c410ca0

Please sign in to comment.