diff --git a/Package.resolved b/Package.resolved index cf6b202..f9eb16d 100644 --- a/Package.resolved +++ b/Package.resolved @@ -6,8 +6,8 @@ "repositoryURL": "https://github.com/swift-server/async-http-client.git", "state": { "branch": null, - "revision": "4b4d6605aa2e4f0c2ae3c7563795ae3bec259fff", - "version": "1.2.1" + "revision": "8fa7f082b155ea325bcf7b2dbffaf81d4eea1ae4", + "version": "1.5.1" } }, { @@ -15,8 +15,8 @@ "repositoryURL": "https://github.com/vapor/async-kit.git", "state": { "branch": null, - "revision": "7457413e57dbfac762b32dd30c1caf2c55a02a3d", - "version": "1.2.0" + "revision": "ae12394e466f7fe44725655e763f7e603ed876b5", + "version": "1.4.0" } }, { @@ -24,8 +24,17 @@ "repositoryURL": "https://github.com/vapor/console-kit.git", "state": { "branch": null, - "revision": "f353ebf73014faf89804b76e51ff8171840b59ba", - "version": "4.2.3" + "revision": "75ea3b627d88221440b878e5dfccc73fd06842ed", + "version": "4.2.7" + } + }, + { + "package": "fluent", + "repositoryURL": "https://github.com/vapor/fluent.git", + "state": { + "branch": null, + "revision": "5810a409eb0271a576f68887fa6713dae3985056", + "version": "4.3.1" } }, { @@ -33,8 +42,71 @@ "repositoryURL": "https://github.com/vapor/fluent-kit.git", "state": { "branch": null, - "revision": "ef2a9a76c48856f0388ee2026d67d2ebde8c571c", - "version": "1.7.3" + "revision": "7d9982a788dc0bbc80937537a0e2b0a42d282202", + "version": "1.15.1" + } + }, + { + "package": "fluent-mysql-driver", + "repositoryURL": "https://github.com/vapor/fluent-mysql-driver.git", + "state": { + "branch": null, + "revision": "699e164880387349bf04c2329fe53097524a5904", + "version": "4.0.0" + } + }, + { + "package": "fluent-postgres-driver", + "repositoryURL": "https://github.com/vapor/fluent-postgres-driver.git", + "state": { + "branch": null, + "revision": "afe159ce915e752ab5f5b76479dc78a17051ce73", + "version": "2.1.3" + } + }, + { + "package": "multipart-kit", + "repositoryURL": "https://github.com/vapor/multipart-kit.git", + "state": { + "branch": null, + "revision": "c9ea04017b7fb3b1f034ad7a77f8e53d3e080be5", + "version": "4.2.1" + } + }, + { + "package": "mysql-kit", + "repositoryURL": "https://github.com/vapor/mysql-kit.git", + "state": { + "branch": null, + "revision": "529bde8ce86c8a2546e7e8856c206f3358dff41e", + "version": "4.2.0" + } + }, + { + "package": "mysql-nio", + "repositoryURL": "https://github.com/vapor/mysql-nio.git", + "state": { + "branch": null, + "revision": "4cbef4c0903c9792ff1f8dc4b55532276fa70c99", + "version": "1.3.2" + } + }, + { + "package": "postgres-kit", + "repositoryURL": "https://github.com/vapor/postgres-kit.git", + "state": { + "branch": null, + "revision": "6f35f3065ef923475553ad6f85660ad971fe6cf3", + "version": "2.3.2" + } + }, + { + "package": "postgres-nio", + "repositoryURL": "https://github.com/vapor/postgres-nio.git", + "state": { + "branch": null, + "revision": "6a9eb6f15235e844c95382959a88253442168a42", + "version": "1.6.1" } }, { @@ -42,8 +114,8 @@ "repositoryURL": "https://github.com/vapor/queues.git", "state": { "branch": null, - "revision": "c562842634aee8c2cb26bf186992f62af1abe6ce", - "version": "1.5.0" + "revision": "a0b96a560647fccba256d7e30c2a761bb54aa979", + "version": "1.6.0" } }, { @@ -51,8 +123,8 @@ "repositoryURL": "https://github.com/vapor/routing-kit.git", "state": { "branch": null, - "revision": "4cf052b78aebaf1b23f2264ce04d57b4b6eb5254", - "version": "4.2.0" + "revision": "a0801a36a6ad501d5ad6285cbcd4774de6b0a734", + "version": "4.3.0" } }, { @@ -60,8 +132,8 @@ "repositoryURL": "https://github.com/vapor/sql-kit.git", "state": { "branch": null, - "revision": "ea9928b7f4a801b175a00b982034d9c54ecb6167", - "version": "3.7.0" + "revision": "b70d1fea1b544dd819c17e83d59fb9aa85c81e74", + "version": "3.10.0" } }, { @@ -69,8 +141,8 @@ "repositoryURL": "https://github.com/swift-server/swift-backtrace.git", "state": { "branch": null, - "revision": "f2fd8c4845a123419c348e0bc4b3839c414077d5", - "version": "1.2.0" + "revision": "d3e04a9d4b3833363fb6192065b763310b156d54", + "version": "1.3.1" } }, { @@ -78,8 +150,8 @@ "repositoryURL": "https://github.com/apple/swift-crypto.git", "state": { "branch": null, - "revision": "9680b7251cd2be22caaed8f1468bd9e8915a62fb", - "version": "1.1.2" + "revision": "3bea268b223651c4ab7b7b9ad62ef9b2d4143eb6", + "version": "1.1.6" } }, { @@ -87,8 +159,8 @@ "repositoryURL": "https://github.com/apple/swift-log.git", "state": { "branch": null, - "revision": "173f567a2dfec11d74588eea82cecea555bdc0bc", - "version": "1.4.0" + "revision": "5d66f7ba25daf4f94100e7022febf3c75e37a6c7", + "version": "1.4.2" } }, { @@ -96,8 +168,8 @@ "repositoryURL": "https://github.com/apple/swift-metrics.git", "state": { "branch": null, - "revision": "708b960b4605abb20bc55d65abf6bad607252200", - "version": "2.0.0" + "revision": "3edd2f57afc4e68e23c3e4956bc8b65ca6b5b2ff", + "version": "2.2.0" } }, { @@ -105,8 +177,8 @@ "repositoryURL": "https://github.com/apple/swift-nio.git", "state": { "branch": null, - "revision": "c3e2359c55cd8b47207ab7363b77c9c398a95294", - "version": "2.23.0" + "revision": "94f41c4121a82fae5c7b1cb03e630e9f9e5e20f1", + "version": "2.32.1" } }, { @@ -114,8 +186,8 @@ "repositoryURL": "https://github.com/apple/swift-nio-extras.git", "state": { "branch": null, - "revision": "e5b5d191a80667a14827bfeb0ae4a511f7677942", - "version": "1.7.0" + "revision": "f72c4688f89c28502105509186eadc49a49cb922", + "version": "1.10.0" } }, { @@ -123,8 +195,8 @@ "repositoryURL": "https://github.com/apple/swift-nio-http2.git", "state": { "branch": null, - "revision": "78ddbdfca10f64e4399da37c63372fd8db232152", - "version": "1.15.0" + "revision": "42bdcae4ac4913507a5ee7af963c559deb60d1fc", + "version": "1.18.2" } }, { @@ -132,8 +204,8 @@ "repositoryURL": "https://github.com/apple/swift-nio-ssl.git", "state": { "branch": null, - "revision": "cc0e80dbccf8c89c1f49c3a11c5929575035076c", - "version": "2.9.2" + "revision": "4829979d9f5ed9a2f4c6efd9c1ed51d1ab4d0394", + "version": "2.14.1" } }, { @@ -141,8 +213,8 @@ "repositoryURL": "https://github.com/apple/swift-nio-transport-services.git", "state": { "branch": null, - "revision": "bb56586c4cab9a79dce6ec4738baddb5802c5de7", - "version": "1.9.0" + "revision": "9571a61d236c5253b6a255a2d13fac536a1e2625", + "version": "1.11.2" } }, { @@ -150,8 +222,8 @@ "repositoryURL": "https://github.com/vapor/vapor.git", "state": { "branch": null, - "revision": "29f5370b119593083674a4f7e6795ac2f4d884ef", - "version": "4.32.0" + "revision": "5113bfcb0dbcfaefdc043acd8e161046eb4fbb65", + "version": "4.48.4" } }, { @@ -159,8 +231,8 @@ "repositoryURL": "https://github.com/vapor/websocket-kit.git", "state": { "branch": null, - "revision": "b0736014be634475dac4c23843811257d86dcdc1", - "version": "2.1.1" + "revision": "d7537b787d37f3877e36b5f1e7cb910f7c401836", + "version": "2.1.4" } } ] diff --git a/Package.swift b/Package.swift index 7f10a61..288aad5 100644 --- a/Package.swift +++ b/Package.swift @@ -13,15 +13,17 @@ let package = Package( ], dependencies: [ .package(url: "https://github.com/vapor/queues.git", from: "1.5.0"), - .package(url: "https://github.com/vapor/fluent-kit.git", from: "1.7.0"), - .package(url: "https://github.com/vapor/sql-kit.git", from: "3.7.0") + .package(url: "https://github.com/vapor/fluent.git", from: "4.0.0"), + .package(url: "https://github.com/vapor/fluent-mysql-driver.git", from: "4.0.0"), + .package(url: "https://github.com/vapor/fluent-postgres-driver.git", from: "2.0.0") ], targets: [ .target( name: "QueuesDatabaseHooks", dependencies: [ - .product(name: "FluentKit", package: "fluent-kit"), - .product(name: "SQLKit", package: "sql-kit"), + .product(name: "Fluent", package: "fluent"), + .product(name: "FluentPostgresDriver", package: "fluent-postgres-driver"), + .product(name: "FluentMySQLDriver", package: "fluent-mysql-driver"), .product(name: "Queues", package: "queues") ]), .testTarget( diff --git a/Sources/QueuesDatabaseHooks/Model/CompletedJobStatusResponse.swift b/Sources/QueuesDatabaseHooks/Model/CompletedJobStatusResponse.swift new file mode 100644 index 0000000..4d4ee8b --- /dev/null +++ b/Sources/QueuesDatabaseHooks/Model/CompletedJobStatusResponse.swift @@ -0,0 +1,18 @@ +// +// CompletedJobStatusResponse.swift +// +// +// Created by lgriffie on 05/09/2021. +// + +import Foundation +import Vapor + +/// Data about jobs that have run successfully over a time period +public struct CompletedJobStatusResponse: Content { + /// The number of jobs that completed successfully + public let completedJobs: Int + + /// The percent of jobs (out of all jobs run in the time period) that ran successfully + public let percentSuccess: Double +} diff --git a/Sources/QueuesDatabaseHooks/Model/CurrentJobsStatusResponse.swift b/Sources/QueuesDatabaseHooks/Model/CurrentJobsStatusResponse.swift new file mode 100644 index 0000000..48d1b44 --- /dev/null +++ b/Sources/QueuesDatabaseHooks/Model/CurrentJobsStatusResponse.swift @@ -0,0 +1,18 @@ +// +// CurrentJobsStatusResponse.swift +// +// +// Created by lgriffie on 05/09/2021. +// + +import Foundation +import Vapor + +/// Data about jobs currently queued or running +public struct CurrentJobsStatusResponse: Content { + /// The number of queueud jobs currently waiting to be run + public let queuedCount: Int + + /// The number of jobs currently running + public let runningCount: Int +} diff --git a/Sources/QueuesDatabaseHooks/Model/JobsTimingResponse.swift b/Sources/QueuesDatabaseHooks/Model/JobsTimingResponse.swift new file mode 100644 index 0000000..cfe0d3d --- /dev/null +++ b/Sources/QueuesDatabaseHooks/Model/JobsTimingResponse.swift @@ -0,0 +1,18 @@ +// +// JobsTimingResponse.swift +// +// +// Created by lgriffie on 05/09/2021. +// + +import Foundation +import Vapor + +/// Data about how long jobs are taking to run +public struct JobsTimingResponse: Content { + /// The average time spent running a job + public let avgRunTime: Double? + + /// The average time jobs spent waiting to be processed + public let avgWaitTime: Double? +} diff --git a/Sources/QueuesDatabaseHooks/Query/MySqlQuery.swift b/Sources/QueuesDatabaseHooks/Query/MySqlQuery.swift new file mode 100644 index 0000000..8f20773 --- /dev/null +++ b/Sources/QueuesDatabaseHooks/Query/MySqlQuery.swift @@ -0,0 +1,82 @@ +// +// MySqlQuery.swift +// +// +// Created by lgriffie on 05/09/2021. +// + +import Foundation +import FluentKit +import SQLKit +import Vapor + +public class MySqlQuery { + internal static func getStatusOfCurrentJobsQuery() -> SQLQueryString { + """ + SELECT + COALESCE( + SUM( + CASE status + WHEN 0 THEN + 1 + ELSE + 0 + END + ) + , 0) as "queuedCount", + COALESCE( + SUM( + CASE status + WHEN 1 THEN + 1 + ELSE + 0 + END + ) + , 0) as "runningCount" + FROM + _queue_job_completions + """ + } + + internal static func getCompletedJobsForTimePeriodQuery(hours: Int) -> SQLQueryString { + """ + SELECT + COUNT(*) as "completedJobs", + COALESCE(SUM( + CASE status + WHEN 2 THEN + 1 + ELSE + 0 + END) / count(*), 1) as "percentSuccess" + FROM + _queue_job_completions + WHERE + completedAt IS NOT NULL + AND completedAt >= DATE_SUB(now(), interval \(raw: "\(hours)") hour) + """ + } + + internal static func getTimingDataForJobsQuery(hours: Int, jobName: String? = nil) -> SQLQueryString { + let jobFilterString: SQLQueryString + if let jobName = jobName { + jobFilterString = "AND \"jobName\" = \(raw: jobName)" + } else { + jobFilterString = "" + } + + return """ + SELECT + avg(TIMESTAMPDIFF(second, dequeuedAt, completedAt)) as "avgRunTime", + avg(TIMESTAMPDIFF(second, queuedAt, dequeuedAt)) as "avgWaitTime" + FROM + _queue_job_completions + WHERE + completedAt IS NOT NULL + AND dequeuedAt is not null + AND completedAt >= DATE_SUB(now(), interval \(raw: "\(hours)") hour) + \(jobFilterString) + """ + } +} diff --git a/Sources/QueuesDatabaseHooks/Query/PostgresSqlQuery.swift b/Sources/QueuesDatabaseHooks/Query/PostgresSqlQuery.swift new file mode 100644 index 0000000..fdad96b --- /dev/null +++ b/Sources/QueuesDatabaseHooks/Query/PostgresSqlQuery.swift @@ -0,0 +1,82 @@ +// +// PostgresSqlQuery.swift +// +// +// Created by lgriffie on 05/09/2021. +// + +import Foundation +import FluentKit +import SQLKit +import Vapor + +public class PostgresSqlQuery { + internal static func getStatusOfCurrentJobsQuery() -> SQLQueryString { + """ + SELECT + COALESCE( + SUM( + CASE status + WHEN 0::char THEN + 1 + ELSE + 0 + END + ) + , 0) as "queuedCount", + COALESCE( + SUM( + CASE status + WHEN 1::char THEN + 1 + ELSE + 0 + END + ) + , 0) as "runningCount" + FROM + _queue_job_completions + """ + } + + internal static func getCompletedJobsForTimePeriodQuery(hours: Int) -> SQLQueryString { + """ + SELECT + COUNT(*) as "completedJobs", + COALESCE(SUM( + CASE status + WHEN 2::char THEN + 1 + ELSE + 0 + END) / count(*), 1)::FLOAT as "percentSuccess" + FROM + _queue_job_completions + WHERE + "completedAt" IS NOT NULL + AND "completedAt" >= (NOW() - '\(raw: "\(hours)") HOURS'::INTERVAL) + """ + } + + internal static func getTimingDataForJobsQuery(hours: Int, jobName: String? = nil) -> SQLQueryString { + let jobFilterString: SQLQueryString + if let jobName = jobName { + jobFilterString = "AND \"jobName\" = \(raw: jobName)" + } else { + jobFilterString = "" + } + + return """ + SELECT + avg(EXTRACT(EPOCH FROM ("dequeuedAt" - "completedAt"))) as "avgRunTime", + avg(EXTRACT(EPOCH FROM ("queuedAt" - "dequeuedAt"))) as "avgWaitTime" + FROM + _queue_job_completions + WHERE + "completedAt" IS NOT NULL + AND "dequeuedAt" is not null + AND "completedAt" >= (NOW() - '\(raw: "\(hours)") HOURS'::INTERVAL) + \(jobFilterString) + """ + } +} diff --git a/Sources/QueuesDatabaseHooks/Query/QueryFactory.swift b/Sources/QueuesDatabaseHooks/Query/QueryFactory.swift new file mode 100644 index 0000000..3a16e21 --- /dev/null +++ b/Sources/QueuesDatabaseHooks/Query/QueryFactory.swift @@ -0,0 +1,40 @@ +// +// QueryFactory.swift +// +// +// Created by lgriffie on 05/09/2021. +// + +import Foundation +import FluentMySQLDriver +import FluentPostgresDriver + +public class QueryFactory { + enum QueryFactoryError: Error { + case databaseNotSupported + } + + internal static func getStatusOfCurrentJobsQuery(_ db: Database) throws -> SQLQueryString { + switch db { + case is PostgresDatabase: return PostgresSqlQuery.getStatusOfCurrentJobsQuery() + case is MySQLDatabase: return MySqlQuery.getStatusOfCurrentJobsQuery() + default: throw QueryFactoryError.databaseNotSupported + } + } + + internal static func getCompletedJobsForTimePeriodQuery(_ db: Database, hours: Int) throws -> SQLQueryString { + switch db { + case is PostgresDatabase: return PostgresSqlQuery.getCompletedJobsForTimePeriodQuery(hours: hours) + case is MySQLDatabase: return MySqlQuery.getCompletedJobsForTimePeriodQuery(hours: hours) + default: throw QueryFactoryError.databaseNotSupported + } + } + + internal static func getTimingDataForJobsQuery(_ db: Database, hours: Int, jobName: String? = nil) throws -> SQLQueryString { + switch db { + case is PostgresDatabase: return PostgresSqlQuery.getTimingDataForJobsQuery(hours: hours, jobName: jobName) + case is MySQLDatabase: return MySqlQuery.getTimingDataForJobsQuery(hours: hours, jobName: jobName) + default: throw QueryFactoryError.databaseNotSupported + } + } +} diff --git a/Sources/QueuesDatabaseHooks/QueueDatabaseEntry+Query.swift b/Sources/QueuesDatabaseHooks/QueueDatabaseEntry+Query.swift index 9504d96..56a5e99 100644 --- a/Sources/QueuesDatabaseHooks/QueueDatabaseEntry+Query.swift +++ b/Sources/QueuesDatabaseHooks/QueueDatabaseEntry+Query.swift @@ -1,5 +1,7 @@ import Foundation import FluentKit +import FluentMySQLDriver +import FluentPostgresDriver import SQLKit import Vapor @@ -11,33 +13,12 @@ public extension QueueDatabaseEntry { static func getStatusOfCurrentJobs(db: Database) -> EventLoopFuture { guard let sqlDb = db as? SQLDatabase else { return db.eventLoop.future(error: Abort(.badRequest, reason: "Only SQL Databases Supported")) } - let query: SQLQueryString = """ - SELECT - COALESCE( - SUM( - CASE status - WHEN 0 THEN - 1 - ELSE - 0 - END - ) - , 0) as "queuedCount", - COALESCE( - SUM( - CASE status - WHEN 1 THEN - 1 - ELSE - 0 - END - ) - , 0) as "runningCount" - FROM - _queue_job_completions - """ - - return sqlDb.raw(query).first(decoding: CurrentJobsStatusResponse.self).unwrap(or: Abort(.badRequest, reason: "Could not get data for status")) + do { + let query = try QueryFactory.getStatusOfCurrentJobsQuery(db) + return sqlDb.raw(query).first(decoding: CurrentJobsStatusResponse.self).unwrap(or: Abort(.badRequest, reason: "Could not get data for status")) + } catch { + return sqlDb.eventLoop.future(error: Abort(.badRequest, reason: "Only Postgres or MySql Databases Supported")) + } } /// Retrieves data about jobs that ran successfully over the specified time period @@ -48,24 +29,12 @@ public extension QueueDatabaseEntry { static func getCompletedJobsForTimePeriod(db: Database, hours: Int) -> EventLoopFuture { guard let sqlDb = db as? SQLDatabase else { return db.eventLoop.future(error: Abort(.badRequest, reason: "Only SQL Databases Supported")) } - let query: SQLQueryString = """ - SELECT - COUNT(*) as "completedJobs", - COALESCE(SUM( - CASE status - WHEN 2 THEN - 1 - ELSE - 0 - END) / count(*), 1) as "percentSuccess" - FROM - _queue_job_completions - WHERE - completedAt IS NOT NULL - AND completedAt >= DATE_SUB(now(), interval \(raw: "\(hours)") hour) - """ - - return sqlDb.raw(query).first(decoding: CompletedJobStatusResponse.self).unwrap(or: Abort(.badRequest, reason: "Could not get data for status")) + do { + let query = try QueryFactory.getCompletedJobsForTimePeriodQuery(db, hours: hours) + return sqlDb.raw(query).first(decoding: CompletedJobStatusResponse.self).unwrap(or: Abort(.badRequest, reason: "Could not get data for status")) + } catch { + return sqlDb.eventLoop.future(error: Abort(.badRequest, reason: "Only Postgres or MySql Databases Supported")) + } } /// Retrieves data about the how quickly jobs ran and how long they waited to be run @@ -77,54 +46,11 @@ public extension QueueDatabaseEntry { static func getTimingDataForJobs(db: Database, hours: Int, jobName: String? = nil) -> EventLoopFuture { guard let sqlDb = db as? SQLDatabase else { return db.eventLoop.future(error: Abort(.badRequest, reason: "Only SQL Databases Supported")) } - let jobFilterString: SQLQueryString - if let jobName = jobName { - jobFilterString = "AND jobName = \(raw: jobName)" - } else { - jobFilterString = "" + do { + let query = try QueryFactory.getTimingDataForJobsQuery(db, hours: hours, jobName: jobName) + return sqlDb.raw(query).first(decoding: JobsTimingResponse.self).unwrap(or: Abort(.badRequest, reason: "Could not get data for status")) + } catch { + return sqlDb.eventLoop.future(error: Abort(.badRequest, reason: "Only Postgres or MySql Databases Supported")) } - - let query: SQLQueryString = """ - SELECT - avg(TIMESTAMPDIFF(second, dequeuedAt, completedAt)) as "avgRunTime", - avg(TIMESTAMPDIFF(second, queuedAt, dequeuedAt)) as "avgWaitTime" - FROM - _queue_job_completions - WHERE - completedAt IS NOT NULL - AND dequeuedAt is not null - AND completedAt >= DATE_SUB(now(), interval \(raw: "\(hours)") hour) - \(jobFilterString) - """ - - return sqlDb.raw(query).first(decoding: JobsTimingResponse.self).unwrap(or: Abort(.badRequest, reason: "Could not get data for status")) } } - -/// Data about jobs currently queued or running -public struct CurrentJobsStatusResponse: Content { - /// The number of queueud jobs currently waiting to be run - public let queuedCount: Int - - /// The number of jobs currently running - public let runningCount: Int -} - -/// Data about jobs that have run successfully over a time period -public struct CompletedJobStatusResponse: Content { - /// The number of jobs that completed successfully - public let completedJobs: Int - - /// The percent of jobs (out of all jobs run in the time period) that ran successfully - public let percentSuccess: Double -} - -/// Data about how long jobs are taking to run -public struct JobsTimingResponse: Content { - - /// The average time spent running a job - public let avgRunTime: Double? - - /// The average time jobs spent waiting to be processed - public let avgWaitTime: Double? -} diff --git a/Sources/QueuesDatabaseHooks/QueueDatabaseEntry.swift b/Sources/QueuesDatabaseHooks/QueueDatabaseEntry.swift index bf3f07a..1acc0dc 100644 --- a/Sources/QueuesDatabaseHooks/QueueDatabaseEntry.swift +++ b/Sources/QueuesDatabaseHooks/QueueDatabaseEntry.swift @@ -77,17 +77,18 @@ public final class QueueDatabaseEntry: Model { public init() { } - public init(jobId: String, - jobName: String, - queueName: String, - payload: Data, - maxRetryCount: Int, - delayUntil: Date?, - queuedAt: Date, - dequeuedAt: Date?, - completedAt: Date?, - errorString: String?, - status: Status + public init( + jobId: String, + jobName: String, + queueName: String, + payload: Data, + maxRetryCount: Int, + delayUntil: Date?, + queuedAt: Date, + dequeuedAt: Date?, + completedAt: Date?, + errorString: String?, + status: Status ) { self.jobId = jobId self.jobName = jobName @@ -113,7 +114,7 @@ public struct QueueDatabaseEntryMigration: Migration { .field("jobId", .string, .required) .field("jobName", .string, .required) .field("queueName", .string, .required) - .field("payload", .json, .required) + .field("payload", .data, .required) .field("maxRetryCount", .int, .required) .field("delayUntil", .datetime) .field("queuedAt", .datetime, .required)