Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix] Query Hangs if Connection is Closed #487

Merged
merged 25 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
9206b25
Add a test to reproduce the problem
MahdiBM Jun 19, 2024
5ff285b
fail query promise when channel write fails
MahdiBM Jun 19, 2024
3e49ef1
make sure the test succeeds
MahdiBM Jun 19, 2024
c2192a7
do the same for prepared statements query
MahdiBM Jun 19, 2024
a288cc2
minor refinements
MahdiBM Jun 19, 2024
d98944a
more clear query
MahdiBM Jun 19, 2024
00c11b6
more writePromise s catching channel failures
MahdiBM Jun 19, 2024
2ed7946
one more place
MahdiBM Jun 19, 2024
e92b2f0
move query into the loop
MahdiBM Jun 19, 2024
e127327
use a private function to simplify cascading channel failures
MahdiBM Jun 20, 2024
9ff54d9
better `write` function accepting `HandlerTask`, not a generic
MahdiBM Jun 20, 2024
fe8f9fe
better test, move the test to `PostgresConnectionTests`
MahdiBM Jun 20, 2024
2a43b88
move/rename test
MahdiBM Jun 20, 2024
e41c922
[build fix] forgot to change the passed parameter
MahdiBM Jun 20, 2024
607f26e
add a test for listen
MahdiBM Jun 20, 2024
f00099b
add another test for mid-way listens + fix the code
MahdiBM Jun 20, 2024
c43e781
spaces
MahdiBM Jun 20, 2024
8fe12f5
fix for resuming continuation multiple times
MahdiBM Jun 20, 2024
843d6a0
use backward-compatible `Task.sleep()`
MahdiBM Jun 20, 2024
3e56e49
add a listen test that does hang
MahdiBM Jun 22, 2024
552c913
aesthetics
MahdiBM Jun 22, 2024
c97e127
fix the hanging test: was a testing issue, not code
MahdiBM Jun 22, 2024
138aea3
add test for prepareStatement
MahdiBM Jun 22, 2024
22db6e4
add execute function tests
MahdiBM Jun 22, 2024
ceaa688
minor
MahdiBM Jun 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 28 additions & 11 deletions Sources/PostgresNIO/Connection/PostgresConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ public final class PostgresConnection: @unchecked Sendable {
promise: promise
)

self.channel.write(HandlerTask.extendedQuery(context), promise: nil)
self.write(.extendedQuery(context), cascadingFailureTo: promise)

return promise.futureResult
}
Expand All @@ -239,7 +239,8 @@ public final class PostgresConnection: @unchecked Sendable {
promise: promise
)

self.channel.write(HandlerTask.extendedQuery(context), promise: nil)
self.write(.extendedQuery(context), cascadingFailureTo: promise)

return promise.futureResult.map { rowDescription in
PSQLPreparedStatement(name: name, query: query, connection: self, rowDescription: rowDescription)
}
Expand All @@ -255,15 +256,17 @@ public final class PostgresConnection: @unchecked Sendable {
logger: logger,
promise: promise)

self.channel.write(HandlerTask.extendedQuery(context), promise: nil)
self.write(.extendedQuery(context), cascadingFailureTo: promise)

return promise.futureResult
}

func close(_ target: CloseTarget, logger: Logger) -> EventLoopFuture<Void> {
let promise = self.channel.eventLoop.makePromise(of: Void.self)
let context = CloseCommandContext(target: target, logger: logger, promise: promise)

self.channel.write(HandlerTask.closeCommand(context), promise: nil)
self.write(.closeCommand(context), cascadingFailureTo: promise)

return promise.futureResult
}

Expand Down Expand Up @@ -426,7 +429,7 @@ extension PostgresConnection {
promise: promise
)

self.channel.write(HandlerTask.extendedQuery(context), promise: nil)
self.write(.extendedQuery(context), cascadingFailureTo: promise)

