Skip to content

Commit

Permalink
16394 add ack functionality (#16552)
Browse files Browse the repository at this point in the history
  • Loading branch information
jalbinson authored Dec 4, 2024
1 parent 62207ec commit 6c6b77b
Show file tree
Hide file tree
Showing 12 changed files with 575 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,9 @@
"HL7_BATCH"
]
},
"hl7AcknowledgementEnabled": {
"type": "boolean"
},
"name": {
"type": "string"
},
Expand Down
7 changes: 6 additions & 1 deletion prime-router/src/main/kotlin/Sender.kt
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import java.time.OffsetDateTime
* @property allowDuplicates if false a duplicate submission will be rejected
* @property senderType one of four broad sender categories
* @property primarySubmissionMethod Sender preference for submission - manual or automatic
* @property hl7AcknowledgementEnabled should we return an HL7 ACK response if MSH.15 == "AL"?
*/
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonTypeInfo(
Expand Down Expand Up @@ -59,6 +60,7 @@ abstract class Sender(
val allowDuplicates: Boolean = true,
val senderType: SenderType? = null,
val primarySubmissionMethod: PrimarySubmissionMethod? = null,
val hl7AcknowledgementEnabled: Boolean = false,
override var version: Int? = null,
override var createdBy: String? = null,
override var createdAt: OffsetDateTime? = null,
Expand Down Expand Up @@ -194,6 +196,7 @@ class UniversalPipelineSender : Sender {
allowDuplicates: Boolean = true,
senderType: SenderType? = null,
primarySubmissionMethod: PrimarySubmissionMethod? = null,
hl7AcknowledgementEnabled: Boolean = false,
topic: Topic,
) : super(
topic,
Expand All @@ -205,7 +208,8 @@ class UniversalPipelineSender : Sender {
processingType,
allowDuplicates,
senderType,
primarySubmissionMethod
primarySubmissionMethod,
hl7AcknowledgementEnabled
)

constructor(copy: UniversalPipelineSender) : this(
Expand All @@ -214,6 +218,7 @@ class UniversalPipelineSender : Sender {
copy.format,
copy.customerStatus,
copy.schemaName,
hl7AcknowledgementEnabled = copy.hl7AcknowledgementEnabled,
topic = copy.topic,
)

Expand Down
9 changes: 9 additions & 0 deletions prime-router/src/main/kotlin/azure/HttpUtilities.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class HttpUtilities {
companion object : Logging {
const val jsonMediaType = "application/json"
const val fhirMediaType = "application/fhir+json"
const val hl7V2MediaType = "application/hl7-v2"
const val oldApi = "/api/reports"
const val watersApi = "/api/waters"
const val tokenApi = "/api/token"
Expand Down Expand Up @@ -434,6 +435,14 @@ class HttpUtilities {
return responseCode to response
}
}

/**
* Is status code 2xx?
*/
fun HttpStatus.isSuccessful(): Boolean {
val status = this.value()
return status in 200..299
}
}
}

Expand Down
31 changes: 12 additions & 19 deletions prime-router/src/main/kotlin/azure/ReportFunction.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import com.fasterxml.jackson.databind.SerializationFeature
import com.fasterxml.jackson.databind.json.JsonMapper
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import com.github.ajalt.clikt.core.CliktError
import com.google.common.net.HttpHeaders
import com.microsoft.azure.functions.HttpMethod
import com.microsoft.azure.functions.HttpRequestMessage
import com.microsoft.azure.functions.HttpResponseMessage
Expand Down Expand Up @@ -36,11 +35,11 @@ import gov.cdc.prime.router.azure.observability.event.IReportStreamEventService
import gov.cdc.prime.router.azure.observability.event.ReportStreamEventName
import gov.cdc.prime.router.azure.observability.event.ReportStreamEventProperties
import gov.cdc.prime.router.azure.observability.event.ReportStreamEventService
import gov.cdc.prime.router.azure.service.SubmissionResponseBuilder
import gov.cdc.prime.router.cli.PIIRemovalCommands
import gov.cdc.prime.router.cli.ProcessFhirCommands
import gov.cdc.prime.router.common.AzureHttpUtils.getSenderIP
import gov.cdc.prime.router.common.Environment
import gov.cdc.prime.router.common.JacksonMapperUtilities
import gov.cdc.prime.router.fhirengine.utils.FhirTranscoder
import gov.cdc.prime.router.history.azure.SubmissionsFacade
import gov.cdc.prime.router.tokens.AuthenticatedClaims
Expand Down Expand Up @@ -68,13 +67,14 @@ class ReportFunction(
workflowEngine.azureEventService,
workflowEngine.reportService
),
private val submissionResponseBuilder: SubmissionResponseBuilder = SubmissionResponseBuilder(),
) : RequestFunction(workflowEngine),
Logging {

enum class IngestionMethod {
SFTP,
REST,
}
enum class IngestionMethod {
SFTP,
REST,
}

/**
* POST a report to the router
Expand Down Expand Up @@ -601,19 +601,12 @@ class ReportFunction(
SubmissionsFacade.instance.findDetailedSubmissionHistory(txn, null, actionHistory.action)
}

val response = request.createResponseBuilder(httpStatus)
.header(HttpHeaders.CONTENT_TYPE, "application/json")
.body(
JacksonMapperUtilities.allowUnknownsMapper
.writeValueAsString(submission)
)
.header(
HttpHeaders.LOCATION,
request.uri.resolve(
"/api/waters/report/${submission?.reportId}/history"
).toString()
)
.build()
val response = submissionResponseBuilder.buildResponse(
sender,
httpStatus,
request,
submission
)

// queue messages here after all task / action records are in
actionHistory.queueMessages(workflowEngine)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package gov.cdc.prime.router.azure.service

import ca.uhn.hl7v2.model.Message
import com.google.common.net.HttpHeaders
import com.microsoft.azure.functions.HttpRequestMessage
import com.microsoft.azure.functions.HttpResponseMessage
import com.microsoft.azure.functions.HttpStatus
import gov.cdc.prime.router.ActionLogger
import gov.cdc.prime.router.Sender
import gov.cdc.prime.router.azure.HttpUtilities
import gov.cdc.prime.router.azure.HttpUtilities.Companion.isSuccessful
import gov.cdc.prime.router.common.JacksonMapperUtilities
import gov.cdc.prime.router.fhirengine.translation.hl7.utils.HL7ACKUtils
import gov.cdc.prime.router.fhirengine.utils.HL7Reader
import gov.cdc.prime.router.history.DetailedSubmissionHistory
import org.apache.logging.log4j.kotlin.Logging

/**
* Builder class to create either JSON or HL7 response types based on the contents of the
* submitted reports
*/
class SubmissionResponseBuilder(
private val hL7ACKUtils: HL7ACKUtils = HL7ACKUtils(),
) : Logging {

/**
* Builds a response to send to the client after submitting a report
*
* This will be an HL7 ACK response given the client has enabled it and requested it. It will otherwise
* default to our default JSON response
*/
fun buildResponse(
sender: Sender,
responseStatus: HttpStatus,
request: HttpRequestMessage<String?>,
submission: DetailedSubmissionHistory?,
): HttpResponseMessage {
// Azure handles all headers as lowercase
val contentType = request.headers[HttpHeaders.CONTENT_TYPE.lowercase()]
val requestBody = request.body
return when (val responseType = determineResponseType(sender, responseStatus, contentType, requestBody)) {
is HL7ResponseType -> {
logger.info("Returning ACK response")
val responseBody = hL7ACKUtils.generateOutgoingACKMessage(responseType.message)
request.createResponseBuilder(responseStatus)
.header(HttpHeaders.CONTENT_TYPE, HttpUtilities.hl7V2MediaType)
.body(responseBody)
.build()
}
is JsonResponseType -> {
val responseBody = JacksonMapperUtilities
.allowUnknownsMapper
.writeValueAsString(submission)
request
.createResponseBuilder(responseStatus)
.header(HttpHeaders.CONTENT_TYPE, HttpUtilities.jsonMediaType)
.body(responseBody)
.header(
HttpHeaders.LOCATION,
request.uri.resolve(
"/api/waters/report/${submission?.reportId}/history"
).toString()
)
.build()
}
}
}

/**
* Figures out in what format we should respond to a submission with
*
* @return SubmissionResponseType the response type defined in this file
*/
private fun determineResponseType(
sender: Sender,
responseStatus: HttpStatus,
contentType: String?,
requestBody: String?,
): SubmissionResponseType {
val maybeACKMessage = hl7SuccessResponseRequired(sender, responseStatus, contentType, requestBody)
return when {
maybeACKMessage != null -> HL7ResponseType(maybeACKMessage)
else -> JsonResponseType
}
}

/**
* This function will return true if the following conditions are met:
* - The sender has the "hl7AcknowledgementEnabled" field set to true
* - The HL7 message has been processed successfully
* - The submitted HL7 contains MSH.15 == "AL"
* - The submitted HL7 is not a batch message
*
* @return HL7 message if ACK required or null otherwise
*/
private fun hl7SuccessResponseRequired(
sender: Sender,
responseStatus: HttpStatus,
contentType: String?,
requestBody: String?,
): Message? {
val acceptAcknowledgmentTypeRespondValues = setOf("AL") // AL means "Always"
return if (
sender.hl7AcknowledgementEnabled &&
responseStatus.isSuccessful() &&
contentType == HttpUtilities.hl7V2MediaType &&
requestBody != null
) {
val hl7Reader = HL7Reader(ActionLogger())
val messages = hl7Reader.getMessages(requestBody)
val isBatch = hl7Reader.isBatch(requestBody, messages.size)

if (!isBatch && messages.size == 1) {
val message = messages.first()
val acceptAcknowledgementType = HL7Reader.getAcceptAcknowledgmentType(message)
val ackResponseRequired = acceptAcknowledgmentTypeRespondValues.contains(acceptAcknowledgementType)
if (ackResponseRequired) {
message
} else {
null
}
} else {
null
}
} else {
null
}
}
}

/**
* Rather than an enum, we have used a hierarchy that allows an HL7 response type to hold onto the already
* parsed message during our check of MSH.15 to avoid doing that work twice.
*/
private sealed interface SubmissionResponseType
private data class HL7ResponseType(val message: Message) : SubmissionResponseType
private data object JsonResponseType : SubmissionResponseType
10 changes: 10 additions & 0 deletions prime-router/src/main/kotlin/common/Environment.kt
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,22 @@ enum class Environment(

/**
* Checks if the current environment is the local environment.
*
* @return true if local environment, false otherwise
*/
fun isLocal(): Boolean {
return get() == LOCAL
}

/**
* Checks if the current environment is the production environment.
*
* @return true if production environment, false otherwise
*/
fun isProd(): Boolean {
return get() == PROD
}

/**
* Time zone to use for ReportStream. Note that Azure runs on UTC, so this forces our local runs to also be UTC.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package gov.cdc.prime.router.fhirengine.translation.hl7.utils

import ca.uhn.hl7v2.model.Message
import ca.uhn.hl7v2.model.v251.message.ACK
import gov.cdc.prime.router.common.Environment
import gov.cdc.prime.router.fhirengine.utils.HL7Reader
import java.time.Clock
import java.util.Calendar
import java.util.Date
import java.util.TimeZone
import java.util.UUID

/**
* Helper class to generate HL7 ACK response
*/
class HL7ACKUtils(
private val clock: Clock = Clock.systemUTC(),
) {

/**
* Creates the output ACK message according to the spec defined in #16394
*
* It will read an incoming message and copy some values over to their required locations
*
* It will always output HL7 2.5.1 regardless of the version of the incoming HL7 message
*/
fun generateOutgoingACKMessage(incomingACKMessage: Message): String {
val outgoingAck = ACK()

val ackMsh = outgoingAck.msh
ackMsh.msh1_FieldSeparator.value = "|"
ackMsh.msh2_EncodingCharacters.value = "^~\\&"
ackMsh.msh3_SendingApplication.parse("ReportStream")
ackMsh.msh4_SendingFacility.parse("CDC")
ackMsh.msh5_ReceivingApplication.parse(HL7Reader.getSendingApplication(incomingACKMessage))
ackMsh.msh6_ReceivingFacility.parse(HL7Reader.getSendingFacility(incomingACKMessage))
ackMsh.msh7_DateTimeOfMessage.time.setValue(getTimestamp())
ackMsh.msh9_MessageType.parse("ACK")
ackMsh.msh10_MessageControlID.parse(UUID.randomUUID().toString())
ackMsh.msh11_ProcessingID.parse(if (Environment.isProd()) "P" else "T")
ackMsh.msh12_VersionID.versionID.parse("2.5.1")
ackMsh.msh15_AcceptAcknowledgmentType.parse("NE")
ackMsh.msh16_ApplicationAcknowledgmentType.parse("NE")

val ackMsa = outgoingAck.msa
ackMsa.msa1_AcknowledgmentCode.parse("CA")
ackMsa.msa2_MessageControlID.parse(HL7Reader.getMessageControlId(incomingACKMessage))

return outgoingAck.toString()
}

/**
* HL7 library requires old Java date libraries, so we do the conversion here.
*
* We must directly specify the UTC timezone or else the HL7 library will use
* your machines local timezone.
*/
private fun getTimestamp(): Calendar {
val instant = clock.instant()
val date = Date.from(instant)

val calendar = Calendar.getInstance()
calendar.time = date
calendar.timeZone = TimeZone.getTimeZone("UTC")

return calendar
}
}
Loading

0 comments on commit 6c6b77b

Please sign in to comment.