Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

16394 add ack functionality #16552

Merged
merged 22 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
eab9461
16394 add ack functionality
jalbinson Nov 15, 2024
5deaf7c
tweaks
jalbinson Nov 15, 2024
d524858
better handle segments
jalbinson Nov 15, 2024
77f3aed
check diff
jalbinson Nov 15, 2024
a91a0f7
timezone fix
jalbinson Nov 15, 2024
f99f5f8
PR feedback
jalbinson Nov 21, 2024
23f2ba1
Merge branch 'main' into platform/jamie/16394-ack
jalbinson Nov 21, 2024
9c4b060
cleanup and comments
jalbinson Nov 22, 2024
dde2274
Merge branch 'platform/jamie/16394-ack' of github.com:CDCgov/prime-re…
jalbinson Nov 22, 2024
0ec2e47
Merge branch 'main' into platform/jamie/16394-ack
jalbinson Nov 25, 2024
1f31755
Merge branch 'main' into platform/jamie/16394-ack
jalbinson Nov 27, 2024
1055b00
Merge branch 'main' into platform/jamie/16394-ack
jalbinson Dec 3, 2024
d656e8d
arnej feedback
jalbinson Dec 3, 2024
1cc9c55
Merge branch 'platform/jamie/16394-ack' of github.com:CDCgov/prime-re…
jalbinson Dec 3, 2024
1c060e9
Merge branch 'main' into platform/jamie/16394-ack
jalbinson Dec 3, 2024
9da6790
Merge branch 'main' into platform/jamie/16394-ack
jalbinson Dec 3, 2024
f45eb3e
more arnej feedback
jalbinson Dec 3, 2024
e922498
Merge branch 'platform/jamie/16394-ack' of github.com:CDCgov/prime-re…
jalbinson Dec 3, 2024
813dc4c
Merge branch 'main' into platform/jamie/16394-ack
jalbinson Dec 3, 2024
1333e1f
Merge branch 'main' into platform/jamie/16394-ack
jalbinson Dec 3, 2024
f205823
Merge branch 'main' into platform/jamie/16394-ack
jalbinson Dec 4, 2024
5ea93e2
Merge branch 'main' into platform/jamie/16394-ack
jalbinson Dec 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions prime-router/src/main/kotlin/azure/ReportFunction.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import com.microsoft.azure.functions.annotation.StorageAccount
import gov.cdc.prime.router.ActionError
import gov.cdc.prime.router.ActionLog
import gov.cdc.prime.router.ActionLogLevel
import gov.cdc.prime.router.ActionLogger
import gov.cdc.prime.router.CustomerStatus
import gov.cdc.prime.router.InvalidParamMessage
import gov.cdc.prime.router.InvalidReportMessage
Expand All @@ -41,7 +42,9 @@ import gov.cdc.prime.router.cli.ProcessFhirCommands
import gov.cdc.prime.router.common.AzureHttpUtils.getSenderIP
import gov.cdc.prime.router.common.Environment
import gov.cdc.prime.router.common.JacksonMapperUtilities
import gov.cdc.prime.router.fhirengine.translation.hl7.utils.HL7ACKUtils
import gov.cdc.prime.router.fhirengine.utils.FhirTranscoder
import gov.cdc.prime.router.fhirengine.utils.HL7Reader
import gov.cdc.prime.router.history.azure.SubmissionsFacade
import gov.cdc.prime.router.tokens.AuthenticatedClaims
import gov.cdc.prime.router.tokens.Scope
Expand All @@ -68,6 +71,7 @@ class ReportFunction(
workflowEngine.azureEventService,
workflowEngine.reportService
),
private val hl7ACKUtils: HL7ACKUtils = HL7ACKUtils(),
) : RequestFunction(workflowEngine),
Logging {

Expand Down Expand Up @@ -495,6 +499,12 @@ class ReportFunction(
request: HttpRequestMessage<String?>,
sender: Sender,
): HttpResponseMessage {
// check for ACK request
val maybeACKResponse = handleAckRequest(request)
if (maybeACKResponse != null) {
return maybeACKResponse
jalbinson marked this conversation as resolved.
Show resolved Hide resolved
}

// determine if we should be following the sync or async workflow
val isAsync = processingType(request, sender) == ProcessingType.async
// allow duplicates 'override' param
Expand Down Expand Up @@ -633,4 +643,36 @@ class ReportFunction(
}
}
}

