Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 86 additions & 6 deletions Sources/AWSLambdaRuntime/HTTPClient/LambdaRuntimeClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,55 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol {
}

private func channelClosed(_ channel: any Channel) {
// Check if this is an old channel that we're already tracking as closed
// This handles the race condition where:
// 1. connectionWillClose() is called, adding the channel to closingConnections
// 2. A new connection is established (connectionState = .connected with new channel)
// 3. The old channel's closeFuture fires (closingState might be .closed)
// 4. We receive channelClosed() for the OLD channel while NEW channel is connected
if self.closingConnections.contains(where: { $0 === channel }) {
// If this channel is still the currently connected channel, let the main
// state-handling logic below run instead of treating it as an old channel.
if case .connected(let stateChannel, _) = self.connectionState, channel === stateChannel {
// Remove from tracking and fall through to the main switch statement
if let index = self.closingConnections.firstIndex(where: { $0 === channel }) {
self.closingConnections.remove(at: index)
}
} else {
// This is an old channel that's finishing its close operation
if let index = self.closingConnections.firstIndex(where: { $0 === channel }) {
self.closingConnections.remove(at: index)
}

// If we're in closing state and all connections are now closed, complete the close
if case .closing(let continuation) = self.closingState,
self.closingConnections.isEmpty
{
self.closingState = .closed
continuation.resume()
}

self.logger.trace(
"Old channel closed after new connection established",
metadata: ["channel": "\(channel)"]
)
return
}
}

switch (self.connectionState, self.closingState) {
case (_, .closed):
fatalError("Invalid state: \(self.connectionState), \(self.closingState)")
// This should not happen, but if it does, it means we're receiving a close
// notification for a channel after the runtime client has fully closed.
// Log it but don't crash - this could be a legitimate race condition.
self.logger.warning(
"Received channelClosed after closingState is .closed",
metadata: [
"channel": "\(channel)",
"connectionState": "\(self.connectionState)",
]
)
return

case (.disconnected, .notClosing):
if let index = self.closingConnections.firstIndex(where: { $0 === channel }) {
Expand Down Expand Up @@ -298,11 +344,35 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol {
continuation.resume()
}

case (.connected, .notClosing):
self.connectionState = .disconnected
case (.connected(let currentChannel, _), .notClosing):
// Only transition to disconnected if this is the CURRENT channel closing
if currentChannel === channel {
self.connectionState = .disconnected
} else {
// This is an old channel closing - remove from tracking
if let index = self.closingConnections.firstIndex(where: { $0 === channel }) {
self.closingConnections.remove(at: index)
}

case (.connected, .closing(let continuation)):
self.connectionState = .disconnected
self.logger.trace(
"Old channel closing while new connection is active",
metadata: [
"closingChannel": "\(channel)",
"currentChannel": "\(currentChannel)",
]
)
}

case (.connected(let currentChannel, _), .closing(let continuation)):
// Only transition to disconnected if this is the CURRENT channel closing
if currentChannel === channel {
self.connectionState = .disconnected
} else {
// This is an old channel closing - remove from tracking
if let index = self.closingConnections.firstIndex(where: { $0 === channel }) {
self.closingConnections.remove(at: index)
}
}

if self.closingConnections.isEmpty {
self.closingState = .closed
Expand Down Expand Up @@ -369,7 +439,9 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol {
self.assumeIsolated { runtimeClient in
// close the channel
runtimeClient.channelClosed(channel)
runtimeClient.connectionState = .disconnected
// Note: Do NOT set connectionState = .disconnected here!
// The channelClosed() method handles state transitions properly,
// checking if this is the current channel or an old one.
}
}

Expand Down Expand Up @@ -431,7 +503,15 @@ extension LambdaRuntimeClient: LambdaChannelHandlerDelegate {

case .connected(let stateChannel, _):
guard channel === stateChannel else {
// This is an old channel closing - add to tracking
isolated.closingConnections.append(channel)
isolated.logger.trace(
"Old channel will close while new connection is active",
metadata: [
"closingChannel": "\(channel)",
"currentChannel": "\(stateChannel)",
]
)
return
}

Expand Down
Loading