@@ -23,12 +23,11 @@ use datafusion::{
     common::plan_datafusion_err,
     error::DataFusionError,
     execution::{runtime_env::RuntimeEnvBuilder, SessionStateBuilder},
-    physical_plan::collect,
     prelude::{ParquetReadOptions, SessionConfig, SessionContext},
     sql::{parser::DFParser, sqlparser::dialect::dialect_from_str},
 };
 use serde_json::{json, Value};
-use tracing::warn;
+use tracing::{info, warn};
 
 static PARQUET_FILE: &str = "PARQUET_FILE";
 static QUERIES_FILE: &str = "QUERIES_FILE";
@@ -45,20 +44,24 @@ pub async fn clickbench_benchmark() -> Result<impl Responder, actix_web::Error>
 
 pub async fn drop_system_caches() -> Result<(), anyhow::Error> {
     // Sync to flush file system buffers
-    Command::new("sync")
-        .status()
-        .expect("Failed to execute sync command");
+    match Command::new("sync").status() {
+        Ok(_) => {}
+        Err(e) => warn!("Failed to execute sync command: {}", e),
+    }
 
     let _ = Command::new("sudo")
-        .args(["sh", "-c", "echo 3 > /proc/sys/vm/drop_caches"])
-        .output()
-        .map_err(|e| anyhow::Error::msg(e.to_string()))?;
+        .args(["sh", "-c", "echo 3 > /proc/sys/vm/drop_caches"])
+        .output()
+        .map_err(|e| {
+            warn!("Failed to drop system caches: {}", e);
+            anyhow::Error::msg("Failed to drop system caches. This might be expected if not running on Linux or without sudo privileges.")
+        })?;
 
     Ok(())
 }
 
 #[tokio::main(flavor = "multi_thread")]
 pub async fn run_benchmark() -> Result<Json<Value>, anyhow::Error> {
-    let mut session_config = SessionConfig::from_env()?.with_information_schema(true);
+    let mut session_config = SessionConfig::new().with_information_schema(true);
 
     session_config = session_config.with_batch_size(8192);
 
@@ -81,12 +84,12 @@ pub async fn run_benchmark() -> Result<Json<Value>, anyhow::Error> {
     table_options.insert("binary_as_string", "true");
 
     let parquet_file = env::var(PARQUET_FILE)
-        .map_err(|_| anyhow::anyhow!("PARQUET_FILE environment variable not set. Please set it to the path of the hits.parquet file."))?;
+        .map_err(|_| anyhow::anyhow!("PARQUET_FILE environment variable not set. Please set it to the path of the hits.parquet file."))?;
     register_hits(&ctx, &parquet_file).await?;
-    println!("hits registered");
+    info!("hits.parquet registered");
 
     let mut query_list = Vec::new();
     let queries_file = env::var(QUERIES_FILE)
-        .map_err(|_| anyhow::anyhow!("QUERIES_FILE environment variable not set. Please set it to the path of the queries file."))?;
+        .map_err(|_| anyhow::anyhow!("QUERIES_FILE environment variable not set. Please set it to the path of the queries file."))?;
     let queries = fs::read_to_string(queries_file)?;
     for query in queries.lines() {
         query_list.push(query.to_string());
@@ -110,9 +113,7 @@ pub async fn execute_queries(
 ) -> Result<Json<Value>, anyhow::Error> {
     const TRIES: usize = 3;
     let mut results = Vec::with_capacity(query_list.len());
-    let mut query_count = 1;
     let mut total_elapsed_per_iteration = [0.0; TRIES];
-
     for (query_index, sql) in query_list.iter().enumerate() {
         let mut elapsed_times = Vec::with_capacity(TRIES);
         for iteration in 1..=TRIES {
@@ -122,8 +123,8 @@ pub async fn execute_queries(
             let dialect = dialect_from_str(dialect).ok_or_else(|| {
                 plan_datafusion_err!(
                     "Unsupported SQL dialect: {dialect}. Available dialects: \
-                     Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, \
-                     MsSQL, ClickHouse, BigQuery, Ansi."
+                    Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, \
+                    MsSQL, ClickHouse, BigQuery, Ansi."
                 )
             })?;
@@ -134,31 +135,41 @@ pub async fn execute_queries(
             let plan = ctx.state().statement_to_plan(statement.clone()).await?;
 
             let df = ctx.execute_logical_plan(plan).await?;
-            let physical_plan = df.create_physical_plan().await?;
 
-            let _ = collect(physical_plan, task_ctx.clone()).await?;
+            let _ = df.collect().await?;
             let elapsed = start.elapsed().as_secs_f64();
             total_elapsed_per_iteration[iteration - 1] += elapsed;
-
-            warn!("query {query_count} iteration {iteration} completed in {elapsed} secs");
+            info!("query {query_index} iteration {iteration} completed in {elapsed} secs");
             elapsed_times.push(elapsed);
+
+            results.push(json!({
+                "query_index": query_index,
+                "query": sql,
+                "elapsed_times": {
+                    "iteration": iteration + 1,
+                    "elapsed_time": elapsed_times
+                }
+            }));
         }
-        query_count += 1;
-        results.push(json!({
-            "query_index": query_index,
-            "query": sql,
-            "elapsed_times": elapsed_times
-        }));
-    }
-    for (iteration, total_elapsed) in total_elapsed_per_iteration.iter().enumerate() {
-        warn!(
-            "Total time for iteration {}: {} seconds",
-            iteration + 1,
-            total_elapsed
-        );
     }
 
-    let result_json = json!(results);
+    let summary: Vec<Value> = total_elapsed_per_iteration
+        .iter()
+        .enumerate()
+        .map(|(iteration, &total_elapsed)| {
+            json!({
+                "iteration": iteration + 1,
+                "total_elapsed": total_elapsed
+            })
+        })
+        .collect();
+
+    info!("summary: {:?}", summary);
+
+    let result_json = json!({
+        "summary": summary,
+        "results": results
+    });
 
     Ok(Json(result_json))
 }