@@ -50,6 +50,7 @@ const int k_GC_MESSAGES_BATCH_SIZE = 1000; // how many to process in one run
50
50
// CREATORS
51
51
InMemoryStorage::InMemoryStorage (const bmqt::Uri& uri,
52
52
const mqbu::StorageKey& queueKey,
53
+ mqbi::Domain* domain,
53
54
int partitionId,
54
55
const mqbconfm::Domain& config,
55
56
mqbu::CapacityMeter* parentCapacityMeter,
@@ -72,8 +73,11 @@ InMemoryStorage::InMemoryStorage(const bmqt::Uri& uri,
72
73
.addMilliseconds(config.deduplicationTimeMs())
73
74
.totalNanoseconds(),
74
75
allocatorStore ? allocatorStore->get(" Handles" ) : d_allocator_p)
76
+ , d_queueStats_sp(
77
+ bsl::allocate_shared<mqbstat::QueueStatsDomain>(d_allocator_p))
75
78
, d_virtualStorageCatalog(
76
79
this ,
80
+ d_queueStats_sp,
77
81
allocatorStore ? allocatorStore->get (" VirtualHandles" ) : d_allocator_p)
78
82
, d_ttlSeconds(config.messageTtl())
79
83
, d_isEmpty(1 )
@@ -82,6 +86,7 @@ InMemoryStorage::InMemoryStorage(const bmqt::Uri& uri,
82
86
{
83
87
BSLS_ASSERT_SAFE (0 <= d_ttlSeconds); // Broadcast queues can use 0 for TTL
84
88
89
+ d_queueStats_sp->initialize (d_uri, domain);
85
90
d_virtualStorageCatalog.setDefaultRda (config.maxDeliveryAttempts ());
86
91
87
92
if (isProxy ()) {
@@ -234,13 +239,10 @@ InMemoryStorage::put(mqbi::StorageMessageAttributes* attributes,
234
239
235
240
d_currentlyAutoConfirming = bmqt::MessageGUID ();
236
241
237
- if (queue ()) {
238
- queue ()
239
- ->stats ()
240
- ->onEvent <mqbstat::QueueStatsDomain::EventType::e_ADD_MESSAGE>(
242
+ d_queueStats_sp
243
+ ->onEvent <mqbstat::QueueStatsDomain::EventType::e_ADD_MESSAGE>(
241
244
242
- msgSize);
243
- }
245
+ msgSize);
244
246
245
247
d_isEmpty.storeRelaxed (0 );
246
248
@@ -332,11 +334,10 @@ InMemoryStorage::releaseRef(const bmqt::MessageGUID& guid)
332
334
d_capacityMeter.remove (1 , msgLen);
333
335
if (queue ()) {
334
336
queue ()->queueEngine ()->beforeMessageRemoved (guid);
335
- queue ()
336
- ->stats ()
337
- ->onEvent <mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE>(
338
- msgLen);
339
337
}
338
+ d_queueStats_sp
339
+ ->onEvent <mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE>(
340
+ msgLen);
340
341
341
342
// There is not really a need to remove the guid from all virtual
342
343
// storages, because we can be here only if guid doesn't exist in
@@ -347,13 +348,9 @@ InMemoryStorage::releaseRef(const bmqt::MessageGUID& guid)
347
348
348
349
d_items.erase (it);
349
350
350
- if (queue ()) {
351
- queue ()
352
- ->stats ()
353
- ->onEvent <
354
- mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY>(
355
- d_items.historySize ());
356
- }
351
+ d_queueStats_sp
352
+ ->onEvent <mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY>(
353
+ d_items.historySize ());
357
354
358
355
return mqbi::StorageResult::e_ZERO_REFERENCES; // RETURN
359
356
}
@@ -378,16 +375,11 @@ InMemoryStorage::remove(const bmqt::MessageGUID& msgGUID, int* msgSize)
378
375
// Update resource usage
379
376
d_capacityMeter.remove (1 , msgLen);
380
377
381
- if (queue ()) {
382
- queue ()
383
- ->stats ()
384
- ->onEvent <mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE>(
385
- msgLen);
386
- queue ()
387
- ->stats ()
388
- ->onEvent <mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY>(
389
- d_items.historySize ());
390
- }
378
+ d_queueStats_sp
379
+ ->onEvent <mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE>(msgLen);
380
+ d_queueStats_sp
381
+ ->onEvent <mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY>(
382
+ d_items.historySize ());
391
383
392
384
if (msgSize) {
393
385
*msgSize = msgLen;
@@ -410,11 +402,8 @@ InMemoryStorage::removeAll(const mqbu::StorageKey& appKey)
410
402
d_items.clear ();
411
403
d_capacityMeter.clear ();
412
404
413
- if (queue ()) {
414
- queue ()
415
- ->stats ()
416
- ->onEvent <mqbstat::QueueStatsDomain::EventType::e_PURGE>(0 );
417
- }
405
+ d_queueStats_sp
406
+ ->onEvent <mqbstat::QueueStatsDomain::EventType::e_PURGE>(0 );
418
407
419
408
d_isEmpty.storeRelaxed (1 );
420
409
@@ -440,12 +429,9 @@ InMemoryStorage::removeAll(const mqbu::StorageKey& appKey)
440
429
d_isEmpty.storeRelaxed (1 );
441
430
}
442
431
443
- if (queue ()) {
444
- queue ()
445
- ->stats ()
446
- ->onEvent <mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY>(
447
- d_items.historySize ());
448
- }
432
+ d_queueStats_sp
433
+ ->onEvent <mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY>(
434
+ d_items.historySize ());
449
435
450
436
return mqbi::StorageResult::e_SUCCESS;
451
437
}
@@ -483,11 +469,10 @@ int InMemoryStorage::gcExpiredMessages(
483
469
d_capacityMeter.remove (1 , msgLen);
484
470
if (queue ()) {
485
471
queue ()->queueEngine ()->beforeMessageRemoved (cit->first );
486
- queue ()
487
- ->stats ()
488
- ->onEvent <mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE>(
489
- msgLen);
490
472
}
473
+ d_queueStats_sp
474
+ ->onEvent <mqbstat::QueueStatsDomain::EventType::e_DEL_MESSAGE>(
475
+ msgLen);
491
476
492
477
// Remove message from all virtual storages and the physical (this)
493
478
// storage.
@@ -496,13 +481,11 @@ int InMemoryStorage::gcExpiredMessages(
496
481
++numMsgsDeleted;
497
482
}
498
483
499
- if (queue () && (numMsgsDeleted > 0 )) {
500
- queue ()
501
- ->stats ()
484
+ if (numMsgsDeleted > 0 ) {
485
+ d_queueStats_sp
502
486
->onEvent <mqbstat::QueueStatsDomain::EventType::e_GC_MESSAGE>(
503
487
numMsgsDeleted);
504
- queue ()
505
- ->stats ()
488
+ d_queueStats_sp
506
489
->onEvent <mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY>(
507
490
d_items.historySize ());
508
491
}
@@ -519,12 +502,9 @@ bool InMemoryStorage::gcHistory()
519
502
bool hasMoreToGc = d_items.gc (bmqsys::Time::highResolutionTimer (),
520
503
k_GC_MESSAGES_BATCH_SIZE);
521
504
522
- if (queue ()) {
523
- queue ()
524
- ->stats ()
525
- ->onEvent <mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY>(
526
- d_items.historySize ());
527
- }
505
+ d_queueStats_sp
506
+ ->onEvent <mqbstat::QueueStatsDomain::EventType::e_UPDATE_HISTORY>(
507
+ d_items.historySize ());
528
508
529
509
return hasMoreToGc;
530
510
}
0 commit comments