Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[native] Update prometheus metrics asynchronously #24716

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions presto-native-execution/presto_cpp/main/common/Configs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -851,6 +851,7 @@ NodeConfig::NodeConfig() {
NONE_PROP(kNodeIp),
NONE_PROP(kNodeInternalAddress),
NONE_PROP(kNodeLocation),
NONE_PROP(kNodePrometheusExecutorThreads),
};
}

Expand All @@ -863,6 +864,16 @@ std::string NodeConfig::nodeEnvironment() const {
return requiredProperty(kNodeEnvironment);
}

// Returns the number of threads to give the Prometheus reporter's executor:
// the value configured under kNodePrometheusExecutorThreads when present,
// otherwise a built-in default of 2.
int NodeConfig::prometheusExecutorThreads() const {
  static constexpr int kFallbackThreadCount = 2;
  const auto configured =
      optionalProperty<int>(kNodePrometheusExecutorThreads);
  return configured.hasValue() ? configured.value() : kFallbackThreadCount;
}

std::string NodeConfig::nodeId() const {
auto resultOpt = optionalProperty(kNodeId);
if (resultOpt.hasValue()) {
Expand Down
3 changes: 3 additions & 0 deletions presto-native-execution/presto_cpp/main/common/Configs.h
Original file line number Diff line number Diff line change
Expand Up @@ -978,6 +978,7 @@ class NodeConfig : public ConfigBase {
static constexpr std::string_view kNodeInternalAddress{
"node.internal-address"};
static constexpr std::string_view kNodeLocation{"node.location"};
static constexpr std::string_view kNodePrometheusExecutorThreads{"node.prometheus.executor-threads"};

NodeConfig();

Expand All @@ -987,6 +988,8 @@ class NodeConfig : public ConfigBase {

std::string nodeEnvironment() const;

int prometheusExecutorThreads() const;

std::string nodeId() const;

std::string nodeInternalAddress(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,11 @@ TEST_F(ConfigTest, optionalNodeConfigs) {
init(config, {{std::string(NodeConfig::kNodeIp), "127.0.0.1"}});
ASSERT_EQ(
config.nodeInternalAddress([]() { return "0.0.0.0"; }), "127.0.0.1");

// Make sure "node.prometheus.executor-threads" is honored too.
init(config, {{std::string(NodeConfig::kNodePrometheusExecutorThreads), "4"}});
ASSERT_EQ(
config.prometheusExecutorThreads(), 4);
}

TEST_F(ConfigTest, optionalSystemConfigsWithDefault) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ struct PrometheusStatsReporter::PrometheusImpl {
};

// Creates a reporter whose metric updates are applied on a dedicated
// CPU thread pool.
//
// @param labels Labels handed to PrometheusImpl.
// @param numThreads Requested size of the update thread pool. Values below 1
// are treated as 1: the value comes from configuration and is passed to the
// executor's thread-count argument, where 0 would leave no thread to run the
// queued updates and a negative value would be converted to a very large
// unsigned count.
PrometheusStatsReporter::PrometheusStatsReporter(
    const std::map<std::string, std::string>& labels,
    int numThreads)
    : executor_(std::make_shared<folly::CPUThreadPoolExecutor>(
          numThreads > 0 ? numThreads : 1)),
      impl_(std::make_shared<PrometheusImpl>(labels)) {}

void PrometheusStatsReporter::registerMetricExportType(
Expand Down Expand Up @@ -161,36 +162,38 @@ void PrometheusStatsReporter::addMetricValue(

// Schedules an update of the metric registered under `key` with `value`.
//
// The update does not happen inline; it is queued on executor_ and applied
// later on one of its threads:
//   - COUNT     -> Counter::Increment(value)
//   - SUM       -> Gauge::Increment(value)
//   - AVG, RATE -> Gauge::Set(value), overriding the previous state
// Unregistered keys are logged at VLOG(1) and otherwise ignored; any other
// stat type raises VELOX_UNSUPPORTED from inside the queued task.
//
// NOTE(review): the task reads registeredMetricsMap_ on an executor thread
// with no synchronization visible in this function - confirm that all metric
// registration completes before the first value is added.
void PrometheusStatsReporter::addMetricValue(const char* key, size_t value)
    const {
  // Copy the key into the task. `key` is only known to be valid for the
  // duration of this call, while the task runs at some later point; capturing
  // the raw pointer would leave the task reading freed memory whenever the
  // caller passed a temporary string's buffer.
  executor_->add([this, metricKey = std::string(key), value]() {
    auto metricIterator = registeredMetricsMap_.find(metricKey);
    if (metricIterator == registeredMetricsMap_.end()) {
      VLOG(1) << "addMetricValue called for unregistered metric " << metricKey;
      return;
    }
    auto statsInfo = metricIterator->second;
    switch (statsInfo.statType) {
      case velox::StatType::COUNT: {
        auto* counter =
            reinterpret_cast<::prometheus::Counter*>(statsInfo.metricPtr);
        counter->Increment(static_cast<double>(value));
        break;
      }
      case velox::StatType::SUM: {
        auto* gauge =
            reinterpret_cast<::prometheus::Gauge*>(statsInfo.metricPtr);
        gauge->Increment(static_cast<double>(value));
        break;
      }
      case velox::StatType::AVG:
      case velox::StatType::RATE: {
        // Overrides the existing state.
        auto* gauge =
            reinterpret_cast<::prometheus::Gauge*>(statsInfo.metricPtr);
        gauge->Set(static_cast<double>(value));
        break;
      }
      default:
        VELOX_UNSUPPORTED(
            "Unsupported metric type {}",
            velox::statTypeString(statsInfo.statType));
    }
  });
}

void PrometheusStatsReporter::addMetricValue(
Expand All @@ -208,22 +211,24 @@ void PrometheusStatsReporter::addHistogramMetricValue(
// Schedules an observation of `value` on the histogram registered under
// `key`, and on the companion summary registered under `key` + kSummarySuffix
// when one exists.
//
// The observation does not happen inline; it is queued on executor_ and
// applied later on one of its threads. Unregistered histogram keys are logged
// at VLOG(1) and otherwise ignored.
//
// NOTE(review): the task reads registeredMetricsMap_ on an executor thread
// with no synchronization visible in this function - confirm that all metric
// registration completes before the first value is added.
void PrometheusStatsReporter::addHistogramMetricValue(
    const char* key,
    size_t value) const {
  // Copy the key into the task. `key` is only known to be valid for the
  // duration of this call, while the task runs at some later point; capturing
  // the raw pointer would leave the task reading freed memory whenever the
  // caller passed a temporary string's buffer.
  executor_->add([this, metricKey = std::string(key), value]() {
    auto metricIterator = registeredMetricsMap_.find(metricKey);
    if (metricIterator == registeredMetricsMap_.end()) {
      VLOG(1) << "addMetricValue for unregistered metric " << metricKey;
      return;
    }
    auto histogram = reinterpret_cast<::prometheus::Histogram*>(
        metricIterator->second.metricPtr);
    histogram->Observe(value);

    // A summary is optional: only observe it if one was registered alongside
    // the histogram.
    std::string summaryKey = std::string(metricKey).append(kSummarySuffix);
    metricIterator = registeredMetricsMap_.find(summaryKey);
    if (metricIterator != registeredMetricsMap_.end()) {
      auto summary = reinterpret_cast<::prometheus::Summary*>(
          metricIterator->second.metricPtr);
      summary->Observe(value);
    }
  });
}

void PrometheusStatsReporter::addHistogramMetricValue(
Expand All @@ -241,4 +246,8 @@ std::string PrometheusStatsReporter::fetchMetrics() {
return serializer.Serialize(impl_->registry->Collect());
}

void PrometheusStatsReporter::waitForCompletion() const {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be called in the destructor as well? That would ensure we don't lose metrics at shutdown.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, we don't need it in the destructor. Prometheus is pull-based, so unless the backend pulls the metrics before shutdown, flushing them there would be useless.

executor_->join();
}

} // namespace facebook::presto::prometheus
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "velox/common/base/Exceptions.h"
#include "velox/common/base/GTestMacros.h"
#include "velox/common/base/StatsReporter.h"
#include <folly/executors/CPUThreadPoolExecutor.h>

namespace facebook::presto::prometheus {

Expand All @@ -38,8 +39,13 @@ class PrometheusStatsReporter : public facebook::velox::BaseStatsReporter {
class PrometheusImpl;

public:
/**
* @brief Constructs a reporter that applies metric updates on its own thread pool.
* @param labels Labels for metrics.
* @param numThreads Number of threads in the executor
*/
explicit PrometheusStatsReporter(
const std::map<std::string, std::string>& labels);
const std::map<std::string, std::string>& labels, int numThreads);

void registerMetricExportType(const char* key, velox::StatType)
const override;
Expand Down Expand Up @@ -77,21 +83,30 @@ class PrometheusStatsReporter : public facebook::velox::BaseStatsReporter {

std::string fetchMetrics() override;

/**
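* Waits for all pending metric updates to complete.
* This is only used in tests to ensure correct timing.
* NOTE(review): this joins the executor; per folly, join() appears to stop the
* pool's threads, so updates added afterwards would presumably never be
* applied - confirm before calling it outside of tests.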
*/
void waitForCompletion() const;

// Factory that builds a PrometheusStatsReporter from NodeConfig::instance().
//
// Every metric is labelled with:
//   - "cluster": the node environment from the node config;
//   - "worker":  the HOSTNAME environment variable, or "" when it is unset.
// The executor thread count is read from
// NodeConfig::prometheusExecutorThreads().
//
// @return The new reporter as a velox::BaseStatsReporter.
static std::unique_ptr<velox::BaseStatsReporter> createPrometheusReporter() {
auto nodeConfig = NodeConfig::instance();
const std::string cluster = nodeConfig->nodeEnvironment();
// std::getenv returns nullptr for an unset variable; map that to "".
const char* hostName = std::getenv("HOSTNAME");
const std::string worker = !hostName ? "" : hostName;
std::map<std::string, std::string> labels{
{"cluster", cluster}, {"worker", worker}};
// NOTE(review): two return statements follow. The single-argument call looks
// like the pre-change side of this diff; the two-argument call, which also
// passes the configured thread count, looks like the current one - confirm.
return std::make_unique<PrometheusStatsReporter>(labels);
return std::make_unique<PrometheusStatsReporter>(labels, nodeConfig->prometheusExecutorThreads());
}

// Visible for testing
mutable std::unordered_map<std::string, StatsInfo> registeredMetricsMap_;

private:
std::shared_ptr<folly::CPUThreadPoolExecutor> executor_;
std::shared_ptr<PrometheusImpl> impl_;
// A map of labels assigned to each metric which helps in filtering at client
// end.
mutable std::unordered_map<std::string, StatsInfo> registeredMetricsMap_;
VELOX_FRIEND_TEST(PrometheusReporterTest, testCountAndGauge);
VELOX_FRIEND_TEST(PrometheusReporterTest, testHistogramSummary);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ namespace facebook::presto::prometheus {
class PrometheusReporterTest : public testing::Test {
public:
void SetUp() override {
reporter = std::make_shared<PrometheusStatsReporter>(testLabels);
reporter = std::make_shared<PrometheusStatsReporter>(testLabels, 1);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not test with multiple threads?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is that for a gauge metric it's the last value that stays, so the test has to run on a single thread to be deterministic.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jaystarshot: Could we have a single test without gauge metrics but with multi-threading? Without that there is no signal that this code is working.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

multiThreadedReporter = std::make_shared<PrometheusStatsReporter>(testLabels, 2);
}

void verifySerializedResult(
Expand All @@ -40,6 +41,43 @@ class PrometheusReporterTest : public testing::Test {
const std::string labelsSerialized =
R"(cluster="test_cluster",worker="test_worker_pod")";
std::shared_ptr<PrometheusStatsReporter> reporter;
std::shared_ptr<PrometheusStatsReporter> multiThreadedReporter;
};

// Exercises the reporter backed by the two-thread executor. Only COUNT and
// SUM metrics are used: both are updated by incrementing, so the final totals
// are the same whatever order the queued updates run in.
TEST_F(PrometheusReporterTest, testConcurrentReporting) {
  multiThreadedReporter->registerMetricExportType(
      "test.key1", facebook::velox::StatType::COUNT);
  multiThreadedReporter->registerMetricExportType(
      "test.key3", facebook::velox::StatType::SUM);

  // Check that registration succeeded before dereferencing the lookup
  // results; dereferencing end() would be undefined behaviour rather than a
  // test failure.
  const auto& registered = multiThreadedReporter->registeredMetricsMap_;
  const auto counterIt = registered.find("test.key1");
  ASSERT_TRUE(counterIt != registered.end());
  EXPECT_EQ(facebook::velox::StatType::COUNT, counterIt->second.statType);
  const auto sumIt = registered.find("test.key3");
  ASSERT_TRUE(sumIt != registered.end());
  EXPECT_EQ(facebook::velox::StatType::SUM, sumIt->second.statType);

  const std::vector<size_t> testData = {10, 12, 14};
  for (const auto sample : testData) {
    multiThreadedReporter->addMetricValue("test.key1", sample);
    multiThreadedReporter->addMetricValue("test.key3", sample + 2000);
  }

  // Uses default value of 1 for second parameter.
  multiThreadedReporter->addMetricValue("test.key1");
  multiThreadedReporter->addMetricValue("test.key3");

  // Wait for all async updates to finish before validation.
  multiThreadedReporter->waitForCompletion();

  auto fullSerializedResult = multiThreadedReporter->fetchMetrics();

  // Counter: 10 + 12 + 14 + 1 = 37. Gauge: 2010 + 2012 + 2014 + 1 = 6037.
  std::vector<std::string> expected = {
      "# TYPE test_key1 counter",
      "test_key1{" + labelsSerialized + "} 37",
      "# TYPE test_key3 gauge",
      "test_key3{" + labelsSerialized + "} 6037"};

  verifySerializedResult(fullSerializedResult, expected);
}

TEST_F(PrometheusReporterTest, testCountAndGauge) {
Expand Down Expand Up @@ -75,6 +113,7 @@ TEST_F(PrometheusReporterTest, testCountAndGauge) {
// Uses default value of 1 for second parameter.
reporter->addMetricValue("test.key1");
reporter->addMetricValue("test.key3");
reporter->waitForCompletion();

auto fullSerializedResult = reporter->fetchMetrics();

Expand Down Expand Up @@ -114,6 +153,7 @@ TEST_F(PrometheusReporterTest, testHistogramSummary) {
}
}
reporter->addHistogramMetricValue(histogramKey, 10);
reporter->waitForCompletion();
auto fullSerializedResult = reporter->fetchMetrics();
std::replace(histSummaryKey.begin(), histSummaryKey.end(), '.', '_');
std::replace(histogramKey.begin(), histogramKey.end(), '.', '_');
Expand Down
Loading