Skip to content

Commit 4261679

Browse files
feat: add api to execute clickbench benchmark
use API `/api/v1/benchmark/clickbench` to perform benchmark on clickbench dataset and defined queries add env `PARQUET_FILE` to provide file path of hits.parquet add env `QUERIES_FILE` to provide file path of queries file 3 tries for each query, total 43 queries in the set api response with query no, iteration no, response time in ms
1 parent 887a63f commit 4261679

File tree

5 files changed

+140
-5
lines changed

5 files changed

+140
-5
lines changed

src/handlers/http/clickbench.rs

+123
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
/*
2+
* Parseable Server (C) 2022 - 2024 Parseable, Inc.
3+
*
4+
* This program is free software: you can redistribute it and/or modify
5+
* it under the terms of the GNU Affero General Public License as
6+
* published by the Free Software Foundation, either version 3 of the
7+
* License, or (at your option) any later version.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU Affero General Public License for more details.
13+
*
14+
* You should have received a copy of the GNU Affero General Public License
15+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
16+
*
17+
*/
18+
19+
use std::{collections::HashMap, env, fs, time::Instant};
20+
21+
use actix_web::{web::Json, Responder};
22+
use datafusion::{
23+
common::plan_datafusion_err,
24+
error::DataFusionError,
25+
execution::{runtime_env::RuntimeEnvBuilder, SessionStateBuilder},
26+
physical_plan::collect,
27+
prelude::{ParquetReadOptions, SessionConfig, SessionContext},
28+
sql::{parser::DFParser, sqlparser::dialect::dialect_from_str},
29+
};
30+
use serde_json::{json, Value};
31+
32+
pub async fn clickbench_benchmark() -> Result<impl Responder, actix_web::Error> {
33+
let results = tokio::task::spawn_blocking(run_benchmark)
34+
.await
35+
.map_err(actix_web::error::ErrorInternalServerError)?
36+
.map_err(actix_web::error::ErrorInternalServerError)?;
37+
Ok(results)
38+
}
39+
40+
#[tokio::main(flavor = "multi_thread")]
41+
pub async fn run_benchmark() -> Result<Json<Value>, anyhow::Error> {
42+
let mut session_config = SessionConfig::from_env()?.with_information_schema(true);
43+
44+
session_config = session_config.with_batch_size(8192);
45+
46+
let rt_builder = RuntimeEnvBuilder::new();
47+
// set memory pool size
48+
let runtime_env = rt_builder.build_arc()?;
49+
let state = SessionStateBuilder::new()
50+
.with_default_features()
51+
.with_config(session_config)
52+
.with_runtime_env(runtime_env)
53+
.build();
54+
state
55+
.catalog_list()
56+
.catalog(&state.config_options().catalog.default_catalog)
57+
.expect("default catalog is provided by datafusion");
58+
59+
let ctx = SessionContext::new_with_state(state);
60+
61+
let mut table_options = HashMap::new();
62+
table_options.insert("binary_as_string", "true");
63+
64+
let parquet_file = env::var("PARQUET_FILE")?;
65+
register_hits(&ctx, &parquet_file).await?;
66+
let mut query_list = Vec::new();
67+
let queries_file = env::var("QUERIES_FILE")?;
68+
let queries = fs::read_to_string(queries_file)?;
69+
for query in queries.lines() {
70+
query_list.push(query.to_string());
71+
}
72+
execute_queries(&ctx, query_list).await
73+
}
74+
75+
async fn register_hits(ctx: &SessionContext, parquet_file: &str) -> Result<(), anyhow::Error> {
76+
let options: ParquetReadOptions<'_> = Default::default();
77+
ctx.register_parquet("hits", parquet_file, options)
78+
.await
79+
.map_err(|e| {
80+
DataFusionError::Context(format!("Registering 'hits' as {parquet_file}"), Box::new(e))
81+
})?;
82+
Ok(())
83+
}
84+
85+
pub async fn execute_queries(
86+
ctx: &SessionContext,
87+
query_list: Vec<String>,
88+
) -> Result<Json<Value>, anyhow::Error> {
89+
const TRIES: usize = 3;
90+
let mut results = Vec::new();
91+
92+
for sql in query_list.iter() {
93+
let mut elapsed_times = Vec::new();
94+
for _iteration in 1..=TRIES {
95+
let start = Instant::now();
96+
let task_ctx = ctx.task_ctx();
97+
let dialect = &task_ctx.session_config().options().sql_parser.dialect;
98+
let dialect = dialect_from_str(dialect).ok_or_else(|| {
99+
plan_datafusion_err!(
100+
"Unsupported SQL dialect: {dialect}. Available dialects: \
101+
Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, \
102+
MsSQL, ClickHouse, BigQuery, Ansi."
103+
)
104+
})?;
105+
106+
let statements = DFParser::parse_sql_with_dialect(sql, dialect.as_ref())?;
107+
let statement = statements.front().unwrap();
108+
let plan = ctx.state().statement_to_plan(statement.clone()).await?;
109+
110+
let df = ctx.execute_logical_plan(plan).await?;
111+
let physical_plan = df.create_physical_plan().await?;
112+
113+
let _ = collect(physical_plan, task_ctx.clone()).await?;
114+
let elapsed = start.elapsed().as_secs_f64();
115+
elapsed_times.push(elapsed);
116+
}
117+
results.push(elapsed_times);
118+
}
119+
120+
let result_json = json!(results);
121+
122+
Ok(Json(result_json))
123+
}

