Skip to content

Commit b0451fc

Browse files
authored
16140: manually populate poison queue when a step encounters an error (#16258)
* 16140: manually populate poison queue when a step encounters an error * fixup! 16140: manually populate poison queue when a step encounters an error
1 parent d3c6ac4 commit b0451fc

17 files changed

+183
-23
lines changed

prime-router/src/main/kotlin/azure/QueueAccess.kt

+3-2
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ object QueueAccess {
5656
queueName: String,
5757
message: String,
5858
invisibleDuration: Duration = Duration.ZERO,
59-
) {
59+
): String {
6060
// Bug: event.at is calculated before the call to workflowengine.recordHistory
6161
// In cases of very large datasets, that db write can take a very long time, pushing
6262
// the current time past event.at. This causes negative durations. Hence this:
@@ -66,13 +66,14 @@ object QueueAccess {
6666
invisibleDuration
6767
}
6868
val timeToLive = invisibleDuration.plusDays(timeToLiveDays)
69-
createQueueClient(queueName).sendMessageWithResponse(
69+
val response = createQueueClient(queueName).sendMessageWithResponse(
7070
message,
7171
duration,
7272
timeToLive,
7373
null,
7474
null
7575
)
76+
return response.value.messageId
7677
}
7778

7879
fun receiveMessage(queueName: String): Event {

prime-router/src/main/kotlin/azure/observability/event/ReportStreamEventData.kt

+2
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ enum class ReportStreamEventProperties {
6666
SENDER_NAME,
6767
BUNDLE_DIGEST,
6868
INGESTION_TYPE,
69+
POISON_QUEUE_MESSAGE_ID,
6970
;
7071

7172
@JsonKey
@@ -90,6 +91,7 @@ enum class ReportStreamEventName {
9091
REPORT_LAST_MILE_FAILURE,
9192
REPORT_NOT_PROCESSABLE,
9293
ITEM_SENT,
94+
PIPELINE_EXCEPTION,
9395
}
9496

9597
/**

prime-router/src/main/kotlin/fhirengine/azure/FHIRFunctions.kt

+32-6
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ import gov.cdc.prime.router.azure.DatabaseAccess
1212
import gov.cdc.prime.router.azure.QueueAccess
1313
import gov.cdc.prime.router.azure.WorkflowEngine
1414
import gov.cdc.prime.router.azure.db.enums.TaskAction
15+
import gov.cdc.prime.router.azure.observability.event.ReportStreamEventName
16+
import gov.cdc.prime.router.azure.observability.event.ReportStreamEventProperties
1517
import gov.cdc.prime.router.common.BaseEngine
1618
import gov.cdc.prime.router.fhirengine.engine.FHIRConverter
1719
import gov.cdc.prime.router.fhirengine.engine.FHIRDestinationFilter
@@ -24,6 +26,7 @@ import gov.cdc.prime.router.fhirengine.engine.PrimeRouterQueueMessage
2426
import gov.cdc.prime.router.fhirengine.engine.ReportPipelineMessage
2527
import org.apache.commons.lang3.StringUtils
2628
import org.apache.logging.log4j.kotlin.Logging
29+
import org.jooq.exception.DataAccessException
2730

2831
class FHIRFunctions(
2932
private val workflowEngine: WorkflowEngine = WorkflowEngine(),
@@ -140,13 +143,36 @@ class FHIRFunctions(
140143
): List<QueueMessage> {
141144
val messageContent = readMessage(fhirEngine.engineType, message, dequeueCount)
142145

143-
val newMessages = databaseAccess.transactReturning { txn ->
144-
val results = fhirEngine.run(messageContent, actionLogger, actionHistory, txn)
145-
recordResults(message, actionHistory, txn)
146-
results
147-
}
146+
try {
147+
val newMessages = databaseAccess.transactReturning { txn ->
148+
val results = fhirEngine.run(messageContent, actionLogger, actionHistory, txn)
149+
recordResults(message, actionHistory, txn)
150+
results
151+
}
148152

149-
return newMessages
153+
return newMessages
154+
} catch (ex: DataAccessException) {
155+
// This is the one exception type that we currently will allow for retrying as there are occasional
156+
// DB connectivity issues that are resolved without intervention
157+
logger.error(ex)
158+
throw ex
159+
} catch (ex: Exception) {
160+
// We're catching anything else that occurs because the most likely cause is a code or configuration error
161+
// that will not be resolved if the message is automatically retried
162+
// Instead, the error is recorded as an event and message is manually inserted into the poison queue
163+
val report = databaseAccess.fetchReportFile(messageContent.reportId)
164+
val poisonQueueMessageId = queueAccess.sendMessage("${messageContent.messageQueueName}-poison", message)
165+
fhirEngine.reportEventService.sendReportProcessingError(
166+
ReportStreamEventName.PIPELINE_EXCEPTION,
167+
report,
168+
fhirEngine.taskAction,
169+
ex.message ?: ""
170+
) {
171+
params(mapOf(ReportStreamEventProperties.POISON_QUEUE_MESSAGE_ID to poisonQueueMessageId))
172+
}
173+
174+
return emptyList()
175+
}
150176
}
151177

152178
/**

prime-router/src/main/kotlin/fhirengine/engine/FHIRConverter.kt

+2
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ class FHIRConverter(
8181

8282
override val engineType: String = "Convert"
8383

84+
override val taskAction: TaskAction = TaskAction.convert
85+
8486
/**
8587
* Accepts a [message] in either HL7 or FHIR format
8688
* HL7 messages will be converted into FHIR.

prime-router/src/main/kotlin/fhirengine/engine/FHIRDestinationFilter.kt

+1
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ class FHIRDestinationFilter(
5454
override val finishedField: Field<OffsetDateTime> = Tables.TASK.DESTINATION_FILTERED_AT
5555

5656
override val engineType: String = "DestinationFilter"
57+
override val taskAction: TaskAction = TaskAction.destination_filter
5758

5859
internal fun findTopicReceivers(topic: Topic): List<Receiver> =
5960
settings.receivers.filter { it.customerStatus != CustomerStatus.INACTIVE && it.topic == topic }

prime-router/src/main/kotlin/fhirengine/engine/FHIREngine.kt

+5
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,11 @@ abstract class FHIREngine(
188188
*/
189189
abstract val engineType: String
190190

191+
/**
192+
* The task action associated with the engine
193+
*/
194+
abstract val taskAction: TaskAction
195+
191196
/**
192197
* Result class that is returned as part of completing the work on a message
193198
*

prime-router/src/main/kotlin/fhirengine/engine/FHIRReceiver.kt

+1
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ class FHIRReceiver(
6161
override val finishedField: Field<OffsetDateTime> = Tables.TASK.PROCESSED_AT
6262

6363
override val engineType: String = "Receive"
64+
override val taskAction: TaskAction = TaskAction.receive
6465

6566
private val clientIdHeader = "client_id"
6667
private val contentTypeHeader = "content-type"

prime-router/src/main/kotlin/fhirengine/engine/FHIRReceiverFilter.kt

+1
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ class FHIRReceiverFilter(
6868
override val finishedField: Field<OffsetDateTime> = Tables.TASK.RECEIVER_FILTERED_AT
6969

7070
override val engineType: String = "ReceiverFilter"
71+
override val taskAction: TaskAction = TaskAction.receiver_filter
7172

7273
/**
7374
* Accepts a [message] in internal FHIR format

prime-router/src/main/kotlin/fhirengine/engine/FHIRTranslator.kt

+2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import gov.cdc.prime.router.azure.BlobAccess
2020
import gov.cdc.prime.router.azure.DatabaseAccess
2121
import gov.cdc.prime.router.azure.Event
2222
import gov.cdc.prime.router.azure.db.Tables
23+
import gov.cdc.prime.router.azure.db.enums.TaskAction
2324
import gov.cdc.prime.router.azure.observability.context.MDCUtils
2425
import gov.cdc.prime.router.azure.observability.context.withLoggingContext
2526
import gov.cdc.prime.router.azure.observability.event.AzureEventService
@@ -170,6 +171,7 @@ class FHIRTranslator(
170171

171172
override val finishedField: Field<OffsetDateTime> = Tables.TASK.TRANSLATED_AT
172173
override val engineType: String = "Translate"
174+
override val taskAction: TaskAction = TaskAction.translate
173175

174176
/**
175177
* Returns a byteArray representation of the [bundle] in a format [receiver] expects, or throws an exception if the

prime-router/src/test/kotlin/SubmissionReceiverTests.kt

+6-6
Original file line numberDiff line numberDiff line change
@@ -787,7 +787,7 @@ class SubmissionReceiverTests {
787787
every { engine.routeReport(any(), any(), any(), any(), any()) } returns routeResult
788788
every { SubmissionReceiver.doDuplicateDetection(any(), any(), any()) } returns Unit
789789
every { engine.insertProcessTask(any(), any(), any(), any()) } returns Unit
790-
every { queueMock.sendMessage(QueueMessage.elrConvertQueueName, any()) } returns Unit
790+
every { queueMock.sendMessage(QueueMessage.elrConvertQueueName, any()) } returns ""
791791

792792
// act
793793
receiver.validateAndMoveToProcessing(
@@ -870,7 +870,7 @@ class SubmissionReceiverTests {
870870
every { engine.routeReport(any(), any(), any(), any(), any()) } returns routeResult
871871
every { SubmissionReceiver.doDuplicateDetection(any(), any(), any()) } returns Unit
872872
every { engine.insertProcessTask(any(), any(), any(), any()) } returns Unit
873-
every { queueMock.sendMessage(QueueMessage.elrConvertQueueName, any()) } returns Unit
873+
every { queueMock.sendMessage(QueueMessage.elrConvertQueueName, any()) } returns ""
874874

875875
// act
876876
receiver.validateAndMoveToProcessing(
@@ -941,7 +941,7 @@ class SubmissionReceiverTests {
941941
every { engine.routeReport(any(), any(), any(), any(), any()) } returns routeResult
942942
every { SubmissionReceiver.doDuplicateDetection(any(), any(), any()) } returns Unit
943943
every { engine.insertProcessTask(any(), any(), any(), any()) } returns Unit
944-
every { queueMock.sendMessage(QueueMessage.elrConvertQueueName, any()) } returns Unit
944+
every { queueMock.sendMessage(QueueMessage.elrConvertQueueName, any()) } returns ""
945945

946946
// act
947947
receiver.validateAndMoveToProcessing(
@@ -1011,7 +1011,7 @@ class SubmissionReceiverTests {
10111011
every { engine.routeReport(any(), any(), any(), any(), any()) } returns routeResult
10121012
every { SubmissionReceiver.doDuplicateDetection(any(), any(), any()) } returns Unit
10131013
every { engine.insertProcessTask(any(), any(), any(), any()) } returns Unit
1014-
every { queueMock.sendMessage(QueueMessage.elrConvertQueueName, any()) } returns Unit
1014+
every { queueMock.sendMessage(QueueMessage.elrConvertQueueName, any()) } returns ""
10151015

10161016
// act
10171017
receiver.validateAndMoveToProcessing(
@@ -1080,7 +1080,7 @@ class SubmissionReceiverTests {
10801080
every { engine.recordReceivedReport(any(), any(), any(), any(), any()) } returns blobInfo
10811081
every { engine.routeReport(any(), any(), any(), any(), any()) } returns routeResult
10821082
every { engine.insertProcessTask(any(), any(), any(), any()) } returns Unit
1083-
every { queueMock.sendMessage(QueueMessage.elrConvertQueueName, any()) } returns Unit
1083+
every { queueMock.sendMessage(QueueMessage.elrConvertQueueName, any()) } returns ""
10841084

10851085
// act
10861086
var exceptionThrown = false
@@ -1153,7 +1153,7 @@ class SubmissionReceiverTests {
11531153
every { engine.recordReceivedReport(any(), any(), any(), any(), any()) } returns blobInfo
11541154
every { engine.routeReport(any(), any(), any(), any(), any()) } returns routeResult
11551155
every { engine.insertProcessTask(any(), any(), any(), any()) } returns Unit
1156-
every { queueMock.sendMessage(QueueMessage.elrConvertQueueName, any()) } returns Unit
1156+
every { queueMock.sendMessage(QueueMessage.elrConvertQueueName, any()) } returns ""
11571157

11581158
// act / assert
11591159
assertFailure {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
package gov.cdc.prime.router.azure
2+
3+
import gov.cdc.prime.reportstream.shared.QueueMessage
4+
import gov.cdc.prime.router.FileSettings
5+
import gov.cdc.prime.router.azure.db.enums.TaskAction
6+
import gov.cdc.prime.router.azure.db.tables.pojos.ReportFile
7+
import gov.cdc.prime.router.azure.observability.event.IReportStreamEventService
8+
import gov.cdc.prime.router.azure.observability.event.ReportStreamEventName
9+
import gov.cdc.prime.router.azure.observability.event.ReportStreamReportProcessingErrorEventBuilder
10+
import gov.cdc.prime.router.common.UniversalPipelineTestUtils
11+
import gov.cdc.prime.router.common.UniversalPipelineTestUtils.createFHIRFunctionsInstance
12+
import gov.cdc.prime.router.fhirengine.azure.FHIRFunctions
13+
import gov.cdc.prime.router.fhirengine.engine.FHIRConverter
14+
import gov.cdc.prime.router.metadata.LookupTable
15+
import gov.cdc.prime.router.unittest.UnitTestUtils
16+
import io.mockk.every
17+
import io.mockk.mockk
18+
import io.mockk.mockkConstructor
19+
import io.mockk.mockkObject
20+
import io.mockk.slot
21+
import io.mockk.spyk
22+
import io.mockk.verify
23+
import org.jooq.exception.DataAccessException
24+
import org.jooq.tools.jdbc.MockConnection
25+
import org.jooq.tools.jdbc.MockDataProvider
26+
import org.jooq.tools.jdbc.MockResult
27+
import org.junit.jupiter.api.BeforeEach
28+
import org.junit.jupiter.api.Test
29+
import org.junit.jupiter.api.assertThrows
30+
import java.util.UUID
31+
32+
class FHIRFunctionsTests {
33+
34+
val queueMessage = """
35+
{
36+
"type": "convert",
37+
"reportId": "${UUID.randomUUID()}",
38+
"blobURL": "",
39+
"digest": "",
40+
"blobSubFolderName": "ignore.full-elr",
41+
"topic": "full-elr",
42+
"schemaName": ""
43+
}
44+
"""
45+
46+
@BeforeEach
47+
fun beforeEach() {
48+
mockkObject(QueueAccess)
49+
every { QueueAccess.sendMessage(any(), any()) } returns "poison-123"
50+
mockkObject(BlobAccess)
51+
mockkConstructor(DatabaseLookupTableAccess::class)
52+
}
53+
54+
private fun createFHIRFunctionsInstance(): FHIRFunctions {
55+
val settings = FileSettings().loadOrganizations(UniversalPipelineTestUtils.universalPipelineOrganization)
56+
val metadata = UnitTestUtils.simpleMetadata
57+
metadata.lookupTableStore += mapOf(
58+
"observation-mapping" to LookupTable("observation-mapping", emptyList())
59+
)
60+
val dataProvider = MockDataProvider { emptyArray<MockResult>() }
61+
val connection = MockConnection(dataProvider)
62+
val accessSpy = spyk(DatabaseAccess(connection))
63+
val workflowEngine = WorkflowEngine.Builder()
64+
.metadata(metadata)
65+
.settingsProvider(settings)
66+
.databaseAccess(accessSpy)
67+
.build()
68+
every { accessSpy.fetchReportFile(any()) } returns mockk<ReportFile>(relaxed = true)
69+
return FHIRFunctions(workflowEngine, databaseAccess = accessSpy)
70+
}
71+
72+
@Test
73+
fun `test should add to the poison queue and catch an unexpected exception`() {
74+
val fhirFunctions = createFHIRFunctionsInstance()
75+
76+
val mockReportEventService = mockk<IReportStreamEventService>(relaxed = true)
77+
val init = slot<ReportStreamReportProcessingErrorEventBuilder.() -> Unit>()
78+
every {
79+
mockReportEventService.sendReportProcessingError(any(), any<ReportFile>(), any(), any(), capture(init))
80+
} returns Unit
81+
val mockFHIRConverter = mockk<FHIRConverter>(relaxed = true)
82+
every { mockFHIRConverter.run(any(), any(), any(), any()) } throws RuntimeException("Error")
83+
every { mockFHIRConverter.reportEventService } returns mockReportEventService
84+
every { mockFHIRConverter.taskAction } returns TaskAction.convert
85+
fhirFunctions.process(queueMessage, 1, mockFHIRConverter, ActionHistory(TaskAction.convert))
86+
87+
verify(exactly = 1) {
88+
QueueAccess.sendMessage(
89+
"${QueueMessage.elrConvertQueueName}-poison",
90+
queueMessage
91+
)
92+
mockReportEventService.sendReportProcessingError(
93+
ReportStreamEventName.PIPELINE_EXCEPTION,
94+
any<ReportFile>(),
95+
TaskAction.convert,
96+
"Error",
97+
init.captured
98+
)
99+
}
100+
}
101+
102+
@Test
103+
fun `test should not add to the poison queue and throw a data access exception`() {
104+
val fhirFunctions = createFHIRFunctionsInstance()
105+
106+
val mockFHIRConverter = mockk<FHIRConverter>(relaxed = true)
107+
every { mockFHIRConverter.run(any(), any(), any(), any()) } throws DataAccessException("Error")
108+
assertThrows<DataAccessException> {
109+
fhirFunctions.process(queueMessage, 1, mockFHIRConverter, ActionHistory(TaskAction.convert))
110+
}
111+
112+
verify(exactly = 0) {
113+
QueueAccess.sendMessage(
114+
"${QueueMessage.elrConvertQueueName}-poison",
115+
queueMessage
116+
)
117+
}
118+
}
119+
}

prime-router/src/test/kotlin/azure/ReportFunctionTests.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ class ReportFunctionTests {
293293
} returns report1
294294

295295
every { engine.recordReceivedReport(any(), any(), any(), any(), any()) } returns blobInfo
296-
every { engine.queue.sendMessage(any(), any(), any()) } returns Unit
296+
every { engine.queue.sendMessage(any(), any(), any()) } returns ""
297297
val bodyBytes = "".toByteArray()
298298
mockkObject(ReportWriter)
299299
every { ReportWriter.getBodyBytes(any(), any(), any()) }.returns(bodyBytes)
@@ -660,7 +660,7 @@ class ReportFunctionTests {
660660
every { actionHistory.action.sendingOrg } returns "Test Sender"
661661
every { actionHistory.action.actionName } returns TaskAction.receive
662662
every { engine.recordReceivedReport(any(), any(), any(), any(), any()) } returns blobInfo
663-
every { engine.queue.sendMessage(any(), any(), any()) } returns Unit
663+
every { engine.queue.sendMessage(any(), any(), any()) } returns ""
664664
val bodyBytes = "".toByteArray()
665665
mockkObject(ReportWriter)
666666
every { ReportWriter.getBodyBytes(any(), any(), any()) }.returns(bodyBytes)
@@ -714,7 +714,7 @@ class ReportFunctionTests {
714714
every { actionHistory.action.sendingOrg } returns "Test Sender"
715715
every { actionHistory.action.actionName } returns TaskAction.receive
716716
every { engine.recordReceivedReport(any(), any(), any(), any(), any()) } returns blobInfo
717-
every { engine.queue.sendMessage(any(), any(), any()) } returns Unit
717+
every { engine.queue.sendMessage(any(), any(), any()) } returns ""
718718
val bodyBytes = "".toByteArray()
719719
mockkObject(ReportWriter)
720720
every { ReportWriter.getBodyBytes(any(), any(), any()) }.returns(bodyBytes)
@@ -784,7 +784,7 @@ class ReportFunctionTests {
784784
every { actionHistory.action.sendingOrg } returns "Test Sender"
785785
every { actionHistory.action.actionName } returns TaskAction.receive
786786
every { engine.recordReceivedReport(any(), any(), any(), any(), any()) } returns blobInfo
787-
every { engine.queue.sendMessage(any(), any(), any()) } returns Unit
787+
every { engine.queue.sendMessage(any(), any(), any()) } returns ""
788788
val bodyBytes = "".toByteArray()
789789
mockkObject(ReportWriter)
790790
every { ReportWriter.getBodyBytes(any(), any(), any()) }.returns(bodyBytes)

prime-router/src/test/kotlin/fhirengine/azure/FHIRConverterIntegrationTests.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ class FHIRConverterIntegrationTests {
158158
@BeforeEach
159159
fun beforeEach() {
160160
mockkObject(QueueAccess)
161-
every { QueueAccess.sendMessage(any(), any()) } returns Unit
161+
every { QueueAccess.sendMessage(any(), any()) } returns ""
162162
mockkObject(BlobAccess)
163163
every { BlobAccess getProperty "defaultBlobMetadata" } returns getBlobContainerMetadata()
164164
mockkObject(BlobAccess.BlobContainerMetadata)

prime-router/src/test/kotlin/fhirengine/azure/FHIRDestinationFilterIntegrationTests.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ class FHIRDestinationFilterIntegrationTests : Logging {
9292
mockkObject(BlobAccess)
9393
mockkObject(BlobAccess.BlobContainerMetadata)
9494

95-
every { QueueAccess.sendMessage(any(), any()) } returns Unit
95+
every { QueueAccess.sendMessage(any(), any()) } returns ""
9696
every { BlobAccess getProperty "defaultBlobMetadata" } returns UniversalPipelineTestUtils
9797
.getBlobContainerMetadata(azuriteContainer)
9898
every { BlobAccess.BlobContainerMetadata.build(any(), any()) } returns

prime-router/src/test/kotlin/fhirengine/azure/FHIRReceiverFilterIntegrationTests.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ class FHIRReceiverFilterIntegrationTests : Logging {
179179
@BeforeEach
180180
fun beforeEach() {
181181
mockkObject(QueueAccess)
182-
every { QueueAccess.sendMessage(any(), any()) } returns Unit
182+
every { QueueAccess.sendMessage(any(), any()) } returns ""
183183
mockkObject(BlobAccess)
184184
every { BlobAccess getProperty "defaultBlobMetadata" } returns UniversalPipelineTestUtils
185185
.getBlobContainerMetadata(azuriteContainer)

0 commit comments

Comments
 (0)