Skip to content

Commit e37ba55

Browse files
committed
gc'ing old message in the presence of new app
Signed-off-by: dorjesinpo <[email protected]>
1 parent 57a4779 commit e37ba55

6 files changed

+90
-21
lines changed

src/groups/mqb/mqbc/mqbc_storageutil.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -1521,7 +1521,7 @@ void StorageUtil::recoveredQueuesCb(
15211521
<< "encountered a duplicate AppId while processing "
15221522
<< "recovered queue [" << uri << "], " << "queueKey ["
15231523
<< qit->first << "]. AppId [" << *(appIdsIrc.first)
1524-
<< "]. AppKey [" << cit->second << "]."
1524+
<< "]. AppKey [" << cit->first << "]."
15251525
<< BMQTSK_ALARMLOG_END;
15261526
mqbu::ExitUtil::terminate(
15271527
mqbu::ExitCode::e_RECOVERY_FAILURE);

src/groups/mqb/mqbs/mqbs_virtualstorage.cpp

+7-11
Original file line numberDiff line numberDiff line change
@@ -142,16 +142,10 @@ bool VirtualStorage::remove(mqbi::DataStreamMessage* dataStreamMessage,
142142
return wasPending;
143143
}
144144

145-
void VirtualStorage::onGC(const mqbi::DataStreamMessage& dataStreamMessage)
145+
void VirtualStorage::onGC(int size)
146146
{
147-
if (!dataStreamMessage.d_apps.empty()) {
148-
const mqbi::AppMessage& appMessage = dataStreamMessage.app(ordinal());
149-
150-
if (!appMessage.isPending()) {
151-
d_removedBytes -= dataStreamMessage.d_size;
152-
--d_numRemoved;
153-
}
154-
}
147+
d_removedBytes -= size;
148+
--d_numRemoved;
155149
}
156150

157151
void VirtualStorage::resetStats()
@@ -165,9 +159,11 @@ void VirtualStorage::replaceOrdinal(unsigned int replacingOrdinal)
165159
d_ordinal = replacingOrdinal;
166160
}
167161

168-
void VirtualStorage::setNumRemoved(bsls::Types::Int64 numRemoved)
162+
void VirtualStorage::setNumRemoved(bsls::Types::Int64 numMessages,
163+
bsls::Types::Int64 bytes)
169164
{
170-
d_numRemoved = numRemoved;
165+
d_numRemoved = numMessages;
166+
d_removedBytes = bytes;
171167
}
172168

173169
bool VirtualStorage::hasReceipt(const bmqt::MessageGUID& msgGUID) const

src/groups/mqb/mqbs/mqbs_virtualstorage.h

+5-4
Original file line numberDiff line numberDiff line change
@@ -172,16 +172,17 @@ class VirtualStorage {
172172
bool remove(mqbi::DataStreamMessage* dataStreamMessage,
173173
unsigned int replacingOrdinal);
174174

175-
/// Observe removal of this App from the specified 'dataStreamMessage' by
176-
/// GC and update bytes and messages counts if needed.
177-
void onGC(const mqbi::DataStreamMessage& dataStreamMessage);
175+
/// Update bytes by the specified 'size' and messages counts by `1` for
176+
/// this App as the result of garbage-collecting the message.
177+
void onGC(int size);
178178

179179
/// Reset bytes and messages counts as in the case of purging all Apps.
180180
void resetStats();
181181

182182
void replaceOrdinal(unsigned int replacingOrdinal);
183183

184-
void setNumRemoved(bsls::Types::Int64 numRemoved);
184+
void setNumRemoved(bsls::Types::Int64 numRemoved,
185+
bsls::Types::Int64 bytes);
185186
};
186187

187188
// =====================

src/groups/mqb/mqbs/mqbs_virtualstoragecatalog.cpp

+19-3
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,13 @@ VirtualStorageCatalog::gc(const bmqt::MessageGUID& msgGUID)
312312
for (VirtualStoragesIter it = d_virtualStorages.begin();
313313
it != d_virtualStorages.end();
314314
++it) {
315-
it->value()->onGC(data->second);
315+
const mqbi::DataStreamMessage& dataStreamMessage = data->second;
316+
317+
const mqbi::AppMessage& appMessage =
318+
appMessageView(dataStreamMessage, it->value()->ordinal());
319+
if (!appMessage.isPending()) {
320+
it->value()->onGC(dataStreamMessage.d_size);
321+
}
316322
}
317323

318324
d_totalBytes -= data->second.d_size;
@@ -351,7 +357,8 @@ void VirtualStorageCatalog::removeAll(const mqbu::StorageKey& appKey)
351357
}
352358

