diff --git a/Package.swift b/Package.swift index b0f247504..87ef9446b 100644 --- a/Package.swift +++ b/Package.swift @@ -19,6 +19,8 @@ let package = Package( .package(url: "https://github.com/apple/swift-nio.git", from: "2.81.0"), .package(url: "https://github.com/apple/swift-log.git", from: "1.6.3"), .package(url: "https://github.com/vapor/sql-kit.git", from: "3.32.0"), + .package(url: "https://github.com/apple/swift-distributed-tracing.git", from: "1.3.1"), + .package(url: "https://github.com/apple/swift-service-context.git", from: "1.0.0"), ], targets: [ .target( @@ -30,6 +32,8 @@ let package = Package( .product(name: "NIOPosix", package: "swift-nio"), .product(name: "Logging", package: "swift-log"), .product(name: "SQLKit", package: "sql-kit"), + .product(name: "Tracing", package: "swift-distributed-tracing"), + .product(name: "ServiceContextModule", package: "swift-service-context"), ], swiftSettings: swiftSettings ), @@ -65,6 +69,7 @@ let package = Package( .target(name: "FluentBenchmark"), .target(name: "FluentSQL"), .target(name: "XCTFluent"), + .product(name: "InMemoryTracing", package: "swift-distributed-tracing") ], swiftSettings: swiftSettings ), diff --git a/Sources/FluentKit/Concurrency/AsyncModelMiddleware.swift b/Sources/FluentKit/Concurrency/AsyncModelMiddleware.swift index e9838610c..fcf8f6946 100644 --- a/Sources/FluentKit/Concurrency/AsyncModelMiddleware.swift +++ b/Sources/FluentKit/Concurrency/AsyncModelMiddleware.swift @@ -11,33 +11,45 @@ public protocol AsyncModelMiddleware: AnyModelMiddleware { } extension AsyncModelMiddleware { - public func handle( + func handle( _ event: ModelEvent, _ model: any AnyModel, on db: any Database, - chainingTo next: any AnyModelResponder - ) -> EventLoopFuture { + chainingTo next: any AnyAsyncModelResponder + ) async throws { guard let modelType = (model as? Model) else { - return next.handle(event, model, on: db) + return try await next.handle(event, model, on: db) } - return db.eventLoop.makeFutureWithTask { - let responder = AsyncBasicModelResponder { responderEvent, responderModel, responderDB in - try await next.handle(responderEvent, responderModel, on: responderDB).get() - } + switch event { + case .create: + try await self.create(model: modelType, on: db, next: next) + case .update: + try await self.update(model: modelType, on: db, next: next) + case .delete(let force): + try await self.delete(model: modelType, force: force, on: db, next: next) + case .softDelete: + try await self.softDelete(model: modelType, on: db, next: next) + case .restore: + try await self.restore(model: modelType, on: db, next: next) + } + } - switch event { - case .create: - try await self.create(model: modelType, on: db, next: responder) - case .update: - try await self.update(model: modelType, on: db, next: responder) - case .delete(let force): - try await self.delete(model: modelType, force: force, on: db, next: responder) - case .softDelete: - try await self.softDelete(model: modelType, on: db, next: responder) - case .restore: - try await self.restore(model: modelType, on: db, next: responder) + public func handle( + _ event: ModelEvent, + _ model: any AnyModel, + on db: any Database, + chainingTo next: any AnyModelResponder + ) -> EventLoopFuture { + db.eventLoop.makeFutureWithTask { + let responder = AnyAsyncBasicModelResponder { responderModel, responderEvent, responderDB in + if let next = (next as? any AnyAsyncModelResponder) { + try await next.handle(responderModel, responderEvent, on: responderDB) + } else { + try await next.handle(responderModel, responderEvent, on: responderDB).get() + } } + try await self.handle(event, model, on: db, chainingTo: responder) } } diff --git a/Sources/FluentKit/Concurrency/Children+Concurrency.swift b/Sources/FluentKit/Concurrency/Children+Concurrency.swift index 47e976368..37b1e0bbe 100644 --- a/Sources/FluentKit/Concurrency/Children+Concurrency.swift +++ b/Sources/FluentKit/Concurrency/Children+Concurrency.swift @@ -2,15 +2,35 @@ import NIOCore public extension ChildrenProperty { func load(on database: any Database) async throws { - try await self.load(on: database).get() + self.value = try await self.query(on: database).all() } func create(_ to: To, on database: any Database) async throws { - try await self.create(to, on: database).get() + guard let id = self.idValue else { + fatalError("Cannot save child in relation \(self.name) to unsaved model.") + } + switch self.parentKey { + case .required(let keyPath): + to[keyPath: keyPath].id = id + case .optional(let keyPath): + to[keyPath: keyPath].id = id + } + return try await to.create(on: database) } func create(_ to: [To], on database: any Database) async throws { - try await self.create(to, on: database).get() + guard let id = self.idValue else { + fatalError("Cannot save child in relation \(self.name) to unsaved model.") + } + to.forEach { + switch self.parentKey { + case .required(let keyPath): + $0[keyPath: keyPath].id = id + case .optional(let keyPath): + $0[keyPath: keyPath].id = id + } + } + return try await to.create(on: database) } } diff --git a/Sources/FluentKit/Concurrency/Database+Concurrency.swift b/Sources/FluentKit/Concurrency/Database+Concurrency.swift index 787f48344..1eff5be5d 100644 --- a/Sources/FluentKit/Concurrency/Database+Concurrency.swift +++ b/Sources/FluentKit/Concurrency/Database+Concurrency.swift @@ -1,11 +1,14 @@ import NIOCore +import Tracing public extension Database { func execute( query: DatabaseQuery, onOutput: @escaping @Sendable (any DatabaseOutput) -> () ) async throws { - try await self.execute(query: query, onOutput: onOutput).get() + try await query.withTracing { + try await self.execute(query: query, onOutput: onOutput).get() + } } func execute( diff --git a/Sources/FluentKit/Concurrency/EnumBuilder+Concurrency.swift b/Sources/FluentKit/Concurrency/EnumBuilder+Concurrency.swift index d2b6f040a..9d584e31c 100644 --- a/Sources/FluentKit/Concurrency/EnumBuilder+Concurrency.swift +++ b/Sources/FluentKit/Concurrency/EnumBuilder+Concurrency.swift @@ -2,18 +2,64 @@ import NIOCore public extension EnumBuilder { func create() async throws -> DatabaseSchema.DataType { - try await self.create().get() + self.enum.action = .create + try await self.database.execute(enum: self.enum) + return try await self.generateDatatype() } func read() async throws -> DatabaseSchema.DataType { - try await self.read().get() + try await self.generateDatatype() } func update() async throws -> DatabaseSchema.DataType { - try await self.update().get() + self.enum.action = .update + try await self.database.execute(enum: self.enum) + return try await self.generateDatatype() } func delete() async throws { - try await self.delete().get() + self.enum.action = .delete + try await self.database.execute(enum: self.enum) + try await self.deleteMetadata() + } + + // MARK: Private + + func generateDatatype() async throws -> DatabaseSchema.DataType { + try await EnumMetadata.migration.prepare(on: self.database) + try await self.updateMetadata() + + let cases = try await EnumMetadata.query(on: self.database).filter(\.$name == self.enum.name).all() + + return .enum(.init( + name: self.enum.name, + cases: cases.map { $0.case } + )) + } + + private func updateMetadata() async throws { + // Create all new enum cases. + try await self.enum.createCases.map { + EnumMetadata(name: self.enum.name, case: $0) + }.create(on: self.database) + + // Delete all old enum cases. + try await EnumMetadata.query(on: self.database) + .filter(\.$name == self.enum.name) + .filter(\.$case ~~ self.enum.deleteCases) + .delete() + } + + private func deleteMetadata() async throws { + // Delete all cases for this enum. + try await EnumMetadata.query(on: self.database) + .filter(\.$name == self.enum.name) + .delete() + let count = try await EnumMetadata.query(on: self.database).count() + + // If no enums are left, remove table. + if count == 0 { + try await EnumMetadata.migration.revert(on: self.database) + } } } diff --git a/Sources/FluentKit/Concurrency/Model+Concurrency.swift b/Sources/FluentKit/Concurrency/Model+Concurrency.swift index 4bf3ae34c..f49664f1d 100644 --- a/Sources/FluentKit/Concurrency/Model+Concurrency.swift +++ b/Sources/FluentKit/Concurrency/Model+Concurrency.swift @@ -1,41 +1,191 @@ import NIOCore +import protocol SQLKit.SQLDatabase public extension Model { static func find( _ id: Self.IDValue?, on database: any Database ) async throws -> Self? { - try await self.find(id, on: database).get() + guard let id = id else { return nil } + return try await Self.query(on: database) + .filter(id: id) + .first() } // MARK: - CRUD func save(on database: any Database) async throws { - try await self.save(on: database).get() + if self._$idExists { + try await self.update(on: database) + } else { + try await self.create(on: database) + } } func create(on database: any Database) async throws { - try await self.create(on: database).get() + try await database.configuration.middleware.chainingTo(Self.self) { event, model, db in + try await model.handle(event, on: db) + }.handle(.create, self, on: database) + } + + private func _create(on database: any Database) async throws { + precondition(!self._$idExists) + self.touchTimestamps(.create, .update) + if self.anyID is any AnyQueryableProperty { + self.anyID.generate() + + nonisolated(unsafe) var output: (any DatabaseOutput)? + try await Self.query(on: database) + .set(self.collectInput(withDefaultedValues: database is any SQLDatabase)) + .action(.create) + .run { output = $0 } + + var input = self.collectInput() + if case .default = self._$id.inputValue { + let idKey = Self()._$id.key + // In theory, this shouldn't happen, but in case it does in some edge case, + // better to throw an error than crash with an IUO. + guard let output else { throw RunQueryError.noDatabaseOutput } + input[idKey] = try .bind(output.decode(idKey, as: Self.IDValue.self)) + } + try self.output(from: SavedInput(input)) + } else { + // non-ID case: run async and then perform the decoding step + try await Self.query(on: database) + .set(self.collectInput(withDefaultedValues: database is any SQLDatabase)) + .action(.create) + .run() + try self.output(from: SavedInput(self.collectInput())) + } } func update(on database: any Database) async throws { - try await self.update(on: database).get() + try await database.configuration.middleware.chainingTo(Self.self) { event, model, db in + try await model.handle(event, on: db) + }.handle(.update, self, on: database) + } + + private func _update(on database: any Database) async throws { + precondition(self._$idExists) + guard self.hasChanges else { return } + self.touchTimestamps(.update) + let input = self.collectInput() + guard let id = self.id else { throw FluentError.idRequired } + try await Self.query(on: database) + .filter(id: id) + .set(input) + .update() + try self.output(from: SavedInput(input)) } func delete(force: Bool = false, on database: any Database) async throws { - try await self.delete(force: force, on: database).get() + if !force, let timestamp = self.deletedTimestamp { + timestamp.touch() + try await database.configuration.middleware.chainingTo(Self.self) { event, model, db in + try await model.handle(event, on: db) + }.handle(.softDelete, self, on: database) + } else { + try await database.configuration.middleware.chainingTo(Self.self) { event, model, db in + try await model.handle(event, on: db) + }.handle(.delete(force), self, on: database) + } + } + + private func _delete(force: Bool = false, on database: any Database) async throws { + guard let id = self.id else { throw FluentError.idRequired } + try await Self.query(on: database) + .filter(id: id) + .delete(force: force) + if force || self.deletedTimestamp == nil { + self._$idExists = false + } } func restore(on database: any Database) async throws { - try await self.restore(on: database).get() + try await database.configuration.middleware.chainingTo(Self.self) { event, model, db in + try await model.handle(event, on: db) + }.handle(.restore, self, on: database) + } + + private func _restore(on database: any Database) async throws { + guard let timestamp = self.timestamps.filter({ $0.trigger == .delete }).first else { + fatalError("no delete timestamp on this model") + } + timestamp.touch(date: nil) + precondition(self._$idExists) + guard let id = self.id else { throw FluentError.idRequired } + let _: Void = try await Self.query(on: database) + .withDeleted() + .filter(id: id) + .set(self.collectInput()) + .action(.update) + .run() + + try self.output(from: SavedInput(self.collectInput())) + self._$idExists = true + } + + func handle(_ event: ModelEvent, on db: any Database) async throws -> Void { + switch event { + case .create: + try await _create(on: db) + case .delete(let force): + try await _delete(force: force, on: db) + case .restore: + try await _restore(on: db) + case .softDelete: + try await _delete(force: false, on: db) + case .update: + try await _update(on: db) + } } } public extension Collection where Element: FluentKit.Model, Self: Sendable { func delete(force: Bool = false, on database: any Database) async throws { - try await self.delete(force: force, on: database).get() + guard !self.isEmpty else { return } + + precondition(self.allSatisfy { $0._$idExists }) + for model in self { + try await database.configuration.middleware.chainingTo(Element.self) { event, model, db in + return + }.delete(model, force: force, on: database) + + try await Element.query(on: database) + .filter(ids: self.map { $0.id! }) + .delete(force: force) + + guard force else { return } + for model in self where model.deletedTimestamp == nil { + model._$idExists = false + } + } } func create(on database: any Database) async throws { - try await self.create(on: database).get() + guard !self.isEmpty else { return } + + precondition(self.allSatisfy { !$0._$idExists }) + + try await withThrowingTaskGroup { group in + for model in self { + group.addTask { + try await database.configuration.middleware.chainingTo(Element.self) { event, model, db in + if model.anyID is any AnyQueryableProperty { + model._$id.generate() + } + model.touchTimestamps(.create, .update) + }.create(model, on: database) + } + } + try await group.waitForAll() + } + + try await Element.query(on: database) + .set(self.map { $0.collectInput(withDefaultedValues: database is any SQLDatabase) }) + .create() + + for model in self { + model._$idExists = true + } } } diff --git a/Sources/FluentKit/Concurrency/ModelResponder+Concurrency.swift b/Sources/FluentKit/Concurrency/ModelResponder+Concurrency.swift index dff70113b..8db9b2285 100644 --- a/Sources/FluentKit/Concurrency/ModelResponder+Concurrency.swift +++ b/Sources/FluentKit/Concurrency/ModelResponder+Concurrency.swift @@ -38,7 +38,7 @@ extension AnyAsyncModelResponder { } } -internal struct AsyncBasicModelResponder: AnyAsyncModelResponder { +internal struct AnyAsyncBasicModelResponder: AnyAsyncModelResponder { private let _handle: @Sendable (ModelEvent, any AnyModel, any Database) async throws -> Void internal func handle(_ event: ModelEvent, _ model: any AnyModel, on db: any Database) async throws { @@ -49,3 +49,19 @@ internal struct AsyncBasicModelResponder: AnyAsyncModelResponder { self._handle = handle } } + +internal struct AsyncBasicModelResponder: AnyAsyncModelResponder where Model: FluentKit.Model { + private let _handle: @Sendable (ModelEvent, Model, any Database) async throws -> Void + + internal func handle(_ event: ModelEvent, _ model: any AnyModel, on db: any Database) async throws { + guard let modelType = model as? Model else { + fatalError("Could not convert type AnyModel to \(Model.self)") + } + + try await _handle(event, modelType, db) + } + + init(handle: @escaping @Sendable (ModelEvent, Model, any Database) async throws -> Void) { + self._handle = handle + } +} diff --git a/Sources/FluentKit/Concurrency/OptionalChild+Concurrency.swift b/Sources/FluentKit/Concurrency/OptionalChild+Concurrency.swift index 6e35237dc..33d0416a0 100644 --- a/Sources/FluentKit/Concurrency/OptionalChild+Concurrency.swift +++ b/Sources/FluentKit/Concurrency/OptionalChild+Concurrency.swift @@ -2,16 +2,25 @@ import NIOCore public extension OptionalChildProperty { func load(on database: any Database) async throws { - try await self.load(on: database).get() + try await self.query(on: database).first().map { self.value = $0 } } func create(_ to: To, on database: any Database) async throws { - try await self.create(to, on: database).get() + guard let id = self.idValue else { + fatalError("Cannot save child in \(self.name) to unsaved model in.") + } + switch self.parentKey { + case .required(let keyPath): + to[keyPath: keyPath].id = id + case .optional(let keyPath): + to[keyPath: keyPath].id = id + } + return try await to.create(on: database) } } public extension CompositeOptionalChildProperty { func load(on database: any Database) async throws { - try await self.load(on: database).get() + try await self.query(on: database).first().map { self.value = $0 } } } diff --git a/Sources/FluentKit/Concurrency/OptionalParent+Concurrency.swift b/Sources/FluentKit/Concurrency/OptionalParent+Concurrency.swift index 9388746de..137a9fe18 100644 --- a/Sources/FluentKit/Concurrency/OptionalParent+Concurrency.swift +++ b/Sources/FluentKit/Concurrency/OptionalParent+Concurrency.swift @@ -2,12 +2,12 @@ import NIOCore public extension OptionalParentProperty { func load(on database: any Database) async throws { - try await self.load(on: database).get() + self.value = try await self.query(on: database).first() } } public extension CompositeOptionalParentProperty { func load(on database: any Database) async throws { - try await self.load(on: database).get() + self.value = try await self.query(on: database).first() } } diff --git a/Sources/FluentKit/Concurrency/QueryBuilder+Concurrency.swift b/Sources/FluentKit/Concurrency/QueryBuilder+Concurrency.swift index aa4469dde..583b1da2f 100644 --- a/Sources/FluentKit/Concurrency/QueryBuilder+Concurrency.swift +++ b/Sources/FluentKit/Concurrency/QueryBuilder+Concurrency.swift @@ -1,224 +1,384 @@ +import NIOConcurrencyHelpers import NIOCore -public extension QueryBuilder { +extension QueryBuilder { // MARK: - Actions - func create() async throws { - try await self.create().get() + public func create() async throws { + self.query.action = .create + return try await self.run() } - - func update() async throws { - try await self.update().get() + + public func update() async throws { + self.query.action = .update + return try await self.run() } - - func delete(force: Bool = false) async throws { - try await self.delete(force: force).get() + + public func delete(force: Bool = false) async throws { + self.includeDeleted = force + self.shouldForceDelete = force + self.query.action = .delete + return try await self.run() } - + // MARK: - Fetch - - func chunk(max: Int, closure: @escaping @Sendable ([Result]) -> ()) async throws { - try await self.chunk(max: max, closure: closure).get() + + func chunk(max: Int, closure: @escaping @Sendable ([Result]) -> Void) async throws { + nonisolated(unsafe) var partial: [Result] = [] + partial.reserveCapacity(max) + + do { + try await self.all { row in + partial.append(row) + if partial.count >= max { + closure(partial) + partial.removeAll() + } + } + } catch { + if !partial.isEmpty { + closure(partial) + } + } } - - func first() async throws -> Model? { - try await self.first().get() + + public func first() async throws -> Model? { + try await self.limit(1).all().first } - - func all(_ key: KeyPath) async throws -> [Field.Value] - where Field: QueryableProperty, Field.Model == Model + + public func all(_ key: KeyPath) async throws -> [Field.Value] + where Field: QueryableProperty, Field.Model == Model { - try await self.all(key).get() + let copy = self.copy() + copy.query.fields = [.extendedPath(Model.path(for: key), schema: Model.schemaOrAlias, space: Model.spaceIfNotAliased)] + return try await copy.all().map { + $0[keyPath: key].value! + } + } - - func all( + + public func all( _ joined: Joined.Type, _ field: KeyPath ) async throws -> [Field.Value] - where Joined: Schema, Field: QueryableProperty, Field.Model == Joined + where + Joined: Schema, + Field: QueryableProperty, + Field.Model == Joined { - try await self.all(joined, field).get() + let copy = self.copy() + copy.query.fields = [.extendedPath(Joined.path(for: field), schema: Joined.schemaOrAlias, space: Joined.spaceIfNotAliased)] + return try await copy.all().map { + try $0.joined(Joined.self)[keyPath: field].value! + } } - func all() async throws -> [Model] { - try await self.all().get() + public func all() async throws -> [Model] { + nonisolated(unsafe) var models: [Result] = [] + try await self.all { models.append($0) } + return try models.map { try $0.get() } } - - func run() async throws { - try await self.run().get() + + public func run() async throws { + try await self.run { _ in } } - - func all(_ onOutput: @escaping @Sendable (Result) -> ()) async throws { - try await self.all(onOutput).get() + + public func all(_ onOutput: @escaping @Sendable (Result) -> Void) async throws { + nonisolated(unsafe) var all: [Model] = [] + + do { + try await self.run { output in + onOutput( + .init(catching: { + let model = Model() + try model.output(from: output.qualifiedSchema(space: Model.spaceIfNotAliased, Model.schemaOrAlias)) + all.append(model) + return model + })) + } + } catch { + // if eager loads exist, run them, and update models + if !self.eagerLoaders.isEmpty { + let loaders = self.eagerLoaders + let db = self.database + + // don't run eager loads if result set was empty + guard !all.isEmpty else { return } + + for loader in loaders { + try await loader.anyRun(models: all, on: db).get() + } + } + } } - - func run(_ onOutput: @escaping @Sendable (any DatabaseOutput) -> ()) async throws { - try await self.run(onOutput).get() + + public func run(_ onOutput: @escaping @Sendable (any DatabaseOutput) -> Void) async throws { + // make a copy of this query before mutating it + // so that run can be called multiple times + var query = self.query + + // If fields are not being manually selected, + // add fields from all models being queried. + if query.fields.isEmpty { + for model in self.models { + self.addFields(for: model, to: &query) + } + } + + // If deleted models aren't included, add filters + // to exclude them for each model being queried. + if !self.includeDeleted { + for model in self.models { + model.excludeDeleted(from: &query) + } + } + + // TODO: combine this logic with model+crud timestamps + let forceDelete = + Model.init().deletedTimestamp == nil + ? true : self.shouldForceDelete + switch query.action { + case .delete: + if !forceDelete { + query.action = .update + query.input = [.dictionary([:])] + self.addTimestamps(triggers: [.update, .delete], to: &query) + } + case .create: + self.addTimestamps(triggers: [.create, .update], to: &query) + case .update: + self.addTimestamps(triggers: [.update], to: &query) + default: + break + } + + // N.B.: We use `self.query` here instead of `query` so that the logging reflects the query the user requested, + // without having to deal with the noise of us having added default fields, or doing deletedAt checks, etc. + self.database.logger.debug("Running query", metadata: self.query.describedByLoggingMetadata) + self.database.history?.add(self.query) + + return try await self.database.execute(query: query) { output in + onOutput(output) + } } - + // MARK: - Aggregate - func count() async throws -> Int { - try await self.count().get() + public func count() async throws -> Int { + if Model().anyID is any AnyQueryableProperty { + try await self.count(\._$id) + } else if let fieldsIDType = Model.IDValue.self as? any Fields.Type { + try await self.aggregate(.count, fieldsIDType.keys.first!) + } else { + fatalError("Model '\(Model.self)' has neither @ID nor @CompositeID, this is not valid.") + } } - - func count(_ key: KeyPath) async throws -> Int - where Field: QueryableProperty, Field.Model == Model, Field.Value: Sendable + + public func count(_ key: KeyPath) async throws -> Int + where Field: QueryableProperty, Field.Model == Model, Field.Value: Sendable { - try await self.count(key).get() + try await self.aggregate(.count, key, as: Int.self) } - func count(_ key: KeyPath) async throws -> Int - where Field: QueryableProperty, Field.Model == Model.IDValue, Field.Value: Sendable + public func count(_ key: KeyPath) async throws -> Int + where Field: QueryableProperty, Field.Model == Model.IDValue, Field.Value: Sendable { - try await self.count(key).get() + try await self.aggregate(.count, key, as: Int.self) } - func sum(_ key: KeyPath) async throws -> Field.Value? - where Field: QueryableProperty, Field.Model == Model, Field.Value: Sendable + public func sum(_ key: KeyPath) async throws -> Field.Value? + where Field: QueryableProperty, Field.Model == Model, Field.Value: Sendable { - try await self.sum(key).get() + try await self.aggregate(.sum, key) } - - func sum(_ key: KeyPath) async throws -> Field.Value? - where Field: QueryableProperty, Field.Model == Model.IDValue, Field.Value: Sendable + + public func sum(_ key: KeyPath) async throws -> Field.Value? + where Field: QueryableProperty, Field.Model == Model.IDValue, Field.Value: Sendable { - try await self.sum(key).get() + try await self.aggregate(.sum, key) } - - func sum(_ key: KeyPath) async throws -> Field.Value - where Field: QueryableProperty, Field.Value: OptionalType & Sendable, Field.Model == Model + + public func sum(_ key: KeyPath) async throws -> Field.Value + where Field: QueryableProperty, Field.Value: OptionalType & Sendable, Field.Model == Model { - try await self.sum(key).get() + try await self.aggregate(.sum, key) } - - func sum(_ key: KeyPath) async throws -> Field.Value - where Field: QueryableProperty, Field.Value: OptionalType & Sendable, Field.Model == Model.IDValue + + public func sum(_ key: KeyPath) async throws -> Field.Value + where Field: QueryableProperty, Field.Value: OptionalType & Sendable, Field.Model == Model.IDValue { - try await self.sum(key).get() + try await self.aggregate(.sum, key) } - - func average(_ key: KeyPath) async throws -> Field.Value? - where Field: QueryableProperty, Field.Model == Model, Field.Value: Sendable + + public func average(_ key: KeyPath) async throws -> Field.Value? + where Field: QueryableProperty, Field.Model == Model, Field.Value: Sendable { - try await self.average(key).get() + try await self.aggregate(.average, key) } - - func average(_ key: KeyPath) async throws -> Field.Value? - where Field: QueryableProperty, Field.Model == Model.IDValue, Field.Value: Sendable + + public func average(_ key: KeyPath) async throws -> Field.Value? + where Field: QueryableProperty, Field.Model == Model.IDValue, Field.Value: Sendable { - try await self.average(key).get() + try await self.aggregate(.average, key) } - - func average(_ key: KeyPath) async throws -> Field.Value - where Field: QueryableProperty, Field.Value: OptionalType & Sendable, Field.Model == Model + + public func average(_ key: KeyPath) async throws -> Field.Value + where Field: QueryableProperty, Field.Value: OptionalType & Sendable, Field.Model == Model { - try await self.average(key).get() + try await self.aggregate(.average, key) } - - func average(_ key: KeyPath) async throws -> Field.Value - where Field: QueryableProperty, Field.Value: OptionalType & Sendable, Field.Model == Model.IDValue + + public func average(_ key: KeyPath) async throws -> Field.Value + where Field: QueryableProperty, Field.Value: OptionalType & Sendable, Field.Model == Model.IDValue { - try await self.average(key).get() + try await self.aggregate(.average, key) } - - func min(_ key: KeyPath) async throws -> Field.Value? - where Field: QueryableProperty, Field.Model == Model, Field.Value: Sendable + + public func min(_ key: KeyPath) async throws -> Field.Value? + where Field: QueryableProperty, Field.Model == Model, Field.Value: Sendable { - try await self.min(key).get() + try await self.aggregate(.average, key) } - - func min(_ key: KeyPath) async throws -> Field.Value? - where Field: QueryableProperty, Field.Model == Model.IDValue, Field.Value: Sendable + + public func min(_ key: KeyPath) async throws -> Field.Value? + where Field: QueryableProperty, Field.Model == Model.IDValue, Field.Value: Sendable { - try await self.min(key).get() + try await self.aggregate(.minimum, key) } - - func min(_ key: KeyPath) async throws -> Field.Value - where Field: QueryableProperty, Field.Value: OptionalType & Sendable, Field.Model == Model + + public func min(_ key: KeyPath) async throws -> Field.Value + where Field: QueryableProperty, Field.Value: OptionalType & Sendable, Field.Model == Model { - try await self.min(key).get() + try await self.aggregate(.minimum, key) } - - func min(_ key: KeyPath) async throws -> Field.Value - where Field: QueryableProperty, Field.Value: OptionalType & Sendable, Field.Model == Model.IDValue + + public func min(_ key: KeyPath) async throws -> Field.Value + where Field: QueryableProperty, Field.Value: OptionalType & Sendable, Field.Model == Model.IDValue { - try await self.min(key).get() + try await self.aggregate(.minimum, key) } - - func max(_ key: KeyPath) async throws -> Field.Value? - where Field: QueryableProperty, Field.Model == Model, Field.Value: Sendable + + public func max(_ key: KeyPath) async throws -> Field.Value? + where Field: QueryableProperty, Field.Model == Model, Field.Value: Sendable { - try await self.max(key).get() + try await self.aggregate(.maximum, key) } - - func max(_ key: KeyPath) async throws -> Field.Value? - where Field: QueryableProperty, Field.Model == Model.IDValue, Field.Value: Sendable + + public func max(_ key: KeyPath) async throws -> Field.Value? + where Field: QueryableProperty, Field.Model == Model.IDValue, Field.Value: Sendable { - try await self.max(key).get() + try await self.aggregate(.maximum, key) } - - func max(_ key: KeyPath) async throws -> Field.Value - where Field: QueryableProperty, Field.Value: OptionalType & Sendable, Field.Model == Model + + public func max(_ key: KeyPath) async throws -> Field.Value + where Field: QueryableProperty, Field.Value: OptionalType & Sendable, Field.Model == Model { - try await self.max(key).get() + try await self.aggregate(.maximum, key) } - - func max(_ key: KeyPath) async throws -> Field.Value - where Field: QueryableProperty, Field.Value: OptionalType & Sendable, Field.Model == Model.IDValue + + public func max(_ key: KeyPath) async throws -> Field.Value + where Field: QueryableProperty, Field.Value: OptionalType & Sendable, Field.Model == Model.IDValue { - try await self.max(key).get() + try await self.aggregate(.maximum, key) } - func aggregate( + public func aggregate( _ method: DatabaseQuery.Aggregate.Method, _ field: KeyPath, as type: Result.Type = Result.self ) async throws -> Result - where Field: QueryableProperty, Field.Model == Model, Result: Codable & Sendable + where Field: QueryableProperty, Field.Model == Model, Result: Codable & Sendable { - try await self.aggregate(method, field, as: type).get() + try await self.aggregate(method, Model.path(for: field), as: Result.self) } - - func aggregate( + + public func aggregate( _ method: DatabaseQuery.Aggregate.Method, _ field: KeyPath, as type: Result.Type = Result.self ) async throws -> Result - where Field: QueryableProperty, Field.Model == Model.IDValue, Result: Codable & Sendable + where Field: QueryableProperty, Field.Model == Model.IDValue, Result: Codable & Sendable { - try await self.aggregate(method, field, as: type).get() + try await self.aggregate(method, Model.path(for: field), as: Result.self) } - - func aggregate( + + public func aggregate( _ method: DatabaseQuery.Aggregate.Method, _ field: FieldKey, as type: Result.Type = Result.self ) async throws -> Result - where Result: Codable & Sendable + where Result: Codable & Sendable { - try await self.aggregate(method, field, as: type).get() + try await self.aggregate(method, [field], as: Result.self) } - - func aggregate( + + public func aggregate( _ method: DatabaseQuery.Aggregate.Method, _ path: [FieldKey], as type: Result.Type = Result.self ) async throws -> Result - where Result: Codable & Sendable + where Result: Codable & Sendable { - try await self.aggregate(method, path, as: type).get() + try await self.aggregate( + .field( + .extendedPath(path, schema: Model.schemaOrAlias, space: Model.spaceIfNotAliased), + method + ), + as: Result.self + ) } - + + public func aggregate( + _ aggregate: DatabaseQuery.Aggregate, + as: Result.Type = Result.self + ) async throws -> Result + where Result: Codable & Sendable + { + let copy = self.copy() + + copy.eagerLoaders = .init() + copy.query.sorts = [] + copy.query.action = .aggregate(aggregate) + + nonisolated(unsafe) var output: (any DatabaseOutput)? + try await copy.run { output = $0 } + + // In theory, this shouldn't happen, but in case it does in some edge case, + // better to throw an error than crash with an IUO. + guard let output else { throw RunQueryError.noDatabaseOutput } + + return try output.decode(.aggregate, as: Result.self) + } + // MARK: - Paginate - func paginate( + public func paginate( _ request: PageRequest ) async throws -> Page { - try await self.paginate(request).get() + try await self.page(withIndex: request.page, size: request.per) } - - func page( + + public func page( withIndex page: Int, size per: Int ) async throws -> Page { - try await self.page(withIndex: page, size: per).get() + let trimmedRequest: PageRequest = + if let pageSizeLimit = database.context.pageSizeLimit { + .init( + page: Swift.max(page, 1), + per: Swift.max(Swift.min(per, pageSizeLimit), 1) + ) + } else { + .init(page: Swift.max(page, 1), per: Swift.max(per, 1)) + } + + let total = try await self.count() + let items = try await self.copy().range(trimmedRequest.start.. EventLoopFuture { - self.enum.action = .create - return self.database.execute(enum: self.enum).flatMap { - self.generateDatatype() + self.database.eventLoop.makeFutureWithTask { + try await self.create() } } public func read() -> EventLoopFuture { - self.generateDatatype() + self.database.eventLoop.makeFutureWithTask { + try await self.generateDatatype() + } } public func update() -> EventLoopFuture { - self.enum.action = .update - return self.database.execute(enum: self.enum).flatMap { - self.generateDatatype() + self.database.eventLoop.makeFutureWithTask { + try await self.update() } } public func delete() -> EventLoopFuture { - self.enum.action = .delete - return self.database.execute(enum: self.enum).flatMap { - self.deleteMetadata() - } - } - - // MARK: Private - - private func generateDatatype() -> EventLoopFuture { - EnumMetadata.migration.prepare(on: self.database).flatMap { - self.updateMetadata() - }.flatMap { _ in - // Fetch the latest cases. - EnumMetadata.query(on: self.database).filter(\.$name == self.enum.name).all() - }.map { cases in - // Convert latest cases to usable DataType. - .enum(.init( - name: self.enum.name, - cases: cases.map { $0.case } - )) - } - } - - private func updateMetadata() -> EventLoopFuture { - // Create all new enum cases. - let create = self.enum.createCases.map { - EnumMetadata(name: self.enum.name, case: $0) - }.create(on: self.database) - // Delete all old enum cases. - let delete = EnumMetadata.query(on: self.database) - .filter(\.$name == self.enum.name) - .filter(\.$case ~~ self.enum.deleteCases) - .delete() - return create.and(delete).map { _ in } - } - - private func deleteMetadata() -> EventLoopFuture { - // Delete all cases for this enum. - EnumMetadata.query(on: self.database) - .filter(\.$name == self.enum.name) - .delete() - .flatMap - { _ in - EnumMetadata.query(on: self.database).count() - }.flatMap { count in - // If no enums are left, remove table. - if count == 0 { - return EnumMetadata.migration.revert(on: self.database) - } else { - return self.database.eventLoop.makeSucceededFuture(()) - } + self.database.eventLoop.makeFutureWithTask { + try await self.delete() } } } diff --git a/Sources/FluentKit/Enum/EnumMetadata.swift b/Sources/FluentKit/Enum/EnumMetadata.swift index 78c397e4d..f7cc6e096 100644 --- a/Sources/FluentKit/Enum/EnumMetadata.swift +++ b/Sources/FluentKit/Enum/EnumMetadata.swift @@ -4,7 +4,7 @@ import Foundation final class EnumMetadata: Model, @unchecked Sendable { static let schema = "_fluent_enums" - static var migration: any Migration { + static var migration: any AsyncMigration { EnumMetadataMigration() } @@ -26,9 +26,9 @@ final class EnumMetadata: Model, @unchecked Sendable { } } -private struct EnumMetadataMigration: Migration { - func prepare(on database: any Database) -> EventLoopFuture { - database.schema(EnumMetadata.schema) +private struct EnumMetadataMigration: AsyncMigration { + func prepare(on database: any Database) async throws { + try await database.schema(EnumMetadata.schema) .id() .field("name", .string, .required) .field("case", .string, .required) @@ -37,7 +37,7 @@ private struct EnumMetadataMigration: Migration { .create() } - func revert(on database: any Database) -> EventLoopFuture { - database.schema(EnumMetadata.schema).delete() + func revert(on database: any Database) async throws { + try await database.schema(EnumMetadata.schema).delete() } } diff --git a/Sources/FluentKit/Middleware/ModelMiddleware.swift b/Sources/FluentKit/Middleware/ModelMiddleware.swift index 1a75d7095..d9c0793a4 100644 --- a/Sources/FluentKit/Middleware/ModelMiddleware.swift +++ b/Sources/FluentKit/Middleware/ModelMiddleware.swift @@ -64,6 +64,10 @@ extension AnyModelMiddleware { func makeResponder(chainingTo responder: any AnyModelResponder) -> any AnyModelResponder { ModelMiddlewareResponder(middleware: self, responder: responder) } + + func makeAsyncResponser(chainingTo responder: any AnyAsyncModelResponder) -> any AnyAsyncModelResponder { + AsyncModelMiddlewareResponder(middleware: self, responder: responder) + } } extension Array where Element == any AnyModelMiddleware { @@ -77,6 +81,17 @@ extension Array where Element == any AnyModelMiddleware { } return responder } + + internal func chainingTo( + _ type: Model.Type, + closure: @escaping @Sendable (ModelEvent, Model, any Database) async throws -> Void + ) -> any AnyAsyncModelResponder where Model: FluentKit.Model { + var responder: any AnyAsyncModelResponder = AsyncBasicModelResponder(handle: closure) + for middleware in reversed() { + responder = middleware.makeAsyncResponser(chainingTo: responder) + } + return responder + } } private struct ModelMiddlewareResponder: AnyModelResponder { @@ -88,6 +103,19 @@ private struct ModelMiddlewareResponder: AnyModelResponder { } } +private struct AsyncModelMiddlewareResponder: AnyAsyncModelResponder { + var middleware: any AnyModelMiddleware + var responder: any AnyModelResponder + + func handle(_ event: ModelEvent, _ model: any AnyModel, on db: any Database) async throws { + if let middleware = (middleware as? any AsyncModelMiddleware), let responder = (responder as? any AnyAsyncModelResponder) { + try await middleware.handle(event, model, on: db, chainingTo: responder) + } else { + try await middleware.handle(event, model, on: db, chainingTo: responder).get() + } + } +} + public enum ModelEvent: Sendable { case create case update diff --git a/Sources/FluentKit/Model/Model+CRUD.swift b/Sources/FluentKit/Model/Model+CRUD.swift index 3d4a9fd2e..bb7fbafd7 100644 --- a/Sources/FluentKit/Model/Model+CRUD.swift +++ b/Sources/FluentKit/Model/Model+CRUD.swift @@ -3,189 +3,46 @@ import protocol SQLKit.SQLDatabase extension Model { public func save(on database: any Database) -> EventLoopFuture { - if self._$idExists { - self.update(on: database) - } else { - self.create(on: database) + database.eventLoop.makeFutureWithTask { + try await self.save(on: database) } } public func create(on database: any Database) -> EventLoopFuture { - return database.configuration.middleware.chainingTo(Self.self) { event, model, db in - try model.handle(event, on: db) - }.handle(.create, self, on: database) - } - - private func _create(on database: any Database) -> EventLoopFuture { - precondition(!self._$idExists) - self.touchTimestamps(.create, .update) - if self.anyID is any AnyQueryableProperty { - self.anyID.generate() - let promise = database.eventLoop.makePromise(of: (any DatabaseOutput).self) - Self.query(on: database) - .set(self.collectInput(withDefaultedValues: database is any SQLDatabase)) - .action(.create) - .run { promise.succeed($0) } - .cascadeFailure(to: promise) - return promise.futureResult.flatMapThrowing { output in - var input = self.collectInput() - if case .default = self._$id.inputValue { - let idKey = Self()._$id.key - input[idKey] = try .bind(output.decode(idKey, as: Self.IDValue.self)) - } - try self.output(from: SavedInput(input)) - } - } else { - return Self.query(on: database) - .set(self.collectInput(withDefaultedValues: database is any SQLDatabase)) - .action(.create) - .run() - .flatMapThrowing { - try self.output(from: SavedInput(self.collectInput())) - } + database.eventLoop.makeFutureWithTask { + try await self.create(on: database) } } - + public func update(on database: any Database) -> EventLoopFuture { - database.configuration.middleware.chainingTo(Self.self) { event, model, db in - try model.handle(event, on: db) - }.handle(.update, self, on: database) - } - - private func _update(on database: any Database) throws -> EventLoopFuture { - precondition(self._$idExists) - guard self.hasChanges else { - return database.eventLoop.makeSucceededFuture(()) - } - self.touchTimestamps(.update) - let input = self.collectInput() - guard let id = self.id else { throw FluentError.idRequired } - return Self.query(on: database) - .filter(id: id) - .set(input) - .update() - .flatMapThrowing - { - try self.output(from: SavedInput(input)) + database.eventLoop.makeFutureWithTask { + try await self.update(on: database) } } public func delete(force: Bool = false, on database: any Database) -> EventLoopFuture { - if !force, let timestamp = self.deletedTimestamp { - timestamp.touch() - return database.configuration.middleware.chainingTo(Self.self) { event, model, db in - try model.handle(event, on: db) - }.handle(.softDelete, self, on: database) - } else { - return database.configuration.middleware.chainingTo(Self.self) { event, model, db in - try model.handle(event, on: db) - }.handle(.delete(force), self, on: database) - } - } - - private func _delete(force: Bool = false, on database: any Database) throws -> EventLoopFuture { - guard let id = self.id else { throw FluentError.idRequired } - return Self.query(on: database) - .filter(id: id) - .delete(force: force) - .map - { - if force || self.deletedTimestamp == nil { - self._$idExists = false - } + database.eventLoop.makeFutureWithTask { + try await self.delete(force: force, on: database) } } public func restore(on database: any Database) -> EventLoopFuture { - database.configuration.middleware.chainingTo(Self.self) { event, model, db in - try model.handle(event, on: db) - }.handle(.restore, self, on: database) - } - - private func _restore(on database: any Database) throws -> EventLoopFuture { - guard let timestamp = self.timestamps.filter({ $0.trigger == .delete }).first else { - fatalError("no delete timestamp on this model") - } - timestamp.touch(date: nil) - precondition(self._$idExists) - guard let id = self.id else { throw FluentError.idRequired } - return Self.query(on: database) - .withDeleted() - .filter(id: id) - .set(self.collectInput()) - .action(.update) - .run() - .flatMapThrowing - { - try self.output(from: SavedInput(self.collectInput())) - self._$idExists = true - } - } - - private func handle(_ event: ModelEvent, on db: any Database) throws -> EventLoopFuture { - switch event { - case .create: - _create(on: db) - case .delete(let force): - try _delete(force: force, on: db) - case .restore: - try _restore(on: db) - case .softDelete: - try _delete(force: false, on: db) - case .update: - try _update(on: db) + database.eventLoop.makeFutureWithTask { + try await self.restore(on: database) } } } extension Collection where Element: FluentKit.Model, Self: Sendable { public func delete(force: Bool = false, on database: any Database) -> EventLoopFuture { - guard !self.isEmpty else { - return database.eventLoop.makeSucceededFuture(()) - } - - precondition(self.allSatisfy { $0._$idExists }) - - return EventLoopFuture.andAllSucceed(self.map { model in - database.configuration.middleware.chainingTo(Element.self) { event, model, db in - db.eventLoop.makeSucceededFuture(()) - }.delete(model, force: force, on: database) - }, on: database.eventLoop).flatMap { - Element.query(on: database) - .filter(ids: self.map { $0.id! }) - .delete(force: force) - }.map { - guard force else { return } - - for model in self where model.deletedTimestamp == nil { - model._$idExists = false - } + database.eventLoop.makeFutureWithTask { + try await self.delete(force: force, on: database) } } public func create(on database: any Database) -> EventLoopFuture { - guard !self.isEmpty else { - return database.eventLoop.makeSucceededFuture(()) - } - - precondition(self.allSatisfy { !$0._$idExists }) - - return EventLoopFuture.andAllSucceed(self.enumerated().map { idx, model in - database.configuration.middleware.chainingTo(Element.self) { event, model, db in - if model.anyID is any AnyQueryableProperty { - model._$id.generate() - } - model.touchTimestamps(.create, .update) - return db.eventLoop.makeSucceededFuture(()) - }.create(model, on: database) - }, on: database.eventLoop).flatMap { - Element.query(on: database) - .set(self.map { $0.collectInput(withDefaultedValues: database is any SQLDatabase) }) - .create() - }.map { - for model in self { - model._$idExists = true - } + database.eventLoop.makeFutureWithTask { + try await self.create(on: database) } } } @@ -199,7 +56,7 @@ public enum MiddlewareFailureHandler { // MARK: Private -private struct SavedInput: DatabaseOutput { +struct SavedInput: DatabaseOutput { var input: [FieldKey: DatabaseQuery.Value] init(_ input: [FieldKey: DatabaseQuery.Value]) { diff --git a/Sources/FluentKit/Model/Model.swift b/Sources/FluentKit/Model/Model.swift index 175821bb4..6e84d673e 100644 --- a/Sources/FluentKit/Model/Model.swift +++ b/Sources/FluentKit/Model/Model.swift @@ -14,12 +14,9 @@ extension Model { _ id: Self.IDValue?, on database: any Database ) -> EventLoopFuture { - guard let id = id else { - return database.eventLoop.makeSucceededFuture(nil) + database.eventLoop.makeFutureWithTask { + try await self.find(id, on: database) } - return Self.query(on: database) - .filter(id: id) - .first() } public func requireID() throws -> IDValue { diff --git a/Sources/FluentKit/Properties/Children.swift b/Sources/FluentKit/Properties/Children.swift index f8dacb624..1c9358e1b 100644 --- a/Sources/FluentKit/Properties/Children.swift +++ b/Sources/FluentKit/Properties/Children.swift @@ -66,31 +66,15 @@ public final class ChildrenProperty: @unchecked Sendable } public func create(_ to: [To], on database: any Database) -> EventLoopFuture { - guard let id = self.idValue else { - fatalError("Cannot save child in relation \(self.name) to unsaved model.") - } - to.forEach { - switch self.parentKey { - case .required(let keyPath): - $0[keyPath: keyPath].id = id - case .optional(let keyPath): - $0[keyPath: keyPath].id = id - } + database.eventLoop.makeFutureWithTask { + try await self.create(to, on: database) } - return to.create(on: database) } public func create(_ to: To, on database: any Database) -> EventLoopFuture { - guard let id = self.idValue else { - fatalError("Cannot save child in relation \(self.name) to unsaved model.") + database.eventLoop.makeFutureWithTask { + try await self.create(to, on: database) } - switch self.parentKey { - case .required(let keyPath): - to[keyPath: keyPath].id = id - case .optional(let keyPath): - to[keyPath: keyPath].id = id - } - return to.create(on: database) } } @@ -207,6 +191,12 @@ private struct ChildrenEagerLoader: EagerLoader let withDeleted: Bool func run(models: [From], on database: any Database) -> EventLoopFuture { + database.eventLoop.makeFutureWithTask { + try await self.run(models: models, on: database) + } + } + + func run(models: [From], on database: any Database) async throws { let ids = models.map { $0.id! } let builder = To.query(on: database) @@ -220,16 +210,15 @@ private struct ChildrenEagerLoader: EagerLoader if (self.withDeleted) { builder.withDeleted() } - return builder.all().map { - for model in models { - let id = model[keyPath: self.relationKey].idValue! - model[keyPath: self.relationKey].value = $0.filter { child in - switch parentKey { - case .optional(let optional): - return child[keyPath: optional].id == id - case .required(let required): - return child[keyPath: required].id == id - } + let result = try await builder.all() + for model in models { + let id = model[keyPath: self.relationKey].idValue! + model[keyPath: self.relationKey].value = result.filter { child in + switch parentKey { + case .optional(let optional): + return child[keyPath: optional].id == id + case .required(let required): + return child[keyPath: required].id == id } } } diff --git a/Sources/FluentKit/Properties/CompositeChildren.swift b/Sources/FluentKit/Properties/CompositeChildren.swift index d9dd329c5..b09135ab0 100644 --- a/Sources/FluentKit/Properties/CompositeChildren.swift +++ b/Sources/FluentKit/Properties/CompositeChildren.swift @@ -192,6 +192,12 @@ private struct CompositeChildrenEagerLoader: EagerLoader let withDeleted: Bool func run(models: [From], on database: any Database) -> EventLoopFuture { + database.eventLoop.makeFutureWithTask { + try await self.run(models: models, on: database) + } + } + + private func run(models: [From], on database: any Database) async throws { let ids = Set(models.map(\.id!)) let parentKey = From()[keyPath: self.relationKey].parentKey let builder = To.query(on: database) @@ -202,13 +208,13 @@ private struct CompositeChildrenEagerLoader: EagerLoader if self.withDeleted { builder.withDeleted() } - - return builder.all().map { - let indexedResults = Dictionary(grouping: $0, by: { parentKey.referencedId(in: $0)! }) - - for model in models { - model[keyPath: self.relationKey].value = indexedResults[model[keyPath: self.relationKey].idValue!] ?? [] - } + + let result = try await builder.all() + + let indexedResults = Dictionary(grouping: result, by: { parentKey.referencedId(in: $0)! }) + + for model in models { + model[keyPath: self.relationKey].value = indexedResults[model[keyPath: self.relationKey].idValue!] ?? [] } } } diff --git a/Sources/FluentKit/Properties/CompositeOptionalChild.swift b/Sources/FluentKit/Properties/CompositeOptionalChild.swift index 7ff6a62fd..14d8dc889 100644 --- a/Sources/FluentKit/Properties/CompositeOptionalChild.swift +++ b/Sources/FluentKit/Properties/CompositeOptionalChild.swift @@ -95,7 +95,7 @@ public final class CompositeOptionalChildProperty: @unchecked Sendable set { self.idValue = newValue } } - public func query(on database:any Database) -> QueryBuilder { + public func query(on database: any Database) -> QueryBuilder { guard let id = self.idValue else { fatalError("Cannot query child relation \(self.name) from unsaved model.") } @@ -144,7 +144,7 @@ extension CompositeOptionalChildProperty: AnyCodableProperty { extension CompositeOptionalChildProperty: Relation { public var name: String { "CompositeOptionalChild<\(From.self), \(To.self)>(for: \(self.parentKey))" } - public func load(on database: any Database) -> EventLoopFuture { self.query(on: database).first().map { self.value = $0 } } + public func load(on database: any Database) -> EventLoopFuture { database.eventLoop.makeFutureWithTask { try await self.load(on: database) } } } extension CompositeOptionalChildProperty: EagerLoadable { @@ -177,6 +177,12 @@ private struct CompositeOptionalChildEagerLoader: EagerLoader let withDeleted: Bool func run(models: [From], on database: any Database) -> EventLoopFuture { + database.eventLoop.makeFutureWithTask { + try await self.run(models: models, on: database) + } + } + + private func run(models: [From], on database: any Database) async throws { let ids = Set(models.map(\.id!)) let parentKey = From()[keyPath: self.relationKey].parentKey let builder = To.query(on: database) @@ -187,12 +193,12 @@ private struct CompositeOptionalChildEagerLoader: EagerLoader if self.withDeleted { builder.withDeleted() } - return builder.all().map { - let indexedResults = Dictionary(grouping: $0, by: { parentKey.referencedId(in: $0)! }) - - for model in models { - model[keyPath: self.relationKey].value = indexedResults[model[keyPath: self.relationKey].idValue!]?.first - } + + let result = try await builder.all() + let indexedResults = Dictionary(grouping: result, by: { parentKey.referencedId(in: $0)! }) + + for model in models { + model[keyPath: self.relationKey].value = indexedResults[model[keyPath: self.relationKey].idValue!]?.first } } } @@ -204,6 +210,6 @@ private struct ThroughCompositeOptionalChildEagerLoader: let loader: Loader func run(models: [From], on database: any Database) -> EventLoopFuture { - return self.loader.run(models: models.compactMap { $0[keyPath: self.relationKey].value! }, on: database) + self.loader.run(models: models.compactMap { $0[keyPath: self.relationKey].value! }, on: database) } } diff --git a/Sources/FluentKit/Properties/CompositeOptionalParent.swift b/Sources/FluentKit/Properties/CompositeOptionalParent.swift index 5623371c3..a9bebf5d2 100644 --- a/Sources/FluentKit/Properties/CompositeOptionalParent.swift +++ b/Sources/FluentKit/Properties/CompositeOptionalParent.swift @@ -127,11 +127,9 @@ extension CompositeOptionalParentProperty: Relation { } public func load(on database: any Database) -> EventLoopFuture { - self.query(on: database) - .first() - .map { - self.value = $0 - } + database.eventLoop.makeFutureWithTask { + try await self.load(on: database) + } } } diff --git a/Sources/FluentKit/Properties/CompositeParent.swift b/Sources/FluentKit/Properties/CompositeParent.swift index 45c8bda9d..c99786d59 100644 --- a/Sources/FluentKit/Properties/CompositeParent.swift +++ b/Sources/FluentKit/Properties/CompositeParent.swift @@ -194,6 +194,12 @@ private struct CompositeParentEagerLoader: EagerLoader let withDeleted: Bool func run(models: [From], on database: any Database) -> EventLoopFuture { + database.eventLoop.makeFutureWithTask { + try await self.run(models: models, on: database) + } + } + + private func run(models: [From], on database: any Database) async throws { let sets = Dictionary(grouping: models, by: { $0[keyPath: self.relationKey].id }) let builder = To.query(on: database) @@ -203,23 +209,22 @@ private struct CompositeParentEagerLoader: EagerLoader if self.withDeleted { builder.withDeleted() } - return builder.all() - .flatMapThrowing { - let parents = Dictionary(uniqueKeysWithValues: $0.map { ($0.id!, $0) }) - - for (parentId, models) in sets { - guard let parent = parents[parentId] else { - database.logger.debug( - "Missing parent model in eager-load lookup results.", - metadata: ["parent": "\(To.self)", "id": "\(parentId)"] - ) - throw FluentError.missingParentError(keyPath: self.relationKey, id: parentId) - } - models.forEach { - $0[keyPath: self.relationKey].value = parent - } - } + + let result = try await builder.all() + let parents = Dictionary(uniqueKeysWithValues: result.map { ($0.id!, $0) }) + + for (parentId, models) in sets { + guard let parent = parents[parentId] else { + database.logger.debug( + "Missing parent model in eager-load lookup results.", + metadata: ["parent": "\(To.self)", "id": "\(parentId)"] + ) + throw FluentError.missingParentError(keyPath: self.relationKey, id: parentId) + } + models.forEach { + $0[keyPath: self.relationKey].value = parent } + } } } diff --git a/Sources/FluentKit/Properties/OptionalChild.swift b/Sources/FluentKit/Properties/OptionalChild.swift index ea19915ae..8aa93738f 100644 --- a/Sources/FluentKit/Properties/OptionalChild.swift +++ b/Sources/FluentKit/Properties/OptionalChild.swift @@ -66,16 +66,9 @@ public final class OptionalChildProperty: @unchecked Sendable } public func create(_ to: To, on database: any Database) -> EventLoopFuture { - guard let id = self.idValue else { - fatalError("Cannot save child in \(self.name) to unsaved model in.") - } - switch self.parentKey { - case .required(let keyPath): - to[keyPath: keyPath].id = id - case .optional(let keyPath): - to[keyPath: keyPath].id = id + database.eventLoop.makeFutureWithTask { + try await self.create(to, on: database) } - return to.create(on: database) } } @@ -140,8 +133,8 @@ extension OptionalChildProperty: Relation { } public func load(on database: any Database) -> EventLoopFuture { - self.query(on: database).first().map { - self.value = $0 + database.eventLoop.makeFutureWithTask { + try await self.load(on: database) } } } diff --git a/Sources/FluentKit/Properties/OptionalParent.swift b/Sources/FluentKit/Properties/OptionalParent.swift index df2232b20..bf2339869 100644 --- a/Sources/FluentKit/Properties/OptionalParent.swift +++ b/Sources/FluentKit/Properties/OptionalParent.swift @@ -63,8 +63,8 @@ extension OptionalParentProperty: Relation { } public func load(on database: any Database) -> EventLoopFuture { - self.query(on: database).first().map { - self.value = $0 + database.eventLoop.makeFutureWithTask { + try await self.load(on: database) } } } @@ -170,6 +170,12 @@ private struct OptionalParentEagerLoader: EagerLoader let withDeleted: Bool func run(models: [From], on database: any Database) -> EventLoopFuture { + database.eventLoop.makeFutureWithTask { + try await self.run(models: models, on: database) + } + } + + private func run(models: [From], on database: any Database) async throws { var _sets = Dictionary(grouping: models, by: { $0[keyPath: self.relationKey].id }) let nilParentModels = _sets.removeValue(forKey: nil) ?? [] let sets = _sets @@ -177,28 +183,29 @@ private struct OptionalParentEagerLoader: EagerLoader if sets.isEmpty { // Fetching "To" objects is unnecessary when no models have an id for "To". nilParentModels.forEach { $0[keyPath: self.relationKey].value = .some(.none) } - return database.eventLoop.makeSucceededVoidFuture() + return } let builder = To.query(on: database).filter(\._$id ~~ Set(sets.keys.compactMap { $0 })) if self.withDeleted { builder.withDeleted() } - return builder.all().flatMapThrowing { - let parents = Dictionary(uniqueKeysWithValues: $0.map { ($0.id!, $0) }) - - for (parentId, models) in sets { - guard let parent = parents[parentId!] else { - database.logger.debug( - "Missing parent model in eager-load lookup results.", - metadata: ["parent": .string("\(To.self)"), "id": .string("\(parentId!)")] - ) - throw FluentError.missingParentError(keyPath: self.relationKey, id: parentId!) - } - models.forEach { $0[keyPath: self.relationKey].value = .some(.some(parent)) } + + let result = try await builder.all() + + let parents = Dictionary(uniqueKeysWithValues: result.map { ($0.id!, $0) }) + + for (parentId, models) in sets { + guard let parent = parents[parentId!] else { + database.logger.debug( + "Missing parent model in eager-load lookup results.", + metadata: ["parent": .string("\(To.self)"), "id": .string("\(parentId!)")] + ) + throw FluentError.missingParentError(keyPath: self.relationKey, id: parentId!) } - nilParentModels.forEach { $0[keyPath: self.relationKey].value = .some(.none) } + models.forEach { $0[keyPath: self.relationKey].value = .some(.some(parent)) } } + nilParentModels.forEach { $0[keyPath: self.relationKey].value = .some(.none) } } } diff --git a/Sources/FluentKit/Properties/Parent.swift b/Sources/FluentKit/Properties/Parent.swift index 310fe2eb7..abe3d3d22 100644 --- a/Sources/FluentKit/Properties/Parent.swift +++ b/Sources/FluentKit/Properties/Parent.swift @@ -165,24 +165,29 @@ private struct ParentEagerLoader: EagerLoader let withDeleted: Bool func run(models: [From], on database: any Database) -> EventLoopFuture { + database.eventLoop.makeFutureWithTask { + try await self.run(models: models, on: database) + } + } + + private func run(models: [From], on database: any Database) async throws { let sets = Dictionary(grouping: models, by: { $0[keyPath: self.relationKey].id }) let builder = To.query(on: database).filter(\._$id ~~ Set(sets.keys)) if self.withDeleted { builder.withDeleted() } - return builder.all().flatMapThrowing { - let parents = Dictionary(uniqueKeysWithValues: $0.map { ($0.id!, $0) }) - - for (parentId, models) in sets { - guard let parent = parents[parentId] else { - database.logger.debug( - "Missing parent model in eager-load lookup results.", - metadata: ["parent": .string("\(To.self)"), "id": .string("\(parentId)")] - ) - throw FluentError.missingParentError(keyPath: self.relationKey, id: parentId) - } - models.forEach { $0[keyPath: self.relationKey].value = parent } + let result = try await builder.all() + let parents = Dictionary(uniqueKeysWithValues: result.map { ($0.id!, $0) }) + + for (parentId, models) in sets { + guard let parent = parents[parentId] else { + database.logger.debug( + "Missing parent model in eager-load lookup results.", + metadata: ["parent": .string("\(To.self)"), "id": .string("\(parentId)")] + ) + throw FluentError.missingParentError(keyPath: self.relationKey, id: parentId) } + models.forEach { $0[keyPath: self.relationKey].value = parent } } } } diff --git a/Sources/FluentKit/Properties/Siblings.swift b/Sources/FluentKit/Properties/Siblings.swift index be3c160be..14bfd0d0b 100644 --- a/Sources/FluentKit/Properties/Siblings.swift +++ b/Sources/FluentKit/Properties/Siblings.swift @@ -378,6 +378,12 @@ private struct SiblingsEagerLoader: EagerLoader let withDeleted: Bool func run(models: [From], on database: any Database) -> EventLoopFuture { + database.eventLoop.makeFutureWithTask { + try await self.run(models: models, on: database) + } + } + + private func run(models: [From], on database: any Database) async throws { let ids = models.map { $0.id! } let from = From()[keyPath: self.relationKey].from @@ -388,16 +394,15 @@ private struct SiblingsEagerLoader: EagerLoader if self.withDeleted { builder.withDeleted() } - return builder.all().flatMapThrowing { - var map: [From.IDValue: [To]] = [:] - for to in $0 { - let fromID = try to.joined(Through.self)[keyPath: from].id - map[fromID, default: []].append(to) - } - for model in models { - guard let id = model.id else { throw FluentError.idRequired } - model[keyPath: self.relationKey].value = map[id] ?? [] - } + let result = try await builder.all() + var map: [From.IDValue: [To]] = [:] + for to in result { + let fromID = try to.joined(Through.self)[keyPath: from].id + map[fromID, default: []].append(to) + } + for model in models { + guard let id = model.id else { throw FluentError.idRequired } + model[keyPath: self.relationKey].value = map[id] ?? [] } } } diff --git a/Sources/FluentKit/Query/Builder/QueryBuilder+Paginate.swift b/Sources/FluentKit/Query/Builder/QueryBuilder+Paginate.swift index c8af9c200..6620ca09d 100644 --- a/Sources/FluentKit/Query/Builder/QueryBuilder+Paginate.swift +++ b/Sources/FluentKit/Query/Builder/QueryBuilder+Paginate.swift @@ -24,27 +24,10 @@ extension QueryBuilder { /// - Returns: A single `Page` of the result set containing the requested items and page metadata. public func page( withIndex page: Int, - size per: Int) -> EventLoopFuture> { - let trimmedRequest: PageRequest = { - guard let pageSizeLimit = database.context.pageSizeLimit else { - return .init(page: Swift.max(page, 1), per: Swift.max(per, 1)) - } - return .init( - page: Swift.max(page, 1), - per: Swift.max(Swift.min(per, pageSizeLimit), 1) - ) - }() - let count = self.count() - let items = self.copy().range(trimmedRequest.start.. EventLoopFuture> { + self.database.eventLoop.makeFutureWithTask { + try await self.page(withIndex: page, size: per) } } } diff --git a/Sources/FluentKit/Query/Builder/QueryBuilder.swift b/Sources/FluentKit/Query/Builder/QueryBuilder.swift index 998316409..2a3daa8ab 100644 --- a/Sources/FluentKit/Query/Builder/QueryBuilder.swift +++ b/Sources/FluentKit/Query/Builder/QueryBuilder.swift @@ -13,9 +13,9 @@ public final class QueryBuilder public convenience init(database: any Database) { self.init( - query: .init(schema: Model.schema, space: Model.space), + query: .init(schema: Model.schema, space: Model.space, shouldTrace: database.context.shouldTrace), database: database, - models: [Model.self] + models: [Model.self], ) } @@ -25,7 +25,7 @@ public final class QueryBuilder models: [any Schema.Type] = [], eagerLoaders: [any AnyEagerLoader] = [], includeDeleted: Bool = false, - shouldForceDelete: Bool = false + shouldForceDelete: Bool = false, ) { self.query = query self.database = database @@ -309,7 +309,7 @@ public final class QueryBuilder return done } - private func addTimestamps( + func addTimestamps( triggers: [TimestampTrigger], to query: inout DatabaseQuery ) { diff --git a/Sources/FluentKit/Query/Database/DatabaseQuery.swift b/Sources/FluentKit/Query/Database/DatabaseQuery.swift index df44e3082..f34587199 100644 --- a/Sources/FluentKit/Query/Database/DatabaseQuery.swift +++ b/Sources/FluentKit/Query/Database/DatabaseQuery.swift @@ -1,3 +1,5 @@ +import Tracing + public struct DatabaseQuery: Sendable { public var schema: String public var space: String? @@ -12,7 +14,10 @@ public struct DatabaseQuery: Sendable { public var limits: [Limit] public var offsets: [Offset] - init(schema: String, space: String? = nil) { + var serviceContext: ServiceContext + let shouldTrace: Bool + + init(schema: String, space: String? = nil, shouldTrace: Bool = false) { self.schema = schema self.space = space self.isUnique = false @@ -24,7 +29,32 @@ public struct DatabaseQuery: Sendable { self.sorts = [] self.limits = [] self.offsets = [] + self.serviceContext = ServiceContext.current ?? .topLevel + self.shouldTrace = shouldTrace + } + + func withTracing(_ closure: () async throws -> T) async rethrows -> T { + if shouldTrace { + try await withSpan("db.query", context: self.serviceContext, ofKind: .client) { span in + // https://opentelemetry.io/docs/specs/semconv/database/database-spans/#span-definition + // We add what we can. The rest is up to the underlying driver packages + span.updateAttributes { attributes in + // db.system.name + attributes["db.collection.name"] = self.schema + attributes["db.namespace"] = self.space + attributes["db.operation.name"] = "\(self.action)" + // db.response.status_code + // error.type + // server.port + attributes["db.query.summary"] = "\(self.action) \(self.space.map { "\($0)." } ?? "")\(self.schema)" + } + return try await closure() + } + } else { + try await closure() + } } + } extension DatabaseQuery: CustomStringConvertible { @@ -67,7 +97,8 @@ extension DatabaseQuery: CustomStringConvertible { "schema": "\(self.space.map { "\($0)." } ?? "")\(self.schema)", ] switch self.action { - case .create, .update, .custom: result["input"] = .array(self.input.map(\.describedByLoggingMetadata)) + case .create, .update, .custom: + result["input"] = .array(self.input.map(\.describedByLoggingMetadata)) default: break } switch self.action { diff --git a/Tests/FluentKitTests/DummyDatabaseForTestSQLSerializer.swift b/Tests/FluentKitTests/DummyDatabaseForTestSQLSerializer.swift index a03d80f8f..9d037cb64 100644 --- a/Tests/FluentKitTests/DummyDatabaseForTestSQLSerializer.swift +++ b/Tests/FluentKitTests/DummyDatabaseForTestSQLSerializer.swift @@ -57,10 +57,13 @@ final class DummyDatabaseForTestSQLSerializer: Database, SQLDatabase { } init() { + var logger = Logger(label: "test") + logger.logLevel = .debug self.context = .init( configuration: Configuration(), - logger: .init(label: "test"), - eventLoop: NIOSingletons.posixEventLoopGroup.any() + logger: logger, + eventLoop: NIOSingletons.posixEventLoopGroup.any(), + shouldTrace: true ) } diff --git a/Tests/FluentKitTests/TracingTests.swift b/Tests/FluentKitTests/TracingTests.swift new file mode 100644 index 000000000..cef809e9c --- /dev/null +++ b/Tests/FluentKitTests/TracingTests.swift @@ -0,0 +1,147 @@ +import FluentBenchmark +import FluentKit +@testable import Tracing +@testable import Instrumentation +import InMemoryTracing +import Testing +import FluentSQL + +@Suite() +struct TracingTests { + let db = DummyDatabaseForTestSQLSerializer() + + init() { + InstrumentationSystem.bootstrapInternal(TaskLocalTracer()) + } + + @Test("Tracing CRUD", .withTracing(InMemoryTracer())) + func tracingCRUD() async throws { + let planet = Planet() + planet.name = "Pluto" + try await planet.create(on: db) + var span = try #require(tracer.finishedSpans.last) + #expect(span.attributes["db.operation.name"]?.toSpanAttribute() == "create") + #expect(span.attributes["db.query.summary"]?.toSpanAttribute() == "create \(Planet.schema)") + #expect(span.attributes["db.collection.name"]?.toSpanAttribute() == "\(Planet.schema)") + #expect(span.attributes["db.namespace"]?.toSpanAttribute() == nil) + + _ = try await Planet.find(planet.requireID(), on: db) + span = try #require(tracer.finishedSpans.last) + #expect(span.attributes["db.operation.name"]?.toSpanAttribute() == "read") + #expect(span.attributes["db.query.summary"]?.toSpanAttribute() == "read planets") + #expect(span.attributes["db.collection.name"]?.toSpanAttribute() == "\(Planet.schema)") + #expect(span.attributes["db.namespace"]?.toSpanAttribute() == nil) + + planet.name = "Jupiter" + try await planet.update(on: db) + span = try #require(tracer.finishedSpans.last) + #expect(span.attributes["db.operation.name"]?.toSpanAttribute() == "update") + #expect(span.attributes["db.query.summary"]?.toSpanAttribute() == "update \(Planet.schema)") + #expect(span.attributes["db.collection.name"]?.toSpanAttribute() == "\(Planet.schema)") + #expect(span.attributes["db.namespace"]?.toSpanAttribute() == nil) + + try await planet.delete(force: true, on: db) + span = try #require(tracer.finishedSpans.last) + #expect(span.attributes["db.operation.name"]?.toSpanAttribute() == "delete") + #expect(span.attributes["db.query.summary"]?.toSpanAttribute() == "delete \(Planet.schema)") + #expect(span.attributes["db.collection.name"]?.toSpanAttribute() == "\(Planet.schema)") + #expect(span.attributes["db.namespace"]?.toSpanAttribute() == nil) + } + + @Test("Tracing All", .withTracing(InMemoryTracer())) + func tracingFirst() async throws { + _ = try await Planet.query(on: db).all() + let span = try #require(tracer.finishedSpans.last) + #expect(span.attributes["db.operation.name"]?.toSpanAttribute() == "read") + #expect(span.attributes["db.query.summary"]?.toSpanAttribute() == "read \(Planet.schema)") + #expect(span.attributes["db.collection.name"]?.toSpanAttribute() == "\(Planet.schema)") + #expect(span.attributes["db.namespace"]?.toSpanAttribute() == nil) + } + + @Test("Aggregate Tracing", .withTracing(InMemoryTracer())) + func tracingAggregates() async throws { + db.fakedRows.append([.init(["aggregate": 1])]) + _ = try await Planet.query(on: db).count() + let span = try #require(tracer.finishedSpans.last) + #expect(span.attributes["db.operation.name"]?.toSpanAttribute() == "aggregate(count(planets[id]))") + #expect(span.attributes["db.query.summary"]?.toSpanAttribute() == "aggregate(count(planets[id])) planets") + #expect(span.attributes["db.collection.name"]?.toSpanAttribute() == "\(Planet.schema)") + #expect(span.attributes["db.namespace"]?.toSpanAttribute() == nil) + } + + @Test("CRUD Tracing", .withTracing(InMemoryTracer())) + func tracingFindInsertRaw() async throws { + try await Planet(name: "Pluto").create(on: db) + _ = try await Planet.find(UUID(), on: db) + let span = try #require(tracer.finishedSpans.last) + #expect(span.attributes["db.operation.name"]?.toSpanAttribute() == "read") + #expect(span.attributes["db.query.summary"]?.toSpanAttribute() == "read \(Planet.schema)") + #expect(span.attributes["db.collection.name"]?.toSpanAttribute() == "\(Planet.schema)") + #expect(span.attributes["db.namespace"]?.toSpanAttribute() == nil) + } + + @Test("Insert Tracing", .withTracing(InMemoryTracer())) + func tracingInsert() async throws { + let id = UUID() + try await Planet(id: id, name: "Pluto").create(on: db) + let span = try #require(tracer.finishedSpans.last) + #expect(span.attributes["db.operation.name"]?.toSpanAttribute() == "create") + #expect(span.attributes["db.query.summary"]?.toSpanAttribute() == "create \(Planet.schema)") + #expect(span.attributes["db.collection.name"]?.toSpanAttribute() == "\(Planet.schema)") + #expect(span.attributes["db.namespace"]?.toSpanAttribute() == nil) + } + + // @Test + // func tracingRaw() async throws { + // _ = try await self.db.select().columns("*").from(Planet.schema).all(decodingFluent: Planet.self) + // let span = try #require(tracer.finishedSpans.last) + // #expect(span.attributes["db.operation.name"]?.toSpanAttribute() == "read") + // #expect(span.attributes["db.query.summary"]?.toSpanAttribute() == "read \(Planet.schema)") + // #expect(span.attributes["db.collection.name"]?.toSpanAttribute() == "\(Planet.schema)") + // #expect(span.attributes["db.namespace"]?.toSpanAttribute() == nil) + // } +} + +@TaskLocal var tracer = InMemoryTracer() + +struct TracingTaskLocalTrait: TestTrait, SuiteTrait, TestScoping { + fileprivate let implementation: @Sendable (_ body: @Sendable () async throws -> Void) async throws -> Void + + func provideScope(for test: Test, testCase: Test.Case?, performing function: @Sendable @concurrent () async throws -> Void) async throws { + try await implementation { try await function() } + } +} + +extension Trait where Self == TracingTaskLocalTrait { + static func withTracing(_ value: InMemoryTracer) -> Self { + Self { body in + try await $tracer.withValue(value) { + try await body() + } + } + } +} + +struct TaskLocalTracer: Tracer { + typealias Span = InMemoryTracer.Span + + func startSpan(_ operationName: String, context: @autoclosure () -> ServiceContext, ofKind kind: SpanKind, at instant: @autoclosure () -> Instant, function: String, file fileID: String, line: UInt) -> Span where Instant : TracerInstant { + tracer.startSpan(operationName, context: context(), ofKind: kind, at: instant(), function: function, file: fileID, line: line) + } + + func extract( + _ carrier: Carrier, into context: inout ServiceContextModule.ServiceContext, using extractor: Extract + ) where Carrier == Extract.Carrier, Extract: Instrumentation.Extractor { + tracer.extract(carrier, into: &context, using: extractor) + } + + func inject( + _ context: ServiceContextModule.ServiceContext, into carrier: inout Carrier, using injector: Inject + ) where Carrier == Inject.Carrier, Inject: Instrumentation.Injector { + tracer.inject(context, into: &carrier, using: injector) + } + + func forceFlush() { + tracer.forceFlush() + } +}