From 9206b252d6c48628ca17ab07d9d23b2b14991eef Mon Sep 17 00:00:00 2001 From: MahdiBM Date: Wed, 19 Jun 2024 14:44:51 +0330 Subject: [PATCH 01/25] Add a test to reproduce the problem --- .../PSQLIntegrationTests.swift | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/Tests/IntegrationTests/PSQLIntegrationTests.swift b/Tests/IntegrationTests/PSQLIntegrationTests.swift index 57939c06..8a0f98e4 100644 --- a/Tests/IntegrationTests/PSQLIntegrationTests.swift +++ b/Tests/IntegrationTests/PSQLIntegrationTests.swift @@ -360,4 +360,36 @@ final class IntegrationTests: XCTestCase { } } + func testConnectionClosureMidQueryDoesNotHang() async throws { + let badQuery: PostgresQuery = """ + SELECT * FROM non_existent_table + """ + + try await withThrowingTaskGroup( + of: Void.self + ) { taskGroup in + + for _ in (0 ..< 1_000) { + taskGroup.addTask { + print("-0---0-000- in") + do { + let conn = try await PostgresConnection.test( + on: NIOSingletons.posixEventLoopGroup.next() + ).get() + + async let close: () = conn.closeGracefully() + async let query = conn.query(badQuery, logger: .psqlTest) + + _ = try? await (close, query) + print("-0---0-000- out") + } catch { + print("-0---0-000- out") + throw error + } + } + } + + try await taskGroup.waitForAll() + } + } } From 5ff285bc4992726e9418aec5bfe00fea2d695fa1 Mon Sep 17 00:00:00 2001 From: MahdiBM Date: Wed, 19 Jun 2024 16:38:52 +0330 Subject: [PATCH 02/25] fail query promise when channel write fails --- .../Connection/PostgresConnection.swift | 6 ++++- .../PSQLIntegrationTests.swift | 24 +++++++------------ 2 files changed, 13 insertions(+), 17 deletions(-) diff --git a/Sources/PostgresNIO/Connection/PostgresConnection.swift b/Sources/PostgresNIO/Connection/PostgresConnection.swift index eb9dc791..317b0d5b 100644 --- a/Sources/PostgresNIO/Connection/PostgresConnection.swift +++ b/Sources/PostgresNIO/Connection/PostgresConnection.swift @@ -426,7 +426,11 @@ extension PostgresConnection { promise: promise ) - self.channel.write(HandlerTask.extendedQuery(context), promise: nil) + let writePromise = self.channel.eventLoop.makePromise(of: Void.self) + self.channel.write(HandlerTask.extendedQuery(context), promise: writePromise) + writePromise.futureResult.whenFailure { error in + promise.fail(error) + } do { return try await promise.futureResult.map({ $0.asyncSequence() }).get() diff --git a/Tests/IntegrationTests/PSQLIntegrationTests.swift b/Tests/IntegrationTests/PSQLIntegrationTests.swift index 8a0f98e4..015e9bfd 100644 --- a/Tests/IntegrationTests/PSQLIntegrationTests.swift +++ b/Tests/IntegrationTests/PSQLIntegrationTests.swift @@ -368,24 +368,16 @@ final class IntegrationTests: XCTestCase { try await withThrowingTaskGroup( of: Void.self ) { taskGroup in - for _ in (0 ..< 1_000) { taskGroup.addTask { - print("-0---0-000- in") - do { - let conn = try await PostgresConnection.test( - on: NIOSingletons.posixEventLoopGroup.next() - ).get() - - async let close: () = conn.closeGracefully() - async let query = conn.query(badQuery, logger: .psqlTest) - - _ = try? await (close, query) - print("-0---0-000- out") - } catch { - print("-0---0-000- out") - throw error - } + let conn = try await PostgresConnection.test( + on: NIOSingletons.posixEventLoopGroup.next() + ).get() + + async let close: () = conn.closeGracefully() + async let query = conn.query(badQuery, logger: .psqlTest) + + _ = try await (close, query) } } From 3e49ef1a20847cd7ff72ebcc404a8c9440028a52 Mon Sep 17 00:00:00 2001 From: MahdiBM Date: Wed, 19 Jun 2024 16:41:06 +0330 Subject: [PATCH 03/25] make sure the test succeeds --- Sources/PostgresNIO/Connection/PostgresConnection.swift | 4 +--- Tests/IntegrationTests/PSQLIntegrationTests.swift | 4 ++-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/Sources/PostgresNIO/Connection/PostgresConnection.swift b/Sources/PostgresNIO/Connection/PostgresConnection.swift index 317b0d5b..b641d348 100644 --- a/Sources/PostgresNIO/Connection/PostgresConnection.swift +++ b/Sources/PostgresNIO/Connection/PostgresConnection.swift @@ -428,9 +428,7 @@ extension PostgresConnection { let writePromise = self.channel.eventLoop.makePromise(of: Void.self) self.channel.write(HandlerTask.extendedQuery(context), promise: writePromise) - writePromise.futureResult.whenFailure { error in - promise.fail(error) - } + writePromise.futureResult.cascadeFailure(to: promise) do { return try await promise.futureResult.map({ $0.asyncSequence() }).get() diff --git a/Tests/IntegrationTests/PSQLIntegrationTests.swift b/Tests/IntegrationTests/PSQLIntegrationTests.swift index 015e9bfd..769ef9f3 100644 --- a/Tests/IntegrationTests/PSQLIntegrationTests.swift +++ b/Tests/IntegrationTests/PSQLIntegrationTests.swift @@ -365,7 +365,7 @@ final class IntegrationTests: XCTestCase { SELECT * FROM non_existent_table """ - try await withThrowingTaskGroup( + _ = try await withThrowingTaskGroup( of: Void.self ) { taskGroup in for _ in (0 ..< 1_000) { @@ -381,7 +381,7 @@ final class IntegrationTests: XCTestCase { } } - try await taskGroup.waitForAll() + /// Ignore failures } } } From c2192a7bd6d3e937b2a422c994aca35474682bcf Mon Sep 17 00:00:00 2001 From: MahdiBM Date: Wed, 19 Jun 2024 16:43:17 +0330 Subject: [PATCH 04/25] do the same for prepared statements query --- Sources/PostgresNIO/Connection/PostgresConnection.swift | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/Sources/PostgresNIO/Connection/PostgresConnection.swift b/Sources/PostgresNIO/Connection/PostgresConnection.swift index b641d348..2b088c5f 100644 --- a/Sources/PostgresNIO/Connection/PostgresConnection.swift +++ b/Sources/PostgresNIO/Connection/PostgresConnection.swift @@ -239,7 +239,10 @@ public final class PostgresConnection: @unchecked Sendable { promise: promise ) - self.channel.write(HandlerTask.extendedQuery(context), promise: nil) + let writePromise = self.channel.eventLoop.makePromise(of: Void.self) + self.channel.write(HandlerTask.extendedQuery(context), promise: writePromise) + writePromise.futureResult.cascadeFailure(to: promise) + return promise.futureResult.map { rowDescription in PSQLPreparedStatement(name: name, query: query, connection: self, rowDescription: rowDescription) } From a288cc216216b3eedee4bff79ef0b65880763638 Mon Sep 17 00:00:00 2001 From: MahdiBM Date: Wed, 19 Jun 2024 16:49:31 +0330 Subject: [PATCH 05/25] minor refinements --- Tests/IntegrationTests/PSQLIntegrationTests.swift | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/Tests/IntegrationTests/PSQLIntegrationTests.swift b/Tests/IntegrationTests/PSQLIntegrationTests.swift index 769ef9f3..b74bfb66 100644 --- a/Tests/IntegrationTests/PSQLIntegrationTests.swift +++ b/Tests/IntegrationTests/PSQLIntegrationTests.swift @@ -365,9 +365,7 @@ final class IntegrationTests: XCTestCase { SELECT * FROM non_existent_table """ - _ = try await withThrowingTaskGroup( - of: Void.self - ) { taskGroup in + _ = await withThrowingTaskGroup(of: Void.self) { taskGroup in for _ in (0 ..< 1_000) { taskGroup.addTask { let conn = try await PostgresConnection.test( @@ -381,7 +379,7 @@ final class IntegrationTests: XCTestCase { } } - /// Ignore failures + // Ignore failures } } } From d98944aabddc20d5022cd2c1d0be5e115939da3c Mon Sep 17 00:00:00 2001 From: MahdiBM Date: Wed, 19 Jun 2024 16:57:44 +0330 Subject: [PATCH 06/25] more clear query --- Tests/IntegrationTests/PSQLIntegrationTests.swift | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/Tests/IntegrationTests/PSQLIntegrationTests.swift b/Tests/IntegrationTests/PSQLIntegrationTests.swift index b74bfb66..b2c66f52 100644 --- a/Tests/IntegrationTests/PSQLIntegrationTests.swift +++ b/Tests/IntegrationTests/PSQLIntegrationTests.swift @@ -361,9 +361,7 @@ final class IntegrationTests: XCTestCase { } func testConnectionClosureMidQueryDoesNotHang() async throws { - let badQuery: PostgresQuery = """ - SELECT * FROM non_existent_table - """ + let query: PostgresQuery = "SELECT 1" _ = await withThrowingTaskGroup(of: Void.self) { taskGroup in for _ in (0 ..< 1_000) { @@ -373,7 +371,7 @@ final class IntegrationTests: XCTestCase { ).get() async let close: () = conn.closeGracefully() - async let query = conn.query(badQuery, logger: .psqlTest) + async let query = conn.query(query, logger: .psqlTest) _ = try await (close, query) } From 00c11b6b21e3c407bb4fb8e268305d27bebe23d2 Mon Sep 17 00:00:00 2001 From: MahdiBM Date: Wed, 19 Jun 2024 17:09:34 +0330 Subject: [PATCH 07/25] more writePromise s catching channel failures --- .../Connection/PostgresConnection.swift | 32 +++++++++++++++---- 1 file changed, 25 insertions(+), 7 deletions(-) diff --git a/Sources/PostgresNIO/Connection/PostgresConnection.swift b/Sources/PostgresNIO/Connection/PostgresConnection.swift index 2b088c5f..b76ce86e 100644 --- a/Sources/PostgresNIO/Connection/PostgresConnection.swift +++ b/Sources/PostgresNIO/Connection/PostgresConnection.swift @@ -222,7 +222,9 @@ public final class PostgresConnection: @unchecked Sendable { promise: promise ) - self.channel.write(HandlerTask.extendedQuery(context), promise: nil) + let writePromise = self.channel.eventLoop.makePromise(of: Void.self) + self.channel.write(HandlerTask.extendedQuery(context), promise: writePromise) + writePromise.futureResult.cascadeFailure(to: promise) return promise.futureResult } @@ -258,7 +260,10 @@ public final class PostgresConnection: @unchecked Sendable { logger: logger, promise: promise) - self.channel.write(HandlerTask.extendedQuery(context), promise: nil) + let writePromise = self.channel.eventLoop.makePromise(of: Void.self) + self.channel.write(HandlerTask.extendedQuery(context), promise: writePromise) + writePromise.futureResult.cascadeFailure(to: promise) + return promise.futureResult } @@ -266,7 +271,10 @@ public final class PostgresConnection: @unchecked Sendable { let promise = self.channel.eventLoop.makePromise(of: Void.self) let context = CloseCommandContext(target: target, logger: logger, promise: promise) - self.channel.write(HandlerTask.closeCommand(context), promise: nil) + let writePromise = self.channel.eventLoop.makePromise(of: Void.self) + self.channel.write(HandlerTask.closeCommand(context), promise: writePromise) + writePromise.futureResult.cascadeFailure(to: promise) + return promise.futureResult } @@ -485,7 +493,11 @@ extension PostgresConnection { logger: logger, promise: promise )) - self.channel.write(task, promise: nil) + + let writePromise = self.channel.eventLoop.makePromise(of: Void.self) + self.channel.write(task, promise: writePromise) + writePromise.futureResult.cascadeFailure(to: promise) + do { return try await promise.futureResult .map { $0.asyncSequence() } @@ -520,7 +532,11 @@ extension PostgresConnection { logger: logger, promise: promise )) - self.channel.write(task, promise: nil) + + let writePromise = self.channel.eventLoop.makePromise(of: Void.self) + self.channel.write(task, promise: writePromise) + writePromise.futureResult.cascadeFailure(to: promise) + do { return try await promise.futureResult .map { $0.commandTag } @@ -679,7 +695,7 @@ internal enum PostgresCommands: PostgresRequest { /// Context for receiving NotificationResponse messages on a connection, used for PostgreSQL's `LISTEN`/`NOTIFY` support. public final class PostgresListenContext: Sendable { - private let promise: EventLoopPromise + let promise: EventLoopPromise var future: EventLoopFuture { self.promise.futureResult @@ -719,7 +735,9 @@ extension PostgresConnection { ) let task = HandlerTask.startListening(listener) - self.channel.write(task, promise: nil) + let writePromise = self.channel.eventLoop.makePromise(of: Void.self) + self.channel.write(task, promise: writePromise) + writePromise.futureResult.cascadeFailure(to: listenContext.promise) listenContext.future.whenComplete { _ in let task = HandlerTask.cancelListening(channel, id) From 2ed7946a89393f23d2abb5780155037e327c1241 Mon Sep 17 00:00:00 2001 From: MahdiBM Date: Wed, 19 Jun 2024 17:13:29 +0330 Subject: [PATCH 08/25] one more place --- Sources/PostgresNIO/Connection/PostgresConnection.swift | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/Sources/PostgresNIO/Connection/PostgresConnection.swift b/Sources/PostgresNIO/Connection/PostgresConnection.swift index b76ce86e..d375c547 100644 --- a/Sources/PostgresNIO/Connection/PostgresConnection.swift +++ b/Sources/PostgresNIO/Connection/PostgresConnection.swift @@ -468,7 +468,11 @@ extension PostgresConnection { let task = HandlerTask.startListening(listener) - self.channel.write(task, promise: nil) + let writePromise = self.channel.eventLoop.makePromise(of: Void.self) + self.channel.write(task, promise: writePromise) + writePromise.futureResult.whenFailure { error in + continuation.resume(throwing: error) + } } } onCancel: { let task = HandlerTask.cancelListening(channel, id) @@ -536,7 +540,7 @@ extension PostgresConnection { let writePromise = self.channel.eventLoop.makePromise(of: Void.self) self.channel.write(task, promise: writePromise) writePromise.futureResult.cascadeFailure(to: promise) - + do { return try await promise.futureResult .map { $0.commandTag } From e92b2f0e4c93c344374a815c53f27ee64d347257 Mon Sep 17 00:00:00 2001 From: MahdiBM Date: Wed, 19 Jun 2024 21:43:29 +0330 Subject: [PATCH 09/25] move query into the loop --- Tests/IntegrationTests/PSQLIntegrationTests.swift | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/Tests/IntegrationTests/PSQLIntegrationTests.swift b/Tests/IntegrationTests/PSQLIntegrationTests.swift index b2c66f52..4d0d6d82 100644 --- a/Tests/IntegrationTests/PSQLIntegrationTests.swift +++ b/Tests/IntegrationTests/PSQLIntegrationTests.swift @@ -361,8 +361,6 @@ final class IntegrationTests: XCTestCase { } func testConnectionClosureMidQueryDoesNotHang() async throws { - let query: PostgresQuery = "SELECT 1" - _ = await withThrowingTaskGroup(of: Void.self) { taskGroup in for _ in (0 ..< 1_000) { taskGroup.addTask { @@ -371,7 +369,7 @@ final class IntegrationTests: XCTestCase { ).get() async let close: () = conn.closeGracefully() - async let query = conn.query(query, logger: .psqlTest) + async let query = conn.query("SELECT 1", logger: .psqlTest) _ = try await (close, query) } From e1273276ee3dc42e62dbaab5f8592746045ffa10 Mon Sep 17 00:00:00 2001 From: MahdiBM Date: Thu, 20 Jun 2024 18:29:51 +0330 Subject: [PATCH 10/25] use a private function to simplify cascading channel failures --- .../Connection/PostgresConnection.swift | 40 +++++++------------ 1 file changed, 15 insertions(+), 25 deletions(-) diff --git a/Sources/PostgresNIO/Connection/PostgresConnection.swift b/Sources/PostgresNIO/Connection/PostgresConnection.swift index d375c547..815f993e 100644 --- a/Sources/PostgresNIO/Connection/PostgresConnection.swift +++ b/Sources/PostgresNIO/Connection/PostgresConnection.swift @@ -222,9 +222,7 @@ public final class PostgresConnection: @unchecked Sendable { promise: promise ) - let writePromise = self.channel.eventLoop.makePromise(of: Void.self) - self.channel.write(HandlerTask.extendedQuery(context), promise: writePromise) - writePromise.futureResult.cascadeFailure(to: promise) + self.write(HandlerTask.extendedQuery(context), cascadingFailureTo: promise) return promise.futureResult } @@ -241,9 +239,7 @@ public final class PostgresConnection: @unchecked Sendable { promise: promise ) - let writePromise = self.channel.eventLoop.makePromise(of: Void.self) - self.channel.write(HandlerTask.extendedQuery(context), promise: writePromise) - writePromise.futureResult.cascadeFailure(to: promise) + self.write(HandlerTask.extendedQuery(context), cascadingFailureTo: promise) return promise.futureResult.map { rowDescription in PSQLPreparedStatement(name: name, query: query, connection: self, rowDescription: rowDescription) @@ -260,9 +256,7 @@ public final class PostgresConnection: @unchecked Sendable { logger: logger, promise: promise) - let writePromise = self.channel.eventLoop.makePromise(of: Void.self) - self.channel.write(HandlerTask.extendedQuery(context), promise: writePromise) - writePromise.futureResult.cascadeFailure(to: promise) + self.write(HandlerTask.extendedQuery(context), cascadingFailureTo: promise) return promise.futureResult } @@ -271,9 +265,7 @@ public final class PostgresConnection: @unchecked Sendable { let promise = self.channel.eventLoop.makePromise(of: Void.self) let context = CloseCommandContext(target: target, logger: logger, promise: promise) - let writePromise = self.channel.eventLoop.makePromise(of: Void.self) - self.channel.write(HandlerTask.closeCommand(context), promise: writePromise) - writePromise.futureResult.cascadeFailure(to: promise) + self.write(HandlerTask.closeCommand(context), cascadingFailureTo: promise) return promise.futureResult } @@ -437,9 +429,7 @@ extension PostgresConnection { promise: promise ) - let writePromise = self.channel.eventLoop.makePromise(of: Void.self) - self.channel.write(HandlerTask.extendedQuery(context), promise: writePromise) - writePromise.futureResult.cascadeFailure(to: promise) + self.write(HandlerTask.extendedQuery(context), cascadingFailureTo: promise) do { return try await promise.futureResult.map({ $0.asyncSequence() }).get() @@ -498,9 +488,7 @@ extension PostgresConnection { promise: promise )) - let writePromise = self.channel.eventLoop.makePromise(of: Void.self) - self.channel.write(task, promise: writePromise) - writePromise.futureResult.cascadeFailure(to: promise) + self.write(task, cascadingFailureTo: promise) do { return try await promise.futureResult @@ -537,9 +525,7 @@ extension PostgresConnection { promise: promise )) - let writePromise = self.channel.eventLoop.makePromise(of: Void.self) - self.channel.write(task, promise: writePromise) - writePromise.futureResult.cascadeFailure(to: promise) + self.write(task, cascadingFailureTo: promise) do { return try await promise.futureResult @@ -555,6 +541,12 @@ extension PostgresConnection { throw error // rethrow with more metadata } } + + private func write(_ any: T, cascadingFailureTo promise: EventLoopPromise) { + let writePromise = self.channel.eventLoop.makePromise(of: Void.self) + self.channel.write(any, promise: writePromise) + writePromise.futureResult.cascadeFailure(to: promise) + } } // MARK: EventLoopFuture interface @@ -738,10 +730,7 @@ extension PostgresConnection { closure: notificationHandler ) - let task = HandlerTask.startListening(listener) - let writePromise = self.channel.eventLoop.makePromise(of: Void.self) - self.channel.write(task, promise: writePromise) - writePromise.futureResult.cascadeFailure(to: listenContext.promise) + self.write(HandlerTask.startListening(listener), cascadingFailureTo: listenContext.promise) listenContext.future.whenComplete { _ in let task = HandlerTask.cancelListening(channel, id) @@ -788,3 +777,4 @@ extension PostgresConnection { #endif } } + From 9ff54d9346ab1f04dceecc73f018d95569a12feb Mon Sep 17 00:00:00 2001 From: MahdiBM Date: Thu, 20 Jun 2024 18:41:59 +0330 Subject: [PATCH 11/25] better `write` function accepting `HandlerTask`, not a generic --- .../Connection/PostgresConnection.swift | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/Sources/PostgresNIO/Connection/PostgresConnection.swift b/Sources/PostgresNIO/Connection/PostgresConnection.swift index 815f993e..428ad5ff 100644 --- a/Sources/PostgresNIO/Connection/PostgresConnection.swift +++ b/Sources/PostgresNIO/Connection/PostgresConnection.swift @@ -222,7 +222,7 @@ public final class PostgresConnection: @unchecked Sendable { promise: promise ) - self.write(HandlerTask.extendedQuery(context), cascadingFailureTo: promise) + self.write(.extendedQuery(context), cascadingFailureTo: promise) return promise.futureResult } @@ -239,7 +239,7 @@ public final class PostgresConnection: @unchecked Sendable { promise: promise ) - self.write(HandlerTask.extendedQuery(context), cascadingFailureTo: promise) + self.write(.extendedQuery(context), cascadingFailureTo: promise) return promise.futureResult.map { rowDescription in PSQLPreparedStatement(name: name, query: query, connection: self, rowDescription: rowDescription) @@ -256,7 +256,7 @@ public final class PostgresConnection: @unchecked Sendable { logger: logger, promise: promise) - self.write(HandlerTask.extendedQuery(context), cascadingFailureTo: promise) + self.write(.extendedQuery(context), cascadingFailureTo: promise) return promise.futureResult } @@ -265,7 +265,7 @@ public final class PostgresConnection: @unchecked Sendable { let promise = self.channel.eventLoop.makePromise(of: Void.self) let context = CloseCommandContext(target: target, logger: logger, promise: promise) - self.write(HandlerTask.closeCommand(context), cascadingFailureTo: promise) + self.write(.closeCommand(context), cascadingFailureTo: promise) return promise.futureResult } @@ -429,7 +429,7 @@ extension PostgresConnection { promise: promise ) - self.write(HandlerTask.extendedQuery(context), cascadingFailureTo: promise) + self.write(.extendedQuery(context), cascadingFailureTo: promise) do { return try await promise.futureResult.map({ $0.asyncSequence() }).get() @@ -542,7 +542,7 @@ extension PostgresConnection { } } - private func write(_ any: T, cascadingFailureTo promise: EventLoopPromise) { + private func write(_ task: HandlerTask, cascadingFailureTo promise: EventLoopPromise) { let writePromise = self.channel.eventLoop.makePromise(of: Void.self) self.channel.write(any, promise: writePromise) writePromise.futureResult.cascadeFailure(to: promise) @@ -730,7 +730,7 @@ extension PostgresConnection { closure: notificationHandler ) - self.write(HandlerTask.startListening(listener), cascadingFailureTo: listenContext.promise) + self.write(.startListening(listener), cascadingFailureTo: listenContext.promise) listenContext.future.whenComplete { _ in let task = HandlerTask.cancelListening(channel, id) From fe8f9fe79819834c57e7c54909f3f0820dbc531d Mon Sep 17 00:00:00 2001 From: MahdiBM Date: Thu, 20 Jun 2024 18:42:28 +0330 Subject: [PATCH 12/25] better test, move the test to `PostgresConnectionTests` --- .../PSQLIntegrationTests.swift | 19 ------------------- .../New/PostgresConnectionTests.swift | 15 +++++++++++++++ 2 files changed, 15 insertions(+), 19 deletions(-) diff --git a/Tests/IntegrationTests/PSQLIntegrationTests.swift b/Tests/IntegrationTests/PSQLIntegrationTests.swift index 4d0d6d82..913d91b2 100644 --- a/Tests/IntegrationTests/PSQLIntegrationTests.swift +++ b/Tests/IntegrationTests/PSQLIntegrationTests.swift @@ -359,23 +359,4 @@ final class IntegrationTests: XCTestCase { XCTAssertEqual(obj?.bar, 2) } } - - func testConnectionClosureMidQueryDoesNotHang() async throws { - _ = await withThrowingTaskGroup(of: Void.self) { taskGroup in - for _ in (0 ..< 1_000) { - taskGroup.addTask { - let conn = try await PostgresConnection.test( - on: NIOSingletons.posixEventLoopGroup.next() - ).get() - - async let close: () = conn.closeGracefully() - async let query = conn.query("SELECT 1", logger: .psqlTest) - - _ = try await (close, query) - } - } - - // Ignore failures - } - } } diff --git a/Tests/PostgresNIOTests/New/PostgresConnectionTests.swift b/Tests/PostgresNIOTests/New/PostgresConnectionTests.swift index 0bc61efd..1b6639e8 100644 --- a/Tests/PostgresNIOTests/New/PostgresConnectionTests.swift +++ b/Tests/PostgresNIOTests/New/PostgresConnectionTests.swift @@ -669,6 +669,21 @@ class PostgresConnectionTests: XCTestCase { return (connection, channel) } + + func testWeFailRequestsIfConnectionWasClosedBefore() async throws { + let (connection, channel) = try await self.makeTestConnectionWithAsyncTestingChannel() + + try await connection.closeGracefully() + + XCTAssertEqual(channel.isActive, false) + + do { + _ = try await connection.query("SELECT version;", logger: self.logger) + XCTFail("Expected to fail") + } catch let error as ChannelError { + XCTAssertEqual(error, .ioOnClosedChannel) + } + } } extension NIOAsyncTestingChannel { From 2a43b881abce36da95ef26134ba2fd8550998289 Mon Sep 17 00:00:00 2001 From: MahdiBM Date: Thu, 20 Jun 2024 18:46:44 +0330 Subject: [PATCH 13/25] move/rename test --- .../New/PostgresConnectionTests.swift | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/Tests/PostgresNIOTests/New/PostgresConnectionTests.swift b/Tests/PostgresNIOTests/New/PostgresConnectionTests.swift index 1b6639e8..42777a98 100644 --- a/Tests/PostgresNIOTests/New/PostgresConnectionTests.swift +++ b/Tests/PostgresNIOTests/New/PostgresConnectionTests.swift @@ -638,6 +638,21 @@ class PostgresConnectionTests: XCTestCase { } } + func testQueriesFailIfConnectionIsClosed() async throws { + let (connection, channel) = try await self.makeTestConnectionWithAsyncTestingChannel() + + try await connection.closeGracefully() + + XCTAssertEqual(channel.isActive, false) + + do { + _ = try await connection.query("SELECT version;", logger: self.logger) + XCTFail("Expected to fail") + } catch let error as ChannelError { + XCTAssertEqual(error, .ioOnClosedChannel) + } + } + func makeTestConnectionWithAsyncTestingChannel() async throws -> (PostgresConnection, NIOAsyncTestingChannel) { let eventLoop = NIOAsyncTestingEventLoop() let channel = await NIOAsyncTestingChannel(handlers: [ @@ -669,21 +684,6 @@ class PostgresConnectionTests: XCTestCase { return (connection, channel) } - - func testWeFailRequestsIfConnectionWasClosedBefore() async throws { - let (connection, channel) = try await self.makeTestConnectionWithAsyncTestingChannel() - - try await connection.closeGracefully() - - XCTAssertEqual(channel.isActive, false) - - do { - _ = try await connection.query("SELECT version;", logger: self.logger) - XCTFail("Expected to fail") - } catch let error as ChannelError { - XCTAssertEqual(error, .ioOnClosedChannel) - } - } } extension NIOAsyncTestingChannel { From e41c922900ffd0a2b1bd2194281269e8615e68c6 Mon Sep 17 00:00:00 2001 From: MahdiBM Date: Thu, 20 Jun 2024 18:48:35 +0330 Subject: [PATCH 14/25] [build fix] forgot to change the passed parameter --- Sources/PostgresNIO/Connection/PostgresConnection.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/PostgresNIO/Connection/PostgresConnection.swift b/Sources/PostgresNIO/Connection/PostgresConnection.swift index 428ad5ff..0909203e 100644 --- a/Sources/PostgresNIO/Connection/PostgresConnection.swift +++ b/Sources/PostgresNIO/Connection/PostgresConnection.swift @@ -544,7 +544,7 @@ extension PostgresConnection { private func write(_ task: HandlerTask, cascadingFailureTo promise: EventLoopPromise) { let writePromise = self.channel.eventLoop.makePromise(of: Void.self) - self.channel.write(any, promise: writePromise) + self.channel.write(task, promise: writePromise) writePromise.futureResult.cascadeFailure(to: promise) } } From 607f26e4f8df74829fa6b5353104da96c40670fa Mon Sep 17 00:00:00 2001 From: MahdiBM Date: Thu, 20 Jun 2024 18:49:39 +0330 Subject: [PATCH 15/25] add a test for listen --- .../New/PostgresConnectionTests.swift | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/Tests/PostgresNIOTests/New/PostgresConnectionTests.swift b/Tests/PostgresNIOTests/New/PostgresConnectionTests.swift index 42777a98..2db1c320 100644 --- a/Tests/PostgresNIOTests/New/PostgresConnectionTests.swift +++ b/Tests/PostgresNIOTests/New/PostgresConnectionTests.swift @@ -653,6 +653,21 @@ class PostgresConnectionTests: XCTestCase { } } + func testListenFailsIfConnectionIsClosed() async throws { + let (connection, channel) = try await self.makeTestConnectionWithAsyncTestingChannel() + + try await connection.closeGracefully() + + XCTAssertEqual(channel.isActive, false) + + do { + _ = try await connection.listen("test_channel") + XCTFail("Expected to fail") + } catch let error as ChannelError { + XCTAssertEqual(error, .ioOnClosedChannel) + } + } + func makeTestConnectionWithAsyncTestingChannel() async throws -> (PostgresConnection, NIOAsyncTestingChannel) { let eventLoop = NIOAsyncTestingEventLoop() let channel = await NIOAsyncTestingChannel(handlers: [ From f00099b9aef602a27043a185be47bbc245ca0fb1 Mon Sep 17 00:00:00 2001 From: MahdiBM Date: Thu, 20 Jun 2024 19:11:12 +0330 Subject: [PATCH 16/25] add another test for mid-way listens + fix the code --- .../Connection/PostgresConnection.swift | 1 + .../ListenStateMachine.swift | 4 +-- .../New/PostgresConnectionTests.swift | 27 +++++++++++++++++++ 3 files changed, 30 insertions(+), 2 deletions(-) diff --git a/Sources/PostgresNIO/Connection/PostgresConnection.swift b/Sources/PostgresNIO/Connection/PostgresConnection.swift index 0909203e..c780a454 100644 --- a/Sources/PostgresNIO/Connection/PostgresConnection.swift +++ b/Sources/PostgresNIO/Connection/PostgresConnection.swift @@ -461,6 +461,7 @@ extension PostgresConnection { let writePromise = self.channel.eventLoop.makePromise(of: Void.self) self.channel.write(task, promise: writePromise) writePromise.futureResult.whenFailure { error in + listener.failed(error) continuation.resume(throwing: error) } } diff --git a/Sources/PostgresNIO/New/Connection State Machine/ListenStateMachine.swift b/Sources/PostgresNIO/New/Connection State Machine/ListenStateMachine.swift index 89f40469..275c8a5f 100644 --- a/Sources/PostgresNIO/New/Connection State Machine/ListenStateMachine.swift +++ b/Sources/PostgresNIO/New/Connection State Machine/ListenStateMachine.swift @@ -224,12 +224,12 @@ extension ListenStateMachine { mutating func fail(_ error: Error) -> FailAction { switch self.state { case .initialized: - fatalError("Invalid state: \(self.state)") + return .none case .starting(let listeners), .listening(let listeners), .stopping(let listeners): self.state = .failed(error) return .failListeners(listeners.values) - + case .failed: return .none } diff --git a/Tests/PostgresNIOTests/New/PostgresConnectionTests.swift b/Tests/PostgresNIOTests/New/PostgresConnectionTests.swift index 2db1c320..65fd3063 100644 --- a/Tests/PostgresNIOTests/New/PostgresConnectionTests.swift +++ b/Tests/PostgresNIOTests/New/PostgresConnectionTests.swift @@ -668,6 +668,33 @@ class PostgresConnectionTests: XCTestCase { } } + func testListenFailsIfConnectionIsClosedMidway() async throws { + let (connection, channel) = try await self.makeTestConnectionWithAsyncTestingChannel() + + try await withThrowingTaskGroup(of: Void.self) { taskGroup in + taskGroup.addTask { + do { + let listenSequence = try await connection.listen("test_channel") + + for try await _ in listenSequence { + XCTFail("Expected to fail") + } + } catch let error as PSQLError { + XCTAssertEqual(error.code, .listenFailed) + } + } + + taskGroup.addTask { + try await Task.sleep(for: .seconds(3)) + + try await connection.close().get() + XCTAssertEqual(channel.isActive, false) + } + + try await taskGroup.waitForAll() + } + } + func makeTestConnectionWithAsyncTestingChannel() async throws -> (PostgresConnection, NIOAsyncTestingChannel) { let eventLoop = NIOAsyncTestingEventLoop() let channel = await NIOAsyncTestingChannel(handlers: [ From c43e781520f8a04fb3b2b7180250ebcebd56a37d Mon Sep 17 00:00:00 2001 From: MahdiBM Date: Thu, 20 Jun 2024 19:17:20 +0330 Subject: [PATCH 17/25] spaces --- .../New/Connection State Machine/ListenStateMachine.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/PostgresNIO/New/Connection State Machine/ListenStateMachine.swift b/Sources/PostgresNIO/New/Connection State Machine/ListenStateMachine.swift index 275c8a5f..2cc446c5 100644 --- a/Sources/PostgresNIO/New/Connection State Machine/ListenStateMachine.swift +++ b/Sources/PostgresNIO/New/Connection State Machine/ListenStateMachine.swift @@ -229,7 +229,7 @@ extension ListenStateMachine { case .starting(let listeners), .listening(let listeners), .stopping(let listeners): self.state = .failed(error) return .failListeners(listeners.values) - + case .failed: return .none } From 8fe12f51f4877d4ab4d3148cac417ca8888c0556 Mon Sep 17 00:00:00 2001 From: MahdiBM Date: Thu, 20 Jun 2024 19:25:02 +0330 Subject: [PATCH 18/25] fix for resuming continuation multiple times --- Sources/PostgresNIO/Connection/PostgresConnection.swift | 1 - 1 file changed, 1 deletion(-) diff --git a/Sources/PostgresNIO/Connection/PostgresConnection.swift b/Sources/PostgresNIO/Connection/PostgresConnection.swift index c780a454..a6efcfdf 100644 --- a/Sources/PostgresNIO/Connection/PostgresConnection.swift +++ b/Sources/PostgresNIO/Connection/PostgresConnection.swift @@ -462,7 +462,6 @@ extension PostgresConnection { self.channel.write(task, promise: writePromise) writePromise.futureResult.whenFailure { error in listener.failed(error) - continuation.resume(throwing: error) } } } onCancel: { From 843d6a0f1bf6ef03693147fa186869ac76820b2c Mon Sep 17 00:00:00 2001 From: MahdiBM Date: Thu, 20 Jun 2024 19:38:05 +0330 Subject: [PATCH 19/25] use backward-compatible `Task.sleep()` --- Tests/PostgresNIOTests/New/PostgresConnectionTests.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Tests/PostgresNIOTests/New/PostgresConnectionTests.swift b/Tests/PostgresNIOTests/New/PostgresConnectionTests.swift index 65fd3063..785278ff 100644 --- a/Tests/PostgresNIOTests/New/PostgresConnectionTests.swift +++ b/Tests/PostgresNIOTests/New/PostgresConnectionTests.swift @@ -685,7 +685,7 @@ class PostgresConnectionTests: XCTestCase { } taskGroup.addTask { - try await Task.sleep(for: .seconds(3)) + try await Task.sleep(nanoseconds: 3_000_000_000) try await connection.close().get() XCTAssertEqual(channel.isActive, false) From 3e56e496245d224932fb3e7af56692109edfb125 Mon Sep 17 00:00:00 2001 From: MahdiBM Date: Sat, 22 Jun 2024 13:45:06 +0330 Subject: [PATCH 20/25] add a listen test that does hang --- .../ListenStateMachine.swift | 2 +- .../New/PostgresConnectionTests.swift | 41 +++++++++++++------ 2 files changed, 29 insertions(+), 14 deletions(-) diff --git a/Sources/PostgresNIO/New/Connection State Machine/ListenStateMachine.swift b/Sources/PostgresNIO/New/Connection State Machine/ListenStateMachine.swift index 2cc446c5..89f40469 100644 --- a/Sources/PostgresNIO/New/Connection State Machine/ListenStateMachine.swift +++ b/Sources/PostgresNIO/New/Connection State Machine/ListenStateMachine.swift @@ -224,7 +224,7 @@ extension ListenStateMachine { mutating func fail(_ error: Error) -> FailAction { switch self.state { case .initialized: - return .none + fatalError("Invalid state: \(self.state)") case .starting(let listeners), .listening(let listeners), .stopping(let listeners): self.state = .failed(error) diff --git a/Tests/PostgresNIOTests/New/PostgresConnectionTests.swift b/Tests/PostgresNIOTests/New/PostgresConnectionTests.swift index 785278ff..69f3b045 100644 --- a/Tests/PostgresNIOTests/New/PostgresConnectionTests.swift +++ b/Tests/PostgresNIOTests/New/PostgresConnectionTests.swift @@ -638,7 +638,7 @@ class PostgresConnectionTests: XCTestCase { } } - func testQueriesFailIfConnectionIsClosed() async throws { + func testPostgresQueryQueriesFailIfConnectionIsClosed() async throws { let (connection, channel) = try await self.makeTestConnectionWithAsyncTestingChannel() try await connection.closeGracefully() @@ -673,25 +673,40 @@ class PostgresConnectionTests: XCTestCase { try await withThrowingTaskGroup(of: Void.self) { taskGroup in taskGroup.addTask { + let events = try await connection.listen("foo") + var iterator = events.makeAsyncIterator() + let first = try await iterator.next() + XCTAssertEqual(first?.payload, "wooohooo") do { - let listenSequence = try await connection.listen("test_channel") - - for try await _ in listenSequence { - XCTFail("Expected to fail") - } + _ = try await iterator.next() + XCTFail("Did not expect to not throw") } catch let error as PSQLError { - XCTAssertEqual(error.code, .listenFailed) + XCTAssertEqual(error.code, .clientClosedConnection) } } - taskGroup.addTask { - try await Task.sleep(nanoseconds: 3_000_000_000) + let listenMessage = try await channel.waitForUnpreparedRequest() + XCTAssertEqual(listenMessage.parse.query, #"LISTEN "foo";"#) - try await connection.close().get() - XCTAssertEqual(channel.isActive, false) - } + try await channel.writeInbound(PostgresBackendMessage.parseComplete) + try await channel.writeInbound(PostgresBackendMessage.parameterDescription(.init(dataTypes: []))) + try await channel.writeInbound(PostgresBackendMessage.noData) + try await channel.writeInbound(PostgresBackendMessage.bindComplete) + try await channel.writeInbound(PostgresBackendMessage.commandComplete("LISTEN")) + try await channel.writeInbound(PostgresBackendMessage.readyForQuery(.idle)) - try await taskGroup.waitForAll() + try await channel.writeInbound(PostgresBackendMessage.notification(.init(backendPID: 12, channel: "foo", payload: "wooohooo"))) + + try await connection.closeGracefully() + + XCTAssertEqual(channel.isActive, false) + + switch await taskGroup.nextResult()! { + case .success: + break + case .failure(let failure): + XCTFail("Unexpected error: \(failure)") + } } } From 552c9131b1e992f888c3b0fab402fc5d70124ee6 Mon Sep 17 00:00:00 2001 From: MahdiBM Date: Sat, 22 Jun 2024 13:48:45 +0330 Subject: [PATCH 21/25] aesthetics --- .../New/PostgresConnectionTests.swift | 114 +++++++++--------- 1 file changed, 57 insertions(+), 57 deletions(-) diff --git a/Tests/PostgresNIOTests/New/PostgresConnectionTests.swift b/Tests/PostgresNIOTests/New/PostgresConnectionTests.swift index 69f3b045..bb312f28 100644 --- a/Tests/PostgresNIOTests/New/PostgresConnectionTests.swift +++ b/Tests/PostgresNIOTests/New/PostgresConnectionTests.swift @@ -224,6 +224,63 @@ class PostgresConnectionTests: XCTestCase { } } + func testSimpleListenFailsIfConnectionIsClosed() async throws { + let (connection, channel) = try await self.makeTestConnectionWithAsyncTestingChannel() + + try await connection.closeGracefully() + + XCTAssertEqual(channel.isActive, false) + + do { + _ = try await connection.listen("test_channel") + XCTFail("Expected to fail") + } catch let error as ChannelError { + XCTAssertEqual(error, .ioOnClosedChannel) + } + } + + func testSimpleListenFailsIfConnectionIsClosedWhileListening() async throws { + let (connection, channel) = try await self.makeTestConnectionWithAsyncTestingChannel() + + try await withThrowingTaskGroup(of: Void.self) { taskGroup in + taskGroup.addTask { + let events = try await connection.listen("foo") + var iterator = events.makeAsyncIterator() + let first = try await iterator.next() + XCTAssertEqual(first?.payload, "wooohooo") + do { + _ = try await iterator.next() + XCTFail("Did not expect to not throw") + } catch let error as PSQLError { + XCTAssertEqual(error.code, .clientClosedConnection) + } + } + + let listenMessage = try await channel.waitForUnpreparedRequest() + XCTAssertEqual(listenMessage.parse.query, #"LISTEN "foo";"#) + + try await channel.writeInbound(PostgresBackendMessage.parseComplete) + try await channel.writeInbound(PostgresBackendMessage.parameterDescription(.init(dataTypes: []))) + try await channel.writeInbound(PostgresBackendMessage.noData) + try await channel.writeInbound(PostgresBackendMessage.bindComplete) + try await channel.writeInbound(PostgresBackendMessage.commandComplete("LISTEN")) + try await channel.writeInbound(PostgresBackendMessage.readyForQuery(.idle)) + + try await channel.writeInbound(PostgresBackendMessage.notification(.init(backendPID: 12, channel: "foo", payload: "wooohooo"))) + + try await connection.closeGracefully() + + XCTAssertEqual(channel.isActive, false) + + switch await taskGroup.nextResult()! { + case .success: + break + case .failure(let failure): + XCTFail("Unexpected error: \(failure)") + } + } + } + func testCloseGracefullyClosesWhenInternalQueueIsEmpty() async throws { let (connection, channel) = try await self.makeTestConnectionWithAsyncTestingChannel() try await withThrowingTaskGroup(of: Void.self) { [logger] taskGroup async throws -> () in @@ -653,63 +710,6 @@ class PostgresConnectionTests: XCTestCase { } } - func testListenFailsIfConnectionIsClosed() async throws { - let (connection, channel) = try await self.makeTestConnectionWithAsyncTestingChannel() - - try await connection.closeGracefully() - - XCTAssertEqual(channel.isActive, false) - - do { - _ = try await connection.listen("test_channel") - XCTFail("Expected to fail") - } catch let error as ChannelError { - XCTAssertEqual(error, .ioOnClosedChannel) - } - } - - func testListenFailsIfConnectionIsClosedMidway() async throws { - let (connection, channel) = try await self.makeTestConnectionWithAsyncTestingChannel() - - try await withThrowingTaskGroup(of: Void.self) { taskGroup in - taskGroup.addTask { - let events = try await connection.listen("foo") - var iterator = events.makeAsyncIterator() - let first = try await iterator.next() - XCTAssertEqual(first?.payload, "wooohooo") - do { - _ = try await iterator.next() - XCTFail("Did not expect to not throw") - } catch let error as PSQLError { - XCTAssertEqual(error.code, .clientClosedConnection) - } - } - - let listenMessage = try await channel.waitForUnpreparedRequest() - XCTAssertEqual(listenMessage.parse.query, #"LISTEN "foo";"#) - - try await channel.writeInbound(PostgresBackendMessage.parseComplete) - try await channel.writeInbound(PostgresBackendMessage.parameterDescription(.init(dataTypes: []))) - try await channel.writeInbound(PostgresBackendMessage.noData) - try await channel.writeInbound(PostgresBackendMessage.bindComplete) - try await channel.writeInbound(PostgresBackendMessage.commandComplete("LISTEN")) - try await channel.writeInbound(PostgresBackendMessage.readyForQuery(.idle)) - - try await channel.writeInbound(PostgresBackendMessage.notification(.init(backendPID: 12, channel: "foo", payload: "wooohooo"))) - - try await connection.closeGracefully() - - XCTAssertEqual(channel.isActive, false) - - switch await taskGroup.nextResult()! { - case .success: - break - case .failure(let failure): - XCTFail("Unexpected error: \(failure)") - } - } - } - func makeTestConnectionWithAsyncTestingChannel() async throws -> (PostgresConnection, NIOAsyncTestingChannel) { let eventLoop = NIOAsyncTestingEventLoop() let channel = await NIOAsyncTestingChannel(handlers: [ From c97e127c26bdcb341c1620f3319798f5afaed034 Mon Sep 17 00:00:00 2001 From: MahdiBM Date: Sat, 22 Jun 2024 14:03:36 +0330 Subject: [PATCH 22/25] fix the hanging test: was a testing issue, not code --- Tests/PostgresNIOTests/New/PostgresConnectionTests.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Tests/PostgresNIOTests/New/PostgresConnectionTests.swift b/Tests/PostgresNIOTests/New/PostgresConnectionTests.swift index bb312f28..e6ae2795 100644 --- a/Tests/PostgresNIOTests/New/PostgresConnectionTests.swift +++ b/Tests/PostgresNIOTests/New/PostgresConnectionTests.swift @@ -268,7 +268,7 @@ class PostgresConnectionTests: XCTestCase { try await channel.writeInbound(PostgresBackendMessage.notification(.init(backendPID: 12, channel: "foo", payload: "wooohooo"))) - try await connection.closeGracefully() + try await connection.close().get() XCTAssertEqual(channel.isActive, false) From 138aea3006acf9e8d368727a0fa9bdf0ffcc492c Mon Sep 17 00:00:00 2001 From: MahdiBM Date: Sat, 22 Jun 2024 14:16:19 +0330 Subject: [PATCH 23/25] add test for prepareStatement --- .../New/PostgresConnectionTests.swift | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/Tests/PostgresNIOTests/New/PostgresConnectionTests.swift b/Tests/PostgresNIOTests/New/PostgresConnectionTests.swift index e6ae2795..fe4ca7a3 100644 --- a/Tests/PostgresNIOTests/New/PostgresConnectionTests.swift +++ b/Tests/PostgresNIOTests/New/PostgresConnectionTests.swift @@ -695,7 +695,7 @@ class PostgresConnectionTests: XCTestCase { } } - func testPostgresQueryQueriesFailIfConnectionIsClosed() async throws { + func testQueryFailsIfConnectionIsClosed() async throws { let (connection, channel) = try await self.makeTestConnectionWithAsyncTestingChannel() try await connection.closeGracefully() @@ -710,6 +710,25 @@ class PostgresConnectionTests: XCTestCase { } } + func testPrepareStatementQueryFailsIfConnectionIsClosed() async throws { + let (connection, channel) = try await self.makeTestConnectionWithAsyncTestingChannel() + + try await connection.closeGracefully() + + XCTAssertEqual(channel.isActive, false) + + do { + _ = try await connection.prepareStatement( + "SELECT version;", + with: "test_query", + logger: .psqlTest + ).get() + XCTFail("Expected to fail") + } catch let error as ChannelError { + XCTAssertEqual(error, .ioOnClosedChannel) + } + } + func makeTestConnectionWithAsyncTestingChannel() async throws -> (PostgresConnection, NIOAsyncTestingChannel) { let eventLoop = NIOAsyncTestingEventLoop() let channel = await NIOAsyncTestingChannel(handlers: [ From 22db6e4e8747d261a5cce81acca44ecdb7a5cf71 Mon Sep 17 00:00:00 2001 From: MahdiBM Date: Sun, 23 Jun 2024 00:23:24 +0330 Subject: [PATCH 24/25] add execute function tests --- .../New/PostgresConnectionTests.swift | 93 +++++++++++++++++-- 1 file changed, 87 insertions(+), 6 deletions(-) diff --git a/Tests/PostgresNIOTests/New/PostgresConnectionTests.swift b/Tests/PostgresNIOTests/New/PostgresConnectionTests.swift index fe4ca7a3..6610f046 100644 --- a/Tests/PostgresNIOTests/New/PostgresConnectionTests.swift +++ b/Tests/PostgresNIOTests/New/PostgresConnectionTests.swift @@ -710,7 +710,7 @@ class PostgresConnectionTests: XCTestCase { } } - func testPrepareStatementQueryFailsIfConnectionIsClosed() async throws { + func testPrepareStatementFailsIfConnectionIsClosed() async throws { let (connection, channel) = try await self.makeTestConnectionWithAsyncTestingChannel() try await connection.closeGracefully() @@ -718,17 +718,98 @@ class PostgresConnectionTests: XCTestCase { XCTAssertEqual(channel.isActive, false) do { - _ = try await connection.prepareStatement( - "SELECT version;", - with: "test_query", - logger: .psqlTest - ).get() + _ = try await connection.prepareStatement("SELECT version;", with: "test_query", logger: .psqlTest).get() XCTFail("Expected to fail") } catch let error as ChannelError { XCTAssertEqual(error, .ioOnClosedChannel) } } + func testExecuteFailsIfConnectionIsClosed() async throws { + let (connection, channel) = try await self.makeTestConnectionWithAsyncTestingChannel() + + try await connection.closeGracefully() + + XCTAssertEqual(channel.isActive, false) + + do { + let statement = PSQLExecuteStatement(name: "SELECT version;", binds: .init(), rowDescription: nil) + _ = try await connection.execute(statement, logger: .psqlTest).get() + XCTFail("Expected to fail") + } catch let error as ChannelError { + XCTAssertEqual(error, .ioOnClosedChannel) + } + } + + func testExecutePreparedStatementFailsIfConnectionIsClosed() async throws { + let (connection, channel) = try await self.makeTestConnectionWithAsyncTestingChannel() + + try await connection.closeGracefully() + + XCTAssertEqual(channel.isActive, false) + + struct TestPreparedStatement: PostgresPreparedStatement { + static let sql = "SELECT pid, datname FROM pg_stat_activity WHERE state = $1" + typealias Row = (Int, String) + + var state: String + + func makeBindings() -> PostgresBindings { + var bindings = PostgresBindings() + bindings.append(self.state) + return bindings + } + + func decodeRow(_ row: PostgresNIO.PostgresRow) throws -> Row { + try row.decode(Row.self) + } + } + + do { + let preparedStatement = TestPreparedStatement(state: "active") + _ = try await connection.execute(preparedStatement, logger: .psqlTest) + XCTFail("Expected to fail") + } catch let error as ChannelError { + XCTAssertEqual(error, .ioOnClosedChannel) + } + } + + func testExecutePreparedStatementWithVoidRowFailsIfConnectionIsClosed() async throws { + let (connection, channel) = try await self.makeTestConnectionWithAsyncTestingChannel() + + try await connection.closeGracefully() + + XCTAssertEqual(channel.isActive, false) + + struct TestPreparedStatement: PostgresPreparedStatement { + static let sql = "SELECT * FROM pg_stat_activity WHERE state = $1" + typealias Row = () + + var state: String + + func makeBindings() -> PostgresBindings { + var bindings = PostgresBindings() + bindings.append(self.state) + return bindings + } + + func decodeRow(_ row: PostgresNIO.PostgresRow) throws -> Row { + () + } + } + + do { + let preparedStatement = TestPreparedStatement(state: "active") + _ = try await connection.execute(preparedStatement, logger: .psqlTest) + XCTFail("Expected to fail") + } catch let error as ChannelError { + XCTAssertEqual(error, .ioOnClosedChannel) + } + } + + func testAddListenerFailsIfConnectionIsClosed() async throws { + } + func makeTestConnectionWithAsyncTestingChannel() async throws -> (PostgresConnection, NIOAsyncTestingChannel) { let eventLoop = NIOAsyncTestingEventLoop() let channel = await NIOAsyncTestingChannel(handlers: [ From ceaa68880e0aaf3509a944f7e3dcf6948f0dab8f Mon Sep 17 00:00:00 2001 From: MahdiBM Date: Sun, 23 Jun 2024 01:12:16 +0330 Subject: [PATCH 25/25] minor --- Tests/PostgresNIOTests/New/PostgresConnectionTests.swift | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/Tests/PostgresNIOTests/New/PostgresConnectionTests.swift b/Tests/PostgresNIOTests/New/PostgresConnectionTests.swift index 6610f046..5c7d4c83 100644 --- a/Tests/PostgresNIOTests/New/PostgresConnectionTests.swift +++ b/Tests/PostgresNIOTests/New/PostgresConnectionTests.swift @@ -268,7 +268,7 @@ class PostgresConnectionTests: XCTestCase { try await channel.writeInbound(PostgresBackendMessage.notification(.init(backendPID: 12, channel: "foo", payload: "wooohooo"))) - try await connection.close().get() + try await connection.close() XCTAssertEqual(channel.isActive, false) @@ -807,9 +807,6 @@ class PostgresConnectionTests: XCTestCase { } } - func testAddListenerFailsIfConnectionIsClosed() async throws { - } - func makeTestConnectionWithAsyncTestingChannel() async throws -> (PostgresConnection, NIOAsyncTestingChannel) { let eventLoop = NIOAsyncTestingEventLoop() let channel = await NIOAsyncTestingChannel(handlers: [