
Commit e67c355

Committed on Jan 29, 2025
feat: add sysinfo metrics
- collect CPU usage and memory usage of the server
- collect disk usage of the volumes: data, staging, hot-tier
- add these metrics to Prometheus metrics
- export these metrics to the cluster metrics API
- add the metrics to the pmeta stream
- add the querier node's sysinfo metrics to pmeta and the cluster metrics API
1 parent cb8b6c1 commit e67c355
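
The CPU, memory, and disk numbers described above come from the sysinfo crate, wired up in src/metrics/mod.rs below. A minimal standalone sketch of the calls involved, assuming the same sysinfo API surface the commit uses (System::new_all, global_cpu_usage, Disks::new_with_refreshed_list):

use sysinfo::{Disks, System};

fn main() {
    // Snapshot the whole system: memory, swap, and per-core CPU counters.
    let mut sys = System::new_all();
    sys.refresh_all();
    // Note: sysinfo needs a second refresh after a short delay before CPU
    // percentages are meaningful; the periodic scheduler below provides that.
    println!("total memory: {} B", sys.total_memory());
    println!("used memory : {} B", sys.used_memory());
    println!("global CPU  : {:.2}%", sys.global_cpu_usage());

    // Per-mount-point disk usage, as exported for the data/staging/hot-tier volumes.
    let disks = Disks::new_with_refreshed_list();
    for disk in disks.iter() {
        println!(
            "{:?}: total {} B, available {} B",
            disk.mount_point(),
            disk.total_space(),
            disk.available_space()
        );
    }
}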

File tree: 8 files changed, +535 −141 lines changed


src/correlation.rs

+3 −4

@@ -129,10 +129,9 @@ impl Correlations {
             .await?;
 
         // Update in memory
-        self.write().await.insert(
-            correlation.id.to_owned(),
-            correlation.clone(),
-        );
+        self.write()
+            .await
+            .insert(correlation.id.to_owned(), correlation.clone());
 
         Ok(correlation)
     }

src/handlers/http/cluster/mod.rs

+13 −23

@@ -35,7 +35,7 @@ use crate::storage::{ObjectStorageError, STREAM_ROOT_DIRECTORY};
 use crate::storage::{ObjectStoreFormat, PARSEABLE_ROOT_DIRECTORY};
 use crate::HTTP_CLIENT;
 use actix_web::http::header::{self, HeaderMap};
-use actix_web::web::Path;
+use actix_web::web::{Json, Path};
 use actix_web::Responder;
 use bytes::Bytes;
 use chrono::Utc;
@@ -729,6 +729,9 @@ async fn fetch_cluster_metrics() -> Result<Vec<Metrics>, PostError> {
     let mut dresses = vec![];
 
     for ingestor in ingestor_metadata {
+        if !utils::check_liveness(&ingestor.domain_name).await {
+            continue;
+        }
         let uri = Url::parse(&format!(
             "{}{}/metrics",
             &ingestor.domain_name,
@@ -749,11 +752,10 @@ async fn fetch_cluster_metrics() -> Result<Vec<Metrics>, PostError> {
             let text = res.text().await.map_err(PostError::NetworkError)?;
             let lines: Vec<Result<String, std::io::Error>> =
                 text.lines().map(|line| Ok(line.to_owned())).collect_vec();
-
             let sample = prometheus_parse::Scrape::parse(lines.into_iter())
                 .map_err(|err| PostError::CustomError(err.to_string()))?
                 .samples;
-            let ingestor_metrics = Metrics::from_prometheus_samples(sample, &ingestor)
+            let ingestor_metrics = Metrics::ingestor_prometheus_samples(sample, &ingestor)
                 .await
                 .map_err(|err| {
                     error!("Fatal: failed to get ingestor metrics: {:?}", err);
@@ -767,10 +769,11 @@ async fn fetch_cluster_metrics() -> Result<Vec<Metrics>, PostError> {
             );
         }
     }
+    dresses.push(Metrics::querier_prometheus_metrics().await);
     Ok(dresses)
 }
 
-pub fn init_cluster_metrics_schedular() -> Result<(), PostError> {
+pub async fn init_cluster_metrics_scheduler() -> Result<(), PostError> {
     info!("Setting up schedular for cluster metrics ingestion");
     let mut scheduler = AsyncScheduler::new();
     scheduler
@@ -779,25 +782,12 @@ pub fn init_cluster_metrics_schedular() -> Result<(), PostError> {
             let result: Result<(), PostError> = async {
                 let cluster_metrics = fetch_cluster_metrics().await;
                 if let Ok(metrics) = cluster_metrics {
-                    if !metrics.is_empty() {
-                        info!("Cluster metrics fetched successfully from all ingestors");
-                        if let Ok(metrics_bytes) = serde_json::to_vec(&metrics) {
-                            if matches!(
-                                ingest_internal_stream(
-                                    INTERNAL_STREAM_NAME.to_string(),
-                                    bytes::Bytes::from(metrics_bytes),
-                                )
-                                .await,
-                                Ok(())
-                            ) {
-                                info!("Cluster metrics successfully ingested into internal stream");
-                            } else {
-                                error!("Failed to ingest cluster metrics into internal stream");
-                            }
-                        } else {
-                            error!("Failed to serialize cluster metrics");
-                        }
-                    }
+                    let json_value = serde_json::to_value(metrics)
+                        .map_err(|e| anyhow::anyhow!("Failed to serialize metrics: {}", e))?;
+
+                    ingest_internal_stream(INTERNAL_STREAM_NAME.to_string(), Json(json_value))
+                        .await
+                        .map_err(|e| anyhow::anyhow!("Failed to ingest metrics: {}", e))?;
                 }
                 Ok(())
             }
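
Each ingestor is still scraped as plain Prometheus exposition text; fetch_cluster_metrics parses it with prometheus_parse exactly as before. A small self-contained sketch of that parsing step, using an illustrative payload shaped like the new gauges (not an actual scrape):

use prometheus_parse::{Scrape, Value};

fn main() {
    // Illustrative exposition text; a real payload comes from each ingestor's /metrics endpoint.
    let body = r#"# TYPE parseable_total_disk gauge
parseable_total_disk{volume="staging"} 1000000000
# TYPE parseable_cpu_usage gauge
parseable_cpu_usage{cpu_usage="global"} 12.5
"#;

    let lines: Vec<Result<String, std::io::Error>> =
        body.lines().map(|line| Ok(line.to_owned())).collect();

    let scrape = Scrape::parse(lines.into_iter()).expect("valid exposition format");
    for sample in scrape.samples {
        if let Value::Gauge(val) = sample.value {
            // Labels ("volume", "memory_usage", "cpu_usage", ...) drive the
            // MetricType dispatch added in src/metrics/prom_utils.rs below.
            println!("{} = {}", sample.metric, val);
        }
    }
}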

src/handlers/http/ingest.rs

+19 −12

@@ -20,12 +20,8 @@ use super::logstream::error::{CreateStreamError, StreamError};
 use super::modal::utils::ingest_utils::{flatten_and_push_logs, push_logs};
 use super::users::dashboards::DashboardError;
 use super::users::filters::FiltersError;
-use crate::event::format::LogSource;
-use crate::event::{
-    self,
-    error::EventError,
-    format::{self, EventFormat},
-};
+use crate::event::format::{self, EventFormat, LogSource};
+use crate::event::{self, error::EventError};
 use crate::handlers::http::modal::utils::logstream_utils::create_stream_and_schema_from_storage;
 use crate::handlers::{LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY};
 use crate::metadata::error::stream_info::MetadataError;
@@ -36,12 +32,12 @@ use crate::otel::metrics::flatten_otel_metrics;
 use crate::otel::traces::flatten_otel_traces;
 use crate::storage::{ObjectStorageError, StreamType};
 use crate::utils::header_parsing::ParseHeaderError;
-use crate::utils::json::flatten::JsonFlattenError;
+use crate::utils::json::convert_array_to_object;
+use crate::utils::json::flatten::{convert_to_array, JsonFlattenError};
 use actix_web::web::{Json, Path};
 use actix_web::{http::header::ContentType, HttpRequest, HttpResponse};
 use arrow_array::RecordBatch;
 use arrow_schema::Schema;
-use bytes::Bytes;
 use chrono::Utc;
 use http::StatusCode;
 use opentelemetry_proto::tonic::logs::v1::LogsData;
@@ -77,18 +73,29 @@ pub async fn ingest(req: HttpRequest, Json(json): Json<Value>) -> Result<HttpRes
     Ok(HttpResponse::Ok().finish())
 }
 
-pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<(), PostError> {
-    let size: usize = body.len();
+pub async fn ingest_internal_stream(
+    stream_name: String,
+    Json(json): Json<Value>,
+) -> Result<(), PostError> {
+    let size = serde_json::to_vec(&json).unwrap().len() as u64;
+    let data = convert_to_array(convert_array_to_object(
+        json,
+        None,
+        None,
+        None,
+        SchemaVersion::V0,
+        &LogSource::default(),
+    )?)?;
+
     let parsed_timestamp = Utc::now().naive_utc();
     let (rb, is_first) = {
-        let body_val: Value = serde_json::from_slice(&body)?;
        let hash_map = STREAM_INFO.read().unwrap();
        let schema = hash_map
            .get(&stream_name)
            .ok_or(PostError::StreamNotFound(stream_name.clone()))?
            .schema
            .clone();
-        let event = format::json::Event { data: body_val };
+        let event = format::json::Event { data };
        // For internal streams, use old schema
        event.into_recordbatch(&schema, false, None, SchemaVersion::V0)?
    };
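
ingest_internal_stream now takes the already-parsed JSON (actix's Json extractor) instead of raw Bytes, so callers hand it a serde_json::Value directly. A hedged in-crate sketch of such a call site, assuming the items the cluster module above imports (ingest_internal_stream, INTERNAL_STREAM_NAME, PostError, and PostError's From<anyhow::Error> conversion) and that the internal pmeta stream already exists:

use actix_web::web::Json;

// Illustrative helper, not part of the commit: serialize any metrics batch and
// push it into the internal stream the same way the scheduler in
// src/handlers/http/cluster/mod.rs does.
async fn push_to_internal_stream<T: serde::Serialize>(metrics: Vec<T>) -> Result<(), PostError> {
    let json_value = serde_json::to_value(metrics)
        .map_err(|e| anyhow::anyhow!("Failed to serialize metrics: {}", e))?;
    ingest_internal_stream(INTERNAL_STREAM_NAME.to_string(), Json(json_value)).await
}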

src/handlers/http/modal/ingest_server.rs

+1

@@ -113,6 +113,7 @@ impl ParseableServer for IngestServer {
 
         // set the ingestor metadata
         set_ingestor_metadata().await?;
+        metrics::init_system_metrics_scheduler().await?;
 
         // Ingestors shouldn't have to deal with OpenId auth flow
         let app = self.start(shutdown_rx, prometheus, None);

src/handlers/http/modal/query_server.rs

+4 −6

@@ -18,11 +18,10 @@
 
 use crate::correlation::CORRELATIONS;
 use crate::handlers::airplane;
-use crate::handlers::http::base_path;
-use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular};
 use crate::handlers::http::logstream::create_internal_stream_if_not_exists;
 use crate::handlers::http::middleware::{DisAllowRootUser, RouteExt};
 use crate::handlers::http::{self, role};
+use crate::handlers::http::{base_path, cluster};
 use crate::handlers::http::{logstream, MAX_EVENT_PAYLOAD_SIZE};
 use crate::hottier::HotTierManager;
 use crate::rbac::role::Action;
@@ -35,7 +34,7 @@ use actix_web::{web, Scope};
 use async_trait::async_trait;
 use bytes::Bytes;
 use tokio::sync::oneshot;
-use tracing::{error, info};
+use tracing::error;
 
 use crate::{option::CONFIG, ParseableServer};
 
@@ -104,15 +103,14 @@ impl ParseableServer for QueryServer {
         // track all parquet files already in the data directory
         storage::retention::load_retention_from_global();
 
+        metrics::init_system_metrics_scheduler().await?;
+        cluster::init_cluster_metrics_scheduler().await?;
         // all internal data structures populated now.
         // start the analytics scheduler if enabled
         if CONFIG.options.send_analytics {
             analytics::init_analytics_scheduler()?;
         }
 
-        if matches!(init_cluster_metrics_schedular(), Ok(())) {
-            info!("Cluster metrics scheduler started successfully");
-        }
         if let Some(hot_tier_manager) = HotTierManager::global() {
             hot_tier_manager.put_internal_stream_hot_tier().await?;
             hot_tier_manager.download_from_s3()?;

src/handlers/http/modal/server.rs

+2

@@ -119,6 +119,8 @@ impl ParseableServer for Server {
         let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
             sync::object_store_sync().await;
 
+        metrics::init_system_metrics_scheduler().await?;
+
         if CONFIG.options.send_analytics {
             analytics::init_analytics_scheduler()?;
         }

src/metrics/mod.rs

+250 −5

@@ -18,15 +18,25 @@
 
 pub mod prom_utils;
 pub mod storage;
-
-use crate::{handlers::http::metrics_path, stats::FullStats};
+use actix_web::HttpResponse;
+use clokwerk::{AsyncScheduler, Interval};
+use http::StatusCode;
+use serde::Serialize;
+use std::{path::Path, time::Duration};
+use sysinfo::{Disks, System};
+use tracing::{error, info};
+
+use crate::{handlers::http::metrics_path, option::CONFIG, stats::FullStats};
 use actix_web::Responder;
 use actix_web_prometheus::{PrometheusMetrics, PrometheusMetricsBuilder};
 use error::MetricsError;
 use once_cell::sync::Lazy;
-use prometheus::{HistogramOpts, HistogramVec, IntCounterVec, IntGaugeVec, Opts, Registry};
+use prometheus::{
+    GaugeVec, HistogramOpts, HistogramVec, IntCounterVec, IntGaugeVec, Opts, Registry,
+};
 
 pub const METRICS_NAMESPACE: &str = env!("CARGO_PKG_NAME");
+const SYSTEM_METRICS_INTERVAL_SECONDS: Interval = clokwerk::Interval::Minutes(1);
 
 pub static EVENTS_INGESTED: Lazy<IntGaugeVec> = Lazy::new(|| {
     IntGaugeVec::new(
@@ -182,6 +192,42 @@ pub static ALERTS_STATES: Lazy<IntCounterVec> = Lazy::new(|| {
     .expect("metric can be created")
 });
 
+pub static TOTAL_DISK: Lazy<IntGaugeVec> = Lazy::new(|| {
+    IntGaugeVec::new(
+        Opts::new("total_disk", "Total Disk Size").namespace(METRICS_NAMESPACE),
+        &["volume"],
+    )
+    .expect("metric can be created")
+});
+pub static USED_DISK: Lazy<IntGaugeVec> = Lazy::new(|| {
+    IntGaugeVec::new(
+        Opts::new("used_disk", "Used Disk Size").namespace(METRICS_NAMESPACE),
+        &["volume"],
+    )
+    .expect("metric can be created")
+});
+pub static AVAILABLE_DISK: Lazy<IntGaugeVec> = Lazy::new(|| {
+    IntGaugeVec::new(
+        Opts::new("available_disk", "Available Disk Size").namespace(METRICS_NAMESPACE),
+        &["volume"],
+    )
+    .expect("metric can be created")
+});
+pub static MEMORY: Lazy<IntGaugeVec> = Lazy::new(|| {
+    IntGaugeVec::new(
+        Opts::new("memory_usage", "Memory Usage").namespace(METRICS_NAMESPACE),
+        &["memory_usage"],
+    )
+    .expect("metric can be created")
+});
+pub static CPU: Lazy<GaugeVec> = Lazy::new(|| {
+    GaugeVec::new(
+        Opts::new("cpu_usage", "CPU Usage").namespace(METRICS_NAMESPACE),
+        &["cpu_usage"],
+    )
+    .expect("metric can be created")
+});
+
 fn custom_metrics(registry: &Registry) {
     registry
         .register(Box::new(EVENTS_INGESTED.clone()))
@@ -231,6 +277,21 @@ fn custom_metrics(registry: &Registry) {
     registry
         .register(Box::new(ALERTS_STATES.clone()))
         .expect("metric can be registered");
+    registry
+        .register(Box::new(TOTAL_DISK.clone()))
+        .expect("metric can be registered");
+    registry
+        .register(Box::new(USED_DISK.clone()))
+        .expect("metric can be registered");
+    registry
+        .register(Box::new(AVAILABLE_DISK.clone()))
+        .expect("metric can be registered");
+    registry
+        .register(Box::new(MEMORY.clone()))
+        .expect("metric can be registered");
+    registry
+        .register(Box::new(CPU.clone()))
+        .expect("metric can be registered");
 }
 
 pub fn build_metrics_handler() -> PrometheusMetrics {
@@ -290,12 +351,196 @@ pub async fn fetch_stats_from_storage(stream_name: &str, stats: FullStats) {
         .set(stats.lifetime_stats.storage as i64);
 }
 
-use actix_web::HttpResponse;
-
 pub async fn get() -> Result<impl Responder, MetricsError> {
     Ok(HttpResponse::Ok().body(format!("{:?}", build_metrics_handler())))
 }
 
+#[derive(Debug, Serialize, Default, Clone)]
+pub struct DiskMetrics {
+    total: u64,
+    used: u64,
+    available: u64,
+}
+
+#[derive(Debug, Serialize, Default, Clone)]
+pub struct SystemMetrics {
+    memory: MemoryMetrics,
+    cpu: Vec<CpuMetrics>,
+}
+
+#[derive(Debug, Serialize, Default, Clone)]
+pub struct MemoryMetrics {
+    total: u64,
+    used: u64,
+    total_swap: u64,
+    used_swap: u64,
+}
+
+#[derive(Debug, Serialize, Default, Clone)]
+pub struct CpuMetrics {
+    name: String,
+    usage: f64,
+}
+
+// Scheduler for collecting all system metrics
+pub async fn init_system_metrics_scheduler() -> Result<(), MetricsError> {
+    info!("Setting up scheduler for capturing system metrics");
+    let mut scheduler = AsyncScheduler::new();
+
+    scheduler
+        .every(SYSTEM_METRICS_INTERVAL_SECONDS)
+        .run(move || async {
+            if let Err(err) = collect_all_metrics().await {
+                error!("Error in capturing system metrics: {:#}", err);
+            }
+        });
+
+    tokio::spawn(async move {
+        loop {
+            scheduler.run_pending().await;
+            tokio::time::sleep(Duration::from_secs(10)).await;
+        }
+    });
+
+    Ok(())
+}
+
+// Function to collect memory, CPU and disk usage metrics
+pub async fn collect_all_metrics() -> Result<(), MetricsError> {
+    // Collect system metrics (CPU and memory)
+    collect_system_metrics().await?;
+
+    // Collect disk metrics for all volumes
+    collect_disk_metrics().await?;
+
+    Ok(())
+}
+
+// Function to collect disk usage metrics
+async fn collect_disk_metrics() -> Result<(), MetricsError> {
+    // collect staging volume metrics
+    collect_volume_disk_usage("staging", CONFIG.staging_dir())?;
+    // Collect data volume metrics for local storage
+    if CONFIG.get_storage_mode_string() == "Local drive" {
+        collect_volume_disk_usage("data", Path::new(&CONFIG.storage().get_endpoint()))?;
+    }
+
+    // Collect hot tier volume metrics if configured
+    if let Some(hot_tier_dir) = CONFIG.hot_tier_dir() {
+        collect_volume_disk_usage("hot_tier", hot_tier_dir)?;
+    }
+
+    Ok(())
+}
+
+// Function to collect disk usage metrics for a specific volume
+fn collect_volume_disk_usage(label: &str, path: &Path) -> Result<(), MetricsError> {
+    let metrics = get_volume_disk_usage(path)?;
+
+    TOTAL_DISK
+        .with_label_values(&[label])
+        .set(metrics.total as i64);
+    USED_DISK
+        .with_label_values(&[label])
+        .set(metrics.used as i64);
+    AVAILABLE_DISK
+        .with_label_values(&[label])
+        .set(metrics.available as i64);
+
+    Ok(())
+}
+
+// Function to get disk usage for a specific volume
+fn get_volume_disk_usage(path: &Path) -> Result<DiskMetrics, MetricsError> {
+    let mut disks = Disks::new_with_refreshed_list();
+    disks.sort_by(|a, b| {
+        b.mount_point()
+            .to_str()
+            .unwrap_or("")
+            .len()
+            .cmp(&a.mount_point().to_str().unwrap_or("").len())
+    });
+
+    for disk in disks.iter() {
+        let mount_point = disk.mount_point().to_str().unwrap();
+
+        if path.starts_with(mount_point) {
+            return Ok(DiskMetrics {
+                total: disk.total_space(),
+                used: disk.total_space() - disk.available_space(),
+                available: disk.available_space(),
+            });
+        }
+    }
+
+    Err(MetricsError::Custom(
+        format!("No matching disk found for path: {:?}", path),
+        StatusCode::INTERNAL_SERVER_ERROR,
+    ))
+}
+
+// Function to collect CPU and memory usage metrics
+async fn collect_system_metrics() -> Result<(), MetricsError> {
+    let metrics = get_system_metrics()?;
+
+    // Set memory metrics
+    MEMORY
+        .with_label_values(&["total_memory"])
+        .set(metrics.memory.total as i64);
+    MEMORY
+        .with_label_values(&["used_memory"])
+        .set(metrics.memory.used as i64);
+    MEMORY
+        .with_label_values(&["total_swap"])
+        .set(metrics.memory.total_swap as i64);
+    MEMORY
+        .with_label_values(&["used_swap"])
+        .set(metrics.memory.used_swap as i64);
+
+    // Set CPU metrics
+    for cpu in metrics.cpu {
+        CPU.with_label_values(&[&cpu.name]).set(cpu.usage);
+    }
+
+    Ok(())
+}
+
+// Get system metrics
+fn get_system_metrics() -> Result<SystemMetrics, MetricsError> {
+    let mut sys = System::new_all();
+    sys.refresh_all();
+
+    // Collect memory metrics
+    let memory = MemoryMetrics {
+        total: sys.total_memory(),
+        used: sys.used_memory(),
+        total_swap: sys.total_swap(),
+        used_swap: sys.used_swap(),
+    };
+
+    // Collect CPU metrics
+    let mut cpu_metrics = Vec::new();
+
+    // Add global CPU usage
+    cpu_metrics.push(CpuMetrics {
+        name: "global".to_string(),
+        usage: sys.global_cpu_usage() as f64,
+    });
+
+    // Add individual CPU usage
+    for cpu in sys.cpus() {
+        cpu_metrics.push(CpuMetrics {
+            name: cpu.name().to_string(),
+            usage: cpu.cpu_usage() as f64,
+        });
+    }
+
+    Ok(SystemMetrics {
+        memory,
+        cpu: cpu_metrics,
+    })
+}
+
 pub mod error {
 
     use actix_web::http::header::ContentType;
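
Once registered, the new gauges surface on the existing /metrics endpoint under the parseable namespace (METRICS_NAMESPACE is the crate name). A self-contained sketch of what that exposition looks like, using a throwaway Registry and one gauge shaped like TOTAL_DISK; the prometheus TextEncoder call is standard, and the label value is illustrative:

use prometheus::{Encoder, IntGaugeVec, Opts, Registry, TextEncoder};

fn main() {
    let registry = Registry::new();
    let total_disk = IntGaugeVec::new(
        Opts::new("total_disk", "Total Disk Size").namespace("parseable"),
        &["volume"],
    )
    .expect("metric can be created");
    registry
        .register(Box::new(total_disk.clone()))
        .expect("metric can be registered");

    // What collect_volume_disk_usage() does once per volume label.
    total_disk
        .with_label_values(&["staging"])
        .set(500_000_000_000);

    // Render the text a scrape of /metrics would include for this gauge.
    let mut buffer = Vec::new();
    TextEncoder::new()
        .encode(&registry.gather(), &mut buffer)
        .expect("encode metrics");
    print!("{}", String::from_utf8(buffer).unwrap());
    // Output includes, alongside # HELP / # TYPE lines:
    // parseable_total_disk{volume="staging"} 500000000000
}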

src/metrics/prom_utils.rs

+243 −91

@@ -16,9 +16,14 @@
  *
  */
 
+use std::collections::HashMap;
+use std::path::Path;
+
+use crate::about::current;
 use crate::handlers::http::base_path_without_preceding_slash;
 use crate::handlers::http::ingest::PostError;
 use crate::handlers::http::modal::IngestorMetadata;
+use crate::option::CONFIG;
 use crate::utils::get_url;
 use crate::HTTP_CLIENT;
 use actix_web::http::header;
@@ -33,6 +38,11 @@ use tracing::error;
 use tracing::warn;
 use url::Url;
 
+use super::get_system_metrics;
+use super::get_volume_disk_usage;
+use super::DiskMetrics;
+use super::MemoryMetrics;
+
 #[derive(Debug, Serialize, Clone)]
 pub struct Metrics {
     address: String,
@@ -51,6 +61,11 @@ pub struct Metrics {
     event_time: NaiveDateTime,
     commit: String,
     staging: String,
+    parseable_data_disk_usage: DiskMetrics,
+    parseable_staging_disk_usage: DiskMetrics,
+    parseable_hot_tier_disk_usage: DiskMetrics,
+    parseable_memory_usage: MemoryMetrics,
+    parseable_cpu_usage: HashMap<String, f64>,
 }
 
 #[derive(Debug, Serialize, Default, Clone)]
@@ -59,15 +74,8 @@ struct StorageMetrics {
     data: f64,
 }
 
-impl Default for Metrics {
-    fn default() -> Self {
-        let url = get_url();
-        let address = format!(
-            "http://{}:{}",
-            url.domain()
-                .unwrap_or(url.host_str().expect("should have a host")),
-            url.port().unwrap_or_default()
-        );
+impl Metrics {
+    fn new(address: String) -> Self {
         Metrics {
             address,
             parseable_events_ingested: 0.0,
@@ -85,34 +93,240 @@ impl Default for Metrics {
             event_time: Utc::now().naive_utc(),
             commit: "".to_string(),
             staging: "".to_string(),
+            parseable_data_disk_usage: DiskMetrics {
+                total: 0,
+                used: 0,
+                available: 0,
+            },
+            parseable_staging_disk_usage: DiskMetrics {
+                total: 0,
+                used: 0,
+                available: 0,
+            },
+            parseable_hot_tier_disk_usage: DiskMetrics {
+                total: 0,
+                used: 0,
+                available: 0,
+            },
+            parseable_memory_usage: MemoryMetrics {
+                total: 0,
+                used: 0,
+                total_swap: 0,
+                used_swap: 0,
+            },
+            parseable_cpu_usage: HashMap::new(),
         }
     }
 }
 
-impl Metrics {
-    fn new(address: String) -> Self {
-        Metrics {
-            address,
-            parseable_events_ingested: 0.0,
-            parseable_events_ingested_size: 0.0,
-            parseable_staging_files: 0.0,
-            process_resident_memory_bytes: 0.0,
-            parseable_storage_size: StorageMetrics::default(),
-            parseable_lifetime_events_ingested: 0.0,
-            parseable_lifetime_events_ingested_size: 0.0,
-            parseable_deleted_events_ingested: 0.0,
-            parseable_deleted_events_ingested_size: 0.0,
-            parseable_deleted_storage_size: StorageMetrics::default(),
-            parseable_lifetime_storage_size: StorageMetrics::default(),
-            event_type: "cluster-metrics".to_string(),
-            event_time: Utc::now().naive_utc(),
-            commit: "".to_string(),
-            staging: "".to_string(),
+#[derive(Debug)]
+enum MetricType {
+    SimpleGauge(String),
+    StorageSize(String),
+    DiskUsage(String),
+    MemoryUsage(String),
+    CpuUsage,
+}
+
+impl MetricType {
+    fn from_metric(metric: &str, labels: &HashMap<String, String>) -> Option<Self> {
+        match metric {
+            "parseable_events_ingested" => {
+                Some(Self::SimpleGauge("parseable_events_ingested".into()))
+            }
+            "parseable_events_ingested_size" => {
+                Some(Self::SimpleGauge("parseable_events_ingested_size".into()))
+            }
+            "parseable_lifetime_events_ingested" => Some(Self::SimpleGauge(
+                "parseable_lifetime_events_ingested".into(),
+            )),
+            "parseable_lifetime_events_ingested_size" => Some(Self::SimpleGauge(
+                "parseable_lifetime_events_ingested_size".into(),
+            )),
+            "parseable_events_deleted" => {
+                Some(Self::SimpleGauge("parseable_events_deleted".into()))
+            }
+            "parseable_events_deleted_size" => {
+                Some(Self::SimpleGauge("parseable_events_deleted_size".into()))
+            }
+            "parseable_staging_files" => Some(Self::SimpleGauge("parseable_staging_files".into())),
+            "process_resident_memory_bytes" => {
+                Some(Self::SimpleGauge("process_resident_memory_bytes".into()))
+            }
+            "parseable_storage_size" => labels.get("type").map(|t| Self::StorageSize(t.clone())),
+            "parseable_lifetime_events_storage_size" => {
+                labels.get("type").map(|t| Self::StorageSize(t.clone()))
+            }
+            "parseable_deleted_events_storage_size" => {
+                labels.get("type").map(|t| Self::StorageSize(t.clone()))
+            }
+            "parseable_total_disk" | "parseable_used_disk" | "parseable_available_disk" => {
+                labels.get("volume").map(|v| Self::DiskUsage(v.clone()))
+            }
+            "parseable_memory_usage" => labels
+                .get("memory_usage")
+                .map(|m| Self::MemoryUsage(m.clone())),
+            "parseable_cpu_usage" => Some(Self::CpuUsage),
+            _ => None,
         }
     }
 }
-
 impl Metrics {
+    pub async fn ingestor_prometheus_samples(
+        samples: Vec<PromSample>,
+        ingestor_metadata: &IngestorMetadata,
+    ) -> Result<Self, PostError> {
+        let mut metrics = Metrics::new(ingestor_metadata.domain_name.to_string());
+
+        Self::build_metrics_from_samples(samples, &mut metrics)?;
+
+        // Get additional metadata
+        let (commit_id, staging) = Self::from_about_api_response(ingestor_metadata.clone())
+            .await
+            .map_err(|err| {
+                error!("Fatal: failed to get ingestor info: {:?}", err);
+                PostError::Invalid(err.into())
+            })?;
+
+        metrics.commit = commit_id;
+        metrics.staging = staging;
+
+        Ok(metrics)
+    }
+
+    pub async fn querier_prometheus_metrics() -> Self {
+        let mut metrics = Metrics::new(get_url().to_string());
+
+        let system_metrics = get_system_metrics().expect("Failed to get system metrics");
+
+        metrics.parseable_memory_usage.total = system_metrics.memory.total;
+        metrics.parseable_memory_usage.used = system_metrics.memory.used;
+        metrics.parseable_memory_usage.total_swap = system_metrics.memory.total_swap;
+        metrics.parseable_memory_usage.used_swap = system_metrics.memory.used_swap;
+        for cpu_usage in system_metrics.cpu {
+            metrics
+                .parseable_cpu_usage
+                .insert(cpu_usage.name.clone(), cpu_usage.usage);
+        }
+
+        let staging_disk_usage = get_volume_disk_usage(CONFIG.staging_dir())
+            .expect("Failed to get staging volume disk usage");
+
+        metrics.parseable_staging_disk_usage.total = staging_disk_usage.total;
+        metrics.parseable_staging_disk_usage.used = staging_disk_usage.used;
+        metrics.parseable_staging_disk_usage.available = staging_disk_usage.available;
+
+        if CONFIG.get_storage_mode_string() == "Local drive" {
+            let data_disk_usage =
+                get_volume_disk_usage(Path::new(&CONFIG.storage().get_endpoint()))
+                    .expect("Failed to get data volume disk usage");
+
+            metrics.parseable_data_disk_usage.total = data_disk_usage.total;
+            metrics.parseable_data_disk_usage.used = data_disk_usage.used;
+            metrics.parseable_data_disk_usage.available = data_disk_usage.available;
+        }
+
+        if CONFIG.options.hot_tier_storage_path.is_some() {
+            let hot_tier_disk_usage =
+                get_volume_disk_usage(CONFIG.hot_tier_dir().as_ref().unwrap())
+                    .expect("Failed to get hot tier volume disk usage");
+
+            metrics.parseable_hot_tier_disk_usage.total = hot_tier_disk_usage.total;
+            metrics.parseable_hot_tier_disk_usage.used = hot_tier_disk_usage.used;
+            metrics.parseable_hot_tier_disk_usage.available = hot_tier_disk_usage.available;
+        }
+
+        metrics.commit = current().commit_hash;
+        metrics.staging = CONFIG.staging_dir().display().to_string();
+
+        metrics
+    }
+
+    fn build_metrics_from_samples(
+        samples: Vec<PromSample>,
+        metrics: &mut Metrics,
+    ) -> Result<(), PostError> {
+        for sample in samples {
+            let metric_type = MetricType::from_metric(&sample.metric, &sample.labels);
+
+            match (sample.value.clone(), metric_type) {
+                (PromValue::Gauge(val), Some(metric_type)) => {
+                    Self::process_gauge_metric(
+                        metrics,
+                        metric_type,
+                        val,
+                        &sample.metric,
+                        sample.clone(),
+                    );
+                }
+                _ => continue,
+            }
+        }
+        Ok(())
+    }
+
+    fn process_gauge_metric(
+        metrics: &mut Metrics,
+        metric_type: MetricType,
+        val: f64,
+        metric_name: &str,
+        sample: PromSample,
+    ) {
+        match metric_type {
+            MetricType::SimpleGauge(metric_name) => match metric_name.as_str() {
+                "parseable_events_ingested" => metrics.parseable_events_ingested += val,
+                "parseable_events_ingested_size" => metrics.parseable_events_ingested_size += val,
+                "parseable_lifetime_events_ingested" => {
+                    metrics.parseable_lifetime_events_ingested += val
+                }
+                "parseable_lifetime_events_ingested_size" => {
+                    metrics.parseable_lifetime_events_ingested_size += val
+                }
+                "parseable_events_deleted" => metrics.parseable_deleted_events_ingested += val,
+                "parseable_events_deleted_size" => {
+                    metrics.parseable_deleted_events_ingested_size += val
+                }
+                "parseable_staging_files" => metrics.parseable_staging_files += val,
+                "process_resident_memory_bytes" => metrics.process_resident_memory_bytes += val,
+                _ => {}
+            },
+            MetricType::StorageSize(storage_type) => match storage_type.as_str() {
+                "staging" => metrics.parseable_storage_size.staging += val,
+                "data" => metrics.parseable_storage_size.data += val,
+                _ => {}
+            },
+            MetricType::DiskUsage(volume_type) => {
+                let disk_usage = match volume_type.as_str() {
+                    "data" => &mut metrics.parseable_data_disk_usage,
+                    "staging" => &mut metrics.parseable_staging_disk_usage,
+                    "hot_tier" => &mut metrics.parseable_hot_tier_disk_usage,
+                    _ => return,
+                };
+
+                match metric_name {
+                    "parseable_total_disk" => disk_usage.total = val as u64,
+                    "parseable_used_disk" => disk_usage.used = val as u64,
+                    "parseable_available_disk" => disk_usage.available = val as u64,
+                    _ => {}
+                }
+            }
+            MetricType::MemoryUsage(memory_type) => match memory_type.as_str() {
+                "total_memory" => metrics.parseable_memory_usage.total = val as u64,
+                "used_memory" => metrics.parseable_memory_usage.used = val as u64,
+                "total_swap" => metrics.parseable_memory_usage.total_swap = val as u64,
+                "used_swap" => metrics.parseable_memory_usage.used_swap = val as u64,
+                _ => {}
+            },
+            MetricType::CpuUsage => {
+                if let Some(cpu_name) = sample.labels.get("cpu_usage") {
+                    metrics
+                        .parseable_cpu_usage
+                        .insert(cpu_name.to_string(), val);
+                }
+            }
+        }
+    }
+
     pub fn get_daily_stats_from_samples(
         samples: Vec<PromSample>,
         stream_name: &str,
@@ -154,68 +368,6 @@ impl Metrics {
         }
         (events_ingested, ingestion_size, storage_size)
     }
-    pub async fn from_prometheus_samples(
-        samples: Vec<PromSample>,
-        ingestor_metadata: &IngestorMetadata,
-    ) -> Result<Self, PostError> {
-        let mut prom_dress = Metrics::new(ingestor_metadata.domain_name.to_string());
-        for sample in samples {
-            if let PromValue::Gauge(val) = sample.value {
-                match sample.metric.as_str() {
-                    "parseable_events_ingested" => prom_dress.parseable_events_ingested += val,
-                    "parseable_events_ingested_size" => {
-                        prom_dress.parseable_events_ingested_size += val
-                    }
-                    "parseable_lifetime_events_ingested" => {
-                        prom_dress.parseable_lifetime_events_ingested += val
-                    }
-                    "parseable_lifetime_events_ingested_size" => {
-                        prom_dress.parseable_lifetime_events_ingested_size += val
-                    }
-                    "parseable_events_deleted" => {
-                        prom_dress.parseable_deleted_events_ingested += val
-                    }
-                    "parseable_events_deleted_size" => {
-                        prom_dress.parseable_deleted_events_ingested_size += val
-                    }
-                    "parseable_staging_files" => prom_dress.parseable_staging_files += val,
-                    "process_resident_memory_bytes" => {
-                        prom_dress.process_resident_memory_bytes += val
-                    }
-                    "parseable_storage_size" => {
-                        if sample.labels.get("type").expect("type is present") == "staging" {
-                            prom_dress.parseable_storage_size.staging += val;
-                        }
-                        if sample.labels.get("type").expect("type is present") == "data" {
-                            prom_dress.parseable_storage_size.data += val;
-                        }
-                    }
-                    "parseable_lifetime_events_storage_size" => {
-                        if sample.labels.get("type").expect("type is present") == "data" {
-                            prom_dress.parseable_lifetime_storage_size.data += val;
-                        }
-                    }
-                    "parseable_deleted_events_storage_size" => {
-                        if sample.labels.get("type").expect("type is present") == "data" {
-                            prom_dress.parseable_deleted_storage_size.data += val;
-                        }
-                    }
-                    _ => {}
-                }
-            }
-        }
-        let (commit_id, staging) = Self::from_about_api_response(ingestor_metadata.clone())
-            .await
-            .map_err(|err| {
-                error!("Fatal: failed to get ingestor info: {:?}", err);
-                PostError::Invalid(err.into())
-            })?;
-
-        prom_dress.commit = commit_id;
-        prom_dress.staging = staging;
-
-        Ok(prom_dress)
-    }
 
     pub async fn from_about_api_response(
         ingestor_metadata: IngestorMetadata,
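
The rewrite above replaces one monolithic match over metric names with a classify-then-apply split: MetricType::from_metric inspects the name plus the relevant label (type, volume, memory_usage, cpu_usage), and process_gauge_metric applies the value to the right field. A simplified, self-contained mirror of that shape (types and names here are illustrative, not the project's):

use std::collections::HashMap;

// Classify a sample by metric name and labels first, then apply it.
#[derive(Debug)]
enum Kind {
    SimpleGauge(String),
    DiskUsage(String), // carries the "volume" label value
}

impl Kind {
    fn from_metric(metric: &str, labels: &HashMap<String, String>) -> Option<Self> {
        match metric {
            "parseable_events_ingested" => Some(Self::SimpleGauge(metric.into())),
            "parseable_total_disk" => labels.get("volume").map(|v| Self::DiskUsage(v.clone())),
            _ => None,
        }
    }
}

fn main() {
    let mut labels = HashMap::new();
    labels.insert("volume".to_string(), "staging".to_string());

    // A gauge sample as it would arrive from prometheus_parse.
    let (metric, value) = ("parseable_total_disk", 1_000_000.0_f64);

    match Kind::from_metric(metric, &labels) {
        Some(Kind::DiskUsage(volume)) => println!("{volume} total disk = {value}"),
        Some(Kind::SimpleGauge(name)) => println!("{name} += {value}"),
        None => {}
    }
}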
