Skip to content

Commit f3bdc4d

Browse files
chrisbeard678098
andauthored
Fix[MQB]: Select active node quickly when no cluster node in same DC (#606)
* Feat[MQB]: Add unit test for ClusterActiveNodeManager Signed-off-by: Christopher Beard <[email protected]> * Fix[MQB]: Select active node quickly when no cluster node in same DC When a broker connects to a cluster and attempts to select an active node, the broker attempts to select a node in the same data center. After 10 seconds, if no active node has been selected, the broker relaxes the data center requirement and picks any available node. If a cluster does not have any nodes in the broker's data center, this leads to a 10-second delay on the first open queue request to a cluster. This commit changes the active node selection logic for the case where there is no cluster node within the same data center as a broker. In this case, the broker will ignore the data center requirement and may use any available node as the active node. Signed-off-by: Christopher Beard <[email protected]> * Tests: Add more active node manager tests Signed-off-by: Christopher Beard <[email protected]> * Update src/groups/mqb/mqbnet/mqbnet_clusteractivenodemanager.t.cpp Co-authored-by: Evgeny Malygin <[email protected]> Signed-off-by: Chris Beard <[email protected]> * Update src/groups/mqb/mqbnet/mqbnet_clusteractivenodemanager.t.cpp Co-authored-by: Evgeny Malygin <[email protected]> Signed-off-by: Chris Beard <[email protected]> * Update src/groups/mqb/mqbnet/mqbnet_clusteractivenodemanager.t.cpp Co-authored-by: Evgeny Malygin <[email protected]> Signed-off-by: Chris Beard <[email protected]> * Update src/groups/mqb/mqbnet/mqbnet_clusteractivenodemanager.t.cpp Co-authored-by: Evgeny Malygin <[email protected]> Signed-off-by: Chris Beard <[email protected]> * Update src/groups/mqb/mqbnet/mqbnet_clusteractivenodemanager.h Co-authored-by: Evgeny Malygin <[email protected]> Signed-off-by: Chris Beard <[email protected]> * Update src/groups/mqb/mqbnet/mqbnet_clusteractivenodemanager.cpp Co-authored-by: Evgeny Malygin <[email protected]> Signed-off-by: Chris Beard <[email protected]> --------- 
Signed-off-by: Christopher Beard <[email protected]> Signed-off-by: Chris Beard <[email protected]> Co-authored-by: Evgeny Malygin <[email protected]>
1 parent 09b6881 commit f3bdc4d

3 files changed

+353
-2
lines changed

src/groups/mqb/mqbnet/mqbnet_clusteractivenodemanager.cpp

+6-1
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ bool ClusterActiveNodeManager::findNewActiveNode()
163163
++it) {
164164
mqbnet::ClusterNode* node = it->first;
165165
if (it->second.d_status == bmqp_ctrlmsg::NodeStatus::E_AVAILABLE &&
166-
((node->dataCenter() == d_dataCenter) ||
166+
(d_ignoreDataCenter || node->dataCenter() == d_dataCenter ||
167167
d_dataCenter == "UNSPECIFIED")) {
168168
candidates.push_back(node);
169169
}
@@ -242,14 +242,19 @@ ClusterActiveNodeManager::ClusterActiveNodeManager(
242242
: d_description(description)
243243
, d_dataCenter(dataCenter)
244244
, d_activeNodeIt(d_nodes.end())
245+
, d_ignoreDataCenter(false)
245246
, d_useExtendedSelection(false)
246247
{
248+
bool clusterHasNodeInLocalDC = false;
247249
for (mqbnet::Cluster::NodesList::const_iterator it = nodes.begin();
248250
it != nodes.end();
249251
++it) {
250252
mqbnet::ClusterNode* node = *it;
251253
d_nodes[node].d_status = bmqp_ctrlmsg::NodeStatus::E_UNAVAILABLE;
254+
clusterHasNodeInLocalDC = clusterHasNodeInLocalDC ||
255+
d_dataCenter == node->dataCenter();
252256
}
257+
d_ignoreDataCenter = !clusterHasNodeInLocalDC;
253258
}
254259

255260
ClusterActiveNodeManager::~ClusterActiveNodeManager()

src/groups/mqb/mqbnet/mqbnet_clusteractivenodemanager.h

+6-1
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,11 @@ class ClusterActiveNodeManager {
258258
// Pointer to the currently active node
259259
// and its context.
260260

261+
/// If true, remove the data center requirement when selecting active
262+
/// node. Set to true when the cluster does not have any nodes in the
263+
/// current machine's data center.
264+
bool d_ignoreDataCenter;
265+
261266
bool d_useExtendedSelection;
262267
// If true, drop the same data center
263268
// requirement when selecting active
@@ -327,7 +332,7 @@ class ClusterActiveNodeManager {
327332

328333
// ACCESSORS
329334

330-
/// Return the currently active node, or a null pointer if there are no
335+
/// Return the currently active node, or a null pointer if there is no
331336
/// node currently active.
332337
ClusterNode* activeNode() const;
333338

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,341 @@
1+
// Copyright 2025 Bloomberg Finance L.P.
2+
// SPDX-License-Identifier: Apache-2.0
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
16+
// mqbnet_clusteractivenodemanager.t.cpp -*-C++-*-
17+
#include <mqbnet_clusteractivenodemanager.h>
18+
19+
// MQB
20+
#include <mqbcfg_messages.h>
21+
#include <mqbnet_cluster.h>
22+
#include <mqbnet_mockcluster.h>
23+
24+
// BMQ
25+
#include <bmqp_ctrlmsg_messages.h>
26+
27+
// BDE
28+
#include <bdlbb_pooledblobbufferfactory.h>
29+
#include <bsl_string.h>
30+
31+
// TEST DRIVER
32+
#include <bmqtst_scopedlogobserver.h>
33+
#include <bmqtst_testhelper.h>
34+
35+
// CONVENIENCE
36+
using namespace BloombergLP;
37+
using namespace bsl;
38+
39+
// ============================================================================
40+
// TESTS
41+
// ----------------------------------------------------------------------------
42+
43+
static void test1_breathingTest()
44+
{
45+
bmqtst::TestHelper::printTestName("BREATHING TEST");
46+
47+
bmqtst::ScopedLogObserver logObserver(ball::Severity::ERROR,
48+
bmqtst::TestHelperUtil::allocator());
49+
50+
mqbnet::Cluster::NodesList nodes;
51+
bsl::string description = "dummy";
52+
bsl::string dataCenter = "east";
53+
54+
mqbnet::ClusterActiveNodeManager mgr =
55+
mqbnet::ClusterActiveNodeManager(nodes, description, dataCenter);
56+
57+
BMQTST_ASSERT(!mgr.activeNode());
58+
BMQTST_ASSERT(logObserver.records().empty());
59+
}
60+
61+
static void test2_activeNodeWithinDC()
62+
// Validate that an available node in the same data center is promptly
63+
// selected as the active node. Nodes outside of the data center will not be
64+
// selected until the selection criteria is explicitly extended.
65+
{
66+
bmqtst::TestHelper::printTestName("ACTIVE NODE IN SAME DC");
67+
68+
bmqtst::ScopedLogObserver logObserver(ball::Severity::ERROR,
69+
bmqtst::TestHelperUtil::allocator());
70+
71+
// Set up mock cluster
72+
mqbcfg::ClusterDefinition clusterConfig(
73+
bmqtst::TestHelperUtil::allocator());
74+
bdlbb::PooledBlobBufferFactory bufferFactory(
75+
1024,
76+
bmqtst::TestHelperUtil::allocator());
77+
mqbnet::MockCluster mockCluster(clusterConfig,
78+
&bufferFactory,
79+
bmqtst::TestHelperUtil::allocator());
80+
81+
// Populate cluster nodes
82+
// - 1 node in "east" data center
83+
// - 1 node in "west" data center
84+
mqbnet::Cluster::NodesList nodes(bmqtst::TestHelperUtil::allocator());
85+
mqbcfg::ClusterNode clusterNodeConfig(bmqtst::TestHelperUtil::allocator());
86+
87+
clusterNodeConfig.dataCenter() = "east";
88+
clusterNodeConfig.name() = "east-1";
89+
mqbnet::MockClusterNode east1(&mockCluster,
90+
clusterNodeConfig,
91+
&bufferFactory,
92+
bmqtst::TestHelperUtil::allocator());
93+
nodes.push_back(&east1);
94+
95+
clusterNodeConfig.dataCenter() = "west";
96+
clusterNodeConfig.name() = "west-1";
97+
mqbnet::MockClusterNode west1(&mockCluster,
98+
clusterNodeConfig,
99+
&bufferFactory,
100+
bmqtst::TestHelperUtil::allocator());
101+
nodes.push_back(&west1);
102+
103+
// Create ClusterActiveNodeManager
104+
bsl::string description = "dummy";
105+
bsl::string dataCenter = "east";
106+
mqbnet::ClusterActiveNodeManager mgr =
107+
mqbnet::ClusterActiveNodeManager(nodes, description, dataCenter);
108+
BMQTST_ASSERT(!mgr.activeNode());
109+
110+
bmqp_ctrlmsg::NegotiationMessage negotiationMessage(
111+
bmqtst::TestHelperUtil::allocator());
112+
negotiationMessage.makeClientIdentity().hostName() = "dummyIdentity";
113+
114+
// "west" node up, it should not become active
115+
{
116+
int rc = mgr.onNodeUp(&west1, negotiationMessage.clientIdentity());
117+
BMQTST_ASSERT_EQ(rc, mqbnet::ClusterActiveNodeManager::e_NO_CHANGE);
118+
BMQTST_ASSERT(!mgr.activeNode());
119+
}
120+
121+
// "east" node up, it should become active
122+
{
123+
int rc = mgr.onNodeUp(&east1, negotiationMessage.clientIdentity());
124+
BMQTST_ASSERT_EQ(rc, mqbnet::ClusterActiveNodeManager::e_NEW_ACTIVE);
125+
BMQTST_ASSERT_EQ(mgr.activeNode(), &east1);
126+
}
127+
128+
// "east" node down, no active node
129+
{
130+
int rc = mgr.onNodeDown(&east1);
131+
BMQTST_ASSERT_EQ(rc, mqbnet::ClusterActiveNodeManager::e_LOST_ACTIVE);
132+
BMQTST_ASSERT(!mgr.activeNode());
133+
}
134+
135+
// Refresh should not change active node
136+
{
137+
int rc = mgr.refresh();
138+
BMQTST_ASSERT_EQ(rc, mqbnet::ClusterActiveNodeManager::e_NO_CHANGE);
139+
BMQTST_ASSERT(!mgr.activeNode());
140+
}
141+
142+
// Relax DC filter logic, "west" should become active
143+
{
144+
mgr.enableExtendedSelection();
145+
int rc = mgr.refresh();
146+
BMQTST_ASSERT_EQ(rc, mqbnet::ClusterActiveNodeManager::e_NEW_ACTIVE);
147+
BMQTST_ASSERT_EQ(mgr.activeNode(), &west1);
148+
}
149+
150+
BMQTST_ASSERT(logObserver.records().empty());
151+
}
152+
153+
static void test3_activeNodeOutsideDC()
154+
// Validate that any available node can be promptly selected as the active
155+
// node if the cluster does not have any nodes in the same data center as the
156+
// local machine.
157+
{
158+
bmqtst::TestHelper::printTestName("ACTIVE NDOE OUTSIDE DC");
159+
160+
bmqtst::ScopedLogObserver logObserver(ball::Severity::ERROR,
161+
bmqtst::TestHelperUtil::allocator());
162+
163+
// Set up mock cluster
164+
mqbcfg::ClusterDefinition clusterConfig(
165+
bmqtst::TestHelperUtil::allocator());
166+
bdlbb::PooledBlobBufferFactory bufferFactory(
167+
1024,
168+
bmqtst::TestHelperUtil::allocator());
169+
mqbnet::MockCluster mockCluster(clusterConfig,
170+
&bufferFactory,
171+
bmqtst::TestHelperUtil::allocator());
172+
173+
// Populate cluster nodes
174+
// - 1 node in "east" data center
175+
// - 1 node in "west" data center
176+
mqbnet::Cluster::NodesList nodes(bmqtst::TestHelperUtil::allocator());
177+
mqbcfg::ClusterNode clusterNodeConfig(bmqtst::TestHelperUtil::allocator());
178+
179+
clusterNodeConfig.dataCenter() = "east";
180+
clusterNodeConfig.name() = "east-1";
181+
mqbnet::MockClusterNode east1(&mockCluster,
182+
clusterNodeConfig,
183+
&bufferFactory,
184+
bmqtst::TestHelperUtil::allocator());
185+
nodes.push_back(&east1);
186+
187+
clusterNodeConfig.dataCenter() = "west";
188+
clusterNodeConfig.name() = "west-1";
189+
mqbnet::MockClusterNode west1(&mockCluster,
190+
clusterNodeConfig,
191+
&bufferFactory,
192+
bmqtst::TestHelperUtil::allocator());
193+
nodes.push_back(&west1);
194+
195+
// Create ClusterActiveNodeManager
196+
bsl::string description = "dummy";
197+
bsl::string dataCenter = "south";
198+
mqbnet::ClusterActiveNodeManager mgr =
199+
mqbnet::ClusterActiveNodeManager(nodes, description, dataCenter);
200+
BMQTST_ASSERT(!mgr.activeNode());
201+
202+
bmqp_ctrlmsg::NegotiationMessage negotiationMessage(
203+
bmqtst::TestHelperUtil::allocator());
204+
negotiationMessage.makeClientIdentity().hostName() = "dummyIdentity";
205+
206+
// "west" node up, it should become active
207+
{
208+
int rc = mgr.onNodeUp(&west1, negotiationMessage.clientIdentity());
209+
BMQTST_ASSERT_EQ(rc, mqbnet::ClusterActiveNodeManager::e_NEW_ACTIVE);
210+
BMQTST_ASSERT_EQ(mgr.activeNode(), &west1);
211+
}
212+
213+
// "east" node up, it should become active
214+
{
215+
int rc = mgr.onNodeUp(&east1, negotiationMessage.clientIdentity());
216+
BMQTST_ASSERT_EQ(rc, mqbnet::ClusterActiveNodeManager::e_NO_CHANGE);
217+
BMQTST_ASSERT_EQ(mgr.activeNode(), &west1);
218+
}
219+
220+
// "west" node down, "east" should become active
221+
{
222+
int rc = mgr.onNodeDown(&west1);
223+
BMQTST_ASSERT_EQ(rc,
224+
mqbnet::ClusterActiveNodeManager::e_LOST_ACTIVE |
225+
mqbnet::ClusterActiveNodeManager::e_NEW_ACTIVE);
226+
BMQTST_ASSERT_EQ(mgr.activeNode(), &east1);
227+
}
228+
229+
BMQTST_ASSERT(logObserver.records().empty());
230+
}
231+
232+
static void test4_panicInExtendedMode()
233+
// Validate that a PANIC log is emitted if in extended mode and unable to
234+
// select an active node.
235+
{
236+
bmqtst::TestHelper::printTestName("ACTIVE NDOE OUTSIDE DC");
237+
238+
bmqtst::ScopedLogObserver logObserver(ball::Severity::ERROR,
239+
bmqtst::TestHelperUtil::allocator());
240+
241+
// Set up mock cluster
242+
mqbcfg::ClusterDefinition clusterConfig(
243+
bmqtst::TestHelperUtil::allocator());
244+
bdlbb::PooledBlobBufferFactory bufferFactory(
245+
1024,
246+
bmqtst::TestHelperUtil::allocator());
247+
mqbnet::MockCluster mockCluster(clusterConfig,
248+
&bufferFactory,
249+
bmqtst::TestHelperUtil::allocator());
250+
251+
// Populate cluster nodes
252+
// - 1 node in "east" data center
253+
// - 1 node in "west" data center
254+
mqbnet::Cluster::NodesList nodes(bmqtst::TestHelperUtil::allocator());
255+
mqbcfg::ClusterNode clusterNodeConfig(bmqtst::TestHelperUtil::allocator());
256+
257+
clusterNodeConfig.dataCenter() = "east";
258+
clusterNodeConfig.name() = "east-1";
259+
mqbnet::MockClusterNode east1(&mockCluster,
260+
clusterNodeConfig,
261+
&bufferFactory,
262+
bmqtst::TestHelperUtil::allocator());
263+
nodes.push_back(&east1);
264+
265+
clusterNodeConfig.dataCenter() = "west";
266+
clusterNodeConfig.name() = "west-1";
267+
mqbnet::MockClusterNode west1(&mockCluster,
268+
clusterNodeConfig,
269+
&bufferFactory,
270+
bmqtst::TestHelperUtil::allocator());
271+
nodes.push_back(&west1);
272+
273+
// Create ClusterActiveNodeManager
274+
bsl::string description = "dummy";
275+
bsl::string dataCenter = "east";
276+
mqbnet::ClusterActiveNodeManager mgr =
277+
mqbnet::ClusterActiveNodeManager(nodes, description, dataCenter);
278+
BMQTST_ASSERT(!mgr.activeNode());
279+
280+
bmqp_ctrlmsg::NegotiationMessage negotiationMessage(
281+
bmqtst::TestHelperUtil::allocator());
282+
negotiationMessage.makeClientIdentity().hostName() = "dummyIdentity";
283+
284+
// Refresh should not panic in normal mode
285+
{
286+
int rc = mgr.refresh();
287+
BMQTST_ASSERT_EQ(rc, mqbnet::ClusterActiveNodeManager::e_NO_CHANGE);
288+
BMQTST_ASSERT(logObserver.records().empty());
289+
}
290+
291+
// Refresh with extended selection should panic
292+
{
293+
mgr.enableExtendedSelection();
294+
int rc = mgr.refresh();
295+
BMQTST_ASSERT_EQ(rc, mqbnet::ClusterActiveNodeManager::e_NO_CHANGE);
296+
BMQTST_ASSERT_EQ(1L, logObserver.records().size());
297+
BMQTST_ASSERT(
298+
logObserver.records().back().fixedFields().messageRef().find(
299+
"PANIC [CLUSTER_ACTIVE_NODE]") != bsl::string::npos);
300+
}
301+
302+
// "west" node up, new active node
303+
{
304+
int rc = mgr.onNodeUp(&west1, negotiationMessage.clientIdentity());
305+
BMQTST_ASSERT_EQ(rc, mqbnet::ClusterActiveNodeManager::e_NEW_ACTIVE);
306+
BMQTST_ASSERT_EQ(mgr.activeNode(), &west1);
307+
}
308+
309+
// "west" node down, should panic again
310+
{
311+
int rc = mgr.onNodeDown(&west1);
312+
BMQTST_ASSERT_EQ(rc, mqbnet::ClusterActiveNodeManager::e_LOST_ACTIVE);
313+
BMQTST_ASSERT_EQ(2L, logObserver.records().size());
314+
BMQTST_ASSERT(
315+
logObserver.records().back().fixedFields().messageRef().find(
316+
"PANIC [CLUSTER_ACTIVE_NODE]") != bsl::string::npos);
317+
}
318+
}
319+
320+
// ============================================================================
321+
// MAIN PROGRAM
322+
// ----------------------------------------------------------------------------
323+
324+
int main(int argc, char* argv[])
325+
{
326+
TEST_PROLOG(bmqtst::TestHelper::e_DEFAULT);
327+
328+
switch (_testCase) {
329+
case 0:
330+
case 4: test4_panicInExtendedMode(); break;
331+
case 3: test3_activeNodeOutsideDC(); break;
332+
case 2: test2_activeNodeWithinDC(); break;
333+
case 1: test1_breathingTest(); break;
334+
default: {
335+
cerr << "WARNING: CASE '" << _testCase << "' NOT FOUND." << endl;
336+
bmqtst::TestHelperUtil::testStatus() = -1;
337+
} break;
338+
}
339+
340+
TEST_EPILOG(bmqtst::TestHelper::e_CHECK_GBL_ALLOC);
341+
}

0 commit comments

Comments
 (0)