From 6018f072275efc08a81e0fcfafaed89b3b85a8b4 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Wed, 29 May 2024 19:56:54 +0200 Subject: [PATCH] Code review --- .../NIOPosixBenchmarks/Benchmarks.swift | 35 +++++++-- .../NIOEmbedded/AsyncTestingEventLoop.swift | 6 ++ Sources/NIOEmbedded/Embedded.swift | 7 ++ Tests/NIOPosixTests/TaskExecutorTests.swift | 77 ++++++++++++++++--- 4 files changed, 108 insertions(+), 17 deletions(-) diff --git a/Benchmarks/Benchmarks/NIOPosixBenchmarks/Benchmarks.swift b/Benchmarks/Benchmarks/NIOPosixBenchmarks/Benchmarks.swift index 45ddfc2a3f..825fb9e576 100644 --- a/Benchmarks/Benchmarks/NIOPosixBenchmarks/Benchmarks.swift +++ b/Benchmarks/Benchmarks/NIOPosixBenchmarks/Benchmarks.swift @@ -20,18 +20,18 @@ private let eventLoop = MultiThreadedEventLoopGroup.singleton.next() let benchmarks = { let defaultMetrics: [BenchmarkMetric] = [ .mallocCountTotal, + .cpuTotal ] Benchmark( "TCPEcho", configuration: .init( metrics: defaultMetrics, - timeUnits: .milliseconds, - scalingFactor: .mega + scalingFactor: .one ) ) { benchmark in try runTCPEcho( - numberOfWrites: benchmark.scaledIterations.upperBound, + numberOfWrites: 1_000_000, eventLoop: eventLoop ) } @@ -40,11 +40,10 @@ let benchmarks = { // to serial executor is also gated behind 5.9. #if compiler(>=5.9) Benchmark( - "TCPEchoAsyncChannel", + "TCPEchoAsyncChannel using globalHook 1M times", configuration: .init( metrics: defaultMetrics, - timeUnits: .milliseconds, - scalingFactor: .mega, + scalingFactor: .one, // We are expecting a bit of allocation variance due to an allocation // in the Concurrency runtime which happens when resuming a continuation. thresholds: [.mallocCountTotal: .init(absolute: [.p90: 2000])], @@ -59,9 +58,31 @@ let benchmarks = { ) ) { benchmark in try await runTCPEchoAsyncChannel( - numberOfWrites: benchmark.scaledIterations.upperBound, + numberOfWrites: 1_000_000, eventLoop: eventLoop ) } #endif + + #if compiler(>=6.0) + if #available(macOS 15.0, *) { + Benchmark( + "TCPEchoAsyncChannel using task executor preference", + configuration: .init( + metrics: defaultMetrics, + scalingFactor: .one + // We are expecting a bit of allocation variance due to an allocation + // in the Concurrency runtime which happens when resuming a continuation. +// thresholds: [.mallocCountTotal: .init(absolute: [.p90: 2000])] + ) + ) { benchmark in + try await withTaskExecutorPreference(eventLoop.taskExecutor) { + try await runTCPEchoAsyncChannel( + numberOfWrites: 1_000_000, + eventLoop: eventLoop + ) + } + } + } + #endif } diff --git a/Sources/NIOEmbedded/AsyncTestingEventLoop.swift b/Sources/NIOEmbedded/AsyncTestingEventLoop.swift index bdc9423c85..58f20b6bd0 100644 --- a/Sources/NIOEmbedded/AsyncTestingEventLoop.swift +++ b/Sources/NIOEmbedded/AsyncTestingEventLoop.swift @@ -355,6 +355,12 @@ public final class NIOAsyncTestingEventLoop: EventLoop, @unchecked Sendable { extension NIOAsyncTestingEventLoop: NIOSerialEventLoopExecutor { } #endif +// MARK: TaskExecutor conformance +#if compiler(>=6.0) +@available(macOS 9999.0, iOS 9999.0, watchOS 9999.0, tvOS 9999.0, *) +extension NIOAsyncTestingEventLoop: NIOTaskEventLoopExecutor { } +#endif + /// This is a thread-safe promise creation store. /// /// We use this to keep track of where promises come from in the `NIOAsyncTestingEventLoop`. diff --git a/Sources/NIOEmbedded/Embedded.swift b/Sources/NIOEmbedded/Embedded.swift index 4a8ce0d218..9e2eb23cdf 100644 --- a/Sources/NIOEmbedded/Embedded.swift +++ b/Sources/NIOEmbedded/Embedded.swift @@ -240,6 +240,13 @@ public final class EmbeddedEventLoop: EventLoop { fatalError("EmbeddedEventLoop is not thread safe and cannot be used as a SerialExecutor. Use NIOAsyncTestingEventLoop instead.") } #endif + + #if compiler(>=6.0) + @available(macOS 9999.0, iOS 9999.0, watchOS 9999.0, tvOS 9999.0, *) + public var taskExecutor: any TaskExecutor { + fatalError("EmbeddedEventLoop is not thread safe and cannot be used as a TaskExecutor. Use NIOAsyncTestingEventLoop instead.") + } + #endif } @usableFromInline diff --git a/Tests/NIOPosixTests/TaskExecutorTests.swift b/Tests/NIOPosixTests/TaskExecutorTests.swift index 34b316c528..bd8cf4bc84 100644 --- a/Tests/NIOPosixTests/TaskExecutorTests.swift +++ b/Tests/NIOPosixTests/TaskExecutorTests.swift @@ -17,32 +17,89 @@ import NIOPosix import XCTest final class TaskExecutorTests: XCTestCase { + + #if compiler(>=6.0) @available(macOS 9999.0, iOS 9999.0, watchOS 9999.0, tvOS 9999.0, *) - func testBasicExecutorFitsOnEventLoop_MTELG() async throws { + func _runTests(loop1: some EventLoop, loop2: some EventLoop) async { + await withTaskGroup(of: Void.self) { taskGroup in + taskGroup.addTask(executorPreference: loop1.taskExecutor) { + loop1.assertInEventLoop() + loop2.assertNotInEventLoop() + + withUnsafeCurrentTask { task in + // this currently fails on macOS + XCTAssertEqual(task?.unownedTaskExecutor, loop1.taskExecutor.asUnownedTaskExecutor()) + } + } + + taskGroup.addTask(executorPreference: loop2.taskExecutor) { + loop1.assertNotInEventLoop() + loop2.assertInEventLoop() + + withUnsafeCurrentTask { task in + // this currently fails on macOS + XCTAssertEqual(task?.unownedTaskExecutor, loop2.taskExecutor.asUnownedTaskExecutor()) + } + } + } + + let task = Task(executorPreference: loop1.taskExecutor) { + loop1.assertInEventLoop() + loop2.assertNotInEventLoop() + + withUnsafeCurrentTask { task in + // this currently fails on macOS + XCTAssertEqual(task?.unownedTaskExecutor, loop1.taskExecutor.asUnownedTaskExecutor()) + } + } + + await task.value + } + #endif + + @available(macOS 9999.0, iOS 9999.0, watchOS 9999.0, tvOS 9999.0, *) + func testSelectableEventLoopAsTaskExecutor() async throws { #if compiler(>=6.0) let group = MultiThreadedEventLoopGroup(numberOfThreads: 2) defer { try! group.syncShutdownGracefully() } - let loops = Array(group.makeIterator()) + var iterator = group.makeIterator() + let loop1 = iterator.next()! + let loop2 = iterator.next()! + + await self._runTests(loop1: loop1, loop2: loop2) + #endif + } + + @available(macOS 9999.0, iOS 9999.0, watchOS 9999.0, tvOS 9999.0, *) + func testAsyncTestingEventLoopAsTaskExecutor() async throws { + #if compiler(>=6.0) + let loop1 = NIOAsyncTestingEventLoop() + let loop2 = NIOAsyncTestingEventLoop() + defer { + try? loop1.syncShutdownGracefully() + try? loop2.syncShutdownGracefully() + } + await withTaskGroup(of: Void.self) { taskGroup in - taskGroup.addTask(executorPreference: loops[0].taskExecutor) { - loops[0].assertInEventLoop() - loops[1].assertNotInEventLoop() + taskGroup.addTask(executorPreference: loop1.taskExecutor) { + loop1.assertInEventLoop() + loop2.assertNotInEventLoop() withUnsafeCurrentTask { task in // this currently fails on macOS - XCTAssertEqual(task?.unownedTaskExecutor, loops[0].taskExecutor.asUnownedTaskExecutor()) + XCTAssertEqual(task?.unownedTaskExecutor, loop1.taskExecutor.asUnownedTaskExecutor()) } } - taskGroup.addTask(executorPreference: loops[1].taskExecutor) { - loops[0].assertNotInEventLoop() - loops[1].assertInEventLoop() + taskGroup.addTask(executorPreference: loop2) { + loop1.assertNotInEventLoop() + loop2.assertInEventLoop() withUnsafeCurrentTask { task in // this currently fails on macOS - XCTAssertEqual(task?.unownedTaskExecutor, loops[1].taskExecutor.asUnownedTaskExecutor()) + XCTAssertEqual(task?.unownedTaskExecutor, loop2.taskExecutor.asUnownedTaskExecutor()) } } }