diff --git a/src/main/java/com/uci/transformer/odk/ODKConsumerReactive.java b/src/main/java/com/uci/transformer/odk/ODKConsumerReactive.java index c08bf21..de29b29 100644 --- a/src/main/java/com/uci/transformer/odk/ODKConsumerReactive.java +++ b/src/main/java/com/uci/transformer/odk/ODKConsumerReactive.java @@ -82,6 +82,9 @@ public class ODKConsumerReactive extends TransformerProvider { @Value("${outbound}") public String outboundTopic; + @Value("${processOutbound}") + private String processOutboundTopic; + @Value("${telemetry}") public String telemetryTopic; @@ -141,7 +144,7 @@ public void accept(List messages) { messages = (ArrayList) messages; for (XMessage msg : messages) { try { - kafkaProducer.send(outboundTopic, msg.toXML()); + kafkaProducer.send(processOutboundTopic, msg.toXML()); } catch (JAXBException e) { e.printStackTrace(); } @@ -156,7 +159,7 @@ public void accept(XMessage transformedMessage) { logTimeTaken(startTime, 2); if (transformedMessage != null) { try { - kafkaProducer.send(outboundTopic, transformedMessage.toXML()); + kafkaProducer.send(processOutboundTopic, transformedMessage.toXML()); long endTime = System.nanoTime(); long duration = (endTime - startTime); log.error("Total time spent in processing form: " + duration / 1000000); diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 8ec30ee..a84db56 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -16,6 +16,7 @@ gupshup-opted-out=${KAFKA_INBOUND_GS_OPTED_OUT_TOPIC} inbound-error=${KAFKA_INBOUND_ERROR_TOPIC} odk-transformer=${KAFKA_ODK_TRANSFORMER_TOPIC} odk-topic-pattern=${KAFKA_ODK_TRANSFORMER_TOPIC_PATTERN} +processOutbound=${KAFKA_PROCESS_OUTBOUND} spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration spring.r2dbc.initialization-mode=always