Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PIN-4722: Refactor get purposes to avoid DocumentDB memory error #200

Merged
merged 2 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package it.pagopa.interop.purposeprocess.common.readmodel

import it.pagopa.interop.commons.queue.message.Message.uuidFormat
import spray.json.DefaultJsonProtocol._
import spray.json.RootJsonFormat

import java.util.UUID

final case class ReadModelId(id: UUID)

object ReadModelId {
implicit val rmiFormat: RootJsonFormat[ReadModelId] = jsonFormat1(ReadModelId.apply)
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package it.pagopa.interop.purposeprocess.common.readmodel

import it.pagopa.interop.catalogmanagement.model.CatalogItem
import it.pagopa.interop.catalogmanagement.model.persistence.JsonFormats.ciFormat
import it.pagopa.interop.commons.cqrs.service.ReadModelService
import it.pagopa.interop.purposemanagement.model.persistence.JsonFormats._
import it.pagopa.interop.purposemanagement.model.purpose._
import org.mongodb.scala.Document
import org.mongodb.scala.bson.conversions.Bson
import org.mongodb.scala.model.Aggregates._
import org.mongodb.scala.model.{Field, Filters}
import org.mongodb.scala.model.Projections.{computed, fields, include}
import org.mongodb.scala.model.Sorts.ascending
import org.mongodb.scala.model.{Field, Filters, Projections}

import java.util.UUID
import scala.concurrent.{ExecutionContext, Future}
Expand Down Expand Up @@ -60,58 +62,54 @@ object ReadModelPurposeQueries extends ReadModelQuery {
limit: Int,
exactMatchOnTitle: Boolean = false
)(implicit ec: ExecutionContext, readModel: ReadModelService): Future[PaginatedResult[PersistentPurpose]] = {

val simpleFilters: Bson = listPurposesFilters(title, eServicesIds, consumersIds, states, exactMatchOnTitle)
val query: Seq[Bson] = Seq(
`match`(simpleFilters),
lookup("eservices", "data.eserviceId", "data.id", "eservices"),
addFields(Field("eservice", Document("""{ $arrayElemAt: [ "$eservices", 0 ] }"""))),
`match`(
Filters
.and(listPurposesAuthorizationFilters(excludeDraft), listPurposesProducersFilters(producersIds))
)
)

for {
producersFilter <- producersEservicesFilter(producersIds)
query = Seq(
`match`(simpleFilters),
`match`(listPurposesAuthorizationFilters(excludeDraft)),
`match`(producersFilter)
)
// Using aggregate to perform case insensitive sorting
// N.B.: Required because DocumentDB does not support collation
purposes <- readModel.aggregate[PersistentPurpose](
purposes <- readModel.aggregate[PersistentPurpose](
"purposes",
query ++
Seq(
addFields(Field("lowerName", Document("""{ "$toLower" : "$data.title" }"""))),
sort(ascending("lowerName")),
// If requester is not Consumer or Producer, override the risk analysis form with null value
addFields(
Field(
"data.riskAnalysisForm",
Document(s"""{
| $$cond: {
| if: {
| $$or: [
| { $$eq: [ "$$data.consumerId", "${requesterId.toString}" ] },
| { $$eq: [ "$$eservice.data.producerId", "${requesterId.toString}" ] }
| ],
| },
| then: "$$data.riskAnalysisForm",
| else: null,
| },
|}""".stripMargin)
)
),
project(fields(include("data")))
),
offset = offset,
limit = limit
limit = limit,
allowDiskUse = true
)
eservices <- purposesEservices(purposes, limit)

// If requester is not Consumer or Producer, override the risk analysis form with empty value
authorizedPurposes = purposes.map(p =>
Option
.when(p.consumerId == requesterId)(())
.orElse(
eservices
.find(pe => pe.id == p.eserviceId)
.filter(_.producerId == requesterId)
.map(_ => ())
)
.fold(p.copy(riskAnalysisForm = None))(_ => p)
)

// Note: This could be obtained using $facet function (avoiding to execute the query twice),
// but it is not supported by DocumentDB
count <- readModel.aggregate[TotalCountResult](
count <- readModel.aggregate[TotalCountResult](
"purposes",
query ++ Seq(count("totalCount"), project(computed("data", Document("""{ "totalCount" : "$totalCount" }""")))),
offset = 0,
limit = Int.MaxValue
)
} yield PaginatedResult(results = purposes, totalCount = count.headOption.map(_.totalCount).getOrElse(0))
} yield PaginatedResult(results = authorizedPurposes, totalCount = count.headOption.map(_.totalCount).getOrElse(0))
}

private def listPurposesFilters(
Expand Down Expand Up @@ -160,6 +158,29 @@ object ReadModelPurposeQueries extends ReadModelQuery {
mapToVarArgs(versionsFilter :: Nil)(Filters.and).getOrElse(Filters.empty())
}

private def listPurposesProducersFilters(producersIds: Seq[String]): Bson =
mapToVarArgs(producersIds.map(Filters.eq("eservices.data.producerId", _)))(Filters.or).getOrElse(Filters.empty())
private def producersEservicesFilter(
producersIds: Seq[String]
)(implicit ec: ExecutionContext, readModel: ReadModelService): Future[Bson] = {

val producersEservicesIds: Future[Seq[ReadModelId]] =
mapToVarArgs(producersIds.map(Filters.eq("data.producerId", _)))(Filters.or)
.fold[Future[Seq[ReadModelId]]](Future.successful(Seq.empty))(filter =>
readModel
.find[ReadModelId]("eservices", filter, Projections.include("data.id"), offset = 0, limit = Int.MaxValue)
)

producersEservicesIds.map(ids =>
mapToVarArgs(ids.map(id => Filters.eq("data.eserviceId", id.id.toString)))(Filters.or).getOrElse(Filters.empty())
)
}

private def purposesEservices(purposes: Seq[PersistentPurpose], limit: Int)(implicit
ec: ExecutionContext,
readModel: ReadModelService
): Future[Seq[CatalogItem]] =
mapToVarArgs(purposes.map(p => Filters.eq("data.id", p.eserviceId.toString)))(Filters.or)
.fold[Future[Seq[CatalogItem]]](Future.successful(Seq.empty))(filter =>
readModel.find[CatalogItem]("eservices", filter, offset = 0, limit = limit)
)

}
Loading