Skip to content

Commit

Permalink
Correct stop/restart forum changes stream behavior
Browse files Browse the repository at this point in the history
And some minor refactor for app.config, prepare for team
  • Loading branch information
lenguyenthanh committed Jun 18, 2024
1 parent 31090c0 commit 8ba2c8b
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 18 deletions.
23 changes: 17 additions & 6 deletions modules/ingestor/src/main/scala/app.config.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,31 @@ object ElasticConfig:
private def uri = env("ELASTIC_URI").or(prop("elastic.uri")).as[String].default("http://127.0.0.1:9200")
def config = uri.map(ElasticConfig.apply)

case class IngestorConfig(forum: IngestorConfig.Config)
case class IngestorConfig(forum: IngestorConfig.Forum, team: IngestorConfig.Team)

object IngestorConfig:
case class Config(batchSize: Int, maxBodyLength: Int, timeWindows: Int, startAt: Option[Long])
case class Forum(batchSize: Int, maxBodyLength: Int, timeWindows: Int, startAt: Option[Long])
case class Team(batchSize: Int, timeWindows: Int, startAt: Option[Long])

object forum:
private object Forum:
private def batchSize =
env("INGESTOR_FORUM_BATCH_SIZE").or(prop("ingestor.forum.batch.size")).as[Int].default(100)
private def maxBodyLength =
env("INGESTOR_FORUM_MAX_BODY_LENGTH").or(prop("ingestor.forum.max.body.length")).as[Int].default(10000)
private def timeWindows =
env("INGESTOR_FORUM_TIME_WINDOWS").or(prop("ingestor.forum.time.windows")).as[Int].default(10)
private def startAt =
env("INGESTOR_FORUM_START_AT").or(prop("ingestor.forum.start.at")).as[Long].option
def config = (batchSize, maxBodyLength, timeWindows, startAt).parMapN(Config.apply)
env("INGESTOR_FORUM_START_AT")
.or(prop("ingestor.forum.start.at")).as[Long].option
def config = (batchSize, maxBodyLength, timeWindows, startAt).parMapN(Forum.apply)

def config = forum.config.map(IngestorConfig.apply)
private object Team:
private def batchSize =
env("INGESTOR_TEAM_BATCH_SIZE").or(prop("ingestor.team.batch.size")).as[Int].default(100)
private def timeWindows =
env("INGESTOR_TEAM_TIME_WINDOWS").or(prop("ingestor.team.time.windows")).as[Int].default(10)
private def startAt =
env("INGESTOR_TEAM_START_AT").or(prop("ingestor.team.start.at")).as[Long].option
def config = (batchSize, timeWindows, startAt).parMapN(Team.apply)

def config = (Forum.config, Team.config).parMapN(IngestorConfig.apply)
23 changes: 14 additions & 9 deletions modules/ingestor/src/main/scala/ingestor.forum.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ object ForumIngestor:

private val topicProjection = Projection.include(List("_id", "name"))

val interestedOperations = List(DELETE, INSERT, UPDATE, REPLACE).map(_.getValue)
private val eventFilter = Filter.in("operationType", interestedOperations)
private val interestedOperations = List(DELETE, INSERT, UPDATE, REPLACE).map(_.getValue)
private val eventFilter = Filter.in("operationType", interestedOperations)
private val eventProjection = Projection.include(
List(
"clusterTime",
"documentKey._id",
"fullDocument.text",
"fullDocument.topicId",
Expand All @@ -39,12 +40,12 @@ object ForumIngestor:

private val index = Index("forum")

def apply(mongo: MongoDatabase[IO], elastic: ESClient[IO], store: KVStore, config: IngestorConfig.Config)(
def apply(mongo: MongoDatabase[IO], elastic: ESClient[IO], store: KVStore, config: IngestorConfig.Forum)(
using Logger[IO]
): IO[ForumIngestor] =
(mongo.getCollection("f_topic"), mongo.getCollection("f_post")).mapN(apply(elastic, store, config))

def apply(elastic: ESClient[IO], store: KVStore, config: IngestorConfig.Config)(
def apply(elastic: ESClient[IO], store: KVStore, config: IngestorConfig.Forum)(
topics: MongoCollection,
posts: MongoCollection
)(using Logger[IO]): ForumIngestor = new:
Expand All @@ -53,10 +54,10 @@ object ForumIngestor:
fs2.Stream
.eval(startAt.flatTap(since => info"Starting forum ingestor from $since"))
.flatMap: last =>
postStream(last)
changes(last)
.filterNot(_.isEmpty)
.evalMap: events =>
val lastEventTimestamp = events.lastOption.flatMap(_.clusterTime).flatMap(_.asInstant)
val lastEventTimestamp = events.flatten(_.clusterTime.flatMap(_.asInstant)).maxOption
val (toDelete, toIndex) = events.partition(_.isDelete)
storeBulk(toIndex)
*> deleteMany(toDelete)
Expand All @@ -78,8 +79,8 @@ object ForumIngestor:
Logger[IO].error(e)(s"Failed to delete forum posts: ${events.map(_.id).mkString(", ")}")

private def saveLastIndexedTimestamp(time: Instant): IO[Unit] =
store.put(index.name, time.plusSeconds(1)) // +1 to avoid reindexing the same event
*> info"Stored last indexed time ${time.getEpochSecond()} for index ${index.name}"
store.put(index.name, time)
*> info"Stored last indexed time ${time.getEpochSecond} for $index"

private def startAt: IO[Option[Instant]] =
config.startAt.fold(store.get(index.name))(Instant.ofEpochSecond(_).some.pure[IO])
Expand All @@ -92,12 +93,16 @@ object ForumIngestor:
.all
.map(_.map(doc => (doc.getString("_id") -> doc.getString("name")).mapN(_ -> _)).flatten.toMap)

private def postStream(since: Option[Instant]): fs2.Stream[IO, List[ChangeStreamDocument[Document]]] =
private def changes(since: Option[Instant]): fs2.Stream[IO, List[ChangeStreamDocument[Document]]] =
val builder = posts.watch(aggregate)
// skip the first event if we're starting from a specific timestamp
// since the event at that timestamp is already indexed
val skip = since.fold(0)(_ => 1)
since
.fold(builder)(x => builder.startAtOperationTime(x.asBsonTimestamp))
.batchSize(config.batchSize)
.boundedStream(config.batchSize)
.drop(skip)
.groupWithin(config.batchSize, config.timeWindows.second)
.evalTap(_.traverse_(x => debug"received $x"))
.map(_.toList)
Expand Down
8 changes: 5 additions & 3 deletions modules/ingestor/src/main/scala/kvstore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,17 @@ object KVStore:
def get(key: String): IO[Option[Instant]] =
read(file)
.map: content =>
content.get(key).map(Instant.ofEpochSecond)
content
.get(key)
.map(x => Instant.ofEpochSecond(x))

def put(key: String, value: Instant): IO[Unit] =
mutex.lock.surround:
read(file).flatMap: content =>
write(
file,
content.updated(key, value.getEpochSecond + 1)
) // +1 to avoid reindexing the same data
content.updated(key, value.getEpochSecond)
)

private def read(path: String): IO[State] =
Files[IO]
Expand Down

0 comments on commit 8ba2c8b

Please sign in to comment.