diff --git a/modules/ingestor/src/main/scala/ingestor.forum.scala b/modules/ingestor/src/main/scala/ingestor.forum.scala
index e6a9f1f..0be4282 100644
--- a/modules/ingestor/src/main/scala/ingestor.forum.scala
+++ b/modules/ingestor/src/main/scala/ingestor.forum.scala
@@ -16,21 +16,19 @@ import java.time.Instant
 import scala.concurrent.duration.*
 
 trait ForumIngestor:
-  // Utilize change events functionality of MongoDB to watch for changes in the forum posts collection.
-  def watch(): fs2.Stream[IO, Unit]
-  // Fetch posts from since to until and ingest to data
+  // watch change events from MongoDB and ingest forum posts into elastic search
+  def watch: fs2.Stream[IO, Unit]
+  // Fetch posts in [since, until] and ingest into elastic search
   def run(since: Instant, until: Option[Instant], dryRun: Boolean): fs2.Stream[IO, Unit]
 
 object ForumIngestor:
 
   private val index = Index.Forum
 
-  private val topicProjection = Projection.include(List("_id", "name"))
-
   private val interestedOperations = List(DELETE, INSERT, REPLACE).map(_.getValue)
   private val eventFilter = Filter.in("operationType", interestedOperations)
 
-  private val interestedFields = List("_id", "text", "topicId", "troll", "createdAt", "userId", "erasedAt")
+  private val interestedFields = List(_id, F.text, F.topicId, F.troll, F.createdAt, F.userId, F.erasedAt)
   private val postProjection = Projection.include(interestedFields)
 
   private val interestedEventFields =
@@ -49,7 +47,7 @@ object ForumIngestor:
       posts: MongoCollection
   )(using Logger[IO]): ForumIngestor = new:
 
-    def watch(): fs2.Stream[IO, Unit] =
+    def watch: fs2.Stream[IO, Unit] =
       fs2.Stream
         .eval(startAt.flatTap(since => info"Starting forum ingestor from $since"))
         .flatMap: last =>
@@ -58,13 +56,13 @@ object ForumIngestor:
               val lastEventTimestamp = events.flatten(_.clusterTime.flatMap(_.asInstant)).maxOption
               val (toDelete, toIndex) = events.partition(_.isDelete)
               storeBulk(toIndex.flatten(_.fullDocument))
-                *> deleteMany(toDelete)
+                *> elastic.deleteMany(index, toDelete)
                 *> saveLastIndexedTimestamp(lastEventTimestamp.getOrElse(Instant.now()))
 
     def run(since: Instant, until: Option[Instant], dryRun: Boolean): fs2.Stream[IO, Unit] =
-      val filter = range("createdAt")(since, until)
-        .or(range("updatedAt")(since, until))
-        .or(range("erasedAt")(since, until))
+      val filter = range(F.createdAt)(since, until)
+        .or(range(F.updatedAt)(since, until))
+        .or(range(F.erasedAt)(since, until))
       posts
         .find(filter)
         .projection(postProjection)
@@ -77,37 +75,17 @@ object ForumIngestor:
           dryRun.fold(
             toIndex.traverse_(doc => debug"Would index $doc")
               *> toDelete.traverse_(doc => debug"Would delete $doc"),
-            storeBulk(toIndex) *> deleteMany(toDelete)
+            storeBulk(toIndex) *> elastic.deleteMany(index, toDelete)
           )
 
-    private def storeBulk(events: List[Document]): IO[Unit] =
-      info"Received ${events.size} forum posts to index" *>
-        IO.whenA(events.nonEmpty):
-          events.toSources
-            .flatMap: sources =>
-              elastic.storeBulk(index, sources) *> info"Indexed ${sources.size} forum posts"
-            .handleErrorWith: e =>
-              Logger[IO].error(e)(s"Failed to index forum posts: ${events.map(_._id).mkString(", ")}")
-
-    @scala.annotation.targetName("deleteManyWithDocs")
-    private def deleteMany(events: List[Document]): IO[Unit] =
-      info"Received ${events.size} forum posts to delete" *>
-        IO.whenA(events.nonEmpty):
-          deleteMany(events.flatMap(_._id).map(Id.apply))
-
-    @scala.annotation.targetName("deleteManyWithChanges")
-    private def deleteMany(events: List[ChangeStreamDocument[Document]]): IO[Unit] =
-      info"Received ${events.size} forum posts to delete" *>
-        deleteMany(events.flatMap(_.docId).map(Id.apply)).whenA(events.nonEmpty)
-
-    @scala.annotation.targetName("deleteManyWithIds")
-    private def deleteMany(ids: List[Id]): IO[Unit] =
-      IO.whenA(ids.nonEmpty):
-        elastic
-          .deleteMany(index, ids)
-          .flatTap(_ => info"Deleted ${ids.size} forum posts")
+    private def storeBulk(docs: List[Document]): IO[Unit] =
+      info"Received ${docs.size} forum posts to index" *>
+        docs.toSources
+          .flatMap: sources =>
+            elastic.storeBulk(index, sources) *> info"Indexed ${sources.size} forum posts"
           .handleErrorWith: e =>