353359
bsls::Types::Int64 VirtualStorageCatalog::seek(DataStreamIterator* it,
354-
const VirtualStorage* vs)
360+
const VirtualStorage* vs,
361+
bsls::Types::Int64* bytes)
355362
{
356363
BSLS_ASSERT_SAFE(vs);
357364

@@ -372,6 +379,11 @@ bsls::Types::Int64 VirtualStorageCatalog::seek(DataStreamIterator* it,
372379
break; // BREAK
373380
}
374381
// else 'dataStreamMessage' is older than this VirtualStorage
382+
383+
if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(bytes)) {
384+
BSLS_PERFORMANCEHINT_UNLIKELY_HINT;
385+
*bytes += data.d_size;
386+
}
375387
}
376388

377389
return result;
@@ -663,7 +675,11 @@ void VirtualStorageCatalog::calibrate()
663675
it != d_virtualStorages.end();
664676
++it) {
665677
DataStreamIterator itData;
666-
it->value()->setNumRemoved(seek(&itData, it->value().get()));
678+
bsls::Types::Int64 bytes = 0;
679+
bsls::Types::Int64 numMessages = seek(&itData,
680+
it->value().get(),
681+
&bytes);
682+
it->value()->setNumRemoved(numMessages, bytes);
667683
}
668684
}
669685

src/groups/mqb/mqbs/mqbs_virtualstoragecatalog.h

+6-2
Original file line numberDiff line numberDiff line change
@@ -255,8 +255,12 @@ class VirtualStorageCatalog {
255255

256256
/// Return the number of messages in the datastream which are older than
257257
/// the specified `vs`. Load into the specified `it` the iterator pointing
258-
/// either to the first newer message or to the end of the datastream.
259-
bsls::Types::Int64 seek(DataStreamIterator* it, const VirtualStorage* vs);
258+
/// either to the first newer message or to the end of the datastream. If
259+
/// the optionally specified `bytes` is not `0`, load the sum of relevant
260+
/// messages.
261+
bsls::Types::Int64 seek(DataStreamIterator* it,
262+
const VirtualStorage* vs,
263+
bsls::Types::Int64* bytes = 0);
260264

261265
/// Create, if it doesn't exist already, a virtual storage instance with
262266
/// the specified 'appId' and 'appKey'. Return zero upon success and a

src/integration-tests/test_appids.py

+52
Original file line numberDiff line numberDiff line change
@@ -871,3 +871,55 @@ def _verify_clients(andConfirm=False):
871871

872872
leader.list_messages(du.domain_fanout, tc.TEST_QUEUE, 0, 100)
873873
assert leader.outputs_substr(f"Printing 0 message(s)", 5)
874+
875+
876+
@tweak.domain.message_ttl(3)
877+
def test_gc_old_data_new_app(cluster: Cluster, domain_urls: tc.DomainUrls):
878+
"""Trigger old message GC in the presence of new App. Need to allocate
879+
Apps states first.
880+
"""
881+
du = domain_urls
882+
leader = cluster.last_known_leader
883+
proxies = cluster.proxy_cycle()
884+
proxy = next(proxies)
885+
886+
producer = proxy.create_client("producer")
887+
producer.open(du.uri_fanout, flags=["write,ack"], succeed=True)
888+
889+
app_id = default_app_ids[0]
890+
consumer = proxy.create_client(app_id)
891+
consumer_uri = f"{du.uri_fanout}?id={app_id}"
892+
consumer.open(consumer_uri, flags=["read"], succeed=True)
893+
894+
# ---------------------------------------------------------------------
895+
# Post a message.
896+
producer.post(du.uri_fanout, ["m1"], succeed=True, wait_ack=True)
897+
898+
# confirm, to make sure App state is allocated
899+
consumer.wait_push_event()
900+
901+
assert wait_until(
902+
lambda: len(consumer.list(consumer_uri, block=True)) == 1, timeout
903+
)
904+
assert consumer.confirm(consumer_uri, f"+{1}", block=True) == Client.e_SUCCESS
905+
906+
# ---------------------------------------------------------------------
907+
# +new_app_1
908+
new_app_1 = "new_app_1"
909+
set_app_ids(cluster, default_app_ids + [new_app_1], du)
910+
911+
assert consumer.close(consumer_uri, block=True) == Client.e_SUCCESS
912+
913+
# Observe that the message was GC'd from the queue.
914+
leader.capture(
915+
f"queue \\[{du.uri_fanout}\\].*garbage-collected \\[1\\] messages", timeout=5
916+
)
917+
918+
leader.list_messages(du.domain_fanout, tc.TEST_QUEUE, 0, 100)
919+
assert leader.outputs_substr(f"Printing 0 message(s)", 5)
920+
921+
leader.list_messages(du.domain_fanout, tc.TEST_QUEUE, 0, 100, appid=app_id)
922+
assert leader.outputs_substr(f"Printing 0 message(s)", 5)
923+
924+
leader.list_messages(du.domain_fanout, tc.TEST_QUEUE, 0, 100, appid=new_app_1)
925+
assert leader.outputs_substr(f"Printing 0 message(s)", 5)

0 commit comments

Comments
 (0)