diff --git a/src/cli.rs b/src/cli.rs index 44317dd56..84719609f 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -282,6 +282,14 @@ pub struct Options { )] pub ingestor_endpoint: String, + #[arg( + long, + env = "P_JSON_FLATTEN_DEPTH_LIMIT", + default_value = "3", + help = "Set the depth limit for flattening nested JSON" + )] + pub json_flatten_depth_limit: usize, + #[command(flatten)] oidc: Option, diff --git a/src/event/format/json.rs b/src/event/format/json.rs index 71fcaffc7..5006be142 100644 --- a/src/event/format/json.rs +++ b/src/event/format/json.rs @@ -94,8 +94,8 @@ impl EventFormat for Event { }; if value_arr - .iter() - .any(|value| fields_mismatch(&schema, value, schema_version)) + .iter() + .any(|value| fields_mismatch(&schema, value, schema_version)) { return Err(anyhow!( "Could not process this event due to mismatch in datatype" diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs index 2b2c2a0b3..c0a2ec323 100644 --- a/src/event/format/mod.rs +++ b/src/event/format/mod.rs @@ -112,11 +112,8 @@ pub trait EventFormat: Sized { time_partition: Option<&String>, schema_version: SchemaVersion, ) -> Result<(RecordBatch, bool), AnyError> { - let (data, mut schema, is_first) = self.to_data( - storage_schema, - time_partition, - schema_version, - )?; + let (data, mut schema, is_first) = + self.to_data(storage_schema, time_partition, schema_version)?; if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() { return Err(anyhow!( diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index b3da07761..4435e14df 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -383,13 +383,12 @@ mod tests { use arrow::datatypes::Int64Type; use arrow_array::{ArrayRef, Float64Array, Int64Array, ListArray, StringArray}; use arrow_schema::{DataType, Field}; - use serde_json::json; + use serde_json::{json, Value}; use std::{collections::HashMap, sync::Arc}; use crate::{ - handlers::http::modal::utils::ingest_utils::into_event_batch, - metadata::SchemaVersion, - utils::json::{convert_array_to_object, flatten::convert_to_array}, + handlers::http::modal::utils::ingest_utils::into_event_batch, metadata::SchemaVersion, + utils::json::flatten_json_body, }; trait TestExt { @@ -534,21 +533,6 @@ mod tests { assert_eq!(rb.num_columns(), 1); } - #[test] - fn non_object_arr_is_err() { - let json = json!([1]); - - assert!(convert_array_to_object( - json, - None, - None, - None, - SchemaVersion::V0, - &crate::event::format::LogSource::default() - ) - .is_err()) - } - #[test] fn array_into_recordbatch_inffered_schema() { let json = json!([ @@ -717,11 +701,11 @@ mod tests { let json = json!([ { "a": 1, - "b": "hello", + "b": "hello" }, { "a": 1, - "b": "hello", + "b": "hello" }, { "a": 1, @@ -732,72 +716,66 @@ mod tests { "a": 1, "b": "hello", "c": [{"a": 1, "b": 2}] - }, + } ]); - let flattened_json = convert_to_array( - convert_array_to_object( - json, - None, - None, - None, - SchemaVersion::V0, - &crate::event::format::LogSource::default(), - ) - .unwrap(), - ) - .unwrap(); - - let (rb, _) = into_event_batch( - flattened_json, - HashMap::default(), - false, + let data = flatten_json_body( + json, + None, + None, None, SchemaVersion::V0, + &crate::event::format::LogSource::default(), + 3, ) .unwrap(); - assert_eq!(rb.num_rows(), 4); - assert_eq!(rb.num_columns(), 5); - assert_eq!( - rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), - &Int64Array::from(vec![Some(1), Some(1), Some(1), Some(1)]) - ); - assert_eq!( - rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), - &StringArray::from(vec![ - Some("hello"), - Some("hello"), - Some("hello"), - Some("hello") - ]) - ); - - assert_eq!( - rb.column_by_name("c_a") - .unwrap() - .as_any() - .downcast_ref::() - .unwrap(), - &ListArray::from_iter_primitive::(vec![ - None, - None, - Some(vec![Some(1i64)]), - Some(vec![Some(1)]) - ]) - ); - - assert_eq!( - rb.column_by_name("c_b") - .unwrap() - .as_any() - .downcast_ref::() - .unwrap(), - &ListArray::from_iter_primitive::(vec![ - None, - None, - None, - Some(vec![Some(2i64)]) - ]) - ); + for value in data { + let (rb, _) = + into_event_batch(value, HashMap::default(), false, None, SchemaVersion::V0) + .unwrap(); + assert_eq!(rb.num_rows(), 4); + assert_eq!(rb.num_columns(), 5); + assert_eq!( + rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), + &Int64Array::from(vec![Some(1), Some(1), Some(1), Some(1)]) + ); + assert_eq!( + rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), + &StringArray::from(vec![ + Some("hello"), + Some("hello"), + Some("hello"), + Some("hello") + ]) + ); + + assert_eq!( + rb.column_by_name("c_a") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(), + &ListArray::from_iter_primitive::(vec![ + None, + None, + Some(vec![Some(1i64)]), + Some(vec![Some(1)]) + ]) + ); + + assert_eq!( + rb.column_by_name("c_b") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(), + &ListArray::from_iter_primitive::(vec![ + None, + None, + None, + Some(vec![Some(2i64)]) + ]) + ); + } } #[test] @@ -822,21 +800,20 @@ mod tests { "c": [{"a": 1, "b": 2}] }, ]); - let flattened_json = convert_to_array( - convert_array_to_object( - json, - None, - None, - None, - SchemaVersion::V1, - &crate::event::format::LogSource::default(), - ) - .unwrap(), + let flattened_json = flatten_json_body( + json, + None, + None, + None, + SchemaVersion::V1, + &crate::event::format::LogSource::default(), + 3, ) .unwrap(); + let arr_flattened_json = Value::Array(flattened_json); let (rb, _) = into_event_batch( - flattened_json, + arr_flattened_json, HashMap::default(), false, None, @@ -844,11 +821,11 @@ mod tests { ) .unwrap(); - assert_eq!(rb.num_rows(), 4); + assert_eq!(rb.num_rows(), 5); assert_eq!(rb.num_columns(), 5); assert_eq!( rb.column_by_name("a").unwrap().as_float64_arr().unwrap(), - &Float64Array::from(vec![Some(1.0), Some(1.0), Some(1.0), Some(1.0)]) + &Float64Array::from(vec![Some(1.0), Some(1.0), Some(1.0), Some(1.0), Some(1.0)]) ); assert_eq!( rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), @@ -856,18 +833,19 @@ mod tests { Some("hello"), Some("hello"), Some("hello"), + Some("hello"), Some("hello") ]) ); assert_eq!( rb.column_by_name("c_a").unwrap().as_float64_arr().unwrap(), - &Float64Array::from(vec![None, None, Some(1.0), Some(1.0)]) + &Float64Array::from(vec![None, None, Some(1.0), Some(1.0), None]) ); assert_eq!( rb.column_by_name("c_b").unwrap().as_float64_arr().unwrap(), - &Float64Array::from(vec![None, None, None, Some(2.0)]) + &Float64Array::from(vec![None, None, None, None, Some(2.0)]) ); } } diff --git a/src/handlers/http/modal/ingest_server.rs b/src/handlers/http/modal/ingest_server.rs index ce7ec20b2..215f79478 100644 --- a/src/handlers/http/modal/ingest_server.rs +++ b/src/handlers/http/modal/ingest_server.rs @@ -249,7 +249,7 @@ impl IngestServer { web::put() .to(ingestor_logstream::put_stream) .authorize_for_stream(Action::CreateStream), - ) + ), ) .service( // GET "/logstream/{logstream}/info" ==> Get info for given log stream diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index 3a2b9c797..51d7a96d7 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -32,8 +32,9 @@ use crate::{ kinesis::{flatten_kinesis_logs, Message}, }, metadata::{SchemaVersion, STREAM_INFO}, + option::CONFIG, storage::StreamType, - utils::json::{convert_array_to_object, flatten::convert_to_array}, + utils::json::flatten_json_body, }; pub async fn flatten_and_push_logs( @@ -69,25 +70,27 @@ pub async fn push_logs( let static_schema_flag = STREAM_INFO.get_static_schema_flag(stream_name)?; let custom_partition = STREAM_INFO.get_custom_partition(stream_name)?; let schema_version = STREAM_INFO.get_schema_version(stream_name)?; - + let json_flatten_depth_limit = CONFIG.options.json_flatten_depth_limit; let data = if time_partition.is_some() || custom_partition.is_some() { - convert_array_to_object( + flatten_json_body( json, time_partition.as_ref(), time_partition_limit, custom_partition.as_ref(), schema_version, log_source, + json_flatten_depth_limit, )? } else { - vec![convert_to_array(convert_array_to_object( + vec![Value::Array(flatten_json_body( json, - None, - None, - None, + time_partition.as_ref(), + time_partition_limit, + custom_partition.as_ref(), schema_version, log_source, - )?)?] + json_flatten_depth_limit, + )?)] }; for value in data { diff --git a/src/utils/json/flatten.rs b/src/utils/json/flatten.rs index 58809618c..fa7e942c6 100644 --- a/src/utils/json/flatten.rs +++ b/src/utils/json/flatten.rs @@ -49,6 +49,8 @@ pub enum JsonFlattenError { ExpectedObjectInArray, #[error("Found non-object element while flattening array of objects")] NonObjectInArray, + #[error("JSON hierarchy exceeds maximum depth of {0} levels")] + InvalidHierarchy(String), } // Recursively flattens JSON objects and arrays, e.g. with the separator `.`, starting from the TOP @@ -59,14 +61,11 @@ pub fn flatten( time_partition: Option<&String>, time_partition_limit: Option, custom_partition: Option<&String>, - validation_required: bool, ) -> Result<(), JsonFlattenError> { match nested_value { Value::Object(nested_dict) => { - if validation_required { - validate_time_partition(nested_dict, time_partition, time_partition_limit)?; - validate_custom_partition(nested_dict, custom_partition)?; - } + validate_time_partition(nested_dict, time_partition, time_partition_limit)?; + validate_custom_partition(nested_dict, custom_partition)?; let mut map = Map::new(); flatten_object(&mut map, None, nested_dict, separator)?; *nested_dict = map; @@ -80,7 +79,6 @@ pub fn flatten( time_partition, time_partition_limit, custom_partition, - validation_required, )?; } } @@ -270,6 +268,15 @@ pub fn flatten_array_objects( Ok(()) } +pub struct FlattenContext<'a> { + pub current_level: usize, + pub separator: &'a str, + pub time_partition: Option<&'a String>, + pub time_partition_limit: Option, + pub custom_partition: Option<&'a String>, + pub flatten_depth_limit: usize, +} + /// Recursively flattens a JSON value. /// - If the value is an array, it flattens all elements of the array. /// - If the value is an object, it flattens all nested objects and arrays. @@ -282,95 +289,183 @@ pub fn flatten_array_objects( /// 3. `[{"a": [{"b": 1}, {"c": 2}]}]` ~> `[{"a": {"b": 1)}}, {"a": {"c": 2)}}]` /// 4. `{"a": [{"b": 1}, {"c": 2}], "d": {"e": 4}}` ~> `[{"a": {"b":1}, "d": {"e":4}}, {"a": {"c":2}, "d": {"e":4}}]` /// 5. `{"a":{"b":{"c":{"d":{"e":["a","b"]}}}}}` ~> returns error - heavily nested, cannot flatten this JSON -pub fn generic_flattening(value: &Value) -> Result, JsonFlattenError> { +pub fn generic_flattening( + value: &mut Value, + context: &FlattenContext, + parent_key: Option<&str>, +) -> Result, JsonFlattenError> { + if context.current_level > context.flatten_depth_limit { + return Err(JsonFlattenError::InvalidHierarchy( + context.flatten_depth_limit.to_string(), + )); + } + match value { - Value::Array(arr) => Ok(arr - .iter() - .flat_map(|flatten_item| generic_flattening(flatten_item).unwrap_or_default()) - .collect()), - Value::Object(map) => { - let results = map - .iter() - .fold(vec![Map::new()], |results, (key, val)| match val { - Value::Array(arr) => arr - .iter() - .flat_map(|flatten_item| { - generic_flattening(flatten_item).unwrap_or_default() - }) - .flat_map(|flattened_item| { - results.iter().map(move |result| { - let mut new_obj = result.clone(); - new_obj.insert(key.clone(), flattened_item.clone()); - new_obj - }) - }) - .collect(), - Value::Object(_) => generic_flattening(val) - .unwrap_or_default() - .iter() - .flat_map(|nested_result| { - results.iter().map(move |result| { - let mut new_obj = result.clone(); - new_obj.insert(key.clone(), nested_result.clone()); - new_obj - }) - }) - .collect(), - _ => results - .into_iter() - .map(|mut result| { - result.insert(key.clone(), val.clone()); - result - }) - .collect(), - }); - - Ok(results.into_iter().map(Value::Object).collect()) - } + Value::Array(arr) => process_json_array(arr, context, parent_key), + Value::Object(map) => process_json_object(map, context, parent_key), _ => Ok(vec![value.clone()]), } } -/// recursively checks the level of nesting for the serde Value -/// if Value has more than 4 levels of hierarchy, returns true -/// example - -/// 1. `{"a":{"b":{"c":{"d":{"e":["a","b"]}}}}}` ~> returns true -/// 2. `{"a": [{"b": 1}, {"c": 2}], "d": {"e": 4}}` ~> returns false -pub fn has_more_than_four_levels(value: &Value, current_level: usize) -> bool { - if current_level > 4 { - return true; +fn process_json_array( + arr: &mut [Value], + context: &FlattenContext, + parent_key: Option<&str>, +) -> Result, JsonFlattenError> { + if let Some(parent) = parent_key { + return Ok(arr + .iter_mut() + .map(|flattened_item| { + let mut map = Map::new(); + map.insert(parent.to_string(), flattened_item.clone()); + Value::Object(map) + }) + .collect()); + } + Ok(arr + .iter_mut() + .flat_map(|flattened_item| { + generic_flattening(flattened_item, context, parent_key).unwrap_or_default() + }) + .collect()) +} + +fn process_json_object( + map: &mut Map, + context: &FlattenContext, + parent_key: Option<&str>, +) -> Result, JsonFlattenError> { + if context.current_level == 1 { + validate_time_partition(map, context.time_partition, context.time_partition_limit)?; + validate_custom_partition(map, context.custom_partition)?; } - match value { - Value::Array(arr) => arr - .iter() - .any(|item| has_more_than_four_levels(item, current_level)), - Value::Object(map) => map - .values() - .any(|val| has_more_than_four_levels(val, current_level + 1)), - _ => false, + + let mut results = vec![Map::new()]; + + for (key, val) in map.iter_mut() { + let new_key = create_nested_key(parent_key, key, context.separator); + let new_results = match val { + Value::Array(arr) => process_array_value(arr, &new_key, &results, context.separator), + Value::Object(_) => process_object_value(val, &new_key, &results, context), + _ => Ok(create_results_with_value(&results, &new_key, val)), + }?; + + if !new_results.is_empty() { + results = new_results; + } } + + Ok(results.into_iter().map(Value::Object).collect()) } -// Converts a Vector of values into a `Value::Array`, as long as all of them are objects -pub fn convert_to_array(flattened: Vec) -> Result { - let mut result = Vec::new(); - for item in flattened { - let mut map = Map::new(); - let Some(item) = item.as_object() else { - return Err(JsonFlattenError::ExpectedObjectInArray); - }; - for (key, value) in item { - map.insert(key.clone(), value.clone()); +fn create_nested_key(parent_key: Option<&str>, key: &str, separator: &str) -> String { + match parent_key { + Some(parent) => format!("{parent}{separator}{key}"), + None => key.to_string(), + } +} + +fn process_array_value( + arr: &mut [Value], + new_key: &str, + results: &[Map], + separator: &str, +) -> Result>, JsonFlattenError> { + let mut new_results = Vec::new(); + + for item in arr.iter_mut() { + match item { + Value::Object(obj) => { + new_results.extend(flatten_nested_object(obj, new_key, results, separator)); + } + _ => { + new_results.extend(create_results_with_value(results, new_key, item)); + } } - result.push(Value::Object(map)); } - Ok(Value::Array(result)) + + Ok(new_results) +} + +fn flatten_nested_object( + obj: &mut Map, + base_key: &str, + results: &[Map], + separator: &str, +) -> Vec> { + let mut temp_results = Vec::new(); + + for (k, v) in obj { + let nested_key = format!("{base_key}{separator}{k}"); + match v { + Value::Array(nested_arr) => { + for arr_item in nested_arr { + temp_results.extend(create_results_with_value(results, &nested_key, arr_item)); + } + } + _ => { + temp_results.extend(create_results_with_value(results, &nested_key, v)); + } + } + } + + temp_results +} + +fn create_results_with_value( + results: &[Map], + key: &str, + value: &Value, +) -> Vec> { + results + .iter() + .map(|result| { + let mut new_obj = result.clone(); + new_obj.insert(key.to_string(), value.clone()); + new_obj + }) + .collect() +} + +fn process_object_value( + val: &mut Value, + new_key: &str, + results: &[Map], + context: &FlattenContext, +) -> Result>, JsonFlattenError> { + let nested_context = FlattenContext { + current_level: context.current_level + 1, + ..*context + }; + + let nested_results = generic_flattening(val, &nested_context, Some(new_key))?; + + Ok(nested_results + .into_iter() + .flat_map(|nested| { + if let Value::Object(obj) = nested { + results + .iter() + .map(|result| { + let mut new_obj = result.clone(); + new_obj.extend(obj.clone()); + new_obj + }) + .collect() + } else { + vec![] + } + }) + .collect()) } #[cfg(test)] mod tests { + use std::vec; + use crate::utils::json::flatten::{ - flatten_array_objects, generic_flattening, has_more_than_four_levels, + create_nested_key, flatten_array_objects, generic_flattening, process_json_array, + process_json_object, FlattenContext, }; use super::{flatten, JsonFlattenError}; @@ -380,7 +475,7 @@ mod tests { fn flatten_single_key_string() { let mut obj = json!({"key": "value"}); let expected = obj.clone(); - flatten(&mut obj, "_", None, None, None, false).unwrap(); + flatten(&mut obj, "_", None, None, None).unwrap(); assert_eq!(obj, expected); } @@ -388,7 +483,7 @@ mod tests { fn flatten_single_key_int() { let mut obj = json!({"key": 1}); let expected = obj.clone(); - flatten(&mut obj, "_", None, None, None, false).unwrap(); + flatten(&mut obj, "_", None, None, None).unwrap(); assert_eq!(obj, expected); } @@ -396,7 +491,7 @@ mod tests { fn flatten_multiple_key_value() { let mut obj = json!({"key1": 1, "key2": "value2"}); let expected = obj.clone(); - flatten(&mut obj, "_", None, None, None, false).unwrap(); + flatten(&mut obj, "_", None, None, None).unwrap(); assert_eq!(obj, expected); } @@ -404,7 +499,7 @@ mod tests { fn flatten_nested_single_key_value() { let mut obj = json!({"key": "value", "nested_key": {"key":"value"}}); let expected = json!({"key": "value", "nested_key.key": "value"}); - flatten(&mut obj, ".", None, None, None, false).unwrap(); + flatten(&mut obj, ".", None, None, None).unwrap(); assert_eq!(obj, expected); } @@ -413,7 +508,7 @@ mod tests { let mut obj = json!({"key": "value", "nested_key": {"key1":"value1", "key2": "value2"}}); let expected = json!({"key": "value", "nested_key.key1": "value1", "nested_key.key2": "value2"}); - flatten(&mut obj, ".", None, None, None, false).unwrap(); + flatten(&mut obj, ".", None, None, None).unwrap(); assert_eq!(obj, expected); } @@ -421,7 +516,7 @@ mod tests { fn nested_key_value_with_array() { let mut obj = json!({"key": "value", "nested_key": {"key1":[1,2,3]}}); let expected = json!({"key": "value", "nested_key.key1": [1,2,3]}); - flatten(&mut obj, ".", None, None, None, false).unwrap(); + flatten(&mut obj, ".", None, None, None).unwrap(); assert_eq!(obj, expected); } @@ -429,7 +524,7 @@ mod tests { fn nested_obj_array() { let mut obj = json!({"key": [{"a": "value0"}, {"a": "value1"}]}); let expected = json!({"key.a": ["value0", "value1"]}); - flatten(&mut obj, ".", None, None, None, false).unwrap(); + flatten(&mut obj, ".", None, None, None).unwrap(); assert_eq!(obj, expected); } @@ -437,7 +532,7 @@ mod tests { fn nested_obj_array_nulls() { let mut obj = json!({"key": [{"a": "value0"}, {"a": "value1", "b": "value1"}]}); let expected = json!({"key.a": ["value0", "value1"], "key.b": [null, "value1"]}); - flatten(&mut obj, ".", None, None, None, false).unwrap(); + flatten(&mut obj, ".", None, None, None).unwrap(); assert_eq!(obj, expected); } @@ -445,7 +540,7 @@ mod tests { fn nested_obj_array_nulls_reversed() { let mut obj = json!({"key": [{"a": "value0", "b": "value0"}, {"a": "value1"}]}); let expected = json!({"key.a": ["value0", "value1"], "key.b": ["value0", null]}); - flatten(&mut obj, ".", None, None, None, false).unwrap(); + flatten(&mut obj, ".", None, None, None).unwrap(); assert_eq!(obj, expected); } @@ -453,7 +548,7 @@ mod tests { fn nested_obj_array_nested_obj() { let mut obj = json!({"key": [{"a": {"p": 0}, "b": "value0"}, {"b": "value1"}]}); let expected = json!({"key.a.p": [0, null], "key.b": ["value0", "value1"]}); - flatten(&mut obj, ".", None, None, None, false).unwrap(); + flatten(&mut obj, ".", None, None, None).unwrap(); assert_eq!(obj, expected); } @@ -461,14 +556,14 @@ mod tests { fn nested_obj_array_nested_obj_array() { let mut obj = json!({"key": [{"a": [{"p": "value0", "q": "value0"}, {"p": "value1", "q": null}], "b": "value0"}, {"b": "value1"}]}); let expected = json!({"key.a.p": [["value0", "value1"], null], "key.a.q": [["value0", null], null], "key.b": ["value0", "value1"]}); - flatten(&mut obj, ".", None, None, None, false).unwrap(); + flatten(&mut obj, ".", None, None, None).unwrap(); assert_eq!(obj, expected); } #[test] fn flatten_mixed_object() { let mut obj = json!({"a": 42, "arr": ["1", {"key": "2"}, {"key": {"nested": "3"}}]}); - assert!(flatten(&mut obj, ".", None, None, None, false).is_err()); + assert!(flatten(&mut obj, ".", None, None, None).is_err()); } #[test] @@ -561,22 +656,22 @@ mod tests { let mut value = json!({ "a": 1, }); - assert!(flatten(&mut value, "_", None, None, Some(&"a".to_string()), true).is_ok()); + assert!(flatten(&mut value, "_", None, None, Some(&"a".to_string())).is_ok()); let mut value = json!({ "a": true, }); - assert!(flatten(&mut value, "_", None, None, Some(&"a".to_string()), true).is_ok()); + assert!(flatten(&mut value, "_", None, None, Some(&"a".to_string())).is_ok()); let mut value = json!({ "a": "yes", }); - assert!(flatten(&mut value, "_", None, None, Some(&"a".to_string()), true).is_ok()); + assert!(flatten(&mut value, "_", None, None, Some(&"a".to_string())).is_ok()); let mut value = json!({ "a": -1, }); - assert!(flatten(&mut value, "_", None, None, Some(&"a".to_string()), true).is_ok()); + assert!(flatten(&mut value, "_", None, None, Some(&"a".to_string())).is_ok()); } #[test] @@ -585,7 +680,7 @@ mod tests { "a": null, }); matches!( - flatten(&mut value, "_", None, None, Some(&"a".to_string()), true).unwrap_err(), + flatten(&mut value, "_", None, None, Some(&"a".to_string())).unwrap_err(), JsonFlattenError::FieldEmptyOrNull(_) ); @@ -593,7 +688,7 @@ mod tests { "a": "", }); matches!( - flatten(&mut value, "_", None, None, Some(&"a".to_string()), true).unwrap_err(), + flatten(&mut value, "_", None, None, Some(&"a".to_string())).unwrap_err(), JsonFlattenError::FieldEmptyOrNull(_) ); @@ -601,7 +696,7 @@ mod tests { "a": {"b": 1}, }); matches!( - flatten(&mut value, "_", None, None, Some(&"a".to_string()), true).unwrap_err(), + flatten(&mut value, "_", None, None, Some(&"a".to_string())).unwrap_err(), JsonFlattenError::FieldIsObject(_) ); @@ -609,7 +704,7 @@ mod tests { "a": ["b", "c"], }); matches!( - flatten(&mut value, "_", None, None, Some(&"a".to_string()), true).unwrap_err(), + flatten(&mut value, "_", None, None, Some(&"a".to_string())).unwrap_err(), JsonFlattenError::FieldIsArray(_) ); @@ -617,7 +712,7 @@ mod tests { "a": "b.c", }); matches!( - flatten(&mut value, "_", None, None, Some(&"a".to_string()), true).unwrap_err(), + flatten(&mut value, "_", None, None, Some(&"a".to_string())).unwrap_err(), JsonFlattenError::FieldContainsPeriod(_) ); @@ -625,27 +720,155 @@ mod tests { "a": 1.0, }); matches!( - flatten(&mut value, "_", None, None, Some(&"a".to_string()), true).unwrap_err(), + flatten(&mut value, "_", None, None, Some(&"a".to_string())).unwrap_err(), JsonFlattenError::FieldContainsPeriod(_) ); } - #[test] fn unacceptable_levels_of_nested_json() { - let value = json!({"a":{"b":{"c":{"d":{"e":["a","b"]}}}}}); - assert!(has_more_than_four_levels(&value, 1)); + let mut value = json!({"a":{"b":{"c":{"d":{"e":["a","b"]}}}}}); + let context = FlattenContext { + current_level: 1, + separator: "_", + time_partition: None, + time_partition_limit: None, + custom_partition: None, + flatten_depth_limit: 3, + }; + assert!(generic_flattening(&mut value, &context, None).is_err()); } #[test] fn acceptable_levels_of_nested_json() { - let value = json!({"a":{"b":{"e":["a","b"]}}}); - assert!(!has_more_than_four_levels(&value, 1)); + let mut value = json!({"a":{"b":{"e":["a","b"]}}}); + let context = FlattenContext { + current_level: 1, + separator: "_", + time_partition: None, + time_partition_limit: None, + custom_partition: None, + flatten_depth_limit: 3, + }; + assert!(generic_flattening(&mut value, &context, None).is_ok()); } - #[test] fn flatten_json() { - let value = json!({"a":{"b":{"e":["a","b"]}}}); - let expected = vec![json!({"a":{"b":{"e":"a"}}}), json!({"a":{"b":{"e":"b"}}})]; - assert_eq!(generic_flattening(&value).unwrap(), expected); + let mut value = json!({"a":{"b":{"e":["a","b"]}}}); + let expected = vec![json!({"a_b_e":"a"}), json!({"a_b_e":"b"})]; + let context = FlattenContext { + current_level: 1, + separator: "_", + time_partition: None, + time_partition_limit: None, + custom_partition: None, + flatten_depth_limit: 3, + }; + assert_eq!( + generic_flattening(&mut value, &context, None).unwrap(), + expected + ); + } + + #[test] + fn test_process_json_array() { + let context = FlattenContext { + current_level: 1, + separator: "_", + time_partition: None, + time_partition_limit: None, + custom_partition: None, + flatten_depth_limit: 3, + }; + + let mut input = json!([ + {"name": "John", "age": 30, "address": {"city": "New York", "state": "NY"}}, + {"name": "Jane", "age": 25, "address": {"city": "New York", "state": "NY"}} + ]); + + let input_arr = input.as_array_mut().unwrap(); + + let expected = vec![ + json!({"name": "John", "age": 30, "address_city": "New York", "address_state": "NY"}), + json!({"name": "Jane", "age": 25, "address_city": "New York", "address_state": "NY"}), + ]; + + assert_eq!( + process_json_array(input_arr, &context, None).unwrap(), + expected + ); + + let mut input = json!([ + {"name": "John", "age": 30, "address": {"city": "New York", "state": "NY"}, "phone": ["123", "456"]}, + {"name": "Jane", "age": 25, "address": {"city": "New York", "state": "NY"}, "phone": ["789", "101"]} + ]); + let input_arr = input.as_array_mut().unwrap(); + + let expected = vec![ + json!({"name": "John", "age": 30, "address_city": "New York", "address_state": "NY", "phone": "123"}), + json!({"name": "John", "age": 30, "address_city": "New York", "address_state": "NY", "phone": "456"}), + json!({"name": "Jane", "age": 25, "address_city": "New York", "address_state": "NY", "phone": "789"}), + json!({"name": "Jane", "age": 25, "address_city": "New York", "address_state": "NY", "phone": "101"}), + ]; + + assert_eq!( + process_json_array(input_arr, &context, None).unwrap(), + expected + ); + } + + #[test] + fn test_process_json_object() { + let context = FlattenContext { + current_level: 1, + separator: "_", + time_partition: None, + time_partition_limit: None, + custom_partition: None, + flatten_depth_limit: 3, + }; + + let mut input = json!( + {"name": "Jane", "age": 25, "address": {"city": "New York", "state": "NY"}} + ); + + let input_map = input.as_object_mut().unwrap(); + + let expected = vec![ + json!({"name": "Jane", "age": 25, "address_city": "New York", "address_state": "NY"}), + ]; + + assert_eq!( + process_json_object(input_map, &context, None).unwrap(), + expected + ); + + let mut input = json!( + {"name": "Jane", "age": 25, "address": {"city": "New York", "state": "NY"}, "phone": ["123", "456"]} + ); + + let input_map = input.as_object_mut().unwrap(); + + let expected = vec![ + json!({"name": "Jane", "age": 25, "address_city": "New York", "address_state": "NY", "phone":"123"}), + json!({"name": "Jane", "age": 25, "address_city": "New York", "address_state": "NY", "phone":"456"}), + ]; + + assert_eq!( + process_json_object(input_map, &context, None).unwrap(), + expected + ); + } + + #[test] + fn test_create_nested_key() { + let parent_key = Some("a"); + let key = "b"; + let separator = "_"; + assert_eq!(create_nested_key(parent_key, key, separator), "a_b"); + + let parent_key = None; + let key = "b"; + let separator = "_"; + assert_eq!(create_nested_key(parent_key, key, separator), "b"); } } diff --git a/src/utils/json/mod.rs b/src/utils/json/mod.rs index 0f5c05812..6a751016d 100644 --- a/src/utils/json/mod.rs +++ b/src/utils/json/mod.rs @@ -19,7 +19,7 @@ use std::fmt; use std::num::NonZeroU32; -use flatten::{convert_to_array, generic_flattening, has_more_than_four_levels}; +use flatten::{generic_flattening, FlattenContext}; use serde::de::Visitor; use serde_json; use serde_json::Value; @@ -38,57 +38,38 @@ pub fn flatten_json_body( time_partition_limit: Option, custom_partition: Option<&String>, schema_version: SchemaVersion, - validation_required: bool, log_source: &LogSource, -) -> Result { + json_flatten_depth_limit: usize, +) -> Result, anyhow::Error> { // Flatten the json body only if new schema and has less than 4 levels of nesting - let mut nested_value = if schema_version == SchemaVersion::V1 - && !has_more_than_four_levels(&body, 1) + let mut body = body.clone(); + let nested_value = if schema_version == SchemaVersion::V1 && matches!( log_source, LogSource::Json | LogSource::Custom(_) | LogSource::Kinesis ) { - let flattened_json = generic_flattening(&body)?; - convert_to_array(flattened_json)? + let flatten_context = FlattenContext { + current_level: 1, + separator: "_", + time_partition, + time_partition_limit, + custom_partition, + flatten_depth_limit: json_flatten_depth_limit, + }; + generic_flattening(&mut body, &flatten_context, None)? } else { - body + flatten::flatten( + &mut body, + "_", + time_partition, + time_partition_limit, + custom_partition, + )?; + vec![body] }; - flatten::flatten( - &mut nested_value, - "_", - time_partition, - time_partition_limit, - custom_partition, - validation_required, - )?; Ok(nested_value) } -pub fn convert_array_to_object( - body: Value, - time_partition: Option<&String>, - time_partition_limit: Option, - custom_partition: Option<&String>, - schema_version: SchemaVersion, - log_source: &LogSource, -) -> Result, anyhow::Error> { - let data = flatten_json_body( - body, - time_partition, - time_partition_limit, - custom_partition, - schema_version, - true, - log_source, - )?; - let value_arr = match data { - Value::Array(arr) => arr, - value @ Value::Object(_) => vec![value], - _ => unreachable!("flatten would have failed beforehand"), - }; - Ok(value_arr) -} - pub fn convert_to_string(value: &Value) -> Value { match value { Value::Null => Value::String("null".to_owned()), @@ -173,7 +154,7 @@ mod tests { #[test] fn hierarchical_json_flattening_success() { let value = json!({"a":{"b":{"e":["a","b"]}}}); - let expected = json!([{"a_b_e": "a"}, {"a_b_e": "b"}]); + let expected = vec![json!({"a_b_e": "a"}), json!({"a_b_e": "b"})]; assert_eq!( flatten_json_body( value, @@ -181,8 +162,8 @@ mod tests { None, None, crate::metadata::SchemaVersion::V1, - false, - &LogSource::default() + &LogSource::default(), + 3 ) .unwrap(), expected @@ -192,20 +173,16 @@ mod tests { #[test] fn hierarchical_json_flattening_failure() { let value = json!({"a":{"b":{"c":{"d":{"e":["a","b"]}}}}}); - let expected = json!({"a_b_c_d_e": ["a","b"]}); - assert_eq!( - flatten_json_body( - value, - None, - None, - None, - crate::metadata::SchemaVersion::V1, - false, - &LogSource::default() - ) - .unwrap(), - expected - ); + assert!(flatten_json_body( + value, + None, + None, + None, + crate::metadata::SchemaVersion::V1, + &LogSource::default(), + 3 + ) + .is_err()); } #[derive(Serialize, Deserialize)]