Skip to content

Commit

Permalink
code tweak
Browse files Browse the repository at this point in the history
  • Loading branch information
lenguyenthanh committed Jun 22, 2024
1 parent 203af56 commit ace4d7a
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 20 deletions.
21 changes: 11 additions & 10 deletions modules/ingestor/src/main/scala/ingestor.forum.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,10 @@ object ForumIngestor:

private val index = Index.Forum

private val topicProjection = Projection.include(List("_id", "name"))

private val interestedOperations = List(DELETE, INSERT, REPLACE).map(_.getValue)
private val eventFilter = Filter.in("operationType", interestedOperations)

private val interestedFields = List("_id", F.text, F.topicId, F.troll, F.createdAt, F.userId, F.erasedAt)
private val interestedFields = List(_id, F.text, F.topicId, F.troll, F.createdAt, F.userId, F.erasedAt)
private val postProjection = Projection.include(interestedFields)

private val interestedEventFields =
Expand Down Expand Up @@ -86,7 +84,7 @@ object ForumIngestor:
.flatMap: sources =>
elastic.storeBulk(index, sources) *> info"Indexed ${sources.size} forum posts"
.handleErrorWith: e =>
Logger[IO].error(e)(s"Failed to index forum posts: ${docs.map(_._id).mkString(", ")}")
Logger[IO].error(e)(s"Failed to index forum posts: ${docs.map(_.id).mkString(", ")}")
.whenA(docs.nonEmpty)

private def saveLastIndexedTimestamp(time: Instant): IO[Unit] =
Expand All @@ -99,10 +97,10 @@ object ForumIngestor:
// Fetches topic names by their ids
private def topicByIds(ids: Seq[String]): IO[Map[String, String]] =
topics
.find(Filter.in("_id", ids))
.projection(topicProjection)
.find(Filter.in(_id, ids))
.projection(Projection.include(List(_id, Topic.name)))
.all
.map(_.map(doc => (doc.getString("_id") -> doc.getString("name")).mapN(_ -> _)).flatten.toMap)
.map(_.map(doc => (doc.id -> doc.getString(Topic.name)).mapN(_ -> _)).flatten.toMap)

private def changes(since: Option[Instant]): fs2.Stream[IO, List[ChangeStreamDocument[Document]]] =
val builder = posts.watch(aggregate)
Expand Down Expand Up @@ -135,16 +133,16 @@ object ForumIngestor:
extension (doc: Document)

private def toSource(topicMap: Map[String, String]): IO[Option[SourceWithId]] =
(doc._id, doc.topicId)
(doc.id, doc.topicId)
.flatMapN: (id, topicId) =>
doc.toSource(topicMap.get(topicId), topicId).map(id -> _)
.match
case Some(value) => value.some.pure[IO]
case _ =>
val reason = doc._id.fold("missing doc._id; ")(_ => "")
val reason = doc.id.fold("missing doc._id; ")(_ => "")
+ doc.topicId.fold("missing doc.topicId; ")(_ => "")
+ doc.topicId
.map(id => topicMap.get(id).fold("topic or topicName is missing")(_ => ""))
.map(id => topicMap.get(id).fold("topic or topic.name is missing")(_ => ""))
.getOrElse("")
info"failed to convert document to source: $doc because $reason".as(none)

Expand Down Expand Up @@ -175,3 +173,6 @@ object ForumIngestor:
val userId = "userId"
val erasedAt = "erasedAt"
val updatedAt = "updatedAt"

object Topic:
val name = "name"
4 changes: 2 additions & 2 deletions modules/ingestor/src/main/scala/ingestor.team.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ object TeamIngestor:
info"Received ${docs.size} teams to index" *>
elastic.storeBulk(index, sources) *> info"Indexed ${sources.size} teams"
.handleErrorWith: e =>
Logger[IO].error(e)(s"Failed to index teams: ${docs.map(_._id).mkString(", ")}")
Logger[IO].error(e)(s"Failed to index teams: ${docs.map(_.id).mkString(", ")}")

private def saveLastIndexedTimestamp(time: Instant): IO[Unit] =
store.put(index.value, time)
Expand All @@ -87,7 +87,7 @@ object TeamIngestor:

extension (docs: List[Document])
private def toSources: List[(String, TeamSource)] =
docs.flatten(doc => (doc._id, doc.toSource).mapN(_ -> _))
docs.flatten(doc => (doc.id, doc.toSource).mapN(_ -> _))

extension (doc: Document)
private def toSource: Option[TeamSource] =
Expand Down
16 changes: 8 additions & 8 deletions modules/ingestor/src/main/scala/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ import smithy4s.schema.Schema

import java.time.Instant

val _id = "_id"

type MongoCollection = GenericMongoCollection[IO, Document, [A] =>> fs2.Stream[IO, A]]

extension [A](change: ChangeStreamDocument[A])
def docId: Option[String] = change.documentKey.flatMap(_.getString("_id"))
extension [A](change: ChangeStreamDocument[A]) def docId: Option[String] = change.documentKey.flatMap(_.id)

extension (doc: Document)
private def _id: Option[String] =
doc.getString("_id")
private def id: Option[String] =
doc.getString(_id)

given [A: Schema]: Indexable[A] = (a: A) => writeToString(a)
given Indexable[Source] =
Expand All @@ -39,9 +40,8 @@ extension (instant: Instant)
def asBsonTimestamp: BsonTimestamp = BsonTimestamp(instant.getEpochSecond.toInt, 1)

def range(field: String)(since: Instant, until: Option[Instant]): Filter =
import Filter.*
val gtes = gte(field, since)
until.fold(gtes)(until => gtes.and(lt(field, until)))
val gtes = Filter.gte(field, since)
until.fold(gtes)(until => gtes.and(Filter.lt(field, until)))

extension (elastic: ESClient[IO])
@scala.annotation.targetName("deleteManyWithIds")
Expand All @@ -56,7 +56,7 @@ extension (elastic: ESClient[IO])
@scala.annotation.targetName("deleteManyWithDocs")
def deleteMany(index: Index, events: List[Document])(using Logger[IO]): IO[Unit] =
info"Received ${events.size} forum posts to delete" *>
deleteMany(index, events.flatMap(_._id).map(Id.apply))
deleteMany(index, events.flatMap(_.id).map(Id.apply))
.whenA(events.nonEmpty)

@scala.annotation.targetName("deleteManyWithChanges")
Expand Down

0 comments on commit ace4d7a

Please sign in to comment.