Skip to content

Commit 2cb2a97

Browse files
committed
Feat[MQB]: collect queue stats on replicas
Signed-off-by: Evgeny Malygin <[email protected]>
1 parent c4a007c commit 2cb2a97

17 files changed

+310
-89
lines changed

src/groups/mqb/mqbblp/mqbblp_pushstream.t.cpp

+7-2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616
// mqbblp_pushstream.t.cpp -*-C++-*-
1717
#include <mqbblp_pushstream.h>
1818

19+
// MQB
20+
#include <mqbmock_domain.h>
21+
1922
// BMQ
2023
#include <bmqp_messageguidgenerator.h>
2124

@@ -122,12 +125,14 @@ static void test2_iterations()
122125
"dummy",
123126
bmqtst::TestHelperUtil::allocator());
124127
bmqt::Uri dummyUri("dummy", bmqtst::TestHelperUtil::allocator());
125-
mqbconfm::Domain dummyDomain(bmqtst::TestHelperUtil::allocator());
128+
mqbmock::Domain dummyDomain(0, bmqtst::TestHelperUtil::allocator());
129+
mqbconfm::Domain dummyDomainConfig(bmqtst::TestHelperUtil::allocator());
126130

127131
mqbs::InMemoryStorage dummyStorage(dummyUri,
128132
mqbu::StorageKey::k_NULL_KEY,
133+
&dummyDomain,
129134
mqbs::DataStore::k_INVALID_PARTITION_ID,
130-
dummyDomain,
135+
dummyDomainConfig,
131136
&dummyCapacityMeter,
132137
bmqtst::TestHelperUtil::allocator());
133138

