Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

POC output with URring #2357

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 80 additions & 18 deletions Sources/NIOPosix/BaseSocketChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -374,10 +374,21 @@ class BaseSocketChannel<SocketType: BaseSocketProtocol>: SelectableChannel, Chan
}

// MARK: Methods to override in subclasses.

#if SWIFTNIO_USE_IO_URING && os(Linux)

func writeToSocket() throws {
fatalError("must be overridden")
}

#else

func writeToSocket() throws -> OverallWriteResult {
fatalError("must be overridden")
}

#endif

/// Read data from the underlying socket and dispatch it to the `ChannelPipeline`
///
/// - returns: `true` if any data was read, `false` otherwise.
Expand Down Expand Up @@ -436,17 +447,17 @@ class BaseSocketChannel<SocketType: BaseSocketProtocol>: SelectableChannel, Chan
fatalError("this must be overridden by sub class")
}

/// Buffer a write in preparation for a flush.
func bufferPendingWrite(data: NIOAny, promise: EventLoopPromise<Void>?) {
fatalError("this must be overridden by sub class")
}

/// Mark a flush point. This is called when flush is received, and instructs
/// the implementation to record the flush.
func markFlushPoint() {
fatalError("this must be overridden by sub class")
}

/// Buffer a write in preparation for a flush.
func bufferPendingWrite(data: NIOAny, promise: EventLoopPromise<Void>?) {
fatalError("this must be overridden by sub class")
}

/// Called when closing, to instruct the specific implementation to discard all pending
/// writes.
func cancelWritesOnClose(error: Error) {
Expand Down Expand Up @@ -500,6 +511,19 @@ class BaseSocketChannel<SocketType: BaseSocketProtocol>: SelectableChannel, Chan
return try self.socket.remoteAddress()
}

#if SWIFTNIO_USE_IO_URING && os(Linux)

func flushNow() {
do {
try self.writeToSocket()
}
catch let err {
self.close0(error: err, mode: .all, promise: nil)
}
}

#else

/// Flush data to the underlying socket and return if this socket needs to be registered for write notifications.
///
/// This method can be called re-entrantly but it will return immediately because the first call is responsible
Expand All @@ -523,8 +547,8 @@ class BaseSocketChannel<SocketType: BaseSocketProtocol>: SelectableChannel, Chan
self.inFlushNow = false
}

var newWriteRegistrationState: IONotificationState = .unregister
do {
var newWriteRegistrationState: IONotificationState = .unregister
while newWriteRegistrationState == .unregister && self.hasFlushedPendingWrites() && self.isOpen {
assert(self.lifecycleManager.isActive)
let writeResult = try self.writeToSocket()
Expand All @@ -540,7 +564,13 @@ class BaseSocketChannel<SocketType: BaseSocketProtocol>: SelectableChannel, Chan
self.pipeline.syncOperations.fireChannelWritabilityChanged()
}
}
} catch let err {

assert((newWriteRegistrationState == .register && self.hasFlushedPendingWrites()) ||
(newWriteRegistrationState == .unregister && !self.hasFlushedPendingWrites()),
"illegal flushNow decision: \(newWriteRegistrationState) and \(self.hasFlushedPendingWrites())")
return newWriteRegistrationState
}
catch let err {
// If there is a write error we should try drain the inbound before closing the socket as there may be some data pending.
// We ignore any error that is thrown as we will use the original err to close the channel and notify the user.
if self.readIfNeeded0() {
Expand All @@ -561,13 +591,10 @@ class BaseSocketChannel<SocketType: BaseSocketProtocol>: SelectableChannel, Chan
// we handled all writes
return .unregister
}

assert((newWriteRegistrationState == .register && self.hasFlushedPendingWrites()) ||
(newWriteRegistrationState == .unregister && !self.hasFlushedPendingWrites()),
"illegal flushNow decision: \(newWriteRegistrationState) and \(self.hasFlushedPendingWrites())")
return newWriteRegistrationState
}

#endif

public final func setOption<Option: ChannelOption>(_ option: Option, value: Option.Value) -> EventLoopFuture<Void> {
if eventLoop.inEventLoop {
let promise = eventLoop.makePromise(of: Void.self)
Expand Down Expand Up @@ -722,16 +749,30 @@ class BaseSocketChannel<SocketType: BaseSocketProtocol>: SelectableChannel, Chan
return
}

#if SWIFTNIO_USE_IO_URING && os(Linux)
guard self.lifecycleManager.isActive else {
return
}

let isFlushPending = self.hasFlushedPendingWrites()
self.markFlushPoint()

if !isFlushPending {
self.flushNow()
}
#else
self.markFlushPoint()

guard self.lifecycleManager.isActive else {
return
}

if !isWritePending() && flushNow() == .register {
let isWritePending = self.interestedEvent.contains(.write)
if !isWritePending && flushNow() == .register {
assert(self.lifecycleManager.isPreRegistered)
registerForWritable()
}
#endif
}

public func read0() {
Expand Down Expand Up @@ -877,7 +918,6 @@ class BaseSocketChannel<SocketType: BaseSocketProtocol>: SelectableChannel, Chan
}
}


