diff --git a/nexus-ingestion-api/src/main/java/org/techbd/ingest/listener/NettyTcpServer.java b/nexus-ingestion-api/src/main/java/org/techbd/ingest/listener/NettyTcpServer.java index 4c29ae38c51..f424f558fb6 100644 --- a/nexus-ingestion-api/src/main/java/org/techbd/ingest/listener/NettyTcpServer.java +++ b/nexus-ingestion-api/src/main/java/org/techbd/ingest/listener/NettyTcpServer.java @@ -41,6 +41,7 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.SimpleChannelInboundHandler; @@ -51,6 +52,7 @@ import io.netty.handler.codec.haproxy.HAProxyCommand; import io.netty.handler.codec.haproxy.HAProxyMessage; import io.netty.handler.codec.haproxy.HAProxyMessageDecoder; +import io.netty.handler.timeout.ReadTimeoutException; import io.netty.handler.timeout.ReadTimeoutHandler; import io.netty.util.AttributeKey; import jakarta.annotation.PostConstruct; @@ -65,10 +67,10 @@ public class NettyTcpServer implements MessageSourceProvider { @Value("${TCP_DISPATCHER_PORT:6001}") private int tcpPort; - @Value("${TCP_READ_TIMEOUT_SECONDS:30}") + @Value("${TCP_READ_TIMEOUT_SECONDS:180}") private int readTimeoutSeconds; - @Value("${TCP_MAX_MESSAGE_SIZE_BYTES:10485760}") // 10MB default + @Value("${TCP_MAX_MESSAGE_SIZE_BYTES:52428800}") // 50MB default private int maxMessageSizeBytes; // MLLP protocol markers (unchanged) @@ -99,6 +101,8 @@ public class NettyTcpServer implements MessageSourceProvider { private static final AttributeKey MESSAGE_START_TIME_KEY = AttributeKey.valueOf("MESSAGE_START_TIME"); private static final AttributeKey FRAGMENT_COUNT_KEY = AttributeKey.valueOf("FRAGMENT_COUNT"); private static final AttributeKey TOTAL_BYTES_KEY = AttributeKey.valueOf("TOTAL_BYTES"); + private static final AttributeKey MESSAGE_SIZE_EXCEEDED_KEY = AttributeKey.valueOf("MESSAGE_SIZE_EXCEEDED"); + private static final AttributeKey ERROR_NACK_SENT_KEY = AttributeKey.valueOf("ERROR_NACK_SENT"); public NettyTcpServer(MessageProcessorService messageProcessorService, AppConfig appConfig, @@ -130,6 +134,8 @@ protected void initChannel(SocketChannel ch) { ch.attr(MESSAGE_START_TIME_KEY).set(System.currentTimeMillis()); ch.attr(FRAGMENT_COUNT_KEY).set(new AtomicInteger(0)); ch.attr(TOTAL_BYTES_KEY).set(new AtomicLong(0)); + ch.attr(MESSAGE_SIZE_EXCEEDED_KEY).set(false); + ch.attr(ERROR_NACK_SENT_KEY).set(false); // Add read timeout handler ch.pipeline().addLast(new ReadTimeoutHandler(readTimeoutSeconds, TimeUnit.SECONDS)); @@ -186,10 +192,99 @@ protected void channelRead0(ChannelHandlerContext ctx, Object msg) { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { UUID interactionId = ctx.channel().attr(INTERACTION_ATTRIBUTE_KEY).get(); - logger.error("Exception in TCP handler for interactionId {}: {}", - interactionId, cause.getMessage(), cause); + if (interactionId == null) { + interactionId = UUID.randomUUID(); + ctx.channel().attr(INTERACTION_ATTRIBUTE_KEY).set(interactionId); + logger.warn("NULL_INTERACTION_ID_DETECTED - generated new UUID: {}", interactionId); + } + + // Check if we've already sent a NACK for this channel + Boolean nackAlreadySent = ctx.channel().attr(ERROR_NACK_SENT_KEY).get(); + if (nackAlreadySent != null && nackAlreadySent) { + logger.debug("NACK_ALREADY_SENT [interactionId={}] - skipping duplicate NACK", interactionId); + return; + } + + 
// Mark that we're sending a NACK to prevent duplicates + ctx.channel().attr(ERROR_NACK_SENT_KEY).set(true); + + String errorTraceId = ErrorTraceIdGenerator.generateErrorTraceId(); + + logger.error("Exception in TCP handler [interactionId={}] [errorTraceId={}]: {}", + interactionId, errorTraceId, cause.getMessage(), cause); + + // Convert Throwable to Exception for logging if needed + Exception exceptionForLogging; + if (cause instanceof Exception) { + exceptionForLogging = (Exception) cause; + } else { + exceptionForLogging = new Exception(cause); + } + + try { + LogUtil.logDetailedError( + 500, + "Channel exception caught", + interactionId.toString(), + errorTraceId, + exceptionForLogging + ); + } catch (Exception logException) { + logger.warn("Failed to log detailed error [interactionId={}]: {}", + interactionId, logException.getMessage()); + } + if (ctx.channel().isActive()) { - ctx.close(); + try { + // Try to send NACK before closing + String errorMsg = cause.getMessage() != null ? cause.getMessage() : "Unknown error"; + String sanitizedError = errorMsg.replace("|", " ").replace("\r", " ").replace("\n", " "); + + String genericNack = "MSH|^~\\&|SERVER|LOCAL|CLIENT|REMOTE|" + Instant.now() + "||ACK|" + + UUID.randomUUID().toString().substring(0, 20) + "|P|2.5\r" + + "MSA|AR|UNKNOWN|Channel exception: " + sanitizedError + "\r" + + "ERR|||207^Application internal error^HL70357||E|||Channel exception occurred\r" + + "NTE|1||InteractionID: " + interactionId + " | TechBDIngestionApiVersion: " + + appConfig.getVersion() + " | ErrorTraceID: " + errorTraceId + "\r"; + + String wrappedNack = String.valueOf((char)MLLP_START) + genericNack + (char)MLLP_END_1 + (char)MLLP_END_2; + + ByteBuf responseBuf = ctx.alloc().buffer(); + responseBuf.writeBytes(wrappedNack.getBytes(StandardCharsets.UTF_8)); + + logger.info("SENDING_NACK_ON_EXCEPTION [interactionId={}] [errorTraceId={}]", + interactionId, errorTraceId); + + final UUID finalInteractionId = interactionId; + // Synchronous write with delay before close + ctx.writeAndFlush(responseBuf).addListener(future -> { + if (future.isSuccess()) { + logger.info("NACK_SENT_ON_EXCEPTION [interactionId={}] [errorTraceId={}]", + finalInteractionId, errorTraceId); + // Delay close to ensure NACK is transmitted + ctx.executor().schedule(() -> { + logger.info("CLOSING_CONNECTION_AFTER_EXCEPTION [interactionId={}]", finalInteractionId); + clearChannelAttributes(ctx); + ctx.close(); + }, 100, TimeUnit.MILLISECONDS); + } else { + logger.error("NACK_SEND_FAILED_ON_EXCEPTION [interactionId={}] [errorTraceId={}]: {}", + finalInteractionId, errorTraceId, + future.cause() != null ? 
future.cause().getMessage() : "unknown"); + clearChannelAttributes(ctx); + ctx.close(); + } + }); + } catch (Exception e) { + logger.error("FAILED_TO_SEND_NACK_ON_EXCEPTION [interactionId={}] [errorTraceId={}]: {}", + interactionId, errorTraceId, e.getMessage(), e); + clearChannelAttributes(ctx); + ctx.close(); + } + } else { + logger.warn("CHANNEL_ALREADY_INACTIVE [interactionId={}] [errorTraceId={}] - cannot send NACK", + interactionId, errorTraceId); + clearChannelAttributes(ctx); } } @@ -201,6 +296,118 @@ public void channelInactive(ChannelHandlerContext ctx) { clearChannelAttributes(ctx); } }); + ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) + throws Exception { + if (cause instanceof ReadTimeoutException) { + UUID interactionId = ctx.channel().attr(INTERACTION_ATTRIBUTE_KEY).get(); + if (interactionId == null) { + interactionId = UUID.randomUUID(); + ctx.channel().attr(INTERACTION_ATTRIBUTE_KEY).set(interactionId); + } + + // Check if we've already sent a NACK for this channel + Boolean nackAlreadySent = ctx.channel().attr(ERROR_NACK_SENT_KEY).get(); + if (nackAlreadySent != null && nackAlreadySent) { + logger.debug( + "NACK_ALREADY_SENT_ON_TIMEOUT [interactionId={}] - skipping duplicate", + interactionId); + ctx.close(); + return; + } + + // Mark that we're sending a NACK + ctx.channel().attr(ERROR_NACK_SENT_KEY).set(true); + + String errorTraceId = ErrorTraceIdGenerator.generateErrorTraceId(); + + logger.error( + "READ_TIMEOUT_EXCEEDED [interactionId={}] [errorTraceId={}] timeout={}s - sending NACK", + interactionId, errorTraceId, readTimeoutSeconds); + + try { + LogUtil.logDetailedError( + 408, + String.format("Read timeout exceeded after %d seconds", + readTimeoutSeconds), + interactionId.toString(), + errorTraceId, + new ReadTimeoutException()); + } catch (Exception logException) { + logger.warn("Failed to log timeout error [interactionId={}]: {}", + interactionId, logException.getMessage()); + } + + if (ctx.channel().isActive()) { + try { + String timeoutError = String.format( + "Read timeout: No complete message received within %d seconds", + readTimeoutSeconds); + + // Generate HL7 NACK for timeout + String timeoutNack = "MSH|^~\\&|SERVER|LOCAL|CLIENT|REMOTE|" + + Instant.now() + "||ACK|" + + UUID.randomUUID().toString().substring(0, 20) + "|P|2.5\r" + + "MSA|AR|UNKNOWN|" + timeoutError + "\r" + + "ERR|||207^Application internal error^HL70357||E|||Read timeout occurred\r" + + + "NTE|1||InteractionID: " + interactionId + + " | TechBDIngestionApiVersion: " + appConfig.getVersion() + + " | ErrorTraceID: " + errorTraceId + "\r"; + + String wrappedNack = String.valueOf((char) MLLP_START) + timeoutNack + + (char) MLLP_END_1 + (char) MLLP_END_2; + + ByteBuf responseBuf = ctx.alloc().buffer(); + responseBuf + .writeBytes(wrappedNack.getBytes(StandardCharsets.UTF_8)); + + logger.info( + "SENDING_NACK_ON_TIMEOUT [interactionId={}] [errorTraceId={}]", + interactionId, errorTraceId); + + final UUID finalInteractionId = interactionId; + final String finalErrorTraceId = errorTraceId; + + ctx.writeAndFlush(responseBuf).addListener(future -> { + if (future.isSuccess()) { + logger.info( + "NACK_SENT_ON_TIMEOUT [interactionId={}] [errorTraceId={}]", + finalInteractionId, finalErrorTraceId); + } else { + logger.error( + "NACK_SEND_FAILED_ON_TIMEOUT [interactionId={}] [errorTraceId={}]: {}", + finalInteractionId, finalErrorTraceId, + future.cause() != null ? 
future.cause().getMessage() + : "unknown"); + } + // Close connection after attempting to send NACK + logger.info( + "CLOSING_CONNECTION_AFTER_TIMEOUT [interactionId={}]", + finalInteractionId); + clearChannelAttributes(ctx); + ctx.close(); + }); + } catch (Exception e) { + logger.error( + "FAILED_TO_SEND_NACK_ON_TIMEOUT [interactionId={}] [errorTraceId={}]: {}", + interactionId, errorTraceId, e.getMessage(), e); + clearChannelAttributes(ctx); + ctx.close(); + } + } else { + logger.warn( + "CHANNEL_ALREADY_INACTIVE_ON_TIMEOUT [interactionId={}] [errorTraceId={}]", + interactionId, errorTraceId); + clearChannelAttributes(ctx); + } + } else { + // Pass other exceptions to the next handler + super.exceptionCaught(ctx, cause); + } + } + }); } }); @@ -258,7 +465,7 @@ private void parseTcpDelimiters() { *
 * • Buffers partial messages until complete delimiters are received
 * • Tracks fragment count and total bytes for logging purposes
 * • Supports MLLP (STX/ETX) and custom TCP start/end delimiters
- * • Handles messages exceeding max frame length safely
+ * • Handles messages exceeding max frame length safely by capturing complete message
  • * */ private class DelimiterBasedFrameDecoder extends ByteToMessageDecoder { @@ -320,10 +527,10 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t if (endIndex == -1) { // End markers not found yet if (in.readableBytes() > maxFrameLength) { - logger.error("MLLP_MESSAGE_TOO_LARGE [interactionId={}] size={} bytes exceeds max={} bytes", + logger.warn("MLLP_MESSAGE_SIZE_LIMIT_EXCEEDED [interactionId={}] size={} bytes exceeds max={} bytes", interactionId, in.readableBytes(), maxFrameLength); - in.skipBytes(in.readableBytes()); - throw new IllegalStateException("MLLP message exceeds max size: " + maxFrameLength + " bytes"); + ctx.channel().attr(MESSAGE_SIZE_EXCEEDED_KEY).set(true); + endIndex=startIndex + 1; } logger.info("MLLP_END_NOT_FOUND [interactionId={}] buffered={} bytes, waiting for more data", interactionId, in.readableBytes()); @@ -358,10 +565,12 @@ else if (firstByte == tcpStartDelimiter) { if (endIndex == -1) { // End markers not found yet if (in.readableBytes() > maxFrameLength) { - logger.error("TCP_DELIMITED_MESSAGE_TOO_LARGE [interactionId={}] size={} bytes exceeds max={} bytes", + logger.warn("TCP_DELIMITED_MESSAGE_SIZE_LIMIT_EXCEEDED [interactionId={}] size={} bytes exceeds max={} bytes", interactionId, in.readableBytes(), maxFrameLength); - in.skipBytes(in.readableBytes()); - throw new IllegalStateException("TCP delimited message exceeds max size: " + maxFrameLength + " bytes"); + + // Set flag but continue buffering to capture complete message + ctx.channel().attr(MESSAGE_SIZE_EXCEEDED_KEY).set(true); + endIndex=startIndex + 1; } logger.debug("TCP_END_NOT_FOUND [interactionId={}] buffered={} bytes, waiting for more data", interactionId, in.readableBytes()); @@ -419,6 +628,8 @@ private void clearChannelAttributes(ChannelHandlerContext ctx) { ctx.channel().attr(MESSAGE_START_TIME_KEY).set(null); ctx.channel().attr(FRAGMENT_COUNT_KEY).set(null); ctx.channel().attr(TOTAL_BYTES_KEY).set(null); + ctx.channel().attr(MESSAGE_SIZE_EXCEEDED_KEY).set(null); + ctx.channel().attr(ERROR_NACK_SENT_KEY).set(null); ctx.channel().attr(CLIENT_IP_KEY).set(null); ctx.channel().attr(CLIENT_PORT_KEY).set(null); ctx.channel().attr(DESTINATION_IP_KEY).set(null); @@ -468,7 +679,7 @@ private void handleSandboxProxy(ChannelHandlerContext ctx, UUID interactionId) { /** * Main message handler - routes to HL7 or generic handler based on MLLP detection - * MODIFIED: Added error trace ID generation and structured logging + * Added size limit check and error trace ID generation */ private void handleMessage(ChannelHandlerContext ctx, String rawMessage, UUID interactionId) { String clientIP = ctx.channel().attr(CLIENT_IP_KEY).get(); @@ -484,6 +695,82 @@ private void handleMessage(ChannelHandlerContext ctx, String rawMessage, UUID in destinationIP = ctx.channel().localAddress().toString(); } + // Check if message size exceeded limit - now we have the complete message + Boolean messageSizeExceeded = ctx.channel().attr(MESSAGE_SIZE_EXCEEDED_KEY).get(); + if (messageSizeExceeded != null && messageSizeExceeded) { + String errorTraceId = ErrorTraceIdGenerator.generateErrorTraceId(); + String errorMessage = String.format("Message size %d bytes exceeds maximum allowed size of %d bytes", + rawMessage.length(), maxMessageSizeBytes); + + logger.error("MESSAGE_SIZE_LIMIT_EXCEEDED [interactionId={}] [errorTraceId={}] size={} bytes, max={} bytes", + interactionId, errorTraceId, rawMessage.length(), maxMessageSizeBytes); + + LogUtil.logDetailedError( + 413, + errorMessage, + 
interactionId.toString(), + errorTraceId, + new IllegalArgumentException("Message size limit exceeded") + ); + + boolean isMllpWrapped = detectMllpWrapper(rawMessage); + boolean isTcpDelimited = detectTcpDelimiterWrapper(rawMessage); + + // Build context and store the COMPLETE oversized message + RequestContext requestContext = buildRequestContext( + rawMessage.trim(), // Store complete message with delimiters + interactionId.toString(), + Optional.empty(), + String.valueOf(clientPort), + clientIP, + destinationIP, + String.valueOf(destinationPort), + isMllpWrapped ? MessageSourceType.MLLP : MessageSourceType.TCP); + + requestContext.setIngestionFailed(true); + + String nackMessage; + if (isMllpWrapped) { + nackMessage = createHL7AckFromMsh( + rawMessage, + "AR", + errorMessage, + interactionId.toString(), + errorTraceId); + + // try { + // // Store the complete oversized message + // logger.info("STORING_COMPLETE_OVERSIZED_MESSAGE [interactionId={}] [errorTraceId={}] size={} bytes", + // interactionId, errorTraceId, rawMessage.length()); + // messageProcessorService.processMessage(requestContext, rawMessage, nackMessage); + // logger.info("OVERSIZED_MESSAGE_STORED_SUCCESSFULLY [interactionId={}] [errorTraceId={}]", + // interactionId, errorTraceId); + // } catch (Exception e) { + // logger.error("FAILED_TO_STORE_OVERSIZED_MESSAGE [interactionId={}] [errorTraceId={}]: {}", + // interactionId, errorTraceId, e.getMessage(), e); + // } + + sendResponseAndClose(ctx, wrapMllp(nackMessage), interactionId, "HL7_NACK_SIZE_EXCEEDED"); + } else { + nackMessage = generateSimpleNack(interactionId.toString(), errorMessage, errorTraceId); + + // try { + // // Store the complete oversized message + // logger.info("STORING_COMPLETE_OVERSIZED_MESSAGE [interactionId={}] [errorTraceId={}] size={} bytes", + // interactionId, errorTraceId, rawMessage.length()); + // messageProcessorService.processMessage(requestContext, rawMessage, nackMessage); + // logger.info("OVERSIZED_MESSAGE_STORED_SUCCESSFULLY [interactionId={}] [errorTraceId={}]", + // interactionId, errorTraceId); + // } catch (Exception e) { + // logger.error("FAILED_TO_STORE_OVERSIZED_MESSAGE [interactionId={}] [errorTraceId={}]: {}", + // interactionId, errorTraceId, e.getMessage(), e); + // } + + sendResponseAndClose(ctx, nackMessage + "\n", interactionId, "TCP_NACK_SIZE_EXCEEDED"); + } + return; + } + boolean isMllpWrapped = detectMllpWrapper(rawMessage); // NEW: Also detect TCP delimiter wrapper boolean isTcpDelimited = detectTcpDelimiterWrapper(rawMessage); @@ -682,9 +969,12 @@ private void handleHL7Message( String cleanMsg = null; String ackMessage = null; Message hl7Message = null; - boolean nackGenerated = false; String errorTraceId = null; RequestContext requestContext = null; + boolean shouldSendResponse = true; + String responseToSend = null; + String responseType = null; + try { try { cleanMsg = unwrapMllp(rawMessage); @@ -732,7 +1022,6 @@ private void handleHL7Message( null, interactionId.toString(), null); // No errorTraceId for successful ACK - nackGenerated = false; } if (!zntPresent) { @@ -759,15 +1048,16 @@ private void handleHL7Message( interactionId.toString(), errorTraceId); requestContext.setIngestionFailed(true); - messageProcessorService.processMessage(requestContext, cleanMsg, nack); - sendResponseAndClose(ctx, wrapMllp(nack), interactionId, "HL7_NACK_MISSING_ZNT"); + messageProcessorService.processMessage(requestContext, cleanMsg, nack); + responseToSend = wrapMllp(nack); + responseType = "HL7_NACK_MISSING_ZNT"; return; } } 
messageProcessorService.processMessage(requestContext, cleanMsg, ackMessage); // Send MLLP-wrapped ACK - String response = wrapMllp(ackMessage); - sendResponseAndClose(ctx, response, interactionId, "HL7_ACK"); + responseToSend = wrapMllp(ackMessage); + responseType = "HL7_ACK"; } catch (Exception e) { // Generate error trace ID for processing errors @@ -776,7 +1066,6 @@ private void handleHL7Message( logger.error("PROCESSING_ERROR [interactionId={}] [errorTraceId={}] Sending Reject NACK(AR): {}", interactionId, errorTraceId, e.getMessage(), e); - // Log detailed error LogUtil.logDetailedError( 500, "Internal processing error", @@ -785,18 +1074,47 @@ private void handleHL7Message( e ); - // Final fallback NACK (AR) - String errorAck = createHL7AckFromMsh( - cleanMsg != null ? cleanMsg : unwrapMllp(rawMessage), - "AR", - e.getMessage(), - interactionId.toString(), - errorTraceId); - if (requestContext != null) { - requestContext.setIngestionFailed(true); - messageProcessorService.processMessage(requestContext, cleanMsg, errorAck); - } - sendResponseAndClose(ctx, wrapMllp(errorAck), interactionId, "HL7_NACK"); + try { + String errorAck = createHL7AckFromMsh( + cleanMsg != null ? cleanMsg : unwrapMllp(rawMessage), + "AR", + e.getMessage(), + interactionId.toString(), + errorTraceId); + + if (requestContext != null) { + requestContext.setIngestionFailed(true); + messageProcessorService.processMessage(requestContext, cleanMsg, errorAck); + } + + responseToSend = wrapMllp(errorAck); + responseType = "HL7_NACK"; + } catch (Exception nackException) { + logger.error("FAILED_TO_PREPARE_NACK [interactionId={}] [errorTraceId={}]: {}", + interactionId, errorTraceId, nackException.getMessage(), nackException); + shouldSendResponse = false; + } + } finally { + // Always send response in finally block + if (shouldSendResponse && responseToSend != null && responseType != null) { + sendResponseAndClose(ctx, responseToSend, interactionId, responseType); + } else if (ctx.channel().isActive()) { + // Send generic NACK for unexpected errors + if (errorTraceId == null) { + errorTraceId = ErrorTraceIdGenerator.generateErrorTraceId(); + } + logger.error("NO_RESPONSE_PREPARED [interactionId={}] [errorTraceId={}] - sending generic NACK", + interactionId, errorTraceId); + + String genericNack = "MSH|^~\\&|SERVER|LOCAL|CLIENT|REMOTE|" + Instant.now() + "||ACK|" + + UUID.randomUUID().toString().substring(0, 20) + "|P|2.5\r" + + "MSA|AR|UNKNOWN|Unexpected error occurred\r" + + "ERR|||207^Application internal error^HL70357||E|||Unexpected error occurred\r" + + "NTE|1||InteractionID: " + interactionId + " | TechBDIngestionApiVersion: " + + appConfig.getVersion() + " | ErrorTraceID: " + errorTraceId + "\r"; + + sendResponseAndClose(ctx, wrapMllp(genericNack), interactionId, "HL7_NACK_UNEXPECTED_ERROR"); + } } } @@ -809,6 +1127,9 @@ private void handleGenericMessage(ChannelHandlerContext ctx, String rawMessage, Optional portEntryOpt) { String errorTraceId = null; RequestContext requestContext = null; + boolean shouldSendResponse = true; + String responseToSend = null; + String responseType = null; try { String cleanMsg = rawMessage.trim(); @@ -836,18 +1157,16 @@ private void handleGenericMessage(ChannelHandlerContext ctx, String rawMessage, messageProcessorService.processMessage(requestContext, cleanMsg, ackMessage); // Send simple ACK (with newline, no MLLP wrapping) - sendResponseAndClose(ctx, ackMessage + "\n", interactionId, "SIMPLE_ACK"); + responseToSend = ackMessage + "\n"; + responseType = "SIMPLE_ACK"; } catch 
(Exception e) { - // Generate error trace ID for processing errors + // Prepare NACK for finally block errorTraceId = ErrorTraceIdGenerator.generateErrorTraceId(); - if (requestContext != null) { - requestContext.setIngestionFailed(true); - } + logger.error("GENERIC_PROCESSING_ERROR [interactionId={}] [errorTraceId={}]: {}", interactionId, errorTraceId, e.getMessage(), e); - // Log detailed error LogUtil.logDetailedError( 500, "Generic message processing error", @@ -856,11 +1175,44 @@ private void handleGenericMessage(ChannelHandlerContext ctx, String rawMessage, e ); - String errorResponse = generateSimpleNack(interactionId.toString(), e.getMessage(), errorTraceId) + "\n"; - if (requestContext != null) { - messageProcessorService.processMessage(requestContext, rawMessage.trim(), errorResponse); + try { + if (requestContext != null) { + requestContext.setIngestionFailed(true); + } + + String errorResponse = generateSimpleNack(interactionId.toString(), e.getMessage(), errorTraceId) + "\n"; + + if (requestContext != null) { + messageProcessorService.processMessage(requestContext, rawMessage.trim(), errorResponse); + } + + responseToSend = errorResponse; + responseType = "SIMPLE_NACK"; + } catch (Exception nackException) { + logger.error("FAILED_TO_PREPARE_NACK [interactionId={}] [errorTraceId={}]: {}", + interactionId, errorTraceId, nackException.getMessage(), nackException); + shouldSendResponse = false; + } + } finally { + // Always send response in finally block + if (shouldSendResponse && responseToSend != null && responseType != null) { + sendResponseAndClose(ctx, responseToSend, interactionId, responseType); + } else if (ctx.channel().isActive()) { + // Send generic NACK for unexpected errors + if (errorTraceId == null) { + errorTraceId = ErrorTraceIdGenerator.generateErrorTraceId(); + } + logger.error("NO_RESPONSE_PREPARED [interactionId={}] [errorTraceId={}] - sending generic NACK", + interactionId, errorTraceId); + + String genericNack = String.format("NACK|InteractionId^%s|ErrorTraceId^%s|ERROR|%s|%s\n", + interactionId, + errorTraceId, + "Unexpected error occurred", + Instant.now().toString()); + + sendResponseAndClose(ctx, genericNack, interactionId, "SIMPLE_NACK_UNEXPECTED_ERROR"); } - sendResponseAndClose(ctx, errorResponse, interactionId, "SIMPLE_NACK"); } } diff --git a/pom.xml b/pom.xml index 65027040674..a527d8aa277 100644 --- a/pom.xml +++ b/pom.xml @@ -20,7 +20,7 @@ - 0.996.0 + 0.997.0 21 3.3.3 21
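
For reviewers unfamiliar with MLLP framing, the stand-alone sketch below (not part of this patch) shows the byte-level wrapping that the ACK/NACK paths above rely on: each HL7 payload is prefixed with a vertical-tab start byte (0x0B) and terminated with a file-separator byte plus carriage return (0x1C 0x0D). The constant names mirror the MLLP_START / MLLP_END_1 / MLLP_END_2 fields and the wrapMllp(...) helper referenced in NettyTcpServer, but they are re-declared here as an assumption so the example compiles on its own.

// Illustrative sketch only — minimal MLLP framing, assuming standard MLLP control bytes.
import java.nio.charset.StandardCharsets;

final class MllpFramingSketch {
    private static final char MLLP_START = 0x0B; // <VT> start-of-block
    private static final char MLLP_END_1 = 0x1C; // <FS> end-of-block
    private static final char MLLP_END_2 = 0x0D; // <CR> trailing carriage return

    /** Wraps an HL7 payload (segments separated by \r) in MLLP framing bytes. */
    static byte[] wrapMllp(String hl7Payload) {
        String framed = MLLP_START + hl7Payload + MLLP_END_1 + MLLP_END_2;
        return framed.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Hypothetical NACK payload, shaped like the MSH/MSA segments built in the patch.
        String nack = "MSH|^~\\&|SERVER|LOCAL|CLIENT|REMOTE|20240101000000||ACK|MSG00001|P|2.5\r"
                + "MSA|AR|UNKNOWN|Read timeout: no complete message received\r";
        byte[] framedBytes = wrapMllp(nack);
        System.out.println("framed length = " + framedBytes.length);
    }
}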