Skip to content

Commit fcdb6c3

Browse files
authored
Adopt NIOThrowingAsyncSequenceProducer 2nd try (#2917)
### Motivation: Adopt `NIOThrowingAsyncSequenceProducer` in NIOFileSystem to reduce code duplication. ### Modifications: Adopt `NIOThrowingAsyncSequenceProducer` in NIOFileSystem `DirectoryEntryProducer` and `FileChunkProducer`. This change was previously merged and then backed out due to issues (#2879). The original change is in the first commit, the second commit contains additional changes. In the original adoption of `NIOThrowingAsyncSequenceProducer` the code did not deal with backpressure being applied very well, in some cases causes hangs. A key bug was that it was not protected against re-entrant calls to `produceMore`. Such calls may happen e.g. when the producer has been asked to pause producing and then resume again before the initial read had completed. This resulted in overlapping reads and re-ordered data. This change introduces a new activity state which is protected by a lock and keeps track of if we are in the critical section to serialize access. `DirectoryEntryProducer` performs most of its logic within a lock and so doesn't seem to be impacted in the same way. ### Result: No functional changes. Internal changes reduce code duplication.
1 parent c16420c commit fcdb6c3

File tree

4 files changed

+495
-206
lines changed

4 files changed

+495
-206
lines changed

Sources/NIOFileSystem/DirectoryEntries.swift

+147-65
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import CNIODarwin
1717
import CNIOLinux
1818
import NIOConcurrencyHelpers
19+
import NIOCore
1920
import NIOPosix
2021
@preconcurrency import SystemPackage
2122

@@ -89,17 +90,17 @@ extension DirectoryEntries {
8990
public typealias AsyncIterator = BatchedIterator
9091
public typealias Element = [DirectoryEntry]
9192

92-
private let stream: BufferedOrAnyStream<[DirectoryEntry]>
93+
private let stream: BufferedOrAnyStream<[DirectoryEntry], DirectoryEntryProducer>
9394

9495
/// Creates a ``DirectoryEntries/Batched`` sequence by wrapping an `AsyncSequence`
9596
/// of directory entry batches.
9697
public init<S: AsyncSequence>(wrapping sequence: S) where S.Element == Element {
97-
self.stream = BufferedOrAnyStream(wrapping: sequence)
98+
self.stream = BufferedOrAnyStream<[DirectoryEntry], DirectoryEntryProducer>(wrapping: sequence)
9899
}
99100

100101
fileprivate init(handle: SystemFileHandle, recursive: Bool) {
101102
// Expanding the batches yields watermarks of 256 and 512 directory entries.
102-
let stream = BufferedStream.makeBatchedDirectoryEntryStream(
103+
let stream = NIOThrowingAsyncSequenceProducer.makeBatchedDirectoryEntryStream(
103104
handle: handle,
104105
recursive: recursive,
105106
entriesPerBatch: 64,
@@ -116,9 +117,11 @@ extension DirectoryEntries {
116117

117118
/// An `AsyncIteratorProtocol` of `Array<DirectoryEntry>`.
118119
public struct BatchedIterator: AsyncIteratorProtocol {
119-
private var iterator: BufferedOrAnyStream<[DirectoryEntry]>.AsyncIterator
120+
private var iterator: BufferedOrAnyStream<[DirectoryEntry], DirectoryEntryProducer>.AsyncIterator
120121

121-
init(wrapping iterator: BufferedOrAnyStream<[DirectoryEntry]>.AsyncIterator) {
122+
fileprivate init(
123+
wrapping iterator: BufferedOrAnyStream<[DirectoryEntry], DirectoryEntryProducer>.AsyncIterator
124+
) {
122125
self.iterator = iterator
123126
}
124127

@@ -135,52 +138,95 @@ extension DirectoryEntries.Batched.AsyncIterator: Sendable {}
135138
// MARK: - Internal
136139

137140
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
138-
extension BufferedStream where Element == [DirectoryEntry] {
141+
extension NIOThrowingAsyncSequenceProducer
142+
where
143+
Element == [DirectoryEntry],
144+
Failure == (any Error),
145+
Strategy == NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
146+
Delegate == DirectoryEntryProducer
147+
{
139148
fileprivate static func makeBatchedDirectoryEntryStream(
140149
handle: SystemFileHandle,
141150
recursive: Bool,
142151
entriesPerBatch: Int,
143152
lowWatermark: Int,
144153
highWatermark: Int
145-
) -> BufferedStream<[DirectoryEntry]> {
146-
let state = DirectoryEnumerator(handle: handle, recursive: recursive)
147-
let protectedState = NIOLockedValueBox(state)
148-
149-
var (stream, source) = BufferedStream.makeStream(
150-
of: [DirectoryEntry].self,
151-
backPressureStrategy: .watermark(low: lowWatermark, high: highWatermark)
152-
)
153-
154-
source.onTermination = {
155-
guard let threadPool = protectedState.withLockedValue({ $0.threadPoolForClosing() }) else {
156-
return
157-
}
158-
159-
threadPool.submit { _ in // always run, even if cancelled
160-
protectedState.withLockedValue { state in
161-
state.closeIfNecessary()
162-
}
163-
}
164-
}
165-
154+
) -> NIOThrowingAsyncSequenceProducer<
155+
[DirectoryEntry], any Error, NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark,
156+
DirectoryEntryProducer
157+
> {
166158
let producer = DirectoryEntryProducer(
167-
state: protectedState,
168-
source: source,
159+
handle: handle,
160+
recursive: recursive,
169161
entriesPerBatch: entriesPerBatch
170162
)
171-
// Start producing immediately.
172-
producer.produceMore()
173163

174-
return stream
164+
let nioThrowingAsyncSequence = NIOThrowingAsyncSequenceProducer.makeSequence(
165+
elementType: [DirectoryEntry].self,
166+
backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark(
167+
lowWatermark: lowWatermark,
168+
highWatermark: highWatermark
169+
),
170+
finishOnDeinit: false,
171+
delegate: producer
172+
)
173+
174+
producer.setSequenceProducerSource(nioThrowingAsyncSequence.source)
175+
176+
return nioThrowingAsyncSequence.sequence
175177
}
176178
}
177179

178180
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
179-
private struct DirectoryEntryProducer {
181+
private typealias DirectoryEntrySequenceProducer = NIOThrowingAsyncSequenceProducer<
182+
[DirectoryEntry], Error, NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark, DirectoryEntryProducer
183+
>
184+
185+
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
186+
private final class DirectoryEntryProducer: NIOAsyncSequenceProducerDelegate {
180187
let state: NIOLockedValueBox<DirectoryEnumerator>
181-
let source: BufferedStream<[DirectoryEntry]>.Source
182188
let entriesPerBatch: Int
183189

190+
init(handle: SystemFileHandle, recursive: Bool, entriesPerBatch: Int) {
191+
let state = DirectoryEnumerator(handle: handle, recursive: recursive)
192+
self.state = NIOLockedValueBox(state)
193+
self.entriesPerBatch = entriesPerBatch
194+
}
195+
196+
func didTerminate() {
197+
guard let threadPool = self.state.withLockedValue({ $0.threadPoolForClosing() }) else {
198+
return
199+
}
200+
201+
threadPool.submit { _ in // always run, even if cancelled
202+
self.state.withLockedValue { state in
203+
state.closeIfNecessary()
204+
}
205+
}
206+
}
207+
208+
/// sets the source within the producer state
209+
func setSequenceProducerSource(_ sequenceProducerSource: DirectoryEntrySequenceProducer.Source) {
210+
self.state.withLockedValue { state in
211+
switch state.state {
212+
case .idle:
213+
state.sequenceProducerSource = sequenceProducerSource
214+
case .done:
215+
sequenceProducerSource.finish()
216+
case .open, .openPausedProducing:
217+
fatalError()
218+
case .modifying:
219+
fatalError()
220+
}
221+
}
222+
}
223+
224+
func clearSource() {
225+
self.state.withLockedValue { state in
226+
state.sequenceProducerSource = nil
227+
}
228+
}
229+
184230
/// The 'entry point' for producing elements.
185231
///
186232
/// Calling this function will start producing directory entries asynchronously by dispatching
@@ -207,6 +253,12 @@ private struct DirectoryEntryProducer {
207253
}
208254
}
209255

256+
func pauseProducing() {
257+
self.state.withLockedValue { state in
258+
state.pauseProducing()
259+
}
260+
}
261+
210262
private func nextBatch() throws -> [DirectoryEntry] {
211263
try self.state.withLockedValue { state in
212264
try state.next(self.entriesPerBatch)
@@ -221,45 +273,51 @@ private struct DirectoryEntryProducer {
221273
// Failed to read more entries: close and notify the stream so consumers receive the
222274
// error.
223275
self.close()
224-
self.source.finish(throwing: error)
276+
let source = self.state.withLockedValue { state in
277+
state.sequenceProducerSource
278+
}
279+
source?.finish(error)
280+
self.clearSource()
225281
}
226282
}
227283

228284
private func onNextBatch(_ entries: [DirectoryEntry]) {
285+
let source = self.state.withLockedValue { state in
286+
state.sequenceProducerSource
287+
}
288+
289+
guard let source else {
290+
assertionFailure("unexpectedly missing source")
291+
return
292+
}
293+
229294
// No entries were read: this must be the end (as the batch size must be greater than zero).
230295
if entries.isEmpty {
231-
self.source.finish(throwing: nil)
296+
source.finish()
297+
self.clearSource()
232298
return
233299
}
234300

235301
// Reading short means reading EOF. The enumerator closes itself in that case.
236302
let readEOF = entries.count < self.entriesPerBatch
237303

238304
// Entries were produced: yield them and maybe produce more.
239-
do {
240-
let writeResult = try self.source.write(contentsOf: CollectionOfOne(entries))
241-
// Exit early if EOF was read; no use in trying to produce more.
242-
if readEOF {
243-
self.source.finish(throwing: nil)
244-
return
245-
}
305+
let writeResult = source.yield(contentsOf: CollectionOfOne(entries))
246306

247-
switch writeResult {
248-
case .produceMore:
249-
self.produceMore()
250-
case let .enqueueCallback(token):
251-
self.source.enqueueCallback(callbackToken: token) {
252-
switch $0 {
253-
case .success:
254-
self.produceMore()
255-
case .failure:
256-
self.close()
257-
}
258-
}
259-
}
260-
} catch {
261-
// Failure to write means the source is already done, that's okay we just need to
262-
// update our state and stop producing.
307+
// Exit early if EOF was read; no use in trying to produce more.
308+
if readEOF {
309+
source.finish()
310+
self.clearSource()
311+
return
312+
}
313+
314+
switch writeResult {
315+
case .produceMore:
316+
self.produceMore()
317+
case .stopProducing:
318+
self.pauseProducing()
319+
case .dropped:
320+
// The source is finished; mark ourselves as done.
263321
self.close()
264322
}
265323
}
@@ -282,25 +340,30 @@ private struct DirectoryEntryProducer {
282340
/// Note that this is not a `Sequence` because we allow for errors to be thrown on `next()`.
283341
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
284342
private struct DirectoryEnumerator: Sendable {
285-
private enum State: @unchecked Sendable {
343+
internal enum State: @unchecked Sendable {
286344
case modifying
287345
case idle(SystemFileHandle.SendableView, recursive: Bool)
288346
case open(NIOThreadPool, Source, [DirectoryEntry])
347+
case openPausedProducing(NIOThreadPool, Source, [DirectoryEntry])
289348
case done
290349
}
291350

292351
/// The source of directory entries.
293-
private enum Source {
352+
internal enum Source {
294353
case readdir(CInterop.DirPointer)
295354
case fts(CInterop.FTSPointer)
296355
}
297356

298357
/// The current state of enumeration.
299-
private var state: State
358+
internal var state: State
300359

301360
/// The path to the directory being enumerated.
302361
private let path: FilePath
303362

363+
/// The route via which directory entry batches are yielded,
364+
/// the sourcing end of the `DirectoryEntrySequenceProducer`
365+
internal var sequenceProducerSource: DirectoryEntrySequenceProducer.Source?
366+
304367
/// Information about an entry returned by FTS. See 'fts(3)'.
305368
private enum FTSInfo: Hashable, Sendable {
306369
case directoryPreOrder
@@ -353,22 +416,38 @@ private struct DirectoryEnumerator: Sendable {
353416
self.path = handle.path
354417
}
355418

356-
internal func produceMore() -> NIOThreadPool? {
419+
internal mutating func produceMore() -> NIOThreadPool? {
357420
switch self.state {
358421
case let .idle(handle, _):
359422
return handle.threadPool
360423
case let .open(threadPool, _, _):
361424
return threadPool
425+
case .openPausedProducing(let threadPool, let source, let array):
426+
self.state = .open(threadPool, source, array)
427+
return threadPool
362428
case .done:
363429
return nil
364430
case .modifying:
365431
fatalError()
366432
}
367433
}
368434

435+
internal mutating func pauseProducing() {
436+
switch self.state {
437+
case .open(let threadPool, let source, let array):
438+
self.state = .openPausedProducing(threadPool, source, array)
439+
case .idle:
440+
() // we won't apply back pressure until something has been read
441+
case .openPausedProducing, .done:
442+
() // no-op
443+
case .modifying:
444+
fatalError()
445+
}
446+
}
447+
369448
internal func threadPoolForClosing() -> NIOThreadPool? {
370449
switch self.state {
371-
case let .open(threadPool, _, _):
450+
case .open(let threadPool, _, _), .openPausedProducing(let threadPool, _, _):
372451
return threadPool
373452
case .idle, .done:
374453
// Don't need to close in the idle state: we don't own the handle.
@@ -397,7 +476,7 @@ private struct DirectoryEnumerator: Sendable {
397476
// We don't own the handle so don't close it.
398477
self.state = .done
399478

400-
case let .open(_, mode, _):
479+
case .open(_, let mode, _), .openPausedProducing(_, let mode, _):
401480
self.state = .done
402481
switch mode {
403482
case .readdir(let dir):
@@ -631,6 +710,9 @@ private struct DirectoryEnumerator: Sendable {
631710
return result
632711
}
633712

713+
case .openPausedProducing:
714+
return .yield(.success([]))
715+
634716
case .done:
635717
return .yield(.success([]))
636718

0 commit comments

Comments
 (0)