Skip to content

Commit

Permalink
thread configuration (names & QoS on Darwin)
Browse files Browse the repository at this point in the history
  • Loading branch information
weissi committed Oct 23, 2024
1 parent be823e6 commit 82af47f
Show file tree
Hide file tree
Showing 9 changed files with 330 additions and 63 deletions.
71 changes: 44 additions & 27 deletions Sources/NIOPosix/MultiThreadedEventLoopGroup.swift
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
private let index = ManagedAtomic<Int>(0)
private var eventLoops: [SelectableEventLoop]
private let shutdownLock: NIOLock = NIOLock()
private let threadNamePrefix: String
private let threadNamePrefix: Optional<String>
private var runState: RunState = .running
private let canBeShutDown: Bool

Expand Down Expand Up @@ -108,7 +108,8 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
}

private static func setupThreadAndEventLoop(
name: String,
name: String?,
threadConfiguration: NIOThreadConfiguration,
parentGroup: MultiThreadedEventLoopGroup,
selectorFactory: @escaping () throws -> NIOPosix.Selector<NIORegistration>,
initializer: @escaping ThreadInitializer,
Expand All @@ -119,7 +120,7 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
// synchronised by `lock`
var _loop: SelectableEventLoop! = nil

NIOThread.spawnAndRun(name: name, detachThread: false) { t in
NIOThread.spawnAndRun(name: name, configuration: threadConfiguration, detachThread: false) { t in
MultiThreadedEventLoopGroup.runTheLoop(
thread: t,
parentGroup: parentGroup,
Expand Down Expand Up @@ -150,6 +151,7 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
public convenience init(numberOfThreads: Int) {
self.init(
numberOfThreads: numberOfThreads,
threadConfiguration: .defaultForEventLoopGroups,
canBeShutDown: true,
metricsDelegate: nil,
selectorFactory: NIOPosix.Selector<NIORegistration>.init
Expand All @@ -169,6 +171,32 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
public convenience init(numberOfThreads: Int, metricsDelegate: NIOEventLoopMetricsDelegate) {
self.init(
numberOfThreads: numberOfThreads,
threadConfiguration: .defaultForEventLoopGroups,
canBeShutDown: true,
metricsDelegate: metricsDelegate,
selectorFactory: NIOPosix.Selector<NIORegistration>.init
)
}

/// Creates a `MultiThreadedEventLoopGroup` instance which uses `numberOfThreads`.
///
/// - note: Don't forget to call `shutdownGracefully` or `syncShutdownGracefully` when you no longer need this
/// `EventLoopGroup`. If you forget to shut the `EventLoopGroup` down you will leak `numberOfThreads`
/// (kernel) threads which are costly resources. This is especially important in unit tests where one
/// `MultiThreadedEventLoopGroup` is started per test case.
///
/// - Parameters:
/// - numberOfThreads: The number of `Threads` to use.
/// - threadConfiguration: Configuration for the threads to spawn.
/// - metricsDelegate: Delegate for collecting information from this eventloop
public convenience init(
numberOfThreads: Int,
threadConfiguration: NIOThreadConfiguration,
metricsDelegate: NIOEventLoopMetricsDelegate? = nil
) {
self.init(
numberOfThreads: numberOfThreads,
threadConfiguration: threadConfiguration,
canBeShutDown: true,
metricsDelegate: metricsDelegate,
selectorFactory: NIOPosix.Selector<NIORegistration>.init
Expand All @@ -179,20 +207,21 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
///
/// This is only useful for global singletons.
public static func _makePerpetualGroup(
threadNamePrefix: String,
numberOfThreads: Int
numberOfThreads: Int,
threadConfiguration: NIOThreadConfiguration
) -> MultiThreadedEventLoopGroup {
self.init(
numberOfThreads: numberOfThreads,
threadConfiguration: threadConfiguration,
canBeShutDown: false,
threadNamePrefix: threadNamePrefix,
metricsDelegate: nil,
selectorFactory: NIOPosix.Selector<NIORegistration>.init
)
}

internal convenience init(
numberOfThreads: Int,
threadConfiguration: NIOThreadConfiguration,
metricsDelegate: NIOEventLoopMetricsDelegate?,
selectorFactory: @escaping () throws -> NIOPosix.Selector<NIORegistration>
) {
Expand All @@ -201,31 +230,15 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
self.init(
threadInitializers: initializers,
canBeShutDown: true,
threadConfiguration: threadConfiguration,
metricsDelegate: metricsDelegate,
selectorFactory: selectorFactory
)
}

internal convenience init(
numberOfThreads: Int,
canBeShutDown: Bool,
threadNamePrefix: String,
metricsDelegate: NIOEventLoopMetricsDelegate?,
selectorFactory: @escaping () throws -> NIOPosix.Selector<NIORegistration>
) {
precondition(numberOfThreads > 0, "numberOfThreads must be positive")
let initializers: [ThreadInitializer] = Array(repeating: { _ in }, count: numberOfThreads)
self.init(
threadInitializers: initializers,
canBeShutDown: canBeShutDown,
threadNamePrefix: threadNamePrefix,
metricsDelegate: metricsDelegate,
selectorFactory: selectorFactory
)
}

internal convenience init(
numberOfThreads: Int,
threadConfiguration: NIOThreadConfiguration,
canBeShutDown: Bool,
metricsDelegate: NIOEventLoopMetricsDelegate?,
selectorFactory: @escaping () throws -> NIOPosix.Selector<NIORegistration>
Expand All @@ -235,20 +248,23 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
self.init(
threadInitializers: initializers,
canBeShutDown: canBeShutDown,
threadConfiguration: threadConfiguration,
metricsDelegate: metricsDelegate,
selectorFactory: selectorFactory
)
}

internal convenience init(
threadInitializers: [ThreadInitializer],
threadConfiguration: NIOThreadConfiguration,
metricsDelegate: NIOEventLoopMetricsDelegate?,
selectorFactory: @escaping () throws -> NIOPosix.Selector<NIORegistration> = NIOPosix.Selector<NIORegistration>
.init
) {
self.init(
threadInitializers: threadInitializers,
canBeShutDown: true,
threadConfiguration: threadConfiguration,
metricsDelegate: metricsDelegate,
selectorFactory: selectorFactory
)
Expand All @@ -261,12 +277,12 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
internal init(
threadInitializers: [ThreadInitializer],
canBeShutDown: Bool,
threadNamePrefix: String = "NIO-ELT-",
threadConfiguration: NIOThreadConfiguration,
metricsDelegate: NIOEventLoopMetricsDelegate?,
selectorFactory: @escaping () throws -> NIOPosix.Selector<NIORegistration> = NIOPosix.Selector<NIORegistration>
.init
) {
self.threadNamePrefix = threadNamePrefix
self.threadNamePrefix = threadConfiguration.threadNamePrefix
let myGroupID = nextEventLoopGroupID.loadThenWrappingIncrement(ordering: .relaxed)
self.myGroupID = myGroupID
var idx = 0
Expand All @@ -275,7 +291,8 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
self.eventLoops = threadInitializers.map { initializer in
// Maximum name length on linux is 16 by default.
let ev = MultiThreadedEventLoopGroup.setupThreadAndEventLoop(
name: "\(threadNamePrefix)\(myGroupID)-#\(idx)",
name: self.threadNamePrefix.map { "\($0)\(myGroupID)-#\(idx)" },
threadConfiguration: threadConfiguration,
parentGroup: self,
selectorFactory: selectorFactory,
initializer: initializer,
Expand Down
56 changes: 47 additions & 9 deletions Sources/NIOPosix/NIOThreadPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public final class NIOThreadPool {
/// It should never be "leaked" outside of the lock block.
case modifying
}
private let threadConfiguration: NIOThreadConfiguration
private let semaphore = DispatchSemaphore(value: 0)
private let lock = NIOLock()
private var threads: [NIOThread]? = nil // protected by `lock`
Expand Down Expand Up @@ -194,21 +195,57 @@ public final class NIOThreadPool {
/// - parameters:
/// - numberOfThreads: The number of threads to use for the thread pool.
public convenience init(numberOfThreads: Int) {
self.init(numberOfThreads: numberOfThreads, canBeStopped: true)
self.init(
numberOfThreads: numberOfThreads,
threadConfiguration: .defaultForOffloadThreadPool,
canBeStopped: true
)
}

/// Initialize a `NIOThreadPool` thread pool with `numberOfThreads` threads.
///
/// - parameters:
/// - numberOfThreads: The number of threads to use for the thread pool.
public convenience init(numberOfThreads: Int, threadConfiguration: NIOThreadConfiguration) {
self.init(
numberOfThreads: numberOfThreads,
threadConfiguration: .defaultForOffloadThreadPool,
canBeStopped: true
)
}

/// Create a ``NIOThreadPool`` that is already started, cannot be shut down and must not be `deinit`ed.
///
/// This is only useful for global singletons.
@available(*, deprecated, renamed: "_makePerpetualStartedPool(numberOfThreads:threadConfiguration:threadNamePrefix:)")
public static func _makePerpetualStartedPool(numberOfThreads: Int, threadNamePrefix: String) -> NIOThreadPool {
let pool = self.init(numberOfThreads: numberOfThreads, canBeStopped: false)
pool._start(threadNamePrefix: threadNamePrefix)
var threadConfig = NIOThreadConfiguration.defaultForOffloadThreadPool
threadConfig.threadNamePrefix = threadNamePrefix
let pool = self.init(numberOfThreads: numberOfThreads, threadConfiguration: threadConfig, canBeStopped: false)
pool.start()
return pool
}

private init(numberOfThreads: Int, canBeStopped: Bool) {
/// Create a ``NIOThreadPool`` that is already started, cannot be shut down and must not be `deinit`ed.
///
/// This is only useful for global singletons.
public static func _makePerpetualStartedPool(
numberOfThreads: Int,
threadConfiguration: NIOThreadConfiguration
) -> NIOThreadPool {
let pool = self.init(
numberOfThreads: numberOfThreads,
threadConfiguration: threadConfiguration,
canBeStopped: false
)
pool.start()
return pool
}

private init(numberOfThreads: Int, threadConfiguration: NIOThreadConfiguration, canBeStopped: Bool) {
self.numberOfThreads = numberOfThreads
self.canBeStopped = canBeStopped
self.threadConfiguration = threadConfiguration
}

private func process(identifier: Int) {
Expand Down Expand Up @@ -252,10 +289,6 @@ public final class NIOThreadPool {

/// Start the `NIOThreadPool` if not already started.
public func start() {
self._start(threadNamePrefix: "TP-#")
}

public func _start(threadNamePrefix: String) {
let alreadyRunning: Bool = self.lock.withLock {
switch self.state {
case .running(_):
Expand Down Expand Up @@ -286,9 +319,14 @@ public final class NIOThreadPool {
self.threads?.reserveCapacity(self.numberOfThreads)
}

let threadNamePrefix = self.threadConfiguration.threadNamePrefix
for id in 0..<self.numberOfThreads {
// We should keep thread names under 16 characters because Linux doesn't allow more.
NIOThread.spawnAndRun(name: "\(threadNamePrefix)\(id)", detachThread: false) { thread in
NIOThread.spawnAndRun(
name: "\(threadNamePrefix)\(id)",
configuration: self.threadConfiguration,
detachThread: false
) { thread in
self.lock.withLock {
self.threads!.append(thread)
cond.lock()
Expand Down
10 changes: 7 additions & 3 deletions Sources/NIOPosix/PosixSingletons.swift
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,11 @@ private let singletonMTELG: MultiThreadedEventLoopGroup = {
)
}
let threadCount = NIOSingletons.groupLoopCountSuggestion
var threadConfig = NIOThreadConfiguration.defaultForEventLoopGroups
threadConfig.threadNamePrefix = "NIO-SGLTN-"
let group = MultiThreadedEventLoopGroup._makePerpetualGroup(
threadNamePrefix: "NIO-SGLTN-",
numberOfThreads: threadCount
numberOfThreads: threadCount,
threadConfiguration: threadConfig
)
_ = Unmanaged.passUnretained(group).retain() // Never gonna give you up,
return group
Expand All @@ -113,9 +115,11 @@ private let globalPosixBlockingPool: NIOThreadPool = {
"""
)
}
var threadConfig = NIOThreadConfiguration.defaultForOffloadThreadPool
threadConfig.threadNamePrefix = "SGLTN-TP-#"
let pool = NIOThreadPool._makePerpetualStartedPool(
numberOfThreads: NIOSingletons.blockingPoolThreadCountSuggestion,
threadNamePrefix: "SGLTN-TP-#"
threadConfiguration: threadConfig
)
_ = Unmanaged.passUnretained(pool).retain() // never gonna let you down.
return pool
Expand Down
15 changes: 12 additions & 3 deletions Sources/NIOPosix/Thread.swift
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ protocol ThreadOps {
///
/// All methods exposed are thread-safe.
final class NIOThread {
internal typealias ThreadBoxValue = (body: (NIOThread) -> Void, name: String?)
internal typealias ThreadBoxValue = (body: (NIOThread) -> Void, name: String?, configuration: NIOThreadConfiguration)
internal typealias ThreadBox = Box<ThreadBoxValue>

private let desiredName: String?
Expand Down Expand Up @@ -78,22 +78,31 @@ final class NIOThread {
ThreadOpsSystem.joinThread(self.handle)
}

static func spawnAndRunBasic(
body: @escaping (NIOThread) -> Void
) {
var threadConfig = NIOThreadConfiguration.defaultForEventLoopGroups
threadConfig.threadNamePrefix = "UnitTest-"
self.spawnAndRun(name: nil, configuration: threadConfig, detachThread: true, body: body)
}

/// Spawns and runs some task in a `NIOThread`.
///
/// - arguments:
/// - name: The name of the `NIOThread` or `nil` if no specific name should be set.
/// - body: The function to execute within the spawned `NIOThread`.
/// - detach: Whether to detach the thread. If the thread is not detached it must be `join`ed.
static func spawnAndRun(
name: String? = nil,
name: String?,
configuration: NIOThreadConfiguration,
detachThread: Bool = true,
body: @escaping (NIOThread) -> Void
) {
var handle: ThreadOpsSystem.ThreadHandle? = nil

// Store everything we want to pass into the c function in a Box so we
// can hand-over the reference.
let tuple: ThreadBoxValue = (body: body, name: name)
let tuple: ThreadBoxValue = (body: body, name: name, configuration: configuration)
let box = ThreadBox(tuple)

ThreadOpsSystem.run(handle: &handle, args: box, detachThread: detachThread)
Expand Down
Loading

0 comments on commit 82af47f

Please sign in to comment.