diff --git a/src/handlers/http/clickbench.rs b/src/handlers/http/clickbench.rs new file mode 100644 index 000000000..02df9d4e2 --- /dev/null +++ b/src/handlers/http/clickbench.rs @@ -0,0 +1,185 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use std::{env, fs, process::Command, time::Instant}; + +use actix_web::{web::Json, Responder}; +use datafusion::{ + common::plan_datafusion_err, + error::DataFusionError, + execution::{runtime_env::RuntimeEnvBuilder, SessionStateBuilder}, + prelude::{ParquetReadOptions, SessionConfig, SessionContext}, + sql::{parser::DFParser, sqlparser::dialect::dialect_from_str}, +}; +use serde_json::{json, Value}; +use tracing::{info, warn}; +static PARQUET_FILE: &str = "PARQUET_FILE"; +static QUERIES_FILE: &str = "QUERIES_FILE"; + +pub async fn clickbench_benchmark() -> Result { + drop_system_caches() + .await + .map_err(actix_web::error::ErrorInternalServerError)?; + let results = run_benchmark() + .await + .map_err(actix_web::error::ErrorInternalServerError)?; + Ok(results) +} + +pub async fn drop_system_caches() -> Result<(), anyhow::Error> { + // Sync to flush file system buffers + match Command::new("sync").status() { + Ok(_) => {} + Err(e) => warn!("Failed to execute sync command: {}", e), + } + let _ = Command::new("sudo") + .args(["sh", "-c", "echo 3 > /proc/sys/vm/drop_caches"]) + .output() + .map_err(|e| { + warn!("Failed to drop system caches: {}", e); + anyhow::Error::msg("Failed to drop system caches. This might be expected if not running on Linux or without sudo privileges.") + })?; + + Ok(()) +} + +pub async fn run_benchmark() -> Result, anyhow::Error> { + let mut session_config = SessionConfig::new().with_information_schema(true); + + session_config = session_config.with_batch_size(8192); + + let rt_builder = RuntimeEnvBuilder::new(); + // set memory pool size + let runtime_env = rt_builder.build_arc()?; + let state = SessionStateBuilder::new() + .with_default_features() + .with_config(session_config) + .with_runtime_env(runtime_env) + .build(); + state + .catalog_list() + .catalog(&state.config_options().catalog.default_catalog) + .expect("default catalog is provided by datafusion"); + + let ctx = SessionContext::new_with_state(state); + + let parquet_file = env::var(PARQUET_FILE) + .map_err(|_| anyhow::anyhow!("PARQUET_FILE environment variable not set. Please set it to the path of the hits.parquet file."))?; + register_hits(&ctx, &parquet_file).await?; + info!("hits.parquet registered"); + let mut query_list = Vec::new(); + let queries_file = env::var(QUERIES_FILE) + .map_err(|_| anyhow::anyhow!("QUERIES_FILE environment variable not set. Please set it to the path of the queries file."))?; + let queries = fs::read_to_string(queries_file)?; + for query in queries.lines() { + query_list.push(query.to_string()); + } + let results = tokio::task::spawn_blocking(move || execute_queries(&ctx, query_list)) + .await + .map_err(|e| anyhow::anyhow!(e))? + .map_err(|e| anyhow::anyhow!(e))?; + + Ok(results) +} + +async fn register_hits(ctx: &SessionContext, parquet_file: &str) -> Result<(), anyhow::Error> { + let options: ParquetReadOptions<'_> = ParquetReadOptions::default(); + ctx.register_parquet("hits", parquet_file, options) + .await + .map_err(|e| { + DataFusionError::Context(format!("Registering 'hits' as {parquet_file}"), Box::new(e)) + })?; + Ok(()) +} + +#[tokio::main(flavor = "multi_thread")] +pub async fn execute_queries( + ctx: &SessionContext, + query_list: Vec, +) -> Result, anyhow::Error> { + const TRIES: usize = 3; + let mut results = Vec::with_capacity(query_list.len() * TRIES); + let mut total_elapsed_per_iteration = [0.0; TRIES]; + for (query_index, sql) in query_list.iter().enumerate() { + let mut elapsed_times = Vec::with_capacity(TRIES); + for iteration in 1..=TRIES { + let start = Instant::now(); + let task_ctx = ctx.task_ctx(); + let dialect = &task_ctx.session_config().options().sql_parser.dialect; + let dialect = dialect_from_str(dialect).ok_or_else(|| { + plan_datafusion_err!( + "Unsupported SQL dialect: {dialect}. Available dialects: \ + Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, \ + MsSQL, ClickHouse, BigQuery, Ansi." + ) + })?; + + let statements = DFParser::parse_sql_with_dialect(sql, dialect.as_ref())?; + let statement = statements + .front() + .ok_or_else(|| anyhow::anyhow!("No SQL statement found in query: {}", sql))?; + let plan = ctx.state().statement_to_plan(statement.clone()).await?; + + let df = ctx.execute_logical_plan(plan).await?; + + let _ = df.collect().await?; + let elapsed = start.elapsed().as_secs_f64(); + total_elapsed_per_iteration[iteration - 1] += elapsed; + info!("query {query_index} iteration {iteration} completed in {elapsed} secs"); + elapsed_times.push(elapsed); + } + let iterations: Vec = elapsed_times + .iter() + .enumerate() + .map(|(iteration, &elapsed_time)| { + json!({ + "iteration": iteration + 1, + "elapsed_time": elapsed_time + }) + }) + .collect(); + + let query_result = json!({ + "query_details": { + "query_index": query_index + 1, + "query": sql + }, + "iterations": iterations + }); + + results.push(query_result); + } + + let summary: Vec = total_elapsed_per_iteration + .iter() + .enumerate() + .map(|(iteration, &total_elapsed)| { + json!({ + "iteration": iteration + 1, + "total_elapsed": total_elapsed + }) + }) + .collect(); + + let result_json = json!({ + "summary": summary, + "results": results + }); + + Ok(Json(result_json)) +} diff --git a/src/handlers/http/mod.rs b/src/handlers/http/mod.rs index f1f702d4b..c3b166a4a 100644 --- a/src/handlers/http/mod.rs +++ b/src/handlers/http/mod.rs @@ -30,6 +30,7 @@ use self::{cluster::get_ingestor_info, query::Query}; pub mod about; pub mod alerts; mod audit; +pub mod clickbench; pub mod cluster; pub mod correlation; pub mod health_check; diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index 27a4d30f4..1205d78f5 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -25,6 +25,7 @@ use crate::handlers; use crate::handlers::http::about; use crate::handlers::http::alerts; use crate::handlers::http::base_path; +use crate::handlers::http::clickbench; use crate::handlers::http::health_check; use crate::handlers::http::query; use crate::handlers::http::users::dashboards; @@ -87,7 +88,8 @@ impl ParseableServer for Server { .service(Self::get_user_role_webscope()) .service(Self::get_counts_webscope()) .service(Self::get_alerts_webscope()) - .service(Self::get_metrics_webscope()), + .service(Self::get_metrics_webscope()) + .service(Self::get_benchmark_webscope()), ) .service(Self::get_ingest_otel_factory()) .service(Self::get_generated()); @@ -160,6 +162,16 @@ impl Server { ) } + pub fn get_benchmark_webscope() -> Scope { + web::scope("/benchmark/clickbench").service( + web::resource("").route( + web::get() + .to(clickbench::clickbench_benchmark) + .authorize(Action::Benchmark), + ), + ) + } + pub fn get_correlation_webscope() -> Scope { web::scope("/correlation") .service( diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 088ca509d..009e01d2c 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -513,10 +513,7 @@ impl Stream { let file_size = match file.metadata() { Ok(meta) => meta.len(), Err(err) => { - warn!( - "File ({}) not found; Error = {err}", - file.display() - ); + warn!("File ({}) not found; Error = {err}", file.display()); continue; } }; diff --git a/src/rbac/role.rs b/src/rbac/role.rs index 00208631c..df7fbe15e 100644 --- a/src/rbac/role.rs +++ b/src/rbac/role.rs @@ -67,6 +67,7 @@ pub enum Action { CreateCorrelation, DeleteCorrelation, PutCorrelation, + Benchmark, } #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -108,6 +109,7 @@ impl RoleBuilder { ), Action::Login | Action::Metrics + | Action::Benchmark | Action::PutUser | Action::ListUser | Action::PutUserRoles