Skip to content
50 changes: 31 additions & 19 deletions Sources/FluentKit/Concurrency/AsyncModelMiddleware.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,33 +11,45 @@ public protocol AsyncModelMiddleware: AnyModelMiddleware {
}

extension AsyncModelMiddleware {
public func handle(
func handle(
_ event: ModelEvent,
_ model: any AnyModel,
on db: any Database,
chainingTo next: any AnyModelResponder
) -> EventLoopFuture<Void> {
chainingTo next: any AnyAsyncModelResponder
) async throws {
guard let modelType = (model as? Model) else {
return next.handle(event, model, on: db)
return try await next.handle(event, model, on: db)
}

return db.eventLoop.makeFutureWithTask {
let responder = AsyncBasicModelResponder { responderEvent, responderModel, responderDB in
try await next.handle(responderEvent, responderModel, on: responderDB).get()
}
switch event {
case .create:
try await self.create(model: modelType, on: db, next: next)
case .update:
try await self.update(model: modelType, on: db, next: next)
case .delete(let force):
try await self.delete(model: modelType, force: force, on: db, next: next)
case .softDelete:
try await self.softDelete(model: modelType, on: db, next: next)
case .restore:
try await self.restore(model: modelType, on: db, next: next)
}
}

switch event {
case .create:
try await self.create(model: modelType, on: db, next: responder)
case .update:
try await self.update(model: modelType, on: db, next: responder)
case .delete(let force):
try await self.delete(model: modelType, force: force, on: db, next: responder)
case .softDelete:
try await self.softDelete(model: modelType, on: db, next: responder)
case .restore:
try await self.restore(model: modelType, on: db, next: responder)
public func handle(
_ event: ModelEvent,
_ model: any AnyModel,
on db: any Database,
chainingTo next: any AnyModelResponder
) -> EventLoopFuture<Void> {
db.eventLoop.makeFutureWithTask {
let responder = AnyAsyncBasicModelResponder { responderModel, responderEvent, responderDB in
if let next = (next as? any AnyAsyncModelResponder) {
try await next.handle(responderModel, responderEvent, on: responderDB)
} else {
try await next.handle(responderModel, responderEvent, on: responderDB).get()
}
}
try await self.handle(event, model, on: db, chainingTo: responder)
}
}

Expand Down
26 changes: 23 additions & 3 deletions Sources/FluentKit/Concurrency/Children+Concurrency.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,35 @@ import NIOCore

public extension ChildrenProperty {
func load(on database: any Database) async throws {
try await self.load(on: database).get()
self.value = try await self.query(on: database).all()
}

func create(_ to: To, on database: any Database) async throws {
try await self.create(to, on: database).get()
guard let id = self.idValue else {
fatalError("Cannot save child in relation \(self.name) to unsaved model.")
}
switch self.parentKey {
case .required(let keyPath):
to[keyPath: keyPath].id = id
case .optional(let keyPath):
to[keyPath: keyPath].id = id
}
return try await to.create(on: database)
}

func create(_ to: [To], on database: any Database) async throws {
try await self.create(to, on: database).get()
guard let id = self.idValue else {
fatalError("Cannot save child in relation \(self.name) to unsaved model.")
}
to.forEach {
switch self.parentKey {
case .required(let keyPath):
$0[keyPath: keyPath].id = id
case .optional(let keyPath):
$0[keyPath: keyPath].id = id
}
}
return try await to.create(on: database)
}
}

Expand Down
54 changes: 50 additions & 4 deletions Sources/FluentKit/Concurrency/EnumBuilder+Concurrency.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,64 @@ import NIOCore

public extension EnumBuilder {
func create() async throws -> DatabaseSchema.DataType {
try await self.create().get()
self.enum.action = .create
try await self.database.execute(enum: self.enum)
return try await self.generateDatatype()
}

func read() async throws -> DatabaseSchema.DataType {
try await self.read().get()
try await self.generateDatatype()
}

func update() async throws -> DatabaseSchema.DataType {
try await self.update().get()
self.enum.action = .update
try await self.database.execute(enum: self.enum)
return try await self.generateDatatype()
}

func delete() async throws {
try await self.delete().get()
self.enum.action = .delete
try await self.database.execute(enum: self.enum)
try await self.deleteMetadata()
}

// MARK: Private

func generateDatatype() async throws -> DatabaseSchema.DataType {
try await EnumMetadata.migration.prepare(on: self.database)
try await self.updateMetadata()

let cases = try await EnumMetadata.query(on: self.database).filter(\.$name == self.enum.name).all()

return .enum(.init(
name: self.enum.name,
cases: cases.map { $0.case }
))
}

private func updateMetadata() async throws {
// Create all new enum cases.
try await self.enum.createCases.map {
EnumMetadata(name: self.enum.name, case: $0)
}.create(on: self.database)

// Delete all old enum cases.
try await EnumMetadata.query(on: self.database)
.filter(\.$name == self.enum.name)
.filter(\.$case ~~ self.enum.deleteCases)
.delete()
}

private func deleteMetadata() async throws {
// Delete all cases for this enum.
try await EnumMetadata.query(on: self.database)
.filter(\.$name == self.enum.name)
.delete()
let count = try await EnumMetadata.query(on: self.database).count()

// If no enums are left, remove table.
if count == 0 {
try await EnumMetadata.migration.revert(on: self.database)
}
}
}
166 changes: 158 additions & 8 deletions Sources/FluentKit/Concurrency/Model+Concurrency.swift
Original file line number Diff line number Diff line change
@@ -1,41 +1,191 @@
import NIOCore
import protocol SQLKit.SQLDatabase

public extension Model {
static func find(
_ id: Self.IDValue?,
on database: any Database
) async throws -> Self? {
try await self.find(id, on: database).get()
guard let id = id else { return nil }
return try await Self.query(on: database)
.filter(id: id)
.first()
}

// MARK: - CRUD
func save(on database: any Database) async throws {
try await self.save(on: database).get()
if self._$idExists {
try await self.update(on: database)
} else {
try await self.create(on: database)
}
}

func create(on database: any Database) async throws {
try await self.create(on: database).get()
try await database.configuration.middleware.chainingTo(Self.self) { event, model, db in
try await model.handle(event, on: db)
}.handle(.create, self, on: database)
}

private func _create(on database: any Database) async throws {
precondition(!self._$idExists)
self.touchTimestamps(.create, .update)
if self.anyID is any AnyQueryableProperty {
self.anyID.generate()

nonisolated(unsafe) var output: (any DatabaseOutput)?
try await Self.query(on: database)
.set(self.collectInput(withDefaultedValues: database is any SQLDatabase))
.action(.create)
.run { output = $0 }

var input = self.collectInput()
if case .default = self._$id.inputValue {
let idKey = Self()._$id.key
// In theory, this shouldn't happen, but in case it does in some edge case,
// better to throw an error than crash with an IUO.
guard let output else { throw RunQueryError.noDatabaseOutput }
input[idKey] = try .bind(output.decode(idKey, as: Self.IDValue.self))
}
try self.output(from: SavedInput(input))
} else {
// non-ID case: run async and then perform the decoding step
try await Self.query(on: database)
.set(self.collectInput(withDefaultedValues: database is any SQLDatabase))
.action(.create)
.run()
try self.output(from: SavedInput(self.collectInput()))
}
}

func update(on database: any Database) async throws {
try await self.update(on: database).get()
try await database.configuration.middleware.chainingTo(Self.self) { event, model, db in
try await model.handle(event, on: db)
}.handle(.update, self, on: database)
}

private func _update(on database: any Database) async throws {
precondition(self._$idExists)
guard self.hasChanges else { return }
self.touchTimestamps(.update)
let input = self.collectInput()
guard let id = self.id else { throw FluentError.idRequired }
try await Self.query(on: database)
.filter(id: id)
.set(input)
.update()
try self.output(from: SavedInput(input))
}

func delete(force: Bool = false, on database: any Database) async throws {
try await self.delete(force: force, on: database).get()
if !force, let timestamp = self.deletedTimestamp {
timestamp.touch()
try await database.configuration.middleware.chainingTo(Self.self) { event, model, db in
try await model.handle(event, on: db)
}.handle(.softDelete, self, on: database)
} else {
try await database.configuration.middleware.chainingTo(Self.self) { event, model, db in
try await model.handle(event, on: db)
}.handle(.delete(force), self, on: database)
}
}

private func _delete(force: Bool = false, on database: any Database) async throws {
guard let id = self.id else { throw FluentError.idRequired }
try await Self.query(on: database)
.filter(id: id)
.delete(force: force)
if force || self.deletedTimestamp == nil {
self._$idExists = false
}
}

func restore(on database: any Database) async throws {
try await self.restore(on: database).get()
try await database.configuration.middleware.chainingTo(Self.self) { event, model, db in
try await model.handle(event, on: db)
}.handle(.restore, self, on: database)
}

private func _restore(on database: any Database) async throws {
guard let timestamp = self.timestamps.filter({ $0.trigger == .delete }).first else {
fatalError("no delete timestamp on this model")
}
timestamp.touch(date: nil)
precondition(self._$idExists)
guard let id = self.id else { throw FluentError.idRequired }
let _: Void = try await Self.query(on: database)
.withDeleted()
.filter(id: id)
.set(self.collectInput())
.action(.update)
.run()

try self.output(from: SavedInput(self.collectInput()))
self._$idExists = true
}

func handle(_ event: ModelEvent, on db: any Database) async throws -> Void {
switch event {
case .create:
try await _create(on: db)
case .delete(let force):
try await _delete(force: force, on: db)
case .restore:
try await _restore(on: db)
case .softDelete:
try await _delete(force: false, on: db)
case .update:
try await _update(on: db)
}
}
}

public extension Collection where Element: FluentKit.Model, Self: Sendable {
func delete(force: Bool = false, on database: any Database) async throws {
try await self.delete(force: force, on: database).get()
guard !self.isEmpty else { return }

precondition(self.allSatisfy { $0._$idExists })
for model in self {
try await database.configuration.middleware.chainingTo(Element.self) { event, model, db in
return
}.delete(model, force: force, on: database)
}

try await Element.query(on: database)
.filter(ids: self.map { $0.id! })
.delete(force: force)

guard force else { return }
for model in self where model.deletedTimestamp == nil {
model._$idExists = false
}
}

func create(on database: any Database) async throws {
try await self.create(on: database).get()
guard !self.isEmpty else { return }

precondition(self.allSatisfy { !$0._$idExists })

try await withThrowingTaskGroup(of: Void.self) { group in
for model in self {
group.addTask {
try await database.configuration.middleware.chainingTo(Element.self) { event, model, db in
if model.anyID is any AnyQueryableProperty {
model._$id.generate()
}
model.touchTimestamps(.create, .update)
}.create(model, on: database)
}
}
try await group.waitForAll()
}

try await Element.query(on: database)
.set(self.map { $0.collectInput(withDefaultedValues: database is any SQLDatabase) })
.create()

for model in self {
model._$idExists = true
}
}
}
Loading
Loading