Merge pull request #240 from lichess-org/team-ingestor
team ingestor
ornicar committed Jun 23, 2024
2 parents 2eaa27f + ace4d7a commit 0bdefaf
Showing 4 changed files with 191 additions and 61 deletions.
93 changes: 40 additions & 53 deletions modules/ingestor/src/main/scala/ingestor.forum.scala
@@ -16,21 +16,19 @@ import java.time.Instant
 import scala.concurrent.duration.*

 trait ForumIngestor:
-  // Utilize change events functionality of MongoDB to watch for changes in the forum posts collection.
-  def watch(): fs2.Stream[IO, Unit]
-  // Fetch posts from since to until and ingest to data
+  // watch change events from MongoDB and ingest forum posts into elastic search
+  def watch: fs2.Stream[IO, Unit]
+  // Fetch posts in [since, until] and ingest into elastic search
   def run(since: Instant, until: Option[Instant], dryRun: Boolean): fs2.Stream[IO, Unit]

 object ForumIngestor:

   private val index = Index.Forum

-  private val topicProjection = Projection.include(List("_id", "name"))
-
   private val interestedOperations = List(DELETE, INSERT, REPLACE).map(_.getValue)
   private val eventFilter = Filter.in("operationType", interestedOperations)

-  private val interestedFields = List("_id", "text", "topicId", "troll", "createdAt", "userId", "erasedAt")
+  private val interestedFields = List(_id, F.text, F.topicId, F.troll, F.createdAt, F.userId, F.erasedAt)
   private val postProjection = Projection.include(interestedFields)

   private val interestedEventFields =
@@ -49,7 +47,7 @@ object ForumIngestor:
       posts: MongoCollection
   )(using Logger[IO]): ForumIngestor = new:

-    def watch(): fs2.Stream[IO, Unit] =
+    def watch: fs2.Stream[IO, Unit] =
       fs2.Stream
         .eval(startAt.flatTap(since => info"Starting forum ingestor from $since"))
         .flatMap: last =>
@@ -58,13 +56,13 @@
               val lastEventTimestamp = events.flatten(_.clusterTime.flatMap(_.asInstant)).maxOption
               val (toDelete, toIndex) = events.partition(_.isDelete)
               storeBulk(toIndex.flatten(_.fullDocument))
-                *> deleteMany(toDelete)
+                *> elastic.deleteMany(index, toDelete)
                 *> saveLastIndexedTimestamp(lastEventTimestamp.getOrElse(Instant.now()))

     def run(since: Instant, until: Option[Instant], dryRun: Boolean): fs2.Stream[IO, Unit] =
-      val filter = range("createdAt")(since, until)
-        .or(range("updatedAt")(since, until))
-        .or(range("erasedAt")(since, until))
+      val filter = range(F.createdAt)(since, until)
+        .or(range(F.updatedAt)(since, until))
+        .or(range(F.erasedAt)(since, until))
       posts
         .find(filter)
         .projection(postProjection)
@@ -77,37 +75,17 @@
           dryRun.fold(
             toIndex.traverse_(doc => debug"Would index $doc")
               *> toDelete.traverse_(doc => debug"Would delete $doc"),
-            storeBulk(toIndex) *> deleteMany(toDelete)
+            storeBulk(toIndex) *> elastic.deleteMany(index, toDelete)
           )

-    private def storeBulk(events: List[Document]): IO[Unit] =
-      info"Received ${events.size} forum posts to index" *>
-        IO.whenA(events.nonEmpty):
-          events.toSources
-            .flatMap: sources =>
-              elastic.storeBulk(index, sources) *> info"Indexed ${sources.size} forum posts"
-            .handleErrorWith: e =>
-              Logger[IO].error(e)(s"Failed to index forum posts: ${events.map(_._id).mkString(", ")}")
-
-    @scala.annotation.targetName("deleteManyWithDocs")
-    private def deleteMany(events: List[Document]): IO[Unit] =
-      info"Received ${events.size} forum posts to delete" *>
-        IO.whenA(events.nonEmpty):
-          deleteMany(events.flatMap(_._id).map(Id.apply))
-
-    @scala.annotation.targetName("deleteManyWithChanges")
-    private def deleteMany(events: List[ChangeStreamDocument[Document]]): IO[Unit] =
-      info"Received ${events.size} forum posts to delete" *>
-        deleteMany(events.flatMap(_.docId).map(Id.apply)).whenA(events.nonEmpty)
-
-    @scala.annotation.targetName("deleteManyWithIds")
-    private def deleteMany(ids: List[Id]): IO[Unit] =
-      IO.whenA(ids.nonEmpty):
-        elastic
-          .deleteMany(index, ids)
-          .flatTap(_ => info"Deleted ${ids.size} forum posts")
-          .handleErrorWith: e =>
-            Logger[IO].error(e)(s"Failed to delete forum posts: ${ids.map(_.value).mkString(", ")}")
+    private def storeBulk(docs: List[Document]): IO[Unit] =
+      info"Received ${docs.size} forum posts to index" *>
+        docs.toSources
+          .flatMap: sources =>
+            elastic.storeBulk(index, sources) *> info"Indexed ${sources.size} forum posts"
+          .handleErrorWith: e =>
+            Logger[IO].error(e)(s"Failed to index forum posts: ${docs.map(_.id).mkString(", ")}")
+          .whenA(docs.nonEmpty)

     private def saveLastIndexedTimestamp(time: Instant): IO[Unit] =
       store.put(index.value, time)
@@ -119,10 +97,10 @@
     // Fetches topic names by their ids
     private def topicByIds(ids: Seq[String]): IO[Map[String, String]] =
       topics
-        .find(Filter.in("_id", ids))
-        .projection(topicProjection)
+        .find(Filter.in(_id, ids))
+        .projection(Projection.include(List(_id, Topic.name)))
         .all
-        .map(_.map(doc => (doc.getString("_id") -> doc.getString("name")).mapN(_ -> _)).flatten.toMap)
+        .map(_.map(doc => (doc.id -> doc.getString(Topic.name)).mapN(_ -> _)).flatten.toMap)

     private def changes(since: Option[Instant]): fs2.Stream[IO, List[ChangeStreamDocument[Document]]] =
       val builder = posts.watch(aggregate)
@@ -155,37 +133,46 @@ object ForumIngestor:
   extension (doc: Document)

     private def toSource(topicMap: Map[String, String]): IO[Option[SourceWithId]] =
-      (doc._id, doc.getString("topicId"))
+      (doc.id, doc.topicId)
         .flatMapN: (id, topicId) =>
           doc.toSource(topicMap.get(topicId), topicId).map(id -> _)
         .match
           case Some(value) => value.some.pure[IO]
           case _ =>
-            val reason = doc._id.fold("missing doc._id; ")(_ => "")
+            val reason = doc.id.fold("missing doc._id; ")(_ => "")
               + doc.topicId.fold("missing doc.topicId; ")(_ => "")
               + doc.topicId
-                .map(id => topicMap.get(id).fold("topic or topicName is missing")(_ => ""))
+                .map(id => topicMap.get(id).fold("topic or topic.name is missing")(_ => ""))
                 .getOrElse("")
             info"failed to convert document to source: $doc because $reason".as(none)

     private def toSource(topicName: Option[String], topicId: String): Option[ForumSource] =
       (
-        doc.getString("text"),
+        doc.getString(F.text),
         topicName,
-        doc.getBoolean("troll"),
-        doc.getNested("createdAt").flatMap(_.asInstant).map(_.toEpochMilli()),
-        doc.getString("userId").some
+        doc.getBoolean(F.troll),
+        doc.getNested(F.createdAt).flatMap(_.asInstant).map(_.toEpochMilli),
+        doc.getString(F.userId).some
       ).mapN(ForumSource.apply(_, _, topicId, _, _, _))

     private def isErased: Boolean =
       doc.get("erasedAt").isDefined

-    private def _id: Option[String] =
-      doc.getString("_id")
-
     private def topicId: Option[String] =
-      doc.getString("topicId")
+      doc.getString(F.topicId)

   extension (event: ChangeStreamDocument[Document])
     private def isDelete: Boolean =
       event.operationType == DELETE || event.fullDocument.exists(_.isErased)
+
+  object F:
+    val text = "text"
+    val topicId = "topicId"
+    val troll = "troll"
+    val createdAt = "createdAt"
+    val userId = "userId"
+    val erasedAt = "erasedAt"
+    val updatedAt = "updatedAt"
+
+  object Topic:
+    val name = "name"
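A note on `toSource` above: it builds a `ForumSource` with cats' `mapN` over a tuple of `Option`s, so a post missing any required field yields `None` and is skipped rather than half-indexed. A small self-contained sketch of the idiom, assuming cats on the classpath (the values and the three-field tuple are made up for illustration):

import cats.syntax.all.*

val text: Option[String]      = Some("hello")
val topicName: Option[String] = Some("general-chess-discussion")
val createdAt: Option[Long]   = Some(1719100000000L)

// All fields present: Some((text, topicName, createdAt)).
// Any single None collapses the whole result to None.
val source: Option[(String, String, Long)] =
  (text, topicName, createdAt).mapN((_, _, _))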
9 changes: 6 additions & 3 deletions modules/ingestor/src/main/scala/ingestor.scala
@@ -2,6 +2,7 @@ package lila.search
 package ingestor

 import cats.effect.*
+import cats.syntax.all.*
 import mongo4cats.database.MongoDatabase
 import org.typelevel.log4cats.Logger

@@ -13,6 +14,8 @@ object Ingestor:
   def apply(mongo: MongoDatabase[IO], elastic: ESClient[IO], store: KVStore, config: IngestorConfig)(using
       Logger[IO]
   ): IO[Ingestor] =
-    ForumIngestor(mongo, elastic, store, config.forum).map: f =>
-      new Ingestor:
-        def run(): IO[Unit] = f.watch().compile.drain
+    (ForumIngestor(mongo, elastic, store, config.forum), TeamIngestor(mongo, elastic, store, config.team))
+      .mapN: (forum, team) =>
+        new Ingestor:
+          def run() =
+            forum.watch.merge(team.watch).compile.drain
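A note on the new `run` above: the two ingestors are built together with a tuple `mapN`, and their `watch` streams run concurrently via fs2's `merge` — the merged stream halts only when both sides halt, and fails if either side fails. A minimal sketch of the merge semantics, assuming fs2 and cats-effect on the classpath (the stream contents are illustrative):

import cats.effect.IO

// Two stand-ins for forum.watch and team.watch.
def forumWatch: fs2.Stream[IO, Unit] = fs2.Stream.eval(IO.println("forum batch")).repeat.take(3)
def teamWatch: fs2.Stream[IO, Unit]  = fs2.Stream.eval(IO.println("team batch")).repeat.take(3)

// Nondeterministic interleaving: both streams are pulled concurrently,
// and an error on either side surfaces in the merged stream.
val run: IO[Unit] = forumWatch.merge(teamWatch).compile.drain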
112 changes: 112 additions & 0 deletions modules/ingestor/src/main/scala/ingestor.team.scala
@@ -0,0 +1,112 @@
+package lila.search
+package ingestor
+
+import cats.effect.IO
+import cats.syntax.all.*
+import com.mongodb.client.model.changestream.FullDocument
+import com.mongodb.client.model.changestream.OperationType.*
+import lila.search.spec.TeamSource
+import mongo4cats.bson.Document
+import mongo4cats.database.MongoDatabase
+import mongo4cats.models.collection.ChangeStreamDocument
+import mongo4cats.operations.{ Aggregate, Filter, Projection }
+import org.typelevel.log4cats.Logger
+import org.typelevel.log4cats.syntax.*
+
+import java.time.Instant
+import scala.concurrent.duration.*
+
+trait TeamIngestor:
+  // watch change events from MongoDB and ingest team data into elastic search
+  def watch: fs2.Stream[IO, Unit]
+
+object TeamIngestor:
+
+  private val index = Index.Team
+
+  private val interestedOperations = List(DELETE, INSERT, UPDATE, REPLACE).map(_.getValue)
+  private val eventFilter = Filter.in("operationType", interestedOperations)
+
+  private val interestedFields = List("_id", F.name, F.description, F.nbMembers, F.name, F.enabled)
+
+  private val interestedEventFields =
+    List("operationType", "clusterTime", "documentKey._id") ++ interestedFields.map("fullDocument." + _)
+  private val eventProjection = Projection.include(interestedEventFields)
+
+  private val aggregate = Aggregate.matchBy(eventFilter).combinedWith(Aggregate.project(eventProjection))
+
+  def apply(mongo: MongoDatabase[IO], elastic: ESClient[IO], store: KVStore, config: IngestorConfig.Team)(
+      using Logger[IO]
+  ): IO[TeamIngestor] =
+    mongo.getCollection("team").map(apply(elastic, store, config))
+
+  def apply(elastic: ESClient[IO], store: KVStore, config: IngestorConfig.Team)(teams: MongoCollection)(using
+      Logger[IO]
+  ): TeamIngestor = new:
+    def watch =
+      fs2.Stream
+        .eval(startAt.flatTap(since => info"Starting team ingestor from $since"))
+        .flatMap: last =>
+          changeStream(last)
+            .filterNot(_.isEmpty)
+            .evalMap: events =>
+              val lastEventTimestamp = events.lastOption.flatMap(_.clusterTime).flatMap(_.asInstant)
+              val (toDelete, toIndex) = events.partition(_.isDelete)
+              storeBulk(toIndex.flatten(_.fullDocument))
+                *> elastic.deleteMany(index, toDelete)
+                *> saveLastIndexedTimestamp(lastEventTimestamp.getOrElse(Instant.now))
+
+    private def storeBulk(docs: List[Document]): IO[Unit] =
+      val sources = docs.toSources
+      info"Received ${docs.size} teams to index" *>
+        elastic.storeBulk(index, sources) *> info"Indexed ${sources.size} teams"
+          .handleErrorWith: e =>
+            Logger[IO].error(e)(s"Failed to index teams: ${docs.map(_.id).mkString(", ")}")
+
+    private def saveLastIndexedTimestamp(time: Instant): IO[Unit] =
+      store.put(index.value, time)
+        *> info"Stored last indexed time ${time.getEpochSecond} for $index"
+
+    private def startAt: IO[Option[Instant]] =
+      config.startAt.fold(store.get(index.value))(Instant.ofEpochSecond(_).some.pure[IO])
+
+    private def changeStream(since: Option[Instant]): fs2.Stream[IO, List[ChangeStreamDocument[Document]]] =
+      // skip the first event if we're starting from a specific timestamp
+      // since the event at that timestamp is already indexed
+      val skip = since.fold(0)(_ => 1)
+      val builder = teams.watch(aggregate)
+      since
+        .fold(builder)(x => builder.startAtOperationTime(x.asBsonTimestamp))
+        .batchSize(config.batchSize)
+        .fullDocument(FullDocument.UPDATE_LOOKUP) // this is required for update event
+        .boundedStream(config.batchSize)
+        .drop(skip)
+        .evalTap(x => debug"Team change stream event: $x")
+        .groupWithin(config.batchSize, config.timeWindows.second)
+        .map(_.toList)
+
+  extension (docs: List[Document])
+    private def toSources: List[(String, TeamSource)] =
+      docs.flatten(doc => (doc.id, doc.toSource).mapN(_ -> _))
+
+  extension (doc: Document)
+    private def toSource: Option[TeamSource] =
+      (
+        doc.getString(F.name),
+        doc.getString(F.description),
+        doc.getInt(F.nbMembers)
+      ).mapN(TeamSource.apply)
+
+    private def isEnabled =
+      doc.getBoolean(F.enabled).getOrElse(true)
+
+  extension (event: ChangeStreamDocument[Document])
+    private def isDelete: Boolean =
+      event.operationType == DELETE ||
+        event.fullDocument.fold(false)(x => !x.isEnabled)
+
+  object F:
+    val name = "name"
+    val description = "description"
+    val nbMembers = "nbMembers"
+    val enabled = "enabled"
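A note on `changeStream` above: `groupWithin` is what turns the endless change-event stream into bounded batches, emitting a chunk when either the size limit or the time window is reached. A minimal sketch of just that idiom, assuming fs2 and cats-effect on the classpath (the ticking source and the sizes are illustrative, not the ingestor's `config` values):

import cats.effect.IO
import scala.concurrent.duration.*

// A ticking source standing in for teams.watch(...).boundedStream(...).
val events: fs2.Stream[IO, Int] =
  fs2.Stream.iterate(0)(_ + 1).covary[IO].metered(300.millis)

// Emit a batch once 3 elements arrive or 1 second elapses, whichever
// comes first - a quiet stream still flushes partial batches on time.
val batches: fs2.Stream[IO, List[Int]] =
  events.groupWithin(3, 1.second).map(_.toList)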
38 changes: 33 additions & 5 deletions modules/ingestor/src/main/scala/package.scala
@@ -2,6 +2,7 @@ package lila.search
 package ingestor

 import cats.effect.IO
+import cats.syntax.all.*
 import com.github.plokhotnyuk.jsoniter_scala.core.*
 import com.sksamuel.elastic4s.Indexable
 import lila.search.spec.{ ForumSource, Source }
@@ -10,15 +11,22 @@ import mongo4cats.collection.GenericMongoCollection
 import mongo4cats.models.collection.ChangeStreamDocument
 import mongo4cats.operations.Filter
 import org.bson.BsonTimestamp
+import org.typelevel.log4cats.Logger
+import org.typelevel.log4cats.syntax.*
 import smithy4s.json.Json.given
 import smithy4s.schema.Schema

 import java.time.Instant

+val _id = "_id"
+
 type MongoCollection = GenericMongoCollection[IO, Document, [A] =>> fs2.Stream[IO, A]]

-extension [A](change: ChangeStreamDocument[A])
-  def docId: Option[String] = change.documentKey.flatMap(_.getString("_id"))
+extension [A](change: ChangeStreamDocument[A]) def docId: Option[String] = change.documentKey.flatMap(_.id)
+
+extension (doc: Document)
+  private def id: Option[String] =
+    doc.getString(_id)

 given [A: Schema]: Indexable[A] = (a: A) => writeToString(a)
 given Indexable[Source] =
@@ -32,6 +40,26 @@ extension (instant: Instant)
   def asBsonTimestamp: BsonTimestamp = BsonTimestamp(instant.getEpochSecond.toInt, 1)

 def range(field: String)(since: Instant, until: Option[Instant]): Filter =
-  import Filter.*
-  val gtes = gte(field, since)
-  until.fold(gtes)(until => gtes.and(lt(field, until)))
+  val gtes = Filter.gte(field, since)
+  until.fold(gtes)(until => gtes.and(Filter.lt(field, until)))
+
+extension (elastic: ESClient[IO])
+  @scala.annotation.targetName("deleteManyWithIds")
+  def deleteMany(index: Index, ids: List[Id])(using Logger[IO]): IO[Unit] =
+    elastic
+      .deleteMany(index, ids)
+      .flatTap(_ => Logger[IO].info(s"Deleted ${ids.size} ${index.value}s"))
+      .handleErrorWith: e =>
+        Logger[IO].error(e)(s"Failed to delete ${index.value}s: ${ids.map(_.value).mkString(", ")}")
+      .whenA(ids.nonEmpty)
+
+  @scala.annotation.targetName("deleteManyWithDocs")
+  def deleteMany(index: Index, events: List[Document])(using Logger[IO]): IO[Unit] =
+    info"Received ${events.size} forum posts to delete" *>
+      deleteMany(index, events.flatMap(_.id).map(Id.apply))
+        .whenA(events.nonEmpty)
+
+  @scala.annotation.targetName("deleteManyWithChanges")
+  def deleteMany(index: Index, events: List[ChangeStreamDocument[Document]])(using Logger[IO]): IO[Unit] =
+    info"Received ${events.size} forum posts to delete" *>
+      deleteMany(index, events.flatMap(_.docId).map(Id.apply)).whenA(events.nonEmpty)
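A note on the `@scala.annotation.targetName` annotations above: all three `deleteMany` overloads take a `List`, and the element types erase at the JVM level, so each overload needs a distinct bytecode name to avoid a double-definition error. A minimal reproduction of the collision the annotation avoids (the names here are illustrative):

import scala.annotation.targetName

object Overloads:
  // Without the annotations these two would not compile: both erase
  // to sum(List): Int after JVM type erasure.
  @targetName("sumInts")
  def sum(xs: List[Int]): Int = xs.sum
  @targetName("sumLengths")
  def sum(xs: List[String]): Int = xs.map(_.length).sum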
