Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into cb-isolated
Browse files Browse the repository at this point in the history
  • Loading branch information
Lukasa committed Nov 17, 2024
2 parents ccff9c8 + d836385 commit 8a67fe3
Show file tree
Hide file tree
Showing 9 changed files with 328 additions and 79 deletions.
12 changes: 7 additions & 5 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ name: Main
on:
push:
branches: [main]
schedule:
- cron: "0 8,20 * * *"

jobs:
unit-tests:
Expand All @@ -16,18 +18,18 @@ jobs:
linux_nightly_6_0_arguments_override: "--explicit-target-dependency-import-check error"
linux_nightly_main_arguments_override: "--explicit-target-dependency-import-check error"

cxx-interop:
name: Cxx interop
# Workaround https://github.com/nektos/act/issues/1875
uses: apple/swift-nio/.github/workflows/cxx_interop.yml@main

benchmarks:
name: Benchmarks
# Workaround https://github.com/nektos/act/issues/1875
uses: apple/swift-nio/.github/workflows/benchmarks.yml@main
with:
benchmark_package_path: "Benchmarks"

cxx-interop:
name: Cxx interop
# Workaround https://github.com/nektos/act/issues/1875
uses: apple/swift-nio/.github/workflows/cxx_interop.yml@main

integration-tests:
name: Integration Tests
# Workaround https://github.com/nektos/act/issues/1875
Expand Down
5 changes: 0 additions & 5 deletions .github/workflows/pull_request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,3 @@ jobs:
with:
name: "Integration tests"
matrix_linux_command: "apt-get update -y -q && apt-get install -y -q lsof dnsutils netcat-openbsd net-tools curl jq && ./scripts/integration_tests.sh"

swift-6-language-mode:
name: Swift 6 Language Mode
# Workaround https://github.com/nektos/act/issues/1875
uses: apple/swift-nio/.github/workflows/swift_6_language_mode.yml@main
32 changes: 0 additions & 32 deletions .github/workflows/scheduled.yml

This file was deleted.

25 changes: 25 additions & 0 deletions Sources/NIOHTTP1/NIOTypedHTTPServerUpgradeHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,25 @@ public final class NIOTypedHTTPServerUpgradeHandler<UpgradeResult: Sendable>: Ch
}
}

public func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
switch event {
case let evt as ChannelEvent where evt == ChannelEvent.inputClosed:
// The remote peer half-closed the channel during the upgrade. Should we close the other side
switch self.stateMachine.inputClosed() {
case .close:
context.close(promise: nil)
self.upgradeResultPromise.fail(ChannelError.inputClosed)
case .continue:
break
case .fireInputClosedEvent:
context.fireUserInboundEventTriggered(event)
}

default:
context.fireUserInboundEventTriggered(event)
}
}