do {
return try await promise.futureResult.map({ $0.asyncSequence() }).get()
Expand Down Expand Up @@ -455,7 +458,11 @@ extension PostgresConnection {

let task = HandlerTask.startListening(listener)

self.channel.write(task, promise: nil)
let writePromise = self.channel.eventLoop.makePromise(of: Void.self)
self.channel.write(task, promise: writePromise)
writePromise.futureResult.whenFailure { error in
listener.failed(error)
}
}
} onCancel: {
let task = HandlerTask.cancelListening(channel, id)
Expand All @@ -480,7 +487,9 @@ extension PostgresConnection {
logger: logger,
promise: promise
))
self.channel.write(task, promise: nil)

self.write(task, cascadingFailureTo: promise)

do {
return try await promise.futureResult
.map { $0.asyncSequence() }
Expand Down Expand Up @@ -515,7 +524,9 @@ extension PostgresConnection {
logger: logger,
promise: promise
))
self.channel.write(task, promise: nil)

self.write(task, cascadingFailureTo: promise)

do {
return try await promise.futureResult
.map { $0.commandTag }
Expand All @@ -530,6 +541,12 @@ extension PostgresConnection {
throw error // rethrow with more metadata
}
}

/// Writes `task` to the connection's channel and forwards a failed write to `promise`.
///
/// A dedicated `Void` write promise is created on the channel's event loop and handed to
/// `channel.write`. Only the failure of that write is cascaded to `promise`; a successful
/// write does not complete `promise` here.
///
/// - Parameters:
///   - task: The handler task to write to the channel.
///   - promise: The promise that is failed with the write error if the channel write fails.
private func write<T>(_ task: HandlerTask, cascadingFailureTo promise: EventLoopPromise<T>) {
let writePromise = self.channel.eventLoop.makePromise(of: Void.self)
self.channel.write(task, promise: writePromise)
// Propagate only the error path; the success path is left to whoever owns `promise`.
writePromise.futureResult.cascadeFailure(to: promise)
}
}

