
feat: add api to execute clickbench benchmark #1219

Open — wants to merge 5 commits into main
Conversation

nikhilsinhaparseable (Contributor) commented Mar 2, 2025

Use the API `/api/v1/benchmark/clickbench` to run the ClickBench benchmark against the ClickBench dataset and its defined queries.

Set the env var `PARQUET_FILE` to the file path of hits.parquet, and the env var `QUERIES_FILE` to the file path of the queries file.

Each query is run 3 times (43 queries in the set); the API responds with the query number, iteration number, and response time in ms.
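The three-tries-per-query timing scheme can be sketched in plain Rust. This is an illustrative stand-in, not code from the PR; `time_iterations` is a hypothetical helper:

```rust
use std::time::Instant;

/// Run `f` for `tries` iterations and return each elapsed time in milliseconds.
/// (Sketch of the benchmark's 3-tries-per-query timing loop; names are ours.)
fn time_iterations<F: FnMut()>(mut f: F, tries: usize) -> Vec<f64> {
    let mut elapsed_ms = Vec::with_capacity(tries);
    for _ in 0..tries {
        let start = Instant::now();
        f();
        elapsed_ms.push(start.elapsed().as_secs_f64() * 1000.0);
    }
    elapsed_ms
}

fn main() {
    // Stand-in workload for executing one of the 43 ClickBench queries.
    let times = time_iterations(|| { let _ = (0..10_000u64).sum::<u64>(); }, 3);
    assert_eq!(times.len(), 3);
    println!("{:?}", times);
}
```

Reporting every iteration (rather than only the best run) lets callers see warm-cache effects between the first and later tries.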

Summary by CodeRabbit

  • New Features

    • Introduced a new in-app benchmarking service that executes SQL queries on data files and returns performance metrics in JSON format.
    • Added a secure web endpoint for accessing the benchmarking functionality at /benchmark/clickbench.
    • Enhanced role-based access by including a new permission for benchmarking actions.
  • Bug Fixes

    • Improved error handling in file metadata retrieval for better logging clarity.

coderabbitai bot commented Mar 2, 2025

Walkthrough

A new benchmarking feature has been integrated into the system. A dedicated Rust module has been created to execute SQL queries on Parquet files using the DataFusion library. The feature exposes asynchronous endpoints that initialize a session, register data, and execute queries while returning results in JSON format. Additionally, a corresponding web scope and RBAC permission have been added to support access control. A minor change also refines error logging in the streams module without altering the underlying logic.

Changes

File(s) Change Summary
src/handlers/http/clickbench.rs
src/handlers/http/mod.rs
src/handlers/http/modal/server.rs
Introduced a new benchmarking feature: a dedicated module and HTTP endpoint that uses asynchronous functions to run SQL queries against Parquet files. This includes session setup, table registration, query execution, and authorization via a new RBAC action.
src/parseable/streams.rs Modified error logging in the convert_disk_files_to_parquet method by condensing a multi-line warning into a single line while preserving the error messaging and flow.
src/rbac/role.rs Added a new enum variant Benchmark to the Action enum and updated the associated match statements for permission generation.

Sequence Diagram(s)

sequenceDiagram
    participant C as Client
    participant S as Server (HTTP Router)
    participant CB as Clickbench Handler
    participant R as Benchmark Runner
    participant P as Parquet Data Source

    C->>S: GET /benchmark/clickbench
    S->>CB: Handle benchmarking request
    CB->>R: call run_benchmark()
    R->>R: Initialize session and register Parquet file ("hits")
    R->>R: For each SQL query, parse and generate execution plan
    loop Execute each query 3 times
        R->>P: Execute query on Parquet file
        P-->>R: Return query result and execution time
    end
    R->>CB: Return JSON with benchmark results
    CB-->>S: Pass response
    S-->>C: Send JSON response

Poem

I'm a rabbit hopping through the byte-filled glade,
New benchmarks bloom, in rust they are laid.
SQL queries dance like leaves in the breeze,
Parquet files whisper secrets with ease.
Routes and roles now sing in a synchronized beat,
A code garden of changes, merry and sweet!
🥕 Keep on hopping through code so neat!

Suggested reviewers

  • parmesant

coderabbitai bot left a comment

Caution

Inline review comments failed to post. This is likely due to GitHub's limits when posting large numbers of comments. If you are seeing this consistently it is likely a permissions issue. Please check "Moderation" -> "Code review limits" under your organization settings.

Actionable comments posted: 1

🧹 Nitpick comments (5)
src/rbac/role.rs (1)

70-70: Add necessary documentation for Benchmark action.

Adding the Benchmark variant is straightforward, but it would be helpful to briefly document its intended usage so that future maintainers know why permissions related to benchmarking must be gated under this action.

Also applies to: 112-112
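A doc comment on the new variant is cheap to add. A minimal sketch (the real `Action` enum in src/rbac/role.rs has many more variants; this wording is illustrative):

```rust
/// Actions gated by RBAC (trimmed sketch of the real enum).
#[allow(dead_code)]
#[derive(Debug, PartialEq)]
pub enum Action {
    Query,
    /// Permission to invoke the in-app ClickBench benchmark endpoint
    /// (`/benchmark/clickbench`). Benchmarking is resource-intensive,
    /// so this action should only be granted to admin-like roles.
    Benchmark,
}

fn main() {
    assert_eq!(format!("{:?}", Action::Benchmark), "Benchmark");
}
```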

src/handlers/http/modal/server.rs (2)

28-28: Ensure consistent scope ordering and maintain readability.

New imports and routing entries look fine. However, consider grouping or commenting the .service(Self::get_benchmark_webscope()) call near other related performance/analytics endpoints for clarity.

Also applies to: 91-92


165-173: Implement input validation or usage notes for the benchmark endpoint.

The new /benchmark/clickbench scope is clear and well-defined. However, if environment variables are not set correctly or files are missing, the endpoint may fail. Consider returning a user-friendly error or guidance within the endpoint on how to supply valid environment variables.
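One way to surface a friendly message is to validate the required variables up front. A sketch, with a lookup closure standing in for `std::env::var` so it is testable (the helper name and messages are ours, not the PR's):

```rust
use std::collections::HashMap;

/// Check the benchmark's required settings and return a descriptive error
/// for whichever one is missing, instead of a bare `NotPresent`.
fn validate_benchmark_env(
    lookup: impl Fn(&str) -> Option<String>,
) -> Result<(String, String), String> {
    let parquet = lookup("PARQUET_FILE")
        .ok_or("PARQUET_FILE is not set; point it at the hits.parquet file")?;
    let queries = lookup("QUERIES_FILE")
        .ok_or("QUERIES_FILE is not set; point it at the queries file")?;
    Ok((parquet, queries))
}

fn main() {
    // Simulate an environment where only PARQUET_FILE is configured.
    let vars: HashMap<&str, &str> = [("PARQUET_FILE", "/data/hits.parquet")].into();
    let err = validate_benchmark_env(|k| vars.get(k).map(|v| v.to_string())).unwrap_err();
    assert!(err.contains("QUERIES_FILE"));
}
```

The handler could map this error to a 400 response so callers learn how to configure the endpoint rather than seeing an opaque 500.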

src/handlers/http/clickbench.rs (2)

19-31: Consider reorganizing imports for clarity.

The imports are correct and necessary for the functionality. Optionally, you could separate standard library, third-party, and local dependencies into distinct blocks for readability.


75-83: Graceful error handling for table registration.

Registration logic is clear. Consider adding logs for troubleshooting if the parquet file registration fails, to help diagnose environment or file issues quickly.

🛑 Comments failed to post (1)
src/handlers/http/clickbench.rs (1)

40-73: 🛠️ Refactor suggestion

Avoid creating an additional tokio runtime inside an Actix service.

Using #[tokio::main] in run_benchmark() can lead to nested runtimes when running under Actix, which already uses a tokio runtime. Prefer converting run_benchmark into a standard async function and using tokio::spawn rather than nesting a new runtime.

- #[tokio::main(flavor = "multi_thread")]
  pub async fn run_benchmark() -> Result<Json<Value>, anyhow::Error> {
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

pub async fn run_benchmark() -> Result<Json<Value>, anyhow::Error> {
    let mut session_config = SessionConfig::from_env()?.with_information_schema(true);

    session_config = session_config.with_batch_size(8192);

    let rt_builder = RuntimeEnvBuilder::new();
    // set memory pool size
    let runtime_env = rt_builder.build_arc()?;
    let state = SessionStateBuilder::new()
        .with_default_features()
        .with_config(session_config)
        .with_runtime_env(runtime_env)
        .build();
    state
        .catalog_list()
        .catalog(&state.config_options().catalog.default_catalog)
        .expect("default catalog is provided by datafusion");

    let ctx = SessionContext::new_with_state(state);

    let mut table_options = HashMap::new();
    table_options.insert("binary_as_string", "true");

    let parquet_file = env::var("PARQUET_FILE")?;
    register_hits(&ctx, &parquet_file).await?;
    let mut query_list = Vec::new();
    let queries_file = env::var("QUERIES_FILE")?;
    let queries = fs::read_to_string(queries_file)?;
    for query in queries.lines() {
        query_list.push(query.to_string());
    }
    execute_queries(&ctx, query_list).await
}

coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (2)
src/handlers/http/clickbench.rs (2)

40-58: Session configuration could benefit from customization options.

The session configuration uses hardcoded values which might not be optimal for all systems. Consider making the batch size configurable through environment variables.

- session_config = session_config.with_batch_size(8192);
+ let batch_size = env::var("CLICKBENCH_BATCH_SIZE").unwrap_or_else(|_| "8192".to_string()).parse::<usize>().unwrap_or(8192);
+ session_config = session_config.with_batch_size(batch_size);

120-123: Enhance result format to include more details.

The current JSON result format doesn't provide enough context for analysis. Including additional metadata would make the benchmark results more usable.

- let result_json = json!(results);
+ let result_json = json!({
+     "metadata": {
+         "timestamp": chrono::Utc::now().to_rfc3339(),
+         "num_queries": query_list.len(),
+         "num_iterations": TRIES
+     },
+     "results": results
+ });
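Downstream analysis usually also wants at least the best-of-N time per query, since ClickBench-style reports take the minimum across tries. A small stdlib sketch (the `best_time` helper is ours, not part of the suggestion):

```rust
/// Best-of-N: the minimum elapsed time across a query's tries,
/// the figure ClickBench-style reports typically quote.
fn best_time(elapsed: &[f64]) -> f64 {
    elapsed.iter().copied().fold(f64::INFINITY, f64::min)
}

fn main() {
    // Three tries for one query; the first is a cold-cache outlier.
    let tries = [1.92, 1.41, 1.38];
    assert_eq!(best_time(&tries), 1.38);
    println!("best of {} tries: {} s", tries.len(), best_time(&tries));
}
```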
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 91bdbfb and a5dc222.

📒 Files selected for processing (5)
  • src/handlers/http/clickbench.rs (1 hunks)
  • src/handlers/http/mod.rs (1 hunks)
  • src/handlers/http/modal/server.rs (3 hunks)
  • src/parseable/streams.rs (1 hunks)
  • src/rbac/role.rs (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • src/handlers/http/mod.rs
  • src/rbac/role.rs
  • src/parseable/streams.rs
⏰ Context from checks skipped due to timeout of 90000ms (7)
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: coverage
🔇 Additional comments (4)
src/handlers/http/modal/server.rs (3)

28-28: Import added for clickbench functionality.

The import statement correctly adds the necessary module for the new clickbench benchmark functionality.


91-92: New benchmark webscope added to server configuration.

The benchmark webscope is properly integrated into the server's route configuration, following the established pattern for other webscopes.


165-173:

❓ Verification inconclusive

Benchmark webscope implementation looks good.

The implementation creates a web scope at the path /benchmark/clickbench with proper authorization using the Action::Benchmark permission.


🏁 Script executed:

#!/bin/bash
# Description: Verify that the Action::Benchmark enum variant is properly defined in the RBAC role module.

# Search for the Benchmark variant in the Action enum
rg -A 1 -B 1 "enum Action" --include="*.rs"
rg "Benchmark," --include="*.rs"

Length of output: 962


Benchmark webscope implementation looks good but requires manual verification of the RBAC configuration.

The implementation creates a web scope at the path /benchmark/clickbench with proper authorization using Action::Benchmark. However, our initial automated search for Action::Benchmark encountered issues due to command syntax. Please manually verify (or re-run the revised search commands below) that the Action::Benchmark variant is correctly defined in the RBAC role module.

Suggested revised verification commands:

# Check the definition of the Action enum, filtering only Rust sources
rg -g '*.rs' -A 1 -B 1 "enum Action"

# Search for the Benchmark variant within Rust files
rg -g '*.rs' "Benchmark,"
src/handlers/http/clickbench.rs (1)

32-38: HTTP handler implementation looks good.

The function correctly spawns a blocking task to run the CPU-intensive benchmark operation, which is a good practice for handling potentially long-running operations in an async web server.
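The spawn-blocking pattern can be sketched tokio-free with a plain thread handing its result back over a channel (illustrative only; the actual handler uses tokio's `spawn_blocking`, and `run_blocking` is a name we made up):

```rust
use std::sync::mpsc;
use std::thread;

/// Offload CPU-heavy work to a worker thread so the caller's thread stays
/// responsive, then collect the result; mirrors tokio::task::spawn_blocking
/// in spirit, minus the async runtime.
fn run_blocking<T: Send + 'static>(work: impl FnOnce() -> T + Send + 'static) -> T {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // Ignore the send error: it only fires if the receiver was dropped.
        let _ = tx.send(work());
    });
    rx.recv().expect("worker thread panicked")
}

fn main() {
    let sum = run_blocking(|| (1..=100u64).sum::<u64>());
    assert_eq!(sum, 5050);
}
```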

Comment on lines 64 to 93
    let parquet_file = env::var("PARQUET_FILE")?;
    register_hits(&ctx, &parquet_file).await?;
    let mut query_list = Vec::new();
    let queries_file = env::var("QUERIES_FILE")?;
    let queries = fs::read_to_string(queries_file)?;
    for query in queries.lines() {
        query_list.push(query.to_string());
    }

🛠️ Refactor suggestion

Improve environment variable handling.

The code directly uses environment variables without providing descriptive error messages if they're missing. It would be better to provide helpful error messages when required environment variables are not set.

- let parquet_file = env::var("PARQUET_FILE")?;
+ let parquet_file = env::var("PARQUET_FILE")
+     .map_err(|_| anyhow::anyhow!("PARQUET_FILE environment variable not set. Please set it to the path of the hits.parquet file."))?;

- let queries_file = env::var("QUERIES_FILE")?;
+ let queries_file = env::var("QUERIES_FILE")
+     .map_err(|_| anyhow::anyhow!("QUERIES_FILE environment variable not set. Please set it to the path of the queries file."))?;

Suggested change
let parquet_file = env::var("PARQUET_FILE")?;
register_hits(&ctx, &parquet_file).await?;
let mut query_list = Vec::new();
let queries_file = env::var("QUERIES_FILE")?;
let queries = fs::read_to_string(queries_file)?;
for query in queries.lines() {
    query_list.push(query.to_string());
}
let parquet_file = env::var("PARQUET_FILE")
    .map_err(|_| anyhow::anyhow!("PARQUET_FILE environment variable not set. Please set it to the path of the hits.parquet file."))?;
register_hits(&ctx, &parquet_file).await?;
let mut query_list = Vec::new();
let queries_file = env::var("QUERIES_FILE")
    .map_err(|_| anyhow::anyhow!("QUERIES_FILE environment variable not set. Please set it to the path of the queries file."))?;
let queries = fs::read_to_string(queries_file)?;
for query in queries.lines() {
    query_list.push(query.to_string());
}

Comment on lines 85 to 159
pub async fn execute_queries(
    ctx: &SessionContext,
    query_list: Vec<String>,
) -> Result<Json<Value>, anyhow::Error> {
    const TRIES: usize = 3;
    let mut results = Vec::new();

    for sql in query_list.iter() {
        let mut elapsed_times = Vec::new();
        for _iteration in 1..=TRIES {
            let start = Instant::now();
            let task_ctx = ctx.task_ctx();
            let dialect = &task_ctx.session_config().options().sql_parser.dialect;
            let dialect = dialect_from_str(dialect).ok_or_else(|| {
                plan_datafusion_err!(
                    "Unsupported SQL dialect: {dialect}. Available dialects: \
                     Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, \
                     MsSQL, ClickHouse, BigQuery, Ansi."
                )
            })?;

            let statements = DFParser::parse_sql_with_dialect(sql, dialect.as_ref())?;
            let statement = statements.front().unwrap();
            let plan = ctx.state().statement_to_plan(statement.clone()).await?;

            let df = ctx.execute_logical_plan(plan).await?;
            let physical_plan = df.create_physical_plan().await?;

            let _ = collect(physical_plan, task_ctx.clone()).await?;
            let elapsed = start.elapsed().as_secs_f64();
            elapsed_times.push(elapsed);
        }
        results.push(elapsed_times);
    }

🛠️ Refactor suggestion

Improve benchmark results structure and error handling.

The current implementation has several issues:

  1. The results don't include query identifiers, making it hard to interpret which elapsed times correspond to which query
  2. There's no error handling if statements.front() returns None
  3. Pre-allocating vectors would improve performance slightly
 pub async fn execute_queries(
     ctx: &SessionContext,
     query_list: Vec<String>,
 ) -> Result<Json<Value>, anyhow::Error> {
     const TRIES: usize = 3;
-    let mut results = Vec::new();
+    let mut results = Vec::with_capacity(query_list.len());

-    for sql in query_list.iter() {
-        let mut elapsed_times = Vec::new();
+    for (query_index, sql) in query_list.iter().enumerate() {
+        let mut elapsed_times = Vec::with_capacity(TRIES);
         for _iteration in 1..=TRIES {
             let start = Instant::now();
             let task_ctx = ctx.task_ctx();
             let dialect = &task_ctx.session_config().options().sql_parser.dialect;
             let dialect = dialect_from_str(dialect).ok_or_else(|| {
                 plan_datafusion_err!(
                     "Unsupported SQL dialect: {dialect}. Available dialects: \
                      Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, \
                      MsSQL, ClickHouse, BigQuery, Ansi."
                 )
             })?;

             let statements = DFParser::parse_sql_with_dialect(sql, dialect.as_ref())?;
-            let statement = statements.front().unwrap();
+            let statement = statements.front()
+                .ok_or_else(|| anyhow::anyhow!("No SQL statement found in query: {}", sql))?;
             let plan = ctx.state().statement_to_plan(statement.clone()).await?;

             let df = ctx.execute_logical_plan(plan).await?;
             let physical_plan = df.create_physical_plan().await?;

             let _ = collect(physical_plan, task_ctx.clone()).await?;
             let elapsed = start.elapsed().as_secs_f64();
             elapsed_times.push(elapsed);
         }
-        results.push(elapsed_times);
+        results.push(json!({
+            "query_index": query_index,
+            "query": sql,
+            "elapsed_times": elapsed_times
+        }));
     }

Suggested change
pub async fn execute_queries(
    ctx: &SessionContext,
    query_list: Vec<String>,
) -> Result<Json<Value>, anyhow::Error> {
    const TRIES: usize = 3;
    let mut results = Vec::new();
    for sql in query_list.iter() {
        let mut elapsed_times = Vec::new();
        for _iteration in 1..=TRIES {
            let start = Instant::now();
            let task_ctx = ctx.task_ctx();
            let dialect = &task_ctx.session_config().options().sql_parser.dialect;
            let dialect = dialect_from_str(dialect).ok_or_else(|| {
                plan_datafusion_err!(
                    "Unsupported SQL dialect: {dialect}. Available dialects: \
                     Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, \
                     MsSQL, ClickHouse, BigQuery, Ansi."
                )
            })?;
            let statements = DFParser::parse_sql_with_dialect(sql, dialect.as_ref())?;
            let statement = statements.front().unwrap();
            let plan = ctx.state().statement_to_plan(statement.clone()).await?;
            let df = ctx.execute_logical_plan(plan).await?;
            let physical_plan = df.create_physical_plan().await?;
            let _ = collect(physical_plan, task_ctx.clone()).await?;
            let elapsed = start.elapsed().as_secs_f64();
            elapsed_times.push(elapsed);
        }
        results.push(elapsed_times);
    }
pub async fn execute_queries(
    ctx: &SessionContext,
    query_list: Vec<String>,
) -> Result<Json<Value>, anyhow::Error> {
    const TRIES: usize = 3;
    let mut results = Vec::with_capacity(query_list.len());
    for (query_index, sql) in query_list.iter().enumerate() {
        let mut elapsed_times = Vec::with_capacity(TRIES);
        for _iteration in 1..=TRIES {
            let start = Instant::now();
            let task_ctx = ctx.task_ctx();
            let dialect = &task_ctx.session_config().options().sql_parser.dialect;
            let dialect = dialect_from_str(dialect).ok_or_else(|| {
                plan_datafusion_err!(
                    "Unsupported SQL dialect: {dialect}. Available dialects: \
                     Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, \
                     MsSQL, ClickHouse, BigQuery, Ansi."
                )
            })?;
            let statements = DFParser::parse_sql_with_dialect(sql, dialect.as_ref())?;
            let statement = statements
                .front()
                .ok_or_else(|| anyhow::anyhow!("No SQL statement found in query: {}", sql))?;
            let plan = ctx.state().statement_to_plan(statement.clone()).await?;
            let df = ctx.execute_logical_plan(plan).await?;
            let physical_plan = df.create_physical_plan().await?;
            let _ = collect(physical_plan, task_ctx.clone()).await?;
            let elapsed = start.elapsed().as_secs_f64();
            elapsed_times.push(elapsed);
        }
        results.push(json!({
            "query_index": query_index,
            "query": sql,
            "elapsed_times": elapsed_times
        }));
    }
    // Assuming there is further handling of `results` before returning
}


coderabbitai bot left a comment

Actionable comments posted: 3

🧹 Nitpick comments (2)
src/handlers/http/clickbench.rs (2)

111-115: Consider making benchmark parameters configurable.

The number of tries (3) is hardcoded. While this works, it might be better to make this configurable via environment variables or configuration parameters.

+static DEFAULT_TRIES: usize = 3;
+
pub async fn execute_queries(
    ctx: &SessionContext,
    query_list: Vec<String>,
) -> Result<Json<Value>, anyhow::Error> {
-   const TRIES: usize = 3;
+   let tries = env::var("BENCHMARK_TRIES")
+       .ok()
+       .and_then(|v| v.parse::<usize>().ok())
+       .unwrap_or(DEFAULT_TRIES);
    let mut results = Vec::with_capacity(query_list.len());
-   let mut total_elapsed_per_iteration = [0.0; TRIES];
+   let mut total_elapsed_per_iteration = vec![0.0; tries];

59-95: Consider adding memory limits for benchmarking consistency.

The benchmark doesn't specify memory limits explicitly, which could lead to inconsistent results across different environments. Consider setting memory limits for more consistent benchmarking.

    let mut session_config = SessionConfig::from_env()?.with_information_schema(true);
    session_config = session_config.with_batch_size(8192);
-   let rt_builder = RuntimeEnvBuilder::new();
+   // Set an explicit memory limit for consistent benchmarking; memory limits
+   // live on the runtime environment, not on SessionConfig.
+   let rt_builder = RuntimeEnvBuilder::new()
+       .with_memory_limit(4 * 1024 * 1024 * 1024, 1.0); // 4 GiB cap
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a5dc222 and d694a70.

📒 Files selected for processing (5)
  • src/handlers/http/clickbench.rs (1 hunks)
  • src/handlers/http/mod.rs (1 hunks)
  • src/handlers/http/modal/server.rs (3 hunks)
  • src/parseable/streams.rs (1 hunks)
  • src/rbac/role.rs (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
  • src/handlers/http/mod.rs
  • src/rbac/role.rs
  • src/handlers/http/modal/server.rs
  • src/parseable/streams.rs
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: coverage
🔇 Additional comments (3)
src/handlers/http/clickbench.rs (3)

83-90: LGTM: Improved environment variable error handling.

Good job implementing detailed error messages for environment variables. This matches the recommended approach from previous reviews.


97-105: LGTM: Good error handling in register_hits function.

The function properly wraps DataFusion errors with context about what operation was being performed, which will make debugging issues easier.


130-134: LGTM: Proper error handling for SQL parsing.

Good implementation of error handling when parsing SQL statements, particularly for the case when no SQL statement is found.

Comment on lines 153 to 159
for (iteration, total_elapsed) in total_elapsed_per_iteration.iter().enumerate() {
    warn!(
        "Total time for iteration {}: {} seconds",
        iteration + 1,
        total_elapsed
    );
}

🛠️ Refactor suggestion

Use appropriate log levels for summary information.

Similar to the previous comment, use info! level for logging summary information instead of warn!.

-   warn!(
+   info!(
        "Total time for iteration {}: {} seconds",
        iteration + 1,
        total_elapsed
    );

Suggested change
for (iteration, total_elapsed) in total_elapsed_per_iteration.iter().enumerate() {
    warn!(
        "Total time for iteration {}: {} seconds",
        iteration + 1,
        total_elapsed
    );
}
for (iteration, total_elapsed) in total_elapsed_per_iteration.iter().enumerate() {
    info!(
        "Total time for iteration {}: {} seconds",
        iteration + 1,
        total_elapsed
    );
}

Comment on lines 46 to 57
pub async fn drop_system_caches() -> Result<(), anyhow::Error> {
    // Sync to flush file system buffers
    Command::new("sync")
        .status()
        .expect("Failed to execute sync command");
    let _ = Command::new("sudo")
        .args(["sh", "-c", "echo 3 > /proc/sys/vm/drop_caches"])
        .output()
        .map_err(|e| anyhow::Error::msg(e.to_string()))?;

    Ok(())
}

⚠️ Potential issue

Platform dependency issue in system cache clearing.

The drop_system_caches function has several issues:

  1. It uses Linux-specific commands (/proc/sys/vm/drop_caches) without checking if the platform is Linux
  2. It requires sudo privileges which may not be available in all environments
  3. It uses expect() on the sync command which will panic if the command fails

Consider refactoring to make this cross-platform compatible or at least gracefully handle cases where these operations aren't supported:

pub async fn drop_system_caches() -> Result<(), anyhow::Error> {
    // Sync to flush file system buffers
-   Command::new("sync")
-       .status()
-       .expect("Failed to execute sync command");
+   match Command::new("sync").status() {
+       Ok(_) => {}
+       Err(e) => warn!("Failed to execute sync command: {}", e),
+   }
    let _ = Command::new("sudo")
        .args(["sh", "-c", "echo 3 > /proc/sys/vm/drop_caches"])
        .output()
-       .map_err(|e| anyhow::Error::msg(e.to_string()))?;
+       .map_err(|e| {
+           warn!("Failed to drop system caches: {}", e);
+           anyhow::Error::msg("Failed to drop system caches. This might be expected if not running on Linux or without sudo privileges.")
+       })?;

    Ok(())
}

Suggested change
pub async fn drop_system_caches() -> Result<(), anyhow::Error> {
    // Sync to flush file system buffers
    Command::new("sync")
        .status()
        .expect("Failed to execute sync command");
    let _ = Command::new("sudo")
        .args(["sh", "-c", "echo 3 > /proc/sys/vm/drop_caches"])
        .output()
        .map_err(|e| anyhow::Error::msg(e.to_string()))?;
    Ok(())
}
pub async fn drop_system_caches() -> Result<(), anyhow::Error> {
    // Sync to flush file system buffers
    match Command::new("sync").status() {
        Ok(_) => {}
        Err(e) => warn!("Failed to execute sync command: {}", e),
    }
    let _ = Command::new("sudo")
        .args(["sh", "-c", "echo 3 > /proc/sys/vm/drop_caches"])
        .output()
        .map_err(|e| {
            warn!("Failed to drop system caches: {}", e);
            anyhow::Error::msg("Failed to drop system caches. This might be expected if not running on Linux or without sudo privileges.")
        })?;
    Ok(())
}

Comment on lines 116 to 152
for (query_index, sql) in query_list.iter().enumerate() {
let mut elapsed_times = Vec::with_capacity(TRIES);
for iteration in 1..=TRIES {
let start = Instant::now();
let task_ctx = ctx.task_ctx();
let dialect = &task_ctx.session_config().options().sql_parser.dialect;
let dialect = dialect_from_str(dialect).ok_or_else(|| {
plan_datafusion_err!(
"Unsupported SQL dialect: {dialect}. Available dialects: \
Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, \
MsSQL, ClickHouse, BigQuery, Ansi."
)
})?;

let statements = DFParser::parse_sql_with_dialect(sql, dialect.as_ref())?;
let statement = statements
.front()
.ok_or_else(|| anyhow::anyhow!("No SQL statement found in query: {}", sql))?;
let plan = ctx.state().statement_to_plan(statement.clone()).await?;

let df = ctx.execute_logical_plan(plan).await?;
let physical_plan = df.create_physical_plan().await?;

let _ = collect(physical_plan, task_ctx.clone()).await?;
let elapsed = start.elapsed().as_secs_f64();
total_elapsed_per_iteration[iteration - 1] += elapsed;

warn!("query {query_count} iteration {iteration} completed in {elapsed} secs");
elapsed_times.push(elapsed);
}
query_count += 1;
results.push(json!({
"query_index": query_index,
"query": sql,
"elapsed_times": elapsed_times
}));
}

🛠️ Refactor suggestion

Use appropriate log levels instead of warn for normal operations.

Using warn! for routine benchmark output is misleading, since that level should be reserved for genuine problems. Consider using info! or debug! instead.

-           warn!("query {query_count} iteration {iteration} completed in {elapsed} secs");
+           info!("query {query_count} iteration {iteration} completed in {elapsed} secs");

Additionally, the outer loop structure and result collection look good with proper capacity pre-allocation and comprehensive result structure.
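The timing pattern in that loop reduces to a few lines. This standalone sketch (names are illustrative, not from the PR) times a closure over TRIES runs the same way the handler times each query, pre-allocating the results vector:

```rust
use std::time::Instant;

const TRIES: usize = 3;

/// Run `f` TRIES times, returning per-run elapsed seconds.
fn time_tries<F: FnMut()>(mut f: F) -> Vec<f64> {
    let mut elapsed_times = Vec::with_capacity(TRIES);
    for _iteration in 1..=TRIES {
        let start = Instant::now();
        f();
        // as_secs_f64 gives fractional seconds, matching the PR's unit.
        elapsed_times.push(start.elapsed().as_secs_f64());
    }
    elapsed_times
}

fn main() {
    let times = time_tries(|| {
        // Stand-in for executing the physical plan.
        let _sum: u64 = (0..10_000).sum();
    });
    assert_eq!(times.len(), TRIES);
    assert!(times.iter().all(|&t| t >= 0.0));
}
```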


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (2)
src/handlers/http/clickbench.rs (2)

62-62: Avoid multiple tokio::main declarations in library code.

Placing #[tokio::main] on this function makes it build and enter its own runtime on every call, which behaves like an application entrypoint and can panic ("Cannot start a runtime from within a runtime") when invoked from a thread already driving the Actix/Tokio runtime. Keep a single runtime entrypoint, particularly if you plan to call run_benchmark from elsewhere.


91-96: Use asynchronous file IO to avoid blocking within an async context.

Reading large files synchronously in an async function can block the thread. Refactor to use tokio::fs to avoid blocking calls:

-    let queries = fs::read_to_string(queries_file)?;
-    for query in queries.lines() {
-        query_list.push(query.to_string());
-    }
+    let queries = tokio::fs::read_to_string(queries_file).await?;
+    for line in queries.lines() {
+        query_list.push(line.to_string());
+    }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d694a70 and 07199ee.

📒 Files selected for processing (1)
  • src/handlers/http/clickbench.rs (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: coverage
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
🔇 Additional comments (1)
src/handlers/http/clickbench.rs (1)

45-60: Consider making drop_system_caches cross-platform or optional.

This logic depends on Linux-specific paths and privileges. A previous review already flagged this concern. The function should either gracefully detect non-Linux environments or skip the cache-drop step when sudo privileges aren’t available, rather than unconditionally attempting the command and returning an error.

Comment on lines 145 to 152
results.push(json!({
"query_index": query_index,
"query": sql,
"elapsed_times": {
"iteration": iteration + 1,
"elapsed_time": elapsed_times
}
}));

⚠️ Potential issue

Fix iteration numbering and avoid pushing growing arrays for each iteration.

Right now, each iteration adds a JSON object containing the entire elapsed_times array, and uses iteration + 1 in one place but logs iteration in another. This inconsistency can confuse observers. Consider storing a single "elapsed_time" per iteration and numbering iterations consistently:

         results.push(json!({
             "query_index": query_index,
             "query": sql,
-            "elapsed_times": {
-              "iteration": iteration + 1,
-              "elapsed_time": elapsed_times
-            }
+            "iteration": iteration,
+            "elapsed_time": elapsed
         }));

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (4)
src/handlers/http/clickbench.rs (4)

34-43: Consider simplifying the async invocation rather than using spawn_blocking.
Because run_benchmark is itself an async function, using tokio::task::spawn_blocking(run_benchmark) introduces an extra layer you may not need if the tasks within run_benchmark are non-blocking. If some tasks are intensive, you can continue using spawn_blocking, but ensure the approach is intentional.


62-98: Remove or use the unused table_options map.
You define table_options but do not reference it when registering the Parquet file or configuring the session. Either remove it to reduce noise or incorporate it if you intend to apply specialized read options.

     let mut table_options = HashMap::new();
     table_options.insert("binary_as_string", "true");
-    // Currently unused

110-116: Increase the capacity of the results vector to accommodate all iterations.
You allocate Vec::with_capacity(query_list.len()) but add 3 entries per query (for each iteration). To avoid reallocations, adjust the capacity to query_list.len() * TRIES.

-    let mut results = Vec::with_capacity(query_list.len());
+    let mut results = Vec::with_capacity(query_list.len() * TRIES);

118-119: Remove or utilize elapsed_times.
You populate elapsed_times but never use it after pushing values, since you push each iteration’s results directly into results. Consider either removing elapsed_times or structuring the final results to present each query's times in a nested array.

-       let mut elapsed_times = Vec::with_capacity(TRIES);
        for iteration in 1..=TRIES {
            ...
-           elapsed_times.push(elapsed);
        }
-       // Not used anymore
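If the per-query times are kept instead of removed, a small aggregate makes them easier to compare across runs. This sketch uses a hypothetical helper (not part of the PR) reporting the best and median timings, the two numbers ClickBench-style reports usually highlight:

```rust
/// Best and median of a non-empty slice of timings, in seconds.
fn summarize(times: &[f64]) -> (f64, f64) {
    let mut sorted = times.to_vec();
    // Timings are finite, so partial_cmp cannot fail here.
    sorted.sort_by(|a, b| a.partial_cmp(b).expect("timings are finite"));
    let best = sorted[0];
    let median = sorted[sorted.len() / 2];
    (best, median)
}

fn main() {
    let (best, median) = summarize(&[0.9, 0.4, 0.7]);
    assert_eq!(best, 0.4);
    assert_eq!(median, 0.7);
}
```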
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 07199ee and 449ef80.

📒 Files selected for processing (1)
  • src/handlers/http/clickbench.rs (1 hunks)
🔇 Additional comments (1)
src/handlers/http/clickbench.rs (1)

45-60: Platform dependency note for cache clearing.
This function is tightly coupled to Linux (writing to /proc/sys/vm/drop_caches) and also requires sudo privileges. If you intended multi-platform support, consider gracefully handling non-Linux environments or skipping cache dropping in those cases.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (2)
src/handlers/http/clickbench.rs (2)

89-91: Consider ignoring or trimming blank lines in the queries file.

If the queries file contains comments or empty lines, attempting to parse them will cause an error. Below is a quick fix to skip blank lines:

 for query in queries.lines() {
-    query_list.push(query.to_string());
+    let trimmed = query.trim();
+    if !trimmed.is_empty() {
+        query_list.push(trimmed.to_string());
+    }
 }
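The trimming fix above generalizes to a small parser. This sketch (a hypothetical helper, not the PR's code) also skips SQL `--` line comments, which a queries file commonly contains:

```rust
/// Split a queries file into non-empty, non-comment query strings.
fn parse_queries(raw: &str) -> Vec<String> {
    raw.lines()
        .map(str::trim)
        // Drop blank lines and `--` SQL line comments before parsing.
        .filter(|l| !l.is_empty() && !l.starts_with("--"))
        .map(String::from)
        .collect()
}

fn main() {
    let raw = "SELECT 1;\n\n-- warm-up only\nSELECT 2;\n   \n";
    let queries = parse_queries(raw);
    assert_eq!(
        queries,
        vec!["SELECT 1;".to_string(), "SELECT 2;".to_string()]
    );
}
```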

143-143: Consider reducing log verbosity for query iteration.

Repeated “info!” logs can quickly clutter the console during benchmarking. If these logs are mainly for debugging, use debug! instead.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 449ef80 and 8ef2014.

📒 Files selected for processing (1)
  • src/handlers/http/clickbench.rs (1 hunks)
🔇 Additional comments (2)
src/handlers/http/clickbench.rs (2)

44-59: Platform dependency issue in system cache clearing.

This code is effectively Linux-specific and requires sudo privileges. Please note that this was flagged in a previous discussion.


92-95: Verify concurrency approach when offloading query execution.

Calling the asynchronous execute_queries function directly inside tokio::task::spawn_blocking can be problematic. Typically, spawn_blocking is meant for synchronous or CPU-bound work, and it may cause confusion in an already async context.

Comment on lines +110 to +111
#[tokio::main(flavor = "multi_thread")]
pub async fn execute_queries(

🛠️ Refactor suggestion

Remove or revise the #[tokio::main] attribute for this library function.

Using #[tokio::main] in a function called by Actix (which already runs under a Tokio runtime) creates a second, nested runtime and can panic when entered from a runtime thread. If you need a runtime in this context, consider reusing the Actix runtime or using tokio::spawn instead:

-#[tokio::main(flavor = "multi_thread")]
 pub async fn execute_queries(
     ctx: &SessionContext,
     query_list: Vec<String>,
 ) -> Result<Json<Value>, anyhow::Error> {