1
+ /*
2
+ * Copyright 2005-2017 Red Hat, Inc.
3
+ *
4
+ * Red Hat licenses this file to you under the Apache License, version
5
+ * 2.0 (the "License"); you may not use this file except in compliance
6
+ * with the License. You may obtain a copy of the License at
7
+ *
8
+ * http://www.apache.org/licenses/LICENSE-2.0
9
+ *
10
+ * Unless required by applicable law or agreed to in writing, software
11
+ * distributed under the License is distributed on an "AS IS" BASIS,
12
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13
+ * implied. See the License for the specific language governing
14
+ * permissions and limitations under the License.
15
+ */
16
+ package com .redhat .refarch .amq7 ;
17
+
18
+ import org .apache .activemq .artemis .api .core .management .ResourceNames ;
19
+ import org .apache .activemq .artemis .api .jms .ActiveMQJMSClient ;
20
+ import org .apache .activemq .artemis .api .jms .management .JMSManagementHelper ;
21
+ import org .junit .Assert ;
22
+ import org .slf4j .Logger ;
23
+ import org .slf4j .LoggerFactory ;
24
+
25
+ import javax .jms .*;
26
+ import javax .naming .InitialContext ;
27
+ import java .lang .IllegalStateException ;
28
+ import java .util .stream .IntStream ;
29
+
30
+ import static com .redhat .refarch .amq7 .Constants .*;
31
+
32
+ /***
33
+
34
+ */
35
+ public class BrokerDelegate {
36
+
37
+ private static final Logger logger = LoggerFactory .getLogger (BrokerDelegate .class );
38
+
39
+ private Session session ;
40
+ private Connection connection ;
41
+
42
+ private MessageConsumer queueConsumer ;
43
+ private MessageConsumer topicConsumer ;
44
+
45
+ private MessageProducer queueProducer ;
46
+ private MessageProducer topicProducer ;
47
+
48
+ public BrokerDelegate (InitialContext initialContext , String brokerName , boolean autoAck , boolean consumeQueue , boolean consumeTopic ,
49
+ boolean produceToQueue , boolean produceToTopic ) throws Exception {
50
+
51
+ ConnectionFactory connectionFactory = (ConnectionFactory ) initialContext .lookup (brokerName + "/ConnectionFactory" );
52
+ connection = connectionFactory .createConnection (USERNAME .val (), PASSWORD .val ());
53
+ session = connection .createSession (false , autoAck ? Session .AUTO_ACKNOWLEDGE : Session .CLIENT_ACKNOWLEDGE );
54
+ connection .start ();
55
+
56
+ Queue queue = (Queue ) initialContext .lookup ("queue/" + QUEUE_NAME .val ());
57
+ Topic topic = (Topic ) initialContext .lookup ("topic/" + TOPIC_NAME .val ());
58
+
59
+ if (consumeQueue )
60
+ queueConsumer = session .createConsumer (queue );
61
+
62
+ if (consumeTopic )
63
+ topicConsumer = session .createConsumer (topic );
64
+
65
+ if (produceToQueue )
66
+ queueProducer = session .createProducer (queue );
67
+
68
+ if (produceToTopic )
69
+ topicProducer = session .createProducer (topic );
70
+ }
71
+
72
+ public void sendToQueue (Integer count ) throws Exception {
73
+
74
+ if (queueProducer == null )
75
+ throw new IllegalStateException ("cannot publish to queue with null publisher" );
76
+
77
+ IntStream .rangeClosed (1 , count ).forEach (i -> {
78
+ try {
79
+ queueProducer .send (session .createTextMessage ("test" ));
80
+
81
+ } catch (Exception e ) {
82
+ logger .error ("error sending messages to queue" , e );
83
+ Assert .fail ();
84
+ }
85
+ });
86
+ }
87
+
88
+ public void sendToTopic (Integer count ) throws Exception {
89
+
90
+ if (topicProducer == null )
91
+ throw new IllegalStateException ("cannot publish to topic with null publisher" );
92
+
93
+ IntStream .rangeClosed (1 , count ).forEach (i -> {
94
+ try {
95
+ topicProducer .send (session .createTextMessage ("test" ));
96
+
97
+ } catch (Exception e ) {
98
+ logger .error ("error sending messages" , e );
99
+ Assert .fail ();
100
+ }
101
+ });
102
+ }
103
+
104
+ public void receiveFromQueue (Integer count ) throws Exception {
105
+
106
+ if (queueConsumer == null )
107
+ throw new IllegalStateException ("cannot receive from queue with null consumer" );
108
+
109
+ IntStream .rangeClosed (1 , count ).forEach (i -> {
110
+ try {
111
+ Assert .assertNotNull ("received message is null" , queueConsumer .receive (Long .valueOf (TIMEOUT .val ())));
112
+ } catch (Exception e ) {
113
+ logger .error ("error sending messages" , e );
114
+ Assert .fail ();
115
+ }
116
+ });
117
+ }
118
+
119
+ public void receiveFromQueue () throws Exception {
120
+ receiveFromQueue (1 );
121
+ }
122
+
123
+ public void ackFromQueue (Integer count ) throws Exception {
124
+
125
+ if (queueConsumer == null )
126
+ throw new IllegalStateException ("cannot receive from queue with null consumer" );
127
+
128
+ IntStream .rangeClosed (1 , count ).forEach (i -> {
129
+ try {
130
+ Message message = queueConsumer .receive (Long .valueOf (TIMEOUT .val ()));
131
+ Assert .assertNotNull ("received message is null" , message );
132
+ message .acknowledge ();
133
+
134
+ } catch (Exception e ) {
135
+ logger .error ("error sending messages" , e );
136
+ Assert .fail ();
137
+ }
138
+ });
139
+ }
140
+
141
+ public void ackFromQueue () throws Exception {
142
+ ackFromQueue (1 );
143
+ }
144
+
145
+ public void receiveFromTopic (Integer count ) throws Exception {
146
+
147
+ if (topicConsumer == null )
148
+ throw new IllegalStateException ("cannot receive from topic with null consumer" );
149
+
150
+ IntStream .rangeClosed (1 , count ).forEach (i -> {
151
+ try {
152
+ Assert .assertNotNull ("rcvd message is null" , topicConsumer .receive (Long .valueOf (TIMEOUT .val ())));
153
+ } catch (Exception e ) {
154
+ logger .error ("error sending messages" , e );
155
+ Assert .fail ();
156
+ }
157
+ });
158
+ }
159
+
160
+ public void sendShutdown () throws Exception {
161
+
162
+ QueueSession queueSession = ((QueueConnection ) connection ).createQueueSession (false , Session .AUTO_ACKNOWLEDGE );
163
+ QueueRequestor requestor = new QueueRequestor (queueSession , ActiveMQJMSClient .createQueue ("activemq.management" ));
164
+
165
+ Message message = queueSession .createMessage ();
166
+ JMSManagementHelper .putOperationInvocation (message , ResourceNames .BROKER , "forceFailover" );
167
+ try {
168
+ requestor .request (message );
169
+ } catch (JMSException e ) {
170
+ if (!e .getLocalizedMessage ().startsWith ("AMQ119016" )) {
171
+ // we killed the broker - a connection failure is expected, throw anything else
172
+ throw e ;
173
+ }
174
+ }
175
+ }
176
+
177
+ public MessageConsumer queueConsumer () {
178
+ return queueConsumer ;
179
+ }
180
+ }
0 commit comments