diff --git a/build.gradle.kts b/build.gradle.kts index e14ed02..33aec63 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -10,7 +10,7 @@ plugins { } group = "io.streamcord" -version = "1.0.0-RC3" +version = "1.0.0-RC4" repositories { mavenCentral() diff --git a/src/main/kotlin/App.kt b/src/main/kotlin/App.kt index db9d663..c51fa65 100644 --- a/src/main/kotlin/App.kt +++ b/src/main/kotlin/App.kt @@ -45,27 +45,31 @@ suspend fun main() = coroutineScope { TwitchServer.create(database, config.amqp).start() twitchClient.awaitCallbackAccess(TwitchServer.rootHttpStatus) - // synchronize subscriptions between DB and twitch - syncSubscriptions(twitchClient, database.subscriptions) { workerInfo shouldHandle it } + if (workerInfo.index == 0L) { + // synchronize subscriptions between DB and twitch + syncSubscriptions(twitchClient, database, removeFromDB = true) + } val eventHandler = EventHandler(4, workerInfo) eventHandler.collectIn(this) // find subscriptions without an associated notification and remove them - database.subscriptions.find().forEach { - val userID = it.getLong("user_id") - val doc = Document("streamer_id", userID.toString()) - - if (database.notifications.countDocuments(doc) == 0L) { - val subID = it.getString("sub_id") - logger.info("Subscription with ID $subID has no associated notifications, attempting removal") - eventHandler.submitEvent(userID) { - if (twitchClient.removeSubscription(subID)) { - database.subscriptions.deleteOne(it) + database.subscriptions.find() + .filter { workerInfo shouldHandle it.getLong("user_id") } + .forEach { + val userID = it.getLong("user_id") + val doc = Document("streamer_id", userID.toString()) + + if (database.notifications.countDocuments(doc) == 0L) { + val subID = it.getString("sub_id") + logger.info("Subscription with ID $subID has no associated notifications, attempting removal") + eventHandler.submitEvent(userID) { + if (twitchClient.removeSubscription(subID)) { + database.subscriptions.deleteOne(it) + } } } } - } database.notifications.find().forEach { doc -> val streamerID = doc.getString("streamer_id")?.toLong() ?: return@forEach @@ -134,51 +138,32 @@ private val Any.ansiBold get() = "\u001B[1m$this\u001B[0m" */ private suspend fun syncSubscriptions( twitchClient: TwitchClient, - collection: MongoCollection, - filter: (Long) -> Boolean + database: DatabaseController, + removeFromDB: Boolean ) { - val twitchSubscriptions = twitchClient.fetchExistingSubscriptions() - .associateBy { it.id } - .filter { filter(it.value.condition.broadcaster_user_id) } - logger.info("Fetched existing subscriptions from Twitch. Count for this worker: ${twitchSubscriptions.size}") + val twitchSubscriptions = twitchClient.fetchExistingSubscriptions().associateBy { it.id } + logger.info("Fetched existing subscriptions from Twitch. Count: ${twitchSubscriptions.size}") - val dbSubscriptions = collection.find() - .filter { filter(it.getLong("user_id")) } - .associateBy { it.getString("sub_id") } - logger.info("Found existing subscriptions in DB. Count for this worker: ${dbSubscriptions.size}") - - class MissingSubscription(val userID: Long, val type: String) + val dbSubscriptions = database.subscriptions.find().associateBy { it.getString("sub_id") } + logger.info("Found existing subscriptions in DB. Count: ${dbSubscriptions.size}") val missingFromTwitch = dbSubscriptions .filter { it.key !in twitchSubscriptions } - .mapValues { - val userID = it.value.getLong("user_id") - val type = it.value.getString("type") - MissingSubscription(userID, type) - } .also { logger.info("Found ${it.size.ansiBold} subscriptions in DB that were not reported by Twitch") } val missingFromDB = twitchSubscriptions .filter { it.key !in dbSubscriptions } - .mapValues { MissingSubscription(it.value.condition.broadcaster_user_id, it.value.type) } .also { logger.info("Received ${it.size.ansiBold} subscriptions from Twitch that were not in the DB") } - suspend fun replaceSubscription(sub: MissingSubscription) { - val secret = generateSecret() - twitchClient.createSubscription(sub.userID, sub.type, secret)?.let { - collection.insertSubscription(twitchClient.clientID, secret, it) - logger.info("Recreated missing subscription for user with ID ${sub.userID} and type ${sub.type}") - } - } - - missingFromTwitch.forEach { - collection.findOneAndDelete(Document("sub_id", it.key)) - replaceSubscription(it.value) + if (removeFromDB) { + missingFromTwitch + .onEach { database.subscriptions.findOneAndDelete(Document("sub_id", it.key)) } + .also { logger.info("Removed ${it.size.ansiBold} subscriptions from the DB that were not reported by Twitch") } } missingFromDB .filter { twitchClient.removeSubscription(it.key) } - .forEach { replaceSubscription(it.value) } + .also { logger.info("Removed ${it.size.ansiBold} subscriptions from Twitch that were not in the database") } } suspend fun maybeCreateSubscriptions( @@ -197,9 +182,8 @@ suspend fun maybeCreateSubscriptions( } suspend fun createSubscription(type: String, secret: String) { - twitchClient.createSubscription(streamerID, type, secret)?.let { + twitchClient.createSubscription(streamerID, type, secret)?.forEach { subscriptions.insertSubscription(twitchClient.clientID, secret, it) - logger.info("Created subscription with type $type for user ID $streamerID") } } @@ -217,7 +201,6 @@ suspend fun maybeRemoveSubscriptions(streamerID: Long, twitchClient: TwitchClien suspend fun removeSubscription(document: Document, subID: String) { if (twitchClient.removeSubscription(subID)) { database.subscriptions.deleteOne(document) - logger.info("Removed subscription with ID $subID") } } diff --git a/src/main/kotlin/TwitchClient.kt b/src/main/kotlin/TwitchClient.kt index 9106ccd..2e320a5 100644 --- a/src/main/kotlin/TwitchClient.kt +++ b/src/main/kotlin/TwitchClient.kt @@ -7,6 +7,8 @@ import io.ktor.client.request.* import io.ktor.client.statement.* import io.ktor.http.* import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.CompletableJob +import kotlinx.coroutines.Job import kotlinx.coroutines.delay import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock @@ -14,6 +16,9 @@ import kotlinx.serialization.Serializable import kotlinx.serialization.encodeToString import kotlinx.serialization.json.Json import kotlinx.serialization.json.JsonObject +import kotlinx.serialization.json.jsonPrimitive +import java.time.LocalDateTime +import java.time.ZoneOffset class TwitchClient private constructor( val clientID: ClientID, private val clientSecret: ClientSecret, private val callbackUri: String, @@ -22,6 +27,9 @@ class TwitchClient private constructor( private val refetchTokenMutex = Mutex(locked = false) private var awaitingToken: CompletableDeferred? = null + private val ratelimitMutex = Mutex(locked = false) + private var awaitingRatelimitReset: CompletableJob? = null + suspend fun awaitCallbackAccess(expectedStatus: HttpStatusCode) { logger.info("Testing callback URI in 10 seconds...") @@ -47,34 +55,50 @@ class TwitchClient private constructor( ) } - tailrec suspend fun fetchExistingSubscriptions(): List { + suspend fun fetchExistingSubscriptions(): List = fetchExistingSubscriptions(mutableListOf(), null) + + private tailrec suspend fun fetchExistingSubscriptions( + list: MutableList, + cursor: String? + ): List { awaitingToken?.await() val response = httpClient.get("https://api.twitch.tv/helix/eventsub/subscriptions") { withDefaults() + cursor?.let { parameter("after", it) } } // if unauthorized, get a new access token and store it, then rerun the request - return when { + when { response.status == HttpStatusCode.Unauthorized -> { refetchToken() - fetchExistingSubscriptions() + return fetchExistingSubscriptions(list, cursor) } !response.status.isSuccess() -> { logger.error("Failed to fetch subscriptions from Twitch. Error ${response.status}. Attempting refetch in 30 seconds") - delay(30) - fetchExistingSubscriptions() + delay(30_000) + return fetchExistingSubscriptions(list, cursor) } - else -> Json.safeDecodeFromString(response.readText()).data + } + + val body = Json.safeDecodeFromString(response.readText()) + list.addAll(body.data) + + return if (list.size >= body.total) { + logger.info("Total subscriptions: ${body.total}") + list + } else { + fetchExistingSubscriptions(list, body.pagination.getValue("cursor").jsonPrimitive.content) } } - tailrec suspend fun createSubscription(userID: Long, type: String, secret: String): SubscriptionData? { + tailrec suspend fun createSubscription(userID: Long, type: String, secret: String): List? { if (userID !in 0..Int.MAX_VALUE) { logger.warn("Attempted to create subscription for invalid user ID $userID") return null } awaitingToken?.await() + awaitingRatelimitReset?.join() val condition = RequestBody.CreateSub.Condition(userID.toString()) val transport = RequestBody.CreateSub.Transport("webhook", "https://$callbackUri/webhooks/callback", secret) @@ -96,16 +120,23 @@ class TwitchClient private constructor( delay(30000) createSubscription(userID, type, secret) } + response.status == HttpStatusCode.TooManyRequests -> { + checkRatelimit(response.headers) + createSubscription(userID, type, secret) + } !response.status.isSuccess() -> { logger.error("Failed to create subscription for user ID $userID with type $type. ${response.readText()}") null } - else -> Json.safeDecodeFromString(response.readText()).data.first() + else -> Json.safeDecodeFromString(response.readText()).data.onEach { + logger.info("Created subscription with ID ${it.id} for user ID $userID and type $type") + } } } tailrec suspend fun removeSubscription(subID: String): Boolean { awaitingToken?.await() + awaitingRatelimitReset?.join() val response = httpClient.delete("https://api.twitch.tv/helix/eventsub/subscriptions") { withDefaults() @@ -123,11 +154,18 @@ class TwitchClient private constructor( delay(30000) removeSubscription(subID) } + response.status == HttpStatusCode.TooManyRequests -> { + checkRatelimit(response.headers) + removeSubscription(subID) + } !response.status.isSuccess() -> { logger.error("Failed to remove subscription with ID $subID. Error ${response.readText()}") false } - else -> true + else -> { + logger.info("Removed subscription with ID $subID") + true + } } } @@ -152,6 +190,22 @@ class TwitchClient private constructor( } } + private suspend fun checkRatelimit(headers: Headers) { + if (ratelimitMutex.isLocked) { + return + } + + ratelimitMutex.withLock { + val secs = headers["Ratelimit-Reset"]!!.toLong() - LocalDateTime.now().toEpochSecond(ZoneOffset.UTC) + + awaitingRatelimitReset = Job() + logger.info("Hit rate limit for Twitch API, waiting $secs seconds...") + delay(secs * 1000) + awaitingRatelimitReset?.complete() + awaitingToken = null + } + } + companion object { private val httpClient: HttpClient by lazy { HttpClient(Java) } diff --git a/src/main/kotlin/TwitchServer.kt b/src/main/kotlin/TwitchServer.kt index 9d85047..81cc6b7 100644 --- a/src/main/kotlin/TwitchServer.kt +++ b/src/main/kotlin/TwitchServer.kt @@ -58,7 +58,7 @@ class TwitchServer(private val database: DatabaseController, private val sender: call.respond(HttpStatusCode.NotFound) } } else { - call.respond(HttpStatusCode.Unauthorized) + call.respond(HttpStatusCode.Forbidden) } } "notification" -> { @@ -114,7 +114,7 @@ private fun PipelineContext.verifyRequest( text: String ): Boolean { val subscription = subsCollection.find(Document("sub_id", subID)).firstOrNull() ?: run { - logger.warn("Request failed verification: no sub ID $subID in database") + logger.info("Request failed verification: no sub ID $subID in database") return false }