// MARK: EventLoopFuture interface
Expand Down Expand Up @@ -674,7 +691,7 @@ internal enum PostgresCommands: PostgresRequest {

/// Context for receiving NotificationResponse messages on a connection, used for PostgreSQL's `LISTEN`/`NOTIFY` support.
public final class PostgresListenContext: Sendable {
private let promise: EventLoopPromise<Void>
let promise: EventLoopPromise<Void>

var future: EventLoopFuture<Void> {
self.promise.futureResult
Expand Down Expand Up @@ -713,8 +730,7 @@ extension PostgresConnection {
closure: notificationHandler
)

let task = HandlerTask.startListening(listener)
self.channel.write(task, promise: nil)
self.write(.startListening(listener), cascadingFailureTo: listenContext.promise)

listenContext.future.whenComplete { _ in
let task = HandlerTask.cancelListening(channel, id)
Expand Down Expand Up @@ -761,3 +777,4 @@ extension PostgresConnection {
#endif
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ extension ListenStateMachine {
mutating func fail(_ error: Error) -> FailAction {
switch self.state {
case .initialized:
fatalError("Invalid state: \(self.state)")
return .none
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See https://github.com/vapor/postgres-nio/pull/487/files#r1647790890.
This is needed otherwise things will crash.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we get this fix into another pr with a dedicated unit test case and explanation why this can happen?

Copy link
Contributor Author

@MahdiBM MahdiBM Jun 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmmm sure. I'll remove the related test/changes from this PR.

EDIT: See #487 (comment)


case .starting(let listeners), .listening(let listeners), .stopping(let listeners):
self.state = .failed(error)
Expand Down
1 change: 0 additions & 1 deletion Tests/IntegrationTests/PSQLIntegrationTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -359,5 +359,4 @@ final class IntegrationTests: XCTestCase {
XCTAssertEqual(obj?.bar, 2)
}
}

}
57 changes: 57 additions & 0 deletions Tests/PostgresNIOTests/New/PostgresConnectionTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,63 @@ class PostgresConnectionTests: XCTestCase {
}
}

/// Checks that issuing a query on a connection that has already been closed throws
/// `ChannelError.ioOnClosedChannel`. Reaching `XCTFail` means the query call returned
/// without throwing.
func testQueriesFailIfConnectionIsClosed() async throws {
let (connection, channel) = try await self.makeTestConnectionWithAsyncTestingChannel()

// Close the connection first so the query below is issued against an inactive channel.
try await connection.closeGracefully()

XCTAssertEqual(channel.isActive, false)

do {
_ = try await connection.query("SELECT version;", logger: self.logger)
XCTFail("Expected to fail")
} catch let error as ChannelError {
// Only `ChannelError` is caught here; any other error type propagates out of this throwing test.
XCTAssertEqual(error, .ioOnClosedChannel)
}
}

/// Checks that calling `listen` on a connection that has already been closed throws
/// `ChannelError.ioOnClosedChannel`. Reaching `XCTFail` means the `listen` call returned
/// without throwing.
func testListenFailsIfConnectionIsClosed() async throws {
let (connection, channel) = try await self.makeTestConnectionWithAsyncTestingChannel()

// Close the connection first so the listen below is issued against an inactive channel.
try await connection.closeGracefully()

XCTAssertEqual(channel.isActive, false)

do {
_ = try await connection.listen("test_channel")
XCTFail("Expected to fail")
} catch let error as ChannelError {
// Only `ChannelError` is caught here; any other error type propagates out of this throwing test.
XCTAssertEqual(error, .ioOnClosedChannel)
}
}

func testListenFailsIfConnectionIsClosedMidway() async throws {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What connection state do you want to test here? In which connection state does this happen? Can we mock this explicitly and not by accident?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test is consistent, it only waits 3 seconds to make sure the listener has started listening.

It tests that if a listen is in progress and the connection is then closed, the listener sequence must fail.

Copy link
Collaborator

@fabianfett fabianfett Jun 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah! in that case we should wait for the client messages on the channel instead of using a timer.

See here how it is done:

func testSimpleListen() async throws {
let (connection, channel) = try await self.makeTestConnectionWithAsyncTestingChannel()
try await withThrowingTaskGroup(of: Void.self) { taskGroup in
taskGroup.addTask {
let events = try await connection.listen("foo")
for try await event in events {
XCTAssertEqual(event.payload, "wooohooo")
break
}
}
let listenMessage = try await channel.waitForUnpreparedRequest()
XCTAssertEqual(listenMessage.parse.query, #"LISTEN "foo";"#)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the clues, I did this in the new test.
We no longer need to change that fatalError("Invalid state: \(self.state)"), it appears that was triggered as a result of my bad test.

@fabianfett I think I no longer need to make another PR out of this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fabianfett I think this PR's changes are good, but I noticed this results in a fatal error regardless of my changes:

    func testAddListenerFailsIfConnectionIsClosed() async throws {
        let (connection, channel) = try await self.makeTestConnectionWithAsyncTestingChannel()

        connection.addListener(channel: "example") { context, notification in
            XCTFail("Did not expect to receive")
        }

        try await connection.close()

        XCTAssertEqual(channel.isActive, false)
    }
Screenshot 2024-06-23 at 1 16 11 AM

let (connection, channel) = try await self.makeTestConnectionWithAsyncTestingChannel()

try await withThrowingTaskGroup(of: Void.self) { taskGroup in
taskGroup.addTask {
do {
let listenSequence = try await connection.listen("test_channel")

for try await _ in listenSequence {
XCTFail("Expected to fail")
}
} catch let error as PSQLError {
XCTAssertEqual(error.code, .listenFailed)
}
}

taskGroup.addTask {
try await Task.sleep(for: .seconds(3))

try await connection.close().get()
MahdiBM marked this conversation as resolved.
Show resolved Hide resolved
XCTAssertEqual(channel.isActive, false)
}

try await taskGroup.waitForAll()
}
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fabianfett This is related to #458?
I noticed this same issue before they filed an issue for it, when I was trying to put a ServiceDiscovery together using PostgresNIO for DistributedActors, but I didn't get the chance/mood to file an issue for it before they did.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

revamped this test so this discussion is no longer valid. Though #487 (comment) is related to the crash.


func makeTestConnectionWithAsyncTestingChannel() async throws -> (PostgresConnection, NIOAsyncTestingChannel) {
let eventLoop = NIOAsyncTestingEventLoop()
let channel = await NIOAsyncTestingChannel(handlers: [
Expand Down
Loading