diff --git a/modules/ingestor/src/main/scala/app.config.scala b/modules/ingestor/src/main/scala/app.config.scala index 43f38c0..920e8e3 100644 --- a/modules/ingestor/src/main/scala/app.config.scala +++ b/modules/ingestor/src/main/scala/app.config.scala @@ -35,12 +35,13 @@ object ElasticConfig: private def uri = env("ELASTIC_URI").or(prop("elastic.uri")).as[String].default("http://127.0.0.1:9200") def config = uri.map(ElasticConfig.apply) -case class IngestorConfig(forum: IngestorConfig.Config) +case class IngestorConfig(forum: IngestorConfig.Forum, team: IngestorConfig.Team) object IngestorConfig: - case class Config(batchSize: Int, maxBodyLength: Int, timeWindows: Int, startAt: Option[Long]) + case class Forum(batchSize: Int, maxBodyLength: Int, timeWindows: Int, startAt: Option[Long]) + case class Team(batchSize: Int, timeWindows: Int, startAt: Option[Long]) - object forum: + private object Forum: private def batchSize = env("INGESTOR_FORUM_BATCH_SIZE").or(prop("ingestor.forum.batch.size")).as[Int].default(100) private def maxBodyLength = @@ -48,7 +49,17 @@ object IngestorConfig: private def timeWindows = env("INGESTOR_FORUM_TIME_WINDOWS").or(prop("ingestor.forum.time.windows")).as[Int].default(10) private def startAt = - env("INGESTOR_FORUM_START_AT").or(prop("ingestor.forum.start.at")).as[Long].option - def config = (batchSize, maxBodyLength, timeWindows, startAt).parMapN(Config.apply) + env("INGESTOR_FORUM_START_AT") + .or(prop("ingestor.forum.start.at")).as[Long].option + def config = (batchSize, maxBodyLength, timeWindows, startAt).parMapN(Forum.apply) - def config = forum.config.map(IngestorConfig.apply) + private object Team: + private def batchSize = + env("INGESTOR_TEAM_BATCH_SIZE").or(prop("ingestor.team.batch.size")).as[Int].default(100) + private def timeWindows = + env("INGESTOR_TEAM_TIME_WINDOWS").or(prop("ingestor.team.time.windows")).as[Int].default(10) + private def startAt = + env("INGESTOR_TEAM_START_AT").or(prop("ingestor.team.start.at")).as[Long].option + def config = (batchSize, timeWindows, startAt).parMapN(Team.apply) + + def config = (Forum.config, Team.config).parMapN(IngestorConfig.apply) diff --git a/modules/ingestor/src/main/scala/ingestor.forum.scala b/modules/ingestor/src/main/scala/ingestor.forum.scala index f50ce32..0279bce 100644 --- a/modules/ingestor/src/main/scala/ingestor.forum.scala +++ b/modules/ingestor/src/main/scala/ingestor.forum.scala @@ -22,10 +22,11 @@ object ForumIngestor: private val topicProjection = Projection.include(List("_id", "name")) - val interestedOperations = List(DELETE, INSERT, UPDATE, REPLACE).map(_.getValue) - private val eventFilter = Filter.in("operationType", interestedOperations) + private val interestedOperations = List(DELETE, INSERT, UPDATE, REPLACE).map(_.getValue) + private val eventFilter = Filter.in("operationType", interestedOperations) private val eventProjection = Projection.include( List( + "clusterTime", "documentKey._id", "fullDocument.text", "fullDocument.topicId", @@ -39,12 +40,12 @@ object ForumIngestor: private val index = Index("forum") - def apply(mongo: MongoDatabase[IO], elastic: ESClient[IO], store: KVStore, config: IngestorConfig.Config)( + def apply(mongo: MongoDatabase[IO], elastic: ESClient[IO], store: KVStore, config: IngestorConfig.Forum)( using Logger[IO] ): IO[ForumIngestor] = (mongo.getCollection("f_topic"), mongo.getCollection("f_post")).mapN(apply(elastic, store, config)) - def apply(elastic: ESClient[IO], store: KVStore, config: IngestorConfig.Config)( + def apply(elastic: ESClient[IO], store: KVStore, config: IngestorConfig.Forum)( topics: MongoCollection, posts: MongoCollection )(using Logger[IO]): ForumIngestor = new: @@ -53,10 +54,10 @@ object ForumIngestor: fs2.Stream .eval(startAt.flatTap(since => info"Starting forum ingestor from $since")) .flatMap: last => - postStream(last) + changes(last) .filterNot(_.isEmpty) .evalMap: events => - val lastEventTimestamp = events.lastOption.flatMap(_.clusterTime).flatMap(_.asInstant) + val lastEventTimestamp = events.flatten(_.clusterTime.flatMap(_.asInstant)).maxOption val (toDelete, toIndex) = events.partition(_.isDelete) storeBulk(toIndex) *> deleteMany(toDelete) @@ -78,8 +79,8 @@ object ForumIngestor: Logger[IO].error(e)(s"Failed to delete forum posts: ${events.map(_.id).mkString(", ")}") private def saveLastIndexedTimestamp(time: Instant): IO[Unit] = - store.put(index.name, time.plusSeconds(1)) // +1 to avoid reindexing the same event - *> info"Stored last indexed time ${time.getEpochSecond()} for index ${index.name}" + store.put(index.name, time) + *> info"Stored last indexed time ${time.getEpochSecond} for $index" private def startAt: IO[Option[Instant]] = config.startAt.fold(store.get(index.name))(Instant.ofEpochSecond(_).some.pure[IO]) @@ -92,12 +93,16 @@ object ForumIngestor: .all .map(_.map(doc => (doc.getString("_id") -> doc.getString("name")).mapN(_ -> _)).flatten.toMap) - private def postStream(since: Option[Instant]): fs2.Stream[IO, List[ChangeStreamDocument[Document]]] = + private def changes(since: Option[Instant]): fs2.Stream[IO, List[ChangeStreamDocument[Document]]] = val builder = posts.watch(aggregate) + // skip the first event if we're starting from a specific timestamp + // since the event at that timestamp is already indexed + val skip = since.fold(0)(_ => 1) since .fold(builder)(x => builder.startAtOperationTime(x.asBsonTimestamp)) .batchSize(config.batchSize) .boundedStream(config.batchSize) + .drop(skip) .groupWithin(config.batchSize, config.timeWindows.second) .evalTap(_.traverse_(x => debug"received $x")) .map(_.toList) diff --git a/modules/ingestor/src/main/scala/kvstore.scala b/modules/ingestor/src/main/scala/kvstore.scala index bcf119a..72b4040 100644 --- a/modules/ingestor/src/main/scala/kvstore.scala +++ b/modules/ingestor/src/main/scala/kvstore.scala @@ -28,15 +28,17 @@ object KVStore: def get(key: String): IO[Option[Instant]] = read(file) .map: content => - content.get(key).map(Instant.ofEpochSecond) + content + .get(key) + .map(x => Instant.ofEpochSecond(x)) def put(key: String, value: Instant): IO[Unit] = mutex.lock.surround: read(file).flatMap: content => write( file, - content.updated(key, value.getEpochSecond + 1) - ) // +1 to avoid reindexing the same data + content.updated(key, value.getEpochSecond) + ) private def read(path: String): IO[State] = Files[IO]