@@ -47,8 +47,9 @@ struct PrometheusStatsReporter::PrometheusImpl {
47
47
};
48
48
49
49
PrometheusStatsReporter::PrometheusStatsReporter (
50
- const std::map<std::string, std::string>& labels) {
51
- impl_ = std::make_shared<PrometheusImpl>(labels);
50
+ const std::map<std::string, std::string>& labels, int numThreads)
51
+ : executor_(std::make_shared<folly::CPUThreadPoolExecutor>(numThreads)),
52
+ impl_ (std::make_shared<PrometheusImpl>(labels)) {
52
53
}
53
54
54
55
void PrometheusStatsReporter::registerMetricExportType (
@@ -161,36 +162,38 @@ void PrometheusStatsReporter::addMetricValue(
161
162
162
163
void PrometheusStatsReporter::addMetricValue (const char * key, size_t value)
163
164
const {
164
- auto metricIterator = registeredMetricsMap_.find (key);
165
- if (metricIterator == registeredMetricsMap_.end ()) {
166
- VLOG (1 ) << " addMetricValue called for unregistered metric " << key;
167
- return ;
168
- }
169
- auto statsInfo = metricIterator->second ;
170
- switch (statsInfo.statType ) {
171
- case velox::StatType::COUNT: {
172
- auto * counter =
173
- reinterpret_cast <::prometheus::Counter*>(statsInfo.metricPtr );
174
- counter->Increment (static_cast <double >(value));
175
- break ;
176
- }
177
- case velox::StatType::SUM: {
178
- auto * gauge = reinterpret_cast <::prometheus::Gauge*>(statsInfo.metricPtr );
179
- gauge->Increment (static_cast <double >(value));
180
- break ;
181
- }
182
- case velox::StatType::AVG:
183
- case velox::StatType::RATE: {
184
- // Overrides the existing state.
185
- auto * gauge = reinterpret_cast <::prometheus::Gauge*>(statsInfo.metricPtr );
186
- gauge->Set (static_cast <double >(value));
187
- break ;
165
+ executor_->add ([this , key, value]() {
166
+ auto metricIterator = registeredMetricsMap_.find (key);
167
+ if (metricIterator == registeredMetricsMap_.end ()) {
168
+ VLOG (1 ) << " addMetricValue called for unregistered metric " << key;
169
+ return ;
188
170
}
189
- default :
190
- VELOX_UNSUPPORTED (
191
- " Unsupported metric type {}" ,
192
- velox::statTypeString (statsInfo.statType ));
193
- };
171
+ auto statsInfo = metricIterator->second ;
172
+ switch (statsInfo.statType ) {
173
+ case velox::StatType::COUNT: {
174
+ auto * counter =
175
+ reinterpret_cast <::prometheus::Counter*>(statsInfo.metricPtr );
176
+ counter->Increment (static_cast <double >(value));
177
+ break ;
178
+ }
179
+ case velox::StatType::SUM: {
180
+ auto * gauge = reinterpret_cast <::prometheus::Gauge*>(statsInfo.metricPtr );
181
+ gauge->Increment (static_cast <double >(value));
182
+ break ;
183
+ }
184
+ case velox::StatType::AVG:
185
+ case velox::StatType::RATE: {
186
+ // Overrides the existing state.
187
+ auto * gauge = reinterpret_cast <::prometheus::Gauge*>(statsInfo.metricPtr );
188
+ gauge->Set (static_cast <double >(value));
189
+ break ;
190
+ }
191
+ default :
192
+ VELOX_UNSUPPORTED (
193
+ " Unsupported metric type {}" ,
194
+ velox::statTypeString (statsInfo.statType ));
195
+ };
196
+ });
194
197
}
195
198
196
199
void PrometheusStatsReporter::addMetricValue (
@@ -208,22 +211,24 @@ void PrometheusStatsReporter::addHistogramMetricValue(
208
211
void PrometheusStatsReporter::addHistogramMetricValue (
209
212
const char * key,
210
213
size_t value) const {
211
- auto metricIterator = registeredMetricsMap_.find (key);
212
- if (metricIterator == registeredMetricsMap_.end ()) {
213
- VLOG (1 ) << " addMetricValue for unregistered metric " << key;
214
- return ;
215
- }
216
- auto histogram = reinterpret_cast <::prometheus::Histogram*>(
217
- metricIterator->second .metricPtr );
218
- histogram->Observe (value);
219
-
220
- std::string summaryKey = std::string (key).append (kSummarySuffix );
221
- metricIterator = registeredMetricsMap_.find (summaryKey);
222
- if (metricIterator != registeredMetricsMap_.end ()) {
223
- auto summary = reinterpret_cast <::prometheus::Summary*>(
214
+ executor_->add ([this , key, value]() {
215
+ auto metricIterator = registeredMetricsMap_.find (key);
216
+ if (metricIterator == registeredMetricsMap_.end ()) {
217
+ VLOG (1 ) << " addMetricValue for unregistered metric " << key;
218
+ return ;
219
+ }
220
+ auto histogram = reinterpret_cast <::prometheus::Histogram*>(
224
221
metricIterator->second .metricPtr );
225
- summary->Observe (value);
226
- }
222
+ histogram->Observe (value);
223
+
224
+ std::string summaryKey = std::string (key).append (kSummarySuffix );
225
+ metricIterator = registeredMetricsMap_.find (summaryKey);
226
+ if (metricIterator != registeredMetricsMap_.end ()) {
227
+ auto summary = reinterpret_cast <::prometheus::Summary*>(
228
+ metricIterator->second .metricPtr );
229
+ summary->Observe (value);
230
+ }
231
+ });
227
232
}
228
233
229
234
void PrometheusStatsReporter::addHistogramMetricValue (
@@ -241,4 +246,8 @@ std::string PrometheusStatsReporter::fetchMetrics() {
241
246
return serializer.Serialize (impl_->registry ->Collect ());
242
247
}
243
248
249
+ void PrometheusStatsReporter::waitForCompletion () const {
250
+ executor_->join ();
251
+ }
252
+
244
253
} // namespace facebook::presto::prometheus
0 commit comments