@@ -1456,16 +1456,18 @@ void messageReceived(CommandMessage cmdMessage, ByteBuf headersAndPayload, Clien
14561456 return ;
14571457 }
14581458
1459- ByteBuf decryptedPayload = decryptPayloadIfNeeded (messageId , redeliveryCount , msgMetadata , headersAndPayload ,
1459+ DecryptResult decryptResult = decryptPayloadIfNeeded (messageId , redeliveryCount , msgMetadata , headersAndPayload ,
14601460 cnx );
14611461
1462- boolean isMessageUndecryptable = isMessageUndecryptable (msgMetadata );
1463-
1464- if (decryptedPayload == null ) {
1462+ if (decryptResult .shouldDiscard ()) {
14651463 // Message was discarded or CryptoKeyReader isn't implemented
14661464 return ;
14671465 }
14681466
1467+ boolean isMessageUndecryptable = !decryptResult .success ;
1468+
1469+ ByteBuf decryptedPayload = decryptResult .payload ;
1470+
14691471 // uncompress decryptedPayload and release decryptedPayload-ByteBuf
14701472 ByteBuf uncompressedPayload = (isMessageUndecryptable || isChunkedMessage ) ? decryptedPayload .retain ()
14711473 : uncompressPayloadIfNeeded (messageId , msgMetadata , decryptedPayload , cnx , true );
@@ -1951,11 +1953,53 @@ public long getLastDisconnectedTimestamp() {
19511953 return connectionHandler .lastConnectionClosedTimestamp ;
19521954 }
19531955
1954- private ByteBuf decryptPayloadIfNeeded (MessageIdData messageId , int redeliveryCount , MessageMetadata msgMetadata ,
1955- ByteBuf payload , ClientCnx currentCnx ) {
1956+ /**
1957+ * Represents the outcome of a message decryption attempt for the consumer.
1958+ */
1959+ private static class DecryptResult {
1960+ private final boolean success ;
1961+ private final ByteBuf payload ;
1962+
1963+ private DecryptResult (boolean success , ByteBuf decryptedPayload ) {
1964+ this .success = success ;
1965+ this .payload = decryptedPayload ;
1966+ }
1967+
1968+ /**
1969+ * Returns true if the message should be discarded and not delivered to the consumer user.
1970+ */
1971+ public boolean shouldDiscard () {
1972+ return this .payload == null ;
1973+ }
1974+
1975+ /**
1976+ * Creates a result indicating decryption succeeded and the payload is ready for use.
1977+ */
1978+ public static DecryptResult success (ByteBuf decryptedPayload ) {
1979+ return new DecryptResult (true , decryptedPayload );
1980+ }
1981+
1982+ /**
1983+ * Creates a result indicating decryption failed, but the message should still be delivered.
1984+ */
1985+ public static DecryptResult failure (ByteBuf decryptedPayload ) {
1986+ return new DecryptResult (false , decryptedPayload );
1987+ }
1988+
1989+ /**
1990+ * Creates a result indicating the message should be discarded.
1991+ */
1992+ public static DecryptResult discard () {
1993+ return new DecryptResult (false , null );
1994+ }
1995+ }
1996+
1997+ private DecryptResult decryptPayloadIfNeeded (MessageIdData messageId , int redeliveryCount ,
1998+ MessageMetadata msgMetadata ,
1999+ ByteBuf payload , ClientCnx currentCnx ) {
19562000
19572001 if (msgMetadata .getEncryptionKeysCount () == 0 ) {
1958- return payload .retain ();
2002+ return DecryptResult . success ( payload .retain () );
19592003 }
19602004 int batchSize = msgMetadata .getNumMessagesInBatch ();
19612005 // If KeyReader is not configured throw exception based on config param
@@ -1969,15 +2013,15 @@ private ByteBuf decryptPayloadIfNeeded(MessageIdData messageId, int redeliveryCo
19692013 ByteBuffer nioDecryptedData = decryptedData .nioBuffer (0 , maxDecryptedSize );
19702014 if (msgCrypto .decrypt (() -> msgMetadata , payload .nioBuffer (), nioDecryptedData , conf .getCryptoKeyReader ())) {
19712015 decryptedData .writerIndex (nioDecryptedData .limit ());
1972- return decryptedData ;
2016+ return DecryptResult . success ( decryptedData ) ;
19732017 }
19742018
19752019 decryptedData .release ();
19762020
19772021 return handleCryptoFailure (payload , messageId , currentCnx , redeliveryCount , batchSize , false );
19782022 }
19792023
1980- private ByteBuf handleCryptoFailure (ByteBuf payload , MessageIdData messageId , ClientCnx currentCnx ,
2024+ private DecryptResult handleCryptoFailure (ByteBuf payload , MessageIdData messageId , ClientCnx currentCnx ,
19812025 int redeliveryCount , int batchSize , boolean cryptoReaderNotExist ) {
19822026
19832027 switch (conf .getCryptoFailureAction ()) {
@@ -1990,7 +2034,7 @@ private ByteBuf handleCryptoFailure(ByteBuf payload, MessageIdData messageId, Cl
19902034 log .warn ("[{}][{}][{}][{}] Decryption failed. Consuming encrypted message since config is set to"
19912035 + " consume." , topic , subscription , consumerName , messageId );
19922036 }
1993- return payload .retain ();
2037+ return DecryptResult . failure ( payload .retain () );
19942038 case DISCARD :
19952039 if (cryptoReaderNotExist ) {
19962040 log .warn (
@@ -2005,7 +2049,7 @@ private ByteBuf handleCryptoFailure(ByteBuf payload, MessageIdData messageId, Cl
20052049 messageId .getBatchIndex ());
20062050 }
20072051 discardMessage (messageId , currentCnx , ValidationError .DecryptionError , batchSize );
2008- return null ;
2052+ return DecryptResult . discard () ;
20092053 case FAIL :
20102054 if (cryptoReaderNotExist ) {
20112055 log .error (
@@ -2020,11 +2064,11 @@ private ByteBuf handleCryptoFailure(ByteBuf payload, MessageIdData messageId, Cl
20202064 }
20212065 MessageId m = new MessageIdImpl (messageId .getLedgerId (), messageId .getEntryId (), partitionIndex );
20222066 unAckedMessageTracker .add (m , redeliveryCount );
2023- return null ;
2067+ return DecryptResult . discard () ;
20242068 default :
20252069 log .warn ("[{}][{}][{}] Invalid crypto failure state found, continue message consumption." , topic ,
20262070 subscription , consumerName );
2027- return payload .retain ();
2071+ return DecryptResult . failure ( payload .retain () );
20282072 }
20292073 }
20302074
0 commit comments