private fun handleAckRequest(request: HttpRequestMessage<String?>): HttpResponseMessage? {
// why does Azure handle Headers case-sensitive???
jalbinson marked this conversation as resolved.
Show resolved Hide resolved
val contentType = request.headers[HttpHeaders.CONTENT_TYPE.lowercase()]
val requestBody = request.body
return if (contentType == "application/hl7-v2" && requestBody != null) {
jalbinson marked this conversation as resolved.
Show resolved Hide resolved
try {
// Parse HL7 message
val maybeMessage = HL7Reader(ActionLogger())
.getMessages(requestBody)
.firstOrNull()

// Is the message an ACK?
if (maybeMessage != null && HL7Reader.isAckMessage(maybeMessage)) {
logger.info("Creating HL7 ACK response")
request.createResponseBuilder(HttpStatus.OK)
.header(HttpHeaders.CONTENT_TYPE, "application/hl7-v2")
.body(hl7ACKUtils.generateOutgoingACKMessage(maybeMessage))
.build()
} else {
logger.trace("Not an HL7 ACK message. Continuing.")
null
}
} catch (ex: Exception) {
logger.warn("Error checking for HL7 ACK. Continuing normal pipeline execution.", ex)
null
}
} else {
logger.trace("Not an HL7 ACK message. Continuing.")
null
}
}
}
4 changes: 4 additions & 0 deletions prime-router/src/main/kotlin/common/Environment.kt
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ enum class Environment(
return get() == LOCAL
}

fun isProd(): Boolean {
return get() == PROD
}

/**
* Time zone to use for ReportStream. Note that Azure runs on UTC, so this forces our local runs to also be UTC.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package gov.cdc.prime.router.fhirengine.translation.hl7.utils

import ca.uhn.hl7v2.model.Message
import ca.uhn.hl7v2.model.v251.message.ACK
import gov.cdc.prime.router.common.Environment
import gov.cdc.prime.router.fhirengine.utils.HL7Reader
import java.time.Clock
import java.util.Calendar
import java.util.Date
import java.util.TimeZone
import java.util.UUID

/**
* Helper class to generate HL7 ACK response
*/
class HL7ACKUtils(
private val clock: Clock = Clock.systemUTC(),
) {

fun generateOutgoingACKMessage(incomingACKMessage: Message): String {
jalbinson marked this conversation as resolved.
Show resolved Hide resolved
val outgoingAck = ACK()

val ackMsh = outgoingAck.msh
ackMsh.msh1_FieldSeparator.value = "|"
ackMsh.msh2_EncodingCharacters.value = "^~\\&"
ackMsh.msh3_SendingApplication.parse("ReportStream")
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I took these values directly from the Spec document so just double check that these are exactly what we want.

ackMsh.msh4_SendingFacility.parse("CDC")
ackMsh.msh5_ReceivingApplication.parse(HL7Reader.getSendingApplication(incomingACKMessage))
ackMsh.msh6_ReceivingFacility.parse(HL7Reader.getSendingFacility(incomingACKMessage))
ackMsh.msh7_DateTimeOfMessage.time.setValue(getTimestamp())
ackMsh.msh9_MessageType.parse("ACK")
ackMsh.msh10_MessageControlID.parse(UUID.randomUUID().toString())
ackMsh.msh11_ProcessingID.parse(if (Environment.isProd()) "P" else "T")
ackMsh.msh12_VersionID.versionID.parse("2.5.1")
ackMsh.msh15_AcceptAcknowledgmentType.parse("NE")
ackMsh.msh16_ApplicationAcknowledgmentType.parse("NE")

val ackMsa = outgoingAck.msa
ackMsa.msa1_AcknowledgmentCode.parse("CA")
ackMsa.msa2_MessageControlID.parse(HL7Reader.getMessageControlId(incomingACKMessage))

return outgoingAck.toString()
}

/**
* HL7 library requires old Java date libraries, so we do the conversion here.
*
* We must directly specify the UTC timezone or else the HL7 library will use
* your machines local timezone.
*/
private fun getTimestamp(): Calendar {
val instant = clock.instant()
val date = Date.from(instant)

val calendar = Calendar.getInstance()
calendar.time = date
calendar.timeZone = TimeZone.getTimeZone("UTC")

return calendar
}
}
37 changes: 37 additions & 0 deletions prime-router/src/main/kotlin/fhirengine/utils/HL7Reader.kt
Original file line number Diff line number Diff line change
Expand Up @@ -424,5 +424,42 @@ class HL7Reader(private val actionLogger: ActionLogger) : Logging {
else -> null
}
}

