Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ChannelHandler: provide static (un)wrap(In|Out)bound(In|Out) #2791

Merged
merged 2 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Benchmarks/Benchmarks/NIOPosixBenchmarks/TCPEcho.swift
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ private final class EchoRequestChannelHandler: ChannelInboundHandler {
}

func channelRead(context: ChannelHandlerContext, data: NIOAny) {
let buffer = self.unwrapInboundIn(data)
let buffer = Self.unwrapInboundIn(data)
self.receivedData += buffer.readableBytes

if self.receivedData == self.numberOfWrites * self.messageSize {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,14 @@ final class RepeatedRequests: ChannelInboundHandler {
}

func channelRead(context: ChannelHandlerContext, data: NIOAny) {
let respPart = self.unwrapInboundIn(data)
let respPart = Self.unwrapInboundIn(data)
if case .end(nil) = respPart {
if self.remainingNumberOfRequests <= 0 {
context.channel.close().map { self.numberOfRequests - self.remainingNumberOfRequests }.cascade(to: self.isDonePromise)
} else {
self.remainingNumberOfRequests -= 1
context.write(self.wrapOutboundOut(.head(RepeatedRequests.requestHead)), promise: nil)
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
context.write(Self.wrapOutboundOut(.head(RepeatedRequests.requestHead)), promise: nil)
context.writeAndFlush(Self.wrapOutboundOut(.end(nil)), promise: nil)
}
}
}
Expand Down Expand Up @@ -94,10 +94,10 @@ private final class SimpleHTTPServer: ChannelInboundHandler {
}

public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
if case .head(let req) = self.unwrapInboundIn(data), req.uri == "/allocation-test-1" {
context.write(self.wrapOutboundOut(.head(self.responseHead)), promise: nil)
context.write(self.wrapOutboundOut(.body(.byteBuffer(self.responseBody(allocator: context.channel.allocator)))), promise: nil)
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
if case .head(let req) = Self.unwrapInboundIn(data), req.uri == "/allocation-test-1" {
context.write(Self.wrapOutboundOut(.head(self.responseHead)), promise: nil)
context.write(Self.wrapOutboundOut(.body(.byteBuffer(self.responseBody(allocator: context.channel.allocator)))), promise: nil)
context.writeAndFlush(Self.wrapOutboundOut(.end(nil)), promise: nil)
}
}
}
Expand Down Expand Up @@ -200,7 +200,7 @@ enum UDPShared {
// Forward the data.
let envolope = AddressedEnvelope<ByteBuffer>(remoteAddress: remoteAddress, data: buffer)

context.writeAndFlush(self.wrapOutboundOut(envolope), promise: nil)
context.writeAndFlush(Self.wrapOutboundOut(envolope), promise: nil)
} else {
// We're all done - hurrah!
context.close(promise: nil)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ fileprivate final class ReceiveAndCloseHandler: ChannelInboundHandler {
public typealias OutboundOut = ByteBuffer

func channelRead(context: ChannelHandlerContext, data: NIOAny) {
let byteBuffer = self.unwrapInboundIn(data)
let byteBuffer = Self.unwrapInboundIn(data)
precondition(byteBuffer.readableBytes == 1)
context.channel.close(promise: nil)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ fileprivate final class ClientHandler: ChannelInboundHandler {
// Send the data with ECN
let metadata = AddressedEnvelope<ByteBuffer>.Metadata(ecnState: .transportCapableFlag1)
let envelope = AddressedEnvelope<ByteBuffer>(remoteAddress: remoteAddress, data: buffer, metadata: metadata)
clientChannel.writeAndFlush(self.wrapOutboundOut(envelope), promise: nil)
clientChannel.writeAndFlush(Self.wrapOutboundOut(envelope), promise: nil)
}

func sendBytesAndWaitForReply(clientChannel: Channel) -> Int {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ class UnboxingChannelHandler: ChannelInboundHandler {
typealias InboundOut = WebSocketFrame

func channelRead(context: ChannelHandlerContext, data: NIOAny) {
let data = self.unwrapInboundIn(data)
context.fireChannelRead(self.wrapInboundOut(data))
let data = Self.unwrapInboundIn(data)
context.fireChannelRead(Self.wrapInboundOut(data))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ private final class PongDecoder: ByteToMessageDecoder {

public func decode(context: ChannelHandlerContext, buffer: inout ByteBuffer) -> DecodingState {
if let ping = buffer.readInteger(as: UInt8.self) {
context.fireChannelRead(self.wrapInboundOut(ping))
context.fireChannelRead(Self.wrapInboundOut(ping))
return .continue
} else {
return .needMoreData
Expand Down Expand Up @@ -65,16 +65,16 @@ private final class PingHandler: ChannelInboundHandler {
self.pingBuffer = context.channel.allocator.buffer(capacity: 1)
self.pingBuffer.writeInteger(PingHandler.pingCode)

context.writeAndFlush(self.wrapOutboundOut(self.pingBuffer), promise: nil)
context.writeAndFlush(Self.wrapOutboundOut(self.pingBuffer), promise: nil)
}

public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
var buf = self.unwrapInboundIn(data)
var buf = Self.unwrapInboundIn(data)
if buf.readableBytes == 1 &&
buf.readInteger(as: UInt8.self) == PongHandler.pongCode {
if self.remainingNumberOfRequests > 0 {
self.remainingNumberOfRequests -= 1
context.writeAndFlush(self.wrapOutboundOut(self.pingBuffer), promise: nil)
context.writeAndFlush(Self.wrapOutboundOut(self.pingBuffer), promise: nil)
} else {
context.close(promise: self.allDone)
}
Expand Down Expand Up @@ -102,7 +102,7 @@ private final class PongHandler: ChannelInboundHandler {
}

func channelRead(context: ChannelHandlerContext, data: NIOAny) {
let data = self.unwrapInboundIn(data)
let data = Self.unwrapInboundIn(data)
if data == PingHandler.pingCode {
context.writeAndFlush(NIOAny(self.pongBuffer), promise: nil)
} else {
Expand Down
12 changes: 6 additions & 6 deletions Sources/NIOAsyncAwaitDemo/FullRequestResponse.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ public final class MakeFullRequestHandler: ChannelOutboundHandler {
public typealias OutboundIn = HTTPRequestHead

public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
let req = self.unwrapOutboundIn(data)
let req = Self.unwrapOutboundIn(data)

context.write(self.wrapOutboundOut(.head(req)), promise: nil)
context.write(self.wrapOutboundOut(.end(nil)), promise: promise)
context.write(Self.wrapOutboundOut(.head(req)), promise: nil)
context.write(Self.wrapOutboundOut(.end(nil)), promise: promise)
}
}

Expand Down Expand Up @@ -97,7 +97,7 @@ public final class RequestResponseHandler<Request, Response>: ChannelDuplexHandl
return
}

let response = self.unwrapInboundIn(data)
let response = Self.unwrapInboundIn(data)
let promise = self.promiseBuffer.removeFirst()

promise.succeed(response)
Expand All @@ -118,15 +118,15 @@ public final class RequestResponseHandler<Request, Response>: ChannelDuplexHandl
}

public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
let (request, responsePromise) = self.unwrapOutboundIn(data)
let (request, responsePromise) = Self.unwrapOutboundIn(data)
switch self.state {
case .error(let error):
assert(self.promiseBuffer.count == 0)
responsePromise.fail(error)
promise?.fail(error)
case .operational:
self.promiseBuffer.append(responsePromise)
context.write(self.wrapOutboundOut(request), promise: promise)
context.write(Self.wrapOutboundOut(request), promise: promise)
}
}
}
2 changes: 1 addition & 1 deletion Sources/NIOChatClient/main.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ private final class ChatHandler: ChannelInboundHandler {
}

public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
var buffer = self.unwrapInboundIn(data)
var buffer = Self.unwrapInboundIn(data)
while let byte: UInt8 = buffer.readInteger() {
printByte(byte)
}
Expand Down
6 changes: 3 additions & 3 deletions Sources/NIOChatServer/main.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ final class LineDelimiterCodec: ByteToMessageDecoder {
public func decode(context: ChannelHandlerContext, buffer: inout ByteBuffer) throws -> DecodingState {
let readable = buffer.withUnsafeReadableBytes { $0.firstIndex(of: newLine) }
if let r = readable {
context.fireChannelRead(self.wrapInboundOut(buffer.readSlice(length: r + 1)!))
context.fireChannelRead(Self.wrapInboundOut(buffer.readSlice(length: r + 1)!))
return .continue
}
return .needMoreData
Expand Down Expand Up @@ -64,7 +64,7 @@ final class ChatHandler: ChannelInboundHandler {

var buffer = channel.allocator.buffer(capacity: 64)
buffer.writeString("(ChatServer) - Welcome to: \(context.localAddress!)\n")
context.writeAndFlush(self.wrapOutboundOut(buffer), promise: nil)
context.writeAndFlush(Self.wrapOutboundOut(buffer), promise: nil)
}

public func channelInactive(context: ChannelHandlerContext) {
Expand All @@ -79,7 +79,7 @@ final class ChatHandler: ChannelInboundHandler {

public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
let id = ObjectIdentifier(context.channel)
var read = self.unwrapInboundIn(data)
var read = Self.unwrapInboundIn(data)

// 64 should be good enough for the ipaddress
var buffer = context.channel.allocator.buffer(capacity: read.readableBytes + 64)
Expand Down
6 changes: 3 additions & 3 deletions Sources/NIOCore/AsyncChannel/AsyncChannelHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ extension NIOAsyncChannelHandler: ChannelInboundHandler {

@inlinable
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
let unwrapped = self.unwrapInboundIn(data)
let unwrapped = Self.unwrapInboundIn(data)

switch self.transformation {
case .syncWrapping(let transformation):
Expand Down Expand Up @@ -477,15 +477,15 @@ extension NIOAsyncChannelHandler {
@inlinable
func _doOutboundWrites(context: ChannelHandlerContext, writes: Deque<OutboundOut>) {
for write in writes {
context.write(self.wrapOutboundOut(write), promise: nil)
context.write(Self.wrapOutboundOut(write), promise: nil)
}

context.flush()
}

@inlinable
func _doOutboundWrite(context: ChannelHandlerContext, write: OutboundOut) {
context.write(self.wrapOutboundOut(write), promise: nil)
context.write(Self.wrapOutboundOut(write), promise: nil)
context.flush()
}
}
Expand Down
18 changes: 12 additions & 6 deletions Sources/NIOCore/Codec.swift
Original file line number Diff line number Diff line change
Expand Up @@ -235,10 +235,16 @@ extension ByteToMessageDecoder {
return buffer.storageCapacity > 1024 && (buffer.storageCapacity - buffer.readerIndex) < buffer.readerIndex
}

@inlinable
public func wrapInboundOut(_ value: InboundOut) -> NIOAny {
return NIOAny(value)
}


@inlinable
public static func wrapInboundOut(_ value: InboundOut) -> NIOAny {
return NIOAny(value)
}

public mutating func decodeLast(context: ChannelHandlerContext, buffer: inout ByteBuffer, seenEOF: Bool) throws -> DecodingState {
while try self.decode(context: context, buffer: &buffer) == .continue {}
return .needMoreData
Expand Down Expand Up @@ -503,7 +509,7 @@ extension ByteToMessageHandler: CanDequeueWrites where Decoder: WriteObservingBy
fileprivate func dequeueWrites() {
while self.queuedWrites.count > 0 {
// self.decoder can't be `nil`, this is only allowed to be called when we're not already on the stack
self.decoder!.write(data: self.unwrapOutboundIn(self.queuedWrites.removeFirst()))
self.decoder!.write(data: Self.unwrapOutboundIn(self.queuedWrites.removeFirst()))
}
}
}
Expand Down Expand Up @@ -639,7 +645,7 @@ extension ByteToMessageHandler: ChannelInboundHandler {

/// Calls `decode` until there is nothing left to decode.
public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
let buffer = self.unwrapInboundIn(data)
let buffer = Self.unwrapInboundIn(data)
if case .error(let error) = self.state {
context.fireErrorCaught(ByteToMessageDecoderError.dataReceivedInErrorState(error, buffer))
return
Expand Down Expand Up @@ -696,7 +702,7 @@ extension ByteToMessageHandler: ChannelOutboundHandler, _ChannelOutboundHandler
public typealias OutboundIn = Decoder.OutboundIn
public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
if self.decoder != nil {
let data = self.unwrapOutboundIn(data)
let data = Self.unwrapOutboundIn(data)
assert(self.queuedWrites.isEmpty)
self.decoder!.write(data: data)
} else {
Expand Down Expand Up @@ -805,12 +811,12 @@ extension MessageToByteHandler {
// there's actually some work to do here
break
}
let data = self.unwrapOutboundIn(data)
let data = Self.unwrapOutboundIn(data)

do {
self.buffer!.clear()
try self.encoder.encode(data: data, out: &self.buffer!)
context.write(self.wrapOutboundOut(self.buffer!), promise: promise)
context.write(Self.wrapOutboundOut(self.buffer!), promise: promise)
} catch {
self.state = .error(error)
promise?.fail(error)
Expand Down
6 changes: 3 additions & 3 deletions Sources/NIOCore/NIOAny.swift
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@
/// dynamically at run-time. Yet, we assert that in any configuration the channel handler before
/// `SandwichHandler` does actually send us a stream of `Bacon`.
/// */
/// let bacon = self.unwrapInboundIn(data) /* `Bacon` or crash */
/// let bacon = Self.unwrapInboundIn(data) /* `Bacon` or crash */
/// let sandwich = makeSandwich(bacon)
/// context.fireChannelRead(self.wrapInboundOut(sandwich)) /* as promised we deliver a wrapped `Sandwich` */
/// context.fireChannelRead(Self.wrapInboundOut(sandwich)) /* as promised we deliver a wrapped `Sandwich` */
/// }
/// }
public struct NIOAny {
Expand All @@ -48,7 +48,7 @@ public struct NIOAny {

/// Wrap a value in a `NIOAny`. In most cases you should not create a `NIOAny` directly using this constructor.
/// The abstraction that accepts values of type `NIOAny` must also provide a mechanism to do the wrapping. An
/// example is a `ChannelInboundHandler` which provides `self.wrapInboundOut(aValueOfTypeInboundOut)`.
/// example is a `ChannelInboundHandler` which provides `Self.wrapInboundOut(aValueOfTypeInboundOut)`.
@inlinable
public init<T>(_ value: T) {
self._storage = _NIOAny(value)
Expand Down
20 changes: 10 additions & 10 deletions Sources/NIOCore/SingleStepByteToMessageDecoder.swift
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public protocol NIOSingleStepByteToMessageDecoder: ByteToMessageDecoder {
extension NIOSingleStepByteToMessageDecoder {
public mutating func decode(context: ChannelHandlerContext, buffer: inout ByteBuffer) throws -> DecodingState {
if let message = try self.decode(buffer: &buffer) {
context.fireChannelRead(self.wrapInboundOut(message))
context.fireChannelRead(Self.wrapInboundOut(message))
return .continue
} else {
return .needMoreData
Expand All @@ -65,7 +65,7 @@ extension NIOSingleStepByteToMessageDecoder {

public mutating func decodeLast(context: ChannelHandlerContext, buffer: inout ByteBuffer, seenEOF: Bool) throws -> DecodingState {
if let message = try self.decodeLast(buffer: &buffer, seenEOF: seenEOF) {
context.fireChannelRead(self.wrapInboundOut(message))
context.fireChannelRead(Self.wrapInboundOut(message))
return .continue
} else {
return .needMoreData
Expand Down Expand Up @@ -110,12 +110,12 @@ extension NIOSingleStepByteToMessageDecoder {
/// private var messageProcessor: NIOSingleStepByteToMessageProcessor<TwoByteStringCodec>? = nil
///
/// func channelRead(context: ChannelHandlerContext, data: NIOAny) {
/// let req = self.unwrapInboundIn(data)
/// let req = Self.unwrapInboundIn(data)
/// do {
/// switch req {
/// case .head(let head):
/// // simply forward on the head
/// context.fireChannelRead(self.wrapInboundOut(.head(head)))
/// context.fireChannelRead(Self.wrapInboundOut(.head(head)))
/// case .body(let body):
/// if self.messageProcessor == nil {
/// self.messageProcessor = NIOSingleStepByteToMessageProcessor(TwoByteStringCodec())
Expand All @@ -128,7 +128,7 @@ extension NIOSingleStepByteToMessageDecoder {
/// try self.messageProcessor?.finishProcessing(seenEOF: false) { message in
/// self.channelReadMessage(context: context, message: message)
/// }
/// context.fireChannelRead(self.wrapInboundOut(.end(trailers)))
/// context.fireChannelRead(Self.wrapInboundOut(.end(trailers)))
/// }
/// } catch {
/// context.fireErrorCaught(error)
Expand All @@ -137,7 +137,7 @@ extension NIOSingleStepByteToMessageDecoder {
///
/// // Forward on the body messages as whole messages
/// func channelReadMessage(context: ChannelHandlerContext, message: String) {
/// context.fireChannelRead(self.wrapInboundOut(.body(message)))
/// context.fireChannelRead(Self.wrapInboundOut(.body(message)))
/// }
/// }
///
Expand All @@ -148,7 +148,7 @@ extension NIOSingleStepByteToMessageDecoder {
/// var msgs: [String] = []
///
/// func channelRead(context: ChannelHandlerContext, data: NIOAny) {
/// let message = self.unwrapInboundIn(data)
/// let message = Self.unwrapInboundIn(data)
///
/// switch message {
/// case .head(let head):
Expand All @@ -165,13 +165,13 @@ extension NIOSingleStepByteToMessageDecoder {
/// var headers = HTTPHeaders()
/// headers.add(name: "content-length", value: String(responseBuffer.readableBytes))
///
/// context.write(self.wrapOutboundOut(HTTPServerResponsePart.head(
/// context.write(Self.wrapOutboundOut(HTTPServerResponsePart.head(
/// HTTPResponseHead(version: .http1_1,
/// status: .ok, headers: headers))), promise: nil)
///
/// context.write(self.wrapOutboundOut(HTTPServerResponsePart.body(
/// context.write(Self.wrapOutboundOut(HTTPServerResponsePart.body(
/// .byteBuffer(responseBuffer))), promise: nil)
/// context.writeAndFlush(self.wrapOutboundOut(HTTPServerResponsePart.end(nil)), promise: nil)
/// context.writeAndFlush(Self.wrapOutboundOut(HTTPServerResponsePart.end(nil)), promise: nil)
/// }
/// }
/// }
Expand Down
Loading
Loading