Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

16394 add ack functionality #16552

Merged
merged 22 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
eab9461
16394 add ack functionality
jalbinson Nov 15, 2024
5deaf7c
tweaks
jalbinson Nov 15, 2024
d524858
better handle segments
jalbinson Nov 15, 2024
77f3aed
check diff
jalbinson Nov 15, 2024
a91a0f7
timezone fix
jalbinson Nov 15, 2024
f99f5f8
PR feedback
jalbinson Nov 21, 2024
23f2ba1
Merge branch 'main' into platform/jamie/16394-ack
jalbinson Nov 21, 2024
9c4b060
cleanup and comments
jalbinson Nov 22, 2024
dde2274
Merge branch 'platform/jamie/16394-ack' of github.com:CDCgov/prime-re…
jalbinson Nov 22, 2024
0ec2e47
Merge branch 'main' into platform/jamie/16394-ack
jalbinson Nov 25, 2024
1f31755
Merge branch 'main' into platform/jamie/16394-ack
jalbinson Nov 27, 2024
1055b00
Merge branch 'main' into platform/jamie/16394-ack
jalbinson Dec 3, 2024
d656e8d
arnej feedback
jalbinson Dec 3, 2024
1cc9c55
Merge branch 'platform/jamie/16394-ack' of github.com:CDCgov/prime-re…
jalbinson Dec 3, 2024
1c060e9
Merge branch 'main' into platform/jamie/16394-ack
jalbinson Dec 3, 2024
9da6790
Merge branch 'main' into platform/jamie/16394-ack
jalbinson Dec 3, 2024
f45eb3e
more arnej feedback
jalbinson Dec 3, 2024
e922498
Merge branch 'platform/jamie/16394-ack' of github.com:CDCgov/prime-re…
jalbinson Dec 3, 2024
813dc4c
Merge branch 'main' into platform/jamie/16394-ack
jalbinson Dec 3, 2024
1333e1f
Merge branch 'main' into platform/jamie/16394-ack
jalbinson Dec 3, 2024
f205823
Merge branch 'main' into platform/jamie/16394-ack
jalbinson Dec 4, 2024
5ea93e2
Merge branch 'main' into platform/jamie/16394-ack
jalbinson Dec 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 8 additions & 71 deletions prime-router/src/main/kotlin/azure/ReportFunction.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import com.fasterxml.jackson.databind.SerializationFeature
import com.fasterxml.jackson.databind.json.JsonMapper
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import com.github.ajalt.clikt.core.CliktError
import com.google.common.net.HttpHeaders
import com.microsoft.azure.functions.HttpMethod
import com.microsoft.azure.functions.HttpRequestMessage
import com.microsoft.azure.functions.HttpResponseMessage
Expand All @@ -30,21 +29,18 @@ import gov.cdc.prime.router.Sender.ProcessingType
import gov.cdc.prime.router.SubmissionReceiver
import gov.cdc.prime.router.UniversalPipelineReceiver
import gov.cdc.prime.router.azure.BlobAccess.Companion.getBlobContainer
import gov.cdc.prime.router.azure.HttpUtilities.Companion.isSuccessful
import gov.cdc.prime.router.azure.db.enums.TaskAction
import gov.cdc.prime.router.azure.db.tables.pojos.ReportFile
import gov.cdc.prime.router.azure.observability.event.IReportStreamEventService
import gov.cdc.prime.router.azure.observability.event.ReportStreamEventName
import gov.cdc.prime.router.azure.observability.event.ReportStreamEventProperties
import gov.cdc.prime.router.azure.observability.event.ReportStreamEventService
import gov.cdc.prime.router.azure.service.SubmissionResponseBuilder
import gov.cdc.prime.router.cli.PIIRemovalCommands
import gov.cdc.prime.router.cli.ProcessFhirCommands
import gov.cdc.prime.router.common.AzureHttpUtils.getSenderIP
import gov.cdc.prime.router.common.Environment
import gov.cdc.prime.router.common.JacksonMapperUtilities
import gov.cdc.prime.router.fhirengine.translation.hl7.utils.HL7ACKUtils
import gov.cdc.prime.router.fhirengine.utils.FhirTranscoder
import gov.cdc.prime.router.history.DetailedSubmissionHistory
import gov.cdc.prime.router.history.azure.SubmissionsFacade
import gov.cdc.prime.router.tokens.AuthenticatedClaims
import gov.cdc.prime.router.tokens.Scope
Expand All @@ -71,7 +67,7 @@ class ReportFunction(
workflowEngine.azureEventService,
workflowEngine.reportService
),
private val hl7ACKUtils: HL7ACKUtils = HL7ACKUtils(),
private val submissionResponseBuilder: SubmissionResponseBuilder = SubmissionResponseBuilder(),
) : RequestFunction(workflowEngine),
Logging {

Expand Down Expand Up @@ -605,7 +601,12 @@ class ReportFunction(
SubmissionsFacade.instance.findDetailedSubmissionHistory(txn, null, actionHistory.action)
}

val response = buildResponse(request, httpStatus, submission, sender)
val response = submissionResponseBuilder.buildResponse(
sender,
httpStatus,
request,
submission
)

// queue messages here after all task / action records are in
actionHistory.queueMessages(workflowEngine)
Expand All @@ -625,68 +626,4 @@ class ReportFunction(
}
}
}