private func channelRead(context: ChannelHandlerContext, requestPart: HTTPServerRequestPart) {
switch self.stateMachine.channelReadRequestPart(requestPart) {
case .failUpgradePromise(let error):
Expand Down Expand Up @@ -399,13 +418,19 @@ public final class NIOTypedHTTPServerUpgradeHandler<UpgradeResult: Sendable>: Ch
private func unbuffer(context: ChannelHandlerContext) {
while true {
switch self.stateMachine.unbuffer() {
case .close:
context.close(promise: nil)

case .fireChannelRead(let data):
context.fireChannelRead(data)

case .fireChannelReadCompleteAndRemoveHandler:
context.fireChannelReadComplete()
context.pipeline.removeHandler(self, promise: nil)
return

case .fireInputClosedEvent:
context.fireUserInboundEventTriggered(ChannelEvent.inputClosed)
}
}
}
Expand Down
83 changes: 72 additions & 11 deletions Sources/NIOHTTP1/NIOTypedHTTPServerUpgraderStateMachine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,15 @@ struct NIOTypedHTTPServerUpgraderStateMachine<UpgradeResult> {
/// The state before we received a TLSUserEvent. We are just forwarding any read at this point.
case initial

enum BufferedState {
case data(NIOAny)
case inputClosed
}

@usableFromInline
struct AwaitingUpgrader {
var seenFirstRequest: Bool
var buffer: Deque<NIOAny>
var buffer: Deque<BufferedState>
}

/// The request head has been received. We're currently running the future chain awaiting an upgrader.
Expand All @@ -37,22 +42,22 @@ struct NIOTypedHTTPServerUpgraderStateMachine<UpgradeResult> {
var requestHead: HTTPRequestHead
var responseHeaders: HTTPHeaders
var proto: String
var buffer: Deque<NIOAny>
var buffer: Deque<BufferedState>
}

/// We have an upgrader, which means we can begin upgrade we are just waiting for the request end.
case upgraderReady(UpgraderReady)

@usableFromInline
struct Upgrading {
var buffer: Deque<NIOAny>
var buffer: Deque<BufferedState>
}
/// We are either running the upgrading handler.
case upgrading(Upgrading)

@usableFromInline
struct Unbuffering {
var buffer: Deque<NIOAny>
var buffer: Deque<BufferedState>
}
case unbuffering(Unbuffering)

Expand Down Expand Up @@ -99,7 +104,7 @@ struct NIOTypedHTTPServerUpgraderStateMachine<UpgradeResult> {
if awaitingUpgrader.seenFirstRequest {
// We should buffer the data since we have seen the full request.
self.state = .modifying
awaitingUpgrader.buffer.append(data)
awaitingUpgrader.buffer.append(.data(data))
self.state = .awaitingUpgrader(awaitingUpgrader)
return nil
} else {
Expand All @@ -114,7 +119,7 @@ struct NIOTypedHTTPServerUpgraderStateMachine<UpgradeResult> {

case .unbuffering(var unbuffering):
self.state = .modifying
unbuffering.buffer.append(data)
unbuffering.buffer.append(.data(data))
self.state = .unbuffering(unbuffering)
return nil

Expand All @@ -125,7 +130,7 @@ struct NIOTypedHTTPServerUpgraderStateMachine<UpgradeResult> {
// We got a read while running ugprading.
// We have to buffer the read to unbuffer it afterwards
self.state = .modifying
upgrading.buffer.append(data)
upgrading.buffer.append(.data(data))
self.state = .upgrading(upgrading)
return nil

Expand Down Expand Up @@ -167,8 +172,8 @@ struct NIOTypedHTTPServerUpgraderStateMachine<UpgradeResult> {
guard requestedProtocols.count > 0 else {
// We have to buffer now since we got the request head but are not upgrading.
// The user is configuring the HTTP pipeline now.
var buffer = Deque<NIOAny>()
buffer.append(NIOAny(requestPart))
var buffer = Deque<State.BufferedState>()
buffer.append(.data(NIOAny(requestPart)))
self.state = .upgrading(.init(buffer: buffer))
return .runNotUpgradingInitializer
}
Expand Down Expand Up @@ -364,8 +369,10 @@ struct NIOTypedHTTPServerUpgraderStateMachine<UpgradeResult> {

@usableFromInline
enum UnbufferAction {
case close
case fireChannelRead(NIOAny)
case fireChannelReadCompleteAndRemoveHandler
case fireInputClosedEvent
}

@inlinable
Expand All @@ -379,8 +386,12 @@ struct NIOTypedHTTPServerUpgraderStateMachine<UpgradeResult> {

if let element = unbuffering.buffer.popFirst() {
self.state = .unbuffering(unbuffering)

return .fireChannelRead(element)
switch element {
case .data(let data):
return .fireChannelRead(data)
case .inputClosed:
return .fireInputClosedEvent
}
} else {
self.state = .finished

Expand All @@ -393,5 +404,55 @@ struct NIOTypedHTTPServerUpgraderStateMachine<UpgradeResult> {
}
}

@usableFromInline
enum InputClosedAction {
case close
case `continue`
case fireInputClosedEvent
}

@inlinable
mutating func inputClosed() -> InputClosedAction {
switch self.state {
case .initial:
self.state = .finished
return .close

case .awaitingUpgrader(var awaitingUpgrader):
if awaitingUpgrader.seenFirstRequest {
// We should buffer the input close since we have seen the full request.
awaitingUpgrader.buffer.append(.inputClosed)
self.state = .awaitingUpgrader(awaitingUpgrader)
return .continue
} else {
// We shouldn't buffer. This means we were still expecting HTTP parts.
return .close
}

case .upgrading(var upgrading):
upgrading.buffer.append(.inputClosed)
self.state = .upgrading(upgrading)
return .continue

case .upgraderReady:
// if the state is `upgraderReady` we have received a `.head` but not an `.end`.
// If input is closed then there is no way to move this forward so we should
// close.
self.state = .finished
return .close

case .unbuffering(var unbuffering):
unbuffering.buffer.append(.inputClosed)
self.state = .unbuffering(unbuffering)
return .continue

case .finished:
return .fireInputClosedEvent

case .modifying:
fatalError("Internal inconsistency in HTTPServerUpgradeStateMachine")
}
}

}
#endif
Loading

0 comments on commit 8a67fe3

Please sign in to comment.