public final func register0(promise: EventLoopPromise<Void>?) {
self.eventLoop.assertInEventLoop()

Expand Down Expand Up @@ -938,6 +978,15 @@ class BaseSocketChannel<SocketType: BaseSocketProtocol>: SelectableChannel, Chan

self.finishConnect() // If we were connecting, that has finished.

#if SWIFTNIO_USE_IO_URING && os(Linux)
// the only case when it happen with Uring is a socket connect,
// writtable event is not needed anymore
//let isFlushPending = self.hasFlushedPendingWrites()
//if !isFlushPending {
// self.flushNow()
//}
self.unregisterForWritable()
#else
switch self.flushNow() {
case .unregister:
// Everything was written or connect was complete, let's unregister from writable.
Expand All @@ -946,6 +995,7 @@ class BaseSocketChannel<SocketType: BaseSocketProtocol>: SelectableChannel, Chan
assert(!self.isOpen || self.interestedEvent.contains(.write))
() // nothing to do because given that we just received `writable`, we're still registered for writable.
}
#endif
}

private func finishConnect() {
Expand Down Expand Up @@ -1222,10 +1272,6 @@ class BaseSocketChannel<SocketType: BaseSocketProtocol>: SelectableChannel, Chan
// Do nothing
}

private func isWritePending() -> Bool {
return self.interestedEvent.contains(.write)
}

private final func safeReregister(interested: SelectorEventSet) {
self.eventLoop.assertInEventLoop()
assert(self.lifecycleManager.isRegisteredFully)
Expand Down Expand Up @@ -1309,6 +1355,22 @@ class BaseSocketChannel<SocketType: BaseSocketProtocol>: SelectableChannel, Chan
func reregister(selector: Selector<NIORegistration>, interested: SelectorEventSet) throws {
fatalError("must override")
}

#if SWIFTNIO_USE_IO_URING && os(Linux)

func writeAsync(selector: Selector<NIORegistration>, pointer: UnsafeRawBufferPointer) throws {
fatalError("must override")
}

func writeAsync(selector: Selector<NIORegistration>, iovecs: UnsafeBufferPointer<IOVector>) throws {
fatalError("must override")
}

func sendFileAsync(selector: Selector<NIORegistration>, src: CInt, offset: Int64, count: UInt32) throws {
fatalError("must override")
}

#endif
}

extension BaseSocketChannel {
Expand Down
51 changes: 50 additions & 1 deletion Sources/NIOPosix/BaseStreamSocketChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,14 @@ class BaseStreamSocketChannel<Socket: SocketProtocol>: BaseSocketChannel<Socket>
eventLoop: SelectableEventLoop,
recvAllocator: RecvByteBufferAllocator
) throws {
self.pendingWrites = PendingStreamWritesManager(iovecs: eventLoop.iovecs, storageRefs: eventLoop.storageRefs)
#if SWIFTNIO_USE_IO_URING && os(Linux)
let iovecs = UnsafeMutableBufferPointer<IOVector>.allocate(capacity: NIOPosix.Socket.writevLimitIOVectors)
let storageRefs = UnsafeMutableBufferPointer<Unmanaged<AnyObject>>.allocate(capacity: NIOPosix.Socket.writevLimitIOVectors)
#else
let iovecs = eventLoop.iovecs
let storageRefs = eventLoop.storageRefs
#endif
self.pendingWrites = PendingStreamWritesManager(iovecs: iovecs, storageRefs: storageRefs)
self.connectTimeoutScheduled = nil
try super.init(
socket: socket,
Expand Down Expand Up @@ -159,6 +166,46 @@ class BaseStreamSocketChannel<Socket: SocketProtocol>: BaseSocketChannel<Socket>
return result
}

#if SWIFTNIO_USE_IO_URING && os(Linux)

override func writeToSocket() throws {
try self.pendingWrites.triggerAppropriateAsyncWriteOperations(
scalarBufferAsyncWriteOperation: { ptr in
try self.selectableEventLoop.writeAsync(channel: self, pointer: ptr)
},
vectorBufferAsyncWriteOperation: { iovecs in
try self.selectableEventLoop.writeAsync(channel: self, iovecs: iovecs)
},
scalarFileAsyncWriteOperation: { descriptor, index, endIndex in
let count = (endIndex - index)
try self.selectableEventLoop.sendFileAsync(channel: self, src: descriptor, offset: Int64(index), count: UInt32(count))
})
}

func didAsyncWrite(result: Int32) {
if (result > 0) {
let (writabilityChange, flushAgain) = self.pendingWrites.didAsyncWrite(written: Int(result))
if writabilityChange {
self.pipeline.syncOperations.fireChannelWritabilityChanged()
}
if flushAgain {
self.flushNow()
}
}
else {
self.pendingWrites.releaseData()
let errnoCode = -result
if errnoCode == EAGAIN {
self.flushNow()
}
else {
assert(false)
}
}
}

#else

final override func writeToSocket() throws -> OverallWriteResult {
let result = try self.pendingWrites.triggerAppropriateWriteOperations(scalarBufferWriteOperation: { ptr in
guard ptr.count > 0 else {
Expand All @@ -176,6 +223,8 @@ class BaseStreamSocketChannel<Socket: SocketProtocol>: BaseSocketChannel<Socket>
return result
}

#endif

final override func close0(error: Error, mode: CloseMode, promise: EventLoopPromise<Void>?) {
do {
switch mode {
Expand Down
Loading