Skip to content

Commit e9e431c

Browse files
authored
Add EventLoop API that uses PostgresQuery (#265)
1 parent c1683ba commit e9e431c

File tree

5 files changed

+134
-89
lines changed

5 files changed

+134
-89
lines changed

Sources/PostgresNIO/Connection/PostgresConnection.swift

Lines changed: 65 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,7 @@ public final class PostgresConnection {
294294

295295
// MARK: Query
296296

297-
func query(_ query: PostgresQuery, logger: Logger) -> EventLoopFuture<PSQLRowStream> {
297+
private func queryStream(_ query: PostgresQuery, logger: Logger) -> EventLoopFuture<PSQLRowStream> {
298298
var logger = logger
299299
logger[postgresMetadataKey: .connectionID] = "\(self.id)"
300300
guard query.binds.count <= Int(Int16.max) else {
@@ -433,6 +433,8 @@ extension PostgresConnection {
433433
}
434434
}
435435

436+
// MARK: Async/Await Interface
437+
436438
#if swift(>=5.5) && canImport(_Concurrency)
437439
extension PostgresConnection {
438440

@@ -489,7 +491,8 @@ extension PostgresConnection {
489491
let context = ExtendedQueryContext(
490492
query: query,
491493
logger: logger,
492-
promise: promise)
494+
promise: promise
495+
)
493496

494497
self.channel.write(PSQLTask.extendedQuery(context), promise: nil)
495498

@@ -498,7 +501,64 @@ extension PostgresConnection {
498501
}
499502
#endif
500503

501-
// MARK: PostgresDatabase
504+
// MARK: EventLoopFuture interface
505+
506+
extension PostgresConnection {
507+
508+
/// Run a query on the Postgres server the connection is connected to and collect all rows.
509+
///
510+
/// - Parameters:
511+
/// - query: The ``PostgresQuery`` to run
512+
/// - logger: The `Logger` to log into for the query
513+
/// - file: The file, the query was started in. Used for better error reporting.
514+
/// - line: The line, the query was started in. Used for better error reporting.
515+
/// - Returns: An EventLoopFuture, that allows access to the future ``PostgresQueryResult``.
516+
public func query(
517+
_ query: PostgresQuery,
518+
logger: Logger,
519+
file: String = #file,
520+
line: Int = #line
521+
) -> EventLoopFuture<PostgresQueryResult> {
522+
self.queryStream(query, logger: logger).flatMap { rowStream in
523+
rowStream.all().flatMapThrowing { rows -> PostgresQueryResult in
524+
guard let metadata = PostgresQueryMetadata(string: rowStream.commandTag) else {
525+
throw PSQLError.invalidCommandTag(rowStream.commandTag)
526+
}
527+
return PostgresQueryResult(metadata: metadata, rows: rows)
528+
}
529+
}
530+
}
531+
532+
/// Run a query on the Postgres server the connection is connected to and iterate the rows in a callback.
533+
///
534+
/// - Note: This API does not support back-pressure. If you need back-pressure please use the query
535+
/// API, that supports structured concurrency.
536+
/// - Parameters:
537+
/// - query: The ``PostgresQuery`` to run
538+
/// - logger: The `Logger` to log into for the query
539+
/// - file: The file, the query was started in. Used for better error reporting.
540+
/// - line: The line, the query was started in. Used for better error reporting.
541+
/// - onRow: A closure that is invoked for every row.
542+
/// - Returns: An EventLoopFuture, that allows access to the future ``PostgresQueryMetadata``.
543+
public func query(
544+
_ query: PostgresQuery,
545+
logger: Logger,
546+
file: String = #file,
547+
line: Int = #line,
548+
_ onRow: @escaping (PostgresRow) throws -> ()
549+
) -> EventLoopFuture<PostgresQueryMetadata> {
550+
self.queryStream(query, logger: logger).flatMap { rowStream in
551+
rowStream.onRow(onRow).flatMapThrowing { () -> PostgresQueryMetadata in
552+
guard let metadata = PostgresQueryMetadata(string: rowStream.commandTag) else {
553+
throw PSQLError.invalidCommandTag(rowStream.commandTag)
554+
}
555+
return metadata
556+
}
557+
}
558+
}
559+
}
560+
561+
// MARK: PostgresDatabase conformance
502562

503563
extension PostgresConnection: PostgresDatabase {
504564
public func send(
@@ -513,14 +573,14 @@ extension PostgresConnection: PostgresDatabase {
513573

514574
switch command {
515575
case .query(let query, let onMetadata, let onRow):
516-
resultFuture = self.query(query, logger: logger).flatMap { stream in
576+
resultFuture = self.queryStream(query, logger: logger).flatMap { stream in
517577
return stream.onRow(onRow).map { _ in
518578
onMetadata(PostgresQueryMetadata(string: stream.commandTag)!)
519579
}
520580
}
521581

522582
case .queryAll(let query, let onResult):
523-
resultFuture = self.query(query, logger: logger).flatMap { rows in
583+
resultFuture = self.queryStream(query, logger: logger).flatMap { rows in
524584
return rows.all().map { allrows in
525585
onResult(.init(metadata: PostgresQueryMetadata(string: rows.commandTag)!, rows: allrows))
526586
}

Sources/PostgresNIO/New/Connection State Machine/ConnectionStateMachine.swift

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1108,6 +1108,8 @@ extension ConnectionStateMachine {
11081108
return true
11091109
case .tooManyParameters:
11101110
return true
1111+
case .invalidCommandTag:
1112+
return true
11111113
case .connectionQuiescing:
11121114
preconditionFailure("Pure client error, that is thrown directly in PostgresConnection")
11131115
case .connectionClosed:

Sources/PostgresNIO/New/PSQLError.swift

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ struct PSQLError: Error {
1111
case unsupportedAuthMechanism(PSQLAuthScheme)
1212
case authMechanismRequiresPassword
1313
case saslError(underlyingError: Error)
14+
case invalidCommandTag(String)
1415

1516
case queryCancelled
1617
case tooManyParameters
@@ -60,6 +61,10 @@ struct PSQLError: Error {
6061
Self.init(.saslError(underlyingError: underlying))
6162
}
6263

64+
static func invalidCommandTag(_ value: String) -> PSQLError {
65+
Self.init(.invalidCommandTag(value))
66+
}
67+
6368
static var queryCancelled: PSQLError {
6469
Self.init(.queryCancelled)
6570
}

Sources/PostgresNIO/Postgres+PSQLCompat.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ extension PSQLError {
2626
return PostgresError.protocol("Unable to authenticate without password")
2727
case .saslError(underlyingError: let underlying):
2828
return underlying
29-
case .tooManyParameters:
29+
case .tooManyParameters, .invalidCommandTag:
3030
return self
3131
case .connectionQuiescing:
3232
return PostgresError.connectionClosed

0 commit comments

Comments
 (0)