Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add api to execute clickbench benchmark #1219

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 173 additions & 0 deletions src/handlers/http/clickbench.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*
* Parseable Server (C) 2022 - 2024 Parseable, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

use std::{collections::HashMap, env, fs, process::Command, time::Instant};

use actix_web::{web::Json, Responder};
use datafusion::{
common::plan_datafusion_err,
error::DataFusionError,
execution::{runtime_env::RuntimeEnvBuilder, SessionStateBuilder},
prelude::{ParquetReadOptions, SessionConfig, SessionContext},
sql::{parser::DFParser, sqlparser::dialect::dialect_from_str},
};
use serde_json::{json, Value};
use tracing::{info, warn};
static PARQUET_FILE: &str = "PARQUET_FILE";
static QUERIES_FILE: &str = "QUERIES_FILE";

pub async fn clickbench_benchmark() -> Result<impl Responder, actix_web::Error> {
drop_system_caches()
.await
.map_err(actix_web::error::ErrorInternalServerError)?;
let results = tokio::task::spawn_blocking(run_benchmark)
.await
.map_err(actix_web::error::ErrorInternalServerError)?
.map_err(actix_web::error::ErrorInternalServerError)?;
Ok(results)
}

pub async fn drop_system_caches() -> Result<(), anyhow::Error> {
// Sync to flush file system buffers
match Command::new("sync").status() {
Ok(_) => {}
Err(e) => warn!("Failed to execute sync command: {}", e),
}
let _ = Command::new("sudo")
.args(["sh", "-c", "echo 3 > /proc/sys/vm/drop_caches"])
.output()
.map_err(|e| {
warn!("Failed to drop system caches: {}", e);
anyhow::Error::msg("Failed to drop system caches. This might be expected if not running on Linux or without sudo privileges.")
})?;

Ok(())
}

/// Sets up a DataFusion session, registers the ClickBench `hits` table, and
/// runs every query from the configured queries file.
///
/// Annotated with `#[tokio::main]` so it owns its own multi-threaded runtime;
/// callers must invoke it from a blocking context (see `clickbench_benchmark`,
/// which dispatches it via `spawn_blocking`).
///
/// # Errors
/// Fails when the `PARQUET_FILE`/`QUERIES_FILE` env vars are unset, the files
/// cannot be read or registered, or any query fails to plan or execute.
#[tokio::main(flavor = "multi_thread")]
pub async fn run_benchmark() -> Result<Json<Value>, anyhow::Error> {
    // ClickBench reference setup: information schema on, 8192-row batches.
    let session_config = SessionConfig::new()
        .with_information_schema(true)
        .with_batch_size(8192);

    let runtime_env = RuntimeEnvBuilder::new().build_arc()?;
    let state = SessionStateBuilder::new()
        .with_default_features()
        .with_config(session_config)
        .with_runtime_env(runtime_env)
        .build();
    // Sanity check: the default catalog must exist before registering tables.
    state
        .catalog_list()
        .catalog(&state.config_options().catalog.default_catalog)
        .expect("default catalog is provided by datafusion");

    let ctx = SessionContext::new_with_state(state);

    let parquet_file = env::var(PARQUET_FILE)
        .map_err(|_| anyhow::anyhow!("PARQUET_FILE environment variable not set. Please set it to the path of the hits.parquet file."))?;
    register_hits(&ctx, &parquet_file).await?;
    info!("hits.parquet registered");

    let queries_file = env::var(QUERIES_FILE)
        .map_err(|_| anyhow::anyhow!("QUERIES_FILE environment variable not set. Please set it to the path of the queries file."))?;
    // One query per line, executed in file order.
    let query_list: Vec<String> = fs::read_to_string(queries_file)?
        .lines()
        .map(str::to_string)
        .collect();
    execute_queries(&ctx, query_list).await
}

/// Registers the ClickBench `hits` table in the session from the given
/// parquet file path, wrapping any failure with the offending path.
async fn register_hits(ctx: &SessionContext, parquet_file: &str) -> Result<(), anyhow::Error> {
    match ctx
        .register_parquet("hits", parquet_file, ParquetReadOptions::default())
        .await
    {
        Ok(()) => Ok(()),
        Err(e) => Err(DataFusionError::Context(
            format!("Registering 'hits' as {parquet_file}"),
            Box::new(e),
        )
        .into()),
    }
}

