33import static io .quarkus .smallrye .reactivemessaging .kafka .HibernateOrmStateStore .HIBERNATE_ORM_STATE_STORE ;
44import static io .quarkus .smallrye .reactivemessaging .kafka .HibernateReactiveStateStore .HIBERNATE_REACTIVE_STATE_STORE ;
55import static io .quarkus .smallrye .reactivemessaging .kafka .RedisStateStore .REDIS_STATE_STORE ;
6+ import static io .quarkus .smallrye .reactivemessaging .runtime .ReactiveMessagingConfiguration .getChannelIncomingPropertyName ;
7+ import static io .quarkus .smallrye .reactivemessaging .runtime .ReactiveMessagingConfiguration .getChannelOutgoingPropertyName ;
8+ import static io .quarkus .smallrye .reactivemessaging .runtime .ReactiveMessagingConfiguration .getChannelPropertyName ;
69
710import java .util .ArrayList ;
811import java .util .HashMap ;
@@ -97,7 +100,7 @@ static boolean hasStateStoreConfig(String stateStoreName, Config config) {
97100 }
98101
99102 static boolean hasDLQConfig (String channelName , Config config ) {
100- String propertyKey = getChannelPropertyKey (channelName , "failure-strategy" , true );
103+ String propertyKey = getChannelIncomingPropertyName (channelName , "failure-strategy" );
101104 Optional <String > channelFailureStrategy = config .getOptionalValue (propertyKey , String .class );
102105 Optional <String > failureStrategy = channelFailureStrategy .or (() -> getConnectorProperty ("failure-strategy" , config ));
103106
@@ -121,16 +124,6 @@ private static List<String> getChannelProperties(String keySuffix, Config config
121124 return values ;
122125 }
123126
124- static String channelPropertyFormat = "mp.messaging.%s.%s.%s" ;
125-
126- static String getChannelPropertyKey (String channelName , String propertyName , boolean incoming ) {
127- if ((channelName .charAt (0 ) != '"' || channelName .charAt (channelName .length () - 1 ) != '"' )
128- && channelName .contains ("." )) {
129- channelName = "\" " + channelName + "\" " ;
130- }
131- return String .format (channelPropertyFormat , incoming ? "incoming" : "outgoing" , channelName , propertyName );
132- }
133-
134127 @ BuildStep
135128 public void checkpointRedis (BuildProducer <AdditionalBeanBuildItem > additionalBean ,
136129 BuildProducer <ReflectiveClassBuildItem > reflectiveClass ,
@@ -219,7 +212,7 @@ void disableGracefulShutdown(List<ConnectorManagedChannelBuildItem> channelsMana
219212 if (!discoveryState .isKafkaConnector (channelsManagedByConnectors , incoming , channelName )) {
220213 continue ;
221214 }
222- String key = getChannelPropertyKey (channelName , incoming ? "graceful-shutdown" : "close-timeout" , incoming );
215+ String key = getChannelPropertyName (channelName , incoming ? "graceful-shutdown" : "close-timeout" , incoming );
223216 discoveryState .ifNotYetConfigured (key , () -> {
224217 defaultConfigProducer .produce (new RunTimeConfigurationDefaultBuildItem (key , incoming ? "false" : "0" ));
225218 });
@@ -259,9 +252,9 @@ void discoverDefaultSerdeConfig(DefaultSerdeDiscoveryState discovery,
259252 Type outgoingType = getOutgoingTypeFromMethod (method );
260253 processOutgoingType (discovery , outgoingType , (keySerializer , valueSerializer ) -> {
261254 produceRuntimeConfigurationDefaultBuildItem (discovery , config ,
262- getChannelPropertyKey (channelName , "key.serializer" , false ), keySerializer );
255+ getChannelOutgoingPropertyName (channelName , "key.serializer" ), keySerializer );
263256 produceRuntimeConfigurationDefaultBuildItem (discovery , config ,
264- getChannelPropertyKey (channelName , "value.serializer" , false ), valueSerializer );
257+ getChannelOutgoingPropertyName (channelName , "value.serializer" ), valueSerializer );
265258
266259 handleAdditionalProperties (channelName , false , discovery , config , keySerializer , valueSerializer );
267260 }, generatedClass , reflection , alreadyGeneratedSerializers );
@@ -291,9 +284,9 @@ void discoverDefaultSerdeConfig(DefaultSerdeDiscoveryState discovery,
291284 Type replyType = injectionPointType .asParameterizedType ().arguments ().get (1 );
292285 processOutgoingType (discovery , requestType , (keySerializer , valueSerializer ) -> {
293286 produceRuntimeConfigurationDefaultBuildItem (discovery , config ,
294- getChannelPropertyKey (channelName , "key.serializer" , false ), keySerializer );
287+ getChannelOutgoingPropertyName (channelName , "key.serializer" ), keySerializer );
295288 produceRuntimeConfigurationDefaultBuildItem (discovery , config ,
296- getChannelPropertyKey (channelName , "value.serializer" , false ), valueSerializer );
289+ getChannelOutgoingPropertyName (channelName , "value.serializer" ), valueSerializer );
297290 }, generatedClass , reflection , alreadyGeneratedSerializers );
298291 extractKeyValueType (replyType , (key , value , isBatchType ) -> {
299292 Result keyDeserializer = deserializerFor (discovery , key , true , channelName , generatedClass , reflection ,
@@ -302,18 +295,18 @@ void discoverDefaultSerdeConfig(DefaultSerdeDiscoveryState discovery,
302295 alreadyGeneratedDeserializers , alreadyGeneratedSerializers );
303296
304297 produceRuntimeConfigurationDefaultBuildItem (discovery , config ,
305- getChannelPropertyKey (channelName , "reply.key.deserializer" , false ), keyDeserializer );
298+ getChannelOutgoingPropertyName (channelName , "reply.key.deserializer" ), keyDeserializer );
306299 produceRuntimeConfigurationDefaultBuildItem (discovery , config ,
307- getChannelPropertyKey (channelName , "reply.value.deserializer" , false ), valueDeserializer );
300+ getChannelOutgoingPropertyName (channelName , "reply.value.deserializer" ), valueDeserializer );
308301 handleAdditionalProperties (channelName , false , discovery , config , keyDeserializer , valueDeserializer );
309302 });
310303 } else {
311304 Type outgoingType = getOutgoingTypeFromChannelInjectionPoint (injectionPointType );
312305 processOutgoingType (discovery , outgoingType , (keySerializer , valueSerializer ) -> {
313306 produceRuntimeConfigurationDefaultBuildItem (discovery , config ,
314- getChannelPropertyKey (channelName , "key.serializer" , false ), keySerializer );
307+ getChannelOutgoingPropertyName (channelName , "key.serializer" ), keySerializer );
315308 produceRuntimeConfigurationDefaultBuildItem (discovery , config ,
316- getChannelPropertyKey (channelName , "value.serializer" , false ), valueSerializer );
309+ getChannelOutgoingPropertyName (channelName , "value.serializer" ), valueSerializer );
317310
318311 handleAdditionalProperties (channelName , false , discovery , config , keySerializer , valueSerializer );
319312 }, generatedClass , reflection , alreadyGeneratedSerializers );
@@ -324,9 +317,9 @@ void discoverDefaultSerdeConfig(DefaultSerdeDiscoveryState discovery,
324317 private void processKafkaTransactions (DefaultSerdeDiscoveryState discovery ,
325318 BuildProducer <RunTimeConfigurationDefaultBuildItem > config , String channelName , Type injectionPointType ) {
326319 if (injectionPointType != null && isKafkaTransactionsEmitter (injectionPointType )) {
327- String transactionalIdKey = getChannelPropertyKey (channelName , "transactional.id" , false );
328- String enableIdempotenceKey = getChannelPropertyKey (channelName , "enable.idempotence" , false );
329- String acksKey = getChannelPropertyKey (channelName , "acks" , false );
320+ String transactionalIdKey = getChannelOutgoingPropertyName (channelName , "transactional.id" );
321+ String enableIdempotenceKey = getChannelOutgoingPropertyName (channelName , "enable.idempotence" );
322+ String acksKey = getChannelOutgoingPropertyName (channelName , "acks" );
330323 LOGGER .infof ("Transactional producer detected for channel '%s', setting following default config values: "
331324 + "'" + transactionalIdKey + "=${quarkus.application.name}-${channelName}', "
332325 + "'" + enableIdempotenceKey + "=true', "
@@ -349,12 +342,12 @@ private void processIncomingType(DefaultSerdeDiscoveryState discovery,
349342 alreadyGeneratedDeserializers , alreadyGeneratedSerializers );
350343
351344 produceRuntimeConfigurationDefaultBuildItem (discovery , config ,
352- getChannelPropertyKey (channelName , "key.deserializer" , true ), keyDeserializer );
345+ getChannelIncomingPropertyName (channelName , "key.deserializer" ), keyDeserializer );
353346 produceRuntimeConfigurationDefaultBuildItem (discovery , config ,
354- getChannelPropertyKey (channelName , "value.deserializer" , true ), valueDeserializer );
347+ getChannelIncomingPropertyName (channelName , "value.deserializer" ), valueDeserializer );
355348 if (Boolean .TRUE .equals (isBatchType )) {
356349 produceRuntimeConfigurationDefaultBuildItem (discovery , config ,
357- getChannelPropertyKey (channelName , "batch" , true ), "true" );
350+ getChannelIncomingPropertyName (channelName , "batch" ), "true" );
358351 }
359352
360353 handleAdditionalProperties (channelName , true , discovery , config , keyDeserializer , valueDeserializer );
@@ -385,7 +378,7 @@ private void handleAdditionalProperties(String channelName, boolean incoming, De
385378 }
386379
387380 result .additionalProperties .forEach ((key , value ) -> {
388- String configKey = getChannelPropertyKey (channelName , key , incoming );
381+ String configKey = getChannelPropertyName (channelName , key , incoming );
389382 produceRuntimeConfigurationDefaultBuildItem (discovery , config , configKey , value );
390383 });
391384 }
@@ -1101,7 +1094,7 @@ private void processAnnotationsForReflectiveClassPayload(IndexView index, Config
11011094 }
11021095
11031096 private boolean isSerdeJson (IndexView index , Config config , String channelName , boolean serializer , boolean isKey ) {
1104- String configKey = getChannelPropertyKey (channelName , (isKey ? "key" : "value" ) + "." +
1097+ String configKey = getChannelPropertyName (channelName , (isKey ? "key" : "value" ) + "." +
11051098 (serializer ? "serializer" : "deserializer" ), !serializer );
11061099 ConfigValue configValue = config .getConfigValue (configKey );
11071100 if (configValue .getValue () != null ) {
0 commit comments