diff --git a/src/alerts/alerts_utils.rs b/src/alerts/alerts_utils.rs index 878f78644..04d366428 100644 --- a/src/alerts/alerts_utils.rs +++ b/src/alerts/alerts_utils.rs @@ -32,7 +32,7 @@ use tracing::trace; use crate::{ alerts::AggregateCondition, parseable::PARSEABLE, - query::{TableScanVisitor, QUERY_SESSION}, + query::{Query, TableScanVisitor, QUERY_SESSION}, rbac::{ map::SessionKey, role::{Action, Permission}, @@ -102,7 +102,7 @@ pub async fn evaluate_alert(alert: &AlertConfig) -> Result<(), AlertError> { trace!("RUNNING EVAL TASK FOR- {alert:?}"); let query = prepare_query(alert).await?; - let base_df = execute_base_query(&query, &alert.query).await?; + let base_df = execute_base_query(query, &alert.query).await?; let agg_results = evaluate_aggregates(&alert.aggregate_config, &base_df).await?; let final_res = calculate_final_result(&alert.aggregate_config, &agg_results); @@ -118,27 +118,29 @@ async fn prepare_query(alert: &AlertConfig) -> Result Result { - let stream_name = query.first_table_name().ok_or_else(|| { - AlertError::CustomError(format!("Table name not found in query- {}", original_query)) +async fn execute_base_query(query: Query, original_query: &str) -> Result { + let stream_name = query.first_stream_name().ok_or_else(|| { + AlertError::CustomError(format!("Table name not found in query- {original_query}")) })?; - let time_partition = PARSEABLE.get_stream(&stream_name)?.get_time_partition(); + let time_partition = PARSEABLE.get_stream(stream_name)?.get_time_partition(); query .get_dataframe(time_partition.as_ref()) .await diff --git a/src/alerts/mod.rs b/src/alerts/mod.rs index 82581ffff..3f24dcd1f 100644 --- a/src/alerts/mod.rs +++ b/src/alerts/mod.rs @@ -38,7 +38,7 @@ pub mod alerts_utils; pub mod target; use crate::parseable::{StreamNotFound, PARSEABLE}; -use crate::query::{TableScanVisitor, QUERY_SESSION}; +use crate::query::{Query, TableScanVisitor, QUERY_SESSION}; use crate::rbac::map::SessionKey; use crate::storage; use crate::storage::ObjectStorageError; @@ -480,13 +480,14 @@ impl AlertConfig { self.validate_configs()?; let session_state = QUERY_SESSION.state(); - let raw_logical_plan = session_state.create_logical_plan(&self.query).await?; + let plan = session_state.create_logical_plan(&self.query).await?; // create a visitor to extract the table names present in query let mut visitor = TableScanVisitor::default(); - let _ = raw_logical_plan.visit(&mut visitor); + let _ = plan.visit(&mut visitor); + let stream_names = visitor.into_inner(); - let table = visitor.into_inner().first().unwrap().to_owned(); + let table = stream_names.first().unwrap().to_owned(); let lowercase = self.query.split(&table).collect_vec()[0].to_lowercase(); @@ -506,22 +507,23 @@ impl AlertConfig { let time_range = TimeRange::parse_human_time("1m", "now") .map_err(|err| AlertError::CustomError(err.to_string()))?; - let query = crate::query::Query { - raw_logical_plan, + let query = Query { + plan, time_range, filter_tag: None, + stream_names, }; // for now proceed in a similar fashion as we do in query // TODO: in case of multiple table query does the selection of time partition make a difference? 
(especially when the tables don't have overlapping data) - let Some(stream_name) = query.first_table_name() else { + let Some(stream_name) = query.first_stream_name() else { return Err(AlertError::CustomError(format!( "Table name not found in query- {}", self.query ))); }; - let time_partition = PARSEABLE.get_stream(&stream_name)?.get_time_partition(); + let time_partition = PARSEABLE.get_stream(stream_name)?.get_time_partition(); let base_df = query .get_dataframe(time_partition.as_ref()) .await diff --git a/src/handlers/airplane.rs b/src/handlers/airplane.rs index 1a72c7470..f6011f08e 100644 --- a/src/handlers/airplane.rs +++ b/src/handlers/airplane.rs @@ -21,7 +21,6 @@ use arrow_flight::flight_service_server::FlightServiceServer; use arrow_flight::PollInfo; use arrow_schema::ArrowError; -use datafusion::common::tree_node::TreeNode; use serde_json::json; use std::net::SocketAddr; use std::time::Instant; @@ -34,17 +33,14 @@ use tonic::transport::{Identity, Server, ServerTlsConfig}; use tonic_web::GrpcWebLayer; use crate::handlers::http::cluster::get_ingestor_info; -use crate::handlers::http::query::{into_query, update_schema_when_distributed}; use crate::handlers::livetail::cross_origin_config; use crate::metrics::QUERY_EXECUTE_TIME; use crate::parseable::PARSEABLE; -use crate::query::{execute, TableScanVisitor, QUERY_SESSION}; +use crate::query::execute; use crate::utils::arrow::flight::{ append_temporary_events, get_query_from_ticket, into_flight_data, run_do_get_rpc, send_to_ingester, }; -use crate::utils::time::TimeRange; -use crate::utils::user_auth_for_query; use arrow_flight::{ flight_service_server::FlightService, Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc, @@ -127,55 +123,41 @@ impl FlightService for AirServiceImpl { async fn do_get(&self, req: Request) -> Result, Status> { let key = extract_session_key(req.metadata())?; - let ticket = get_query_from_ticket(&req)?; + // try authorize + match Users.authorize(key.clone(), rbac::role::Action::Query, None, None) { + rbac::Response::Authorized => (), + rbac::Response::UnAuthorized => { + return Err(Status::permission_denied( + "user is not authorized to access this resource", + )) + } + rbac::Response::ReloadRequired => { + return Err(Status::unauthenticated("reload required")) + } + } - info!("query requested to airplane: {:?}", ticket); + let query_request = get_query_from_ticket(&req)?; + info!("query requested to airplane: {:?}", query_request); - // get the query session_state - let session_state = QUERY_SESSION.state(); + // map payload to query + let query = query_request.into_query(&key).await.map_err(|e| { + error!("Error faced while constructing logical query: {e}"); + Status::internal(format!("Failed to process query: {e}")) + })?; - // get the logical plan and extract the table name - let raw_logical_plan = session_state - .create_logical_plan(&ticket.query) - .await - .map_err(|err| { - error!("Datafusion Error: Failed to create logical plan: {}", err); - Status::internal("Failed to create logical plan") - })?; - - let time_range = TimeRange::parse_human_time(&ticket.start_time, &ticket.end_time) - .map_err(|e| Status::internal(e.to_string()))?; - // create a visitor to extract the table name - let mut visitor = TableScanVisitor::default(); - let _ = raw_logical_plan.visit(&mut visitor); - - let streams = visitor.into_inner(); - - let stream_name = streams - .first() - .ok_or_else(|| Status::aborted("Malformed SQL Provided, 
Table Name Not Found"))? + let stream_name = query + .first_stream_name() + .ok_or_else(|| Status::internal("Failed to get stream name from query"))? .to_owned(); - update_schema_when_distributed(&streams) - .await - .map_err(|err| Status::internal(err.to_string()))?; - - // map payload to query - let query = into_query(&ticket, &session_state, time_range) - .await - .map_err(|_| Status::internal("Failed to parse query"))?; - let event = if send_to_ingester( query.time_range.start.timestamp_millis(), query.time_range.end.timestamp_millis(), ) { - let sql = format!("select * from {}", &stream_name); - let start_time = ticket.start_time.clone(); - let end_time = ticket.end_time.clone(); let out_ticket = json!({ - "query": sql, - "startTime": start_time, - "endTime": end_time + "query": format!("select * from {stream_name}"), + "startTime": query_request.start_time, + "endTime": query_request.end_time, }) .to_string(); @@ -190,30 +172,12 @@ impl FlightService for AirServiceImpl { } } let mr = minute_result.iter().collect::>(); - let event = append_temporary_events(&stream_name, mr).await?; + let event = append_temporary_events(stream_name.as_str(), mr).await?; Some(event) } else { None }; - // try authorize - match Users.authorize(key.clone(), rbac::role::Action::Query, None, None) { - rbac::Response::Authorized => (), - rbac::Response::UnAuthorized => { - return Err(Status::permission_denied( - "user is not authorized to access this resource", - )) - } - rbac::Response::ReloadRequired => { - return Err(Status::unauthenticated("reload required")) - } - } - - let permissions = Users.get_permissions(&key); - - user_auth_for_query(&permissions, &streams).map_err(|_| { - Status::permission_denied("User Does not have permission to access this") - })?; let time = Instant::now(); let (records, _) = execute(query, &stream_name) @@ -241,7 +205,7 @@ impl FlightService for AirServiceImpl { let time = time.elapsed().as_secs_f64(); QUERY_EXECUTE_TIME - .with_label_values(&[&format!("flight-query-{}", stream_name)]) + .with_label_values(&[&format!("flight-query-{stream_name}")]) .observe(time); // Airplane takes off 🛫 diff --git a/src/handlers/http/mod.rs b/src/handlers/http/mod.rs index aa2e0a02d..54db938d5 100644 --- a/src/handlers/http/mod.rs +++ b/src/handlers/http/mod.rs @@ -25,7 +25,7 @@ use serde_json::Value; use crate::{parseable::PARSEABLE, storage::STREAM_ROOT_DIRECTORY, HTTP_CLIENT}; -use self::{cluster::get_ingestor_info, query::Query}; +use self::{cluster::get_ingestor_info, query::QueryRequest}; pub mod about; pub mod alerts; @@ -105,7 +105,7 @@ pub async fn fetch_schema(stream_name: &str) -> anyhow::Result anyhow::Result> { +pub async fn send_query_request_to_ingestor(query: &QueryRequest) -> anyhow::Result> { // send the query request to the ingestor let mut res = vec![]; let ima = get_ingestor_info().await?; diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs index 7d9c33a45..7baf5bf24 100644 --- a/src/handlers/http/query.rs +++ b/src/handlers/http/query.rs @@ -16,96 +16,129 @@ * */ -use actix_web::http::header::ContentType; -use actix_web::web::{self, Json}; -use actix_web::{FromRequest, HttpRequest, HttpResponse, Responder}; +use std::{future::Future, pin::Pin, sync::Arc, time::Instant}; + +use actix_web::{ + http::header::ContentType, web::Json, FromRequest, HttpRequest, HttpResponse, Responder, +}; use chrono::{DateTime, Utc}; -use datafusion::common::tree_node::TreeNode; -use datafusion::error::DataFusionError; -use datafusion::execution::context::SessionState; -use 
futures_util::Future; +use datafusion::{common::tree_node::TreeNode, error::DataFusionError}; use http::StatusCode; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; -use std::collections::HashMap; -use std::pin::Pin; -use std::sync::Arc; -use std::time::Instant; -use tracing::error; - -use crate::event::error::EventError; -use crate::handlers::http::fetch_schema; - -use crate::event::commit_schema; -use crate::metrics::QUERY_EXECUTE_TIME; -use crate::option::Mode; -use crate::parseable::{StreamNotFound, PARSEABLE}; -use crate::query::error::ExecuteError; -use crate::query::{execute, CountsRequest, CountsResponse, Query as LogicalQuery}; -use crate::query::{TableScanVisitor, QUERY_SESSION}; -use crate::rbac::Users; -use crate::response::QueryResponse; -use crate::storage::object_storage::commit_schema_to_storage; -use crate::storage::ObjectStorageError; -use crate::utils::actix::extract_session_key_from_req; -use crate::utils::time::{TimeParseError, TimeRange}; -use crate::utils::user_auth_for_query; - -/// Query Request through http endpoint. + +use crate::{ + event::{commit_schema, error::EventError}, + metrics::QUERY_EXECUTE_TIME, + option::Mode, + parseable::{StreamNotFound, PARSEABLE}, + query::{ + error::ExecuteError, execute, CountsRequest, CountsResponse, Query, TableScanVisitor, + QUERY_SESSION, + }, + rbac::{map::SessionKey, Users}, + response::QueryResponse, + storage::{object_storage::commit_schema_to_storage, ObjectStorageError}, + utils::{ + actix::extract_session_key_from_req, + time::{TimeParseError, TimeRange}, + user_auth_for_query, + }, +}; + +use super::fetch_schema; + +/// Can be optionally be accepted as query params in the query request +/// NOTE: ensure that the fields param is not set based on request body +#[derive(Debug, Default, Deserialize, Serialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct QueryParams { + #[serde(default)] + pub fields: bool, + #[serde(default)] + pub send_null: bool, +} + +/// Query Request in json format. 
#[derive(Debug, Deserialize, Serialize, Clone)] #[serde(rename_all = "camelCase")] -pub struct Query { +pub struct QueryRequest { pub query: String, pub start_time: String, pub end_time: String, - #[serde(default)] - pub send_null: bool, - #[serde(skip)] - pub fields: bool, + #[serde(default, flatten)] + pub params: QueryParams, #[serde(skip)] pub filter_tags: Option>, } -pub async fn query(req: HttpRequest, query_request: Query) -> Result { - let session_state = QUERY_SESSION.state(); - let raw_logical_plan = match session_state - .create_logical_plan(&query_request.query) - .await - { - Ok(raw_logical_plan) => raw_logical_plan, - Err(_) => { +impl QueryRequest { + // fields param is set based only on query param and send_null is set to true if either body or query param is true + fn update_params(&mut self, QueryParams { fields, send_null }: QueryParams) { + self.params.fields = fields; + self.params.send_null |= send_null; + } + + // Constructs a query from the http request + pub async fn into_query(&self, key: &SessionKey) -> Result { + if self.query.is_empty() { + return Err(QueryError::EmptyQuery); + } + + if self.start_time.is_empty() { + return Err(QueryError::EmptyStartTime); + } + + if self.end_time.is_empty() { + return Err(QueryError::EmptyEndTime); + } + + let session_state = QUERY_SESSION.state(); + let plan = if let Ok(plan) = session_state.create_logical_plan(&self.query).await { + plan + } else { //if logical plan creation fails, create streams and try again create_streams_for_querier().await; - session_state - .create_logical_plan(&query_request.query) - .await? - } - }; - let time_range = - TimeRange::parse_human_time(&query_request.start_time, &query_request.end_time)?; + session_state.create_logical_plan(&self.query).await? + }; - // Create a visitor to extract the table names present in query - let mut visitor = TableScanVisitor::default(); - let _ = raw_logical_plan.visit(&mut visitor); + // create a visitor to extract the table names present in query + let mut visitor = TableScanVisitor::default(); + let _ = plan.visit(&mut visitor); + let stream_names = visitor.into_inner(); + let permissions = Users.get_permissions(key); + user_auth_for_query(&permissions, &stream_names)?; - let tables = visitor.into_inner(); - update_schema_when_distributed(&tables).await?; - let query: LogicalQuery = into_query(&query_request, &session_state, time_range).await?; + update_schema_when_distributed(&stream_names).await?; - let creds = extract_session_key_from_req(&req)?; - let permissions = Users.get_permissions(&creds); + let time_range = TimeRange::parse_human_time(&self.start_time, &self.end_time)?; - let table_name = query - .first_table_name() - .ok_or_else(|| QueryError::MalformedQuery("No table name found in query"))?; + Ok(Query { + plan, + time_range, + filter_tag: self.filter_tags.clone(), + stream_names, + }) + } +} - user_auth_for_query(&permissions, &tables)?; +pub async fn query( + req: HttpRequest, + query_request: QueryRequest, +) -> Result { + let key = extract_session_key_from_req(&req)?; + let query = query_request.into_query(&key).await?; + let first_stream_name = query + .first_stream_name() + .ok_or_else(|| QueryError::MalformedQuery("No table name found in query"))?; + let histogram = QUERY_EXECUTE_TIME.with_label_values(&[first_stream_name]); let time = Instant::now(); + // Intercept `count(*)`` queries and use the counts API if let Some(column_name) = query.is_logical_plan_count_without_filters() { let counts_req = CountsRequest { - stream: 
table_name.clone(), + stream: first_stream_name.to_owned(), start_time: query_request.start_time.clone(), end_time: query_request.end_time.clone(), num_bins: 1, @@ -113,7 +146,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result Result) -> Result<(), /// create streams for memory from storage if they do not exist pub async fn create_streams_for_querier() { let querier_streams = PARSEABLE.streams.list(); - let store = PARSEABLE.storage.get_object_store(); - let storage_streams = store.list_streams().await.unwrap(); - for stream_name in storage_streams { + for stream_name in PARSEABLE + .storage + .get_object_store() + .list_streams() + .await + .unwrap() + { if !querier_streams.contains(&stream_name) { let _ = PARSEABLE .create_stream_and_schema_from_storage(&stream_name) @@ -199,25 +234,20 @@ pub async fn create_streams_for_querier() { } } -impl FromRequest for Query { +impl FromRequest for QueryRequest { type Error = actix_web::Error; type Future = Pin>>>; fn from_request(req: &HttpRequest, payload: &mut actix_web::dev::Payload) -> Self::Future { - let query = Json::::from_request(req, payload); - let params = web::Query::>::from_request(req, payload) + let query = Json::::from_request(req, payload); + let params = actix_web::web::Query::::from_request(req, payload) .into_inner() .map(|x| x.0) .unwrap_or_default(); let fut = async move { let mut query = query.await?.into_inner(); - // format output json to include field names - query.fields = params.get("fields").cloned().unwrap_or(false); - - if !query.send_null { - query.send_null = params.get("sendNull").cloned().unwrap_or(false); - } + query.update_params(params); Ok(query) }; @@ -226,33 +256,9 @@ impl FromRequest for Query { } } -pub async fn into_query( - query: &Query, - session_state: &SessionState, - time_range: TimeRange, -) -> Result { - if query.query.is_empty() { - return Err(QueryError::EmptyQuery); - } - - if query.start_time.is_empty() { - return Err(QueryError::EmptyStartTime); - } - - if query.end_time.is_empty() { - return Err(QueryError::EmptyEndTime); - } - - Ok(crate::query::Query { - raw_logical_plan: session_state.create_logical_plan(&query.query).await?, - time_range, - filter_tag: query.filter_tags.clone(), - }) -} - /// unused for now, might need it in the future #[allow(unused)] -fn transform_query_for_ingestor(query: &Query) -> Option { +fn transform_query_for_ingestor(query: &QueryRequest) -> Option { if query.query.is_empty() { return None; } @@ -265,7 +271,7 @@ fn transform_query_for_ingestor(query: &Query) -> Option { return None; } - let end_time: DateTime = if query.end_time == "now" { + let end_time = if query.end_time == "now" { Utc::now() } else { DateTime::parse_from_rfc3339(&query.end_time) @@ -275,14 +281,10 @@ fn transform_query_for_ingestor(query: &Query) -> Option { let start_time = end_time - chrono::Duration::minutes(1); // when transforming the query, the ingestors are forced to return an array of values - let q = Query { - query: query.query.clone(), - fields: false, - filter_tags: query.filter_tags.clone(), - send_null: query.send_null, - start_time: start_time.to_rfc3339(), - end_time: end_time.to_rfc3339(), - }; + let mut q = query.clone(); + q.params.fields = false; + q.start_time = start_time.to_rfc3339(); + q.end_time = end_time.to_rfc3339(); Some(q) } diff --git a/src/lib.rs b/src/lib.rs index 94b81639d..efb761fce 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -31,7 +31,7 @@ mod event; pub mod handlers; pub mod hottier; mod livetail; -mod metadata; +pub mod metadata; 
pub mod metrics; pub mod migration; mod oidc; diff --git a/src/prism/logstream/mod.rs b/src/prism/logstream/mod.rs index e37db20e4..1801625fa 100644 --- a/src/prism/logstream/mod.rs +++ b/src/prism/logstream/mod.rs @@ -33,18 +33,15 @@ use crate::{ utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats}, }, logstream::error::StreamError, - query::{into_query, update_schema_when_distributed, Query, QueryError}, + query::{update_schema_when_distributed, QueryError, QueryParams, QueryRequest}, }, hottier::{HotTierError, HotTierManager, StreamHotTier}, parseable::{StreamNotFound, PARSEABLE}, - query::{error::ExecuteError, execute, CountsRequest, CountsResponse, QUERY_SESSION}, + query::{error::ExecuteError, execute, CountsRequest, CountsResponse}, rbac::{map::SessionKey, role::Action, Users}, stats, storage::{retention::Retention, StreamInfo, StreamType}, - utils::{ - arrow::record_batches_to_json, - time::{TimeParseError, TimeRange}, - }, + utils::{arrow::record_batches_to_json, time::TimeParseError}, LOCK_EXPECT, }; @@ -294,8 +291,14 @@ impl PrismDatasetRequest { // Retrieve distinct values for source identifiers // Returns None if fields aren't present or if query fails - let ips = self.get_distinct_entries(stream, "p_src_ip").await.ok(); - let user_agents = self.get_distinct_entries(stream, "p_user_agent").await.ok(); + let ips = self + .get_distinct_entries(stream, "p_src_ip", &key) + .await + .ok(); + let user_agents = self + .get_distinct_entries(stream, "p_user_agent", &key) + .await + .ok(); responses.push(PrismDatasetResponse { stream: stream.clone(), @@ -327,19 +330,20 @@ impl PrismDatasetRequest { &self, stream_name: &str, field: &str, + key: &SessionKey, ) -> Result, QueryError> { - let query = Query { + let query = QueryRequest { query: format!("SELECT DISTINCT({field}) FOR {stream_name}"), start_time: "1h".to_owned(), end_time: "now".to_owned(), - send_null: false, + params: QueryParams { + send_null: false, + fields: true, + }, filter_tags: None, - fields: true, }; - let time_range = TimeRange::parse_human_time("1h", "now")?; - let session_state = QUERY_SESSION.state(); - let query = into_query(&query, &session_state, time_range).await?; + let query = query.into_query(key).await?; let (records, _) = execute(query, stream_name).await?; let response = record_batches_to_json(&records)?; // Extract field values from the JSON response diff --git a/src/query/mod.rs b/src/query/mod.rs index df230ef07..f2d619f07 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -30,7 +30,7 @@ use datafusion::execution::disk_manager::DiskManagerConfig; use datafusion::execution::SessionStateBuilder; use datafusion::logical_expr::expr::Alias; use datafusion::logical_expr::{ - Aggregate, Explain, Filter, LogicalPlan, PlanType, Projection, ToStringifiedPlan, + Aggregate, Filter, LogicalPlan, PlanType, Projection, ToStringifiedPlan, }; use datafusion::prelude::*; use itertools::Itertools; @@ -55,11 +55,76 @@ use crate::event; use crate::handlers::http::query::QueryError; use crate::option::Mode; use crate::parseable::PARSEABLE; -use crate::storage::{ObjectStorageProvider, ObjectStoreFormat, STREAM_ROOT_DIRECTORY}; +use crate::storage::{ObjectStoreFormat, STREAM_ROOT_DIRECTORY}; use crate::utils::time::TimeRange; -pub static QUERY_SESSION: Lazy = - Lazy::new(|| Query::create_session_context(PARSEABLE.storage())); +pub static QUERY_SESSION: Lazy = Lazy::new(|| { + let runtime_config = PARSEABLE + .storage + .get_datafusion_runtime() + .with_disk_manager(DiskManagerConfig::NewOs); + + let 
(pool_size, fraction) = match PARSEABLE.options.query_memory_pool_size { + Some(size) => (size, 1.), + None => { + let mut system = System::new(); + system.refresh_memory(); + let available_mem = system.available_memory(); + (available_mem as usize, 0.85) + } + }; + + let runtime_config = runtime_config.with_memory_limit(pool_size, fraction); + let runtime = Arc::new(runtime_config.build().unwrap()); + + // All the config options are explained here - + // https://datafusion.apache.org/user-guide/configs.html + let mut config = SessionConfig::default() + .with_parquet_pruning(true) + .with_prefer_existing_sort(true) + .with_batch_size(1000000); + + // Pushdown filters allows DF to push the filters as far down in the plan as possible + // and thus, reducing the number of rows decoded + config.options_mut().execution.parquet.pushdown_filters = true; + + // Reorder filters allows DF to decide the order of filters minimizing the cost of filter evaluation + config.options_mut().execution.parquet.reorder_filters = true; + config.options_mut().execution.parquet.binary_as_string = true; + config + .options_mut() + .execution + .use_row_number_estimates_to_optimize_partitioning = true; + + //adding this config as it improves query performance as explained here - + // https://github.com/apache/datafusion/pull/13101 + config + .options_mut() + .execution + .parquet + .schema_force_view_types = true; + + let state = SessionStateBuilder::new() + .with_default_features() + .with_config(config) + .with_runtime_env(runtime) + .build(); + + let schema_provider = Arc::new(GlobalSchemaProvider { + storage: PARSEABLE.storage.get_object_store(), + }); + state + .catalog_list() + .catalog(&state.config_options().catalog.default_catalog) + .expect("default catalog is provided by datafusion") + .register_schema( + &state.config_options().catalog.default_schema, + schema_provider, + ) + .unwrap(); + + SessionContext::new_with_state(state) +}); /// Dedicated multi-threaded runtime to run all queries on pub static QUERY_RUNTIME: Lazy = @@ -81,82 +146,15 @@ pub async fn execute( // A query request by client #[derive(Debug)] pub struct Query { - pub raw_logical_plan: LogicalPlan, + pub plan: LogicalPlan, pub time_range: TimeRange, pub filter_tag: Option>, + pub stream_names: Vec, } impl Query { - // create session context for this query - pub fn create_session_context(storage: Arc) -> SessionContext { - let runtime_config = storage - .get_datafusion_runtime() - .with_disk_manager(DiskManagerConfig::NewOs); - - let (pool_size, fraction) = match PARSEABLE.options.query_memory_pool_size { - Some(size) => (size, 1.), - None => { - let mut system = System::new(); - system.refresh_memory(); - let available_mem = system.available_memory(); - (available_mem as usize, 0.85) - } - }; - - let runtime_config = runtime_config.with_memory_limit(pool_size, fraction); - let runtime = Arc::new(runtime_config.build().unwrap()); - - // All the config options are explained here - - // https://datafusion.apache.org/user-guide/configs.html - let mut config = SessionConfig::default() - .with_parquet_pruning(true) - .with_prefer_existing_sort(true) - .with_batch_size(1000000); - - // Pushdown filters allows DF to push the filters as far down in the plan as possible - // and thus, reducing the number of rows decoded - config.options_mut().execution.parquet.pushdown_filters = true; - - // Reorder filters allows DF to decide the order of filters minimizing the cost of filter evaluation - config.options_mut().execution.parquet.reorder_filters = 
true; - config.options_mut().execution.parquet.binary_as_string = true; - config - .options_mut() - .execution - .use_row_number_estimates_to_optimize_partitioning = true; - - //adding this config as it improves query performance as explained here - - // https://github.com/apache/datafusion/pull/13101 - config - .options_mut() - .execution - .parquet - .schema_force_view_types = true; - - let state = SessionStateBuilder::new() - .with_default_features() - .with_config(config) - .with_runtime_env(runtime) - .build(); - - let schema_provider = Arc::new(GlobalSchemaProvider { - storage: storage.get_object_store(), - }); - state - .catalog_list() - .catalog(&state.config_options().catalog.default_catalog) - .expect("default catalog is provided by datafusion") - .register_schema( - &state.config_options().catalog.default_schema, - schema_provider, - ) - .unwrap(); - - SessionContext::new_with_state(state) - } - pub async fn execute( - &self, + self, time_partition: Option<&String>, ) -> Result<(Vec, Vec), ExecuteError> { let df = QUERY_SESSION @@ -181,7 +179,7 @@ impl Query { } pub async fn get_dataframe( - &self, + self, time_partition: Option<&String>, ) -> Result { let df = QUERY_SESSION @@ -192,52 +190,44 @@ impl Query { } /// return logical plan with all time filters applied through - fn final_logical_plan(&self, time_partition: Option<&String>) -> LogicalPlan { + fn final_logical_plan(self, time_partition: Option<&String>) -> LogicalPlan { // see https://github.com/apache/arrow-datafusion/pull/8400 // this can be eliminated in later version of datafusion but with slight caveat // transform cannot modify stringified plans by itself // we by knowing this plan is not in the optimization procees chose to overwrite the stringified plan + let LogicalPlan::Explain(mut explain) = self.plan else { + return transform( + self.plan, + self.time_range.start.naive_utc(), + self.time_range.end.naive_utc(), + time_partition, + ) + .data; + }; - match self.raw_logical_plan.clone() { - LogicalPlan::Explain(plan) => { - let transformed = transform( - plan.plan.as_ref().clone(), - self.time_range.start.naive_utc(), - self.time_range.end.naive_utc(), - time_partition, - ); - LogicalPlan::Explain(Explain { - verbose: plan.verbose, - stringified_plans: vec![transformed - .data - .to_stringified(PlanType::InitialLogicalPlan)], - plan: Arc::new(transformed.data), - schema: plan.schema, - logical_optimization_succeeded: plan.logical_optimization_succeeded, - }) - } - x => { - transform( - x, - self.time_range.start.naive_utc(), - self.time_range.end.naive_utc(), - time_partition, - ) - .data - } - } + let transformed = transform( + explain.plan.as_ref().clone(), + self.time_range.start.naive_utc(), + self.time_range.end.naive_utc(), + time_partition, + ); + explain.stringified_plans = vec![transformed + .data + .to_stringified(PlanType::InitialLogicalPlan)]; + explain.plan = Arc::new(transformed.data); + + LogicalPlan::Explain(explain) } - pub fn first_table_name(&self) -> Option { - let mut visitor = TableScanVisitor::default(); - let _ = self.raw_logical_plan.visit(&mut visitor); - visitor.into_inner().pop() + // name of the main/first stream in the query + pub fn first_stream_name(&self) -> Option<&String> { + self.stream_names.first() } /// Evaluates to Some("count(*)") | Some("column_name") if the logical plan is a Projection: SELECT COUNT(*) | SELECT COUNT(*) as column_name pub fn is_logical_plan_count_without_filters(&self) -> Option<&String> { // Check if the raw logical plan is a Projection: SELECT - let 
LogicalPlan::Projection(Projection { input, expr, .. }) = &self.raw_logical_plan else { + let LogicalPlan::Projection(Projection { input, expr, .. }) = &self.plan else { return None; }; // Check if the input of the Projection is an Aggregate: COUNT(*) @@ -442,6 +432,7 @@ impl TreeNodeVisitor<'_> for TableScanVisitor { } } +// transform the plan to apply time filters pub async fn get_manifest_list( stream_name: &str, time_range: &TimeRange, @@ -514,78 +505,50 @@ fn transform( end_time: NaiveDateTime, time_partition: Option<&String>, ) -> Transformed { - plan.transform(&|plan| match plan { - LogicalPlan::TableScan(table) => { - let mut new_filters = vec![]; - if !table_contains_any_time_filters(&table, time_partition) { - let mut _start_time_filter: Expr; - let mut _end_time_filter: Expr; - match time_partition { - Some(time_partition) => { - _start_time_filter = - PartialTimeFilter::Low(std::ops::Bound::Included(start_time)) - .binary_expr(Expr::Column(Column::new( - Some(table.table_name.to_owned()), - time_partition.clone(), - ))); - _end_time_filter = - PartialTimeFilter::High(std::ops::Bound::Excluded(end_time)) - .binary_expr(Expr::Column(Column::new( - Some(table.table_name.to_owned()), - time_partition, - ))); - } - None => { - _start_time_filter = - PartialTimeFilter::Low(std::ops::Bound::Included(start_time)) - .binary_expr(Expr::Column(Column::new( - Some(table.table_name.to_owned()), - event::DEFAULT_TIMESTAMP_KEY, - ))); - _end_time_filter = - PartialTimeFilter::High(std::ops::Bound::Excluded(end_time)) - .binary_expr(Expr::Column(Column::new( - Some(table.table_name.to_owned()), - event::DEFAULT_TIMESTAMP_KEY, - ))); - } - } + plan.transform(|plan| { + let LogicalPlan::TableScan(table) = &plan else { + return Ok(Transformed::no(plan)); + }; - new_filters.push(_start_time_filter); - new_filters.push(_end_time_filter); - } - let new_filter = new_filters.into_iter().reduce(and); - if let Some(new_filter) = new_filter { - let filter = - Filter::try_new(new_filter, Arc::new(LogicalPlan::TableScan(table))).unwrap(); - Ok(Transformed::yes(LogicalPlan::Filter(filter))) - } else { - Ok(Transformed::no(LogicalPlan::TableScan(table))) - } + // Early return if filters already exist + if query_can_be_filtered_on_stream_time_partition(table, time_partition) { + return Ok(Transformed::no(plan)); } - x => Ok(Transformed::no(x)), + + let stream = table.table_name.clone(); + let time_partition = time_partition.map_or(event::DEFAULT_TIMESTAMP_KEY, |x| x.as_str()); + let column_expr = Expr::Column(Column::new(Some(stream), time_partition)); + + // Reconstruct plan with start and end time filters + let low_filter = + PartialTimeFilter::Low(Bound::Included(start_time)).binary_expr(column_expr.clone()); + let high_filter = + PartialTimeFilter::High(Bound::Excluded(end_time)).binary_expr(column_expr); + let filter = Filter::try_new(and(low_filter, high_filter), Arc::new(plan))?; + + Ok(Transformed::yes(LogicalPlan::Filter(filter))) }) .expect("transform only transforms the tablescan") } -fn table_contains_any_time_filters( +// check if the query contains the streams's time partition as filter +fn query_can_be_filtered_on_stream_time_partition( table: &datafusion::logical_expr::TableScan, time_partition: Option<&String>, ) -> bool { table .filters .iter() - .filter_map(|x| { - if let Expr::BinaryExpr(binexpr) = x { - Some(binexpr) - } else { - None - } + .filter_map(|x| match x { + Expr::BinaryExpr(binexpr) => Some(binexpr), + _ => None, }) .any(|expr| { - matches!(&*expr.left, Expr::Column(Column { 
name, .. }) - if (time_partition.is_some_and(|field| field == name) || - (time_partition.is_none() && name == event::DEFAULT_TIMESTAMP_KEY))) + matches!( + &*expr.left, + Expr::Column(Column { name, .. }) + if time_partition.map_or(event::DEFAULT_TIMESTAMP_KEY, |n| n.as_str()) == name + ) }) } @@ -645,6 +608,8 @@ pub mod error { ObjectStorage(#[from] ObjectStorageError), #[error("Query Execution failed due to error in datafusion: {0}")] Datafusion(#[from] DataFusionError), + #[error("Query Execution failed due to missing stream name in query")] + NoStream, #[error("{0}")] StreamNotFound(#[from] StreamNotFound), } diff --git a/src/utils/arrow/flight.rs b/src/utils/arrow/flight.rs index c8d2dacf2..379c56077 100644 --- a/src/utils/arrow/flight.rs +++ b/src/utils/arrow/flight.rs @@ -18,7 +18,7 @@ use crate::event::Event; use crate::handlers::http::ingest::push_logs_unchecked; -use crate::handlers::http::query::Query as QueryJson; +use crate::handlers::http::query::QueryRequest; use crate::parseable::PARSEABLE; use crate::query::stream_schema_provider::{extract_primary_filter, is_within_staging_window}; use crate::{handlers::http::modal::IngestorMetadata, option::Mode}; @@ -41,8 +41,8 @@ use tonic::transport::{Channel, Uri}; pub type DoGetStream = stream::BoxStream<'static, Result>; -pub fn get_query_from_ticket(req: &Request) -> Result { - serde_json::from_slice::(&req.get_ref().ticket) +pub fn get_query_from_ticket(req: &Request) -> Result { + serde_json::from_slice::(&req.get_ref().ticket) .map_err(|err| Status::internal(err.to_string())) }
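
Note on the ticket format used by the Flight path above: `get_query_from_ticket` now just deserializes the ticket bytes into a `QueryRequest`, so a client ticket is the JSON form of that struct (camelCase field names, with the flattened `QueryParams`). A minimal sketch of building such a payload with serde_json — the stream name here is a made-up value for illustration:

    use serde_json::json;

    // Illustrative only: mirrors the QueryRequest/QueryParams shape from the diff
    // (query, startTime, endTime, plus the flattened `fields`/`sendNull` params).
    fn example_ticket() -> String {
        json!({
            "query": "select * from frontend_logs", // hypothetical stream name
            "startTime": "1h",                      // human-readable range, as used elsewhere in the diff
            "endTime": "now",
            "fields": true,
            "sendNull": false,
        })
        .to_string()
    }

The resulting string would go into the Flight `Ticket`'s `ticket` bytes on the client side; on the server it round-trips back through `serde_json::from_slice::<QueryRequest>` exactly as shown in `get_query_from_ticket`.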