|
20 | 20 | import com.rabbitmq.client.Command;
|
21 | 21 | import com.rabbitmq.client.Connection;
|
22 | 22 | import com.rabbitmq.client.ConnectionFactory;
|
| 23 | +import com.rabbitmq.client.DefaultConsumer; |
| 24 | +import com.rabbitmq.client.Envelope; |
23 | 25 | import com.rabbitmq.client.TrafficListener;
|
24 | 26 | import org.junit.Test;
|
25 | 27 | import org.junit.runner.RunWith;
|
26 | 28 | import org.junit.runners.Parameterized;
|
27 | 29 |
|
| 30 | +import java.io.IOException; |
28 | 31 | import java.util.List;
|
29 | 32 | import java.util.UUID;
|
30 | 33 | import java.util.concurrent.CopyOnWriteArrayList;
|
31 | 34 | import java.util.concurrent.CountDownLatch;
|
32 | 35 | import java.util.concurrent.TimeUnit;
|
33 |
| -import java.util.function.Consumer; |
34 | 36 |
|
35 | 37 | import static org.junit.Assert.assertEquals;
|
36 | 38 | import static org.junit.Assert.assertTrue;
|
|
42 | 44 | public class TrafficListenerTest {
|
43 | 45 |
|
44 | 46 | @Parameterized.Parameter
|
45 |
| - public Consumer<ConnectionFactory> configurator; |
| 47 | + public ConnectionFactoryConfigurator configurator; |
46 | 48 |
|
47 | 49 | @Parameterized.Parameters
|
48 | 50 | public static Object[] data() {
|
49 | 51 | return new Object[] { automaticRecoveryEnabled(), automaticRecoveryDisabled() };
|
50 | 52 | }
|
51 | 53 |
|
52 |
| - static Consumer<ConnectionFactory> automaticRecoveryEnabled() { |
53 |
| - return cf -> cf.setAutomaticRecoveryEnabled(true); |
| 54 | + static ConnectionFactoryConfigurator automaticRecoveryEnabled() { |
| 55 | + return new ConnectionFactoryConfigurator() { |
| 56 | + |
| 57 | + @Override |
| 58 | + public void configure(ConnectionFactory cf) { |
| 59 | + cf.setAutomaticRecoveryEnabled(true); |
| 60 | + } |
| 61 | + }; |
54 | 62 | }
|
55 | 63 |
|
56 |
| - static Consumer<ConnectionFactory> automaticRecoveryDisabled() { |
57 |
| - return cf -> cf.setAutomaticRecoveryEnabled(false); |
| 64 | + static ConnectionFactoryConfigurator automaticRecoveryDisabled() { |
| 65 | + return new ConnectionFactoryConfigurator() { |
| 66 | + |
| 67 | + @Override |
| 68 | + public void configure(ConnectionFactory cf) { |
| 69 | + cf.setAutomaticRecoveryEnabled(false); |
| 70 | + } |
| 71 | + }; |
58 | 72 | }
|
59 | 73 |
|
60 | 74 | @Test
|
61 | 75 | public void trafficListenerIsCalled() throws Exception {
|
62 | 76 | ConnectionFactory cf = TestUtils.connectionFactory();
|
63 | 77 | TestTrafficListener testTrafficListener = new TestTrafficListener();
|
64 | 78 | cf.setTrafficListener(testTrafficListener);
|
65 |
| - configurator.accept(cf); |
66 |
| - try (Connection c = cf.newConnection()) { |
| 79 | + configurator.configure(cf); |
| 80 | + Connection c = cf.newConnection(); |
| 81 | + try { |
67 | 82 | Channel ch = c.createChannel();
|
68 | 83 | String queue = ch.queueDeclare().getQueue();
|
69 |
| - CountDownLatch latch = new CountDownLatch(1); |
70 |
| - ch.basicConsume(queue, true, |
71 |
| - (consumerTag, message) -> latch.countDown(), consumerTag -> { |
72 |
| - }); |
| 84 | + final CountDownLatch latch = new CountDownLatch(1); |
| 85 | + ch.basicConsume(queue, true, new DefaultConsumer(ch) { |
| 86 | + |
| 87 | + @Override |
| 88 | + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { |
| 89 | + latch.countDown(); |
| 90 | + } |
| 91 | + } |
| 92 | + ); |
73 | 93 | String messageContent = UUID.randomUUID().toString();
|
74 | 94 | ch.basicPublish("", queue, null, messageContent.getBytes());
|
75 | 95 | assertTrue(latch.await(5, TimeUnit.SECONDS));
|
76 | 96 | assertEquals(1, testTrafficListener.outboundContent.size());
|
77 | 97 | assertEquals(messageContent, testTrafficListener.outboundContent.get(0));
|
78 | 98 | assertEquals(1, testTrafficListener.inboundContent.size());
|
79 | 99 | assertEquals(messageContent, testTrafficListener.inboundContent.get(0));
|
| 100 | + } finally { |
| 101 | + TestUtils.close(c); |
80 | 102 | }
|
81 | 103 | }
|
82 | 104 |
|
| 105 | + interface ConnectionFactoryConfigurator { |
| 106 | + |
| 107 | + void configure(ConnectionFactory connectionFactory); |
| 108 | + } |
| 109 | + |
83 | 110 | private static class TestTrafficListener implements TrafficListener {
|
84 | 111 |
|
85 |
| - final List<String> outboundContent = new CopyOnWriteArrayList<>(); |
86 |
| - final List<String> inboundContent = new CopyOnWriteArrayList<>(); |
| 112 | + final List<String> outboundContent = new CopyOnWriteArrayList<String>(); |
| 113 | + final List<String> inboundContent = new CopyOnWriteArrayList<String>(); |
87 | 114 |
|
88 | 115 | @Override
|
89 | 116 | public void write(Command outboundCommand) {
|
|
0 commit comments