  *
  */

-use std::{collections::HashMap, env, fs, time::Instant};
+use std::{collections::HashMap, env, fs, process::Command, time::Instant};

 use actix_web::{web::Json, Responder};
 use datafusion::{
@@ -28,15 +28,34 @@ use datafusion::{
     sql::{parser::DFParser, sqlparser::dialect::dialect_from_str},
 };
 use serde_json::{json, Value};
+use tracing::warn;
+static PARQUET_FILE: &str = "PARQUET_FILE";
+static QUERIES_FILE: &str = "QUERIES_FILE";

 pub async fn clickbench_benchmark() -> Result<impl Responder, actix_web::Error> {
+    drop_system_caches()
+        .await
+        .map_err(actix_web::error::ErrorInternalServerError)?;
     let results = tokio::task::spawn_blocking(run_benchmark)
         .await
         .map_err(actix_web::error::ErrorInternalServerError)?
         .map_err(actix_web::error::ErrorInternalServerError)?;
     Ok(results)
 }

+pub async fn drop_system_caches() -> Result<(), anyhow::Error> {
+    // Sync to flush file system buffers
+    Command::new("sync")
+        .status()
+        .expect("Failed to execute sync command");
+    let _ = Command::new("sudo")
+        .args(["sh", "-c", "echo 3 > /proc/sys/vm/drop_caches"])
+        .output()
+        .map_err(|e| anyhow::Error::msg(e.to_string()))?;
+
+    Ok(())
+}
+
 #[tokio::main(flavor = "multi_thread")]
 pub async fn run_benchmark() -> Result<Json<Value>, anyhow::Error> {
     let mut session_config = SessionConfig::from_env()?.with_information_schema(true);
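Note: drop_system_caches() runs `sync` and then `echo 3 > /proc/sys/vm/drop_caches` so each benchmark run starts against cold file-system caches rather than whatever the previous run left in the page cache; writing 3 frees the page cache plus dentries and inodes, which requires root, hence the sudo wrapper. A minimal sketch of a stricter variant that surfaces a non-zero exit status instead of discarding the output (the function name and error messages here are illustrative, not part of this change):

    pub fn drop_system_caches_checked() -> Result<(), anyhow::Error> {
        // `sync` flushes dirty pages to disk first, so dropping the caches
        // afterwards actually leaves the data cold on disk.
        let sync_status = std::process::Command::new("sync").status()?;
        if !sync_status.success() {
            return Err(anyhow::anyhow!("sync exited with {sync_status}"));
        }
        // Writing 3 to /proc/sys/vm/drop_caches frees the page cache,
        // dentries and inodes; it needs root, hence sudo.
        let output = std::process::Command::new("sudo")
            .args(["sh", "-c", "echo 3 > /proc/sys/vm/drop_caches"])
            .output()?;
        if !output.status.success() {
            return Err(anyhow::anyhow!(
                "drop_caches failed: {}",
                String::from_utf8_lossy(&output.stderr)
            ));
        }
        Ok(())
    }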
@@ -61,10 +80,13 @@ pub async fn run_benchmark() -> Result<Json<Value>, anyhow::Error> {
     let mut table_options = HashMap::new();
     table_options.insert("binary_as_string", "true");

-    let parquet_file = env::var("PARQUET_FILE")?;
+    let parquet_file = env::var(PARQUET_FILE)
+        .map_err(|_| anyhow::anyhow!("PARQUET_FILE environment variable not set. Please set it to the path of the hits.parquet file."))?;
     register_hits(&ctx, &parquet_file).await?;
+    println!("hits registered");
     let mut query_list = Vec::new();
-    let queries_file = env::var("QUERIES_FILE")?;
+    let queries_file = env::var(QUERIES_FILE)
+        .map_err(|_| anyhow::anyhow!("QUERIES_FILE environment variable not set. Please set it to the path of the queries file."))?;
     let queries = fs::read_to_string(queries_file)?;
     for query in queries.lines() {
         query_list.push(query.to_string());
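Note: the two env lookups now share the same shape, reading a well-known variable named by a constant and turning the VarError into an actionable message. If more knobs get added they could be folded into one helper along these lines (require_env is a hypothetical name, not part of this diff):

    fn require_env(key: &'static str, hint: &str) -> Result<String, anyhow::Error> {
        std::env::var(key)
            .map_err(|_| anyhow::anyhow!("{key} environment variable not set. {hint}"))
    }

    // e.g. let parquet_file = require_env(PARQUET_FILE,
    //     "Please set it to the path of the hits.parquet file.")?;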
@@ -73,7 +95,7 @@ pub async fn run_benchmark() -> Result<Json<Value>, anyhow::Error> {
 }

 async fn register_hits(ctx: &SessionContext, parquet_file: &str) -> Result<(), anyhow::Error> {
-    let options: ParquetReadOptions<'_> = Default::default();
+    let options: ParquetReadOptions<'_> = ParquetReadOptions::default();
     ctx.register_parquet("hits", parquet_file, options)
         .await
         .map_err(|e| {
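Note: both spellings construct the same default value; naming the type at the call site just avoids the redundant annotation and reads more directly:

    // Equivalent constructions; the second names the type it builds.
    let options: ParquetReadOptions<'_> = Default::default();
    let options = ParquetReadOptions::default();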
@@ -87,34 +109,53 @@ pub async fn execute_queries(
     query_list: Vec<String>,
 ) -> Result<Json<Value>, anyhow::Error> {
     const TRIES: usize = 3;
-    let mut results = Vec::new();
+    let mut results = Vec::with_capacity(query_list.len());
+    let mut query_count = 1;
+    let mut total_elapsed_per_iteration = [0.0; TRIES];

-    for sql in query_list.iter() {
-        let mut elapsed_times = Vec::new();
-        for _iteration in 1..=TRIES {
+    for (query_index, sql) in query_list.iter().enumerate() {
+        let mut elapsed_times = Vec::with_capacity(TRIES);
+        for iteration in 1..=TRIES {
             let start = Instant::now();
             let task_ctx = ctx.task_ctx();
             let dialect = &task_ctx.session_config().options().sql_parser.dialect;
             let dialect = dialect_from_str(dialect).ok_or_else(|| {
                 plan_datafusion_err!(
                     "Unsupported SQL dialect: {dialect}. Available dialects: \
-                     Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, \
-                     MsSQL, ClickHouse, BigQuery, Ansi."
+                    Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, \
+                    MsSQL, ClickHouse, BigQuery, Ansi."
                 )
             })?;

             let statements = DFParser::parse_sql_with_dialect(sql, dialect.as_ref())?;
-            let statement = statements.front().unwrap();
+            let statement = statements
+                .front()
+                .ok_or_else(|| anyhow::anyhow!("No SQL statement found in query: {}", sql))?;
             let plan = ctx.state().statement_to_plan(statement.clone()).await?;

             let df = ctx.execute_logical_plan(plan).await?;
             let physical_plan = df.create_physical_plan().await?;

             let _ = collect(physical_plan, task_ctx.clone()).await?;
             let elapsed = start.elapsed().as_secs_f64();
+            total_elapsed_per_iteration[iteration - 1] += elapsed;
+
+            warn!("query {query_count} iteration {iteration} completed in {elapsed} secs");
             elapsed_times.push(elapsed);
         }
-        results.push(elapsed_times);
+        query_count += 1;
+        results.push(json!({
+            "query_index": query_index,
+            "query": sql,
+            "elapsed_times": elapsed_times
+        }));
+    }
+    for (iteration, total_elapsed) in total_elapsed_per_iteration.iter().enumerate() {
+        warn!(
+            "Total time for iteration {}: {} seconds",
+            iteration + 1,
+            total_elapsed
+        );
     }

     let result_json = json!(results);
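Note: with each query's elapsed_times now wrapped in a json! object, the handler's response becomes an array of records rather than a bare array of arrays. A hedged illustration of the resulting shape (the query text and timings below are made up):

    [
      {
        "query_index": 0,
        "query": "SELECT COUNT(*) FROM hits;",
        "elapsed_times": [0.42, 0.31, 0.30]
      },
      ...
    ]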