fun isAckMessage(message: Message): Boolean {
val acceptAcknowledgmentType = when (val structure = message[MSH_SEGMENT_NAME]) {
is NIST_MSH -> structure.msh15_AcceptAcknowledgmentType.value
is v27_MSH -> structure.msh15_AcceptAcknowledgmentType.value
is v251_MSH -> structure.msh15_AcceptAcknowledgmentType.value
else -> null
jalbinson marked this conversation as resolved.
Show resolved Hide resolved
}
return acceptAcknowledgmentType == "AL"
jalbinson marked this conversation as resolved.
Show resolved Hide resolved
}

fun getSendingApplication(message: Message): String? {
return when (val structure = message[MSH_SEGMENT_NAME]) {
is NIST_MSH -> structure.msh3_SendingApplication.encode()
is v27_MSH -> structure.msh3_SendingApplication.encode()
is v251_MSH -> structure.msh3_SendingApplication.encode()
else -> null
}
}

fun getSendingFacility(message: Message): String? {
return when (val structure = message[MSH_SEGMENT_NAME]) {
is NIST_MSH -> structure.msh4_SendingFacility.encode()
is v27_MSH -> structure.msh4_SendingFacility.encode()
is v251_MSH -> structure.msh4_SendingFacility.encode()
else -> null
}
}

fun getMessageControlId(message: Message): String? {
return when (val structure = message[MSH_SEGMENT_NAME]) {
is NIST_MSH -> structure.msh10_MessageControlID.encode()
is v27_MSH -> structure.msh10_MessageControlID.encode()
is v251_MSH -> structure.msh10_MessageControlID.encode()
else -> null
}
}
}
}
29 changes: 29 additions & 0 deletions prime-router/src/test/kotlin/azure/ReportFunctionTests.kt
Original file line number Diff line number Diff line change
Expand Up @@ -1201,4 +1201,33 @@ class ReportFunctionTests {
)
assert(receiverReturned!!.name == receiver.name)
}

