Skip to content

Commit

Permalink
❇️ Discrete idle connection watcher scheduler (#162)
Browse files Browse the repository at this point in the history
2.11.900 (2024-10-21)
=====================

- Added a discrete task for each instantiated ``ConnectionPool`` to
watch for unsolicited incoming data.
This improves the fix shipped in v2.10.906 and avoid having to recycle
your multiplexed connection in idle moments.
A new keyword argument is supported in your PoolManager configuration,
namely ``background_watch_delay``.
This parameter takes a int or float as the delay between checks. Set it
to None to void this background task.
Anything lower than ``0.01`` will be interpreted as None, therefor
disabling the discrete watch.
- Added managed keepalive for HTTP/2 and HTTP/3 over QUIC. A new keyword
argument, named ``keepalive_delay`` that
takes a value expressed in seconds for how long urllib3-future should
automatically keep the connection alive.
This is done in direct extension to our "discrete task" mentioned just
before. We will send ``PING`` frame
automatically to the remote peer every 60s by default (after idle for
60s to be clear). The window delay for
sending a ``PING`` is configurable via the ``keepalive_idle_window``
parameter. Learn more about this in our
  documentation.
  • Loading branch information
Ousret authored Oct 21, 2024
2 parents bc05f55 + 30dc52b commit 1446db4
Show file tree
Hide file tree
Showing 30 changed files with 1,138 additions and 21 deletions.
17 changes: 17 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
2.11.900 (2024-10-21)
=====================

- Added a discrete task for each instantiated ``ConnectionPool`` to watch for unsolicited incoming data.
This improves the fix shipped in v2.10.906 and avoid having to recycle your multiplexed connection in idle moments.
A new keyword argument is supported in your PoolManager configuration, namely ``background_watch_delay``.
This parameter takes a int or float as the delay between checks. Set it to None to void this background task.
Anything lower than ``0.01`` will be interpreted as None, therefor disabling the discrete watch.
- Added managed keepalive for HTTP/2 and HTTP/3 over QUIC. A new keyword argument, named ``keepalive_delay`` that
takes a value expressed in seconds for how long urllib3-future should automatically keep the connection alive.
This is done in direct extension to our "discrete task" mentioned just before. We will send ``PING`` frame
automatically to the remote peer every 60s by default (after idle for 60s to be clear). The window delay for
sending a ``PING`` is configurable via the ``keepalive_idle_window`` parameter. Learn more about this in our
documentation.
- Fixed evaluation of ``fp`` in our ``LowLevelResponse`` instance to raise ``AttributeError`` when it cannot be
accessed. This will help with ``cachecontrol[filecache]`` way of determining if response was consumed entirely.

2.10.906 (2024-10-17)
=====================

Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
- File uploads with multipart encoding.
- DNS over UDP, TLS, QUIC, or HTTPS. DNSSEC protected.
- Helpers for retrying requests and dealing with HTTP redirects.
- Automatic Keep-Alive for HTTP/1.1, HTTP/2, and HTTP/3.
- Support for gzip, deflate, brotli, and zstd encoding.
- Support for Python/PyPy 3.7+, no compromise.
- Automatic Connection Upgrade / Downgrade.
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.win.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
services:
proxy:
image: traefik:v3.1.4-windowsservercore-ltsc2022
image: traefik:v3.1.6-windowsservercore-ltsc2022
restart: unless-stopped
healthcheck:
test: [ "CMD", "traefik" ,"healthcheck", "--ping" ]
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
services:
proxy:
image: traefik:v3.1.4
image: traefik:v3.1.6
restart: unless-stopped
healthcheck:
test: [ "CMD", "traefik" ,"healthcheck", "--ping" ]
Expand Down
17 changes: 17 additions & 0 deletions docs/advanced-usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1588,3 +1588,20 @@ response object.
.. note:: The important thing here, is that, when the server agrees to stop speaking HTTP in favor of something else, the ``response.extension`` is set and you will be able to exchange raw data at will.

.. warning:: In HTTP/2 or HTTP/3 you want to replace ``"GET"`` by ``"CONNECT"`` and add a header named ``:protocol`` to issue a proper "Upgrade".

Background unsolicited data
----------------------------

.. note:: Upgrade urllib3-future to 2.11+ or later to benefit from this.

Since HTTP/2 or later, you may receive unsolicited incoming data that can be a challenge to verify whether the
connection is still up or not. We added a discrete task that carefully check for incoming data in idle connections.

To customize the behavior you may add the parameter ``background_watch_delay`` to your PoolManager or ConnectionPool
instance constructor.

Setting it to ``None`` makes it disabled.

.. note:: By default, it checks for incoming data and react to it every 5000ms.

.. warning:: Disabling this will void the effect of our automated keepalive.
26 changes: 26 additions & 0 deletions docs/user-guide.rst
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,32 @@ the timeout at the :class:`~urllib3.poolmanager.PoolManager` level:
You still override this pool-level timeout by specifying ``timeout`` to
:meth:`~urllib3.PoolManager.request`.

Keep-Alive
----------

.. note:: Available since urllib3-future v2.11 and before this only HTTP/1.1 were kept alive properly.

urllib3-future can automatically make sure that your HTTP connection is kept alive
no matter the used protocol using a discrete scheduled task for each host.

.. code-block:: python
import urllib3
http = urllib3.PoolManager(keepalive_delay=300, keepalive_idle_window=60)
In that example, we indicate that we wish to keep a connection alive for 5 minutes and
eventually send ping every 60s after the connection was idle. (Those values are the default ones!)

The pings are only sent when using HTTP/2 or HTTP/3 over QUIC. Any connection activity is considered as used, therefor
making the ping only 60s after zero activity. If the connection receive unsolicited data, it is also considered used.

.. note:: Setting either keepalive_delay or keepalive_idle_window to None disable this feature.

.. warning:: We do not recommend setting anything lower than 30s for keepalive_idle_window. Anything lower than 1s is considered to be 1s. High frequency ping will lower the performance of your connection pool.

Once the ``keepalive_delay`` passed, we do not close the connection, we simply cease to ensure it is alive. This is purely for backward compatibility with our predecessor, as some host may retain the connection for hours.

Retrying Requests
-----------------

Expand Down
5 changes: 4 additions & 1 deletion dummyserver/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,10 @@ async def inner_fn() -> R:
try:
return asyncio.run(inner_fn())
finally:
tornado_loop.close(all_fds=True) # type: ignore[union-attr]
try:
tornado_loop.close(all_fds=True) # type: ignore[union-attr]
except (ValueError, OSError):
pass # can fail needlessly with "Invalid file descriptor". Ignore!


@contextlib.contextmanager
Expand Down
2 changes: 1 addition & 1 deletion mypy-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ trustme==0.9.0
types-backports
types-requests
nox
qh3>=1.0.1,<2.0.0
qh3>=1.2.0,<2.0.0
h11>=0.11.0,<1.0.0
jh2>=5.0.0,<6.0.0
python_socks>=2.0,<3.0
Expand Down
6 changes: 5 additions & 1 deletion src/urllib3/_async/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from ..util._async.traffic_police import AsyncTrafficPolice
from ..backend._async._base import AsyncLowLevelResponse

from .._constant import DEFAULT_BLOCKSIZE
from .._constant import DEFAULT_BLOCKSIZE, DEFAULT_KEEPALIVE_DELAY
from ..util.timeout import _DEFAULT_TIMEOUT, Timeout
from ..util.util import to_str

Expand Down Expand Up @@ -127,6 +127,7 @@ def __init__(
preemptive_quic_cache: QuicPreemptiveCacheType | None = None,
resolver: AsyncBaseResolver | None = None,
socket_family: socket.AddressFamily = socket.AF_UNSPEC,
keepalive_delay: float | int | None = DEFAULT_KEEPALIVE_DELAY,
) -> None:
super().__init__(
host=host,
Expand All @@ -137,6 +138,7 @@ def __init__(
socket_options=socket_options,
disabled_svn=disabled_svn,
preemptive_quic_cache=preemptive_quic_cache,
keepalive_delay=keepalive_delay,
)
self.proxy = proxy
self.proxy_config = proxy_config
Expand Down Expand Up @@ -687,6 +689,7 @@ def __init__(
preemptive_quic_cache: QuicPreemptiveCacheType | None = None,
resolver: AsyncBaseResolver | None = None,
socket_family: socket.AddressFamily = socket.AF_UNSPEC,
keepalive_delay: float | int | None = DEFAULT_KEEPALIVE_DELAY,
proxy: Url | None = None,
proxy_config: ProxyConfig | None = None,
cert_reqs: int | str | None = None,
Expand Down Expand Up @@ -725,6 +728,7 @@ def __init__(
preemptive_quic_cache=preemptive_quic_cache,
resolver=resolver,
socket_family=socket_family,
keepalive_delay=keepalive_delay,
)

self.key_file = key_file
Expand Down
132 changes: 132 additions & 0 deletions src/urllib3/_async/connectionpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,24 @@
import queue
import socket
import sys
import time
import typing
import warnings
from asyncio import Task
from datetime import datetime, timedelta, timezone
from itertools import zip_longest
from socket import timeout as SocketTimeout
from types import TracebackType
from weakref import proxy

from .._collections import HTTPHeaderDict
from .._constant import (
DEFAULT_BACKGROUND_WATCH_WINDOW,
DEFAULT_KEEPALIVE_DELAY,
DEFAULT_KEEPALIVE_IDLE_WINDOW,
MINIMAL_BACKGROUND_WATCH_WINDOW,
MINIMAL_KEEPALIVE_IDLE_WINDOW,
)
from .._request_methods import AsyncRequestMethods
from .._typing import (
_TYPE_ASYNC_BODY,
Expand Down Expand Up @@ -140,6 +149,52 @@ async def close(self) -> None:
_blocking_errnos = {errno.EAGAIN, errno.EWOULDBLOCK}


async def idle_conn_watch_task(
pool: AsyncHTTPConnectionPool,
waiting_delay: float | int = 5.0,
keepalive_delay: int | float | None = DEFAULT_KEEPALIVE_DELAY,
keepalive_idle_window: int | float | None = DEFAULT_KEEPALIVE_IDLE_WINDOW,
) -> None:
"""Discrete background task that monitor incoming data
and dispatch message to registered callbacks."""

try:
while pool.pool is not None:
pool.num_background_watch_iter += 1
await asyncio.sleep(waiting_delay)
if pool.pool is None:
return
try:
async for conn in pool.pool.iter_idle():
now = time.monotonic()
last_used = conn.last_used_at
idle_delay = now - last_used

# don't peek into conn that just became idle
# waste of resource.
if idle_delay < 1.0:
continue

await conn.peek_and_react()

if (
keepalive_delay is not None
and keepalive_idle_window is not None
):
connected_at = conn.connected_at

if connected_at is not None:
since_connection_delay = now - connected_at
if since_connection_delay <= keepalive_delay:
if idle_delay >= keepalive_idle_window:
pool.num_pings += 1
await conn.ping()
except AttributeError:
return
except ReferenceError:
return


class AsyncHTTPConnectionPool(AsyncConnectionPool, AsyncRequestMethods):
"""
Task-safe async connection pool for one host.
Expand Down Expand Up @@ -187,6 +242,35 @@ class AsyncHTTPConnectionPool(AsyncConnectionPool, AsyncRequestMethods):
A dictionary with proxy headers, should not be used directly,
instead, see :class:`urllib3.AsyncProxyManager`
:param resolver:
A manual configuration to use for DNS resolution.
Can be a support DSN/str (e.g. "doh+cloudflare://") or a
:class:`urllib3.AsyncResolverDescription` or a list of DSN/str or
:class:`urllib3.AsyncResolverDescription`
:param happy_eyeballs:
Enable IETF Happy Eyeballs algorithm when trying to
connect by concurrently try multiple IPv4/IPv6 endpoints.
By default, tries at most 4 endpoints simultaneously.
You may specify an int that override this default.
Default, set to False.
:param background_watch_delay:
The window delay used by our discrete scheduler that run in
dedicated task to collect unsolicited incoming data and react
if necessary.
:param keepalive_delay:
The delay expressed in seconds on how long we should make sure
the connection is kept alive by sending pings to the remote peer.
Set it to None to void this feature.
:param keepalive_idle_window:
Immediately related to the parameter keepalive_delay.
This one, expressed in seconds, specify how long after
a connection is marked as idle we should send out a
ping to the remote peer.
:param \\**conn_kw:
Additional parameters are used to create fresh :class:`urllib3._async.connection.AsyncHTTPConnection`,
:class:`urllib3._async.connection.AsyncHTTPSConnection` instances.
Expand Down Expand Up @@ -216,6 +300,9 @@ def __init__(
| AsyncBaseResolver
| None = None,
happy_eyeballs: bool | int = False,
background_watch_delay: int | float | None = DEFAULT_BACKGROUND_WATCH_WINDOW,
keepalive_delay: int | float | None = DEFAULT_KEEPALIVE_DELAY,
keepalive_idle_window: int | float | None = DEFAULT_KEEPALIVE_IDLE_WINDOW,
**conn_kw: typing.Any,
):
AsyncConnectionPool.__init__(self, host, port)
Expand Down Expand Up @@ -257,6 +344,9 @@ def __init__(
# These are mostly for testing and debugging purposes.
self.num_connections = 0
self.num_requests = 0
self.num_pings = 0
self.num_background_watch_iter = 0

self.conn_kw = conn_kw

if self.proxy:
Expand Down Expand Up @@ -323,6 +413,17 @@ def __init__(
)

self.conn_kw["resolver"] = self._resolver
self.conn_kw["keepalive_delay"] = keepalive_delay

self._background_watch_delay = background_watch_delay
self._keepalive_delay = keepalive_delay
self._keepalive_idle_window = keepalive_idle_window
if (
self._keepalive_idle_window is not None
and self._keepalive_idle_window < MINIMAL_KEEPALIVE_IDLE_WINDOW
):
self._keepalive_idle_window = MINIMAL_KEEPALIVE_IDLE_WINDOW
self._background_monitoring: asyncio.Task | None = None # type: ignore[type-arg]

@property
def is_idle(self) -> bool:
Expand All @@ -337,6 +438,20 @@ async def _new_conn(
if self.pool is None:
raise ClosedPoolError(self, "Pool is closed")

if (
self._background_monitoring is None
and self._background_watch_delay is not None
and self._background_watch_delay >= MINIMAL_BACKGROUND_WATCH_WINDOW
):
self._background_monitoring = asyncio.create_task(
idle_conn_watch_task(
proxy(self),
self._background_watch_delay,
self._keepalive_delay,
self._keepalive_idle_window,
)
)

self.num_connections += 1
log.debug(
"Starting new HTTP connection (%d): %s:%s",
Expand Down Expand Up @@ -1191,6 +1306,10 @@ async def close(self) -> None:
# Close all the HTTPConnections in the pool.
await old_pool.clear()

if self._background_monitoring is not None:
self._background_monitoring.cancel()
self._background_monitoring = None

# Close allocated resolver if we own it. (aka. not shared)
if self._own_resolver and self._resolver.is_available():
await self._resolver.close()
Expand Down Expand Up @@ -1846,6 +1965,19 @@ async def _new_conn(
"""
if self.pool is None:
raise ClosedPoolError(self, "Pool is closed")
if (
self._background_monitoring is None
and self._background_watch_delay is not None
and self._background_watch_delay >= MINIMAL_BACKGROUND_WATCH_WINDOW
):
self._background_monitoring = asyncio.create_task(
idle_conn_watch_task(
self,
self._background_watch_delay,
self._keepalive_delay,
self._keepalive_idle_window,
)
)
self.num_connections += 1
log.debug(
"Starting new HTTPS connection (%d): %s:%s",
Expand Down
7 changes: 7 additions & 0 deletions src/urllib3/_constant.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,3 +243,10 @@ def __new__(
# Mozilla TLS recommendations for ciphers
# General-purpose servers with a variety of clients, recommended for almost all systems.
MOZ_INTERMEDIATE_CIPHERS: str = "ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:DHE-RSA-AES128-GCM-SHA256:DHE-RSA-AES256-GCM-SHA384:DHE-RSA-CHACHA20-POLY1305"

DEFAULT_BACKGROUND_WATCH_WINDOW: float = 5.0
MINIMAL_BACKGROUND_WATCH_WINDOW: float = 0.05

DEFAULT_KEEPALIVE_DELAY: float = 300.0
DEFAULT_KEEPALIVE_IDLE_WINDOW: float = 60.0
MINIMAL_KEEPALIVE_IDLE_WINDOW: float = 1.0
2 changes: 1 addition & 1 deletion src/urllib3/_version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# This file is protected via CODEOWNERS
from __future__ import annotations

__version__ = "2.10.906"
__version__ = "2.11.900"
3 changes: 3 additions & 0 deletions src/urllib3/backend/_async/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,3 +314,6 @@ async def send( # type: ignore[override]
"""The send() method SHOULD be invoked after calling endheaders() if and only if the request
context specify explicitly that a body is going to be sent."""
raise NotImplementedError

async def ping(self) -> None: # type: ignore[override]
raise NotImplementedError
Loading

0 comments on commit 1446db4

Please sign in to comment.