@@ -2,6 +2,7 @@ package f.cking.software.domain.interactor
2
2
3
3
import f.cking.software.data.helpers.BleScannerHelper
4
4
import f.cking.software.domain.model.DeviceData
5
+ import f.cking.software.domain.model.JournalEntry
5
6
import f.cking.software.domain.model.SavedDeviceHandle
6
7
import f.cking.software.domain.model.isNullOrEmpty
7
8
import f.cking.software.mapParallel
@@ -11,7 +12,8 @@ import kotlinx.coroutines.Job
11
12
import kotlinx.coroutines.async
12
13
import kotlinx.coroutines.coroutineScope
13
14
import kotlinx.coroutines.delay
14
- import kotlinx.coroutines.launch
15
+ import kotlinx.coroutines.flow.channelFlow
16
+ import kotlinx.coroutines.flow.first
15
17
import kotlinx.coroutines.withContext
16
18
import timber.log.Timber
17
19
import kotlin.math.max
@@ -23,89 +25,145 @@ import kotlin.time.Duration.Companion.seconds
23
25
class DeviceServicesFetchingPlanner (
24
26
private val fetchDeviceServiceInfo : FetchDeviceServiceInfo ,
25
27
private val bleScannerHelper : BleScannerHelper ,
28
+ private val saveReportInteractor : SaveReportInteractor ,
26
29
) {
27
30
28
- private var currentJob: Job ? = null
29
31
private var parallelProcessingBatches = PARALLEL_BATCH_COUNT
30
32
private var maxPossibleConnections = PARALLEL_BATCH_COUNT
33
+ private var cooldown: Long? = null
31
34
32
- suspend fun scheduleFetchServiceInfo (devices : List <SavedDeviceHandle >) = coroutineScope {
33
- currentJob?.cancel()
34
- currentJob = launch {
35
- withContext(Dispatchers .IO ) {
36
- val metadataNeeded = devices
37
- .filter { checkIfMetadataUpdateNeeded(it) }
38
- .map { it.device }
39
- .sortedBy { it.rssi }
40
- .reversed()
41
-
42
- Timber .tag(TAG ).i(" Scheduling fetch service info for ${metadataNeeded.size} devices, out of ${devices.size} total" )
43
- try {
44
- fetchAllDevices(metadataNeeded)
45
- // increaseConnections()
46
- } catch (e: FetchDeviceServiceInfo .BluetoothConnectionException .UnspecifiedConnectionError ) {
47
- Timber .tag(TAG ).e(e, " Max connections reached" )
48
- // currentJob?.cancel()
49
- // tooMachConnections()
35
+ suspend fun scheduleFetchServiceInfo (devices : List <SavedDeviceHandle >): List <SavedDeviceHandle > = coroutineScope {
36
+
37
+ val cooldown = cooldown
38
+ if (cooldown != null && System .currentTimeMillis() - cooldown < MIN_COOLDOWN_DURATION_SEC .seconds.inWholeMilliseconds) {
39
+ Timber .tag(TAG ).i(" Device services fetching is on cooldown due to a high errors rate, current batch will be skipped" )
40
+ return @coroutineScope devices
41
+ }
42
+
43
+ val result = devices
44
+ .map { it }
45
+ .associateBy { it.device.address }
46
+ .toMutableMap()
47
+
48
+ var updatedCount = 0
49
+ var errors = 0
50
+ var timeouts = 0
51
+
52
+ withContext(Dispatchers .IO ) {
53
+ val metadataNeeded = devices
54
+ .filter { checkIfMetadataUpdateNeeded(it) }
55
+ .map { it.device }
56
+ .sortedBy { it.rssi }
57
+ .reversed()
58
+
59
+ Timber .tag(TAG ).i(" Scheduling fetch service info for ${metadataNeeded.size} devices, out of ${devices.size} total" )
60
+ val updated = fetchAllDevices(metadataNeeded)
61
+ updated.forEach { fetchFeedback ->
62
+ when (fetchFeedback.feedback) {
63
+ FetchFeedback .SUCCESS -> {
64
+ fetchFeedback.device?.let { device ->
65
+ result[device.address]?.let { previousHandle ->
66
+ result[device.address] = previousHandle.copy(device = device)
67
+ }
68
+ updatedCount++
69
+ }
70
+ }
71
+ FetchFeedback .TIMEOUT -> {
72
+ timeouts++
73
+ }
74
+ FetchFeedback .ERROR -> {
75
+ errors++
76
+ }
50
77
}
51
78
}
52
- }
53
- }
54
79
55
- private fun tooMachConnections () {
56
- maxPossibleConnections = parallelProcessingBatches - 1
57
- parallelProcessingBatches = max(1 , (parallelProcessingBatches * 0.5 ).toInt())
58
- bleScannerHelper.closeAllConnections()
80
+ analyzeFeedback(metadataNeeded.size, updatedCount, timeouts, errors, devices.size)
81
+ result.values.toList()
82
+ }
59
83
}
60
84
61
- private fun increaseConnections () {
62
- parallelProcessingBatches = min(max(1 , (parallelProcessingBatches * 1.2 ).toInt()), maxPossibleConnections)
63
- }
85
+ private suspend fun fetchAllDevices (metadataNeeded : List <DeviceData >): List <DeviceFetchFeedback > = coroutineScope {
86
+ val result = mutableListOf<DeviceFetchFeedback >()
64
87
65
- private suspend fun fetchAllDevices (metadataNeeded : List <DeviceData >) = coroutineScope {
66
88
softTimeout(TOTAL_FETCH_TIMEOUT_SEC .seconds, onTimeout = {
67
89
Timber .tag(TAG ).e(" Timeout fetching total devices" )
68
90
}) {
69
91
metadataNeeded.splitToBatchesEqual(parallelProcessingBatches)
70
92
.filter { it.isNotEmpty() }
71
93
.mapParallel { batch ->
72
94
Timber .tag(TAG ).i(" Processing batch of ${batch.size} devices ($parallelProcessingBatches parallel)" )
73
- batch.forEach { device ->
74
- fetchDevice(device)
95
+ batch.map { device ->
96
+ result + = fetchDevice(device)
75
97
}
76
98
}
77
- Timber .tag(TAG ).i(" All devices processed" )
78
99
}
100
+
101
+ result
79
102
}
80
103
81
- private suspend fun fetchDevice (device : DeviceData ) {
82
- softTimeout(DEVICE_FETCH_TIMEOUT_SEC .seconds, onTimeout = {
104
+ private suspend fun fetchDevice (device : DeviceData ): DeviceFetchFeedback {
105
+ return softTimeout(DEVICE_FETCH_TIMEOUT_SEC .seconds, onTimeout = {
83
106
Timber .tag(TAG ).e(" Timeout fetching device info for ${device.address} " )
107
+ DeviceFetchFeedback (null , FetchFeedback .TIMEOUT )
84
108
}) {
85
109
Timber .tag(TAG ).i(" Fetching device info for ${device.address} , distance: ${device.distance()} " )
86
110
try {
87
111
val result = fetchDeviceServiceInfo.execute(device)
88
112
Timber .tag(TAG ).i(" Fetching complete for ${device.address} . Result: $result " )
113
+ DeviceFetchFeedback (result?.let { device.copy(metadata = it) }, FetchFeedback .SUCCESS )
89
114
} catch (e: FetchDeviceServiceInfo .BluetoothConnectionException ) {
90
115
Timber .tag(TAG ).e(e, " Error when connecting to device ${device.address} " )
116
+ DeviceFetchFeedback (null , FetchFeedback .ERROR )
91
117
}
92
118
}
93
119
}
94
120
95
- private suspend fun softTimeout (timeout : Duration , onTimeout : suspend () -> Unit , block : suspend () -> Unit ) = coroutineScope {
96
- var timeoutJob: Job ? = null
97
- val primaryJob = async {
98
- block.invoke()
99
- timeoutJob?.cancel()
100
- }
121
+ private data class DeviceFetchFeedback (
122
+ val device : DeviceData ? ,
123
+ val feedback : FetchFeedback ,
124
+ )
101
125
102
- timeoutJob = async {
103
- delay(timeout)
104
- primaryJob.cancel()
105
- onTimeout.invoke()
126
+ private enum class FetchFeedback {
127
+ TIMEOUT ,
128
+ ERROR ,
129
+ SUCCESS ,
130
+ }
131
+
132
+ private suspend fun analyzeFeedback (
133
+ updateNeeded : Int ,
134
+ updated : Int ,
135
+ timeouts : Int ,
136
+ errors : Int ,
137
+ total : Int ,
138
+ ) {
139
+ Timber .tag(TAG ).i(" Deep analysis finished. Candidates: $updateNeeded (updated: $updated , timeouts: $timeouts , errors: $errors ), total $total devices" )
140
+ if (updateNeeded > 5 && errors / updateNeeded > 0.7 ) {
141
+ val report = JournalEntry .Report .Error (
142
+ title = " Too many errors during deep analysis. Restart bluetooth or disable deep analysis in settings" ,
143
+ stackTrace = " Errors: $errors , timeouts: $timeouts , updated: $updated , in total: $updateNeeded "
144
+ )
145
+ saveReportInteractor.execute(report)
146
+ cooldown = System .currentTimeMillis()
106
147
}
107
148
}
108
149
150
+ private suspend fun <T > softTimeout (timeout : Duration , onTimeout : suspend () -> T , block : suspend () -> T ): T = coroutineScope {
151
+ channelFlow<T > {
152
+ var timeoutJob: Job ? = null
153
+ val primaryJob = async {
154
+ val result = block.invoke()
155
+ timeoutJob?.cancel()
156
+ send(result)
157
+ }
158
+
159
+ timeoutJob = async {
160
+ delay(timeout)
161
+ primaryJob.cancel()
162
+ send(onTimeout.invoke())
163
+ }
164
+ }.first()
165
+ }
166
+
109
167
private fun checkIfMetadataUpdateNeeded (device : SavedDeviceHandle ): Boolean {
110
168
if (! device.device.isConnectable) {
111
169
return false
@@ -121,11 +179,22 @@ class DeviceServicesFetchingPlanner(
121
179
|| ! recentlyChecked
122
180
}
123
181
182
+ private fun tooMachConnections () {
183
+ maxPossibleConnections = parallelProcessingBatches - 1
184
+ parallelProcessingBatches = max(1 , (parallelProcessingBatches * 0.5 ).toInt())
185
+ bleScannerHelper.closeAllConnections()
186
+ }
187
+
188
+ private fun increaseConnections () {
189
+ parallelProcessingBatches = min(max(1 , (parallelProcessingBatches * 1.2 ).toInt()), maxPossibleConnections)
190
+ }
191
+
124
192
companion object {
125
- private const val PARALLEL_BATCH_COUNT = 6
193
+ private const val PARALLEL_BATCH_COUNT = 10
126
194
private const val CHECK_INTERVAL_PER_DEVICE_MIN = 10
127
195
private const val DEVICE_FETCH_TIMEOUT_SEC = 5
128
196
private const val TOTAL_FETCH_TIMEOUT_SEC = 30
197
+ private const val MIN_COOLDOWN_DURATION_SEC = 60
129
198
private const val TAG = " DeviceServicesFetchingPlanner"
130
199
}
131
200
}
0 commit comments