Skip to content

Commit

Permalink
🚀 final details around ws integration (#154)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ousret authored Oct 7, 2024
1 parent 8f5e243 commit f60b80a
Show file tree
Hide file tree
Showing 19 changed files with 156 additions and 71 deletions.
6 changes: 1 addition & 5 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,8 @@ jobs:
uses: "actions/checkout@d632683dd7b4114ad314bca15554477dd762a938"

- name: "Traefik: Prerequisites - Colima (MacOS)"
uses: nick-fields/retry@v3
if: ${{ matrix.traefik-server && contains(matrix.os, 'mac') }}
with:
timeout_minutes: 10
max_attempts: 3
command: ./traefik/macos.sh
run: ./traefik/macos.sh

- name: "Setup Python ${{ matrix.python-version }}"
uses: "actions/setup-python@f677139bbe7f9c59b41e40162b753c062f5d49a3"
Expand Down
3 changes: 0 additions & 3 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@
This will require the installation of an optional dependency ``wsproto``, to do so, please install urllib3-future with
``pip install urllib3-future[ws]``.
- Fixed a rare issue where the ``:authority`` (special header) value might be malformed.
- Fixed an issue where passing ``Upgrade: websocket`` would be discarded without effect, thus smuggling the original user
intent. This is an issue due to our strict backward compatibility with our predecessor. Now, passing this header
will automatically disable HTTP/2 and HTTP/3 support for the given request.

2.9.900 (2024-09-24)
====================
Expand Down
2 changes: 2 additions & 0 deletions docs/advanced-usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1586,3 +1586,5 @@ response object.
.. note:: The important thing here, is that, when the server agrees to stop speaking HTTP in favor of something else, the ``response.extension`` is set and you will be able to exchange raw data at will.

.. warning:: In HTTP/2 or HTTP/3 you want to replace ``"GET"`` by ``"CONNECT"`` and add a header named ``:protocol`` to issue a proper "Upgrade".
25 changes: 13 additions & 12 deletions src/urllib3/_async/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,14 +316,6 @@ async def _raw_read( # type: ignore[override]
if self._fp is None:
return None # type: ignore[return-value]

# Safe-Guard against response that does not provide bodies.
# Calling read will most likely block the program forever!
if self.length_remaining == 0 and (
self.status == 101
or (self._request_method == "CONNECT" and 200 <= self.status < 300)
):
return None # type: ignore[return-value]

fp_closed = getattr(self._fp, "closed", False)

async with self._error_catcher():
Expand Down Expand Up @@ -450,10 +442,16 @@ async def read( # type: ignore[override]
and self._fp._eot
and self._police_officer is not None
):
self._police_officer.forget(self)
if self._police_officer.busy:
self._police_officer.release()
self._police_officer = None
# an HTTP extension could be live, we don't want to accidentally kill it!
if (
not hasattr(self._fp, "_dsa")
or self._fp._dsa is None
or self._fp._dsa.closed is True
):
self._police_officer.forget(self)
if self._police_officer.busy:
self._police_officer.release()
self._police_officer = None

async def stream( # type: ignore[override]
self, amt: int | None = 2**16, decode_content: bool | None = None
Expand All @@ -465,6 +463,9 @@ async def stream( # type: ignore[override]
yield data

async def close(self) -> None: # type: ignore[override]
if self.extension is not None and self.extension.closed is False:
await self.extension.close()

if not self.closed and self._fp:
self._fp.close()

Expand Down
12 changes: 10 additions & 2 deletions src/urllib3/backend/_async/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def __init__(
self,
stream_id: int,
read: typing.Callable[
[int | None, int | None, bool],
[int | None, int | None, bool, bool],
typing.Awaitable[tuple[bytes, bool, HTTPHeaderDict | None]],
]
| None = None,
Expand All @@ -23,6 +23,10 @@ def __init__(
self._read = read
self._write = write

@property
def closed(self) -> bool:
return self._read is None and self._write is None

async def readinto(self, b: bytearray) -> int:
if self._read is None:
raise OSError("read operation on a closed stream")
Expand Down Expand Up @@ -61,7 +65,10 @@ async def recv_extended(
raise OSError("stream closed error")

data, eot, trailers = await self._read(
__bufsize, self._stream_id, __bufsize is not None
__bufsize,
self._stream_id,
__bufsize is not None,
False,
)

if eot:
Expand Down Expand Up @@ -96,6 +103,7 @@ async def close(self) -> None:
await self._write(b"", self._stream_id, True)
self._write = None
if self._read is not None:
await self._read(None, self._stream_id, False, True)
self._read = None


Expand Down
18 changes: 17 additions & 1 deletion src/urllib3/backend/_async/hface.py
Original file line number Diff line number Diff line change
Expand Up @@ -1067,8 +1067,24 @@ async def __read_st(
__amt: int | None,
__stream_id: int | None,
__respect_end_signal: bool = True,
__dummy_operation: bool = False,
) -> tuple[bytes, bool, HTTPHeaderDict | None]:
"""Allows us to defer the body loading after constructing the response object."""

# we may want to just remove the response as "pending"
# e.g. HTTP Extension; making reads on sub protocol close may
# ends up in a blocking situation (forever).
if __dummy_operation:
try:
del self._pending_responses[__stream_id] # type: ignore[arg-type]
except KeyError:
pass # Hmm... this should be impossible.

# remote can refuse future inquiries, so no need to go further with this conn.
if self._protocol.has_expired(): # type: ignore[union-attr]
await self.close()
return b"", True, None

eot = False

events: list[DataReceived | HeadersReceived] = await self.__exchange_until( # type: ignore[assignment]
Expand Down Expand Up @@ -1247,7 +1263,7 @@ async def getresponse( # type: ignore[override]
self._http_vsn,
reason,
headers,
self.__read_st if not eot else None,
self.__read_st if eot is False and dsa is None else None,
authority=self.host,
port=self.port,
stream_id=promise.stream_id,
Expand Down
14 changes: 10 additions & 4 deletions src/urllib3/backend/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ def __init__(
self,
stream_id: int,
read: typing.Callable[
[int | None, int | None, bool], tuple[bytes, bool, HTTPHeaderDict | None]
[int | None, int | None, bool, bool],
tuple[bytes, bool, HTTPHeaderDict | None],
]
| None = None,
write: typing.Callable[[bytes, int, bool], None] | None = None,
Expand All @@ -99,8 +100,8 @@ def __init__(

if read is not None:
self._read: typing.Callable[
[int | None], tuple[bytes, bool, HTTPHeaderDict | None]
] | None = lambda amt: read(amt, self._stream_id, amt is not None)
[int | None, bool], tuple[bytes, bool, HTTPHeaderDict | None]
] | None = lambda amt, fo: read(amt, self._stream_id, amt is not None, fo)
else:
self._read = None

Expand All @@ -111,6 +112,10 @@ def __init__(
else:
self._write = None

@property
def closed(self) -> bool:
return self._read is None and self._write is None

def readinto(self, b: bytearray) -> int:
if self._read is None:
raise OSError("read operation on a closed stream")
Expand Down Expand Up @@ -148,7 +153,7 @@ def recv_extended(
if self._read is None:
raise OSError("stream closed error")

data, eot, trailers = self._read(__bufsize)
data, eot, trailers = self._read(__bufsize, False)

if eot:
self._read = None
Expand Down Expand Up @@ -180,6 +185,7 @@ def close(self) -> None:
self._write(b"", True)
self._write = None
if self._read is not None:
self._read(None, True)
self._read = None


Expand Down
18 changes: 17 additions & 1 deletion src/urllib3/backend/hface.py
Original file line number Diff line number Diff line change
Expand Up @@ -1127,8 +1127,24 @@ def __read_st(
__amt: int | None,
__stream_id: int | None,
__respect_end_signal: bool = True,
__dummy_operation: bool = False,
) -> tuple[bytes, bool, HTTPHeaderDict | None]:
"""Allows us to defer the body loading after constructing the response object."""

# we may want to just remove the response as "pending"
# e.g. HTTP Extension; making reads on sub protocol close may
# ends up in a blocking situation (forever).
if __dummy_operation:
try:
del self._pending_responses[__stream_id] # type: ignore[arg-type]
except KeyError:
pass # Hmm... this should be impossible.

# remote can refuse future inquiries, so no need to go further with this conn.
if self._protocol.has_expired(): # type: ignore[union-attr]
self.close()
return b"", True, None

eot = False

events: list[DataReceived | HeadersReceived] = self.__exchange_until( # type: ignore[assignment]
Expand Down Expand Up @@ -1306,7 +1322,7 @@ def getresponse(
self._http_vsn,
reason,
headers,
self.__read_st if not eot else None,
self.__read_st if eot is False and dsa is None else None,
authority=self.host,
port=self.port,
stream_id=promise.stream_id,
Expand Down
11 changes: 11 additions & 0 deletions src/urllib3/contrib/hface/protocols/http2/_h2.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import jh2.connection # type: ignore
import jh2.events # type: ignore
import jh2.exceptions # type: ignore
import jh2.settings # type: ignore

from ..._stream_matrix import StreamMatrix
from ..._typing import HeadersType
Expand Down Expand Up @@ -51,6 +52,16 @@ def __init__(
observable_impl: HTTP2ProtocolHyperImpl | None = None,
) -> None:
super().__init__(config=config)
# by default CONNECT is disabled
# we need it to support natively WebSocket over HTTP/2 for example.
self.local_settings = jh2.settings.Settings(
client=True,
initial_values={
jh2.settings.SettingCodes.MAX_CONCURRENT_STREAMS: 100,
jh2.settings.SettingCodes.MAX_HEADER_LIST_SIZE: self.DEFAULT_MAX_HEADER_LIST_SIZE,
jh2.settings.SettingCodes.ENABLE_CONNECT_PROTOCOL: 1,
},
)
self._observable_impl = observable_impl

def _open_streams(self, *args, **kwargs) -> int: # type: ignore[no-untyped-def]
Expand Down
6 changes: 5 additions & 1 deletion src/urllib3/contrib/webextensions/_async/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,16 @@ def __init__(self) -> None:
async def start(self, response: AsyncHTTPResponse) -> None:
"""The HTTP server gave us the go-to start negotiating another protocol."""
if response._fp is None or not hasattr(response._fp, "_dsa"):
raise OSError()
raise OSError("The HTTP extension is closed or uninitialized")

self._dsa = response._fp._dsa
self._police_officer = response._police_officer
self._response = response

@property
def closed(self) -> bool:
return self._dsa is None

@staticmethod
def supported_svn() -> set[HttpVersion]:
"""Hint about supported parent SVN for this extension."""
Expand Down
4 changes: 2 additions & 2 deletions src/urllib3/contrib/webextensions/_async/raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ def scheme_to_http_scheme(scheme: str) -> str:

async def next_payload(self) -> bytes | None:
if self._police_officer is None or self._dsa is None:
raise OSError
raise OSError("The HTTP extension is closed or uninitialized")
async with self._police_officer.borrow(self._response):
data, eot, _ = await self._dsa.recv_extended(None)
return data

async def send_payload(self, buf: str | bytes) -> None:
if self._police_officer is None or self._dsa is None:
raise OSError
raise OSError("The HTTP extension is closed or uninitialized")

async with self._police_officer.borrow(self._response):
if isinstance(buf, str):
Expand Down
21 changes: 15 additions & 6 deletions src/urllib3/contrib/webextensions/_async/ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ async def start(self, response: AsyncHTTPResponse) -> None:
accept_token: str | None = response.headers.get("Sec-Websocket-Accept")

if accept_token is None:
raise OSError
raise RuntimeError(
"The WebSocket HTTP extension requires 'Sec-Websocket-Accept' header in the server response but was not present."
)

fake_http_response += accept_token.encode() + b"\r\n\r\n"

Expand All @@ -54,7 +56,9 @@ async def start(self, response: AsyncHTTPResponse) -> None:
event = next(self._protocol.events())

if not isinstance(event, AcceptConnection):
raise OSError
raise RuntimeError(
"The WebSocket state-machine did not pass the handshake phase when expected."
)

def headers(self, http_version: HttpVersion) -> dict[str, str]:
"""Specific HTTP headers required (request) before the 101 status response."""
Expand Down Expand Up @@ -90,15 +94,20 @@ async def close(self) -> None:
await self._dsa.close()
self._dsa = None
if self._response is not None:
await self._response.close()
if self._police_officer is not None:
self._police_officer.forget(self._response)
if self._police_officer.busy:
self._police_officer.release()
else:
await self._response.close()
self._response = None

self._police_officer = None

async def next_payload(self) -> str | bytes | None:
"""Unpack the next received message/payload from remote."""
if self._dsa is None or self._response is None or self._police_officer is None:
raise OSError("Missing call to start(...) for the HTTP extension")
raise OSError("The HTTP extension is closed or uninitialized")

for event in self._protocol.events():
if isinstance(event, TextMessage):
Expand Down Expand Up @@ -130,7 +139,7 @@ async def next_payload(self) -> str | bytes | None:
async def send_payload(self, buf: str | bytes) -> None:
"""Dispatch a buffer to remote."""
if self._dsa is None or self._response is None or self._police_officer is None:
raise OSError("Missing call to start(...) for the HTTP extension")
raise OSError("The HTTP extension is closed or uninitialized")

if isinstance(buf, str):
data_to_send: bytes = self._protocol.send(TextMessage(buf))
Expand All @@ -142,7 +151,7 @@ async def send_payload(self, buf: str | bytes) -> None:

async def ping(self) -> None:
if self._dsa is None or self._response is None or self._police_officer is None:
raise OSError("Missing call to start(...) for the HTTP extension")
raise OSError("The HTTP extension is closed or uninitialized")

data_to_send: bytes = self._protocol.send(Ping())
await self._dsa.sendall(data_to_send)
Expand Down
8 changes: 7 additions & 1 deletion src/urllib3/contrib/webextensions/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,18 @@ def __init__(self) -> None:
def start(self, response: HTTPResponse) -> None:
"""The HTTP server gave us the go-to start negotiating another protocol."""
if response._fp is None or not hasattr(response._fp, "_dsa"):
raise OSError()
raise RuntimeError(
"Attempt to start an HTTP extension without direct I/O access to the stream"
)

self._dsa = response._fp._dsa
self._police_officer = response._police_officer
self._response = response

@property
def closed(self) -> bool:
return self._dsa is None

@staticmethod
def supported_svn() -> set[HttpVersion]:
"""Hint about supported parent SVN for this extension."""
Expand Down
4 changes: 2 additions & 2 deletions src/urllib3/contrib/webextensions/raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ def scheme_to_http_scheme(scheme: str) -> str:

def next_payload(self) -> bytes | None:
if self._police_officer is None or self._dsa is None:
raise OSError
raise OSError("The HTTP extension is closed or uninitialized")
with self._police_officer.borrow(self._response):
data, eot, _ = self._dsa.recv_extended(None)
return data

def send_payload(self, buf: str | bytes) -> None:
if self._police_officer is None or self._dsa is None:
raise OSError
raise OSError("The HTTP extension is closed or uninitialized")

with self._police_officer.borrow(self._response):
if isinstance(buf, str):
Expand Down
Loading

0 comments on commit f60b80a

Please sign in to comment.