Skip to content

Commit

Permalink
Don't send any request when lists are empty
Browse files Browse the repository at this point in the history
  • Loading branch information
lenguyenthanh committed Jun 19, 2024
1 parent 4d751a8 commit 293fb8b
Showing 1 changed file with 18 additions and 12 deletions.
30 changes: 18 additions & 12 deletions modules/ingestor/src/main/scala/ingestor.forum.scala
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,23 @@ object ForumIngestor:

private def storeBulk(events: List[ChangeStreamDocument[Document]]): IO[Unit] =
info"Received ${events.size} forum posts to index" *>
events.toSources
.flatMap: sources =>
elastic.storeBulk(index, sources) *> info"Indexed ${sources.size} forum posts"
.handleErrorWith: e =>
Logger[IO].error(e)(s"Failed to index forum posts: ${events.map(_.id).mkString(", ")}")
IO.whenA(events.nonEmpty):
events.toSources
.flatMap: sources =>
elastic.storeBulk(index, sources) *> info"Indexed ${sources.size} forum posts"
.handleErrorWith: e =>
Logger[IO].error(e)(s"Failed to index forum posts: ${events.map(_.id).mkString(", ")}")

private def deleteMany(events: List[ChangeStreamDocument[Document]]): IO[Unit] =
elastic
.deleteMany(index, events.flatMap(_.id.map(Id.apply)))
.flatTap(_ => info"Deleted ${events.size} forum posts")
.handleErrorWith: e =>
Logger[IO].error(e)(s"Failed to delete forum posts: ${events.map(_.id).mkString(", ")}")
info"Received ${events.size} forum posts to delete" *>
IO.whenA(events.nonEmpty):
IO(events.flatMap(_.id).map(Id.apply))
.flatMap: ids =>
elastic
.deleteMany(index, events.flatMap(_.id.map(Id.apply)))
.flatTap(_ => info"Deleted ${ids.size} forum posts")
.handleErrorWith: e =>
Logger[IO].error(e)(s"Failed to delete forum posts: ${events.map(_.id).mkString(", ")}")

private def saveLastIndexedTimestamp(time: Instant): IO[Unit] =
store.put(index.name, time)
Expand Down Expand Up @@ -113,8 +118,8 @@ object ForumIngestor:
extension (events: List[ChangeStreamDocument[Document]])
private def toSources: IO[List[(String, ForumSource)]] =
val topicIds = events.flatMap(_.topicId).distinct
if topicIds.isEmpty then info"no topics found for posts: $events".as(Nil)
else
topicIds.isEmpty.fold(
info"no topics found for posts: $events".as(Nil),
topicByIds(topicIds)
.flatMap: topicMap =>
events
Expand All @@ -126,6 +131,7 @@ object ForumIngestor:
case Some(value) => value.some.pure[IO]
case _ => info"failed to convert document to source: $event".as(none)
.map(_.flatten)
)

extension (doc: Document)
private def toSource(topicName: Option[String]): Option[ForumSource] =
Expand Down

0 comments on commit 293fb8b

Please sign in to comment.