Skip to content

Commit

Permalink
use a private function to simplify cascading channel failures
Browse files Browse the repository at this point in the history
  • Loading branch information
MahdiBM committed Jun 20, 2024
1 parent e92b2f0 commit e127327
Showing 1 changed file with 15 additions and 25 deletions.
40 changes: 15 additions & 25 deletions Sources/PostgresNIO/Connection/PostgresConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,7 @@ public final class PostgresConnection: @unchecked Sendable {
promise: promise
)

let writePromise = self.channel.eventLoop.makePromise(of: Void.self)
self.channel.write(HandlerTask.extendedQuery(context), promise: writePromise)
writePromise.futureResult.cascadeFailure(to: promise)
self.write(HandlerTask.extendedQuery(context), cascadingFailureTo: promise)

return promise.futureResult
}
Expand All @@ -241,9 +239,7 @@ public final class PostgresConnection: @unchecked Sendable {
promise: promise
)

let writePromise = self.channel.eventLoop.makePromise(of: Void.self)
self.channel.write(HandlerTask.extendedQuery(context), promise: writePromise)
writePromise.futureResult.cascadeFailure(to: promise)
self.write(HandlerTask.extendedQuery(context), cascadingFailureTo: promise)

return promise.futureResult.map { rowDescription in
PSQLPreparedStatement(name: name, query: query, connection: self, rowDescription: rowDescription)
Expand All @@ -260,9 +256,7 @@ public final class PostgresConnection: @unchecked Sendable {
logger: logger,
promise: promise)

let writePromise = self.channel.eventLoop.makePromise(of: Void.self)
self.channel.write(HandlerTask.extendedQuery(context), promise: writePromise)
writePromise.futureResult.cascadeFailure(to: promise)
self.write(HandlerTask.extendedQuery(context), cascadingFailureTo: promise)

return promise.futureResult
}
Expand All @@ -271,9 +265,7 @@ public final class PostgresConnection: @unchecked Sendable {
let promise = self.channel.eventLoop.makePromise(of: Void.self)
let context = CloseCommandContext(target: target, logger: logger, promise: promise)

let writePromise = self.channel.eventLoop.makePromise(of: Void.self)
self.channel.write(HandlerTask.closeCommand(context), promise: writePromise)
writePromise.futureResult.cascadeFailure(to: promise)
self.write(HandlerTask.closeCommand(context), cascadingFailureTo: promise)

return promise.futureResult
}
Expand Down Expand Up @@ -437,9 +429,7 @@ extension PostgresConnection {
promise: promise
)

let writePromise = self.channel.eventLoop.makePromise(of: Void.self)
self.channel.write(HandlerTask.extendedQuery(context), promise: writePromise)
writePromise.futureResult.cascadeFailure(to: promise)
self.write(HandlerTask.extendedQuery(context), cascadingFailureTo: promise)

do {
return try await promise.futureResult.map({ $0.asyncSequence() }).get()
Expand Down Expand Up @@ -498,9 +488,7 @@ extension PostgresConnection {
promise: promise
))

let writePromise = self.channel.eventLoop.makePromise(of: Void.self)
self.channel.write(task, promise: writePromise)
writePromise.futureResult.cascadeFailure(to: promise)
self.write(task, cascadingFailureTo: promise)

do {
return try await promise.futureResult
Expand Down Expand Up @@ -537,9 +525,7 @@ extension PostgresConnection {
promise: promise
))

let writePromise = self.channel.eventLoop.makePromise(of: Void.self)
self.channel.write(task, promise: writePromise)
writePromise.futureResult.cascadeFailure(to: promise)
self.write(task, cascadingFailureTo: promise)

do {
return try await promise.futureResult
Expand All @@ -555,6 +541,12 @@ extension PostgresConnection {
throw error // rethrow with more metadata
}
}

private func write<T, Value>(_ any: T, cascadingFailureTo promise: EventLoopPromise<Value>) {

This comment has been minimized.

Copy link
@fabianfett

fabianfett Jun 20, 2024

Collaborator
func write<T, Value>(_ task: HandlerTask, cascadingFailureTo promise: EventLoopPromise<Value>) {
let writePromise = self.channel.eventLoop.makePromise(of: Void.self)
self.channel.write(any, promise: writePromise)
writePromise.futureResult.cascadeFailure(to: promise)
}
}

// MARK: EventLoopFuture interface
Expand Down Expand Up @@ -738,10 +730,7 @@ extension PostgresConnection {
closure: notificationHandler
)

let task = HandlerTask.startListening(listener)
let writePromise = self.channel.eventLoop.makePromise(of: Void.self)
self.channel.write(task, promise: writePromise)
writePromise.futureResult.cascadeFailure(to: listenContext.promise)
self.write(HandlerTask.startListening(listener), cascadingFailureTo: listenContext.promise)

listenContext.future.whenComplete { _ in
let task = HandlerTask.cancelListening(channel, id)
Expand Down Expand Up @@ -788,3 +777,4 @@ extension PostgresConnection {
#endif
}
}

0 comments on commit e127327

Please sign in to comment.