From 4d4a97947d86c60993f70331218545d748e2b741 Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Thu, 6 Feb 2025 14:15:53 +0530
Subject: [PATCH 1/2] refactor + test: query response serialization

---
 src/handlers/http/query.rs |  15 +--
 src/response.rs            | 196 ++++++++++++++++++++++++++++++++++---
 src/utils/arrow/mod.rs     |  34 -------
 3 files changed, 189 insertions(+), 56 deletions(-)

diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs
index 5cfc791ad..1375aca3d 100644
--- a/src/handlers/http/query.rs
+++ b/src/handlers/http/query.rs
@@ -19,6 +19,7 @@
 use actix_web::http::header::ContentType;
 use actix_web::web::{self, Json};
 use actix_web::{FromRequest, HttpRequest, HttpResponse, Responder};
+use arrow_schema::ArrowError;
 use chrono::{DateTime, Utc};
 use datafusion::common::tree_node::TreeNode;
 use datafusion::error::DataFusionError;
@@ -308,22 +309,22 @@ pub enum QueryError {
     EventError(#[from] EventError),
     #[error("Error: {0}")]
     MalformedQuery(&'static str),
-    #[allow(unused)]
-    #[error(
-        r#"Error: Failed to Parse Record Batch into Json
-Description: {0}"#
-    )]
-    JsonParse(String),
     #[error("Error: {0}")]
     ActixError(#[from] actix_web::Error),
     #[error("Error: {0}")]
     Anyhow(#[from] anyhow::Error),
+    #[error("Arrow Error: {0}")]
+    Arrow(#[from] ArrowError),
+    #[error("Error: Failed to Parse Record Batch into Json: {0}")]
+    Json(#[from] serde_json::Error),
 }
 
 impl actix_web::ResponseError for QueryError {
     fn status_code(&self) -> http::StatusCode {
         match self {
-            QueryError::Execute(_) | QueryError::JsonParse(_) => StatusCode::INTERNAL_SERVER_ERROR,
+            QueryError::Execute(_) | QueryError::Json(_) | QueryError::Arrow(_) => {
+                StatusCode::INTERNAL_SERVER_ERROR
+            }
             _ => StatusCode::BAD_REQUEST,
         }
     }
diff --git a/src/response.rs b/src/response.rs
index 8211bf4c7..6633e83f0 100644
--- a/src/response.rs
+++ b/src/response.rs
@@ -16,11 +16,10 @@
  *
  */
 
-use crate::{handlers::http::query::QueryError, utils::arrow::record_batches_to_json};
+use crate::handlers::http::query::QueryError;
 use actix_web::HttpResponse;
 use datafusion::arrow::record_batch::RecordBatch;
-use itertools::Itertools;
-use serde_json::{json, Value};
+use serde_json::{json, Map, Value};
 use tracing::info;
 
 pub struct QueryResponse {
@@ -31,31 +30,198 @@ pub struct QueryResponse {
 }
 
 impl QueryResponse {
+    /// TODO: maybe this can be further optimized by directly converting `arrow` to `serde_json` instead of serializing to bytes
     pub fn to_http(&self) -> Result<HttpResponse, QueryError> {
-        info!("{}", "Returning query results");
+        info!("Returning query results");
+        let response = self.to_json()?;
+
+        Ok(HttpResponse::Ok().json(response))
+    }
+
+    fn to_json(&self) -> Result<Value, QueryError> {
+        let buf = vec![];
+        let mut writer = arrow_json::ArrayWriter::new(buf);
         let records: Vec<&RecordBatch> = self.records.iter().collect();
-        let mut json_records = record_batches_to_json(&records)?;
+        writer.write_batches(&records)?;
+        writer.finish()?;
+
+        let mut json: Vec<Map<String, Value>> = serde_json::from_slice(&writer.into_inner())?;
 
         if self.fill_null {
-            for map in &mut json_records {
-                for field in &self.fields {
-                    if !map.contains_key(field) {
-                        map.insert(field.clone(), Value::Null);
-                    }
+            for object in json.iter_mut() {
+                for field in self.fields.iter() {
+                    object.entry(field).or_insert(Value::Null);
                 }
             }
         }
-        let values = json_records.into_iter().map(Value::Object).collect_vec();
 
-        let response = if self.with_fields {
+        let json = if self.with_fields {
             json!({
                 "fields": self.fields,
-                "records": values
+                "records": json
             })
         } else {
-            Value::Array(values)
+            json!(json)
         };
 
-        Ok(HttpResponse::Ok().json(response))
+        Ok(json)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use arrow_array::{Array, Float64Array, Int64Array, RecordBatch, StringArray};
+    use arrow_schema::Schema;
+    use serde_json::{json, Value};
+
+    use crate::response::QueryResponse;
+
+    #[test]
+    fn check_empty_record_batches_to_json() {
+        let response = QueryResponse {
+            records: vec![RecordBatch::new_empty(Arc::new(Schema::empty()))],
+            fields: vec![],
+            fill_null: false,
+            with_fields: false,
+        };
+
+        assert_eq!(response.to_json().unwrap(), Value::Array(vec![]));
+    }
+
+    #[test]
+    fn check_record_batches_to_json() {
+        let array1: Arc<dyn Array> = Arc::new(Int64Array::from_iter(0..3));
+        let array2: Arc<dyn Array> = Arc::new(Float64Array::from_iter((0..3).map(|x| x as f64)));
+        let array3: Arc<dyn Array> = Arc::new(StringArray::from_iter(
+            (0..3).map(|x| Some(format!("str {x}"))),
+        ));
+
+        let record = RecordBatch::try_from_iter_with_nullable([
+            ("a", array1, true),
+            ("b", array2, true),
+            ("c", array3, true),
+        ])
+        .unwrap();
+        let response = QueryResponse {
+            records: vec![record],
+            fields: vec!["a".to_owned(), "b".to_owned(), "c".to_owned()],
+            fill_null: false,
+            with_fields: false,
+        };
+
+        assert_eq!(
+            response.to_json().unwrap(),
+            json!([
+                {"a": 0, "b": 0.0, "c": "str 0"},
+                {"a": 1, "b": 1.0, "c": "str 1"},
+                {"a": 2, "b": 2.0, "c": "str 2"}
+            ])
+        );
+    }
+
+    #[test]
+    fn check_record_batches_to_json_with_fields() {
+        let array1: Arc<dyn Array> = Arc::new(Int64Array::from_iter(0..3));
+        let array2: Arc<dyn Array> = Arc::new(Float64Array::from_iter((0..3).map(|x| x as f64)));
+        let array3: Arc<dyn Array> = Arc::new(StringArray::from_iter(
+            (0..3).map(|x| Some(format!("str {x}"))),
+        ));
+
+        let record = RecordBatch::try_from_iter_with_nullable([
+            ("a", array1, true),
+            ("b", array2, true),
+            ("c", array3, true),
+        ])
+        .unwrap();
+        let response = QueryResponse {
+            records: vec![record],
+            fields: vec!["a".to_owned(), "b".to_owned(), "c".to_owned()],
+            fill_null: false,
+            with_fields: true,
+        };
+
+        assert_eq!(
+            response.to_json().unwrap(),
+            json!({
+                "fields": ["a", "b", "c"],
+                "records": [
+                    {"a": 0, "b": 0.0, "c": "str 0"},
+                    {"a": 1, "b": 1.0, "c": "str 1"},
+                    {"a": 2, "b": 2.0, "c": "str 2"}
+                ]
+            })
+        );
+    }
+
+    #[test]
+    fn check_record_batches_to_json_without_nulls() {
+        let array1: Arc<dyn Array> = Arc::new(Int64Array::from_iter(0..3));
+        let array2: Arc<dyn Array> = Arc::new(Float64Array::from_iter((0..3).map(|x| x as f64)));
+        let array3: Arc<dyn Array> = Arc::new(StringArray::from_iter((0..3).map(|x| {
+            if x == 1 {
+                Some(format!("str {x}"))
+            } else {
+                None
+            }
+        })));
+
+        let record = RecordBatch::try_from_iter_with_nullable([
+            ("a", array1, true),
+            ("b", array2, true),
+            ("c", array3, true),
+        ])
+        .unwrap();
+        let response = QueryResponse {
+            records: vec![record],
+            fields: vec!["a".to_owned(), "b".to_owned(), "c".to_owned()],
+            fill_null: false,
+            with_fields: false,
+        };
+
+        assert_eq!(
+            response.to_json().unwrap(),
+            json!([
+                {"a": 0, "b": 0.0},
+                {"a": 1, "b": 1.0, "c": "str 1"},
+                {"a": 2, "b": 2.0}
+            ])
+        );
+    }
+
+    #[test]
+    fn check_record_batches_to_json_with_nulls() {
+        let array1: Arc<dyn Array> = Arc::new(Int64Array::from_iter(0..3));
+        let array2: Arc<dyn Array> = Arc::new(Float64Array::from_iter((0..3).map(|x| x as f64)));
+        let array3: Arc<dyn Array> = Arc::new(StringArray::from_iter((0..3).map(|x| {
+            if x == 1 {
+                Some(format!("str {x}"))
+            } else {
+                None
+            }
+        })));
+
+        let record = RecordBatch::try_from_iter_with_nullable([
+            ("a", array1, true),
+            ("b", array2, true),
+            ("c", array3, true),
+        ])
+        .unwrap();
+        let response = QueryResponse {
+            records: vec![record],
+            fields: vec!["a".to_owned(), "b".to_owned(), "c".to_owned()],
+            fill_null: true,
+            with_fields: false,
+        };
+
+        assert_eq!(
+            response.to_json().unwrap(),
+            json!([
+                {"a": 0, "b": 0.0, "c": null},
+                {"a": 1, "b": 1.0, "c": "str 1"},
+                {"a": 2, "b": 2.0, "c": null}
+            ])
+        );
     }
 }
diff --git a/src/utils/arrow/mod.rs b/src/utils/arrow/mod.rs
index 3cdc5193c..ef72f2f04 100644
--- a/src/utils/arrow/mod.rs
+++ b/src/utils/arrow/mod.rs
@@ -51,9 +51,7 @@ use itertools::Itertools;
 pub mod batch_adapter;
 pub mod flight;
 
-use anyhow::Result;
 pub use batch_adapter::adapt_batch;
-use serde_json::{Map, Value};
 
 /// Replaces columns in a record batch with new arrays.
 ///
@@ -80,30 +78,6 @@ pub fn replace_columns(
     RecordBatch::try_new(schema, batch_arrays).unwrap()
 }
 
-/// Converts a slice of record batches to JSON.
-///
-/// # Arguments
-///
-/// * `records` - The record batches to convert.
-///
-/// # Returns
-/// * Result<Vec<Map<String, Value>>>
-///
-/// A vector of JSON objects representing the record batches.
-pub fn record_batches_to_json(records: &[&RecordBatch]) -> Result<Vec<Map<String, Value>>> {
-    let buf = vec![];
-    let mut writer = arrow_json::ArrayWriter::new(buf);
-    writer.write_batches(records)?;
-    writer.finish()?;
-
-    let buf = writer.into_inner();
-
-    let json_rows: Vec<Map<String, Value>> =
-        serde_json::from_reader(buf.as_slice()).unwrap_or_default();
-
-    Ok(json_rows)
-}
-
 /// Retrieves a field from a slice of fields by name.
 ///
 /// # Arguments
@@ -185,14 +159,6 @@ mod tests {
         assert_eq!(new_rb.num_rows(), 3)
     }
 
-    #[test]
-    fn check_empty_json_to_record_batches() {
-        let r = RecordBatch::new_empty(Arc::new(Schema::empty()));
-        let rb = vec![&r];
-        let batches = record_batches_to_json(&rb).unwrap();
-        assert_eq!(batches, vec![]);
-    }
-
    #[test]
    fn test_timestamp_array_has_correct_size_and_value() {
        let size = 5;
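Taken together, PATCH 1/2 replaces the removed record_batches_to_json helper with an inline arrow_json round trip inside QueryResponse::to_json: record batches are streamed into JSON bytes, then parsed back into serde_json objects. A minimal standalone sketch of that path, assuming arrow_json, arrow_array and serde_json as dependencies; the helper name batches_to_values is illustrative and not part of the patch:

    use std::sync::Arc;

    use arrow_array::{ArrayRef, Int64Array, RecordBatch};
    use serde_json::{Map, Value};

    fn batches_to_values(
        batches: &[&RecordBatch],
    ) -> Result<Vec<Map<String, Value>>, Box<dyn std::error::Error>> {
        // Stream the batches into an in-memory JSON array...
        let mut writer = arrow_json::ArrayWriter::new(Vec::new());
        writer.write_batches(batches)?;
        writer.finish()?; // writes the closing `]` of the top-level array
        // ...then parse the bytes back into one serde_json object per row.
        Ok(serde_json::from_slice(&writer.into_inner())?)
    }

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let batch = RecordBatch::try_from_iter([(
            "a",
            Arc::new(Int64Array::from(vec![1, 2])) as ArrayRef,
        )])?;
        assert_eq!(batches_to_values(&[&batch])?.len(), 2);
        Ok(())
    }

One behavioral difference worth noting: the old helper masked parse failures with unwrap_or_default(), while the new code propagates them through the freshly added Arrow and Json variants of QueryError.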
From e515b271e844ca22332c78cd041e679759809e30 Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Thu, 6 Feb 2025 15:00:48 +0530
Subject: [PATCH 2/2] refactor: simplify query response flow

---
 src/handlers/http/query.rs | 18 ++++++++++--------
 src/response.rs            | 11 +----------
 2 files changed, 11 insertions(+), 18 deletions(-)

diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs
index 1375aca3d..96c02d3cb 100644
--- a/src/handlers/http/query.rs
+++ b/src/handlers/http/query.rs
@@ -32,7 +32,7 @@ use std::collections::HashMap;
 use std::pin::Pin;
 use std::sync::Arc;
 use std::time::Instant;
-use tracing::error;
+use tracing::{debug, error};
 
 use crate::event::error::EventError;
 use crate::handlers::http::fetch_schema;
@@ -104,7 +104,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result
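The hunk above rewires the call site in the query handler: with to_http gone (see the response.rs diff below), the handler wraps the JSON value in an HttpResponse itself. The sketch below is a reconstruction for illustration only; the exact hunk body is not reproduced here, and the query_request field names send_null and fields are assumptions:

    // Hypothetical call site after this patch (not verbatim from the diff):
    let response = QueryResponse {
        records,
        fields,
        fill_null: query_request.send_null, // assumed field name
        with_fields: query_request.fields,  // assumed field name
    }
    .to_json()?;

    Ok(HttpResponse::Ok().json(response))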
diff --git a/src/response.rs b/src/response.rs
--- a/src/response.rs
+++ b/src/response.rs
@@ -31,14 +29,7 @@
 
 impl QueryResponse {
     /// TODO: maybe this can be further optimized by directly converting `arrow` to `serde_json` instead of serializing to bytes
-    pub fn to_http(&self) -> Result<HttpResponse, QueryError> {
-        info!("Returning query results");
-        let response = self.to_json()?;
-
-        Ok(HttpResponse::Ok().json(response))
-    }
-
-    fn to_json(&self) -> Result<Value, QueryError> {
+    pub fn to_json(&self) -> Result<Value, QueryError> {
         let buf = vec![];
         let mut writer = arrow_json::ArrayWriter::new(buf);
         let records: Vec<&RecordBatch> = self.records.iter().collect();
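The with_nulls and without_nulls tests in PATCH 1/2 pin down the contract that motivates fill_null: arrow_json's ArrayWriter omits keys whose value is null, so rows can come back with ragged schemas, and the fill_null path reinserts the missing keys as explicit JSON nulls. A standalone sketch of just that back-fill step; the function name fill_missing is illustrative:

    use serde_json::{Map, Value};

    fn fill_missing(rows: &mut [Map<String, Value>], fields: &[String]) {
        for row in rows.iter_mut() {
            for field in fields {
                // Only insert when the key is absent, mirroring
                // `object.entry(field).or_insert(Value::Null)` in to_json.
                row.entry(field).or_insert(Value::Null);
            }
        }
    }

    fn main() {
        let mut rows = vec![Map::new()];
        fill_missing(&mut rows, &["c".to_owned()]);
        assert_eq!(rows[0].get("c"), Some(&Value::Null));
    }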