Skip to content

Commit

Permalink
Don't block for getting node pids in cpu mem monitor. (#764)
Browse files Browse the repository at this point in the history
  • Loading branch information
bcoltin authored Feb 12, 2024
1 parent 82dfa00 commit 3da8b0f
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 32 deletions.
2 changes: 0 additions & 2 deletions astrobee/config/management/cpu_mem_monitor.config
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
-- frequency to check and publish cpu stats

llp = {
update_pid_hz = 0.1,
update_freq_hz = 1,
temperature_scale = 0.001,
cpu_avg_load_limit = 100,
Expand All @@ -40,7 +39,6 @@ llp = {
}

mlp = {
update_pid_hz = 0.1,
update_freq_hz = 1,
temperature_scale = 1.0,
cpu_avg_load_limit = 100,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

#include <sys/types.h>
#include <sys/sysinfo.h>
#include <shared_mutex>

#include <config_reader/config_reader.h>
#include <cpu_mem_monitor/cpu.h>
Expand Down Expand Up @@ -74,7 +75,7 @@ class CpuMemMonitor : public ff_util::FreeFlyerNodelet {

private:
// Get the PIDs of the nodes to monitor
void GetPIDs(ros::TimerEvent const &te);
void GetPIDs();

// Assert CPU loads and report if too high
void AssertCPULoadHighFaultCallback(ros::TimerEvent const& te);
Expand Down Expand Up @@ -119,18 +120,19 @@ class CpuMemMonitor : public ff_util::FreeFlyerNodelet {
virt_percentage;
};

std::shared_timed_mutex pid_lock_;

config_reader::ConfigReader config_params_;
ros::Publisher cpu_state_pub_; // Cpu stats publisher
ros::Publisher mem_state_pub_; // Memory stats publisher
ros::Timer reload_params_timer_; // Ckeck if parameters were updated
ros::Timer pid_timer_; // Update PIDs
ros::Timer stats_timer_; // Update stats
ros::Timer assert_cpu_load_fault_timer_; // Check cpu load limits
ros::Timer clear_cpu_load_fault_timer_; // Clear cpu fault timer
ros::Timer assert_mem_load_fault_timer_; // Check memory load limits
ros::Timer clear_mem_load_fault_timer_; // Clear memory fault timer
int pub_queue_size_; // Monitor publishing queue size
double update_freq_hz_, update_pid_hz_; // Publishing and PID update frequency
double update_freq_hz_; // Publishing update frequency
struct sysinfo mem_info_; // Scope memory info from sysinfo

unsigned int ncpus_; // Number of cpu's
Expand Down
51 changes: 24 additions & 27 deletions management/cpu_mem_monitor/src/cpu_mem_monitor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,6 @@ void CpuMemMonitor::Initialize(ros::NodeHandle *nh) {
true,
false);

// Timer for updating PID values of interest nodes
pid_timer_ = nh->createTimer(ros::Duration(1 / update_pid_hz_),
&CpuMemMonitor::GetPIDs,
this,
false,
true);

// Timer for checking cpu and memory stats. Timer is not one shot and start it right away
stats_timer_ = nh->createTimer(ros::Duration(1 / update_freq_hz_),
&CpuMemMonitor::PublishStatsCallback,
Expand Down Expand Up @@ -155,6 +148,9 @@ void CpuMemMonitor::Initialize(ros::NodeHandle *nh) {
// Initialize the memory state message
mem_state_msg_.name = monitor_host_;
mem_state_msg_.nodes.resize(nodes_pid_.size());

std::thread pid_thread(&cpu_mem_monitor::CpuMemMonitor::GetPIDs, this);
pid_thread.detach();
}

bool CpuMemMonitor::ReadParams() {
Expand All @@ -171,15 +167,6 @@ bool CpuMemMonitor::ReadParams() {
config_reader::ConfigReader::Table processor_config(&config_params_,
processor_name_.c_str());

// get udpate pid frequency
if (!processor_config.GetPosReal("update_pid_hz", &update_pid_hz_)) {
err_msg = "CPU monitor: Update PID frequency not specified for " +
processor_name_;
FF_ERROR(err_msg);
this->AssertFault(ff_util::INITIALIZATION_FAILED, err_msg);
return false;
}

// get udpate stats frequency
if (!processor_config.GetPosReal("update_freq_hz", &update_freq_hz_)) {
err_msg = "CPU monitor: Update frequency not specified for " +
Expand Down Expand Up @@ -255,16 +242,18 @@ bool CpuMemMonitor::ReadParams() {
FF_ERROR(err_msg);
this->AssertFault(ff_util::INITIALIZATION_FAILED, err_msg);
}
for (int i = 0; i < nodes.GetSize(); i++) {
config_reader::ConfigReader::Table node;
if (!nodes.GetTable(i + 1, &node)) {
FF_ERROR("Could not get node table");
return false;
}
std::string name;
if (node.GetStr("name", &name)) {
NODELET_DEBUG_STREAM("Read node " << name);
nodes_pid_.insert(std::pair<std::string, int>(name, 0));
if (nodes_pid_.size() == 0) { // modifying nodes while running not supported
for (int i = 0; i < nodes.GetSize(); i++) {
config_reader::ConfigReader::Table node;
if (!nodes.GetTable(i + 1, &node)) {
FF_ERROR("Could not get node table");
return false;
}
std::string name;
if (node.GetStr("name", &name)) {
NODELET_DEBUG_STREAM("Read node " << name);
nodes_pid_.insert(std::pair<std::string, int>(name, 0));
}
}
}
return true;
Expand Down Expand Up @@ -308,7 +297,7 @@ void CpuMemMonitor::ClearMemLoadHighFaultCallback(ros::TimerEvent const& te) {
load_fault_state_ = CLEARED;
}

void CpuMemMonitor::GetPIDs(ros::TimerEvent const &te) {
void CpuMemMonitor::GetPIDs() {
// Go through all the node list and get the PID
XmlRpc::XmlRpcValue args, result, payload;
std::map<std::string, int>::iterator it;
Expand All @@ -321,18 +310,21 @@ void CpuMemMonitor::GetPIDs(ros::TimerEvent const &te) {
args[0] = ros::this_node::getName();
args[1] = it->first;
if (!ros::master::execute("lookupNode", args, result, payload, true)) {
std::unique_lock<std::shared_timed_mutex> lock(pid_lock_);
it->second = -1;
continue;
}
std::string node_host = getHostfromURI(result[2]);
if (node_host.empty()) {
std::unique_lock<std::shared_timed_mutex> lock(pid_lock_);
it->second = -1;
continue;
}

// If it is not in the same cpu
if (node_host != monitor_host_) {
// Insert it on the list
std::unique_lock<std::shared_timed_mutex> lock(pid_lock_);
it->second = -1;
std::string err_msg = "CPU Memory Monitor: Specified node " + it->first + "in" + monitor_host_ +
" and not in the same cpu as manager " + monitor_host_ + ".";
Expand All @@ -346,6 +338,7 @@ void CpuMemMonitor::GetPIDs(ros::TimerEvent const &te) {
FILE* pipe = popen(("rosnode info " + it->first +
" 2>/dev/null | grep Pid| cut -d' ' -f2").c_str(), "r");
if (!pipe) {
std::unique_lock<std::shared_timed_mutex> lock(pid_lock_);
it->second = -1;
std::string err_msg = "CPU Memory Monitor: Could not open rosnode process for node " + it->first;
FF_ERROR(err_msg);
Expand All @@ -357,6 +350,7 @@ void CpuMemMonitor::GetPIDs(ros::TimerEvent const &te) {

if (pid.empty()) {
// Node not found
std::unique_lock<std::shared_timed_mutex> lock(pid_lock_);
it->second = -1;
std::string err_msg = "CPU Memory Monitor: Specified node " +
it->first + "does not have a PID.";
Expand All @@ -365,6 +359,7 @@ void CpuMemMonitor::GetPIDs(ros::TimerEvent const &te) {
}
pclose(pipe);
// Insert it on the list
std::unique_lock<std::shared_timed_mutex> lock(pid_lock_);
it->second = std::stoi(pid);
}
}
Expand Down Expand Up @@ -484,6 +479,7 @@ int CpuMemMonitor::CollectCPUStats() {
std::map<std::string, int>::iterator it;
std::map<std::string, uint64_t>::iterator it_load;
int i;
std::shared_lock<std::shared_timed_mutex> lock(pid_lock_);
for ( i = 0, it = nodes_pid_.begin(); it != nodes_pid_.end(); i++, it++ ) {
// Look if PID is invalid
if (it->second <= 0)
Expand Down Expand Up @@ -625,6 +621,7 @@ int CpuMemMonitor::CollectMemStats() {
// Go through all the node list and check memory usage
int i = -1;
std::map<std::string, int>::iterator it;
std::shared_lock<std::shared_timed_mutex> lock(pid_lock_);
for ( it = nodes_pid_.begin(); it != nodes_pid_.end(); it++ ) {
// Increment counter
i++;
Expand Down

0 comments on commit 3da8b0f

Please sign in to comment.