Skip to content

Commit

Permalink
Merge pull request #95 from Parsely/engagement_manager_coroutines
Browse files Browse the repository at this point in the history
Engagement manager to coroutines
  • Loading branch information
wzieba authored Dec 4, 2023
2 parents 62cfa12 + d282f8b commit ca714a7
Show file tree
Hide file tree
Showing 8 changed files with 328 additions and 195 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import okhttp3.mockwebserver.MockResponse
import okhttp3.mockwebserver.MockWebServer
import okhttp3.mockwebserver.RecordedRequest
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.within
import org.junit.Test
import org.junit.runner.RunWith

Expand Down Expand Up @@ -190,6 +191,96 @@ class FunctionalTests {
}
}

/**
* In this scenario consumer app starts an engagement session and after 27150 ms,
* it stops the session.
*
* Intervals:
* With current implementation of `HeartbeatIntervalCalculator`, the next intervals are:
* - 10500ms for the first interval
* - 13650ms for the second interval
*
* So after ~27,2s we should observe
* - 2 `heartbeat` events from `startEngagement` + 1 `heartbeat` event caused by `stopEngagement` which is triggered during engagement interval
*
* Time off-differences in assertions are acceptable, because it's a time-sensitive test
*/
@Test
fun engagementManagerTest() {
val engagementUrl = "engagementUrl"
var startTimestamp = Duration.ZERO
val firstInterval = 10500.milliseconds
val secondInterval = 13650.milliseconds
val pauseInterval = 3.seconds
ActivityScenario.launch(SampleActivity::class.java).use { scenario ->
// given
scenario.onActivity { activity: Activity ->
beforeEach(activity)
server.enqueue(MockResponse().setResponseCode(200))
parselyTracker = initializeTracker(activity, flushInterval = 30.seconds)

// when
startTimestamp = System.currentTimeMillis().milliseconds
parselyTracker.startEngagement(engagementUrl, null)
}

Thread.sleep((firstInterval + secondInterval + pauseInterval).inWholeMilliseconds)
parselyTracker.stopEngagement()

// then
val request = server.takeRequest(35, TimeUnit.SECONDS)!!.toMap()["events"]!!

assertThat(
request.sortedBy { it.data.timestamp }
.filter { it.action == "heartbeat" }
).hasSize(3)
.satisfies({
val firstEvent = it[0]
val secondEvent = it[1]
val thirdEvent = it[2]

assertThat(firstEvent.data.timestamp).isCloseTo(
(startTimestamp + firstInterval).inWholeMilliseconds,
within(1.seconds.inWholeMilliseconds)
)
assertThat(firstEvent.totalTime).isCloseTo(
firstInterval.inWholeMilliseconds,
within(100L)
)
assertThat(firstEvent.incremental).isCloseTo(
firstInterval.inWholeSeconds,
within(1L)
)

assertThat(secondEvent.data.timestamp).isCloseTo(
(startTimestamp + firstInterval + secondInterval).inWholeMilliseconds,
within(1.seconds.inWholeMilliseconds)
)
assertThat(secondEvent.totalTime).isCloseTo(
(firstInterval + secondInterval).inWholeMilliseconds,
within(100L)
)
assertThat(secondEvent.incremental).isCloseTo(
secondInterval.inWholeSeconds,
within(1L)
)

assertThat(thirdEvent.data.timestamp).isCloseTo(
(startTimestamp + firstInterval + secondInterval + pauseInterval).inWholeMilliseconds,
within(1.seconds.inWholeMilliseconds)
)
assertThat(thirdEvent.totalTime).isCloseTo(
(firstInterval + secondInterval + pauseInterval).inWholeMilliseconds,
within(100L)
)
assertThat(thirdEvent.incremental).isCloseTo(
(pauseInterval).inWholeSeconds,
within(1L)
)
})
}
}

