Commit 161d2bf

feat: add sysinfo metrics
- collect CPU usage and memory usage of the server
- collect disk usage of the volumes: data, staging, hot-tier
- add these metrics to the Prometheus metrics
- export these metrics to the cluster metrics API
- add the metrics to the pmeta stream
- add the querier node's sysinfo metrics to pmeta and the cluster metrics API
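
In outline, the new collection reads host statistics with the sysinfo crate and mirrors them into Prometheus gauges, which the /metrics endpoint, the pmeta stream, and the cluster metrics API then pick up. The sketch below shows that collection step only: the metric names, labels, and registration flow are assumptions for illustration (the commit's real definitions live in src/metrics, which is not among the hunks shown on this page), and a real module would register the gauges once at startup rather than per call.

use prometheus::{GaugeVec, Opts, Registry};
use sysinfo::{Disks, System};

fn collect_system_metrics(registry: &Registry) -> prometheus::Result<()> {
    // Illustrative gauge definitions; in practice these are registered once.
    let cpu = GaugeVec::new(Opts::new("cpu_usage_percent", "host CPU usage"), &["host"])?;
    let memory = GaugeVec::new(Opts::new("memory_usage_bytes", "host memory usage"), &["kind"])?;
    let disk = GaugeVec::new(Opts::new("disk_usage_bytes", "volume disk usage"), &["volume"])?;
    for g in [&cpu, &memory, &disk] {
        registry.register(Box::new(g.clone()))?;
    }

    let mut sys = System::new();
    sys.refresh_memory();
    // CPU usage is a delta: sysinfo needs two refreshes a short interval apart.
    sys.refresh_cpu_usage();
    std::thread::sleep(sysinfo::MINIMUM_CPU_UPDATE_INTERVAL);
    sys.refresh_cpu_usage();

    // global_cpu_usage() is the sysinfo 0.31+ name; older releases expose
    // sys.global_cpu_info().cpu_usage() instead.
    cpu.with_label_values(&["localhost"]).set(sys.global_cpu_usage() as f64);
    memory.with_label_values(&["used"]).set(sys.used_memory() as f64);
    memory.with_label_values(&["total"]).set(sys.total_memory() as f64);

    // Label each mounted volume by its mount point; the real implementation
    // would match mount points against the data, staging, and hot-tier paths.
    for d in Disks::new_with_refreshed_list().list() {
        let used = d.total_space().saturating_sub(d.available_space());
        let mount = d.mount_point().to_string_lossy();
        disk.with_label_values(&[mount.as_ref()]).set(used as f64);
    }
    Ok(())
}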
1 parent 8748715 commit 161d2bf

File tree

8 files changed: +535, -141 lines changed


src/correlation.rs (+3, -4)

@@ -129,10 +129,9 @@ impl Correlations {
             .await?;

         // Update in memory
-        self.write().await.insert(
-            correlation.id.to_owned(),
-            correlation.clone(),
-        );
+        self.write()
+            .await
+            .insert(correlation.id.to_owned(), correlation.clone());

         Ok(correlation)
     }

src/handlers/http/cluster/mod.rs (+13, -23)

@@ -35,7 +35,7 @@ use crate::storage::{ObjectStorageError, STREAM_ROOT_DIRECTORY};
 use crate::storage::{ObjectStoreFormat, PARSEABLE_ROOT_DIRECTORY};
 use crate::HTTP_CLIENT;
 use actix_web::http::header::{self, HeaderMap};
-use actix_web::web::Path;
+use actix_web::web::{Json, Path};
 use actix_web::Responder;
 use bytes::Bytes;
 use chrono::Utc;
@@ -729,6 +729,9 @@ async fn fetch_cluster_metrics() -> Result<Vec<Metrics>, PostError> {
     let mut dresses = vec![];

     for ingestor in ingestor_metadata {
+        if !utils::check_liveness(&ingestor.domain_name).await {
+            continue;
+        }
         let uri = Url::parse(&format!(
             "{}{}/metrics",
             &ingestor.domain_name,
@@ -749,11 +752,10 @@ async fn fetch_cluster_metrics() -> Result<Vec<Metrics>, PostError> {
             let text = res.text().await.map_err(PostError::NetworkError)?;
             let lines: Vec<Result<String, std::io::Error>> =
                 text.lines().map(|line| Ok(line.to_owned())).collect_vec();
-
             let sample = prometheus_parse::Scrape::parse(lines.into_iter())
                 .map_err(|err| PostError::CustomError(err.to_string()))?
                 .samples;
-            let ingestor_metrics = Metrics::from_prometheus_samples(sample, &ingestor)
+            let ingestor_metrics = Metrics::ingestor_prometheus_samples(sample, &ingestor)
                 .await
                 .map_err(|err| {
                     error!("Fatal: failed to get ingestor metrics: {:?}", err);
@@ -767,10 +769,11 @@ async fn fetch_cluster_metrics() -> Result<Vec<Metrics>, PostError> {
             );
         }
     }
+    dresses.push(Metrics::querier_prometheus_metrics().await);
     Ok(dresses)
 }

-pub fn init_cluster_metrics_schedular() -> Result<(), PostError> {
+pub async fn init_cluster_metrics_scheduler() -> Result<(), PostError> {
     info!("Setting up schedular for cluster metrics ingestion");
     let mut scheduler = AsyncScheduler::new();
     scheduler
@@ -779,25 +782,12 @@ pub fn init_cluster_metrics_schedular() -> Result<(), PostError> {
             let result: Result<(), PostError> = async {
                 let cluster_metrics = fetch_cluster_metrics().await;
                 if let Ok(metrics) = cluster_metrics {
-                    if !metrics.is_empty() {
-                        info!("Cluster metrics fetched successfully from all ingestors");
-                        if let Ok(metrics_bytes) = serde_json::to_vec(&metrics) {
-                            if matches!(
-                                ingest_internal_stream(
-                                    INTERNAL_STREAM_NAME.to_string(),
-                                    bytes::Bytes::from(metrics_bytes),
-                                )
-                                .await,
-                                Ok(())
-                            ) {
-                                info!("Cluster metrics successfully ingested into internal stream");
-                            } else {
-                                error!("Failed to ingest cluster metrics into internal stream");
-                            }
-                        } else {
-                            error!("Failed to serialize cluster metrics");
-                        }
-                    }
+                    let json_value = serde_json::to_value(metrics)
+                        .map_err(|e| anyhow::anyhow!("Failed to serialize metrics: {}", e))?;
+
+                    ingest_internal_stream(INTERNAL_STREAM_NAME.to_string(), Json(json_value))
+                        .await
+                        .map_err(|e| anyhow::anyhow!("Failed to ingest metrics: {}", e))?;
                 }
                 Ok(())
             }
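
The hunks above register the job but elide the loop that actually drives it. For context, a clokwerk AsyncScheduler set up this way is typically polled from a spawned task. A minimal sketch, where fetch_and_ingest is a hypothetical stand-in for the fetch_cluster_metrics-plus-ingest_internal_stream body shown above, and both intervals are assumptions:

use std::time::Duration;

use clokwerk::{AsyncScheduler, TimeUnits};

async fn fetch_and_ingest() -> anyhow::Result<()> {
    Ok(()) // stand-in for fetch_cluster_metrics + ingest_internal_stream
}

pub async fn init_metrics_schedule() {
    let mut scheduler = AsyncScheduler::new();
    // Run the metrics job on a fixed cadence (the interval is assumed here).
    scheduler.every(10.minutes()).run(|| async {
        if let Err(err) = fetch_and_ingest().await {
            tracing::error!("cluster metrics job failed: {err}");
        }
    });
    // Drive the scheduler: check for due jobs on a coarse tick.
    tokio::spawn(async move {
        loop {
            scheduler.run_pending().await;
            tokio::time::sleep(Duration::from_secs(10)).await;
        }
    });
}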

src/handlers/http/ingest.rs (+19, -12)

@@ -20,12 +20,8 @@ use super::logstream::error::{CreateStreamError, StreamError};
 use super::modal::utils::ingest_utils::{flatten_and_push_logs, push_logs};
 use super::users::dashboards::DashboardError;
 use super::users::filters::FiltersError;
-use crate::event::format::LogSource;
-use crate::event::{
-    self,
-    error::EventError,
-    format::{self, EventFormat},
-};
+use crate::event::format::{self, EventFormat, LogSource};
+use crate::event::{self, error::EventError};
 use crate::handlers::http::modal::utils::logstream_utils::create_stream_and_schema_from_storage;
 use crate::handlers::{LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY};
 use crate::metadata::error::stream_info::MetadataError;
@@ -36,12 +32,12 @@ use crate::otel::metrics::flatten_otel_metrics;
 use crate::otel::traces::flatten_otel_traces;
 use crate::storage::{ObjectStorageError, StreamType};
 use crate::utils::header_parsing::ParseHeaderError;
-use crate::utils::json::flatten::JsonFlattenError;
+use crate::utils::json::convert_array_to_object;
+use crate::utils::json::flatten::{convert_to_array, JsonFlattenError};
 use actix_web::web::{Json, Path};
 use actix_web::{http::header::ContentType, HttpRequest, HttpResponse};
 use arrow_array::RecordBatch;
 use arrow_schema::Schema;
-use bytes::Bytes;
 use chrono::Utc;
 use http::StatusCode;
 use opentelemetry_proto::tonic::logs::v1::LogsData;
@@ -77,18 +73,29 @@ pub async fn ingest(req: HttpRequest, Json(json): Json<Value>) -> Result<HttpRes
     Ok(HttpResponse::Ok().finish())
 }

-pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<(), PostError> {
-    let size: usize = body.len();
+pub async fn ingest_internal_stream(
+    stream_name: String,
+    Json(json): Json<Value>,
+) -> Result<(), PostError> {
+    let size = serde_json::to_vec(&json).unwrap().len() as u64;
+    let data = convert_to_array(convert_array_to_object(
+        json,
+        None,
+        None,
+        None,
+        SchemaVersion::V0,
+        &LogSource::default(),
+    )?)?;
+
     let parsed_timestamp = Utc::now().naive_utc();
     let (rb, is_first) = {
-        let body_val: Value = serde_json::from_slice(&body)?;
         let hash_map = STREAM_INFO.read().unwrap();
         let schema = hash_map
             .get(&stream_name)
             .ok_or(PostError::StreamNotFound(stream_name.clone()))?
             .schema
             .clone();
-        let event = format::json::Event { data: body_val };
+        let event = format::json::Event { data };
         // For internal streams, use old schema
         event.into_recordbatch(&schema, false, None, SchemaVersion::V0)?
     };
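
With this change, ingest_internal_stream accepts parsed JSON through actix's Json wrapper rather than raw Bytes: it serializes once for size accounting and flattens the value itself before the Arrow conversion. A hedged usage sketch, with a hypothetical payload shaped like the cluster-metrics array the querier produces (the field names here are illustrative, not the commit's):

use actix_web::web::Json;
use serde_json::json;

async fn example() -> Result<(), PostError> {
    // Hypothetical entries; the real ones come from serde_json::to_value(metrics).
    let payload = json!([
        { "node": "ingestor-0", "cpu_usage_percent": 12.5 },
        { "node": "querier-0", "cpu_usage_percent": 3.2 }
    ]);
    ingest_internal_stream(INTERNAL_STREAM_NAME.to_string(), Json(payload)).await
}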

src/handlers/http/modal/ingest_server.rs (+1)

@@ -113,6 +113,7 @@ impl ParseableServer for IngestServer {

         // set the ingestor metadata
         set_ingestor_metadata().await?;
+        metrics::init_system_metrics_scheduler().await?;

         // Ingestors shouldn't have to deal with OpenId auth flow
         let app = self.start(shutdown_rx, prometheus, None);
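
metrics::init_system_metrics_scheduler is wired into all three server modes by this commit, but its body lives in src/metrics, one of the changed files not shown on this page. A plausible sketch, assuming it mirrors the cluster scheduler's clokwerk pattern and reuses a collection step like the collect_system_metrics sketch under the commit message:

use std::time::Duration;

use clokwerk::{AsyncScheduler, TimeUnits};

// Assumed shape only: the interval, error type, and collection call are guesses.
pub async fn init_system_metrics_scheduler() -> anyhow::Result<()> {
    let mut scheduler = AsyncScheduler::new();
    scheduler.every(1.minutes()).run(|| async {
        // Refresh sysinfo readings into the Prometheus gauges
        // (see the collect_system_metrics sketch under the commit message).
    });
    tokio::spawn(async move {
        loop {
            scheduler.run_pending().await;
            tokio::time::sleep(Duration::from_secs(10)).await;
        }
    });
    Ok(())
}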

src/handlers/http/modal/query_server.rs (+4, -6)

@@ -18,11 +18,10 @@

 use crate::correlation::CORRELATIONS;
 use crate::handlers::airplane;
-use crate::handlers::http::base_path;
-use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular};
 use crate::handlers::http::logstream::create_internal_stream_if_not_exists;
 use crate::handlers::http::middleware::{DisAllowRootUser, RouteExt};
 use crate::handlers::http::{self, role};
+use crate::handlers::http::{base_path, cluster};
 use crate::handlers::http::{logstream, MAX_EVENT_PAYLOAD_SIZE};
 use crate::hottier::HotTierManager;
 use crate::rbac::role::Action;
@@ -35,7 +34,7 @@ use actix_web::{web, Scope};
 use async_trait::async_trait;
 use bytes::Bytes;
 use tokio::sync::oneshot;
-use tracing::{error, info};
+use tracing::error;

 use crate::{option::CONFIG, ParseableServer};

@@ -104,15 +103,14 @@ impl ParseableServer for QueryServer {
         // track all parquet files already in the data directory
         storage::retention::load_retention_from_global();

+        metrics::init_system_metrics_scheduler().await?;
+        cluster::init_cluster_metrics_scheduler().await?;
         // all internal data structures populated now.
         // start the analytics scheduler if enabled
         if CONFIG.options.send_analytics {
            analytics::init_analytics_scheduler()?;
         }

-        if matches!(init_cluster_metrics_schedular(), Ok(())) {
-            info!("Cluster metrics scheduler started successfully");
-        }
         if let Some(hot_tier_manager) = HotTierManager::global() {
             hot_tier_manager.put_internal_stream_hot_tier().await?;
             hot_tier_manager.download_from_s3()?;

src/handlers/http/modal/server.rs (+2)

@@ -119,6 +119,8 @@ impl ParseableServer for Server {
         let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
             sync::object_store_sync().await;

+        metrics::init_system_metrics_scheduler().await?;
+
         if CONFIG.options.send_analytics {
             analytics::init_analytics_scheduler()?;
         }
