From bde0dd1e65a08768c63fbc2c0cb20081348bfb35 Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Sun, 12 Jan 2025 20:55:48 +0530
Subject: [PATCH 01/17] refactor: remove unnecessary method

---
 src/query/mod.rs | 134 +++++++++++++++++++++++------------------------
 1 file changed, 65 insertions(+), 69 deletions(-)

diff --git a/src/query/mod.rs b/src/query/mod.rs
index b86573d27..c847cb9e0 100644
--- a/src/query/mod.rs
+++ b/src/query/mod.rs
@@ -44,11 +44,73 @@ pub use self::stream_schema_provider::PartialTimeFilter;
 use crate::event;
 use crate::metadata::STREAM_INFO;
 use crate::option::CONFIG;
-use crate::storage::{ObjectStorageProvider, StorageDir};
+use crate::storage::StorageDir;
 use crate::utils::time::TimeRange;

-pub static QUERY_SESSION: Lazy<SessionContext> =
-    Lazy::new(|| Query::create_session_context(CONFIG.storage()));
+pub static QUERY_SESSION: Lazy<SessionContext> = Lazy::new(|| {
+    let storage = CONFIG.storage();
+    let (pool_size, fraction) = match CONFIG.parseable.query_memory_pool_size {
+        Some(size) => (size, 1.),
+        None => {
+            let mut system = System::new();
+            system.refresh_memory();
+            let available_mem = system.available_memory();
+            (available_mem as usize, 0.85)
+        }
+    };
+
+    let runtime_config = storage
+        .get_datafusion_runtime()
+        .with_disk_manager(DiskManagerConfig::NewOs)
+        .with_memory_limit(pool_size, fraction);
+    let runtime = Arc::new(runtime_config.build().unwrap());
+
+    let mut config = SessionConfig::default()
+        .with_parquet_pruning(true)
+        .with_prefer_existing_sort(true)
+        .with_round_robin_repartition(true);
+
+    // For more details, refer to https://datafusion.apache.org/user-guide/configs.html
+
+    // Reduce the number of rows read (if possible)
+    config.options_mut().execution.parquet.enable_page_index = true;
+
+    // Pushdown filters allow DF to push the filters as far down in the plan as possible,
+    // thus reducing the number of rows decoded
+    config.options_mut().execution.parquet.pushdown_filters = true;
+
+    // Reorder filters allow DF to decide the order of filters, minimizing the cost of filter evaluation
+    config.options_mut().execution.parquet.reorder_filters = true;
+
+    // Enable StringViewArray
+    // https://www.influxdata.com/blog/faster-queries-with-stringview-part-one-influxdb/
+    config
+        .options_mut()
+        .execution
+        .parquet
+        .schema_force_view_types = true;
+
+    let state = SessionStateBuilder::new()
+        .with_default_features()
+        .with_config(config)
+        .with_runtime_env(runtime)
+        .build();
+
+    let schema_provider = Arc::new(GlobalSchemaProvider {
+        storage: storage.get_object_store(),
+    });
+    state
+        .catalog_list()
+        .catalog(&state.config_options().catalog.default_catalog)
+        .expect("default catalog is provided by datafusion")
+        .register_schema(
+            &state.config_options().catalog.default_schema,
+            schema_provider,
+        )
+        .unwrap();
+
+    SessionContext::new_with_state(state)
+});

 // A query request by client
 #[derive(Debug)]
@@ -59,72 +121,6 @@ pub struct Query {
 }

 impl Query {
-    // create session context for this query
-    pub fn create_session_context(storage: Arc<dyn ObjectStorageProvider>) -> SessionContext {
-        let runtime_config = storage
-            .get_datafusion_runtime()
-            .with_disk_manager(DiskManagerConfig::NewOs);
-
-        let (pool_size, fraction) = match CONFIG.parseable.query_memory_pool_size {
-            Some(size) => (size, 1.),
-            None => {
-                let mut system = System::new();
-                system.refresh_memory();
-                let available_mem = system.available_memory();
-                (available_mem as usize, 0.85)
-            }
-        };
-
-        let runtime_config = runtime_config.with_memory_limit(pool_size, fraction);
-        let runtime =
Arc::new(runtime_config.build().unwrap()); - - let mut config = SessionConfig::default() - .with_parquet_pruning(true) - .with_prefer_existing_sort(true) - .with_round_robin_repartition(true); - - // For more details refer https://datafusion.apache.org/user-guide/configs.html - - // Reduce the number of rows read (if possible) - config.options_mut().execution.parquet.enable_page_index = true; - - // Pushdown filters allows DF to push the filters as far down in the plan as possible - // and thus, reducing the number of rows decoded - config.options_mut().execution.parquet.pushdown_filters = true; - - // Reorder filters allows DF to decide the order of filters minimizing the cost of filter evaluation - config.options_mut().execution.parquet.reorder_filters = true; - - // Enable StringViewArray - // https://www.influxdata.com/blog/faster-queries-with-stringview-part-one-influxdb/ - config - .options_mut() - .execution - .parquet - .schema_force_view_types = true; - - let state = SessionStateBuilder::new() - .with_default_features() - .with_config(config) - .with_runtime_env(runtime) - .build(); - - let schema_provider = Arc::new(GlobalSchemaProvider { - storage: storage.get_object_store(), - }); - state - .catalog_list() - .catalog(&state.config_options().catalog.default_catalog) - .expect("default catalog is provided by datafusion") - .register_schema( - &state.config_options().catalog.default_schema, - schema_provider, - ) - .unwrap(); - - SessionContext::new_with_state(state) - } - pub async fn execute( &self, stream_name: String, From 918105caba44689f4227ea2921c5e7c705eb5f70 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sun, 12 Jan 2025 21:25:20 +0530 Subject: [PATCH 02/17] unnecessary allocation --- src/handlers/http/modal/utils/logstream_utils.rs | 6 ++---- src/storage/mod.rs | 2 +- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/handlers/http/modal/utils/logstream_utils.rs b/src/handlers/http/modal/utils/logstream_utils.rs index 3543198d0..20d8302e7 100644 --- a/src/handlers/http/modal/utils/logstream_utils.rs +++ b/src/handlers/http/modal/utils/logstream_utils.rs @@ -32,7 +32,7 @@ use crate::{ metadata::{self, SchemaVersion, STREAM_INFO}, option::{Mode, CONFIG}, static_schema::{convert_static_schema_to_arrow_schema, StaticSchema}, - storage::{LogStream, ObjectStoreFormat, StreamType}, + storage::{ObjectStoreFormat, StreamType}, validator, }; @@ -443,9 +443,7 @@ pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result< // Proceed to create log stream if it doesn't exist let storage = CONFIG.storage().get_object_store(); let streams = storage.list_streams().await?; - if streams.contains(&LogStream { - name: stream_name.to_owned(), - }) { + if streams.iter().any(|stream| stream.name == stream_name) { let mut stream_metadata = ObjectStoreFormat::default(); let stream_metadata_bytes = storage.create_stream_from_ingestor(stream_name).await?; if !stream_metadata_bytes.is_empty() { diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 573800812..2b2a99128 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -209,7 +209,7 @@ impl Default for ObjectStoreFormat { } } -#[derive(serde::Serialize, PartialEq, Debug)] +#[derive(serde::Serialize, Debug)] pub struct LogStream { pub name: String, } From 29ca7426549d9a32ec65b6782c907723c7b9e01e Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sun, 12 Jan 2025 22:03:35 +0530 Subject: [PATCH 03/17] refactor: serde params --- src/handlers/http/query.rs | 16 ++++++++++++---- 1 file changed, 12 
insertions(+), 4 deletions(-) diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs index 7cecb1416..1ec3ccc4b 100644 --- a/src/handlers/http/query.rs +++ b/src/handlers/http/query.rs @@ -25,7 +25,6 @@ use datafusion::error::DataFusionError; use datafusion::execution::context::SessionState; use futures_util::Future; use http::StatusCode; -use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; use std::time::Instant; @@ -153,13 +152,22 @@ pub async fn create_streams_for_querier() { } } +#[derive(Debug, Default, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +struct QueryParams { + #[serde(default)] + fields: bool, + #[serde(default)] + send_null: bool, +} + impl FromRequest for Query { type Error = actix_web::Error; type Future = Pin>>>; fn from_request(req: &HttpRequest, payload: &mut actix_web::dev::Payload) -> Self::Future { let query = Json::::from_request(req, payload); - let params = web::Query::>::from_request(req, payload) + let params = web::Query::::from_request(req, payload) .into_inner() .map(|x| x.0) .unwrap_or_default(); @@ -167,10 +175,10 @@ impl FromRequest for Query { let fut = async move { let mut query = query.await?.into_inner(); // format output json to include field names - query.fields = params.get("fields").cloned().unwrap_or(false); + query.fields = params.fields; if !query.send_null { - query.send_null = params.get("sendNull").cloned().unwrap_or(false); + query.send_null = params.send_null; } Ok(query) From beb1137433ff42f74791a057f5a4dc2891dbabfc Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sun, 12 Jan 2025 22:26:15 +0530 Subject: [PATCH 04/17] refactor: serde flatten --- src/handlers/http/query.rs | 61 +++++++++++++++++++------------------- 1 file changed, 30 insertions(+), 31 deletions(-) diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs index 1ec3ccc4b..02500e4c9 100644 --- a/src/handlers/http/query.rs +++ b/src/handlers/http/query.rs @@ -50,21 +50,38 @@ use crate::utils::user_auth_for_query; use super::modal::utils::logstream_utils::create_stream_and_schema_from_storage; -/// Query Request through http endpoint. +/// Can be optionally be accepted as query params in the query request +/// NOTE: ensure that the fields param is not set based on request body +#[derive(Debug, Default, serde::Deserialize, serde::Serialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct QueryParams { + #[serde(default)] + fields: bool, + #[serde(default)] + send_null: bool, +} + +/// Query Request in json format. 
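+/// For illustration, a hypothetical request body (note the camelCase keys;
+/// `mystream` is an illustrative stream name, not part of this series):
+/// `{"query": "select * from mystream", "startTime": "2025-01-12T00:00:00Z", "endTime": "now", "sendNull": true}`
+/// where `sendNull` is deserialized through the flattened [`QueryParams`]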
#[derive(Debug, serde::Deserialize, serde::Serialize, Clone)] #[serde(rename_all = "camelCase")] pub struct Query { pub query: String, pub start_time: String, pub end_time: String, - #[serde(default)] - pub send_null: bool, - #[serde(skip)] - pub fields: bool, + #[serde(default, flatten)] + pub params: QueryParams, #[serde(skip)] pub filter_tags: Option>, } +impl Query { + // fields param is set based only on query param and send_null is set to true if either body or query param is true + fn update_params(&mut self, QueryParams { fields, send_null }: QueryParams) { + self.params.fields = fields; + self.params.send_null |= send_null; + } +} + pub async fn query(req: HttpRequest, query_request: Query) -> Result { let session_state = QUERY_SESSION.state(); let raw_logical_plan = match session_state @@ -107,8 +124,8 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result>>>; @@ -174,12 +182,7 @@ impl FromRequest for Query { let fut = async move { let mut query = query.await?.into_inner(); - // format output json to include field names - query.fields = params.fields; - - if !query.send_null { - query.send_null = params.send_null; - } + query.update_params(params); Ok(query) }; @@ -227,7 +230,7 @@ fn transform_query_for_ingestor(query: &Query) -> Option { return None; } - let end_time: DateTime = if query.end_time == "now" { + let end_time = if query.end_time == "now" { Utc::now() } else { DateTime::parse_from_rfc3339(&query.end_time) @@ -237,14 +240,10 @@ fn transform_query_for_ingestor(query: &Query) -> Option { let start_time = end_time - chrono::Duration::minutes(1); // when transforming the query, the ingestors are forced to return an array of values - let q = Query { - query: query.query.clone(), - fields: false, - filter_tags: query.filter_tags.clone(), - send_null: query.send_null, - start_time: start_time.to_rfc3339(), - end_time: end_time.to_rfc3339(), - }; + let mut q = query.clone(); + q.params.fields = false; + q.start_time = start_time.to_rfc3339(); + q.end_time = end_time.to_rfc3339(); Some(q) } From 315e99721a12b1c85aede3aa5a61ecd41336d667 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sun, 12 Jan 2025 23:25:27 +0530 Subject: [PATCH 05/17] refactor: reorganize query construction from request --- src/handlers/airplane.rs | 91 ++++++++------------------ src/handlers/http/mod.rs | 4 +- src/handlers/http/query.rs | 128 ++++++++++++++++++------------------- src/query/mod.rs | 7 +- src/utils/arrow/flight.rs | 6 +- 5 files changed, 99 insertions(+), 137 deletions(-) diff --git a/src/handlers/airplane.rs b/src/handlers/airplane.rs index 899157df9..1f45845bd 100644 --- a/src/handlers/airplane.rs +++ b/src/handlers/airplane.rs @@ -21,7 +21,6 @@ use arrow_flight::flight_service_server::FlightServiceServer; use arrow_flight::PollInfo; use arrow_schema::ArrowError; -use datafusion::common::tree_node::TreeNode; use serde_json::json; use std::net::SocketAddr; use std::time::Instant; @@ -34,17 +33,13 @@ use tonic::transport::{Identity, Server, ServerTlsConfig}; use tonic_web::GrpcWebLayer; use crate::handlers::http::cluster::get_ingestor_info; -use crate::handlers::http::query::{into_query, update_schema_when_distributed}; use crate::handlers::livetail::cross_origin_config; use crate::metrics::QUERY_EXECUTE_TIME; use crate::option::CONFIG; -use crate::query::{TableScanVisitor, QUERY_SESSION}; use crate::utils::arrow::flight::{ append_temporary_events, get_query_from_ticket, into_flight_data, run_do_get_rpc, send_to_ingester, }; -use crate::utils::time::TimeRange; -use 
crate::utils::user_auth_for_query; use arrow_flight::{ flight_service_server::FlightService, Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc, @@ -127,55 +122,41 @@ impl FlightService for AirServiceImpl { async fn do_get(&self, req: Request) -> Result, Status> { let key = extract_session_key(req.metadata())?; - let ticket = get_query_from_ticket(&req)?; - - info!("query requested to airplane: {:?}", ticket); - - // get the query session_state - let session_state = QUERY_SESSION.state(); - - // get the logical plan and extract the table name - let raw_logical_plan = session_state - .create_logical_plan(&ticket.query) - .await - .map_err(|err| { - error!("Datafusion Error: Failed to create logical plan: {}", err); - Status::internal("Failed to create logical plan") - })?; + // try authorize + match Users.authorize(key.clone(), rbac::role::Action::Query, None, None) { + rbac::Response::Authorized => (), + rbac::Response::UnAuthorized => { + return Err(Status::permission_denied( + "user is not authorized to access this resource", + )) + } + rbac::Response::ReloadRequired => { + return Err(Status::unauthenticated("reload required")) + } + } - let time_range = TimeRange::parse_human_time(&ticket.start_time, &ticket.end_time) - .map_err(|e| Status::internal(e.to_string()))?; - // create a visitor to extract the table name - let mut visitor = TableScanVisitor::default(); - let _ = raw_logical_plan.visit(&mut visitor); + let query_request = get_query_from_ticket(&req)?; + info!("query requested to airplane: {:?}", query_request); - let streams = visitor.into_inner(); + // map payload to query + let query = query_request.into_query(key).await.map_err(|e| { + error!("Error faced while constructing logical query: {e}"); + Status::internal(format!("Failed to process query: {e}")) + })?; - let stream_name = streams + let stream_names = query.stream_names(); + let stream_name = stream_names .first() - .ok_or_else(|| Status::aborted("Malformed SQL Provided, Table Name Not Found"))? 
- .to_owned(); - - update_schema_when_distributed(&streams) - .await - .map_err(|err| Status::internal(err.to_string()))?; - - // map payload to query - let query = into_query(&ticket, &session_state, time_range) - .await - .map_err(|_| Status::internal("Failed to parse query"))?; + .ok_or_else(|| Status::internal("Failed to get stream name from query"))?; let event = if send_to_ingester( query.time_range.start.timestamp_millis(), query.time_range.end.timestamp_millis(), ) { - let sql = format!("select * from {}", &stream_name); - let start_time = ticket.start_time.clone(); - let end_time = ticket.end_time.clone(); let out_ticket = json!({ - "query": sql, - "startTime": start_time, - "endTime": end_time + "query": format!("select * from {stream_name}"), + "startTime": query_request.start_time, + "endTime": query_request.end_time, }) .to_string(); @@ -190,30 +171,12 @@ impl FlightService for AirServiceImpl { } } let mr = minute_result.iter().collect::>(); - let event = append_temporary_events(&stream_name, mr).await?; + let event = append_temporary_events(stream_name, mr).await?; Some(event) } else { None }; - // try authorize - match Users.authorize(key.clone(), rbac::role::Action::Query, None, None) { - rbac::Response::Authorized => (), - rbac::Response::UnAuthorized => { - return Err(Status::permission_denied( - "user is not authorized to access this resource", - )) - } - rbac::Response::ReloadRequired => { - return Err(Status::unauthenticated("reload required")) - } - } - - let permissions = Users.get_permissions(&key); - - user_auth_for_query(&permissions, &streams).map_err(|_| { - Status::permission_denied("User Does not have permission to access this") - })?; let time = Instant::now(); let (records, _) = query .execute(stream_name.clone()) @@ -234,7 +197,7 @@ impl FlightService for AirServiceImpl { let out = into_flight_data(records); if let Some(event) = event { - event.clear(&stream_name); + event.clear(stream_name); } let time = time.elapsed().as_secs_f64(); diff --git a/src/handlers/http/mod.rs b/src/handlers/http/mod.rs index 8d8db14c9..878ec8e9e 100644 --- a/src/handlers/http/mod.rs +++ b/src/handlers/http/mod.rs @@ -25,7 +25,7 @@ use serde_json::Value; use crate::{option::CONFIG, storage::STREAM_ROOT_DIRECTORY, HTTP_CLIENT}; -use self::{cluster::get_ingestor_info, query::Query}; +use self::{cluster::get_ingestor_info, query::QueryRequest}; pub mod about; mod audit; @@ -97,7 +97,7 @@ pub async fn fetch_schema(stream_name: &str) -> anyhow::Result anyhow::Result> { +pub async fn send_query_request_to_ingestor(query: &QueryRequest) -> anyhow::Result> { // send the query request to the ingestor let mut res = vec![]; let ima = get_ingestor_info().await?; diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs index 02500e4c9..6154cf0d4 100644 --- a/src/handlers/http/query.rs +++ b/src/handlers/http/query.rs @@ -22,7 +22,6 @@ use actix_web::{FromRequest, HttpRequest, Responder}; use chrono::{DateTime, Utc}; use datafusion::common::tree_node::TreeNode; use datafusion::error::DataFusionError; -use datafusion::execution::context::SessionState; use futures_util::Future; use http::StatusCode; use std::pin::Pin; @@ -40,6 +39,7 @@ use crate::option::{Mode, CONFIG}; use crate::query::error::ExecuteError; use crate::query::Query as LogicalQuery; use crate::query::{TableScanVisitor, QUERY_SESSION}; +use crate::rbac::map::SessionKey; use crate::rbac::Users; use crate::response::QueryResponse; use crate::storage::object_storage::commit_schema_to_storage; @@ -64,7 +64,7 @@ pub struct 
QueryParams { /// Query Request in json format. #[derive(Debug, serde::Deserialize, serde::Serialize, Clone)] #[serde(rename_all = "camelCase")] -pub struct Query { +pub struct QueryRequest { pub query: String, pub start_time: String, pub end_time: String, @@ -74,52 +74,71 @@ pub struct Query { pub filter_tags: Option>, } -impl Query { +impl QueryRequest { // fields param is set based only on query param and send_null is set to true if either body or query param is true fn update_params(&mut self, QueryParams { fields, send_null }: QueryParams) { self.params.fields = fields; self.params.send_null |= send_null; } -} -pub async fn query(req: HttpRequest, query_request: Query) -> Result { - let session_state = QUERY_SESSION.state(); - let raw_logical_plan = match session_state - .create_logical_plan(&query_request.query) - .await - { - Ok(raw_logical_plan) => raw_logical_plan, - Err(_) => { - //if logical plan creation fails, create streams and try again - create_streams_for_querier().await; - session_state - .create_logical_plan(&query_request.query) - .await? + // Constructs a query from the + pub async fn into_query(&self, key: SessionKey) -> Result { + if self.query.is_empty() { + return Err(QueryError::EmptyQuery); } - }; - let time_range = - TimeRange::parse_human_time(&query_request.start_time, &query_request.end_time)?; + if self.start_time.is_empty() { + return Err(QueryError::EmptyStartTime); + } - // create a visitor to extract the table names present in query - let mut visitor = TableScanVisitor::default(); - let _ = raw_logical_plan.visit(&mut visitor); + if self.end_time.is_empty() { + return Err(QueryError::EmptyEndTime); + } - let tables = visitor.into_inner(); - update_schema_when_distributed(&tables).await?; - let query: LogicalQuery = into_query(&query_request, &session_state, time_range).await?; + let session_state = QUERY_SESSION.state(); + let raw_logical_plan = match session_state.create_logical_plan(&self.query).await { + Ok(raw_logical_plan) => raw_logical_plan, + Err(_) => { + //if logical plan creation fails, create streams and try again + create_streams_for_querier().await; + session_state.create_logical_plan(&self.query).await? 
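+                // the planner may have failed only because the streams weren't
+                // registered in memory yet; after syncing them from storage,
+                // a second failure is a genuine error returned to the caller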
+ } + }; - let creds = extract_session_key_from_req(&req)?; - let permissions = Users.get_permissions(&creds); + // create a visitor to extract the table names present in query + let mut visitor = TableScanVisitor::default(); + let _ = raw_logical_plan.visit(&mut visitor); + let stream_names = visitor.into_inner(); + let permissions = Users.get_permissions(&key); + user_auth_for_query(&permissions, &stream_names)?; - let table_name = query - .first_table_name() - .ok_or_else(|| QueryError::MalformedQuery("No table name found in query"))?; + update_schema_when_distributed(&stream_names).await?; + + let time_range = TimeRange::parse_human_time(&self.start_time, &self.end_time)?; - user_auth_for_query(&permissions, &tables)?; + Ok(crate::query::Query { + raw_logical_plan: session_state.create_logical_plan(&self.query).await?, + time_range, + filter_tag: self.filter_tags.clone(), + stream_names, + }) + } +} + +pub async fn query( + req: HttpRequest, + query_request: QueryRequest, +) -> Result { + let key = extract_session_key_from_req(&req)?; + let query = query_request.into_query(key).await?; + + let stream_names = query.stream_names(); + let first_stream_name = stream_names + .first() + .ok_or_else(|| QueryError::MalformedQuery("No table name found in query"))?; let time = Instant::now(); - let (records, fields) = query.execute(table_name.clone()).await?; + let (records, fields) = query.execute(first_stream_name.clone()).await?; let response = QueryResponse { records, @@ -132,7 +151,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result) -> Result<(), /// create streams for memory from storage if they do not exist pub async fn create_streams_for_querier() { let querier_streams = STREAM_INFO.list_streams(); - let store = CONFIG.storage().get_object_store(); - let storage_streams = store.list_streams().await.unwrap(); - for stream in storage_streams { + + for stream in CONFIG + .storage() + .get_object_store() + .list_streams() + .await + .unwrap() + { let stream_name = stream.name; if !querier_streams.contains(&stream_name) { @@ -169,12 +193,12 @@ pub async fn create_streams_for_querier() { } } -impl FromRequest for Query { +impl FromRequest for QueryRequest { type Error = actix_web::Error; type Future = Pin>>>; fn from_request(req: &HttpRequest, payload: &mut actix_web::dev::Payload) -> Self::Future { - let query = Json::::from_request(req, payload); + let query = Json::::from_request(req, payload); let params = web::Query::::from_request(req, payload) .into_inner() .map(|x| x.0) @@ -191,33 +215,9 @@ impl FromRequest for Query { } } -pub async fn into_query( - query: &Query, - session_state: &SessionState, - time_range: TimeRange, -) -> Result { - if query.query.is_empty() { - return Err(QueryError::EmptyQuery); - } - - if query.start_time.is_empty() { - return Err(QueryError::EmptyStartTime); - } - - if query.end_time.is_empty() { - return Err(QueryError::EmptyEndTime); - } - - Ok(crate::query::Query { - raw_logical_plan: session_state.create_logical_plan(&query.query).await?, - time_range, - filter_tag: query.filter_tags.clone(), - }) -} - /// unused for now, might need it in the future #[allow(unused)] -fn transform_query_for_ingestor(query: &Query) -> Option { +fn transform_query_for_ingestor(query: &QueryRequest) -> Option { if query.query.is_empty() { return None; } diff --git a/src/query/mod.rs b/src/query/mod.rs index c847cb9e0..95b5c52b1 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -118,6 +118,7 @@ pub struct Query { pub raw_logical_plan: LogicalPlan, 
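    // the plan exactly as DataFusion parsed it; time-range filters are only
    // injected into it later, when the query is executed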
pub time_range: TimeRange, pub filter_tag: Option>, + pub stream_names: Vec, } impl Query { @@ -184,10 +185,8 @@ impl Query { } } - pub fn first_table_name(&self) -> Option { - let mut visitor = TableScanVisitor::default(); - let _ = self.raw_logical_plan.visit(&mut visitor); - visitor.into_inner().pop() + pub fn stream_names(&self) -> &[String] { + &self.stream_names } } diff --git a/src/utils/arrow/flight.rs b/src/utils/arrow/flight.rs index 6781f2d6e..525ba36a9 100644 --- a/src/utils/arrow/flight.rs +++ b/src/utils/arrow/flight.rs @@ -18,7 +18,7 @@ use crate::event::Event; use crate::handlers::http::ingest::push_logs_unchecked; -use crate::handlers::http::query::Query as QueryJson; +use crate::handlers::http::query::QueryRequest; use crate::metadata::STREAM_INFO; use crate::query::stream_schema_provider::include_now; use crate::{ @@ -44,8 +44,8 @@ use tonic::transport::{Channel, Uri}; pub type DoGetStream = stream::BoxStream<'static, Result>; -pub fn get_query_from_ticket(req: &Request) -> Result { - serde_json::from_slice::(&req.get_ref().ticket) +pub fn get_query_from_ticket(req: &Request) -> Result { + serde_json::from_slice::(&req.get_ref().ticket) .map_err(|err| Status::internal(err.to_string())) } From 1ac8977f2bfe780101380b6669b31ea0398fc482 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 13 Jan 2025 00:24:46 +0530 Subject: [PATCH 06/17] refactor: reorganize query plan transformation and execution --- src/handlers/airplane.rs | 7 +- src/handlers/http/query.rs | 8 +- src/query/mod.rs | 146 ++++++++++++++++++------------------- 3 files changed, 76 insertions(+), 85 deletions(-) diff --git a/src/handlers/airplane.rs b/src/handlers/airplane.rs index 1f45845bd..3045d52ad 100644 --- a/src/handlers/airplane.rs +++ b/src/handlers/airplane.rs @@ -144,9 +144,8 @@ impl FlightService for AirServiceImpl { Status::internal(format!("Failed to process query: {e}")) })?; - let stream_names = query.stream_names(); - let stream_name = stream_names - .first() + let stream_name = query + .first_stream_name() .ok_or_else(|| Status::internal("Failed to get stream name from query"))?; let event = if send_to_ingester( @@ -179,7 +178,7 @@ impl FlightService for AirServiceImpl { let time = Instant::now(); let (records, _) = query - .execute(stream_name.clone()) + .execute() .await .map_err(|err| Status::internal(err.to_string()))?; diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs index 6154cf0d4..5085b7c84 100644 --- a/src/handlers/http/query.rs +++ b/src/handlers/http/query.rs @@ -131,14 +131,12 @@ pub async fn query( ) -> Result { let key = extract_session_key_from_req(&req)?; let query = query_request.into_query(key).await?; - - let stream_names = query.stream_names(); - let first_stream_name = stream_names - .first() + let first_stream_name = query + .first_stream_name() .ok_or_else(|| QueryError::MalformedQuery("No table name found in query"))?; let time = Instant::now(); - let (records, fields) = query.execute(first_stream_name.clone()).await?; + let (records, fields) = query.execute().await?; let response = QueryResponse { records, diff --git a/src/query/mod.rs b/src/query/mod.rs index 95b5c52b1..af263cd4b 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -122,15 +122,10 @@ pub struct Query { } impl Query { - pub async fn execute( - &self, - stream_name: String, - ) -> Result<(Vec, Vec), ExecuteError> { - let time_partition = STREAM_INFO.get_time_partition(&stream_name)?; - - let df = QUERY_SESSION - .execute_logical_plan(self.final_logical_plan(&time_partition)) - 
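// NOTE: `execute_logical_plan` only produces a lazy DataFrame; rows are
// materialized by the `collect()` call below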
.await?; + pub async fn execute(&self) -> Result<(Vec, Vec), ExecuteError> { + let time_partitions = self.get_time_partitions()?; + let logical_plan = self.final_logical_plan(time_partitions); + let df = QUERY_SESSION.execute_logical_plan(logical_plan).await?; let fields = df .schema() @@ -148,8 +143,21 @@ impl Query { Ok((results, fields)) } + /// Get the time partitions for the streams mentioned in the query + fn get_time_partitions(&self) -> Result, ExecuteError> { + let mut time_partitions = HashMap::default(); + for stream_name in self.stream_names.iter() { + let Some(time_partition) = STREAM_INFO.get_time_partition(stream_name)? else { + continue; + }; + time_partitions.insert(stream_name.clone(), time_partition); + } + + Ok(time_partitions) + } + /// return logical plan with all time filters applied through - fn final_logical_plan(&self, time_partition: &Option) -> LogicalPlan { + fn final_logical_plan(&self, time_partitions: HashMap) -> LogicalPlan { // see https://github.com/apache/arrow-datafusion/pull/8400 // this can be eliminated in later version of datafusion but with slight caveat // transform cannot modify stringified plans by itself @@ -161,7 +169,7 @@ impl Query { plan.plan.as_ref().clone(), self.time_range.start.naive_utc(), self.time_range.end.naive_utc(), - time_partition, + &time_partitions, ); LogicalPlan::Explain(Explain { verbose: plan.verbose, @@ -178,15 +186,16 @@ impl Query { x, self.time_range.start.naive_utc(), self.time_range.end.naive_utc(), - time_partition, + &time_partitions, ) .data } } } - pub fn stream_names(&self) -> &[String] { - &self.stream_names + // name of the main/first stream in the query + pub fn first_stream_name(&self) -> Option<&String> { + self.stream_names.first() } } @@ -223,80 +232,65 @@ fn transform( plan: LogicalPlan, start_time: NaiveDateTime, end_time: NaiveDateTime, - time_partition: &Option, + time_partitions: &HashMap, ) -> Transformed { - plan.transform(&|plan| match plan { - LogicalPlan::TableScan(table) => { - let mut new_filters = vec![]; - if !table_contains_any_time_filters(&table, time_partition) { - let mut _start_time_filter: Expr; - let mut _end_time_filter: Expr; - match time_partition { - Some(time_partition) => { - _start_time_filter = - PartialTimeFilter::Low(std::ops::Bound::Included(start_time)) - .binary_expr(Expr::Column(Column::new( - Some(table.table_name.to_owned()), - time_partition.clone(), - ))); - _end_time_filter = - PartialTimeFilter::High(std::ops::Bound::Excluded(end_time)) - .binary_expr(Expr::Column(Column::new( - Some(table.table_name.to_owned()), - time_partition, - ))); - } - None => { - _start_time_filter = - PartialTimeFilter::Low(std::ops::Bound::Included(start_time)) - .binary_expr(Expr::Column(Column::new( - Some(table.table_name.to_owned()), - event::DEFAULT_TIMESTAMP_KEY, - ))); - _end_time_filter = - PartialTimeFilter::High(std::ops::Bound::Excluded(end_time)) - .binary_expr(Expr::Column(Column::new( - Some(table.table_name.to_owned()), - event::DEFAULT_TIMESTAMP_KEY, - ))); - } - } - - new_filters.push(_start_time_filter); - new_filters.push(_end_time_filter); - } - let new_filter = new_filters.into_iter().reduce(and); - if let Some(new_filter) = new_filter { - let filter = - Filter::try_new(new_filter, Arc::new(LogicalPlan::TableScan(table))).unwrap(); - Ok(Transformed::yes(LogicalPlan::Filter(filter))) - } else { - Ok(Transformed::no(LogicalPlan::TableScan(table))) - } + plan.transform(|plan| { + let LogicalPlan::TableScan(table) = plan else { + return Ok(Transformed::no(plan)); + }; 
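+        // anything that isn't a table scan passes through untouched, so the
+        // let-else keeps the happy path of this closure flat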
+ + // Early return if filters already exist + if query_can_be_filtered_on_stream_time_partition(&table, time_partitions) { + return Ok(Transformed::no(LogicalPlan::TableScan(table))); } - x => Ok(Transformed::no(x)), + + let stream = table.table_name.clone(); + let time_partition = time_partitions + .get(stream.table()) + .map(|x| x.as_str()) + .unwrap_or(event::DEFAULT_TIMESTAMP_KEY); + + // Create column expression once + let column_expr = Expr::Column(Column::new(Some(stream.clone()), time_partition)); + + // Build filters + let low_filter = PartialTimeFilter::Low(std::ops::Bound::Included(start_time)) + .binary_expr(column_expr.clone()); + let high_filter = + PartialTimeFilter::High(std::ops::Bound::Excluded(end_time)).binary_expr(column_expr); + + // Combine filters + let new_filter = and(low_filter, high_filter); + let filter = Filter::try_new(new_filter, Arc::new(LogicalPlan::TableScan(table))).unwrap(); + + Ok(Transformed::yes(LogicalPlan::Filter(filter))) }) .expect("transform only transforms the tablescan") } -fn table_contains_any_time_filters( +// check if the query contains the streams's time partition as filter +fn query_can_be_filtered_on_stream_time_partition( table: &datafusion::logical_expr::TableScan, - time_partition: &Option, + time_partitions: &HashMap, ) -> bool { table .filters .iter() - .filter_map(|x| { - if let Expr::BinaryExpr(binexpr) = x { - Some(binexpr) - } else { - None - } + .filter_map(|x| match x { + Expr::BinaryExpr(binexpr) => Some(binexpr), + _ => None, }) - .any(|expr| { - matches!(&*expr.left, Expr::Column(Column { name, .. }) - if ((time_partition.is_some() && name == time_partition.as_ref().unwrap()) || - (!time_partition.is_some() && name == event::DEFAULT_TIMESTAMP_KEY))) + .any(|expr| match &*expr.left { + Expr::Column(Column { + name: column_name, .. 
+ }) => { + time_partitions + .get(table.table_name.table()) + .map(|x| x.as_str()) + .unwrap_or(event::DEFAULT_TIMESTAMP_KEY) + == column_name + } + _ => false, }) } From c4b676dc4cfc1f813324bda5d02b1cbcae03867c Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 13 Jan 2025 00:42:18 +0530 Subject: [PATCH 07/17] get rid of reconstruction and comment the code --- src/query/mod.rs | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/src/query/mod.rs b/src/query/mod.rs index af263cd4b..1cd6a30fd 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -228,6 +228,7 @@ impl TreeNodeVisitor<'_> for TableScanVisitor { } } +// transform the plan to apply time filters fn transform( plan: LogicalPlan, start_time: NaiveDateTime, @@ -235,13 +236,13 @@ fn transform( time_partitions: &HashMap, ) -> Transformed { plan.transform(|plan| { - let LogicalPlan::TableScan(table) = plan else { + let LogicalPlan::TableScan(table) = &plan else { return Ok(Transformed::no(plan)); }; // Early return if filters already exist if query_can_be_filtered_on_stream_time_partition(&table, time_partitions) { - return Ok(Transformed::no(LogicalPlan::TableScan(table))); + return Ok(Transformed::no(plan)); } let stream = table.table_name.clone(); @@ -249,9 +250,7 @@ fn transform( .get(stream.table()) .map(|x| x.as_str()) .unwrap_or(event::DEFAULT_TIMESTAMP_KEY); - - // Create column expression once - let column_expr = Expr::Column(Column::new(Some(stream.clone()), time_partition)); + let column_expr = Expr::Column(Column::new(Some(stream), time_partition)); // Build filters let low_filter = PartialTimeFilter::Low(std::ops::Bound::Included(start_time)) @@ -259,11 +258,9 @@ fn transform( let high_filter = PartialTimeFilter::High(std::ops::Bound::Excluded(end_time)).binary_expr(column_expr); - // Combine filters - let new_filter = and(low_filter, high_filter); - let filter = Filter::try_new(new_filter, Arc::new(LogicalPlan::TableScan(table))).unwrap(); - - Ok(Transformed::yes(LogicalPlan::Filter(filter))) + Ok(Transformed::yes(LogicalPlan::Filter( + Filter::try_new(and(low_filter, high_filter), Arc::new(plan)).unwrap(), + ))) }) .expect("transform only transforms the tablescan") } From 483eaaeb36637effec3d70b2f450a10b3043dc68 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 13 Jan 2025 11:21:22 +0530 Subject: [PATCH 08/17] ci: deepsource fix --- src/query/mod.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/query/mod.rs b/src/query/mod.rs index 1cd6a30fd..46a0aad98 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -248,8 +248,7 @@ fn transform( let stream = table.table_name.clone(); let time_partition = time_partitions .get(stream.table()) - .map(|x| x.as_str()) - .unwrap_or(event::DEFAULT_TIMESTAMP_KEY); + .map_or(event::DEFAULT_TIMESTAMP_KEY, |x| x.as_str()); let column_expr = Expr::Column(Column::new(Some(stream), time_partition)); // Build filters @@ -283,8 +282,7 @@ fn query_can_be_filtered_on_stream_time_partition( }) => { time_partitions .get(table.table_name.table()) - .map(|x| x.as_str()) - .unwrap_or(event::DEFAULT_TIMESTAMP_KEY) + .map_or(event::DEFAULT_TIMESTAMP_KEY, |x| x.as_str()) == column_name } _ => false, From a8d32d1d15180ef91f7ec4b21dcf010642e1f7f4 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 13 Jan 2025 12:09:47 +0530 Subject: [PATCH 09/17] clippy fix --- src/query/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/mod.rs b/src/query/mod.rs index 46a0aad98..698a920fd 100644 --- 
a/src/query/mod.rs +++ b/src/query/mod.rs @@ -241,7 +241,7 @@ fn transform( }; // Early return if filters already exist - if query_can_be_filtered_on_stream_time_partition(&table, time_partitions) { + if query_can_be_filtered_on_stream_time_partition(table, time_partitions) { return Ok(Transformed::no(plan)); } From 28de72eb44e452c1dd95362acdb551c9883710a7 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 13 Jan 2025 12:14:39 +0530 Subject: [PATCH 10/17] fix: don't reconstruct the plan --- src/handlers/http/query.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs index 5085b7c84..58cd28940 100644 --- a/src/handlers/http/query.rs +++ b/src/handlers/http/query.rs @@ -117,7 +117,7 @@ impl QueryRequest { let time_range = TimeRange::parse_human_time(&self.start_time, &self.end_time)?; Ok(crate::query::Query { - raw_logical_plan: session_state.create_logical_plan(&self.query).await?, + raw_logical_plan, time_range, filter_tag: self.filter_tags.clone(), stream_names, From 4325f9fde633f4cc341c7ea43b35115115fca389 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 13 Jan 2025 12:18:46 +0530 Subject: [PATCH 11/17] refactor: rename `raw_logical_plan` ~> `plan` --- src/handlers/http/query.rs | 17 ++++++++--------- src/query/mod.rs | 4 ++-- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs index 58cd28940..3fbad7322 100644 --- a/src/handlers/http/query.rs +++ b/src/handlers/http/query.rs @@ -96,18 +96,17 @@ impl QueryRequest { } let session_state = QUERY_SESSION.state(); - let raw_logical_plan = match session_state.create_logical_plan(&self.query).await { - Ok(raw_logical_plan) => raw_logical_plan, - Err(_) => { - //if logical plan creation fails, create streams and try again - create_streams_for_querier().await; - session_state.create_logical_plan(&self.query).await? - } + let plan = if let Ok(plan) = session_state.create_logical_plan(&self.query).await { + plan + } else { + //if logical plan creation fails, create streams and try again + create_streams_for_querier().await; + session_state.create_logical_plan(&self.query).await? 
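+            // second attempt, now that the streams exist in memory; an error
+            // at this point propagates to the caller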
}; // create a visitor to extract the table names present in query let mut visitor = TableScanVisitor::default(); - let _ = raw_logical_plan.visit(&mut visitor); + let _ = plan.visit(&mut visitor); let stream_names = visitor.into_inner(); let permissions = Users.get_permissions(&key); user_auth_for_query(&permissions, &stream_names)?; @@ -117,7 +116,7 @@ impl QueryRequest { let time_range = TimeRange::parse_human_time(&self.start_time, &self.end_time)?; Ok(crate::query::Query { - raw_logical_plan, + plan, time_range, filter_tag: self.filter_tags.clone(), stream_names, diff --git a/src/query/mod.rs b/src/query/mod.rs index 698a920fd..76f6e24ab 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -115,7 +115,7 @@ pub static QUERY_SESSION: Lazy = Lazy::new(|| { // A query request by client #[derive(Debug)] pub struct Query { - pub raw_logical_plan: LogicalPlan, + pub plan: LogicalPlan, pub time_range: TimeRange, pub filter_tag: Option>, pub stream_names: Vec, @@ -163,7 +163,7 @@ impl Query { // transform cannot modify stringified plans by itself // we by knowing this plan is not in the optimization procees chose to overwrite the stringified plan - match self.raw_logical_plan.clone() { + match self.plan.clone() { LogicalPlan::Explain(plan) => { let transformed = transform( plan.plan.as_ref().clone(), From fba711fa555da09b0bcf657abe997a931bb0b436 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 13 Jan 2025 13:29:14 +0530 Subject: [PATCH 12/17] doc: transform code purpose --- src/query/mod.rs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/src/query/mod.rs b/src/query/mod.rs index 76f6e24ab..1b14293d3 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -34,6 +34,7 @@ use itertools::Itertools; use once_cell::sync::Lazy; use serde_json::{json, Value}; use std::collections::HashMap; +use std::ops::Bound; use std::path::{Path, PathBuf}; use std::sync::Arc; use sysinfo::System; @@ -162,7 +163,6 @@ impl Query { // this can be eliminated in later version of datafusion but with slight caveat // transform cannot modify stringified plans by itself // we by knowing this plan is not in the optimization procees chose to overwrite the stringified plan - match self.plan.clone() { LogicalPlan::Explain(plan) => { let transformed = transform( @@ -251,15 +251,14 @@ fn transform( .map_or(event::DEFAULT_TIMESTAMP_KEY, |x| x.as_str()); let column_expr = Expr::Column(Column::new(Some(stream), time_partition)); - // Build filters - let low_filter = PartialTimeFilter::Low(std::ops::Bound::Included(start_time)) - .binary_expr(column_expr.clone()); + // Reconstruct plan with start and end time filters + let low_filter = + PartialTimeFilter::Low(Bound::Included(start_time)).binary_expr(column_expr.clone()); let high_filter = - PartialTimeFilter::High(std::ops::Bound::Excluded(end_time)).binary_expr(column_expr); + PartialTimeFilter::High(Bound::Excluded(end_time)).binary_expr(column_expr); + let filter = Filter::try_new(and(low_filter, high_filter), Arc::new(plan))?; - Ok(Transformed::yes(LogicalPlan::Filter( - Filter::try_new(and(low_filter, high_filter), Arc::new(plan)).unwrap(), - ))) + Ok(Transformed::yes(LogicalPlan::Filter(filter))) }) .expect("transform only transforms the tablescan") } From e89221f99f0f8e47fec9e5bccafae38fae2e02ef Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 13 Jan 2025 13:35:31 +0530 Subject: [PATCH 13/17] refactor: consume, don't clone --- src/handlers/airplane.rs | 12 ++++---- src/handlers/http/query.rs | 5 ++-- src/query/mod.rs | 56 
+++++++++++++++++--------------------- 3 files changed, 33 insertions(+), 40 deletions(-) diff --git a/src/handlers/airplane.rs b/src/handlers/airplane.rs index 3045d52ad..4da792ebf 100644 --- a/src/handlers/airplane.rs +++ b/src/handlers/airplane.rs @@ -146,7 +146,8 @@ impl FlightService for AirServiceImpl { let stream_name = query .first_stream_name() - .ok_or_else(|| Status::internal("Failed to get stream name from query"))?; + .ok_or_else(|| Status::internal("Failed to get stream name from query"))? + .to_owned(); let event = if send_to_ingester( query.time_range.start.timestamp_millis(), @@ -170,7 +171,7 @@ impl FlightService for AirServiceImpl { } } let mr = minute_result.iter().collect::>(); - let event = append_temporary_events(stream_name, mr).await?; + let event = append_temporary_events(stream_name.as_str(), mr).await?; Some(event) } else { None @@ -193,18 +194,17 @@ impl FlightService for AirServiceImpl { .collect::>(); let schema = Schema::try_merge(schemas).map_err(|err| Status::internal(err.to_string()))?; */ - let out = into_flight_data(records); if let Some(event) = event { - event.clear(stream_name); + event.clear(&stream_name); } let time = time.elapsed().as_secs_f64(); QUERY_EXECUTE_TIME - .with_label_values(&[&format!("flight-query-{}", stream_name)]) + .with_label_values(&[&stream_name]) .observe(time); - out + into_flight_data(records) } async fn do_put( diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs index 3fbad7322..0e9d0065a 100644 --- a/src/handlers/http/query.rs +++ b/src/handlers/http/query.rs @@ -133,6 +133,7 @@ pub async fn query( let first_stream_name = query .first_stream_name() .ok_or_else(|| QueryError::MalformedQuery("No table name found in query"))?; + let histogram = QUERY_EXECUTE_TIME.with_label_values(&[first_stream_name]); let time = Instant::now(); let (records, fields) = query.execute().await?; @@ -147,9 +148,7 @@ pub async fn query( let time = time.elapsed().as_secs_f64(); - QUERY_EXECUTE_TIME - .with_label_values(&[first_stream_name]) - .observe(time); + histogram.observe(time); Ok(response) } diff --git a/src/query/mod.rs b/src/query/mod.rs index 1b14293d3..460995099 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -28,7 +28,7 @@ use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, Tr use datafusion::error::DataFusionError; use datafusion::execution::disk_manager::DiskManagerConfig; use datafusion::execution::SessionStateBuilder; -use datafusion::logical_expr::{Explain, Filter, LogicalPlan, PlanType, ToStringifiedPlan}; +use datafusion::logical_expr::{Filter, LogicalPlan, PlanType, ToStringifiedPlan}; use datafusion::prelude::*; use itertools::Itertools; use once_cell::sync::Lazy; @@ -123,7 +123,7 @@ pub struct Query { } impl Query { - pub async fn execute(&self) -> Result<(Vec, Vec), ExecuteError> { + pub async fn execute(self) -> Result<(Vec, Vec), ExecuteError> { let time_partitions = self.get_time_partitions()?; let logical_plan = self.final_logical_plan(time_partitions); let df = QUERY_SESSION.execute_logical_plan(logical_plan).await?; @@ -158,39 +158,33 @@ impl Query { } /// return logical plan with all time filters applied through - fn final_logical_plan(&self, time_partitions: HashMap) -> LogicalPlan { + fn final_logical_plan(self, time_partitions: HashMap) -> LogicalPlan { // see https://github.com/apache/arrow-datafusion/pull/8400 // this can be eliminated in later version of datafusion but with slight caveat // transform cannot modify stringified plans by itself // we by knowing 
this plan is not in the optimization procees chose to overwrite the stringified plan - match self.plan.clone() { - LogicalPlan::Explain(plan) => { - let transformed = transform( - plan.plan.as_ref().clone(), - self.time_range.start.naive_utc(), - self.time_range.end.naive_utc(), - &time_partitions, - ); - LogicalPlan::Explain(Explain { - verbose: plan.verbose, - stringified_plans: vec![transformed - .data - .to_stringified(PlanType::InitialLogicalPlan)], - plan: Arc::new(transformed.data), - schema: plan.schema, - logical_optimization_succeeded: plan.logical_optimization_succeeded, - }) - } - x => { - transform( - x, - self.time_range.start.naive_utc(), - self.time_range.end.naive_utc(), - &time_partitions, - ) - .data - } - } + let LogicalPlan::Explain(mut explain) = self.plan else { + return transform( + self.plan, + self.time_range.start.naive_utc(), + self.time_range.end.naive_utc(), + &time_partitions, + ) + .data; + }; + + let transformed = transform( + explain.plan.as_ref().clone(), + self.time_range.start.naive_utc(), + self.time_range.end.naive_utc(), + &time_partitions, + ); + explain.stringified_plans = vec![transformed + .data + .to_stringified(PlanType::InitialLogicalPlan)]; + explain.plan = Arc::new(transformed.data); + + LogicalPlan::Explain(explain) } // name of the main/first stream in the query From 08ad12400a452f29950a2e0133a8c0db475d9802 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 13 Jan 2025 14:03:30 +0530 Subject: [PATCH 14/17] expose `metadata` and `query` --- src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 973406cb5..22e887fb9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -33,13 +33,13 @@ pub mod hottier; ))] pub mod kafka; mod livetail; -mod metadata; +pub mod metadata; pub mod metrics; pub mod migration; mod oidc; pub mod option; pub mod otel; -mod query; +pub mod query; pub mod rbac; mod response; mod static_schema; From 7f58d3f71a04e417cbfbcc7145b5358a8d57c4ba Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 13 Jan 2025 19:06:08 +0530 Subject: [PATCH 15/17] revert/fix: only use first table's time partition --- src/query/mod.rs | 44 ++++++++++++++++---------------------------- 1 file changed, 16 insertions(+), 28 deletions(-) diff --git a/src/query/mod.rs b/src/query/mod.rs index 460995099..a2961bf00 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -124,8 +124,12 @@ pub struct Query { impl Query { pub async fn execute(self) -> Result<(Vec, Vec), ExecuteError> { - let time_partitions = self.get_time_partitions()?; - let logical_plan = self.final_logical_plan(time_partitions); + let stream_name = self + .stream_names + .first() + .ok_or_else(|| ExecuteError::NoStream)?; + let time_partition = STREAM_INFO.get_time_partition(stream_name)?; + let logical_plan = self.final_logical_plan(time_partition.as_ref()); let df = QUERY_SESSION.execute_logical_plan(logical_plan).await?; let fields = df @@ -144,21 +148,8 @@ impl Query { Ok((results, fields)) } - /// Get the time partitions for the streams mentioned in the query - fn get_time_partitions(&self) -> Result, ExecuteError> { - let mut time_partitions = HashMap::default(); - for stream_name in self.stream_names.iter() { - let Some(time_partition) = STREAM_INFO.get_time_partition(stream_name)? 
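// streams without a time partition are skipped here; `transform` later falls
// back to DEFAULT_TIMESTAMP_KEY for them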
else { - continue; - }; - time_partitions.insert(stream_name.clone(), time_partition); - } - - Ok(time_partitions) - } - /// return logical plan with all time filters applied through - fn final_logical_plan(self, time_partitions: HashMap) -> LogicalPlan { + fn final_logical_plan(self, time_partition: Option<&String>) -> LogicalPlan { // see https://github.com/apache/arrow-datafusion/pull/8400 // this can be eliminated in later version of datafusion but with slight caveat // transform cannot modify stringified plans by itself @@ -168,7 +159,7 @@ impl Query { self.plan, self.time_range.start.naive_utc(), self.time_range.end.naive_utc(), - &time_partitions, + time_partition, ) .data; }; @@ -177,7 +168,7 @@ impl Query { explain.plan.as_ref().clone(), self.time_range.start.naive_utc(), self.time_range.end.naive_utc(), - &time_partitions, + time_partition, ); explain.stringified_plans = vec![transformed .data @@ -227,7 +218,7 @@ fn transform( plan: LogicalPlan, start_time: NaiveDateTime, end_time: NaiveDateTime, - time_partitions: &HashMap, + time_partition: Option<&String>, ) -> Transformed { plan.transform(|plan| { let LogicalPlan::TableScan(table) = &plan else { @@ -235,14 +226,12 @@ fn transform( }; // Early return if filters already exist - if query_can_be_filtered_on_stream_time_partition(table, time_partitions) { + if query_can_be_filtered_on_stream_time_partition(table, time_partition) { return Ok(Transformed::no(plan)); } let stream = table.table_name.clone(); - let time_partition = time_partitions - .get(stream.table()) - .map_or(event::DEFAULT_TIMESTAMP_KEY, |x| x.as_str()); + let time_partition = time_partition.map_or(event::DEFAULT_TIMESTAMP_KEY, |x| x.as_str()); let column_expr = Expr::Column(Column::new(Some(stream), time_partition)); // Reconstruct plan with start and end time filters @@ -260,7 +249,7 @@ fn transform( // check if the query contains the streams's time partition as filter fn query_can_be_filtered_on_stream_time_partition( table: &datafusion::logical_expr::TableScan, - time_partitions: &HashMap, + time_partition: Option<&String>, ) -> bool { table .filters @@ -273,10 +262,7 @@ fn query_can_be_filtered_on_stream_time_partition( Expr::Column(Column { name: column_name, .. }) => { - time_partitions - .get(table.table_name.table()) - .map_or(event::DEFAULT_TIMESTAMP_KEY, |x| x.as_str()) - == column_name + time_partition.map_or(event::DEFAULT_TIMESTAMP_KEY, |x| x.as_str()) == column_name } _ => false, }) @@ -381,6 +367,8 @@ pub mod error { Datafusion(#[from] DataFusionError), #[error("Query Execution failed due to error in fetching metadata: {0}")] Metadata(#[from] MetadataError), + #[error("Query Execution failed due to missing stream name in query")] + NoStream, } } From 976907126ce4ff054967be7dd8d511d9b03647ff Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 13 Jan 2025 20:47:41 +0530 Subject: [PATCH 16/17] revert: matches expr --- src/query/mod.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/query/mod.rs b/src/query/mod.rs index a2961bf00..93c66a815 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -258,13 +258,12 @@ fn query_can_be_filtered_on_stream_time_partition( Expr::BinaryExpr(binexpr) => Some(binexpr), _ => None, }) - .any(|expr| match &*expr.left { - Expr::Column(Column { - name: column_name, .. - }) => { - time_partition.map_or(event::DEFAULT_TIMESTAMP_KEY, |x| x.as_str()) == column_name - } - _ => false, + .any(|expr| { + matches!( + &*expr.left, + Expr::Column(Column { name, .. 
}) + if time_partition.map_or(event::DEFAULT_TIMESTAMP_KEY, |n| n.as_str()) == name + ) }) } From e462e3b325caab2cef3ebe8cffa358eb897fee32 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Wed, 15 Jan 2025 02:43:40 +0530 Subject: [PATCH 17/17] fix: label name Signed-off-by: Devdutt Shenoi --- src/handlers/airplane.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/handlers/airplane.rs b/src/handlers/airplane.rs index 2e9b5c22f..165b811d3 100644 --- a/src/handlers/airplane.rs +++ b/src/handlers/airplane.rs @@ -201,7 +201,7 @@ impl FlightService for AirServiceImpl { let time = time.elapsed().as_secs_f64(); QUERY_EXECUTE_TIME - .with_label_values(&[&stream_name]) + .with_label_values(&[&format!("flight-query-{stream_name}")]) .observe(time); into_flight_data(records)
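
The parameter handling that patches 03 and 04 settle on is plain serde. Below is a minimal, self-contained sketch of that layout, assuming only the `serde` derive feature and `serde_json`; the `main` body, the sample JSON, and the stream name `mystream` are illustrative assumptions, not part of the diffs:

```rust
use serde::{Deserialize, Serialize};

#[derive(Debug, Default, Clone, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct QueryParams {
    #[serde(default)]
    pub fields: bool,
    #[serde(default)]
    pub send_null: bool,
}

#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct QueryRequest {
    pub query: String,
    pub start_time: String,
    pub end_time: String,
    // `flatten` lifts the params to the top level of the JSON body,
    // so clients send `fields`/`sendNull` right beside `query`
    #[serde(default, flatten)]
    pub params: QueryParams,
}

fn main() -> serde_json::Result<()> {
    // hypothetical request body; `mystream` is an illustrative stream name
    let body = r#"{
        "query": "select * from mystream",
        "startTime": "2025-01-12T00:00:00Z",
        "endTime": "now",
        "sendNull": true
    }"#;
    let req: QueryRequest = serde_json::from_str(body)?;
    // `fields` was omitted, so its `#[serde(default)]` kicks in
    assert!(req.params.send_null && !req.params.fields);
    println!("{req:?}");
    Ok(())
}
```

Since every `QueryParams` field also carries its own `#[serde(default)]`, the same struct can back both the JSON body and the URL query string parsed by `web::Query::<QueryParams>`, which is what lets `update_params` merge the two sources.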