diff --git a/modules/ingestor/src/main/scala/ingestor.forum.scala b/modules/ingestor/src/main/scala/ingestor.forum.scala index 720f865..a1b5d17 100644 --- a/modules/ingestor/src/main/scala/ingestor.forum.scala +++ b/modules/ingestor/src/main/scala/ingestor.forum.scala @@ -68,18 +68,23 @@ object ForumIngestor: private def storeBulk(events: List[ChangeStreamDocument[Document]]): IO[Unit] = info"Received ${events.size} forum posts to index" *> - events.toSources - .flatMap: sources => - elastic.storeBulk(index, sources) *> info"Indexed ${sources.size} forum posts" - .handleErrorWith: e => - Logger[IO].error(e)(s"Failed to index forum posts: ${events.map(_.id).mkString(", ")}") + IO.whenA(events.nonEmpty): + events.toSources + .flatMap: sources => + elastic.storeBulk(index, sources) *> info"Indexed ${sources.size} forum posts" + .handleErrorWith: e => + Logger[IO].error(e)(s"Failed to index forum posts: ${events.map(_.id).mkString(", ")}") private def deleteMany(events: List[ChangeStreamDocument[Document]]): IO[Unit] = - elastic - .deleteMany(index, events.flatMap(_.id.map(Id.apply))) - .flatTap(_ => info"Deleted ${events.size} forum posts") - .handleErrorWith: e => - Logger[IO].error(e)(s"Failed to delete forum posts: ${events.map(_.id).mkString(", ")}") + info"Received ${events.size} forum posts to delete" *> + IO.whenA(events.nonEmpty): + IO(events.flatMap(_.id).map(Id.apply)) + .flatMap: ids => + elastic + .deleteMany(index, events.flatMap(_.id.map(Id.apply))) + .flatTap(_ => info"Deleted ${ids.size} forum posts") + .handleErrorWith: e => + Logger[IO].error(e)(s"Failed to delete forum posts: ${events.map(_.id).mkString(", ")}") private def saveLastIndexedTimestamp(time: Instant): IO[Unit] = store.put(index.name, time) @@ -113,8 +118,8 @@ object ForumIngestor: extension (events: List[ChangeStreamDocument[Document]]) private def toSources: IO[List[(String, ForumSource)]] = val topicIds = events.flatMap(_.topicId).distinct - if topicIds.isEmpty then info"no topics found for posts: $events".as(Nil) - else + topicIds.isEmpty.fold( + info"no topics found for posts: $events".as(Nil), topicByIds(topicIds) .flatMap: topicMap => events @@ -126,6 +131,7 @@ object ForumIngestor: case Some(value) => value.some.pure[IO] case _ => info"failed to convert document to source: $event".as(none) .map(_.flatten) + ) extension (doc: Document) private def toSource(topicName: Option[String]): Option[ForumSource] =