src/groups/mqb/mqbblp/mqbblp_queueconsumptionmonitor.t.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ Test::Test()
109109
bmqtst::TestHelperUtil::allocator())
110110
, d_storage(d_queue.uri(),
111111
mqbu::StorageKey::k_NULL_KEY,
112+
&d_domain,
112113
mqbs::DataStore::k_INVALID_PARTITION_ID,
113114
getDomainConfig(),
114115
d_domain.capacityMeter(),

src/groups/mqb/mqbblp/mqbblp_queueenginetester.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -527,6 +527,7 @@ void QueueEngineTester::init(const mqbconfm::Domain& domainConfig,
527527
mqbi::Storage* storage_p = new (*d_allocator_p)
528528
mqbs::InMemoryStorage(d_mockQueue_sp->uri(),
529529
k_NULL_QUEUE_KEY,
530+
d_mockDomain_mp.get(),
530531
k_PARTITION_ID,
531532
domainConfig,
532533
d_mockDomain_mp->capacityMeter(),

src/groups/mqb/mqbblp/mqbblp_remotequeue.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ int RemoteQueue::configureAsProxy(bsl::ostream& errorDescription,
113113
storageSp.load(new (*d_allocator_p) mqbs::InMemoryStorage(
114114
d_state_p->uri(),
115115
d_state_p->key(),
116+
d_state_p->domain(),
116117
mqbs::DataStore::k_INVALID_PARTITION_ID,
117118
domainCfg,
118119
d_state_p->domain()->capacityMeter(),

src/groups/mqb/mqbblp/mqbblp_routers.t.cpp

+4
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
// MQB
2828
#include <mqbcfg_brokerconfig.h>
29+
#include <mqbmock_domain.h>
2930
#include <mqbmock_queue.h>
3031
#include <mqbmock_queuehandle.h>
3132
#include <mqbs_inmemorystorage.h>
@@ -54,6 +55,7 @@ struct TestStorage {
5455
mqbconfm::Domain d_domainCfg;
5556
mqbu::CapacityMeter d_capacityMeter;
5657
mqbu::StorageKey d_storageKey;
58+
mqbmock::Domain d_domain;
5759
mqbs::InMemoryStorage d_storage;
5860
bslma::ManagedPtr<mqbi::StorageIterator> d_iterator;
5961
bdlbb::PooledBlobBufferFactory d_bufferFactory;
@@ -67,8 +69,10 @@ struct TestStorage {
6769
, d_domainCfg(allocator)
6870
, d_capacityMeter("cm", allocator)
6971
, d_storageKey(d_subQueueId)
72+
, d_domain(0, allocator) // Use domain only to hold mqbstat::StatContext
7073
, d_storage(bmqt::Uri("uri", allocator),
7174
d_storageKey,
75+
&d_domain,
7276
1,
7377
d_domainCfg,
7478
&d_capacityMeter,

src/groups/mqb/mqbs/mqbs_filebackedstorage.cpp

+3-4
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,10 @@ FileBackedStorage::FileBackedStorage(
112112
, d_queueKey(queueKey)
113113
, d_config()
114114
, d_queueUri(queueUri, allocator)
115+
, d_queueStats_sp(bsl::allocate_shared<mqbstat::QueueStatsDomain>(allocator))
115116
, d_virtualStorageCatalog(
116117
this,
118+
d_queueStats_sp,
117119
allocatorStore ? allocatorStore->get("VirtualHandles") : d_allocator_p)
118120
, d_ttlSeconds(domain->config().messageTtl())
119121
, d_capacityMeter(
@@ -133,7 +135,6 @@ FileBackedStorage::FileBackedStorage(
133135
, d_hasReceipts(!domain->config().consistency().isStrongValue())
134136
, d_currentlyAutoConfirming()
135137
, d_autoConfirms(d_allocator_p)
136-
, d_queueStats_sp()
137138
{
138139
BSLS_ASSERT(d_store_p);
139140

@@ -147,11 +148,9 @@ FileBackedStorage::FileBackedStorage(
147148
// and domain instance will return a zero capacity meter when queries to be
148149
// passed to the 'FileBackedStorage' instance.
149150

151+
d_queueStats_sp->initialize(queueUri, domain);
150152
d_virtualStorageCatalog.setDefaultRda(
151153
domain->config().maxDeliveryAttempts());
152-
153-
d_queueStats_sp.createInplace(d_allocator_p, d_allocator_p);
154-
d_queueStats_sp->initialize(queueUri, domain);
155154
}
156155

157156
FileBackedStorage::~FileBackedStorage()

src/groups/mqb/mqbs/mqbs_filebackedstorage.h

+3-3
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,9 @@ class FileBackedStorage BSLS_KEYWORD_FINAL : public ReplicatedStorage {
169169

170170
bmqt::Uri d_queueUri;
171171

172+
/// Statistics of the queue associated to this storage.
173+
bsl::shared_ptr<mqbstat::QueueStatsDomain> d_queueStats_sp;
174+
172175
VirtualStorageCatalog d_virtualStorageCatalog;
173176

174177
bsls::Types::Int64 d_ttlSeconds;
@@ -210,9 +213,6 @@ class FileBackedStorage BSLS_KEYWORD_FINAL : public ReplicatedStorage {
210213
AutoConfirms d_autoConfirms;
211214
// Auto CONFIRMs waiting for 'put' or 'processMessageRecord'
212215

213-
bsl::shared_ptr<mqbstat::QueueStatsDomain> d_queueStats_sp;
214-
// Statistics of the queue associated to this storage.
215-
216216
private:
217217
// NOT IMPLEMENTED
218218
FileBackedStorage(const FileBackedStorage&) BSLS_KEYWORD_DELETED;

src/groups/mqb/mqbs/mqbs_filestore.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -5457,6 +5457,7 @@ void FileStore::createStorage(bsl::shared_ptr<ReplicatedStorage>* storageSp,
54575457
storageSp->reset(new (*storageAlloc)
54585458
InMemoryStorage(queueUri,
54595459
queueKey,
5460+
domain,
54605461
config().partitionId(),
54615462
domain->config(),
54625463
domain->capacityMeter(),

src/groups/mqb/mqbs/mqbs_inmemorystorage.cpp

+35-53
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ const int k_GC_MESSAGES_BATCH_SIZE = 1000; // how many to process in one run
5050
// CREATORS
5151
InMemoryStorage::InMemoryStorage(const bmqt::Uri& uri,
5252
const mqbu::StorageKey& queueKey,
53+
mqbi::Domain* domain,
5354
int partitionId,
5455
const mqbconfm::Domain& config,
5556
mqbu::CapacityMeter* parentCapacityMeter,
@@ -72,8 +73,11 @@ InMemoryStorage::InMemoryStorage(const bmqt::Uri& uri,
7273
.addMilliseconds(config.deduplicationTimeMs())
7374
.totalNanoseconds(),
7475
allocatorStore ? allocatorStore->get("Handles") : d_allocator_p)
76+
, d_queueStats_sp(
77+
bsl::allocate_shared<mqbstat::QueueStatsDomain>(d_allocator_p))
7578
, d_virtualStorageCatalog(
7679
this,
80+
d_queueStats_sp,
7781
allocatorStore ? allocatorStore->get("VirtualHandles") : d_allocator_p)
7882
, d_ttlSeconds(config.messageTtl())
7983
, d_isEmpty(1)
@@ -82,6 +86,7 @@ InMemoryStorage::InMemoryStorage(const bmqt::Uri& uri,
8286
{
8387
BSLS_ASSERT_SAFE(0 <= d_ttlSeconds); // Broadcast queues can use 0 for TTL
8488

89+
d_queueStats_sp->initialize(d_uri, domain);
8590
d_virtualStorageCatalog.setDefaultRda(config.maxDeliveryAttempts());
8691

8792
if (isProxy()) {
@@ -132,6 +137,8 @@ void InMemoryStorage::setQueue(mqbi::Queue* queue)
132137
// Update queue stats if a queue has been associated with the storage.
133138

134139
if (queue) {
140+
queue->setStats(d_queueStats_sp);
141+
135142
const bsls::Types::Int64 numMessage = numMessages(
136143
mqbu::StorageKey::k_NULL_KEY);
137144
const bsls::Types::Int64 numByte = numBytes(
@@ -234,13 +241,10 @@ InMemoryStorage::put(mqbi::StorageMessageAttributes* attributes,
234241

235242
d_currentlyAutoConfirming = bmqt::MessageGUID();
236243

237-
if (queue()) {
238-
queue()
239-
->stats()
240-
->onEvent<mqbstat::QueueStatsDomain::EventType::e_ADD_MESSAGE>(
244+
d_queueStats_sp
245+
->onEvent<mqbstat::QueueStatsDomain::EventType::e_ADD_MESSAGE>(
241246

242-
msgSize);
243-
}
247+
msgSize);
244248

245249
d_isEmpty.storeRelaxed(0);
246250

@@ -332,11 +336,10 @@ InMemoryStorage::releaseRef(const bmqt::MessageGUID& guid)
332336
d_capacityMeter.remove(1, msgLen);
333337
if (queue()) {
334338
queue()->queueEngine()->beforeMessageRemoved(guid);
335-
queue()
336-
->stats()
337-
->onEvent<mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE>(
338-
msgLen);
339339
}
340+
d_queueStats_sp
341+
->onEvent<mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE>(
342+
msgLen);
340343

341344
// There is not really a need to remove the guid from all virtual
342345
// storages, because we can be here only if guid doesn't exist in
@@ -347,13 +350,9 @@ InMemoryStorage::releaseRef(const bmqt::MessageGUID& guid)
347350

348351
d_items.erase(it);
349352

350-
if (queue()) {
351-
queue()
352-
->stats()
353-
->onEvent<
354-
mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY>(
355-
d_items.historySize());
356-
}
353+
d_queueStats_sp
354+
->onEvent<mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY>(
355+
d_items.historySize());
357356

358357
return mqbi::StorageResult::e_ZERO_REFERENCES; // RETURN
359358
}
@@ -378,16 +377,11 @@ InMemoryStorage::remove(const bmqt::MessageGUID& msgGUID, int* msgSize)
378377
// Update resource usage
379378
d_capacityMeter.remove(1, msgLen);
380379

381-
if (queue()) {
382-
queue()
383-
->stats()
384-
->onEvent<mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE>(
385-
msgLen);
386-
queue()
387-
->stats()
388-
->onEvent<mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY>(
389-
d_items.historySize());
390-
}
380+
d_queueStats_sp
381+
->onEvent<mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE>(msgLen);
382+
d_queueStats_sp
383+
->onEvent<mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY>(
384+
d_items.historySize());
391385

392386
if (msgSize) {
393387
*msgSize = msgLen;
@@ -410,11 +404,8 @@ InMemoryStorage::removeAll(const mqbu::StorageKey& appKey)
410404
d_items.clear();
411405
d_capacityMeter.clear();
412406

413-
if (queue()) {
414-
queue()
415-
->stats()
416-
->onEvent<mqbstat::QueueStatsDomain::EventType::e_PURGE>(0);
417-
}
407+
d_queueStats_sp
408+
->onEvent<mqbstat::QueueStatsDomain::EventType::e_PURGE>(0);
418409

419410
d_isEmpty.storeRelaxed(1);
420411

@@ -440,12 +431,9 @@ InMemoryStorage::removeAll(const mqbu::StorageKey& appKey)
440431
d_isEmpty.storeRelaxed(1);
441432
}
442433

443-
if (queue()) {
444-
queue()
445-
->stats()
446-
->onEvent<mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY>(
447-
d_items.historySize());
448-
}
434+
d_queueStats_sp
435+
->onEvent<mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY>(
436+
d_items.historySize());
449437

450438
return mqbi::StorageResult::e_SUCCESS;
451439
}
@@ -483,11 +471,10 @@ int InMemoryStorage::gcExpiredMessages(
483471
d_capacityMeter.remove(1, msgLen);
484472
if (queue()) {
485473
queue()->queueEngine()->beforeMessageRemoved(cit->first);
486-
queue()
487-
->stats()
488-
->onEvent<mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE>(
489-
msgLen);
490474
}
475+
d_queueStats_sp
476+
->onEvent<mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE>(
477+
msgLen);
491478

492479
// Remove message from all virtual storages and the physical (this)
493480
// storage.
@@ -496,13 +483,11 @@ int InMemoryStorage::gcExpiredMessages(
496483
++numMsgsDeleted;
497484
}
498485

499-
if (queue() && (numMsgsDeleted > 0)) {
500-
queue()
501-
->stats()
486+
if (numMsgsDeleted > 0) {
487+
d_queueStats_sp
502488
->onEvent<mqbstat::QueueStatsDomain::EventType::e_GC_MESSAGE>(
503489
numMsgsDeleted);
504-
queue()
505-
->stats()
490+
d_queueStats_sp
506491
->onEvent<mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY>(
507492
d_items.historySize());
508493
}
@@ -519,12 +504,9 @@ bool InMemoryStorage::gcHistory()
519504
bool hasMoreToGc = d_items.gc(bmqsys::Time::highResolutionTimer(),
520505
k_GC_MESSAGES_BATCH_SIZE);
521506

522-
if (queue()) {
523-
queue()
524-
->stats()
525-
->onEvent<mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY>(
526-
d_items.historySize());
527-
}
507+
d_queueStats_sp
508+
->onEvent<mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY>(
509+
d_items.historySize());
528510

529511
return hasMoreToGc;
530512
}

src/groups/mqb/mqbs/mqbs_inmemorystorage.h

+4
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,9 @@ class InMemoryStorage BSLS_KEYWORD_FINAL : public ReplicatedStorage {
187187

188188
ItemsMap d_items;
189189

190+
/// Statistics of the queue associated to this storage.
191+
bsl::shared_ptr<mqbstat::QueueStatsDomain> d_queueStats_sp;
192+
190193
VirtualStorageCatalog d_virtualStorageCatalog;
191194

192195
RecordHandles d_queueOpRecordHandles;
@@ -245,6 +248,7 @@ class InMemoryStorage BSLS_KEYWORD_FINAL : public ReplicatedStorage {
245248
/// `allocator`.
246249
InMemoryStorage(const bmqt::Uri& uri,
247250
const mqbu::StorageKey& queueKey,
251+
mqbi::Domain* domain,
248252
int partitionId,
249253
const mqbconfm::Domain& config,
250254
mqbu::CapacityMeter* parentCapacityMeter,

src/groups/mqb/mqbs/mqbs_inmemorystorage.t.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ struct Tester {
205205
new (*d_allocator_p) mqbs::InMemoryStorage(
206206
bmqt::Uri(uri, bmqtst::TestHelperUtil::allocator()),
207207
queueKey,
208+
&d_mockDomain,
208209
partitionId,
209210
domainCfg,
210211
d_mockDomain.capacityMeter(),

src/groups/mqb/mqbs/mqbs_storageprintutil.t.cpp

+4
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
// MQB
2020
#include <mqbcmd_messages.h>
2121
#include <mqbi_queueengine.h>
22+
#include <mqbmock_domain.h>
2223
#include <mqbs_inmemorystorage.h>
2324
#include <mqbu_capacitymeter.h>
2425
#include <mqbu_messageguidutil.h>
@@ -122,6 +123,7 @@ struct Tester {
122123
bdlbb::PooledBlobBufferFactory d_bufferFactory;
123124
bsl::vector<bmqt::MessageGUID> d_guids;
124125
mqbu::CapacityMeter d_capacityMeter;
126+
mqbmock::Domain d_domain;
125127
bslma::ManagedPtr<mqbs::InMemoryStorage> d_storage_mp;
126128

127129
public:
@@ -130,6 +132,7 @@ struct Tester {
130132
: d_bufferFactory(1024, bmqtst::TestHelperUtil::allocator())
131133
, d_guids(bmqtst::TestHelperUtil::allocator())
132134
, d_capacityMeter("test", bmqtst::TestHelperUtil::allocator())
135+
, d_domain(0, bmqtst::TestHelperUtil::allocator())
133136
{
134137
d_capacityMeter.setLimits(k_INT64_MAX, k_INT64_MAX);
135138

@@ -142,6 +145,7 @@ struct Tester {
142145
new (*bmqtst::TestHelperUtil::allocator()) mqbs::InMemoryStorage(
143146
bmqt::Uri(uri, bmqtst::TestHelperUtil::allocator()),
144147
k_QUEUE_KEY,
148+
&d_domain,
145149
0,
146150
domainCfg,
147151
&d_capacityMeter,

0 commit comments

Comments
 (0)