Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

15766: item sent event #15920

Merged
merged 5 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions prime-router/src/main/kotlin/azure/ActionHistory.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import com.microsoft.azure.functions.HttpRequestMessage
import com.microsoft.azure.functions.HttpResponseMessage
import com.microsoft.azure.functions.HttpStatusType
import com.networknt.org.apache.commons.validator.routines.InetAddressValidator
import fhirengine.engine.CustomFhirPathFunctions
import gov.cdc.prime.reportstream.shared.BlobUtils
import gov.cdc.prime.router.ActionLog
import gov.cdc.prime.router.ActionLogLevel
import gov.cdc.prime.router.ClientSource
Expand All @@ -25,11 +27,16 @@ import gov.cdc.prime.router.azure.db.tables.pojos.ItemLineage
import gov.cdc.prime.router.azure.db.tables.pojos.ReportFile
import gov.cdc.prime.router.azure.db.tables.pojos.ReportLineage
import gov.cdc.prime.router.azure.db.tables.pojos.Task
import gov.cdc.prime.router.azure.observability.bundleDigest.BundleDigestExtractor
import gov.cdc.prime.router.azure.observability.bundleDigest.FhirPathBundleDigestLabResultExtractorStrategy
import gov.cdc.prime.router.azure.observability.event.IReportStreamEventService
import gov.cdc.prime.router.azure.observability.event.ReportStreamEventName
import gov.cdc.prime.router.azure.observability.event.ReportStreamEventProperties
import gov.cdc.prime.router.common.AzureHttpUtils.getSenderIP
import gov.cdc.prime.router.common.JacksonMapperUtilities
import gov.cdc.prime.router.fhirengine.translation.hl7.utils.CustomContext
import gov.cdc.prime.router.fhirengine.utils.FhirTranscoder
import gov.cdc.prime.router.report.ReportService
import io.ktor.http.HttpStatusCode
import org.apache.logging.log4j.kotlin.Logging
import org.jooq.impl.SQLDataType
Expand Down Expand Up @@ -565,6 +572,7 @@ class ActionHistory(
result: String,
header: WorkflowEngine.Header,
reportEventService: IReportStreamEventService,
reportService: ReportService,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this be better as a class member?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was really messy with how the action history gets instantiated to do it that way (also why I'm passed in the reportEventService that way originally). I was bucketing this is in things to clean up if we refactor how some this works

transportType: String,
) {
if (isReportAlreadyTracked(sentReportId)) {
Expand Down Expand Up @@ -616,6 +624,45 @@ class ActionHistory(
)
}

val lineages = Report.createItemLineagesFromDb(header, sentReportId)
lineages?.forEach { itemLineage ->
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This algorithm is okay for now, but just thinking out loud, downloading blobs and parsing them just to get Item information EVERY TIME we need it in the various steps seems inefficient. If we were storing "Item" in the database, the bundle digest is something that would fit nicely there and then we could just get it from the DB (given it does not contain PII). Alternatively, we could also use the covid pipelines metadata table strategy probably.

I don't think any of this is worth considering until we gather the external message monitoring requirements.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense to me. My ultimate preference would to have a clearer delineation of the uses cases since this doesn't even feel like monitoring and is more an analytics/business reporting question

val receiverFilterReportFile = reportService.getReportsForStep(
itemLineage.parentReportId,
itemLineage.parentIndex,
TaskAction.receiver_filter
)
if (receiverFilterReportFile != null) {
val blob = BlobAccess.downloadBlob(
receiverFilterReportFile.bodyUrl,
BlobUtils.digestToString(receiverFilterReportFile.blobDigest)
)
val bundle = FhirTranscoder.decode(blob)
val bundleDigestExtractor = BundleDigestExtractor(
FhirPathBundleDigestLabResultExtractorStrategy(
CustomContext(
bundle,
bundle,
mutableMapOf(),
CustomFhirPathFunctions()
)
)
)
reportEventService.sendItemEvent(ReportStreamEventName.ITEM_SENT, reportFile, TaskAction.send) {
trackingId(bundle)
parentReportId(header.reportFile.reportId)
childItemIndex(itemLineage.childIndex)
params(
mapOf(
ReportStreamEventProperties.BUNDLE_DIGEST
to bundleDigestExtractor.generateDigest(bundle),
ReportStreamEventProperties.RECEIVER_NAME to receiver.fullName,
)
)
}
} else {
logger.error("No translate report found for sent item.")
}
}
reportsOut[reportFile.reportId] = reportFile
}

Expand Down
3 changes: 2 additions & 1 deletion prime-router/src/main/kotlin/azure/SendFunction.kt
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ class SendFunction(
retryItems,
context,
actionHistory,
reportEventService
reportEventService,
workflowEngine.reportService
)
if (nextRetry != null) {
nextRetryItems += nextRetry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ enum class ReportStreamEventName {
ITEM_ROUTED,
REPORT_LAST_MILE_FAILURE,
REPORT_NOT_PROCESSABLE,
ITEM_SENT,
}

/**
Expand Down
53 changes: 44 additions & 9 deletions prime-router/src/main/kotlin/history/db/ReportGraph.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import org.jooq.CommonTableExpression
import org.jooq.DSLContext
import org.jooq.Record
import org.jooq.Record1
import org.jooq.Record2
import org.jooq.SelectOnConditionStep
import org.jooq.impl.CustomRecord
import org.jooq.impl.CustomTable
Expand Down Expand Up @@ -213,6 +212,17 @@ class ReportGraph(
return descendantReportRecords(txn, cte, searchedForTaskActions).fetchInto(ReportFile::class.java)
}

fun getAncestorReports(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kdoc?

txn: DataAccessTransaction,
childReportId: UUID,
childIndex: Int,
searchedForTaskActions: Set<TaskAction>? = null,
): ReportFile? {
val cte = itemAncestorGraphCommonTableExpression(childReportId, childIndex)

return ancestorReportRecords(txn, cte, searchedForTaskActions).fetchOneInto(ReportFile::class.java)
}

/**
* Returns all the metadata rows associated with the passed in [ItemGraphRecord]
*
Expand Down Expand Up @@ -421,19 +431,15 @@ class ReportGraph(
*/
fun reportAncestorGraphCommonTableExpression(childReportIds: List<UUID>) =
DSL.name(lineageCteName).fields(
PARENT_REPORT_ID_FIELD,
PATH_FIELD
PARENT_REPORT_ID_FIELD
).`as`(
DSL.select(
REPORT_LINEAGE.PARENT_REPORT_ID,
REPORT_LINEAGE.CHILD_REPORT_ID.cast(SQLDataType.VARCHAR),
REPORT_LINEAGE.PARENT_REPORT_ID
).from(REPORT_LINEAGE)
.where(REPORT_LINEAGE.CHILD_REPORT_ID.`in`(childReportIds))
.unionAll(
DSL.select(
REPORT_LINEAGE.PARENT_REPORT_ID,
DSL.field("$lineageCteName.$PATH_FIELD", SQLDataType.VARCHAR)
.concat(REPORT_LINEAGE.PARENT_REPORT_ID)
REPORT_LINEAGE.PARENT_REPORT_ID
)
.from(REPORT_LINEAGE)
.join(DSL.table(DSL.name(lineageCteName)))
Expand All @@ -454,7 +460,7 @@ class ReportGraph(
*/
private fun rootReportRecords(
txn: DataAccessTransaction,
cte: CommonTableExpression<Record2<UUID, String>>,
cte: CommonTableExpression<Record1<UUID>>,
) = DSL.using(txn)
.withRecursive(cte)
.select(REPORT_FILE.asterisk())
Expand Down Expand Up @@ -520,4 +526,33 @@ class ReportGraph(

return select
}

/**
* Fetches all descendant report records in a recursive manner.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"ancestor" instead of "descendant"?

*
* @param txn the data access transaction
* @param cte the common table expression for report lineage
* @return the descendant report records
*/
private fun ancestorReportRecords(
txn: DataAccessTransaction,
cte: CommonTableExpression<ItemGraphRecord>,
searchedForTaskActions: Set<TaskAction>?,
): SelectOnConditionStep<Record> {
val select = DSL.using(txn)
.withRecursive(cte)
.select(REPORT_FILE.asterisk())
.distinctOn(REPORT_FILE.REPORT_ID)
.from(cte)
.join(REPORT_FILE)
.on(REPORT_FILE.REPORT_ID.eq(ItemGraphTable.ITEM_GRAPH.PARENT_REPORT_ID))
.join(ACTION)
.on(ACTION.ACTION_ID.eq(REPORT_FILE.ACTION_ID))

if (searchedForTaskActions != null) {
select.where(ACTION.ACTION_NAME.`in`(searchedForTaskActions))
}

return select
}
}
9 changes: 8 additions & 1 deletion prime-router/src/main/kotlin/report/ReportService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package gov.cdc.prime.router.report

import gov.cdc.prime.router.ReportId
import gov.cdc.prime.router.azure.DatabaseAccess
import gov.cdc.prime.router.azure.db.enums.TaskAction
import gov.cdc.prime.router.azure.db.tables.pojos.ReportFile
import gov.cdc.prime.router.common.BaseEngine
import gov.cdc.prime.router.history.db.ReportGraph
Expand Down Expand Up @@ -47,7 +48,13 @@ class ReportService(
* @return List of ReportFile objects of the root reports
*/
fun getRootReports(childReportId: ReportId): List<ReportFile> {
return reportGraph.getRootReports(childReportId)
return reportGraph.getRootReports(childReportId).distinctBy { it.reportId }
}

fun getReportsForStep(childReportId: ReportId, childIndex: Int, task: TaskAction): ReportFile? {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kdoc? Also confusing because query only returns one but method name is getReports?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, little copy pasta; this actually fetches a specific report for an item and action. Will update the name

return db.transactReturning { txn ->
reportGraph.getAncestorReports(txn, childReportId, childIndex, setOf(task))
}
}

/**
Expand Down
3 changes: 3 additions & 0 deletions prime-router/src/main/kotlin/transport/AS2Transport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import gov.cdc.prime.router.azure.observability.event.IReportStreamEventService
import gov.cdc.prime.router.credentials.CredentialHelper
import gov.cdc.prime.router.credentials.CredentialRequestReason
import gov.cdc.prime.router.credentials.UserJksCredential
import gov.cdc.prime.router.report.ReportService
import org.apache.hc.core5.util.Timeout
import org.apache.http.conn.ConnectTimeoutException
import org.apache.logging.log4j.kotlin.Logging
Expand Down Expand Up @@ -50,6 +51,7 @@ class AS2Transport(val metadata: Metadata? = null) : ITransport, Logging {
context: ExecutionContext,
actionHistory: ActionHistory,
reportEventService: IReportStreamEventService,
reportService: ReportService,
): RetryItems? {
// DevNote: This code is similar to the SFTP code in structure
//
Expand Down Expand Up @@ -78,6 +80,7 @@ class AS2Transport(val metadata: Metadata? = null) : ITransport, Logging {
msg,
header,
reportEventService,
reportService,
this::class.java.simpleName
)
actionHistory.trackItemLineages(Report.createItemLineagesFromDb(header, sentReportId))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import gov.cdc.prime.router.azure.BlobAccess
import gov.cdc.prime.router.azure.WorkflowEngine
import gov.cdc.prime.router.azure.db.enums.TaskAction
import gov.cdc.prime.router.azure.observability.event.IReportStreamEventService
import gov.cdc.prime.router.report.ReportService

class BlobStoreTransport : ITransport {
override fun send(
Expand All @@ -21,6 +22,7 @@ class BlobStoreTransport : ITransport {
context: ExecutionContext,
actionHistory: ActionHistory,
reportEventService: IReportStreamEventService,
reportService: ReportService,
): RetryItems? {
val blobTransportType = transportType as BlobStoreTransportType
val envVar: String = blobTransportType.containerName
Expand All @@ -41,6 +43,7 @@ class BlobStoreTransport : ITransport {
msg,
header,
reportEventService,
reportService,
this::class.java.simpleName
)
actionHistory.trackItemLineages(Report.createItemLineagesFromDb(header, sentReportId))
Expand Down
2 changes: 2 additions & 0 deletions prime-router/src/main/kotlin/transport/EmailTransport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import gov.cdc.prime.router.TransportType
import gov.cdc.prime.router.azure.ActionHistory
import gov.cdc.prime.router.azure.WorkflowEngine
import gov.cdc.prime.router.azure.observability.event.IReportStreamEventService
import gov.cdc.prime.router.report.ReportService
import org.thymeleaf.TemplateEngine
import org.thymeleaf.context.Context
import org.thymeleaf.templateresolver.StringTemplateResolver
Expand All @@ -33,6 +34,7 @@ class EmailTransport : ITransport {
context: ExecutionContext,
actionHistory: ActionHistory, // not used by emailer
reportEventService: IReportStreamEventService,
reportService: ReportService,
): RetryItems? {
val emailTransport = transportType as EmailTransportType
val content = buildContent(header)
Expand Down
11 changes: 9 additions & 2 deletions prime-router/src/main/kotlin/transport/GAENTransport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import gov.cdc.prime.router.common.HttpClientUtils
import gov.cdc.prime.router.credentials.CredentialHelper
import gov.cdc.prime.router.credentials.CredentialRequestReason
import gov.cdc.prime.router.credentials.UserApiKeyCredential
import gov.cdc.prime.router.report.ReportService
import io.ktor.client.HttpClient
import io.ktor.http.ContentType
import io.ktor.http.HttpStatusCode
Expand Down Expand Up @@ -78,6 +79,7 @@ class GAENTransport(val httpClient: HttpClient? = null) : ITransport, Logging {
context: ExecutionContext,
actionHistory: ActionHistory,
reportEventService: IReportStreamEventService,
reportService: ReportService,
): RetryItems? {
val gaenTransportInfo = transportType as GAENTransportType
val reportId = header.reportFile.reportId
Expand Down Expand Up @@ -106,7 +108,7 @@ class GAENTransport(val httpClient: HttpClient? = null) : ITransport, Logging {

// Record the work in history and logs
when (postResult) {
PostResult.SUCCESS -> recordFullSuccess(params, reportEventService)
PostResult.SUCCESS -> recordFullSuccess(params, reportEventService, reportService)
PostResult.RETRY -> recordFailureWithRetry(params)
PostResult.FAIL -> recordFailure(params)
}
Expand All @@ -123,7 +125,11 @@ class GAENTransport(val httpClient: HttpClient? = null) : ITransport, Logging {
/**
* Record in [ActionHistory] the full success of this notification. Log an info message as well.
*/
private fun recordFullSuccess(params: SendParams, reportEventService: IReportStreamEventService) {
private fun recordFullSuccess(
params: SendParams,
reportEventService: IReportStreamEventService,
reportService: ReportService,
) {
val msg = "${params.receiver.fullName}: Successful exposure notifications of ${params.comboId}"
val history = params.actionHistory
params.context.logger.info(msg)
Expand All @@ -137,6 +143,7 @@ class GAENTransport(val httpClient: HttpClient? = null) : ITransport, Logging {
msg,
params.header,
reportEventService,
reportService,
this::class.java.simpleName
)
history.trackItemLineages(Report.createItemLineagesFromDb(params.header, params.sentReportId))
Expand Down
2 changes: 2 additions & 0 deletions prime-router/src/main/kotlin/transport/ITransport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import gov.cdc.prime.router.TransportType
import gov.cdc.prime.router.azure.ActionHistory
import gov.cdc.prime.router.azure.WorkflowEngine
import gov.cdc.prime.router.azure.observability.event.IReportStreamEventService
import gov.cdc.prime.router.report.ReportService

interface ITransport {
/**
Expand All @@ -26,5 +27,6 @@ interface ITransport {
context: ExecutionContext,
actionHistory: ActionHistory,
reportEventService: IReportStreamEventService,
reportService: ReportService,
): RetryItems?
}
3 changes: 3 additions & 0 deletions prime-router/src/main/kotlin/transport/NullTransport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import gov.cdc.prime.router.TransportType
import gov.cdc.prime.router.azure.ActionHistory
import gov.cdc.prime.router.azure.WorkflowEngine
import gov.cdc.prime.router.azure.observability.event.IReportStreamEventService
import gov.cdc.prime.router.report.ReportService

/**
* The Null transport is intended for testing and benchmarking purposes.
Expand All @@ -21,6 +22,7 @@ class NullTransport : ITransport {
context: ExecutionContext,
actionHistory: ActionHistory,
reportEventService: IReportStreamEventService,
reportService: ReportService,
): RetryItems? {
if (header.content == null) error("No content for report ${header.reportFile.reportId}")
val receiver = header.receiver ?: error("No receiver defined for report ${header.reportFile.reportId}")
Expand All @@ -34,6 +36,7 @@ class NullTransport : ITransport {
msg,
header,
reportEventService,
reportService,
this::class.java.simpleName
)
actionHistory.trackItemLineages(Report.createItemLineagesFromDb(header, sentReportId))
Expand Down
3 changes: 3 additions & 0 deletions prime-router/src/main/kotlin/transport/RESTTransport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import gov.cdc.prime.router.credentials.UserApiKeyCredential
import gov.cdc.prime.router.credentials.UserAssertionCredential
import gov.cdc.prime.router.credentials.UserJksCredential
import gov.cdc.prime.router.credentials.UserPassCredential
import gov.cdc.prime.router.report.ReportService
import gov.cdc.prime.router.tokens.AuthUtils
import io.ktor.client.HttpClient
import io.ktor.client.call.body
Expand Down Expand Up @@ -93,6 +94,7 @@ class RESTTransport(private val httpClient: HttpClient? = null) : ITransport {
context: ExecutionContext,
actionHistory: ActionHistory,
reportEventService: IReportStreamEventService,
reportService: ReportService,
): RetryItems? {
val logger: Logger = context.logger

Expand Down Expand Up @@ -157,6 +159,7 @@ class RESTTransport(private val httpClient: HttpClient? = null) : ITransport {
msg,
header,
reportEventService,
reportService,
this::class.java.simpleName
)
actionHistory.trackItemLineages(Report.createItemLineagesFromDb(header, sentReportId))
Expand Down
Loading