diff --git a/orca-queue-redis/orca-queue-redis.gradle b/orca-queue-redis/orca-queue-redis.gradle index e105d18718..5843210521 100644 --- a/orca-queue-redis/orca-queue-redis.gradle +++ b/orca-queue-redis/orca-queue-redis.gradle @@ -28,6 +28,7 @@ dependencies { implementation("com.netflix.spinnaker.keiko:keiko-redis-spring:$keikoVersion") implementation("com.fasterxml.jackson.module:jackson-module-kotlin") + testImplementation(project(":orca-queue")) testImplementation(project(":orca-queue-tck")) testImplementation(project(":orca-test-kotlin")) testImplementation(project(":orca-test-redis")) diff --git a/orca-queue-redis/src/main/kotlin/com/netflix/spinnaker/config/RedisOrcaQueueConfiguration.kt b/orca-queue-redis/src/main/kotlin/com/netflix/spinnaker/config/RedisOrcaQueueConfiguration.kt index a8ed865813..ee7420b61b 100644 --- a/orca-queue-redis/src/main/kotlin/com/netflix/spinnaker/config/RedisOrcaQueueConfiguration.kt +++ b/orca-queue-redis/src/main/kotlin/com/netflix/spinnaker/config/RedisOrcaQueueConfiguration.kt @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.module.SimpleModule import com.fasterxml.jackson.module.kotlin.KotlinModule import com.netflix.spinnaker.orca.TaskResolver import com.netflix.spinnaker.orca.pipeline.model.Execution.ExecutionType +import com.netflix.spinnaker.orca.q.pending.PendingExecutionService import com.netflix.spinnaker.orca.q.redis.migration.ExecutionTypeDeserializer import com.netflix.spinnaker.orca.q.redis.migration.OrcaToKeikoSerializationMigrator import com.netflix.spinnaker.orca.q.redis.migration.TaskTypeDeserializer @@ -31,6 +32,7 @@ import com.netflix.spinnaker.q.redis.RedisDeadMessageHandler import com.netflix.spinnaker.q.redis.RedisQueue import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Qualifier +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration @@ -80,6 +82,7 @@ class RedisOrcaQueueConfiguration : RedisQueueConfiguration() { } @Bean + @ConditionalOnMissingBean(PendingExecutionService::class) fun pendingExecutionService( @Qualifier("queueRedisPool") jedisPool: Pool, mapper: ObjectMapper diff --git a/orca-queue-redis/src/test/kotlin/com/netflix/spinnaker/orca/q/redis/pending/RedisPendingExecutionServiceTest.kt b/orca-queue-redis/src/test/kotlin/com/netflix/spinnaker/orca/q/redis/pending/RedisPendingExecutionServiceTest.kt index ca56413807..b25b4ebeaf 100644 --- a/orca-queue-redis/src/test/kotlin/com/netflix/spinnaker/orca/q/redis/pending/RedisPendingExecutionServiceTest.kt +++ b/orca-queue-redis/src/test/kotlin/com/netflix/spinnaker/orca/q/redis/pending/RedisPendingExecutionServiceTest.kt @@ -1,187 +1,27 @@ -/* - * Copyright 2018 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License") - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - package com.netflix.spinnaker.orca.q.redis.pending import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.KotlinModule import com.netflix.spinnaker.kork.jedis.EmbeddedRedis -import com.netflix.spinnaker.orca.fixture.pipeline -import com.netflix.spinnaker.orca.fixture.stage +import com.netflix.spinnaker.orca.q.PendingExecutionServiceTest import com.netflix.spinnaker.orca.q.RestartStage import com.netflix.spinnaker.orca.q.StartExecution -import com.netflix.spinnaker.q.Message -import com.nhaarman.mockito_kotlin.mock -import com.nhaarman.mockito_kotlin.verify -import org.assertj.core.api.Assertions.assertThat -import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.describe -import org.jetbrains.spek.api.dsl.given -import org.jetbrains.spek.api.dsl.it -import org.jetbrains.spek.api.dsl.on -import java.util.UUID +import org.jetbrains.spek.subject.SubjectSpek +import org.jetbrains.spek.subject.itBehavesLike + +internal object RedisPendingExecutionServiceTest : SubjectSpek ({ -internal object RedisPendingExecutionServiceTest : Spek({ + itBehavesLike(PendingExecutionServiceTest) - lateinit var redis: EmbeddedRedis + val redis = EmbeddedRedis.embed() val mapper = ObjectMapper().apply { registerModule(KotlinModule()) registerSubtypes(StartExecution::class.java, RestartStage::class.java) } - lateinit var subject: RedisPendingExecutionService - beforeGroup { - redis = EmbeddedRedis.embed() - subject = RedisPendingExecutionService(redis.pool, mapper) - } + subject { RedisPendingExecutionService(redis.pool, mapper) } afterGroup { redis.destroy() } - - fun flushAll() { - redis.pool.resource.use { it.flushAll() } - } - - val id = UUID.randomUUID().toString() - val pipeline = pipeline { - pipelineConfigId = id - stage { - refId = "1" - } - stage { - refId = "2" - requisiteStageRefIds = setOf("1") - } - } - val startMessage = StartExecution(pipeline) - val restartMessage = RestartStage(pipeline.stageByRef("2"), "fzlem@netflix.com") - - sequenceOf(startMessage, restartMessage).forEach { message -> - describe("enqueueing a ${message.javaClass.simpleName} message") { - given("the queue is empty") { - beforeGroup { - assertThat(subject.depth(id)).isZero() - } - - on("enqueueing the message") { - subject.enqueue(id, message) - - it("makes the depth 1") { - assertThat(subject.depth(id)).isOne() - } - } - - afterGroup(::flushAll) - } - } - } - - describe("popping a message") { - given("the queue is empty") { - beforeGroup { - assertThat(subject.depth(id)).isZero() - } - - on("popping a message") { - val popped = subject.popOldest(id) - - it("returns null") { - assertThat(popped).isNull() - } - } - } - - given("a message was enqueued") { - beforeGroup { - subject.enqueue(id, startMessage) - } - - on("popping a message") { - val popped = subject.popOldest(id) - - it("returns the message") { - assertThat(popped).isEqualTo(startMessage) - } - - it("removes the message from the queue") { - assertThat(subject.depth(id)).isZero() - } - } - - afterGroup(::flushAll) - } - - given("multiple messages were enqueued") { - beforeEachTest { - subject.enqueue(id, startMessage) - subject.enqueue(id, restartMessage) - } - - on("popping the oldest message") { - val popped = subject.popOldest(id) - - it("returns the oldest message") { - assertThat(popped).isEqualTo(startMessage) - } - - it("removes the message from the queue") { - assertThat(subject.depth(id)).isOne() - } - } - - on("popping the newest message") { - val popped = subject.popNewest(id) - - it("returns the newest message") { - assertThat(popped).isEqualTo(restartMessage) - } - - it("removes the message from the queue") { - assertThat(subject.depth(id)).isOne() - } - } - - afterEachTest(::flushAll) - } - } - - describe("purging the queue") { - val callback = mock<(Message) -> Unit>() - - given("there are some messages on the queue") { - beforeGroup { - subject.enqueue(id, startMessage) - subject.enqueue(id, restartMessage) - } - - on("purging the queue") { - subject.purge(id, callback) - - it("makes the queue empty") { - assertThat(subject.depth(id)).isZero() - } - - it("invokes the callback passing each message") { - verify(callback).invoke(startMessage) - verify(callback).invoke(restartMessage) - } - } - - afterGroup(::flushAll) - } - } }) diff --git a/orca-queue-sql/orca-queue-sql.gradle b/orca-queue-sql/orca-queue-sql.gradle new file mode 100644 index 0000000000..c3195c0c45 --- /dev/null +++ b/orca-queue-sql/orca-queue-sql.gradle @@ -0,0 +1,21 @@ +apply from: "$rootDir/gradle/kotlin.gradle" +apply from: "$rootDir/gradle/spock.gradle" +apply from: "$rootDir/gradle/spek.gradle" + + +dependencies { + implementation(project(":orca-core")) + implementation(project(":orca-queue")) + implementation(project(":orca-sql")) + implementation("com.netflix.spinnaker.keiko:keiko-redis-spring:$keikoVersion") + implementation("com.fasterxml.jackson.module:jackson-module-kotlin") + + implementation("org.jooq:jooq") + + testImplementation("com.netflix.spinnaker.kork:kork-sql-test") + testImplementation(project(":orca-queue-tck")) + testImplementation(project(":orca-sql")) + testImplementation("org.testcontainers:mysql") + + testRuntimeOnly("mysql:mysql-connector-java") +} diff --git a/orca-queue-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlPendingExecutionConfiguration.kt b/orca-queue-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlPendingExecutionConfiguration.kt new file mode 100644 index 0000000000..f761113a2c --- /dev/null +++ b/orca-queue-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlPendingExecutionConfiguration.kt @@ -0,0 +1,46 @@ +package com.netflix.spinnaker.config + +import com.fasterxml.jackson.databind.ObjectMapper +import com.netflix.spectator.api.Registry +import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository +import com.netflix.spinnaker.orca.q.sql.pending.SqlPendingExecutionService +import com.netflix.spinnaker.q.Queue +import org.jooq.DSLContext +import org.springframework.beans.factory.annotation.Qualifier +import org.springframework.beans.factory.annotation.Value +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression +import org.springframework.boot.context.properties.EnableConfigurationProperties +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import java.time.Clock + +@Configuration +@EnableConfigurationProperties(SqlProperties::class) +class SqlPendingExecutionConfiguration { + + @Bean + @ConditionalOnExpression( + "\${execution-repository.sql.enabled:false} && \${queue.sql.pending-execution-service.enabled:false}" + ) + fun pendingExecutionService( + jooq: DSLContext, + queue: Queue, + repository: ExecutionRepository, + @Qualifier("redisQueueObjectMapper") mapper: ObjectMapper, + clock: Clock, + registry: Registry, + properties: SqlProperties, + @Value("\${queue.pending.max-depth:50}") maxDepth: Int + ) = + SqlPendingExecutionService( + properties.partitionName, + jooq, + queue, + repository, + mapper, + clock, + registry, + properties.transactionRetry, + maxDepth + ) +} diff --git a/orca-queue-sql/src/main/kotlin/com/netflix/spinnaker/orca/q/sql/pending/SqlPendingExecutionService.kt b/orca-queue-sql/src/main/kotlin/com/netflix/spinnaker/orca/q/sql/pending/SqlPendingExecutionService.kt new file mode 100644 index 0000000000..4dcb3d4674 --- /dev/null +++ b/orca-queue-sql/src/main/kotlin/com/netflix/spinnaker/orca/q/sql/pending/SqlPendingExecutionService.kt @@ -0,0 +1,196 @@ +package com.netflix.spinnaker.orca.q.sql.pending + +import com.fasterxml.jackson.databind.ObjectMapper +import com.netflix.spectator.api.Registry +import com.netflix.spectator.api.histogram.PercentileTimer +import com.netflix.spinnaker.config.TransactionRetryProperties +import com.netflix.spinnaker.kork.core.RetrySupport +import com.netflix.spinnaker.orca.pipeline.model.Execution.ExecutionType.PIPELINE +import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionNotFoundException +import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository +import com.netflix.spinnaker.orca.q.StartExecution +import com.netflix.spinnaker.orca.q.StartWaitingExecutions +import com.netflix.spinnaker.orca.q.pending.PendingExecutionService +import com.netflix.spinnaker.q.Message +import com.netflix.spinnaker.q.Queue +import de.huxhorn.sulky.ulid.ULID +import org.jooq.DSLContext +import org.jooq.SortField +import org.jooq.SortOrder +import org.jooq.impl.DSL +import org.slf4j.Logger +import org.slf4j.LoggerFactory +import java.lang.Exception +import java.time.Clock +import java.time.Duration +import java.util.concurrent.TimeUnit + +class SqlPendingExecutionService( + private val shard: String?, + private val jooq: DSLContext, + private val queue: Queue, + private val repository: ExecutionRepository, + private val mapper: ObjectMapper, + private val clock: Clock, + private val registry: Registry, + private val retryProperties: TransactionRetryProperties, + private val maxDepth: Int +) : PendingExecutionService { + + companion object { + private val pendingTable = DSL.table("pending_executions") + private val configField = DSL.field("config_id") + private val idField = DSL.field("id") + private val messageField = DSL.field("message") + private val shardField = DSL.field("shard") + + private val retrySupport = RetrySupport() + private val log: Logger = LoggerFactory.getLogger(SqlPendingExecutionService::class.java) + } + + private val enqueueId = registry.createId("queue.pending.enqueue") + private val cancelId = registry.createId("queue.pending.cancelled") + private val popId = registry.createId("queue.pending.pop") + + private var shardCondition = if (shard.isNullOrBlank()) { + shardField.isNull + } else { + shardField.eq(shard) + } + + override fun enqueue(pipelineConfigId: String, message: Message) { + PercentileTimer.get(registry, enqueueId) + .record { + doEnqueue(pipelineConfigId, message) + } + } + + private fun doEnqueue(pipelineConfigId: String, message: Message) { + try { + val queued = depth(pipelineConfigId) + if (queued >= maxDepth) { + /** + * If dropping a StartExecution message, actively cancel the execution so it won't be left in + * NOT_STARTED without a message on the active or pending queue. + * + * Other message types can be safely dropped. + */ + if (message is StartExecution) { + log.warn("Canceling execution ${message.executionId} for pipeline $pipelineConfigId due to pending " + + "depth of $queued executions") + registry.counter(cancelId).increment() + + try { + val execution = repository.retrieve(PIPELINE, message.executionId) + .apply { + isCanceled = true + canceledBy = "spinnaker" + cancellationReason = "Too many pending executions ($queued) for pipelineId" + } + repository.store(execution) + } catch (e: ExecutionNotFoundException) { + log.error("Failed to retrieve execution ${message.executionId} for pipeline $pipelineConfigId") + } + } else { + log.warn("Dropping pending message for pipeline $pipelineConfigId due to pending execution depth of $queued") + } + return + } + + withRetry { + jooq.insertInto(pendingTable) + .columns(idField, configField, shardField, messageField) + .values(ULID().nextValue().toString(), pipelineConfigId, shard, mapper.writeValueAsString(message)) + .execute() + } + } catch (e: Exception) { + log.error("Failed to enqueue pending execution for pipeline $pipelineConfigId") + throw e // back to StartExecutionHandler, we may want to requeue the StartExecution message + } + } + + override fun popOldest(pipelineConfigId: String): Message? { + val start = clock.millis() + val message = pop(pipelineConfigId, idField.asc()) + + PercentileTimer.get(registry, popId.withTag("purge", "false")) + .record(clock.millis() - start, TimeUnit.MILLISECONDS) + + return message + } + + override fun popNewest(pipelineConfigId: String): Message? { + val start = clock.millis() + val message = pop(pipelineConfigId, idField.desc()) + + PercentileTimer.get(registry, popId.withTag("purge", "true")) + .record(clock.millis() - start, TimeUnit.MILLISECONDS) + + return message + } + + private fun pop(pipelineConfigId: String, sortField: SortField): Message? { + try { + return withRetry { + jooq.transactionResult { configuration -> + val txn = DSL.using(configuration) + txn.select(idField, messageField) + .from(pendingTable) + .where( + configField.eq(pipelineConfigId), + shardCondition + ) + .orderBy(sortField) + .limit(1) + .forUpdate() + .fetchOne() + ?.into(MessageContainer::class.java) + ?.let { + txn.deleteFrom(pendingTable) + .where(idField.eq(it.id)) + .execute() + + return@transactionResult mapper.readValue(it.message, Message::class.java) + } + } + } + } catch (e: Exception) { + log.error("Failed popping pending execution for pipeline $pipelineConfigId, attempting to requeue " + + "StartWaitingExecutions message", e) + + val purge = (sortField.order == SortOrder.DESC) + queue.push(StartWaitingExecutions(pipelineConfigId, purge), Duration.ofSeconds(10)) + + return null + } + } + + override fun purge(pipelineConfigId: String, callback: (Message) -> Unit) { + do { + val oldest = popOldest(pipelineConfigId) + oldest?.let(callback) + } while (oldest != null) + } + + override fun depth(pipelineConfigId: String): Int = + withRetry { + jooq.selectCount() + .from(pendingTable) + .where( + configField.eq(pipelineConfigId), + shardCondition + ) + .fetchOne(0, Int::class.java) + } + + private data class MessageContainer( + val id: String, + val message: String + ) + + private fun withRetry(fn: (Any) -> T): T { + return retrySupport.retry({ + fn(this) + }, retryProperties.maxRetries, retryProperties.backoffMs, false) + } +} diff --git a/orca-queue-sql/src/test/kotlin/com/netflix/spinnaker/orca/q/sql/pending/SqlPendingExecutionServiceTest.kt b/orca-queue-sql/src/test/kotlin/com/netflix/spinnaker/orca/q/sql/pending/SqlPendingExecutionServiceTest.kt new file mode 100644 index 0000000000..449334b306 --- /dev/null +++ b/orca-queue-sql/src/test/kotlin/com/netflix/spinnaker/orca/q/sql/pending/SqlPendingExecutionServiceTest.kt @@ -0,0 +1,140 @@ +package com.netflix.spinnaker.orca.q.sql.pending + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.kotlin.KotlinModule +import com.netflix.spectator.api.NoopRegistry +import com.netflix.spinnaker.config.TransactionRetryProperties +import com.netflix.spinnaker.kork.sql.test.SqlTestUtil +import com.netflix.spinnaker.orca.fixture.pipeline +import com.netflix.spinnaker.orca.fixture.stage +import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository +import com.netflix.spinnaker.orca.q.PendingExecutionServiceTest +import com.netflix.spinnaker.orca.q.RestartStage +import com.netflix.spinnaker.orca.q.StartExecution +import com.netflix.spinnaker.q.Message +import com.netflix.spinnaker.q.Queue +import com.netflix.spinnaker.time.fixedClock +import com.nhaarman.mockito_kotlin.any +import com.nhaarman.mockito_kotlin.mock +import com.nhaarman.mockito_kotlin.spy +import com.nhaarman.mockito_kotlin.verify +import com.nhaarman.mockito_kotlin.verifyNoMoreInteractions +import com.nhaarman.mockito_kotlin.whenever +import org.assertj.core.api.Assertions +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.jetbrains.spek.subject.SubjectSpek +import org.jetbrains.spek.subject.itBehavesLike +import java.util.UUID + +internal object SqlPendingExecutionServiceTest : SubjectSpek({ + + itBehavesLike(PendingExecutionServiceTest) + + val testDatabase = SqlTestUtil.initTcMysqlDatabase() + val jooq = spy(testDatabase.context) + val queue = mock() + val repository = mock() + val clock = fixedClock() + val registry = NoopRegistry() + val retryProperties = TransactionRetryProperties() + + val maxDepth = 4 + + val mapper = ObjectMapper().apply { + registerModule(KotlinModule()) + registerSubtypes(StartExecution::class.java, RestartStage::class.java) + } + + subject { + SqlPendingExecutionService( + "test", + jooq, + queue, + repository, + mapper, + clock, + registry, + retryProperties, + maxDepth + ) + } + + val id = UUID.randomUUID().toString() + val pipeline = pipeline { + pipelineConfigId = id + stage { + refId = "1" + } + stage { + refId = "2" + requisiteStageRefIds = setOf("1") + } + } + + val startMessage = StartExecution(pipeline) + val restartMessage = RestartStage(pipeline.stageByRef("2"), "fzlem@netflix.com") + val callback = mock<(Message) -> Unit>() + + describe("maxDepth behavior") { + given("maxDepth messages are already queued") { + beforeGroup { + repeat(maxDepth) { + subject.enqueue(id, startMessage) + } + whenever(repository.retrieve(any(), any())).thenReturn(pipeline) + } + + on("enqueing a start message") { + subject.enqueue(id, startMessage) + + it("queue depth is unchanged") { + Assertions.assertThat(subject.depth(id)).isEqualTo(maxDepth) + } + + it("cancelled the associated execution") { + verify(repository).retrieve(any(), any()) + verify(repository).store(any()) + } + } + + on("enqueing a restart message") { + subject.enqueue(id, restartMessage) + + it("queue depth is unchanged") { + Assertions.assertThat(subject.depth(id)).isEqualTo(maxDepth) + } + + it("drops message without interacting with ExecutionRepository") { + verifyNoMoreInteractions(repository) + } + } + + afterGroup { subject.purge(id, callback) } + } + } + + describe("popping fails due to a database exception") { + given("db is down") { + beforeGroup { + jooq.execute("create table backup like pending_executions") + jooq.execute("drop table pending_executions") + } + + on("popping a message") { + subject.popOldest(id) + + it("requeues a StartWaitingExecutions message") { + verify(queue).push(any(), any()) + } + } + } + + afterGroup { + jooq.execute("create table pending_executions like backup") + jooq.execute("drop table backup") + } + } +}) diff --git a/orca-queue-tck/orca-queue-tck.gradle b/orca-queue-tck/orca-queue-tck.gradle index 603d5131b8..4de9ce154b 100644 --- a/orca-queue-tck/orca-queue-tck.gradle +++ b/orca-queue-tck/orca-queue-tck.gradle @@ -15,6 +15,7 @@ */ apply from: "$rootDir/gradle/kotlin.gradle" +apply from: "$rootDir/gradle/spek.gradle" dependencies { api(project(":orca-queue")) @@ -30,4 +31,8 @@ dependencies { implementation("com.netflix.spinnaker.keiko:keiko-spring:$keikoVersion") implementation("junit:junit") implementation("org.springframework:spring-test") + + implementation("org.jetbrains.spek:spek-api") + implementation("org.jetbrains.spek:spek-subject-extension") + } diff --git a/orca-queue-tck/src/main/kotlin/com/netflix/spinnaker/orca/q/PendingExecutionServiceTest.kt b/orca-queue-tck/src/main/kotlin/com/netflix/spinnaker/orca/q/PendingExecutionServiceTest.kt new file mode 100644 index 0000000000..1608a6d4af --- /dev/null +++ b/orca-queue-tck/src/main/kotlin/com/netflix/spinnaker/orca/q/PendingExecutionServiceTest.kt @@ -0,0 +1,148 @@ +package com.netflix.spinnaker.orca.q + +import com.netflix.spinnaker.orca.fixture.pipeline +import com.netflix.spinnaker.orca.fixture.stage +import com.netflix.spinnaker.orca.q.pending.PendingExecutionService +import com.netflix.spinnaker.q.Message +import com.nhaarman.mockito_kotlin.mock +import com.nhaarman.mockito_kotlin.verify +import org.assertj.core.api.Assertions +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.jetbrains.spek.subject.SubjectSpek +import java.util.UUID + +object PendingExecutionServiceTest : SubjectSpek({ + + val id = UUID.randomUUID().toString() + val pipeline = pipeline { + pipelineConfigId = id + stage { + refId = "1" + } + stage { + refId = "2" + requisiteStageRefIds = setOf("1") + } + } + val startMessage = StartExecution(pipeline) + val restartMessage = RestartStage(pipeline.stageByRef("2"), "fzlem@netflix.com") + val callback = mock<(Message) -> Unit>() + + sequenceOf(startMessage, restartMessage).forEach { message -> + describe("enqueueing a ${message.javaClass.simpleName} message") { + given("the queue is empty") { + beforeGroup { + Assertions.assertThat(subject.depth(id)).isZero() + } + + on("enqueueing the message") { + subject.enqueue(id, message) + + it("makes the depth 1") { + Assertions.assertThat(subject.depth(id)).isOne() + } + } + + afterGroup { subject.purge(id, callback) } + } + } + } + + describe("popping a message") { + given("the queue is empty") { + beforeGroup { + Assertions.assertThat(subject.depth(id)).isZero() + } + + on("popping a message") { + val popped = subject.popOldest(id) + + it("returns null") { + Assertions.assertThat(popped).isNull() + } + } + } + + given("a message was enqueued") { + beforeGroup { + subject.enqueue(id, startMessage) + } + + on("popping a message") { + val popped = subject.popOldest(id) + + it("returns the message") { + Assertions.assertThat(popped).isEqualTo(startMessage) + } + + it("removes the message from the queue") { + Assertions.assertThat(subject.depth(id)).isZero() + } + } + + afterGroup { subject.purge(id, callback) } + } + + given("multiple messages were enqueued") { + beforeEachTest { + subject.enqueue(id, startMessage) + subject.enqueue(id, restartMessage) + } + + on("popping the oldest message") { + val popped = subject.popOldest(id) + + it("returns the oldest message") { + Assertions.assertThat(popped).isEqualTo(startMessage) + } + + it("removes the message from the queue") { + Assertions.assertThat(subject.depth(id)).isOne() + } + } + + on("popping the newest message") { + val popped = subject.popNewest(id) + + it("returns the newest message") { + Assertions.assertThat(popped).isEqualTo(restartMessage) + } + + it("removes the message from the queue") { + Assertions.assertThat(subject.depth(id)).isOne() + } + } + + afterEachTest { subject.purge(id, callback) } + } + } + + describe("purging the queue") { + val purgeCallback = mock<(Message) -> Unit>() + + given("there are some messages on the queue") { + beforeGroup { + subject.enqueue(id, startMessage) + subject.enqueue(id, restartMessage) + } + + on("purging the queue") { + subject.purge(id, purgeCallback) + + it("makes the queue empty") { + Assertions.assertThat(subject.depth(id)).isZero() + } + + it("invokes the callback passing each message") { + verify(purgeCallback).invoke(startMessage) + verify(purgeCallback).invoke(restartMessage) + } + } + + afterGroup { subject.purge(id, callback) } + } + } +}) diff --git a/orca-sql/src/main/resources/db/changelog-master.yml b/orca-sql/src/main/resources/db/changelog-master.yml index 667e432f0a..85e5848eda 100644 --- a/orca-sql/src/main/resources/db/changelog-master.yml +++ b/orca-sql/src/main/resources/db/changelog-master.yml @@ -35,3 +35,6 @@ databaseChangeLog: - include: file: changelog/20190530-pipeline-config-index.yml relativeToChangelogFile: true +- include: + file: changelog/20190711-create-pending-table.yml + relativeToChangelogFile: true diff --git a/orca-sql/src/main/resources/db/changelog/20190711-create-pending-table.yml b/orca-sql/src/main/resources/db/changelog/20190711-create-pending-table.yml new file mode 100644 index 0000000000..78df6c69eb --- /dev/null +++ b/orca-sql/src/main/resources/db/changelog/20190711-create-pending-table.yml @@ -0,0 +1,51 @@ +databaseChangeLog: +- changeSet: + id: create-pending-table + author: afeldman + changes: + - createTable: + tableName: pending_executions + columns: + - column: + name: id + type: char(26) + constraints: + primaryKey: true + nullable: false + - column: + name: config_id + type: char(36) + constraints: + nullable: false + - column: + name: shard + type: varchar(32) + - column: + name: message + type: longtext + - modifySql: + dbms: mysql + append: + value: " engine innodb" + rollback: + - dropTable: + tableName: pending_executions + +- changeSet: + id: create-pending-indices + author: afeldman + changes: + - createIndex: + tableName: pending_executions + indexName: config_partition_id_idx + columns: + - column: + name: config_id + - column: + name: shard + - column: + name: id + rollback: + - dropIndex: + indexName: config_partition_id_idx + tableName: pending_executions diff --git a/orca-web/orca-web.gradle b/orca-web/orca-web.gradle index a402c8182d..cfafa54457 100644 --- a/orca-web/orca-web.gradle +++ b/orca-web/orca-web.gradle @@ -48,6 +48,7 @@ dependencies { implementation(project(":orca-webhook")) implementation(project(":orca-eureka")) implementation(project(":orca-queue-redis")) + implementation(project(":orca-queue-sql")) implementation(project(":orca-pipelinetemplate")) implementation(project(":orca-keel")) implementation(project(":orca-qos")) diff --git a/settings.gradle b/settings.gradle index 4aa1d28d1b..b331447c04 100644 --- a/settings.gradle +++ b/settings.gradle @@ -40,6 +40,7 @@ include "orca-extensionpoint", "orca-applications", "orca-queue", "orca-queue-redis", + "orca-queue-sql", "orca-queue-tck", "orca-pipelinetemplate", "orca-validation",