|
22 | 22 | */
|
23 | 23 | public class IotHubServicesCommon
|
24 | 24 | {
|
25 |
| - //if error injection message has not taken effect after 1 minute, the test will timeout |
26 |
| - private final static long ERROR_INJECTION_MESSAGE_EFFECT_TIMEOUT_MILLISECONDS = 60 * 1000; |
27 |
| - private final static String TEST_ASC_SECURITY_MESSAGE = "{ \"AgentVersion\": \"0.0.1\", " |
28 |
| - + "\"AgentId\" : \"{4C1B4747-E4C7-4681-B31D-4B39E390E7F8}\", " |
29 |
| - + "\"MessageSchemaVersion\" : \"1.0\", \"Events\" : " |
30 |
| - + " { \"EventType\": \"Security\", " |
31 |
| - + "\"Category\" : \"Periodic\", " |
32 |
| - + "\"Name\" : \"ListeningPorts\", " |
33 |
| - + "\"IsEmpty\" : true, " |
34 |
| - + "\"PayloadSchemaVersion\" : \"1.0\", " |
35 |
| - + "\"Id\" : \"%s\", " |
36 |
| - + "\"TimestampLocal\" : \"2012-04-23T18:25:43.511Z\", " |
37 |
| - + "\"TimestampUTC\" : \"2012-04-23T18:25:43.511Z\" }, " |
38 |
| - + "\"Payload\": { \"data\": \"test\" } } }"; |
39 |
| - |
40 | 25 | private static final int TIMEOUT_MILLISECONDS = 60 * 1000; // 1 minute
|
41 | 26 | private static final int CHECK_INTERVAL_MILLISECONDS = 300;
|
42 | 27 |
|
43 |
| - /* |
44 |
| - * method to send message over given DeviceClient |
45 |
| - */ |
46 |
| - public static void sendMessages(InternalClient client, |
47 |
| - IotHubClientProtocol protocol, |
48 |
| - List<MessageAndResult> messagesToSend, |
49 |
| - final long RETRY_MILLISECONDS, |
50 |
| - final long SEND_TIMEOUT_MILLISECONDS, |
51 |
| - long interMessageDelay, |
52 |
| - List<Pair<IotHubConnectionStatus, Throwable>> statusUpdates) throws IOException, InterruptedException, IotHubClientException |
53 |
| - { |
54 |
| - try |
55 |
| - { |
56 |
| - client.open(false); |
57 |
| - |
58 |
| - for (MessageAndResult messageAndResult : messagesToSend) |
59 |
| - { |
60 |
| - if ((protocol == IotHubClientProtocol.MQTT || protocol == IotHubClientProtocol.MQTT_WS) && isErrorInjectionMessage(messageAndResult)) |
61 |
| - { |
62 |
| - // error injection message will not be ack'd by service if sent over MQTT/MQTT_WS, so the SDK's |
63 |
| - // retry logic will try to send it again after the connection drops. By setting expiry time, |
64 |
| - // we ensure that error injection message isn't resent to service too many times. The message will still likely |
65 |
| - // be sent 3 or 4 times causing 3 or 4 disconnections, but the test should recover anyways. |
66 |
| - messageAndResult.message.setExpiryTime(1000); |
67 |
| - |
68 |
| - // Since the message won't be ack'd, then we don't need to validate the status code when this message's callback is fired |
69 |
| - messageAndResult.statusCode = null; |
70 |
| - } |
71 |
| - |
72 |
| - sendMessageAndWaitForResponse(client, messageAndResult, protocol); |
73 |
| - |
74 |
| - if (isErrorInjectionMessage(messageAndResult)) |
75 |
| - { |
76 |
| - //wait until error injection message takes affect before sending the next message |
77 |
| - long startTime = System.currentTimeMillis(); |
78 |
| - while (!actualStatusUpdatesContainsStatus(statusUpdates, IotHubConnectionStatus.DISCONNECTED_RETRYING)) |
79 |
| - { |
80 |
| - Thread.sleep(1000); |
81 |
| - |
82 |
| - // send the fault injection message again in case it wasn't sent successfully before |
83 |
| - sendMessageAndWaitForResponse(client, messageAndResult, protocol); |
84 |
| - |
85 |
| - if (System.currentTimeMillis() - startTime > ERROR_INJECTION_MESSAGE_EFFECT_TIMEOUT_MILLISECONDS) |
86 |
| - { |
87 |
| - Assert.fail(buildExceptionMessage("Sending message over " + protocol + " protocol failed: Error injection message never caused connection to be lost", client)); |
88 |
| - } |
89 |
| - } |
90 |
| - } |
91 |
| - else |
92 |
| - { |
93 |
| - Thread.sleep(interMessageDelay); |
94 |
| - } |
95 |
| - } |
96 |
| - } |
97 |
| - finally |
98 |
| - { |
99 |
| - client.close(); |
100 |
| - } |
101 |
| - } |
102 |
| - |
103 |
| - /* |
104 |
| - * method to send message over given DeviceClient |
105 |
| - */ |
106 |
| - public static void sendBulkMessages(InternalClient client, |
107 |
| - IotHubClientProtocol protocol, |
108 |
| - List<MessageAndResult> messagesToSend, |
109 |
| - final long RETRY_MILLISECONDS, |
110 |
| - final long SEND_TIMEOUT_MILLISECONDS, |
111 |
| - long interMessageDelay, |
112 |
| - List<Pair<IotHubConnectionStatus, Throwable>> statusUpdates) throws IOException, InterruptedException, IotHubClientException |
113 |
| - { |
114 |
| - try |
115 |
| - { |
116 |
| - client.open(false); |
117 |
| - |
118 |
| - if (protocol != IotHubClientProtocol.HTTPS) |
119 |
| - { |
120 |
| - sendMessages(client, protocol, messagesToSend,RETRY_MILLISECONDS ,SEND_TIMEOUT_MILLISECONDS,interMessageDelay, statusUpdates); |
121 |
| - return; |
122 |
| - } |
123 |
| - |
124 |
| - List<Message> bulkMessages = new ArrayList<>(); |
125 |
| - for (MessageAndResult mar : messagesToSend) { |
126 |
| - bulkMessages.add(mar.message); |
127 |
| - } |
128 |
| - |
129 |
| - BulkMessagesAndResult bulkMessagesAndResult = new BulkMessagesAndResult(bulkMessages, IotHubStatusCode.OK); |
130 |
| - sendBulkMessagesAndWaitForResponse(client, bulkMessagesAndResult, RETRY_MILLISECONDS, SEND_TIMEOUT_MILLISECONDS, protocol); |
131 |
| - } |
132 |
| - finally |
133 |
| - { |
134 |
| - client.close(); |
135 |
| - } |
136 |
| - } |
137 |
| - |
138 |
| - public static void sendMessagesExpectingConnectionStatusChangeUpdate(InternalClient client, |
139 |
| - IotHubClientProtocol protocol, |
140 |
| - List<MessageAndResult> messagesToSend, |
141 |
| - final long RETRY_MILLISECONDS, |
142 |
| - final long SEND_TIMEOUT_MILLISECONDS, |
143 |
| - final IotHubConnectionStatus expectedStatus, |
144 |
| - int interMessageDelay, |
145 |
| - AuthenticationType authType) throws IOException, InterruptedException, IotHubClientException |
146 |
| - { |
147 |
| - final List<Pair<IotHubConnectionStatus, Throwable>> actualStatusUpdates = new ArrayList<>(); |
148 |
| - client.setConnectionStatusChangeCallback((context) -> actualStatusUpdates.add(new Pair<>(context.getNewStatus(), context.getCause())), new Object()); |
149 |
| - |
150 |
| - sendMessages(client, protocol, messagesToSend, RETRY_MILLISECONDS, SEND_TIMEOUT_MILLISECONDS, interMessageDelay, actualStatusUpdates); |
151 |
| - |
152 |
| - Assert.assertTrue(buildExceptionMessage(protocol + ", " + authType + ": Expected connection status update to occur: " + expectedStatus, client), actualStatusUpdatesContainsStatus(actualStatusUpdates, expectedStatus)); |
153 |
| - } |
154 |
| - |
155 |
| - /* |
156 |
| - * method to send message over given DeviceClient |
157 |
| - */ |
158 |
| - public static void sendMessagesMultiplex(InternalClient client, |
159 |
| - IotHubClientProtocol protocol, |
160 |
| - final int NUM_MESSAGES_PER_CONNECTION, |
161 |
| - final long RETRY_MILLISECONDS, |
162 |
| - final long SEND_TIMEOUT_MILLISECONDS) |
163 |
| - { |
164 |
| - String messageString = "Java client e2e test message over " + protocol + " protocol"; |
165 |
| - Message msg = new Message(messageString); |
166 |
| - |
167 |
| - for (int i = 0; i < NUM_MESSAGES_PER_CONNECTION; ++i) |
168 |
| - { |
169 |
| - try |
170 |
| - { |
171 |
| - Success messageSent = new Success(); |
172 |
| - EventCallback callback = new EventCallback(IotHubStatusCode.OK); |
173 |
| - client.sendEventAsync(msg, callback, messageSent); |
174 |
| - |
175 |
| - long startTime = System.currentTimeMillis(); |
176 |
| - while (!messageSent.wasCallbackFired()) |
177 |
| - { |
178 |
| - Thread.sleep(RETRY_MILLISECONDS); |
179 |
| - if (System.currentTimeMillis() - startTime > SEND_TIMEOUT_MILLISECONDS) |
180 |
| - { |
181 |
| - Assert.fail(buildExceptionMessage("Timed out waiting for message callback", client)); |
182 |
| - } |
183 |
| - } |
184 |
| - |
185 |
| - if (messageSent.getCallbackStatusCode() != IotHubStatusCode.OK) |
186 |
| - { |
187 |
| - Assert.fail(buildExceptionMessage("Sending message over " + protocol + " protocol failed: expected status code OK but received: " + messageSent.getCallbackStatusCode(), client)); |
188 |
| - } |
189 |
| - } |
190 |
| - catch (Exception e) |
191 |
| - { |
192 |
| - Assert.fail(buildExceptionMessage("Sending message over " + protocol + " protocol failed: Exception encountered while sending messages: " + e.getMessage(), client)); |
193 |
| - } |
194 |
| - } |
195 |
| - } |
196 |
| - |
197 |
| - public static void sendExpiredMessageExpectingMessageExpiredCallback(InternalClient client, |
198 |
| - IotHubClientProtocol protocol, |
199 |
| - final long RETRY_MILLISECONDS, |
200 |
| - final long SEND_TIMEOUT_MILLISECONDS, |
201 |
| - AuthenticationType authType) throws IOException |
202 |
| - { |
203 |
| - try |
204 |
| - { |
205 |
| - Message expiredMessage = new Message("This message has expired"); |
206 |
| - expiredMessage.setAbsoluteExpiryTime(1); //setting this to 0 causes the message to never expire |
207 |
| - Success messageSentExpiredCallback = new Success(); |
208 |
| - |
209 |
| - client.open(false); |
210 |
| - client.sendEventAsync(expiredMessage, new EventCallback(IotHubStatusCode.MESSAGE_EXPIRED), messageSentExpiredCallback); |
211 |
| - |
212 |
| - long startTime = System.currentTimeMillis(); |
213 |
| - while (!messageSentExpiredCallback.wasCallbackFired()) |
214 |
| - { |
215 |
| - Thread.sleep(RETRY_MILLISECONDS); |
216 |
| - if (System.currentTimeMillis() - startTime > SEND_TIMEOUT_MILLISECONDS) |
217 |
| - { |
218 |
| - Assert.fail(buildExceptionMessage(protocol + ", " + authType + ": Timed out waiting for a message callback", client)); |
219 |
| - } |
220 |
| - } |
221 |
| - |
222 |
| - client.close(); |
223 |
| - |
224 |
| - if (messageSentExpiredCallback.getCallbackStatusCode() != IotHubStatusCode.MESSAGE_EXPIRED) |
225 |
| - { |
226 |
| - Assert.fail(buildExceptionMessage("Sending message over " + protocol + " protocol failed: expected status code MESSAGE_EXPIRED but received: " + messageSentExpiredCallback.getCallbackStatusCode(), client)); |
227 |
| - } |
228 |
| - } |
229 |
| - catch (Exception e) |
230 |
| - { |
231 |
| - client.close(); |
232 |
| - Assert.fail(buildExceptionMessage("Sending expired message over " + protocol + " protocol failed: Exception encountered while sending message and waiting for MESSAGE_EXPIRED callback: " + e.getMessage(), client)); |
233 |
| - } |
234 |
| - } |
235 |
| - |
236 |
| - public static void sendMessagesExpectingUnrecoverableConnectionLossAndTimeout(InternalClient client, |
237 |
| - IotHubClientProtocol protocol, |
238 |
| - Message errorInjectionMessage, |
239 |
| - AuthenticationType authType) throws IOException, InterruptedException, IotHubClientException |
240 |
| - { |
241 |
| - final List<Pair<IotHubConnectionStatus, Throwable>> statusUpdates = new ArrayList<>(); |
242 |
| - client.setConnectionStatusChangeCallback((context) -> statusUpdates.add(new Pair<>(context.getNewStatus(), context.getCause())), new Object()); |
243 |
| - |
244 |
| - client.open(false); |
245 |
| - |
246 |
| - client.sendEventAsync(errorInjectionMessage, new EventCallback(null), new Success()); |
247 |
| - |
248 |
| - long startTime = System.currentTimeMillis(); |
249 |
| - while (!(actualStatusUpdatesContainsStatus(statusUpdates, IotHubConnectionStatus.DISCONNECTED_RETRYING) && actualStatusUpdatesContainsStatus(statusUpdates, IotHubConnectionStatus.DISCONNECTED))) |
250 |
| - { |
251 |
| - Thread.sleep(500); |
252 |
| - |
253 |
| - if (System.currentTimeMillis() - startTime > 30 * 1000) |
254 |
| - { |
255 |
| - break; |
256 |
| - } |
257 |
| - } |
258 |
| - |
259 |
| - Assert.assertTrue(buildExceptionMessage(protocol + ", " + authType + ": Expected notification about disconnected but retrying.", client), actualStatusUpdatesContainsStatus(statusUpdates, IotHubConnectionStatus.DISCONNECTED_RETRYING)); |
260 |
| - Assert.assertTrue(buildExceptionMessage(protocol + ", " + authType + ": Expected notification about disconnected.", client), actualStatusUpdatesContainsStatus(statusUpdates, IotHubConnectionStatus.DISCONNECTED)); |
261 |
| - |
262 |
| - client.close(); |
263 |
| - } |
264 |
| - |
265 | 28 | public static void sendErrorInjectionMessageAndWaitForResponse(InternalClient client, MessageAndResult messageAndResult, IotHubClientProtocol protocol)
|
266 | 29 | {
|
267 | 30 | if (protocol == IotHubClientProtocol.MQTT || protocol == IotHubClientProtocol.MQTT_WS)
|
@@ -309,50 +72,6 @@ public static void sendMessageAndWaitForResponse(InternalClient client, MessageA
|
309 | 72 | }
|
310 | 73 | }
|
311 | 74 |
|
312 |
| - public static void sendBulkMessagesAndWaitForResponse(InternalClient client, BulkMessagesAndResult messagesAndResults, long RETRY_MILLISECONDS, long SEND_TIMEOUT_MILLISECONDS, IotHubClientProtocol protocol) |
313 |
| - { |
314 |
| - try |
315 |
| - { |
316 |
| - Success messageSent = new Success(); |
317 |
| - EventCallback callback = new EventCallback(messagesAndResults.statusCode); |
318 |
| - client.sendEventsAsync(messagesAndResults.messages, callback, messageSent); |
319 |
| - |
320 |
| - long startTime = System.currentTimeMillis(); |
321 |
| - while (!messageSent.wasCallbackFired()) |
322 |
| - { |
323 |
| - Thread.sleep(RETRY_MILLISECONDS); |
324 |
| - if (System.currentTimeMillis() - startTime > SEND_TIMEOUT_MILLISECONDS) |
325 |
| - { |
326 |
| - Assert.fail(buildExceptionMessage("Timed out waiting for a message callback", client)); |
327 |
| - break; |
328 |
| - } |
329 |
| - } |
330 |
| - |
331 |
| - if (messagesAndResults.statusCode != null && messageSent.getCallbackStatusCode() != messagesAndResults.statusCode) |
332 |
| - { |
333 |
| - Assert.fail(buildExceptionMessage("Sending message over " + protocol + " protocol failed: expected " + messagesAndResults.statusCode + " but received " + messageSent.getCallbackStatusCode(), client)); |
334 |
| - } |
335 |
| - } |
336 |
| - catch (Exception e) |
337 |
| - { |
338 |
| - Assert.fail(buildExceptionMessage("Sending message over " + protocol + " protocol failed: Exception encountered while sending and waiting on a message: " + e.getMessage(), client)); |
339 |
| - } |
340 |
| - } |
341 |
| - |
342 |
| - private static boolean isErrorInjectionMessage(MessageAndResult messageAndResult) |
343 |
| - { |
344 |
| - MessageProperty[] properties = messageAndResult.message.getProperties(); |
345 |
| - for (MessageProperty property : properties) |
346 |
| - { |
347 |
| - if (property.getValue().equals(ErrorInjectionHelper.FaultCloseReason_Boom) || property.getValue().equals(ErrorInjectionHelper.FaultCloseReason_Bye)) |
348 |
| - { |
349 |
| - return true; |
350 |
| - } |
351 |
| - } |
352 |
| - |
353 |
| - return false; |
354 |
| - } |
355 |
| - |
356 | 75 | public static void waitForStabilizedConnection(List<Pair<IotHubConnectionStatus, Throwable>> actualStatusUpdates, InternalClient client) throws InterruptedException
|
357 | 76 | {
|
358 | 77 | //Wait until error injection takes effect
|
|
0 commit comments