Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 115db99

Browse files
committedMar 19, 2025·
Make prometheus metric updation async
1 parent fa9a268 commit 115db99

File tree

5 files changed

+127
-50
lines changed

5 files changed

+127
-50
lines changed
 

‎presto-native-execution/presto_cpp/main/common/Configs.cpp

+10
Original file line numberDiff line numberDiff line change
@@ -806,6 +806,16 @@ std::string NodeConfig::nodeEnvironment() const {
806806
return requiredProperty(kNodeEnvironment);
807807
}
808808

809+
int NodeConfig::prometheusExecutorThreads() const {
810+
static constexpr int
811+
kNodePrometheusExecutorThreadsDefault = 2;
812+
auto resultOpt = optionalProperty<int>(kNodePrometheusExecutorThreads);
813+
if (resultOpt.hasValue()) {
814+
return resultOpt.value();
815+
}
816+
return kNodePrometheusExecutorThreadsDefault;
817+
}
818+
809819
std::string NodeConfig::nodeId() const {
810820
auto resultOpt = optionalProperty(kNodeId);
811821
if (resultOpt.hasValue()) {

‎presto-native-execution/presto_cpp/main/common/Configs.h

+3
Original file line numberDiff line numberDiff line change
@@ -916,6 +916,7 @@ class NodeConfig : public ConfigBase {
916916
static constexpr std::string_view kNodeInternalAddress{
917917
"node.internal-address"};
918918
static constexpr std::string_view kNodeLocation{"node.location"};
919+
static constexpr std::string_view kNodePrometheusExecutorThreads{"node.prometheus.executor-threads"};
919920

920921
NodeConfig();
921922

@@ -925,6 +926,8 @@ class NodeConfig : public ConfigBase {
925926

926927
std::string nodeEnvironment() const;
927928

929+
int prometheusExecutorThreads() const;
930+
928931
std::string nodeId() const;
929932

930933
std::string nodeInternalAddress(

‎presto-native-execution/presto_cpp/main/runtime-metrics/PrometheusStatsReporter.cpp

+55-46
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,9 @@ struct PrometheusStatsReporter::PrometheusImpl {
4747
};
4848

4949
PrometheusStatsReporter::PrometheusStatsReporter(
50-
const std::map<std::string, std::string>& labels) {
51-
impl_ = std::make_shared<PrometheusImpl>(labels);
50+
const std::map<std::string, std::string>& labels, int numThreads)
51+
: executor_(std::make_shared<folly::CPUThreadPoolExecutor>(numThreads)),
52+
impl_(std::make_shared<PrometheusImpl>(labels)) {
5253
}
5354

5455
void PrometheusStatsReporter::registerMetricExportType(
@@ -161,36 +162,38 @@ void PrometheusStatsReporter::addMetricValue(
161162

162163
void PrometheusStatsReporter::addMetricValue(const char* key, size_t value)
163164
const {
164-
auto metricIterator = registeredMetricsMap_.find(key);
165-
if (metricIterator == registeredMetricsMap_.end()) {
166-
VLOG(1) << "addMetricValue called for unregistered metric " << key;
167-
return;
168-
}
169-
auto statsInfo = metricIterator->second;
170-
switch (statsInfo.statType) {
171-
case velox::StatType::COUNT: {
172-
auto* counter =
173-
reinterpret_cast<::prometheus::Counter*>(statsInfo.metricPtr);
174-
counter->Increment(static_cast<double>(value));
175-
break;
176-
}
177-
case velox::StatType::SUM: {
178-
auto* gauge = reinterpret_cast<::prometheus::Gauge*>(statsInfo.metricPtr);
179-
gauge->Increment(static_cast<double>(value));
180-
break;
181-
}
182-
case velox::StatType::AVG:
183-
case velox::StatType::RATE: {
184-
// Overrides the existing state.
185-
auto* gauge = reinterpret_cast<::prometheus::Gauge*>(statsInfo.metricPtr);
186-
gauge->Set(static_cast<double>(value));
187-
break;
165+
executor_->add([this, key, value]() {
166+
auto metricIterator = registeredMetricsMap_.find(key);
167+
if (metricIterator == registeredMetricsMap_.end()) {
168+
VLOG(1) << "addMetricValue called for unregistered metric " << key;
169+
return;
188170
}
189-
default:
190-
VELOX_UNSUPPORTED(
191-
"Unsupported metric type {}",
192-
velox::statTypeString(statsInfo.statType));
193-
};
171+
auto statsInfo = metricIterator->second;
172+
switch (statsInfo.statType) {
173+
case velox::StatType::COUNT: {
174+
auto* counter =
175+
reinterpret_cast<::prometheus::Counter*>(statsInfo.metricPtr);
176+
counter->Increment(static_cast<double>(value));
177+
break;
178+
}
179+
case velox::StatType::SUM: {
180+
auto* gauge = reinterpret_cast<::prometheus::Gauge*>(statsInfo.metricPtr);
181+
gauge->Increment(static_cast<double>(value));
182+
break;
183+
}
184+
case velox::StatType::AVG:
185+
case velox::StatType::RATE: {
186+
// Overrides the existing state.
187+
auto* gauge = reinterpret_cast<::prometheus::Gauge*>(statsInfo.metricPtr);
188+
gauge->Set(static_cast<double>(value));
189+
break;
190+
}
191+
default:
192+
VELOX_UNSUPPORTED(
193+
"Unsupported metric type {}",
194+
velox::statTypeString(statsInfo.statType));
195+
};
196+
});
194197
}
195198

196199
void PrometheusStatsReporter::addMetricValue(
@@ -208,22 +211,24 @@ void PrometheusStatsReporter::addHistogramMetricValue(
208211
void PrometheusStatsReporter::addHistogramMetricValue(
209212
const char* key,
210213
size_t value) const {
211-
auto metricIterator = registeredMetricsMap_.find(key);
212-
if (metricIterator == registeredMetricsMap_.end()) {
213-
VLOG(1) << "addMetricValue for unregistered metric " << key;
214-
return;
215-
}
216-
auto histogram = reinterpret_cast<::prometheus::Histogram*>(
217-
metricIterator->second.metricPtr);
218-
histogram->Observe(value);
219-
220-
std::string summaryKey = std::string(key).append(kSummarySuffix);
221-
metricIterator = registeredMetricsMap_.find(summaryKey);
222-
if (metricIterator != registeredMetricsMap_.end()) {
223-
auto summary = reinterpret_cast<::prometheus::Summary*>(
214+
executor_->add([this, key, value]() {
215+
auto metricIterator = registeredMetricsMap_.find(key);
216+
if (metricIterator == registeredMetricsMap_.end()) {
217+
VLOG(1) << "addMetricValue for unregistered metric " << key;
218+
return;
219+
}
220+
auto histogram = reinterpret_cast<::prometheus::Histogram*>(
224221
metricIterator->second.metricPtr);
225-
summary->Observe(value);
226-
}
222+
histogram->Observe(value);
223+
224+
std::string summaryKey = std::string(key).append(kSummarySuffix);
225+
metricIterator = registeredMetricsMap_.find(summaryKey);
226+
if (metricIterator != registeredMetricsMap_.end()) {
227+
auto summary = reinterpret_cast<::prometheus::Summary*>(
228+
metricIterator->second.metricPtr);
229+
summary->Observe(value);
230+
}
231+
});
227232
}
228233

229234
void PrometheusStatsReporter::addHistogramMetricValue(
@@ -241,4 +246,8 @@ std::string PrometheusStatsReporter::fetchMetrics() {
241246
return serializer.Serialize(impl_->registry->Collect());
242247
}
243248

249+
void PrometheusStatsReporter::waitForCompletion() const {
250+
executor_->join();
251+
}
252+
244253
} // namespace facebook::presto::prometheus

‎presto-native-execution/presto_cpp/main/runtime-metrics/PrometheusStatsReporter.h

+18-3
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include "velox/common/base/Exceptions.h"
1717
#include "velox/common/base/GTestMacros.h"
1818
#include "velox/common/base/StatsReporter.h"
19+
#include <folly/executors/CPUThreadPoolExecutor.h>
1920

2021
namespace facebook::presto::prometheus {
2122

@@ -38,8 +39,13 @@ class PrometheusStatsReporter : public facebook::velox::BaseStatsReporter {
3839
class PrometheusImpl;
3940

4041
public:
42+
/**
43+
* @brief Constructor with optional thread count
44+
* @param labels Labels for metrics.
45+
* @param numThreads Number of threads in the executor
46+
*/
4147
explicit PrometheusStatsReporter(
42-
const std::map<std::string, std::string>& labels);
48+
const std::map<std::string, std::string>& labels, int numThreads);
4349

4450
void registerMetricExportType(const char* key, velox::StatType)
4551
const override;
@@ -77,21 +83,30 @@ class PrometheusStatsReporter : public facebook::velox::BaseStatsReporter {
7783

7884
std::string fetchMetrics() override;
7985

86+
/**
87+
* Waits for all pending metric updates to complete.
88+
* This is only used in tests to ensure correct timing.
89+
*/
90+
void waitForCompletion() const;
91+
8092
static std::unique_ptr<velox::BaseStatsReporter> createPrometheusReporter() {
8193
auto nodeConfig = NodeConfig::instance();
8294
const std::string cluster = nodeConfig->nodeEnvironment();
8395
const char* hostName = std::getenv("HOSTNAME");
8496
const std::string worker = !hostName ? "" : hostName;
8597
std::map<std::string, std::string> labels{
8698
{"cluster", cluster}, {"worker", worker}};
87-
return std::make_unique<PrometheusStatsReporter>(labels);
99+
return std::make_unique<PrometheusStatsReporter>(labels, nodeConfig->prometheusExecutorThreads());
88100
}
89101

102+
// Visible for testing
103+
mutable std::unordered_map<std::string, StatsInfo> registeredMetricsMap_;
104+
90105
private:
106+
std::shared_ptr<folly::CPUThreadPoolExecutor> executor_;
91107
std::shared_ptr<PrometheusImpl> impl_;
92108
// A map of labels assigned to each metric which helps in filtering at client
93109
// end.
94-
mutable std::unordered_map<std::string, StatsInfo> registeredMetricsMap_;
95110
VELOX_FRIEND_TEST(PrometheusReporterTest, testCountAndGauge);
96111
VELOX_FRIEND_TEST(PrometheusReporterTest, testHistogramSummary);
97112
};

‎presto-native-execution/presto_cpp/main/runtime-metrics/tests/PrometheusReporterTest.cpp

+41-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ namespace facebook::presto::prometheus {
2020
class PrometheusReporterTest : public testing::Test {
2121
public:
2222
void SetUp() override {
23-
reporter = std::make_shared<PrometheusStatsReporter>(testLabels);
23+
reporter = std::make_shared<PrometheusStatsReporter>(testLabels, 1);
24+
multiThreadedReporter = std::make_shared<PrometheusStatsReporter>(testLabels, 2);
2425
}
2526

2627
void verifySerializedResult(
@@ -40,6 +41,43 @@ class PrometheusReporterTest : public testing::Test {
4041
const std::string labelsSerialized =
4142
R"(cluster="test_cluster",worker="test_worker_pod")";
4243
std::shared_ptr<PrometheusStatsReporter> reporter;
44+
std::shared_ptr<PrometheusStatsReporter> multiThreadedReporter;
45+
};
46+
47+
TEST_F(PrometheusReporterTest, testConcurrentReporting) {
48+
multiThreadedReporter->registerMetricExportType(
49+
"test.key1", facebook::velox::StatType::COUNT);
50+
multiThreadedReporter->registerMetricExportType(
51+
"test.key3", facebook::velox::StatType::SUM);
52+
EXPECT_EQ(
53+
facebook::velox::StatType::COUNT,
54+
multiThreadedReporter->registeredMetricsMap_.find("test.key1")->second.statType);
55+
EXPECT_EQ(
56+
facebook::velox::StatType::SUM,
57+
multiThreadedReporter->registeredMetricsMap_.find("test.key3")->second.statType);
58+
59+
std::vector<size_t> testData = {10, 12, 14};
60+
for (auto i : testData) {
61+
multiThreadedReporter->addMetricValue("test.key1", i);
62+
multiThreadedReporter->addMetricValue("test.key3", i + 2000);
63+
}
64+
65+
// Uses default value of 1 for second parameter.
66+
multiThreadedReporter->addMetricValue("test.key1");
67+
multiThreadedReporter->addMetricValue("test.key3");
68+
69+
// Wait for all async updates to finish before validation
70+
multiThreadedReporter->waitForCompletion();
71+
72+
auto fullSerializedResult = multiThreadedReporter->fetchMetrics();
73+
74+
std::vector<std::string> expected = {
75+
"# TYPE test_key1 counter",
76+
"test_key1{" + labelsSerialized + "} 37",
77+
"# TYPE test_key3 gauge",
78+
"test_key3{" + labelsSerialized + "} 6037"};
79+
80+
verifySerializedResult(fullSerializedResult, expected);
4381
};
4482

4583
TEST_F(PrometheusReporterTest, testCountAndGauge) {
@@ -75,6 +113,7 @@ TEST_F(PrometheusReporterTest, testCountAndGauge) {
75113
// Uses default value of 1 for second parameter.
76114
reporter->addMetricValue("test.key1");
77115
reporter->addMetricValue("test.key3");
116+
reporter->waitForCompletion();
78117

79118
auto fullSerializedResult = reporter->fetchMetrics();
80119

@@ -114,6 +153,7 @@ TEST_F(PrometheusReporterTest, testHistogramSummary) {
114153
}
115154
}
116155
reporter->addHistogramMetricValue(histogramKey, 10);
156+
reporter->waitForCompletion();
117157
auto fullSerializedResult = reporter->fetchMetrics();
118158
std::replace(histSummaryKey.begin(), histSummaryKey.end(), '.', '_');
119159
std::replace(histogramKey.begin(), histogramKey.end(), '.', '_');

0 commit comments

Comments
 (0)
Please sign in to comment.