-            Logger[IO].error(e)(s"Failed to delete forum posts: ${ids.map(_.value).mkString(", ")}")
+            Logger[IO].error(e)(s"Failed to index forum posts: ${docs.map(_.id).mkString(", ")}")
+          .whenA(docs.nonEmpty)
 
     private def saveLastIndexedTimestamp(time: Instant): IO[Unit] =
       store.put(index.value, time)
@@ -119,10 +97,10 @@ object ForumIngestor:
     // Fetches topic names by their ids
     private def topicByIds(ids: Seq[String]): IO[Map[String, String]] =
      topics
-        .find(Filter.in("_id", ids))
-        .projection(topicProjection)
+        .find(Filter.in(_id, ids))
+        .projection(Projection.include(List(_id, Topic.name)))
         .all
-        .map(_.map(doc => (doc.getString("_id") -> doc.getString("name")).mapN(_ -> _)).flatten.toMap)
+        .map(_.map(doc => (doc.id -> doc.getString(Topic.name)).mapN(_ -> _)).flatten.toMap)
 
     private def changes(since: Option[Instant]): fs2.Stream[IO, List[ChangeStreamDocument[Document]]] =
       val builder = posts.watch(aggregate)
@@ -155,37 +133,46 @@ object ForumIngestor:
   extension (doc: Document)
 
     private def toSource(topicMap: Map[String, String]): IO[Option[SourceWithId]] =
-      (doc._id, doc.getString("topicId"))
+      (doc.id, doc.topicId)
         .flatMapN: (id, topicId) =>
           doc.toSource(topicMap.get(topicId), topicId).map(id -> _)
         .match
           case Some(value) => value.some.pure[IO]
           case _ =>
-            val reason = doc._id.fold("missing doc._id; ")(_ => "")
+            val reason = doc.id.fold("missing doc._id; ")(_ => "")
               + doc.topicId.fold("missing doc.topicId; ")(_ => "")
               + doc.topicId
-                .map(id => topicMap.get(id).fold("topic or topicName is missing")(_ => ""))
+                .map(id => topicMap.get(id).fold("topic or topic.name is missing")(_ => ""))
                 .getOrElse("")
             info"failed to convert document to source: $doc because $reason".as(none)
 
     private def toSource(topicName: Option[String], topicId: String): Option[ForumSource] =
       (
-        doc.getString("text"),
+        doc.getString(F.text),
         topicName,
-        doc.getBoolean("troll"),
-        doc.getNested("createdAt").flatMap(_.asInstant).map(_.toEpochMilli()),
-        doc.getString("userId").some
+        doc.getBoolean(F.troll),
+        doc.getNested(F.createdAt).flatMap(_.asInstant).map(_.toEpochMilli),
+        doc.getString(F.userId).some
       ).mapN(ForumSource.apply(_, _, topicId, _, _, _))
 
     private def isErased: Boolean =
       doc.get("erasedAt").isDefined
 
-    private def _id: Option[String] =
-      doc.getString("_id")
-
     private def topicId: Option[String] =
-      doc.getString("topicId")
+      doc.getString(F.topicId)
 
   extension (event: ChangeStreamDocument[Document])
     private def isDelete: Boolean =
       event.operationType == DELETE || event.fullDocument.exists(_.isErased)
+
+  object F:
+    val text = "text"
+    val topicId = "topicId"
+    val troll = "troll"
+    val createdAt = "createdAt"
+    val userId = "userId"
+    val erasedAt = "erasedAt"
+    val updatedAt = "updatedAt"
+
+  object Topic:
+    val name = "name"
diff --git a/modules/ingestor/src/main/scala/ingestor.scala b/modules/ingestor/src/main/scala/ingestor.scala
index 285d484..f9c0424 100644
--- a/modules/ingestor/src/main/scala/ingestor.scala
+++ b/modules/ingestor/src/main/scala/ingestor.scala
@@ -2,6 +2,7 @@ package lila.search
 package ingestor
 
 import cats.effect.*
+import cats.syntax.all.*
 import mongo4cats.database.MongoDatabase
 import org.typelevel.log4cats.Logger
 
@@ -13,6 +14,8 @@ object Ingestor:
   def apply(mongo: MongoDatabase[IO], elastic: ESClient[IO], store: KVStore, config: IngestorConfig)(using
       Logger[IO]
   ): IO[Ingestor] =
