Skip to content

Commit

Permalink
Improve tolerance to backend unavailability in orchestrator (#1124)
Browse files Browse the repository at this point in the history
* If agent state can't be posted to the backend, skip heartbeat processing but still update `HeartBeatInspector`'s registry, instead of failing and potentially treating all agents as crashed
* Make `areAgentsHaveStarted` track agents per execution

Closes #1075
  • Loading branch information
petertrr authored Aug 31, 2022
1 parent 71b6102 commit 7828d78
Show file tree
Hide file tree
Showing 10 changed files with 83 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import com.saveourtool.save.agent.utils.*
import com.saveourtool.save.agent.utils.readFile
import com.saveourtool.save.agent.utils.requiredEnv
import com.saveourtool.save.agent.utils.sendDataToBackend
import com.saveourtool.save.core.files.getWorkingDirectory
import com.saveourtool.save.core.logging.describe
import com.saveourtool.save.core.plugin.Plugin
import com.saveourtool.save.core.result.CountWarnings
Expand Down Expand Up @@ -117,7 +118,7 @@ class SaveAgent(private val config: AgentConfiguration,
* [coroutineScope] is the topmost scope for all jobs, so by cancelling it
* we can gracefully shut down the whole application.
*/
fun shutdown() {
internal fun shutdown() {
coroutineScope.cancel()
}

Expand All @@ -126,8 +127,9 @@ class SaveAgent(private val config: AgentConfiguration,
logInfoCustom("Scheduling heartbeats")
while (true) {
val response = runCatching {
val executionId = requiredEnv(AgentEnvName.EXECUTION_ID).toLong()
// TODO: get execution progress here. However, with current implementation JSON report won't be valid until all tests are finished.
sendHeartbeat(ExecutionProgress(0))
sendHeartbeat(ExecutionProgress(executionId = executionId, percentCompletion = 0))
}
if (response.isSuccess) {
when (val heartbeatResponse = response.getOrThrow().also {
Expand Down Expand Up @@ -170,27 +172,26 @@ class SaveAgent(private val config: AgentConfiguration,

/**
* @param cliArgs arguments for SAVE process
* @return Unit
*/
internal fun CoroutineScope.startSaveProcess(cliArgs: String) {
// blocking execution of OS process
state.value = AgentState.BUSY
executionStartSeconds.value = Clock.System.now().epochSeconds
val pwd = FileSystem.SYSTEM.canonicalize(".".toPath())
logInfoCustom("Starting SAVE in $pwd with provided args $cliArgs")
logInfoCustom("Starting SAVE in ${getWorkingDirectory()} with provided args $cliArgs")
val executionResult = runSave(cliArgs)
logInfoCustom("SAVE has completed execution with status ${executionResult.code}")

val saveCliLogFilePath = config.logFilePath
val byteArray = FileSystem.SYSTEM.source(saveCliLogFilePath.toPath())
.buffer()
.readByteArray()
val saveCliLogData = String(byteArray).split("\n")

launchLogSendingJob(byteArray)
logDebugCustom("SAVE has completed execution, execution logs:")
saveCliLogData.forEach {
logDebugCustom("[SAVE] $it")
}

when (executionResult.code) {
0 -> if (saveCliLogData.isEmpty()) {
state.value = AgentState.CLI_FAILED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class SaveAgentTest {
@Test
fun `agent should send heartbeats`() {
runBlocking {
saveAgentForTest.sendHeartbeat(ExecutionProgress(0))
saveAgentForTest.sendHeartbeat(ExecutionProgress(0, -1L))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ data:
management.endpoints.web.exposure.include=*
orchestrator.agent-settings.orchestrator-url=http://{{ .Values.orchestrator.name }}
orchestrator.agent-settings.backend-url=http://{{ .Values.backend.name }}
orchestrator.agent-settings.debug=true
orchestrator.test-resources.tmp-path=/tmp/save/resources
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,13 @@ import kotlinx.serialization.Serializable
* Progress of tests execution
*
* @property percentCompletion percentage of completed jobs, integer 0..100
* @property executionId ID of an execution under which the progress is being tracked
*/
@Serializable
data class ExecutionProgress(val percentCompletion: Int) {
data class ExecutionProgress(
val percentCompletion: Int,
val executionId: Long,
) {
init {
@Suppress("MAGIC_NUMBER", "MagicNumber")
require(percentCompletion in 0..100) { "percentCompletion should be in 0..100, but is $percentCompletion" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ data class ConfigProperties(
val useGvisor: Boolean,
val agentCpuRequests: String = "100m",
val agentCpuLimits: String = "500m",
val agentMemoryRequests: String = "300m",
val agentMemoryLimits: String = "500m",
val agentMemoryRequests: String = "300Mi",
val agentMemoryLimits: String = "500Mi",
)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,16 @@ import com.saveourtool.save.orchestrator.config.ConfigProperties
import com.saveourtool.save.orchestrator.service.AgentService
import com.saveourtool.save.orchestrator.service.DockerService
import com.saveourtool.save.orchestrator.service.HeartBeatInspector
import com.saveourtool.save.orchestrator.service.areAgentsHaveStarted
import com.saveourtool.save.utils.debug
import com.saveourtool.save.utils.warn

import org.slf4j.LoggerFactory
import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RestController
import org.springframework.web.reactive.function.client.WebClientResponseException
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.kotlin.core.publisher.toMono
import reactor.kotlin.core.util.function.component1
import reactor.kotlin.core.util.function.component2

Expand Down Expand Up @@ -58,21 +57,20 @@ class HeartbeatController(private val agentService: AgentService,
*/
@PostMapping("/heartbeat")
fun acceptHeartbeat(@RequestBody heartbeat: Heartbeat): Mono<String> {
logger.info("Got heartbeat state: ${heartbeat.state.name} from ${heartbeat.agentId}")
areAgentsHaveStarted.compareAndSet(false, true)
// store new state into DB
return agentService.updateAgentStatusesWithDto(
AgentStatusDto(LocalDateTime.now(), heartbeat.state, heartbeat.agentId)
)
.onErrorResume(WebClientResponseException::class.java) {
logger.warn("Couldn't update agent statuses for agent ${heartbeat.agentId}, will skip this heartbeat: ${it.message}")
Mono.empty()
val executionId = heartbeat.executionProgress.executionId
logger.info("Got heartbeat state: ${heartbeat.state.name} from ${heartbeat.agentId} under execution id=$executionId")
return {
dockerService.markAgentForExecutionAsStarted(executionId)
heartBeatInspector.updateAgentHeartbeatTimeStamps(heartbeat)
}
.toMono()
.flatMap {
// store new state into DB
agentService.updateAgentStatusesWithDto(
AgentStatusDto(LocalDateTime.now(), heartbeat.state, heartbeat.agentId)
)
}
.doOnSuccess {
// Update heartbeat info only if agent state is updated in the backend
heartBeatInspector.updateAgentHeartbeatTimeStamps(heartbeat)
}
.then(
.flatMap {
when (heartbeat.state) {
// if agent sends the first heartbeat, we try to assign work for it
STARTING -> handleVacantAgent(heartbeat.agentId, isStarting = true)
Expand All @@ -82,14 +80,17 @@ class HeartbeatController(private val agentService: AgentService,
FINISHED -> agentService.checkSavedData(heartbeat.agentId).flatMap { isSavingSuccessful ->
handleFinishedAgent(heartbeat.agentId, isSavingSuccessful)
}

BUSY -> Mono.just(ContinueResponse)
BACKEND_FAILURE, BACKEND_UNREACHABLE, CLI_FAILED -> Mono.just(WaitResponse)
CRASHED, TERMINATED, STOPPED_BY_ORCH -> Mono.fromCallable {
handleIllegallyOnlineAgent(heartbeat.agentId, heartbeat.state)
WaitResponse
}
}
)
}
// Heartbeat couldn't be processed, agent should replay its current state on the next heartbeat.
.defaultIfEmpty(ContinueResponse)
.map {
Json.encodeToString(HeartbeatResponse.serializer(), it)
}
Expand Down Expand Up @@ -118,7 +119,8 @@ class HeartbeatController(private val agentService: AgentService,
.flatMap { (response, shouldStop) ->
if (shouldStop) {
agentService.updateAgentStatusesWithDto(AgentStatusDto(LocalDateTime.now(), TERMINATED, agentId))
.thenReturn(TerminateResponse)
.thenReturn<HeartbeatResponse>(TerminateResponse)
.defaultIfEmpty(ContinueResponse)
.doOnSuccess {
logger.info("Agent id=$agentId will receive ${TerminateResponse::class.simpleName} and should shutdown gracefully")
ensureGracefulShutdown(agentId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@ import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.stereotype.Service
import org.springframework.web.reactive.function.BodyInserters
import org.springframework.web.reactive.function.client.WebClient
import org.springframework.web.reactive.function.client.WebClientException
import org.springframework.web.reactive.function.client.WebClientResponseException
import org.springframework.web.reactive.function.client.bodyToMono
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
import reactor.kotlin.core.publisher.onErrorResume
import java.time.Duration

import java.time.LocalDateTime
Expand Down Expand Up @@ -92,7 +95,7 @@ class AgentService(

/**
* @param agentState [AgentStatus] to update in the DB
* @return as bodiless entity of response
* @return a Mono containing bodiless entity of response or an empty Mono if request has failed
*/
fun updateAgentStatusesWithDto(agentState: AgentStatusDto): Mono<BodilessResponseEntity> =
webClientBackend
Expand All @@ -101,6 +104,10 @@ class AgentService(
.body(BodyInserters.fromValue(agentState))
.retrieve()
.toBodilessEntity()
.onErrorResume(WebClientException::class) {
log.warn("Couldn't update agent statuses because of backend failure", it)
Mono.empty()
}

/**
* Check that no TestExecution for agent [agentId] has status READY_FOR_TESTING
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import org.springframework.web.reactive.function.BodyInserters
import org.springframework.web.reactive.function.client.WebClient
import reactor.core.publisher.Flux

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentMap
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicLong

import kotlin.io.path.*
Expand All @@ -39,6 +42,8 @@ class DockerService(
private val agentRunner: AgentRunner,
private val agentService: AgentService,
) {
/**
 * Per-execution "started" flags, keyed by execution ID.
 * A flag is polled while waiting for agents of an execution to start
 * and is set to `true` by [markAgentForExecutionAsStarted].
 */
private val areAgentsHaveStarted: ConcurrentMap<Long, AtomicBoolean> = ConcurrentHashMap()

@Autowired
@Qualifier("webClientBackend")
private lateinit var webClientBackend: WebClient
Expand Down Expand Up @@ -100,20 +105,22 @@ class DockerService(
val duration = AtomicLong(0)
Flux.interval(configProperties.agentsStartCheckIntervalMillis.milliseconds.toJavaDuration())
.takeWhile {
duration.get() < configProperties.agentsStartTimeoutMillis && !areAgentsHaveStarted.get()
val isAnyAgentStarted = areAgentsHaveStarted.computeIfAbsent(executionId) { AtomicBoolean(false) }.get()
duration.get() < configProperties.agentsStartTimeoutMillis && !isAnyAgentStarted
}
.doOnNext {
duration.set((Clock.System.now() - now).inWholeMilliseconds)
}
.doOnComplete {
if (!areAgentsHaveStarted.get()) {
if (areAgentsHaveStarted[executionId]?.get() != true) {
log.error("Internal error: none of agents $agentIds are started, will mark execution $executionId as failed.")
agentRunner.stop(executionId)
agentService.updateExecution(executionId, ExecutionStatus.ERROR,
"Internal error, raise an issue at https://github.com/saveourtool/save-cloud/issues/new"
).then(agentService.markTestExecutionsAsFailed(agentIds, AgentState.CRASHED))
.subscribe()
}
areAgentsHaveStarted.remove(executionId)
}
}
}
Expand All @@ -133,6 +140,15 @@ class DockerService(
false
}

/**
 * Sets the flag stored in [areAgentsHaveStarted] for the given execution to `true`.
 * If no flag is registered for [executionId] yet, one is created first.
 *
 * @param executionId ID of the execution whose flag should be set
 */
fun markAgentForExecutionAsStarted(executionId: Long) {
    val startedFlag = areAgentsHaveStarted.computeIfAbsent(executionId) { AtomicBoolean(false) }
    // no-op if the flag has already been set
    startedFlag.compareAndSet(false, true)
}

/**
* Check whether the agent agentId is stopped
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,10 @@ import reactor.core.publisher.Flux
import java.time.LocalDateTime
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentMap
import java.util.concurrent.atomic.AtomicBoolean

import kotlinx.datetime.Clock
import kotlinx.datetime.Instant

internal var areAgentsHaveStarted = AtomicBoolean(false)

typealias AgentStateWithTimeStamp = Pair<String, Instant>

/**
Expand Down
Loading

0 comments on commit 7828d78

Please sign in to comment.