Skip to content

Commit ad28996

Browse files
authored
Merge pull request #16550 from CDCgov/platform/bill/16144
Platform/bill/16144
2 parents 9258b5d + 97b21ad commit ad28996

File tree

6 files changed

+239
-12
lines changed

6 files changed

+239
-12
lines changed

prime-router/src/main/kotlin/azure/observability/event/ReportStreamEventData.kt

+4
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ enum class ReportStreamEventProperties {
6767
BUNDLE_DIGEST,
6868
INGESTION_TYPE,
6969
POISON_QUEUE_MESSAGE_ID,
70+
ENRICHMENTS,
71+
ORIGINAL_FORMAT,
72+
TARGET_FORMAT,
7073
;
7174

7275
@JsonKey
@@ -92,6 +95,7 @@ enum class ReportStreamEventName {
9295
REPORT_NOT_PROCESSABLE,
9396
ITEM_SENT,
9497
PIPELINE_EXCEPTION,
98+
ITEM_TRANSFORMED,
9599
}
96100

97101
/**

prime-router/src/main/kotlin/fhirengine/engine/FHIREngine.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ abstract class FHIREngine(
171171
blobAccess ?: BlobAccess(),
172172
azureEventService ?: AzureEventServiceImpl(),
173173
reportService ?: ReportService(),
174-
ReportStreamEventService(
174+
reportEventService ?: ReportStreamEventService(
175175
databaseAccess ?: databaseAccessSingleton,
176176
azureEventService ?: AzureEventServiceImpl(),
177177
reportService ?: ReportService()

prime-router/src/main/kotlin/fhirengine/engine/FHIRTranslator.kt

+41-6
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import ca.uhn.hl7v2.model.Segment
66
import ca.uhn.hl7v2.util.Terser
77
import fhirengine.engine.CustomFhirPathFunctions
88
import fhirengine.engine.CustomTranslationFunctions
9+
import gov.cdc.prime.reportstream.shared.BlobUtils
910
import gov.cdc.prime.reportstream.shared.QueueMessage
1011
import gov.cdc.prime.router.ActionLogger
1112
import gov.cdc.prime.router.CustomerStatus
@@ -21,16 +22,21 @@ import gov.cdc.prime.router.azure.DatabaseAccess
2122
import gov.cdc.prime.router.azure.Event
2223
import gov.cdc.prime.router.azure.db.Tables
2324
import gov.cdc.prime.router.azure.db.enums.TaskAction
25+
import gov.cdc.prime.router.azure.observability.bundleDigest.BundleDigestExtractor
26+
import gov.cdc.prime.router.azure.observability.bundleDigest.FhirPathBundleDigestLabResultExtractorStrategy
2427
import gov.cdc.prime.router.azure.observability.context.MDCUtils
2528
import gov.cdc.prime.router.azure.observability.context.withLoggingContext
2629
import gov.cdc.prime.router.azure.observability.event.AzureEventService
2730
import gov.cdc.prime.router.azure.observability.event.AzureEventServiceImpl
2831
import gov.cdc.prime.router.azure.observability.event.IReportStreamEventService
32+
import gov.cdc.prime.router.azure.observability.event.ReportStreamEventName
33+
import gov.cdc.prime.router.azure.observability.event.ReportStreamEventProperties
2934
import gov.cdc.prime.router.common.Environment
3035
import gov.cdc.prime.router.fhirengine.config.HL7TranslationConfig
3136
import gov.cdc.prime.router.fhirengine.translation.hl7.FhirToHl7Context
3237
import gov.cdc.prime.router.fhirengine.translation.hl7.FhirToHl7Converter
3338
import gov.cdc.prime.router.fhirengine.translation.hl7.FhirTransformer
39+
import gov.cdc.prime.router.fhirengine.translation.hl7.utils.CustomContext
3440
import gov.cdc.prime.router.fhirengine.translation.hl7.utils.HL7Utils.defaultHl7EncodingFiveChars
3541
import gov.cdc.prime.router.fhirengine.translation.hl7.utils.HL7Utils.defaultHl7EncodingFourChars
3642
import gov.cdc.prime.router.fhirengine.utils.FhirTranscoder
@@ -113,12 +119,13 @@ class FHIRTranslator(
113119
): FHIREngineRunResult {
114120
logger.trace("Preparing to send original message")
115121
val originalReport = reportService.getRootReport(message.reportId)
116-
val bodyBytes = BlobAccess.downloadBlobAsByteArray(originalReport.bodyUrl)
122+
val bodyAsString =
123+
BlobAccess.downloadBlob(originalReport.bodyUrl, BlobUtils.digestToString(originalReport.blobDigest))
117124

118125
// get a Report from the message
119126
val (report, event, blobInfo) = Report.generateReportAndUploadBlob(
120127
Event.EventAction.SEND,
121-
bodyBytes,
128+
bodyAsString.toByteArray(),
122129
listOf(message.reportId),
123130
receiver,
124131
this.metadata,
@@ -148,10 +155,9 @@ class FHIRTranslator(
148155
actionHistory: ActionHistory,
149156
): FHIREngineRunResult {
150157
logger.trace("Preparing to send translated message")
151-
val bodyBytes =
152-
getByteArrayFromBundle(
153-
receiver, FhirTranscoder.decode(BlobAccess.downloadBlob(message.blobURL, message.digest))
154-
)
158+
val originalReport = reportService.getRootReport(message.reportId)
159+
val bundle = FhirTranscoder.decode(BlobAccess.downloadBlob(message.blobURL, message.digest))
160+
val bodyBytes = getByteArrayFromBundle(receiver, bundle)
155161

156162
val (report, event, blobInfo) = Report.generateReportAndUploadBlob(
157163
Event.EventAction.BATCH,
@@ -163,6 +169,35 @@ class FHIRTranslator(
163169
topic = message.topic
164170
)
165171

172+
val bundleDigestExtractor = BundleDigestExtractor(
173+
FhirPathBundleDigestLabResultExtractorStrategy(
174+
CustomContext(
175+
bundle,
176+
bundle,
177+
mutableMapOf(),
178+
CustomFhirPathFunctions()
179+
)
180+
)
181+
)
182+
reportEventService.sendItemEvent(
183+
eventName = ReportStreamEventName.ITEM_TRANSFORMED,
184+
childReport = report,
185+
pipelineStepName = TaskAction.translate
186+
) {
187+
parentReportId(message.reportId)
188+
params(
189+
mapOf(
190+
ReportStreamEventProperties.RECEIVER_NAME to receiver.fullName,
191+
ReportStreamEventProperties.BUNDLE_DIGEST
192+
to bundleDigestExtractor.generateDigest(bundle),
193+
ReportStreamEventProperties.ORIGINAL_FORMAT to originalReport.bodyFormat,
194+
ReportStreamEventProperties.TARGET_FORMAT to receiver.translation.format.name,
195+
ReportStreamEventProperties.ENRICHMENTS to listOf(receiver.translation.schemaName)
196+
)
197+
)
198+
trackingId(bundle)
199+
}
200+
166201
return FHIREngineRunResult(
167202
event,
168203
report,

prime-router/src/test/kotlin/common/UniversalPipelineTestUtils.kt

+5-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package gov.cdc.prime.router.common
33
import assertk.assertThat
44
import assertk.assertions.hasSize
55
import assertk.assertions.isEqualTo
6+
import gov.cdc.prime.reportstream.shared.BlobUtils.sha256Digest
67
import gov.cdc.prime.router.ClientSource
78
import gov.cdc.prime.router.CustomerStatus
89
import gov.cdc.prime.router.DeepOrganization
@@ -452,7 +453,8 @@ object UniversalPipelineTestUtils {
452453
event,
453454
Topic.FULL_ELR,
454455
parentReport,
455-
blobUrl
456+
blobUrl,
457+
reportContents,
456458
)
457459
}
458460

@@ -464,6 +466,7 @@ object UniversalPipelineTestUtils {
464466
topic: Topic,
465467
parentReport: Report? = null,
466468
bodyURL: String? = null,
469+
reportContents: String,
467470
): Report {
468471
val report = Report(
469472
fileFormat,
@@ -493,6 +496,7 @@ object UniversalPipelineTestUtils {
493496
.setBodyUrl(report.bodyURL)
494497
.setSendingOrg(universalPipelineOrganization.name)
495498
.setSendingOrgClient("Test Sender")
499+
.setBlobDigest(sha256Digest(reportContents.toByteArray(Charsets.UTF_8)))
496500

497501
ReportStreamTestDatabaseContainer.testDatabaseAccess.insertReportFile(
498502
reportFile, txn, action

prime-router/src/test/kotlin/fhirengine/azure/FHIRTranslatorIntegrationTests.kt

+65
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package gov.cdc.prime.router.fhirengine.azure
22

33
import assertk.assertThat
4+
import assertk.assertions.hasSize
45
import assertk.assertions.isEqualTo
6+
import assertk.assertions.isInstanceOf
57
import assertk.assertions.isNotEqualTo
68
import assertk.assertions.isNotNull
79
import assertk.assertions.isNull
@@ -22,7 +24,10 @@ import gov.cdc.prime.router.azure.db.enums.TaskAction
2224
import gov.cdc.prime.router.azure.db.tables.Task
2325
import gov.cdc.prime.router.azure.observability.event.AzureEventService
2426
import gov.cdc.prime.router.azure.observability.event.InMemoryAzureEventService
27+
import gov.cdc.prime.router.azure.observability.event.ReportStreamEventName
28+
import gov.cdc.prime.router.azure.observability.event.ReportStreamEventProperties
2529
import gov.cdc.prime.router.azure.observability.event.ReportStreamEventService
30+
import gov.cdc.prime.router.azure.observability.event.ReportStreamItemEvent
2631
import gov.cdc.prime.router.cli.tests.CompareData
2732
import gov.cdc.prime.router.common.TestcontainersUtils
2833
import gov.cdc.prime.router.common.UniversalPipelineTestUtils
@@ -191,6 +196,21 @@ class FHIRTranslatorIntegrationTests : Logging {
191196
QueueAccess.sendMessage(any(), any())
192197
}
193198

199+
// check events
200+
assertThat(azureEventService.reportStreamEvents[ReportStreamEventName.ITEM_TRANSFORMED]!!).hasSize(1)
201+
assertThat(
202+
azureEventService
203+
.reportStreamEvents[ReportStreamEventName.ITEM_TRANSFORMED]!!.first()
204+
).isInstanceOf<ReportStreamItemEvent>()
205+
val event = azureEventService
206+
.reportStreamEvents[ReportStreamEventName.ITEM_TRANSFORMED]!!.first() as ReportStreamItemEvent
207+
assertThat(event.params[ReportStreamEventProperties.ORIGINAL_FORMAT]).isEqualTo("FHIR")
208+
assertThat(event.params[ReportStreamEventProperties.TARGET_FORMAT]).isEqualTo("HL7")
209+
assertThat(event.params[ReportStreamEventProperties.RECEIVER_NAME]).isEqualTo("phd.x")
210+
val enrichments = event.params[ReportStreamEventProperties.ENRICHMENTS] as List<*>
211+
assertThat(enrichments).hasSize(1)
212+
assertThat(enrichments.first()).isEqualTo(receiverSetupData.first().schemaName)
213+
194214
// check action table
195215
UniversalPipelineTestUtils.checkActionTable(listOf(TaskAction.receive, TaskAction.translate))
196216

@@ -289,6 +309,21 @@ class FHIRTranslatorIntegrationTests : Logging {
289309
QueueAccess.sendMessage(any(), any())
290310
}
291311

312+
// check events
313+
assertThat(azureEventService.reportStreamEvents[ReportStreamEventName.ITEM_TRANSFORMED]!!).hasSize(1)
314+
assertThat(
315+
azureEventService
316+
.reportStreamEvents[ReportStreamEventName.ITEM_TRANSFORMED]!!.first()
317+
).isInstanceOf<ReportStreamItemEvent>()
318+
val event = azureEventService
319+
.reportStreamEvents[ReportStreamEventName.ITEM_TRANSFORMED]!!.first() as ReportStreamItemEvent
320+
assertThat(event.params[ReportStreamEventProperties.ORIGINAL_FORMAT]).isEqualTo("FHIR")
321+
assertThat(event.params[ReportStreamEventProperties.TARGET_FORMAT]).isEqualTo("HL7")
322+
assertThat(event.params[ReportStreamEventProperties.RECEIVER_NAME]).isEqualTo("phd.x")
323+
val enrichments = event.params[ReportStreamEventProperties.ENRICHMENTS] as List<*>
324+
assertThat(enrichments).hasSize(1)
325+
assertThat(enrichments.first()).isEqualTo(receiverSetupData.first().schemaName)
326+
292327
// check action table
293328
UniversalPipelineTestUtils.checkActionTable(listOf(TaskAction.receive, TaskAction.translate))
294329

@@ -364,6 +399,21 @@ class FHIRTranslatorIntegrationTests : Logging {
364399
QueueAccess.sendMessage(any(), any())
365400
}
366401

402+
// check events
403+
assertThat(azureEventService.reportStreamEvents[ReportStreamEventName.ITEM_TRANSFORMED]!!).hasSize(1)
404+
assertThat(
405+
azureEventService
406+
.reportStreamEvents[ReportStreamEventName.ITEM_TRANSFORMED]!!.first()
407+
).isInstanceOf<ReportStreamItemEvent>()
408+
val event = azureEventService
409+
.reportStreamEvents[ReportStreamEventName.ITEM_TRANSFORMED]!!.first() as ReportStreamItemEvent
410+
assertThat(event.params[ReportStreamEventProperties.ORIGINAL_FORMAT]).isEqualTo("FHIR")
411+
assertThat(event.params[ReportStreamEventProperties.TARGET_FORMAT]).isEqualTo("FHIR")
412+
assertThat(event.params[ReportStreamEventProperties.RECEIVER_NAME]).isEqualTo("phd.x")
413+
val enrichments = event.params[ReportStreamEventProperties.ENRICHMENTS] as List<*>
414+
assertThat(enrichments).hasSize(1)
415+
assertThat(enrichments.first()).isEqualTo(receiverSetupData.first().schemaName)
416+
367417
// check action table
368418
UniversalPipelineTestUtils.checkActionTable(listOf(TaskAction.receive, TaskAction.translate))
369419

@@ -449,6 +499,21 @@ class FHIRTranslatorIntegrationTests : Logging {
449499
QueueAccess.sendMessage(any(), any())
450500
}
451501

502+
// check events
503+
assertThat(azureEventService.reportStreamEvents[ReportStreamEventName.ITEM_TRANSFORMED]!!).hasSize(1)
504+
assertThat(
505+
azureEventService
506+
.reportStreamEvents[ReportStreamEventName.ITEM_TRANSFORMED]!!.first()
507+
).isInstanceOf<ReportStreamItemEvent>()
508+
val event = azureEventService
509+
.reportStreamEvents[ReportStreamEventName.ITEM_TRANSFORMED]!!.first() as ReportStreamItemEvent
510+
assertThat(event.params[ReportStreamEventProperties.ORIGINAL_FORMAT]).isEqualTo("FHIR")
511+
assertThat(event.params[ReportStreamEventProperties.TARGET_FORMAT]).isEqualTo("FHIR")
512+
assertThat(event.params[ReportStreamEventProperties.RECEIVER_NAME]).isEqualTo("phd.x")
513+
val enrichments = event.params[ReportStreamEventProperties.ENRICHMENTS] as List<*>
514+
assertThat(enrichments).hasSize(1)
515+
assertThat(enrichments.first()).isEqualTo(receiverSetupData.first().schemaName)
516+
452517
// check action table
453518
UniversalPipelineTestUtils.checkActionTable(listOf(TaskAction.receive, TaskAction.translate))
454519

0 commit comments

Comments
 (0)