/// Runs each query `TRIES` times, recording per-run timings plus a running
/// total per iteration across all queries.
///
/// Returns a JSON body with `results` (one entry per query per iteration,
/// with `elapsed_time` in seconds) and `summary` (total elapsed seconds per
/// iteration across all queries).
///
/// # Errors
/// Fails when the configured SQL dialect is unsupported, a query line is
/// empty or does not parse, or planning/execution fails.
pub async fn execute_queries(
    ctx: &SessionContext,
    query_list: Vec<String>,
) -> Result<Json<Value>, anyhow::Error> {
    const TRIES: usize = 3;
    let mut results = Vec::with_capacity(query_list.len());
    let mut total_elapsed_per_iteration = [0.0; TRIES];

    // The dialect is a session-level setting and cannot change between
    // queries; resolve it once instead of on every iteration of every query
    // (previously this loop-invariant work was redone — and timed — per run).
    let task_ctx = ctx.task_ctx();
    let dialect = &task_ctx.session_config().options().sql_parser.dialect;
    let dialect = dialect_from_str(dialect).ok_or_else(|| {
        plan_datafusion_err!(
            "Unsupported SQL dialect: {dialect}. Available dialects: \
                 Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, \
                 MsSQL, ClickHouse, BigQuery, Ansi."
        )
    })?;

    for (query_index, sql) in query_list.iter().enumerate() {
        for iteration in 1..=TRIES {
            // Timing covers parse + plan + execute, i.e. end-to-end latency.
            let start = Instant::now();

            let statements = DFParser::parse_sql_with_dialect(sql, dialect.as_ref())?;
            let statement = statements
                .front()
                .ok_or_else(|| anyhow::anyhow!("No SQL statement found in query: {}", sql))?;
            let plan = ctx.state().statement_to_plan(statement.clone()).await?;
            let df = ctx.execute_logical_plan(plan).await?;
            // Drain all result batches so execution actually completes.
            let _ = df.collect().await?;

            let elapsed = start.elapsed().as_secs_f64();
            total_elapsed_per_iteration[iteration - 1] += elapsed;
            info!("query {query_index} iteration {iteration} completed in {elapsed} secs");

            results.push(json!({
                "query_index": query_index,
                "query": sql,
                "iteration": iteration,
                "elapsed_time": elapsed
            }));
        }
    }

    // Per-iteration totals across all queries; iteration is 1-based in output.
    let summary: Vec<Value> = total_elapsed_per_iteration
        .iter()
        .enumerate()
        .map(|(iteration, &total_elapsed)| {
            json!({
                "iteration": iteration + 1,
                "total_elapsed": total_elapsed
            })
        })
        .collect();

    info!("summary: {:?}", summary);

    Ok(Json(json!({
        "summary": summary,
        "results": results
    })))
}
1 change: 1 addition & 0 deletions src/handlers/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use self::{cluster::get_ingestor_info, query::Query};
pub mod about;
pub mod alerts;
mod audit;
pub mod clickbench;
pub mod cluster;
pub mod correlation;
pub mod health_check;
Expand Down
14 changes: 13 additions & 1 deletion src/handlers/http/modal/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::handlers;
use crate::handlers::http::about;
use crate::handlers::http::alerts;
use crate::handlers::http::base_path;
use crate::handlers::http::clickbench;
use crate::handlers::http::health_check;
use crate::handlers::http::query;
use crate::handlers::http::users::dashboards;
Expand Down Expand Up @@ -87,7 +88,8 @@ impl ParseableServer for Server {
.service(Self::get_user_role_webscope())
.service(Self::get_counts_webscope())
.service(Self::get_alerts_webscope())
.service(Self::get_metrics_webscope()),
.service(Self::get_metrics_webscope())
.service(Self::get_benchmark_webscope()),
)
.service(Self::get_ingest_otel_factory())
.service(Self::get_generated());
Expand Down Expand Up @@ -160,6 +162,16 @@ impl Server {
)
}

/// Scope exposing `GET /benchmark/clickbench`, gated by the Benchmark action.
pub fn get_benchmark_webscope() -> Scope {
    let run_route = web::get()
        .to(clickbench::clickbench_benchmark)
        .authorize(Action::Benchmark);
    web::scope("/benchmark/clickbench").service(web::resource("").route(run_route))
}

pub fn get_correlation_webscope() -> Scope {
web::scope("/correlation")
.service(
Expand Down
5 changes: 1 addition & 4 deletions src/parseable/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,10 +513,7 @@ impl Stream {
let file_size = match file.metadata() {
Ok(meta) => meta.len(),
Err(err) => {
warn!(
"File ({}) not found; Error = {err}",
file.display()
);
warn!("File ({}) not found; Error = {err}", file.display());
continue;
}
};
Expand Down
2 changes: 2 additions & 0 deletions src/rbac/role.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ pub enum Action {
CreateCorrelation,
DeleteCorrelation,
PutCorrelation,
Benchmark,
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -108,6 +109,7 @@ impl RoleBuilder {
),
Action::Login
| Action::Metrics
| Action::Benchmark
| Action::PutUser
| Action::ListUser
| Action::PutUserRoles
Expand Down
Loading