Skip to content

Commit

Permalink
Add timeout to Peer connection (#448)
Browse files Browse the repository at this point in the history
A dynamic timeout is used when connecting to Peer/Electrum.

Note that the timeouts depend on the number of consecutive
failed connection attempts, and whether TOR is enabled or
not.

This commits uses the new Peer connection timeouts added in
ACINQ/lightning-kmp#541
  • Loading branch information
dpad85 authored Nov 16, 2023
1 parent cdf9c16 commit 816754b
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 18 deletions.
2 changes: 1 addition & 1 deletion buildSrc/src/main/kotlin/Versions.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
object Versions {
const val lightningKmp = "1.5.11"
const val lightningKmp = "1.5.12-SNAPSHOT"
const val secp256k1 = "0.10.1"
const val torMobile = "0.2.0"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ abstract class AppContext : Application() {
log.debug("fetching context from ${Constants.WALLET_CONTEXT_URL}")
Wallet.httpClient.newCall(Request.Builder().url(Constants.WALLET_CONTEXT_URL).build()).enqueue(object : Callback {
override fun onFailure(call: Call, e: IOException) {
log.warn("failed to fetch context from ${Constants.WALLET_CONTEXT_URL}: ", e)
log.warn("failed to fetch context from ${Constants.WALLET_CONTEXT_URL}: ${e.message}")
fetchWalletContextFallback(context)
}

Expand Down Expand Up @@ -174,7 +174,7 @@ abstract class AppContext : Application() {
log.debug("fetching context from ${Constants.WALLET_CONTEXT_URL_FALLBACK}")
Wallet.httpClient.newCall(Request.Builder().url(Constants.WALLET_CONTEXT_URL_FALLBACK).build()).enqueue(object : Callback {
override fun onFailure(call: Call, e: IOException) {
log.warn("failed to fetch context from ${Constants.WALLET_CONTEXT_URL_FALLBACK}: ", e)
log.warn("failed to fetch context from ${Constants.WALLET_CONTEXT_URL_FALLBACK}: ${e.message}")
}

override fun onResponse(call: Call, response: Response) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,11 +264,29 @@ class AppConnectionsDaemon(
peerConnectionJob = connectionLoop(
name = "Peer",
statusStateFlow = peer.connectionState,
) {
) { connectionAttempt ->
peer.socketBuilder = tcpSocketBuilder()
try {
logger.info { "calling Peer.connect" }
peer.connect()
val connectTimeout = when {
it.torIsEnabled && connectionAttempt <= 6 -> 15.seconds
it.torIsEnabled -> 30.seconds
connectionAttempt <= 1 -> 1.seconds
connectionAttempt <= 3 -> 2.seconds
connectionAttempt <= 6 -> 4.seconds
connectionAttempt <= 10 -> 7.seconds
else -> 10.seconds
}
val handshakeTimeout = when {
it.torIsEnabled && connectionAttempt <= 6 -> 20.seconds
it.torIsEnabled -> 40.seconds
connectionAttempt <= 1 -> 2.seconds
connectionAttempt <= 3 -> 4.seconds
connectionAttempt <= 6 -> 7.seconds
connectionAttempt <= 10 -> 10.seconds
else -> 15.seconds
}
logger.info { "calling Peer.connect with connect_timeout=$connectTimeout handshake_timeout=$handshakeTimeout" }
peer.connect(connectTimeout = connectTimeout, handshakeTimeout = handshakeTimeout)
} catch (e: Exception) {
logger.error { "error when connecting to peer: ${e.message ?: e::class.simpleName}" }
}
Expand Down Expand Up @@ -305,7 +323,7 @@ class AppConnectionsDaemon(
electrumConnectionJob = connectionLoop(
name = "Electrum",
statusStateFlow = electrumClient.connectionStatus.map { it.toConnectionState() }.stateIn(this)
) {
) { connectionAttempt ->
val electrumServerAddress: ServerAddress? = configurationManager.electrumConfig.value?.let { electrumConfig ->
when (electrumConfig) {
is ElectrumConfig.Custom -> electrumConfig.server
Expand All @@ -316,8 +334,16 @@ class AppConnectionsDaemon(
logger.debug { "ignoring electrum connection opportunity because no server is configured yet" }
} else {
try {
logger.info { "calling ElectrumClient.connect to server=$electrumServerAddress" }
electrumClient.connect(electrumServerAddress, tcpSocketBuilder())
val handshakeTimeout = when {
it.torIsEnabled && connectionAttempt <= 6 -> 20.seconds
it.torIsEnabled -> 40.seconds
connectionAttempt <= 3 -> 4.seconds
connectionAttempt <= 6 -> 7.seconds
connectionAttempt <= 10 -> 15.seconds
else -> 20.seconds
}
logger.info { "calling ElectrumClient.connect to server=$electrumServerAddress with handshake_timeout=$handshakeTimeout" }
electrumClient.connect(electrumServerAddress, tcpSocketBuilder(), timeout = handshakeTimeout)
} catch (e: Exception) {
logger.error { "error when connecting to electrum: ${e.message ?: e::class.simpleName}"}
}
Expand Down Expand Up @@ -456,27 +482,43 @@ class AppConnectionsDaemon(
* are throttled with an exponential backoff. This pause cannot be cancelled, which can be an issue if the network
* conditions have just changed and we want to reconnect immediately. In this case this job should be cancelled
* altogether and restarted.
*
* @param connect the parameter is the current number of failed consecutive attempts. If this counter is large (i.e.
* we have connection issues), the internal connection method should use more lax parameters. Conversely, if
* we've just started the connection loop, the connection method should fail fast to provide a snappier UX.
*/
private fun connectionLoop(
name: String,
statusStateFlow: StateFlow<Connection>,
connect: suspend () -> Unit
connect: suspend (Int) -> Unit
) = launch {
var pause = Duration.ZERO
// tracks how many failed connection attempts have been made in a row
// when connection keeps failing, this loop is paused for a bit
var connectionCounter = 0
statusStateFlow.collect {
if (it is Connection.CLOSED) {
logger.info { "next $name connection attempt in $pause" }
val pause = connectionPause(connectionCounter)
logger.info { "next $name connection attempt #$connectionCounter in $pause" }
delay(pause)
val minPause = 0.25.seconds
val maxPause = 8.seconds
pause = (pause.coerceAtLeast(minPause) * 2).coerceAtMost(maxPause)
connect()
connectionCounter++
connect(connectionCounter)
} else if (it == Connection.ESTABLISHED) {
// reset the pause so that next disconnection is snappy
pause = 0.5.seconds
connectionCounter = 0
}
}
}

private fun connectionPause(attemptCount: Int): Duration {
return when {
attemptCount <= 0 -> 0.1.seconds
attemptCount == 1 -> 0.25.seconds
attemptCount == 2 -> 0.5.seconds
attemptCount == 3 -> 1.seconds
attemptCount == 4 -> 2.seconds
attemptCount == 5 -> 4.seconds
else -> 8.seconds
}
}
}

/** The start function must run on a different dispatcher depending on the platform. */
Expand Down

0 comments on commit 816754b

Please sign in to comment.