Skip to content

Commit

Permalink
Merge pull request #92 from Parsely/coroutines-send-event
Browse files Browse the repository at this point in the history
Coroutines: send events flow
  • Loading branch information
wzieba authored Nov 28, 2023
2 parents 845947b + 784a021 commit 62cfa12
Show file tree
Hide file tree
Showing 15 changed files with 453 additions and 352 deletions.
21 changes: 3 additions & 18 deletions example/src/main/java/com/example/MainActivity.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,33 +30,18 @@ protected void onCreate(Bundle savedInstanceState) {
// Set debugging to true so we don't actually send things to Parse.ly
ParselyTracker.sharedInstance().setDebug(true);

final TextView queueView = (TextView) findViewById(R.id.queue_size);
queueView.setText(String.format("Queued events: %d", ParselyTracker.sharedInstance().queueSize()));

final TextView storedView = (TextView) findViewById(R.id.stored_size);
storedView.setText(String.format("Stored events: %d", ParselyTracker.sharedInstance().storedEventsCount()));

final TextView intervalView = (TextView) findViewById(R.id.interval);
storedView.setText(String.format("Flush interval: %d", ParselyTracker.sharedInstance().getFlushInterval()));

updateEngagementStrings();

final TextView views[] = new TextView[3];
views[0] = queueView;
views[1] = storedView;
views[2] = intervalView;
final TextView views[] = new TextView[1];
views[0] = intervalView;

final Handler mHandler = new Handler() {
@Override
public void handleMessage(Message msg) {
TextView[] v = (TextView[]) msg.obj;
TextView qView = v[0];
qView.setText(String.format("Queued events: %d", ParselyTracker.sharedInstance().queueSize()));

TextView sView = v[1];
sView.setText(String.format("Stored events: %d", ParselyTracker.sharedInstance().storedEventsCount()));

TextView iView = v[2];
TextView iView = v[0];
if (ParselyTracker.sharedInstance().flushTimerIsActive()) {
iView.setText(String.format("Flush Interval: %d", ParselyTracker.sharedInstance().getFlushInterval()));
} else {
Expand Down
19 changes: 2 additions & 17 deletions example/src/main/res/layout/activity_main.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,26 +68,11 @@
android:onClick="trackReset"
android:text="@string/button_reset_video" />

<TextView
android:id="@+id/queue_size"
android:layout_width="wrap_content"
android:layout_height="wrap_content"
android:layout_below="@+id/reset_video_button"
android:layout_centerHorizontal="true"
android:text="Queued events: 0" />

<TextView android:id="@+id/stored_size"
android:layout_width="wrap_content"
android:layout_height="wrap_content"
android:layout_centerHorizontal="true"
android:layout_below="@id/queue_size"
android:text="Stored events: 0"/>

<TextView android:id="@+id/interval"
android:layout_width="wrap_content"
android:layout_height="wrap_content"
android:layout_centerHorizontal="true"
android:layout_below="@id/stored_size"
android:layout_below="@id/reset_video_button"
android:text="Flush timer inactive"/>

<TextView
Expand All @@ -107,4 +92,4 @@
android:text="Video is inactive." />


</RelativeLayout>
</RelativeLayout>
Original file line number Diff line number Diff line change
Expand Up @@ -214,13 +214,12 @@ class FunctionalTests {
activity: Activity,
flushInterval: Duration = defaultFlushInterval
): ParselyTracker {
val field: Field = ParselyTracker::class.java.getDeclaredField("ROOT_URL")
field.isAccessible = true
field.set(this, url)
return ParselyTracker.sharedInstance(
siteId, flushInterval.inWholeSeconds.toInt(), activity.application
).apply {
val f: Field = this::class.java.getDeclaredField("ROOT_URL")
f.isAccessible = true
f.set(this, url)
}
)
}

private companion object {
Expand Down
25 changes: 17 additions & 8 deletions parsely/src/main/java/com/parsely/parselyandroid/FlushManager.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,35 @@ import kotlinx.coroutines.launch
* Handles stopping and starting the flush timer. The flush timer
* controls how often we send events to Parse.ly servers.
*/
internal class FlushManager(
private val parselyTracker: ParselyTracker,
val intervalMillis: Long,
internal interface FlushManager {
fun start()
fun stop()
val isRunning: Boolean
val intervalMillis: Long
}

internal class ParselyFlushManager(
private val onFlush: () -> Unit,
override val intervalMillis: Long,
private val coroutineScope: CoroutineScope
) {
) : FlushManager {
private var job: Job? = null

fun start() {
override fun start() {
if (job?.isActive == true) return

job = coroutineScope.launch {
while (isActive) {
delay(intervalMillis)
parselyTracker.flushEvents()
onFlush.invoke()
}
}
}

fun stop() = job?.cancel()
override fun stop() {
job?.cancel()
}

val isRunning: Boolean
override val isRunning: Boolean
get() = job?.isActive ?: false
}
51 changes: 51 additions & 0 deletions parsely/src/main/java/com/parsely/parselyandroid/FlushQueue.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.parsely.parselyandroid

import com.parsely.parselyandroid.JsonSerializer.toParselyEventsPayload
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

internal class FlushQueue(
private val flushManager: FlushManager,
private val repository: QueueRepository,
private val restClient: RestClient,
private val scope: CoroutineScope
) {

private val mutex = Mutex()

operator fun invoke(skipSendingEvents: Boolean) {
scope.launch {
mutex.withLock {
val eventsToSend = repository.getStoredQueue()

if (eventsToSend.isEmpty()) {
flushManager.stop()
return@launch
}

if (skipSendingEvents) {
ParselyTracker.PLog("Debug mode on. Not sending to Parse.ly. Otherwise, would sent ${eventsToSend.size} events")
repository.remove(eventsToSend)
return@launch
}
ParselyTracker.PLog("Sending request with %d events", eventsToSend.size)
val jsonPayload = toParselyEventsPayload(eventsToSend)
ParselyTracker.PLog("POST Data %s", jsonPayload)
ParselyTracker.PLog("Requested %s", ParselyTracker.ROOT_URL)
restClient.send(jsonPayload)
.fold(
onSuccess = {
ParselyTracker.PLog("Pixel request success")
repository.remove(eventsToSend)
},
onFailure = {
ParselyTracker.PLog("Pixel request exception")
ParselyTracker.PLog(it.toString())
}
)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import kotlinx.coroutines.sync.withLock

internal class InMemoryBuffer(
private val coroutineScope: CoroutineScope,
private val localStorageRepository: LocalStorageRepository,
private val localStorageRepository: QueueRepository,
private val onEventAddedListener: () -> Unit,
) {

Expand Down
32 changes: 32 additions & 0 deletions parsely/src/main/java/com/parsely/parselyandroid/JsonSerializer.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.parsely.parselyandroid

import com.fasterxml.jackson.databind.ObjectMapper
import java.io.IOException
import java.io.StringWriter

internal object JsonSerializer {

fun toParselyEventsPayload(eventsToSend: List<Map<String, Any?>?>): String {
val batchMap: MutableMap<String, Any> = HashMap()
batchMap["events"] = eventsToSend
return toJson(batchMap).orEmpty()
}
/**
* Encode an event Map as JSON.
*
* @param map The Map object to encode as JSON.
* @return The JSON-encoded value of `map`.
*/
private fun toJson(map: Map<String, Any>): String? {
val mapper = ObjectMapper()
var ret: String? = null
try {
val strWriter = StringWriter()
mapper.writeValue(strWriter, map)
ret = strWriter.toString()
} catch (e: IOException) {
e.printStackTrace()
}
return ret
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@ import java.io.EOFException
import java.io.FileNotFoundException
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

internal open class LocalStorageRepository(private val context: Context) {
internal interface QueueRepository {
suspend fun remove(toRemove: List<Map<String, Any?>?>)
suspend fun getStoredQueue(): ArrayList<Map<String, Any?>?>
suspend fun insertEvents(toInsert: List<Map<String, Any?>?>)
}

internal class LocalStorageRepository(private val context: Context) : QueueRepository {

private val mutex = Mutex()

Expand All @@ -33,23 +38,7 @@ internal open class LocalStorageRepository(private val context: Context) {
}
}

/**
* Delete the stored queue from persistent storage.
*/
fun purgeStoredQueue() {
persistObject(ArrayList<Map<String, Any>>())
}

fun remove(toRemove: List<Map<String, Any>>) {
persistObject(getStoredQueue() - toRemove.toSet())
}

/**
* Get the stored event queue from persistent storage.
*
* @return The stored queue of events.
*/
open fun getStoredQueue(): ArrayList<Map<String, Any?>?> {
private fun getInternalStoredQueue(): ArrayList<Map<String, Any?>?> {
var storedQueue: ArrayList<Map<String, Any?>?> = ArrayList()
try {
val fis = context.applicationContext.openFileInput(STORAGE_KEY)
Expand All @@ -71,19 +60,26 @@ internal open class LocalStorageRepository(private val context: Context) {
return storedQueue
}

override suspend fun remove(toRemove: List<Map<String, Any?>?>) = mutex.withLock {
val storedEvents = getInternalStoredQueue()
persistObject(storedEvents - toRemove.toSet())
}

/**
* Delete an event from the stored queue.
* Get the stored event queue from persistent storage.
*
* @return The stored queue of events.
*/
open fun expelStoredEvent() {
val storedQueue = getStoredQueue()
storedQueue.removeAt(0)
override suspend fun getStoredQueue(): ArrayList<Map<String, Any?>?> = mutex.withLock {
getInternalStoredQueue()
}

/**
* Save the event queue to persistent storage.
*/
open suspend fun insertEvents(toInsert: List<Map<String, Any?>?>) = mutex.withLock {
persistObject(ArrayList((toInsert + getStoredQueue()).distinct()))
override suspend fun insertEvents(toInsert: List<Map<String, Any?>?>) = mutex.withLock {
val storedEvents = getInternalStoredQueue()
persistObject(ArrayList((toInsert + storedEvents).distinct()))
}

companion object {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,50 +13,32 @@
See the License for the specific language governing permissions and
limitations under the License.
*/
@file:Suppress("DEPRECATION")
package com.parsely.parselyandroid

import android.os.AsyncTask
import java.net.HttpURLConnection
import java.net.URL

internal class ParselyAPIConnection(private val tracker: ParselyTracker) : AsyncTask<String?, Exception?, Void?>() {
private var exception: Exception? = null
internal interface RestClient {
suspend fun send(payload: String): Result<Unit>
}

@Deprecated("Deprecated in Java")
override fun doInBackground(vararg data: String?): Void? {
internal class ParselyAPIConnection(private val url: String) : RestClient {
override suspend fun send(payload: String): Result<Unit> {
var connection: HttpURLConnection? = null
try {
if (data.size == 1) { // non-batched (since no post data is included)
connection = URL(data[0]).openConnection() as HttpURLConnection
connection.inputStream
} else if (data.size == 2) { // batched (post data included)
connection = URL(data[0]).openConnection() as HttpURLConnection
connection.doOutput = true // Triggers POST (aka silliest interface ever)
connection.setRequestProperty("Content-Type", "application/json")
val output = connection.outputStream
output.write(data[1]?.toByteArray())
output.close()
connection.inputStream
}
connection = URL(url).openConnection() as HttpURLConnection
connection.doOutput = true
connection.setRequestProperty("Content-Type", "application/json")
val output = connection.outputStream
output.write(payload.toByteArray())
output.close()
connection.inputStream
} catch (ex: Exception) {
exception = ex
return Result.failure(ex)
} finally {
connection?.disconnect()
}
return null
}

@Deprecated("Deprecated in Java")
override fun onPostExecute(result: Void?) {
if (exception != null) {
ParselyTracker.PLog("Pixel request exception")
ParselyTracker.PLog(exception.toString())
} else {
ParselyTracker.PLog("Pixel request success")

// only purge the queue if the request was successful
tracker.purgeEventsQueue()
ParselyTracker.PLog("Event queue empty, flush timer cleared.")
tracker.stopFlushTimer()
}
return Result.success(Unit)
}
}
Loading

0 comments on commit 62cfa12

Please sign in to comment.