Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

thread configuration (names & QoS on Darwin) #2943

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 44 additions & 27 deletions Sources/NIOPosix/MultiThreadedEventLoopGroup.swift
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
private let index = ManagedAtomic<Int>(0)
private var eventLoops: [SelectableEventLoop]
private let shutdownLock: NIOLock = NIOLock()
private let threadNamePrefix: String
private let threadNamePrefix: String?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this still used?

private var runState: RunState = .running
private let canBeShutDown: Bool

Expand Down Expand Up @@ -108,7 +108,8 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
}

private static func setupThreadAndEventLoop(
name: String,
name: String?,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How come name is optional now?

threadConfiguration: NIOThreadConfiguration,
parentGroup: MultiThreadedEventLoopGroup,
selectorFactory: @escaping () throws -> NIOPosix.Selector<NIORegistration>,
initializer: @escaping ThreadInitializer,
Expand All @@ -119,7 +120,7 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
// synchronised by `lock`
var _loop: SelectableEventLoop! = nil

NIOThread.spawnAndRun(name: name, detachThread: false) { t in
NIOThread.spawnAndRun(name: name, configuration: threadConfiguration, detachThread: false) { t in
MultiThreadedEventLoopGroup.runTheLoop(
thread: t,
parentGroup: parentGroup,
Expand Down Expand Up @@ -150,6 +151,7 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
public convenience init(numberOfThreads: Int) {
self.init(
numberOfThreads: numberOfThreads,
threadConfiguration: .defaultForEventLoopGroups,
canBeShutDown: true,
metricsDelegate: nil,
selectorFactory: NIOPosix.Selector<NIORegistration>.init
Expand All @@ -169,6 +171,32 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
public convenience init(numberOfThreads: Int, metricsDelegate: NIOEventLoopMetricsDelegate) {
self.init(
numberOfThreads: numberOfThreads,
threadConfiguration: .defaultForEventLoopGroups,
canBeShutDown: true,
metricsDelegate: metricsDelegate,
selectorFactory: NIOPosix.Selector<NIORegistration>.init
)
}

/// Creates a `MultiThreadedEventLoopGroup` instance which uses `numberOfThreads`.
///
/// - note: Don't forget to call `shutdownGracefully` or `syncShutdownGracefully` when you no longer need this
/// `EventLoopGroup`. If you forget to shut the `EventLoopGroup` down you will leak `numberOfThreads`
/// (kernel) threads which are costly resources. This is especially important in unit tests where one
/// `MultiThreadedEventLoopGroup` is started per test case.
///
/// - Parameters:
/// - numberOfThreads: The number of `Threads` to use.
/// - threadConfiguration: Configuration for the threads to spawn.
/// - metricsDelegate: Delegate for collecting information from this eventloop
public convenience init(
numberOfThreads: Int,
threadConfiguration: NIOThreadConfiguration,
metricsDelegate: NIOEventLoopMetricsDelegate? = nil
) {
self.init(
numberOfThreads: numberOfThreads,
threadConfiguration: threadConfiguration,
canBeShutDown: true,
metricsDelegate: metricsDelegate,
selectorFactory: NIOPosix.Selector<NIORegistration>.init
Expand All @@ -179,20 +207,21 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
///
/// This is only useful for global singletons.
public static func _makePerpetualGroup(
threadNamePrefix: String,
numberOfThreads: Int
numberOfThreads: Int,
threadConfiguration: NIOThreadConfiguration
) -> MultiThreadedEventLoopGroup {
self.init(
numberOfThreads: numberOfThreads,
threadConfiguration: threadConfiguration,
canBeShutDown: false,
threadNamePrefix: threadNamePrefix,
metricsDelegate: nil,
selectorFactory: NIOPosix.Selector<NIORegistration>.init
)
}

internal convenience init(
numberOfThreads: Int,
threadConfiguration: NIOThreadConfiguration,
metricsDelegate: NIOEventLoopMetricsDelegate?,
selectorFactory: @escaping () throws -> NIOPosix.Selector<NIORegistration>
) {
Expand All @@ -201,31 +230,15 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
self.init(
threadInitializers: initializers,
canBeShutDown: true,
threadConfiguration: threadConfiguration,
metricsDelegate: metricsDelegate,
selectorFactory: selectorFactory
)
}

internal convenience init(
numberOfThreads: Int,
canBeShutDown: Bool,
threadNamePrefix: String,
metricsDelegate: NIOEventLoopMetricsDelegate?,
selectorFactory: @escaping () throws -> NIOPosix.Selector<NIORegistration>
) {
precondition(numberOfThreads > 0, "numberOfThreads must be positive")
let initializers: [ThreadInitializer] = Array(repeating: { _ in }, count: numberOfThreads)
self.init(
threadInitializers: initializers,
canBeShutDown: canBeShutDown,
threadNamePrefix: threadNamePrefix,
metricsDelegate: metricsDelegate,
selectorFactory: selectorFactory
)
}

internal convenience init(
numberOfThreads: Int,
threadConfiguration: NIOThreadConfiguration,
canBeShutDown: Bool,
metricsDelegate: NIOEventLoopMetricsDelegate?,
selectorFactory: @escaping () throws -> NIOPosix.Selector<NIORegistration>
Expand All @@ -235,20 +248,23 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
self.init(
threadInitializers: initializers,
canBeShutDown: canBeShutDown,
threadConfiguration: threadConfiguration,
metricsDelegate: metricsDelegate,
selectorFactory: selectorFactory
)
}

internal convenience init(
threadInitializers: [ThreadInitializer],
threadConfiguration: NIOThreadConfiguration,
metricsDelegate: NIOEventLoopMetricsDelegate?,
selectorFactory: @escaping () throws -> NIOPosix.Selector<NIORegistration> = NIOPosix.Selector<NIORegistration>
.init
) {
self.init(
threadInitializers: threadInitializers,
canBeShutDown: true,
threadConfiguration: threadConfiguration,
metricsDelegate: metricsDelegate,
selectorFactory: selectorFactory
)
Expand All @@ -261,12 +277,12 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
internal init(
threadInitializers: [ThreadInitializer],
canBeShutDown: Bool,
threadNamePrefix: String = "NIO-ELT-",
threadConfiguration: NIOThreadConfiguration,
metricsDelegate: NIOEventLoopMetricsDelegate?,
selectorFactory: @escaping () throws -> NIOPosix.Selector<NIORegistration> = NIOPosix.Selector<NIORegistration>
.init
) {
self.threadNamePrefix = threadNamePrefix
self.threadNamePrefix = threadConfiguration.threadNamePrefix
let myGroupID = nextEventLoopGroupID.loadThenWrappingIncrement(ordering: .relaxed)
self.myGroupID = myGroupID
var idx = 0
Expand All @@ -275,7 +291,8 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
self.eventLoops = threadInitializers.map { initializer in
// Maximum name length on linux is 16 by default.
let ev = MultiThreadedEventLoopGroup.setupThreadAndEventLoop(
name: "\(threadNamePrefix)\(myGroupID)-#\(idx)",
name: self.threadNamePrefix.map { "\($0)\(myGroupID)-#\(idx)" },
threadConfiguration: threadConfiguration,
parentGroup: self,
selectorFactory: selectorFactory,
initializer: initializer,
Expand Down
60 changes: 51 additions & 9 deletions Sources/NIOPosix/NIOThreadPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public final class NIOThreadPool {
/// It should never be "leaked" outside of the lock block.
case modifying
}
private let threadConfiguration: NIOThreadConfiguration
private let semaphore = DispatchSemaphore(value: 0)
private let lock = NIOLock()
private var threads: [NIOThread]? = nil // protected by `lock`
Expand Down Expand Up @@ -194,21 +195,61 @@ public final class NIOThreadPool {
/// - parameters:
/// - numberOfThreads: The number of threads to use for the thread pool.
public convenience init(numberOfThreads: Int) {
self.init(numberOfThreads: numberOfThreads, canBeStopped: true)
self.init(
numberOfThreads: numberOfThreads,
threadConfiguration: .defaultForOffloadThreadPool,
canBeStopped: true
)
}

/// Initialize a `NIOThreadPool` thread pool with `numberOfThreads` threads.
///
/// - parameters:
/// - numberOfThreads: The number of threads to use for the thread pool.
public convenience init(numberOfThreads: Int, threadConfiguration: NIOThreadConfiguration) {
self.init(
numberOfThreads: numberOfThreads,
threadConfiguration: .defaultForOffloadThreadPool,
canBeStopped: true
)
}

/// Create a ``NIOThreadPool`` that is already started, cannot be shut down and must not be `deinit`ed.
///
/// This is only useful for global singletons.
@available(
*,
deprecated,
renamed: "_makePerpetualStartedPool(numberOfThreads:threadConfiguration:threadNamePrefix:)"
)
public static func _makePerpetualStartedPool(numberOfThreads: Int, threadNamePrefix: String) -> NIOThreadPool {
let pool = self.init(numberOfThreads: numberOfThreads, canBeStopped: false)
pool._start(threadNamePrefix: threadNamePrefix)
var threadConfig = NIOThreadConfiguration.defaultForOffloadThreadPool
threadConfig.threadNamePrefix = threadNamePrefix
let pool = self.init(numberOfThreads: numberOfThreads, threadConfiguration: threadConfig, canBeStopped: false)
pool.start()
return pool
}

private init(numberOfThreads: Int, canBeStopped: Bool) {
/// Create a ``NIOThreadPool`` that is already started, cannot be shut down and must not be `deinit`ed.
///
/// This is only useful for global singletons.
public static func _makePerpetualStartedPool(
numberOfThreads: Int,
threadConfiguration: NIOThreadConfiguration
) -> NIOThreadPool {
let pool = self.init(
numberOfThreads: numberOfThreads,
threadConfiguration: threadConfiguration,
canBeStopped: false
)
pool.start()
return pool
}

private init(numberOfThreads: Int, threadConfiguration: NIOThreadConfiguration, canBeStopped: Bool) {
self.numberOfThreads = numberOfThreads
self.canBeStopped = canBeStopped
self.threadConfiguration = threadConfiguration
}

private func process(identifier: Int) {
Expand Down Expand Up @@ -252,10 +293,6 @@ public final class NIOThreadPool {

/// Start the `NIOThreadPool` if not already started.
public func start() {
self._start(threadNamePrefix: "TP-#")
}

public func _start(threadNamePrefix: String) {
let alreadyRunning: Bool = self.lock.withLock {
switch self.state {
case .running(_):
Expand Down Expand Up @@ -286,9 +323,14 @@ public final class NIOThreadPool {
self.threads?.reserveCapacity(self.numberOfThreads)
}

let threadNamePrefix = self.threadConfiguration.threadNamePrefix
for id in 0..<self.numberOfThreads {
// We should keep thread names under 16 characters because Linux doesn't allow more.
NIOThread.spawnAndRun(name: "\(threadNamePrefix)\(id)", detachThread: false) { thread in
NIOThread.spawnAndRun(
name: "\(threadNamePrefix)\(id)",
configuration: self.threadConfiguration,
detachThread: false
) { thread in
self.lock.withLock {
self.threads!.append(thread)
cond.lock()
Expand Down
10 changes: 7 additions & 3 deletions Sources/NIOPosix/PosixSingletons.swift
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,11 @@ private let singletonMTELG: MultiThreadedEventLoopGroup = {
)
}
let threadCount = NIOSingletons.groupLoopCountSuggestion
var threadConfig = NIOThreadConfiguration.defaultForEventLoopGroups
threadConfig.threadNamePrefix = "NIO-SGLTN-"
let group = MultiThreadedEventLoopGroup._makePerpetualGroup(
threadNamePrefix: "NIO-SGLTN-",
numberOfThreads: threadCount
numberOfThreads: threadCount,
threadConfiguration: threadConfig
)
_ = Unmanaged.passUnretained(group).retain() // Never gonna give you up,
return group
Expand All @@ -113,9 +115,11 @@ private let globalPosixBlockingPool: NIOThreadPool = {
"""
)
}
var threadConfig = NIOThreadConfiguration.defaultForOffloadThreadPool
threadConfig.threadNamePrefix = "SGLTN-TP-#"
let pool = NIOThreadPool._makePerpetualStartedPool(
numberOfThreads: NIOSingletons.blockingPoolThreadCountSuggestion,
threadNamePrefix: "SGLTN-TP-#"
threadConfiguration: threadConfig
)
_ = Unmanaged.passRetained(pool) // never gonna let you down.
return pool
Expand Down
17 changes: 14 additions & 3 deletions Sources/NIOPosix/Thread.swift
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ protocol ThreadOps {
///
/// All methods exposed are thread-safe.
final class NIOThread {
internal typealias ThreadBoxValue = (body: (NIOThread) -> Void, name: String?)
internal typealias ThreadBoxValue = (
body: (NIOThread) -> Void, name: String?, configuration: NIOThreadConfiguration
)
internal typealias ThreadBox = Box<ThreadBoxValue>

private let desiredName: String?
Expand Down Expand Up @@ -78,22 +80,31 @@ final class NIOThread {
ThreadOpsSystem.joinThread(self.handle)
}

static func spawnAndRunBasic(
body: @escaping (NIOThread) -> Void
) {
var threadConfig = NIOThreadConfiguration.defaultForEventLoopGroups
threadConfig.threadNamePrefix = "UnitTest-"
self.spawnAndRun(name: nil, configuration: threadConfig, detachThread: true, body: body)
}

/// Spawns and runs some task in a `NIOThread`.
///
/// - arguments:
/// - name: The name of the `NIOThread` or `nil` if no specific name should be set.
/// - body: The function to execute within the spawned `NIOThread`.
/// - detach: Whether to detach the thread. If the thread is not detached it must be `join`ed.
static func spawnAndRun(
name: String? = nil,
name: String?,
configuration: NIOThreadConfiguration,
detachThread: Bool = true,
body: @escaping (NIOThread) -> Void
) {
var handle: ThreadOpsSystem.ThreadHandle? = nil

// Store everything we want to pass into the c function in a Box so we
// can hand-over the reference.
let tuple: ThreadBoxValue = (body: body, name: name)
let tuple: ThreadBoxValue = (body: body, name: name, configuration: configuration)
let box = ThreadBox(tuple)

ThreadOpsSystem.run(handle: &handle, args: box, detachThread: detachThread)
Expand Down
Loading
Loading