Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adopt NIOThrowingAsyncSequenceProducer 2nd try #2917

Merged
merged 4 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 147 additions & 65 deletions Sources/NIOFileSystem/DirectoryEntries.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import CNIODarwin
import CNIOLinux
import NIOConcurrencyHelpers
import NIOCore
import NIOPosix
@preconcurrency import SystemPackage

Expand Down Expand Up @@ -89,17 +90,17 @@ extension DirectoryEntries {
public typealias AsyncIterator = BatchedIterator
public typealias Element = [DirectoryEntry]

private let stream: BufferedOrAnyStream<[DirectoryEntry]>
private let stream: BufferedOrAnyStream<[DirectoryEntry], DirectoryEntryProducer>

/// Creates a ``DirectoryEntries/Batched`` sequence by wrapping an `AsyncSequence`
/// of directory entry batches.
public init<S: AsyncSequence>(wrapping sequence: S) where S.Element == Element {
self.stream = BufferedOrAnyStream(wrapping: sequence)
self.stream = BufferedOrAnyStream<[DirectoryEntry], DirectoryEntryProducer>(wrapping: sequence)
}

fileprivate init(handle: SystemFileHandle, recursive: Bool) {
// Expanding the batches yields watermarks of 256 and 512 directory entries.
let stream = BufferedStream.makeBatchedDirectoryEntryStream(
let stream = NIOThrowingAsyncSequenceProducer.makeBatchedDirectoryEntryStream(
handle: handle,
recursive: recursive,
entriesPerBatch: 64,
Expand All @@ -116,9 +117,11 @@ extension DirectoryEntries {

/// An `AsyncIteratorProtocol` of `Array<DirectoryEntry>`.
public struct BatchedIterator: AsyncIteratorProtocol {
private var iterator: BufferedOrAnyStream<[DirectoryEntry]>.AsyncIterator
private var iterator: BufferedOrAnyStream<[DirectoryEntry], DirectoryEntryProducer>.AsyncIterator

init(wrapping iterator: BufferedOrAnyStream<[DirectoryEntry]>.AsyncIterator) {
fileprivate init(
wrapping iterator: BufferedOrAnyStream<[DirectoryEntry], DirectoryEntryProducer>.AsyncIterator
) {
self.iterator = iterator
}

Expand All @@ -135,52 +138,95 @@ extension DirectoryEntries.Batched.AsyncIterator: Sendable {}
// MARK: - Internal

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension BufferedStream where Element == [DirectoryEntry] {
extension NIOThrowingAsyncSequenceProducer
where
Element == [DirectoryEntry],
Failure == (any Error),
Strategy == NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
Delegate == DirectoryEntryProducer
{
fileprivate static func makeBatchedDirectoryEntryStream(
handle: SystemFileHandle,
recursive: Bool,
entriesPerBatch: Int,
lowWatermark: Int,
highWatermark: Int
) -> BufferedStream<[DirectoryEntry]> {
let state = DirectoryEnumerator(handle: handle, recursive: recursive)
let protectedState = NIOLockedValueBox(state)

var (stream, source) = BufferedStream.makeStream(
of: [DirectoryEntry].self,
backPressureStrategy: .watermark(low: lowWatermark, high: highWatermark)
)

source.onTermination = {
guard let threadPool = protectedState.withLockedValue({ $0.threadPoolForClosing() }) else {
return
}

threadPool.submit { _ in // always run, even if cancelled
protectedState.withLockedValue { state in
state.closeIfNecessary()
}
}
}

) -> NIOThrowingAsyncSequenceProducer<
[DirectoryEntry], any Error, NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
DirectoryEntryProducer
> {
let producer = DirectoryEntryProducer(
state: protectedState,
source: source,
handle: handle,
recursive: recursive,
entriesPerBatch: entriesPerBatch
)
// Start producing immediately.
producer.produceMore()

return stream
let nioThrowingAsyncSequence = NIOThrowingAsyncSequenceProducer.makeSequence(
elementType: [DirectoryEntry].self,
backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark(
lowWatermark: lowWatermark,
highWatermark: highWatermark
),
finishOnDeinit: false,
delegate: producer
)

producer.setSequenceProducerSource(nioThrowingAsyncSequence.source)

return nioThrowingAsyncSequence.sequence
}
}

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
private struct DirectoryEntryProducer {
private typealias DirectoryEntrySequenceProducer = NIOThrowingAsyncSequenceProducer<
[DirectoryEntry], Error, NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark, DirectoryEntryProducer
>

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
private final class DirectoryEntryProducer: NIOAsyncSequenceProducerDelegate {
let state: NIOLockedValueBox<DirectoryEnumerator>
let source: BufferedStream<[DirectoryEntry]>.Source
let entriesPerBatch: Int

init(handle: SystemFileHandle, recursive: Bool, entriesPerBatch: Int) {
let state = DirectoryEnumerator(handle: handle, recursive: recursive)
self.state = NIOLockedValueBox(state)
self.entriesPerBatch = entriesPerBatch
}

func didTerminate() {
guard let threadPool = self.state.withLockedValue({ $0.threadPoolForClosing() }) else {
return
}

threadPool.submit { _ in // always run, even if cancelled
self.state.withLockedValue { state in
state.closeIfNecessary()
}
}
}

/// sets the source within the producer state
func setSequenceProducerSource(_ sequenceProducerSource: DirectoryEntrySequenceProducer.Source) {
self.state.withLockedValue { state in
switch state.state {
case .idle:
state.sequenceProducerSource = sequenceProducerSource
case .done:
sequenceProducerSource.finish()
case .open, .openPausedProducing:
fatalError()
case .modifying:
fatalError()
}
}
}

func clearSource() {
self.state.withLockedValue { state in
state.sequenceProducerSource = nil
}
}

/// The 'entry point' for producing elements.
///
/// Calling this function will start producing directory entries asynchronously by dispatching
Expand All @@ -207,6 +253,12 @@ private struct DirectoryEntryProducer {
}
}

func pauseProducing() {
self.state.withLockedValue { state in
state.pauseProducing()
}
}

private func nextBatch() throws -> [DirectoryEntry] {
try self.state.withLockedValue { state in
try state.next(self.entriesPerBatch)
Expand All @@ -221,45 +273,51 @@ private struct DirectoryEntryProducer {
// Failed to read more entries: close and notify the stream so consumers receive the
// error.
self.close()
self.source.finish(throwing: error)
let source = self.state.withLockedValue { state in
state.sequenceProducerSource
}
source?.finish(error)
self.clearSource()
}
}

private func onNextBatch(_ entries: [DirectoryEntry]) {
let source = self.state.withLockedValue { state in
state.sequenceProducerSource
}

guard let source else {
assertionFailure("unexpectedly missing source")
return
}

// No entries were read: this must be the end (as the batch size must be greater than zero).
if entries.isEmpty {
self.source.finish(throwing: nil)
source.finish()
self.clearSource()
return
}

// Reading short means reading EOF. The enumerator closes itself in that case.
let readEOF = entries.count < self.entriesPerBatch

// Entries were produced: yield them and maybe produce more.
do {
let writeResult = try self.source.write(contentsOf: CollectionOfOne(entries))
// Exit early if EOF was read; no use in trying to produce more.
if readEOF {
self.source.finish(throwing: nil)
return
}
let writeResult = source.yield(contentsOf: CollectionOfOne(entries))

switch writeResult {
case .produceMore:
self.produceMore()
case let .enqueueCallback(token):
self.source.enqueueCallback(callbackToken: token) {
switch $0 {
case .success:
self.produceMore()
case .failure:
self.close()
}
}
}
} catch {
// Failure to write means the source is already done, that's okay we just need to
// update our state and stop producing.
// Exit early if EOF was read; no use in trying to produce more.
if readEOF {
source.finish()
self.clearSource()
return
}

switch writeResult {
case .produceMore:
self.produceMore()
case .stopProducing:
self.pauseProducing()
case .dropped:
// The source is finished; mark ourselves as done.
self.close()
}
}
Expand All @@ -282,25 +340,30 @@ private struct DirectoryEntryProducer {
/// Note that this is not a `Sequence` because we allow for errors to be thrown on `next()`.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
private struct DirectoryEnumerator: Sendable {
private enum State: @unchecked Sendable {
internal enum State: @unchecked Sendable {
case modifying
case idle(SystemFileHandle.SendableView, recursive: Bool)
case open(NIOThreadPool, Source, [DirectoryEntry])
case openPausedProducing(NIOThreadPool, Source, [DirectoryEntry])
case done
}

/// The source of directory entries.
private enum Source {
internal enum Source {
case readdir(CInterop.DirPointer)
case fts(CInterop.FTSPointer)
}

/// The current state of enumeration.
private var state: State
internal var state: State

/// The path to the directory being enumerated.
private let path: FilePath

/// The route via which directory entry batches are yielded,
/// the sourcing end of the `DirectoryEntrySequenceProducer`
internal var sequenceProducerSource: DirectoryEntrySequenceProducer.Source?

/// Information about an entry returned by FTS. See 'fts(3)'.
private enum FTSInfo: Hashable, Sendable {
case directoryPreOrder
Expand Down Expand Up @@ -353,22 +416,38 @@ private struct DirectoryEnumerator: Sendable {
self.path = handle.path
}

internal func produceMore() -> NIOThreadPool? {
internal mutating func produceMore() -> NIOThreadPool? {
switch self.state {
case let .idle(handle, _):
return handle.threadPool
case let .open(threadPool, _, _):
return threadPool
case .openPausedProducing(let threadPool, let source, let array):
self.state = .open(threadPool, source, array)
return threadPool
case .done:
return nil
case .modifying:
fatalError()
}
}

internal mutating func pauseProducing() {
switch self.state {
case .open(let threadPool, let source, let array):
self.state = .openPausedProducing(threadPool, source, array)
case .idle:
() // we won't apply back pressure until something has been read
case .openPausedProducing, .done:
() // no-op
case .modifying:
fatalError()
}
}

internal func threadPoolForClosing() -> NIOThreadPool? {
switch self.state {
case let .open(threadPool, _, _):
case .open(let threadPool, _, _), .openPausedProducing(let threadPool, _, _):
return threadPool
case .idle, .done:
// Don't need to close in the idle state: we don't own the handle.
Expand Down Expand Up @@ -397,7 +476,7 @@ private struct DirectoryEnumerator: Sendable {
// We don't own the handle so don't close it.
self.state = .done

case let .open(_, mode, _):
case .open(_, let mode, _), .openPausedProducing(_, let mode, _):
self.state = .done
switch mode {
case .readdir(let dir):
Expand Down Expand Up @@ -631,6 +710,9 @@ private struct DirectoryEnumerator: Sendable {
return result
}

case .openPausedProducing:
return .yield(.success([]))

case .done:
return .yield(.success([]))

Expand Down
Loading
Loading