diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs index 7d9c33a45..51cde9332 100644 --- a/src/handlers/http/query.rs +++ b/src/handlers/http/query.rs @@ -19,6 +19,7 @@ use actix_web::http::header::ContentType; use actix_web::web::{self, Json}; use actix_web::{FromRequest, HttpRequest, HttpResponse, Responder}; +use arrow_schema::ArrowError; use chrono::{DateTime, Utc}; use datafusion::common::tree_node::TreeNode; use datafusion::error::DataFusionError; @@ -31,7 +32,7 @@ use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; use std::time::Instant; -use tracing::error; +use tracing::{debug, error}; use crate::event::error::EventError; use crate::handlers::http::fetch_schema; @@ -101,7 +102,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result Result Result http::StatusCode { match self { - QueryError::Execute(_) | QueryError::JsonParse(_) => StatusCode::INTERNAL_SERVER_ERROR, + QueryError::Execute(_) | QueryError::Json(_) | QueryError::Arrow(_) => { + StatusCode::INTERNAL_SERVER_ERROR + } _ => StatusCode::BAD_REQUEST, } } diff --git a/src/response.rs b/src/response.rs index fe3159f71..478e12d46 100644 --- a/src/response.rs +++ b/src/response.rs @@ -17,11 +17,8 @@ */ use crate::{handlers::http::query::QueryError, utils::arrow::record_batches_to_json}; -use actix_web::HttpResponse; use datafusion::arrow::record_batch::RecordBatch; -use itertools::Itertools; -use serde_json::{json, Value}; -use tracing::info; +use serde_json::{json, Map, Value}; pub struct QueryResponse { pub records: Vec, @@ -31,30 +28,185 @@ pub struct QueryResponse { } impl QueryResponse { - pub fn to_http(&self) -> Result { - info!("{}", "Returning query results"); - let mut json_records = record_batches_to_json(&self.records)?; + /// TODO: maybe this can be further optimized by directly converting `arrow` to `serde_json` instead of serializing to bytes + pub fn to_json(&self) -> Result { + let mut json: Vec> = 
record_batches_to_json(&self.records)?; if self.fill_null { - for map in &mut json_records { - for field in &self.fields { - if !map.contains_key(field) { - map.insert(field.clone(), Value::Null); - } + for object in json.iter_mut() { + for field in self.fields.iter() { + object.entry(field).or_insert(Value::Null); } } } - let values = json_records.into_iter().map(Value::Object).collect_vec(); - let response = if self.with_fields { + let json = if self.with_fields { json!({ "fields": self.fields, - "records": values + "records": json }) } else { - Value::Array(values) + json!(json) }; - Ok(HttpResponse::Ok().json(response)) + Ok(json) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use arrow_array::{Array, Float64Array, Int64Array, RecordBatch, StringArray}; + use arrow_schema::Schema; + use serde_json::{json, Value}; + + use crate::response::QueryResponse; + + #[test] + fn check_empty_record_batches_to_json() { + let response = QueryResponse { + records: vec![RecordBatch::new_empty(Arc::new(Schema::empty()))], + fields: vec![], + fill_null: false, + with_fields: false, + }; + + assert_eq!(response.to_json().unwrap(), Value::Array(vec![])); + } + + #[test] + fn check_record_batches_to_json() { + let array1: Arc = Arc::new(Int64Array::from_iter(0..3)); + let array2: Arc = Arc::new(Float64Array::from_iter((0..3).map(|x| x as f64))); + let array3: Arc = Arc::new(StringArray::from_iter( + (0..3).map(|x| Some(format!("str {x}"))), + )); + + let record = RecordBatch::try_from_iter_with_nullable([ + ("a", array1, true), + ("b", array2, true), + ("c", array3, true), + ]) + .unwrap(); + let response = QueryResponse { + records: vec![record], + fields: vec!["a".to_owned(), "b".to_owned(), "c".to_owned()], + fill_null: false, + with_fields: false, + }; + + assert_eq!( + response.to_json().unwrap(), + json!([ + {"a": 0, "b": 0.0, "c": "str 0"}, + {"a": 1, "b": 1.0, "c": "str 1"}, + {"a": 2, "b": 2.0, "c": "str 2"} + ]) + ); + } + + #[test] + fn 
check_record_batches_to_json_with_fields() { + let array1: Arc = Arc::new(Int64Array::from_iter(0..3)); + let array2: Arc = Arc::new(Float64Array::from_iter((0..3).map(|x| x as f64))); + let array3: Arc = Arc::new(StringArray::from_iter( + (0..3).map(|x| Some(format!("str {x}"))), + )); + + let record = RecordBatch::try_from_iter_with_nullable([ + ("a", array1, true), + ("b", array2, true), + ("c", array3, true), + ]) + .unwrap(); + let response = QueryResponse { + records: vec![record], + fields: vec!["a".to_owned(), "b".to_owned(), "c".to_owned()], + fill_null: false, + with_fields: true, + }; + + assert_eq!( + response.to_json().unwrap(), + json!({ + "fields": ["a", "b", "c"], + "records": [ + {"a": 0, "b": 0.0, "c": "str 0"}, + {"a": 1, "b": 1.0, "c": "str 1"}, + {"a": 2, "b": 2.0, "c": "str 2"} + ] + }) + ); + } + + #[test] + fn check_record_batches_to_json_without_nulls() { + let array1: Arc = Arc::new(Int64Array::from_iter(0..3)); + let array2: Arc = Arc::new(Float64Array::from_iter((0..3).map(|x| x as f64))); + let array3: Arc = Arc::new(StringArray::from_iter((0..3).map(|x| { + if x == 1 { + Some(format!("str {x}")) + } else { + None + } + }))); + + let record = RecordBatch::try_from_iter_with_nullable([ + ("a", array1, true), + ("b", array2, true), + ("c", array3, true), + ]) + .unwrap(); + let response = QueryResponse { + records: vec![record], + fields: vec!["a".to_owned(), "b".to_owned(), "c".to_owned()], + fill_null: false, + with_fields: false, + }; + + assert_eq!( + response.to_json().unwrap(), + json!([ + {"a": 0, "b": 0.0}, + {"a": 1, "b": 1.0, "c": "str 1"}, + {"a": 2, "b": 2.0} + ]) + ); + } + + #[test] + fn check_record_batches_to_json_with_nulls() { + let array1: Arc = Arc::new(Int64Array::from_iter(0..3)); + let array2: Arc = Arc::new(Float64Array::from_iter((0..3).map(|x| x as f64))); + let array3: Arc = Arc::new(StringArray::from_iter((0..3).map(|x| { + if x == 1 { + Some(format!("str {x}")) + } else { + None + } + }))); + + let record = 
RecordBatch::try_from_iter_with_nullable([ + ("a", array1, true), + ("b", array2, true), + ("c", array3, true), + ]) + .unwrap(); + let response = QueryResponse { + records: vec![record], + fields: vec!["a".to_owned(), "b".to_owned(), "c".to_owned()], + fill_null: true, + with_fields: false, + }; + + assert_eq!( + response.to_json().unwrap(), + json!([ + {"a": 0, "b": 0.0, "c": null}, + {"a": 1, "b": 1.0, "c": "str 1"}, + {"a": 2, "b": 2.0, "c": null} + ]) + ); } } diff --git a/src/utils/arrow/mod.rs b/src/utils/arrow/mod.rs index e29762047..2b85bc87f 100644 --- a/src/utils/arrow/mod.rs +++ b/src/utils/arrow/mod.rs @@ -43,7 +43,7 @@ use std::sync::Arc; use arrow_array::{Array, RecordBatch, TimestampMillisecondArray, UInt64Array}; -use arrow_schema::Schema; +use arrow_schema::{ArrowError, Schema}; use arrow_select::take::take; use chrono::{DateTime, Utc}; use itertools::Itertools; @@ -51,7 +51,6 @@ use itertools::Itertools; pub mod batch_adapter; pub mod flight; -use anyhow::Result; pub use batch_adapter::adapt_batch; use serde_json::{Map, Value}; @@ -90,7 +89,9 @@ pub fn replace_columns( /// * Result>> /// /// A vector of JSON objects representing the record batches. -pub fn record_batches_to_json(records: &[RecordBatch]) -> Result>> { +pub fn record_batches_to_json( + records: &[RecordBatch], +) -> Result>, ArrowError> { let buf = vec![]; let mut writer = arrow_json::ArrayWriter::new(buf); for record in records {