@Test
fun `return ack if requested`() {
jalbinson marked this conversation as resolved.
Show resolved Hide resolved
val metadata = UnitTestUtils.simpleMetadata
val settings = FileSettings().loadOrganizations(oneOrganization)
val sender = CovidSender("Test Sender", "test", MimeFormat.CSV, schemaName = "one")

val engine = makeEngine(metadata, settings)
val actionHistory = spyk(ActionHistory(TaskAction.receive))
val reportFunc = ReportFunction(engine, actionHistory)

every { engine.settings.findSender("Test Sender") } returns sender

val body = """
MSH|^~\&|Epic|Hospital|LIMS|StatePHL|20241003000000||ORM^O01^ORM_O01|4AFA57FE-D41D-4631-9500-286AAAF797E4|T|2.5.1|||AL|NE
""".trimIndent()

val req = MockHttpRequestMessage(body)
req.httpHeaders += mapOf(
"client" to "Test Sender",
"content-length" to body.length.toString(),
"content-type" to "application/hl7-v2"
)

val response = reportFunc.run(req)

assertThat(response.status.value()).isEqualTo(200)
assertThat(response.getHeader("Content-Type")).isEqualTo("application/hl7-v2")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package gov.cdc.prime.router.fhirengine.translation.hl7.utils

import assertk.assertThat
import assertk.assertions.isEqualTo
import gov.cdc.prime.router.ActionLogger
import gov.cdc.prime.router.cli.helpers.HL7DiffHelper
import gov.cdc.prime.router.fhirengine.utils.HL7Reader
import io.mockk.every
import io.mockk.mockkStatic
import io.mockk.unmockkAll
import org.junit.jupiter.api.AfterEach
import java.time.Clock
import java.time.LocalDateTime
import java.time.ZoneId
import java.time.ZoneOffset
import java.util.UUID
import kotlin.test.Test

class HL7ACKUtilsTest {

inner class Fixture {
val hl7Reader = HL7Reader(ActionLogger())
val hl7DiffHelper = HL7DiffHelper()

private val clock = Clock.fixed(
LocalDateTime.of(2024, 9, 21, 0, 0).toInstant(ZoneOffset.UTC),
ZoneId.of("UTC")
)
val utils = HL7ACKUtils(clock)
}

@AfterEach
fun cleanUp() {
unmockkAll()
}

@Test
fun `generates ACK response`() {
val f = Fixture()

val id = UUID.randomUUID()
mockkStatic(UUID::class)
every { UUID.randomUUID() } returns id

val incomingAckMessage = """
MSH|^~\&|Epic|Hospital|LIMS|StatePHL|20241003000000||ORM^O01^ORM_O01|4AFA57FE-D41D-4631-9500-286AAAF797E4|T|2.5.1|||AL|NE
""".trimIndent()
val message = f.hl7Reader.getMessages(incomingAckMessage).first()

val ack = f.utils.generateOutgoingACKMessage(message)

val expected = f.hl7Reader.getMessages(
"""
MSH|^~\&|ReportStream|CDC|Epic|Hospital|20240921000000+0000||ACK|$id|T|2.5.1|||NE|NE
MSA|CA|4AFA57FE-D41D-4631-9500-286AAAF797E4
"""
).first()
val actual = f.hl7Reader.getMessages(ack).first()

val diffs = f.hl7DiffHelper.diffHl7(expected, actual)
if (diffs.isNotEmpty()) {
println(diffs)
}
assertThat(diffs.size).isEqualTo(0)
}
}
23 changes: 23 additions & 0 deletions prime-router/src/test/kotlin/fhirengine/utils/HL7ReaderTests.kt
Original file line number Diff line number Diff line change
Expand Up @@ -469,4 +469,27 @@ OBX|1|test|94558-4^SARS-CoV-2 (COVID-19) Ag [Presence] in Respiratory specimen b
val obxSegment = Terser(messages[0]).getSegment("/PATIENT_RESULT/ORDER_OBSERVATION/OBSERVATION/OBX") as OBX
assertThat(obxSegment.getObservationValue(0).data).isInstanceOf(CWE::class)
}

@Test
fun `extract MSH segment values`() {
val actionLogger = ActionLogger()
val hL7Reader = HL7Reader(actionLogger)

@Suppress("ktlint:standard:max-line-length")
val rawMessage = "MSH|^~\\&|Epic|Hospital|LIMS|StatePHL|20241003000000||ORM^O01^ORM_O01|4AFA57FE-D41D-4631-9500-286AAAF797E4|T|2.5.1|||AL|NE"
val message = hL7Reader.getMessages(rawMessage).first()

assertThat(
HL7Reader.getSendingApplication(message)
).isEqualTo("Epic")
assertThat(
HL7Reader.getSendingFacility(message),
).isEqualTo("Hospital")
assertThat(
HL7Reader.getMessageControlId(message)
).isEqualTo("4AFA57FE-D41D-4631-9500-286AAAF797E4")
assertThat(
HL7Reader.isAckMessage(message)
).isTrue()
}
}