Skip to content

Commit 0d3043e

Browse files
authored
Revert "Adopt NIOThrowingAsyncSequenceProducer (#2879)" (#2892)
This reverts commit 282f593. ### Motivation: Repeated testing in linux containers has surfaced issues with the implementation which require some rethinking of the code, let's revert it for now and fix it up. ### Modifications: Revert the adoption commit. ### Result: The change is backed out.
1 parent 9f6c011 commit 0d3043e

File tree

4 files changed

+165
-337
lines changed

4 files changed

+165
-337
lines changed

Sources/NIOFileSystem/DirectoryEntries.swift

+65-147
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import CNIODarwin
1717
import CNIOLinux
1818
import NIOConcurrencyHelpers
19-
import NIOCore
2019
import NIOPosix
2120
@preconcurrency import SystemPackage
2221

@@ -90,17 +89,17 @@ extension DirectoryEntries {
9089
public typealias AsyncIterator = BatchedIterator
9190
public typealias Element = [DirectoryEntry]
9291

93-
private let stream: BufferedOrAnyStream<[DirectoryEntry], DirectoryEntryProducer>
92+
private let stream: BufferedOrAnyStream<[DirectoryEntry]>
9493

9594
/// Creates a ``DirectoryEntries/Batched`` sequence by wrapping an `AsyncSequence`
9695
/// of directory entry batches.
9796
public init<S: AsyncSequence>(wrapping sequence: S) where S.Element == Element {
98-
self.stream = BufferedOrAnyStream<[DirectoryEntry], DirectoryEntryProducer>(wrapping: sequence)
97+
self.stream = BufferedOrAnyStream(wrapping: sequence)
9998
}
10099

101100
fileprivate init(handle: SystemFileHandle, recursive: Bool) {
102101
// Expanding the batches yields watermarks of 256 and 512 directory entries.
103-
let stream = NIOThrowingAsyncSequenceProducer.makeBatchedDirectoryEntryStream(
102+
let stream = BufferedStream.makeBatchedDirectoryEntryStream(
104103
handle: handle,
105104
recursive: recursive,
106105
entriesPerBatch: 64,
@@ -117,11 +116,9 @@ extension DirectoryEntries {
117116

118117
/// An `AsyncIteratorProtocol` of `Array<DirectoryEntry>`.
119118
public struct BatchedIterator: AsyncIteratorProtocol {
120-
private var iterator: BufferedOrAnyStream<[DirectoryEntry], DirectoryEntryProducer>.AsyncIterator
119+
private var iterator: BufferedOrAnyStream<[DirectoryEntry]>.AsyncIterator
121120

122-
fileprivate init(
123-
wrapping iterator: BufferedOrAnyStream<[DirectoryEntry], DirectoryEntryProducer>.AsyncIterator
124-
) {
121+
init(wrapping iterator: BufferedOrAnyStream<[DirectoryEntry]>.AsyncIterator) {
125122
self.iterator = iterator
126123
}
127124

@@ -138,95 +135,52 @@ extension DirectoryEntries.Batched.AsyncIterator: Sendable {}
138135
// MARK: - Internal
139136

140137
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
141-
extension NIOThrowingAsyncSequenceProducer
142-
where
143-
Element == [DirectoryEntry],
144-
Failure == (any Error),
145-
Strategy == NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
146-
Delegate == DirectoryEntryProducer
147-
{
138+
extension BufferedStream where Element == [DirectoryEntry] {
148139
fileprivate static func makeBatchedDirectoryEntryStream(
149140
handle: SystemFileHandle,
150141
recursive: Bool,
151142
entriesPerBatch: Int,
152143
lowWatermark: Int,
153144
highWatermark: Int
154-
) -> NIOThrowingAsyncSequenceProducer<
155-
[DirectoryEntry], any Error, NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
156-
DirectoryEntryProducer
157-
> {
158-
let producer = DirectoryEntryProducer(
159-
handle: handle,
160-
recursive: recursive,
161-
entriesPerBatch: entriesPerBatch
162-
)
145+
) -> BufferedStream<[DirectoryEntry]> {
146+
let state = DirectoryEnumerator(handle: handle, recursive: recursive)
147+
let protectedState = NIOLockedValueBox(state)
163148

164-
let nioThrowingAsyncSequence = NIOThrowingAsyncSequenceProducer.makeSequence(
165-
elementType: [DirectoryEntry].self,
166-
backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark(
167-
lowWatermark: lowWatermark,
168-
highWatermark: highWatermark
169-
),
170-
finishOnDeinit: false,
171-
delegate: producer
149+
var (stream, source) = BufferedStream.makeStream(
150+
of: [DirectoryEntry].self,
151+
backPressureStrategy: .watermark(low: lowWatermark, high: highWatermark)
172152
)
173153

174-
producer.setSequenceProducerSource(nioThrowingAsyncSequence.source)
154+
source.onTermination = {
155+
guard let threadPool = protectedState.withLockedValue({ $0.threadPoolForClosing() }) else {
156+
return
157+
}
158+
159+
threadPool.submit { _ in // always run, even if cancelled
160+
protectedState.withLockedValue { state in
161+
state.closeIfNecessary()
162+
}
163+
}
164+
}
165+
166+
let producer = DirectoryEntryProducer(
167+
state: protectedState,
168+
source: source,
169+
entriesPerBatch: entriesPerBatch
170+
)
171+
// Start producing immediately.
172+
producer.produceMore()
175173

176-
return nioThrowingAsyncSequence.sequence
174+
return stream
177175
}
178176
}
179177

180178
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
181-
private typealias DirectoryEntrySequenceProducer = NIOThrowingAsyncSequenceProducer<
182-
[DirectoryEntry], Error, NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark, DirectoryEntryProducer
183-
>
184-
185-
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
186-
private final class DirectoryEntryProducer: NIOAsyncSequenceProducerDelegate {
179+
private struct DirectoryEntryProducer {
187180
let state: NIOLockedValueBox<DirectoryEnumerator>
181+
let source: BufferedStream<[DirectoryEntry]>.Source
188182
let entriesPerBatch: Int
189183

190-
init(handle: SystemFileHandle, recursive: Bool, entriesPerBatch: Int) {
191-
let state = DirectoryEnumerator(handle: handle, recursive: recursive)
192-
self.state = NIOLockedValueBox(state)
193-
self.entriesPerBatch = entriesPerBatch
194-
}
195-
196-
func didTerminate() {
197-
guard let threadPool = self.state.withLockedValue({ $0.threadPoolForClosing() }) else {
198-
return
199-
}
200-
201-
threadPool.submit { _ in // always run, even if cancelled
202-
self.state.withLockedValue { state in
203-
state.closeIfNecessary()
204-
}
205-
}
206-
}
207-
208-
/// sets the source within the producer state
209-
func setSequenceProducerSource(_ sequenceProducerSource: DirectoryEntrySequenceProducer.Source) {
210-
self.state.withLockedValue { state in
211-
switch state.state {
212-
case .idle:
213-
state.sequenceProducerSource = sequenceProducerSource
214-
case .done:
215-
sequenceProducerSource.finish()
216-
case .open, .openPausedProducing:
217-
fatalError()
218-
case .modifying:
219-
fatalError()
220-
}
221-
}
222-
}
223-
224-
func clearSource() {
225-
self.state.withLockedValue { state in
226-
state.sequenceProducerSource = nil
227-
}
228-
}
229-
230184
/// The 'entry point' for producing elements.
231185
///
232186
/// Calling this function will start producing directory entries asynchronously by dispatching
@@ -253,12 +207,6 @@ private final class DirectoryEntryProducer: NIOAsyncSequenceProducerDelegate {
253207
}
254208
}
255209

256-
func pauseProducing() {
257-
self.state.withLockedValue { state in
258-
state.pauseProducing()
259-
}
260-
}
261-
262210
private func nextBatch() throws -> [DirectoryEntry] {
263211
try self.state.withLockedValue { state in
264212
try state.next(self.entriesPerBatch)
@@ -273,51 +221,45 @@ private final class DirectoryEntryProducer: NIOAsyncSequenceProducerDelegate {
273221
// Failed to read more entries: close and notify the stream so consumers receive the
274222
// error.
275223
self.close()
276-
let source = self.state.withLockedValue { state in
277-
state.sequenceProducerSource
278-
}
279-
source?.finish(error)
280-
self.clearSource()
224+
self.source.finish(throwing: error)
281225
}
282226
}
283227

284228
private func onNextBatch(_ entries: [DirectoryEntry]) {
285-
let source = self.state.withLockedValue { state in
286-
state.sequenceProducerSource
287-
}
288-
289-
guard let source else {
290-
assertionFailure("unexpectedly missing source")
291-
return
292-
}
293-
294229
// No entries were read: this must be the end (as the batch size must be greater than zero).
295230
if entries.isEmpty {
296-
source.finish()
297-
self.clearSource()
231+
self.source.finish(throwing: nil)
298232
return
299233
}
300234

301235
// Reading short means reading EOF. The enumerator closes itself in that case.
302236
let readEOF = entries.count < self.entriesPerBatch
303237

304238
// Entries were produced: yield them and maybe produce more.
305-
let writeResult = source.yield(contentsOf: CollectionOfOne(entries))
306-
307-
// Exit early if EOF was read; no use in trying to produce more.
308-
if readEOF {
309-
source.finish()
310-
self.clearSource()
311-
return
312-
}
239+
do {
240+
let writeResult = try self.source.write(contentsOf: CollectionOfOne(entries))
241+
// Exit early if EOF was read; no use in trying to produce more.
242+
if readEOF {
243+
self.source.finish(throwing: nil)
244+
return
245+
}
313246

314-
switch writeResult {
315-
case .produceMore:
316-
self.produceMore()
317-
case .stopProducing:
318-
self.pauseProducing()
319-
case .dropped:
320-
// The source is finished; mark ourselves as done.
247+
switch writeResult {
248+
case .produceMore:
249+
self.produceMore()
250+
case let .enqueueCallback(token):
251+
self.source.enqueueCallback(callbackToken: token) {
252+
switch $0 {
253+
case .success:
254+
self.produceMore()
255+
case .failure:
256+
self.close()
257+
}
258+
}
259+
}
260+
} catch {
261+
// Failure to write means the source is already done, that's okay we just need to
262+
// update our state and stop producing.
321263
self.close()
322264
}
323265
}
@@ -340,30 +282,25 @@ private final class DirectoryEntryProducer: NIOAsyncSequenceProducerDelegate {
340282
/// Note that this is not a `Sequence` because we allow for errors to be thrown on `next()`.
341283
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
342284
private struct DirectoryEnumerator: Sendable {
343-
internal enum State: @unchecked Sendable {
285+
private enum State: @unchecked Sendable {
344286
case modifying
345287
case idle(SystemFileHandle.SendableView, recursive: Bool)
346288
case open(NIOThreadPool, Source, [DirectoryEntry])
347-
case openPausedProducing(NIOThreadPool, Source, [DirectoryEntry])
348289
case done
349290
}
350291

351292
/// The source of directory entries.
352-
internal enum Source {
293+
private enum Source {
353294
case readdir(CInterop.DirPointer)
354295
case fts(CInterop.FTSPointer)
355296
}
356297

357298
/// The current state of enumeration.
358-
internal var state: State
299+
private var state: State
359300

360301
/// The path to the directory being enumerated.
361302
private let path: FilePath
362303

363-
/// The route via which directory entry batches are yielded,
364-
/// the sourcing end of the `DirectoryEntrySequenceProducer`
365-
internal var sequenceProducerSource: DirectoryEntrySequenceProducer.Source?
366-
367304
/// Information about an entry returned by FTS. See 'fts(3)'.
368305
private enum FTSInfo: Hashable, Sendable {
369306
case directoryPreOrder
@@ -416,38 +353,22 @@ private struct DirectoryEnumerator: Sendable {
416353
self.path = handle.path
417354
}
418355

419-
internal mutating func produceMore() -> NIOThreadPool? {
356+
internal func produceMore() -> NIOThreadPool? {
420357
switch self.state {
421358
case let .idle(handle, _):
422359
return handle.threadPool
423360
case let .open(threadPool, _, _):
424361
return threadPool
425-
case .openPausedProducing(let threadPool, let source, let array):
426-
self.state = .open(threadPool, source, array)
427-
return threadPool
428362
case .done:
429363
return nil
430364
case .modifying:
431365
fatalError()
432366
}
433367
}
434368

435-
internal mutating func pauseProducing() {
436-
switch self.state {
437-
case .open(let threadPool, let source, let array):
438-
self.state = .openPausedProducing(threadPool, source, array)
439-
case .idle:
440-
() // we won't apply back pressure until something has been read
441-
case .openPausedProducing, .done:
442-
() // no-op
443-
case .modifying:
444-
fatalError()
445-
}
446-
}
447-
448369
internal func threadPoolForClosing() -> NIOThreadPool? {
449370
switch self.state {
450-
case .open(let threadPool, _, _), .openPausedProducing(let threadPool, _, _):
371+
case let .open(threadPool, _, _):
451372
return threadPool
452373
case .idle, .done:
453374
// Don't need to close in the idle state: we don't own the handle.
@@ -476,7 +397,7 @@ private struct DirectoryEnumerator: Sendable {
476397
// We don't own the handle so don't close it.
477398
self.state = .done
478399

479-
case .open(_, let mode, _), .openPausedProducing(_, let mode, _):
400+
case let .open(_, mode, _):
480401
self.state = .done
481402
switch mode {
482403
case .readdir(let dir):
@@ -710,9 +631,6 @@ private struct DirectoryEnumerator: Sendable {
710631
return result
711632
}
712633

713-
case .openPausedProducing:
714-
return .yield(.success([]))
715-
716634
case .done:
717635
return .yield(.success([]))
718636

0 commit comments

Comments
 (0)