From 9c4b8f896ffeecc2ba5bc804b3e4d08aa3796b33 Mon Sep 17 00:00:00 2001 From: Daniel Reynaud Date: Fri, 8 May 2020 10:13:45 -0700 Subject: [PATCH] feat(shovel): override partition of foreign executions (#3667) * feat(shovel): override partition of foreign executions This should allow the shovel to be used in failover scenarios, between orcas that have a different partition. --- .../config/RedisQueueShovelConfiguration.kt | 6 +- .../config/SqlQueueShovelConfiguration.kt | 7 +- .../SqlRedisQueueShovelConfiguration.kt | 9 +- .../netflix/spinnaker/orca/q/QueueShovel.kt | 45 +++++++- .../spinnaker/orca/q/QueueShovelTest.kt | 105 ++++++++++++++++-- orca-web/orca-web.gradle | 1 - 6 files changed, 154 insertions(+), 19 deletions(-) diff --git a/orca-queue-redis/src/main/kotlin/com/netflix/spinnaker/config/RedisQueueShovelConfiguration.kt b/orca-queue-redis/src/main/kotlin/com/netflix/spinnaker/config/RedisQueueShovelConfiguration.kt index 83e8600aa0..43513146d9 100644 --- a/orca-queue-redis/src/main/kotlin/com/netflix/spinnaker/config/RedisQueueShovelConfiguration.kt +++ b/orca-queue-redis/src/main/kotlin/com/netflix/spinnaker/config/RedisQueueShovelConfiguration.kt @@ -162,7 +162,8 @@ class RedisQueueShovelConfiguration { previousQueue = previousQueueImpl, registry = registry, activator = discoveryActivator, - config = dynamicConfigService + config = dynamicConfigService, + executionRepository = null ) @Bean @@ -179,6 +180,7 @@ class RedisQueueShovelConfiguration { previousQueue = previousQueueImpl, registry = registry, activator = discoveryActivator, - config = dynamicConfigService + config = dynamicConfigService, + executionRepository = null ) } diff --git a/orca-queue-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlQueueShovelConfiguration.kt b/orca-queue-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlQueueShovelConfiguration.kt index 726bbbf452..ebca064463 100644 --- a/orca-queue-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlQueueShovelConfiguration.kt +++ b/orca-queue-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlQueueShovelConfiguration.kt @@ -19,6 +19,7 @@ package com.netflix.spinnaker.config import com.fasterxml.jackson.databind.ObjectMapper import com.netflix.spectator.api.Registry import com.netflix.spinnaker.kork.dynamicconfig.DynamicConfigService +import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository import com.netflix.spinnaker.orca.q.QueueShovel import com.netflix.spinnaker.q.Activator import com.netflix.spinnaker.q.metrics.EventPublisher @@ -76,13 +77,15 @@ class SqlQueueShovelConfiguration { @Qualifier("previousSqlQueue") previousQueue: SqlQueue, registry: Registry, @Qualifier("discoveryActivator") activator: Activator, - config: DynamicConfigService + config: DynamicConfigService, + executionRepository: ExecutionRepository ): QueueShovel { return QueueShovel( queue = queue, previousQueue = previousQueue, registry = registry, activator = activator, - config = config) + config = config, + executionRepository = executionRepository) } } diff --git a/orca-queue-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlRedisQueueShovelConfiguration.kt b/orca-queue-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlRedisQueueShovelConfiguration.kt index 2d625e9986..af8855afce 100644 --- a/orca-queue-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlRedisQueueShovelConfiguration.kt +++ b/orca-queue-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlRedisQueueShovelConfiguration.kt @@ -86,7 +86,8 @@ class SqlRedisQueueShovelConfiguration { previousQueue = previousQueue, registry = registry, activator = discoveryActivator, - config = dynamicConfigService) + config = dynamicConfigService, + executionRepository = null) } @Bean @@ -119,7 +120,8 @@ class SqlRedisQueueShovelConfiguration { previousQueue = previousQueue, registry = registry, activator = discoveryActivator, - config = dynamicConfigService) + config = dynamicConfigService, + executionRepository = null) } /** @@ -158,6 +160,7 @@ class SqlRedisQueueShovelConfiguration { previousQueue = previousQueue, registry = registry, activator = discoveryActivator, - config = dynamicConfigService) + config = dynamicConfigService, + executionRepository = null) } } diff --git a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/QueueShovel.kt b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/QueueShovel.kt index 0c8a074508..141e5ece3d 100644 --- a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/QueueShovel.kt +++ b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/QueueShovel.kt @@ -17,7 +17,10 @@ package com.netflix.spinnaker.orca.q import com.netflix.spectator.api.Registry import com.netflix.spinnaker.kork.dynamicconfig.DynamicConfigService +import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionNotFoundException +import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository import com.netflix.spinnaker.q.Activator +import com.netflix.spinnaker.q.Message import com.netflix.spinnaker.q.Queue import java.time.Duration import java.time.Instant @@ -41,7 +44,8 @@ class QueueShovel( private val previousQueue: Queue, private val registry: Registry, private val activator: Activator, - private val config: DynamicConfigService + private val config: DynamicConfigService, + private val executionRepository: ExecutionRepository? ) { private val log = LoggerFactory.getLogger(javaClass) @@ -71,9 +75,20 @@ class QueueShovel( registry.counter(pollOpsRateId).increment() previousQueue.poll { message, ack -> try { + log.debug("Shoveling message $message") + + // transfer the ownership _before_ pushing the message on the queue + // we don't want a task handler running that message if the execution is not local + transferOwnership(message) + queue.push(message) ack.invoke() registry.counter(shoveledMessageId).increment() + } catch (e: ExecutionNotFoundException) { + // no need to log the stack trace on ExecutionNotFoundException, which can be somewhat expected + log.error("Failed shoveling message from previous queue to active (message: $message) " + + "because of exception $e") + registry.counter(shovelErrorId).increment() } catch (e: Throwable) { log.error("Failed shoveling message from previous queue to active (message: $message)", e) registry.counter(shovelErrorId).increment() @@ -81,7 +96,33 @@ class QueueShovel( } } + private fun transferOwnership(message: Message) { + if (executionRepository == null) { + return + } + + if (message !is ExecutionLevel) { + log.warn("Message $message does not implement ExecutionLevel, can not inspect partition") + return + } + + // don't catch exceptions on retrieve/store (e.g. ExecutionNotFoundException), so that we can short-circuit shoveling + // of this message + val execution = executionRepository.retrieve(message.executionType, message.executionId) + val isForeign = !executionRepository.handlesPartition(execution.partition) + if (isForeign) { + log.info("Taking ownership of foreign execution ${execution.id} with partition '${execution.partition}'. " + + "Setting partition to '${executionRepository.partition}'") + execution.partition = executionRepository.partition + executionRepository.store(execution) + } + } + @PostConstruct - fun confirmShovelUsage() = + fun confirmShovelUsage() { log.info("${javaClass.simpleName} migrator from $previousQueue to $queue is enabled") + if (executionRepository == null) { + log.warn("${javaClass.simpleName} configured without an ExecutionRepository, won't be able to transfer ownership") + } + } } diff --git a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/QueueShovelTest.kt b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/QueueShovelTest.kt index 6b83fa25f8..478faa4130 100644 --- a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/QueueShovelTest.kt +++ b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/QueueShovelTest.kt @@ -18,48 +18,135 @@ package com.netflix.spinnaker.orca.q import com.netflix.spectator.api.NoopRegistry import com.netflix.spinnaker.kork.dynamicconfig.DynamicConfigService import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType.PIPELINE +import com.netflix.spinnaker.orca.api.pipeline.models.PipelineExecution +import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionNotFoundException +import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository import com.netflix.spinnaker.q.Activator import com.netflix.spinnaker.q.Queue import com.netflix.spinnaker.q.QueueCallback import com.nhaarman.mockito_kotlin.any import com.nhaarman.mockito_kotlin.doAnswer +import com.nhaarman.mockito_kotlin.doReturn import com.nhaarman.mockito_kotlin.mock +import com.nhaarman.mockito_kotlin.never +import com.nhaarman.mockito_kotlin.reset import com.nhaarman.mockito_kotlin.verify +import com.nhaarman.mockito_kotlin.verifyZeroInteractions import com.nhaarman.mockito_kotlin.whenever import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on import org.jetbrains.spek.api.lifecycle.CachingMode import org.jetbrains.spek.subject.SubjectSpek +import org.mockito.ArgumentMatchers.anyString class QueueShovelTest : SubjectSpek({ - + val message = StartExecution(PIPELINE, "executionId", "app") val queue: Queue = mock() - val previousQueue: Queue = mock() val registry = NoopRegistry() + val ackCallback = mock<() -> Unit>() + val previousQueue: Queue = mock() + val executionRepository: ExecutionRepository = mock() + val execution: PipelineExecution = mock() + val activator = object : Activator { override val enabled = true } subject(CachingMode.GROUP) { - QueueShovel(queue, previousQueue, registry, activator, DynamicConfigService.NOOP) + QueueShovel(queue, previousQueue, registry, activator, DynamicConfigService.NOOP, executionRepository) + } + + beforeEachTest { + whenever(previousQueue.poll(any())).doAnswer { + it.getArgument(0)(message, ackCallback) + } + + whenever(execution.partition).doReturn("some-partition") + } + + afterEachTest { + reset(executionRepository, queue, ackCallback, execution) } describe("polling the previous queue") { - val message = StartExecution(PIPELINE, "1", "spinnaker") + beforeGroup { + whenever(executionRepository.retrieve(any(), anyString())).doReturn(execution) + whenever(executionRepository.handlesPartition(anyString())).doReturn(true) + } + + on("the shovel poll method is invoked") { + subject.migrateOne() + + it("pushes the message onto the current queue and acks it") { + verify(queue).push(message) + verify(ackCallback).invoke() + } + } + } + describe("dealing with a foreign execution") { beforeGroup { - whenever(previousQueue.poll(any())) doAnswer { - it.getArgument(0)(message, {}) + whenever(executionRepository.retrieve(any(), anyString())).doReturn(execution) + whenever(executionRepository.handlesPartition(anyString())).doReturn(false) + whenever(executionRepository.partition).thenReturn("local-partition") + } + + on("a poll cycle where the message belongs to a foreign execution") { + subject.migrateOne() + + it("overwrites the partition to be the local one") { + verify(execution).partition = "local-partition" + verify(executionRepository).store(execution) + verify(queue).push(message) } } + } - on("the shovel poll method is invoked") { + describe("dealing with execution repository read errors") { + beforeGroup { + whenever(executionRepository.retrieve(any(), anyString())).thenThrow(ExecutionNotFoundException("womp womp")) + } + + on("a poll cycle") { subject.migrateOne() + + it("leaves the message on the old queue") { + // not pushed + verifyZeroInteractions(queue) + + // not acked + verifyZeroInteractions(ackCallback) + + // execution not updated + verify(executionRepository, never()).handlesPartition(anyString()) + verify(executionRepository, never()).store(any()) + } + } + } + + describe("dealing with execution repository write errors") { + beforeGroup { + whenever(executionRepository.retrieve(any(), anyString())).doReturn(execution) + whenever(executionRepository.handlesPartition(anyString())).doReturn(false) + whenever(executionRepository.partition).thenReturn("local-partition") + whenever(executionRepository.store(execution)).thenThrow(RuntimeException("something unexpected")) } - it("pushes the message onto the current queue") { - verify(queue).push(message) + on("a poll cycle") { + subject.migrateOne() + + it("leaves the message on the old queue") { + // attempted to transfer ownership + verify(execution).partition = "local-partition" + verify(executionRepository).store(execution) + + // not pushed + verifyZeroInteractions(queue) + + // not acked + verifyZeroInteractions(ackCallback) + } } } }) diff --git a/orca-web/orca-web.gradle b/orca-web/orca-web.gradle index aff457e71d..1a6b88594b 100644 --- a/orca-web/orca-web.gradle +++ b/orca-web/orca-web.gradle @@ -90,7 +90,6 @@ dependencies { testImplementation("cglib:cglib-nodep") testImplementation("org.objenesis:objenesis") testImplementation("org.junit.jupiter:junit-jupiter-api") - testImplementation("org.hamcrest:hamcrest-core:1.3") testImplementation("com.netflix.spinnaker.keiko:keiko-mem:$keikoVersion") }