-    ForumIngestor(mongo, elastic, store, config.forum).map: f =>
-      new Ingestor:
-        def run(): IO[Unit] = f.watch().compile.drain
+    (ForumIngestor(mongo, elastic, store, config.forum), TeamIngestor(mongo, elastic, store, config.team))
+      .mapN: (forum, team) =>
+        new Ingestor:
+          def run() =
+            forum.watch.merge(team.watch).compile.drain
diff --git a/modules/ingestor/src/main/scala/ingestor.team.scala b/modules/ingestor/src/main/scala/ingestor.team.scala
new file mode 100644
index 0000000..410d942
--- /dev/null
+++ b/modules/ingestor/src/main/scala/ingestor.team.scala
@@ -0,0 +1,112 @@
+package lila.search
+package ingestor
+
+import cats.effect.IO
+import cats.syntax.all.*
+import com.mongodb.client.model.changestream.FullDocument
+import com.mongodb.client.model.changestream.OperationType.*
+import lila.search.spec.TeamSource
+import mongo4cats.bson.Document
+import mongo4cats.database.MongoDatabase
+import mongo4cats.models.collection.ChangeStreamDocument
+import mongo4cats.operations.{ Aggregate, Filter, Projection }
+import org.typelevel.log4cats.Logger
+import org.typelevel.log4cats.syntax.*
+
+import java.time.Instant
+import scala.concurrent.duration.*
+
+trait TeamIngestor:
+  // watch change events from MongoDB and ingest team data into elastic search
+  def watch: fs2.Stream[IO, Unit]
+
+object TeamIngestor:
+
+  private val index = Index.Team
+
+  private val interestedOperations = List(DELETE, INSERT, UPDATE, REPLACE).map(_.getValue)
+  private val eventFilter = Filter.in("operationType", interestedOperations)
+
+  private val interestedFields = List("_id", F.name, F.description, F.nbMembers, F.name, F.enabled)
+
+  private val interestedEventFields =
+    List("operationType", "clusterTime", "documentKey._id") ++ interestedFields.map("fullDocument." + _)
+  private val eventProjection = Projection.include(interestedEventFields)
+
+  private val aggregate = Aggregate.matchBy(eventFilter).combinedWith(Aggregate.project(eventProjection))
+
+  def apply(mongo: MongoDatabase[IO], elastic: ESClient[IO], store: KVStore, config: IngestorConfig.Team)(
+      using Logger[IO]
+  ): IO[TeamIngestor] =
+    mongo.getCollection("team").map(apply(elastic, store, config))
+
+  def apply(elastic: ESClient[IO], store: KVStore, config: IngestorConfig.Team)(teams: MongoCollection)(using
+      Logger[IO]
+  ): TeamIngestor = new:
+    def watch =
+      fs2.Stream
+        .eval(startAt.flatTap(since => info"Starting team ingestor from $since"))
+        .flatMap: last =>
+          changeStream(last)
+            .filterNot(_.isEmpty)
+            .evalMap: events =>
+              val lastEventTimestamp = events.lastOption.flatMap(_.clusterTime).flatMap(_.asInstant)
+              val (toDelete, toIndex) = events.partition(_.isDelete)
+              storeBulk(toIndex.flatten(_.fullDocument))
+                *> elastic.deleteMany(index, toDelete)
+                *> saveLastIndexedTimestamp(lastEventTimestamp.getOrElse(Instant.now))
+
+    private def storeBulk(docs: List[Document]): IO[Unit] =
+      val sources = docs.toSources
+      info"Received ${docs.size} teams to index" *>
+        elastic.storeBulk(index, sources) *> info"Indexed ${sources.size} teams"
+          .handleErrorWith: e =>
+            Logger[IO].error(e)(s"Failed to index teams: ${docs.map(_.id).mkString(", ")}")
+
+    private def saveLastIndexedTimestamp(time: Instant): IO[Unit] =
+      store.put(index.value, time)
+        *> info"Stored last indexed time ${time.getEpochSecond} for $index"
+
+    private def startAt: IO[Option[Instant]] =
+      config.startAt.fold(store.get(index.value))(Instant.ofEpochSecond(_).some.pure[IO])
+
+    private def changeStream(since: Option[Instant]): fs2.Stream[IO, List[ChangeStreamDocument[Document]]] =
+      // skip the first event if we're starting from a specific timestamp
+      // since the event at that timestamp is already indexed
+      val skip = since.fold(0)(_ => 1)
+      val builder = teams.watch(aggregate)
+      since
+        .fold(builder)(x => builder.startAtOperationTime(x.asBsonTimestamp))
+        .batchSize(config.batchSize)
+        .fullDocument(FullDocument.UPDATE_LOOKUP) // this is required for update event
+        .boundedStream(config.batchSize)
+        .drop(skip)
+        .evalTap(x => debug"Team change stream event: $x")
+        .groupWithin(config.batchSize, config.timeWindows.second)
+        .map(_.toList)
+
+  extension (docs: List[Document])
+    private def toSources: List[(String, TeamSource)] =
+      docs.flatten(doc => (doc.id, doc.toSource).mapN(_ -> _))
+
+  extension (doc: Document)
+    private def toSource: Option[TeamSource] =
+      (
+        doc.getString(F.name),
+        doc.getString(F.description),
+        doc.getInt(F.nbMembers)
+      ).mapN(TeamSource.apply)
+
+    private def isEnabled =
+      doc.getBoolean(F.enabled).getOrElse(true)
+
+  extension (event: ChangeStreamDocument[Document])
+    private def isDelete: Boolean =
+      event.operationType == DELETE ||
+        event.fullDocument.fold(false)(x => !x.isEnabled)
+
+  object F:
+    val name = "name"
+    val description = "description"
+    val nbMembers = "nbMembers"
+    val enabled = "enabled"
diff --git a/modules/ingestor/src/main/scala/package.scala b/modules/ingestor/src/main/scala/package.scala
index 6d80774..191f5fb 100644
--- a/modules/ingestor/src/main/scala/package.scala
+++ b/modules/ingestor/src/main/scala/package.scala
@@ -2,6 +2,7 @@ package lila.search
 package ingestor
 
 import cats.effect.IO