private fun RecordedRequest.toMap(): Map<String, List<Event>> {
val listType: TypeReference<Map<String, List<Event>>> =
object : TypeReference<Map<String, List<Event>>>() {}
Expand All @@ -200,6 +291,15 @@ class FunctionalTests {
@JsonIgnoreProperties(ignoreUnknown = true)
data class Event(
@JsonProperty("idsite") var idsite: String,
@JsonProperty("action") var action: String,
@JsonProperty("data") var data: ExtraData,
@JsonProperty("tt") var totalTime: Long,
@JsonProperty("inc") var incremental: Long,
)

@JsonIgnoreProperties(ignoreUnknown = true)
data class ExtraData(
@JsonProperty("ts") var timestamp: Long,
)

private val locallyStoredEvents
Expand Down
3 changes: 2 additions & 1 deletion parsely/src/main/java/com/parsely/parselyandroid/Clock.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package com.parsely.parselyandroid

import java.util.Calendar
import java.util.TimeZone
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

open class Clock {
open val now
open val now: Duration
get() = Calendar.getInstance(TimeZone.getTimeZone("UTC")).timeInMillis.milliseconds
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package com.parsely.parselyandroid

import kotlin.time.Duration
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch

/**
* Engagement manager for article and video engagement.
*
*
* Implemented to handle its own queuing of future executions to accomplish
* two things:
*
*
* 1. Flushing any engaged time before canceling.
* 2. Progressive backoff for long engagements to save data.
*/
internal class EngagementManager(
private val parselyTracker: ParselyTracker,
private var latestDelayMillis: Long,
private val baseEvent: Map<String, Any>,
private val intervalCalculator: HeartbeatIntervalCalculator,
private val coroutineScope: CoroutineScope,
private val clock: Clock,
) {
private var job: Job? = null
private var totalTime: Long = 0
private var nextScheduledExecution: Long = 0

val isRunning: Boolean
get() = job?.isActive ?: false

fun start() {
val startTime = clock.now
job = coroutineScope.launch {
while (isActive) {
latestDelayMillis = intervalCalculator.calculate(startTime)
nextScheduledExecution = clock.now.inWholeMilliseconds + latestDelayMillis
delay(latestDelayMillis)
doEnqueue(clock.now.inWholeMilliseconds)
}
}
}

fun stop() {
job?.let {
it.cancel()
doEnqueue(nextScheduledExecution)
}
}

fun isSameVideo(url: String, urlRef: String, metadata: ParselyVideoMetadata): Boolean {
val baseMetadata = baseEvent["metadata"] as Map<String, Any>?
return baseEvent["url"] == url && baseEvent["urlref"] == urlRef && baseMetadata!!["link"] == metadata.link && baseMetadata["duration"] as Int == metadata.durationSeconds
}

private fun doEnqueue(scheduledExecutionTime: Long) {
// Create a copy of the base event to enqueue
val event: MutableMap<String, Any> = HashMap(
baseEvent
)
ParselyTracker.PLog(String.format("Enqueuing %s event.", event["action"]))

// Update `ts` for the event since it's happening right now.
val baseEventData = (event["data"] as Map<String, Any>?)!!
val data: MutableMap<String, Any> = HashMap(baseEventData)
data["ts"] = clock.now.inWholeMilliseconds
event["data"] = data

// Adjust inc by execution time in case we're late or early.
val executionDiff = clock.now.inWholeMilliseconds - scheduledExecutionTime
val inc = latestDelayMillis + executionDiff
totalTime += inc
event["inc"] = inc / 1000
event["tt"] = totalTime
parselyTracker.enqueueEvent(event)
}

val intervalMillis: Double
get() = latestDelayMillis.toDouble()
}
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
package com.parsely.parselyandroid

import java.util.Calendar
import kotlin.time.Duration.Companion.hours
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration
import kotlin.time.Duration.Companion.minutes
import kotlin.time.Duration.Companion.seconds

internal open class HeartbeatIntervalCalculator(private val clock: Clock) {

open fun calculate(startTime: Calendar): Long {
val startTimeDuration = startTime.time.time.milliseconds
open fun calculate(startTime: Duration): Long {
val nowDuration = clock.now

val totalTrackedTime = nowDuration - startTimeDuration
val totalTrackedTime = nowDuration - startTime
val totalWithOffset = totalTrackedTime + OFFSET_MATCHING_BASE_INTERVAL
val newInterval = totalWithOffset * BACKOFF_PROPORTION
val clampedNewInterval = minOf(MAX_TIME_BETWEEN_HEARTBEATS, newInterval)
Expand Down
Loading

0 comments on commit ca714a7

Please sign in to comment.