Skip to content

Commit 2269bb2

Browse files
committed
ARTEMIS-4924 Do not allow sending messages directly to store-and-forward queues
1 parent bc1bb99 commit 2269bb2

File tree

9 files changed

+533
-9
lines changed

9 files changed

+533
-9
lines changed

artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -641,9 +641,8 @@ void slowConsumerDetected(String sessionID,
641641
@LogMessage(id = 222109, value = "Timed out waiting for write lock on consumer {} from {}. Check the Thread dump", level = LogMessage.Level.WARN)
642642
void timeoutLockingConsumer(String consumer, String remoteAddress);
643643

644-
@LogMessage(id = 222110, value = "no queue IDs defined!, originalMessage = {}, copiedMessage = {}, props={}", level = LogMessage.Level.WARN)
644+
@LogMessage(id = 222110, value = "no queue IDs defined!, originalMessage = {}, props={}", level = LogMessage.Level.WARN)
645645
void noQueueIdDefined(org.apache.activemq.artemis.api.core.Message message,
646-
org.apache.activemq.artemis.api.core.Message messageCopy,
647646
SimpleString idsHeaderName);
648647

649648
@LogMessage(id = 222111, value = "exception while invoking {} on {}", level = LogMessage.Level.TRACE)

artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java

+4
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,10 @@ public interface Queue extends Bindable,CriticalComponent {
7474

7575
void refDown(MessageReference messageReference);
7676

77+
default boolean checkInvalid(final MessageReference ref) {
78+
return false;
79+
}
80+
7781
/** Remove item with a supplied non-negative {@literal (>= 0) } ID.
7882
* If the idSupplier returns {@literal < 0} the ID is considered a non value (null) and it will be ignored.
7983
*

artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConnection.java

+2
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030

3131
public interface ClusterConnection extends ActiveMQComponent, ClusterTopologyListener {
3232

33+
String SN_PREFIX = "sf.";
34+
3335
SimpleString getName();
3436

3537
String getNodeID();

artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java

-6
Original file line numberDiff line numberDiff line change
@@ -201,12 +201,6 @@ protected Message beforeForward(final Message message, final SimpleString forwar
201201

202202
byte[] queueIds = message.getExtraBytesProperty(idsHeaderName);
203203

204-
if (queueIds == null) {
205-
// Sanity check only
206-
ActiveMQServerLogger.LOGGER.noQueueIdDefined(message, messageCopy, idsHeaderName);
207-
throw new IllegalStateException("no queueIDs defined");
208-
}
209-
210204
for (SimpleString propName : propNames) {
211205
if (propName.startsWith(Message.HDR_ROUTE_TO_IDS)) {
212206
messageCopy.removeProperty(propName);

artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java

+12-1
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.apache.activemq.artemis.core.postoffice.PostOffice;
5555
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
5656
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
57+
import org.apache.activemq.artemis.core.security.Role;
5758
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
5859
import org.apache.activemq.artemis.core.server.ActiveMQServer;
5960
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
@@ -84,7 +85,6 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
8485

8586
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
8687

87-
private static final String SN_PREFIX = "sf.";
8888
/**
8989
* When getting member on node-up and down we have to remove the name from the transport config
9090
* as the setting we build here doesn't need to consider the name, so use the same name on all
@@ -712,6 +712,17 @@ private synchronized void activate() throws Exception {
712712

713713
serverLocator.start(server.getExecutorFactory().getExecutor());
714714
}
715+
// add security role to disallow send/edit directly to sf queues
716+
Set<Role> roles = new HashSet<>();
717+
String addressMatch = storeAndForwardPrefix + name + "." + server.getConfiguration().getWildcardConfiguration().getSingleWordString();
718+
roles.add(new Role("*", false, true, false, false, false, false, false, true, false, false, true, false));
719+
Pair<String, Set<Role>> securityItem = new Pair<>(addressMatch, roles);
720+
if (server.getSecurityRepository().getMatch(addressMatch) == null) {
721+
server.getSecurityRepository().addMatch(securityItem.getA(), securityItem.getB());
722+
} else {
723+
//don't override user's configuration
724+
logger.debug("broker has security settings for store and forward addresses {}", addressMatch);
725+
}
715726

716727
if (managementService != null) {
717728
TypedProperties props = new TypedProperties();

artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java

+10
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.activemq.artemis.core.server.Queue;
3333
import org.apache.activemq.artemis.core.server.QueueConfig;
3434
import org.apache.activemq.artemis.core.server.QueueFactory;
35+
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
3536
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
3637
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
3738
import org.apache.activemq.artemis.utils.ExecutorFactory;
@@ -94,13 +95,22 @@ public Queue createQueueWith(final QueueConfiguration config, PagingManager pagi
9495
PageSubscription pageSubscription = getPageSubscription(config, pagingManager, filter);
9596
if (lastValueKey(config) != null) {
9697
queue = new LastValueQueue(config.setLastValueKey(lastValueKey(config)), filter, pageSubscription != null ? pageSubscription.getPagingStore() : null, pageSubscription, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
98+
} else if (isSnf(config)) {
99+
queue = new StoreAndForwardQueue(config, filter, pageSubscription != null ? pageSubscription.getPagingStore() : null, pageSubscription, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
97100
} else {
98101
queue = new QueueImpl(config, filter, pageSubscription != null ? pageSubscription.getPagingStore() : null, pageSubscription, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
99102
}
100103
server.getCriticalAnalyzer().add(queue);
101104
return queue;
102105
}
103106

107+
private boolean isSnf(QueueConfiguration config) {
108+
if (config.isInternal()) {
109+
return config.getName().toString().startsWith(server.getInternalNamingPrefix() + ClusterConnection.SN_PREFIX);
110+
}
111+
return false;
112+
}
113+
104114
@Deprecated
105115
@Override
106116
public Queue createQueue(final long persistenceID,

artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java

+22
Original file line numberDiff line numberDiff line change
@@ -3230,6 +3230,19 @@ private boolean deliver() {
32303230
consumers.reset();
32313231
continue;
32323232
}
3233+
if (checkInvalid(ref)) {
3234+
logger.trace("Reference {} is not valid", ref);
3235+
incDelivering(ref);
3236+
try {
3237+
sendToDeadLetterAddress(null, ref);
3238+
} catch (Exception e) {
3239+
logger.trace("Failed to send reference {} to DLA", ref);
3240+
}
3241+
removeMessageReference(holder, ref);
3242+
handled++;
3243+
consumers.reset();
3244+
continue;
3245+
}
32333246

32343247
logger.trace("Queue {} is delivering reference {}", name, ref);
32353248

@@ -4001,6 +4014,15 @@ private boolean deliver(final MessageReference ref) {
40014014
if (checkExpired(ref)) {
40024015
return true;
40034016
}
4017+
if (checkInvalid(ref)) {
4018+
logger.trace("Reference {} is not valid", ref);
4019+
try {
4020+
sendToDeadLetterAddress(null, ref);
4021+
} catch (Exception e) {
4022+
logger.trace("Failed to send reference {} to DLA", ref);
4023+
}
4024+
return true;
4025+
}
40044026

40054027
consumers.reset();
40064028

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
* <p>
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
* <p>
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.activemq.artemis.core.server.impl;
19+
20+
import java.util.concurrent.ScheduledExecutorService;
21+
22+
import org.apache.activemq.artemis.api.core.Message;
23+
import org.apache.activemq.artemis.api.core.QueueConfiguration;
24+
import org.apache.activemq.artemis.api.core.SimpleString;
25+
import org.apache.activemq.artemis.core.filter.Filter;
26+
import org.apache.activemq.artemis.core.paging.PagingStore;
27+
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
28+
import org.apache.activemq.artemis.core.persistence.StorageManager;
29+
import org.apache.activemq.artemis.core.postoffice.PostOffice;
30+
import org.apache.activemq.artemis.core.server.ActiveMQServer;
31+
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
32+
import org.apache.activemq.artemis.core.server.MessageReference;
33+
import org.apache.activemq.artemis.core.server.QueueFactory;
34+
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
35+
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
36+
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
37+
38+
public class StoreAndForwardQueue extends QueueImpl {
39+
40+
public StoreAndForwardQueue(QueueConfiguration queueConfiguration,
41+
Filter filter,
42+
PagingStore pagingStore,
43+
PageSubscription pageSubscription,
44+
ScheduledExecutorService scheduledExecutor,
45+
PostOffice postOffice,
46+
StorageManager storageManager,
47+
HierarchicalRepository<AddressSettings> addressSettingsRepository,
48+
ArtemisExecutor executor,
49+
ActiveMQServer server,
50+
QueueFactory factory) {
51+
super(queueConfiguration, filter, pagingStore, pageSubscription, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory);
52+
}
53+
54+
@Override
55+
public boolean checkInvalid(final MessageReference ref) {
56+
Message message = ref.getMessage();
57+
SimpleString idsHeaderName = Message.HDR_ROUTE_TO_IDS.concat(this.getName());
58+
byte[] queueIds = message.getExtraBytesProperty(idsHeaderName);
59+
if (queueIds == null) {
60+
ActiveMQServerLogger.LOGGER.noQueueIdDefined(message, idsHeaderName);
61+
return true;
62+
}
63+
return false;
64+
}
65+
}

0 commit comments

Comments
 (0)