Skip to content

Commit feeaad6

Browse files
authored
Monitoring instability fixes (#118)
* experimenting with thread-pool tuning * make retry logic work again * provide a temporary user-agent as a PoC
1 parent 8d60006 commit feeaad6

File tree

14 files changed

+123
-79
lines changed

14 files changed

+123
-79
lines changed

build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ dependencies {
6363
implementation(mn.logback.classic)
6464
implementation(mn.micronaut.http.client)
6565
implementation(mn.micronaut.rxjava3)
66-
implementation(mn.micronaut.rxjava3.http.client)
6766
implementation(mn.micronaut.retry)
6867
implementation(mn.micronaut.security.jwt)
6968

@@ -84,6 +83,7 @@ dependencies {
8483
implementation(mn.kotlin.stdlib.jdk8)
8584
implementation(mn.kotlin.reflect)
8685
implementation(mn.kotlinx.coroutines.core)
86+
implementation(mn.kotlinx.coroutines.reactive)
8787
implementation(mn.micronaut.kotlin.extension.functions)
8888
implementation("io.arrow-kt:arrow-core-data:0.12.1")
8989

config/detekt/detekt.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ coroutines:
189189
- 'Default'
190190
- 'Unconfined'
191191
RedundantSuspendModifier:
192-
active: true
192+
active: false
193193
SleepInsteadOfDelay:
194194
active: true
195195
SuspendFunWithCoroutineScopeReceiver:

src/main/kotlin/com/kuvaszuptime/kuvasz/services/CheckScheduler.kt

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ import io.micronaut.scheduling.TaskExecutors
1111
import io.micronaut.scheduling.TaskScheduler
1212
import jakarta.annotation.PostConstruct
1313
import jakarta.inject.Named
14+
import kotlinx.coroutines.CoroutineDispatcher
15+
import kotlinx.coroutines.CoroutineScope
16+
import kotlinx.coroutines.launch
1417
import org.slf4j.LoggerFactory
1518
import java.time.Duration
1619
import java.util.concurrent.ScheduledFuture
@@ -20,9 +23,12 @@ class CheckScheduler(
2023
@Named(TaskExecutors.SCHEDULED) private val taskScheduler: TaskScheduler,
2124
private val monitorRepository: MonitorRepository,
2225
private val uptimeChecker: UptimeChecker,
23-
private val sslChecker: SSLChecker
26+
private val sslChecker: SSLChecker,
27+
dispatcher: CoroutineDispatcher,
2428
) {
2529

30+
private val scope = CoroutineScope(dispatcher)
31+
2632
private val scheduledChecks: MutableList<ScheduledCheck> = mutableListOf()
2733

2834
@PostConstruct
@@ -100,21 +106,36 @@ class CheckScheduler(
100106
val initialDelay = (1..monitor.uptimeCheckInterval).random().toDurationOfSeconds()
101107
val period = monitor.uptimeCheckInterval.toDurationOfSeconds()
102108
taskScheduler.scheduleWithFixedDelay(initialDelay, period) {
103-
uptimeChecker.check(monitor)
109+
scope.launch {
110+
@Suppress("TooGenericExceptionCaught")
111+
try {
112+
uptimeChecker.check(monitor)
113+
} catch (ex: Throwable) {
114+
// Better to catch and swallow everything that wasn't caught before to prevent
115+
// the accidental cancellation of the parent coroutine
116+
logger.error(
117+
"An unexpected error happened during the uptime check of a " +
118+
"monitor (${monitor.name}): ${ex.message}"
119+
)
120+
}
121+
}
104122
}
105123
}
106124

107125
private fun scheduleSSLCheck(monitor: MonitorRecord): Result<ScheduledFuture<*>> =
108126
runCatching {
109-
val initialDelay = Duration.ofMinutes(SSL_CHECK_INITIAL_DELAY_MINUTES)
127+
val initialDelay = Duration.ofSeconds(
128+
(SSL_CHECK_INITIAL_DELAY_MIN_SECONDS..SSL_CHECK_INITIAL_DELAY_MAX_SECONDS).random()
129+
)
110130
val period = Duration.ofDays(SSL_CHECK_PERIOD_DAYS)
111131
taskScheduler.scheduleWithFixedDelay(initialDelay, period) {
112132
sslChecker.check(monitor)
113133
}
114134
}
115135

116136
companion object {
117-
private const val SSL_CHECK_INITIAL_DELAY_MINUTES = 1L
137+
private const val SSL_CHECK_INITIAL_DELAY_MIN_SECONDS = 60L
138+
private const val SSL_CHECK_INITIAL_DELAY_MAX_SECONDS = 300L
118139
private const val SSL_CHECK_PERIOD_DAYS = 1L
119140
private val logger = LoggerFactory.getLogger(CheckScheduler::class.java)
120141
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package com.kuvaszuptime.kuvasz.services
2+
3+
import io.micronaut.context.annotation.Factory
4+
import jakarta.inject.Singleton
5+
import kotlinx.coroutines.Dispatchers
6+
7+
@Factory
8+
class DispatcherFactory {
9+
10+
@Singleton
11+
@Suppress("InjectDispatcher")
12+
fun provideDispatcher() = Dispatchers.IO
13+
}

src/main/kotlin/com/kuvaszuptime/kuvasz/services/SSLChecker.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ class SSLChecker(
1919
) {
2020

2121
companion object {
22+
// TODO make it configurable
2223
private const val EXPIRY_THRESHOLD_DAYS = 30L
2324
}
2425

src/main/kotlin/com/kuvaszuptime/kuvasz/services/UptimeChecker.kt

Lines changed: 41 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -7,21 +7,22 @@ import com.kuvaszuptime.kuvasz.models.toMicronautHttpMethod
77
import com.kuvaszuptime.kuvasz.repositories.UptimeEventRepository
88
import com.kuvaszuptime.kuvasz.tables.records.MonitorRecord
99
import com.kuvaszuptime.kuvasz.tables.records.UptimeEventRecord
10-
import com.kuvaszuptime.kuvasz.util.RawHttpResponse
1110
import com.kuvaszuptime.kuvasz.util.getRedirectionUri
1211
import com.kuvaszuptime.kuvasz.util.isRedirected
1312
import com.kuvaszuptime.kuvasz.util.isSuccess
1413
import io.micronaut.core.io.buffer.ByteBuffer
1514
import io.micronaut.http.HttpHeaders
1615
import io.micronaut.http.HttpRequest
1716
import io.micronaut.http.HttpResponse
17+
import io.micronaut.http.client.HttpClient
1818
import io.micronaut.http.client.HttpClientConfiguration
1919
import io.micronaut.http.client.annotation.Client
2020
import io.micronaut.http.client.exceptions.HttpClientException
2121
import io.micronaut.http.client.exceptions.HttpClientResponseException
22+
import io.micronaut.retry.annotation.Retryable
2223
import io.micronaut.runtime.ApplicationConfiguration
23-
import io.micronaut.rxjava3.http.client.Rx3HttpClient
2424
import jakarta.inject.Singleton
25+
import kotlinx.coroutines.reactive.awaitSingle
2526
import org.slf4j.LoggerFactory
2627
import java.net.URI
2728
import java.time.Duration
@@ -30,7 +31,7 @@ import java.util.*
3031
@Singleton
3132
class UptimeChecker(
3233
@Client(configuration = HttpCheckerClientConfiguration::class)
33-
private val httpClient: Rx3HttpClient,
34+
private val httpClient: HttpClient,
3435
private val eventDispatcher: EventDispatcher,
3536
private val uptimeEventRepository: UptimeEventRepository
3637
) {
@@ -40,50 +41,52 @@ class UptimeChecker(
4041
private val logger = LoggerFactory.getLogger(UptimeChecker::class.java)
4142
}
4243

43-
fun check(monitor: MonitorRecord, uriOverride: URI? = null) {
44+
suspend fun check(monitor: MonitorRecord, uriOverride: URI? = null) {
4445
val previousEvent = uptimeEventRepository.getPreviousEventByMonitorId(monitorId = monitor.id)
45-
var start = 0L
4646

4747
if (uriOverride == null) {
4848
logger.info("Starting uptime check for monitor (${monitor.name}) on URL: ${monitor.url}")
4949
}
5050

5151
@Suppress("TooGenericExceptionCaught")
52-
sendHttpRequest(monitor, uri = uriOverride ?: URI(monitor.url))
53-
.doOnSubscribe { start = System.currentTimeMillis() }
54-
.subscribe(
55-
{ response -> handleResponse(monitor, response, start, previousEvent) },
56-
{ error ->
57-
var clarifiedError = error
58-
val status = try {
59-
(error as? HttpClientResponseException)?.status
60-
} catch (ex: Throwable) {
61-
// Invalid status codes (e.g. 498) are throwing an IllegalArgumentException for example
62-
// Better to have an explicit error, because the status won't be visible later, so it would be
63-
// harder for the users to figure out what was failing during the check
64-
clarifiedError = HttpClientException(ex.message, ex)
65-
null
66-
}
67-
eventDispatcher.dispatch(
68-
MonitorDownEvent(
69-
monitor = monitor,
70-
status = status,
71-
error = clarifiedError,
72-
previousEvent = previousEvent
73-
)
74-
)
75-
}
52+
53+
try {
54+
val start = System.currentTimeMillis()
55+
val response = sendHttpRequest(monitor, uri = uriOverride ?: URI(monitor.url))
56+
val latency = (System.currentTimeMillis() - start).toInt()
57+
58+
handleResponse(monitor, response, latency, previousEvent)
59+
} catch (error: Throwable) {
60+
var clarifiedError = error
61+
val status = try {
62+
(error as? HttpClientResponseException)?.status
63+
} catch (ex: Throwable) {
64+
// Invalid status codes (e.g. 498) are throwing an IllegalArgumentException for example
65+
// Better to have an explicit error, because the status won't be visible later, so it would be
66+
// harder for the users to figure out what was failing during the check
67+
clarifiedError = HttpClientException(ex.message, ex)
68+
null
69+
}
70+
eventDispatcher.dispatch(
71+
MonitorDownEvent(
72+
monitor = monitor,
73+
status = status,
74+
error = clarifiedError,
75+
previousEvent = previousEvent
76+
)
7677
)
78+
}
7779
}
7880

79-
private fun handleResponse(
81+
// TODO handle redirect locations in a way that we pass in a list of previously
82+
// seen redirects to avoid redirect loops
83+
private suspend fun handleResponse(
8084
monitor: MonitorRecord,
8185
response: HttpResponse<ByteBuffer<Any>>,
82-
start: Long,
86+
latency: Int,
8387
previousEvent: UptimeEventRecord?
8488
) {
8589
if (response.isSuccess()) {
86-
val latency = (System.currentTimeMillis() - start).toInt()
8790
eventDispatcher.dispatch(
8891
MonitorUpEvent(
8992
monitor = monitor,
@@ -129,20 +132,24 @@ class UptimeChecker(
129132
}
130133
}
131134

132-
private fun sendHttpRequest(monitor: MonitorRecord, uri: URI): RawHttpResponse {
135+
@Retryable(delay = "5s", attempts = "$RETRY_COUNT", multiplier = "2")
136+
suspend fun sendHttpRequest(monitor: MonitorRecord, uri: URI): HttpResponse<ByteBuffer<Any>> {
137+
logger.debug("Sending HTTP request to $uri (${monitor.name})")
133138
val request = HttpRequest
134139
.create<Any>(
135140
monitor.requestMethod.toMicronautHttpMethod(),
136141
uri.toString()
137142
)
138143
.header(HttpHeaders.ACCEPT, "*/*")
139144
.header(HttpHeaders.ACCEPT_ENCODING, "gzip, deflate, br")
145+
// TODO move it to const and make it overridable
146+
.header(HttpHeaders.USER_AGENT, "Kuvasz Uptime Checker/2 https://github.com/kuvasz-uptime/kuvasz")
140147
.apply {
141148
if (monitor.forceNoCache)
142149
header(HttpHeaders.CACHE_CONTROL, "no-cache")
143150
}
144151

145-
return httpClient.exchange(request).retry(RETRY_COUNT)
152+
return httpClient.exchange(request).awaitSingle()
146153
}
147154
}
148155

src/main/resources/application.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,8 @@ micronaut:
4040
netty:
4141
event-loops:
4242
uptime-check:
43-
num-threads: 40
44-
prefer-native-transport: true
43+
num-threads: 4
44+
prefer-native-transport: false
4545
endpoints:
4646
health:
4747
enabled: true

src/main/resources/logback-dev.xml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99
</encoder>
1010
</appender>
1111

12+
<!-- Needs to be configured here, because the logger config in application.yml is evaluated later -->
13+
<logger name="org.jooq.Constants" level="WARN"/>
14+
1215
<root level="INFO">
1316
<appender-ref ref="STDOUT"/>
1417
</root>

src/test/kotlin/com/kuvaszuptime/kuvasz/controllers/InfoEndpointTest.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,13 @@ package com.kuvaszuptime.kuvasz.controllers
22

33
import io.kotest.core.spec.style.BehaviorSpec
44
import io.kotest.matchers.string.shouldContain
5+
import io.micronaut.http.client.HttpClient
56
import io.micronaut.http.client.annotation.Client
6-
import io.micronaut.rxjava3.http.client.Rx3HttpClient
77
import io.micronaut.test.extensions.kotest5.annotation.MicronautTest
88

99
@MicronautTest
1010
class InfoEndpointTest(
11-
@Client("/") private val client: Rx3HttpClient
11+
@Client("/") private val client: HttpClient
1212
) : BehaviorSpec(
1313
{
1414
given("the /info endpoint") {

src/test/kotlin/com/kuvaszuptime/kuvasz/controllers/MonitorControllerTest.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,15 @@ import io.kotest.matchers.shouldNotBe
3131
import io.kotest.matchers.string.shouldContain
3232
import io.micronaut.http.HttpRequest
3333
import io.micronaut.http.HttpStatus
34+
import io.micronaut.http.client.HttpClient
3435
import io.micronaut.http.client.annotation.Client
3536
import io.micronaut.http.client.exceptions.HttpClientResponseException
36-
import io.micronaut.rxjava3.http.client.Rx3HttpClient
3737
import io.micronaut.test.extensions.kotest5.annotation.MicronautTest
3838

3939
@Suppress("LongParameterList")
4040
@MicronautTest
4141
class MonitorControllerTest(
42-
@Client("/") private val client: Rx3HttpClient,
42+
@Client("/") private val client: HttpClient,
4343
private val monitorClient: MonitorClient,
4444
private val monitorRepository: MonitorRepository,
4545
private val latencyLogRepository: LatencyLogRepository,

0 commit comments

Comments
 (0)