From 835675c27f347e6874a1107105108614729121ff Mon Sep 17 00:00:00 2001 From: Jack Wang Date: Thu, 5 Dec 2024 15:36:10 -0500 Subject: [PATCH 1/6] replace check for receive step with check for report_ids that do not appear as child in report_lineage --- .../src/main/kotlin/history/db/ReportGraph.kt | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/prime-router/src/main/kotlin/history/db/ReportGraph.kt b/prime-router/src/main/kotlin/history/db/ReportGraph.kt index 103b9831eb3..8371cff4179 100644 --- a/prime-router/src/main/kotlin/history/db/ReportGraph.kt +++ b/prime-router/src/main/kotlin/history/db/ReportGraph.kt @@ -123,11 +123,10 @@ class ReportGraph( } /** - * Recursively goes up the report_linage table from any report until it reaches - * a report with an action type of "receive" (the root report) + * Recursively goes up the report_lineage table from any report until it reaches + * a report that does not appear in report_lineage as a child report (the root report) * - * This will return null if no report with action type "receive" is present or if - * the root is passed in + * This will return null if the root is passed in */ fun getRootReport(childReportId: UUID): ReportFile? { return db.transactReturning { txn -> @@ -174,17 +173,19 @@ class ReportGraph( .on(REPORT_FILE.REPORT_ID.eq(ItemGraphTable.ITEM_GRAPH.PARENT_REPORT_ID)) .join(ACTION) .on(ACTION.ACTION_ID.eq(REPORT_FILE.ACTION_ID)) - .where(ACTION.ACTION_NAME.eq(TaskAction.receive)) + .leftJoin(REPORT_LINEAGE) + .on(REPORT_FILE.REPORT_ID.eq(REPORT_LINEAGE.CHILD_REPORT_ID)) + .where(REPORT_LINEAGE.PARENT_REPORT_ID.isNull()) + .orderBy(REPORT_FILE.REPORT_ID.asc()) .fetchOneInto(Item::class.java) return rootItem } /** - * Recursively goes up the report_linage table from any report until it reaches - * all reports with an action type of "receive" (the root report) + * Recursively goes up the report_lineage table from any report until it reaches + * all reports that do not appear in report_lineage as a child report (the root report) * - * This will return null if no report with action type "receive" is present or if - * the root is passed in + * This will return null if the root is passed in * * If the passed in report ID has multiple root reports, they will all be returned */ @@ -478,7 +479,9 @@ class ReportGraph( .on(REPORT_FILE.REPORT_ID.eq(cte.field(0, UUID::class.java))) .join(ACTION) .on(ACTION.ACTION_ID.eq(REPORT_FILE.ACTION_ID)) - .where(ACTION.ACTION_NAME.eq(TaskAction.receive)) + .leftJoin(REPORT_LINEAGE) + .on(REPORT_FILE.REPORT_ID.eq(REPORT_LINEAGE.CHILD_REPORT_ID)) + .where(REPORT_LINEAGE.PARENT_REPORT_ID.isNull()) /** * Accepts a list of ids and walks down the report lineage graph From 540f50593ffa134109f20f1842592ff4b8373601 Mon Sep 17 00:00:00 2001 From: Jack Wang Date: Fri, 6 Dec 2024 15:39:55 -0500 Subject: [PATCH 2/6] remove unnecessary join to action table --- prime-router/src/main/kotlin/history/db/ReportGraph.kt | 4 ---- 1 file changed, 4 deletions(-) diff --git a/prime-router/src/main/kotlin/history/db/ReportGraph.kt b/prime-router/src/main/kotlin/history/db/ReportGraph.kt index 8371cff4179..fc08c2343f8 100644 --- a/prime-router/src/main/kotlin/history/db/ReportGraph.kt +++ b/prime-router/src/main/kotlin/history/db/ReportGraph.kt @@ -171,8 +171,6 @@ class ReportGraph( .from(cte) .join(REPORT_FILE) .on(REPORT_FILE.REPORT_ID.eq(ItemGraphTable.ITEM_GRAPH.PARENT_REPORT_ID)) - .join(ACTION) - .on(ACTION.ACTION_ID.eq(REPORT_FILE.ACTION_ID)) .leftJoin(REPORT_LINEAGE) .on(REPORT_FILE.REPORT_ID.eq(REPORT_LINEAGE.CHILD_REPORT_ID)) .where(REPORT_LINEAGE.PARENT_REPORT_ID.isNull()) @@ -477,8 +475,6 @@ class ReportGraph( .from(cte) .join(REPORT_FILE) .on(REPORT_FILE.REPORT_ID.eq(cte.field(0, UUID::class.java))) - .join(ACTION) - .on(ACTION.ACTION_ID.eq(REPORT_FILE.ACTION_ID)) .leftJoin(REPORT_LINEAGE) .on(REPORT_FILE.REPORT_ID.eq(REPORT_LINEAGE.CHILD_REPORT_ID)) .where(REPORT_LINEAGE.PARENT_REPORT_ID.isNull()) From 02079e692199d49210210654f95269f55c5e7820 Mon Sep 17 00:00:00 2001 From: Jack Wang Date: Mon, 9 Dec 2024 11:54:41 -0500 Subject: [PATCH 3/6] track payloadname for submissions from the submissions api --- .../src/main/kotlin/fhirengine/engine/FHIRConverter.kt | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/prime-router/src/main/kotlin/fhirengine/engine/FHIRConverter.kt b/prime-router/src/main/kotlin/fhirengine/engine/FHIRConverter.kt index e5cd8908412..0d573d55b7f 100644 --- a/prime-router/src/main/kotlin/fhirengine/engine/FHIRConverter.kt +++ b/prime-router/src/main/kotlin/fhirengine/engine/FHIRConverter.kt @@ -111,6 +111,7 @@ class FHIRConverter( companion object { private val clientIdHeader = "client_id" + private val payloadNameHeader = "payloadname" /** * Converts a [FhirConvertQueueMessage] into the input to the convert processing @@ -157,6 +158,7 @@ class FHIRConverter( val blobSubFolderName = message.blobSubFolderName val clientId = message.headers[clientIdHeader] + val payloadName = message.headers[payloadNameHeader] val sender = clientId?.takeIf { it.isNotBlank() }?.let { settings.findSender(it) } if (sender == null) { throw SubmissionSenderNotFound(clientId ?: "", reportId, blobUrl) @@ -178,7 +180,8 @@ class FHIRConverter( // is properly recorded in the report file table with the correct sender actionHistory.trackExternalInputReport( report, - BlobAccess.BlobInfo(format, blobUrl, blobDigest.toByteArray()) + BlobAccess.BlobInfo(format, blobUrl, blobDigest.toByteArray()), + payloadName ) actionHistory.trackActionSenderInfo(sender.fullName) From 95839ba1e0e9339809d361d03634d1a8a24bc441 Mon Sep 17 00:00:00 2001 From: Jack Wang Date: Mon, 9 Dec 2024 15:02:49 -0500 Subject: [PATCH 4/6] add tests for sendoriginal from convert step --- .../common/UniversalPipelineTestUtils.kt | 2 +- .../azure/FHIRTranslatorIntegrationTests.kt | 166 +++++++++++++++++- 2 files changed, 165 insertions(+), 3 deletions(-) diff --git a/prime-router/src/test/kotlin/common/UniversalPipelineTestUtils.kt b/prime-router/src/test/kotlin/common/UniversalPipelineTestUtils.kt index 9a3aaf1c8b8..b647a93f997 100644 --- a/prime-router/src/test/kotlin/common/UniversalPipelineTestUtils.kt +++ b/prime-router/src/test/kotlin/common/UniversalPipelineTestUtils.kt @@ -441,7 +441,7 @@ object UniversalPipelineTestUtils { fileName: String = "mr_fhir_face.fhir", ): Report { val blobUrl = BlobAccess.uploadBlob( - "${TaskAction.receive.literal}/$fileName", + "${previousAction.literal}/$fileName", reportContents.toByteArray(), getBlobContainerMetadata(azuriteContainer) ) diff --git a/prime-router/src/test/kotlin/fhirengine/azure/FHIRTranslatorIntegrationTests.kt b/prime-router/src/test/kotlin/fhirengine/azure/FHIRTranslatorIntegrationTests.kt index 99eada67991..7db6e6190bd 100644 --- a/prime-router/src/test/kotlin/fhirengine/azure/FHIRTranslatorIntegrationTests.kt +++ b/prime-router/src/test/kotlin/fhirengine/azure/FHIRTranslatorIntegrationTests.kt @@ -640,7 +640,7 @@ class FHIRTranslatorIntegrationTests : Logging { } @Test - fun `successfully translate HL7 for FHIR receiver when isSendOriginal is true`() { + fun `successfully translate HL7 for FHIR receiver when isSendOriginal is true from receive step`() { // set up val receiverSetupData = listOf( UniversalPipelineTestUtils.ReceiverSetupData( @@ -720,7 +720,7 @@ class FHIRTranslatorIntegrationTests : Logging { } @Test - fun `successfully translate for FHIR receiver when isSendOriginal is true`() { + fun `successfully translate for FHIR receiver when isSendOriginal is true from receive step`() { // set up val receiverSetupData = listOf( UniversalPipelineTestUtils.ReceiverSetupData( @@ -798,4 +798,166 @@ class FHIRTranslatorIntegrationTests : Logging { assertThat(translatedValue).isEqualTo(reportContents.toByteArray()) } } + + @Test + fun `successfully translate HL7 for FHIR receiver when isSendOriginal is true from convert step`() { + // set up + val receiverSetupData = listOf( + UniversalPipelineTestUtils.ReceiverSetupData( + "x", + jurisdictionalFilter = listOf("true"), + qualityFilter = listOf("true"), + routingFilter = listOf("true"), + conditionFilter = listOf("true"), + format = MimeFormat.FHIR + ) + ) + val receivers = UniversalPipelineTestUtils.createReceivers(receiverSetupData) + val org = UniversalPipelineTestUtils.createOrganizationWithReceivers(receivers) + val translator = createFHIRTranslator(azureEventService, org) + val reportContents = File(HL7_WITH_BIRTH_TIME).readText() + val receiveReport = UniversalPipelineTestUtils.createReport( + reportContents, + TaskAction.convert, + Event.EventAction.CONVERT, + azuriteContainer, + TaskAction.convert, + fileName = "originalhl7.hl7" + ) + val queueMessage = generateQueueMessage( + receiveReport, + reportContents, + UniversalPipelineTestUtils.hl7SenderWithSendOriginal, + "phd.x" + ) + val fhirFunctions = UniversalPipelineTestUtils.createFHIRFunctionsInstance() + + // execute + fhirFunctions.process(queueMessage, 1, translator, ActionHistory(TaskAction.translate)) + + // check that send queue was updated + verify(exactly = 1) { + QueueAccess.sendMessage(QueueMessage.elrSendQueueName, any()) + } + + // check action table + UniversalPipelineTestUtils.checkActionTable(listOf(TaskAction.convert, TaskAction.translate)) + + // verify task and report_file tables were updated correctly in the Translate function (new task and new + // record file created) + ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> + val report = fetchChildReports(receiveReport, txn, 1).single() + assertThat(report.nextAction).isEqualTo(TaskAction.send) + assertThat(report.receivingOrg).isEqualTo("phd") + assertThat(report.receivingOrgSvc).isEqualTo("x") + assertThat(report.schemaName).isEqualTo("None") + assertThat(report.schemaTopic).isEqualTo(Topic.ELR_ELIMS) + assertThat(report.bodyFormat).isEqualTo("HL7") + + val batchTask = DSL.using(txn).select(Task.TASK.asterisk()).from(Task.TASK) + .where(Task.TASK.NEXT_ACTION.eq(TaskAction.batch)) + .fetchOneInto(Task.TASK) + // verify batch queue task does not exist + assertThat(batchTask).isNull() + + val sendTask = DSL.using(txn).select(Task.TASK.asterisk()).from(Task.TASK) + .where(Task.TASK.NEXT_ACTION.eq(TaskAction.send)) + .fetchOneInto(Task.TASK) + // verify send queue task exists + assertThat(sendTask).isNotNull() + assertThat(sendTask!!.reportId).isEqualTo(report.reportId) + + // verify message format is HL7 and is for the expected receiver + assertThat(sendTask.receiverName).isEqualTo("phd.x") + assertThat(sendTask.bodyFormat).isEqualTo("HL7") + + // verify message matches the original HL7 input + val translatedValue = BlobAccess.downloadBlobAsByteArray( + report.bodyUrl, + UniversalPipelineTestUtils.getBlobContainerMetadata(azuriteContainer) + ) + assertThat(translatedValue).isEqualTo(reportContents.toByteArray()) + } + } + + @Test + fun `successfully translate for FHIR receiver when isSendOriginal is true from convert step`() { + // set up + val receiverSetupData = listOf( + UniversalPipelineTestUtils.ReceiverSetupData( + "x", + jurisdictionalFilter = listOf("true"), + qualityFilter = listOf("true"), + routingFilter = listOf("true"), + conditionFilter = listOf("true"), + format = MimeFormat.FHIR + ) + ) + val receivers = UniversalPipelineTestUtils.createReceivers(receiverSetupData) + val org = UniversalPipelineTestUtils.createOrganizationWithReceivers(receivers) + val translator = createFHIRTranslator(azureEventService, org) + val reportContents = File(MULTIPLE_TARGETS_FHIR_PATH).readText() + val receiveReport = UniversalPipelineTestUtils.createReport( + reportContents, + TaskAction.convert, + Event.EventAction.CONVERT, + azuriteContainer, + TaskAction.convert + ) + + val queueMessage = generateQueueMessage( + receiveReport, + reportContents, + UniversalPipelineTestUtils.fhirSenderWithSendOriginal, + "phd.x" + ) + val fhirFunctions = UniversalPipelineTestUtils.createFHIRFunctionsInstance() + + // execute + fhirFunctions.process(queueMessage, 1, translator, ActionHistory(TaskAction.translate)) + + // check that send queue was updated + verify(exactly = 1) { + QueueAccess.sendMessage(QueueMessage.elrSendQueueName, any()) + } + + // check action table + UniversalPipelineTestUtils.checkActionTable(listOf(TaskAction.convert, TaskAction.translate)) + + // verify task and report_file tables were updated correctly in the Translate function (new task and new + // record file created) + ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> + val report = fetchChildReports(receiveReport, txn, 1).single() + assertThat(report.nextAction).isEqualTo(TaskAction.send) + assertThat(report.receivingOrg).isEqualTo("phd") + assertThat(report.receivingOrgSvc).isEqualTo("x") + assertThat(report.schemaName).isEqualTo("None") + assertThat(report.schemaTopic).isEqualTo(Topic.ELR_ELIMS) + assertThat(report.bodyFormat).isEqualTo("FHIR") + + val batchTask = DSL.using(txn).select(Task.TASK.asterisk()).from(Task.TASK) + .where(Task.TASK.NEXT_ACTION.eq(TaskAction.batch)) + .fetchOneInto(Task.TASK) + // verify batch queue task does not exist + assertThat(batchTask).isNull() + + val sendTask = DSL.using(txn).select(Task.TASK.asterisk()).from(Task.TASK) + .where(Task.TASK.NEXT_ACTION.eq(TaskAction.send)) + .fetchOneInto(Task.TASK) + // verify send queue task exists + assertThat(sendTask).isNotNull() + assertThat(sendTask!!.reportId).isEqualTo(report.reportId) + + // verify message format is FHIR and is for the expected receiver + assertThat(sendTask.receiverName).isEqualTo("phd.x") + assertThat(sendTask.bodyFormat).isEqualTo("FHIR") + + // verify message matches the original FHIR input + val translatedValue = BlobAccess.downloadBlobAsByteArray( + report.bodyUrl, + UniversalPipelineTestUtils.getBlobContainerMetadata(azuriteContainer) + ) + assertThat(translatedValue).isEqualTo(reportContents.toByteArray()) + } + } } \ No newline at end of file From 2074833519f4944c6061931e8ffae24001539e14 Mon Sep 17 00:00:00 2001 From: Jack Wang Date: Tue, 10 Dec 2024 22:31:20 -0500 Subject: [PATCH 5/6] order by action_id for consistent result ordering --- prime-router/src/main/kotlin/history/db/ReportGraph.kt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/prime-router/src/main/kotlin/history/db/ReportGraph.kt b/prime-router/src/main/kotlin/history/db/ReportGraph.kt index fc08c2343f8..0b44dcfe271 100644 --- a/prime-router/src/main/kotlin/history/db/ReportGraph.kt +++ b/prime-router/src/main/kotlin/history/db/ReportGraph.kt @@ -174,7 +174,7 @@ class ReportGraph( .leftJoin(REPORT_LINEAGE) .on(REPORT_FILE.REPORT_ID.eq(REPORT_LINEAGE.CHILD_REPORT_ID)) .where(REPORT_LINEAGE.PARENT_REPORT_ID.isNull()) - .orderBy(REPORT_FILE.REPORT_ID.asc()) + .orderBy(REPORT_FILE.ACTION_ID.asc()) .fetchOneInto(Item::class.java) return rootItem } @@ -478,6 +478,7 @@ class ReportGraph( .leftJoin(REPORT_LINEAGE) .on(REPORT_FILE.REPORT_ID.eq(REPORT_LINEAGE.CHILD_REPORT_ID)) .where(REPORT_LINEAGE.PARENT_REPORT_ID.isNull()) + .orderBy(REPORT_FILE.ACTION_ID.asc()) /** * Accepts a list of ids and walks down the report lineage graph From 5e8a3c33aa9565ba7c8a2957a072d4ff6644ee42 Mon Sep 17 00:00:00 2001 From: Jack Wang Date: Wed, 11 Dec 2024 12:26:50 -0500 Subject: [PATCH 6/6] rename variable for clarity --- .../azure/FHIRTranslatorIntegrationTests.kt | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/prime-router/src/test/kotlin/fhirengine/azure/FHIRTranslatorIntegrationTests.kt b/prime-router/src/test/kotlin/fhirengine/azure/FHIRTranslatorIntegrationTests.kt index 7db6e6190bd..d8a9adeb1dd 100644 --- a/prime-router/src/test/kotlin/fhirengine/azure/FHIRTranslatorIntegrationTests.kt +++ b/prime-router/src/test/kotlin/fhirengine/azure/FHIRTranslatorIntegrationTests.kt @@ -816,7 +816,7 @@ class FHIRTranslatorIntegrationTests : Logging { val org = UniversalPipelineTestUtils.createOrganizationWithReceivers(receivers) val translator = createFHIRTranslator(azureEventService, org) val reportContents = File(HL7_WITH_BIRTH_TIME).readText() - val receiveReport = UniversalPipelineTestUtils.createReport( + val convertReport = UniversalPipelineTestUtils.createReport( reportContents, TaskAction.convert, Event.EventAction.CONVERT, @@ -825,7 +825,7 @@ class FHIRTranslatorIntegrationTests : Logging { fileName = "originalhl7.hl7" ) val queueMessage = generateQueueMessage( - receiveReport, + convertReport, reportContents, UniversalPipelineTestUtils.hl7SenderWithSendOriginal, "phd.x" @@ -846,7 +846,7 @@ class FHIRTranslatorIntegrationTests : Logging { // verify task and report_file tables were updated correctly in the Translate function (new task and new // record file created) ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> - val report = fetchChildReports(receiveReport, txn, 1).single() + val report = fetchChildReports(convertReport, txn, 1).single() assertThat(report.nextAction).isEqualTo(TaskAction.send) assertThat(report.receivingOrg).isEqualTo("phd") assertThat(report.receivingOrgSvc).isEqualTo("x") @@ -897,7 +897,7 @@ class FHIRTranslatorIntegrationTests : Logging { val org = UniversalPipelineTestUtils.createOrganizationWithReceivers(receivers) val translator = createFHIRTranslator(azureEventService, org) val reportContents = File(MULTIPLE_TARGETS_FHIR_PATH).readText() - val receiveReport = UniversalPipelineTestUtils.createReport( + val convertReport = UniversalPipelineTestUtils.createReport( reportContents, TaskAction.convert, Event.EventAction.CONVERT, @@ -906,7 +906,7 @@ class FHIRTranslatorIntegrationTests : Logging { ) val queueMessage = generateQueueMessage( - receiveReport, + convertReport, reportContents, UniversalPipelineTestUtils.fhirSenderWithSendOriginal, "phd.x" @@ -927,7 +927,7 @@ class FHIRTranslatorIntegrationTests : Logging { // verify task and report_file tables were updated correctly in the Translate function (new task and new // record file created) ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> - val report = fetchChildReports(receiveReport, txn, 1).single() + val report = fetchChildReports(convertReport, txn, 1).single() assertThat(report.nextAction).isEqualTo(TaskAction.send) assertThat(report.receivingOrg).isEqualTo("phd") assertThat(report.receivingOrgSvc).isEqualTo("x")