+import cats.syntax.all.*
 import com.github.plokhotnyuk.jsoniter_scala.core.*
 import com.sksamuel.elastic4s.Indexable
 import lila.search.spec.{ ForumSource, Source }
@@ -10,15 +11,22 @@ import mongo4cats.collection.GenericMongoCollection
 import mongo4cats.models.collection.ChangeStreamDocument
 import mongo4cats.operations.Filter
 import org.bson.BsonTimestamp
+import org.typelevel.log4cats.Logger
+import org.typelevel.log4cats.syntax.*
 import smithy4s.json.Json.given
 import smithy4s.schema.Schema
 
 import java.time.Instant
 
+val _id = "_id"
+
 type MongoCollection = GenericMongoCollection[IO, Document, [A] =>> fs2.Stream[IO, A]]
 
-extension [A](change: ChangeStreamDocument[A])
-  def docId: Option[String] = change.documentKey.flatMap(_.getString("_id"))
+extension [A](change: ChangeStreamDocument[A]) def docId: Option[String] = change.documentKey.flatMap(_.id)
+
+extension (doc: Document)
+  private def id: Option[String] =
+    doc.getString(_id)
 
 given [A: Schema]: Indexable[A] = (a: A) => writeToString(a)
 given Indexable[Source] =
@@ -32,6 +40,26 @@ extension (instant: Instant)
   def asBsonTimestamp: BsonTimestamp = BsonTimestamp(instant.getEpochSecond.toInt, 1)
 
 def range(field: String)(since: Instant, until: Option[Instant]): Filter =
-  import Filter.*
-  val gtes = gte(field, since)
-  until.fold(gtes)(until => gtes.and(lt(field, until)))
+  val gtes = Filter.gte(field, since)
+  until.fold(gtes)(until => gtes.and(Filter.lt(field, until)))
+
+extension (elastic: ESClient[IO])
+  @scala.annotation.targetName("deleteManyWithIds")
+  def deleteMany(index: Index, ids: List[Id])(using Logger[IO]): IO[Unit] =
+    elastic
+      .deleteMany(index, ids)
+      .flatTap(_ => Logger[IO].info(s"Deleted ${ids.size} ${index.value}s"))
+      .handleErrorWith: e =>
+        Logger[IO].error(e)(s"Failed to delete ${index.value}s: ${ids.map(_.value).mkString(", ")}")
+      .whenA(ids.nonEmpty)
+
+  @scala.annotation.targetName("deleteManyWithDocs")
+  def deleteMany(index: Index, events: List[Document])(using Logger[IO]): IO[Unit] =
+    info"Received ${events.size} forum posts to delete" *>
+      deleteMany(index, events.flatMap(_.id).map(Id.apply))
+        .whenA(events.nonEmpty)
+
+  @scala.annotation.targetName("deleteManyWithChanges")
+  def deleteMany(index: Index, events: List[ChangeStreamDocument[Document]])(using Logger[IO]): IO[Unit] =
+    info"Received ${events.size} forum posts to delete" *>
+      deleteMany(index, events.flatMap(_.docId).map(Id.apply)).whenA(events.nonEmpty)