/**
* Returns an ACK response if required. Otherwise returns a JSON response.
*/
private fun buildResponse(
request: HttpRequestMessage<String?>,
responseStatus: HttpStatus,
submission: DetailedSubmissionHistory?,
sender: Sender,
): HttpResponseMessage {
return handleAckRequest(request, responseStatus, sender) ?: request
.createResponseBuilder(responseStatus)
.header(HttpHeaders.CONTENT_TYPE, "application/json")
.body(
JacksonMapperUtilities.allowUnknownsMapper
.writeValueAsString(submission)
)
.header(
HttpHeaders.LOCATION,
request.uri.resolve(
"/api/waters/report/${submission?.reportId}/history"
).toString()
)
.build()
}

/**
* This function will return an HL7 ACK response if the following conditions are met:
* - The sender has the "hl7AcknowledgementEnabled" field set to true
* - The HL7 message has been processed successfully
* - The submitted HL7 contains MSH.15 == "AL"
*
* @return ACK response or null
*/
private fun handleAckRequest(
request: HttpRequestMessage<String?>,
responseStatus: HttpStatus,
sender: Sender,
): HttpResponseMessage? {
// Azure handles all headers as lowercase
val contentType = request.headers[HttpHeaders.CONTENT_TYPE.lowercase()]
val requestBody = request.body
return if (
sender.hl7AcknowledgementEnabled &&
responseStatus.isSuccessful() &&
contentType == HttpUtilities.hl7V2MediaType &&
requestBody != null
) {
try {
hl7ACKUtils.generateOutgoingACKMessageIfRequired(requestBody)?.let { responseBody ->
logger.info("Creating HL7 ACK response")
request.createResponseBuilder(responseStatus)
.header(HttpHeaders.CONTENT_TYPE, HttpUtilities.hl7V2MediaType)
.body(responseBody)
.build()
}
} catch (ex: Exception) {
logger.warn("Error checking for HL7 ACK.", ex)
null
}
} else {
null
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package gov.cdc.prime.router.azure.service

import ca.uhn.hl7v2.model.Message
import com.google.common.net.HttpHeaders
import com.microsoft.azure.functions.HttpRequestMessage
import com.microsoft.azure.functions.HttpResponseMessage
import com.microsoft.azure.functions.HttpStatus
import gov.cdc.prime.router.ActionLogger
import gov.cdc.prime.router.Sender
import gov.cdc.prime.router.azure.HttpUtilities
import gov.cdc.prime.router.azure.HttpUtilities.Companion.isSuccessful
import gov.cdc.prime.router.common.JacksonMapperUtilities
import gov.cdc.prime.router.fhirengine.translation.hl7.utils.HL7ACKUtils
import gov.cdc.prime.router.fhirengine.utils.HL7Reader
import gov.cdc.prime.router.history.DetailedSubmissionHistory
import org.apache.logging.log4j.kotlin.Logging

/**
* Builder class to create either JSON or HL7 response types based on the contents of the
* submitted reports
*/
class SubmissionResponseBuilder(
private val hL7ACKUtils: HL7ACKUtils = HL7ACKUtils(),
) : Logging {

/**
* Builds a response to send to the client after submitting a report
*
* This will be an HL7 ACK response given the client has enabled it and requested it. It will otherwise
* default to our default JSON response
*/
fun buildResponse(
sender: Sender,
responseStatus: HttpStatus,
request: HttpRequestMessage<String?>,
submission: DetailedSubmissionHistory?,
): HttpResponseMessage {
// Azure handles all headers as lowercase
val contentType = request.headers[HttpHeaders.CONTENT_TYPE.lowercase()]
val requestBody = request.body
return when (val responseType = determineResponseType(sender, responseStatus, contentType, requestBody)) {
is HL7ResponseType -> {
logger.info("Returning ACK response")
val responseBody = hL7ACKUtils.generateOutgoingACKMessage(responseType.message)
request.createResponseBuilder(responseStatus)
.header(HttpHeaders.CONTENT_TYPE, HttpUtilities.hl7V2MediaType)
.body(responseBody)
.build()
}
is JsonResponseType -> {
val responseBody = JacksonMapperUtilities
.allowUnknownsMapper
.writeValueAsString(submission)
request
.createResponseBuilder(responseStatus)
.header(HttpHeaders.CONTENT_TYPE, HttpUtilities.jsonMediaType)
.body(responseBody)
.header(
HttpHeaders.LOCATION,
request.uri.resolve(
"/api/waters/report/${submission?.reportId}/history"
).toString()
)
.build()
}
}
}

private fun determineResponseType(
jalbinson marked this conversation as resolved.
Show resolved Hide resolved
sender: Sender,
responseStatus: HttpStatus,
contentType: String?,
requestBody: String?,
): SubmissionResponseType {
val maybeACKMessage = hl7SuccessResponseRequired(sender, responseStatus, contentType, requestBody)
return when {
maybeACKMessage != null -> HL7ResponseType(maybeACKMessage)
else -> JsonResponseType
}
}

/**
* This function will return true if the following conditions are met:
* - The sender has the "hl7AcknowledgementEnabled" field set to true
* - The HL7 message has been processed successfully
* - The submitted HL7 contains MSH.15 == "AL"
* - The submitted HL7 is not a batch message
*
* @return HL7 message if ACK required or null otherwise
*/
private fun hl7SuccessResponseRequired(
sender: Sender,
responseStatus: HttpStatus,
contentType: String?,
requestBody: String?,
): Message? {
val acceptAcknowledgmentTypeRespondValues = setOf("AL") // AL means "Always"
return if (
sender.hl7AcknowledgementEnabled &&
responseStatus.isSuccessful() &&
contentType == HttpUtilities.hl7V2MediaType &&
requestBody != null
) {
val hl7Reader = HL7Reader(ActionLogger())
val messages = hl7Reader.getMessages(requestBody)
val isBatch = hl7Reader.isBatch(requestBody, messages.size)

if (!isBatch && messages.size == 1) {
val message = messages.first()
val acceptAcknowledgementType = HL7Reader.getAcceptAcknowledgmentType(message)
val ackResponseRequired = acceptAcknowledgmentTypeRespondValues.contains(acceptAcknowledgementType)
if (ackResponseRequired) {
message
} else {
null
}
} else {
null
}
} else {
null
}
}
}

/**
* Rather than an enum, we have used a hierarchy that allows an HL7 response type to hold onto the already
* parsed message during our check of MSH.15 to avoid doing that work twice.
*/
private sealed interface SubmissionResponseType
private data class HL7ResponseType(val message: Message) : SubmissionResponseType
private data object JsonResponseType : SubmissionResponseType
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package gov.cdc.prime.router.fhirengine.translation.hl7.utils

import ca.uhn.hl7v2.model.Message
import ca.uhn.hl7v2.model.v251.message.ACK
import gov.cdc.prime.router.ActionLogger
import gov.cdc.prime.router.common.Environment
import gov.cdc.prime.router.fhirengine.utils.HL7Reader
import java.time.Clock
Expand All @@ -18,24 +17,7 @@ class HL7ACKUtils(
private val clock: Clock = Clock.systemUTC(),
) {

/**
* Parses out raw HL7 message and then checks if MSH.15 == "AL".
*
* @return HL7 ACK message body or null
*/
fun generateOutgoingACKMessageIfRequired(rawHL7: String): String? {
val maybeMessage = HL7Reader(ActionLogger())
.getMessages(rawHL7)
.firstOrNull()

return if (maybeMessage != null && HL7Reader.requiresAckMessageResponse(maybeMessage)) {
generateOutgoingACKMessage(maybeMessage)
} else {
null
}
}

private fun generateOutgoingACKMessage(incomingACKMessage: Message): String {
fun generateOutgoingACKMessage(incomingACKMessage: Message): String {
jalbinson marked this conversation as resolved.
Show resolved Hide resolved
val outgoingAck = ACK()

val ackMsh = outgoingAck.msh
Expand Down
6 changes: 2 additions & 4 deletions prime-router/src/main/kotlin/fhirengine/utils/HL7Reader.kt
Original file line number Diff line number Diff line change
Expand Up @@ -433,15 +433,13 @@ class HL7Reader(private val actionLogger: ActionLogger) : Logging {
}
}

fun requiresAckMessageResponse(message: Message): Boolean {
val acceptAcknowledgmentType = when (val structure = message[MSH_SEGMENT_NAME]) {
fun getAcceptAcknowledgmentType(message: Message): String? {
jalbinson marked this conversation as resolved.
Show resolved Hide resolved
return when (val structure = message[MSH_SEGMENT_NAME]) {
is NIST_MSH -> structure.msh15_AcceptAcknowledgmentType.value
is v27_MSH -> structure.msh15_AcceptAcknowledgmentType.value
is v251_MSH -> structure.msh15_AcceptAcknowledgmentType.value
else -> null
jalbinson marked this conversation as resolved.
Show resolved Hide resolved
}
// AL means Always
return acceptAcknowledgmentType == "AL"
}

fun getSendingApplication(message: Message): String? {
Expand Down
Loading
Loading