Skip to content

Commit

Permalink
PIN-4722: Refactor get purposes to avoid DocumentDB memory error (#200)
Browse files Browse the repository at this point in the history
  • Loading branch information
galales authored Mar 20, 2024
1 parent f0b43f1 commit b721852
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 34 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package it.pagopa.interop.purposeprocess.common.readmodel

import it.pagopa.interop.commons.queue.message.Message.uuidFormat
import spray.json.DefaultJsonProtocol._
import spray.json.RootJsonFormat

import java.util.UUID

final case class ReadModelId(id: UUID)

object ReadModelId {
implicit val rmiFormat: RootJsonFormat[ReadModelId] = jsonFormat1(ReadModelId.apply)
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package it.pagopa.interop.purposeprocess.common.readmodel

import it.pagopa.interop.catalogmanagement.model.CatalogItem
import it.pagopa.interop.catalogmanagement.model.persistence.JsonFormats.ciFormat
import it.pagopa.interop.commons.cqrs.service.ReadModelService
import it.pagopa.interop.purposemanagement.model.persistence.JsonFormats._
import it.pagopa.interop.purposemanagement.model.purpose._
import org.mongodb.scala.Document
import org.mongodb.scala.bson.conversions.Bson
import org.mongodb.scala.model.Aggregates._
import org.mongodb.scala.model.{Field, Filters}
import org.mongodb.scala.model.Projections.{computed, fields, include}
import org.mongodb.scala.model.Sorts.ascending
import org.mongodb.scala.model.{Field, Filters, Projections}

import java.util.UUID
import scala.concurrent.{ExecutionContext, Future}
Expand Down Expand Up @@ -60,58 +62,54 @@ object ReadModelPurposeQueries extends ReadModelQuery {
limit: Int,
exactMatchOnTitle: Boolean = false
)(implicit ec: ExecutionContext, readModel: ReadModelService): Future[PaginatedResult[PersistentPurpose]] = {

val simpleFilters: Bson = listPurposesFilters(title, eServicesIds, consumersIds, states, exactMatchOnTitle)
val query: Seq[Bson] = Seq(
`match`(simpleFilters),
lookup("eservices", "data.eserviceId", "data.id", "eservices"),
addFields(Field("eservice", Document("""{ $arrayElemAt: [ "$eservices", 0 ] }"""))),
`match`(
Filters
.and(listPurposesAuthorizationFilters(excludeDraft), listPurposesProducersFilters(producersIds))
)
)

for {
producersFilter <- producersEservicesFilter(producersIds)
query = Seq(
`match`(simpleFilters),
`match`(listPurposesAuthorizationFilters(excludeDraft)),
`match`(producersFilter)
)
// Using aggregate to perform case insensitive sorting
// N.B.: Required because DocumentDB does not support collation
purposes <- readModel.aggregate[PersistentPurpose](
purposes <- readModel.aggregate[PersistentPurpose](
"purposes",
query ++
Seq(
addFields(Field("lowerName", Document("""{ "$toLower" : "$data.title" }"""))),
sort(ascending("lowerName")),
// If requester is not Consumer or Producer, override the risk analysis form with null value
addFields(
Field(
"data.riskAnalysisForm",
Document(s"""{
| $$cond: {
| if: {
| $$or: [
| { $$eq: [ "$$data.consumerId", "${requesterId.toString}" ] },
| { $$eq: [ "$$eservice.data.producerId", "${requesterId.toString}" ] }
| ],
| },
| then: "$$data.riskAnalysisForm",
| else: null,
| },
|}""".stripMargin)
)
),
project(fields(include("data")))
),
offset = offset,
limit = limit
limit = limit,
allowDiskUse = true
)
eservices <- purposesEservices(purposes, limit)

// If requester is not Consumer or Producer, override the risk analysis form with empty value
authorizedPurposes = purposes.map(p =>
Option
.when(p.consumerId == requesterId)(())
.orElse(
eservices
.find(pe => pe.id == p.eserviceId)
.filter(_.producerId == requesterId)
.map(_ => ())
)
.fold(p.copy(riskAnalysisForm = None))(_ => p)
)

// Note: This could be obtained using $facet function (avoiding to execute the query twice),
// but it is not supported by DocumentDB
count <- readModel.aggregate[TotalCountResult](
count <- readModel.aggregate[TotalCountResult](
"purposes",
query ++ Seq(count("totalCount"), project(computed("data", Document("""{ "totalCount" : "$totalCount" }""")))),
offset = 0,
limit = Int.MaxValue
)
} yield PaginatedResult(results = purposes, totalCount = count.headOption.map(_.totalCount).getOrElse(0))
} yield PaginatedResult(results = authorizedPurposes, totalCount = count.headOption.map(_.totalCount).getOrElse(0))
}

private def listPurposesFilters(
Expand Down Expand Up @@ -160,6 +158,29 @@ object ReadModelPurposeQueries extends ReadModelQuery {
mapToVarArgs(versionsFilter :: Nil)(Filters.and).getOrElse(Filters.empty())
}

private def listPurposesProducersFilters(producersIds: Seq[String]): Bson =
mapToVarArgs(producersIds.map(Filters.eq("eservices.data.producerId", _)))(Filters.or).getOrElse(Filters.empty())
private def producersEservicesFilter(
producersIds: Seq[String]
)(implicit ec: ExecutionContext, readModel: ReadModelService): Future[Bson] = {

val producersEservicesIds: Future[Seq[ReadModelId]] =
mapToVarArgs(producersIds.map(Filters.eq("data.producerId", _)))(Filters.or)
.fold[Future[Seq[ReadModelId]]](Future.successful(Seq.empty))(filter =>
readModel
.find[ReadModelId]("eservices", filter, Projections.include("data.id"), offset = 0, limit = Int.MaxValue)
)

producersEservicesIds.map(ids =>
mapToVarArgs(ids.map(id => Filters.eq("data.eserviceId", id.id.toString)))(Filters.or).getOrElse(Filters.empty())
)
}

private def purposesEservices(purposes: Seq[PersistentPurpose], limit: Int)(implicit
ec: ExecutionContext,
readModel: ReadModelService
): Future[Seq[CatalogItem]] =
mapToVarArgs(purposes.map(p => Filters.eq("data.id", p.eserviceId.toString)))(Filters.or)
.fold[Future[Seq[CatalogItem]]](Future.successful(Seq.empty))(filter =>
readModel.find[CatalogItem]("eservices", filter, offset = 0, limit = limit)
)

}

0 comments on commit b721852

Please sign in to comment.