Skip to content

Commit

Permalink
🐛 Implement proper flow control when sending a body for HTTP/2
Browse files Browse the repository at this point in the history
  • Loading branch information
Ousret committed Sep 12, 2023
1 parent 97fafa1 commit 5ec2221
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 25 deletions.
3 changes: 3 additions & 0 deletions changelog/18.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Fixed the flow control when sending a body for a HTTP/2 connection.
The body will be split into numerous chunks if the size exceed the specified blocksize when not
using HTTP/1.1 in order to avoid ProtocolError (flow control)
38 changes: 29 additions & 9 deletions src/urllib3/backend/hface.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,11 @@ def _custom_tls(
if isinstance(ca_cert_data, str)
else ca_cert_data,
session_ticket=self.__session_ticket, # going to be set after first successful quic handshake
# mTLS start
certfile=cert_file,
keyfile=key_file,
keypassword=key_password,
# mTLS end
cert_fingerprint=cert_fingerprint,
cert_use_common_name=cert_use_common_name,
verify_hostname=bool(assert_hostname),
Expand Down Expand Up @@ -406,8 +408,6 @@ def __exchange_until(

if data_out:
self.sock.sendall(data_out)
else: # nothing to send out...? immediately exit.
return events # Defensive: This should be unreachable in the current project state.

data_in = self.sock.recv(maximal_data_in_read or self.blocksize)

Expand Down Expand Up @@ -764,14 +764,25 @@ def unpack_chunk(possible_chunk: bytes) -> bytes:
data_ = unpack_chunk(data)
is_chunked = len(data_) != len(data)

if (
self._protocol.should_wait_remote_flow_control(
self._stream_id, len(data_)
)
is True
):
self._protocol.bytes_received(self.sock.recv(self.blocksize))

if self.__remaining_body_length:
self.__remaining_body_length -= len(data_)

end_stream = (
is_chunked and data_ == b""
) or self.__remaining_body_length == 0

self._protocol.submit_data(
self._stream_id,
data_,
end_stream=(is_chunked and data_ == b"")
or self.__remaining_body_length == 0,
end_stream=end_stream,
)
else:
# urllib3 is supposed to handle every case
Expand All @@ -783,7 +794,16 @@ def unpack_chunk(possible_chunk: bytes) -> bytes:
if _HAS_SYS_AUDIT:
sys.audit("http.client.send", self, data)

self.sock.sendall(self._protocol.bytes_to_send())
# some protocols may impose regulated frame size
# so expect multiple frame per send()
while True:
data_out = self._protocol.bytes_to_send()

if not data_out:
break

self.sock.sendall(data_out)

except self._protocol.exceptions() as e:
raise ProtocolError( # Defensive: In the unlikely event that exception may leak from below
e
Expand All @@ -797,11 +817,11 @@ def close(self) -> None:
except self._protocol.exceptions() as e: # Defensive:
# overly protective, made in case of possible exception leak.
raise ProtocolError(e) from e # Defensive:
else:
goodbye_trame: bytes = self._protocol.bytes_to_send()

goodbye_trame: bytes = self._protocol.bytes_to_send()

if goodbye_trame:
self.sock.sendall(goodbye_trame)
if goodbye_trame:
self.sock.sendall(goodbye_trame)

self.sock.close()

Expand Down
7 changes: 6 additions & 1 deletion src/urllib3/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,12 @@ def request(

# Transform the body into an iterable of sendall()-able chunks
# and detect if an explicit Content-Length is doable.
chunks_and_cl = body_to_chunks(body, method=method, blocksize=self.blocksize)
chunks_and_cl = body_to_chunks(
body,
method=method,
blocksize=self.blocksize,
force=self._svn != HttpVersion.h11,
)
chunks = chunks_and_cl.chunks
content_length = chunks_and_cl.content_length

Expand Down
14 changes: 14 additions & 0 deletions src/urllib3/contrib/hface/protocols/_protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,15 @@ def connection_lost(self) -> None:
"""
raise NotImplementedError

def should_wait_remote_flow_control(
self, stream_id: int, amt: int | None = None
) -> bool | None:
"""
Verify if the client should listen network incoming data for
the flow control update purposes.
"""
raise NotImplementedError


class OverTCPProtocol(BaseProtocol):
"""
Expand Down Expand Up @@ -273,6 +282,11 @@ class HTTP1Protocol(HTTPOverTCPProtocol):
def multiplexed(self) -> bool:
return False

def should_wait_remote_flow_control(
self, stream_id: int, amt: int | None = None
) -> bool | None:
return NotImplemented

error_codes = HTTPErrorCodes(
protocol_error=400,
internal_error=500,
Expand Down
8 changes: 8 additions & 0 deletions src/urllib3/contrib/hface/protocols/http2/_h2.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,3 +157,11 @@ def _connection_terminated(
error_code = int(error_code) # Convert h2 IntEnum to an actual int
self._terminated = True
self._events.append(ConnectionTerminated(error_code, message))

def should_wait_remote_flow_control(
self, stream_id: int, amt: int | None = None
) -> bool | None:
if amt is None:
return self._connection.local_flow_control_window(stream_id) == 0
else:
return amt > self._connection.local_flow_control_window(stream_id)
5 changes: 5 additions & 0 deletions src/urllib3/contrib/hface/protocols/http3/_qh3.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,3 +203,8 @@ def _map_h3_event(self, h3_event: h3_events.H3Event) -> Iterable[Event]:
)
elif isinstance(h3_event, h3_events.DataReceived):
yield DataReceived(h3_event.stream_id, h3_event.data, h3_event.stream_ended)

def should_wait_remote_flow_control(
self, stream_id: int, amt: int | None = None
) -> bool | None:
return True
38 changes: 23 additions & 15 deletions src/urllib3/util/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,10 @@ class ChunksAndContentLength(typing.NamedTuple):


def body_to_chunks(
body: typing.Any | None, method: str, blocksize: int
body: typing.Any | None,
method: str,
blocksize: int,
force: bool = False,
) -> ChunksAndContentLength:
"""Takes the HTTP request method, body, and blocksize and
transforms them into an iterable of chunks to pass to
Expand All @@ -201,6 +204,17 @@ def body_to_chunks(
chunks: typing.Iterable[bytes] | None
content_length: int | None

def chunk_readable() -> typing.Iterable[bytes]:
nonlocal body, blocksize
encode = isinstance(body, io.TextIOBase)
while True:
datablock = body.read(blocksize)
if not datablock:
break
if encode:
datablock = datablock.encode("iso-8859-1")
yield datablock

# No body, we need to make a recommendation on 'Content-Length'
# based on whether that request method is expected to have
# a body or not.
Expand All @@ -213,23 +227,17 @@ def body_to_chunks(

# Bytes or strings become bytes
elif isinstance(body, (str, bytes)):
chunks = (to_bytes(body),)
content_length = len(chunks[0])
converted = to_bytes(body)
content_length = len(converted)

if force and len(converted) >= blocksize:
body = io.BytesIO(converted)
chunks = chunk_readable()
else:
chunks = (converted,)

# File-like object, TODO: use seek() and tell() for length?
elif hasattr(body, "read"):

def chunk_readable() -> typing.Iterable[bytes]:
nonlocal body, blocksize
encode = isinstance(body, io.TextIOBase)
while True:
datablock = body.read(blocksize)
if not datablock:
break
if encode:
datablock = datablock.encode("iso-8859-1")
yield datablock

chunks = chunk_readable()
content_length = None

Expand Down

0 comments on commit 5ec2221

Please sign in to comment.