src/handlers/http/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use self::{cluster::get_ingestor_info, query::Query};
3030
pub mod about;
3131
pub mod alerts;
3232
mod audit;
33+
pub mod clickbench;
3334
pub mod cluster;
3435
pub mod correlation;
3536
pub mod health_check;

src/handlers/http/modal/server.rs

+13-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use crate::handlers;
2525
use crate::handlers::http::about;
2626
use crate::handlers::http::alerts;
2727
use crate::handlers::http::base_path;
28+
use crate::handlers::http::clickbench;
2829
use crate::handlers::http::health_check;
2930
use crate::handlers::http::query;
3031
use crate::handlers::http::users::dashboards;
@@ -87,7 +88,8 @@ impl ParseableServer for Server {
8788
.service(Self::get_user_role_webscope())
8889
.service(Self::get_counts_webscope())
8990
.service(Self::get_alerts_webscope())
90-
.service(Self::get_metrics_webscope()),
91+
.service(Self::get_metrics_webscope())
92+
.service(Self::get_benchmark_webscope()),
9193
)
9294
.service(Self::get_ingest_otel_factory())
9395
.service(Self::get_generated());
@@ -160,6 +162,16 @@ impl Server {
160162
)
161163
}
162164

165+
pub fn get_benchmark_webscope() -> Scope {
166+
web::scope("/benchmark/clickbench").service(
167+
web::resource("").route(
168+
web::get()
169+
.to(clickbench::clickbench_benchmark)
170+
.authorize(Action::Benchmark),
171+
),
172+
)
173+
}
174+
163175
pub fn get_correlation_webscope() -> Scope {
164176
web::scope("/correlation")
165177
.service(

src/parseable/streams.rs

+1-4
Original file line numberDiff line numberDiff line change
@@ -513,10 +513,7 @@ impl Stream {
513513
let file_size = match file.metadata() {
514514
Ok(meta) => meta.len(),
515515
Err(err) => {
516-
warn!(
517-
"File ({}) not found; Error = {err}",
518-
file.display()
519-
);
516+
warn!("File ({}) not found; Error = {err}", file.display());
520517
continue;
521518
}
522519
};

src/rbac/role.rs

+2
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ pub enum Action {
6767
CreateCorrelation,
6868
DeleteCorrelation,
6969
PutCorrelation,
70+
Benchmark,
7071
}
7172

7273
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
@@ -108,6 +109,7 @@ impl RoleBuilder {
108109
),
109110
Action::Login
110111
| Action::Metrics
112+
| Action::Benchmark
111113
| Action::PutUser
112114
| Action::ListUser
113115
| Action::PutUserRoles

0 commit comments

Comments
 (0)