Skip to content
This repository has been archived by the owner on Sep 9, 2022. It is now read-only.

Commit

Permalink
Updates for v1.0.0-RC4
Browse files Browse the repository at this point in the history
  • Loading branch information
serebit committed May 10, 2021
1 parent 80e3bae commit 2e944b3
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 58 deletions.
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ plugins {
}

group = "io.streamcord"
version = "1.0.0-RC3"
version = "1.0.0-RC4"

repositories {
mavenCentral()
Expand Down
75 changes: 29 additions & 46 deletions src/main/kotlin/App.kt
Original file line number Diff line number Diff line change
Expand Up @@ -45,27 +45,31 @@ suspend fun main() = coroutineScope<Unit> {
TwitchServer.create(database, config.amqp).start()
twitchClient.awaitCallbackAccess(TwitchServer.rootHttpStatus)

// synchronize subscriptions between DB and twitch
syncSubscriptions(twitchClient, database.subscriptions) { workerInfo shouldHandle it }
if (workerInfo.index == 0L) {
// synchronize subscriptions between DB and twitch
syncSubscriptions(twitchClient, database, removeFromDB = true)
}

val eventHandler = EventHandler(4, workerInfo)
eventHandler.collectIn(this)

// find subscriptions without an associated notification and remove them
database.subscriptions.find().forEach {
val userID = it.getLong("user_id")
val doc = Document("streamer_id", userID.toString())

if (database.notifications.countDocuments(doc) == 0L) {
val subID = it.getString("sub_id")
logger.info("Subscription with ID $subID has no associated notifications, attempting removal")
eventHandler.submitEvent(userID) {
if (twitchClient.removeSubscription(subID)) {
database.subscriptions.deleteOne(it)
database.subscriptions.find()
.filter { workerInfo shouldHandle it.getLong("user_id") }
.forEach {
val userID = it.getLong("user_id")
val doc = Document("streamer_id", userID.toString())

if (database.notifications.countDocuments(doc) == 0L) {
val subID = it.getString("sub_id")
logger.info("Subscription with ID $subID has no associated notifications, attempting removal")
eventHandler.submitEvent(userID) {
if (twitchClient.removeSubscription(subID)) {
database.subscriptions.deleteOne(it)
}
}
}
}
}

database.notifications.find().forEach { doc ->
val streamerID = doc.getString("streamer_id")?.toLong() ?: return@forEach
Expand Down Expand Up @@ -134,51 +138,32 @@ private val Any.ansiBold get() = "\u001B[1m$this\u001B[0m"
*/
private suspend fun syncSubscriptions(
twitchClient: TwitchClient,
collection: MongoCollection<Document>,
filter: (Long) -> Boolean
database: DatabaseController,
removeFromDB: Boolean
) {
val twitchSubscriptions = twitchClient.fetchExistingSubscriptions()
.associateBy { it.id }
.filter { filter(it.value.condition.broadcaster_user_id) }
logger.info("Fetched existing subscriptions from Twitch. Count for this worker: ${twitchSubscriptions.size}")
val twitchSubscriptions = twitchClient.fetchExistingSubscriptions().associateBy { it.id }
logger.info("Fetched existing subscriptions from Twitch. Count: ${twitchSubscriptions.size}")

val dbSubscriptions = collection.find()
.filter { filter(it.getLong("user_id")) }
.associateBy { it.getString("sub_id") }
logger.info("Found existing subscriptions in DB. Count for this worker: ${dbSubscriptions.size}")

class MissingSubscription(val userID: Long, val type: String)
val dbSubscriptions = database.subscriptions.find().associateBy { it.getString("sub_id") }
logger.info("Found existing subscriptions in DB. Count: ${dbSubscriptions.size}")

val missingFromTwitch = dbSubscriptions
.filter { it.key !in twitchSubscriptions }
.mapValues {
val userID = it.value.getLong("user_id")
val type = it.value.getString("type")
MissingSubscription(userID, type)
}
.also { logger.info("Found ${it.size.ansiBold} subscriptions in DB that were not reported by Twitch") }

val missingFromDB = twitchSubscriptions
.filter { it.key !in dbSubscriptions }
.mapValues { MissingSubscription(it.value.condition.broadcaster_user_id, it.value.type) }
.also { logger.info("Received ${it.size.ansiBold} subscriptions from Twitch that were not in the DB") }

suspend fun replaceSubscription(sub: MissingSubscription) {
val secret = generateSecret()
twitchClient.createSubscription(sub.userID, sub.type, secret)?.let {
collection.insertSubscription(twitchClient.clientID, secret, it)
logger.info("Recreated missing subscription for user with ID ${sub.userID} and type ${sub.type}")
}
}

missingFromTwitch.forEach {
collection.findOneAndDelete(Document("sub_id", it.key))
replaceSubscription(it.value)
if (removeFromDB) {
missingFromTwitch
.onEach { database.subscriptions.findOneAndDelete(Document("sub_id", it.key)) }
.also { logger.info("Removed ${it.size.ansiBold} subscriptions from the DB that were not reported by Twitch") }
}

missingFromDB
.filter { twitchClient.removeSubscription(it.key) }
.forEach { replaceSubscription(it.value) }
.also { logger.info("Removed ${it.size.ansiBold} subscriptions from Twitch that were not in the database") }
}

suspend fun maybeCreateSubscriptions(
Expand All @@ -197,9 +182,8 @@ suspend fun maybeCreateSubscriptions(
}

suspend fun createSubscription(type: String, secret: String) {
twitchClient.createSubscription(streamerID, type, secret)?.let {
twitchClient.createSubscription(streamerID, type, secret)?.forEach {
subscriptions.insertSubscription(twitchClient.clientID, secret, it)
logger.info("Created subscription with type $type for user ID $streamerID")
}
}

Expand All @@ -217,7 +201,6 @@ suspend fun maybeRemoveSubscriptions(streamerID: Long, twitchClient: TwitchClien
suspend fun removeSubscription(document: Document, subID: String) {
if (twitchClient.removeSubscription(subID)) {
database.subscriptions.deleteOne(document)
logger.info("Removed subscription with ID $subID")
}
}

Expand Down
72 changes: 63 additions & 9 deletions src/main/kotlin/TwitchClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,18 @@ import io.ktor.client.request.*
import io.ktor.client.statement.*
import io.ktor.http.*
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CompletableJob
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.serialization.Serializable
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonObject
import kotlinx.serialization.json.jsonPrimitive
import java.time.LocalDateTime
import java.time.ZoneOffset

class TwitchClient private constructor(
val clientID: ClientID, private val clientSecret: ClientSecret, private val callbackUri: String,
Expand All @@ -22,6 +27,9 @@ class TwitchClient private constructor(
private val refetchTokenMutex = Mutex(locked = false)
private var awaitingToken: CompletableDeferred<Unit>? = null

private val ratelimitMutex = Mutex(locked = false)
private var awaitingRatelimitReset: CompletableJob? = null

suspend fun awaitCallbackAccess(expectedStatus: HttpStatusCode) {
logger.info("Testing callback URI in 10 seconds...")

Expand All @@ -47,34 +55,50 @@ class TwitchClient private constructor(
)
}

tailrec suspend fun fetchExistingSubscriptions(): List<SubscriptionData> {
suspend fun fetchExistingSubscriptions(): List<SubscriptionData> = fetchExistingSubscriptions(mutableListOf(), null)

private tailrec suspend fun fetchExistingSubscriptions(
list: MutableList<SubscriptionData>,
cursor: String?
): List<SubscriptionData> {
awaitingToken?.await()

val response = httpClient.get<HttpResponse>("https://api.twitch.tv/helix/eventsub/subscriptions") {
withDefaults()
cursor?.let { parameter("after", it) }
}

// if unauthorized, get a new access token and store it, then rerun the request
return when {
when {
response.status == HttpStatusCode.Unauthorized -> {
refetchToken()
fetchExistingSubscriptions()
return fetchExistingSubscriptions(list, cursor)
}
!response.status.isSuccess() -> {
logger.error("Failed to fetch subscriptions from Twitch. Error ${response.status}. Attempting refetch in 30 seconds")
delay(30)
fetchExistingSubscriptions()
delay(30_000)
return fetchExistingSubscriptions(list, cursor)
}
else -> Json.safeDecodeFromString<ResponseBody.GetSubs>(response.readText()).data
}

val body = Json.safeDecodeFromString<ResponseBody.GetSubs>(response.readText())
list.addAll(body.data)

return if (list.size >= body.total) {
logger.info("Total subscriptions: ${body.total}")
list
} else {
fetchExistingSubscriptions(list, body.pagination.getValue("cursor").jsonPrimitive.content)
}
}

tailrec suspend fun createSubscription(userID: Long, type: String, secret: String): SubscriptionData? {
tailrec suspend fun createSubscription(userID: Long, type: String, secret: String): List<SubscriptionData>? {
if (userID !in 0..Int.MAX_VALUE) {
logger.warn("Attempted to create subscription for invalid user ID $userID")
return null
}
awaitingToken?.await()
awaitingRatelimitReset?.join()

val condition = RequestBody.CreateSub.Condition(userID.toString())
val transport = RequestBody.CreateSub.Transport("webhook", "https://$callbackUri/webhooks/callback", secret)
Expand All @@ -96,16 +120,23 @@ class TwitchClient private constructor(
delay(30000)
createSubscription(userID, type, secret)
}
response.status == HttpStatusCode.TooManyRequests -> {
checkRatelimit(response.headers)
createSubscription(userID, type, secret)
}
!response.status.isSuccess() -> {
logger.error("Failed to create subscription for user ID $userID with type $type. ${response.readText()}")
null
}
else -> Json.safeDecodeFromString<ResponseBody.CreateSub>(response.readText()).data.first()
else -> Json.safeDecodeFromString<ResponseBody.CreateSub>(response.readText()).data.onEach {
logger.info("Created subscription with ID ${it.id} for user ID $userID and type $type")
}
}
}

tailrec suspend fun removeSubscription(subID: String): Boolean {
awaitingToken?.await()
awaitingRatelimitReset?.join()

val response = httpClient.delete<HttpResponse>("https://api.twitch.tv/helix/eventsub/subscriptions") {
withDefaults()
Expand All @@ -123,11 +154,18 @@ class TwitchClient private constructor(
delay(30000)
removeSubscription(subID)
}
response.status == HttpStatusCode.TooManyRequests -> {
checkRatelimit(response.headers)
removeSubscription(subID)
}
!response.status.isSuccess() -> {
logger.error("Failed to remove subscription with ID $subID. Error ${response.readText()}")
false
}
else -> true
else -> {
logger.info("Removed subscription with ID $subID")
true
}
}
}

Expand All @@ -152,6 +190,22 @@ class TwitchClient private constructor(
}
}

private suspend fun checkRatelimit(headers: Headers) {
if (ratelimitMutex.isLocked) {
return
}

ratelimitMutex.withLock {
val secs = headers["Ratelimit-Reset"]!!.toLong() - LocalDateTime.now().toEpochSecond(ZoneOffset.UTC)

awaitingRatelimitReset = Job()
logger.info("Hit rate limit for Twitch API, waiting $secs seconds...")
delay(secs * 1000)
awaitingRatelimitReset?.complete()
awaitingToken = null
}
}

companion object {
private val httpClient: HttpClient by lazy { HttpClient(Java) }

Expand Down
4 changes: 2 additions & 2 deletions src/main/kotlin/TwitchServer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class TwitchServer(private val database: DatabaseController, private val sender:
call.respond(HttpStatusCode.NotFound)
}
} else {
call.respond(HttpStatusCode.Unauthorized)
call.respond(HttpStatusCode.Forbidden)
}
}
"notification" -> {
Expand Down Expand Up @@ -114,7 +114,7 @@ private fun PipelineContext<Unit, ApplicationCall>.verifyRequest(
text: String
): Boolean {
val subscription = subsCollection.find(Document("sub_id", subID)).firstOrNull() ?: run {
logger.warn("Request failed verification: no sub ID $subID in database")
logger.info("Request failed verification: no sub ID $subID in database")
return false
}

Expand Down

0 comments on commit 2e944b3

Please sign in to comment.