Skip to content

Commit b0c75a3

Browse files
committed
Make prometheus metric updation async
1 parent fa9a268 commit b0c75a3

File tree

3 files changed

+75
-48
lines changed

3 files changed

+75
-48
lines changed

presto-native-execution/presto_cpp/main/runtime-metrics/PrometheusStatsReporter.cpp

+55-46
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,9 @@ struct PrometheusStatsReporter::PrometheusImpl {
4747
};
4848

4949
PrometheusStatsReporter::PrometheusStatsReporter(
50-
const std::map<std::string, std::string>& labels) {
51-
impl_ = std::make_shared<PrometheusImpl>(labels);
50+
const std::map<std::string, std::string>& labels, int numThreads)
51+
: executor_(std::make_shared<folly::CPUThreadPoolExecutor>(numThreads)),
52+
impl_(std::make_shared<PrometheusImpl>(labels)) {
5253
}
5354

5455
void PrometheusStatsReporter::registerMetricExportType(
@@ -161,36 +162,38 @@ void PrometheusStatsReporter::addMetricValue(
161162

162163
void PrometheusStatsReporter::addMetricValue(const char* key, size_t value)
163164
const {
164-
auto metricIterator = registeredMetricsMap_.find(key);
165-
if (metricIterator == registeredMetricsMap_.end()) {
166-
VLOG(1) << "addMetricValue called for unregistered metric " << key;
167-
return;
168-
}
169-
auto statsInfo = metricIterator->second;
170-
switch (statsInfo.statType) {
171-
case velox::StatType::COUNT: {
172-
auto* counter =
173-
reinterpret_cast<::prometheus::Counter*>(statsInfo.metricPtr);
174-
counter->Increment(static_cast<double>(value));
175-
break;
176-
}
177-
case velox::StatType::SUM: {
178-
auto* gauge = reinterpret_cast<::prometheus::Gauge*>(statsInfo.metricPtr);
179-
gauge->Increment(static_cast<double>(value));
180-
break;
181-
}
182-
case velox::StatType::AVG:
183-
case velox::StatType::RATE: {
184-
// Overrides the existing state.
185-
auto* gauge = reinterpret_cast<::prometheus::Gauge*>(statsInfo.metricPtr);
186-
gauge->Set(static_cast<double>(value));
187-
break;
165+
executor_->add([this, key, value]() {
166+
auto metricIterator = registeredMetricsMap_.find(key);
167+
if (metricIterator == registeredMetricsMap_.end()) {
168+
VLOG(1) << "addMetricValue called for unregistered metric " << key;
169+
return;
188170
}
189-
default:
190-
VELOX_UNSUPPORTED(
191-
"Unsupported metric type {}",
192-
velox::statTypeString(statsInfo.statType));
193-
};
171+
auto statsInfo = metricIterator->second;
172+
switch (statsInfo.statType) {
173+
case velox::StatType::COUNT: {
174+
auto* counter =
175+
reinterpret_cast<::prometheus::Counter*>(statsInfo.metricPtr);
176+
counter->Increment(static_cast<double>(value));
177+
break;
178+
}
179+
case velox::StatType::SUM: {
180+
auto* gauge = reinterpret_cast<::prometheus::Gauge*>(statsInfo.metricPtr);
181+
gauge->Increment(static_cast<double>(value));
182+
break;
183+
}
184+
case velox::StatType::AVG:
185+
case velox::StatType::RATE: {
186+
// Overrides the existing state.
187+
auto* gauge = reinterpret_cast<::prometheus::Gauge*>(statsInfo.metricPtr);
188+
gauge->Set(static_cast<double>(value));
189+
break;
190+
}
191+
default:
192+
VELOX_UNSUPPORTED(
193+
"Unsupported metric type {}",
194+
velox::statTypeString(statsInfo.statType));
195+
};
196+
});
194197
}
195198

196199
void PrometheusStatsReporter::addMetricValue(
@@ -208,22 +211,24 @@ void PrometheusStatsReporter::addHistogramMetricValue(
208211
void PrometheusStatsReporter::addHistogramMetricValue(
209212
const char* key,
210213
size_t value) const {
211-
auto metricIterator = registeredMetricsMap_.find(key);
212-
if (metricIterator == registeredMetricsMap_.end()) {
213-
VLOG(1) << "addMetricValue for unregistered metric " << key;
214-
return;
215-
}
216-
auto histogram = reinterpret_cast<::prometheus::Histogram*>(
217-
metricIterator->second.metricPtr);
218-
histogram->Observe(value);
219-
220-
std::string summaryKey = std::string(key).append(kSummarySuffix);
221-
metricIterator = registeredMetricsMap_.find(summaryKey);
222-
if (metricIterator != registeredMetricsMap_.end()) {
223-
auto summary = reinterpret_cast<::prometheus::Summary*>(
214+
executor_->add([this, key, value]() {
215+
auto metricIterator = registeredMetricsMap_.find(key);
216+
if (metricIterator == registeredMetricsMap_.end()) {
217+
VLOG(1) << "addMetricValue for unregistered metric " << key;
218+
return;
219+
}
220+
auto histogram = reinterpret_cast<::prometheus::Histogram*>(
224221
metricIterator->second.metricPtr);
225-
summary->Observe(value);
226-
}
222+
histogram->Observe(value);
223+
224+
std::string summaryKey = std::string(key).append(kSummarySuffix);
225+
metricIterator = registeredMetricsMap_.find(summaryKey);
226+
if (metricIterator != registeredMetricsMap_.end()) {
227+
auto summary = reinterpret_cast<::prometheus::Summary*>(
228+
metricIterator->second.metricPtr);
229+
summary->Observe(value);
230+
}
231+
});
227232
}
228233

229234
void PrometheusStatsReporter::addHistogramMetricValue(
@@ -241,4 +246,8 @@ std::string PrometheusStatsReporter::fetchMetrics() {
241246
return serializer.Serialize(impl_->registry->Collect());
242247
}
243248

249+
void PrometheusStatsReporter::waitForCompletion() const {
250+
executor_->join();
251+
}
252+
244253
} // namespace facebook::presto::prometheus

presto-native-execution/presto_cpp/main/runtime-metrics/PrometheusStatsReporter.h

+14-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include "velox/common/base/Exceptions.h"
1717
#include "velox/common/base/GTestMacros.h"
1818
#include "velox/common/base/StatsReporter.h"
19+
#include <folly/executors/CPUThreadPoolExecutor.h>
1920

2021
namespace facebook::presto::prometheus {
2122

@@ -38,8 +39,13 @@ class PrometheusStatsReporter : public facebook::velox::BaseStatsReporter {
3839
class PrometheusImpl;
3940

4041
public:
42+
/**
43+
* @brief Constructor with optional thread count (for testing).
44+
* @param labels Labels for metrics.
45+
* @param numThreads Number of threads in the executor (default: 2).
46+
*/
4147
explicit PrometheusStatsReporter(
42-
const std::map<std::string, std::string>& labels);
48+
const std::map<std::string, std::string>& labels, int numThreads = 2);
4349

4450
void registerMetricExportType(const char* key, velox::StatType)
4551
const override;
@@ -77,6 +83,12 @@ class PrometheusStatsReporter : public facebook::velox::BaseStatsReporter {
7783

7884
std::string fetchMetrics() override;
7985

86+
/**
87+
* Waits for all pending metric updates to complete.
88+
* This is only used in tests to ensure correct timing.
89+
*/
90+
void waitForCompletion() const;
91+
8092
static std::unique_ptr<velox::BaseStatsReporter> createPrometheusReporter() {
8193
auto nodeConfig = NodeConfig::instance();
8294
const std::string cluster = nodeConfig->nodeEnvironment();
@@ -88,6 +100,7 @@ class PrometheusStatsReporter : public facebook::velox::BaseStatsReporter {
88100
}
89101

90102
private:
103+
std::shared_ptr<folly::CPUThreadPoolExecutor> executor_;
91104
std::shared_ptr<PrometheusImpl> impl_;
92105
// A map of labels assigned to each metric which helps in filtering at client
93106
// end.

presto-native-execution/presto_cpp/main/runtime-metrics/tests/PrometheusReporterTest.cpp

+6-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ namespace facebook::presto::prometheus {
2020
class PrometheusReporterTest : public testing::Test {
2121
public:
2222
void SetUp() override {
23-
reporter = std::make_shared<PrometheusStatsReporter>(testLabels);
23+
reporter = std::make_shared<PrometheusStatsReporter>(testLabels, 1);
2424
}
2525

2626
void verifySerializedResult(
@@ -76,6 +76,9 @@ TEST_F(PrometheusReporterTest, testCountAndGauge) {
7676
reporter->addMetricValue("test.key1");
7777
reporter->addMetricValue("test.key3");
7878

79+
// 🔹 Wait for all async updates to finish before validation
80+
reporter->waitForCompletion();
81+
7982
auto fullSerializedResult = reporter->fetchMetrics();
8083

8184
std::vector<std::string> expected = {
@@ -114,6 +117,8 @@ TEST_F(PrometheusReporterTest, testHistogramSummary) {
114117
}
115118
}
116119
reporter->addHistogramMetricValue(histogramKey, 10);
120+
// 🔹 Wait for all async updates to finish before validation
121+
reporter->waitForCompletion();
117122
auto fullSerializedResult = reporter->fetchMetrics();
118123
std::replace(histSummaryKey.begin(), histSummaryKey.end(), '.', '_');
119124
std::replace(histogramKey.begin(), histogramKey.end(), '.', '_');

0 commit comments

Comments
 (0)