Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

16409 do not use receive step to determine root report #16723

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ class FHIRConverter(
companion object {

private val clientIdHeader = "client_id"
private val payloadNameHeader = "payloadname"

/**
* Converts a [FhirConvertQueueMessage] into the input to the convert processing
Expand Down Expand Up @@ -157,6 +158,7 @@ class FHIRConverter(
val blobSubFolderName = message.blobSubFolderName

val clientId = message.headers[clientIdHeader]
val payloadName = message.headers[payloadNameHeader]
val sender = clientId?.takeIf { it.isNotBlank() }?.let { settings.findSender(it) }
if (sender == null) {
throw SubmissionSenderNotFound(clientId ?: "", reportId, blobUrl)
Expand All @@ -178,7 +180,8 @@ class FHIRConverter(
// is properly recorded in the report file table with the correct sender
actionHistory.trackExternalInputReport(
report,
BlobAccess.BlobInfo(format, blobUrl, blobDigest.toByteArray())
BlobAccess.BlobInfo(format, blobUrl, blobDigest.toByteArray()),
payloadName
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change to carry over payloadname from the submissions API is needed to continue supporting the sendOriginal function. This adds the information to the root report being created here.

)
actionHistory.trackActionSenderInfo(sender.fullName)

Expand Down
28 changes: 14 additions & 14 deletions prime-router/src/main/kotlin/history/db/ReportGraph.kt
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,10 @@ class ReportGraph(
}

/**
* Recursively goes up the report_linage table from any report until it reaches
* a report with an action type of "receive" (the root report)
* Recursively goes up the report_lineage table from any report until it reaches
* a report that does not appear in report_lineage as a child report (the root report)
*
* This will return null if no report with action type "receive" is present or if
* the root is passed in
* This will return null if the root is passed in
*/
fun getRootReport(childReportId: UUID): ReportFile? {
return db.transactReturning { txn ->
Expand Down Expand Up @@ -172,19 +171,19 @@ class ReportGraph(
.from(cte)
.join(REPORT_FILE)
.on(REPORT_FILE.REPORT_ID.eq(ItemGraphTable.ITEM_GRAPH.PARENT_REPORT_ID))
.join(ACTION)
.on(ACTION.ACTION_ID.eq(REPORT_FILE.ACTION_ID))
.where(ACTION.ACTION_NAME.eq(TaskAction.receive))
.leftJoin(REPORT_LINEAGE)
.on(REPORT_FILE.REPORT_ID.eq(REPORT_LINEAGE.CHILD_REPORT_ID))
.where(REPORT_LINEAGE.PARENT_REPORT_ID.isNull())
.orderBy(REPORT_FILE.ACTION_ID.asc())
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The previous query was to join the action table in order to check if the action_name is receive. The new query instead filters the list of reports to only include those that do not appear in report_lineage as a child. This list is then ordered by action_id to ensure the results are output in a consistent order. The join to the action table is removed since it is no longer necessary.

.fetchOneInto(Item::class.java)
return rootItem
}

/**
* Recursively goes up the report_linage table from any report until it reaches
* all reports with an action type of "receive" (the root report)
* Recursively goes up the report_lineage table from any report until it reaches
* all reports that do not appear in report_lineage as a child report (the root report)
*
* This will return null if no report with action type "receive" is present or if
* the root is passed in
* This will return null if the root is passed in
*
* If the passed in report ID has multiple root reports, they will all be returned
*/
Expand Down Expand Up @@ -476,9 +475,10 @@ class ReportGraph(
.from(cte)
.join(REPORT_FILE)
.on(REPORT_FILE.REPORT_ID.eq(cte.field(0, UUID::class.java)))
.join(ACTION)
.on(ACTION.ACTION_ID.eq(REPORT_FILE.ACTION_ID))
.where(ACTION.ACTION_NAME.eq(TaskAction.receive))
.leftJoin(REPORT_LINEAGE)
.on(REPORT_FILE.REPORT_ID.eq(REPORT_LINEAGE.CHILD_REPORT_ID))
.where(REPORT_LINEAGE.PARENT_REPORT_ID.isNull())
.orderBy(REPORT_FILE.ACTION_ID.asc())

/**
* Accepts a list of ids and walks down the report lineage graph
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ object UniversalPipelineTestUtils {
fileName: String = "mr_fhir_face.fhir",
): Report {
val blobUrl = BlobAccess.uploadBlob(
"${TaskAction.receive.literal}/$fileName",
"${previousAction.literal}/$fileName",
reportContents.toByteArray(),
getBlobContainerMetadata(azuriteContainer)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,7 @@ class FHIRTranslatorIntegrationTests : Logging {
}

@Test
fun `successfully translate HL7 for FHIR receiver when isSendOriginal is true`() {
fun `successfully translate HL7 for FHIR receiver when isSendOriginal is true from receive step`() {
// set up
val receiverSetupData = listOf(
UniversalPipelineTestUtils.ReceiverSetupData(
Expand Down Expand Up @@ -720,7 +720,7 @@ class FHIRTranslatorIntegrationTests : Logging {
}

@Test
fun `successfully translate for FHIR receiver when isSendOriginal is true`() {
fun `successfully translate for FHIR receiver when isSendOriginal is true from receive step`() {
// set up
val receiverSetupData = listOf(
UniversalPipelineTestUtils.ReceiverSetupData(
Expand Down Expand Up @@ -798,4 +798,166 @@ class FHIRTranslatorIntegrationTests : Logging {
assertThat(translatedValue).isEqualTo(reportContents.toByteArray())
}
}

@Test
fun `successfully translate HL7 for FHIR receiver when isSendOriginal is true from convert step`() {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests ensure sendOriginal items still function when convert is the root report, as the case would be when the submissions API is used.

// set up
val receiverSetupData = listOf(
UniversalPipelineTestUtils.ReceiverSetupData(
"x",
jurisdictionalFilter = listOf("true"),
qualityFilter = listOf("true"),
routingFilter = listOf("true"),
conditionFilter = listOf("true"),
format = MimeFormat.FHIR
)
)
val receivers = UniversalPipelineTestUtils.createReceivers(receiverSetupData)
val org = UniversalPipelineTestUtils.createOrganizationWithReceivers(receivers)
val translator = createFHIRTranslator(azureEventService, org)
val reportContents = File(HL7_WITH_BIRTH_TIME).readText()
val convertReport = UniversalPipelineTestUtils.createReport(
reportContents,
TaskAction.convert,
Event.EventAction.CONVERT,
azuriteContainer,
TaskAction.convert,
fileName = "originalhl7.hl7"
)
val queueMessage = generateQueueMessage(
convertReport,
reportContents,
UniversalPipelineTestUtils.hl7SenderWithSendOriginal,
"phd.x"
)
val fhirFunctions = UniversalPipelineTestUtils.createFHIRFunctionsInstance()

// execute
fhirFunctions.process(queueMessage, 1, translator, ActionHistory(TaskAction.translate))

// check that send queue was updated
verify(exactly = 1) {
QueueAccess.sendMessage(QueueMessage.elrSendQueueName, any())
}

// check action table
UniversalPipelineTestUtils.checkActionTable(listOf(TaskAction.convert, TaskAction.translate))

// verify task and report_file tables were updated correctly in the Translate function (new task and new
// record file created)
ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn ->
val report = fetchChildReports(convertReport, txn, 1).single()
assertThat(report.nextAction).isEqualTo(TaskAction.send)
assertThat(report.receivingOrg).isEqualTo("phd")
assertThat(report.receivingOrgSvc).isEqualTo("x")
assertThat(report.schemaName).isEqualTo("None")
assertThat(report.schemaTopic).isEqualTo(Topic.ELR_ELIMS)
assertThat(report.bodyFormat).isEqualTo("HL7")

val batchTask = DSL.using(txn).select(Task.TASK.asterisk()).from(Task.TASK)
.where(Task.TASK.NEXT_ACTION.eq(TaskAction.batch))
.fetchOneInto(Task.TASK)
// verify batch queue task does not exist
assertThat(batchTask).isNull()

val sendTask = DSL.using(txn).select(Task.TASK.asterisk()).from(Task.TASK)
.where(Task.TASK.NEXT_ACTION.eq(TaskAction.send))
.fetchOneInto(Task.TASK)
// verify send queue task exists
assertThat(sendTask).isNotNull()
assertThat(sendTask!!.reportId).isEqualTo(report.reportId)

// verify message format is HL7 and is for the expected receiver
assertThat(sendTask.receiverName).isEqualTo("phd.x")
assertThat(sendTask.bodyFormat).isEqualTo("HL7")

// verify message matches the original HL7 input
val translatedValue = BlobAccess.downloadBlobAsByteArray(
report.bodyUrl,
UniversalPipelineTestUtils.getBlobContainerMetadata(azuriteContainer)
)
assertThat(translatedValue).isEqualTo(reportContents.toByteArray())
}
}

@Test
fun `successfully translate for FHIR receiver when isSendOriginal is true from convert step`() {
// set up
val receiverSetupData = listOf(
UniversalPipelineTestUtils.ReceiverSetupData(
"x",
jurisdictionalFilter = listOf("true"),
qualityFilter = listOf("true"),
routingFilter = listOf("true"),
conditionFilter = listOf("true"),
format = MimeFormat.FHIR
)
)
val receivers = UniversalPipelineTestUtils.createReceivers(receiverSetupData)
val org = UniversalPipelineTestUtils.createOrganizationWithReceivers(receivers)
val translator = createFHIRTranslator(azureEventService, org)
val reportContents = File(MULTIPLE_TARGETS_FHIR_PATH).readText()
val convertReport = UniversalPipelineTestUtils.createReport(
reportContents,
TaskAction.convert,
Event.EventAction.CONVERT,
azuriteContainer,
TaskAction.convert
)

val queueMessage = generateQueueMessage(
convertReport,
reportContents,
UniversalPipelineTestUtils.fhirSenderWithSendOriginal,
"phd.x"
)
val fhirFunctions = UniversalPipelineTestUtils.createFHIRFunctionsInstance()

// execute
fhirFunctions.process(queueMessage, 1, translator, ActionHistory(TaskAction.translate))

// check that send queue was updated
verify(exactly = 1) {
QueueAccess.sendMessage(QueueMessage.elrSendQueueName, any())
}

// check action table
UniversalPipelineTestUtils.checkActionTable(listOf(TaskAction.convert, TaskAction.translate))

// verify task and report_file tables were updated correctly in the Translate function (new task and new
// record file created)
ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn ->
val report = fetchChildReports(convertReport, txn, 1).single()
assertThat(report.nextAction).isEqualTo(TaskAction.send)
assertThat(report.receivingOrg).isEqualTo("phd")
assertThat(report.receivingOrgSvc).isEqualTo("x")
assertThat(report.schemaName).isEqualTo("None")
assertThat(report.schemaTopic).isEqualTo(Topic.ELR_ELIMS)
assertThat(report.bodyFormat).isEqualTo("FHIR")

val batchTask = DSL.using(txn).select(Task.TASK.asterisk()).from(Task.TASK)
.where(Task.TASK.NEXT_ACTION.eq(TaskAction.batch))
.fetchOneInto(Task.TASK)
// verify batch queue task does not exist
assertThat(batchTask).isNull()

val sendTask = DSL.using(txn).select(Task.TASK.asterisk()).from(Task.TASK)
.where(Task.TASK.NEXT_ACTION.eq(TaskAction.send))
.fetchOneInto(Task.TASK)
// verify send queue task exists
assertThat(sendTask).isNotNull()
assertThat(sendTask!!.reportId).isEqualTo(report.reportId)

// verify message format is FHIR and is for the expected receiver
assertThat(sendTask.receiverName).isEqualTo("phd.x")
assertThat(sendTask.bodyFormat).isEqualTo("FHIR")

// verify message matches the original FHIR input
val translatedValue = BlobAccess.downloadBlobAsByteArray(
report.bodyUrl,
UniversalPipelineTestUtils.getBlobContainerMetadata(azuriteContainer)
)
assertThat(translatedValue).isEqualTo(reportContents.toByteArray())
}
}
}
Loading