Description
Describe the bug
I have a table in which each cell sends two gRPC queries using async let. If I scroll through the table slowly, I almost never encounter any problems. But if I scroll quickly, every query I send fails with the error unavailable (14): Transport became inactive.
This usually happens when the number of streams in use exceeds 20.
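To make the load pattern concrete, here is a minimal sketch that involves no gRPC or networking at all. It only models the shape of the traffic: every cell starts two concurrent calls with async let, and fast scrolling is approximated by many cells loading at once. All names here (InFlightCounter, simulatedCall, loadCell, simulateFastScroll) are hypothetical and exist only for this illustration, and the 50 ms delay is an arbitrary assumption.

```swift
// Tracks how many simulated calls are in flight and the highest value seen.
actor InFlightCounter {
    private(set) var current = 0
    private(set) var peak = 0

    func begin() {
        current += 1
        peak = max(peak, current)
    }

    func end() {
        current -= 1
    }
}

// Hypothetical stand-in for one unary RPC; it only sleeps, no network is used.
func simulatedCall(_ counter: InFlightCounter) async throws {
    await counter.begin()
    try await Task.sleep(nanoseconds: 50_000_000) // assume the server takes ~50 ms
    await counter.end()
}

// Mirrors the per-cell pattern: two calls started concurrently with `async let`.
func loadCell(_ counter: InFlightCounter) async throws {
    async let first: Void = simulatedCall(counter)
    async let second: Void = simulatedCall(counter)
    _ = try await (first, second)
}

// Models fast scrolling as `cells` cells loading at the same time and
// returns the peak number of calls that were in flight simultaneously.
func simulateFastScroll(cells: Int) async throws -> Int {
    let counter = InFlightCounter()
    try await withThrowingTaskGroup(of: Void.self) { group in
        for _ in 0..<cells {
            group.addTask { try await loadCell(counter) }
        }
        try await group.waitForAll()
    }
    return await counter.peak
}
```

With a dozen or so cells loading at once, the peak can never exceed twice the number of cells and would be expected to pass 20 easily, which matches the point at which the real channel starts failing.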
channel log
[{Default}] connectionUtilizationChanged(id:streamsUsed:streamCapacity:)::ChannelProvider.swift::191:19: [API v2] → ChannelProvider::connectionUtilizationChanged(id:streamsUsed:streamCapacity:). Connection ID: ObjectIdentifier(0x000000011ee58900). Stream capacity: 2147483647. Streams used: 21
channel error
[{Default}] connectionClosed(id:error:)::ChannelProvider.swift::205:19: [API v2] → ChannelProvider::connectionClosed(id:error:). Connection ID: ObjectIdentifier(0x000000011ee58900). Error: Optional(NIOCore.ChannelError.ioOnClosedChannel)
request error
[{Default}] xxx()::XXXService.swift::67:23: [API] → XXXService::xxx(). XXX service completed with result: success(unavailable (14): Transport became inactive)
trace
2025-02-27T09:45:36+0100 trace [CallOptions] : call_state=closed grpc.conn.addr_local=10.179.0.18 grpc.conn.addr_remote=10.229.0.160 grpc_request_id=5BFD6881-842F-42AB-95F1-AC0E322817FE [GRPC] failing buffered writes
SPM:
Protobuf 1.25.2
gRPC 1.65.1
gRPC-Swift 1.21.0
Platform: iOS
Expected behaviour
I expect the channel to handle this load without errors, since it is only 20-30 concurrent requests.
Additional information
Options for each unary call:
static var unaryCallOptions: CallOptions {
    let timeLimit: TimeLimit = .timeout(.seconds(5))
    #if DEBUG
    var logger = Logger(label: "[\(Self.self)]")
    logger.logLevel = .trace
    return CallOptions(timeLimit: timeLimit, logger: logger)
    #else
    return CallOptions(timeLimit: timeLimit)
    #endif
}
Keepalive
let keepalive = ClientConnectionKeepalive(
    interval: .minutes(5),
    timeout: .seconds(30),
    permitWithoutCalls: true
)
GRPCChannelPool configuration:
let channel = try GRPCChannelPool.with(
    target: .hostAndPort(
        configuration.host,
        configuration.port
    ),
    transportSecurity: .tls(
        configuration.tlsConfiguration
    ),
    eventLoopGroup: configuration.eventLoopGroup,
    { [weak self] channelConfiguration in
        channelConfiguration.keepalive = configuration.keepalive
        channelConfiguration.delegate = self
        channelConfiguration.connectionBackoff = ConnectionBackoff(
            initialBackoff: 1,
            maximumBackoff: 15,
            multiplier: 1.5,
            minimumConnectionTimeout: 15
        )
        #if DEBUG
        var logger = Logger(label: "[Channel gRPC]")
        logger.logLevel = .trace
        channelConfiguration.backgroundActivityLogger = logger
        #endif
    }
)
Request example:
public func getData(
    for id: UUID,
    options: CallOptions
) async throws -> Data {
    let client = try dependency.getClient()
    // Both requests start concurrently.
    async let task1 = client.getX1(
        X1Request(uids: [id]),
        callOptions: options
    )
    async let task2 = client.getX2(
        X2Request(uids: [id]),
        callOptions: options
    )
    let (response1, response2) = try await (task1, task2) // <- the error is thrown here
    return ...
}
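One way to check whether the stream count really is the trigger would be to cap the number of concurrent calls on the client side and see if the error disappears. The following is only a sketch of such a cap, written in plain Swift concurrency with no gRPC dependency. ConcurrencyLimiter is a hypothetical helper, not part of gRPC-Swift, and it deliberately ignores task cancellation while waiting for a slot.

```swift
// Hypothetical helper: lets at most `limit` operations run at the same time.
actor ConcurrencyLimiter {
    private let limit: Int
    private var inUse = 0
    private var waiters: [CheckedContinuation<Void, Never>] = []

    init(limit: Int) {
        self.limit = limit
    }

    // Takes a free slot immediately, or suspends until one is handed over.
    private func acquire() async {
        if inUse < limit {
            inUse += 1
            return
        }
        await withCheckedContinuation { continuation in
            waiters.append(continuation)
        }
    }

    // Frees a slot, or passes it straight to the oldest waiter if there is one.
    private func release() {
        if waiters.isEmpty {
            inUse -= 1
        } else {
            // The slot changes owner, so `inUse` stays the same.
            waiters.removeFirst().resume()
        }
    }

    // Runs `operation` once a slot is available and frees the slot afterwards,
    // whether the operation returns or throws.
    func run<T: Sendable>(
        _ operation: @Sendable () async throws -> T
    ) async rethrows -> T {
        await acquire()
        defer { release() }
        return try await operation()
    }
}
```

Under these assumptions, each call in getData could be wrapped as `try await limiter.run { try await client.getX1(...) }` with a shared limiter, for example with a limit of 10. If the errors stop, that would support the theory that the failure depends on the number of streams in use.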
Generated code:
public func getX(
    _ request: Frontend_PpX_V1_XRequest,
    callOptions: CallOptions? = nil
) async throws -> Frontend_PpX_V1_XResponse {
    return try await self.performAsyncUnaryCall(
        path: Frontend_PpX_V1_XServiceClientMetadata.Methods.getX.path,
        request: request,
        callOptions: callOptions ?? self.defaultCallOptions,
        interceptors: self.interceptors?.makeXInterceptors() ?? []
    )
}

public func performAsyncUnaryCall<Request: Message & Sendable, Response: Message & Sendable>(
    path: String,
    request: Request,
    callOptions: CallOptions? = nil,
    interceptors: [ClientInterceptor<Request, Response>] = [],
    responseType: Response.Type = Response.self
) async throws -> Response {
    let call = self.channel.makeAsyncUnaryCall(
        path: path,
        request: request,
        callOptions: callOptions ?? self.defaultCallOptions,
        interceptors: interceptors
    )
    return try await withTaskCancellationHandler {
        try await call.response
    } onCancel: {
        call.cancel()
    }
}