diff --git a/src/connectors/kafka/processor.rs b/src/connectors/kafka/processor.rs index 10025ed98..ebefff4dc 100644 --- a/src/connectors/kafka/processor.rs +++ b/src/connectors/kafka/processor.rs @@ -27,10 +27,7 @@ use tracing::{debug, error}; use crate::{ connectors::common::processor::Processor, - event::{ - format::{json, EventFormat, LogSourceEntry}, - Event as ParseableEvent, - }, + event::format::{json, EventFormat, LogSource, LogSourceEntry}, parseable::PARSEABLE, storage::StreamType, }; @@ -41,10 +38,7 @@ use super::{config::BufferConfig, ConsumerRecord, StreamConsumer, TopicPartition pub struct ParseableSinkProcessor; impl ParseableSinkProcessor { - async fn build_event_from_chunk( - &self, - records: &[ConsumerRecord], - ) -> anyhow::Result { + async fn process_event_from_chunk(&self, records: &[ConsumerRecord]) -> anyhow::Result { let stream_name = records .first() .map(|r| r.topic.as_str()) @@ -59,35 +53,27 @@ impl ParseableSinkProcessor { ) .await?; - let stream = PARSEABLE.get_stream(stream_name)?; - let schema = stream.get_schema_raw(); - let time_partition = stream.get_time_partition(); - let custom_partition = stream.get_custom_partition(); - let static_schema_flag = stream.get_static_schema_flag(); - let schema_version = stream.get_schema_version(); - let mut json_vec = Vec::with_capacity(records.len()); - let mut total_payload_size = 0u64; + let mut total_payload_size = 0; for record in records.iter().filter_map(|r| r.payload.as_ref()) { - total_payload_size += record.len() as u64; + total_payload_size += record.len(); if let Ok(value) = serde_json::from_slice::(record) { json_vec.push(value); } } - let p_event = json::Event::new(Value::Array(json_vec)).into_event( - stream_name.to_string(), + let stream = PARSEABLE.get_or_create_stream(stream_name); + + json::Event::new( + Value::Array(json_vec), total_payload_size, - &schema, - static_schema_flag, - custom_partition.as_ref(), - time_partition.as_ref(), - schema_version, - StreamType::UserDefined, - )?; - - Ok(p_event) + LogSource::Custom("Kafka".to_owned()), + ) + .into_event(&stream)? 
+ .process(&stream)?; + + Ok(total_payload_size) } } @@ -97,9 +83,9 @@ impl Processor, ()> for ParseableSinkProcessor { let len = records.len(); debug!("Processing {len} records"); - self.build_event_from_chunk(&records).await?.process()?; + let size = self.process_event_from_chunk(&records).await?; - debug!("Processed {len} records"); + debug!("Processed {len} records, size = {size} Bytes"); Ok(()) } } diff --git a/src/event/format/json.rs b/src/event/format/json.rs index 903ab2752..90c6a481d 100644 --- a/src/event/format/json.rs +++ b/src/event/format/json.rs @@ -26,111 +26,209 @@ use arrow_schema::{DataType, Field, Fields, Schema}; use chrono::{DateTime, NaiveDate, NaiveDateTime, Utc}; use datafusion::arrow::util::bit_util::round_upto_multiple_of_64; use itertools::Itertools; +use opentelemetry_proto::tonic::{ + logs::v1::LogsData, metrics::v1::MetricsData, trace::v1::TracesData, +}; use serde_json::Value; -use std::{collections::HashMap, sync::Arc}; +use std::{ + collections::{HashMap, HashSet}, + num::NonZeroU32, + sync::Arc, +}; use tracing::error; -use super::EventFormat; -use crate::{metadata::SchemaVersion, storage::StreamType, utils::arrow::get_field}; +use super::{EventFormat, LogSource}; +use crate::{ + event::{get_schema_key, PartitionEvent}, + kinesis::{flatten_kinesis_logs, Message}, + metadata::SchemaVersion, + otel::{logs::flatten_otel_logs, metrics::flatten_otel_metrics, traces::flatten_otel_traces}, + parseable::Stream, + utils::{ + arrow::get_field, + json::{flatten_json_body, Json}, + time::Minute, + }, + OBJECT_STORE_DATA_GRANULARITY, +}; + +struct JsonPartition { + batch: Vec, + schema: Vec>, + parsed_timestamp: NaiveDateTime, +} pub struct Event { pub json: Value, + pub origin_size: usize, pub p_timestamp: DateTime, + pub log_source: LogSource, } impl Event { - pub fn new(json: Value) -> Self { + pub fn new(json: Value, origin_size: usize, log_source: LogSource) -> Self { Self { json, + origin_size, p_timestamp: Utc::now(), + log_source, } } -} -impl EventFormat for Event { - type Data = Vec; + pub fn flatten_logs( + self, + time_partition: Option<&String>, + time_partition_limit: Option, + custom_partitions: Option<&String>, + schema_version: SchemaVersion, + ) -> anyhow::Result> { + let data = match self.log_source { + LogSource::Kinesis => { + //custom flattening required for Amazon Kinesis + let message: Message = serde_json::from_value(self.json)?; + flatten_kinesis_logs(message) + } + LogSource::OtelLogs => { + //custom flattening required for otel logs + let logs: LogsData = serde_json::from_value(self.json)?; + flatten_otel_logs(&logs) + } + LogSource::OtelTraces => { + //custom flattening required for otel traces + let traces: TracesData = serde_json::from_value(self.json)?; + flatten_otel_traces(&traces) + } + LogSource::OtelMetrics => { + //custom flattening required for otel metrics + let metrics: MetricsData = serde_json::from_value(self.json)?; + flatten_otel_metrics(metrics) + } + _ => vec![self.json], + }; + + let mut logs = vec![]; + for json in data { + let json = flatten_json_body( + json, + time_partition, + time_partition_limit, + custom_partitions, + schema_version, + true, + &self.log_source, + )?; + + // incoming event may be a single json or a json array + // but Data (type defined above) is a vector of json values + // hence we need to convert the incoming event to a vector of json values + match json { + Value::Array(arr) => { + for log in arr { + let Value::Object(json) = log else { + return Err(anyhow!( + "Expected an object or a list of 
objects, received: {log:?}" + )); + }; + logs.push(json); + } + } + Value::Object(obj) => logs.push(obj), + _ => unreachable!("flatten would have failed beforehand"), + } + } - /// Returns the time at ingestion, i.e. the `p_timestamp` value - fn get_p_timestamp(&self) -> DateTime { - self.p_timestamp + Ok(logs) } +} + +impl EventFormat for Event { + type Data = Json; // convert the incoming json to a vector of json values // also extract the arrow schema, tags and metadata from the incoming json fn to_data( self, - schema: &HashMap>, time_partition: Option<&String>, + time_partition_limit: Option, + custom_partitions: Option<&String>, schema_version: SchemaVersion, - static_schema_flag: bool, - ) -> Result<(Self::Data, Vec>, bool), anyhow::Error> { - let stream_schema = schema; - - // incoming event may be a single json or a json array - // but Data (type defined above) is a vector of json values - // hence we need to convert the incoming event to a vector of json values - let value_arr = match self.json { - Value::Array(arr) => arr, - value @ Value::Object(_) => vec![value], - _ => unreachable!("flatten would have failed beforehand"), - }; + ) -> anyhow::Result> { + self.flatten_logs( + time_partition, + time_partition_limit, + custom_partitions, + schema_version, + ) + } + fn infer_schema( + data: &Self::Data, + stored_schema: &HashMap>, + time_partition: Option<&String>, + static_schema_flag: bool, + schema_version: SchemaVersion, + ) -> anyhow::Result<(super::EventSchema, bool)> { // collect all the keys from all the json objects in the request body - let fields = - collect_keys(value_arr.iter()).expect("fields can be collected from array of objects"); + let fields = collect_keys(data); let mut is_first = false; - let schema = match derive_arrow_schema(stream_schema, fields) { - Ok(schema) => schema, - Err(_) => { - let mut infer_schema = infer_json_schema_from_iterator(value_arr.iter().map(Ok)) + let schema = if let Some(schema) = derive_arrow_schema(stored_schema, fields) { + schema + } else { + // TODO: + let mut infer_schema = + infer_json_schema_from_iterator([Ok(Value::Object(data.clone()))].into_iter()) .map_err(|err| { anyhow!("Could not infer schema for this event due to err {:?}", err) })?; - let new_infer_schema = super::update_field_type_in_schema( - Arc::new(infer_schema), - Some(stream_schema), - time_partition, - Some(&value_arr), - schema_version, - ); - infer_schema = Schema::new(new_infer_schema.fields().clone()); - Schema::try_merge(vec![ - Schema::new(stream_schema.values().cloned().collect::()), - infer_schema.clone(), - ]).map_err(|err| anyhow!("Could not merge schema of this event with that of the existing stream. {:?}", err))?; - is_first = true; - infer_schema - .fields - .iter() - .filter(|field| !field.data_type().is_null()) - .cloned() - .sorted_by(|a, b| a.name().cmp(b.name())) - .collect() - } + let new_infer_schema = super::update_field_type_in_schema( + Arc::new(infer_schema), + Some(stored_schema), + time_partition, + Some(data), + schema_version, + ); + infer_schema = Schema::new(new_infer_schema.fields().clone()); + Schema::try_merge(vec![ + Schema::new(stored_schema.values().cloned().collect::()), + infer_schema.clone(), + ]) + .map_err(|err| { + anyhow!( + "Could not merge schema of this event with that of the existing stream. 
{:?}", + err + ) + })?; + is_first = true; + infer_schema + .fields + .iter() + .filter(|field| !field.data_type().is_null()) + .cloned() + .sorted_by(|a, b| a.name().cmp(b.name())) + .collect() }; - if value_arr - .iter() - .any(|value| fields_mismatch(&schema, value, schema_version, static_schema_flag)) - { + if fields_mismatch(&schema, data, schema_version, static_schema_flag) { return Err(anyhow!( "Could not process this event due to mismatch in datatype" )); } - Ok((value_arr, schema, is_first)) + let schema = Self::prepare_and_validate_schema(schema, stored_schema, static_schema_flag)?; + + Ok((schema, is_first)) } // Convert the Data type (defined above) to arrow record batch - fn decode(data: Self::Data, schema: Arc) -> Result { + fn decode(data: &[Self::Data], schema: Arc) -> anyhow::Result { let array_capacity = round_upto_multiple_of_64(data.len()); let mut reader = ReaderBuilder::new(schema) .with_batch_size(array_capacity) .with_coerce_primitive(false) .build_decoder()?; - reader.serialize(&data)?; + reader.serialize(data)?; match reader.flush() { Ok(Some(recordbatch)) => Ok(recordbatch), Err(err) => Err(anyhow!("Failed to create recordbatch due to {:?}", err)), @@ -139,55 +237,123 @@ impl EventFormat for Event { } /// Converts a JSON event into a Parseable Event - fn into_event( - self, - stream_name: String, - origin_size: u64, - storage_schema: &HashMap>, - static_schema_flag: bool, - custom_partitions: Option<&String>, - time_partition: Option<&String>, - schema_version: SchemaVersion, - stream_type: StreamType, - ) -> Result { - let custom_partition_values = match custom_partitions.as_ref() { - Some(custom_partition) => { - let custom_partitions = custom_partition.split(',').collect_vec(); - extract_custom_partition_values(&self.json, &custom_partitions) - } - None => HashMap::new(), - }; - - let parsed_timestamp = match time_partition { - Some(time_partition) => extract_and_parse_time(&self.json, time_partition)?, - _ => self.p_timestamp.naive_utc(), - }; + fn into_event(self, stream: &Stream) -> anyhow::Result { + let time_partition = stream.get_time_partition(); + let time_partition_limit = stream.get_time_partition_limit(); + let static_schema_flag = stream.get_static_schema_flag(); + let custom_partitions = stream.get_custom_partition(); + let schema_version = stream.get_schema_version(); + let stored_schema = stream.get_schema_raw(); + let stream_type = stream.get_stream_type(); - let (rb, is_first_event) = self.into_recordbatch( - storage_schema, - static_schema_flag, - time_partition, + let p_timestamp = self.p_timestamp; + let origin_size = self.origin_size; + let data = self.to_data( + time_partition.as_ref(), + time_partition_limit, + custom_partitions.as_ref(), schema_version, )?; + let mut is_first_event = false; + let mut json_partitions = HashMap::new(); + for json in data { + let (schema, is_first) = Self::infer_schema( + &json, + &stored_schema, + time_partition.as_ref(), + static_schema_flag, + schema_version, + )?; + + is_first_event = is_first_event || is_first; + let custom_partition_values = match custom_partitions.as_ref() { + Some(custom_partitions) => { + let custom_partitions = custom_partitions.split(',').collect_vec(); + extract_custom_partition_values(&json, &custom_partitions) + } + None => HashMap::new(), + }; + + let parsed_timestamp = match time_partition.as_ref() { + Some(time_partition) => extract_and_parse_time(&json, time_partition)?, + _ => p_timestamp.naive_utc(), + }; + + let prefix = generate_prefix(&schema, parsed_timestamp, 
&custom_partition_values); + if let Some(JsonPartition { batch, .. }) = json_partitions.get_mut(&prefix) { + batch.push(json) + } else { + json_partitions.insert( + prefix, + JsonPartition { + batch: vec![json], + schema, + parsed_timestamp, + }, + ); + } + } + + let mut partitions = HashMap::new(); + for ( + prefix, + JsonPartition { + batch, + schema, + parsed_timestamp, + }, + ) in json_partitions + { + let batch = Self::into_recordbatch( + p_timestamp, + &batch, + &schema, + time_partition.as_ref(), + schema_version, + )?; + + partitions.insert( + prefix, + PartitionEvent { + rb: batch, + parsed_timestamp, + }, + ); + } + Ok(super::Event { - rb, - stream_name, origin_format: "json", origin_size, is_first_event, - parsed_timestamp, - time_partition: None, - custom_partition_values, + partitions, stream_type, }) } } +fn generate_prefix( + schema: &[Arc], + parsed_timestamp: NaiveDateTime, + custom_partition_values: &HashMap, +) -> String { + format!( + "{}.{}.minute={}{}", + get_schema_key(schema), + parsed_timestamp.format("date=%Y-%m-%d.hour=%H"), + Minute::from(parsed_timestamp).to_slot(OBJECT_STORE_DATA_GRANULARITY), + custom_partition_values + .iter() + .sorted_by_key(|v| v.0) + .map(|(key, value)| format!(".{key}={value}")) + .join("") + ) +} + /// Extracts custom partition values from provided JSON object /// e.g. `json: {"status": 400, "msg": "Hello, World!"}, custom_partition_list: ["status"]` returns `{"status" => 400}` pub fn extract_custom_partition_values( - json: &Value, + json: &Json, custom_partition_list: &[&str], ) -> HashMap { let mut custom_partition_values: HashMap = HashMap::new(); @@ -208,10 +374,7 @@ pub fn extract_custom_partition_values( /// Returns the parsed timestamp of deignated time partition from json object /// e.g. 
`json: {"timestamp": "2025-05-15T15:30:00Z"}` returns `2025-05-15T15:30:00` -fn extract_and_parse_time( - json: &Value, - time_partition: &str, -) -> Result { +fn extract_and_parse_time(json: &Json, time_partition: &str) -> anyhow::Result { let current_time = json .get(time_partition) .ok_or_else(|| anyhow!("Missing field for time partition in json: {time_partition}"))?; @@ -222,56 +385,38 @@ fn extract_and_parse_time( // Returns arrow schema with the fields that are present in the request body // This schema is an input to convert the request body to arrow record batch +// Returns None if even one of the fields in the json is new and not seen before fn derive_arrow_schema( schema: &HashMap>, - fields: Vec<&str>, -) -> Result>, ()> { + fields: HashSet<&str>, +) -> Option>> { let mut res = Vec::with_capacity(fields.len()); - let fields = fields.into_iter().map(|field_name| schema.get(field_name)); - for field in fields { - let Some(field) = field else { return Err(()) }; + for field_name in fields { + let field = schema.get(field_name)?; res.push(field.clone()) } - Ok(res) + + Some(res) } -fn collect_keys<'a>(values: impl Iterator) -> Result, ()> { - let mut keys = Vec::new(); - for value in values { - if let Some(obj) = value.as_object() { - for key in obj.keys() { - match keys.binary_search(&key.as_str()) { - Ok(_) => (), - Err(pos) => { - keys.insert(pos, key.as_str()); - } - } - } - } else { - return Err(()); - } - } - Ok(keys) +// Returns a list of keys that are present in the given iterable of JSON objects +// Returns None if even one of the value is not an Object +fn collect_keys(object: &Json) -> HashSet<&str> { + object.keys().map(|k| k.as_str()).collect() } +// Returns true when the field doesn't exist in schema or has an invalid type fn fields_mismatch( schema: &[Arc], - body: &Value, + body: &Json, schema_version: SchemaVersion, static_schema_flag: bool, ) -> bool { - for (name, val) in body.as_object().expect("body is of object variant") { - if val.is_null() { - continue; - } - let Some(field) = get_field(schema, name) else { - return true; - }; - if !valid_type(field, val, schema_version, static_schema_flag) { - return true; - } - } - false + body.iter().any(|(key, value)| { + !value.is_null() + && get_field(schema, key) + .is_none_or(|field| !valid_type(field, value, schema_version, static_schema_flag)) + }) } fn valid_type( @@ -386,6 +531,9 @@ fn validate_struct( mod tests { use std::str::FromStr; + use arrow::datatypes::Int64Type; + use arrow_array::{ArrayRef, Float64Array, Int64Array, ListArray, StringArray}; + use chrono::Timelike; use serde_json::json; use super::*; @@ -393,7 +541,7 @@ mod tests { #[test] fn parse_time_parition_from_value() { let json = json!({"timestamp": "2025-05-15T15:30:00Z"}); - let parsed = extract_and_parse_time(&json, "timestamp"); + let parsed = extract_and_parse_time(json.as_object().unwrap(), "timestamp"); let expected = NaiveDateTime::from_str("2025-05-15T15:30:00").unwrap(); assert_eq!(parsed.unwrap(), expected); @@ -402,7 +550,7 @@ mod tests { #[test] fn time_parition_not_in_json() { let json = json!({"hello": "world!"}); - let parsed = extract_and_parse_time(&json, "timestamp"); + let parsed = extract_and_parse_time(json.as_object().unwrap(), "timestamp"); assert!(parsed.is_err()); } @@ -410,8 +558,540 @@ mod tests { #[test] fn time_parition_not_parseable_as_datetime() { let json = json!({"timestamp": "not time"}); - let parsed = extract_and_parse_time(&json, "timestamp"); + let parsed = extract_and_parse_time(json.as_object().unwrap(), 
"timestamp"); assert!(parsed.is_err()); } + + trait TestExt { + fn as_int64_arr(&self) -> Option<&Int64Array>; + fn as_float64_arr(&self) -> Option<&Float64Array>; + fn as_utf8_arr(&self) -> Option<&StringArray>; + } + + impl TestExt for ArrayRef { + fn as_int64_arr(&self) -> Option<&Int64Array> { + self.as_any().downcast_ref() + } + + fn as_float64_arr(&self) -> Option<&Float64Array> { + self.as_any().downcast_ref() + } + + fn as_utf8_arr(&self) -> Option<&StringArray> { + self.as_any().downcast_ref() + } + } + + fn fields_to_map(iter: impl Iterator) -> HashMap> { + iter.map(|x| (x.name().clone(), Arc::new(x))).collect() + } + + #[test] + fn basic_object_into_rb() { + let json = json!({ + "c": 4.23, + "a": 1, + "b": "hello", + }); + + let store_schema = HashMap::default(); + let data = Event::new(json, 0 /* doesn't matter */, LogSource::Json) + .to_data(None, None, None, SchemaVersion::V0) + .unwrap(); + let (schema, _) = + Event::infer_schema(&data[0], &store_schema, None, false, SchemaVersion::V0).unwrap(); + let rb = + Event::into_recordbatch(Utc::now(), &data, &schema, None, SchemaVersion::V0).unwrap(); + + assert_eq!(rb.num_rows(), 1); + assert_eq!(rb.num_columns(), 4); + assert_eq!( + rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), + &Int64Array::from_iter([1]) + ); + assert_eq!( + rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), + &StringArray::from_iter_values(["hello"]) + ); + assert_eq!( + rb.column_by_name("c").unwrap().as_float64_arr().unwrap(), + &Float64Array::from_iter([4.23]) + ); + } + + #[test] + fn basic_object_with_null_into_rb() { + let json = json!({ + "a": 1, + "b": "hello", + "c": null + }); + + let store_schema = HashMap::default(); + let data = Event::new(json, 0 /* doesn't matter */, LogSource::Json) + .to_data(None, None, None, SchemaVersion::V0) + .unwrap(); + let (schema, _) = + Event::infer_schema(&data[0], &store_schema, None, false, SchemaVersion::V0).unwrap(); + let rb = + Event::into_recordbatch(Utc::now(), &data, &schema, None, SchemaVersion::V0).unwrap(); + + assert_eq!(rb.num_rows(), 1); + assert_eq!(rb.num_columns(), 3); + assert_eq!( + rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), + &Int64Array::from_iter([1]) + ); + assert_eq!( + rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), + &StringArray::from_iter_values(["hello"]) + ); + } + + #[test] + fn basic_object_derive_schema_into_rb() { + let json = json!({ + "a": 1, + "b": "hello", + }); + + let store_schema = fields_to_map( + [ + Field::new("a", DataType::Int64, true), + Field::new("b", DataType::Utf8, true), + Field::new("c", DataType::Float64, true), + ] + .into_iter(), + ); + let data = Event::new(json, 0 /* doesn't matter */, LogSource::Json) + .to_data(None, None, None, SchemaVersion::V0) + .unwrap(); + let (schema, _) = + Event::infer_schema(&data[0], &store_schema, None, false, SchemaVersion::V0).unwrap(); + let rb = + Event::into_recordbatch(Utc::now(), &data, &schema, None, SchemaVersion::V0).unwrap(); + + assert_eq!(rb.num_rows(), 1); + assert_eq!(rb.num_columns(), 3); + assert_eq!( + rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), + &Int64Array::from_iter([1]) + ); + assert_eq!( + rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), + &StringArray::from_iter_values(["hello"]) + ); + } + + #[test] + fn basic_object_schema_mismatch() { + let json = json!({ + "a": 1, + "b": 1, // type mismatch + }); + + let store_schema = fields_to_map( + [ + Field::new("a", DataType::Int64, true), + Field::new("b", DataType::Utf8, true), + Field::new("c", 
DataType::Float64, true), + ] + .into_iter(), + ); + + let data = Event::new(json, 0 /* doesn't matter */, LogSource::Json) + .to_data(None, None, None, SchemaVersion::V0) + .unwrap(); + + assert!( + Event::infer_schema(&data[0], &store_schema, None, false, SchemaVersion::V0).is_err() + ); + } + + #[test] + fn empty_object() { + let json = json!({}); + + let store_schema = fields_to_map( + [ + Field::new("a", DataType::Int64, true), + Field::new("b", DataType::Utf8, true), + Field::new("c", DataType::Float64, true), + ] + .into_iter(), + ); + + let data = Event::new(json, 0 /* doesn't matter */, LogSource::Json) + .to_data(None, None, None, SchemaVersion::V0) + .unwrap(); + let (schema, _) = + Event::infer_schema(&data[0], &store_schema, None, false, SchemaVersion::V0).unwrap(); + let rb = + Event::into_recordbatch(Utc::now(), &data, &schema, None, SchemaVersion::V0).unwrap(); + + assert_eq!(rb.num_rows(), 1); + assert_eq!(rb.num_columns(), 1); + } + + #[test] + fn array_into_recordbatch_inffered_schema() { + let json = json!([ + { + "b": "hello", + }, + { + "b": "hello", + "a": 1, + "c": 1 + }, + { + "a": 1, + "b": "hello", + "c": null + }, + ]); + + let store_schema = HashMap::new(); + let data = Event::new(json, 0 /* doesn't matter */, LogSource::Json) + .to_data(None, None, None, SchemaVersion::V0) + .unwrap(); + let (schema, _) = + Event::infer_schema(&data[1], &store_schema, None, false, SchemaVersion::V0).unwrap(); + let rb = + Event::into_recordbatch(Utc::now(), &data, &schema, None, SchemaVersion::V0).unwrap(); + + assert_eq!(rb.num_rows(), 3); + assert_eq!(rb.num_columns(), 4); + + let schema = rb.schema(); + let fields = &schema.fields; + + assert_eq!(&*fields[1], &Field::new("a", DataType::Int64, true)); + assert_eq!(&*fields[2], &Field::new("b", DataType::Utf8, true)); + assert_eq!(&*fields[3], &Field::new("c", DataType::Int64, true)); + + assert_eq!( + rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), + &Int64Array::from(vec![None, Some(1), Some(1)]) + ); + assert_eq!( + rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), + &StringArray::from(vec![Some("hello"), Some("hello"), Some("hello"),]) + ); + assert_eq!( + rb.column_by_name("c").unwrap().as_int64_arr().unwrap(), + &Int64Array::from(vec![None, Some(1), None]) + ); + } + + #[test] + fn arr_with_null_into_rb() { + let json = json!([ + { + "c": null, + "b": "hello", + "a": null + }, + { + "a": 1, + "c": 1.22, + "b": "hello" + }, + { + "b": "hello", + "a": 1, + "c": null + }, + ]); + + let store_schema = HashMap::new(); + let data = Event::new(json, 0 /* doesn't matter */, LogSource::Json) + .to_data(None, None, None, SchemaVersion::V0) + .unwrap(); + let (schema, _) = + Event::infer_schema(&data[1], &store_schema, None, false, SchemaVersion::V0).unwrap(); + let rb = + Event::into_recordbatch(Utc::now(), &data, &schema, None, SchemaVersion::V0).unwrap(); + + assert_eq!(rb.num_rows(), 3); + assert_eq!(rb.num_columns(), 4); + assert_eq!( + rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), + &Int64Array::from(vec![None, Some(1), Some(1)]) + ); + assert_eq!( + rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), + &StringArray::from(vec![Some("hello"), Some("hello"), Some("hello"),]) + ); + assert_eq!( + rb.column_by_name("c").unwrap().as_float64_arr().unwrap(), + &Float64Array::from(vec![None, Some(1.22), None,]) + ); + } + + #[test] + fn arr_with_null_derive_schema_into_rb() { + let json = json!([ + { + "c": null, + "b": "hello", + "a": null + }, + { + "a": 1, + "c": 1.22, + "b": "hello" + }, + { + "b": 
"hello", + "a": 1, + "c": null + }, + ]); + + let store_schema = fields_to_map( + [ + Field::new("a", DataType::Int64, true), + Field::new("b", DataType::Utf8, true), + Field::new("c", DataType::Float64, true), + ] + .into_iter(), + ); + let data = Event::new(json, 0 /* doesn't matter */, LogSource::Json) + .to_data(None, None, None, SchemaVersion::V0) + .unwrap(); + let (schema, _) = + Event::infer_schema(&data[0], &store_schema, None, false, SchemaVersion::V0).unwrap(); + let rb = + Event::into_recordbatch(Utc::now(), &data, &schema, None, SchemaVersion::V0).unwrap(); + + assert_eq!(rb.num_rows(), 3); + assert_eq!(rb.num_columns(), 4); + assert_eq!( + rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), + &Int64Array::from(vec![None, Some(1), Some(1)]) + ); + assert_eq!( + rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), + &StringArray::from(vec![Some("hello"), Some("hello"), Some("hello"),]) + ); + assert_eq!( + rb.column_by_name("c").unwrap().as_float64_arr().unwrap(), + &Float64Array::from(vec![None, Some(1.22), None,]) + ); + } + + #[test] + fn arr_schema_mismatch() { + let json = json!( + { + "a": 1, + "b": "hello", + "c": 1 + }); + + let store_schema = fields_to_map( + [ + Field::new("a", DataType::Int64, true), + Field::new("b", DataType::Utf8, true), + Field::new("c", DataType::Float64, true), + ] + .into_iter(), + ); + + assert!(Event::infer_schema( + json.as_object().unwrap(), + &store_schema, + None, + false, + SchemaVersion::V0 + ) + .is_err()); + } + + #[test] + fn arr_obj_with_nested_type() { + let json = json!([ + { + "a": 1, + "b": "hello", + }, + { + "a": 1, + "b": "hello", + }, + { + "a": 1, + "b": "hello", + "c_a": [1], + }, + { + "a": 1, + "b": "hello", + "c_a": [1], + "c_b": [2], + }, + ]); + + let store_schema = HashMap::new(); + let data = Event::new(json, 0 /* doesn't matter */, LogSource::Json) + .to_data(None, None, None, SchemaVersion::V0) + .unwrap(); + let (schema, _) = + Event::infer_schema(&data[3], &store_schema, None, false, SchemaVersion::V0).unwrap(); + let rb = + Event::into_recordbatch(Utc::now(), &data, &schema, None, SchemaVersion::V0).unwrap(); + + assert_eq!(rb.num_rows(), 4); + assert_eq!(rb.num_columns(), 5); + assert_eq!( + rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), + &Int64Array::from(vec![Some(1), Some(1), Some(1), Some(1)]) + ); + assert_eq!( + rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), + &StringArray::from(vec![ + Some("hello"), + Some("hello"), + Some("hello"), + Some("hello") + ]) + ); + + assert_eq!( + rb.column_by_name("c_a") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(), + &ListArray::from_iter_primitive::(vec![ + None, + None, + Some(vec![Some(1i64)]), + Some(vec![Some(1)]) + ]) + ); + + assert_eq!( + rb.column_by_name("c_b") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(), + &ListArray::from_iter_primitive::(vec![ + None, + None, + None, + Some(vec![Some(2i64)]) + ]) + ); + } + + #[test] + fn arr_obj_with_nested_type_v1() { + let json = json!([ + { + "a": 1, + "b": "hello", + }, + { + "a": 1, + "b": "hello", + }, + { + "a": 1, + "b": "hello", + "c_a": 1, + }, + { + "a": 1, + "b": "hello", + "c_a": 1, + "c_b": 2, + }, + ]); + + let store_schema = HashMap::new(); + let data = Event::new(json, 0 /* doesn't matter */, LogSource::Json) + .to_data(None, None, None, SchemaVersion::V1) + .unwrap(); + let (schema, _) = + Event::infer_schema(&data[3], &store_schema, None, false, SchemaVersion::V1).unwrap(); + let rb = + Event::into_recordbatch(Utc::now(), &data, &schema, None, 
SchemaVersion::V1).unwrap(); + + assert_eq!(rb.num_rows(), 4); + assert_eq!(rb.num_columns(), 5); + assert_eq!( + rb.column_by_name("a").unwrap().as_float64_arr().unwrap(), + &Float64Array::from(vec![Some(1.0), Some(1.0), Some(1.0), Some(1.0)]) + ); + assert_eq!( + rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), + &StringArray::from(vec![ + Some("hello"), + Some("hello"), + Some("hello"), + Some("hello") + ]) + ); + + assert_eq!( + rb.column_by_name("c_a").unwrap().as_float64_arr().unwrap(), + &Float64Array::from(vec![None, None, Some(1.0), Some(1.0)]) + ); + + assert_eq!( + rb.column_by_name("c_b").unwrap().as_float64_arr().unwrap(), + &Float64Array::from(vec![None, None, None, Some(2.0)]) + ); + } + + #[test] + fn generate_correct_prefix_with_current_time_and_no_custom_partitioning() { + let schema = vec![]; + let parsed_timestamp = NaiveDate::from_ymd_opt(2023, 10, 1) + .unwrap() + .and_hms_opt(12, 30, 0) + .unwrap(); + let custom_partition_values = HashMap::new(); + + let expected = format!( + "{}.date={}.hour={:02}.minute={}", + get_schema_key(&schema), + parsed_timestamp.date(), + parsed_timestamp.hour(), + Minute::from(parsed_timestamp).to_slot(OBJECT_STORE_DATA_GRANULARITY), + ); + + let generated = generate_prefix(&schema, parsed_timestamp, &custom_partition_values); + + assert_eq!(generated, expected); + } + + #[test] + fn generate_correct_prefix_with_current_time_and_custom_partitioning() { + let schema = vec![]; + let parsed_timestamp = NaiveDate::from_ymd_opt(2023, 10, 1) + .unwrap() + .and_hms_opt(12, 30, 0) + .unwrap(); + let custom_partition_values = HashMap::from_iter([ + ("key1".to_string(), "value1".to_string()), + ("key2".to_string(), "value2".to_string()), + ]); + + let expected = format!( + "{}.date={}.hour={:02}.minute={}.key1=value1.key2=value2", + get_schema_key(&schema), + parsed_timestamp.date(), + parsed_timestamp.hour(), + Minute::from(parsed_timestamp).to_slot(OBJECT_STORE_DATA_GRANULARITY), + ); + + let generated = generate_prefix(&schema, parsed_timestamp, &custom_partition_values); + + assert_eq!(generated, expected); + } } diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs index 158951d3b..df73475b1 100644 --- a/src/event/format/mod.rs +++ b/src/event/format/mod.rs @@ -20,10 +20,11 @@ use std::{ collections::{HashMap, HashSet}, fmt::Display, + num::NonZeroU32, sync::Arc, }; -use anyhow::{anyhow, Error as AnyError}; +use anyhow::anyhow; use arrow_array::RecordBatch; use arrow_schema::{DataType, Field, Schema, TimeUnit}; use chrono::{DateTime, Utc}; @@ -32,8 +33,11 @@ use serde_json::Value; use crate::{ metadata::SchemaVersion, - storage::StreamType, - utils::arrow::{get_field, get_timestamp_array, replace_columns}, + parseable::Stream, + utils::{ + arrow::{get_field, get_timestamp_array, replace_columns}, + json::Json, + }, }; use super::{Event, DEFAULT_TIMESTAMP_KEY}; @@ -92,6 +96,8 @@ impl Display for LogSource { } } +pub type IsFirstEvent = bool; + /// Contains the format name and a list of known field names that are associated with the said format. 
/// Stored on disk as part of `ObjectStoreFormat` in stream.json #[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -116,38 +122,31 @@ pub trait EventFormat: Sized { fn to_data( self, - schema: &HashMap>, time_partition: Option<&String>, + time_partition_limit: Option, + custom_partitions: Option<&String>, schema_version: SchemaVersion, - static_schema_flag: bool, - ) -> Result<(Self::Data, EventSchema, bool), AnyError>; + ) -> anyhow::Result>; - fn decode(data: Self::Data, schema: Arc) -> Result; + fn infer_schema( + data: &Self::Data, + stored_schema: &HashMap>, + time_partition: Option<&String>, + static_schema_flag: bool, + schema_version: SchemaVersion, + ) -> anyhow::Result<(EventSchema, IsFirstEvent)>; - /// Returns the UTC time at ingestion - fn get_p_timestamp(&self) -> DateTime; + fn decode(data: &[Self::Data], schema: Arc) -> anyhow::Result; - fn into_recordbatch( - self, + /// Updates inferred schema with `p_timestamp` field and ensures it adheres to expectations + fn prepare_and_validate_schema( + mut schema: EventSchema, storage_schema: &HashMap>, static_schema_flag: bool, - time_partition: Option<&String>, - schema_version: SchemaVersion, - ) -> Result<(RecordBatch, bool), AnyError> { - let p_timestamp = self.get_p_timestamp(); - let (data, mut schema, is_first) = self.to_data( - storage_schema, - time_partition, - schema_version, - static_schema_flag, - )?; - + ) -> anyhow::Result { if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() { - return Err(anyhow!( - "field {} is a reserved field", - DEFAULT_TIMESTAMP_KEY - )); - }; + return Err(anyhow!("field {DEFAULT_TIMESTAMP_KEY} is a reserved field",)); + } // add the p_timestamp field to the event schema to the 0th index schema.insert( @@ -159,11 +158,28 @@ pub trait EventFormat: Sized { )), ); - // prepare the record batch and new fields to be added - let mut new_schema = Arc::new(Schema::new(schema)); - if !Self::is_schema_matching(new_schema.clone(), storage_schema, static_schema_flag) { + if static_schema_flag + && schema.iter().any(|field| { + storage_schema + .get(field.name()) + .is_none_or(|storage_field| storage_field != field) + }) + { return Err(anyhow!("Schema mismatch")); } + + Ok(schema) + } + + fn into_recordbatch( + p_timestamp: DateTime, + data: &[Self::Data], + schema: &EventSchema, + time_partition: Option<&String>, + schema_version: SchemaVersion, + ) -> anyhow::Result { + // prepare the record batch and new fields to be added + let mut new_schema = Arc::new(Schema::new(schema.clone())); new_schema = update_field_type_in_schema(new_schema, None, time_partition, None, schema_version); @@ -175,43 +191,10 @@ pub trait EventFormat: Sized { &[Arc::new(get_timestamp_array(p_timestamp, rb.num_rows()))], ); - Ok((rb, is_first)) - } - - fn is_schema_matching( - new_schema: Arc, - storage_schema: &HashMap>, - static_schema_flag: bool, - ) -> bool { - if !static_schema_flag { - return true; - } - for field in new_schema.fields() { - let Some(storage_field) = storage_schema.get(field.name()) else { - return false; - }; - if field.name() != storage_field.name() { - return false; - } - if field.data_type() != storage_field.data_type() { - return false; - } - } - true + Ok(rb) } - #[allow(clippy::too_many_arguments)] - fn into_event( - self, - stream_name: String, - origin_size: u64, - storage_schema: &HashMap>, - static_schema_flag: bool, - custom_partitions: Option<&String>, - time_partition: Option<&String>, - schema_version: SchemaVersion, - stream_type: StreamType, - ) -> Result; + fn 
into_event(self, stream: &Stream) -> anyhow::Result; } pub fn get_existing_field_names( @@ -269,7 +252,7 @@ pub fn update_field_type_in_schema( inferred_schema: Arc, existing_schema: Option<&HashMap>>, time_partition: Option<&String>, - log_records: Option<&Vec>, + log_records: Option<&Json>, schema_version: SchemaVersion, ) -> Arc { let mut updated_schema = inferred_schema.clone(); @@ -280,11 +263,9 @@ pub fn update_field_type_in_schema( updated_schema = override_existing_timestamp_fields(existing_schema, updated_schema); } - if let Some(log_records) = log_records { - for log_record in log_records { - updated_schema = - override_data_type(updated_schema.clone(), log_record.clone(), schema_version); - } + if let Some(log_record) = log_records { + updated_schema = + override_data_type(updated_schema.clone(), log_record.clone(), schema_version); } let Some(time_partition) = time_partition else { @@ -314,18 +295,15 @@ pub fn update_field_type_in_schema( // a string value parseable into timestamp as timestamp type and all numbers as float64. pub fn override_data_type( inferred_schema: Arc, - log_record: Value, + log_record: Json, schema_version: SchemaVersion, ) -> Arc { - let Value::Object(map) = log_record else { - return inferred_schema; - }; let updated_schema: Vec = inferred_schema .fields() .iter() .map(|field| { let field_name = field.name().as_str(); - match (schema_version, map.get(field.name())) { + match (schema_version, log_record.get(field.name())) { // in V1 for new fields in json named "time"/"date" or such and having inferred // type string, that can be parsed as timestamp, use the timestamp type. // NOTE: support even more datetime string formats diff --git a/src/event/mod.rs b/src/event/mod.rs index 29a4a0899..5c5e0254d 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -20,85 +20,70 @@ pub mod format; use arrow_array::RecordBatch; -use arrow_schema::{Field, Fields, Schema}; +use arrow_schema::Field; use itertools::Itertools; use std::sync::Arc; use self::error::EventError; -use crate::{ - metadata::update_stats, - parseable::{StagingError, PARSEABLE}, - storage::StreamType, - LOCK_EXPECT, -}; +use crate::{metadata::update_stats, parseable::Stream, storage::StreamType}; use chrono::NaiveDateTime; use std::collections::HashMap; pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp"; -#[derive(Clone)] -pub struct Event { - pub stream_name: String, +#[derive(Debug)] +pub struct PartitionEvent { pub rb: RecordBatch, + pub parsed_timestamp: NaiveDateTime, +} + +#[derive(Debug)] +pub struct Event { pub origin_format: &'static str, - pub origin_size: u64, + pub origin_size: usize, pub is_first_event: bool, - pub parsed_timestamp: NaiveDateTime, - pub time_partition: Option, - pub custom_partition_values: HashMap, + pub partitions: HashMap, pub stream_type: StreamType, } // Events holds the schema related to a each event for a single log stream impl Event { - pub fn process(self) -> Result<(), EventError> { - let mut key = get_schema_key(&self.rb.schema().fields); - if self.time_partition.is_some() { - let parsed_timestamp_to_min = self.parsed_timestamp.format("%Y%m%dT%H%M").to_string(); - key.push_str(&parsed_timestamp_to_min); - } - - if !self.custom_partition_values.is_empty() { - for (k, v) in self.custom_partition_values.iter().sorted_by_key(|v| v.0) { - key.push_str(&format!("&{k}={v}")); + pub fn process(self, stream: &Stream) -> Result<(), EventError> { + for ( + prefix, + PartitionEvent { + rb, + parsed_timestamp, + }, + ) in self.partitions + { + if self.is_first_event 
{ + stream.commit_schema(rb.schema().as_ref().clone())?; } - } + stream.push(&prefix, parsed_timestamp, &rb, self.stream_type)?; - if self.is_first_event { - commit_schema(&self.stream_name, self.rb.schema())?; - } - - PARSEABLE.get_or_create_stream(&self.stream_name).push( - &key, - &self.rb, - self.parsed_timestamp, - &self.custom_partition_values, - self.stream_type, - )?; - - update_stats( - &self.stream_name, - self.origin_format, - self.origin_size, - self.rb.num_rows(), - self.parsed_timestamp.date(), - ); - - crate::livetail::LIVETAIL.process(&self.stream_name, &self.rb); + update_stats( + &stream.stream_name, + self.origin_format, + self.origin_size, + rb.num_rows(), + parsed_timestamp.date(), + ); + crate::livetail::LIVETAIL.process(&stream.stream_name, &rb); + } Ok(()) } - pub fn process_unchecked(&self) -> Result<(), EventError> { - let key = get_schema_key(&self.rb.schema().fields); - - PARSEABLE.get_or_create_stream(&self.stream_name).push( - &key, - &self.rb, - self.parsed_timestamp, - &self.custom_partition_values, - self.stream_type, - )?; + pub fn process_unchecked(&self, stream: &Stream) -> Result<(), EventError> { + for (prefix, partition) in &self.partitions { + stream.push( + prefix, + partition.parsed_timestamp, + &partition.rb, + self.stream_type, + )?; + } Ok(()) } @@ -114,23 +99,6 @@ pub fn get_schema_key(fields: &[Arc]) -> String { format!("{hash:x}") } -pub fn commit_schema(stream_name: &str, schema: Arc) -> Result<(), StagingError> { - let mut stream_metadata = PARSEABLE.streams.write().expect("lock poisoned"); - - let map = &mut stream_metadata - .get_mut(stream_name) - .expect("map has entry for this stream name") - .metadata - .write() - .expect(LOCK_EXPECT) - .schema; - let current_schema = Schema::new(map.values().cloned().collect::()); - let schema = Schema::try_merge(vec![current_schema, schema.as_ref().clone()])?; - map.clear(); - map.extend(schema.fields.iter().map(|f| (f.name().clone(), f.clone()))); - Ok(()) -} - pub mod error { use crate::{parseable::StagingError, storage::ObjectStorageError}; diff --git a/src/handlers/http/cluster/mod.rs b/src/handlers/http/cluster/mod.rs index 3232d54cb..e74d00f34 100644 --- a/src/handlers/http/cluster/mod.rs +++ b/src/handlers/http/cluster/mod.rs @@ -24,6 +24,7 @@ use std::time::Duration; use actix_web::http::header::{self, HeaderMap}; use actix_web::web::Path; use actix_web::Responder; +use anyhow::anyhow; use bytes::Bytes; use chrono::Utc; use clokwerk::{AsyncScheduler, Interval}; @@ -37,7 +38,7 @@ use tracing::{error, info, warn}; use url::Url; use utils::{check_liveness, to_url_string, IngestionStats, QueriedStats, StorageStats}; -use crate::handlers::http::ingest::ingest_internal_stream; +use crate::event::format::{json, EventFormat, LogSource}; use crate::metrics::prom_utils::Metrics; use crate::parseable::PARSEABLE; use crate::rbac::role::model::DefaultPrivilege; @@ -739,29 +740,31 @@ pub fn init_cluster_metrics_schedular() -> Result<(), PostError> { scheduler .every(CLUSTER_METRICS_INTERVAL_SECONDS) .run(move || async { + let internal_stream = PARSEABLE.get_or_create_stream(INTERNAL_STREAM_NAME); let result: Result<(), PostError> = async { let cluster_metrics = fetch_cluster_metrics().await; - if let Ok(metrics) = cluster_metrics { - if !metrics.is_empty() { - info!("Cluster metrics fetched successfully from all ingestors"); - if let Ok(metrics_bytes) = serde_json::to_vec(&metrics) { - if matches!( - ingest_internal_stream( - INTERNAL_STREAM_NAME.to_string(), - bytes::Bytes::from(metrics_bytes), - ) - 
.await, - Ok(()) - ) { - info!("Cluster metrics successfully ingested into internal stream"); - } else { - error!("Failed to ingest cluster metrics into internal stream"); - } - } else { - error!("Failed to serialize cluster metrics"); - } + let Ok(metrics) = cluster_metrics else { + return Ok(()); + }; + if !metrics.is_empty() { + info!("Cluster metrics fetched successfully from all ingestors"); + let json = serde_json::to_value(&metrics).expect("should be json serializable"); + let byte_size = serde_json::to_vec(&metrics).unwrap().len(); + + if matches!( + json::Event::new(json, byte_size, LogSource::Pmeta) + .into_event(&internal_stream) + .and_then(|event| event + .process(&internal_stream) + .map_err(|e| anyhow!(e))), + Ok(()) + ) { + info!("Cluster metrics successfully ingested into internal stream"); + } else { + error!("Failed to ingest cluster metrics into internal stream"); } } + Ok(()) } .await; diff --git a/src/handlers/http/cluster/utils.rs b/src/handlers/http/cluster/utils.rs index b88e95bb1..9256525c6 100644 --- a/src/handlers/http/cluster/utils.rs +++ b/src/handlers/http/cluster/utils.rs @@ -16,10 +16,22 @@ * */ -use crate::{handlers::http::base_path_without_preceding_slash, HTTP_CLIENT}; -use actix_web::http::header; +use std::{future::Future, pin::Pin}; + +use crate::{ + handlers::http::{base_path_without_preceding_slash, MAX_EVENT_PAYLOAD_SIZE}, + HTTP_CLIENT, +}; +use actix_web::{ + dev::Payload, + error::{ErrorPayloadTooLarge, JsonPayloadError}, + http::header, + FromRequest, HttpRequest, +}; +use bytes::BytesMut; use chrono::{DateTime, Utc}; -use serde::{Deserialize, Serialize}; +use futures::StreamExt; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; use tracing::error; use url::Url; @@ -199,3 +211,49 @@ pub fn to_url_string(str: String) -> String { format!("http://{}/", str) } + +pub struct JsonWithSize { + pub json: T, + pub byte_size: usize, +} + +impl FromRequest for JsonWithSize { + type Error = actix_web::error::Error; + type Future = Pin>>>; + + fn from_request(_: &HttpRequest, payload: &mut Payload) -> Self::Future { + let limit = MAX_EVENT_PAYLOAD_SIZE; + + // Take ownership of payload for async processing + let mut payload = payload.take(); + + Box::pin(async move { + // Buffer to collect all bytes + let mut body = BytesMut::new(); + let mut byte_size = 0; + + // Collect all bytes from the payload stream + while let Some(chunk) = payload.next().await { + let chunk = chunk?; + byte_size += chunk.len(); + + // Check the size limit + if byte_size > limit { + return Err(ErrorPayloadTooLarge(byte_size)); + } + + // Extend our buffer with the chunk + body.extend_from_slice(&chunk); + } + + // Convert the collected bytes to Bytes + let bytes = body.freeze(); + + // Deserialize the JSON payload + let json = + serde_json::from_slice::(&bytes).map_err(JsonPayloadError::Deserialize)?; + + Ok(JsonWithSize { json, byte_size }) + }) + } +} diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 1e5e6d048..899ba8896 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -16,39 +16,40 @@ * */ -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; -use actix_web::web::{Json, Path}; +use actix_web::web::Path; use actix_web::{http::header::ContentType, HttpRequest, HttpResponse}; use arrow_array::RecordBatch; -use bytes::Bytes; use chrono::Utc; use http::StatusCode; use serde_json::Value; -use crate::event; use crate::event::error::EventError; -use crate::event::format::{self, EventFormat, LogSource, 
LogSourceEntry}; +use crate::event::format::{json, EventFormat, LogSource, LogSourceEntry}; +use crate::event::{self, get_schema_key, PartitionEvent}; use crate::handlers::{LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY}; -use crate::metadata::SchemaVersion; use crate::option::Mode; use crate::otel::logs::OTEL_LOG_KNOWN_FIELD_LIST; use crate::otel::metrics::OTEL_METRICS_KNOWN_FIELD_LIST; use crate::otel::traces::OTEL_TRACES_KNOWN_FIELD_LIST; -use crate::parseable::{StreamNotFound, PARSEABLE}; +use crate::parseable::{Stream, StreamNotFound, PARSEABLE}; use crate::storage::{ObjectStorageError, StreamType}; use crate::utils::header_parsing::ParseHeaderError; use crate::utils::json::flatten::JsonFlattenError; +use super::cluster::utils::JsonWithSize; use super::logstream::error::{CreateStreamError, StreamError}; -use super::modal::utils::ingest_utils::flatten_and_push_logs; use super::users::dashboards::DashboardError; use super::users::filters::FiltersError; // Handler for POST /api/v1/ingest // ingests events by extracting stream name from header // creates if stream does not exist -pub async fn ingest(req: HttpRequest, Json(json): Json) -> Result { +pub async fn ingest( + req: HttpRequest, + JsonWithSize { json, byte_size }: JsonWithSize, +) -> Result { let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else { return Err(PostError::Header(ParseHeaderError::MissingStreamName)); }; @@ -81,31 +82,13 @@ pub async fn ingest(req: HttpRequest, Json(json): Json) -> Result Result<(), PostError> { - let size: usize = body.len(); - let json: Value = serde_json::from_slice(&body)?; - let schema = PARSEABLE.get_stream(&stream_name)?.get_schema_raw(); - - // For internal streams, use old schema - format::json::Event::new(json) - .into_event( - stream_name, - size as u64, - &schema, - false, - None, - None, - SchemaVersion::V0, - StreamType::Internal, - )? - .process()?; - - Ok(()) + Ok(HttpResponse::Ok().finish()) } // Handler for POST /v1/logs to ingest OTEL logs @@ -113,7 +96,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result< // creates if stream does not exist pub async fn handle_otel_logs_ingestion( req: HttpRequest, - Json(json): Json, + JsonWithSize { json, byte_size }: JsonWithSize, ) -> Result { let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else { return Err(PostError::Header(ParseHeaderError::MissingStreamName)); @@ -144,7 +127,11 @@ pub async fn handle_otel_logs_ingestion( ) .await?; - flatten_and_push_logs(json, &stream_name, &log_source).await?; + let stream = PARSEABLE.get_or_create_stream(&stream_name); + + json::Event::new(json, byte_size, log_source) + .into_event(&stream)? + .process(&stream)?; Ok(HttpResponse::Ok().finish()) } @@ -154,7 +141,7 @@ pub async fn handle_otel_logs_ingestion( // creates if stream does not exist pub async fn handle_otel_metrics_ingestion( req: HttpRequest, - Json(json): Json, + JsonWithSize { json, byte_size }: JsonWithSize, ) -> Result { let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else { return Err(PostError::Header(ParseHeaderError::MissingStreamName)); @@ -182,7 +169,11 @@ pub async fn handle_otel_metrics_ingestion( ) .await?; - flatten_and_push_logs(json, &stream_name, &log_source).await?; + let stream = PARSEABLE.get_or_create_stream(&stream_name); + + json::Event::new(json, byte_size, log_source) + .into_event(&stream)? 
+ .process(&stream)?; Ok(HttpResponse::Ok().finish()) } @@ -192,7 +183,7 @@ pub async fn handle_otel_metrics_ingestion( // creates if stream does not exist pub async fn handle_otel_traces_ingestion( req: HttpRequest, - Json(json): Json, + JsonWithSize { json, byte_size }: JsonWithSize, ) -> Result { let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else { return Err(PostError::Header(ParseHeaderError::MissingStreamName)); @@ -222,7 +213,11 @@ pub async fn handle_otel_traces_ingestion( ) .await?; - flatten_and_push_logs(json, &stream_name, &log_source).await?; + let stream = PARSEABLE.get_or_create_stream(&stream_name); + + json::Event::new(json, byte_size, log_source) + .into_event(&stream)? + .process(&stream)?; Ok(HttpResponse::Ok().finish()) } @@ -233,7 +228,7 @@ pub async fn handle_otel_traces_ingestion( pub async fn post_event( req: HttpRequest, stream_name: Path, - Json(json): Json, + JsonWithSize { json, byte_size }: JsonWithSize, ) -> Result { let stream_name = stream_name.into_inner(); @@ -271,27 +266,36 @@ pub async fn post_event( return Err(PostError::OtelNotSupported); } - flatten_and_push_logs(json, &stream_name, &log_source).await?; + let stream = PARSEABLE.get_or_create_stream(&stream_name); + + json::Event::new(json, byte_size, log_source) + .into_event(&stream)? + .process(&stream)?; Ok(HttpResponse::Ok().finish()) } pub async fn push_logs_unchecked( - batches: RecordBatch, - stream_name: &str, + rb: RecordBatch, + stream: &Stream, ) -> Result { let unchecked_event = event::Event { - rb: batches, - stream_name: stream_name.to_string(), origin_format: "json", origin_size: 0, - parsed_timestamp: Utc::now().naive_utc(), - time_partition: None, - is_first_event: true, // NOTE: Maybe should be false - custom_partition_values: HashMap::new(), // should be an empty map for unchecked push + is_first_event: true, // NOTE: Maybe should be false + partitions: [( + get_schema_key(&rb.schema().fields), + PartitionEvent { + rb, + parsed_timestamp: Utc::now().naive_utc(), + }, + )] + .into_iter() + .collect(), stream_type: StreamType::UserDefined, }; - unchecked_event.process_unchecked()?; + + unchecked_event.process_unchecked(stream)?; Ok(unchecked_event) } @@ -372,502 +376,3 @@ impl actix_web::ResponseError for PostError { .body(self.to_string()) } } - -#[cfg(test)] -mod tests { - - use arrow::datatypes::Int64Type; - use arrow_array::{ArrayRef, Float64Array, Int64Array, ListArray, StringArray}; - use arrow_schema::{DataType, Field}; - use serde_json::json; - use std::{collections::HashMap, sync::Arc}; - - use crate::{ - event::format::{json, EventFormat}, - metadata::SchemaVersion, - utils::json::{convert_array_to_object, flatten::convert_to_array}, - }; - - trait TestExt { - fn as_int64_arr(&self) -> Option<&Int64Array>; - fn as_float64_arr(&self) -> Option<&Float64Array>; - fn as_utf8_arr(&self) -> Option<&StringArray>; - } - - impl TestExt for ArrayRef { - fn as_int64_arr(&self) -> Option<&Int64Array> { - self.as_any().downcast_ref() - } - - fn as_float64_arr(&self) -> Option<&Float64Array> { - self.as_any().downcast_ref() - } - - fn as_utf8_arr(&self) -> Option<&StringArray> { - self.as_any().downcast_ref() - } - } - - fn fields_to_map(iter: impl Iterator) -> HashMap> { - iter.map(|x| (x.name().clone(), Arc::new(x))).collect() - } - - #[test] - fn basic_object_into_rb() { - let json = json!({ - "c": 4.23, - "a": 1, - "b": "hello", - }); - - let (rb, _) = json::Event::new(json) - .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0) - .unwrap(); - - 
assert_eq!(rb.num_rows(), 1); - assert_eq!(rb.num_columns(), 4); - assert_eq!( - rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), - &Int64Array::from_iter([1]) - ); - assert_eq!( - rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), - &StringArray::from_iter_values(["hello"]) - ); - assert_eq!( - rb.column_by_name("c").unwrap().as_float64_arr().unwrap(), - &Float64Array::from_iter([4.23]) - ); - } - - #[test] - fn basic_object_with_null_into_rb() { - let json = json!({ - "a": 1, - "b": "hello", - "c": null - }); - - let (rb, _) = json::Event::new(json) - .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0) - .unwrap(); - - assert_eq!(rb.num_rows(), 1); - assert_eq!(rb.num_columns(), 3); - assert_eq!( - rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), - &Int64Array::from_iter([1]) - ); - assert_eq!( - rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), - &StringArray::from_iter_values(["hello"]) - ); - } - - #[test] - fn basic_object_derive_schema_into_rb() { - let json = json!({ - "a": 1, - "b": "hello", - }); - - let schema = fields_to_map( - [ - Field::new("a", DataType::Int64, true), - Field::new("b", DataType::Utf8, true), - Field::new("c", DataType::Float64, true), - ] - .into_iter(), - ); - - let (rb, _) = json::Event::new(json) - .into_recordbatch(&schema, false, None, SchemaVersion::V0) - .unwrap(); - - assert_eq!(rb.num_rows(), 1); - assert_eq!(rb.num_columns(), 3); - assert_eq!( - rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), - &Int64Array::from_iter([1]) - ); - assert_eq!( - rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), - &StringArray::from_iter_values(["hello"]) - ); - } - - #[test] - fn basic_object_schema_mismatch() { - let json = json!({ - "a": 1, - "b": 1, // type mismatch - }); - - let schema = fields_to_map( - [ - Field::new("a", DataType::Int64, true), - Field::new("b", DataType::Utf8, true), - Field::new("c", DataType::Float64, true), - ] - .into_iter(), - ); - - assert!(json::Event::new(json) - .into_recordbatch(&schema, false, None, SchemaVersion::V0,) - .is_err()); - } - - #[test] - fn empty_object() { - let json = json!({}); - - let schema = fields_to_map( - [ - Field::new("a", DataType::Int64, true), - Field::new("b", DataType::Utf8, true), - Field::new("c", DataType::Float64, true), - ] - .into_iter(), - ); - - let (rb, _) = json::Event::new(json) - .into_recordbatch(&schema, false, None, SchemaVersion::V0) - .unwrap(); - - assert_eq!(rb.num_rows(), 1); - assert_eq!(rb.num_columns(), 1); - } - - #[test] - fn non_object_arr_is_err() { - let json = json!([1]); - - assert!(convert_array_to_object( - json, - None, - None, - None, - SchemaVersion::V0, - &crate::event::format::LogSource::default() - ) - .is_err()) - } - - #[test] - fn array_into_recordbatch_inffered_schema() { - let json = json!([ - { - "b": "hello", - }, - { - "b": "hello", - "a": 1, - "c": 1 - }, - { - "a": 1, - "b": "hello", - "c": null - }, - ]); - - let (rb, _) = json::Event::new(json) - .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0) - .unwrap(); - - assert_eq!(rb.num_rows(), 3); - assert_eq!(rb.num_columns(), 4); - - let schema = rb.schema(); - let fields = &schema.fields; - - assert_eq!(&*fields[1], &Field::new("a", DataType::Int64, true)); - assert_eq!(&*fields[2], &Field::new("b", DataType::Utf8, true)); - assert_eq!(&*fields[3], &Field::new("c", DataType::Int64, true)); - - assert_eq!( - rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), - &Int64Array::from(vec![None, Some(1), Some(1)]) - ); - 
assert_eq!( - rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), - &StringArray::from(vec![Some("hello"), Some("hello"), Some("hello"),]) - ); - assert_eq!( - rb.column_by_name("c").unwrap().as_int64_arr().unwrap(), - &Int64Array::from(vec![None, Some(1), None]) - ); - } - - #[test] - fn arr_with_null_into_rb() { - let json = json!([ - { - "c": null, - "b": "hello", - "a": null - }, - { - "a": 1, - "c": 1.22, - "b": "hello" - }, - { - "b": "hello", - "a": 1, - "c": null - }, - ]); - - let (rb, _) = json::Event::new(json) - .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0) - .unwrap(); - - assert_eq!(rb.num_rows(), 3); - assert_eq!(rb.num_columns(), 4); - assert_eq!( - rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), - &Int64Array::from(vec![None, Some(1), Some(1)]) - ); - assert_eq!( - rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), - &StringArray::from(vec![Some("hello"), Some("hello"), Some("hello"),]) - ); - assert_eq!( - rb.column_by_name("c").unwrap().as_float64_arr().unwrap(), - &Float64Array::from(vec![None, Some(1.22), None,]) - ); - } - - #[test] - fn arr_with_null_derive_schema_into_rb() { - let json = json!([ - { - "c": null, - "b": "hello", - "a": null - }, - { - "a": 1, - "c": 1.22, - "b": "hello" - }, - { - "b": "hello", - "a": 1, - "c": null - }, - ]); - - let schema = fields_to_map( - [ - Field::new("a", DataType::Int64, true), - Field::new("b", DataType::Utf8, true), - Field::new("c", DataType::Float64, true), - ] - .into_iter(), - ); - - let (rb, _) = json::Event::new(json) - .into_recordbatch(&schema, false, None, SchemaVersion::V0) - .unwrap(); - - assert_eq!(rb.num_rows(), 3); - assert_eq!(rb.num_columns(), 4); - assert_eq!( - rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), - &Int64Array::from(vec![None, Some(1), Some(1)]) - ); - assert_eq!( - rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), - &StringArray::from(vec![Some("hello"), Some("hello"), Some("hello"),]) - ); - assert_eq!( - rb.column_by_name("c").unwrap().as_float64_arr().unwrap(), - &Float64Array::from(vec![None, Some(1.22), None,]) - ); - } - - #[test] - fn arr_schema_mismatch() { - let json = json!([ - { - "a": null, - "b": "hello", - "c": 1.24 - }, - { - "a": 1, - "b": "hello", - "c": 1 - }, - { - "a": 1, - "b": "hello", - "c": null - }, - ]); - - let schema = fields_to_map( - [ - Field::new("a", DataType::Int64, true), - Field::new("b", DataType::Utf8, true), - Field::new("c", DataType::Float64, true), - ] - .into_iter(), - ); - - assert!(json::Event::new(json) - .into_recordbatch(&schema, false, None, SchemaVersion::V0,) - .is_err()); - } - - #[test] - fn arr_obj_with_nested_type() { - let json = json!([ - { - "a": 1, - "b": "hello", - }, - { - "a": 1, - "b": "hello", - }, - { - "a": 1, - "b": "hello", - "c": [{"a": 1}] - }, - { - "a": 1, - "b": "hello", - "c": [{"a": 1, "b": 2}] - }, - ]); - let flattened_json = convert_to_array( - convert_array_to_object( - json, - None, - None, - None, - SchemaVersion::V0, - &crate::event::format::LogSource::default(), - ) - .unwrap(), - ) - .unwrap(); - - let (rb, _) = json::Event::new(flattened_json) - .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0) - .unwrap(); - assert_eq!(rb.num_rows(), 4); - assert_eq!(rb.num_columns(), 5); - assert_eq!( - rb.column_by_name("a").unwrap().as_int64_arr().unwrap(), - &Int64Array::from(vec![Some(1), Some(1), Some(1), Some(1)]) - ); - assert_eq!( - rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), - &StringArray::from(vec![ - Some("hello"), - 
Some("hello"), - Some("hello"), - Some("hello") - ]) - ); - - assert_eq!( - rb.column_by_name("c_a") - .unwrap() - .as_any() - .downcast_ref::() - .unwrap(), - &ListArray::from_iter_primitive::(vec![ - None, - None, - Some(vec![Some(1i64)]), - Some(vec![Some(1)]) - ]) - ); - - assert_eq!( - rb.column_by_name("c_b") - .unwrap() - .as_any() - .downcast_ref::() - .unwrap(), - &ListArray::from_iter_primitive::(vec![ - None, - None, - None, - Some(vec![Some(2i64)]) - ]) - ); - } - - #[test] - fn arr_obj_with_nested_type_v1() { - let json = json!([ - { - "a": 1, - "b": "hello", - }, - { - "a": 1, - "b": "hello", - }, - { - "a": 1, - "b": "hello", - "c": [{"a": 1}] - }, - { - "a": 1, - "b": "hello", - "c": [{"a": 1, "b": 2}] - }, - ]); - let flattened_json = convert_to_array( - convert_array_to_object( - json, - None, - None, - None, - SchemaVersion::V1, - &crate::event::format::LogSource::default(), - ) - .unwrap(), - ) - .unwrap(); - - let (rb, _) = json::Event::new(flattened_json) - .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V1) - .unwrap(); - - assert_eq!(rb.num_rows(), 4); - assert_eq!(rb.num_columns(), 5); - assert_eq!( - rb.column_by_name("a").unwrap().as_float64_arr().unwrap(), - &Float64Array::from(vec![Some(1.0), Some(1.0), Some(1.0), Some(1.0)]) - ); - assert_eq!( - rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(), - &StringArray::from(vec![ - Some("hello"), - Some("hello"), - Some("hello"), - Some("hello") - ]) - ); - - assert_eq!( - rb.column_by_name("c_a").unwrap().as_float64_arr().unwrap(), - &Float64Array::from(vec![None, None, Some(1.0), Some(1.0)]) - ); - - assert_eq!( - rb.column_by_name("c_b").unwrap().as_float64_arr().unwrap(), - &Float64Array::from(vec![None, None, None, Some(2.0)]) - ); - } -} diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index a3a2096ae..1addfc158 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -114,7 +114,13 @@ pub async fn detect_schema(Json(json): Json) -> Result. - * - */ - -use actix_web::{HttpRequest, HttpResponse}; -use bytes::Bytes; - -use crate::{handlers::http::{ingest::PostError, modal::utils::ingest_utils::flatten_and_push_logs}, metadata::PARSEABLE.streams}; - - -// Handler for POST /api/v1/logstream/{logstream} -// only ingests events into the specified logstream -// fails if the logstream does not exist -pub async fn post_event(req: HttpRequest, body: Bytes) -> Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - let internal_stream_names = PARSEABLE.streams.list_internal_streams(); - if internal_stream_names.contains(&stream_name) { - return Err(PostError::Invalid(anyhow::anyhow!( - "Stream {} is an internal stream and cannot be ingested into", - stream_name - ))); - } - if !PARSEABLE.streams.stream_exists(&stream_name) { - return Err(PostError::StreamNotFound(stream_name)); - } - - flatten_and_push_logs(req, body, stream_name).await?; - Ok(HttpResponse::Ok().finish()) -} \ No newline at end of file diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs deleted file mode 100644 index 84d5ae117..000000000 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Parseable Server (C) 2022 - 2024 Parseable, Inc. 
- * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - * - */ - -use chrono::Utc; -use opentelemetry_proto::tonic::{ - logs::v1::LogsData, metrics::v1::MetricsData, trace::v1::TracesData, -}; -use serde_json::Value; - -use crate::{ - event::format::{json, EventFormat, LogSource}, - handlers::http::{ - ingest::PostError, - kinesis::{flatten_kinesis_logs, Message}, - }, - otel::{logs::flatten_otel_logs, metrics::flatten_otel_metrics, traces::flatten_otel_traces}, - parseable::PARSEABLE, - storage::StreamType, - utils::json::{convert_array_to_object, flatten::convert_to_array}, -}; - -pub async fn flatten_and_push_logs( - json: Value, - stream_name: &str, - log_source: &LogSource, -) -> Result<(), PostError> { - match log_source { - LogSource::Kinesis => { - //custom flattening required for Amazon Kinesis - let message: Message = serde_json::from_value(json)?; - for record in flatten_kinesis_logs(message) { - push_logs(stream_name, record, &LogSource::default()).await?; - } - } - LogSource::OtelLogs => { - //custom flattening required for otel logs - let logs: LogsData = serde_json::from_value(json)?; - for record in flatten_otel_logs(&logs) { - push_logs(stream_name, record, log_source).await?; - } - } - LogSource::OtelTraces => { - //custom flattening required for otel traces - let traces: TracesData = serde_json::from_value(json)?; - for record in flatten_otel_traces(&traces) { - push_logs(stream_name, record, log_source).await?; - } - } - LogSource::OtelMetrics => { - //custom flattening required for otel metrics - let metrics: MetricsData = serde_json::from_value(json)?; - for record in flatten_otel_metrics(metrics) { - push_logs(stream_name, record, log_source).await?; - } - } - _ => { - push_logs(stream_name, json, log_source).await?; - } - } - Ok(()) -} - -async fn push_logs( - stream_name: &str, - json: Value, - log_source: &LogSource, -) -> Result<(), PostError> { - let stream = PARSEABLE.get_stream(stream_name)?; - let time_partition = stream.get_time_partition(); - let time_partition_limit = PARSEABLE - .get_stream(stream_name)? - .get_time_partition_limit(); - let static_schema_flag = stream.get_static_schema_flag(); - let custom_partition = stream.get_custom_partition(); - let schema_version = stream.get_schema_version(); - let p_timestamp = Utc::now(); - - let data = if time_partition.is_some() || custom_partition.is_some() { - convert_array_to_object( - json, - time_partition.as_ref(), - time_partition_limit, - custom_partition.as_ref(), - schema_version, - log_source, - )? - } else { - vec![convert_to_array(convert_array_to_object( - json, - None, - None, - None, - schema_version, - log_source, - )?)?] 
- }; - - for json in data { - let origin_size = serde_json::to_vec(&json).unwrap().len() as u64; // string length need not be the same as byte length - let schema = PARSEABLE.get_stream(stream_name)?.get_schema_raw(); - json::Event { json, p_timestamp } - .into_event( - stream_name.to_owned(), - origin_size, - &schema, - static_schema_flag, - custom_partition.as_ref(), - time_partition.as_ref(), - schema_version, - StreamType::UserDefined, - )? - .process()?; - } - Ok(()) -} diff --git a/src/handlers/http/modal/utils/mod.rs b/src/handlers/http/modal/utils/mod.rs index 61930d43d..1d0a3767b 100644 --- a/src/handlers/http/modal/utils/mod.rs +++ b/src/handlers/http/modal/utils/mod.rs @@ -16,6 +16,5 @@ * */ -pub mod ingest_utils; pub mod logstream_utils; pub mod rbac_utils; diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs index 7d9c33a45..4d60d62d2 100644 --- a/src/handlers/http/query.rs +++ b/src/handlers/http/query.rs @@ -29,14 +29,12 @@ use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; use std::collections::HashMap; use std::pin::Pin; -use std::sync::Arc; use std::time::Instant; use tracing::error; use crate::event::error::EventError; use crate::handlers::http::fetch_schema; -use crate::event::commit_schema; use crate::metrics::QUERY_EXECUTE_TIME; use crate::option::Mode; use crate::parseable::{StreamNotFound, PARSEABLE}; @@ -175,7 +173,9 @@ pub async fn update_schema_when_distributed(tables: &Vec) -> Result<(), // commit schema merges the schema internally and updates the schema in storage. commit_schema_to_storage(table, new_schema.clone()).await?; - commit_schema(table, Arc::new(new_schema))?; + PARSEABLE + .get_or_create_stream(table) + .commit_schema(new_schema)?; } } } diff --git a/src/handlers/http/kinesis.rs b/src/kinesis.rs similarity index 100% rename from src/handlers/http/kinesis.rs rename to src/kinesis.rs diff --git a/src/lib.rs b/src/lib.rs index 94b81639d..c107b3047 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -30,6 +30,7 @@ pub mod enterprise; mod event; pub mod handlers; pub mod hottier; +mod kinesis; mod livetail; mod metadata; pub mod metrics; diff --git a/src/metadata.rs b/src/metadata.rs index 79cec19e5..441cd1e06 100644 --- a/src/metadata.rs +++ b/src/metadata.rs @@ -35,7 +35,7 @@ use crate::storage::StreamType; pub fn update_stats( stream_name: &str, origin: &'static str, - size: u64, + size: usize, num_rows: usize, parsed_date: NaiveDate, ) { diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 784658c1f..a6fc55de8 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -20,6 +20,7 @@ use std::{ collections::{HashMap, HashSet}, fs::{remove_file, write, File, OpenOptions}, + io::BufReader, num::NonZeroU32, path::{Path, PathBuf}, process, @@ -29,7 +30,7 @@ use std::{ use arrow_array::RecordBatch; use arrow_schema::{Field, Fields, Schema}; -use chrono::{NaiveDateTime, Timelike, Utc}; +use chrono::{NaiveDateTime, Utc}; use derive_more::{Deref, DerefMut}; use itertools::Itertools; use parquet::{ @@ -54,7 +55,7 @@ use crate::{ metrics, option::Mode, storage::{object_storage::to_bytes, retention::Retention, StreamType}, - utils::time::{Minute, TimeRange}, + utils::time::TimeRange, LOCK_EXPECT, OBJECT_STORE_DATA_GRANULARITY, }; @@ -114,64 +115,44 @@ impl Stream { // Concatenates record batches and puts them in memory store for each event. 
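The `push` changes that follow replace the (schema_key, custom partition map) pair with a single precomputed partition prefix plus the parsed timestamp. A minimal sketch of a hypothetical caller is shown here; the `stage_batch` helper, its arguments, and the conversion of the staging error into `anyhow` are assumptions rather than part of this patch, and the prefix layout simply mirrors the `generate_filename_given_prefix` test further down:

use arrow_array::RecordBatch;
use chrono::Utc;

use crate::{
    parseable::Stream, storage::StreamType, utils::time::Minute, OBJECT_STORE_DATA_GRANULARITY,
};

// Hypothetical caller of the reworked Stream::push: build the partition prefix
// up front (date/hour/minute slot; custom partition keys would be appended),
// then hand it to push together with the timestamp it was derived from.
fn stage_batch(stream: &Stream, schema_key: &str, batch: &RecordBatch) -> anyhow::Result<()> {
    let parsed_timestamp = Utc::now().naive_utc();
    let prefix = format!(
        "{schema_key}.{}.minute={}",
        parsed_timestamp.format("date=%Y-%m-%d.hour=%H"),
        Minute::from(parsed_timestamp).to_slot(OBJECT_STORE_DATA_GRANULARITY),
    );
    stream.push(&prefix, parsed_timestamp, batch, StreamType::UserDefined)?;

    Ok(())
}

Note that, per the branch at the top of `push`, a query-mode node skips the disk writer for user-defined streams, so the same call then only appends the batch to the in-memory store.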
pub fn push( &self, - schema_key: &str, - record: &RecordBatch, + prefix: &str, parsed_timestamp: NaiveDateTime, - custom_partition_values: &HashMap, + record: &RecordBatch, stream_type: StreamType, ) -> Result<(), StagingError> { let mut guard = self.writer.lock().unwrap(); if self.options.mode != Mode::Query || stream_type == StreamType::Internal { - let filename = - self.filename_by_partition(schema_key, parsed_timestamp, custom_partition_values); - match guard.disk.get_mut(&filename) { + match guard.disk.get_mut(prefix) { Some(writer) => { writer.write(record)?; } + // entry is not present thus we create it None => { - // entry is not present thus we create it std::fs::create_dir_all(&self.data_path)?; - let range = TimeRange::granularity_range( parsed_timestamp.and_local_timezone(Utc).unwrap(), OBJECT_STORE_DATA_GRANULARITY, ); - let file_path = self.data_path.join(&filename); - let mut writer = DiskWriter::try_new(file_path, &record.schema(), range) - .expect("File and RecordBatch both are checked"); - + let filename = self.filename_from_prefix(prefix); + let file_path = self.data_path.join(filename); + let mut writer = DiskWriter::try_new(file_path, &record.schema(), range)?; writer.write(record)?; - guard.disk.insert(filename, writer); + guard.disk.insert(prefix.to_owned(), writer); } }; } - guard.mem.push(schema_key, record); + guard.mem.push(prefix, record); Ok(()) } - pub fn filename_by_partition( - &self, - stream_hash: &str, - parsed_timestamp: NaiveDateTime, - custom_partition_values: &HashMap, - ) -> String { + pub fn filename_from_prefix(&self, prefix: &str) -> String { let mut hostname = hostname::get().unwrap().into_string().unwrap(); if let Some(id) = &self.ingestor_id { hostname.push_str(id); } - format!( - "{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}.data.{ARROW_FILE_EXTENSION}", - parsed_timestamp.date(), - parsed_timestamp.hour(), - Minute::from(parsed_timestamp).to_slot(OBJECT_STORE_DATA_GRANULARITY), - custom_partition_values - .iter() - .sorted_by_key(|v| v.0) - .map(|(key, value)| format!("{key}={value}.")) - .join("") - ) + format!("{prefix}.{hostname}.data.{ARROW_FILE_EXTENSION}") } pub fn arrow_files(&self) -> Vec { @@ -262,23 +243,13 @@ impl Stream { } pub fn get_schemas_if_present(&self) -> Option> { - let Ok(dir) = self.data_path.read_dir() else { - return None; - }; - let mut schemas: Vec = Vec::new(); - for file in dir.flatten() { - if let Some(ext) = file.path().extension() { - if ext.eq("schema") { - let file = File::open(file.path()).expect("Schema File should exist"); + for path in self.schema_files() { + let file = File::open(path).expect("Schema File should exist"); - let schema = match serde_json::from_reader(file) { - Ok(schema) => schema, - Err(_) => continue, - }; - schemas.push(schema); - } + if let Ok(schema) = serde_json::from_reader(BufReader::new(file)) { + schemas.push(schema); } } @@ -601,6 +572,19 @@ impl Stream { Arc::new(Schema::new(fields)) } + pub fn commit_schema(&self, schema: Schema) -> Result<(), StagingError> { + let current_schema = self.get_schema().as_ref().clone(); + let updated_schema = Schema::try_merge([current_schema, schema])? 
+ .fields + .into_iter() + .map(|field| (field.name().to_owned(), field.clone())) + .collect(); + + self.metadata.write().expect(LOCK_EXPECT).schema = updated_schema; + + Ok(()) + } + pub fn get_schema_raw(&self) -> HashMap> { self.metadata.read().expect(LOCK_EXPECT).schema.clone() } @@ -816,10 +800,12 @@ mod tests { use arrow_array::{Int32Array, StringArray, TimestampMillisecondArray}; use arrow_schema::{DataType, Field, TimeUnit}; - use chrono::{NaiveDate, TimeDelta, Utc}; + use chrono::{NaiveDate, NaiveDateTime, TimeDelta, Utc}; use temp_dir::TempDir; use tokio::time::sleep; + use crate::{utils::time::Minute, OBJECT_STORE_DATA_GRANULARITY}; + use super::*; #[test] @@ -915,41 +901,8 @@ mod tests { } #[test] - fn generate_correct_path_with_current_time_and_no_custom_partitioning() { - let stream_name = "test_stream"; - let stream_hash = "abc123"; - let parsed_timestamp = NaiveDate::from_ymd_opt(2023, 10, 1) - .unwrap() - .and_hms_opt(12, 30, 0) - .unwrap(); - let custom_partition_values = HashMap::new(); - - let options = Options::default(); - let staging = Stream::new( - Arc::new(options), - stream_name, - LogStreamMetadata::default(), - None, - ); - - let expected = format!( - "{stream_hash}.date={}.hour={:02}.minute={}.{}.data.{ARROW_FILE_EXTENSION}", - parsed_timestamp.date(), - parsed_timestamp.hour(), - Minute::from(parsed_timestamp).to_slot(OBJECT_STORE_DATA_GRANULARITY), - hostname::get().unwrap().into_string().unwrap() - ); - - let generated = - staging.filename_by_partition(stream_hash, parsed_timestamp, &custom_partition_values); - - assert_eq!(generated, expected); - } - - #[test] - fn generate_correct_path_with_current_time_and_custom_partitioning() { + fn generate_filename_given_prefix() { let stream_name = "test_stream"; - let stream_hash = "abc123"; let parsed_timestamp = NaiveDate::from_ymd_opt(2023, 10, 1) .unwrap() .and_hms_opt(12, 30, 0) @@ -958,6 +911,12 @@ mod tests { custom_partition_values.insert("key1".to_string(), "value1".to_string()); custom_partition_values.insert("key2".to_string(), "value2".to_string()); + let prefix = format!( + "abc123.{}.minute={}.key1=value1.key2=value2", + parsed_timestamp.format("date={%Y-%m-%d}.hour={%H}"), + Minute::from(parsed_timestamp).to_slot(OBJECT_STORE_DATA_GRANULARITY), + ); + let options = Options::default(); let staging = Stream::new( Arc::new(options), @@ -967,15 +926,11 @@ mod tests { ); let expected = format!( - "{stream_hash}.date={}.hour={:02}.minute={}.key1=value1.key2=value2.{}.data.{ARROW_FILE_EXTENSION}", - parsed_timestamp.date(), - parsed_timestamp.hour(), - Minute::from(parsed_timestamp).to_slot(OBJECT_STORE_DATA_GRANULARITY), + "{prefix}.{}.data.{ARROW_FILE_EXTENSION}", hostname::get().unwrap().into_string().unwrap() ); - let generated = - staging.filename_by_partition(stream_hash, parsed_timestamp, &custom_partition_values); + let generated = staging.filename_from_prefix(&prefix); assert_eq!(generated, expected); } @@ -1015,6 +970,10 @@ mod tests { .checked_sub_signed(TimeDelta::minutes(mins)) .unwrap() .naive_utc(); + let prefix = format!( + "abc.{}.key1=value1.key2=value2", + time.format("date=%Y-%m-%d.hour=%H.minute=%M") + ); let batch = RecordBatch::try_new( Arc::new(schema.clone()), vec![ @@ -1024,14 +983,9 @@ mod tests { ], ) .unwrap(); + let parsed_timestamp = Utc::now().naive_utc(); staging - .push( - "abc", - &batch, - time, - &HashMap::new(), - StreamType::UserDefined, - ) + .push(&prefix, parsed_timestamp, &batch, StreamType::UserDefined) .unwrap(); staging.flush(true); } diff --git 
a/src/utils/arrow/flight.rs b/src/utils/arrow/flight.rs index c8d2dacf2..57ea88b67 100644 --- a/src/utils/arrow/flight.rs +++ b/src/utils/arrow/flight.rs @@ -95,14 +95,14 @@ pub async fn append_temporary_events( Event, Status, > { - let schema = PARSEABLE + let stream = PARSEABLE .get_stream(stream_name) - .map_err(|err| Status::failed_precondition(format!("Metadata Error: {}", err)))? - .get_schema(); + .map_err(|err| Status::failed_precondition(format!("Metadata Error: {}", err)))?; + let schema = stream.get_schema(); let rb = concat_batches(&schema, minute_result) .map_err(|err| Status::failed_precondition(format!("ArrowError: {}", err)))?; - let event = push_logs_unchecked(rb, stream_name) + let event = push_logs_unchecked(rb, &stream) .await .map_err(|err| Status::internal(err.to_string()))?; Ok(event) diff --git a/src/utils/json/mod.rs b/src/utils/json/mod.rs index efa9cb2e2..cc7edf507 100644 --- a/src/utils/json/mod.rs +++ b/src/utils/json/mod.rs @@ -21,14 +21,16 @@ use std::num::NonZeroU32; use flatten::{convert_to_array, generic_flattening, has_more_than_four_levels}; use serde::de::Visitor; -use serde_json; use serde_json::Value; +use serde_json::{self, Map}; use crate::event::format::LogSource; use crate::metadata::SchemaVersion; pub mod flatten; +pub type Json = Map; + /// calls the function `flatten_json` which results Vec or Error /// in case when Vec is returned, converts the Vec to Value of Array /// this is to ensure recursive flattening does not happen for heavily nested jsons @@ -61,32 +63,8 @@ pub fn flatten_json_body( custom_partition, validation_required, )?; - Ok(nested_value) -} -pub fn convert_array_to_object( - body: Value, - time_partition: Option<&String>, - time_partition_limit: Option, - custom_partition: Option<&String>, - schema_version: SchemaVersion, - log_source: &LogSource, -) -> Result, anyhow::Error> { - let data = flatten_json_body( - body, - time_partition, - time_partition_limit, - custom_partition, - schema_version, - true, - log_source, - )?; - let value_arr = match data { - Value::Array(arr) => arr, - value @ Value::Object(_) => vec![value], - _ => unreachable!("flatten would have failed beforehand"), - }; - Ok(value_arr) + Ok(nested_value) } struct TrueFromStr; @@ -278,4 +256,138 @@ mod tests { assert_eq!(deserialized.value, original.value); assert_eq!(deserialized.other_field, original.other_field); } + + #[test] + fn non_object_arr_is_err() { + let json = json!([1]); + + assert!(flatten_json_body( + json, + None, + None, + None, + SchemaVersion::V0, + false, + &crate::event::format::LogSource::default() + ) + .is_err()) + } + + #[test] + fn arr_obj_with_nested_type() { + let json = json!([ + { + "a": 1, + "b": "hello", + }, + { + "a": 1, + "b": "hello", + }, + { + "a": 1, + "b": "hello", + "c": [{"a": 1}] + }, + { + "a": 1, + "b": "hello", + "c": [{"a": 1, "b": 2}] + }, + ]); + let flattened_json = flatten_json_body( + json, + None, + None, + None, + SchemaVersion::V0, + false, + &crate::event::format::LogSource::default(), + ) + .unwrap(); + + assert_eq!( + json!([ + { + "a": 1, + "b": "hello", + }, + { + "a": 1, + "b": "hello", + }, + { + "a": 1, + "b": "hello", + "c_a": [1], + }, + { + "a": 1, + "b": "hello", + "c_a": [1], + "c_b": [2], + }, + ]), + flattened_json + ); + } + + #[test] + fn arr_obj_with_nested_type_v1() { + let json = json!([ + { + "a": 1, + "b": "hello", + }, + { + "a": 1, + "b": "hello", + }, + { + "a": 1, + "b": "hello", + "c": [{"a": 1}] + }, + { + "a": 1, + "b": "hello", + "c": [{"a": 1, "b": 2}] + }, + ]); + let 
flattened_json = flatten_json_body( + json, + None, + None, + None, + SchemaVersion::V1, + false, + &crate::event::format::LogSource::default(), + ) + .unwrap(); + + assert_eq!( + json!([ + { + "a": 1, + "b": "hello", + }, + { + "a": 1, + "b": "hello", + }, + { + "a": 1, + "b": "hello", + "c_a": 1, + }, + { + "a": 1, + "b": "hello", + "c_a": 1, + "c_b": 2, + }, + ]), + flattened_json + ); + } }
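For callers migrating off the removed `convert_array_to_object` helper, here is a minimal sketch of recovering a `Vec<Value>` on top of `flatten_json_body`. The `flatten_to_vec` name is illustrative and not part of this patch; the match arms mirror the deleted helper, and the partitioning options are simply left as `None`:

use serde_json::Value;

use crate::{event::format::LogSource, metadata::SchemaVersion, utils::json::flatten_json_body};

// Illustrative replacement for the deleted convert_array_to_object: flatten,
// then normalise the result into a vector of JSON objects.
fn flatten_to_vec(
    body: Value,
    schema_version: SchemaVersion,
    log_source: &LogSource,
) -> anyhow::Result<Vec<Value>> {
    let flattened = flatten_json_body(body, None, None, None, schema_version, true, log_source)?;
    Ok(match flattened {
        Value::Array(arr) => arr,
        value @ Value::Object(_) => vec![value],
        // flatten_json_body only yields objects or arrays of objects
        _ => unreachable!("flatten would have failed beforehand"),
    })
}

Callers that still need time or custom partitioning can thread those options through in place of the `None` arguments, exactly as the deleted helper did for `push_logs`.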