diff --git a/Sources/NIOCore/BSDSocketAPI.swift b/Sources/NIOCore/BSDSocketAPI.swift index 13f28b0103..0c23ba71b1 100644 --- a/Sources/NIOCore/BSDSocketAPI.swift +++ b/Sources/NIOCore/BSDSocketAPI.swift @@ -406,6 +406,9 @@ extension NIOBSDSocket.Option { /// Specifies the total per-socket buffer space reserved for receives. public static let so_rcvbuf = Self(rawValue: SO_RCVBUF) + /// Specifies the total per-socket buffer space reserved for sends. + public static let so_sndbuf = Self(rawValue: SO_SNDBUF) + /// Specifies the receive timeout. public static let so_rcvtimeo = Self(rawValue: SO_RCVTIMEO) diff --git a/Sources/NIOCore/ChannelOption.swift b/Sources/NIOCore/ChannelOption.swift index 066e1e5ce5..bec809b381 100644 --- a/Sources/NIOCore/ChannelOption.swift +++ b/Sources/NIOCore/ChannelOption.swift @@ -287,6 +287,13 @@ extension ChannelOptions { public typealias Value = Bool public init() {} } + + /// `BufferedWritableBytesOption` allows users to know the number of writable bytes currently buffered in the `Channel`. + public struct BufferedWritableBytesOption: ChannelOption, Sendable { + public typealias Value = Int + + public init() {} + } } } @@ -358,6 +365,9 @@ public struct ChannelOptions: Sendable { /// - seealso: `ReceivePacketInfo` public static let receivePacketInfo = Types.ReceivePacketInfo() + + /// - seealso: `BufferedWritableBytesOption` + public static let bufferedWritableBytes = Types.BufferedWritableBytesOption() } /// - seealso: `SocketOption`. @@ -451,6 +461,11 @@ extension ChannelOption where Self == ChannelOptions.Types.ReceivePacketInfo { public static var receivePacketInfo: Self { .init() } } +/// - seealso: `BufferedWritableBytesOption` +extension ChannelOption where Self == ChannelOptions.Types.BufferedWritableBytesOption { + public static var bufferedWritableBytes: Self { .init() } +} + extension ChannelOptions { /// A type-safe storage facility for `ChannelOption`s. You will only ever need this if you implement your own /// `Channel` that needs to store `ChannelOption`s. diff --git a/Sources/NIOEmbedded/AsyncTestingChannel.swift b/Sources/NIOEmbedded/AsyncTestingChannel.swift index 087f9a0ea5..694b033ec3 100644 --- a/Sources/NIOEmbedded/AsyncTestingChannel.swift +++ b/Sources/NIOEmbedded/AsyncTestingChannel.swift @@ -581,6 +581,14 @@ public final class NIOAsyncTestingChannel: Channel { if option is ChannelOptions.Types.AllowRemoteHalfClosureOption { return self.allowRemoteHalfClosure as! Option.Value } + if option is ChannelOptions.Types.BufferedWritableBytesOption { + let result = self.channelcore.pendingOutboundBuffer.reduce(0) { partialResult, dataAndPromise in + let buffer = self.channelcore.unwrapData(dataAndPromise.0, as: ByteBuffer.self) + return partialResult + buffer.readableBytes + } + + return result as! Option.Value + } fatalError("option \(option) not supported") } diff --git a/Sources/NIOEmbedded/Embedded.swift b/Sources/NIOEmbedded/Embedded.swift index c2207da4d7..032b4a924a 100644 --- a/Sources/NIOEmbedded/Embedded.swift +++ b/Sources/NIOEmbedded/Embedded.swift @@ -865,6 +865,14 @@ public final class EmbeddedChannel: Channel { if option is ChannelOptions.Types.AllowRemoteHalfClosureOption { return self.allowRemoteHalfClosure as! Option.Value } + if option is ChannelOptions.Types.BufferedWritableBytesOption { + let result = self.channelcore.pendingOutboundBuffer.reduce(0) { partialResult, dataAndPromise in + let buffer = self.channelcore.unwrapData(dataAndPromise.0, as: ByteBuffer.self) + return partialResult + buffer.readableBytes + } + + return result as! Option.Value + } fatalError("option \(option) not supported") } diff --git a/Sources/NIOPosix/BaseStreamSocketChannel.swift b/Sources/NIOPosix/BaseStreamSocketChannel.swift index 605f551464..6f04ecec6d 100644 --- a/Sources/NIOPosix/BaseStreamSocketChannel.swift +++ b/Sources/NIOPosix/BaseStreamSocketChannel.swift @@ -76,6 +76,8 @@ class BaseStreamSocketChannel: BaseSocketChannel return self.pendingWrites.writeSpinCount as! Option.Value case _ as ChannelOptions.Types.WriteBufferWaterMarkOption: return self.pendingWrites.waterMark as! Option.Value + case _ as ChannelOptions.Types.BufferedWritableBytesOption: + return Int(self.pendingWrites.bufferedBytes) as! Option.Value default: return try super.getOption0(option) } diff --git a/Sources/NIOPosix/PendingDatagramWritesManager.swift b/Sources/NIOPosix/PendingDatagramWritesManager.swift index 31365ca987..8945df0692 100644 --- a/Sources/NIOPosix/PendingDatagramWritesManager.swift +++ b/Sources/NIOPosix/PendingDatagramWritesManager.swift @@ -451,6 +451,10 @@ final class PendingDatagramWritesManager: PendingWritesManager { self.state.isEmpty } + var bufferedBytes: Int64 { + self.state.bytes + } + private func add(_ pendingWrite: PendingDatagramWrite) -> Bool { assert(self.isOpen) self.state.append(pendingWrite) diff --git a/Sources/NIOPosix/PendingWritesManager.swift b/Sources/NIOPosix/PendingWritesManager.swift index 10c22cd204..02f82cbb1c 100644 --- a/Sources/NIOPosix/PendingWritesManager.swift +++ b/Sources/NIOPosix/PendingWritesManager.swift @@ -324,6 +324,10 @@ final class PendingStreamWritesManager: PendingWritesManager { self.state.isEmpty } + var bufferedBytes: Int64 { + self.state.bytes + } + /// Add a pending write alongside its promise. /// /// - parameters: diff --git a/Sources/NIOPosix/SocketChannel.swift b/Sources/NIOPosix/SocketChannel.swift index a299a8ec75..6c3cf5e1ee 100644 --- a/Sources/NIOPosix/SocketChannel.swift +++ b/Sources/NIOPosix/SocketChannel.swift @@ -676,6 +676,8 @@ final class DatagramChannel: BaseSocketChannel { throw ChannelError._operationUnsupported } return try self.socket.getUDPReceiveOffload() as! Option.Value + case _ as ChannelOptions.Types.BufferedWritableBytesOption: + return Int(self.pendingWrites.bufferedBytes) as! Option.Value default: return try super.getOption0(option) } diff --git a/Tests/NIOEmbeddedTests/AsyncTestingChannelTests.swift b/Tests/NIOEmbeddedTests/AsyncTestingChannelTests.swift index 92690716ca..0dea56abc5 100644 --- a/Tests/NIOEmbeddedTests/AsyncTestingChannelTests.swift +++ b/Tests/NIOEmbeddedTests/AsyncTestingChannelTests.swift @@ -551,6 +551,116 @@ class AsyncTestingChannelTests: XCTestCase { _ = try await channel.finish() await XCTAsyncAssertThrowsError(try await channel.finish()) } + + func testWriteOutboundEmptyBufferedByte() async throws { + let channel = NIOAsyncTestingChannel() + var buffered: ChannelOptions.Types.BufferedWritableBytesOption.Value = try await channel.getOption( + .bufferedWritableBytes + ) + XCTAssertEqual(0, buffered) + + let buf = channel.allocator.buffer(capacity: 10) + + channel.write(buf, promise: nil) + buffered = try await channel.getOption(.bufferedWritableBytes) + XCTAssertEqual(0, buffered) + + channel.flush() + buffered = try await channel.getOption(.bufferedWritableBytes) + XCTAssertEqual(0, buffered) + + try await XCTAsyncAssertEqual(buf, try await channel.waitForOutboundWrite(as: ByteBuffer.self)) + try await XCTAsyncAssertTrue(try await channel.finish().isClean) + } + + func testWriteOutboundBufferedBytesSingleWrite() async throws { + let channel = NIOAsyncTestingChannel() + var buf = channel.allocator.buffer(capacity: 10) + buf.writeString("hello") + + channel.write(buf, promise: nil) + var buffered: ChannelOptions.Types.BufferedWritableBytesOption.Value = try await channel.getOption( + .bufferedWritableBytes + ) + XCTAssertEqual(buf.readableBytes, buffered) + channel.flush() + + buffered = try await channel.getOption(.bufferedWritableBytes).get() + XCTAssertEqual(0, buffered) + try await XCTAsyncAssertEqual(buf, try await channel.waitForOutboundWrite(as: ByteBuffer.self)) + try await XCTAsyncAssertTrue(try await channel.finish().isClean) + } + + func testWriteOuboundBufferedBytesMultipleWrites() async throws { + let channel = NIOAsyncTestingChannel() + var buf = channel.allocator.buffer(capacity: 10) + buf.writeString("hello") + let totalCount = 5 + for _ in 0..] = (0..<4).map { (_: Int) in el.makePromise() } @@ -597,6 +606,7 @@ public final class ChannelTests: XCTestCase { _ = pwm.add(data: .byteBuffer(buffer), promise: ps[1]) _ = pwm.add(data: .byteBuffer(buffer), promise: ps[2]) _ = pwm.add(data: .byteBuffer(buffer), promise: ps[3]) + XCTAssertEqual(totalBytes, pwm.bufferedBytes) pwm.markFlushCheckpoint() var result = try assertExpectedWritability( @@ -610,6 +620,8 @@ public final class ChannelTests: XCTestCase { ) XCTAssertEqual(.couldNotWriteEverything, result.writeResult) + XCTAssertEqual(totalBytes - 1, pwm.bufferedBytes) + result = try assertExpectedWritability( pendingWritesManager: pwm, promises: ps, @@ -621,6 +633,7 @@ public final class ChannelTests: XCTestCase { ) XCTAssertEqual(.couldNotWriteEverything, result.writeResult) + XCTAssertEqual(totalBytes - 1 - 7, pwm.bufferedBytes) result = try assertExpectedWritability( pendingWritesManager: pwm, @@ -632,6 +645,7 @@ public final class ChannelTests: XCTestCase { promiseStates: [[true, true, true, true], [true, true, true, true]] ) XCTAssertEqual(.writtenCompletely, result.writeResult) + XCTAssertEqual(totalBytes - 1 - 7 - 8, pwm.bufferedBytes) } } @@ -652,6 +666,7 @@ public final class ChannelTests: XCTestCase { let ps: [EventLoopPromise] = (0..<1).map { (_: Int) in el.makePromise() } _ = pwm.add(data: .byteBuffer(buffer), promise: ps[0]) pwm.markFlushCheckpoint() + XCTAssertEqual(Int64(numberOfBytes), pwm.bufferedBytes) // below, we'll write 1 byte at a time. So the number of bytes offered should decrease by one. // The write operation should be repeated until we did it 1 + spin count times and then return `.writtenPartially`. @@ -666,6 +681,7 @@ public final class ChannelTests: XCTestCase { promiseStates: Array(repeating: [false], count: numberOfBytes) ) XCTAssertEqual(.couldNotWriteEverything, result.writeResult) + XCTAssertEqual(1, pwm.bufferedBytes) // we'll now write the one last byte and assert that all the writes are complete result = try assertExpectedWritability( @@ -678,6 +694,7 @@ public final class ChannelTests: XCTestCase { promiseStates: [[true]] ) XCTAssertEqual(.writtenCompletely, result.writeResult) + XCTAssertEqual(0, pwm.bufferedBytes) } } @@ -700,6 +717,7 @@ public final class ChannelTests: XCTestCase { _ = pwm.add(data: .byteBuffer(buffer), promise: p) return p } + XCTAssertEqual(Int64(numberOfBytes * buffer.readableBytes), pwm.bufferedBytes) pwm.markFlushCheckpoint() // this will create an `Array` like this (for `numberOfBytes == 4`) @@ -727,6 +745,7 @@ public final class ChannelTests: XCTestCase { promiseStates: expectedPromiseStates ) XCTAssertEqual(.couldNotWriteEverything, result.writeResult) + XCTAssertEqual(Int64(buffer.readableBytes), pwm.bufferedBytes) // we'll now write the one last byte and assert that all the writes are complete result = try assertExpectedWritability( @@ -739,6 +758,7 @@ public final class ChannelTests: XCTestCase { promiseStates: [Array(repeating: true, count: numberOfBytes)] ) XCTAssertEqual(.writtenCompletely, result.writeResult) + XCTAssertEqual(0, pwm.bufferedBytes) } } @@ -766,6 +786,9 @@ public final class ChannelTests: XCTestCase { for i in (0..] = (0..<3).map { (_: Int) in el.makePromise() } _ = pwm.add(data: .byteBuffer(buffer), promise: ps[0]) _ = pwm.add(data: .byteBuffer(buffer), promise: ps[1]) + let totalBytes = Int64(buffer.readableBytes * 2) + XCTAssertEqual(totalBytes, pwm.bufferedBytes) pwm.markFlushCheckpoint() _ = pwm.add(data: .byteBuffer(emptyBuffer), promise: ps[2]) + XCTAssertEqual(totalBytes, pwm.bufferedBytes) let result = try assertExpectedWritability( pendingWritesManager: pwm, @@ -812,6 +840,7 @@ public final class ChannelTests: XCTestCase { promiseStates: [[false, false, false], [false, false, false]] ) XCTAssertEqual(.couldNotWriteEverything, result.writeResult) + XCTAssertEqual(totalBytes - 2, pwm.bufferedBytes) pwm.failAll(error: ChannelError.operationUnsupported, close: true) @@ -840,6 +869,7 @@ public final class ChannelTests: XCTestCase { _ = pwm.add(data: .byteBuffer(buffer), promise: ps[0]) _ = pwm.add(data: .byteBuffer(buffer), promise: ps[1]) _ = pwm.add(data: .byteBuffer(buffer), promise: ps[2]) + XCTAssertEqual(Int64(buffer.readableBytes * 3), pwm.bufferedBytes) pwm.markFlushCheckpoint() let result = try assertExpectedWritability( @@ -852,6 +882,7 @@ public final class ChannelTests: XCTestCase { promiseStates: [[true, true, false], [true, true, true]] ) XCTAssertEqual(.writtenCompletely, result.writeResult) + XCTAssertEqual(0, pwm.bufferedBytes) } } @@ -876,11 +907,17 @@ public final class ChannelTests: XCTestCase { try withPendingStreamWritesManager { pwm in let ps: [EventLoopPromise] = (0..<3).map { (_: Int) in el.makePromise() } + var totalBytes: Int64 = 0 + // add 1.5x the writev limit _ = pwm.add(data: .byteBuffer(buffer), promise: ps[0]) _ = pwm.add(data: .byteBuffer(buffer), promise: ps[1]) + totalBytes += Int64(buffer.readableBytes * 2) + XCTAssertEqual(totalBytes, pwm.bufferedBytes) buffer.moveWriterIndex(to: 100) + totalBytes += Int64(buffer.readableBytes) _ = pwm.add(data: .byteBuffer(buffer), promise: ps[2]) + XCTAssertEqual(totalBytes, pwm.bufferedBytes) pwm.markFlushCheckpoint() @@ -915,6 +952,7 @@ public final class ChannelTests: XCTestCase { ] ) XCTAssertEqual(.writtenCompletely, result.writeResult) + XCTAssertEqual(0, pwm.bufferedBytes) pwm.markFlushCheckpoint() } } @@ -933,9 +971,14 @@ public final class ChannelTests: XCTestCase { XCTAssertNoThrow(try fh1.takeDescriptorOwnership()) XCTAssertNoThrow(try fh2.takeDescriptorOwnership()) } + var totalBytes: Int64 = 0 _ = pwm.add(data: .fileRegion(fr1), promise: ps[0]) + totalBytes += Int64(fr1.readableBytes) + XCTAssertEqual(totalBytes, pwm.bufferedBytes) pwm.markFlushCheckpoint() _ = pwm.add(data: .fileRegion(fr2), promise: ps[1]) + totalBytes += Int64(fr2.readableBytes) + XCTAssertEqual(totalBytes, pwm.bufferedBytes) var result = try assertExpectedWritability( pendingWritesManager: pwm, @@ -947,6 +990,8 @@ public final class ChannelTests: XCTestCase { promiseStates: [[true, false]] ) XCTAssertEqual(.writtenCompletely, result.writeResult) + totalBytes -= Int64(fr1.readableBytes) + XCTAssertEqual(totalBytes, pwm.bufferedBytes) result = try assertExpectedWritability( pendingWritesManager: pwm, @@ -958,7 +1003,7 @@ public final class ChannelTests: XCTestCase { promiseStates: [[true, false]] ) XCTAssertEqual(.writtenCompletely, result.writeResult) - + XCTAssertEqual(totalBytes, pwm.bufferedBytes) pwm.markFlushCheckpoint() result = try assertExpectedWritability( @@ -970,6 +1015,9 @@ public final class ChannelTests: XCTestCase { returns: [.processed(1), .processed(1)], promiseStates: [[true, false], [true, true]] ) + + totalBytes -= Int64(fr2.readableBytes) + XCTAssertEqual(totalBytes, pwm.bufferedBytes) XCTAssertEqual(.writtenCompletely, result.writeResult) } } @@ -986,6 +1034,7 @@ public final class ChannelTests: XCTestCase { XCTAssertNoThrow(try fh.takeDescriptorOwnership()) } _ = pwm.add(data: .fileRegion(fr), promise: ps[0]) + XCTAssertEqual(0, pwm.bufferedBytes) pwm.markFlushCheckpoint() let result = try assertExpectedWritability( @@ -997,6 +1046,8 @@ public final class ChannelTests: XCTestCase { returns: [.processed(0)], promiseStates: [[true]] ) + + XCTAssertEqual(0, pwm.bufferedBytes) XCTAssertEqual(.writtenCompletely, result.writeResult) } } @@ -1020,12 +1071,17 @@ public final class ChannelTests: XCTestCase { XCTAssertNoThrow(try fh2.takeDescriptorOwnership()) } + var totalBytes: Int64 = 0 _ = pwm.add(data: .byteBuffer(buffer), promise: ps[0]) _ = pwm.add(data: .byteBuffer(buffer), promise: ps[1]) _ = pwm.add(data: .fileRegion(fr1), promise: ps[2]) _ = pwm.add(data: .byteBuffer(buffer), promise: ps[3]) _ = pwm.add(data: .fileRegion(fr2), promise: ps[4]) - + totalBytes += Int64( + buffer.readableBytes + buffer.readableBytes + fr1.readableBytes + buffer.readableBytes + + fr2.readableBytes + ) + XCTAssertEqual(totalBytes, pwm.bufferedBytes) pwm.markFlushCheckpoint() var result = try assertExpectedWritability( @@ -1050,6 +1106,9 @@ public final class ChannelTests: XCTestCase { [true, true, true, true, false], ] ) + + totalBytes -= (4 + 4 + 0 + 4 + 6) + XCTAssertEqual(totalBytes, pwm.bufferedBytes) XCTAssertEqual(.couldNotWriteEverything, result.writeResult) result = try assertExpectedWritability( @@ -1061,6 +1120,9 @@ public final class ChannelTests: XCTestCase { returns: [.processed(4)], promiseStates: [[true, true, true, true, true]] ) + + totalBytes -= 4 + XCTAssertEqual(totalBytes, pwm.bufferedBytes) XCTAssertEqual(.writtenCompletely, result.writeResult) } } @@ -1076,6 +1138,7 @@ public final class ChannelTests: XCTestCase { let ps: [EventLoopPromise] = (0..<3).map { (_: Int) in el.makePromise() } pwm.markFlushCheckpoint() + XCTAssertEqual(0, pwm.bufferedBytes) // let's start with no writes and just a flush var result = try assertExpectedWritability( @@ -1091,10 +1154,11 @@ public final class ChannelTests: XCTestCase { // let's add a few writes but still without any promises _ = pwm.add(data: .byteBuffer(buffer), promise: ps[0]) _ = pwm.add(data: .byteBuffer(buffer), promise: ps[1]) - + XCTAssertEqual(Int64(buffer.readableBytes * 2), pwm.bufferedBytes) pwm.markFlushCheckpoint() _ = pwm.add(data: .byteBuffer(emptyBuffer), promise: ps[2]) + XCTAssertEqual(Int64(buffer.readableBytes * 2), pwm.bufferedBytes) result = try assertExpectedWritability( pendingWritesManager: pwm, @@ -1106,6 +1170,7 @@ public final class ChannelTests: XCTestCase { promiseStates: [[true, true, false]] ) XCTAssertEqual(.writtenCompletely, result.writeResult) + XCTAssertEqual(0, pwm.bufferedBytes) pwm.markFlushCheckpoint() @@ -1118,6 +1183,8 @@ public final class ChannelTests: XCTestCase { returns: [.processed(0)], promiseStates: [[true, true, true]] ) + + XCTAssertEqual(0, pwm.bufferedBytes) XCTAssertEqual(.writtenCompletely, result.writeResult) } } @@ -1131,8 +1198,10 @@ public final class ChannelTests: XCTestCase { let ps: [EventLoopPromise] = (0..<3).map { (_: Int) in el.makePromise() } _ = pwm.add(data: .byteBuffer(emptyBuffer), promise: ps[0]) _ = pwm.add(data: .byteBuffer(emptyBuffer), promise: ps[1]) + XCTAssertEqual(0, pwm.bufferedBytes) pwm.markFlushCheckpoint() _ = pwm.add(data: .byteBuffer(emptyBuffer), promise: ps[2]) + XCTAssertEqual(0, pwm.bufferedBytes) var result = try assertExpectedWritability( pendingWritesManager: pwm, @@ -1144,6 +1213,7 @@ public final class ChannelTests: XCTestCase { promiseStates: [[true, true, false]] ) XCTAssertEqual(.writtenCompletely, result.writeResult) + XCTAssertEqual(0, pwm.bufferedBytes) pwm.markFlushCheckpoint() @@ -1156,6 +1226,8 @@ public final class ChannelTests: XCTestCase { returns: [.processed(0)], promiseStates: [[true, true, true]] ) + + XCTAssertEqual(0, pwm.bufferedBytes) XCTAssertEqual(.writtenCompletely, result.writeResult) } } @@ -1170,8 +1242,10 @@ public final class ChannelTests: XCTestCase { let ps: [EventLoopPromise] = (0..<3).map { (_: Int) in el.makePromise() } _ = pwm.add(data: .byteBuffer(buffer), promise: ps[0]) _ = pwm.add(data: .byteBuffer(buffer), promise: ps[1]) + XCTAssertEqual(Int64(buffer.readableBytes * 2), pwm.bufferedBytes) pwm.markFlushCheckpoint() _ = pwm.add(data: .byteBuffer(buffer), promise: ps[2]) + XCTAssertEqual(Int64(buffer.readableBytes * 3), pwm.bufferedBytes) ps[0].futureResult.whenComplete { (_: Result) in pwm.failAll(error: ChannelError.inputClosed, close: true) @@ -1186,6 +1260,8 @@ public final class ChannelTests: XCTestCase { returns: [.processed(4)], promiseStates: [[true, true, true]] ) + + XCTAssertEqual(0, pwm.bufferedBytes) XCTAssertEqual(.writtenCompletely, result.writeResult) XCTAssertNoThrow(try ps[0].futureResult.wait()) XCTAssertThrowsError(try ps[1].futureResult.wait()) @@ -1200,9 +1276,12 @@ public final class ChannelTests: XCTestCase { buffer.writeString("1234") try withPendingStreamWritesManager { pwm in + var totalBytes: Int64 = 0 let ps: [EventLoopPromise] = (0...Socket.writevLimitIOVectors).map { (_: Int) in el.makePromise() } for p in ps { _ = pwm.add(data: .byteBuffer(buffer), promise: p) + totalBytes += Int64(buffer.readableBytes) + XCTAssertEqual(totalBytes, pwm.bufferedBytes) } pwm.markFlushCheckpoint() @@ -1218,7 +1297,10 @@ public final class ChannelTests: XCTestCase { Array(repeating: true, count: Socket.writevLimitIOVectors) + [false], ] ) + totalBytes -= Int64(buffer.readableBytes * Socket.writevLimitIOVectors) + XCTAssertEqual(totalBytes, pwm.bufferedBytes) XCTAssertEqual(.couldNotWriteEverything, result.writeResult) + result = try assertExpectedWritability( pendingWritesManager: pwm, promises: ps, @@ -1228,6 +1310,7 @@ public final class ChannelTests: XCTestCase { returns: [.processed(4)], promiseStates: [Array(repeating: true, count: Socket.writevLimitIOVectors + 1)] ) + XCTAssertEqual(0, pwm.bufferedBytes) XCTAssertEqual(.writtenCompletely, result.writeResult) } } @@ -1245,6 +1328,7 @@ public final class ChannelTests: XCTestCase { } _ = pwm.add(data: .fileRegion(fr), promise: ps[0]) + XCTAssertEqual(Int64(fr.readableBytes), pwm.bufferedBytes) pwm.markFlushCheckpoint() let result = try assertExpectedWritability( @@ -1256,6 +1340,8 @@ public final class ChannelTests: XCTestCase { returns: [.wouldBlock(8192)], promiseStates: [[true]] ) + + XCTAssertEqual(0, pwm.bufferedBytes) XCTAssertEqual(.writtenCompletely, result.writeResult) } } @@ -3329,6 +3415,68 @@ public final class ChannelTests: XCTestCase { XCTAssertNoThrow(try handler.becameUnwritable.futureResult.wait()) XCTAssertNoThrow(try handler.becameWritable.futureResult.wait()) } + + func testChannelCanReportWritableBufferedBytes() throws { + let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) + defer { + XCTAssertNoThrow(try group.syncShutdownGracefully()) + } + + let server = try ServerBootstrap(group: group) + .serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1) + .bind(host: "localhost", port: 0) + .wait() + + let client = try ClientBootstrap(group: group) + .channelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1) + .connect(to: server.localAddress!) + .wait() + + let buffer = client.allocator.buffer(string: "abcd") + let writeCount = 3 + + let promises = (0..= 0 && bufferedAmount <= buffer.readableBytes * writeCount) + promises.append(client.write(NIOAny(buffer))) + bufferedAmount = try client.getOption(.bufferedWritableBytes).wait() + XCTAssertTrue( + bufferedAmount >= buffer.readableBytes && bufferedAmount <= buffer.readableBytes * (writeCount + 1) + ) + client.flush() + XCTAssertNoThrow(try EventLoopFuture.andAllSucceed(promises, on: client.eventLoop).wait()) + let bufferedAmountAfterFlush = try client.getOption(.bufferedWritableBytes).wait() + XCTAssertEqual(bufferedAmountAfterFlush, 0) + } } private final class FailRegistrationAndDelayCloseHandler: ChannelOutboundHandler { diff --git a/Tests/NIOPosixTests/DatagramChannelTests.swift b/Tests/NIOPosixTests/DatagramChannelTests.swift index 8db44985b4..2610903253 100644 --- a/Tests/NIOPosixTests/DatagramChannelTests.swift +++ b/Tests/NIOPosixTests/DatagramChannelTests.swift @@ -1650,6 +1650,68 @@ class DatagramChannelTests: XCTestCase { try self.testReceiveLargeBufferWithGRO(segments: 10, segmentSize: 1000, writes: 4, vectorReads: 4) } + func testChannelCanReportWritableBufferedBytes() throws { + let buffer = self.firstChannel.allocator.buffer(string: "abcd") + let data = AddressedEnvelope(remoteAddress: self.secondChannel.localAddress!, data: buffer) + let writeCount = 3 + + let promises = (0..(), name: "ByteReadRecorder") + } + .bind(host: "127.0.0.1", port: 0) + .wait() + let buffer = self.firstChannel.allocator.buffer(repeating: 0xff, count: 16) + let data = AddressedEnvelope(remoteAddress: self.secondChannel.localAddress!, data: buffer) + let writeCount = 10 + var promises: [EventLoopFuture] = [] + + for i in 0.. Bool { // Source code for UDP_GRO was added in Linux 5.0. However, this support is somewhat limited // and some sources indicate support was actually added in 5.10 (perhaps more widely diff --git a/Tests/NIOPosixTests/PendingDatagramWritesManagerTests.swift b/Tests/NIOPosixTests/PendingDatagramWritesManagerTests.swift index 988b235197..76bcb6db34 100644 --- a/Tests/NIOPosixTests/PendingDatagramWritesManagerTests.swift +++ b/Tests/NIOPosixTests/PendingDatagramWritesManagerTests.swift @@ -328,8 +328,10 @@ class PendingDatagramWritesManagerTests: XCTestCase { XCTAssertFalse(pwm.isEmpty) XCTAssertTrue(pwm.isFlushPending) + XCTAssertEqual(Int64(buffer.readableBytes), pwm.bufferedBytes) _ = pwm.add(envelope: AddressedEnvelope(remoteAddress: address, data: buffer), promise: ps[1]) + XCTAssertEqual(Int64(buffer.readableBytes), pwm.bufferedBytes) var result = try assertExpectedWritability( pendingWritesManager: pwm, @@ -353,6 +355,7 @@ class PendingDatagramWritesManagerTests: XCTestCase { promiseStates: [[true, false]] ) XCTAssertEqual(.writtenCompletely, result.writeResult) + XCTAssertEqual(Int64(buffer.readableBytes), pwm.bufferedBytes) pwm.markFlushCheckpoint() @@ -365,6 +368,7 @@ class PendingDatagramWritesManagerTests: XCTestCase { promiseStates: [[true, true]] ) XCTAssertEqual(.writtenCompletely, result.writeResult) + XCTAssertEqual(Int64(buffer.readableBytes), pwm.bufferedBytes) } } @@ -385,6 +389,8 @@ class PendingDatagramWritesManagerTests: XCTestCase { pwm.markFlushCheckpoint() _ = pwm.add(envelope: AddressedEnvelope(remoteAddress: firstAddress, data: emptyBuffer), promise: ps[2]) + XCTAssertEqual(8, pwm.bufferedBytes) + var result = try assertExpectedWritability( pendingWritesManager: pwm, promises: ps, @@ -394,6 +400,7 @@ class PendingDatagramWritesManagerTests: XCTestCase { promiseStates: [[true, true, false]] ) XCTAssertEqual(.writtenCompletely, result.writeResult) + XCTAssertEqual(0, pwm.bufferedBytes) pwm.markFlushCheckpoint() @@ -406,6 +413,7 @@ class PendingDatagramWritesManagerTests: XCTestCase { promiseStates: [[true, true, true]] ) XCTAssertEqual(.writtenCompletely, result.writeResult) + XCTAssertEqual(0, pwm.bufferedBytes) } } @@ -425,6 +433,7 @@ class PendingDatagramWritesManagerTests: XCTestCase { _ = pwm.add(envelope: AddressedEnvelope(remoteAddress: firstAddress, data: buffer), promise: ps[2]) _ = pwm.add(envelope: AddressedEnvelope(remoteAddress: secondAddress, data: buffer), promise: ps[3]) pwm.markFlushCheckpoint() + XCTAssertEqual(16, pwm.bufferedBytes) var result = try assertExpectedWritability( pendingWritesManager: pwm, @@ -439,6 +448,8 @@ class PendingDatagramWritesManagerTests: XCTestCase { ) XCTAssertEqual(.couldNotWriteEverything, result.writeResult) + XCTAssertEqual(12, pwm.bufferedBytes) + result = try assertExpectedWritability( pendingWritesManager: pwm, promises: ps, @@ -451,6 +462,7 @@ class PendingDatagramWritesManagerTests: XCTestCase { ) XCTAssertEqual(.couldNotWriteEverything, result.writeResult) + XCTAssertEqual(4, pwm.bufferedBytes) result = try assertExpectedWritability( pendingWritesManager: pwm, @@ -461,6 +473,7 @@ class PendingDatagramWritesManagerTests: XCTestCase { promiseStates: [[true, true, true, true]] ) XCTAssertEqual(.writtenCompletely, result.writeResult) + XCTAssertEqual(0, pwm.bufferedBytes) } } @@ -477,6 +490,7 @@ class PendingDatagramWritesManagerTests: XCTestCase { for promise in ps { _ = pwm.add(envelope: AddressedEnvelope(remoteAddress: address, data: buffer), promise: promise) } + let totalBytes = ps.count * buffer.readableBytes let maxVectorWritabilities = ps.map { (_: EventLoopPromise) in (buffer.readableBytes, address) } let actualVectorWritabilities = maxVectorWritabilities.indices.dropLast().map { Array(maxVectorWritabilities[$0...]) @@ -486,6 +500,7 @@ class PendingDatagramWritesManagerTests: XCTestCase { } pwm.markFlushCheckpoint() + XCTAssertEqual(Int64(totalBytes), pwm.bufferedBytes) // below, we'll write 1 datagram at a time. So the number of datagrams offered should decrease by one. // The write operation should be repeated until we did it 1 + spin count times and then return `.couldNotWriteEverything`. @@ -499,6 +514,7 @@ class PendingDatagramWritesManagerTests: XCTestCase { promiseStates: actualPromiseStates ) XCTAssertEqual(.couldNotWriteEverything, result.writeResult) + XCTAssertEqual(Int64(buffer.readableBytes), pwm.bufferedBytes) // we'll now write the one last datagram and assert that all the writes are complete result = try assertExpectedWritability( @@ -510,6 +526,7 @@ class PendingDatagramWritesManagerTests: XCTestCase { promiseStates: [Array(repeating: true, count: ps.count - 1) + [true]] ) XCTAssertEqual(.writtenCompletely, result.writeResult) + XCTAssertEqual(0, pwm.bufferedBytes) } } @@ -528,6 +545,8 @@ class PendingDatagramWritesManagerTests: XCTestCase { pwm.markFlushCheckpoint() _ = pwm.add(envelope: AddressedEnvelope(remoteAddress: address, data: buffer), promise: ps[2]) + XCTAssertEqual(12, pwm.bufferedBytes) + let result = try assertExpectedWritability( pendingWritesManager: pwm, promises: ps, @@ -537,6 +556,7 @@ class PendingDatagramWritesManagerTests: XCTestCase { promiseStates: [[false, false, false], [false, false, false]] ) XCTAssertEqual(.couldNotWriteEverything, result.writeResult) + XCTAssertEqual(12, pwm.bufferedBytes) pwm.failAll(error: ChannelError.operationUnsupported, close: true) @@ -568,6 +588,8 @@ class PendingDatagramWritesManagerTests: XCTestCase { _ = pwm.add(envelope: AddressedEnvelope(remoteAddress: address, data: buffer), promise: ps[2]) pwm.markFlushCheckpoint() + XCTAssertEqual(Int64(3 * halfTheWriteVLimit), pwm.bufferedBytes) + let result = try assertExpectedWritability( pendingWritesManager: pwm, promises: ps, @@ -577,6 +599,7 @@ class PendingDatagramWritesManagerTests: XCTestCase { promiseStates: [[true, true, false], [true, true, true]] ) XCTAssertEqual(.writtenCompletely, result.writeResult) + XCTAssertEqual(0, pwm.bufferedBytes) } } @@ -604,9 +627,11 @@ class PendingDatagramWritesManagerTests: XCTestCase { let ps: [EventLoopPromise] = (0..<3).map { (_: Int) in el.makePromise() } // add 1.5x the writev limit _ = pwm.add(envelope: AddressedEnvelope(remoteAddress: address, data: buffer), promise: ps[0]) + XCTAssertEqual(Int64(biggerThanWriteV), pwm.bufferedBytes) buffer.moveReaderIndex(to: 100) _ = pwm.add(envelope: AddressedEnvelope(remoteAddress: address, data: buffer), promise: ps[1]) _ = pwm.add(envelope: AddressedEnvelope(remoteAddress: address, data: buffer), promise: ps[2]) + XCTAssertEqual(Int64(biggerThanWriteV * 3 - 100 * 2), pwm.bufferedBytes) pwm.markFlushCheckpoint() @@ -627,6 +652,7 @@ class PendingDatagramWritesManagerTests: XCTestCase { ) XCTAssertEqual(.writtenCompletely, result.writeResult) + XCTAssertEqual(0, pwm.bufferedBytes) XCTAssertNoThrow(try ps[1].futureResult.wait()) XCTAssertNoThrow(try ps[2].futureResult.wait()) @@ -655,6 +681,8 @@ class PendingDatagramWritesManagerTests: XCTestCase { pwm.markFlushCheckpoint() _ = pwm.add(envelope: AddressedEnvelope(remoteAddress: address, data: emptyBuffer), promise: ps[2]) + XCTAssertEqual(0, pwm.bufferedBytes) + var result = try assertExpectedWritability( pendingWritesManager: pwm, promises: ps, @@ -664,6 +692,7 @@ class PendingDatagramWritesManagerTests: XCTestCase { promiseStates: [[true, true, false]] ) XCTAssertEqual(.writtenCompletely, result.writeResult) + XCTAssertEqual(0, pwm.bufferedBytes) pwm.markFlushCheckpoint() @@ -676,6 +705,7 @@ class PendingDatagramWritesManagerTests: XCTestCase { promiseStates: [[true, true, true]] ) XCTAssertEqual(.writtenCompletely, result.writeResult) + XCTAssertEqual(0, pwm.bufferedBytes) } } @@ -690,8 +720,10 @@ class PendingDatagramWritesManagerTests: XCTestCase { let ps: [EventLoopPromise] = (0..<3).map { (_: Int) in el.makePromise() } _ = pwm.add(envelope: AddressedEnvelope(remoteAddress: address, data: buffer), promise: ps[0]) _ = pwm.add(envelope: AddressedEnvelope(remoteAddress: address, data: buffer), promise: ps[1]) + XCTAssertEqual(Int64(buffer.readableBytes * 2), pwm.bufferedBytes) pwm.markFlushCheckpoint() _ = pwm.add(envelope: AddressedEnvelope(remoteAddress: address, data: buffer), promise: ps[2]) + XCTAssertEqual(Int64(buffer.readableBytes * 3), pwm.bufferedBytes) ps[0].futureResult.whenComplete { (_: Result) in pwm.failAll(error: ChannelError.inputClosed, close: true) @@ -706,6 +738,7 @@ class PendingDatagramWritesManagerTests: XCTestCase { promiseStates: [[true, true, true]] ) XCTAssertEqual(.writtenCompletely, result.writeResult) + XCTAssertEqual(0, pwm.bufferedBytes) XCTAssertNoThrow(try ps[0].futureResult.wait()) XCTAssertThrowsError(try ps[1].futureResult.wait()) XCTAssertThrowsError(try ps[2].futureResult.wait()) @@ -725,6 +758,7 @@ class PendingDatagramWritesManagerTests: XCTestCase { _ = pwm.add(envelope: AddressedEnvelope(remoteAddress: address, data: buffer), promise: promise) } pwm.markFlushCheckpoint() + XCTAssertEqual(Int64(buffer.readableBytes * ps.count), pwm.bufferedBytes) var result = try assertExpectedWritability( pendingWritesManager: pwm, @@ -738,6 +772,8 @@ class PendingDatagramWritesManagerTests: XCTestCase { ] ) XCTAssertEqual(.couldNotWriteEverything, result.writeResult) + XCTAssertEqual(Int64(buffer.readableBytes), pwm.bufferedBytes) + result = try assertExpectedWritability( pendingWritesManager: pwm, promises: ps, @@ -747,6 +783,68 @@ class PendingDatagramWritesManagerTests: XCTestCase { promiseStates: [Array(repeating: true, count: Socket.writevLimitIOVectors + 1)] ) XCTAssertEqual(.writtenCompletely, result.writeResult) + XCTAssertEqual(0, pwm.bufferedBytes) + } + } + + func testReadBufferedWritableBytesWithConsecutiveWritesAndWouldBlock() throws { + let el = EmbeddedEventLoop() + let alloc = ByteBufferAllocator() + let address = try SocketAddress(ipAddress: "127.0.0.1", port: 80) + var buffer = alloc.buffer(capacity: 12) + buffer.writeString("12") + let bufferSize = buffer.readableBytes + try withPendingDatagramWritesManager { pwm in + let ps: [EventLoopPromise] = (0...4).map { (_: Int) in el.makePromise() } + for idx in 0..<4 { + _ = pwm.add(envelope: AddressedEnvelope(remoteAddress: address, data: buffer), promise: ps[idx]) + } + + pwm.markFlushCheckpoint() + XCTAssertEqual(Int64(bufferSize * (ps.count - 1)), pwm.bufferedBytes) + + var result = try assertExpectedWritability( + pendingWritesManager: pwm, + promises: ps, + expectedSingleWritabilities: [(bufferSize, address)], + expectedVectorWritabilities: [Array(repeating: (bufferSize, address), count: 4)], + returns: [.success(.processed(3)), .success(.wouldBlock(0))], + promiseStates: [ + Array(repeating: true, count: 3) + [false, false], + Array(repeating: true, count: 3) + [false, false], + ] + ) + XCTAssertEqual(.couldNotWriteEverything, result.writeResult) + XCTAssertEqual(Int64(bufferSize), pwm.bufferedBytes) + + _ = pwm.add(envelope: AddressedEnvelope(remoteAddress: address, data: buffer), promise: ps.last!) + pwm.markFlushCheckpoint() + XCTAssertEqual(Int64(bufferSize * 2), pwm.bufferedBytes) + + result = try assertExpectedWritability( + pendingWritesManager: pwm, + promises: ps, + expectedSingleWritabilities: [(bufferSize, address)], + expectedVectorWritabilities: [[(bufferSize, address), (bufferSize, address)]], + returns: [.success(.processed(1)), .success(.wouldBlock(0))], + promiseStates: [ + Array(repeating: true, count: 4) + [false], + Array(repeating: true, count: 4) + [false], + ] + ) + XCTAssertEqual(.couldNotWriteEverything, result.writeResult) + XCTAssertEqual(Int64(bufferSize), pwm.bufferedBytes) + + result = try assertExpectedWritability( + pendingWritesManager: pwm, + promises: ps, + expectedSingleWritabilities: [(bufferSize, address)], + expectedVectorWritabilities: nil, + returns: [.success(.processed(1))], + promiseStates: [Array(repeating: true, count: 5)] + ) + XCTAssertEqual(.writtenCompletely, result.writeResult) + XCTAssertEqual(0, pwm.bufferedBytes) } } }