Skip to content

Commit

Permalink
Add new protocol for ChannelHandler to get buffered bytes in the chan…
Browse files Browse the repository at this point in the history
…nel handler (#2918)

Add new protocol to get buffered bytes from `ChannelHandler`s and
`ChannelPipeline` API to query the buffered bytes from `ChannelHandler`s

### Motivation:

In #2849, a new `ChannelOption` is introduced to retrieve the number of
buffered outbound bytes in a `Channel`. However, this solution does not
account for bytes that may be buffered within individual
`ChannelHandler`s. This PR builds on #2849 by adding functionality to
audit buffered bytes residing in `ChannelHandler`s and exposing this
information through new `ChannelPipeline` APIs.

### Modifications:

- Two new protocols for `ChannelHandler` to audit buffered bytes for
inbound and outbound.
    - `NIOOutboundByteBufferingChannelHandler`
    - `NIOInboundByteBufferingChannelHandler`
- New `ChannelPipeline` APIs
    - outboundBufferedBytes()
- outboundBufferedBytes(in: ChannelHandlerContext) (only in
`syncOperations`)
    - inboundBufferedBytes()
- inboundBufferedBytes(in: ChannelHandlerContext) (only in
`syncOperations`)

### Result:

Users can now easily query the amount of bytes buffered in
`ChannelHandler`s using the new `ChannelPipeline` APIs, enhancing the
visibility of `ChannelHandler` performance.

---------

Co-authored-by: Cory Benfield <[email protected]>
  • Loading branch information
johnnzhou and Lukasa authored Oct 30, 2024
1 parent fdc3a31 commit 9356598
Show file tree
Hide file tree
Showing 3 changed files with 1,218 additions and 0 deletions.
16 changes: 16 additions & 0 deletions Sources/NIOCore/ChannelHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -343,3 +343,19 @@ extension RemovableChannelHandler {
context.leavePipeline(removalToken: removalToken)
}
}

/// A `NIOOutboundByteBufferingChannelHandler` is a `ChannelHandler` that
/// reports the number of bytes buffered for outbound direction.
public protocol NIOOutboundByteBufferingChannelHandler {
/// The number of bytes buffered in the channel handler, which are queued to be sent to
/// the next outbound channel handler.
var outboundBufferedBytes: Int { get }
}

/// A `NIOInboundByteBufferingChannelHandler` is a `ChannelHandler` that
/// reports the number of bytes buffered for inbound direction.
public protocol NIOInboundByteBufferingChannelHandler {
/// The number of bytes buffered in the channel handler, which are queued to be sent to
/// the next inbound channel handler.
var inboundBufferedBytes: Int { get }
}
123 changes: 123 additions & 0 deletions Sources/NIOCore/ChannelPipeline.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2186,3 +2186,126 @@ extension ChannelPipeline: CustomDebugStringConvertible {
return handlers
}
}

extension ChannelPipeline {
private enum BufferingDirection: Equatable {
case inbound
case outbound
}

/// Retrieve the total number of bytes buffered for outbound.
public func outboundBufferedBytes() -> EventLoopFuture<Int> {
let future: EventLoopFuture<Int>

if self.eventLoop.inEventLoop {
future = self.eventLoop.makeSucceededFuture(countAllBufferedBytes(direction: .outbound))
} else {
future = self.eventLoop.submit {
self.countAllBufferedBytes(direction: .outbound)
}
}

return future
}

/// Retrieve the total number of bytes buffered for inbound.
public func inboundBufferedBytes() -> EventLoopFuture<Int> {
let future: EventLoopFuture<Int>

if self.eventLoop.inEventLoop {
future = self.eventLoop.makeSucceededFuture(countAllBufferedBytes(direction: .inbound))
} else {
future = self.eventLoop.submit {
self.countAllBufferedBytes(direction: .inbound)
}
}

return future
}

private static func countBufferedBytes(context: ChannelHandlerContext, direction: BufferingDirection) -> Int? {
switch direction {
case .inbound:
guard let handler = context.handler as? NIOInboundByteBufferingChannelHandler else {
return nil
}
return handler.inboundBufferedBytes
case .outbound:
guard let handler = context.handler as? NIOOutboundByteBufferingChannelHandler else {
return nil
}
return handler.outboundBufferedBytes
}

}

private func countAllBufferedBytes(direction: BufferingDirection) -> Int {
self.eventLoop.assertInEventLoop()
var total = 0
var current = self.head?.next
switch direction {
case .inbound:
while let c = current, c !== self.tail {
if let inboundHandler = c.handler as? NIOInboundByteBufferingChannelHandler {
total += inboundHandler.inboundBufferedBytes
}
current = current?.next
}
case .outbound:
while let c = current, c !== self.tail {
if let outboundHandler = c.handler as? NIOOutboundByteBufferingChannelHandler {
total += outboundHandler.outboundBufferedBytes
}
current = current?.next
}
}

return total
}
}

extension ChannelPipeline.SynchronousOperations {
/// Retrieve the total number of bytes buffered for outbound.
///
/// - Important: This *must* be called on the event loop.
public func outboundBufferedBytes() -> Int {
self.eventLoop.assertInEventLoop()
return self._pipeline.countAllBufferedBytes(direction: .outbound)
}

/// Retrieve the number of outbound bytes buffered in the `ChannelHandler` associated with the given`ChannelHandlerContext`.
///
/// - Parameters:
/// - in: the `ChannelHandlerContext` from which the outbound buffered bytes of the `ChannelHandler` will be retrieved.
/// - Important: This *must* be called on the event loop.
///
/// - Returns: The number of bytes currently buffered in the `ChannelHandler` referenced by the `ChannelHandlerContext` parameter `in`.
/// If the `ChannelHandler` in the given `ChannelHandlerContext` does not conform to
/// `NIOOutboundByteBufferingChannelHandler`, this method will return `nil`.
public func outboundBufferedBytes(in context: ChannelHandlerContext) -> Int? {
self.eventLoop.assertInEventLoop()
return ChannelPipeline.countBufferedBytes(context: context, direction: .outbound)
}

/// Retrieve total number of bytes buffered for inbound.
///
/// - Important: This *must* be called on the event loop.
public func inboundBufferedBytes() -> Int {
self.eventLoop.assertInEventLoop()
return self._pipeline.countAllBufferedBytes(direction: .inbound)
}

/// Retrieve the number of inbound bytes buffered in the `ChannelHandler` associated with the given `ChannelHandlerContext`.
///
/// - Parameters:
/// - in: the `ChannelHandlerContext` from which the inbound buffered bytes of the `handler` will be retrieved.
/// - Important: This *must* be called on the event loop.
///
/// - Returns: The number of bytes currently buffered in the `ChannelHandler` referenced by the `ChannelHandlerContext` parameter `in`.
/// If the `ChannelHandler` in the given `ChannelHandlerContext` does not conform to
/// `NIOInboundByteBufferingChannelHandler`, this method will return `nil`.
public func inboundBufferedBytes(in context: ChannelHandlerContext) -> Int? {
self.eventLoop.assertInEventLoop()
return ChannelPipeline.countBufferedBytes(context: context, direction: .inbound)
}
}
Loading

0 comments on commit 9356598

Please sign in to comment.