Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport: Add new protocol for ChannelHandler to get buffered bytes in the channel handler (#2918) #2982

Merged
merged 1 commit into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions Sources/NIOCore/ChannelHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -343,3 +343,19 @@ extension RemovableChannelHandler {
context.leavePipeline(removalToken: removalToken)
}
}

/// A `NIOOutboundByteBufferingChannelHandler` is a `ChannelHandler` that
/// reports the number of bytes buffered for outbound direction.
public protocol NIOOutboundByteBufferingChannelHandler {
/// The number of bytes buffered in the channel handler, which are queued to be sent to
/// the next outbound channel handler.
var outboundBufferedBytes: Int { get }
}

/// A `NIOInboundByteBufferingChannelHandler` is a `ChannelHandler` that
/// reports the number of bytes buffered for inbound direction.
public protocol NIOInboundByteBufferingChannelHandler {
/// The number of bytes buffered in the channel handler, which are queued to be sent to
/// the next inbound channel handler.
var inboundBufferedBytes: Int { get }
}
123 changes: 123 additions & 0 deletions Sources/NIOCore/ChannelPipeline.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2089,3 +2089,126 @@ extension ChannelPipeline: CustomDebugStringConvertible {
return handlers
}
}

extension ChannelPipeline {
private enum BufferingDirection: Equatable {
case inbound
case outbound
}

/// Retrieve the total number of bytes buffered for outbound.
public func outboundBufferedBytes() -> EventLoopFuture<Int> {
let future: EventLoopFuture<Int>

if self.eventLoop.inEventLoop {
future = self.eventLoop.makeSucceededFuture(countAllBufferedBytes(direction: .outbound))
} else {
future = self.eventLoop.submit {
self.countAllBufferedBytes(direction: .outbound)
}
}

return future
}

/// Retrieve the total number of bytes buffered for inbound.
public func inboundBufferedBytes() -> EventLoopFuture<Int> {
let future: EventLoopFuture<Int>

if self.eventLoop.inEventLoop {
future = self.eventLoop.makeSucceededFuture(countAllBufferedBytes(direction: .inbound))
} else {
future = self.eventLoop.submit {
self.countAllBufferedBytes(direction: .inbound)
}
}

return future
}

private static func countBufferedBytes(context: ChannelHandlerContext, direction: BufferingDirection) -> Int? {
switch direction {
case .inbound:
guard let handler = context.handler as? NIOInboundByteBufferingChannelHandler else {
return nil
}
return handler.inboundBufferedBytes
case .outbound:
guard let handler = context.handler as? NIOOutboundByteBufferingChannelHandler else {
return nil
}
return handler.outboundBufferedBytes
}

}

private func countAllBufferedBytes(direction: BufferingDirection) -> Int {
self.eventLoop.assertInEventLoop()
var total = 0
var current = self.head?.next
switch direction {
case .inbound:
while let c = current, c !== self.tail {
if let inboundHandler = c.handler as? NIOInboundByteBufferingChannelHandler {
total += inboundHandler.inboundBufferedBytes
}
current = current?.next
}
case .outbound:
while let c = current, c !== self.tail {
if let outboundHandler = c.handler as? NIOOutboundByteBufferingChannelHandler {
total += outboundHandler.outboundBufferedBytes
}
current = current?.next
}
}

return total
}
}

extension ChannelPipeline.SynchronousOperations {
/// Retrieve the total number of bytes buffered for outbound.
///
/// - Important: This *must* be called on the event loop.
public func outboundBufferedBytes() -> Int {
self.eventLoop.assertInEventLoop()
return self._pipeline.countAllBufferedBytes(direction: .outbound)
}

/// Retrieve the number of outbound bytes buffered in the `ChannelHandler` associated with the given`ChannelHandlerContext`.
///
/// - Parameters:
/// - in: the `ChannelHandlerContext` from which the outbound buffered bytes of the `ChannelHandler` will be retrieved.
/// - Important: This *must* be called on the event loop.
///
/// - Returns: The number of bytes currently buffered in the `ChannelHandler` referenced by the `ChannelHandlerContext` parameter `in`.
/// If the `ChannelHandler` in the given `ChannelHandlerContext` does not conform to
/// `NIOOutboundByteBufferingChannelHandler`, this method will return `nil`.
public func outboundBufferedBytes(in context: ChannelHandlerContext) -> Int? {
self.eventLoop.assertInEventLoop()
return ChannelPipeline.countBufferedBytes(context: context, direction: .outbound)
}

/// Retrieve total number of bytes buffered for inbound.
///
/// - Important: This *must* be called on the event loop.
public func inboundBufferedBytes() -> Int {
self.eventLoop.assertInEventLoop()
return self._pipeline.countAllBufferedBytes(direction: .inbound)
}

/// Retrieve the number of inbound bytes buffered in the `ChannelHandler` associated with the given `ChannelHandlerContext`.
///
/// - Parameters:
/// - in: the `ChannelHandlerContext` from which the inbound buffered bytes of the `handler` will be retrieved.
/// - Important: This *must* be called on the event loop.
///
/// - Returns: The number of bytes currently buffered in the `ChannelHandler` referenced by the `ChannelHandlerContext` parameter `in`.
/// If the `ChannelHandler` in the given `ChannelHandlerContext` does not conform to
/// `NIOInboundByteBufferingChannelHandler`, this method will return `nil`.
public func inboundBufferedBytes(in context: ChannelHandlerContext) -> Int? {
self.eventLoop.assertInEventLoop()
return ChannelPipeline.countBufferedBytes(context: context, direction: .inbound)
}
}
Loading
Loading