diff --git a/CHANGES.rst b/CHANGES.rst index 04d4418aa2..0c9df23ecc 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,3 +1,9 @@ +2.2.905 (2023-11-08) +==================== + +- Fixed loss of a QUIC connection due to an inappropriate check in ``conn.is_connected``. +- Separated saturated (multiplexed) connections from the main pool into a distinct one. + 2.2.904 (2023-11-06) ==================== diff --git a/src/urllib3/_version.py b/src/urllib3/_version.py index dc53ef1ee0..b0a0dd5315 100644 --- a/src/urllib3/_version.py +++ b/src/urllib3/_version.py @@ -1,4 +1,4 @@ # This file is protected via CODEOWNERS from __future__ import annotations -__version__ = "2.2.904" +__version__ = "2.2.905" diff --git a/src/urllib3/connection.py b/src/urllib3/connection.py index 6bdfe8b92f..0ba3162b85 100644 --- a/src/urllib3/connection.py +++ b/src/urllib3/connection.py @@ -250,6 +250,9 @@ def is_connected(self) -> bool: # wait_for_read: not functional with multiplexed connection! if self._promises or self._pending_responses: return True + # wait_for_read: not functional w/ UDP! 
+ if self._svn == HttpVersion.h3: + return self._protocol is not None and self._protocol.has_expired() is False return not wait_for_read(self.sock, timeout=0.0) @property diff --git a/src/urllib3/connectionpool.py b/src/urllib3/connectionpool.py index ce9f04a2c2..ef3ef81029 100644 --- a/src/urllib3/connectionpool.py +++ b/src/urllib3/connectionpool.py @@ -198,6 +198,7 @@ def __init__( self._maxsize = maxsize self.pool: queue.LifoQueue[typing.Any] | None = self.QueueCls(maxsize) + self.saturated_pool: queue.LifoQueue[typing.Any] = self.QueueCls(maxsize) self.block = block self.proxy = _proxy @@ -291,11 +292,6 @@ def _get_conn( if conn and is_connection_dropped(conn): log.debug("Resetting dropped connection: %s", self.host) conn.close() - elif conn and conn.is_saturated is True and no_new is False: - try: - return self._get_conn(timeout=timeout, no_new=no_new) - finally: - self._put_conn(conn) return conn or self._new_conn() @@ -313,7 +309,26 @@ def _put_conn(self, conn: HTTPConnection | None) -> None: If the pool is closed, then the connection will be closed and discarded. """ + if self.pool is not None: + if conn: + if conn.is_multiplexed and conn.is_saturated: + try: + self.saturated_pool.put(conn, block=False) + except queue.Full: + self.num_connections -= 1 + conn.close() + return + elif ( + conn.is_multiplexed + and conn.is_idle + and self.pool.maxsize > self._maxsize + ): + self.num_connections -= 1 + self.pool.maxsize -= 1 + conn.close() + return + try: self.pool.put(conn, block=False) return # Everything is dandy, done. @@ -344,7 +360,6 @@ def _put_conn(self, conn: HTTPConnection | None) -> None: ) self.pool.maxsize += 1 - return self._put_conn(conn) log.warning( @@ -352,10 +367,13 @@ def _put_conn(self, conn: HTTPConnection | None) -> None: self.host, self.pool.qsize(), ) + self.num_connections -= 1 + return # Connection never got put back into the pool, close it. 
if conn: conn.close() + self.num_connections -= 1 def _validate_conn(self, conn: HTTPConnection) -> None: """ @@ -408,16 +426,28 @@ def get_response( If none available, return None. """ connections = [] - response = None + response: HTTPResponse | None = None while True: try: - conn = self._get_conn(no_new=True) - except ValueError: + conn = self.saturated_pool.get(self.block) + connections.append(conn) + except queue.Empty: break - connections.append(conn) + if not connections: + while True: + try: + conn = self._get_conn(no_new=True) + except ValueError: + break + + connections.append(conn) + if not connections: + return None + + for conn in connections: if promise: if promise in conn: response = conn.getresponse(promise=promise) @@ -430,24 +460,8 @@ def get_response( continue break - forget_about_connections = [] - - # we exceptionally increased the size (block=False + multiplexed enabled) - if len(connections) > self._maxsize: - expect_drop_count = len(connections) - self._maxsize - - for conn in connections: - if conn.is_idle: - forget_about_connections.append(conn) - if len(forget_about_connections) >= expect_drop_count: - break - for conn in connections: - if conn not in forget_about_connections: - self._put_conn(conn) - - for conn in forget_about_connections: - conn.close() + self._put_conn(conn) if promise is not None and response is None: raise ValueError( diff --git a/src/urllib3/contrib/hface/protocols/http2/_h2.py b/src/urllib3/contrib/hface/protocols/http2/_h2.py index fc8e7ccb7a..b52d4afe64 100644 --- a/src/urllib3/contrib/hface/protocols/http2/_h2.py +++ b/src/urllib3/contrib/hface/protocols/http2/_h2.py @@ -66,7 +66,10 @@ def exceptions() -> tuple[type[BaseException], ...]: def is_available(self) -> bool: max_streams = self._connection.remote_settings.max_concurrent_streams - return self._terminated is False and max_streams > len(self._connection.streams) + return ( + self._terminated is False + and max_streams > 
self._connection.open_outbound_streams + ) def has_expired(self) -> bool: return self._terminated diff --git a/test/test_connectionpool.py b/test/test_connectionpool.py index c7ca45a39a..5672f9d017 100644 --- a/test/test_connectionpool.py +++ b/test/test_connectionpool.py @@ -252,7 +252,7 @@ def test_put_conn_when_pool_is_full_nonblocking( assert conn1 == pool._get_conn() assert conn2 != pool._get_conn() - assert pool.num_connections == 3 + assert pool.num_connections == 2 assert "Connection pool is full, discarding connection" in caplog.text assert "Connection pool size: 1" in caplog.text diff --git a/test/with_traefik/test_connectionpool_multiplexed.py b/test/with_traefik/test_connectionpool_multiplexed.py index c800450819..0da40bbc5e 100644 --- a/test/with_traefik/test_connectionpool_multiplexed.py +++ b/test/with_traefik/test_connectionpool_multiplexed.py @@ -95,6 +95,7 @@ def test_multiplexing_stream_saturation(self) -> None: promises.append(promise) assert len(promises) == 300 + assert pool.num_connections == 2 for i in range(300): response = pool.get_response() @@ -103,4 +104,4 @@ def test_multiplexing_stream_saturation(self) -> None: assert "/delay/1" in response.json()["url"] assert pool.get_response() is None - assert pool.pool is not None and pool.pool.qsize() == 2 + assert pool.pool is not None and pool.num_connections == 2