Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor + test: query response serialization #1165

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 18 additions & 15 deletions src/handlers/http/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use actix_web::http::header::ContentType;
use actix_web::web::{self, Json};
use actix_web::{FromRequest, HttpRequest, HttpResponse, Responder};
use arrow_schema::ArrowError;
use chrono::{DateTime, Utc};
use datafusion::common::tree_node::TreeNode;
use datafusion::error::DataFusionError;
Expand All @@ -31,7 +32,7 @@ use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;
use tracing::error;
use tracing::{debug, error};

use crate::event::error::EventError;
use crate::handlers::http::fetch_schema;
Expand Down Expand Up @@ -101,7 +102,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons

user_auth_for_query(&permissions, &tables)?;

let time = Instant::now();
let start = Instant::now();
// Intercept `count(*)` queries and use the counts API
if let Some(column_name) = query.is_logical_plan_count_without_filters() {
let counts_req = CountsRequest {
Expand All @@ -122,12 +123,13 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
Value::Array(vec![json!({column_name: count})])
};

let time = time.elapsed().as_secs_f64();

let time = start.elapsed().as_secs_f64();
QUERY_EXECUTE_TIME
.with_label_values(&[&table_name])
.observe(time);

debug!("Query results returned in {time}s");

return Ok(HttpResponse::Ok().json(response));
}

Expand All @@ -139,15 +141,16 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
fill_null: query_request.send_null,
with_fields: query_request.fields,
}
.to_http()?;

let time = time.elapsed().as_secs_f64();
.to_json()?;

let time = start.elapsed().as_secs_f64();
QUERY_EXECUTE_TIME
.with_label_values(&[&table_name])
.observe(time);

Ok(response)
debug!("Query results returned in {time}s");

Ok(HttpResponse::Ok().json(response))
}

