Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

refactor: query request flow #1098

Draft
wants to merge 26 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
bde0dd1
refactor: remove method that doesn't need to be
de-sh Jan 12, 2025
918105c
unnecessary allocation
de-sh Jan 12, 2025
29ca742
refactor: serde params
de-sh Jan 12, 2025
beb1137
refactor: serde flatten
de-sh Jan 12, 2025
315e997
refactor: reorganize query construction from request
de-sh Jan 12, 2025
1ac8977
refactor: reorganize query plan transformation and execution
de-sh Jan 12, 2025
c4b676d
get rid of reconstruction and comment the code
de-sh Jan 12, 2025
483eaae
ci: deepsource fix
de-sh Jan 13, 2025
a8d32d1
clippy fix
de-sh Jan 13, 2025
28de72e
fix: don't reconstruct the plan
de-sh Jan 13, 2025
4325f9f
refactor: rename `raw_logical_plan` ~> `plan`
de-sh Jan 13, 2025
fba711f
doc: transform code purpose
de-sh Jan 13, 2025
e89221f
refactor: consume, don't clone
de-sh Jan 13, 2025
08ad124
expose `metadata` and `query`
de-sh Jan 13, 2025
ad18936
Merge remote-tracking branch 'origin/main' into query
de-sh Jan 13, 2025
7f58d3f
revert/fix: only use first table's time partition
de-sh Jan 13, 2025
9769071
revert: matches expr
de-sh Jan 13, 2025
c787ebc
Merge remote-tracking branch 'origin/main' into query
de-sh Jan 14, 2025
e462e3b
fix: label name
de-sh Jan 14, 2025
38663db
Merge remote-tracking branch 'origin/main' into query
de-sh Jan 19, 2025
fc67dca
Merge branch 'main' into query
de-sh Jan 23, 2025
55461f9
Merge remote-tracking branch 'origin/main' into query
de-sh Jan 28, 2025
4d119f8
Merge remote-tracking branch 'origin/main' into query
de-sh Jan 31, 2025
3fe61ba
Merge remote-tracking branch 'origin' into query
de-sh Feb 9, 2025
5c46df5
Merge remote-tracking branch 'origin/main' into query
de-sh Feb 12, 2025
5bf9d9f
Merge remote-tracking branch 'origin/main' into query
de-sh Mar 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions src/alerts/alerts_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use tracing::trace;
use crate::{
alerts::AggregateCondition,
parseable::PARSEABLE,
query::{TableScanVisitor, QUERY_SESSION},
query::{Query, TableScanVisitor, QUERY_SESSION},
rbac::{
map::SessionKey,
role::{Action, Permission},
Expand Down Expand Up @@ -102,7 +102,7 @@ pub async fn evaluate_alert(alert: &AlertConfig) -> Result<(), AlertError> {
trace!("RUNNING EVAL TASK FOR- {alert:?}");

let query = prepare_query(alert).await?;
let base_df = execute_base_query(&query, &alert.query).await?;
let base_df = execute_base_query(query, &alert.query).await?;
let agg_results = evaluate_aggregates(&alert.aggregate_config, &base_df).await?;
let final_res = calculate_final_result(&alert.aggregate_config, &agg_results);

Expand All @@ -118,27 +118,29 @@ async fn prepare_query(alert: &AlertConfig) -> Result<crate::query::Query, Alert
};

let session_state = QUERY_SESSION.state();
let raw_logical_plan = session_state.create_logical_plan(&alert.query).await?;
let plan = session_state.create_logical_plan(&alert.query).await?;

let time_range = TimeRange::parse_human_time(start_time, end_time)
.map_err(|err| AlertError::CustomError(err.to_string()))?;
// create a visitor to extract the table names present in query
let mut visitor = TableScanVisitor::default();
let _ = plan.visit(&mut visitor);
let stream_names = visitor.into_inner();

Ok(crate::query::Query {
raw_logical_plan,
Ok(Query {
plan,
time_range,
filter_tag: None,
stream_names,
})
}

async fn execute_base_query(
query: &crate::query::Query,
original_query: &str,
) -> Result<DataFrame, AlertError> {
let stream_name = query.first_table_name().ok_or_else(|| {
AlertError::CustomError(format!("Table name not found in query- {}", original_query))
async fn execute_base_query(query: Query, original_query: &str) -> Result<DataFrame, AlertError> {
let stream_name = query.first_stream_name().ok_or_else(|| {
AlertError::CustomError(format!("Table name not found in query- {original_query}"))
})?;

let time_partition = PARSEABLE.get_stream(&stream_name)?.get_time_partition();
let time_partition = PARSEABLE.get_stream(stream_name)?.get_time_partition();
query
.get_dataframe(time_partition.as_ref())
.await
Expand Down
18 changes: 10 additions & 8 deletions src/alerts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub mod alerts_utils;
pub mod target;

use crate::parseable::{StreamNotFound, PARSEABLE};
use crate::query::{TableScanVisitor, QUERY_SESSION};
use crate::query::{Query, TableScanVisitor, QUERY_SESSION};
use crate::rbac::map::SessionKey;
use crate::storage;
use crate::storage::ObjectStorageError;
Expand Down Expand Up @@ -480,13 +480,14 @@ impl AlertConfig {
self.validate_configs()?;

let session_state = QUERY_SESSION.state();
let raw_logical_plan = session_state.create_logical_plan(&self.query).await?;
let plan = session_state.create_logical_plan(&self.query).await?;

// create a visitor to extract the table names present in query
let mut visitor = TableScanVisitor::default();
let _ = raw_logical_plan.visit(&mut visitor);
let _ = plan.visit(&mut visitor);
let stream_names = visitor.into_inner();

let table = visitor.into_inner().first().unwrap().to_owned();
let table = stream_names.first().unwrap().to_owned();

let lowercase = self.query.split(&table).collect_vec()[0].to_lowercase();

Expand All @@ -506,22 +507,23 @@ impl AlertConfig {
let time_range = TimeRange::parse_human_time("1m", "now")
.map_err(|err| AlertError::CustomError(err.to_string()))?;

let query = crate::query::Query {
raw_logical_plan,
let query = Query {
plan,
time_range,
filter_tag: None,
stream_names,
};

// for now proceed in a similar fashion as we do in query
// TODO: in case of multiple table query does the selection of time partition make a difference? (especially when the tables don't have overlapping data)
let Some(stream_name) = query.first_table_name() else {
let Some(stream_name) = query.first_stream_name() else {
return Err(AlertError::CustomError(format!(
"Table name not found in query- {}",
self.query
)));
};

let time_partition = PARSEABLE.get_stream(&stream_name)?.get_time_partition();
let time_partition = PARSEABLE.get_stream(stream_name)?.get_time_partition();
let base_df = query
.get_dataframe(time_partition.as_ref())
.await
Expand Down
92 changes: 28 additions & 64 deletions src/handlers/airplane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use arrow_flight::flight_service_server::FlightServiceServer;
use arrow_flight::PollInfo;
use arrow_schema::ArrowError;

use datafusion::common::tree_node::TreeNode;
use serde_json::json;
use std::net::SocketAddr;
use std::time::Instant;
Expand All @@ -34,17 +33,14 @@ use tonic::transport::{Identity, Server, ServerTlsConfig};
use tonic_web::GrpcWebLayer;

use crate::handlers::http::cluster::get_ingestor_info;
use crate::handlers::http::query::{into_query, update_schema_when_distributed};
use crate::handlers::livetail::cross_origin_config;
use crate::metrics::QUERY_EXECUTE_TIME;
use crate::parseable::PARSEABLE;
use crate::query::{execute, TableScanVisitor, QUERY_SESSION};
use crate::query::execute;
use crate::utils::arrow::flight::{
append_temporary_events, get_query_from_ticket, into_flight_data, run_do_get_rpc,
send_to_ingester,
};
use crate::utils::time::TimeRange;
use crate::utils::user_auth_for_query;
use arrow_flight::{
flight_service_server::FlightService, Action, ActionType, Criteria, Empty, FlightData,
FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc,
Expand Down Expand Up @@ -127,55 +123,41 @@ impl FlightService for AirServiceImpl {
async fn do_get(&self, req: Request<Ticket>) -> Result<Response<Self::DoGetStream>, Status> {
let key = extract_session_key(req.metadata())?;

let ticket = get_query_from_ticket(&req)?;
// try authorize
match Users.authorize(key.clone(), rbac::role::Action::Query, None, None) {
rbac::Response::Authorized => (),
rbac::Response::UnAuthorized => {
return Err(Status::permission_denied(
"user is not authorized to access this resource",
))
}
rbac::Response::ReloadRequired => {
return Err(Status::unauthenticated("reload required"))
}
}

info!("query requested to airplane: {:?}", ticket);
let query_request = get_query_from_ticket(&req)?;
info!("query requested to airplane: {:?}", query_request);

// get the query session_state
let session_state = QUERY_SESSION.state();
// map payload to query
let query = query_request.into_query(&key).await.map_err(|e| {
error!("Error faced while constructing logical query: {e}");
Status::internal(format!("Failed to process query: {e}"))
})?;

// get the logical plan and extract the table name
let raw_logical_plan = session_state
.create_logical_plan(&ticket.query)
.await
.map_err(|err| {
error!("Datafusion Error: Failed to create logical plan: {}", err);
Status::internal("Failed to create logical plan")
})?;

let time_range = TimeRange::parse_human_time(&ticket.start_time, &ticket.end_time)
.map_err(|e| Status::internal(e.to_string()))?;
// create a visitor to extract the table name
let mut visitor = TableScanVisitor::default();
let _ = raw_logical_plan.visit(&mut visitor);

let streams = visitor.into_inner();

let stream_name = streams
.first()
.ok_or_else(|| Status::aborted("Malformed SQL Provided, Table Name Not Found"))?
let stream_name = query
.first_stream_name()
.ok_or_else(|| Status::internal("Failed to get stream name from query"))?
.to_owned();

update_schema_when_distributed(&streams)
.await
.map_err(|err| Status::internal(err.to_string()))?;

// map payload to query
let query = into_query(&ticket, &session_state, time_range)
.await
.map_err(|_| Status::internal("Failed to parse query"))?;

let event = if send_to_ingester(
query.time_range.start.timestamp_millis(),
query.time_range.end.timestamp_millis(),
) {
let sql = format!("select * from {}", &stream_name);
let start_time = ticket.start_time.clone();
let end_time = ticket.end_time.clone();
let out_ticket = json!({
"query": sql,
"startTime": start_time,
"endTime": end_time
"query": format!("select * from {stream_name}"),
"startTime": query_request.start_time,
"endTime": query_request.end_time,
})
.to_string();

Expand All @@ -190,30 +172,12 @@ impl FlightService for AirServiceImpl {
}
}
let mr = minute_result.iter().collect::<Vec<_>>();
let event = append_temporary_events(&stream_name, mr).await?;
let event = append_temporary_events(stream_name.as_str(), mr).await?;
Some(event)
} else {
None
};

// try authorize
match Users.authorize(key.clone(), rbac::role::Action::Query, None, None) {
rbac::Response::Authorized => (),
rbac::Response::UnAuthorized => {
return Err(Status::permission_denied(
"user is not authorized to access this resource",
))
}
rbac::Response::ReloadRequired => {
return Err(Status::unauthenticated("reload required"))
}
}

let permissions = Users.get_permissions(&key);

user_auth_for_query(&permissions, &streams).map_err(|_| {
Status::permission_denied("User Does not have permission to access this")
})?;
let time = Instant::now();

let (records, _) = execute(query, &stream_name)
Expand Down Expand Up @@ -241,7 +205,7 @@ impl FlightService for AirServiceImpl {

let time = time.elapsed().as_secs_f64();
QUERY_EXECUTE_TIME
.with_label_values(&[&format!("flight-query-{}", stream_name)])
.with_label_values(&[&format!("flight-query-{stream_name}")])
.observe(time);

// Airplane takes off 🛫
Expand Down
4 changes: 2 additions & 2 deletions src/handlers/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use serde_json::Value;

use crate::{parseable::PARSEABLE, storage::STREAM_ROOT_DIRECTORY, HTTP_CLIENT};

use self::{cluster::get_ingestor_info, query::Query};
use self::{cluster::get_ingestor_info, query::QueryRequest};

pub mod about;
pub mod alerts;
Expand Down Expand Up @@ -105,7 +105,7 @@ pub async fn fetch_schema(stream_name: &str) -> anyhow::Result<arrow_schema::Sch

/// unused for now, might need it later
#[allow(unused)]
pub async fn send_query_request_to_ingestor(query: &Query) -> anyhow::Result<Vec<Value>> {
pub async fn send_query_request_to_ingestor(query: &QueryRequest) -> anyhow::Result<Vec<Value>> {
// send the query request to the ingestor
let mut res = vec![];
let ima = get_ingestor_info().await?;
Expand Down
Loading
Loading