|
1 | 1 | /* |
2 | | - * Copyright 2002-2017 the original author or authors. |
| 2 | + * Copyright 2002-2018 the original author or authors. |
3 | 3 | * |
4 | 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | 5 | * you may not use this file except in compliance with the License. |
|
17 | 17 | package org.springframework.amqp.rabbit.core; |
18 | 18 |
|
19 | 19 | import static org.hamcrest.Matchers.containsString; |
| 20 | +import static org.hamcrest.Matchers.equalTo; |
20 | 21 | import static org.junit.Assert.assertEquals; |
21 | 22 | import static org.junit.Assert.assertSame; |
22 | 23 | import static org.junit.Assert.assertThat; |
|
25 | 26 | import static org.mockito.ArgumentMatchers.anyBoolean; |
26 | 27 | import static org.mockito.ArgumentMatchers.anyString; |
27 | 28 | import static org.mockito.ArgumentMatchers.isNull; |
| 29 | +import static org.mockito.BDDMockito.given; |
| 30 | +import static org.mockito.BDDMockito.willReturn; |
28 | 31 | import static org.mockito.Mockito.doAnswer; |
29 | 32 | import static org.mockito.Mockito.mock; |
30 | 33 | import static org.mockito.Mockito.times; |
31 | 34 | import static org.mockito.Mockito.verify; |
32 | 35 | import static org.mockito.Mockito.when; |
33 | 36 | import static org.mockito.Mockito.withSettings; |
34 | 37 |
|
| 38 | +import java.util.Collections; |
35 | 39 | import java.util.HashMap; |
36 | 40 | import java.util.Map; |
37 | 41 | import java.util.concurrent.ExecutorService; |
|
48 | 52 | import org.springframework.amqp.core.Address; |
49 | 53 | import org.springframework.amqp.core.Message; |
50 | 54 | import org.springframework.amqp.core.MessageProperties; |
| 55 | +import org.springframework.amqp.core.Queue; |
51 | 56 | import org.springframework.amqp.core.ReceiveAndReplyCallback; |
52 | 57 | import org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory; |
53 | 58 | import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; |
| 59 | +import org.springframework.amqp.rabbit.connection.ChannelProxy; |
54 | 60 | import org.springframework.amqp.rabbit.connection.PublisherCallbackChannelConnectionFactory; |
55 | 61 | import org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory; |
56 | 62 | import org.springframework.amqp.rabbit.connection.SingleConnectionFactory; |
57 | 63 | import org.springframework.amqp.rabbit.support.PublisherCallbackChannel; |
58 | 64 | import org.springframework.amqp.support.converter.SimpleMessageConverter; |
59 | 65 | import org.springframework.amqp.utils.SerializationUtils; |
| 66 | +import org.springframework.context.ApplicationContext; |
60 | 67 | import org.springframework.expression.Expression; |
61 | 68 | import org.springframework.expression.spel.standard.SpelExpressionParser; |
62 | 69 | import org.springframework.retry.support.RetryTemplate; |
|
74 | 81 | import com.rabbitmq.client.Consumer; |
75 | 82 | import com.rabbitmq.client.Envelope; |
76 | 83 | import com.rabbitmq.client.impl.AMQImpl; |
| 84 | +import com.rabbitmq.client.impl.AMQImpl.Queue.DeclareOk; |
77 | 85 |
|
78 | 86 | /** |
79 | 87 | * @author Gary Russell |
@@ -327,4 +335,65 @@ public void testRoutingConnectionFactory() throws Exception { |
327 | 335 | Mockito.verify(connectionFactory2, Mockito.times(4)).createConnection(); |
328 | 336 | } |
329 | 337 |
|
| 338 | + @Test |
| 339 | + public void testNestedTxBinding() throws Exception { |
| 340 | + ConnectionFactory cf = mock(ConnectionFactory.class); |
| 341 | + Connection connection = mock(Connection.class); |
| 342 | + Channel channel1 = mock(Channel.class, "channel1"); |
| 343 | + given(channel1.isOpen()).willReturn(true); |
| 344 | + Channel channel2 = mock(Channel.class, "channel2"); |
| 345 | + given(channel2.isOpen()).willReturn(true); |
| 346 | + willReturn(connection).given(cf).newConnection(any(ExecutorService.class), anyString()); |
| 347 | + given(connection.isOpen()).willReturn(true); |
| 348 | + given(connection.createChannel()).willReturn(channel1, channel2); |
| 349 | + DeclareOk dok = new DeclareOk("foo", 0, 0); |
| 350 | + willReturn(dok).given(channel1).queueDeclare(anyString(), anyBoolean(), anyBoolean(), anyBoolean(), isNull()); |
| 351 | + CachingConnectionFactory ccf = new CachingConnectionFactory(cf); |
| 352 | + ccf.setExecutor(mock(ExecutorService.class)); |
| 353 | + RabbitTemplate rabbitTemplate = new RabbitTemplate(ccf); |
| 354 | + rabbitTemplate.setChannelTransacted(true); |
| 355 | + RabbitAdmin admin = new RabbitAdmin(rabbitTemplate); |
| 356 | + ApplicationContext ac = mock(ApplicationContext.class); |
| 357 | + willReturn(Collections.singletonMap("foo", new Queue("foo"))).given(ac).getBeansOfType(Queue.class); |
| 358 | + admin.setApplicationContext(ac); |
| 359 | + admin.afterPropertiesSet(); |
| 360 | + AtomicReference<Channel> templateChannel = new AtomicReference<>(); |
| 361 | + new TransactionTemplate(new TestTransactionManager()).execute(s -> { |
| 362 | + return rabbitTemplate.execute(c -> { |
| 363 | + templateChannel.set(c); |
| 364 | + return true; |
| 365 | + }); |
| 366 | + }); |
| 367 | + verify(channel1).txSelect(); |
| 368 | + verify(channel1).queueDeclare(anyString(), anyBoolean(), anyBoolean(), anyBoolean(), isNull()); |
| 369 | + assertThat(((ChannelProxy) templateChannel.get()).getTargetChannel(), equalTo(channel1)); |
| 370 | + verify(channel1).txCommit(); |
| 371 | + } |
| 372 | + |
| 373 | + @SuppressWarnings("serial") |
| 374 | + private class TestTransactionManager extends AbstractPlatformTransactionManager { |
| 375 | + |
| 376 | + TestTransactionManager() { |
| 377 | + super(); |
| 378 | + } |
| 379 | + |
| 380 | + @Override |
| 381 | + protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionException { |
| 382 | + } |
| 383 | + |
| 384 | + @Override |
| 385 | + protected void doCommit(DefaultTransactionStatus status) throws TransactionException { |
| 386 | + } |
| 387 | + |
| 388 | + @Override |
| 389 | + protected Object doGetTransaction() throws TransactionException { |
| 390 | + return new Object(); |
| 391 | + } |
| 392 | + |
| 393 | + @Override |
| 394 | + protected void doRollback(DefaultTransactionStatus status) throws TransactionException { |
| 395 | + } |
| 396 | + |
| 397 | + } |
| 398 | + |
330 | 399 | } |
0 commit comments