diff --git a/wres-events/src/wres/events/broker/BrokerConnectionFactory.java b/wres-events/src/wres/events/broker/BrokerConnectionFactory.java index bce84524a5..02a41b8d02 100644 --- a/wres-events/src/wres/events/broker/BrokerConnectionFactory.java +++ b/wres-events/src/wres/events/broker/BrokerConnectionFactory.java @@ -45,12 +45,12 @@ public class BrokerConnectionFactory implements Supplier /** The maximum number of times a message can be resent on failure. */ private final Integer maximumMessageRetries; - /** Context that maps JMS objects to names. */ - private final Context context; - /** A connection factory. */ private final ConnectionFactory connectionFactory; + /** The connection properties. */ + private final Properties properties; + /** *

Returns an instance of a factory, which is created with supplied properties and a default number of message * retries, {@link #DEFAULT_MAXIMUM_MESSAGE_RETRIES}. @@ -120,7 +120,18 @@ public Destination getDestination( String name ) throws NamingException { Objects.requireNonNull( name ); - return ( Destination ) this.context.lookup( name ); + String connectionPropertyName = BrokerUtilities.getConnectionPropertyName( this.properties ); + BrokerUtilities.testConnectionProperty( connectionPropertyName, this.properties ); + String connectionUrl = this.properties.getProperty( connectionPropertyName ); + Context context = BrokerConnectionFactory.getContextFromProperties( connectionUrl, this.properties ); + try + { + return ( Destination ) context.lookup( name ); + } + finally + { + BrokerConnectionFactory.closeContext( context ); + } } /** @@ -212,26 +223,55 @@ static void testConnection( Properties properties, int retries ) } finally { - // Close any connection that succeeded - if ( Objects.nonNull( connection ) ) - { - LOGGER.info( "Established a connection to an AMQP message broker at binding URL: {}.", - connectionUrl ); - - try - { - connection.close(); - } - catch ( JMSException e ) - { - LOGGER.error( "Failed to close an attempted connection during the instantiation of a " - + "connection factory." ); - } - } + BrokerConnectionFactory.closeConnection( connection, connectionUrl ); + BrokerConnectionFactory.closeContext( localContext ); } } } + /** + * Closes a connection. + * + * @param connection the connection, possibly null + * @param connectionUrl the connection URL for logging + */ + private static void closeConnection( Connection connection, String connectionUrl ) + { + // Close any connection that succeeded + if ( Objects.nonNull( connection ) ) + { + LOGGER.info( "Established a connection to an AMQP message broker at binding URL: {}.", + connectionUrl ); + + try + { + connection.close(); + } + catch ( JMSException e ) + { + LOGGER.error( "Failed to close an attempted connection during the instantiation of a " + + "connection factory. The correction URL is: {}.", connectionUrl ); + } + } + } + + /** + * Closes the supplied context. + * @param context the context + */ + + private static void closeContext( Context context ) + { + try + { + context.close(); + } + catch ( NamingException e ) + { + LOGGER.error( "Failed to close the context for a connection." ); + } + } + /** * Returns the context from the properties. * @param connectionUrl the connection string to help with exception messaging @@ -313,41 +353,35 @@ private BrokerConnectionFactory( Properties properties, } this.maximumMessageRetries = maximumMessageRetries; + this.properties = properties; // The connection property name String connectionPropertyName = BrokerUtilities.getConnectionPropertyName( properties ); // Set any variables that depend on the (possibly adjusted) properties - try - { - LOGGER.debug( "Creating a connection factory with these properties: {}.", properties ); - // Test - String connectionString = properties.getProperty( connectionPropertyName ); - this.context = new InitialContext( properties ); - this.connectionFactory = BrokerConnectionFactory.getConnectionFactory( connectionString, - this.context, - connectionPropertyName ); + LOGGER.debug( "Creating a connection factory with these properties: {}.", properties ); - LOGGER.debug( "Testing the connection property {} with corresponding connection string {}.", - connectionPropertyName, - connectionString ); + // Test + String connectionString = properties.getProperty( connectionPropertyName ); + Context context = BrokerConnectionFactory.getContextFromProperties( connectionString, properties ); + this.connectionFactory = BrokerConnectionFactory.getConnectionFactory( connectionString, + context, + connectionPropertyName ); - BrokerConnectionFactory.testConnection( properties, - BrokerConnectionFactory.MAXIMUM_CONNECTION_RETRIES ); + LOGGER.debug( "Testing the connection property {} with corresponding connection string {}.", + connectionPropertyName, + connectionString ); - if ( LOGGER.isInfoEnabled() ) - { - LOGGER.info( "Created a broker connection factory {} with name {} and binding URL {}.", - this, - connectionPropertyName, - properties.getProperty( connectionPropertyName ) ); - } - } - catch ( NamingException e ) + BrokerConnectionFactory.testConnection( properties, + BrokerConnectionFactory.MAXIMUM_CONNECTION_RETRIES ); + + if ( LOGGER.isInfoEnabled() ) { - throw new CouldNotLoadBrokerConfigurationException( "Unable to load the expected broker configuration.", - e ); + LOGGER.info( "Created a broker connection factory {} with name {} and binding URL {}.", + this, + connectionPropertyName, + properties.getProperty( connectionPropertyName ) ); } } diff --git a/wres-events/test/wres/events/broker/BrokerConnectionFactoryTest.java b/wres-events/test/wres/events/broker/BrokerConnectionFactoryTest.java index 5608a7b273..8e12eeacce 100644 --- a/wres-events/test/wres/events/broker/BrokerConnectionFactoryTest.java +++ b/wres-events/test/wres/events/broker/BrokerConnectionFactoryTest.java @@ -1,6 +1,7 @@ package wres.events.broker; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.IOException; @@ -38,17 +39,17 @@ class BrokerConnectionFactoryTest void testMessageRouting() throws IOException, NamingException, JMSException, InterruptedException { Properties properties = BrokerUtilities.getBrokerConnectionProperties( "eventbroker.properties" ); - + // Create and start the broker, clean up on completion - try ( EmbeddedBroker ignored = EmbeddedBroker.of( properties, true ) ) + try ( EmbeddedBroker ignored = EmbeddedBroker.of( properties, true ) ) { BrokerConnectionFactory factory = BrokerConnectionFactory.of( properties, 2 ); - Topic evaluationTopic = (Topic) factory.getDestination( "evaluation" ); - Topic evaluationStatusTopic = (Topic) factory.getDestination( "status" ); - Topic statisticsTopic = (Topic) factory.getDestination( "statistics" ); + Topic evaluationTopic = ( Topic ) factory.getDestination( "evaluation" ); + Topic evaluationStatusTopic = ( Topic ) factory.getDestination( "status" ); + Topic statisticsTopic = ( Topic ) factory.getDestination( "statistics" ); String evaluationId = "1234567"; - + // Application/JMS-level message selection based on correlation id String messageSelector = "JMSCorrelationID = '" + evaluationId + "'"; @@ -69,8 +70,8 @@ void testMessageRouting() throws IOException, NamingException, JMSException, Int // Listen for evaluation messages MessageListener evaluationListener = message -> { - TextMessage textMessage = (TextMessage) message; - + TextMessage textMessage = ( TextMessage ) message; + try { assertEquals( "I am an evaluation message!", textMessage.getText() ); @@ -79,15 +80,15 @@ void testMessageRouting() throws IOException, NamingException, JMSException, Int { throw new IllegalStateException( e ); } - + LOGGER.info( "Received an evaluation message {}", textMessage ); evaluationConsumerCount.countDown(); }; // Listen for evaluation status messages MessageListener evaluationStatusListener = message -> { - TextMessage textMessage = (TextMessage) message; - + TextMessage textMessage = ( TextMessage ) message; + try { assertEquals( "I am an evaluation status message!", textMessage.getText() ); @@ -96,15 +97,15 @@ void testMessageRouting() throws IOException, NamingException, JMSException, Int { throw new IllegalStateException( e ); } - + LOGGER.info( "Received an evaluation status message {}", textMessage ); evaluationStatusConsumerCount.countDown(); }; // Listen for statistics messages MessageListener evaluationStatisticsListener = message -> { - TextMessage textMessage = (TextMessage) message; - + TextMessage textMessage = ( TextMessage ) message; + try { assertEquals( "I am a statistics message!", textMessage.getText() ); @@ -113,7 +114,7 @@ void testMessageRouting() throws IOException, NamingException, JMSException, Int { throw new IllegalStateException( e ); } - + LOGGER.info( "Received a statistics message {}", textMessage ); statisticsConsumerCount.countDown(); }; @@ -148,5 +149,22 @@ void testMessageRouting() throws IOException, NamingException, JMSException, Int } } + @Test + void testConnectionSucceedsWhenPropertiesAreCorrect() + { + Properties properties = BrokerUtilities.getBrokerConnectionProperties( "eventbroker.properties" ); + assertThrows( BrokerConnectionException.class, () -> BrokerConnectionFactory.testConnection( properties, 0 ) ); + } + + @Test + void testConnectionFailsWhenPropertiesAreIncorrect() + { + Properties properties = BrokerUtilities.getBrokerConnectionProperties( "eventbroker.properties" ); + + String connectionPropertyName = BrokerUtilities.getConnectionPropertyName( properties ); + // Replace resolved property with url/port that is guaranteed not to have a broker running + properties.put( connectionPropertyName, "amqp://localhost:-1" ); + assertThrows( BrokerConnectionException.class, () -> BrokerConnectionFactory.testConnection( properties, 0 ) ); + } }