pub async fn get_counts(
Expand Down Expand Up @@ -309,24 +312,24 @@ pub enum QueryError {
EventError(#[from] EventError),
#[error("Error: {0}")]
MalformedQuery(&'static str),
#[allow(unused)]
#[error(
r#"Error: Failed to Parse Record Batch into Json
Description: {0}"#
)]
JsonParse(String),
#[error("Error: {0}")]
ActixError(#[from] actix_web::Error),
#[error("Error: {0}")]
Anyhow(#[from] anyhow::Error),
#[error("Arrow Error: {0}")]
Arrow(#[from] ArrowError),
#[error("Error: Failed to Parse Record Batch into Json: {0}")]
Json(#[from] serde_json::Error),
#[error("Error: {0}")]
StreamNotFound(#[from] StreamNotFound),
}

impl actix_web::ResponseError for QueryError {
fn status_code(&self) -> http::StatusCode {
match self {
QueryError::Execute(_) | QueryError::JsonParse(_) => StatusCode::INTERNAL_SERVER_ERROR,
QueryError::Execute(_) | QueryError::Json(_) | QueryError::Arrow(_) => {
StatusCode::INTERNAL_SERVER_ERROR
}
_ => StatusCode::BAD_REQUEST,
}
}
Expand Down
186 changes: 169 additions & 17 deletions src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,8 @@
*/

use crate::{handlers::http::query::QueryError, utils::arrow::record_batches_to_json};
use actix_web::HttpResponse;
use datafusion::arrow::record_batch::RecordBatch;
use itertools::Itertools;
use serde_json::{json, Value};
use tracing::info;
use serde_json::{json, Map, Value};

pub struct QueryResponse {
pub records: Vec<RecordBatch>,
Expand All @@ -31,30 +28,185 @@ pub struct QueryResponse {
}

impl QueryResponse {
pub fn to_http(&self) -> Result<HttpResponse, QueryError> {
info!("{}", "Returning query results");
let mut json_records = record_batches_to_json(&self.records)?;
/// TODO: maybe this can be further optimized by directly converting `arrow` to `serde_json` instead of serializing to bytes
pub fn to_json(&self) -> Result<Value, QueryError> {
let mut json: Vec<Map<String, Value>> = record_batches_to_json(&self.records)?;

if self.fill_null {
for map in &mut json_records {
for field in &self.fields {
if !map.contains_key(field) {
map.insert(field.clone(), Value::Null);
}
for object in json.iter_mut() {
for field in self.fields.iter() {
object.entry(field).or_insert(Value::Null);
}
}
}
let values = json_records.into_iter().map(Value::Object).collect_vec();

let response = if self.with_fields {
let json = if self.with_fields {
json!({
"fields": self.fields,
"records": values
"records": json
})
} else {
Value::Array(values)
json!(json)
};

Ok(HttpResponse::Ok().json(response))
Ok(json)
}
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::{Array, Float64Array, Int64Array, RecordBatch, StringArray};
    use arrow_schema::Schema;
    use serde_json::{json, Value};

    use crate::response::QueryResponse;

    /// The column names shared by every non-empty fixture below.
    fn abc_fields() -> Vec<String> {
        vec!["a".to_owned(), "b".to_owned(), "c".to_owned()]
    }

    /// Builds a three-row batch with columns `a: Int64` (0..3), `b: Float64`
    /// (0.0..3.0) and `c: Utf8` ("str {x}").
    ///
    /// When `null_c` is true, only row 1 carries a value in column `c`; the
    /// other rows are null, which exercises the `fill_null` code path.
    fn sample_batch(null_c: bool) -> RecordBatch {
        let a: Arc<dyn Array> = Arc::new(Int64Array::from_iter(0..3));
        let b: Arc<dyn Array> = Arc::new(Float64Array::from_iter((0..3).map(|x| x as f64)));
        let c: Arc<dyn Array> = Arc::new(StringArray::from_iter((0..3).map(|x| {
            if !null_c || x == 1 {
                Some(format!("str {x}"))
            } else {
                None
            }
        })));

        RecordBatch::try_from_iter_with_nullable([("a", a, true), ("b", b, true), ("c", c, true)])
            .unwrap()
    }

    /// An empty batch must serialize to an empty JSON array, not an error.
    #[test]
    fn check_empty_record_batches_to_json() {
        let response = QueryResponse {
            records: vec![RecordBatch::new_empty(Arc::new(Schema::empty()))],
            fields: vec![],
            fill_null: false,
            with_fields: false,
        };

        assert_eq!(response.to_json().unwrap(), Value::Array(vec![]));
    }

    /// Plain serialization: one JSON object per row, no wrapper object.
    #[test]
    fn check_record_batches_to_json() {
        let response = QueryResponse {
            records: vec![sample_batch(false)],
            fields: abc_fields(),
            fill_null: false,
            with_fields: false,
        };

        assert_eq!(
            response.to_json().unwrap(),
            json!([
                {"a": 0, "b": 0.0, "c": "str 0"},
                {"a": 1, "b": 1.0, "c": "str 1"},
                {"a": 2, "b": 2.0, "c": "str 2"}
            ])
        );
    }

    /// `with_fields: true` wraps the rows in `{"fields": [...], "records": [...]}`.
    #[test]
    fn check_record_batches_to_json_with_fields() {
        let response = QueryResponse {
            records: vec![sample_batch(false)],
            fields: abc_fields(),
            fill_null: false,
            with_fields: true,
        };

        assert_eq!(
            response.to_json().unwrap(),
            json!({
                "fields": ["a", "b", "c"],
                "records": [
                    {"a": 0, "b": 0.0, "c": "str 0"},
                    {"a": 1, "b": 1.0, "c": "str 1"},
                    {"a": 2, "b": 2.0, "c": "str 2"}
                ]
            })
        );
    }

    /// With `fill_null: false`, null cells are simply absent from the row objects.
    #[test]
    fn check_record_batches_to_json_without_nulls() {
        let response = QueryResponse {
            records: vec![sample_batch(true)],
            fields: abc_fields(),
            fill_null: false,
            with_fields: false,
        };

        assert_eq!(
            response.to_json().unwrap(),
            json!([
                {"a": 0, "b": 0.0},
                {"a": 1, "b": 1.0, "c": "str 1"},
                {"a": 2, "b": 2.0}
            ])
        );
    }

    /// With `fill_null: true`, absent cells are materialized as explicit JSON nulls
    /// for every requested field.
    #[test]
    fn check_record_batches_to_json_with_nulls() {
        let response = QueryResponse {
            records: vec![sample_batch(true)],
            fields: abc_fields(),
            fill_null: true,
            with_fields: false,
        };

        assert_eq!(
            response.to_json().unwrap(),
            json!([
                {"a": 0, "b": 0.0, "c": null},
                {"a": 1, "b": 1.0, "c": "str 1"},
                {"a": 2, "b": 2.0, "c": null}
            ])
        );
    }
}
7 changes: 4 additions & 3 deletions src/utils/arrow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,14 @@
use std::sync::Arc;

use arrow_array::{Array, RecordBatch, TimestampMillisecondArray, UInt64Array};
use arrow_schema::Schema;
use arrow_schema::{ArrowError, Schema};
use arrow_select::take::take;
use chrono::{DateTime, Utc};
use itertools::Itertools;

pub mod batch_adapter;
pub mod flight;

use anyhow::Result;
pub use batch_adapter::adapt_batch;
use serde_json::{Map, Value};

Expand Down Expand Up @@ -90,7 +89,9 @@ pub fn replace_columns(
/// * Result<Vec<Map<String, Value>>>
///
/// A vector of JSON objects representing the record batches.
pub fn record_batches_to_json(records: &[RecordBatch]) -> Result<Vec<Map<String, Value>>> {
pub fn record_batches_to_json(
records: &[RecordBatch],
) -> Result<Vec<Map<String, Value>>, ArrowError> {
let buf = vec![];
let mut writer = arrow_json::ArrayWriter::new(buf);
for record in records {
Expand Down
Loading