🐛 Fixed several issues with multiplexing
Ousret committed Nov 4, 2023
1 parent 396f1ea commit 5fb7148
Showing 10 changed files with 150 additions and 24 deletions.
9 changes: 9 additions & 0 deletions CHANGES.rst
@@ -1,3 +1,12 @@
2.2.901 (2023-11-04)
====================

- Fixed several issues with multiplexing.
  (i) Fixed the maximum concurrent streams limit in HTTP/2 and HTTP/3.
  (ii) Fixed tracking of unconsumed responses prior to attempting a connection upgrade (to HTTP/3).
  (iii) Fixed multiplexed connections not always being released back into the pool.
  (iv) Fixed requests with a body being interrupted by the ``EarlyResponse`` exception 'signal'.
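
For context, a minimal sketch of the multiplexed flow these fixes touch, using the urlopen(..., multiplexed=True) / get_response() API exercised by the tests in this commit (the host and paths below are placeholders, not part of the change):

    # Issue several requests up front, then collect responses as they complete.
    from urllib3 import HTTPSConnectionPool
    from urllib3.backend import ResponsePromise

    with HTTPSConnectionPool("example.org", 443) as pool:
        promises = [
            pool.urlopen("GET", f"/delay/{n}", multiplexed=True, preload_content=False)
            for n in (3, 1, 2)
        ]
        assert all(isinstance(p, ResponsePromise) for p in promises)

        # Responses come back in completion order, fastest first.
        while (response := pool.get_response()) is not None:
            print(response.status, response.json()["url"])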

2.2.900 (2023-11-01)
====================

2 changes: 1 addition & 1 deletion src/urllib3/_version.py
@@ -1,4 +1,4 @@
# This file is protected via CODEOWNERS
from __future__ import annotations

__version__ = "2.2.900"
__version__ = "2.2.901"
5 changes: 5 additions & 0 deletions src/urllib3/backend/_base.py
@@ -283,6 +283,7 @@ def __init__(
self.conn_info: ConnectionInfo | None = None

self._promises: list[ResponsePromise] = []
self._pending_responses: list[LowLevelResponse] = []

def __contains__(self, item: ResponsePromise) -> bool:
return item in self._promises
@@ -307,6 +308,10 @@ def _http_vsn(self) -> int:
def is_saturated(self) -> bool:
raise NotImplementedError

@property
def is_idle(self) -> bool:
return not self._promises and not self._pending_responses

@property
def is_multiplexed(self) -> bool:
raise NotImplementedError
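The new _pending_responses list tracks responses that were handed back to the caller but are not yet fully consumed, and is_idle only holds when neither promises nor pending responses remain. A toy illustration of that invariant (a stand-in class, not the real backend type):

    from dataclasses import dataclass, field

    @dataclass
    class FakeConnection:
        _promises: list = field(default_factory=list)            # requests awaiting a response
        _pending_responses: list = field(default_factory=list)   # responses not yet drained

        @property
        def is_idle(self) -> bool:
            # Only an idle connection may be probed/upgraded or treated as reusable.
            return not self._promises and not self._pending_responses

    conn = FakeConnection()
    assert conn.is_idle
    conn._promises.append("promise")            # request sent, response not yet received
    assert not conn.is_idle
    conn._promises.clear()
    conn._pending_responses.append("response")  # response returned, body still unread
    assert not conn.is_idle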
17 changes: 15 additions & 2 deletions src/urllib3/backend/hface.py
@@ -572,6 +572,8 @@ def __exchange_until(
else:
if isinstance(event, event_type_collectable):
events.append(event)
else:
reshelve_events.append(event)

target_cap_reached: bool = (
maximal_data_in_read is not None
@@ -755,7 +757,16 @@ def __read_st(

if events and events[-1].end_stream:
eot = True
if not self._promises:
_r = None

for candidate in self._pending_responses:
    if candidate._stream_id == __stream_id:
        _r = candidate
        break

if _r is not None:
    self._pending_responses.remove(_r)

if self.is_idle:
# probe for h3/quic if available, and remember it.
self._upgrade()

@@ -852,12 +863,14 @@ def getresponse(
self.__session_ticket = self._protocol.session_ticket

if eot:
if not self._promises:
if self.is_idle:
self._upgrade()

# remote can refuse future inquiries, so no need to go further with this conn.
if self._protocol and self._protocol.has_expired():
self.close()
else:
self._pending_responses.append(response)

return response

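In __exchange_until above, events that are not of the requested collectable types are now reshelved for later instead of being dropped, which matters when frames from several streams interleave. A simplified, self-contained sketch of that collect-or-reshelve loop (toy event classes, not the real hface types):

    from collections import deque
    from dataclasses import dataclass

    @dataclass
    class HeadersReceived:
        stream_id: int

    @dataclass
    class DataReceived:
        stream_id: int

    def exchange_until(incoming, collectable):
        """Collect events of the wanted types; keep the rest so they can be
        reshelved instead of being silently lost."""
        events, reshelve_events = [], []
        while incoming:
            event = incoming.popleft()
            if isinstance(event, collectable):
                events.append(event)
            else:
                reshelve_events.append(event)
        return events, reshelve_events

    incoming = deque([HeadersReceived(1), DataReceived(3), DataReceived(1)])
    wanted, leftover = exchange_until(incoming, (DataReceived,))
    assert [e.stream_id for e in wanted] == [3, 1]
    assert isinstance(leftover[0], HeadersReceived)  # reshelved, not dropped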
4 changes: 2 additions & 2 deletions src/urllib3/connection.py
@@ -247,8 +247,8 @@ def is_closed(self) -> bool:
def is_connected(self) -> bool:
if self.sock is None:
return False
# wait_for_read become flaky with concurrent streams!
if self._promises:
# wait_for_read is not usable with a multiplexed connection!
if self._promises or self._pending_responses:
return True
return not wait_for_read(self.sock, timeout=0.0)

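The is_connected change above reflects that a readable socket is not a "connection dropped" signal on a multiplexed connection: the peer may simply have pushed frames that have not been read yet. A small demonstration with a socketpair standing in for the peer (wait_for_read is the same helper is_connected relies on; the import path is assumed from this repository's layout):

    import socket
    from urllib3.util.wait import wait_for_read

    client, server = socket.socketpair()
    try:
        # Nothing buffered: the old heuristic would say "still connected".
        assert not wait_for_read(client, timeout=0.0)
        # The peer pushes an (empty) HTTP/2 SETTINGS frame without closing.
        server.sendall(b"\x00\x00\x00\x04\x00\x00\x00\x00\x00")
        # Now the socket is readable even though the peer is alive, hence the
        # explicit _promises / _pending_responses check takes precedence.
        assert wait_for_read(client, timeout=1.0)
    finally:
        client.close()
        server.close()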
18 changes: 10 additions & 8 deletions src/urllib3/connectionpool.py
@@ -13,7 +13,7 @@
from ._collections import HTTPHeaderDict
from ._request_methods import RequestMethods
from ._typing import _TYPE_BODY, _TYPE_BODY_POSITION, _TYPE_TIMEOUT, ProxyConfig
from .backend import ConnectionInfo, HttpVersion, ResponsePromise
from .backend import ConnectionInfo, ResponsePromise
from .connection import (
BaseSSLError,
BrokenPipeError,
@@ -290,6 +290,11 @@ def _get_conn(
if conn and is_connection_dropped(conn):
log.debug("Resetting dropped connection: %s", self.host)
conn.close()
elif conn and conn.is_saturated is True and no_new is False:
try:
return self._get_conn(timeout=timeout, no_new=no_new)
finally:
self._put_conn(conn)

return conn or self._new_conn()

@@ -413,7 +418,9 @@ def get_response(
self._put_conn(conn)

if promise is not None and response is None:
raise ValueError
raise ValueError(
    "Invoked get_response with promise=... that no connection in this pool recognizes"
)

if response is None:
return None
@@ -1155,12 +1162,7 @@ def urlopen(
conn.close()
conn = None
release_this_conn = True
elif (
conn
and conn.is_saturated is False
and conn.conn_info is not None
and conn.conn_info.http_version != HttpVersion.h11
):
elif conn and conn.is_multiplexed is True:
# multiplexing allows us to issue more requests.
release_this_conn = True

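The _get_conn change above puts a saturated multiplexed connection back into the pool and looks for (or creates) another one, instead of handing it out again. A simplified stand-alone sketch of that logic (toy pool and connection objects, not the real classes):

    from queue import LifoQueue

    class ToyConn:
        def __init__(self, saturated: bool = False) -> None:
            self.is_saturated = saturated

    def get_conn(pool: LifoQueue, no_new: bool = False) -> ToyConn:
        conn = pool.get_nowait() if not pool.empty() else None
        if conn is not None and conn.is_saturated and not no_new:
            try:
                # This connection cannot take more streams right now: look further.
                return get_conn(pool, no_new=no_new)
            finally:
                # ...but keep it in the pool, its in-flight responses still matter.
                pool.put_nowait(conn)
        return conn or ToyConn()

    pool: LifoQueue = LifoQueue()
    pool.put_nowait(ToyConn(saturated=True))
    fresh = get_conn(pool)
    assert not fresh.is_saturated   # a usable (new) connection was returned
    assert pool.qsize() == 1        # the saturated one went back into the pool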
27 changes: 21 additions & 6 deletions src/urllib3/contrib/hface/protocols/http2/_h2.py
@@ -58,6 +58,7 @@ def __init__(
)
self._connection.initiate_connection()
self._events: deque[Event] = deque()
self._events_streams: list[int] = []
self._terminated: bool = False

@staticmethod
@@ -66,10 +67,7 @@ def exceptions() -> tuple[type[BaseException], ...]:

def is_available(self) -> bool:
max_streams = self._connection.remote_settings.max_concurrent_streams
return (
self._terminated is False
and max_streams > self._connection.highest_outbound_stream_id
)
return self._terminated is False and max_streams > len(self._connection.streams)

def has_expired(self) -> bool:
return self._terminated
@@ -97,13 +95,28 @@ def submit_stream_reset(self, stream_id: int, error_code: int = 0) -> None:
def next_event(self) -> Event | None:
if not self._events:
return None
return self._events.popleft()
ev = self._events.popleft()

if hasattr(ev, "stream_id"):
self._events_streams.remove(ev.stream_id)

return ev

def has_pending_event(self, *, stream_id: int | None = None) -> bool:
return len(self._events) > 0
if stream_id is None:
return len(self._events) > 0

try:
self._events_streams.index(stream_id)
except ValueError:
return False

return True

def _map_events(self, h2_events: list[h2.events.Event]) -> Iterator[Event]:
for e in h2_events:
if hasattr(e, "stream_id"):
self._events_streams.append(e.stream_id)
if isinstance(
e,
(
@@ -175,3 +188,5 @@ def should_wait_remote_flow_control(
def reshelve(self, *events: Event) -> None:
for ev in reversed(events):
self._events.appendleft(ev)
if hasattr(ev, "stream_id"):
self._events_streams.append(ev.stream_id)
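The _events_streams list added above is a per-event index of stream ids, kept in lock-step with the event deque, so has_pending_event(stream_id=...) can answer for a single stream without scanning every buffered event. A toy version of that bookkeeping (not the real protocol class):

    from __future__ import annotations

    from collections import deque
    from dataclasses import dataclass

    @dataclass
    class Event:
        stream_id: int

    class EventBuffer:
        def __init__(self) -> None:
            self._events = deque()
            self._events_streams = []   # one stream id entry per buffered event

        def push(self, event: Event) -> None:
            self._events.append(event)
            self._events_streams.append(event.stream_id)

        def next_event(self) -> Event | None:
            if not self._events:
                return None
            ev = self._events.popleft()
            self._events_streams.remove(ev.stream_id)   # drop one matching entry
            return ev

        def has_pending_event(self, *, stream_id: int | None = None) -> bool:
            if stream_id is None:
                return bool(self._events)
            return stream_id in self._events_streams

    buf = EventBuffer()
    buf.push(Event(1))
    buf.push(Event(3))
    assert buf.has_pending_event(stream_id=3)
    buf.next_event()                                # consumes the event for stream 1
    assert not buf.has_pending_event(stream_id=1)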
22 changes: 18 additions & 4 deletions src/urllib3/contrib/hface/protocols/http3/_qh3.py
@@ -101,6 +101,7 @@ def __init__(
self._connection_ids: set[bytes] = set()
self._remote_address = remote_address
self._event_buffer: deque[Event] = deque()
self._events_streams: list[int] = []
self._packets: deque[bytes] = deque()
self._http: H3Connection | None = None
self._terminated: bool = False
@@ -111,8 +112,8 @@ def exceptions() -> tuple[type[BaseException], ...]:
return ProtocolError, H3Error, QuicConnectionError, AssertionError

def is_available(self) -> bool:
# todo: decide a limit. 250?
return not self._terminated
# todo: qh3 hardcodes 128 streams as the maximum; revisit this limit!
return self._terminated is False and 128 > len(self._quic._streams)

def has_expired(self) -> bool:
return self._terminated
@@ -157,10 +158,19 @@ def submit_stream_reset(self, stream_id: int, error_code: int = 0) -> None:
def next_event(self) -> Event | None:
if not self._event_buffer:
return None
return self._event_buffer.popleft()
ev = self._event_buffer.popleft()
if hasattr(ev, "stream_id"):
self._events_streams.remove(ev.stream_id)
return ev

def has_pending_event(self, *, stream_id: int | None = None) -> bool:
return len(self._event_buffer) > 0
if stream_id is None:
return len(self._event_buffer) > 0
try:
self._events_streams.index(stream_id)
except ValueError:
return False
return True

@property
def connection_ids(self) -> Sequence[bytes]:
@@ -232,6 +242,8 @@ def _map_quic_event(self, quic_event: quic_events.QuicEvent) -> Iterable[Event]:
yield StreamResetReceived(quic_event.stream_id, quic_event.error_code)

def _map_h3_event(self, h3_event: h3_events.H3Event) -> Iterable[Event]:
if hasattr(h3_event, "stream_id"):
self._events_streams.append(h3_event.stream_id)
if isinstance(h3_event, h3_events.HeadersReceived):
yield HeadersReceived(
h3_event.stream_id, h3_event.headers, h3_event.stream_ended
@@ -496,4 +508,6 @@ def cipher(self) -> str | None:

def reshelve(self, *events: Event) -> None:
for ev in reversed(events):
if hasattr(ev, "stream_id"):
self._events_streams.append(ev.stream_id)
self._event_buffer.appendleft(ev)
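reshelve() walks the events in reverse while using appendleft(), so the next calls to next_event() replay them in their original order. A short demonstration:

    from collections import deque

    buffer = deque(["later-1", "later-2"])
    for ev in reversed(["first", "second", "third"]):
        buffer.appendleft(ev)

    assert list(buffer) == ["first", "second", "third", "later-1", "later-2"]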
11 changes: 10 additions & 1 deletion src/urllib3/poolmanager.py
@@ -441,7 +441,11 @@ def get_response(
if not pool:
continue

response = pool.get_response(promise=promise)
try:
response = pool.get_response(promise=promise)
except ValueError:
response = None

put_back_pool.append((pool_key, pool))

if response:
@@ -450,6 +454,11 @@ def get_response(
for pool_key, pool in put_back_pool:
self.pools[pool_key] = pool

if promise is not None and response is None:
raise ValueError(
    "Invoked get_response with promise=... that no connection across pools recognizes"
)

if response is None:
return None

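With the change above, PoolManager.get_response() treats a per-pool ValueError as "this pool does not own the promise" and keeps looking, raising itself only when no pool recognizes it. A condensed sketch of that lookup (toy pool objects assumed, pool put-back omitted):

    def get_response_across_pools(pools, promise):
        response = None
        for pool in pools:
            try:
                response = pool.get_response(promise=promise)
            except ValueError:
                response = None   # this pool does not own the promise; try the next
            if response is not None:
                break
        if promise is not None and response is None:
            raise ValueError(
                "Invoked get_response with promise=... that no connection across pools recognizes"
            )
        return response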
59 changes: 59 additions & 0 deletions test/with_traefik/test_connectionpool_multiplexed.py
@@ -43,3 +43,62 @@ def test_multiplexing_fastest_to_slowest(self) -> None:

assert 3.5 >= round(time() - before, 2)
assert pool.get_response() is None

def test_multiplexing_without_preload(self) -> None:
with HTTPSConnectionPool(
self.host, self.https_port, ca_certs=self.ca_authority
) as pool:
promises = []

for i in range(5):
promise_slow = pool.urlopen(
"GET", "/delay/3", multiplexed=True, preload_content=False
)
promise_fast = pool.urlopen(
"GET", "/delay/1", multiplexed=True, preload_content=False
)

assert isinstance(promise_fast, ResponsePromise)
assert isinstance(promise_slow, ResponsePromise)
promises.append(promise_slow)
promises.append(promise_fast)

assert len(promises) == 10

for i in range(5):
response = pool.get_response()
assert response is not None
assert response.status == 200
assert "/delay/1" in response.json()["url"]

for i in range(5):
response = pool.get_response()
assert response is not None
assert response.status == 200
assert "/delay/3" in response.json()["url"]

assert pool.get_response() is None

def test_multiplexing_stream_saturation(self) -> None:
with HTTPSConnectionPool(
self.host, self.https_port, ca_certs=self.ca_authority, maxsize=2
) as pool:
promises = []

for i in range(300):
promise = pool.urlopen(
"GET", "/delay/1", multiplexed=True, preload_content=False
)
assert isinstance(promise, ResponsePromise)
promises.append(promise)

assert len(promises) == 300

for i in range(300):
response = pool.get_response()
assert response is not None
assert response.status == 200
assert "/delay/1" in response.json()["url"]

assert pool.get_response() is None
assert pool.pool is not None and pool.pool.qsize() == 2
