Skip to content

Commit 81fbdb7

Browse files
committed
ARTEMIS-4924 Proper handling of invalid messages in SNF queues
- Send invalid messages in SNF queues to DLQ - Add documentation for store and forward queue proper usage
1 parent d0c83af commit 81fbdb7

File tree

9 files changed

+162
-9
lines changed

9 files changed

+162
-9
lines changed

artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java

+6
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,12 @@ public ActiveMQException createException(String msg) {
279279
public ActiveMQException createException(String msg) {
280280
return new ActiveMQTimeoutException(msg);
281281
}
282+
},
283+
INVALID_MESSAGE_EXCEPTION(224) {
284+
@Override
285+
public ActiveMQException createException(String msg) {
286+
return new ActiveMQInvalidMessageException(msg);
287+
}
282288
};
283289
private static final Map<Integer, ActiveMQExceptionType> TYPE_MAP;
284290

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
* <p>
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
* <p>
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.activemq.artemis.api.core;
19+
20+
public class ActiveMQInvalidMessageException extends ActiveMQException {
21+
22+
public ActiveMQInvalidMessageException(String message) {
23+
super(ActiveMQExceptionType.INVALID_MESSAGE_EXCEPTION, message);
24+
}
25+
}

artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java

+5
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,11 @@ public interface Message {
173173
*/
174174
SimpleString HDR_INGRESS_TIMESTAMP = SimpleString.of("_AMQ_INGRESS_TIMESTAMP");
175175

176+
/**
177+
* This gives extra information as to why the messages is sent to DLQ
178+
*/
179+
SimpleString HDR_ROUTE_DLQ_DETAIL = SimpleString.of("_AMQ_DLQ_DETAIL");
180+
176181
/**
177182
* The prefix used (if any) when sending this message. For protocols (e.g. STOMP) that need to track this and restore
178183
* the prefix when the message is consumed.

artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java

+3
Original file line numberDiff line numberDiff line change
@@ -561,4 +561,7 @@ IllegalStateException invalidRoutingTypeUpdate(String queueName,
561561

562562
@Message(id = 229256, value = "{} must be a positive power of 2 (actual value: {})")
563563
IllegalArgumentException positivePowerOfTwo(String name, Number val);
564+
565+
@Message(id = 229257, value = "Missing header {}")
566+
String messageMissingHeader(SimpleString idsHeaderName);
564567
}

artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -641,9 +641,8 @@ void slowConsumerDetected(String sessionID,
641641
@LogMessage(id = 222109, value = "Timed out waiting for write lock on consumer {} from {}. Check the Thread dump", level = LogMessage.Level.WARN)
642642
void timeoutLockingConsumer(String consumer, String remoteAddress);
643643

644-
@LogMessage(id = 222110, value = "no queue IDs defined!, originalMessage = {}, copiedMessage = {}, props={}", level = LogMessage.Level.WARN)
644+
@LogMessage(id = 222110, value = "no queue IDs defined!, originalMessage = {}, props={}", level = LogMessage.Level.WARN)
645645
void noQueueIdDefined(org.apache.activemq.artemis.api.core.Message message,
646-
org.apache.activemq.artemis.api.core.Message messageCopy,
647646
SimpleString idsHeaderName);
648647

649648
@LogMessage(id = 222111, value = "exception while invoking {} on {}", level = LogMessage.Level.TRACE)

artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -511,7 +511,7 @@ public void failed(Throwable t) {
511511
}
512512

513513
/* Hook for processing message before forwarding */
514-
protected Message beforeForward(Message message, final SimpleString forwardingAddress) {
514+
protected Message beforeForward(Message message, final SimpleString forwardingAddress) throws ActiveMQException {
515515
message = message.copy();
516516
((RefCountMessage)message).setParentRef((RefCountMessage)message);
517517

@@ -610,7 +610,15 @@ public HandleStatus handle(final MessageReference ref) throws Exception {
610610
dest = ref.getMessage().getAddressSimpleString();
611611
}
612612

613-
final Message message = beforeForward(ref.getMessage(), dest);
613+
final Message message;
614+
try {
615+
message = beforeForward(ref.getMessage(), dest);
616+
} catch (ActiveMQException ex) {
617+
ref.getMessage().putStringProperty(Message.HDR_ROUTE_DLQ_DETAIL, SimpleString.of(ex.toString()));
618+
ref.getQueue().sendToDeadLetterAddress(null, ref);
619+
refs.remove(ref.getMessageID());
620+
return HandleStatus.HANDLED;
621+
}
614622

615623
pendingAcks.countUp();
616624

artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java

+4-5
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.apache.activemq.artemis.core.filter.Filter;
4848
import org.apache.activemq.artemis.core.persistence.StorageManager;
4949
import org.apache.activemq.artemis.core.postoffice.BindingType;
50+
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
5051
import org.apache.activemq.artemis.core.server.ActiveMQServer;
5152
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
5253
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
@@ -185,7 +186,7 @@ protected ClientSessionFactoryInternal createSessionFactory() throws Exception {
185186
}
186187

187188
@Override
188-
protected Message beforeForward(final Message message, final SimpleString forwardingAddress) {
189+
protected Message beforeForward(final Message message, final SimpleString forwardingAddress) throws ActiveMQException {
189190
// We make a copy of the message, then we strip out the unwanted routing id headers and leave
190191
// only
191192
// the one pertinent for the address node - this is important since different queues on different
@@ -200,11 +201,9 @@ protected Message beforeForward(final Message message, final SimpleString forwar
200201
Set<SimpleString> propNames = new HashSet<>(messageCopy.getPropertyNames());
201202

202203
byte[] queueIds = message.getExtraBytesProperty(idsHeaderName);
203-
204204
if (queueIds == null) {
205-
// Sanity check only
206-
ActiveMQServerLogger.LOGGER.noQueueIdDefined(message, messageCopy, idsHeaderName);
207-
throw new IllegalStateException("no queueIDs defined");
205+
ActiveMQServerLogger.LOGGER.noQueueIdDefined(message, idsHeaderName);
206+
throw ActiveMQExceptionType.INVALID_MESSAGE_EXCEPTION.createException(ActiveMQMessageBundle.BUNDLE.messageMissingHeader(idsHeaderName));
208207
}
209208

210209
for (SimpleString propName : propNames) {

docs/user-manual/clusters.adoc

+5
Original file line numberDiff line numberDiff line change
@@ -715,6 +715,11 @@ The default value is `-1`.
715715

716716
It often makes sense to introduce a delay before redistributing as it's a common case that a consumer closes but another one quickly is created on the same queue, in such a case you probably don't want to redistribute immediately since the new consumer will arrive shortly.
717717

718+
[WARNING]
719+
====
720+
The broker uses internal store and forward queues to handle message redistribution. Be aware that any clients should not directly send messages to the sore and forward queues. If a client sends messages to a store and forward queue, the messages will be sent to dead letter address. If security is enabled, make sure the clients do not have `send` permission on any store and forward queues. (The name pattern for a store and forward queue is <internal-naming-prefix>.sf.<cluster-name>.<nodeID> where the default internal-naming-prefix is `$.activemq.internal`, the cluster-name is the name of the cluster-connection, and the nodeID is the target node's ID)
721+
====
722+
718723
== Cluster topologies
719724

720725
Apache ActiveMQ Artemis clusters can be connected together in many different topologies, let's consider the two most common ones here

tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/ClusteredBridgeReconnectTest.java

+103
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.concurrent.atomic.AtomicInteger;
3131

3232
import org.apache.activemq.artemis.api.core.ActiveMQException;
33+
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
3334
import org.apache.activemq.artemis.api.core.Message;
3435
import org.apache.activemq.artemis.api.core.SimpleString;
3536
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
@@ -43,6 +44,7 @@
4344
import org.apache.activemq.artemis.core.postoffice.Binding;
4445
import org.apache.activemq.artemis.core.postoffice.Bindings;
4546
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
47+
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
4648
import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord;
4749
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
4850
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeTestAccessor;
@@ -413,6 +415,107 @@ public void testPauseAddressBlockingSnFQueue() throws Exception {
413415
stopServers(0, 1);
414416
}
415417

418+
@Test
419+
public void testBadClientSendMessagesToSnFQueue() throws Exception {
420+
setupServer(0, isFileStorage(), isNetty());
421+
setupServer(1, isFileStorage(), isNetty());
422+
423+
setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
424+
425+
setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
426+
427+
String dla = "DLA";
428+
AddressSettings addressSettings = new AddressSettings();
429+
addressSettings.setDeadLetterAddress(SimpleString.of(dla));
430+
431+
servers[0].getAddressSettingsRepository().addMatch("#", addressSettings);
432+
servers[1].getAddressSettingsRepository().addMatch("#", addressSettings);
433+
434+
startServers(0, 1);
435+
436+
setupSessionFactory(0, isNetty());
437+
setupSessionFactory(1, isNetty());
438+
439+
createQueue(0, dla, dla, null, true);
440+
createQueue(1, dla, dla, null, true);
441+
442+
waitForBindings(0, dla, 1, 0, true);
443+
waitForBindings(1, dla, 1, 0, true);
444+
445+
ClientSession session0 = sfs[0].createSession();
446+
ClientSession session1 = sfs[1].createSession();
447+
448+
session0.start();
449+
session1.start();
450+
451+
final int num = 10;
452+
453+
SimpleString nodeId1 = servers[1].getNodeID();
454+
ClusterConnectionImpl cc0 = (ClusterConnectionImpl) servers[0].getClusterManager().getClusterConnection("cluster0");
455+
SimpleString snfQueue0 = cc0.getSfQueueName(nodeId1.toString());
456+
457+
ClientProducer badProducer0 = session0.createProducer(snfQueue0);
458+
for (int i = 0; i < num; i++) {
459+
Message msg = session0.createMessage(true);
460+
msg.putStringProperty("origin", "from producer 0");
461+
badProducer0.send(msg);
462+
}
463+
464+
//add a remote queue and consumer to enable message to flow from node 0 to node 1
465+
createQueue(1, "queues.testaddress", "queue0", null, true);
466+
ClientConsumer consumer1 = session1.createConsumer("queue0");
467+
468+
waitForBindings(0, "queues.testaddress", 0, 0, true);
469+
waitForBindings(1, "queues.testaddress", 1, 1, true);
470+
471+
waitForBindings(0, "queues.testaddress", 1, 1, false);
472+
waitForBindings(1, "queues.testaddress", 0, 0, false);
473+
474+
ClientConsumer dlqConsumer = session0.createConsumer(dla);
475+
476+
for (int i = 0; i < num; i++) {
477+
Message msg = session0.createMessage(true);
478+
msg.putStringProperty("origin", "from producer 0");
479+
badProducer0.send(msg);
480+
}
481+
482+
//messages will never reache the consumer
483+
assertNull(consumer1.receiveImmediate());
484+
485+
SimpleString idsHeaderName = Message.HDR_ROUTE_TO_IDS.concat(snfQueue0);
486+
for (int i = 0; i < num * 2; i++) {
487+
ClientMessage m = dlqConsumer.receive(5000);
488+
assertNotNull(m);
489+
String propValue = m.getStringProperty("origin");
490+
assertEquals("from producer 0", propValue);
491+
propValue = m.getStringProperty(Message.HDR_ROUTE_DLQ_DETAIL);
492+
ActiveMQException expected = ActiveMQExceptionType.INVALID_MESSAGE_EXCEPTION.createException(ActiveMQMessageBundle.BUNDLE.messageMissingHeader(idsHeaderName));
493+
assertEquals(expected.toString(), propValue);
494+
m.acknowledge();
495+
}
496+
assertNull(dlqConsumer.receiveImmediate());
497+
498+
//normal message flow should work
499+
ClientProducer goodProducer0 = session0.createProducer("queues.testaddress");
500+
for (int i = 0; i < num; i++) {
501+
Message msg = session0.createMessage(true);
502+
msg.putStringProperty("origin", "from producer 0");
503+
goodProducer0.send(msg);
504+
}
505+
506+
//consumer1 can receive from node0
507+
for (int i = 0; i < num; i++) {
508+
ClientMessage m = consumer1.receive(5000);
509+
assertNotNull(m);
510+
String propValue = m.getStringProperty("origin");
511+
assertEquals("from producer 0", propValue);
512+
m.acknowledge();
513+
}
514+
assertNull(consumer1.receiveImmediate());
515+
516+
stopServers(0, 1);
517+
}
518+
416519
@Override
417520
@AfterEach
418521
public void tearDown() throws Exception {

0 commit comments

Comments
 (0)